ulogd: json: send messages to a remote host / unix socket

Message ID 8360D1AD-5F94-4E9B-8B61-045BB1FED10F@waterwave.ch
State Under Review
Delegated to: Pablo Neira
Headers show
Series
  • ulogd: json: send messages to a remote host / unix socket
Related show

Commit Message

Andreas Jaggi May 1, 2018, 12:16 p.m.
Extend the JSON output plugin so that the generated JSON stream can be
sent to a remote host via TCP/UDP or to a local unix socket.

Signed-off-by: Andreas Jaggi <andreas.jaggi@waterwave.ch>
---
 output/ulogd_output_JSON.c | 225 +++++++++++++++++++++++++++++++++----
 ulogd.conf.in              |  11 ++
 2 files changed, 214 insertions(+), 22 deletions(-)

Comments

Arturo Borrero Gonzalez May 9, 2018, 10:15 a.m. | #1
On 1 May 2018 at 14:16, Andreas Jaggi <andreas.jaggi@waterwave.ch> wrote:
> Extend the JSON output plugin so that the generated JSON stream can be
> sent to a remote host via TCP/UDP or to a local unix socket.
>
> Signed-off-by: Andreas Jaggi <andreas.jaggi@waterwave.ch>
> ---
>  output/ulogd_output_JSON.c | 225 +++++++++++++++++++++++++++++++++----
>  ulogd.conf.in              |  11 ++
>  2 files changed, 214 insertions(+), 22 deletions(-)
>

HI Andreas, thanks for working on this.

Some review below.

> +static int _connect_socket(struct ulogd_pluginstance *pi)
> +{
> +       struct json_priv *op = (struct json_priv *) &pi->private;
> +       struct addrinfo hints;
> +       struct addrinfo *result, *rp;
> +       struct sockaddr_un u_addr;
> +       int sfd, s;
> +
> +       if ( op->sock != -1 ) {
> +               close(op->sock);
> +               op->sock = -1;
> +       }
> +       if ( op->mode == JSON_MODE_UNIX ) {
> +               ulogd_log(ULOGD_DEBUG, "connecting to unix:%s\n", file_ce(pi->config_kset).u.string);
> +
> +               sfd = socket(AF_UNIX, SOCK_STREAM, 0);
> +               if (sfd == -1 ) {
> +                       ulogd_log(ULOGD_ERROR, "Could not connect\n");
> +                       return -1;
> +               }
> +               u_addr.sun_family = AF_UNIX;
> +               strncpy(u_addr.sun_path, file_ce(pi->config_kset).u.string, sizeof(u_addr.sun_path) - 1);
> +               if ( connect(sfd, (struct sockaddr *) &u_addr, sizeof(struct sockaddr_un)) == -1 ) {
> +                       ulogd_log(ULOGD_ERROR, "Could not connect\n");
> +                       close(sfd);
> +                       return -1;
> +               }
> +       } else {
> +               ulogd_log(ULOGD_DEBUG, "connecting to %s:%s\n", host_ce(pi->config_kset).u.string, port_ce(pi->config_kset).u.string);
> +
> +               memset(&hints, 0, sizeof(struct addrinfo));
> +               hints.ai_family = AF_UNSPEC;
> +               hints.ai_socktype = op->mode == JSON_MODE_UDP ? SOCK_DGRAM : SOCK_STREAM;
> +               hints.ai_protocol = 0;
> +               hints.ai_flags = 0;
> +
> +               s = getaddrinfo(host_ce(pi->config_kset).u.string, port_ce(pi->config_kset).u.string, &hints, &result);
> +               if (s != 0) {
> +                       ulogd_log(ULOGD_ERROR, "getaddrinfo: %s\n", gai_strerror(s));
> +                       return -1;
> +               }
> +
> +               for (rp = result; rp != NULL; rp = rp->ai_next) {
> +                       int on = 1;
> +
> +                       sfd = socket(rp->ai_family, rp->ai_socktype,
> +                                       rp->ai_protocol);
> +                       if (sfd == -1)
> +                               continue;
> +
> +                       setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR,
> +                                  (char *) &on, sizeof(on));
> +
> +                       if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1)
> +                               break;
> +
> +                       close(sfd);
> +               }
> +
> +               freeaddrinfo(result);
> +
> +               if (rp == NULL) {
> +                       ulogd_log(ULOGD_ERROR, "Could not connect\n");
> +                       return -1;
> +               }
> +       }
> +
> +       op->sock = sfd;
> +
> +       return 0;
> +}
> +

^^^^

could we split the function above in smaller chunks?

something like _connect_socket_unix() and _connect_socket_net()


> @@ -218,13 +331,41 @@ static int json_interp(struct ulogd_pluginstance *upi)
>                 }
>         }
>
> -       json_dumpf(msg, opi->of, 0);
> -       fprintf(opi->of, "\n");
>
> +       buf = json_dumps(msg, 0);
>         json_decref(msg);
> -
> -       if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
> -               fflush(opi->of);
> +       if (buf == NULL) {
> +               ulogd_log(ULOGD_ERROR, "Could not create message\n");
> +               return ULOGD_IRET_ERR;
> +       }
> +       buflen = strlen(buf);
> +       buf = realloc(buf, sizeof(char)*(buflen+2));
> +       if (buf == NULL) {
> +               ulogd_log(ULOGD_ERROR, "Could not create message\n");
> +               return ULOGD_IRET_ERR;
> +       }
> +       strncat(buf, "\n", 1);
> +       buflen++;
> +
> +       if ( opi->mode == JSON_MODE_FILE ) {
> +               fprintf(opi->of, "%s", buf);
> +               free(buf);
> +               if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
> +                       fflush(opi->of);
> +       } else {
> +               if ( opi->sock != -1 ) {
> +                       ret = send(opi->sock, buf, buflen, MSG_NOSIGNAL);
> +               }
> +               free(buf);
> +               if (ret != buflen) {
> +                       ulogd_log(ULOGD_ERROR, "Failure sending message: %s\n", strerror(errno));
> +                       if (ret == -1 || opi->sock == -1) {
> +                               return _connect_socket(upi);
> +                       } else {
> +                               return ULOGD_IRET_ERR;
> +                       }
> +               }
> +       }

^^^^

Also here, I see we check several times if ( opi->mode == JSON_MODE_FILE ).
May I suggest to introduce smaller functions depending on the mode?

Something like:

if ( opi->mode == JSON_MODE_FILE )
   json_interp_file()
else
   json_interp_xxxx()

> @@ -293,6 +451,22 @@ static int json_init(struct ulogd_pluginstance *upi)
>
>         *op->cached_tz = '\0';
>
> +       if ( op->mode == JSON_MODE_FILE ) {
> +               op->of = fopen(upi->config_kset->ces[0].u.string, "a");
> +               if (!op->of) {
> +                       ulogd_log(ULOGD_FATAL, "can't open JSON log file: %s\n",
> +                               strerror(errno));
> +                       return -1;
> +               }
> +       } else {
> +               if ( host_ce(upi->config_kset).u.string == NULL )
> +                       return -1;
> +               if ( port_ce(upi->config_kset).u.string == NULL)
> +                       return -1;

^^^^

Please check the coding style.

Try "(this)" instead of "( this )" or even "( this)", i.e, no spaces
after '(', or before ')'.

>
> @@ -300,8 +474,15 @@ static int json_fini(struct ulogd_pluginstance *pi)
>  {
>         struct json_priv *op = (struct json_priv *) &pi->private;
>
> -       if (op->of != stdout)
> -               fclose(op->of);
> +       if ( op->mode == JSON_MODE_FILE ) {
> +               if (op->of != stdout)
> +                       fclose(op->of);
> +       } else {
> +               if ( op->sock != -1 ) {
> +                       close(op->sock);
> +                       op->sock = -1;
> +               }
> +       }
>

^^^^

same, something like:

if JSON_MODE_FILE
   close_file()
else
   close_socket()

>         return 0;
>  }
> diff --git a/ulogd.conf.in b/ulogd.conf.in
> index 2fcf39a..e94a3a2 100644
> --- a/ulogd.conf.in
> +++ b/ulogd.conf.in
> @@ -212,6 +212,17 @@ sync=1
>  # Uncomment the following line to use JSON v1 event format that
>  # can provide better compatility with some JSON file reader.
>  #eventv1=1
> +# Uncomment the following lines to send the JSON logs to a remote host via UDP
> +#mode="udp"
> +#host="192.0.2.10"
> +#port="10210"
> +# Uncomment the following lines to send the JSON logs to a remote host via TCP
> +#mode="tcp"
> +#host="192.0.2.10"
> +#port="10210"
> +# Uncomment the following lines to send the JSON logs to a local unix socket
> +#mode="unix"
> +#file="/var/run/ulogd.socket"

This new feature looks great! thanks again.
--
To unsubscribe from this list: send the line "unsubscribe netfilter-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Patch

diff --git a/output/ulogd_output_JSON.c b/output/ulogd_output_JSON.c
index 4d8e3e9..f2ef96d 100644
--- a/output/ulogd_output_JSON.c
+++ b/output/ulogd_output_JSON.c
@@ -24,6 +24,10 @@ 
 #include <time.h>
 #include <errno.h>
 #include <inttypes.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netdb.h>
 #include <ulogd/ulogd.h>
 #include <ulogd/conffile.h>
 #include <jansson.h>
@@ -36,6 +40,10 @@ 
 #define ULOGD_JSON_DEFAULT_DEVICE "Netfilter"
 #endif
 
+#define host_ce(x)	(x->ces[JSON_CONF_HOST])
+#define port_ce(x)	(x->ces[JSON_CONF_PORT])
+#define mode_ce(x)	(x->ces[JSON_CONF_MODE])
+#define file_ce(x)	(x->ces[JSON_CONF_FILENAME])
 #define unlikely(x) __builtin_expect((x),0)
 
 struct json_priv {
@@ -44,6 +52,15 @@  struct json_priv {
 	int usec_idx;
 	long cached_gmtoff;
 	char cached_tz[6];	/* eg +0200 */
+	int mode;
+	int sock;
+};
+
+enum json_mode {
+	JSON_MODE_FILE = 0,
+	JSON_MODE_TCP,
+	JSON_MODE_UDP,
+	JSON_MODE_UNIX
 };
 
 enum json_conf {
@@ -53,6 +70,9 @@  enum json_conf {
 	JSON_CONF_EVENTV1,
 	JSON_CONF_DEVICE,
 	JSON_CONF_BOOLEAN_LABEL,
+	JSON_CONF_MODE,
+	JSON_CONF_HOST,
+	JSON_CONF_PORT,
 	JSON_CONF_MAX
 };
 
@@ -95,15 +115,108 @@  static struct config_keyset json_kset = {
 			.options = CONFIG_OPT_NONE,
 			.u = { .value = 0 },
 		},
+		[JSON_CONF_MODE] = {
+			.key = "mode",
+			.type = CONFIG_TYPE_STRING,
+			.options = CONFIG_OPT_NONE,
+			.u = { .string = "file" },
+		},
+		[JSON_CONF_HOST] = {
+			.key = "host",
+			.type = CONFIG_TYPE_STRING,
+			.options = CONFIG_OPT_NONE,
+			.u = { .string = "127.0.0.1" },
+		},
+		[JSON_CONF_PORT] = {
+			.key = "port",
+			.type = CONFIG_TYPE_STRING,
+			.options = CONFIG_OPT_NONE,
+			.u = { .string = "12345" },
+		},
 	},
 };
 
+static int _connect_socket(struct ulogd_pluginstance *pi)
+{
+	struct json_priv *op = (struct json_priv *) &pi->private;
+	struct addrinfo hints;
+	struct addrinfo *result, *rp;
+	struct sockaddr_un u_addr;
+	int sfd, s;
+
+	if ( op->sock != -1 ) {
+		close(op->sock);
+		op->sock = -1;
+	}
+	if ( op->mode == JSON_MODE_UNIX ) {
+		ulogd_log(ULOGD_DEBUG, "connecting to unix:%s\n", file_ce(pi->config_kset).u.string);
+
+		sfd = socket(AF_UNIX, SOCK_STREAM, 0);
+		if (sfd == -1 ) {
+			ulogd_log(ULOGD_ERROR, "Could not connect\n");
+			return -1;
+		}
+		u_addr.sun_family = AF_UNIX;
+		strncpy(u_addr.sun_path, file_ce(pi->config_kset).u.string, sizeof(u_addr.sun_path) - 1);
+		if ( connect(sfd, (struct sockaddr *) &u_addr, sizeof(struct sockaddr_un)) == -1 ) {
+			ulogd_log(ULOGD_ERROR, "Could not connect\n");
+			close(sfd);
+			return -1;
+		}
+	} else {
+		ulogd_log(ULOGD_DEBUG, "connecting to %s:%s\n", host_ce(pi->config_kset).u.string, port_ce(pi->config_kset).u.string);
+
+		memset(&hints, 0, sizeof(struct addrinfo));
+		hints.ai_family = AF_UNSPEC;
+		hints.ai_socktype = op->mode == JSON_MODE_UDP ? SOCK_DGRAM : SOCK_STREAM;
+		hints.ai_protocol = 0;
+		hints.ai_flags = 0;
+
+		s = getaddrinfo(host_ce(pi->config_kset).u.string, port_ce(pi->config_kset).u.string, &hints, &result);
+		if (s != 0) {
+			ulogd_log(ULOGD_ERROR, "getaddrinfo: %s\n", gai_strerror(s));
+			return -1;
+		}
+
+		for (rp = result; rp != NULL; rp = rp->ai_next) {
+			int on = 1;
+
+			sfd = socket(rp->ai_family, rp->ai_socktype,
+					rp->ai_protocol);
+			if (sfd == -1)
+				continue;
+
+			setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR,
+				   (char *) &on, sizeof(on));
+
+			if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1)
+				break;
+
+			close(sfd);
+		}
+
+		freeaddrinfo(result);
+
+		if (rp == NULL) {
+			ulogd_log(ULOGD_ERROR, "Could not connect\n");
+			return -1;
+		}
+	}
+
+	op->sock = sfd;
+
+	return 0;
+}
+
 #define MAX_LOCAL_TIME_STRING 38
 
 static int json_interp(struct ulogd_pluginstance *upi)
 {
 	struct json_priv *opi = (struct json_priv *) &upi->private;
 	unsigned int i;
+	char *buf;
+	size_t buflen;
+	int ret;
 	json_t *msg;
 
 	msg = json_object();
@@ -218,13 +331,41 @@  static int json_interp(struct ulogd_pluginstance *upi)
 		}
 	}
 
-	json_dumpf(msg, opi->of, 0);
-	fprintf(opi->of, "\n");
 
+	buf = json_dumps(msg, 0);
 	json_decref(msg);
-
-	if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
-		fflush(opi->of);
+	if (buf == NULL) {
+		ulogd_log(ULOGD_ERROR, "Could not create message\n");
+		return ULOGD_IRET_ERR;
+	}
+	buflen = strlen(buf);
+	buf = realloc(buf, sizeof(char)*(buflen+2));
+	if (buf == NULL) {
+		ulogd_log(ULOGD_ERROR, "Could not create message\n");
+		return ULOGD_IRET_ERR;
+	}
+	strncat(buf, "\n", 1);
+	buflen++;
+
+	if ( opi->mode == JSON_MODE_FILE ) {
+		fprintf(opi->of, "%s", buf);
+		free(buf);
+		if (upi->config_kset->ces[JSON_CONF_SYNC].u.value != 0)
+			fflush(opi->of);
+	} else {
+		if ( opi->sock != -1 ) {
+			ret = send(opi->sock, buf, buflen, MSG_NOSIGNAL);
+		}
+		free(buf);
+		if (ret != buflen) {
+			ulogd_log(ULOGD_ERROR, "Failure sending message: %s\n", strerror(errno));
+			if (ret == -1 || opi->sock == -1) {
+				return _connect_socket(upi);
+			} else {
+				return ULOGD_IRET_ERR;
+			}
+		}
+	}
 
 	return ULOGD_IRET_OK;
 }
@@ -236,15 +377,24 @@  static void sighup_handler_print(struct ulogd_pluginstance *upi, int signal)
 
 	switch (signal) {
 	case SIGHUP:
-		ulogd_log(ULOGD_NOTICE, "JSON: reopening logfile\n");
-		oi->of = fopen(upi->config_kset->ces[0].u.string, "a");
-		if (!oi->of) {
-			ulogd_log(ULOGD_ERROR, "can't open JSON "
-					       "log file: %s\n",
-				  strerror(errno));
-			oi->of = old;
+		if ( oi->mode == JSON_MODE_FILE ) {
+			ulogd_log(ULOGD_NOTICE, "JSON: reopening logfile\n");
+			oi->of = fopen(upi->config_kset->ces[0].u.string, "a");
+			if (!oi->of) {
+				ulogd_log(ULOGD_ERROR, "can't open JSON "
+						       "log file: %s\n",
+					  strerror(errno));
+				oi->of = old;
+			} else {
+				fclose(old);
+			}
 		} else {
-			fclose(old);
+			ulogd_log(ULOGD_NOTICE, "JSON: reopening socket\n");
+			if ( _connect_socket(upi) < 0) {
+				ulogd_log(ULOGD_ERROR, "can't open JSON "
+						       "socket: %s\n",
+					  strerror(errno));
+			}
 		}
 		break;
 	default:
@@ -255,6 +405,8 @@  static void sighup_handler_print(struct ulogd_pluginstance *upi, int signal)
 static int json_configure(struct ulogd_pluginstance *upi,
 			    struct ulogd_pluginstance_stack *stack)
 {
+	struct json_priv *op = (struct json_priv *) &upi->private;
+	char *mode_str = mode_ce(upi->config_kset).u.string;
 	int ret;
 
 	ret = ulogd_wildcard_inputkeys(upi);
@@ -265,6 +417,19 @@  static int json_configure(struct ulogd_pluginstance *upi,
 	if (ret < 0)
 		return ret;
 
+	if (!strcasecmp(mode_str, "udp")) {
+		op->mode = JSON_MODE_UDP;
+	} else if (!strcasecmp(mode_str, "tcp")) {
+		op->mode = JSON_MODE_TCP;
+	} else if (!strcasecmp(mode_str, "unix")) {
+		op->mode = JSON_MODE_UNIX;
+	} else if (!strcasecmp(mode_str, "file")) {
+		op->mode = JSON_MODE_FILE;
+	} else {
+		ulogd_log(ULOGD_ERROR, "unknown mode '%s'\n", mode_str);
+		return -EINVAL;
+	}
+
 	return 0;
 }
 
@@ -273,13 +438,6 @@  static int json_init(struct ulogd_pluginstance *upi)
 	struct json_priv *op = (struct json_priv *) &upi->private;
 	unsigned int i;
 
-	op->of = fopen(upi->config_kset->ces[0].u.string, "a");
-	if (!op->of) {
-		ulogd_log(ULOGD_FATAL, "can't open JSON log file: %s\n",
-			strerror(errno));
-		return -1;
-	}
-
 	/* search for time */
 	op->sec_idx = -1;
 	op->usec_idx = -1;
@@ -293,6 +451,22 @@  static int json_init(struct ulogd_pluginstance *upi)
 
 	*op->cached_tz = '\0';
 
+	if ( op->mode == JSON_MODE_FILE ) {
+		op->of = fopen(upi->config_kset->ces[0].u.string, "a");
+		if (!op->of) {
+			ulogd_log(ULOGD_FATAL, "can't open JSON log file: %s\n",
+				strerror(errno));
+			return -1;
+		}
+	} else {
+		if ( host_ce(upi->config_kset).u.string == NULL )
+			return -1;
+		if ( port_ce(upi->config_kset).u.string == NULL)
+			return -1;
+		op->sock = -1;
+		return _connect_socket(upi);
+	}
+
 	return 0;
 }
 
@@ -300,8 +474,15 @@  static int json_fini(struct ulogd_pluginstance *pi)
 {
 	struct json_priv *op = (struct json_priv *) &pi->private;
 
-	if (op->of != stdout)
-		fclose(op->of);
+	if ( op->mode == JSON_MODE_FILE ) {
+		if (op->of != stdout)
+			fclose(op->of);
+	} else {
+		if ( op->sock != -1 ) {
+			close(op->sock);
+			op->sock = -1;
+		}
+	}
 
 	return 0;
 }
diff --git a/ulogd.conf.in b/ulogd.conf.in
index 2fcf39a..e94a3a2 100644
--- a/ulogd.conf.in
+++ b/ulogd.conf.in
@@ -212,6 +212,17 @@  sync=1
 # Uncomment the following line to use JSON v1 event format that
 # can provide better compatility with some JSON file reader.
 #eventv1=1
+# Uncomment the following lines to send the JSON logs to a remote host via UDP
+#mode="udp"
+#host="192.0.2.10"
+#port="10210"
+# Uncomment the following lines to send the JSON logs to a remote host via TCP
+#mode="tcp"
+#host="192.0.2.10"
+#port="10210"
+# Uncomment the following lines to send the JSON logs to a local unix socket
+#mode="unix"
+#file="/var/run/ulogd.socket"
 
 [pcap1]
 #default file is /var/log/ulogd.pcap