init.lua

Thu, 10 Jun 2021 11:58:23 +0200

author
Kim Alvefur <zash@zash.se>
date
Thu, 10 Jun 2021 11:58:23 +0200
changeset 445
b119dc4d8bc2
parent 444
12c1be0044c6
child 461
fa5c40e5e079
permissions
-rw-r--r--

plugins.smacks: Don't warn about zero stanzas acked

It's only if the count somehow goes backwards that something is really
wrong. An ack for zero stanzas is fine and we don't need to do anything.


-- Use LuaRocks if available
pcall(require, "luarocks.require");

local socket = require"socket";

-- Load LuaSec if available
pcall(require, "ssl");

local server = require "net.server";
local events = require "util.events";
local logger = require "util.logger";

local verse = {};
verse.server = server;

local stream = {};
stream.__index = stream;
verse.stream_mt = stream;

verse.plugins = {};

function verse.init(...)
	for i=1,select("#", ...) do
		local ok, err = pcall(require, "verse."..select(i,...));
		if not ok then
			error("Verse connection module not found: verse."..select(i,...)..err);
		end
	end
	return verse;
end


local max_id = 0;

function verse.new(logger, base)
	local t = setmetatable(base or {}, stream);
	max_id = max_id + 1;
	t.id = tostring(max_id);
	t.logger = logger or verse.new_logger("stream"..t.id);
	t.events = events.new();
	t.plugins = {};
	t.verse = verse;
	return t;
end

verse.add_task = require "util.timer".add_task;

verse.logger = logger.init; -- COMPAT: Deprecated
verse.new_logger = logger.init;
verse.log = verse.logger("verse");

local function format(format, ...)
	local n, arg, maxn = 0, { ... }, select('#', ...);
	return (format:gsub("%%(.)", function (c) if n <= maxn then n = n + 1; return tostring(arg[n]); end end));
end

function verse.set_log_handler(log_handler, levels)
	levels = levels or { "debug", "info", "warn", "error" };
	logger.reset();
	if io.type(log_handler) == "file" then
		local f = log_handler;
		function log_handler(name, level, message)
			f:write(name, "\t", level, "\t", message, "\n");
		end
	end
	if log_handler then
		local function _log_handler(name, level, message, ...)
			return log_handler(name, level, format(message, ...));
		end
		for i, level in ipairs(levels) do
			logger.add_level_sink(level, _log_handler);
		end
	end
end

function verse._default_log_handler(name, level, message)
	return io.stderr:write(name, "\t", level, "\t", message, "\n");
end
verse.set_log_handler(verse._default_log_handler, { "error" });

local function error_handler(err)
	verse.log("error", "Error: %s", err);
	verse.log("error", "Traceback: %s", debug.traceback());
end

function verse.set_error_handler(new_error_handler)
	error_handler = new_error_handler;
end

function verse.loop()
	return xpcall(server.loop, error_handler);
end

function verse.step()
	return xpcall(server.step, error_handler);
end

function verse.quit()
	return server.setquitting("once");
end

function stream:listen(host, port)
	host = host or "localhost";
	port = port or 0;
	local conn, err = server.addserver(host, port, verse.new_listener(self, "server"), "*a");
	if conn then
		self:debug("Bound to %s:%s", host, port);
		self.server = conn;
	end
	return conn, err;
end

function stream:connect(connect_host, connect_port)
	connect_host = connect_host or "localhost";
	connect_port = tonumber(connect_port) or 5222;

	-- Create and initiate connection
	local conn = socket.tcp()
	conn:settimeout(0);
	conn:setoption("keepalive", true);
	local success, err = conn:connect(connect_host, connect_port);

	if not success and err ~= "timeout" then
		self:warn("connect() to %s:%d failed: %s", connect_host, connect_port, err);
		return self:event("disconnected", { reason = err }) or false, err;
	end

	local conn = server.wrapclient(conn, connect_host, connect_port, verse.new_listener(self), "*a");
	if not conn then
		self:warn("connection initialisation failed: %s", err);
		return self:event("disconnected", { reason = err }) or false, err;
	end
	self:set_conn(conn);
	return true;
end

function stream:set_conn(conn)
	self.conn = conn;
	self.send = function (stream, data)
		self:event("outgoing", data);
		data = tostring(data);
		self:event("outgoing-raw", data);
		return conn:write(data);
	end;
end

function stream:close(reason)
	if not self.conn then
		verse.log("error", "Attempt to close disconnected connection - possibly a bug");
		return;
	end
	local on_disconnect = self.conn.disconnect();
	self.conn:close();
	on_disconnect(self.conn, reason);
end

-- Logging functions
function stream:debug(...)
	return self.logger("debug", ...);
end

function stream:info(...)
	return self.logger("info", ...);
end

function stream:warn(...)
	return self.logger("warn", ...);
end

function stream:error(...)
	return self.logger("error", ...);
end

-- Event handling
function stream:event(name, ...)
	self:debug("Firing event: "..tostring(name));
	return self.events.fire_event(name, ...);
end

function stream:hook(name, ...)
	return self.events.add_handler(name, ...);
end

function stream:unhook(name, handler)
	return self.events.remove_handler(name, handler);
end

function verse.eventable(object)
        object.events = events.new();
        object.hook, object.unhook = stream.hook, stream.unhook;
        local fire_event = object.events.fire_event;
        function object:event(name, ...)
                return fire_event(name, ...);
        end
        return object;
end

function stream:add_plugin(name)
	if self.plugins[name] then return true; end
	if require("verse.plugins."..name) then
		local ok, err = verse.plugins[name](self);
		if ok ~= false then
			self:debug("Loaded %s plugin", name);
			self.plugins[name] = true;
		else
			self:warn("Failed to load %s plugin: %s", name, err);
		end
	end
	return self;
end

-- Listener factory
function verse.new_listener(stream)
	local conn_listener = {};

	function conn_listener.onconnect(conn)
		if stream.server then
			local client = verse.new();
			conn:setlistener(verse.new_listener(client));
			client:set_conn(conn);
			stream:event("connected", { client = client });
		else
			stream.connected = true;
			stream:event("connected");
		end
	end

	function conn_listener.onincoming(conn, data)
		stream:event("incoming-raw", data);
	end

	function conn_listener.ondisconnect(conn, err)
		if conn ~= stream.conn then return end
		stream.connected = false;
		stream:event("disconnected", { reason = err });
	end

	function conn_listener.ondrain(conn)
		stream:event("drained");
	end

	function conn_listener.onstatus(conn, new_status)
		stream:event("status", new_status);
	end

	function conn_listener.onreadtimeout(conn)
		return stream:event("read-timeout");
	end
	return conn_listener;
end

return verse;

mercurial