19 |
19 |
20 |
20 |
21 |
21 |
22 local hosts = hosts; |
22 local hosts = hosts; |
23 local sessions = sessions; |
23 local sessions = sessions; |
|
24 local core_process_stanza = function(a, b) core_process_stanza(a, b); end |
24 local socket = require "socket"; |
25 local socket = require "socket"; |
25 local format = string.format; |
26 local format = string.format; |
26 local t_insert, t_sort = table.insert, table.sort; |
27 local t_insert, t_sort = table.insert, table.sort; |
27 local get_traceback = debug.traceback; |
28 local get_traceback = debug.traceback; |
28 local tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber |
29 local tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber |
51 local incoming_s2s = incoming_s2s; |
52 local incoming_s2s = incoming_s2s; |
52 |
53 |
53 module "s2smanager" |
54 module "s2smanager" |
54 |
55 |
55 local function compare_srv_priorities(a,b) return a.priority < b.priority or a.weight < b.weight; end |
56 local function compare_srv_priorities(a,b) return a.priority < b.priority or a.weight < b.weight; end |
|
57 |
|
58 local function bounce_sendq(session) |
|
59 local sendq = session.sendq; |
|
60 if sendq then |
|
61 session.log("debug", "sending error replies for "..#sendq.." queued stanzas because of failed outgoing connection to "..tostring(session.to_host)); |
|
62 local dummy = { |
|
63 type = "s2sin"; |
|
64 send = function(s) |
|
65 (session.log or log)("error", "Replying to to an s2s error reply, please report this! Traceback: %s", get_traceback()); |
|
66 end; |
|
67 dummy = true; |
|
68 }; |
|
69 for i, data in ipairs(sendq) do |
|
70 local reply = data[2]; |
|
71 local xmlns = reply.attr.xmlns; |
|
72 if not xmlns or xmlns == "jabber:client" or xmlns == "jabber:server" then |
|
73 reply.attr.type = "error"; |
|
74 reply:tag("error", {type = "cancel"}) |
|
75 :tag("remote-server-not-found", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up(); |
|
76 core_process_stanza(dummy, reply); |
|
77 end |
|
78 sendq[i] = nil; |
|
79 end |
|
80 session.sendq = nil; |
|
81 end |
|
82 end |
56 |
83 |
57 function send_to_host(from_host, to_host, data) |
84 function send_to_host(from_host, to_host, data) |
58 local host = hosts[from_host].s2sout[to_host]; |
85 local host = hosts[from_host].s2sout[to_host]; |
59 if host then |
86 if host then |
60 -- We have a connection to this host already |
87 -- We have a connection to this host already |
64 host.log("debug", "dialback had not been initiated"); |
91 host.log("debug", "dialback had not been initiated"); |
65 initiate_dialback(host); |
92 initiate_dialback(host); |
66 end |
93 end |
67 |
94 |
68 -- Queue stanza until we are able to send it |
95 -- Queue stanza until we are able to send it |
69 if host.sendq then t_insert(host.sendq, tostring(data)); |
96 if host.sendq then t_insert(host.sendq, {tostring(data), st.reply(data)}); |
70 else host.sendq = { tostring(data) }; end |
97 else host.sendq = { {tostring(data), st.reply(data)} }; end |
71 host.log("debug", "stanza [%s] queued ", data.name); |
98 host.log("debug", "stanza [%s] queued ", data.name); |
72 elseif host.type == "local" or host.type == "component" then |
99 elseif host.type == "local" or host.type == "component" then |
73 log("error", "Trying to send a stanza to ourselves??") |
100 log("error", "Trying to send a stanza to ourselves??") |
74 log("error", "Traceback: %s", get_traceback()); |
101 log("error", "Traceback: %s", get_traceback()); |
75 log("error", "Stanza: %s", tostring(data)); |
102 log("error", "Stanza: %s", tostring(data)); |
85 end |
112 end |
86 else |
113 else |
87 log("debug", "opening a new outgoing connection for this stanza"); |
114 log("debug", "opening a new outgoing connection for this stanza"); |
88 local host_session = new_outgoing(from_host, to_host); |
115 local host_session = new_outgoing(from_host, to_host); |
89 -- Store in buffer |
116 -- Store in buffer |
90 host_session.sendq = { tostring(data) }; |
117 host_session.sendq = { {tostring(data), st.reply(data)} }; |
|
118 if not host_session.conn then destroy_session(host_session); end |
91 end |
119 end |
92 end |
120 end |
93 |
121 |
94 local open_sessions = 0; |
122 local open_sessions = 0; |
95 |
123 |
162 -- Ok, we're going to try to connect |
190 -- Ok, we're going to try to connect |
163 conn:settimeout(0); |
191 conn:settimeout(0); |
164 local success, err = conn:connect(connect_host, connect_port); |
192 local success, err = conn:connect(connect_host, connect_port); |
165 if not success and err ~= "timeout" then |
193 if not success and err ~= "timeout" then |
166 log("warn", "s2s connect() failed: %s", err); |
194 log("warn", "s2s connect() failed: %s", err); |
|
195 return false; |
167 end |
196 end |
168 |
197 |
169 local cl = connlisteners_get("xmppserver"); |
198 local cl = connlisteners_get("xmppserver"); |
170 conn = wraptlsclient(cl, conn, connect_host, connect_port, 0, cl.default_mode or 1, hosts[from_host].ssl_ctx ); |
199 conn = wraptlsclient(cl, conn, connect_host, connect_port, 0, cl.default_mode or 1, hosts[from_host].ssl_ctx ); |
171 host_session.conn = conn; |
200 host_session.conn = conn; |
276 |
305 |
277 if session.direction == "outgoing" then |
306 if session.direction == "outgoing" then |
278 if sendq then |
307 if sendq then |
279 session.log("debug", "sending "..#sendq.." queued stanzas across new outgoing connection to "..session.to_host); |
308 session.log("debug", "sending "..#sendq.." queued stanzas across new outgoing connection to "..session.to_host); |
280 for i, data in ipairs(sendq) do |
309 for i, data in ipairs(sendq) do |
281 send(data); |
310 send(data[1]); |
282 sendq[i] = nil; |
311 sendq[i] = nil; |
283 end |
312 end |
284 session.sendq = nil; |
313 session.sendq = nil; |
285 end |
314 end |
286 end |
315 end |
287 end |
316 end |
288 |
317 |
289 function destroy_session(session) |
318 function destroy_session(session) |
290 (session.log or log)("info", "Destroying "..tostring(session.direction).." session "..tostring(session.from_host).."->"..tostring(session.to_host)); |
319 (session.log or log)("info", "Destroying "..tostring(session.direction).." session "..tostring(session.from_host).."->"..tostring(session.to_host)); |
291 |
320 |
292 -- FIXME: Flush sendq here/report errors to originators |
|
293 |
321 |
294 if session.direction == "outgoing" then |
322 if session.direction == "outgoing" then |
295 hosts[session.from_host].s2sout[session.to_host] = nil; |
323 hosts[session.from_host].s2sout[session.to_host] = nil; |
|
324 bounce_sendq(session); |
296 elseif session.direction == "incoming" then |
325 elseif session.direction == "incoming" then |
297 incoming_s2s[session] = nil; |
326 incoming_s2s[session] = nil; |
298 end |
327 end |
299 |
328 |
300 for k in pairs(session) do |
329 for k in pairs(session) do |