core/s2smanager.lua

changeset 631
6957fe7b0313
parent 621
cd2cab5400fc
child 739
1def06cd9311
equal deleted inserted replaced
627:3712d36b6d25 631:6957fe7b0313
19 19
20 20
21 21
22 local hosts = hosts; 22 local hosts = hosts;
23 local sessions = sessions; 23 local sessions = sessions;
24 local core_process_stanza = function(a, b) core_process_stanza(a, b); end
24 local socket = require "socket"; 25 local socket = require "socket";
25 local format = string.format; 26 local format = string.format;
26 local t_insert, t_sort = table.insert, table.sort; 27 local t_insert, t_sort = table.insert, table.sort;
27 local get_traceback = debug.traceback; 28 local get_traceback = debug.traceback;
28 local tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber 29 local tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber
51 local incoming_s2s = incoming_s2s; 52 local incoming_s2s = incoming_s2s;
52 53
53 module "s2smanager" 54 module "s2smanager"
54 55
55 local function compare_srv_priorities(a,b) return a.priority < b.priority or a.weight < b.weight; end 56 local function compare_srv_priorities(a,b) return a.priority < b.priority or a.weight < b.weight; end
57
58 local function bounce_sendq(session)
59 local sendq = session.sendq;
60 if sendq then
61 session.log("debug", "sending error replies for "..#sendq.." queued stanzas because of failed outgoing connection to "..tostring(session.to_host));
62 local dummy = {
63 type = "s2sin";
64 send = function(s)
65 (session.log or log)("error", "Replying to to an s2s error reply, please report this! Traceback: %s", get_traceback());
66 end;
67 dummy = true;
68 };
69 for i, data in ipairs(sendq) do
70 local reply = data[2];
71 local xmlns = reply.attr.xmlns;
72 if not xmlns or xmlns == "jabber:client" or xmlns == "jabber:server" then
73 reply.attr.type = "error";
74 reply:tag("error", {type = "cancel"})
75 :tag("remote-server-not-found", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up();
76 core_process_stanza(dummy, reply);
77 end
78 sendq[i] = nil;
79 end
80 session.sendq = nil;
81 end
82 end
56 83
57 function send_to_host(from_host, to_host, data) 84 function send_to_host(from_host, to_host, data)
58 local host = hosts[from_host].s2sout[to_host]; 85 local host = hosts[from_host].s2sout[to_host];
59 if host then 86 if host then
60 -- We have a connection to this host already 87 -- We have a connection to this host already
64 host.log("debug", "dialback had not been initiated"); 91 host.log("debug", "dialback had not been initiated");
65 initiate_dialback(host); 92 initiate_dialback(host);
66 end 93 end
67 94
68 -- Queue stanza until we are able to send it 95 -- Queue stanza until we are able to send it
69 if host.sendq then t_insert(host.sendq, tostring(data)); 96 if host.sendq then t_insert(host.sendq, {tostring(data), st.reply(data)});
70 else host.sendq = { tostring(data) }; end 97 else host.sendq = { {tostring(data), st.reply(data)} }; end
71 host.log("debug", "stanza [%s] queued ", data.name); 98 host.log("debug", "stanza [%s] queued ", data.name);
72 elseif host.type == "local" or host.type == "component" then 99 elseif host.type == "local" or host.type == "component" then
73 log("error", "Trying to send a stanza to ourselves??") 100 log("error", "Trying to send a stanza to ourselves??")
74 log("error", "Traceback: %s", get_traceback()); 101 log("error", "Traceback: %s", get_traceback());
75 log("error", "Stanza: %s", tostring(data)); 102 log("error", "Stanza: %s", tostring(data));
85 end 112 end
86 else 113 else
87 log("debug", "opening a new outgoing connection for this stanza"); 114 log("debug", "opening a new outgoing connection for this stanza");
88 local host_session = new_outgoing(from_host, to_host); 115 local host_session = new_outgoing(from_host, to_host);
89 -- Store in buffer 116 -- Store in buffer
90 host_session.sendq = { tostring(data) }; 117 host_session.sendq = { {tostring(data), st.reply(data)} };
118 if not host_session.conn then destroy_session(host_session); end
91 end 119 end
92 end 120 end
93 121
94 local open_sessions = 0; 122 local open_sessions = 0;
95 123
162 -- Ok, we're going to try to connect 190 -- Ok, we're going to try to connect
163 conn:settimeout(0); 191 conn:settimeout(0);
164 local success, err = conn:connect(connect_host, connect_port); 192 local success, err = conn:connect(connect_host, connect_port);
165 if not success and err ~= "timeout" then 193 if not success and err ~= "timeout" then
166 log("warn", "s2s connect() failed: %s", err); 194 log("warn", "s2s connect() failed: %s", err);
195 return false;
167 end 196 end
168 197
169 local cl = connlisteners_get("xmppserver"); 198 local cl = connlisteners_get("xmppserver");
170 conn = wraptlsclient(cl, conn, connect_host, connect_port, 0, cl.default_mode or 1, hosts[from_host].ssl_ctx ); 199 conn = wraptlsclient(cl, conn, connect_host, connect_port, 0, cl.default_mode or 1, hosts[from_host].ssl_ctx );
171 host_session.conn = conn; 200 host_session.conn = conn;
276 305
277 if session.direction == "outgoing" then 306 if session.direction == "outgoing" then
278 if sendq then 307 if sendq then
279 session.log("debug", "sending "..#sendq.." queued stanzas across new outgoing connection to "..session.to_host); 308 session.log("debug", "sending "..#sendq.." queued stanzas across new outgoing connection to "..session.to_host);
280 for i, data in ipairs(sendq) do 309 for i, data in ipairs(sendq) do
281 send(data); 310 send(data[1]);
282 sendq[i] = nil; 311 sendq[i] = nil;
283 end 312 end
284 session.sendq = nil; 313 session.sendq = nil;
285 end 314 end
286 end 315 end
287 end 316 end
288 317
289 function destroy_session(session) 318 function destroy_session(session)
290 (session.log or log)("info", "Destroying "..tostring(session.direction).." session "..tostring(session.from_host).."->"..tostring(session.to_host)); 319 (session.log or log)("info", "Destroying "..tostring(session.direction).." session "..tostring(session.from_host).."->"..tostring(session.to_host));
291 320
292 -- FIXME: Flush sendq here/report errors to originators
293 321
294 if session.direction == "outgoing" then 322 if session.direction == "outgoing" then
295 hosts[session.from_host].s2sout[session.to_host] = nil; 323 hosts[session.from_host].s2sout[session.to_host] = nil;
324 bounce_sendq(session);
296 elseif session.direction == "incoming" then 325 elseif session.direction == "incoming" then
297 incoming_s2s[session] = nil; 326 incoming_s2s[session] = nil;
298 end 327 end
299 328
300 for k in pairs(session) do 329 for k in pairs(session) do

mercurial