plugins/mod_bosh.lua

changeset 3043
1fadbb2e3ca0
parent 2961
db3c0ecce3f4
parent 3042
b1961f6c9853
child 3070
3238b58fd118
equal deleted inserted replaced
3009:06f7d8054065 3043:1fadbb2e3ca0
61 function on_destroy_request(request) 61 function on_destroy_request(request)
62 waiting_requests[request] = nil; 62 waiting_requests[request] = nil;
63 local session = sessions[request.sid]; 63 local session = sessions[request.sid];
64 if session then 64 if session then
65 local requests = session.requests; 65 local requests = session.requests;
66 for i,r in pairs(requests) do 66 for i,r in ipairs(requests) do
67 if r == request then requests[i] = nil; break; end 67 if r == request then
68 t_remove(requests, i);
69 break;
70 end
68 end 71 end
69 72
70 -- If this session now has no requests open, mark it as inactive 73 -- If this session now has no requests open, mark it as inactive
71 if #requests == 0 and session.bosh_max_inactive and not inactive_sessions[session] then 74 if #requests == 0 and session.bosh_max_inactive and not inactive_sessions[session] then
72 inactive_sessions[session] = os_time(); 75 inactive_sessions[session] = os_time();
88 return; 91 return;
89 end 92 end
90 --log("debug", "Handling new request %s: %s\n----------", request.id, tostring(body)); 93 --log("debug", "Handling new request %s: %s\n----------", request.id, tostring(body));
91 request.notopen = true; 94 request.notopen = true;
92 request.log = log; 95 request.log = log;
96 request.on_destroy = on_destroy_request;
97
93 local parser = lxp.new(init_xmlhandlers(request, stream_callbacks), "\1"); 98 local parser = lxp.new(init_xmlhandlers(request, stream_callbacks), "\1");
94 99
95 parser:parse(body); 100 parser:parse(body);
96 101
97 local session = sessions[request.sid]; 102 local session = sessions[request.sid];
116 local resp = t_concat(session.send_buffer); 121 local resp = t_concat(session.send_buffer);
117 session.send_buffer = {}; 122 session.send_buffer = {};
118 session.send(resp); 123 session.send(resp);
119 end 124 end
120 125
121 if not request.destroyed and session.bosh_wait then 126 if not request.destroyed then
122 request.reply_before = os_time() + session.bosh_wait; 127 -- We're keeping this request open, to respond later
123 request.on_destroy = on_destroy_request; 128 log("debug", "Have nothing to say, so leaving request unanswered for now");
124 waiting_requests[request] = true; 129 if session.bosh_wait then
125 end 130 request.reply_before = os_time() + session.bosh_wait;
126 131 waiting_requests[request] = true;
127 log("debug", "Have nothing to say, so leaving request unanswered for now"); 132 end
128 return true; 133 if inactive_sessions[session] then
134 -- Session was marked as inactive, since we have
135 -- a request open now, unmark it
136 inactive_sessions[session] = nil;
137 end
138 end
139
140 return true; -- Inform httpserver we shall reply later
129 end 141 end
130 end 142 end
131 143
132 144
133 local function bosh_reset_stream(session) session.notopen = true; end 145 local function bosh_reset_stream(session) session.notopen = true; end
160 return; 172 return;
161 end 173 end
162 174
163 -- New session 175 -- New session
164 sid = new_uuid(); 176 sid = new_uuid();
165 local session = { type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid), host = attr.to, bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid, 177 local session = { type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to, bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid,
166 bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, 178 bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY,
167 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, close = bosh_close_stream, 179 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, close = bosh_close_stream,
168 dispatch_stanza = core_process_stanza, log = logger.init("bosh"..sid), secure = request.secure }; 180 dispatch_stanza = core_process_stanza, log = logger.init("bosh"..sid), secure = request.secure };
169 sessions[sid] = session; 181 sessions[sid] = session;
170 182
172 local r, send_buffer = session.requests, session.send_buffer; 184 local r, send_buffer = session.requests, session.send_buffer;
173 local response = { headers = default_headers } 185 local response = { headers = default_headers }
174 function session.send(s) 186 function session.send(s)
175 --log("debug", "Sending BOSH data: %s", tostring(s)); 187 --log("debug", "Sending BOSH data: %s", tostring(s));
176 local oldest_request = r[1]; 188 local oldest_request = r[1];
177 while oldest_request and oldest_request.destroyed do
178 t_remove(r, 1);
179 waiting_requests[oldest_request] = nil;
180 oldest_request = r[1];
181 end
182 if oldest_request then 189 if oldest_request then
183 log("debug", "We have an open request, so sending on that"); 190 log("debug", "We have an open request, so sending on that");
184 response.body = t_concat{"<body xmlns='http://jabber.org/protocol/httpbind' sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", tostring(s), "</body>" }; 191 response.body = t_concat{"<body xmlns='http://jabber.org/protocol/httpbind' sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", tostring(s), "</body>" };
185 oldest_request:send(response); 192 oldest_request:send(response);
186 --log("debug", "Sent"); 193 --log("debug", "Sent");
191 t_remove(r, 1); 198 t_remove(r, 1);
192 end 199 end
193 else 200 else
194 log("debug", "Destroying the request now..."); 201 log("debug", "Destroying the request now...");
195 oldest_request:destroy(); 202 oldest_request:destroy();
196 t_remove(r, 1);
197 end 203 end
198 elseif s ~= "" then 204 elseif s ~= "" then
199 log("debug", "Saved to send buffer because there are %d open requests", #r); 205 log("debug", "Saved to send buffer because there are %d open requests", #r);
200 -- Hmm, no requests are open :( 206 -- Hmm, no requests are open :(
201 t_insert(session.send_buffer, tostring(s)); 207 t_insert(session.send_buffer, tostring(s));
233 local diff = rid - session.rid; 239 local diff = rid - session.rid;
234 if diff > 1 then 240 if diff > 1 then
235 session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid); 241 session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid);
236 elseif diff <= 0 then 242 elseif diff <= 0 then
237 -- Repeated, ignore 243 -- Repeated, ignore
238 session.log("debug", "rid repeated (on request %s), ignoring: %d", request.id, session.rid); 244 session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff);
239 request.notopen = nil; 245 request.notopen = nil;
246 request.sid = sid;
240 t_insert(session.requests, request); 247 t_insert(session.requests, request);
241 return; 248 return;
242 end 249 end
243 session.rid = rid; 250 session.rid = rid;
244 end 251 end
246 if attr.type == "terminate" then 253 if attr.type == "terminate" then
247 -- Client wants to end this session 254 -- Client wants to end this session
248 session:close(); 255 session:close();
249 request.notopen = nil; 256 request.notopen = nil;
250 return; 257 return;
251 end
252
253 -- If session was inactive, make sure it is now marked as not
254 if #session.requests == 0 then
255 (session.log or log)("debug", "BOSH client now active again at %d", os_time());
256 inactive_sessions[session] = nil;
257 end 258 end
258 259
259 if session.notopen then 260 if session.notopen then
260 local features = st.stanza("stream:features"); 261 local features = st.stanza("stream:features");
261 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); 262 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });

mercurial