Thu, 03 Dec 2020 17:05:27 +0000
Initial commit
0 | 1 | -- Admin credentials |
2 | local jid, password = "user@example.com", "secret""; | |
3 | ||
4 | -- Domain that users are on | |
5 | local domain = "example.com"; | |
6 | ||
7 | -- Number of connections to the XMPP server | |
8 | local max_workers = 20; | |
9 | ||
10 | local input_filename = assert(arg[1], "No user file specified"); | |
11 | ||
12 | local verse = require "verse".init("client"); | |
13 | local async = require "util.async"; | |
14 | local promise = require "util.promise"; | |
15 | ||
16 | local function fs_encode(s) | |
17 | return (s:gsub("%W", function (c) | |
18 | return ("%02x"):format(c:byte()); | |
19 | end)); | |
20 | end | |
21 | ||
22 | local function get_user_pep_nodes(c, username) | |
23 | return promise.new(function (resolve, reject) | |
24 | local request = verse.iq({ type = "get", to = username.."@"..domain }) | |
25 | :tag("query", { xmlns = "http://jabber.org/protocol/disco#items" }); | |
26 | c:send_iq(request, function (response) | |
27 | if response.attr.type == "error" then | |
28 | return reject(response); | |
29 | end | |
30 | local nodes = {}; | |
31 | for item in response.tags[1]:childtags("item") do | |
32 | table.insert(nodes, item.attr.node); | |
33 | end | |
34 | resolve(nodes); | |
35 | end); | |
36 | end); | |
37 | end | |
38 | ||
39 | local function get_user_pep_data(c, username, node) | |
40 | return promise.new(function (resolve, reject) | |
41 | local request = verse.iq({ type = "get", to = username.."@"..domain }) | |
42 | :tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" }) | |
43 | :tag("items", { node = node }); | |
44 | c:send_iq(request, function (response) | |
45 | if response.attr.type == "error" then | |
46 | return reject(response); | |
47 | end | |
48 | resolve(response | |
49 | :get_child("pubsub", "http://jabber.org/protocol/pubsub") | |
50 | :get_child("items") | |
51 | ); | |
52 | end); | |
53 | end); | |
54 | end | |
55 | ||
56 | ||
57 | local function connect_clients(n_clients) | |
58 | local client_promises = {}; | |
59 | for i = 1, n_clients do | |
60 | local c = verse.new(); | |
61 | client_promises[i] = promise.new(function (resolve, reject) | |
62 | c:hook("ready", function () | |
63 | resolve(c); | |
64 | end); | |
65 | c:hook("disconnected", reject); | |
66 | c:connect_client(jid, password); | |
67 | end); | |
68 | end | |
69 | return promise.all(client_promises); | |
70 | end | |
71 | ||
72 | local function run_worker(client, users_file) | |
73 | return promise.new(function (resolve, reject) | |
74 | local processed = 0; | |
75 | local line = users_file:read("*l"); | |
76 | if not line then resolve(processed); end | |
77 | client:onready(function (self) | |
78 | processed = processed + 1; | |
79 | local new_line = users_file:read("*l"); | |
80 | if new_line then | |
81 | self:run(new_line); | |
82 | else | |
83 | resolve(processed); | |
84 | return; | |
85 | end | |
86 | end) | |
87 | :onerror(reject) | |
88 | :run(line); | |
89 | end); | |
90 | end | |
91 | connect_clients(max_workers):next(function (connections) | |
92 | local clients = {}; | |
93 | for i, conn_ in ipairs(connections) do | |
94 | local conn = conn_; | |
95 | clients[i] = async.runner(function (username) | |
96 | --print("Getting nodes for "..username.."..."); | |
97 | local nodes, err = async.wait_for(get_user_pep_nodes(conn, username)); | |
98 | if not nodes then | |
99 | print("Error:", err); | |
100 | return; | |
101 | end | |
102 | local f = io.open(fs_encode(username)..".xml", "w+"); | |
103 | f:write("<pep>"); | |
104 | for _, node_name in ipairs(nodes) do | |
105 | local data = async.wait_for(get_user_pep_data(conn, username, node_name)); | |
106 | f:write(tostring(data)); | |
107 | end | |
108 | f:write("</pep>"); | |
109 | f:close(); | |
110 | end); | |
111 | end | |
112 | return clients; | |
113 | end):next(function (clients) | |
114 | local users_file = assert(io.open(input_filename)); | |
115 | ||
116 | local workers = {}; | |
117 | for i = 1, #clients do | |
118 | workers[i] = run_worker(clients[i], users_file); | |
119 | end | |
120 | promise.all(workers):next(function (status) | |
121 | local total = 0; | |
122 | for i, processed in ipairs(status) do | |
123 | print(("Worker %d processed %d users"):format(i, processed)); | |
124 | total = total + processed; | |
125 | end | |
126 | print(("Total: %d users processed"):format(total)); | |
127 | verse.quit(); | |
128 | end, function (err) | |
129 | print("Fatal error:", err); | |
130 | os.exit(1); | |
131 | end); | |
132 | ||
133 | end):catch(function (err) | |
134 | print("Initialization error:", err); | |
135 | os.exit(1); | |
136 | end); | |
137 | ||
138 | print("Starting loop...") | |
139 | local start_time = os.time(); | |
140 | verse.loop() | |
141 | print(("All done in %d seconds!"):format(os.time()-start_time)); |