|
1 local st = require "util.stanza"; |
|
2 |
|
3 local xmlns_sm = "urn:xmpp:sm:2"; |
|
4 |
|
5 function verse.plugins.smacks(stream) |
|
6 -- State for outgoing stanzas |
|
7 local outgoing_queue = {}; |
|
8 local last_ack = 0; |
|
9 |
|
10 -- State for incoming stanzas |
|
11 local handled_stanza_count = 0; |
|
12 |
|
13 -- Catch incoming stanzas |
|
14 local function incoming_stanza(stanza) |
|
15 if stanza.attr.xmlns == "jabber:client" or not stanza.attr.xmlns then |
|
16 handled_stanza_count = handled_stanza_count + 1; |
|
17 stream:debug("Increasing handled stanzas to %d for %s", handled_stanza_count, stanza:top_tag()); |
|
18 end |
|
19 end |
|
20 |
|
21 local function on_disconnect() |
|
22 stream.stream_management_supported = nil; |
|
23 if stream.resumption_token then |
|
24 stream.authenticated = nil; |
|
25 stream:connect(stream.connect_host or stream.host, stream.connect_port or 5222); |
|
26 stream:reopen(); |
|
27 return true; |
|
28 end |
|
29 end |
|
30 |
|
31 local function handle_sm_command(stanza) |
|
32 if stanza.name == "r" then -- Request for acks for stanzas we received |
|
33 stream:send(verse.stanza("a", { xmlns = xmlns_sm, h = tostring(handled_stanza_count) })); |
|
34 elseif stanza.name == "a" then -- Ack for stanzas we sent |
|
35 local new_ack = tonumber(stanza.attr.h); |
|
36 if new_ack > last_ack then |
|
37 local old_unacked = #outgoing_queue; |
|
38 for i=last_ack+1,new_ack do |
|
39 table.remove(outgoing_queue, 1); |
|
40 end |
|
41 stream:debug("Received ack: New ack: "..new_ack.." Last ack: "..last_ack.." Unacked stanzas now: "..#outgoing_queue.." (was "..old_unacked..")"); |
|
42 last_ack = new_ack; |
|
43 else |
|
44 stream:warn("Received bad ack for "..new_ack.." when last ack was "..last_ack); |
|
45 end |
|
46 elseif stanza.name == "enabled" then |
|
47 stream.smacks = true; |
|
48 -- Catch outgoing stanzas |
|
49 local old_send = stream.send; |
|
50 function stream.send(stream, stanza) |
|
51 stream:warn("SENDING"); |
|
52 if not stanza.attr.xmlns then |
|
53 outgoing_queue[#outgoing_queue+1] = stanza; |
|
54 local ret = old_send(stream, stanza); |
|
55 old_send(stream, verse.stanza("r", { xmlns = xmlns_sm })); |
|
56 return ret; |
|
57 end |
|
58 return old_send(stream, stanza); |
|
59 end |
|
60 -- Catch incoming stanzas |
|
61 stream:hook("stanza", incoming_stanza); |
|
62 |
|
63 if stanza.attr.id then |
|
64 stream.resumption_token = stanza.attr.id; |
|
65 stream:hook("disconnected", on_disconnect, 100); |
|
66 end |
|
67 elseif stanza.name == "resumed" then |
|
68 stream:debug("Resumed successfully"); |
|
69 stream:send(verse.message{to="me@matthewwild.co.uk", type="chat"}:tag("body"):text("Hi again!")); |
|
70 else |
|
71 stream:warn("Don't know how to handle "..xmlns_sm.."/"..stanza.name); |
|
72 end |
|
73 end |
|
74 |
|
75 local function on_bind_success() |
|
76 if not stream.smacks then |
|
77 --stream:unhook("bind-success", on_bind_success); |
|
78 stream:send(st.stanza("enable", { xmlns = xmlns_sm, resume = "true" })); |
|
79 end |
|
80 end |
|
81 |
|
82 local function on_features(features) |
|
83 if features:get_child("sm", xmlns_sm) then |
|
84 stream.stream_management_supported = true; |
|
85 if stream.smacks and stream.bound then -- Already enabled in a previous session - resume |
|
86 stream:send(st.stanza("resume", { xmlns = xmlns_sm, |
|
87 h = handled_stanza_count, previd = stream.resumption_token })); |
|
88 else |
|
89 stream:hook("bind-success", on_bind_success); |
|
90 end |
|
91 return true; |
|
92 end |
|
93 end |
|
94 |
|
95 stream:hook("stream-features", on_features, 150); |
|
96 stream:hook("stream/"..xmlns_sm, handle_sm_command); |
|
97 --stream:hook("ready", on_stream_ready, 500); |
|
98 end |