mooncached.lua

Sun, 01 Aug 2010 13:24:24 +0100

author
Matthew Wild <mwild1@gmail.com>
date
Sun, 01 Aug 2010 13:24:24 +0100
changeset 19
61c4d7f8279c
parent 17
5bede08f2f55
permissions
-rw-r--r--

Add 'incr' and 'decr' commands (32-bit only)

local server = require "net.server";
local log = require "util.logger".init("memcached");
local memcache = require "util.memcache";

local cache = memcache.new();

local memcached_listener = {};
local command_handlers = {};

--- Network handlers

-- Connection-opened callback of the listener table registered with
-- server.addserver. It currently performs no per-connection setup.
memcached_listener.onconnect = function (conn)
end

-- Dispatch one incoming line from a client to its command handler.
-- The first run of non-whitespace characters in `line` selects the handler
-- in `command_handlers`; everything after it (and one optional space) is
-- passed to the handler as its parameter string.
--   * A handler returning exactly `false` (plus an optional message) results
--     in a CLIENT_ERROR reply; any other return value sends nothing here.
--   * An unknown command is logged and answered with ERROR.
--   * A line with no command word (e.g. an empty line) is ignored.
function memcached_listener.onincoming(conn, line)
	local command, params_pos = line:match("^(%S+) ?()");
	if not command then
		return; -- nothing to dispatch
	end
	local command_handler = command_handlers[command];
	if not command_handler then
		log("warn", "Client sent unknown command: %s", command);
		conn:write("ERROR\r\n");
		return;
	end
	local ok, err = command_handler(conn, line:sub(params_pos));
	if ok == false then
		-- A handler may fail without supplying a message; never let that
		-- turn into a string-concatenation error in the network callback.
		conn:write("CLIENT_ERROR "..tostring(err or "Unknown error").."\r\n");
	end
end

-- Connection-closed callback of the listener table. It receives the
-- connection and an error value, and currently does nothing with either.
memcached_listener.ondisconnect = function (conn, err)
end

--- Command handlers

-- Shared implementation of the storage commands ('set', 'add', 'replace').
-- Parses the header "<key> <flags> <exptime> <bytes> [noreply]" from
-- `params`, then puts the connection into "*a" mode with a temporary
-- listener that accumulates the data block. Once at least <bytes> bytes have
-- arrived, the first <bytes> bytes are handed to `store_method`, a reply is
-- written (unless "noreply" was given) and the connection is returned to
-- memcached_listener in "*l" mode.
-- @param store_method function called as
--   store_method(cache, key, flags, exptime, data). Its two results are read
--   as (ok, stored): ok truthy and stored == true replies STORED, ok truthy
--   otherwise replies NOT_STORED, ok falsy replies SERVER_ERROR using the
--   second result as the message.
-- @param conn the client connection
-- @param params the command line with the command word removed
-- @return true once the data listener is installed; false plus a message
--   when the header is malformed (nil instead if it ends in " noreply")
local function generic_store_command(store_method, conn, params)
	-- Anchored at the start so that stray leading words cannot shift which
	-- token is taken as the key.
	local key, flags, exptime, bytes, trailer = params:match("^(%S+) (%d+) (%d+) (%d+) ?(.*)$");
	if not key then
		-- The header did not parse, so look for "noreply" separately in
		-- order to stay silent when the client asked for that.
		if params:match(" noreply$") then
			return nil;
		end
		return false, "Invalid parameter(s)";
	end
	flags, exptime, bytes = tonumber(flags), tonumber(exptime), tonumber(bytes);
	local reply = trailer ~= "noreply";

	conn:set_mode("*a");
	local received_count, chunks = 0, {};
	local function handle_data(conn, data)
		log("debug", "Received data of length "..#data.." out of "..bytes);
		received_count = received_count + #data;
		chunks[#chunks+1] = data;
		if received_count < bytes then
			return; -- keep waiting for the rest of the data block
		end
		local received = table.concat(chunks);
		local ok, err = store_method(cache, key, flags, exptime, received:sub(1, bytes));
		if reply then
			-- Replies go through conn:write like every other handler in
			-- this file.
			if ok then
				conn:write(err == true and "STORED\r\n" or "NOT_STORED\r\n");
			else
				conn:write("SERVER_ERROR "..(err or "Unknown error").."\r\n");
			end
		end
		-- Hand the connection back to the line-based command dispatcher
		conn:setlistener(memcached_listener);
		conn:set_mode("*l");
		if received_count > bytes then
			-- NOTE(review): the whole remainder (which begins with the data
			-- block's trailing "\r\n" when that arrived in the same read) is
			-- passed on as a single line. A command pipelined after it does
			-- not look like it will be dispatched - confirm.
			log("debug", "Re-handling %d extra bytes", received_count-bytes);
			memcached_listener.onincoming(conn, received:sub(bytes+1));
		end
	end
	conn:setlistener({
		onincoming = handle_data;
		ondisconnect = memcached_listener.ondisconnect;
	});
	log("debug", "Waiting for "..bytes.." bytes from client");
	return true;
end

-- Handler for the 'set' command: delegates to generic_store_command with
-- cache.set as the storage method.
command_handlers.set = function (conn, params)
	return generic_store_command(cache.set, conn, params);
end

-- Handler for the 'add' command: delegates to generic_store_command with
-- cache.add as the storage method.
command_handlers.add = function (conn, params)
	return generic_store_command(cache.add, conn, params);
end

-- Handler for the 'replace' command: delegates to generic_store_command with
-- cache.replace as the storage method.
command_handlers.replace = function (conn, params)
	return generic_store_command(cache.replace, conn, params);
end

-- Shared implementation of the 'incr' and 'decr' commands.
-- Parses "<key> <amount> [noreply]" from `params` and applies `method`.
-- Unless "noreply" was given, exactly one reply line is written:
--   * the new value when `method` succeeds and returns one,
--   * NOT_FOUND when `method` succeeds without a value,
--   * SERVER_ERROR when `method` reports failure.
-- NOTE(review): this assumes `method` follows the (ok, result) convention
-- the other handlers in this file rely on - confirm against util.memcache.
-- @param method function called as method(cache, key, amount)
-- @param conn the client connection
-- @param params the command line with the command word removed
-- @return true when the command was carried out; false plus a message when
--   the parameters are invalid (nil instead if the client sent "noreply")
local function generic_increment_decrement_command(method, conn, params)
	local key, amount = params:match("^(%S+) (%d+)");
	local reply = params:match(" (noreply)$") ~= "noreply";
	amount = tonumber(amount);
	if not (key and amount) then
		if reply then
			return false, "Invalid parameter(s)";
		end
		return nil;
	end
	local ok, result = method(cache, key, amount);
	if reply then
		-- Always answer, otherwise the client waits forever for a reply
		if not ok then
			conn:write("SERVER_ERROR "..tostring(result or "Unknown error").."\r\n");
		elseif result then
			conn:write(tostring(result).."\r\n");
		else
			conn:write("NOT_FOUND\r\n");
		end
	end
	return true;
end

-- Handler for the 'incr' command: delegates to
-- generic_increment_decrement_command with cache.incr as the method.
command_handlers.incr = function (conn, params)
	return generic_increment_decrement_command(cache.incr, conn, params);
end

-- Handler for the 'decr' command: delegates to
-- generic_increment_decrement_command with cache.decr as the method.
command_handlers.decr = function (conn, params)
	return generic_increment_decrement_command(cache.decr, conn, params);
end

-- Handler for the 'get' command.
-- `keys` is a whitespace-separated list of keys. For every key for which
-- cache:get returns data, a "VALUE <key> <flags> <length>" header followed
-- by the data is written; keys without data are skipped silently. The
-- response is always terminated with END.
-- @return true
command_handlers.get = function (conn, keys)
	for name in keys:gmatch("%S+") do
		local item_flags, item_data = cache:get(name);
		if item_data then
			local header = "VALUE "..name.." "..item_flags.." "..#item_data.."\r\n";
			conn:write(header..item_data.."\r\n");
		end
	end
	conn:write("END\r\n");
	return true;
end

-- Handler for the 'delete' command: "<key> [<time>] [noreply]".
-- Calls cache:delete(key, time) and, unless the client sent "noreply",
-- writes DELETED when the call succeeds with a truthy second result and
-- NOT_FOUND when it succeeds without one.
-- @param conn the client connection
-- @param params the command line with the command word removed
-- @return nil when "noreply" was given; otherwise the (ok, err) pair from
--   cache:delete, or false plus a message when no key could be parsed
function command_handlers.delete(conn, params)
	local key, keyend = params:match("^(%S+)()");
	local reply = params:match(" (noreply)$", keyend) ~= "noreply";
	if not key then
		if reply then
			return false, "Unable to determine key from request";
		end
		return nil;
	end
	local time = tonumber(params:match(" (%d+)", keyend));
	local ok, err = cache:delete(key, time);
	if not reply then
		-- "noreply" must suppress the DELETED/NOT_FOUND line as well, not
		-- just error reporting.
		return nil;
	end
	if ok then
		conn:write(err and "DELETED\r\n" or "NOT_FOUND\r\n");
	end
	return ok, err;
end

-- Handler for the 'version' command: writes the server's version line.
-- @return true
command_handlers.version = function (conn)
	local banner = "VERSION Mooncached 0.1\r\n";
	conn:write(banner);
	return true;
end

-- Handler for the 'quit' command: closes the client's connection.
-- @return true
command_handlers.quit = function (conn)
	conn:close();
	return true;
end

-- Send all log output to stdout via print. The logger module is required
-- explicitly here: this file only defines `log`, so using a bare `logger`
-- would depend on util.logger happening to create a global of that name.
local logger = require "util.logger";
logger.setwriter(function (name, level, format, ...)
	return print(name, level, format:format(...));
end);

-- Register the memcached listener on port 11211. "*" and "*l" are passed
-- through to net.server (presumably "all interfaces" and line-at-a-time
-- delivery - confirm against net.server).
server.addserver("*", 11211, memcached_listener, "*l");

-- Hand control to net.server's loop
server.loop();

mercurial