clients.lua: Add realtime push support

Mon, 04 Jan 2016 11:44:40 +0000

author
Matthew Wild <mwild1@gmail.com>
date
Mon, 04 Jan 2016 11:44:40 +0000
changeset 8
2e4c32c4fb6b
parent 7
59655d6c45b3
child 9
7da8b3c95bb1

clients.lua: Add realtime push support

clients.lua file | annotate | diff | comparison | revisions
--- a/clients.lua	Mon Jan 04 11:44:19 2016 +0000
+++ b/clients.lua	Mon Jan 04 11:44:40 2016 +0000
@@ -2,6 +2,8 @@
 local server = require "net.server_select";
 local http_server = require"net.http.server";
 local new_uuid = require "util.uuid".generate;
+local json = require "cjson";
+local formdecode = require "util.http".formdecode;
 local log = require "util.logger".init("clients");
 
 local response_head = 	table.concat({
@@ -23,6 +25,8 @@
 
 local last_chunk;
 
+local data_stream = require "eventstreams".new();
+
 local have_clients = false;
 
 -- [conn] = cookie
@@ -119,6 +123,38 @@
 	return tostring(total);
 end
 
+-- Called when a HTTP stream client closes
+local function client_closed(request)
+	local stream = request._watching;
+	stream:remove_watcher(request.conn);
+end
+
+function handle_notifications(event)
+	event.response._watching = data_stream;
+	event.response.on_destroy = client_closed;
+	data_stream:add_watcher(event.response.conn);
+	return true;
+end
+
+function handle_push(event, path)
+	local push_event = event.request.path:match("[^/]+$");
+	if not push_event then
+		log("debug", "Error parsing push path: %s", tostring(event.request.path));
+		return;
+	end
+	log("debug", "Pushing notification for %s", tostring(push_event));
+	local params = formdecode(event.request.url.query or "");
+	local data;
+	if type(params) == "table" then
+		data = {};
+		for _, e in ipairs(params) do
+			data[e.name] = e.value;
+		end
+	end
+	data_stream:push(json.encode({ event = push_event, data = data }));
+	return 200;
+end
+
 events.add_handler("image-changed", function (event)
 	log("debug", "New image");
 	local chunk_data = table.concat({
@@ -147,4 +183,6 @@
 http_server.add_handler("GET localhost/*", handle_request);
 http_server.add_handler("GET localhost/active", handle_active);
 http_server.add_handler("GET localhost/watchers", handle_watchers);
+http_server.add_handler("GET localhost/notifications", handle_notifications);
+http_server.add_handler("GET localhost/push/*", handle_push);
 http_server.listen_on(8006);

mercurial