1 --[[ |
|
2 LuaEvent - Copyright (C) 2007 Thomas Harning <harningt@gmail.com> |
|
3 Licensed as LGPL - See doc/COPYING for details. |
|
4 ]] |
|
5 module("luaevent", package.seeall) |
|
6 require("luaevent.core") |
|
7 |
|
-- Event flags copied from the C core so the rest of the file can use short names.
local EV_READ = luaevent.core.EV_READ
local EV_WRITE = luaevent.core.EV_WRITE
-- When true, send() occasionally yields even though the write did not time out.
local fair = false -- Not recommended for most cases...
-- Object returned by luaevent.core.new(); every event in this module is
-- registered on it (base:addevent) and loop() drives it (base:loop).
local base = luaevent.core.new()
-- Debugging aid: maps each item returned by base:addevent() to the value of
-- sock:getfd() for the socket it was created for.
-- The metatable has to be spelled {__mode = ...}. The former {'__mode', 'kv'}
-- was a two-element array, so the table was never weak and kept every item
-- alive forever. Weak keys are sufficient: the values are not collectable
-- objects that need releasing.
local sockMap = setmetatable({}, {__mode = 'k'})
|
-- One-shot guard for the (currently disabled) __gc debugging hook below.
-- Declared local so the flag no longer leaks into the module's public table.
local hookedObjectMt = false

--- Register an event for `sock` on the shared event base.
-- Every argument is forwarded unchanged to base:addevent(). On success the
-- returned item is recorded in sockMap together with the socket's fd.
-- @param sock socket object; must provide getfd()
-- @param ... forwarded to base:addevent() (callers in this file pass an
--            event flag followed by a callback)
-- @return the item produced by base:addevent(); when that is nil/false the
--         same falsy value is returned and nothing is recorded
local function addevent(sock, ...)
    local item = base:addevent(sock, ...)
    if not item then
        print("FAILED TO SETUP ITEM")
        return item
    end
    local fd = sock:getfd()
    sockMap[item] = fd
    print("SETUP ITEM FOR: ", fd)
    if not hookedObjectMt then
        hookedObjectMt = true
        -- Disabled debugging hook, kept for reference: it would wrap the
        -- item's __gc metamethod to print when an item is released.
        --[[
        local mt = debug.getmetatable(item)
        local oldGC = mt.__gc
        mt.__gc = function(...)
            print("RELEASING ITEM FOR: ", sockMap[(...)])
            return oldGC(...)
        end]]
    end
    return item
end
|
-- Maps each client socket to the event item registered for it.
-- Weak keys: an entry disappears once its socket is no longer referenced
-- anywhere else. The metatable must be written {__mode = 'k'}; the former
-- {'__mode', 'kv'} was a plain two-element array, which left the table fully
-- strong. Values are kept strong on purpose so that this table is what keeps
-- an event item alive for as long as its socket exists.
local clientTable = setmetatable({}, {__mode = 'k'})
|
33 |
|
--- Build a callback bound to the coroutine that is running right now.
-- Calling the callback from any other coroutine resumes the bound one with
-- the given arguments and returns whatever that resume produced after the
-- status flag (yielded values, return values, or the error message).
-- Calling it from inside the bound coroutine itself does nothing.
-- @return function the resuming callback
local function getWrapper()
    local owner = coroutine.running()
    local function resumeOwner(...)
        if coroutine.running() ~= owner then
            return select(2, coroutine.resume(owner, ...))
        end
    end
    return resumeOwner
end
|
41 |
|
--- Send `data` on a non-blocking socket, yielding while it is not writable.
-- Must run inside a coroutine: on "timeout" an EV_WRITE event is registered
-- for the socket (once, cached in clientTable) and the coroutine yields
-- EV_WRITE; when resumed it retries from the first byte not yet written.
-- @param sock  socket object providing send(data, i, j)
-- @param data  string to send
-- @param start optional index of the first byte to send (default 1)
-- @param stop  optional index of the last byte to send
-- @return the three values of the final sock:send() call (success value or
--         nil, error message, third value)
function send(sock, data, start, stop)
    local s, err, sent
    local from = start or 1
    repeat
        s, err, sent = sock:send(data, from, stop)
        -- Add extra coro swap for fairness
        -- CURRENTLY DISABLED FOR TESTING......
        if fair and math.random(100) > 90 then
            if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end
            coroutine.yield(EV_WRITE)
        end
        if s or err ~= "timeout" then return s, err, sent end
        -- Assuming LuaSocket semantics, the third value of a failed send is the
        -- absolute index of the last byte written (start - 1 when nothing went
        -- out), not a byte count. The retry therefore continues at sent + 1;
        -- adding it to `from` skipped data as soon as start ~= 1 or a second
        -- partial write happened.
        if sent then from = sent + 1 end
        if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end
        coroutine.yield(EV_WRITE)
    until false
end
|
--- Receive from a non-blocking socket, yielding while no data is available.
-- Must run inside a coroutine: each "timeout" result makes sure an EV_READ
-- event exists for the socket (cached in clientTable), yields EV_READ, and
-- retries once resumed, handing the partial data back to sock:receive().
-- @param sock    socket object providing receive(pattern, prefix)
-- @param pattern receive pattern, defaults to '*l'
-- @param part    optional prefix passed to the first sock:receive() call
-- @return the three values of the final sock:receive() call
function receive(sock, pattern, part)
    pattern = pattern or '*l'
    while true do
        local data, reason
        data, reason, part = sock:receive(pattern, part)
        local mustWait = (not data) and reason == "timeout"
        if not mustWait then
            return data, reason, part
        end
        if not clientTable[sock] then
            clientTable[sock] = addevent(sock, EV_READ, getWrapper())
        end
        coroutine.yield(EV_READ)
    end
end
|
--- Like receive(), but with special treatment when reading chunks: for a
-- numeric pattern it returns as soon as any data has arrived instead of
-- waiting for the full count.
-- Must run inside a coroutine: on a "timeout" with nothing usable it makes
-- sure an EV_READ event exists for the socket (cached in clientTable),
-- yields EV_READ and retries once resumed.
-- @param client  socket object providing receive(pattern)
-- @param pattern receive pattern, defaults to "*l"
-- @return the three values of the final client:receive() call
function receivePartial(client, pattern)
    local s, err, part
    pattern = pattern or "*l"
    repeat
        s, err, part = client:receive(pattern)
        if s or ( (type(pattern)=="number") and part~="" and part ~=nil ) or
            err ~= "timeout" then return s, err, part end
        -- The event must be keyed on `client`, this function's own parameter.
        -- The previous code used an undefined `sock`, which was always nil here.
        if not clientTable[client] then clientTable[client] = addevent(client, EV_READ, getWrapper()) end
        coroutine.yield(EV_READ)
    until false
end
|
--- Connect a socket without blocking the event loop.
-- Switches the socket to timeout 0 and attempts the connect. If that reports
-- "timeout", an EV_WRITE event is registered for the socket (once, cached in
-- clientTable), the coroutine yields EV_WRITE and the connect is attempted a
-- second time after resumption.
-- @param sock socket object providing settimeout() and connect()
-- @param ...  forwarded to sock:connect() on both attempts
-- @return 1 when the second attempt reports "already connected"; otherwise
--         the result and error of the last sock:connect() call
function connect(sock, ...)
    sock:settimeout(0)
    local ret, err = sock:connect(...)
    if not ret and err == "timeout" then
        if not clientTable[sock] then
            clientTable[sock] = addevent(sock, EV_WRITE, getWrapper())
        end
        coroutine.yield(EV_WRITE)
        ret, err = sock:connect(...)
        if err == "already connected" then
            return 1
        end
    end
    return ret, err
end
|
--- Deprecated no-op.
-- Accepts a socket, does nothing with it and returns no values.
-- @param sock ignored
function flush(sock)
    return
end
|
--- Body of the coroutine created for each accepted client.
-- Simply runs the user handler on the client socket.
-- @param sock    accepted client socket
-- @param handler function invoked as handler(sock)
-- @return whatever handler returns
local function clientCoroutine(sock, handler)
    -- Figure out what to do ......
    -- (for now the handler is run directly, as a tail call)
    return handler(sock)
end
|
--- Start a client coroutine and report a failure of its first run.
-- The coroutine is resumed with (client, handler). If it raises before its
-- first yield or return, the error used to be dropped silently; it is now
-- written to stderr with the coroutine's traceback. The error is reported
-- rather than re-raised so that the caller's accept loop keeps running.
-- @param co      coroutine to start
-- @param client  first argument passed to the coroutine
-- @param handler second argument passed to the coroutine
local function handleClient(co, client, handler)
    local ok, res = coroutine.resume(co, client, handler)
    if not ok then
        io.stderr:write(debug.traceback(co, "luaevent: client handler failed: " .. tostring(res)), "\n")
    end
end
|
--- Accept loop run as the coroutine behind addserver().
-- Registers an EV_READ event for the listening socket, then loops forever:
-- yield EV_READ, and on every resumption try to accept one client. An
-- accepted client is switched to timeout 0 and handed, together with
-- `callback`, to a fresh clientCoroutine via handleClient().
-- @param sock     listening socket providing accept()
-- @param callback handler passed on for each accepted client
local function serverCoroutine(sock, callback)
    -- Kept in a local so the listen event stays referenced while the loop runs.
    local listenItem = addevent(sock, EV_READ, getWrapper())
    while true do
        coroutine.yield(EV_READ)
        -- Get new socket
        local client = sock:accept()
        if client then
            client:settimeout(0)
            handleClient(coroutine.create(clientCoroutine), client, callback)
        end
    end
end
|
--- Start serving on a listening socket.
-- Creates a coroutine running serverCoroutine and performs its first resume
-- with (sock, callback). Raises if that first resume fails.
-- @param sock     listening socket
-- @param callback handler invoked for each accepted client
function addserver(sock, callback)
    local listener = coroutine.create(serverCoroutine)
    local started, failure = coroutine.resume(listener, sock, callback)
    assert(started, failure)
end
|
--- Run `func` in a new coroutine, started immediately.
-- @param func function to run as the coroutine body
-- @param ...  arguments passed to func on its first resume
-- @return the results of coroutine.resume: true plus yielded/returned values,
--         or false plus the error message
function addthread(func, ...)
    local thread = coroutine.create(func)
    return coroutine.resume(thread, ...)
end
|
-- Metatable for the objects created by wrap(). Each wrapper is a table with
-- a `socket` field; its methods delegate to this module's coroutine-aware
-- functions.
local _skt_mt = {__index = {
    -- Forwards to connect() on the wrapped socket.
    connect = function(self, ...)
        return connect(self.socket, ...)
    end,

    -- Forwards to send(). The optional range arguments are now passed
    -- through; previously they were dropped, so a partial range could not
    -- be sent via the wrapper.
    send = function (self, data, from, to)
        return send(self.socket, data, from, to)
    end,

    -- Uses receivePartial() when the wrapper's timeout was set to 0,
    -- receive() otherwise.
    receive = function (self, pattern)
        if (self.timeout==0) then
            return receivePartial(self.socket, pattern)
        end
        return receive(self.socket, pattern)
    end,

    -- Forwards to the (deprecated, no-op) flush().
    flush = function (self)
        return flush(self.socket)
    end,

    -- Only records the value on the wrapper; it selects the receive
    -- strategy above and is not forwarded to the wrapped socket.
    settimeout = function (self,time)
        self.timeout=time
        return
    end,

    -- Closes the event item registered for the socket, if any, then the
    -- socket itself. An item exists only after an operation had to wait,
    -- so the lookup may be nil; indexing it unconditionally used to raise
    -- and left the socket open.
    close = function(self)
        local item = clientTable[self.socket]
        if item then
            clientTable[self.socket] = nil
            item:close()
        end
        self.socket:close()
    end
}}
|
--- Wrap a socket in an object whose methods go through this module.
-- @param sock socket to wrap; stored in the wrapper's `socket` field
-- @return a new table using _skt_mt as its metatable
function wrap(sock)
    local wrapper = { socket = sock }
    return setmetatable(wrapper, _skt_mt)
end
|
--- Run the shared event base's loop.
-- All arguments are forwarded to base:loop(); its results are discarded.
function loop(...)
    base:loop(...)
end
|