-- Admin credentials: the account that every worker connection logs in with
local jid, password = "user@example.com", "secret";

-- Domain that users are on
local domain = "example.com";

-- Number of connections to the XMPP server
local max_workers = 20;

-- Path of the user list file, taken from the first command-line argument
local input_filename = assert(arg[1], "No user file specified");

local verse = require "verse".init("client");
local async = require "util.async";
local promise = require "util.promise";

--- Turn an arbitrary string into one that is safe to use as a file name.
-- Every character that does not match %w is replaced by the two-digit
-- lowercase hex value of its byte. No escape prefix is emitted, so the
-- result cannot be decoded back unambiguously.
-- @param s string to encode
-- @return the encoded string (exactly one value; gsub's count is dropped)
local function fs_encode(s)
	return (s:gsub("%W", function (c)
		return ("%02x"):format(c:byte());
	end));
end

--- List the node names a user's account advertises via disco#items.
-- Sends an IQ "get" to username@domain over connection `c`.
-- @param c connection object used to send the request (must offer send_iq)
-- @param username local part of the user's address; `domain` is appended
-- @return a promise that resolves to an array of node names. It rejects with
--   the response stanza when the reply is an error or carries no
--   disco#items query element.
local function get_user_pep_nodes(c, username)
	return promise.new(function (resolve, reject)
		local request = verse.iq({ type = "get", to = username.."@"..domain })
			:tag("query", { xmlns = "http://jabber.org/protocol/disco#items" });
		c:send_iq(request, function (response)
			if response.attr.type == "error" then
				return reject(response);
			end
			-- Look the payload up by name and namespace instead of assuming it
			-- is the first child; a reply without it would otherwise raise here.
			local query = response:get_child("query", "http://jabber.org/protocol/disco#items");
			if not query then
				return reject(response);
			end
			local nodes = {};
			for item in query:childtags("item") do
				local node = item.attr.node;
				-- Items without a node attribute contribute nothing
				if node then
					nodes[#nodes + 1] = node;
				end
			end
			resolve(nodes);
		end);
	end);
end

--- Fetch the published items of one pubsub node of a user's account.
-- Sends a pubsub <items/> request to username@domain over connection `c`.
-- @param c connection object used to send the request (must offer send_iq)
-- @param username local part of the user's address; `domain` is appended
-- @param node name of the node to fetch
-- @return a promise that resolves to the <items/> element of the reply. It
--   rejects with the response stanza when the reply is an error or lacks
--   the pubsub/items payload.
local function get_user_pep_data(c, username, node)
	return promise.new(function (resolve, reject)
		local request = verse.iq({ type = "get", to = username.."@"..domain })
			:tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" })
				:tag("items", { node = node });
		c:send_iq(request, function (response)
			if response.attr.type == "error" then
				return reject(response);
			end
			-- Guard each lookup: chaining get_child on a missing <pubsub/>
			-- would raise inside the callback instead of failing the promise.
			local pubsub = response:get_child("pubsub", "http://jabber.org/protocol/pubsub");
			local items = pubsub and pubsub:get_child("items");
			if not items then
				return reject(response);
			end
			resolve(items);
		end);
	end);
end

--- Open several client connections using the admin credentials.
-- @param n_clients number of connections to create
-- @return the result of promise.all over one promise per connection; each of
--   those resolves to its connection object once the "ready" event fires and
--   is rejected with whatever the "disconnected" hook passes along
local function connect_clients(n_clients)
	local client_promises = {};
	for i = 1, n_clients do
		local c = verse.new();
		client_promises[i] = promise.new(function (resolve, reject)
			c:hook("ready", function ()
				resolve(c);
			end);
			c:hook("disconnected", reject);
			-- Hooks are registered before connecting so no event is missed
			c:connect_client(jid, password);
		end);
	end
	return promise.all(client_promises);
end

--- Feed lines from a shared file to one runner until the file is exhausted.
-- Each time the runner reports ready, one more item is counted as processed
-- and the next line (if any) is handed to it.
-- @param client runner object offering onready/onerror/run
-- @param users_file open file handle, read one line at a time
-- @return a promise that resolves to the number of items this worker
--   processed, or rejects with whatever the runner's error watcher receives
local function run_worker(client, users_file)
	return promise.new(function (resolve, reject)
		local processed = 0;
		local line = users_file:read("*l");
		if not line then
			-- Nothing left for this worker: finish here rather than falling
			-- through and starting the runner with a nil item.
			return resolve(processed);
		end
		client:onready(function (self)
			processed = processed + 1;
			local new_line = users_file:read("*l");
			if new_line then
				self:run(new_line);
			else
				resolve(processed);
			end
		end)
		:onerror(reject)
		:run(line);
	end);
end

-- Main pipeline: connect, wrap each connection in a runner that exports one
-- user per item, then drain the user list across all runners.
connect_clients(max_workers):next(function (connections)
	local clients = {};
	for i, conn in ipairs(connections) do
		-- Each runner receives a username and writes that user's data to
		-- "<fs_encode(username)>.xml" in the current directory.
		clients[i] = async.runner(function (username)
			--print("Getting nodes for "..username.."...");
			local nodes, err = async.wait_for(get_user_pep_nodes(conn, username));
			if not nodes then
				print("Error:", err);
				return;
			end
			-- Fail with the real reason if the output file cannot be created,
			-- instead of an "attempt to index nil" further down.
			local f = assert(io.open(fs_encode(username)..".xml", "w+"));
			f:write("<pep>");
			for _, node_name in ipairs(nodes) do
				local data, data_err = async.wait_for(get_user_pep_data(conn, username, node_name));
				if data then
					f:write(tostring(data));
				else
					-- Skip nodes that could not be fetched rather than
					-- writing the literal text "nil" into the file.
					print("Error:", data_err);
				end
			end
			f:write("</pep>");
			f:close();
		end);
	end
	return clients;
end):next(function (clients)
	local users_file = assert(io.open(input_filename));

	-- All workers pull lines from the same file handle
	local workers = {};
	for i = 1, #clients do
		workers[i] = run_worker(clients[i], users_file);
	end
	promise.all(workers):next(function (status)
		users_file:close();
		local total = 0;
		for i, processed in ipairs(status) do
			print(("Worker %d processed %d users"):format(i, processed));
			total = total + processed;
		end
		print(("Total: %d users processed"):format(total));
		verse.quit();
	end, function (err)
		print("Fatal error:", err);
		os.exit(1);
	end);

end):catch(function (err)
	print("Initialization error:", err);
	os.exit(1);
end);

-- Run the event loop and report the wall-clock time it took
print("Starting loop...");
local start_time = os.time();
verse.loop();
print(("All done in %d seconds!"):format(os.time()-start_time));