* Committing what will be version 0.1.2

Sat, 18 Aug 2007 20:57:33 +0000

author
Thomas Harning Jr <harningt@gmail.com>
date
Sat, 18 Aug 2007 20:57:33 +0000
changeset 11
8339f6236a3c
parent 10
88ce07d62597
child 12
a9b590350c03

* Committing what will be version 0.1.2
Main feature:
Callback/coroutine issues resolved as described in COROUTINE_MANAGEMENT

luaevent/CHANGELOG file | annotate | diff | comparison | revisions
luaevent/COROUTINE_MANAGEMENT file | annotate | diff | comparison | revisions
luaevent/include/luaevent.h file | annotate | diff | comparison | revisions
luaevent/luaevent.lua file | annotate | diff | comparison | revisions
luaevent/src/luaevent.c file | annotate | diff | comparison | revisions
luaevent/test/test.lua file | annotate | diff | comparison | revisions
luaevent/test/testClient.lua file | annotate | diff | comparison | revisions
--- a/luaevent/CHANGELOG	Wed Jun 13 04:32:12 2007 +0000
+++ b/luaevent/CHANGELOG	Sat Aug 18 20:57:33 2007 +0000
@@ -1,3 +1,12 @@
+======
+0.1.2 - Revision 15 - 2007-08-18
++ Setup system to use new coro management as described in COROUTINE_MANAGEMENT
+  The callbacks are called from the event_loop 'thread' rather than that which they are
+  created in.  This will prevent the self-resume problem as well as dead-thread problems.
+- Recognized issues to fix in next release:
+  * Socket/event closing needs to be cleaned
+  * luaevent.lua needs refactoring
+  * luaevent.[ch] need to be cleaned up
 ======
 0.1.1 - Revision 14 - 2007-06-13
 + Fixed event-handling code to cancel events on nothing being returned
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/luaevent/COROUTINE_MANAGEMENT	Sat Aug 18 20:57:33 2007 +0000
@@ -0,0 +1,39 @@
+Due to the issue w/ self-resuming threads and crashing out threads, 
+a management system needs to be in place.
+
+Example thread system:
+
+MAIN
+EVENT_LOOP --------running---
+WAITING ON READ
+WAITING ON WRITE
+WAITING ON CONNECT
+
+
+Since main and the other 'waiting' threads are yielded, it is unsafe to call things arbitrarily on them 
+or resume them from themselves...
+However the EVENT_LOOP one is running and thus can execute the callbacks (which can resume the threads)
+Each of the 'waiting' events are attached to an event and contain a pointer, this pointer can be setup to point
+to a per event_base item which will be updated w/ the lua_State of whatever calls EVENT_LOOP...
+this will guarantee that the thread will be resumed from the currently running EVENT_LOOP
+
+
+Other system that's more complicated and less likely:
+
+MAIN
+EVENT_LOOP a -----running---
+
+WAITING ON READ a
+WAITING ON WRITE a
+
+EVENT_LOOP b ----yielded
+WAITING ON READ b
+
+
+Since there can only be one event_loop running per event_base, you do not have to worry about 
+cross-pollination of the different waits...
+
+NOTES:
+If the event_loop thread goes away... then the waiting coroutines will have no way to get back...
+though in this case, they are dead in the water anyways.. until a new event_loop starts...
+in which case the lua_State references has been updated...
\ No newline at end of file
--- a/luaevent/include/luaevent.h	Wed Jun 13 04:32:12 2007 +0000
+++ b/luaevent/include/luaevent.h	Sat Aug 18 20:57:33 2007 +0000
@@ -9,10 +9,14 @@
 #include <event.h>
 
 typedef struct {
+	struct event_base* base;
+	lua_State* loop_L;
+} le_base;
+
+typedef struct {
 	struct event ev;
-	lua_State* L;
+	le_base* base;
 	int callbackRef;
-	int objectRef; /* TEMP */
 } le_callback;
 
 int luaopen_luaevent(lua_State* L);
--- a/luaevent/luaevent.lua	Wed Jun 13 04:32:12 2007 +0000
+++ b/luaevent/luaevent.lua	Sat Aug 18 20:57:33 2007 +0000
@@ -7,9 +7,37 @@
 
 local EV_READ = luaevent.core.EV_READ
 local EV_WRITE = luaevent.core.EV_WRITE
-local fair = false
+local fair = false -- Not recommended for most cases...
+local base = luaevent.core.new()
+local sockMap = setmetatable({}, {'__mode', 'kv'})
+local function addevent(sock, ...)
+	local item = base:addevent(sock, ...)
+	if not item then print("FAILED TO SETUP ITEM") return item end
+	local fd = sock:getfd()
+	sockMap[item] = fd
+	print("SETUP ITEM FOR: ", fd)
+	if not hookedObjectMt then
+		hookedObjectMt = true
+		--[[
+		local mt = debug.getmetatable(item)
+		local oldGC = mt.__gc
+		mt.__gc = function(...)
+			print("RELEASING ITEM FOR: ", sockMap[(...)])
+			return oldGC(...)
+		end]]
+	end
+	return item
+end
+-- Weak keys.. the keys are the client sockets
+local clientTable = setmetatable({}, {'__mode', 'kv'})
 
-local hookedObjectMt = false
+local function getWrapper()
+	local running = coroutine.running()
+	return function(...)
+		if coroutine.running() == running then return end
+		return select(2, coroutine.resume(running, ...))
+	end
+end
 
 function send(sock, data, start, stop)
 	local s, err
@@ -21,9 +49,11 @@
 		-- Add extra coro swap for fairness
 		-- CURRENTLY DISABLED FOR TESTING......
 		if fair and math.random(100) > 90 then
+			if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end
 			coroutine.yield(EV_WRITE)
 		end
 		if s or err ~= "timeout" then return s, err, sent end
+		if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end
 		coroutine.yield(EV_WRITE)
 	until false
 end
@@ -33,6 +63,7 @@
 	repeat
 		s, err, part = sock:receive(pattern, part)
 		if s or err ~= "timeout" then return s, err, part end
+		if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_READ, getWrapper()) end
 		coroutine.yield(EV_READ)
 	until false
 end
@@ -45,6 +76,7 @@
 	s, err, part = client:receive(pattern)
 	if s or ( (type(pattern)=="number") and part~="" and part ~=nil ) or 
 		err ~= "timeout" then return s, err, part end
+		if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_READ, getWrapper()) end
 		coroutine.yield(EV_READ)
 	until false
 end
@@ -52,6 +84,7 @@
 	sock:settimeout(0)
 	local ret, err = sock:connect(...)
 	if ret or err ~= "timeout" then return ret, err end
+	if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end
 	coroutine.yield(EV_WRITE)
 	ret, err = sock:connect(...)
 	if err == "already connected" then
@@ -66,74 +99,59 @@
 	-- Figure out what to do ......
 	return handler(sock)
 end
-
+local function handleClient(co, client, handler)
+	local ok, res, event = coroutine.resume(co, client, handler)
+end
 local function serverCoroutine(sock, callback)
+	local listenItem = addevent(sock, EV_READ, getWrapper())
 	repeat
 		local event = coroutine.yield(EV_READ)
 		-- Get new socket
 		local client = sock:accept()
 		if client then
-			--cl[#cl + 1] = client
 			client:settimeout(0)
-			local coFunc = coroutine.wrap(clientCoroutine)
-			luaevent.core.addevent(client, coFunc, client, callback)
+			local co = coroutine.create(clientCoroutine)
+			handleClient(co, client, callback)
 		end
 	until false
 end
-
-local oldAddEvent = luaevent.core.addevent
-luaevent.core.addevent = function(...)
-	local item = oldAddEvent(...)
-	if not item then print("FAILED TO SETUP ITEM") return item end
-	print("SETUP ITEM FOR: ", debug.getmetatable(item).getfd(item))
-	if not hookedObjectMt then
-		hookedObjectMt = true
-		local mt = debug.getmetatable(item)
-		local oldGC = mt.__gc
-		mt.__gc = function(...)
-			print("RELEASING ITEM FOR: ", mt.getfd(...))
-			return oldGC(...)
-		end
-	end
-	return item
+function addserver(sock, callback)
+	local coro = coroutine.create(serverCoroutine)
+	assert(coroutine.resume(coro, sock, callback))
 end
-
-function addserver(sock, callback)
-	local coFunc = coroutine.wrap(serverCoroutine)
-	luaevent.core.addevent(sock, coFunc, sock, callback)
-end
-function addthread(sock, func, ...)
-	local coFunc = coroutine.wrap(func)
-	luaevent.core.addevent(sock, coFunc, ...)
+function addthread(func, ...)
+	return coroutine.resume(coroutine.create(func), ...)
 end
 local _skt_mt = {__index = {
 	connect = function(self, ...)
 		return connect(self.socket, ...)
 	end,
 	send = function (self, data)
-		return send (self.socket, data)
+		return send(self.socket, data)
 	end,
 	
 	receive = function (self, pattern)
 		if (self.timeout==0) then
   			return receivePartial(self.socket, pattern)
   		end
-		return receive (self.socket, pattern)
+		return receive(self.socket, pattern)
 	end,
 	
 	flush = function (self)
-		return flush (self.socket)
+		return flush(self.socket)
 	end,
 	
 	settimeout = function (self,time)
 		self.timeout=time
 		return
 	end,
+	
 	close = function(self)
+		clientTable[self.socket]:close()
 		self.socket:close()
 	end
 }}
 function wrap(sock)
 	return setmetatable({socket = sock}, _skt_mt)
 end
-loop = luaevent.core.loop
\ No newline at end of file
+loop = function(...) base:loop(...) end
\ No newline at end of file
--- a/luaevent/src/luaevent.c	Wed Jun 13 04:32:12 2007 +0000
+++ b/luaevent/src/luaevent.c	Sat Aug 18 20:57:33 2007 +0000
@@ -1,111 +1,87 @@
 /* LuaEvent - Copyright (C) 2007 Thomas Harning <harningt@gmail.com>
  * Licensed as LGPL - See doc/COPYING for details */
-
-#include "luaevent.h"
+ #include "luaevent.h"
 
 #include <lua.h>
 #include <lauxlib.h>
+#include <assert.h>
 
 #define EVENT_BASE_MT "EVENT_BASE_MT"
 #define EVENT_CALLBACK_ARG_MT "EVENT_CALLBACK_ARG_MT"
-#define EVENT_BASE_LOCATION 1
+#define MAIN_THREAD_LOCATION 1
 
-static void setEventBase(lua_State* L, struct event_base* base) {
-	struct event_base** pbase = lua_newuserdata(L, sizeof(base));
-	*pbase = base;
-	luaL_getmetatable(L, EVENT_BASE_MT);
-	lua_setmetatable(L, -2);
-	lua_rawseti(L, LUA_ENVIRONINDEX, EVENT_BASE_LOCATION);
+void setMainThread(lua_State* L) {
+	lua_pushthread(L);
+	lua_rawseti(L, LUA_ENVIRONINDEX, MAIN_THREAD_LOCATION);
 }
-struct event_base* getEventBase(lua_State* L) {
-	struct event_base* base;
-	lua_rawgeti(L, LUA_ENVIRONINDEX, EVENT_BASE_LOCATION);
-	base = *(struct event_base**)lua_topointer(L, -1);
+lua_State* getMainThread(lua_State* L) {
+	lua_State* g_L;
+	lua_rawgeti(L, LUA_ENVIRONINDEX, MAIN_THREAD_LOCATION);
+	g_L = lua_tothread(L, -1);
 	lua_pop(L, 1);
-	return base;
-}
-
-static void freeCallbackArgs(le_callback* arg) {
-	if(arg->L) {
-		lua_State* L = arg->L;
-		arg->L = NULL;
-		event_del(&arg->ev);
-		luaL_unref(L, LUA_REGISTRYINDEX, arg->callbackRef);
-		luaL_unref(L, LUA_REGISTRYINDEX, arg->objectRef);
-	}
+	return g_L;
 }
 
-static int call_callback_function(lua_State* L, int argCount) {
-	int ret;
-	if(lua_pcall(L, argCount, 1, 0) || !(lua_isnil(L, -1) || lua_isnumber(L, -1))) {
-		printf("ERROR IN INIT: %s\n", lua_tostring(L, -1));
-		lua_pop(L, 1);
-		return -1;
-	}
-	/* Lua_isnil returns 1 if the value is nil... */
-	ret = lua_tointeger(L, -1) | -lua_isnil(L, -1);
-	lua_pop(L, 1);
-	if(ret < 0) { /* Done, no need to setup event */
-		return -1;
-	}
-	if(ret != EV_READ && ret != EV_WRITE) {
-		printf("BAD RET_VAL IN INIT: %i\n", ret);
-	}
-	return ret;
+int luaevent_newbase(lua_State* L) {
+	le_base *base = (le_base*)lua_newuserdata(L, sizeof(le_base));
+	base->loop_L = NULL; /* No running loop */
+	base->base = event_init();
+	luaL_getmetatable(L, EVENT_BASE_MT);
+	lua_setmetatable(L, -2);
+	return 1;
 }
 
-static void luaevent_callback(int fd, short event, void* p);
-
-static void setup_event(le_callback* arg, int fd, short event, int resetEvent) {
-	/* Setup event... */
-	if(resetEvent) event_del(&arg->ev);
-	event_set(&arg->ev, fd, event| EV_PERSIST, luaevent_callback, arg);
-	if(!resetEvent) event_base_set(getEventBase(arg->L), &arg->ev);
-	event_add(&arg->ev, NULL);
+void freeCallbackArgs(le_callback* arg, lua_State* L) {
+	if(arg->base) {
+		arg->base = NULL;
+		event_del(&arg->ev);
+		luaL_unref(L, LUA_REGISTRYINDEX, arg->callbackRef);
+	}
 }
-
 /* le_callback is allocated at the beginning of the coroutine in which it
 is used, no need to manually de-allocate */
 
 /* Index for coroutine is fd as integer for *nix, as lightuserdata for Win */
 static void luaevent_callback(int fd, short event, void* p) {
 	le_callback* arg = p;
-	lua_State* L = arg->L;
+	lua_State* L;
 	int ret;
+	assert(arg && arg->base && arg->base->loop_L);
+	L = arg->base->loop_L;
 	lua_rawgeti(L, LUA_REGISTRYINDEX, arg->callbackRef);
 	lua_pushinteger(L, event);
-	
-	if(-1 == (ret = call_callback_function(L, 1))) {
-		freeCallbackArgs(arg);
-		return;
+	lua_call(L, 1, 1);
+	ret = lua_tointeger(L, -1);
+	lua_pop(L, 1);
+	if(ret == -1) {
+		freeCallbackArgs(arg, L);
+	} else {
+		struct event *ev = &arg->ev;
+		int newEvent = ret;
+		if(newEvent != event) { // Need to hook up new event...
+			event_del(ev);
+			event_set(ev, fd, EV_PERSIST | newEvent, luaevent_callback, arg);
+			event_add(ev, NULL);
+		}
 	}
-	
-	if(event != ret)
-		setup_event(arg, fd, ret, 1);
 }
 
 static int luaevent_base_gc(lua_State* L) {
-	struct event_base** pbase = luaL_checkudata(L, 1, EVENT_BASE_MT);
-	if(*pbase) {
-		event_base_free(*pbase);
-		*pbase = NULL;
+	le_base *base = luaL_checkudata(L, 1, EVENT_BASE_MT);
+	if(base->base) {
+		event_base_free(base->base);
+		base->base = NULL;
 	}
 	return 0;
 }
 
 static int luaevent_cb_gc(lua_State* L) {
 	le_callback* arg = luaL_checkudata(L, 1, EVENT_CALLBACK_ARG_MT);
-	freeCallbackArgs(arg);
+	freeCallbackArgs(arg, L);
 	return 0;
 }
 
-static int luaevent_cb_getfd(lua_State* L) {
-	le_callback* arg = luaL_checkudata(L, 1, EVENT_CALLBACK_ARG_MT);
-	lua_pushinteger(L, arg->ev.ev_fd);
-	return 1;
-}
-
-static int getSocketFd(lua_State* L, int idx) {
+int getSocketFd(lua_State* L, int idx) {
 	int fd;
 	luaL_checktype(L, idx, LUA_TUSERDATA);
 	lua_getfield(L, idx, "getfd");
@@ -118,52 +94,48 @@
 	return fd;
 }
 
-static void push_new_callback(lua_State* L, int callbackRef, int fd, short event) {
-	le_callback* arg = lua_newuserdata(L, sizeof(*arg));
+/* sock, event, callback */
+static int luaevent_addevent(lua_State* L) {
+	int fd, event, callbackRef;
+	le_callback* arg;
+	le_base *base = luaL_checkudata(L, 1, EVENT_BASE_MT);
+	fd = getSocketFd(L, 2);
+	event = luaL_checkinteger(L, 3);
+	luaL_checktype(L, 4, LUA_TFUNCTION);
+	lua_pushvalue(L, 4);
+	callbackRef = luaL_ref(L, LUA_REGISTRYINDEX);
+	arg = lua_newuserdata(L, sizeof(*arg));
 	luaL_getmetatable(L, EVENT_CALLBACK_ARG_MT);
 	lua_setmetatable(L, -2);
 	
-	arg->L = L;
+	arg->base = base;
 	arg->callbackRef = callbackRef;
-	lua_pushvalue(L, -1);
-	arg->objectRef = luaL_ref(L, LUA_REGISTRYINDEX);
-	setup_event(arg, fd, event, 0);
-}
-/* Expected to be called at the beginning of the coro that uses it.. 
-Value must be kept until coro is complete....
-*/
-/* sock, callback */
-static int luaevent_addevent(lua_State* L) {
-	int fd, callbackRef;
-	int top, ret;
-	fd = getSocketFd(L, 1);
-	luaL_checktype(L, 2, LUA_TFUNCTION);
-	top = lua_gettop(L);
-	/* Preserve the callback function */
-	lua_pushvalue(L, 2);
-	callbackRef = luaL_ref(L, LUA_REGISTRYINDEX);
-	/* Call the callback with all arguments after it to get the loop primed.. */
-	if(-1 == (ret = call_callback_function(L, top - 2))) {
-		luaL_unref(L, LUA_REGISTRYINDEX, callbackRef);
-		return 0;
-	}
-	
-	push_new_callback(L, callbackRef, fd, ret);
+	/* Setup event... */
+	event_set(&arg->ev, fd, event | EV_PERSIST, luaevent_callback, arg);
+	event_base_set(base->base, &arg->ev);
+	event_add(&arg->ev, NULL);
 	return 1;
 }
 
 static int luaevent_loop(lua_State* L) {
-	int ret = event_base_loop(getEventBase(L), 0);
+	le_base *base = luaL_checkudata(L, 1, EVENT_BASE_MT);
+	base->loop_L = L;
+	int ret = event_base_loop(base->base, 0);
 	lua_pushinteger(L, ret);
 	return 1;
 }
 
-static luaL_Reg funcs[] = {
+static luaL_Reg base_funcs[] = {
 	{ "addevent", luaevent_addevent },
 	{ "loop", luaevent_loop },
 	{ NULL, NULL }
 };
 
+static luaL_Reg funcs[] = {
+	{ "new", luaevent_newbase },
+	{ NULL, NULL }
+};
+
 typedef struct {
 	const char* name;
 	int value;
@@ -191,18 +163,21 @@
 	lua_replace(L, LUA_ENVIRONINDEX);
 	/* Setup metatable */
 	luaL_newmetatable(L, EVENT_BASE_MT);
+	lua_newtable(L);
+	luaL_register(L, NULL, base_funcs);
+	lua_setfield(L, -2, "__index");
 	lua_pushcfunction(L, luaevent_base_gc);
 	lua_setfield(L, -2, "__gc");
 	lua_pop(L, 1);
 	luaL_newmetatable(L, EVENT_CALLBACK_ARG_MT);
 	lua_pushcfunction(L, luaevent_cb_gc);
 	lua_setfield(L, -2, "__gc");
-	lua_pushcfunction(L, luaevent_cb_getfd);
-	lua_setfield(L, -2, "getfd");
+	lua_newtable(L);
+	lua_pushcfunction(L, luaevent_cb_gc);
+	lua_setfield(L, -2, "close");
+	lua_setfield(L, -2, "__index");
 	lua_pop(L, 1);
 
-	setEventBase(L, event_init());
-	
 	luaL_register(L, "luaevent.core", funcs);
 	setNamedIntegers(L, consts);
 	return 1;
--- a/luaevent/test/test.lua	Wed Jun 13 04:32:12 2007 +0000
+++ b/luaevent/test/test.lua	Sat Aug 18 20:57:33 2007 +0000
@@ -5,20 +5,30 @@
 
 require"luaevent"
 require"socket"
-local function echoHandler(skt)
-	while true do
-		local data,ret = luaevent.receive(skt, 10)
-		if data == "quit" or ret == 'closed' or not data then
-			break
-		end
-		--collectgarbage()
-		if not luaevent.send(skt, data) then return end
-	end
-	if skt then skt:close() end
+local oldPrint = print
+print = function(...)
+	oldPrint("SRV", ...)
 end
 
+local function echoHandler(skt)
+  while true do
+    local data,ret = luaevent.receive(skt, 10)
+    --print("GOT: ", data, ret)
+    if data == "quit" or ret == 'closed' then
+      break
+    end
+    luaevent.send(skt, data)
+    collectgarbage()
+  end
+  skt:close()
+  --print("DONE")
+end
 local server = assert(socket.bind("localhost", 20000))
 server:settimeout(0)
-
+local coro = coroutine.create
+coroutine.create = function(...)
+	local ret = coro(...)
+	return ret
+end
 luaevent.addserver(server, echoHandler)
-luaevent.loop()
\ No newline at end of file
+luaevent.loop()
--- a/luaevent/test/testClient.lua	Wed Jun 13 04:32:12 2007 +0000
+++ b/luaevent/test/testClient.lua	Sat Aug 18 20:57:33 2007 +0000
@@ -1,29 +1,19 @@
 require"luaevent"
 require"socket"
-local function setupHook(thread)
-	if not thread then debug.sethook(function(event) print("TRACE >: ", debug.getinfo(2, 'n').name) end, 'c')
-	else debug.sethook(thread, function(event) print("TRACE ", thread,">: ", debug.getinfo(2, 'n').name) end, 'c') end
+local oldPrint = print
+print = function(...)
+	oldPrint("CLT", ...)
 end
-local count = 100
-local function func(sock)
+
+local function func()
+	print("ACTIVATED")
+	local sock = socket.tcp()
+	--sock:
 	sock = luaevent.wrap(sock)
-	assert(sock:connect("localhost", 20000))
-	for i = 1, 2 do
-		local maxZ = 10
-		for z = 1, maxZ do
-			assert(sock:send("Greet me  "))
-		end
-		assert(sock:receive(10 * maxZ))
-	end
-	if skt then skt:close() end
-	count = count - 1
-	if count > 0 then
-		--local sock = assert(socket.tcp())
-		--luaevent.addthread(sock, func, sock)
-	end
+	print(assert(sock:connect("localhost", 20000)))
+	for i = 1, 100 do assert(sock:send("Greet me  ")) assert(sock:receive(10)) collectgarbage() end
 end
-for i = 1, 500 do
-	local sock = assert(socket.tcp())
-	luaevent.addthread(sock, func, sock)
-end
-luaevent.loop()
+
+luaevent.addthread(func)
+
+luaevent.loop()
\ No newline at end of file

mercurial