plugins.smacks: Re-send unacked outgoing stanzas on resumption

Sun, 10 Feb 2013 01:54:30 +0100

author
Kim Alvefur <zash@zash.se>
date
Sun, 10 Feb 2013 01:54:30 +0100
changeset 321
369d4638d775
parent 320
e04f10664704
child 322
819b35b8fcf6

plugins.smacks: Re-send unacked outgoing stanzas on resumption

plugins/smacks.lua file | annotate | diff | comparison | revisions
--- a/plugins/smacks.lua	Sun Feb 10 00:10:19 2013 +0100
+++ b/plugins/smacks.lua	Sun Feb 10 01:54:30 2013 +0100
@@ -17,7 +17,21 @@
 			stream:debug("Increasing handled stanzas to %d for %s", handled_stanza_count, stanza:top_tag());
 		end
 	end
-	
+
+	-- Catch outgoing stanzas
+	function outgoing_stanza(stanza)
+		-- NOTE: This will not behave nice if stanzas are serialized before this point
+		if stanza.name and not stanza.attr.xmlns then
+			-- serialize stanzas in order to bypass this on resumption
+			outgoing_queue[#outgoing_queue+1] = tostring(stanza);
+			verse.add_task(1, function()
+				if #outgoing_queue > 0 then
+					stream:send(verse.stanza("r", { xmlns = xmlns_sm }));
+				end
+			end);
+		end
+	end
+
 	local function on_disconnect()
 		stream:debug("smacks: connection lost");
 		stream.stream_management_supported = nil;
@@ -49,28 +63,29 @@
 			end
 		elseif stanza.name == "enabled" then
 			stream.smacks = true;
-			-- Catch outgoing stanzas
-			local old_send = stream.send;
-			function stream.send(stream, stanza)
-				stream:warn("SENDING");
-				if stanza.name and not stanza.attr.xmlns then
-					outgoing_queue[#outgoing_queue+1] = stanza;
-					local ret = old_send(stream, stanza);
-					old_send(stream, verse.stanza("r", { xmlns = xmlns_sm }));
-					return ret;
-				end
-				return old_send(stream, stanza);
-			end
-			-- Catch incoming stanzas
+
+			-- Catch stanzas
 			stream:hook("stanza", incoming_stanza);
+			stream:hook("outgoing", outgoing_stanza);
 			
 			if stanza.attr.id then
 				stream.resumption_token = stanza.attr.id;
 				stream:hook("disconnected", on_disconnect, 100);
 			end
 		elseif stanza.name == "resumed" then
-			--TODO: Check h of the resumed element, discard any acked stanzas from
-			--      our queue (to prevent duplicates), then re-send any lost stanzas.
+			local new_ack = tonumber(stanza.attr.h);
+			if new_ack > last_ack then
+				local old_unacked = #outgoing_queue;
+				for i=last_ack+1,new_ack do
+					table.remove(outgoing_queue, 1);
+				end
+				stream:debug("Received ack: New ack: "..new_ack.." Last ack: "..last_ack.." Unacked stanzas now: "..#outgoing_queue.." (was "..old_unacked..")");
+				last_ack = new_ack;
+			end
+			for i=1,#outgoing_queue do
+				stream:send(outgoing_queue[i]);
+			end
+			outgoing_queue = {};
 			stream:debug("Resumed successfully");
 			stream:event("resumed");
 		else

mercurial