Sun, 01 Aug 2010 13:24:24 +0100
Add 'incr' and 'decr' commands (32-bit only)
local server = require "net.server";
-- Bind the logger module to a local: the script calls logger.setwriter()
-- at the bottom and must not depend on a global named `logger` existing.
local logger = require "util.logger";
local log = logger.init("memcached");
local memcache = require "util.memcache";

-- Single cache instance shared by every connection.
local cache = memcache.new();

local memcached_listener = {};
local command_handlers = {};

--- Network handlers

--- Called when a client connects. No per-connection setup is performed.
function memcached_listener.onconnect(conn)
end

--- Handle one incoming request line.
-- The first whitespace-delimited word selects the entry in
-- `command_handlers`; the remainder of the line is passed to it as the
-- parameter string. A handler returning exactly `false` (plus a message)
-- causes a CLIENT_ERROR reply; any other return value sends nothing extra.
-- @param conn connection object supplied by net.server
-- @param line the received line
function memcached_listener.onincoming(conn, line)
	local command, params_pos = line:match("^(%S+) ?()");
	local command_handler = command and command_handlers[command];
	if command_handler then
		local ok, err = command_handler(conn, line:sub(params_pos));
		if ok == false then
			-- Guard against handlers that fail without an error message,
			-- which would otherwise raise on concatenating nil.
			conn:write("CLIENT_ERROR "..tostring(err or "Unknown error").."\r\n");
		end
	elseif command then
		log("warn", "Client sent unknown command: %s", command);
		conn:write("ERROR\r\n");
	end
	-- Lines with no command word (e.g. empty lines) are silently ignored.
end

--- Called when a client disconnects. Nothing to clean up.
function memcached_listener.ondisconnect(conn, err)
end

--- Command handlers

--- Shared implementation of the storage commands (set/add/replace).
-- Parses "<key> <flags> <exptime> <bytes> [noreply]", then temporarily
-- replaces the connection's listener to collect the data block before
-- calling `store_method(cache, key, flags, exptime, data)`.
-- @param store_method cache method to invoke once the data has arrived
-- @param conn connection object
-- @param params parameter portion of the request line
-- @return true when waiting for data; false plus a message on invalid
--         parameters; nil on invalid parameters when "noreply" was given
local function generic_store_command(store_method, conn, params)
	local key, flags, exptime, bytes, reply = params:match("(%S+) (%d+) (%d+) (%d+) ?(.*)$");
	flags, exptime, bytes = tonumber(flags), tonumber(exptime), tonumber(bytes);
	-- `reply` becomes a boolean: true unless the client asked for "noreply".
	reply = reply ~= "noreply";
	if not (flags and exptime and bytes) then
		if reply then
			return false, "Invalid parameter(s)";
		end
		return nil;
	end

	-- Switch from line mode to raw mode to receive the data block.
	conn:set_mode("*a");
	local received_count, received_buffer = 0, {};

	local function handle_data(conn, data)
		log("debug", "Received data of length %d out of %d", #data, bytes);
		received_count = received_count + #data;
		received_buffer[#received_buffer+1] = data;
		if received_count < bytes then
			return; -- keep waiting for the rest of the data block
		end

		local payload = table.concat(received_buffer);
		local ok, err = store_method(cache, key, flags, exptime, payload:sub(1, bytes));
		if reply then
			-- conn:write is used here (rather than conn:send) to match
			-- every other handler in this file.
			if ok then
				if err == true then
					conn:write("STORED\r\n");
				else
					conn:write("NOT_STORED\r\n");
				end
			else
				conn:write("SERVER_ERROR "..tostring(err or "Unknown error").."\r\n");
			end
		end

		-- Restore normal line-based command processing.
		conn:setlistener(memcached_listener);
		conn:set_mode("*l");
		if received_count > bytes then
			-- Anything past the data block is fed back in as a request line.
			-- NOTE(review): the surplus is passed as a single line even if
			-- it contains several; confirm whether pipelined clients matter.
			log("debug", "Re-handling %d extra bytes", received_count-bytes);
			memcached_listener.onincoming(conn, payload:sub(bytes+1));
		end
	end

	conn:setlistener({
		onincoming = handle_data;
		ondisconnect = memcached_listener.ondisconnect;
	});
	log("debug", "Waiting for %d bytes from client", bytes);
	return true;
end

--- "set" command: delegates to cache.set via the shared storage handler.
function command_handlers.set(conn, params)
	return generic_store_command(cache.set, conn, params);
end

--- "add" command: delegates to cache.add via the shared storage handler.
function command_handlers.add(conn, params)
	return generic_store_command(cache.add, conn, params);
end

--- "replace" command: delegates to cache.replace via the shared storage handler.
function command_handlers.replace(conn, params)
	return generic_store_command(cache.replace, conn, params);
end

--- Shared implementation of "incr" and "decr".
-- Parses "<key> <amount> [noreply]" and calls `method(cache, key, amount)`.
-- On success (and unless "noreply" was given) the new value is written back.
-- @param method cache method to invoke (cache.incr or cache.decr)
-- @param conn connection object
-- @param params parameter portion of the request line
-- @return true once the command was processed; false plus a message on
--         invalid parameters; nil on invalid parameters with "noreply"
local function generic_increment_decrement_command(method, conn, params)
	local key, amount = params:match("^(%S+) (%d+)");
	local reply = params:match(" (noreply)$") ~= "noreply";
	amount = tonumber(amount);
	if not (key and amount) then
		if reply then
			return false, "Invalid parameter(s)";
		end
		return nil;
	end
	local ok, new_value = method(cache, key, amount);
	-- NOTE(review): nothing is sent when `method` reports failure; the
	-- memcached protocol expects e.g. NOT_FOUND for a missing key. Confirm
	-- what util.memcache returns in that case before adding a reply here.
	if ok and reply then
		conn:write(new_value.."\r\n");
	end
	return true;
end

--- "incr" command.
function command_handlers.incr(conn, params)
	return generic_increment_decrement_command(cache.incr, conn, params);
end

--- "decr" command.
function command_handlers.decr(conn, params)
	return generic_increment_decrement_command(cache.decr, conn, params);
end

--- "get" command: writes a VALUE block for every requested key for which
-- the cache returns data, followed by END.
-- @param conn connection object
-- @param keys whitespace-separated list of keys
-- @return true
function command_handlers.get(conn, keys)
	for key in keys:gmatch("%S+") do
		local flags, data = cache:get(key);
		if data then
			conn:write("VALUE "..key.." "..flags.." "..#data.."\r\n"..data.."\r\n");
		end
	end
	conn:write("END\r\n");
	return true;
end

--- "delete" command: "<key> [<time>] [noreply]".
-- Replies DELETED or NOT_FOUND depending on the second value returned by
-- cache:delete. With "noreply" nothing is written and nil is returned, so
-- the dispatcher sends no error either.
-- @param conn connection object
-- @param params parameter portion of the request line
-- @return the results of cache:delete, or false plus a message when no key
--         could be parsed; nil when "noreply" was requested
function command_handlers.delete(conn, params)
	local key, keyend = params:match("^(%S+)()");
	-- Optional arguments are only searched for after the key itself.
	local time, reply = params:match(" (%d+)", keyend), params:match(" (noreply)$", keyend);
	time, reply = tonumber(time), reply ~= "noreply";
	local ok, err;
	if not key then
		ok, err = false, "Unable to determine key from request";
	else
		ok, err = cache:delete(key, time);
		-- Only answer when the client did not ask for "noreply".
		if ok and reply then
			if err then
				conn:write("DELETED\r\n");
			else
				conn:write("NOT_FOUND\r\n");
			end
		end
	end
	if not reply then
		return nil;
	end
	return ok, err;
end

--- "version" command: reports the server identification string.
function command_handlers.version(conn)
	conn:write("VERSION Mooncached 0.1\r\n");
	return true;
end

--- "quit" command: closes the connection.
function command_handlers.quit(conn)
	conn:close();
	return true;
end

-- Route all log output to stdout.
logger.setwriter(function (name, level, format, ...)
	return print(name, level, format:format(...));
end);

-- Listen on all interfaces on port 11211, in line mode.
server.addserver("*", 11211, memcached_listener, "*l");
server.loop();