bosh.lua

changeset 93
2442e751f3cb
parent 89
1752a9097e6b
child 161
b177bcea2006
equal deleted inserted replaced
92:dcccef33f0eb 93:2442e751f3cb
7 local stream_mt = setmetatable({}, { __index = verse.stream_mt }); 7 local stream_mt = setmetatable({}, { __index = verse.stream_mt });
8 stream_mt.__index = stream_mt; 8 stream_mt.__index = stream_mt;
9 9
10 local xmlns_stream = "http://etherx.jabber.org/streams"; 10 local xmlns_stream = "http://etherx.jabber.org/streams";
11 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; 11 local xmlns_bosh = "http://jabber.org/protocol/httpbind";
12
13 local reconnect_timeout = 5;
12 14
13 function verse.new_bosh(logger, url) 15 function verse.new_bosh(logger, url)
14 local stream = { 16 local stream = {
15 bosh_conn_pool = {}; 17 bosh_conn_pool = {};
16 bosh_waiting_requests = {}; 18 bosh_waiting_requests = {};
19 bosh_url = url; 21 bosh_url = url;
20 conn = {}; 22 conn = {};
21 }; 23 };
22 function stream:reopen() 24 function stream:reopen()
23 self.bosh_need_restart = true; 25 self.bosh_need_restart = true;
24 self:flush(true); 26 self:flush();
25 end
26 function stream.bosh_response_handler(response, code, request)
27 return stream:_handle_response(response, code, request);
28 end 27 end
29 local conn = verse.new(logger, stream); 28 local conn = verse.new(logger, stream);
30 return setmetatable(conn, stream_mt); 29 return setmetatable(conn, stream_mt);
31 end 30 end
32 31
38 self:debug("Putting into BOSH send buffer: %s", tostring(data)); 37 self:debug("Putting into BOSH send buffer: %s", tostring(data));
39 self.bosh_outgoing_buffer[#self.bosh_outgoing_buffer+1] = st.clone(data); 38 self.bosh_outgoing_buffer[#self.bosh_outgoing_buffer+1] = st.clone(data);
40 self:flush(); --TODO: Optimize by doing this on next tick (give a chance for data to buffer) 39 self:flush(); --TODO: Optimize by doing this on next tick (give a chance for data to buffer)
41 end 40 end
42 41
43 function stream_mt:flush(force) 42 function stream_mt:flush()
44 if self.connected 43 if self.connected
45 and #self.bosh_waiting_requests < self.bosh_max_requests 44 and #self.bosh_waiting_requests < self.bosh_max_requests
46 and (force or #self.bosh_outgoing_buffer > 0) then 45 and (#self.bosh_waiting_requests == 0
46 or #self.bosh_outgoing_buffer > 0
47 or self.bosh_need_restart) then
47 self:debug("Flushing..."); 48 self:debug("Flushing...");
48 local payload = self:_make_body(); 49 local payload = self:_make_body();
49 local buffer = self.bosh_outgoing_buffer; 50 local buffer = self.bosh_outgoing_buffer;
50 for i, stanza in ipairs(buffer) do 51 for i, stanza in ipairs(buffer) do
51 payload:add_child(stanza); 52 payload:add_child(stanza);
52 buffer[i] = nil; 53 buffer[i] = nil;
53 end 54 end
54 local request = http.request(self.bosh_url, { body = tostring(payload) }, self.bosh_response_handler); 55 self:_make_request(payload);
56 else
57 self:debug("Decided not to flush.");
58 end
59 end
60
61 function stream_mt:_make_request(payload)
62 local request, err = http.request(self.bosh_url, { body = tostring(payload) }, function (response, code, request)
63 if code ~= 0 then
64 self.inactive_since = nil;
65 return self:_handle_response(response, code, request);
66 end
67
68 -- Connection issues, we need to retry this request
69 local time = os.time();
70 if not self.inactive_since then
71 self.inactive_since = time; -- So we know when it is time to give up
72 elseif time - self.inactive_since > self.bosh_max_inactivity then
73 return self:_disconnected();
74 else
75 self:debug("%d seconds left to reconnect, retrying in %d seconds...",
76 self.bosh_max_inactivity - (time - self.inactive_since), reconnect_timeout);
77 end
78
79 -- Set up reconnect timer
80 timer.add_task(reconnect_timeout, function ()
81 self:debug("Retrying request...");
82 -- Remove old request
83 for i, waiting_request in ipairs(self.bosh_waiting_requests) do
84 if waiting_request == request then
85 table.remove(self.bosh_waiting_requests, i);
86 break;
87 end
88 end
89 self:_make_request(payload);
90 end);
91 end);
92 if request then
55 table.insert(self.bosh_waiting_requests, request); 93 table.insert(self.bosh_waiting_requests, request);
56 else 94 else
57 self:debug("Decided not to flush."); 95 self:warn("Request failed instantly: %s", err);
58 end 96 end
97 end
98
99 function stream_mt:_disconnected()
100 self.connected = nil;
101 self:event("disconnected");
59 end 102 end
60 103
61 function stream_mt:_send_session_request() 104 function stream_mt:_send_session_request()
62 local body = self:_make_body(); 105 local body = self:_make_body();
63 106
70 -- XEP-0206 113 -- XEP-0206
71 body.attr.from = self.jid; 114 body.attr.from = self.jid;
72 body.attr.to = self.host; 115 body.attr.to = self.host;
73 body.attr.secure = 'true'; 116 body.attr.secure = 'true';
74 117
75 http.request(self.bosh_url, { body = tostring(body) }, function (response) 118 http.request(self.bosh_url, { body = tostring(body) }, function (response, code)
119 if code == 0 then
120 -- Failed to connect
121 return self:_disconnected();
122 end
76 -- Handle session creation response 123 -- Handle session creation response
77 local payload = self:_parse_response(response) 124 local payload = self:_parse_response(response)
78 if not payload then 125 if not payload then
79 self:warn("Invalid session creation response"); 126 self:warn("Invalid session creation response");
80 self:event("disconnected"); 127 self:_disconnected();
81 return; 128 return;
82 end 129 end
83 self.bosh_sid = payload.attr.sid; 130 self.bosh_sid = payload.attr.sid; -- Session id
84 self.bosh_wait = tonumber(payload.attr.wait); 131 self.bosh_wait = tonumber(payload.attr.wait); -- How long the server may hold connections for
85 self.bosh_hold = tonumber(payload.attr.hold); 132 self.bosh_hold = tonumber(payload.attr.hold); -- How many connections the server may hold
86 self.bosh_max_requests = tonumber(payload.attr.requests) or self.bosh_hold; 133 self.bosh_max_inactivity = tonumber(payload.attr.inactivity); -- Max amount of time with no connections
134 self.bosh_max_requests = tonumber(payload.attr.requests) or self.bosh_hold; -- Max simultaneous requests we can make
87 self.connected = true; 135 self.connected = true;
88 self:event("connected"); 136 self:event("connected");
89 self:_handle_response_payload(payload); 137 self:_handle_response_payload(payload);
90 end); 138 end);
91 end 139 end
92 140
93 function stream_mt:_handle_response(response, code, request) 141 function stream_mt:_handle_response(response, code, request)
94 if self.bosh_waiting_requests[1] ~= request then 142 if self.bosh_waiting_requests[1] ~= request then
95 self:warn("Server replied to request that wasn't the oldest"); 143 self:warn("Server replied to request that wasn't the oldest");
144 for i, waiting_request in ipairs(self.bosh_waiting_requests) do
145 if waiting_request == request then
146 self.bosh_waiting_requests[i] = nil;
147 break;
148 end
149 end
96 else 150 else
97 table.remove(self.bosh_waiting_requests, 1); 151 table.remove(self.bosh_waiting_requests, 1);
98 end 152 end
99 local payload = self:_parse_response(response); 153 local payload = self:_parse_response(response);
100 if payload then 154 if payload then
101 self:_handle_response_payload(payload); 155 self:_handle_response_payload(payload);
102 end 156 end
103 if #self.bosh_waiting_requests == 0 then 157 self:flush();
104 self:debug("We have no requests open, so forcing flush...");
105 self:flush(true);
106 else
107 self:debug("We have %d requests open, so no need to force a flush", #self.bosh_waiting_requests);
108 end
109 end 158 end
110 159
111 function stream_mt:_handle_response_payload(payload) 160 function stream_mt:_handle_response_payload(payload)
112 for stanza in payload:childtags() do 161 for stanza in payload:childtags() do
113 if stanza.attr.xmlns == xmlns_stream then 162 if stanza.attr.xmlns == xmlns_stream then
116 self:event("stream/"..stanza.attr.xmlns, stanza); 165 self:event("stream/"..stanza.attr.xmlns, stanza);
117 else 166 else
118 self:event("stanza", stanza); 167 self:event("stanza", stanza);
119 end 168 end
120 end 169 end
170 if payload.attr.type == "terminate" then
171 self:_disconnected({reason = payload.attr.condition});
172 end
121 end 173 end
122 174
123 local stream_callbacks = { 175 local stream_callbacks = {
124 stream_ns = "http://jabber.org/protocol/httpbind", stream_tag = "body", 176 stream_ns = "http://jabber.org/protocol/httpbind", stream_tag = "body",
125 default_ns = "jabber:client", 177 default_ns = "jabber:client",
126 streamopened = function (session, attr) session.notopen = nil; session.payload = verse.stanza("body", attr); return true; end; 178 streamopened = function (session, attr) session.notopen = nil; session.payload = verse.stanza("body", attr); return true; end;
127 handlestanza = function (session, stanza) session.payload:add_child(stanza); end; 179 handlestanza = function (session, stanza) session.payload:add_child(stanza); end;
128 }; 180 };
129 function stream_mt:_parse_response(response) 181 function stream_mt:_parse_response(response)
130 self:debug("Parsing response: %s", response); 182 self:debug("Parsing response: %s", response);
183 if response == nil then
184 self:debug("%s", debug.traceback());
185 self:_disconnected();
186 return;
187 end
131 local session = { notopen = true, log = self.log }; 188 local session = { notopen = true, log = self.log };
132 local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1"); 189 local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1");
133 parser:parse(response); 190 parser:parse(response);
134 return session.payload; 191 return session.payload;
135 end 192 end

mercurial