1 --local log = require "util.logger".init("util.async"); |
1 local log = require "verse".new_logger("util.async"); |
2 |
2 |
3 local function runner_continue(thread) |
3 local function runner_continue(thread) |
4 -- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure) |
4 -- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure) |
5 if coroutine.status(thread) ~= "suspended" then -- This should suffice |
5 if coroutine.status(thread) ~= "suspended" then -- This should suffice |
6 return false; |
6 return false; |
14 if error_handler then error_handler(runner, debug.traceback(thread, state)); end |
14 if error_handler then error_handler(runner, debug.traceback(thread, state)); end |
15 elseif state == "ready" then |
15 elseif state == "ready" then |
16 -- If state is 'ready', it is our responsibility to update runner.state from 'waiting'. |
16 -- If state is 'ready', it is our responsibility to update runner.state from 'waiting'. |
17 -- We also have to :run(), because the queue might have further items that will not be |
17 -- We also have to :run(), because the queue might have further items that will not be |
18 -- processed otherwise. FIXME: It's probably best to do this in a nexttick (0 timer). |
18 -- processed otherwise. FIXME: It's probably best to do this in a nexttick (0 timer). |
|
19 log("debug", "Runner is ready (finished) - restarting", debug.traceback()); |
19 runner.state = "ready"; |
20 runner.state = "ready"; |
20 runner:run(); |
21 runner:run(); |
21 end |
22 end |
22 return true; |
23 return true; |
23 end |
24 end |
52 end |
53 end |
53 local guard = guards[id]; |
54 local guard = guards[id]; |
54 if not guard then |
55 if not guard then |
55 guard = {}; |
56 guard = {}; |
56 guards[id] = guard; |
57 guards[id] = guard; |
57 --log("debug", "New guard!"); |
58 log("debug", "New guard!"); |
58 else |
59 else |
59 table.insert(guard, thread); |
60 table.insert(guard, thread); |
60 --log("debug", "Guarded. %d threads waiting.", #guard) |
61 log("debug", "Guarded. %d threads waiting.", #guard) |
61 coroutine.yield("wait"); |
62 coroutine.yield("wait"); |
62 end |
63 end |
63 local function exit() |
64 local function exit() |
64 local next_waiting = table.remove(guard, 1); |
65 local next_waiting = table.remove(guard, 1); |
65 if next_waiting then |
66 if next_waiting then |
66 --log("debug", "guard: Executing next waiting thread (%d left)", #guard) |
67 log("debug", "guard: Executing next waiting thread (%d left)", #guard) |
67 runner_continue(next_waiting); |
68 runner_continue(next_waiting); |
68 else |
69 else |
69 --log("debug", "Guard off duty.") |
70 log("debug", "Guard off duty.") |
70 guards[id] = nil; |
71 guards[id] = nil; |
71 end |
72 end |
72 end |
73 end |
73 if func then |
74 if func then |
74 func(); |
75 func(); |
98 queue = {}, watchers = watchers or empty_watchers, data = data } |
99 queue = {}, watchers = watchers or empty_watchers, data = data } |
99 , runner_mt); |
100 , runner_mt); |
100 end |
101 end |
101 |
102 |
102 function runner_mt:run(input) |
103 function runner_mt:run(input) |
|
104 log("debug", "RUNNING", debug.traceback()) |
103 if input ~= nil then |
105 if input ~= nil then |
104 table.insert(self.queue, input); |
106 table.insert(self.queue, input); |
105 end |
107 end |
106 if self.state ~= "ready" then |
108 if self.state ~= "ready" then |
107 return true, self.state, #self.queue; |
109 return true, self.state, #self.queue; |