util/async.lua

Thu, 03 Dec 2020 17:05:27 +0000

author
Matthew Wild <mwild1@gmail.com>
date
Thu, 03 Dec 2020 17:05:27 +0000
changeset 0
550f506de75a
permissions
-rw-r--r--

Initial commit

0
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
1 local logger = require "util.logger";
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
2 local log = logger.init("util.async");
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
3 local new_id = require "util.id".short;
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
4 local xpcall = require "util.xpcall".xpcall;
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
5
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Return the coroutine this code is currently running in.
-- Raises an error when called from the main thread (or when no coroutine
-- is reported at all), i.e. outside of an async context.
local function checkthread()
	local co, is_main = coroutine.running();
	if co and not is_main then
		return co;
	end
	error("Not running in an async context, see https://prosody.im/doc/developers/util/async");
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
13
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Locate the runner object that owns `thread` by inspecting the coroutine's
-- stack through the debug library.
--
-- Walks to the outermost frame of the coroutine and reads that frame's first
-- local variable. The value is accepted as the runner only if the local is
-- named "self", the value is a table, and its `.thread` field is this very
-- coroutine.
--
-- Returns the runner table, or nil when it cannot be identified (including
-- when the coroutine has no stack frames left to inspect).
local function runner_from_thread(thread)
	local level = 0;
	-- Find the 'level' of the top-most function (0 == current level, 1 == caller, ...)
	-- debug.getinfo() returns nil once `level` is past the outermost frame.
	while debug.getinfo(thread, level, "") do level = level + 1; end
	if level == 0 then
		-- No frames at all (e.g. the coroutine already returned). Asking
		-- debug.getlocal() for level -1 would raise "level out of range"
		-- instead of letting us report "not found".
		return nil;
	end
	local name, runner = debug.getlocal(thread, level-1, 1);
	if name ~= "self" or type(runner) ~= "table" or runner.thread ~= thread then
		return nil;
	end
	return runner;
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
24
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Invoke the watcher registered under `watcher_name` on `runner`, if any.
-- The watcher receives the runner followed by any extra arguments, and runs
-- under xpcall with debug.traceback as the message handler.
--
-- Returns:
--   false       when no such watcher is registered
--   true        when the watcher ran without raising
--   nil, err    when the watcher raised (err is the traceback; also logged)
local function call_watcher(runner, watcher_name, ...)
	local callback = runner.watchers[watcher_name];
	if callback then
		runner:log("debug", "Calling '%s' watcher", watcher_name);
		local success, traceback = xpcall(callback, debug.traceback, runner, ...);
		if success then
			return true;
		end
		runner:log("error", "Error in '%s' watcher: %s", watcher_name, traceback);
		return nil, traceback;
	end
	return false;
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
38
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Resume a coroutine that was previously suspended by a runner.
--
-- Returns false if the thread is not suspended, or if it failed and its
-- runner could not be located. If resuming raised an error, the runner's
-- "error" watcher is called, the runner is marked "ready" and the result of
-- runner:run() is returned. Otherwise returns true.
local function runner_continue(thread)
	-- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure)
	if coroutine.status(thread) ~= "suspended" then -- This should suffice
		log("error", "unexpected async state: thread not suspended");
		return false;
	end

	local resumed, result, runner = coroutine.resume(thread);
	if resumed then
		if result == "ready" then
			-- If state is 'ready', it is our responsibility to update runner.state from 'waiting'.
			-- We also have to :run(), because the queue might have further items that will not be
			-- processed otherwise. FIXME: It's probably best to do this in a nexttick (0 timer).
			runner.state = "ready";
			runner:run();
		end
		return true;
	end

	-- Running the coroutine failed (`result` holds the error), which means we
	-- have to find the runner manually in order to inform the error handler
	runner = runner_from_thread(thread);
	if not runner then
		log("error", "unexpected async state: unable to locate runner during error handling");
		return false;
	end
	call_watcher(runner, "error", debug.traceback(thread, result));
	runner.state = "ready";
	return runner:run();
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
67
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Create a wait()/done() pair bound to the current coroutine.
--
-- `num` (default 1) is how many done() calls are expected.
-- wait() yields "wait" from the coroutine unless the count already reached
-- zero. done() decrements the count; when it reaches zero and wait() has been
-- called, the coroutine is continued. Calling done() more often than `num`
-- raises an error.
--
-- Must be called from within a coroutine (see checkthread).
local function waiter(num)
	local thread = checkthread();
	local remaining = num or 1;
	local suspended = false;

	local function wait()
		if remaining == 0 then return; end -- already done
		suspended = true;
		coroutine.yield("wait");
	end

	local function done()
		remaining = remaining - 1;
		if remaining == 0 and suspended then
			runner_continue(thread);
		elseif remaining < 0 then
			error("done() called too many times");
		end
	end

	return wait, done;
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
85
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Create a guard function that serialises coroutines per id.
--
-- The returned function takes (id, func). The first caller for an id (or for
-- the shared default id when id is nil/false) proceeds immediately; later
-- callers are appended to that id's wait list and yield "wait".
--
-- If `func` is given it is called and the guard is then released; nothing is
-- returned. Otherwise the release function is returned for the caller to
-- invoke. Releasing continues the next waiting coroutine, or drops the guard
-- for that id when nobody is waiting.
local function guarder()
	local guards = {};
	local default_id = {};
	return function (id, func)
		id = id or default_id;
		local thread = checkthread();
		local guard = guards[id];
		if guard then
			-- Somebody already holds this guard: queue up and suspend
			table.insert(guard, thread);
			log("debug", "Guarded. %d threads waiting.", #guard)
			coroutine.yield("wait");
		else
			guard = {};
			guards[id] = guard;
			log("debug", "New guard!");
		end

		local function exit()
			local next_waiting = table.remove(guard, 1);
			if not next_waiting then
				log("debug", "Guard off duty.")
				guards[id] = nil;
				return;
			end
			log("debug", "guard: Executing next waiting thread (%d left)", #guard)
			runner_continue(next_waiting);
		end

		if not func then
			return exit;
		end
		func();
		exit();
	end;
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
120
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Metatable shared by runner objects; pointing __index back at itself lets
-- functions stored on runner_mt be looked up as methods of those objects.
local runner_mt = {};
runner_mt.__index = runner_mt;
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
123
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Create and prime the coroutine that executes a runner's work items.
--
-- The coroutine loops forever: it yields ("ready", self), and whatever it is
-- resumed with next is passed to `func`. The new coroutine is given the
-- hook returned by debug.gethook(), then resumed once with `self` so that
-- it is parked at its first yield when returned.
--
-- NOTE: the first parameter of the coroutine body must remain named "self":
-- runner_from_thread() identifies the runner by looking for a local of that
-- name in the coroutine's outermost frame.
local function runner_create_thread(func, self)
	local function main_loop(self) -- luacheck: ignore 432/self
		repeat
			func(coroutine.yield("ready", self));
		until false
	end
	local thread = coroutine.create(main_loop);
	debug.sethook(thread, debug.gethook());
	assert(coroutine.resume(thread, self)); -- Start it up, it will return instantly to wait for the first input
	return thread;
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
134
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Error watcher installed when a runner is created without any watchers:
-- logs the error through the runner's logger, then raises it again.
local function default_error_watcher(runner, err)
	runner.log(runner, "error", "Encountered error: %s", err);
	error(err);
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Work function used when a runner is created without one: each queued
-- item is itself called with no arguments; its results are discarded.
local function default_func(task)
	task();
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Construct a new runner object.
--
--   func      called with each queued item (defaults to calling the item itself)
--   watchers  table of state callbacks; when omitted, only a default "error"
--             watcher is installed
--   data      stored on the runner's `data` field as-is
--
-- The runner starts in state "ready" with an empty queue, no coroutine yet
-- (`thread = false`), a freshly generated id and its own logger.
local function runner(func, watchers, data)
	local id = new_id();
	local instance = {
		func = func or default_func;
		thread = false;
		state = "ready";
		notified_state = "ready";
		queue = {};
		watchers = watchers or { error = default_error_watcher };
		data = data;
		id = id;
		_log = logger.init("runner" .. id);
	};
	return setmetatable(instance, runner_mt);
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
147
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Add a task item for the runner to process
--
-- A non-nil `input` is appended to the queue. If the runner is not "ready"
-- (e.g. it is running or waiting) the call returns immediately with
-- true, <current state>, <queue length>. Otherwise queued items are fed to
-- the runner's coroutine one at a time until the queue is empty, the
-- coroutine yields "wait", or an item raises an error.
--
-- Afterwards the watcher matching the resulting state ("ready", "waiting" or
-- "error") is called when the state changed or an error occurred. Returns
-- true, <state>, <items left in queue>.
function runner_mt:run(input)
	local queue = self.queue;
	if input ~= nil then
		table.insert(queue, input);
		--self:log("debug", "queued new work item, %d items queued", #self.queue);
	end
	if self.state ~= "ready" then
		-- The runner is busy. Indicate that the task item has been
		-- queued, and return information about the current runner state
		return true, self.state, #queue;
	end

	local thread = self.thread;
	local need_thread = not thread or coroutine.status(thread) == "dead";
	if need_thread then
		--luacheck: ignore 143/coroutine
		if thread and coroutine.close then
			coroutine.close(thread);
		end
		self:log("debug", "creating new coroutine");
		-- Create a new coroutine for this runner
		thread = runner_create_thread(self.func, self);
		self.thread = thread;
	end

	-- Process task item(s) while the queue is not empty, and we're not blocked
	local pending, state, err = #queue, self.state, nil;
	self.state = "running";
	--self:log("debug", "running main loop");
	while pending > 0 and state == "ready" and not err do
		local consumed;
		-- Loop through queue items, and attempt to run them
		for index = 1, pending do
			local ok, yielded = coroutine.resume(thread, queue[index]);
			if not ok then
				-- There was an error running the coroutine, save the error, mark runner as ready to begin again
				consumed, state, err = index, "ready", debug.traceback(thread, yielded);
				self.thread = nil;
				break;
			end
			if yielded == "wait" then
				-- Runner is blocked on waiting for a task item to complete
				consumed, state = index, "waiting";
				break;
			end
		end
		-- Loop ended - either queue empty because all tasks passed without blocking (consumed == nil)
		-- or runner is blocked/errored, and consumed will contain the number of tasks processed so far
		consumed = consumed or pending;
		-- Remove consumed items from the queue array. Items may have been
		-- appended while we were running, so re-measure before shifting.
		if queue[pending+1] ~= nil then
			pending = #queue;
		end
		for index = 1, pending do
			queue[index] = queue[consumed+index];
		end
		pending = #queue;
	end

	-- Runner processed all items it can, so save current runner state
	self.state = state;
	if err or state ~= self.notified_state then
		self:log("debug", "changed state from %s to %s", self.notified_state, err and ("error ("..state..")") or state);
		if err then
			-- Errors are reported via the "error" watcher; notified_state is left untouched
			state = "error"
		else
			self.notified_state = state;
		end
		local handler = self.watchers[state];
		if handler then handler(self, err); end
	end
	if pending > 0 then
		return self:run();
	end
	return true, state, pending;
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
222
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Add a task item to the queue without invoking the runner, even if it is idle
-- Logs the new queue length and returns the runner itself.
function runner_mt:enqueue(input)
	local queue = self.queue;
	table.insert(queue, input);
	self:log("debug", "queued new work item, %d items queued", #queue);
	return self;
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
229
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Forward a log message to this runner's own logger function (`_log`),
-- returning whatever that function returns.
function runner_mt:log(level, fmt, ...)
	local emit = self._log;
	return emit(level, fmt, ...);
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
233
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Set the watcher stored under "ready", replacing any previous one.
-- Returns the runner.
function runner_mt:onready(f)
	local watchers = self.watchers;
	watchers.ready = f;
	return self;
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
238
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Set the watcher stored under "waiting", replacing any previous one.
-- Returns the runner.
function runner_mt:onwaiting(f)
	local watchers = self.watchers;
	watchers.waiting = f;
	return self;
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
243
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Set the watcher stored under "error", replacing any previous one.
-- Returns the runner.
function runner_mt:onerror(f)
	local watchers = self.watchers;
	watchers.error = f;
	return self;
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
248
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Report whether the caller is inside an async context.
-- Returns true plus the current coroutine when it is, or false plus the
-- error message raised by checkthread() when it is not.
local function ready()
	local ok, result = pcall(checkthread);
	return ok, result;
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
252
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Suspend the current coroutine until `promise` settles.
--
-- Returns the fulfilment value when it is truthy; otherwise returns nil
-- followed by the rejection value (which is nil if the promise fulfilled
-- with a falsy value). Must be called from within an async context, since it
-- relies on waiter().
local function wait_for(promise)
	local async_wait, async_done = waiter();
	local ret, err = nil, nil;

	local function on_fulfilled(r) ret = r; end
	local function on_rejected(e) err = e; end

	promise:next(on_fulfilled, on_rejected):finally(async_done);
	async_wait();

	if not ret then
		return nil, err;
	end
	return ret;
end
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
267
550f506de75a Initial commit
Matthew Wild <mwild1@gmail.com>
parents:
diff changeset
-- Public API of the module
return {
	ready = ready,
	waiter = waiter,
	guarder = guarder,
	runner = runner,
	wait = wait_for, -- COMPAT w/trunk pre-0.12
	wait_for = wait_for,
};

mercurial