main.lua

changeset 0
550f506de75a
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/main.lua	Thu Dec 03 17:05:27 2020 +0000
@@ -0,0 +1,141 @@
+-- Admin credentials
+local jid, password = "user@example.com", "secret"";
+
+-- Domain that users are on
+local domain = "example.com";
+
+-- Number of connections to the XMPP server
+local max_workers = 20;
+
+local input_filename = assert(arg[1], "No user file specified");
+
+local verse = require "verse".init("client");
+local async = require "util.async";
+local promise = require "util.promise";
+
+local function fs_encode(s)
+	return (s:gsub("%W", function (c)
+		return ("%02x"):format(c:byte());
+	end));
+end
+
+local function get_user_pep_nodes(c, username)
+	return promise.new(function (resolve, reject)
+		local request = verse.iq({ type = "get", to = username.."@"..domain })
+			:tag("query", { xmlns = "http://jabber.org/protocol/disco#items" });
+		c:send_iq(request, function (response)
+			if response.attr.type == "error" then
+				return reject(response);
+			end
+			local nodes = {};
+			for item in response.tags[1]:childtags("item") do
+				table.insert(nodes, item.attr.node);
+			end
+			resolve(nodes);
+		end);
+	end);
+end
+
+local function get_user_pep_data(c, username, node)
+	return promise.new(function (resolve, reject)
+		local request = verse.iq({ type = "get", to = username.."@"..domain })
+			:tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" })
+				:tag("items", { node = node });
+		c:send_iq(request, function (response)
+			if response.attr.type == "error" then
+				return reject(response);
+			end
+			resolve(response
+				:get_child("pubsub", "http://jabber.org/protocol/pubsub")
+					:get_child("items")
+			);
+		end);
+	end);
+end
+
+
+local function connect_clients(n_clients)
+	local client_promises = {};
+	for i = 1, n_clients do
+		local c = verse.new();
+		client_promises[i] = promise.new(function (resolve, reject)
+			c:hook("ready", function ()
+				resolve(c);
+			end);
+			c:hook("disconnected", reject);
+			c:connect_client(jid, password);
+		end);
+	end
+	return promise.all(client_promises);
+end
+
+local function run_worker(client, users_file)
+	return promise.new(function (resolve, reject)
+		local processed = 0;
+		local line = users_file:read("*l");
+		if not line then resolve(processed); end
+		client:onready(function (self)
+				processed = processed + 1;
+				local new_line = users_file:read("*l");
+				if new_line then
+					self:run(new_line);
+				else
+					resolve(processed);
+					return;
+				end
+			end)
+			:onerror(reject)
+			:run(line);
+	end);
+end
+connect_clients(max_workers):next(function (connections)
+	local clients = {};
+	for i, conn_ in ipairs(connections) do
+		local conn = conn_;
+		clients[i] = async.runner(function (username)
+			--print("Getting nodes for "..username.."...");
+			local nodes, err = async.wait_for(get_user_pep_nodes(conn, username));
+			if not nodes then
+				print("Error:", err);
+				return;
+			end
+			local f = io.open(fs_encode(username)..".xml", "w+");
+			f:write("<pep>");
+			for _, node_name in ipairs(nodes) do
+				local data = async.wait_for(get_user_pep_data(conn, username, node_name));
+				f:write(tostring(data));
+			end
+			f:write("</pep>");
+			f:close();
+		end);
+	end
+	return clients;
+end):next(function (clients)
+	local users_file = assert(io.open(input_filename));
+
+	local workers = {};
+	for i = 1, #clients do
+		workers[i] = run_worker(clients[i], users_file);
+	end
+	promise.all(workers):next(function (status)
+		local total = 0;
+		for i, processed in ipairs(status) do
+			print(("Worker %d processed %d users"):format(i, processed));
+			total = total + processed;
+		end
+		print(("Total: %d users processed"):format(total));
+		verse.quit();
+	end, function (err)
+		print("Fatal error:", err);
+		os.exit(1);
+	end);
+
+end):catch(function (err)
+	print("Initialization error:", err);
+	os.exit(1);
+end);
+
+print("Starting loop...")
+local start_time = os.time();
+verse.loop()
+print(("All done in %d seconds!"):format(os.time()-start_time));

mercurial