memcached.lua

changeset 6
4b67f2a8134b
parent 5
a702c02caacc
child 7
2835b44fe517
equal deleted inserted replaced
5:a702c02caacc 6:4b67f2a8134b
1 local server = require "net.server";
2 local log = require "util.logger".init("memcached");
3 local memcache = require "util.memcache";
4
-- Single cache instance used by all command handlers in this file.
local cache = memcache.new();

-- Callback table passed to server.addserver(); its on* functions are defined
-- under "Network handlers" below.
local memcached_listener = {};

-- Lookup table from protocol command name (e.g. "get") to handler function.
local command_handlers = {};
9
10 --- Network handlers
11
--- Connection-opened callback of the listener (intentionally a no-op).
-- Presumably invoked by net.server for each new client; no per-connection
-- state is set up here.
-- @param conn the connection object
function memcached_listener.onconnect(conn)
end
14
--- Dispatch one incoming line to the matching command handler.
-- The first whitespace-delimited word selects the handler; the rest of the
-- line (after one optional space) is passed to it as the parameter string.
-- A handler returning exactly `false` plus a message produces a CLIENT_ERROR
-- reply; an unrecognised command produces ERROR.
-- @param conn the connection the line arrived on
-- @param line the received text
function memcached_listener.onincoming(conn, line)
	local name, args_start = line:match("^(%S+) ?()");
	if not name then
		-- Empty line, or one starting with whitespace: nothing to dispatch.
		return;
	end

	local handler = command_handlers[name];
	if not handler then
		log("warn", "Client sent unknown command: %s", name);
		conn:write("ERROR\r\n");
		return;
	end

	local ok, err = handler(conn, line:sub(args_start));
	if ok == false then
		conn:write("CLIENT_ERROR "..err.."\r\n");
	end
end
28
--- Connection-closed callback of the listener (intentionally a no-op).
-- Also reused as the disconnect callback while a "set" payload is being
-- collected.
-- @param conn the connection object
-- @param err  presumably the reason for the disconnect, if any (unused)
function memcached_listener.ondisconnect(conn, err)
end
31
32 --- Command handlers
33
--- Handle the "set" command: store a value under a key.
-- Parses `<key> <flags> <exptime> <bytes> [noreply]`, then temporarily swaps
-- the connection's listener and read mode so that the next <bytes> bytes of
-- payload are collected before the value is written to the cache. Once the
-- payload is complete the normal line-based listener is restored.
-- @param conn   the client connection
-- @param params the text following the command name
-- @return true when the payload is now being awaited, or false plus an error
--   message when the parameters could not be parsed
function command_handlers.set(conn, params)
	local key, flags, exptime, bytes, reply = params:match("(%S+) (%d+) (%d+) (%d+) ?(.*)$");
	flags, exptime, bytes = tonumber(flags), tonumber(exptime), tonumber(bytes);
	if not (flags and exptime and bytes) then
		return false, "Invalid parameter(s)";
	end

	-- An optional trailing "noreply" token asks the server not to answer.
	-- Trailing whitespace after the token is tolerated.
	local want_reply = not (reply and reply:match("^noreply%s*$"));

	-- Switch from line mode to raw mode to read the data block.
	conn:set_mode("*a");

	local received_count, received_chunks = 0, {};
	local function handle_data(conn, data)
		log("debug", "Received data of length "..#data.." out of "..bytes);
		received_count = received_count + #data;
		received_chunks[#received_chunks+1] = data;
		if received_count < bytes then
			return; -- payload still incomplete, keep collecting
		end

		local received = table.concat(received_chunks);
		local ok, err = cache:set(key, flags, exptime, received:sub(1, bytes));
		if not ok then
			log("warn", "Failed to store key: %s", tostring(err or "Unknown error"));
		end
		if want_reply then
			-- Use conn:write(), like every other handler in this file, so all
			-- responses go through the same output path.
			if ok then
				conn:write("STORED\r\n");
			else
				conn:write("SERVER_ERROR "..(err or "Unknown error").."\r\n");
			end
		end

		-- Payload consumed: go back to line-based command processing.
		conn:setlistener(memcached_listener);
		conn:set_mode("*l");

		if received_count > bytes then
			-- NOTE(review): the surplus is handed over as one single "line" and
			-- normally starts with the data block's terminating CRLF, so a
			-- command pipelined in the same chunk is not parsed here. Confirm
			-- whether net.server offers a way to push data back for re-reading.
			log("debug", "Re-handling %d extra bytes", received_count-bytes);
			memcached_listener.onincoming(conn, received:sub(bytes+1));
		end
	end

	conn:setlistener({
		onincoming = handle_data;
		ondisconnect = memcached_listener.ondisconnect;
	});
	log("debug", "Waiting for "..bytes.." bytes from client");
	return true;
end
69
--- Handle the "get" command: return the stored value for each requested key.
-- Every key found in the cache yields a `VALUE <key> <flags> <length>` header
-- followed by the data; keys that are not found are skipped. The response is
-- always terminated with END.
-- @param conn the client connection
-- @param keys whitespace-separated list of keys
-- @return true
function command_handlers.get(conn, keys)
	for key in keys:gmatch("%S+") do
		local flags, data = cache:get(key);
		if flags then
			local header = "VALUE "..key.." "..flags.." "..#data.."\r\n";
			conn:write(header..data.."\r\n");
		end
	end
	conn:write("END\r\n");
	return true;
end
80
--- Handle the "quit" command by closing the client's connection.
-- @param conn the client connection
-- @return true
function command_handlers.quit(conn)
	conn:close();
	return true;
end
85
-- Bind the logger module explicitly instead of relying on a global named
-- `logger` that this file never declares. This is the same module whose
-- init() creates `log` at the top of the file.
local logger = require "util.logger";

-- Print every log message as: source name, level, formatted text.
logger.setwriter(function (name, level, format, ...)
	return print(name, level, format:format(...));
end);

-- Listen on port 11211 with the listener defined above, starting in line
-- mode ("*l"). The "*" address presumably means all interfaces.
server.addserver("*", 11211, memcached_listener, "*l");

-- Run the server's main loop.
server.loop();

mercurial