verse.bosh: Implemented retry/reconnect logic, and handling of disconnects (either CM-intiated or due to connection failures)

Sun, 08 Aug 2010 01:21:22 +0100

author
Matthew Wild <mwild1@gmail.com>
date
Sun, 08 Aug 2010 01:21:22 +0100
changeset 93
2442e751f3cb
parent 92
dcccef33f0eb
child 94
b40465267fb5

verse.bosh: Implemented retry/reconnect logic, and handling of disconnects (either CM-intiated or due to connection failures)

bosh.lua file | annotate | diff | comparison | revisions
--- a/bosh.lua	Sun Aug 08 01:18:12 2010 +0100
+++ b/bosh.lua	Sun Aug 08 01:21:22 2010 +0100
@@ -10,6 +10,8 @@
 local xmlns_stream = "http://etherx.jabber.org/streams";
 local xmlns_bosh = "http://jabber.org/protocol/httpbind";
 
+local reconnect_timeout = 5;
+
 function verse.new_bosh(logger, url)
 	local stream = {
 		bosh_conn_pool = {};
@@ -21,10 +23,7 @@
 	};
 	function stream:reopen()
 		self.bosh_need_restart = true;
-		self:flush(true);
-	end
-	function stream.bosh_response_handler(response, code, request)
-		return stream:_handle_response(response, code, request);
+		self:flush();
 	end
 	local conn = verse.new(logger, stream);
 	return setmetatable(conn, stream_mt);
@@ -40,10 +39,12 @@
 	self:flush(); --TODO: Optimize by doing this on next tick (give a chance for data to buffer)
 end
 
-function stream_mt:flush(force)
+function stream_mt:flush()
 	if self.connected
 	and #self.bosh_waiting_requests < self.bosh_max_requests
-	and (force or #self.bosh_outgoing_buffer > 0) then
+	and (#self.bosh_waiting_requests == 0
+		or #self.bosh_outgoing_buffer > 0
+		or self.bosh_need_restart) then
 		self:debug("Flushing...");
 		local payload = self:_make_body();
 		local buffer = self.bosh_outgoing_buffer;
@@ -51,13 +52,55 @@
 			payload:add_child(stanza);
 			buffer[i] = nil;
 		end
-		local request = http.request(self.bosh_url, { body = tostring(payload) }, self.bosh_response_handler);
-		table.insert(self.bosh_waiting_requests, request);
+		self:_make_request(payload);
 	else
 		self:debug("Decided not to flush.");
 	end
 end
 
+function stream_mt:_make_request(payload)
+	local request, err = http.request(self.bosh_url, { body = tostring(payload) }, function (response, code, request)
+		if code ~= 0 then
+			self.inactive_since = nil;
+			return self:_handle_response(response, code, request);
+		end
+		
+		-- Connection issues, we need to retry this request
+		local time = os.time();
+		if not self.inactive_since then
+			self.inactive_since = time; -- So we know when it is time to give up
+		elseif time - self.inactive_since > self.bosh_max_inactivity then
+			return self:_disconnected();
+		else
+			self:debug("%d seconds left to reconnect, retrying in %d seconds...", 
+				self.bosh_max_inactivity - (time - self.inactive_since), reconnect_timeout);
+		end
+		
+		-- Set up reconnect timer
+		timer.add_task(reconnect_timeout, function ()
+			self:debug("Retrying request...");
+			-- Remove old request
+			for i, waiting_request in ipairs(self.bosh_waiting_requests) do
+				if waiting_request == request then
+					table.remove(self.bosh_waiting_requests, i);
+					break;
+				end
+			end
+			self:_make_request(payload);
+		end);
+	end);
+	if request then
+		table.insert(self.bosh_waiting_requests, request);
+	else
+		self:warn("Request failed instantly: %s", err);
+	end
+end
+
+function stream_mt:_disconnected()
+	self.connected = nil;
+	self:event("disconnected");
+end
+
 function stream_mt:_send_session_request()
 	local body = self:_make_body();
 	
@@ -72,18 +115,23 @@
 	body.attr.to = self.host;
 	body.attr.secure = 'true';
 	
-	http.request(self.bosh_url, { body = tostring(body) }, function (response)
+	http.request(self.bosh_url, { body = tostring(body) }, function (response, code)
+		if code == 0 then
+			-- Failed to connect
+			return self:_disconnected();
+		end
 		-- Handle session creation response
 		local payload = self:_parse_response(response)
 		if not payload then
 			self:warn("Invalid session creation response");
-			self:event("disconnected");
+			self:_disconnected();
 			return;
 		end
-		self.bosh_sid = payload.attr.sid;
-		self.bosh_wait = tonumber(payload.attr.wait);
-		self.bosh_hold = tonumber(payload.attr.hold);
-		self.bosh_max_requests = tonumber(payload.attr.requests) or self.bosh_hold;
+		self.bosh_sid = payload.attr.sid; -- Session id
+		self.bosh_wait = tonumber(payload.attr.wait); -- How long the server may hold connections for
+		self.bosh_hold = tonumber(payload.attr.hold); -- How many connections the server may hold
+		self.bosh_max_inactivity = tonumber(payload.attr.inactivity); -- Max amount of time with no connections
+		self.bosh_max_requests = tonumber(payload.attr.requests) or self.bosh_hold; -- Max simultaneous requests we can make
 		self.connected = true;
 		self:event("connected");
 		self:_handle_response_payload(payload);
@@ -93,6 +141,12 @@
 function stream_mt:_handle_response(response, code, request)
 	if self.bosh_waiting_requests[1] ~= request then
 		self:warn("Server replied to request that wasn't the oldest");
+		for i, waiting_request in ipairs(self.bosh_waiting_requests) do
+			if waiting_request == request then
+				self.bosh_waiting_requests[i] = nil;
+				break;
+			end
+		end
 	else
 		table.remove(self.bosh_waiting_requests, 1);
 	end
@@ -100,12 +154,7 @@
 	if payload then
 		self:_handle_response_payload(payload);
 	end
-	if #self.bosh_waiting_requests == 0 then
-		self:debug("We have no requests open, so forcing flush...");
-		self:flush(true);
-	else
-		self:debug("We have %d requests open, so no need to force a flush", #self.bosh_waiting_requests);
-	end
+	self:flush();
 end
 
 function stream_mt:_handle_response_payload(payload)
@@ -118,6 +167,9 @@
 			self:event("stanza", stanza);
 		end
 	end
+	if payload.attr.type == "terminate" then
+		self:_disconnected({reason = payload.attr.condition});
+	end
 end
 
 local stream_callbacks = {
@@ -128,6 +180,11 @@
 };
 function stream_mt:_parse_response(response)
 	self:debug("Parsing response: %s", response);
+	if response == nil then
+		self:debug("%s", debug.traceback());
+		self:_disconnected();
+		return;
+	end
 	local session = { notopen = true, log = self.log };
 	local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1");
 	parser:parse(response);

mercurial