scansion/async.lua

changeset 18
5913cc9d1b5b
parent 5
6a95814d66ce
equal deleted inserted replaced
17:610917f3ea97 18:5913cc9d1b5b
1 --local log = require "util.logger".init("util.async"); 1 local log = require "verse".new_logger("util.async");
2 2
3 local function runner_continue(thread) 3 local function runner_continue(thread)
4 -- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure) 4 -- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure)
5 if coroutine.status(thread) ~= "suspended" then -- This should suffice 5 if coroutine.status(thread) ~= "suspended" then -- This should suffice
6 return false; 6 return false;
14 if error_handler then error_handler(runner, debug.traceback(thread, state)); end 14 if error_handler then error_handler(runner, debug.traceback(thread, state)); end
15 elseif state == "ready" then 15 elseif state == "ready" then
16 -- If state is 'ready', it is our responsibility to update runner.state from 'waiting'. 16 -- If state is 'ready', it is our responsibility to update runner.state from 'waiting'.
17 -- We also have to :run(), because the queue might have further items that will not be 17 -- We also have to :run(), because the queue might have further items that will not be
18 -- processed otherwise. FIXME: It's probably best to do this in a nexttick (0 timer). 18 -- processed otherwise. FIXME: It's probably best to do this in a nexttick (0 timer).
19 log("debug", "Runner is ready (finished) - restarting", debug.traceback());
19 runner.state = "ready"; 20 runner.state = "ready";
20 runner:run(); 21 runner:run();
21 end 22 end
22 return true; 23 return true;
23 end 24 end
52 end 53 end
53 local guard = guards[id]; 54 local guard = guards[id];
54 if not guard then 55 if not guard then
55 guard = {}; 56 guard = {};
56 guards[id] = guard; 57 guards[id] = guard;
57 --log("debug", "New guard!"); 58 log("debug", "New guard!");
58 else 59 else
59 table.insert(guard, thread); 60 table.insert(guard, thread);
60 --log("debug", "Guarded. %d threads waiting.", #guard) 61 log("debug", "Guarded. %d threads waiting.", #guard)
61 coroutine.yield("wait"); 62 coroutine.yield("wait");
62 end 63 end
63 local function exit() 64 local function exit()
64 local next_waiting = table.remove(guard, 1); 65 local next_waiting = table.remove(guard, 1);
65 if next_waiting then 66 if next_waiting then
66 --log("debug", "guard: Executing next waiting thread (%d left)", #guard) 67 log("debug", "guard: Executing next waiting thread (%d left)", #guard)
67 runner_continue(next_waiting); 68 runner_continue(next_waiting);
68 else 69 else
69 --log("debug", "Guard off duty.") 70 log("debug", "Guard off duty.")
70 guards[id] = nil; 71 guards[id] = nil;
71 end 72 end
72 end 73 end
73 if func then 74 if func then
74 func(); 75 func();
98 queue = {}, watchers = watchers or empty_watchers, data = data } 99 queue = {}, watchers = watchers or empty_watchers, data = data }
99 , runner_mt); 100 , runner_mt);
100 end 101 end
101 102
102 function runner_mt:run(input) 103 function runner_mt:run(input)
104 log("debug", "RUNNING", debug.traceback())
103 if input ~= nil then 105 if input ~= nil then
104 table.insert(self.queue, input); 106 table.insert(self.queue, input);
105 end 107 end
106 if self.state ~= "ready" then 108 if self.state ~= "ready" then
107 return true, self.state, #self.queue; 109 return true, self.state, #self.queue;

mercurial