diff mbox series

ulogd: json: send messages to a remote host / unix socket

Message ID 8360D1AD-5F94-4E9B-8B61-045BB1FED10F@waterwave.ch
State Changes Requested
Delegated to: Pablo Neira
Headers show
Series ulogd: json: send messages to a remote host / unix socket | expand

Commit Message

Andreas Jaggi May 1, 2018, 12:16 p.m. UTC
Extend the JSON output plugin so that the generated JSON stream can be
sent to a remote host via TCP/UDP or to a local unix socket.

Signed-off-by: Andreas Jaggi <andreas.jaggi@waterwave.ch>
---
 output/ulogd_output_JSON.c | 225 +++++++++++++++++++++++++++++++++----
 ulogd.conf.in              |  11 ++
 2 files changed, 214 insertions(+), 22 deletions(-)

Comments

Arturo Borrero Gonzalez May 9, 2018, 10:15 a.m. UTC | #1
On 1 May 2018 at 14:16, Andreas Jaggi <andreas.jaggi@waterwave.ch> wrote:
> Extend the JSON output plugin so that the generated JSON stream can be
> sent to a remote host via TCP/UDP or to a local unix socket.
>
> Signed-off-by: Andreas Jaggi <andreas.jaggi@waterwave.ch>
> ---
>  output/ulogd_output_JSON.c | 225 +++++++++++++++++++++++++++++++++----
>  ulogd.conf.in              |  11 ++
>  2 files changed, 214 insertions(+), 22 deletions(-)
>

HI Andreas, thanks for working on this.

Some review below.

> +static int _connect_socket(struct ulogd_pluginstance *pi)
> +{
> +       struct json_priv *op = (struct json_priv *) &pi->private;
> +       struct addrinfo hints;
> +       struct addrinfo *result, *rp;
> +       struct sockaddr_un u_addr;
> +       int sfd, s;
> +
> +       if ( op->sock != -1 ) {
> +               close(op->sock);
> +               op->sock = -1;
> +       }
> +       if ( op->mode == JSON_MODE_UNIX ) {
> +               ulogd_log(ULOGD_DEBUG, "connecting to unix:%s\n", file_ce(pi->config_kset).u.string);
> +
> +               sfd = socket(AF_UNIX, SOCK_STREAM, 0);
> +               if (sfd == -1 ) {
> +                       ulogd_log(ULOGD_ERROR, "Could not connect\n");
> +                       return -1;
> +               }
> +               u_addr.sun_family = AF_UNIX;
> +               strncpy(u_addr.sun_path, file_ce(pi->config_kset).u.string, sizeof(u_addr.sun_path) - 1);
> +               if ( connect(sfd, (struct sockaddr *) &u_addr, sizeof(struct sockaddr_un)) == -1 ) {
> +                       ulogd_log(ULOGD_ERROR, "Could not connect\n");
> +                       close(sfd);
> +                       return -1;
> +               }
> +       } else {
> +               ulogd_log(ULOGD_DEBUG, "connecting to %s:%s\n", host_ce(pi->config_kset).u.string, port_ce(pi->config_kset).u.string);
> +
> +               memset(&hints, 0, sizeof(struct addrinfo));
> +               hints.ai_family = AF_UNSPEC;
> +               hints.ai_socktype = op->mode == JSON_MODE_UDP ? SOCK_DGRAM : SOCK_STREAM;
> +               hints.ai_protocol = 0;
> +               hints.ai_flags = 0;
> +
> +               s = getaddrinfo(host_ce(pi->config_kset).u.string, port_ce(pi->config_kset).u.string, &hints, &result);
> +               if (s != 0) {
> +                       ulogd_log(ULOGD_ERROR, "getaddrinfo: %s\n", gai_strerror(s));
> +                       return -1;
> +               }
> +
> +               for (rp = result; rp != NULL; rp = rp->ai_next) {
> +                       int on = 1;
> +
> +                       sfd = socket(rp->ai_family, rp->ai_socktype,
> +                                       rp->ai_protocol);
> +                       if (sfd == -1)
> +                               continue;
> +
> +                       setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR,
> +                                  (char *) &on, sizeof(on));
> +
> +                       if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1)
> +                               break;
> +
> +                       close(sfd);
> +               }
> +
> +               freeaddrinfo(result);
> +
> +               if (rp == NULL) {
> +                       ulogd_log(ULOGD_ERROR, "Could not connect\n");
> +                       return -1;
> +               }
> +       }
> +
> +       op->sock = sfd;
> +
> +       return 0;
> +}
> +

^^^^

could we split the function above in smaller chunks?

something like _connect_socket_unix() and _connect_socket_net()


> @@ -218,13 +331,41 @@ static int json_interp(struct ulogd_pluginstance *upi)
>                 }
>         }
>
> -       json_dumpf(msg, opi->of, 0);
> -       fprintf(opi->of, "\n");
>
> +       buf = json_dumps(msg, 0);
>         json_decref(msg);
> -
> -       if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
> -               fflush(opi->of);
> +       if (buf == NULL) {
> +               ulogd_log(ULOGD_ERROR, "Could not create message\n");
> +               return ULOGD_IRET_ERR;
> +       }
> +       buflen = strlen(buf);
> +       buf = realloc(buf, sizeof(char)*(buflen+2));
> +       if (buf == NULL) {
> +               ulogd_log(ULOGD_ERROR, "Could not create message\n");
> +               return ULOGD_IRET_ERR;
> +       }
> +       strncat(buf, "\n", 1);
> +       buflen++;
> +
> +       if ( opi->mode == JSON_MODE_FILE ) {
> +               fprintf(opi->of, "%s", buf);
> +               free(buf);
> +               if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
> +                       fflush(opi->of);
> +       } else {
> +               if ( opi->sock != -1 ) {
> +                       ret = send(opi->sock, buf, buflen, MSG_NOSIGNAL);
> +               }
> +               free(buf);
> +               if (ret != buflen) {
> +                       ulogd_log(ULOGD_ERROR, "Failure sending message: %s\n", strerror(errno));
> +                       if (ret == -1 || opi->sock == -1) {
> +                               return _connect_socket(upi);
> +                       } else {
> +                               return ULOGD_IRET_ERR;
> +                       }
> +               }
> +       }

^^^^

Also here, I see we check several times if ( opi->mode == JSON_MODE_FILE ).
May I suggest to introduce smaller functions depending on the mode?

Something like:

if ( opi->mode == JSON_MODE_FILE )
   json_interp_file()
else
   json_interp_xxxx()

> @@ -293,6 +451,22 @@ static int json_init(struct ulogd_pluginstance *upi)
>
>         *op->cached_tz = '\0';
>
> +       if ( op->mode == JSON_MODE_FILE ) {
> +               op->of = fopen(upi->config_kset->ces[0].u.string, "a");
> +               if (!op->of) {
> +                       ulogd_log(ULOGD_FATAL, "can't open JSON log file: %s\n",
> +                               strerror(errno));
> +                       return -1;
> +               }
> +       } else {
> +               if ( host_ce(upi->config_kset).u.string == NULL )
> +                       return -1;
> +               if ( port_ce(upi->config_kset).u.string == NULL)
> +                       return -1;

^^^^

Please check the coding style.

Try "(this)" instead of "( this )" or even "( this)", i.e, no spaces
after '(', or before ')'.

>
> @@ -300,8 +474,15 @@ static int json_fini(struct ulogd_pluginstance *pi)
>  {
>         struct json_priv *op = (struct json_priv *) &pi->private;
>
> -       if (op->of != stdout)
> -               fclose(op->of);
> +       if ( op->mode == JSON_MODE_FILE ) {
> +               if (op->of != stdout)
> +                       fclose(op->of);
> +       } else {
> +               if ( op->sock != -1 ) {
> +                       close(op->sock);
> +                       op->sock = -1;
> +               }
> +       }
>

^^^^

same, something like:

if JSON_MODE_FILE
   close_file()
else
   close_socket()

>         return 0;
>  }
> diff --git a/ulogd.conf.in b/ulogd.conf.in
> index 2fcf39a..e94a3a2 100644
> --- a/ulogd.conf.in
> +++ b/ulogd.conf.in
> @@ -212,6 +212,17 @@ sync=1
>  # Uncomment the following line to use JSON v1 event format that
>  # can provide better compatility with some JSON file reader.
>  #eventv1=1
> +# Uncomment the following lines to send the JSON logs to a remote host via UDP
> +#mode="udp"
> +#host="192.0.2.10"
> +#port="10210"
> +# Uncomment the following lines to send the JSON logs to a remote host via TCP
> +#mode="tcp"
> +#host="192.0.2.10"
> +#port="10210"
> +# Uncomment the following lines to send the JSON logs to a local unix socket
> +#mode="unix"
> +#file="/var/run/ulogd.socket"

This new feature looks great! thanks again.
--
To unsubscribe from this list: send the line "unsubscribe netfilter-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Andreas Jaggi May 26, 2018, 10:55 p.m. UTC | #2
Hi Arturo

Thanks for the review, find below the reworked patch.
Let me know if there are other parts to improve.

Cheers
Andreas



Extend the JSON output plugin so that the generated JSON stream can be
sent to a remote host via TCP/UDP or to a local unix socket.

Signed-off-by: Andreas Jaggi <andreas.jaggi@waterwave.ch>
---
 output/ulogd_output_JSON.c | 296 ++++++++++++++++++++++++++++++++++---
 ulogd.conf.in              |  11 ++
 2 files changed, 286 insertions(+), 21 deletions(-)

diff --git a/output/ulogd_output_JSON.c b/output/ulogd_output_JSON.c
index 4d8e3e9..fed20b7 100644
--- a/output/ulogd_output_JSON.c
+++ b/output/ulogd_output_JSON.c
@@ -20,10 +20,15 @@
 
 #include <stdio.h>
 #include <stdlib.h>
+#include <unistd.h>
 #include <string.h>
 #include <time.h>
 #include <errno.h>
 #include <inttypes.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netdb.h>
 #include <ulogd/ulogd.h>
 #include <ulogd/conffile.h>
 #include <jansson.h>
@@ -36,6 +41,10 @@
 #define ULOGD_JSON_DEFAULT_DEVICE "Netfilter"
 #endif
 
+#define host_ce(x)	(x->ces[JSON_CONF_HOST])
+#define port_ce(x)	(x->ces[JSON_CONF_PORT])
+#define mode_ce(x)	(x->ces[JSON_CONF_MODE])
+#define file_ce(x)	(x->ces[JSON_CONF_FILENAME])
 #define unlikely(x) __builtin_expect((x),0)
 
 struct json_priv {
@@ -44,6 +53,15 @@ struct json_priv {
 	int usec_idx;
 	long cached_gmtoff;
 	char cached_tz[6];	/* eg +0200 */
+	int mode;
+	int sock;
+};
+
+enum json_mode {
+	JSON_MODE_FILE = 0,
+	JSON_MODE_TCP,
+	JSON_MODE_UDP,
+	JSON_MODE_UNIX
 };
 
 enum json_conf {
@@ -53,6 +71,9 @@ enum json_conf {
 	JSON_CONF_EVENTV1,
 	JSON_CONF_DEVICE,
 	JSON_CONF_BOOLEAN_LABEL,
+	JSON_CONF_MODE,
+	JSON_CONF_HOST,
+	JSON_CONF_PORT,
 	JSON_CONF_MAX
 };
 
@@ -95,15 +116,165 @@ static struct config_keyset json_kset = {
 			.options = CONFIG_OPT_NONE,
 			.u = { .value = 0 },
 		},
+		[JSON_CONF_MODE] = {
+			.key = "mode",
+			.type = CONFIG_TYPE_STRING,
+			.options = CONFIG_OPT_NONE,
+			.u = { .string = "file" },
+		},
+		[JSON_CONF_HOST] = {
+			.key = "host",
+			.type = CONFIG_TYPE_STRING,
+			.options = CONFIG_OPT_NONE,
+			.u = { .string = "127.0.0.1" },
+		},
+		[JSON_CONF_PORT] = {
+			.key = "port",
+			.type = CONFIG_TYPE_STRING,
+			.options = CONFIG_OPT_NONE,
+			.u = { .string = "12345" },
+		},
 	},
 };
 
+static int _connect_socket_unix(struct ulogd_pluginstance *pi)
+{
+	struct json_priv *op = (struct json_priv *) &pi->private;
+	struct sockaddr_un u_addr;
+	int sfd;
+
+	if (op->sock != -1) {
+		close(op->sock);
+		op->sock = -1;
+	}
+	ulogd_log(ULOGD_DEBUG, "connecting to unix:%s\n",
+		  file_ce(pi->config_kset).u.string);
+
+	sfd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (sfd == -1) {
+		return -1;
+	}
+	u_addr.sun_family = AF_UNIX;
+	strncpy(u_addr.sun_path, file_ce(pi->config_kset).u.string,
+		sizeof(u_addr.sun_path) - 1);
+	if (connect(sfd, (struct sockaddr *) &u_addr, sizeof(struct sockaddr_un)) == -1) {
+		close(sfd);
+		return -1;
+	}
+
+	op->sock = sfd;
+
+	return 0;
+}
+
+static int _connect_socket_net(struct ulogd_pluginstance *pi)
+{
+	struct json_priv *op = (struct json_priv *) &pi->private;
+	struct addrinfo hints;
+	struct addrinfo *result, *rp;
+	int sfd, s;
+
+	if (op->sock != -1) {
+		close(op->sock);
+		op->sock = -1;
+	}
+
+	ulogd_log(ULOGD_DEBUG, "connecting to %s:%s\n",
+		  host_ce(pi->config_kset).u.string,
+		  port_ce(pi->config_kset).u.string);
+
+	memset(&hints, 0, sizeof(struct addrinfo));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = op->mode == JSON_MODE_UDP ? SOCK_DGRAM : SOCK_STREAM;
+	hints.ai_protocol = 0;
+	hints.ai_flags = 0;
+
+	s = getaddrinfo(host_ce(pi->config_kset).u.string,
+			port_ce(pi->config_kset).u.string, &hints, &result);
+	if (s != 0) {
+		ulogd_log(ULOGD_ERROR, "getaddrinfo: %s\n", gai_strerror(s));
+		return -1;
+	}
+
+	for (rp = result; rp != NULL; rp = rp->ai_next) {
+		int on = 1;
+
+		sfd = socket(rp->ai_family, rp->ai_socktype,
+				rp->ai_protocol);
+		if (sfd == -1)
+			continue;
+
+		setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR,
+			   (char *) &on, sizeof(on));
+
+		if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1)
+			break;
+
+		close(sfd);
+	}
+
+	freeaddrinfo(result);
+
+	if (rp == NULL) {
+		return -1;
+	}
+
+	op->sock = sfd;
+
+	return 0;
+}
+
+static int _connect_socket(struct ulogd_pluginstance *pi)
+{
+	struct json_priv *op = (struct json_priv *) &pi->private;
+
+	if (op->mode == JSON_MODE_UNIX)
+		return _connect_socket_unix(pi);
+	else
+		return _connect_socket_net(pi);
+}
+
+static int json_interp_socket(struct ulogd_pluginstance *upi, char *buf, int buflen)
+{
+	struct json_priv *opi = (struct json_priv *) &upi->private;
+	int ret = 0;
+
+	if (opi->sock != -1)
+		ret = send(opi->sock, buf, buflen, MSG_NOSIGNAL);
+	free(buf);
+	if (ret != buflen) {
+		ulogd_log(ULOGD_ERROR, "Failure sending message: %s\n",
+			  strerror(errno));
+		if (ret == -1 || opi->sock == -1)
+			return _connect_socket(upi);
+		else
+			return ULOGD_IRET_ERR;
+	}
+
+	return ULOGD_IRET_OK;
+}
+
+static int json_interp_file(struct ulogd_pluginstance *upi, char *buf)
+{
+	struct json_priv *opi = (struct json_priv *) &upi->private;
+
+	fprintf(opi->of, "%s", buf);
+	free(buf);
+
+	if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
+		fflush(opi->of);
+
+	return ULOGD_IRET_OK;
+}
+
 #define MAX_LOCAL_TIME_STRING 38
 
 static int json_interp(struct ulogd_pluginstance *upi)
 {
 	struct json_priv *opi = (struct json_priv *) &upi->private;
 	unsigned int i;
+	char *buf;
+	int buflen;
 	json_t *msg;
 
 	msg = json_object();
@@ -218,34 +389,65 @@ static int json_interp(struct ulogd_pluginstance *upi)
 		}
 	}
 
-	json_dumpf(msg, opi->of, 0);
-	fprintf(opi->of, "\n");
 
+	buf = json_dumps(msg, 0);
 	json_decref(msg);
+	if (buf == NULL) {
+		ulogd_log(ULOGD_ERROR, "Could not create message\n");
+		return ULOGD_IRET_ERR;
+	}
+	buflen = strlen(buf);
+	buf = realloc(buf, sizeof(char)*(buflen+2));
+	if (buf == NULL) {
+		ulogd_log(ULOGD_ERROR, "Could not create message\n");
+		return ULOGD_IRET_ERR;
+	}
+	strncat(buf, "\n", 1);
+	buflen++;
 
-	if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
-		fflush(opi->of);
+	if (opi->mode == JSON_MODE_FILE)
+		return json_interp_file(upi, buf);
+	else
+		return json_interp_socket(upi, buf, buflen);
+}
 
-	return ULOGD_IRET_OK;
+static void reopen_file(struct ulogd_pluginstance *upi)
+{
+	struct json_priv *oi = (struct json_priv *) &upi->private;
+	FILE *old = oi->of;
+
+	ulogd_log(ULOGD_NOTICE, "JSON: reopening logfile\n");
+	oi->of = fopen(upi->config_kset->ces[0].u.string, "a");
+	if (!oi->of) {
+		ulogd_log(ULOGD_ERROR, "can't open JSON "
+				       "log file: %s\n",
+			  strerror(errno));
+		oi->of = old;
+	} else {
+		fclose(old);
+	}
+}
+
+static void reopen_socket(struct ulogd_pluginstance *upi)
+{
+	ulogd_log(ULOGD_NOTICE, "JSON: reopening socket\n");
+	if (_connect_socket(upi) < 0) {
+		ulogd_log(ULOGD_ERROR, "can't open JSON "
+				       "socket: %s\n",
+			  strerror(errno));
+	}
 }
 
 static void sighup_handler_print(struct ulogd_pluginstance *upi, int signal)
 {
 	struct json_priv *oi = (struct json_priv *) &upi->private;
-	FILE *old = oi->of;
 
 	switch (signal) {
 	case SIGHUP:
-		ulogd_log(ULOGD_NOTICE, "JSON: reopening logfile\n");
-		oi->of = fopen(upi->config_kset->ces[0].u.string, "a");
-		if (!oi->of) {
-			ulogd_log(ULOGD_ERROR, "can't open JSON "
-					       "log file: %s\n",
-				  strerror(errno));
-			oi->of = old;
-		} else {
-			fclose(old);
-		}
+		if (oi->mode == JSON_MODE_FILE)
+			reopen_file(upi);
+		else
+			reopen_socket(upi);
 		break;
 	default:
 		break;
@@ -255,6 +457,8 @@ static void sighup_handler_print(struct ulogd_pluginstance *upi, int signal)
 static int json_configure(struct ulogd_pluginstance *upi,
 			    struct ulogd_pluginstance_stack *stack)
 {
+	struct json_priv *op = (struct json_priv *) &upi->private;
+	char *mode_str = mode_ce(upi->config_kset).u.string;
 	int ret;
 
 	ret = ulogd_wildcard_inputkeys(upi);
@@ -265,13 +469,25 @@ static int json_configure(struct ulogd_pluginstance *upi,
 	if (ret < 0)
 		return ret;
 
+	if (!strcasecmp(mode_str, "udp")) {
+		op->mode = JSON_MODE_UDP;
+	} else if (!strcasecmp(mode_str, "tcp")) {
+		op->mode = JSON_MODE_TCP;
+	} else if (!strcasecmp(mode_str, "unix")) {
+		op->mode = JSON_MODE_UNIX;
+	} else if (!strcasecmp(mode_str, "file")) {
+		op->mode = JSON_MODE_FILE;
+	} else {
+		ulogd_log(ULOGD_ERROR, "unknown mode '%s'\n", mode_str);
+		return -EINVAL;
+	}
+
 	return 0;
 }
 
-static int json_init(struct ulogd_pluginstance *upi)
+static int json_init_file(struct ulogd_pluginstance *upi)
 {
 	struct json_priv *op = (struct json_priv *) &upi->private;
-	unsigned int i;
 
 	op->of = fopen(upi->config_kset->ces[0].u.string, "a");
 	if (!op->of) {
@@ -280,6 +496,27 @@ static int json_init(struct ulogd_pluginstance *upi)
 		return -1;
 	}
 
+	return 0;
+}
+
+static int json_init_socket(struct ulogd_pluginstance *upi)
+{
+	struct json_priv *op = (struct json_priv *) &upi->private;
+
+	if (host_ce(upi->config_kset).u.string == NULL)
+		return -1;
+	if (port_ce(upi->config_kset).u.string == NULL)
+		return -1;
+
+	op->sock = -1;
+	return _connect_socket(upi);
+}
+
+static int json_init(struct ulogd_pluginstance *upi)
+{
+	struct json_priv *op = (struct json_priv *) &upi->private;
+	unsigned int i;
+
 	/* search for time */
 	op->sec_idx = -1;
 	op->usec_idx = -1;
@@ -293,15 +530,32 @@ static int json_init(struct ulogd_pluginstance *upi)
 
 	*op->cached_tz = '\0';
 
-	return 0;
+	if (op->mode == JSON_MODE_FILE)
+		return json_init_file(upi);
+	else
+		return json_init_socket(upi);
+}
+
+static void close_file(FILE *of) {
+	if (of != stdout)
+		fclose(of);
+}
+
+static void close_socket(struct json_priv *op) {
+	if (op->sock != -1) {
+		close(op->sock);
+		op->sock = -1;
+	}
 }
 
 static int json_fini(struct ulogd_pluginstance *pi)
 {
 	struct json_priv *op = (struct json_priv *) &pi->private;
 
-	if (op->of != stdout)
-		fclose(op->of);
+	if (op->mode == JSON_MODE_FILE)
+		close_file(op->of);
+	else
+		close_socket(op);
 
 	return 0;
 }
diff --git a/ulogd.conf.in b/ulogd.conf.in
index 2fcf39a..e94a3a2 100644
--- a/ulogd.conf.in
+++ b/ulogd.conf.in
@@ -212,6 +212,17 @@ sync=1
 # Uncomment the following line to use JSON v1 event format that
 # can provide better compatility with some JSON file reader.
 #eventv1=1
+# Uncomment the following lines to send the JSON logs to a remote host via UDP
+#mode="udp"
+#host="192.0.2.10"
+#port="10210"
+# Uncomment the following lines to send the JSON logs to a remote host via TCP
+#mode="tcp"
+#host="192.0.2.10"
+#port="10210"
+# Uncomment the following lines to send the JSON logs to a local unix socket
+#mode="unix"
+#file="/var/run/ulogd.socket"
 
 [pcap1]
 #default file is /var/log/ulogd.pcap
Arturo Borrero Gonzalez May 28, 2018, 9:32 a.m. UTC | #3
On 27 May 2018 at 00:55, Andreas Jaggi <andreas.jaggi@waterwave.ch> wrote:
> Hi Arturo
>
> Thanks for the review, find below the reworked patch.
> Let me know if there are other parts to improve.
>


Thanks Andreas! the patch looks great.
Minor nitpicks below.


> +static int _connect_socket_unix(struct ulogd_pluginstance *pi)
> +{
> +       struct json_priv *op = (struct json_priv *) &pi->private;
> +       struct sockaddr_un u_addr;
> +       int sfd;
> +
> +       if (op->sock != -1) {
> +               close(op->sock);
> +               op->sock = -1;
> +       }

^^^
this socket closing could be your new close_socket() function, right?

> +       ulogd_log(ULOGD_DEBUG, "connecting to unix:%s\n",
> +                 file_ce(pi->config_kset).u.string);
> +
> +       sfd = socket(AF_UNIX, SOCK_STREAM, 0);
> +       if (sfd == -1) {
> +               return -1;
> +       }
> +       u_addr.sun_family = AF_UNIX;
> +       strncpy(u_addr.sun_path, file_ce(pi->config_kset).u.string,
> +               sizeof(u_addr.sun_path) - 1);
> +       if (connect(sfd, (struct sockaddr *) &u_addr, sizeof(struct sockaddr_un)) == -1) {
> +               close(sfd);
> +               return -1;
> +       }
> +
> +       op->sock = sfd;
> +
> +       return 0;
> +}
> +
> +static int _connect_socket_net(struct ulogd_pluginstance *pi)
> +{
> +       struct json_priv *op = (struct json_priv *) &pi->private;
> +       struct addrinfo hints;
> +       struct addrinfo *result, *rp;
> +       int sfd, s;
> +
> +       if (op->sock != -1) {
> +               close(op->sock);
> +               op->sock = -1;
> +       }
> +

^^^
same here


> +       ulogd_log(ULOGD_DEBUG, "connecting to %s:%s\n",
> +                 host_ce(pi->config_kset).u.string,
> +                 port_ce(pi->config_kset).u.string);
> +
> +       memset(&hints, 0, sizeof(struct addrinfo));
> +       hints.ai_family = AF_UNSPEC;
> +       hints.ai_socktype = op->mode == JSON_MODE_UDP ? SOCK_DGRAM : SOCK_STREAM;
> +       hints.ai_protocol = 0;
> +       hints.ai_flags = 0;
> +
> +       s = getaddrinfo(host_ce(pi->config_kset).u.string,
> +                       port_ce(pi->config_kset).u.string, &hints, &result);
> +       if (s != 0) {
> +               ulogd_log(ULOGD_ERROR, "getaddrinfo: %s\n", gai_strerror(s));
> +               return -1;
> +       }
> +
> +       for (rp = result; rp != NULL; rp = rp->ai_next) {
> +               int on = 1;
> +
> +               sfd = socket(rp->ai_family, rp->ai_socktype,
> +                               rp->ai_protocol);
> +               if (sfd == -1)
> +                       continue;
> +
> +               setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR,
> +                          (char *) &on, sizeof(on));
> +
> +               if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1)
> +                       break;
> +
> +               close(sfd);
> +       }
> +
> +       freeaddrinfo(result);
> +
> +       if (rp == NULL) {
> +               return -1;
> +       }
> +
> +       op->sock = sfd;
> +
> +       return 0;
> +}
> +
> +static int _connect_socket(struct ulogd_pluginstance *pi)
> +{
> +       struct json_priv *op = (struct json_priv *) &pi->private;
> +
> +       if (op->mode == JSON_MODE_UNIX)
> +               return _connect_socket_unix(pi);
> +       else
> +               return _connect_socket_net(pi);
> +}
> +
> +static int json_interp_socket(struct ulogd_pluginstance *upi, char *buf, int buflen)
> +{
> +       struct json_priv *opi = (struct json_priv *) &upi->private;
> +       int ret = 0;
> +
> +       if (opi->sock != -1)
> +               ret = send(opi->sock, buf, buflen, MSG_NOSIGNAL);
> +       free(buf);
> +       if (ret != buflen) {
> +               ulogd_log(ULOGD_ERROR, "Failure sending message: %s\n",
> +                         strerror(errno));
> +               if (ret == -1 || opi->sock == -1)
> +                       return _connect_socket(upi);
> +               else
> +                       return ULOGD_IRET_ERR;
> +       }
> +
> +       return ULOGD_IRET_OK;
> +}
> +
> +static int json_interp_file(struct ulogd_pluginstance *upi, char *buf)
> +{
> +       struct json_priv *opi = (struct json_priv *) &upi->private;
> +
> +       fprintf(opi->of, "%s", buf);
> +       free(buf);
> +
> +       if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
> +               fflush(opi->of);
> +
> +       return ULOGD_IRET_OK;
> +}
> +
>  #define MAX_LOCAL_TIME_STRING 38
>
>  static int json_interp(struct ulogd_pluginstance *upi)
>  {
>         struct json_priv *opi = (struct json_priv *) &upi->private;
>         unsigned int i;
> +       char *buf;
> +       int buflen;
>         json_t *msg;
>
>         msg = json_object();
> @@ -218,34 +389,65 @@ static int json_interp(struct ulogd_pluginstance *upi)
>                 }
>         }
>
> -       json_dumpf(msg, opi->of, 0);
> -       fprintf(opi->of, "\n");
>
> +       buf = json_dumps(msg, 0);
>         json_decref(msg);
> +       if (buf == NULL) {
> +               ulogd_log(ULOGD_ERROR, "Could not create message\n");
> +               return ULOGD_IRET_ERR;
> +       }
> +       buflen = strlen(buf);
> +       buf = realloc(buf, sizeof(char)*(buflen+2));
> +       if (buf == NULL) {
> +               ulogd_log(ULOGD_ERROR, "Could not create message\n");
> +               return ULOGD_IRET_ERR;
> +       }
> +       strncat(buf, "\n", 1);
> +       buflen++;
>
> -       if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
> -               fflush(opi->of);
> +       if (opi->mode == JSON_MODE_FILE)
> +               return json_interp_file(upi, buf);
> +       else
> +               return json_interp_socket(upi, buf, buflen);
> +}
>
> -       return ULOGD_IRET_OK;
> +static void reopen_file(struct ulogd_pluginstance *upi)
> +{
> +       struct json_priv *oi = (struct json_priv *) &upi->private;
> +       FILE *old = oi->of;
> +
> +       ulogd_log(ULOGD_NOTICE, "JSON: reopening logfile\n");
> +       oi->of = fopen(upi->config_kset->ces[0].u.string, "a");
> +       if (!oi->of) {
> +               ulogd_log(ULOGD_ERROR, "can't open JSON "
> +                                      "log file: %s\n",
> +                         strerror(errno));
> +               oi->of = old;
> +       } else {
> +               fclose(old);
> +       }
> +}
> +
> +static void reopen_socket(struct ulogd_pluginstance *upi)
> +{
> +       ulogd_log(ULOGD_NOTICE, "JSON: reopening socket\n");
> +       if (_connect_socket(upi) < 0) {
> +               ulogd_log(ULOGD_ERROR, "can't open JSON "
> +                                      "socket: %s\n",
> +                         strerror(errno));
> +       }
>  }
>
>  static void sighup_handler_print(struct ulogd_pluginstance *upi, int signal)
>  {
>         struct json_priv *oi = (struct json_priv *) &upi->private;
> -       FILE *old = oi->of;
>
>         switch (signal) {
>         case SIGHUP:
> -               ulogd_log(ULOGD_NOTICE, "JSON: reopening logfile\n");
> -               oi->of = fopen(upi->config_kset->ces[0].u.string, "a");
> -               if (!oi->of) {
> -                       ulogd_log(ULOGD_ERROR, "can't open JSON "
> -                                              "log file: %s\n",
> -                                 strerror(errno));
> -                       oi->of = old;
> -               } else {
> -                       fclose(old);
> -               }
> +               if (oi->mode == JSON_MODE_FILE)
> +                       reopen_file(upi);
> +               else
> +                       reopen_socket(upi);
>                 break;
>         default:
>                 break;
> @@ -255,6 +457,8 @@ static void sighup_handler_print(struct ulogd_pluginstance *upi, int signal)
>  static int json_configure(struct ulogd_pluginstance *upi,
>                             struct ulogd_pluginstance_stack *stack)
>  {
> +       struct json_priv *op = (struct json_priv *) &upi->private;
> +       char *mode_str = mode_ce(upi->config_kset).u.string;
>         int ret;
>
>         ret = ulogd_wildcard_inputkeys(upi);
> @@ -265,13 +469,25 @@ static int json_configure(struct ulogd_pluginstance *upi,
>         if (ret < 0)
>                 return ret;
>
> +       if (!strcasecmp(mode_str, "udp")) {
> +               op->mode = JSON_MODE_UDP;
> +       } else if (!strcasecmp(mode_str, "tcp")) {
> +               op->mode = JSON_MODE_TCP;
> +       } else if (!strcasecmp(mode_str, "unix")) {
> +               op->mode = JSON_MODE_UNIX;
> +       } else if (!strcasecmp(mode_str, "file")) {
> +               op->mode = JSON_MODE_FILE;
> +       } else {
> +               ulogd_log(ULOGD_ERROR, "unknown mode '%s'\n", mode_str);
> +               return -EINVAL;
> +       }
> +
>         return 0;
>  }
>
> -static int json_init(struct ulogd_pluginstance *upi)
> +static int json_init_file(struct ulogd_pluginstance *upi)
>  {
>         struct json_priv *op = (struct json_priv *) &upi->private;
> -       unsigned int i;
>
>         op->of = fopen(upi->config_kset->ces[0].u.string, "a");
>         if (!op->of) {
> @@ -280,6 +496,27 @@ static int json_init(struct ulogd_pluginstance *upi)
>                 return -1;
>         }
>
> +       return 0;
> +}
> +
> +static int json_init_socket(struct ulogd_pluginstance *upi)
> +{
> +       struct json_priv *op = (struct json_priv *) &upi->private;
> +
> +       if (host_ce(upi->config_kset).u.string == NULL)
> +               return -1;
> +       if (port_ce(upi->config_kset).u.string == NULL)
> +               return -1;
> +
> +       op->sock = -1;
> +       return _connect_socket(upi);
> +}
> +
> +static int json_init(struct ulogd_pluginstance *upi)
> +{
> +       struct json_priv *op = (struct json_priv *) &upi->private;
> +       unsigned int i;
> +
>         /* search for time */
>         op->sec_idx = -1;
>         op->usec_idx = -1;
> @@ -293,15 +530,32 @@ static int json_init(struct ulogd_pluginstance *upi)
>
>         *op->cached_tz = '\0';
>
> -       return 0;
> +       if (op->mode == JSON_MODE_FILE)
> +               return json_init_file(upi);
> +       else
> +               return json_init_socket(upi);
> +}
> +
> +static void close_file(FILE *of) {
> +       if (of != stdout)
> +               fclose(of);
> +}
> +
> +static void close_socket(struct json_priv *op) {
> +       if (op->sock != -1) {
> +               close(op->sock);
> +               op->sock = -1;
> +       }
>  }
>
>  static int json_fini(struct ulogd_pluginstance *pi)
>  {
>         struct json_priv *op = (struct json_priv *) &pi->private;
>
> -       if (op->of != stdout)
> -               fclose(op->of);
> +       if (op->mode == JSON_MODE_FILE)
> +               close_file(op->of);
> +       else
> +               close_socket(op);
>
>         return 0;
>  }
--
To unsubscribe from this list: send the line "unsubscribe netfilter-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Andreas Jaggi May 30, 2018, 8:15 p.m. UTC | #4
Well spotted indeed :-)
Here comes the next iteration.

Cheers
Andreas


Extend the JSON output plugin so that the generated JSON stream can be
sent to a remote host via TCP/UDP or to a local unix socket.

Signed-off-by: Andreas Jaggi <andreas.jaggi@waterwave.ch>
---
 output/ulogd_output_JSON.c | 291 ++++++++++++++++++++++++++++++++++---
 ulogd.conf.in              |  11 ++
 2 files changed, 281 insertions(+), 21 deletions(-)

diff --git a/output/ulogd_output_JSON.c b/output/ulogd_output_JSON.c
index 4d8e3e9..6edfa90 100644
--- a/output/ulogd_output_JSON.c
+++ b/output/ulogd_output_JSON.c
@@ -20,10 +20,15 @@
 
 #include <stdio.h>
 #include <stdlib.h>
+#include <unistd.h>
 #include <string.h>
 #include <time.h>
 #include <errno.h>
 #include <inttypes.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netdb.h>
 #include <ulogd/ulogd.h>
 #include <ulogd/conffile.h>
 #include <jansson.h>
@@ -36,6 +41,10 @@
 #define ULOGD_JSON_DEFAULT_DEVICE "Netfilter"
 #endif
 
+#define host_ce(x)	(x->ces[JSON_CONF_HOST])
+#define port_ce(x)	(x->ces[JSON_CONF_PORT])
+#define mode_ce(x)	(x->ces[JSON_CONF_MODE])
+#define file_ce(x)	(x->ces[JSON_CONF_FILENAME])
 #define unlikely(x) __builtin_expect((x),0)
 
 struct json_priv {
@@ -44,6 +53,15 @@ struct json_priv {
 	int usec_idx;
 	long cached_gmtoff;
 	char cached_tz[6];	/* eg +0200 */
+	int mode;
+	int sock;
+};
+
+enum json_mode {
+	JSON_MODE_FILE = 0,
+	JSON_MODE_TCP,
+	JSON_MODE_UDP,
+	JSON_MODE_UNIX
 };
 
 enum json_conf {
@@ -53,6 +71,9 @@ enum json_conf {
 	JSON_CONF_EVENTV1,
 	JSON_CONF_DEVICE,
 	JSON_CONF_BOOLEAN_LABEL,
+	JSON_CONF_MODE,
+	JSON_CONF_HOST,
+	JSON_CONF_PORT,
 	JSON_CONF_MAX
 };
 
@@ -95,15 +116,167 @@ static struct config_keyset json_kset = {
 			.options = CONFIG_OPT_NONE,
 			.u = { .value = 0 },
 		},
+		[JSON_CONF_MODE] = {
+			.key = "mode",
+			.type = CONFIG_TYPE_STRING,
+			.options = CONFIG_OPT_NONE,
+			.u = { .string = "file" },
+		},
+		[JSON_CONF_HOST] = {
+			.key = "host",
+			.type = CONFIG_TYPE_STRING,
+			.options = CONFIG_OPT_NONE,
+			.u = { .string = "127.0.0.1" },
+		},
+		[JSON_CONF_PORT] = {
+			.key = "port",
+			.type = CONFIG_TYPE_STRING,
+			.options = CONFIG_OPT_NONE,
+			.u = { .string = "12345" },
+		},
 	},
 };
 
+static void close_socket(struct json_priv *op) {
+	if (op->sock != -1) {
+		close(op->sock);
+		op->sock = -1;
+	}
+}
+
+static int _connect_socket_unix(struct ulogd_pluginstance *pi)
+{
+	struct json_priv *op = (struct json_priv *) &pi->private;
+	struct sockaddr_un u_addr;
+	int sfd;
+
+	close_socket(op);
+
+	ulogd_log(ULOGD_DEBUG, "connecting to unix:%s\n",
+		  file_ce(pi->config_kset).u.string);
+
+	sfd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (sfd == -1) {
+		return -1;
+	}
+	u_addr.sun_family = AF_UNIX;
+	strncpy(u_addr.sun_path, file_ce(pi->config_kset).u.string,
+		sizeof(u_addr.sun_path) - 1);
+	if (connect(sfd, (struct sockaddr *) &u_addr, sizeof(struct sockaddr_un)) == -1) {
+		close(sfd);
+		return -1;
+	}
+
+	op->sock = sfd;
+
+	return 0;
+}
+
+static int _connect_socket_net(struct ulogd_pluginstance *pi)
+{
+	struct json_priv *op = (struct json_priv *) &pi->private;
+	struct addrinfo hints;
+	struct addrinfo *result, *rp;
+	int sfd, s;
+
+	close_socket(op);
+
+	ulogd_log(ULOGD_DEBUG, "connecting to %s:%s\n",
+		  host_ce(pi->config_kset).u.string,
+		  port_ce(pi->config_kset).u.string);
+
+	memset(&hints, 0, sizeof(struct addrinfo));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = op->mode == JSON_MODE_UDP ? SOCK_DGRAM : SOCK_STREAM;
+	hints.ai_protocol = 0;
+	hints.ai_flags = 0;
+
+	s = getaddrinfo(host_ce(pi->config_kset).u.string,
+			port_ce(pi->config_kset).u.string, &hints, &result);
+	if (s != 0) {
+		ulogd_log(ULOGD_ERROR, "getaddrinfo: %s\n", gai_strerror(s));
+		return -1;
+	}
+
+	for (rp = result; rp != NULL; rp = rp->ai_next) {
+		int on = 1;
+
+		sfd = socket(rp->ai_family, rp->ai_socktype,
+				rp->ai_protocol);
+		if (sfd == -1)
+			continue;
+
+		setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR,
+			   (char *) &on, sizeof(on));
+
+		if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1)
+			break;
+
+		close(sfd);
+	}
+
+	freeaddrinfo(result);
+
+	if (rp == NULL) {
+		return -1;
+	}
+
+	op->sock = sfd;
+
+	return 0;
+}
+
+static int _connect_socket(struct ulogd_pluginstance *pi)
+{
+	struct json_priv *op = (struct json_priv *) &pi->private;
+
+	if (op->mode == JSON_MODE_UNIX)
+		return _connect_socket_unix(pi);
+	else
+		return _connect_socket_net(pi);
+}
+
+static int json_interp_socket(struct ulogd_pluginstance *upi, char *buf, int buflen)
+{
+	struct json_priv *opi = (struct json_priv *) &upi->private;
+	int ret = 0;
+
+	if (opi->sock != -1)
+		ret = send(opi->sock, buf, buflen, MSG_NOSIGNAL);
+	free(buf);
+	if (ret != buflen) {
+		ulogd_log(ULOGD_ERROR, "Failure sending message: %s\n",
+			  strerror(errno));
+		if (ret == -1 || opi->sock == -1)
+			return _connect_socket(upi);
+		else
+			return ULOGD_IRET_ERR;
+	}
+
+	return ULOGD_IRET_OK;
+}
+
+static int json_interp_file(struct ulogd_pluginstance *upi, char *buf)
+{
+	struct json_priv *opi = (struct json_priv *) &upi->private;
+
+	fprintf(opi->of, "%s", buf);
+	free(buf);
+
+	if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
+		fflush(opi->of);
+
+	return ULOGD_IRET_OK;
+}
+
 #define MAX_LOCAL_TIME_STRING 38
 
 static int json_interp(struct ulogd_pluginstance *upi)
 {
 	struct json_priv *opi = (struct json_priv *) &upi->private;
 	unsigned int i;
+	char *buf;
+	int buflen;
 	json_t *msg;
 
 	msg = json_object();
@@ -218,34 +391,65 @@ static int json_interp(struct ulogd_pluginstance *upi)
 		}
 	}
 
-	json_dumpf(msg, opi->of, 0);
-	fprintf(opi->of, "\n");
 
+	buf = json_dumps(msg, 0);
 	json_decref(msg);
+	if (buf == NULL) {
+		ulogd_log(ULOGD_ERROR, "Could not create message\n");
+		return ULOGD_IRET_ERR;
+	}
+	buflen = strlen(buf);
+	buf = realloc(buf, sizeof(char)*(buflen+2));
+	if (buf == NULL) {
+		ulogd_log(ULOGD_ERROR, "Could not create message\n");
+		return ULOGD_IRET_ERR;
+	}
+	strncat(buf, "\n", 1);
+	buflen++;
 
-	if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
-		fflush(opi->of);
+	if (opi->mode == JSON_MODE_FILE)
+		return json_interp_file(upi, buf);
+	else
+		return json_interp_socket(upi, buf, buflen);
+}
 
-	return ULOGD_IRET_OK;
+static void reopen_file(struct ulogd_pluginstance *upi)
+{
+	struct json_priv *oi = (struct json_priv *) &upi->private;
+	FILE *old = oi->of;
+
+	ulogd_log(ULOGD_NOTICE, "JSON: reopening logfile\n");
+	oi->of = fopen(upi->config_kset->ces[0].u.string, "a");
+	if (!oi->of) {
+		ulogd_log(ULOGD_ERROR, "can't open JSON "
+				       "log file: %s\n",
+			  strerror(errno));
+		oi->of = old;
+	} else {
+		fclose(old);
+	}
+}
+
+static void reopen_socket(struct ulogd_pluginstance *upi)
+{
+	ulogd_log(ULOGD_NOTICE, "JSON: reopening socket\n");
+	if (_connect_socket(upi) < 0) {
+		ulogd_log(ULOGD_ERROR, "can't open JSON "
+				       "socket: %s\n",
+			  strerror(errno));
+	}
 }
 
 static void sighup_handler_print(struct ulogd_pluginstance *upi, int signal)
 {
 	struct json_priv *oi = (struct json_priv *) &upi->private;
-	FILE *old = oi->of;
 
 	switch (signal) {
 	case SIGHUP:
-		ulogd_log(ULOGD_NOTICE, "JSON: reopening logfile\n");
-		oi->of = fopen(upi->config_kset->ces[0].u.string, "a");
-		if (!oi->of) {
-			ulogd_log(ULOGD_ERROR, "can't open JSON "
-					       "log file: %s\n",
-				  strerror(errno));
-			oi->of = old;
-		} else {
-			fclose(old);
-		}
+		if (oi->mode == JSON_MODE_FILE)
+			reopen_file(upi);
+		else
+			reopen_socket(upi);
 		break;
 	default:
 		break;
@@ -255,6 +459,8 @@ static void sighup_handler_print(struct ulogd_pluginstance *upi, int signal)
 static int json_configure(struct ulogd_pluginstance *upi,
 			    struct ulogd_pluginstance_stack *stack)
 {
+	struct json_priv *op = (struct json_priv *) &upi->private;
+	char *mode_str = mode_ce(upi->config_kset).u.string;
 	int ret;
 
 	ret = ulogd_wildcard_inputkeys(upi);
@@ -265,13 +471,25 @@ static int json_configure(struct ulogd_pluginstance *upi,
 	if (ret < 0)
 		return ret;
 
+	if (!strcasecmp(mode_str, "udp")) {
+		op->mode = JSON_MODE_UDP;
+	} else if (!strcasecmp(mode_str, "tcp")) {
+		op->mode = JSON_MODE_TCP;
+	} else if (!strcasecmp(mode_str, "unix")) {
+		op->mode = JSON_MODE_UNIX;
+	} else if (!strcasecmp(mode_str, "file")) {
+		op->mode = JSON_MODE_FILE;
+	} else {
+		ulogd_log(ULOGD_ERROR, "unknown mode '%s'\n", mode_str);
+		return -EINVAL;
+	}
+
 	return 0;
 }
 
-static int json_init(struct ulogd_pluginstance *upi)
+static int json_init_file(struct ulogd_pluginstance *upi)
 {
 	struct json_priv *op = (struct json_priv *) &upi->private;
-	unsigned int i;
 
 	op->of = fopen(upi->config_kset->ces[0].u.string, "a");
 	if (!op->of) {
@@ -280,6 +498,27 @@ static int json_init(struct ulogd_pluginstance *upi)
 		return -1;
 	}
 
+	return 0;
+}
+
+static int json_init_socket(struct ulogd_pluginstance *upi)
+{
+	struct json_priv *op = (struct json_priv *) &upi->private;
+
+	if (host_ce(upi->config_kset).u.string == NULL)
+		return -1;
+	if (port_ce(upi->config_kset).u.string == NULL)
+		return -1;
+
+	op->sock = -1;
+	return _connect_socket(upi);
+}
+
+static int json_init(struct ulogd_pluginstance *upi)
+{
+	struct json_priv *op = (struct json_priv *) &upi->private;
+	unsigned int i;
+
 	/* search for time */
 	op->sec_idx = -1;
 	op->usec_idx = -1;
@@ -293,15 +532,25 @@ static int json_init(struct ulogd_pluginstance *upi)
 
 	*op->cached_tz = '\0';
 
-	return 0;
+	if (op->mode == JSON_MODE_FILE)
+		return json_init_file(upi);
+	else
+		return json_init_socket(upi);
+}
+
+static void close_file(FILE *of) {
+	if (of != stdout)
+		fclose(of);
 }
 
 static int json_fini(struct ulogd_pluginstance *pi)
 {
 	struct json_priv *op = (struct json_priv *) &pi->private;
 
-	if (op->of != stdout)
-		fclose(op->of);
+	if (op->mode == JSON_MODE_FILE)
+		close_file(op->of);
+	else
+		close_socket(op);
 
 	return 0;
 }
diff --git a/ulogd.conf.in b/ulogd.conf.in
index 2fcf39a..e94a3a2 100644
--- a/ulogd.conf.in
+++ b/ulogd.conf.in
@@ -212,6 +212,17 @@ sync=1
 # Uncomment the following line to use JSON v1 event format that
 # can provide better compatility with some JSON file reader.
 #eventv1=1
+# Uncomment the following lines to send the JSON logs to a remote host via UDP
+#mode="udp"
+#host="192.0.2.10"
+#port="10210"
+# Uncomment the following lines to send the JSON logs to a remote host via TCP
+#mode="tcp"
+#host="192.0.2.10"
+#port="10210"
+# Uncomment the following lines to send the JSON logs to a local unix socket
+#mode="unix"
+#file="/var/run/ulogd.socket"
 
 [pcap1]
 #default file is /var/log/ulogd.pcap
Pablo Neira Ayuso June 1, 2018, 7:27 a.m. UTC | #5
On Wed, May 30, 2018 at 10:15:36PM +0200, Andreas Jaggi wrote:
> Well spotted indeed :-)
> Here comes the next iteration.
> 
> Cheers
> Andreas
> 
> 
> Extend the JSON output plugin so that the generated JSON stream can be
> sent to a remote host via TCP/UDP or to a local unix socket.

Applied, thanks Andreas.
--
To unsubscribe from this list: send the line "unsubscribe netfilter-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox series

Patch

diff --git a/output/ulogd_output_JSON.c b/output/ulogd_output_JSON.c
index 4d8e3e9..f2ef96d 100644
--- a/output/ulogd_output_JSON.c
+++ b/output/ulogd_output_JSON.c
@@ -24,6 +24,10 @@ 
 #include <time.h>
 #include <errno.h>
 #include <inttypes.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netdb.h>
 #include <ulogd/ulogd.h>
 #include <ulogd/conffile.h>
 #include <jansson.h>
@@ -36,6 +40,10 @@ 
 #define ULOGD_JSON_DEFAULT_DEVICE "Netfilter"
 #endif
 
+#define host_ce(x)	(x->ces[JSON_CONF_HOST])
+#define port_ce(x)	(x->ces[JSON_CONF_PORT])
+#define mode_ce(x)	(x->ces[JSON_CONF_MODE])
+#define file_ce(x)	(x->ces[JSON_CONF_FILENAME])
 #define unlikely(x) __builtin_expect((x),0)
 
 struct json_priv {
@@ -44,6 +52,15 @@  struct json_priv {
 	int usec_idx;
 	long cached_gmtoff;
 	char cached_tz[6];	/* eg +0200 */
+	int mode;
+	int sock;
+};
+
+enum json_mode {
+	JSON_MODE_FILE = 0,
+	JSON_MODE_TCP,
+	JSON_MODE_UDP,
+	JSON_MODE_UNIX
 };
 
 enum json_conf {
@@ -53,6 +70,9 @@  enum json_conf {
 	JSON_CONF_EVENTV1,
 	JSON_CONF_DEVICE,
 	JSON_CONF_BOOLEAN_LABEL,
+	JSON_CONF_MODE,
+	JSON_CONF_HOST,
+	JSON_CONF_PORT,
 	JSON_CONF_MAX
 };
 
@@ -95,15 +115,108 @@  static struct config_keyset json_kset = {
 			.options = CONFIG_OPT_NONE,
 			.u = { .value = 0 },
 		},
+		[JSON_CONF_MODE] = {
+			.key = "mode",
+			.type = CONFIG_TYPE_STRING,
+			.options = CONFIG_OPT_NONE,
+			.u = { .string = "file" },
+		},
+		[JSON_CONF_HOST] = {
+			.key = "host",
+			.type = CONFIG_TYPE_STRING,
+			.options = CONFIG_OPT_NONE,
+			.u = { .string = "127.0.0.1" },
+		},
+		[JSON_CONF_PORT] = {
+			.key = "port",
+			.type = CONFIG_TYPE_STRING,
+			.options = CONFIG_OPT_NONE,
+			.u = { .string = "12345" },
+		},
 	},
 };
 
+static int _connect_socket(struct ulogd_pluginstance *pi)
+{
+	struct json_priv *op = (struct json_priv *) &pi->private;
+	struct addrinfo hints;
+	struct addrinfo *result, *rp;
+	struct sockaddr_un u_addr;
+	int sfd, s;
+
+	if ( op->sock != -1 ) {
+		close(op->sock);
+		op->sock = -1;
+	}
+	if ( op->mode == JSON_MODE_UNIX ) {
+		ulogd_log(ULOGD_DEBUG, "connecting to unix:%s\n", file_ce(pi->config_kset).u.string);
+
+		sfd = socket(AF_UNIX, SOCK_STREAM, 0);
+		if (sfd == -1 ) {
+			ulogd_log(ULOGD_ERROR, "Could not connect\n");
+			return -1;
+		}
+		u_addr.sun_family = AF_UNIX;
+		strncpy(u_addr.sun_path, file_ce(pi->config_kset).u.string, sizeof(u_addr.sun_path) - 1);
+		if ( connect(sfd, (struct sockaddr *) &u_addr, sizeof(struct sockaddr_un)) == -1 ) {
+			ulogd_log(ULOGD_ERROR, "Could not connect\n");
+			close(sfd);
+			return -1;
+		}
+	} else {
+		ulogd_log(ULOGD_DEBUG, "connecting to %s:%s\n", host_ce(pi->config_kset).u.string, port_ce(pi->config_kset).u.string);
+
+		memset(&hints, 0, sizeof(struct addrinfo));
+		hints.ai_family = AF_UNSPEC;
+		hints.ai_socktype = op->mode == JSON_MODE_UDP ? SOCK_DGRAM : SOCK_STREAM;
+		hints.ai_protocol = 0;
+		hints.ai_flags = 0;
+
+		s = getaddrinfo(host_ce(pi->config_kset).u.string, port_ce(pi->config_kset).u.string, &hints, &result);
+		if (s != 0) {
+			ulogd_log(ULOGD_ERROR, "getaddrinfo: %s\n", gai_strerror(s));
+			return -1;
+		}
+
+		for (rp = result; rp != NULL; rp = rp->ai_next) {
+			int on = 1;
+
+			sfd = socket(rp->ai_family, rp->ai_socktype,
+					rp->ai_protocol);
+			if (sfd == -1)
+				continue;
+
+			setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR,
+				   (char *) &on, sizeof(on));
+
+			if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1)
+				break;
+
+			close(sfd);
+		}
+
+		freeaddrinfo(result);
+
+		if (rp == NULL) {
+			ulogd_log(ULOGD_ERROR, "Could not connect\n");
+			return -1;
+		}
+	}
+
+	op->sock = sfd;
+
+	return 0;
+}
+
 #define MAX_LOCAL_TIME_STRING 38
 
 static int json_interp(struct ulogd_pluginstance *upi)
 {
 	struct json_priv *opi = (struct json_priv *) &upi->private;
 	unsigned int i;
+	char *buf;
+	size_t buflen;
+	int ret;
 	json_t *msg;
 
 	msg = json_object();
@@ -218,13 +331,41 @@  static int json_interp(struct ulogd_pluginstance *upi)
 		}
 	}
 
-	json_dumpf(msg, opi->of, 0);
-	fprintf(opi->of, "\n");
 
+	buf = json_dumps(msg, 0);
 	json_decref(msg);
-
-	if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
-		fflush(opi->of);
+	if (buf == NULL) {
+		ulogd_log(ULOGD_ERROR, "Could not create message\n");
+		return ULOGD_IRET_ERR;
+	}
+	buflen = strlen(buf);
+	buf = realloc(buf, sizeof(char)*(buflen+2));
+	if (buf == NULL) {
+		ulogd_log(ULOGD_ERROR, "Could not create message\n");
+		return ULOGD_IRET_ERR;
+	}
+	strncat(buf, "\n", 1);
+	buflen++;
+
+	if ( opi->mode == JSON_MODE_FILE ) {
+		fprintf(opi->of, "%s", buf);
+		free(buf);
+		if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
+			fflush(opi->of);
+	} else {
+		if ( opi->sock != -1 ) {
+			ret = send(opi->sock, buf, buflen, MSG_NOSIGNAL);
+		}
+		free(buf);
+		if (ret != buflen) {
+			ulogd_log(ULOGD_ERROR, "Failure sending message: %s\n", strerror(errno));
+			if (ret == -1 || opi->sock == -1) {
+				return _connect_socket(upi);
+			} else {
+				return ULOGD_IRET_ERR;
+			}
+		}
+	}
 
 	return ULOGD_IRET_OK;
 }
@@ -236,15 +377,24 @@  static void sighup_handler_print(struct ulogd_pluginstance *upi, int signal)
 
 	switch (signal) {
 	case SIGHUP:
-		ulogd_log(ULOGD_NOTICE, "JSON: reopening logfile\n");
-		oi->of = fopen(upi->config_kset->ces[0].u.string, "a");
-		if (!oi->of) {
-			ulogd_log(ULOGD_ERROR, "can't open JSON "
-					       "log file: %s\n",
-				  strerror(errno));
-			oi->of = old;
+		if ( oi->mode == JSON_MODE_FILE ) {
+			ulogd_log(ULOGD_NOTICE, "JSON: reopening logfile\n");
+			oi->of = fopen(upi->config_kset->ces[0].u.string, "a");
+			if (!oi->of) {
+				ulogd_log(ULOGD_ERROR, "can't open JSON "
+						       "log file: %s\n",
+					  strerror(errno));
+				oi->of = old;
+			} else {
+				fclose(old);
+			}
 		} else {
-			fclose(old);
+			ulogd_log(ULOGD_NOTICE, "JSON: reopening socket\n");
+			if ( _connect_socket(upi) < 0) {
+				ulogd_log(ULOGD_ERROR, "can't open JSON "
+						       "socket: %s\n",
+					  strerror(errno));
+			}
 		}
 		break;
 	default:
@@ -255,6 +405,8 @@  static void sighup_handler_print(struct ulogd_pluginstance *upi, int signal)
 static int json_configure(struct ulogd_pluginstance *upi,
 			    struct ulogd_pluginstance_stack *stack)
 {
+	struct json_priv *op = (struct json_priv *) &upi->private;
+	char *mode_str = mode_ce(upi->config_kset).u.string;
 	int ret;
 
 	ret = ulogd_wildcard_inputkeys(upi);
@@ -265,6 +417,19 @@  static int json_configure(struct ulogd_pluginstance *upi,
 	if (ret < 0)
 		return ret;
 
+	if (!strcasecmp(mode_str, "udp")) {
+		op->mode = JSON_MODE_UDP;
+	} else if (!strcasecmp(mode_str, "tcp")) {
+		op->mode = JSON_MODE_TCP;
+	} else if (!strcasecmp(mode_str, "unix")) {
+		op->mode = JSON_MODE_UNIX;
+	} else if (!strcasecmp(mode_str, "file")) {
+		op->mode = JSON_MODE_FILE;
+	} else {
+		ulogd_log(ULOGD_ERROR, "unknown mode '%s'\n", mode_str);
+		return -EINVAL;
+	}
+
 	return 0;
 }
 
@@ -273,13 +438,6 @@  static int json_init(struct ulogd_pluginstance *upi)
 	struct json_priv *op = (struct json_priv *) &upi->private;
 	unsigned int i;
 
-	op->of = fopen(upi->config_kset->ces[0].u.string, "a");
-	if (!op->of) {
-		ulogd_log(ULOGD_FATAL, "can't open JSON log file: %s\n",
-			strerror(errno));
-		return -1;
-	}
-
 	/* search for time */
 	op->sec_idx = -1;
 	op->usec_idx = -1;
@@ -293,6 +451,22 @@  static int json_init(struct ulogd_pluginstance *upi)
 
 	*op->cached_tz = '\0';
 
+	if ( op->mode == JSON_MODE_FILE ) {
+		op->of = fopen(upi->config_kset->ces[0].u.string, "a");
+		if (!op->of) {
+			ulogd_log(ULOGD_FATAL, "can't open JSON log file: %s\n",
+				strerror(errno));
+			return -1;
+		}
+	} else {
+		if ( host_ce(upi->config_kset).u.string == NULL )
+			return -1;
+		if ( port_ce(upi->config_kset).u.string == NULL)
+			return -1;
+		op->sock = -1;
+		return _connect_socket(upi);
+	}
+
 	return 0;
 }
 
@@ -300,8 +474,15 @@  static int json_fini(struct ulogd_pluginstance *pi)
 {
 	struct json_priv *op = (struct json_priv *) &pi->private;
 
-	if (op->of != stdout)
-		fclose(op->of);
+	if ( op->mode == JSON_MODE_FILE ) {
+		if (op->of != stdout)
+			fclose(op->of);
+	} else {
+		if ( op->sock != -1 ) {
+			close(op->sock);
+			op->sock = -1;
+		}
+	}
 
 	return 0;
 }
diff --git a/ulogd.conf.in b/ulogd.conf.in
index 2fcf39a..e94a3a2 100644
--- a/ulogd.conf.in
+++ b/ulogd.conf.in
@@ -212,6 +212,17 @@  sync=1
 # Uncomment the following line to use JSON v1 event format that
 # can provide better compatility with some JSON file reader.
 #eventv1=1
+# Uncomment the following lines to send the JSON logs to a remote host via UDP
+#mode="udp"
+#host="192.0.2.10"
+#port="10210"
+# Uncomment the following lines to send the JSON logs to a remote host via TCP
+#mode="tcp"
+#host="192.0.2.10"
+#port="10210"
+# Uncomment the following lines to send the JSON logs to a local unix socket
+#mode="unix"
+#file="/var/run/ulogd.socket"
 
 [pcap1]
 #default file is /var/log/ulogd.pcap