7 local stream_mt = setmetatable({}, { __index = verse.stream_mt }); |
7 local stream_mt = setmetatable({}, { __index = verse.stream_mt }); |
8 stream_mt.__index = stream_mt; |
8 stream_mt.__index = stream_mt; |
9 |
9 |
10 local xmlns_stream = "http://etherx.jabber.org/streams"; |
10 local xmlns_stream = "http://etherx.jabber.org/streams"; |
11 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; |
11 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; |
|
12 |
|
13 local reconnect_timeout = 5; |
12 |
14 |
13 function verse.new_bosh(logger, url) |
15 function verse.new_bosh(logger, url) |
14 local stream = { |
16 local stream = { |
15 bosh_conn_pool = {}; |
17 bosh_conn_pool = {}; |
16 bosh_waiting_requests = {}; |
18 bosh_waiting_requests = {}; |
38 self:debug("Putting into BOSH send buffer: %s", tostring(data)); |
37 self:debug("Putting into BOSH send buffer: %s", tostring(data)); |
39 self.bosh_outgoing_buffer[#self.bosh_outgoing_buffer+1] = st.clone(data); |
38 self.bosh_outgoing_buffer[#self.bosh_outgoing_buffer+1] = st.clone(data); |
40 self:flush(); --TODO: Optimize by doing this on next tick (give a chance for data to buffer) |
39 self:flush(); --TODO: Optimize by doing this on next tick (give a chance for data to buffer) |
41 end |
40 end |
42 |
41 |
43 function stream_mt:flush(force) |
42 function stream_mt:flush() |
44 if self.connected |
43 if self.connected |
45 and #self.bosh_waiting_requests < self.bosh_max_requests |
44 and #self.bosh_waiting_requests < self.bosh_max_requests |
46 and (force or #self.bosh_outgoing_buffer > 0) then |
45 and (#self.bosh_waiting_requests == 0 |
|
46 or #self.bosh_outgoing_buffer > 0 |
|
47 or self.bosh_need_restart) then |
47 self:debug("Flushing..."); |
48 self:debug("Flushing..."); |
48 local payload = self:_make_body(); |
49 local payload = self:_make_body(); |
49 local buffer = self.bosh_outgoing_buffer; |
50 local buffer = self.bosh_outgoing_buffer; |
50 for i, stanza in ipairs(buffer) do |
51 for i, stanza in ipairs(buffer) do |
51 payload:add_child(stanza); |
52 payload:add_child(stanza); |
52 buffer[i] = nil; |
53 buffer[i] = nil; |
53 end |
54 end |
54 local request = http.request(self.bosh_url, { body = tostring(payload) }, self.bosh_response_handler); |
55 self:_make_request(payload); |
|
56 else |
|
57 self:debug("Decided not to flush."); |
|
58 end |
|
59 end |
|
60 |
|
61 function stream_mt:_make_request(payload) |
|
62 local request, err = http.request(self.bosh_url, { body = tostring(payload) }, function (response, code, request) |
|
63 if code ~= 0 then |
|
64 self.inactive_since = nil; |
|
65 return self:_handle_response(response, code, request); |
|
66 end |
|
67 |
|
68 -- Connection issues, we need to retry this request |
|
69 local time = os.time(); |
|
70 if not self.inactive_since then |
|
71 self.inactive_since = time; -- So we know when it is time to give up |
|
72 elseif time - self.inactive_since > self.bosh_max_inactivity then |
|
73 return self:_disconnected(); |
|
74 else |
|
75 self:debug("%d seconds left to reconnect, retrying in %d seconds...", |
|
76 self.bosh_max_inactivity - (time - self.inactive_since), reconnect_timeout); |
|
77 end |
|
78 |
|
79 -- Set up reconnect timer |
|
80 timer.add_task(reconnect_timeout, function () |
|
81 self:debug("Retrying request..."); |
|
82 -- Remove old request |
|
83 for i, waiting_request in ipairs(self.bosh_waiting_requests) do |
|
84 if waiting_request == request then |
|
85 table.remove(self.bosh_waiting_requests, i); |
|
86 break; |
|
87 end |
|
88 end |
|
89 self:_make_request(payload); |
|
90 end); |
|
91 end); |
|
92 if request then |
55 table.insert(self.bosh_waiting_requests, request); |
93 table.insert(self.bosh_waiting_requests, request); |
56 else |
94 else |
57 self:debug("Decided not to flush."); |
95 self:warn("Request failed instantly: %s", err); |
58 end |
96 end |
|
97 end |
|
98 |
|
99 function stream_mt:_disconnected() |
|
100 self.connected = nil; |
|
101 self:event("disconnected"); |
59 end |
102 end |
60 |
103 |
61 function stream_mt:_send_session_request() |
104 function stream_mt:_send_session_request() |
62 local body = self:_make_body(); |
105 local body = self:_make_body(); |
63 |
106 |
70 -- XEP-0206 |
113 -- XEP-0206 |
71 body.attr.from = self.jid; |
114 body.attr.from = self.jid; |
72 body.attr.to = self.host; |
115 body.attr.to = self.host; |
73 body.attr.secure = 'true'; |
116 body.attr.secure = 'true'; |
74 |
117 |
75 http.request(self.bosh_url, { body = tostring(body) }, function (response) |
118 http.request(self.bosh_url, { body = tostring(body) }, function (response, code) |
|
119 if code == 0 then |
|
120 -- Failed to connect |
|
121 return self:_disconnected(); |
|
122 end |
76 -- Handle session creation response |
123 -- Handle session creation response |
77 local payload = self:_parse_response(response) |
124 local payload = self:_parse_response(response) |
78 if not payload then |
125 if not payload then |
79 self:warn("Invalid session creation response"); |
126 self:warn("Invalid session creation response"); |
80 self:event("disconnected"); |
127 self:_disconnected(); |
81 return; |
128 return; |
82 end |
129 end |
83 self.bosh_sid = payload.attr.sid; |
130 self.bosh_sid = payload.attr.sid; -- Session id |
84 self.bosh_wait = tonumber(payload.attr.wait); |
131 self.bosh_wait = tonumber(payload.attr.wait); -- How long the server may hold connections for |
85 self.bosh_hold = tonumber(payload.attr.hold); |
132 self.bosh_hold = tonumber(payload.attr.hold); -- How many connections the server may hold |
86 self.bosh_max_requests = tonumber(payload.attr.requests) or self.bosh_hold; |
133 self.bosh_max_inactivity = tonumber(payload.attr.inactivity); -- Max amount of time with no connections |
|
134 self.bosh_max_requests = tonumber(payload.attr.requests) or self.bosh_hold; -- Max simultaneous requests we can make |
87 self.connected = true; |
135 self.connected = true; |
88 self:event("connected"); |
136 self:event("connected"); |
89 self:_handle_response_payload(payload); |
137 self:_handle_response_payload(payload); |
90 end); |
138 end); |
91 end |
139 end |
92 |
140 |
93 function stream_mt:_handle_response(response, code, request) |
141 function stream_mt:_handle_response(response, code, request) |
94 if self.bosh_waiting_requests[1] ~= request then |
142 if self.bosh_waiting_requests[1] ~= request then |
95 self:warn("Server replied to request that wasn't the oldest"); |
143 self:warn("Server replied to request that wasn't the oldest"); |
|
144 for i, waiting_request in ipairs(self.bosh_waiting_requests) do |
|
145 if waiting_request == request then |
|
146 self.bosh_waiting_requests[i] = nil; |
|
147 break; |
|
148 end |
|
149 end |
96 else |
150 else |
97 table.remove(self.bosh_waiting_requests, 1); |
151 table.remove(self.bosh_waiting_requests, 1); |
98 end |
152 end |
99 local payload = self:_parse_response(response); |
153 local payload = self:_parse_response(response); |
100 if payload then |
154 if payload then |
101 self:_handle_response_payload(payload); |
155 self:_handle_response_payload(payload); |
102 end |
156 end |
103 if #self.bosh_waiting_requests == 0 then |
157 self:flush(); |
104 self:debug("We have no requests open, so forcing flush..."); |
|
105 self:flush(true); |
|
106 else |
|
107 self:debug("We have %d requests open, so no need to force a flush", #self.bosh_waiting_requests); |
|
108 end |
|
109 end |
158 end |
110 |
159 |
111 function stream_mt:_handle_response_payload(payload) |
160 function stream_mt:_handle_response_payload(payload) |
112 for stanza in payload:childtags() do |
161 for stanza in payload:childtags() do |
113 if stanza.attr.xmlns == xmlns_stream then |
162 if stanza.attr.xmlns == xmlns_stream then |
116 self:event("stream/"..stanza.attr.xmlns, stanza); |
165 self:event("stream/"..stanza.attr.xmlns, stanza); |
117 else |
166 else |
118 self:event("stanza", stanza); |
167 self:event("stanza", stanza); |
119 end |
168 end |
120 end |
169 end |
|
170 if payload.attr.type == "terminate" then |
|
171 self:_disconnected({reason = payload.attr.condition}); |
|
172 end |
121 end |
173 end |
122 |
174 |
123 local stream_callbacks = { |
175 local stream_callbacks = { |
124 stream_ns = "http://jabber.org/protocol/httpbind", stream_tag = "body", |
176 stream_ns = "http://jabber.org/protocol/httpbind", stream_tag = "body", |
125 default_ns = "jabber:client", |
177 default_ns = "jabber:client", |
126 streamopened = function (session, attr) session.notopen = nil; session.payload = verse.stanza("body", attr); return true; end; |
178 streamopened = function (session, attr) session.notopen = nil; session.payload = verse.stanza("body", attr); return true; end; |
127 handlestanza = function (session, stanza) session.payload:add_child(stanza); end; |
179 handlestanza = function (session, stanza) session.payload:add_child(stanza); end; |
128 }; |
180 }; |
129 function stream_mt:_parse_response(response) |
181 function stream_mt:_parse_response(response) |
130 self:debug("Parsing response: %s", response); |
182 self:debug("Parsing response: %s", response); |
|
183 if response == nil then |
|
184 self:debug("%s", debug.traceback()); |
|
185 self:_disconnected(); |
|
186 return; |
|
187 end |
131 local session = { notopen = true, log = self.log }; |
188 local session = { notopen = true, log = self.log }; |
132 local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1"); |
189 local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1"); |
133 parser:parse(response); |
190 parser:parse(response); |
134 return session.payload; |
191 return session.payload; |
135 end |
192 end |