Merge with tip.

Mon, 30 Nov 2009 23:25:10 +0100

author
Tobias Markmann <tm@ayena.de>
date
Mon, 30 Nov 2009 23:25:10 +0100
changeset 2285
efea91c740a2
parent 2284
e0b2d934f316 (diff)
parent 2265
7fe644057dc2 (current diff)
child 2286
3c17fc919f7b

Merge with tip.

--- a/plugins/mod_compression.lua	Mon Nov 30 20:53:25 2009 +0100
+++ b/plugins/mod_compression.lua	Mon Nov 30 23:25:10 2009 +0100
@@ -8,16 +8,16 @@
 local st = require "util.stanza";
 local zlib = require "zlib";
 local pcall = pcall;
-
 local xmlns_compression_feature = "http://jabber.org/features/compress"
 local xmlns_compression_protocol = "http://jabber.org/protocol/compress"
+local xmlns_stream = "http://etherx.jabber.org/streams";
 local compression_stream_feature = st.stanza("compression", {xmlns=xmlns_compression_feature}):tag("method"):text("zlib"):up();
 
 local compression_level = module:get_option("compression_level");
-
 -- if not defined assume admin wants best compression
 if compression_level == nil then compression_level = 9 end;
 
+
 compression_level = tonumber(compression_level);
 if not compression_level or compression_level < 1 or compression_level > 9 then
 	module:log("warn", "Invalid compression level in config: %s", tostring(compression_level));
@@ -34,89 +34,182 @@
 		end
 );
 
+module:hook("s2s-stream-features",
+		function (data)
+			local session, features = data.session, data.features;
+			-- FIXME only advertise compression support when TLS layer has no compression enabled
+			if not session.compressed then 
+				module:log("debug", "s2s-stream-features YAY YAHOO")
+				features:add_child(compression_stream_feature);
+			end
+		end
+);
+
+-- S2Sout handling aka the client perspective in the S2S connection
+module:hook_stanza(xmlns_stream, "features",
+		function (session, stanza)
+			if not session.compressed then
+				module:log("debug", "FEATURES: "..stanza:pretty_print())
+				-- does remote server support compression?
+				local comp_st = stanza:child_with_name("compression");
+				if comp_st then
+					-- do we support the mechanism
+					for a in comp_st:children() do
+						local algorithm = a[1]
+						if algorithm == "zlib" then
+							session.sends2s(st.stanza("compress", {xmlns=xmlns_compression_protocol}):tag("method"):text("zlib"))
+							session.log("info", "Enabled compression using zlib.")
+							return true;
+						end
+					end
+					session.log("debug", "Remote server supports no compression algorithm we support.")
+				end
+			end
+		end
+, 250);
+
+
+-- returns either nil or a fully functional ready to use inflate stream
+local function get_deflate_stream(session)
+	local status, deflate_stream = pcall(zlib.deflate, compression_level);
+	if status == false then
+		local error_st = st.stanza("failure", {xmlns=xmlns_compression_protocol}):tag("setup-failed");
+		(session.sends2s or session.send)(error_st);
+		session.log("error", "Failed to create zlib.deflate filter.");
+		module:log("error", deflate_stream);
+		return
+	end
+	return deflate_stream
+end
+
+-- returns either nil or a fully functional ready to use inflate stream
+local function get_inflate_stream(session)
+	local status, inflate_stream = pcall(zlib.inflate);
+	if status == false then
+		local error_st = st.stanza("failure", {xmlns=xmlns_compression_protocol}):tag("setup-failed");
+		(session.sends2s or session.send)(error_st);
+		session.log("error", "Failed to create zlib.deflate filter.");
+		module:log("error", inflate_stream);
+		return
+	end
+	return inflate_stream
+end
+
+-- setup compression for a stream
+local function setup_compression(session, deflate_stream)
+	local old_send = (session.sends2s or session.send);
+	
+	local new_send = function(t)
+			--TODO: Better code injection in the sending process
+			session.log(t)
+			local status, compressed, eof = pcall(deflate_stream, tostring(t), 'sync');
+			if status == false then
+				session:close({
+					condition = "undefined-condition";
+					text = compressed;
+					extra = st.stanza("failure", {xmlns="http://jabber.org/protocol/compress"}):tag("processing-failed");
+				});
+				module:log("warn", compressed);
+				return;
+			end
+			session.conn:write(compressed);
+		end;
+	
+	if session.sends2s then session.sends2s = new_send
+	elseif session.send then session.send = new_send end
+end
+
+-- setup decompression for a stream
+local function setup_decompression(session, inflate_stream)
+	local old_data = session.data
+	session.data = function(conn, data)
+			local status, decompressed, eof = pcall(inflate_stream, data);
+			if status == false then
+				session:close({
+					condition = "undefined-condition";
+					text = decompressed;
+					extra = st.stanza("failure", {xmlns="http://jabber.org/protocol/compress"}):tag("processing-failed");
+				});
+				module:log("warn", decompressed);
+				return;
+			end
+			old_data(conn, decompressed);
+		end;
+end
+
 -- TODO Support compression on S2S level too.
-module:add_handler({"c2s_unauthed", "c2s"}, "compress", xmlns_compression_protocol,
+module:add_handler({"s2sout_unauthed", "s2sout"}, "compressed", xmlns_compression_protocol, 
+		function(session ,stanza)
+			session.log("debug", "Activating compression...")
+			-- create deflate and inflate streams
+			deflate_stream = get_deflate_stream(session);
+			if not deflate_stream then return end
+			
+			inflate_stream = get_inflate_stream(session);
+			if not inflate_stream then return end
+			
+			-- setup compression for session.w
+			setup_compression(session, deflate_stream);
+				
+			-- setup decompression for session.data
+			setup_decompression(session, inflate_stream);
+			local session_reset_stream = session.reset_stream;
+			session.reset_stream = function(session)
+					session_reset_stream(session);
+					setup_decompression(session, inflate_stream);
+					return true;
+				end;
+			session:reset_stream();
+			local default_stream_attr = {xmlns = "jabber:server", ["xmlns:stream"] = "http://etherx.jabber.org/streams",
+										version = "1.0", to = session.to_host, from = session.from_host};
+			session.sends2s("<?xml version='1.0'?>");
+			session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag());			
+			session.compressed = true;
+		end
+);
+
+module:add_handler({"c2s_unauthed", "c2s", "s2sin_unauthed", "s2sin"}, "compress", xmlns_compression_protocol,
 		function(session, stanza)
 			-- fail if we are already compressed
 			if session.compressed then
 				local error_st = st.stanza("failure", {xmlns=xmlns_compression_protocol}):tag("unsupported-method");
-				session.send(error_st);
-				session:log("warn", "Tried to establish another compression layer.");
+				(session.sends2s or session.send)(error_st);
+				session.log("warn", "Tried to establish another compression layer.");
 			end
 			
 			-- checking if the compression method is supported
 			local method = stanza:child_with_name("method")[1];
 			if method == "zlib" then
 				session.log("info", method.." compression selected.");
-				session.send(st.stanza("compressed", {xmlns=xmlns_compression_protocol}));
-				session:reset_stream();
 				
 				-- create deflate and inflate streams
-				local status, deflate_stream = pcall(zlib.deflate, compression_level);
-				if status == false then
-					local error_st = st.stanza("failure", {xmlns=xmlns_compression_protocol}):tag("setup-failed");
-					session.send(error_st);
-					session:log("error", "Failed to create zlib.deflate filter.");
-					module:log("error", deflate_stream);
-					return
-				end
+				deflate_stream = get_deflate_stream(session);
+				if not deflate_stream then return end
 				
-				local status, inflate_stream = pcall(zlib.inflate);
-				if status == false then
-					local error_st = st.stanza("failure", {xmlns=xmlns_compression_protocol}):tag("setup-failed");
-					session.send(error_st);
-					session:log("error", "Failed to create zlib.deflate filter.");
-					module:log("error", inflate_stream);
-					return
-				end
+				inflate_stream = get_inflate_stream(session);
+				if not inflate_stream then return end
+				
+				(session.sends2s or session.send)(st.stanza("compressed", {xmlns=xmlns_compression_protocol}));
+				session:reset_stream();
 				
 				-- setup compression for session.w
-				local old_send = session.send;
-				
-				session.send = function(t)
-						local status, compressed, eof = pcall(deflate_stream, tostring(t), 'sync');
-						if status == false then
-							session:close({
-								condition = "undefined-condition";
-								text = compressed;
-								extra = st.stanza("failure", {xmlns="http://jabber.org/protocol/compress"}):tag("processing-failed");
-							});
-							module:log("warn", compressed);
-							return;
-						end
-						old_send(compressed);
-					end;
+				setup_compression(session, deflate_stream);
 					
 				-- setup decompression for session.data
-				local function setup_decompression(session)
-					local old_data = session.data
-					session.data = function(conn, data)
-							local status, decompressed, eof = pcall(inflate_stream, data);
-							if status == false then
-								session:close({
-									condition = "undefined-condition";
-									text = decompressed;
-									extra = st.stanza("failure", {xmlns="http://jabber.org/protocol/compress"}):tag("processing-failed");
-								});
-								module:log("warn", decompressed);
-								return;
-							end
-							old_data(conn, decompressed);
-						end;
-				end
-				setup_decompression(session);
+				setup_decompression(session, inflate_stream);
 				
 				local session_reset_stream = session.reset_stream;
 				session.reset_stream = function(session)
 						session_reset_stream(session);
-						setup_decompression(session);
+						setup_decompression(session, inflate_stream);
 						return true;
 					end;
 				session.compressed = true;
 			else
 				session.log("info", method.." compression selected. But we don't support it.");
 				local error_st = st.stanza("failure", {xmlns=xmlns_compression_protocol}):tag("unsupported-method");
-				session.send(error_st);
+				(session.sends2s or session.send)(error_st);
 			end
 		end
 );
+

mercurial