bosh.lua

changeset 87
d59073722924
child 89
1752a9097e6b
equal deleted inserted replaced
86:508f653e9d46 87:d59073722924
1
2 local init_xmlhandlers = require "core.xmlhandlers";
3 local st = require "util.stanza";
4
5 local stream_mt = setmetatable({}, { __index = verse.stream_mt });
6 stream_mt.__index = stream_mt;
7
8 local xmlns_stream = "http://etherx.jabber.org/streams";
9 local xmlns_bosh = "http://jabber.org/protocol/httpbind";
10
11 function verse.new_bosh(logger, url)
12 local stream = {
13 bosh_conn_pool = {};
14 bosh_waiting_requests = {};
15 bosh_rid = math.random(1,999999);
16 bosh_outgoing_buffer = {};
17 bosh_url = url;
18 conn = {};
19 };
20 function stream:reopen()
21 self.bosh_need_restart = true;
22 self:flush(true);
23 end
24 function stream.bosh_response_handler(response, code, request)
25 return stream:_handle_response(response, code, request);
26 end
27 local conn = verse.new(logger, stream);
28 conn:add_plugin("http");
29 return setmetatable(conn, stream_mt);
30 end
31
32 function stream_mt:connect()
33 self:_send_session_request();
34 end
35
36 function stream_mt:send(data)
37 self:debug("Putting into BOSH send buffer: %s", tostring(data));
38 self.bosh_outgoing_buffer[#self.bosh_outgoing_buffer+1] = st.clone(data);
39 self:flush(); --TODO: Optimize by doing this on next tick (give a chance for data to buffer)
40 end
41
42 function stream_mt:flush(force)
43 if self.connected
44 and #self.bosh_waiting_requests < self.bosh_max_requests
45 and (force or #self.bosh_outgoing_buffer > 0) then
46 self:debug("Flushing...");
47 local payload = self:_make_body();
48 local buffer = self.bosh_outgoing_buffer;
49 for i, stanza in ipairs(buffer) do
50 payload:add_child(stanza);
51 buffer[i] = nil;
52 end
53 local request = self.http.request(self.bosh_url, { body = tostring(payload) }, self.bosh_response_handler);
54 table.insert(self.bosh_waiting_requests, request);
55 else
56 self:debug("Decided not to flush.");
57 end
58 end
59
60 function stream_mt:_send_session_request()
61 local body = self:_make_body();
62
63 -- XEP-0124
64 body.attr.hold = "1";
65 body.attr.wait = "60";
66 body.attr["xml:lang"] = "en";
67 body.attr.ver = "1.6";
68
69 -- XEP-0206
70 body.attr.from = self.jid;
71 body.attr.to = self.host;
72 body.attr.secure = 'true';
73
74 self.http.request(self.bosh_url, { body = tostring(body) }, function (response)
75 -- Handle session creation response
76 local payload = self:_parse_response(response)
77 if not payload then
78 self:warn("Invalid session creation response");
79 self:event("disconnected");
80 return;
81 end
82 self.bosh_sid = payload.attr.sid;
83 self.bosh_wait = tonumber(payload.attr.wait);
84 self.bosh_hold = tonumber(payload.attr.hold);
85 self.bosh_max_requests = tonumber(payload.attr.requests) or self.bosh_hold;
86 self.connected = true;
87 self:event("connected");
88 self:_handle_response_payload(payload);
89 end);
90 end
91
92 function stream_mt:_handle_response(response, code, request)
93 if self.bosh_waiting_requests[1] ~= request then
94 self:warn("Server replied to request that wasn't the oldest");
95 else
96 table.remove(self.bosh_waiting_requests, 1);
97 end
98 local payload = self:_parse_response(response);
99 if payload then
100 self:_handle_response_payload(payload);
101 end
102 if #self.bosh_waiting_requests == 0 then
103 self:debug("We have no requests open, so forcing flush...");
104 self:flush(true);
105 else
106 self:debug("We have %d requests open, so no need to force a flush", #self.bosh_waiting_requests);
107 end
108 end
109
110 function stream_mt:_handle_response_payload(payload)
111 for stanza in payload:childtags() do
112 if stanza.attr.xmlns == xmlns_stream then
113 self:event("stream-"..stanza.name, stanza);
114 elseif stanza.attr.xmlns then
115 self:event("stream/"..stanza.attr.xmlns, stanza);
116 else
117 self:event("stanza", stanza);
118 end
119 end
120 end
121
122 local stream_callbacks = {
123 stream_ns = "http://jabber.org/protocol/httpbind", stream_tag = "body",
124 default_ns = "jabber:client",
125 streamopened = function (session, attr) session.notopen = nil; session.payload = verse.stanza("body", attr); return true; end;
126 handlestanza = function (session, stanza) session.payload:add_child(stanza); end;
127 };
128 function stream_mt:_parse_response(response)
129 self:debug("Parsing response: %s", response);
130 local session = { notopen = true, log = self.log };
131 local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1");
132 parser:parse(response);
133 return session.payload;
134 end
135
136 function stream_mt:_make_body()
137 self.bosh_rid = self.bosh_rid + 1;
138 local body = verse.stanza("body", {
139 xmlns = xmlns_bosh;
140 content = "text/xml; charset=utf-8";
141 sid = self.bosh_sid;
142 rid = self.bosh_rid;
143 });
144 if self.bosh_need_restart then
145 self.bosh_need_restart = nil;
146 body.attr.restart = 'true';
147 end
148 return body;
149 end

mercurial