Thu, 03 Dec 2020 17:05:27 +0000
Initial commit
-- Admin credentials local jid, password = "user@example.com", "secret""; -- Domain that users are on local domain = "example.com"; -- Number of connections to the XMPP server local max_workers = 20; local input_filename = assert(arg[1], "No user file specified"); local verse = require "verse".init("client"); local async = require "util.async"; local promise = require "util.promise"; local function fs_encode(s) return (s:gsub("%W", function (c) return ("%02x"):format(c:byte()); end)); end local function get_user_pep_nodes(c, username) return promise.new(function (resolve, reject) local request = verse.iq({ type = "get", to = username.."@"..domain }) :tag("query", { xmlns = "http://jabber.org/protocol/disco#items" }); c:send_iq(request, function (response) if response.attr.type == "error" then return reject(response); end local nodes = {}; for item in response.tags[1]:childtags("item") do table.insert(nodes, item.attr.node); end resolve(nodes); end); end); end local function get_user_pep_data(c, username, node) return promise.new(function (resolve, reject) local request = verse.iq({ type = "get", to = username.."@"..domain }) :tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" }) :tag("items", { node = node }); c:send_iq(request, function (response) if response.attr.type == "error" then return reject(response); end resolve(response :get_child("pubsub", "http://jabber.org/protocol/pubsub") :get_child("items") ); end); end); end local function connect_clients(n_clients) local client_promises = {}; for i = 1, n_clients do local c = verse.new(); client_promises[i] = promise.new(function (resolve, reject) c:hook("ready", function () resolve(c); end); c:hook("disconnected", reject); c:connect_client(jid, password); end); end return promise.all(client_promises); end local function run_worker(client, users_file) return promise.new(function (resolve, reject) local processed = 0; local line = users_file:read("*l"); if not line then resolve(processed); end client:onready(function (self) processed = processed + 1; local new_line = users_file:read("*l"); if new_line then self:run(new_line); else resolve(processed); return; end end) :onerror(reject) :run(line); end); end connect_clients(max_workers):next(function (connections) local clients = {}; for i, conn_ in ipairs(connections) do local conn = conn_; clients[i] = async.runner(function (username) --print("Getting nodes for "..username.."..."); local nodes, err = async.wait_for(get_user_pep_nodes(conn, username)); if not nodes then print("Error:", err); return; end local f = io.open(fs_encode(username)..".xml", "w+"); f:write("<pep>"); for _, node_name in ipairs(nodes) do local data = async.wait_for(get_user_pep_data(conn, username, node_name)); f:write(tostring(data)); end f:write("</pep>"); f:close(); end); end return clients; end):next(function (clients) local users_file = assert(io.open(input_filename)); local workers = {}; for i = 1, #clients do workers[i] = run_worker(clients[i], users_file); end promise.all(workers):next(function (status) local total = 0; for i, processed in ipairs(status) do print(("Worker %d processed %d users"):format(i, processed)); total = total + processed; end print(("Total: %d users processed"):format(total)); verse.quit(); end, function (err) print("Fatal error:", err); os.exit(1); end); end):catch(function (err) print("Initialization error:", err); os.exit(1); end); print("Starting loop...") local start_time = os.time(); verse.loop() print(("All done in %d seconds!"):format(os.time()-start_time));