61 function on_destroy_request(request) |
61 function on_destroy_request(request) |
62 waiting_requests[request] = nil; |
62 waiting_requests[request] = nil; |
63 local session = sessions[request.sid]; |
63 local session = sessions[request.sid]; |
64 if session then |
64 if session then |
65 local requests = session.requests; |
65 local requests = session.requests; |
66 for i,r in pairs(requests) do |
66 for i,r in ipairs(requests) do |
67 if r == request then requests[i] = nil; break; end |
67 if r == request then |
|
68 t_remove(requests, i); |
|
69 break; |
|
70 end |
68 end |
71 end |
69 |
72 |
70 -- If this session now has no requests open, mark it as inactive |
73 -- If this session now has no requests open, mark it as inactive |
71 if #requests == 0 and session.bosh_max_inactive and not inactive_sessions[session] then |
74 if #requests == 0 and session.bosh_max_inactive and not inactive_sessions[session] then |
72 inactive_sessions[session] = os_time(); |
75 inactive_sessions[session] = os_time(); |
88 return; |
91 return; |
89 end |
92 end |
90 --log("debug", "Handling new request %s: %s\n----------", request.id, tostring(body)); |
93 --log("debug", "Handling new request %s: %s\n----------", request.id, tostring(body)); |
91 request.notopen = true; |
94 request.notopen = true; |
92 request.log = log; |
95 request.log = log; |
|
96 request.on_destroy = on_destroy_request; |
|
97 |
93 local parser = lxp.new(init_xmlhandlers(request, stream_callbacks), "\1"); |
98 local parser = lxp.new(init_xmlhandlers(request, stream_callbacks), "\1"); |
94 |
99 |
95 parser:parse(body); |
100 parser:parse(body); |
96 |
101 |
97 local session = sessions[request.sid]; |
102 local session = sessions[request.sid]; |
116 local resp = t_concat(session.send_buffer); |
121 local resp = t_concat(session.send_buffer); |
117 session.send_buffer = {}; |
122 session.send_buffer = {}; |
118 session.send(resp); |
123 session.send(resp); |
119 end |
124 end |
120 |
125 |
121 if not request.destroyed and session.bosh_wait then |
126 if not request.destroyed then |
122 request.reply_before = os_time() + session.bosh_wait; |
127 -- We're keeping this request open, to respond later |
123 request.on_destroy = on_destroy_request; |
128 log("debug", "Have nothing to say, so leaving request unanswered for now"); |
124 waiting_requests[request] = true; |
129 if session.bosh_wait then |
125 end |
130 request.reply_before = os_time() + session.bosh_wait; |
126 |
131 waiting_requests[request] = true; |
127 log("debug", "Have nothing to say, so leaving request unanswered for now"); |
132 end |
128 return true; |
133 if inactive_sessions[session] then |
|
134 -- Session was marked as inactive, since we have |
|
135 -- a request open now, unmark it |
|
136 inactive_sessions[session] = nil; |
|
137 end |
|
138 end |
|
139 |
|
140 return true; -- Inform httpserver we shall reply later |
129 end |
141 end |
130 end |
142 end |
131 |
143 |
132 |
144 |
133 local function bosh_reset_stream(session) session.notopen = true; end |
145 local function bosh_reset_stream(session) session.notopen = true; end |
160 return; |
172 return; |
161 end |
173 end |
162 |
174 |
163 -- New session |
175 -- New session |
164 sid = new_uuid(); |
176 sid = new_uuid(); |
165 local session = { type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid), host = attr.to, bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid, |
177 local session = { type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to, bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid, |
166 bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, |
178 bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, |
167 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, close = bosh_close_stream, |
179 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, close = bosh_close_stream, |
168 dispatch_stanza = core_process_stanza, log = logger.init("bosh"..sid), secure = request.secure }; |
180 dispatch_stanza = core_process_stanza, log = logger.init("bosh"..sid), secure = request.secure }; |
169 sessions[sid] = session; |
181 sessions[sid] = session; |
170 |
182 |
172 local r, send_buffer = session.requests, session.send_buffer; |
184 local r, send_buffer = session.requests, session.send_buffer; |
173 local response = { headers = default_headers } |
185 local response = { headers = default_headers } |
174 function session.send(s) |
186 function session.send(s) |
175 --log("debug", "Sending BOSH data: %s", tostring(s)); |
187 --log("debug", "Sending BOSH data: %s", tostring(s)); |
176 local oldest_request = r[1]; |
188 local oldest_request = r[1]; |
177 while oldest_request and oldest_request.destroyed do |
|
178 t_remove(r, 1); |
|
179 waiting_requests[oldest_request] = nil; |
|
180 oldest_request = r[1]; |
|
181 end |
|
182 if oldest_request then |
189 if oldest_request then |
183 log("debug", "We have an open request, so sending on that"); |
190 log("debug", "We have an open request, so sending on that"); |
184 response.body = t_concat{"<body xmlns='http://jabber.org/protocol/httpbind' sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", tostring(s), "</body>" }; |
191 response.body = t_concat{"<body xmlns='http://jabber.org/protocol/httpbind' sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", tostring(s), "</body>" }; |
185 oldest_request:send(response); |
192 oldest_request:send(response); |
186 --log("debug", "Sent"); |
193 --log("debug", "Sent"); |
191 t_remove(r, 1); |
198 t_remove(r, 1); |
192 end |
199 end |
193 else |
200 else |
194 log("debug", "Destroying the request now..."); |
201 log("debug", "Destroying the request now..."); |
195 oldest_request:destroy(); |
202 oldest_request:destroy(); |
196 t_remove(r, 1); |
|
197 end |
203 end |
198 elseif s ~= "" then |
204 elseif s ~= "" then |
199 log("debug", "Saved to send buffer because there are %d open requests", #r); |
205 log("debug", "Saved to send buffer because there are %d open requests", #r); |
200 -- Hmm, no requests are open :( |
206 -- Hmm, no requests are open :( |
201 t_insert(session.send_buffer, tostring(s)); |
207 t_insert(session.send_buffer, tostring(s)); |
233 local diff = rid - session.rid; |
239 local diff = rid - session.rid; |
234 if diff > 1 then |
240 if diff > 1 then |
235 session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid); |
241 session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid); |
236 elseif diff <= 0 then |
242 elseif diff <= 0 then |
237 -- Repeated, ignore |
243 -- Repeated, ignore |
238 session.log("debug", "rid repeated (on request %s), ignoring: %d", request.id, session.rid); |
244 session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff); |
239 request.notopen = nil; |
245 request.notopen = nil; |
|
246 request.sid = sid; |
240 t_insert(session.requests, request); |
247 t_insert(session.requests, request); |
241 return; |
248 return; |
242 end |
249 end |
243 session.rid = rid; |
250 session.rid = rid; |
244 end |
251 end |
246 if attr.type == "terminate" then |
253 if attr.type == "terminate" then |
247 -- Client wants to end this session |
254 -- Client wants to end this session |
248 session:close(); |
255 session:close(); |
249 request.notopen = nil; |
256 request.notopen = nil; |
250 return; |
257 return; |
251 end |
|
252 |
|
253 -- If session was inactive, make sure it is now marked as not |
|
254 if #session.requests == 0 then |
|
255 (session.log or log)("debug", "BOSH client now active again at %d", os_time()); |
|
256 inactive_sessions[session] = nil; |
|
257 end |
258 end |
258 |
259 |
259 if session.notopen then |
260 if session.notopen then |
260 local features = st.stanza("stream:features"); |
261 local features = st.stanza("stream:features"); |
261 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); |
262 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); |