|
1 |
|
2 local init_xmlhandlers = require "core.xmlhandlers"; |
|
3 local st = require "util.stanza"; |
|
4 |
|
5 local stream_mt = setmetatable({}, { __index = verse.stream_mt }); |
|
6 stream_mt.__index = stream_mt; |
|
7 |
|
8 local xmlns_stream = "http://etherx.jabber.org/streams"; |
|
9 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; |
|
10 |
|
11 function verse.new_bosh(logger, url) |
|
12 local stream = { |
|
13 bosh_conn_pool = {}; |
|
14 bosh_waiting_requests = {}; |
|
15 bosh_rid = math.random(1,999999); |
|
16 bosh_outgoing_buffer = {}; |
|
17 bosh_url = url; |
|
18 conn = {}; |
|
19 }; |
|
20 function stream:reopen() |
|
21 self.bosh_need_restart = true; |
|
22 self:flush(true); |
|
23 end |
|
24 function stream.bosh_response_handler(response, code, request) |
|
25 return stream:_handle_response(response, code, request); |
|
26 end |
|
27 local conn = verse.new(logger, stream); |
|
28 conn:add_plugin("http"); |
|
29 return setmetatable(conn, stream_mt); |
|
30 end |
|
31 |
|
32 function stream_mt:connect() |
|
33 self:_send_session_request(); |
|
34 end |
|
35 |
|
36 function stream_mt:send(data) |
|
37 self:debug("Putting into BOSH send buffer: %s", tostring(data)); |
|
38 self.bosh_outgoing_buffer[#self.bosh_outgoing_buffer+1] = st.clone(data); |
|
39 self:flush(); --TODO: Optimize by doing this on next tick (give a chance for data to buffer) |
|
40 end |
|
41 |
|
42 function stream_mt:flush(force) |
|
43 if self.connected |
|
44 and #self.bosh_waiting_requests < self.bosh_max_requests |
|
45 and (force or #self.bosh_outgoing_buffer > 0) then |
|
46 self:debug("Flushing..."); |
|
47 local payload = self:_make_body(); |
|
48 local buffer = self.bosh_outgoing_buffer; |
|
49 for i, stanza in ipairs(buffer) do |
|
50 payload:add_child(stanza); |
|
51 buffer[i] = nil; |
|
52 end |
|
53 local request = self.http.request(self.bosh_url, { body = tostring(payload) }, self.bosh_response_handler); |
|
54 table.insert(self.bosh_waiting_requests, request); |
|
55 else |
|
56 self:debug("Decided not to flush."); |
|
57 end |
|
58 end |
|
59 |
|
60 function stream_mt:_send_session_request() |
|
61 local body = self:_make_body(); |
|
62 |
|
63 -- XEP-0124 |
|
64 body.attr.hold = "1"; |
|
65 body.attr.wait = "60"; |
|
66 body.attr["xml:lang"] = "en"; |
|
67 body.attr.ver = "1.6"; |
|
68 |
|
69 -- XEP-0206 |
|
70 body.attr.from = self.jid; |
|
71 body.attr.to = self.host; |
|
72 body.attr.secure = 'true'; |
|
73 |
|
74 self.http.request(self.bosh_url, { body = tostring(body) }, function (response) |
|
75 -- Handle session creation response |
|
76 local payload = self:_parse_response(response) |
|
77 if not payload then |
|
78 self:warn("Invalid session creation response"); |
|
79 self:event("disconnected"); |
|
80 return; |
|
81 end |
|
82 self.bosh_sid = payload.attr.sid; |
|
83 self.bosh_wait = tonumber(payload.attr.wait); |
|
84 self.bosh_hold = tonumber(payload.attr.hold); |
|
85 self.bosh_max_requests = tonumber(payload.attr.requests) or self.bosh_hold; |
|
86 self.connected = true; |
|
87 self:event("connected"); |
|
88 self:_handle_response_payload(payload); |
|
89 end); |
|
90 end |
|
91 |
|
92 function stream_mt:_handle_response(response, code, request) |
|
93 if self.bosh_waiting_requests[1] ~= request then |
|
94 self:warn("Server replied to request that wasn't the oldest"); |
|
95 else |
|
96 table.remove(self.bosh_waiting_requests, 1); |
|
97 end |
|
98 local payload = self:_parse_response(response); |
|
99 if payload then |
|
100 self:_handle_response_payload(payload); |
|
101 end |
|
102 if #self.bosh_waiting_requests == 0 then |
|
103 self:debug("We have no requests open, so forcing flush..."); |
|
104 self:flush(true); |
|
105 else |
|
106 self:debug("We have %d requests open, so no need to force a flush", #self.bosh_waiting_requests); |
|
107 end |
|
108 end |
|
109 |
|
110 function stream_mt:_handle_response_payload(payload) |
|
111 for stanza in payload:childtags() do |
|
112 if stanza.attr.xmlns == xmlns_stream then |
|
113 self:event("stream-"..stanza.name, stanza); |
|
114 elseif stanza.attr.xmlns then |
|
115 self:event("stream/"..stanza.attr.xmlns, stanza); |
|
116 else |
|
117 self:event("stanza", stanza); |
|
118 end |
|
119 end |
|
120 end |
|
121 |
|
122 local stream_callbacks = { |
|
123 stream_ns = "http://jabber.org/protocol/httpbind", stream_tag = "body", |
|
124 default_ns = "jabber:client", |
|
125 streamopened = function (session, attr) session.notopen = nil; session.payload = verse.stanza("body", attr); return true; end; |
|
126 handlestanza = function (session, stanza) session.payload:add_child(stanza); end; |
|
127 }; |
|
128 function stream_mt:_parse_response(response) |
|
129 self:debug("Parsing response: %s", response); |
|
130 local session = { notopen = true, log = self.log }; |
|
131 local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1"); |
|
132 parser:parse(response); |
|
133 return session.payload; |
|
134 end |
|
135 |
|
136 function stream_mt:_make_body() |
|
137 self.bosh_rid = self.bosh_rid + 1; |
|
138 local body = verse.stanza("body", { |
|
139 xmlns = xmlns_bosh; |
|
140 content = "text/xml; charset=utf-8"; |
|
141 sid = self.bosh_sid; |
|
142 rid = self.bosh_rid; |
|
143 }); |
|
144 if self.bosh_need_restart then |
|
145 self.bosh_need_restart = nil; |
|
146 body.attr.restart = 'true'; |
|
147 end |
|
148 return body; |
|
149 end |