# HG changeset patch # User Kim Alvefur # Date 1360457670 -3600 # Node ID 369d4638d775472dd7be7f7759ba412ee21251c8 # Parent e04f1066470447b360d92ebccd6e1a2e4de56ac2 plugins.smacks: Re-send unacked outgoing stanzas on resumption diff -r e04f10664704 -r 369d4638d775 plugins/smacks.lua --- a/plugins/smacks.lua Sun Feb 10 00:10:19 2013 +0100 +++ b/plugins/smacks.lua Sun Feb 10 01:54:30 2013 +0100 @@ -17,7 +17,21 @@ stream:debug("Increasing handled stanzas to %d for %s", handled_stanza_count, stanza:top_tag()); end end - + + -- Catch outgoing stanzas + function outgoing_stanza(stanza) + -- NOTE: This will not behave nice if stanzas are serialized before this point + if stanza.name and not stanza.attr.xmlns then + -- serialize stanzas in order to bypass this on resumption + outgoing_queue[#outgoing_queue+1] = tostring(stanza); + verse.add_task(1, function() + if #outgoing_queue > 0 then + stream:send(verse.stanza("r", { xmlns = xmlns_sm })); + end + end); + end + end + local function on_disconnect() stream:debug("smacks: connection lost"); stream.stream_management_supported = nil; @@ -49,28 +63,29 @@ end elseif stanza.name == "enabled" then stream.smacks = true; - -- Catch outgoing stanzas - local old_send = stream.send; - function stream.send(stream, stanza) - stream:warn("SENDING"); - if stanza.name and not stanza.attr.xmlns then - outgoing_queue[#outgoing_queue+1] = stanza; - local ret = old_send(stream, stanza); - old_send(stream, verse.stanza("r", { xmlns = xmlns_sm })); - return ret; - end - return old_send(stream, stanza); - end - -- Catch incoming stanzas + + -- Catch stanzas stream:hook("stanza", incoming_stanza); + stream:hook("outgoing", outgoing_stanza); if stanza.attr.id then stream.resumption_token = stanza.attr.id; stream:hook("disconnected", on_disconnect, 100); end elseif stanza.name == "resumed" then - --TODO: Check h of the resumed element, discard any acked stanzas from - -- our queue (to prevent duplicates), then re-send any lost stanzas. + local new_ack = tonumber(stanza.attr.h); + if new_ack > last_ack then + local old_unacked = #outgoing_queue; + for i=last_ack+1,new_ack do + table.remove(outgoing_queue, 1); + end + stream:debug("Received ack: New ack: "..new_ack.." Last ack: "..last_ack.." Unacked stanzas now: "..#outgoing_queue.." (was "..old_unacked..")"); + last_ack = new_ack; + end + for i=1,#outgoing_queue do + stream:send(outgoing_queue[i]); + end + outgoing_queue = {}; stream:debug("Resumed successfully"); stream:event("resumed"); else