main.lua

changeset 0
550f506de75a
equal deleted inserted replaced
-1:000000000000 0:550f506de75a
-- Admin credentials: the account every worker connection logs in with
-- (passed to connect_client() for each connection).
-- NOTE(review): presumably this account needs permission to read other
-- users' PEP nodes -- confirm against the server configuration.
local jid, password = "user@example.com", "secret";
3
-- domain:      domain that users are on; appended to each username when
--              building the JID that requests are sent to
-- max_workers: number of connections to the XMPP server
local domain, max_workers = "example.com", 20;

-- File to read users from, given as the first command-line argument
local input_filename = assert(arg[1], "No user file specified");
11
12 local verse = require "verse".init("client");
13 local async = require "util.async";
14 local promise = require "util.promise";
15
-- Make a string safe to use as a file name: every non-alphanumeric
-- character is replaced by the two-digit lowercase hex value of its first
-- byte. Returns the encoded string only (gsub's match count is dropped).
local function fs_encode(s)
	local function to_hex(ch)
		return string.format("%02x", string.byte(ch));
	end
	local encoded = s:gsub("%W", to_hex);
	return encoded;
end
21
-- List the node names advertised by a user's account.
--
-- Sends a disco#items query to <username>@<domain> over connection `c`.
--
-- Returns a promise that:
--   * resolves to an array of node names (the `node` attribute of every
--     <item/> in the result);
--   * rejects with the response stanza when the reply is of type "error";
--   * rejects with a message string when the result carries no disco#items
--     <query/> element.
local function get_user_pep_nodes(c, username)
	local disco_items_ns = "http://jabber.org/protocol/disco#items";
	return promise.new(function (resolve, reject)
		local request = verse.iq({ type = "get", to = username.."@"..domain })
			:tag("query", { xmlns = disco_items_ns });
		c:send_iq(request, function (response)
			if response.attr.type == "error" then
				return reject(response);
			end
			-- Look the payload up by name and namespace rather than trusting
			-- that it is the first child: an empty or unexpected result must
			-- reject the promise instead of raising inside this callback.
			local query = response:get_child("query", disco_items_ns);
			if not query then
				return reject("No disco#items query in response for "..username);
			end
			local nodes = {};
			for item in query:childtags("item") do
				local node = item.attr.node;
				-- Items without a node attribute cannot be fetched by name
				if node then
					table.insert(nodes, node);
				end
			end
			resolve(nodes);
		end);
	end);
end
38
-- Fetch the items of one pubsub node of a user.
--
-- Sends a pubsub <items node="..."/> request to <username>@<domain> over
-- connection `c`.
--
-- Returns a promise that:
--   * resolves to the <items/> element of the result;
--   * rejects with the response stanza when the reply is of type "error";
--   * rejects with a message string when the result has no
--     <pubsub/><items/> payload.
local function get_user_pep_data(c, username, node)
	local pubsub_ns = "http://jabber.org/protocol/pubsub";
	return promise.new(function (resolve, reject)
		local request = verse.iq({ type = "get", to = username.."@"..domain })
			:tag("pubsub", { xmlns = pubsub_ns })
				:tag("items", { node = node });
		c:send_iq(request, function (response)
			if response.attr.type == "error" then
				return reject(response);
			end
			-- Check each level separately: chaining get_child() on a missing
			-- <pubsub/> would raise inside this callback and leave the
			-- promise unsettled.
			local pubsub = response:get_child("pubsub", pubsub_ns);
			local items = pubsub and pubsub:get_child("items");
			if not items then
				return reject("No pubsub items in response for node "
					..tostring(node).." of "..username);
			end
			resolve(items);
		end);
	end);
end
55
56
-- Open `n_clients` connections using the configured jid and password.
--
-- Returns a promise (promise.all over one promise per connection) whose
-- per-connection promises resolve with the connection object on its "ready"
-- event and reject on its "disconnected" event.
local function connect_clients(n_clients)
	local pending = {};
	for index = 1, n_clients do
		local session = verse.new();
		local function start(on_ready, on_failure)
			session:hook("ready", function ()
				on_ready(session);
			end);
			session:hook("disconnected", on_failure);
			session:connect_client(jid, password);
		end
		pending[index] = promise.new(start);
	end
	return promise.all(pending);
end
71
-- Drive one runner through lines pulled from a shared file handle.
--
-- client:     object exposing chainable onready()/onerror() and run();
--             the caller in this file passes an async.runner
-- users_file: open file handle, read one line (one user) at a time; it may
--             be shared with other workers, each read takes the next line
--
-- Returns a promise that resolves to the number of lines this worker
-- processed once the file has no more lines, and rejects with whatever the
-- client passes to its error handler.
local function run_worker(client, users_file)
	return promise.new(function (resolve, reject)
		local processed = 0;
		local line = users_file:read("*l");
		if not line then
			-- The file is already exhausted (e.g. more workers than users):
			-- finish now and do not start the client with a nil input.
			resolve(processed);
			return;
		end
		client:onready(function (self)
			-- The ready handler counts the item that just finished and
			-- feeds the client the next line, if there is one.
			processed = processed + 1;
			local new_line = users_file:read("*l");
			if new_line then
				self:run(new_line);
			else
				resolve(processed);
			end
		end)
		:onerror(reject)
		:run(line);
	end);
end
-- Main flow:
--   1. open max_workers connections;
--   2. wrap each connection in an async runner that exports one user's
--      nodes to "<fs_encode(username)>.xml";
--   3. let every runner pull usernames from the shared input file until it
--      is exhausted, then print per-worker totals and stop verse.
connect_clients(max_workers):next(function (connections)
	local clients = {};
	for i, conn in ipairs(connections) do
		clients[i] = async.runner(function (username)
			--print("Getting nodes for "..username.."...");
			local nodes, err = async.wait_for(get_user_pep_nodes(conn, username));
			if not nodes then
				print("Error:", err);
				return;
			end
			local f, open_err = io.open(fs_encode(username)..".xml", "w+");
			if not f then
				-- Skip this user instead of crashing the runner on a nil handle
				print("Error:", open_err);
				return;
			end
			f:write("<pep>");
			for _, node_name in ipairs(nodes) do
				local data, data_err = async.wait_for(get_user_pep_data(conn, username, node_name));
				if data then
					f:write(tostring(data));
				else
					-- A failed fetch must not end up as the text "nil" in
					-- the export; report it and carry on with other nodes.
					print("Error:", data_err);
				end
			end
			f:write("</pep>");
			f:close();
		end);
	end
	return clients;
end):next(function (clients)
	local users_file = assert(io.open(input_filename));

	local workers = {};
	for i = 1, #clients do
		workers[i] = run_worker(clients[i], users_file);
	end
	promise.all(workers):next(function (status)
		-- Every worker has resolved, so nobody reads from the file any more
		users_file:close();
		local total = 0;
		for i, processed in ipairs(status) do
			print(("Worker %d processed %d users"):format(i, processed));
			total = total + processed;
		end
		print(("Total: %d users processed"):format(total));
		verse.quit();
	end, function (err)
		print("Fatal error:", err);
		os.exit(1);
	end);

end):catch(function (err)
	print("Initialization error:", err);
	os.exit(1);
end);
137
-- Enter the verse event loop and report the wall-clock time (whole seconds)
-- spent in it once verse.loop() returns.
print("Starting loop...")
local started_at = os.time();
verse.loop()
local elapsed = os.time() - started_at;
print(string.format("All done in %d seconds!", elapsed));

mercurial