clients.lua

Mon, 04 Jan 2016 16:27:00 +0000

author
Matthew Wild <mwild1@gmail.com>
date
Mon, 04 Jan 2016 16:27:00 +0000
changeset 16
d35376a53644
parent 15
67858d731518
child 17
dd2a570367a9
permissions
-rw-r--r--

main, geoip: Add GeoIP lookup support for watcher info

local socket = require "socket";
local server = require "net.server_select";
local http_server = require"net.http.server";
local new_uuid = require "util.uuid".generate;
local json = require "cjson";
local formdecode = require "util.http".formdecode;
local log = require "util.logger".init("clients");

local response_head = 	table.concat({
	"HTTP/1.1 200 OK";
	"Max-Age: 0";
	"Expires: 0";
	"Cache-Control: no-cache, private";
	"Pragma: no-cache";
	"Content-Type: multipart/x-mixed-replace; boundary=--BoundaryString";
	"Keep-Alive: timeout=5, max=99";
	"Connection: Keep-Alive";
	"Transfer-Encoding: chunked";
	"Set-Cookie: COOKIE_STRING";
	"";
	"";	
}, "\r\n");

local listener = { onconnect = function () end; onincoming = function () end; }

local last_chunk;

local data_stream = require "eventstreams".new();

local have_clients = false;

-- [conn] = cookie
local clients = {};
-- [cookie] = conn
local client_by_cookie = {};
-- [conn] = last_active_timestamp
local active_clients = {};

local activity_timeout = 70;

local function update_have_clients()
	if have_clients and not next(active_clients) then
		have_clients = false;
		log("debug", "No more clients");
		events.fire_event("no-clients");
	elseif not have_clients and next(active_clients) then
		have_clients = true;
		log("debug", "Active clients");
		events.fire_event("have-clients");
	end
end


-- Called when a HTTP stream client closes
function listener.ondisconnect(conn)
	client_by_cookie[clients[conn]] = nil;
	clients[conn] = nil;
	active_clients[conn] = nil;
	update_have_clients();
end
listener.ondetach = listener.ondisconnect;

function handle_request(event, path)
	local path = event.request.url.path;
	if path ~= "/cam" then
		return 404;
	end

	local conn = event.response.conn;

	local cookie = event.request.headers.cookie;
	if cookie then
		log("debug", "Client %s connected", cookie);
	else
		cookie = new_uuid();
		log("debug", "New client connected, assigned %s", cookie);
	end
	conn:write((response_head:gsub("COOKIE_STRING", cookie)));
	clients[conn] = cookie;
	active_clients[conn] = os.time();
	client_by_cookie[cookie] = conn;

	update_have_clients();

	events.fire_event("new-client", { conn = conn, cookie = cookie });

	if last_chunk then
		conn:write(last_chunk);
	end

	conn:setlistener(listener);

	return true;
end

local function mark_active(request)
	local cookie = request.headers.cookie;
	if not cookie then
		log("warn", "Active client with no cookie");
		return;
	end
	local conn = client_by_cookie[cookie];
	if not conn then
		log("warn", "Active client with no connection");
		return;
	end
	active_clients[conn] = os.time();
	update_have_clients();
end

function handle_active(event, path)
	mark_active(event.request);
	return "OK";
end

function handle_watchers(event)
	local active, total = 0, 0;
	for client in pairs(clients) do
		total = total + 1;
		if active_clients[client] then
			active = active + 1;
		end
	end
	mark_active(event.request);
	return tostring(total);
end

function handle_watcher_info(event)
	local watchers = {};
	for client in pairs(clients) do
		local bytes_rx, bytes_tx, age_sec = client:socket():getstats();
		table.insert(watchers, json.encode{
			location = client.location or "(Unknown location)";
			active = not not active_clients[client];
			age = math.floor(age_sec);
			sent = math.floor(bytes_tx/1024);
		});
	end
	mark_active(event.request);
	return "[\n  "..table.concat(watchers, ",\n  ").."\n]";
end

-- Called when a HTTP stream client closes
local function client_closed(request)
	local stream = request._watching;
	stream:remove_watcher(request.conn);
end

function handle_notifications(event)
	event.response._watching = data_stream;
	event.response.on_destroy = client_closed;
	data_stream:add_watcher(event.response.conn);
	return true;
end

function handle_push(event, path)
	local push_event = event.request.path:match("[^/]+$");
	if not push_event then
		log("debug", "Error parsing push path: %s", tostring(event.request.path));
		return;
	end
	log("debug", "Pushing notification for %s", tostring(push_event));
	local params = formdecode(event.request.url.query or "");
	local data;
	if type(params) == "table" then
		data = {};
		for _, e in ipairs(params) do
			data[e.name] = e.value;
		end
	end
	data_stream:push(json.encode({ event = push_event, data = data }));
	return 200;
end

events.add_handler("image-changed", function (event)
	log("debug", "New image");
	local chunk_data = table.concat({
		"--BoundaryString",
		"Content-Type: image/jpeg";
		"Content-Length: "..#event.image;
		"";
		event.image;
	}, "\r\n");
	last_chunk = ("%x\r\n%s\r\n"):format(#chunk_data, chunk_data);
	
	local time_now = os.time();
	for client in pairs(clients) do
		local active_time = active_clients[client];
		if active_time and time_now - active_time < activity_timeout then
			client:write(last_chunk);
		else
			active_clients[client] = nil;
		end
	end
	update_have_clients();
end);

http_server.add_host("localhost");
http_server.set_default_host("localhost");
http_server.add_handler("GET localhost/*", handle_request);
http_server.add_handler("GET localhost/active", handle_active);
http_server.add_handler("GET localhost/watchers", handle_watchers);
http_server.add_handler("GET localhost/watcher_info", handle_watcher_info);
http_server.add_handler("GET localhost/notifications", handle_notifications);
http_server.add_handler("GET localhost/push/*", handle_push);
http_server.listen_on(8006);

mercurial