diff -r 000000000000 -r 550f506de75a main.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/main.lua Thu Dec 03 17:05:27 2020 +0000 @@ -0,0 +1,141 @@ +-- Admin credentials +local jid, password = "user@example.com", "secret""; + +-- Domain that users are on +local domain = "example.com"; + +-- Number of connections to the XMPP server +local max_workers = 20; + +local input_filename = assert(arg[1], "No user file specified"); + +local verse = require "verse".init("client"); +local async = require "util.async"; +local promise = require "util.promise"; + +local function fs_encode(s) + return (s:gsub("%W", function (c) + return ("%02x"):format(c:byte()); + end)); +end + +local function get_user_pep_nodes(c, username) + return promise.new(function (resolve, reject) + local request = verse.iq({ type = "get", to = username.."@"..domain }) + :tag("query", { xmlns = "http://jabber.org/protocol/disco#items" }); + c:send_iq(request, function (response) + if response.attr.type == "error" then + return reject(response); + end + local nodes = {}; + for item in response.tags[1]:childtags("item") do + table.insert(nodes, item.attr.node); + end + resolve(nodes); + end); + end); +end + +local function get_user_pep_data(c, username, node) + return promise.new(function (resolve, reject) + local request = verse.iq({ type = "get", to = username.."@"..domain }) + :tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" }) + :tag("items", { node = node }); + c:send_iq(request, function (response) + if response.attr.type == "error" then + return reject(response); + end + resolve(response + :get_child("pubsub", "http://jabber.org/protocol/pubsub") + :get_child("items") + ); + end); + end); +end + + +local function connect_clients(n_clients) + local client_promises = {}; + for i = 1, n_clients do + local c = verse.new(); + client_promises[i] = promise.new(function (resolve, reject) + c:hook("ready", function () + resolve(c); + end); + c:hook("disconnected", reject); + c:connect_client(jid, password); + end); + end + return promise.all(client_promises); +end + +local function run_worker(client, users_file) + return promise.new(function (resolve, reject) + local processed = 0; + local line = users_file:read("*l"); + if not line then resolve(processed); end + client:onready(function (self) + processed = processed + 1; + local new_line = users_file:read("*l"); + if new_line then + self:run(new_line); + else + resolve(processed); + return; + end + end) + :onerror(reject) + :run(line); + end); +end +connect_clients(max_workers):next(function (connections) + local clients = {}; + for i, conn_ in ipairs(connections) do + local conn = conn_; + clients[i] = async.runner(function (username) + --print("Getting nodes for "..username.."..."); + local nodes, err = async.wait_for(get_user_pep_nodes(conn, username)); + if not nodes then + print("Error:", err); + return; + end + local f = io.open(fs_encode(username)..".xml", "w+"); + f:write(""); + for _, node_name in ipairs(nodes) do + local data = async.wait_for(get_user_pep_data(conn, username, node_name)); + f:write(tostring(data)); + end + f:write(""); + f:close(); + end); + end + return clients; +end):next(function (clients) + local users_file = assert(io.open(input_filename)); + + local workers = {}; + for i = 1, #clients do + workers[i] = run_worker(clients[i], users_file); + end + promise.all(workers):next(function (status) + local total = 0; + for i, processed in ipairs(status) do + print(("Worker %d processed %d users"):format(i, processed)); + total = total + processed; + end + print(("Total: %d users processed"):format(total)); + verse.quit(); + end, function (err) + print("Fatal error:", err); + os.exit(1); + end); + +end):catch(function (err) + print("Initialization error:", err); + os.exit(1); +end); + +print("Starting loop...") +local start_time = os.time(); +verse.loop() +print(("All done in %d seconds!"):format(os.time()-start_time));