diff mbox

[RFC,1/4] backend: multi-client-socket

Message ID 1458825402-9592-2-git-send-email-b.reynal@virtualopensystems.com
State New
Headers show

Commit Message

Baptiste Reynal March 24, 2016, 1:16 p.m. UTC
This patch updates the multi-client-socket, adding the following
functionalities :
- both UNIX and network socket are now supported :
  -object multi-socket-backend,id=<id>,path=<socket_path>,listen=<on/off>
  -object multi-socket-backend,host=<host>,port=<port>,,listen=<on/off>
- the socket now uses NMMessage for communication. The old API is kept
  for backward compatibility
- multi_socket_send_and_block_to function added, to send a message and
  wait for the answer

Signed-off-by: Baptiste Reynal <b.reynal@virtualopensystems.com>
---
 backends/multi-socket.c     | 420 ++++++++++++++++++++++++++++++--------------
 include/qemu/multi-socket.h |  57 +++++-
 2 files changed, 334 insertions(+), 143 deletions(-)
diff mbox

Patch

diff --git a/backends/multi-socket.c b/backends/multi-socket.c
index 2cfbb50..daf06b7 100644
--- a/backends/multi-socket.c
+++ b/backends/multi-socket.c
@@ -12,25 +12,128 @@ 
 #include "qemu/multi-socket.h"
 #include "qemu/error-report.h"
 
-typedef struct MSHandler MSHandler;
-typedef struct MSRegHandler MSRegHandler;
+void multi_socket_add_reg_handler(MSBackend *backend,
+        void (*reg)(MSClient *client, void *opaque),
+        void *opaque)
+{
+    MSBackendClass *msc = MULTI_SOCKET_BACKEND_GET_CLASS(backend);
 
-struct MSHandler {
-    char *name;
-    void (*read)(MSClient *client, const char *message, void *opaque);
-    void *opaque;
+    if (msc->add_reg_handler) {
+        msc->add_reg_handler(backend, reg, opaque);
+    }
+}
 
-    QLIST_ENTRY(MSHandler) next;
-};
+void multi_socket_add_handler(MSBackend *backend, const char *name,
+        void (*read)(MSClient *client, const char *message, void *opaque),
+        void *opaque)
+{
+    MSBackendClass *msc = MULTI_SOCKET_BACKEND_GET_CLASS(backend);
 
-struct MSRegHandler {
-    void (*reg)(MSClient *client, void *opaque);
-    void *opaque;
+    if (strlen(name) > MS_CMD_SIZE) {
+        error_report("Command \"%s\" size is too big.", name);
+        return;
+    }
 
-    QLIST_ENTRY(MSRegHandler) next;
-};
+    if (msc->add_handler) {
+        msc->add_handler(backend, name, read, opaque);
+    }
+}
+
+int multi_socket_send_fds_to(MSClient *client, int *fds, int count,
+        const char *message, int size) {
+    int cmd_len, payload_len;
+    MSBackendClass *msc = MULTI_SOCKET_BACKEND_GET_CLASS(client->backend);
+    MSMessage ms_message;
+
+    cmd_len = strlen(message) + 1;
+    payload_len = size - cmd_len;
+
+    if (cmd_len > MS_CMD_SIZE ||
+            payload_len > MS_PAYLOAD_SIZE) {
+        error_report("Command \"%s\" size is too big.", message);
+    }
+
+    memcpy(ms_message.cmd, message, cmd_len);
+    memcpy(ms_message.payload, message + cmd_len, payload_len);
+
+    if (msc->send_fds_to) {
+        return msc->send_fds_to(client, fds, count, &ms_message);
+    } else {
+        return -1;
+    }
+}
+
+int multi_socket_write_to(MSClient *client, const char *message, int size)
+{
+    int cmd_len, payload_len;
+    MSBackendClass *msc = MULTI_SOCKET_BACKEND_GET_CLASS(client->backend);
+    MSMessage ms_message;
+
+    cmd_len = strlen(message) + 1;
+    payload_len = size - cmd_len;
+
+    msc = MULTI_SOCKET_BACKEND_GET_CLASS(client->backend);
+    if (cmd_len > MS_CMD_SIZE ||
+            payload_len > MS_PAYLOAD_SIZE) {
+        error_report("Command \"%s\" size is too big.", message);
+    }
+
+    memcpy(ms_message.cmd, message, cmd_len);
+    memcpy(ms_message.payload, message + cmd_len, payload_len);
+
+    if (msc->write_to) {
+        return msc->write_to(client, &ms_message);
+    } else {
+        return -1;
+    }
+}
+
+int multi_socket_write_message_to(MSClient *client, MSMessage *message)
+{
+    MSBackendClass *msc = MULTI_SOCKET_BACKEND_GET_CLASS(client->backend);
+
+    if (msc->write_to) {
+        return msc->write_to(client, message);
+    } else {
+        return -1;
+    }
+}
+
+int multi_socket_get_fds_num_from(MSClient *client)
+{
+    MSBackendClass *msc = MULTI_SOCKET_BACKEND_GET_CLASS(client->backend);
+
+    if (msc->get_fds_num_from) {
+        return msc->get_fds_num_from(client);
+    } else {
+        return -1;
+    }
+}
+
+int multi_socket_get_fds_from(MSClient *client, int *fds)
+{
+    MSBackendClass *msc = MULTI_SOCKET_BACKEND_GET_CLASS(client->backend);
+
+    if (msc->get_fds_from) {
+        return msc->get_fds_from(client, fds);
+    } else {
+        return -1;
+    }
+}
+
+int multi_socket_write_and_block_to(MSClient *client, MSMessage *message)
+{
+    MSBackendClass *msc = MULTI_SOCKET_BACKEND_GET_CLASS(client->backend);
+
+    if (msc->write_and_block_to) {
+        return msc->write_and_block_to(client, message);
+    } else {
+        return -1;
+    }
+}
 
-static void multi_socket_get_fds(MSClient *client, struct msghdr msg)
+typedef struct MSHandler MSHandler;
+static void ms_get_fds(MSClient *client, struct msghdr msg)
 {
     struct cmsghdr *cmsg;
 
@@ -58,51 +161,7 @@  static void multi_socket_get_fds(MSClient *client, struct msghdr msg)
     }
 }
 
-static gboolean
-multi_socket_read_handler(GIOChannel *channel, GIOCondition cond, void *opaque)
-{
-    MSClient *client = (MSClient *) opaque;
-    MSBackend *backend = client->backend;
-
-    char message[BUFFER_SIZE];
-    struct MSHandler *h;
-
-    struct msghdr msg = { NULL, };
-    struct iovec iov[1];
-    union {
-        struct cmsghdr cmsg;
-        char control[CMSG_SPACE(sizeof(int) * MAX_FDS)];
-    } msg_control;
-    int flags = 0;
-    ssize_t ret;
-
-    iov[0].iov_base = message;
-    iov[0].iov_len = BUFFER_SIZE;
-
-    msg.msg_iov = iov;
-    msg.msg_iovlen = 1;
-    msg.msg_control = &msg_control;
-    msg.msg_controllen = sizeof(msg_control);
-
-    ret = recvmsg(client->fd, &msg, flags);
-
-    if (ret > 0) {
-        multi_socket_get_fds(client, msg);
-
-        /* handler callback */
-        QLIST_FOREACH(h, &backend->handlers, next) {
-            if (!strncmp(h->name, message, strlen(h->name))) {
-                h->read(client, message + strlen(h->name) + 1, h->opaque);
-                return TRUE;
-            }
-        }
-        error_report("Unrecognized message: %s", message);
-    }
-
-    return FALSE;
-}
-
-void multi_socket_add_reg_handler(MSBackend *backend,
+static void ms_add_reg_handler(MSBackend *backend,
         void (*reg)(MSClient *client, void *opaque), void *opaque)
 {
     struct MSRegHandler *h;
@@ -115,44 +174,32 @@  void multi_socket_add_reg_handler(MSBackend *backend,
     QLIST_INSERT_HEAD(&backend->reg_handlers, h, next);
 }
 
-void multi_socket_add_handler(MSBackend *backend,
-        const char *name,
-        void (*read)(MSClient *c, const char *message, void *opaque),
+static void ms_add_handler(MSBackend *backend,
+        const char *cmd,
+        void (*read)(MSClient *client, const char *payload, void *opaque),
         void *opaque)
 {
-    struct MSHandler *h;
+    struct MSHandler *handler;
 
-    /* check that the handler name is not taken */
-    QLIST_FOREACH(h, &backend->handlers, next) {
-        if (!strcmp(h->name, name)) {
-            error_report("Handler %s already exists", name);
-            return;
+    /* check the command is available */
+    QLIST_FOREACH(handler, &backend->handlers, next) {
+        if (!strcmp(handler->cmd, cmd)) {
+                error_report("Handler %s already exists", cmd);
+                return;
         }
     }
 
-    h = g_malloc(sizeof(struct MSHandler));
+    handler = g_malloc(sizeof(struct MSHandler));
 
-    h->name = g_strdup(name);
-    h->read = read;
-    h->opaque = opaque;
+    handler->cmd = g_strdup(cmd);
+    handler->read = read;
+    handler->opaque = opaque;
 
-    QLIST_INSERT_HEAD(&backend->handlers, h, next);
+    QLIST_INSERT_HEAD(&backend->handlers, handler, next);
 }
 
-static void multi_socket_init_client(MSBackend *backend,
-        MSClient *client, int fd, GIOFunc handler)
-{
-    client->backend = backend;
-    client->fd = fd;
-    client->chan = g_io_channel_unix_new(fd);
-    client->tag = g_io_add_watch(client->chan, G_IO_IN, handler, client);
-
-    g_io_channel_set_encoding(client->chan, NULL, NULL);
-    g_io_channel_set_buffered(client->chan, FALSE);
-}
-
-int multi_socket_send_fds_to(MSClient *client, int *fds, int count,
-        const char *message, int size)
+static int ms_send_fds_to(MSClient *client, int *fds, int count,
+        MSMessage *message)
 {
     struct msghdr msgh;
     struct iovec iov;
@@ -165,9 +212,8 @@  int multi_socket_send_fds_to(MSClient *client, int *fds, int count,
     memset(&msgh, 0, sizeof(msgh));
     memset(control, 0, sizeof(control));
 
-    /* set the payload */
-    iov.iov_base = (uint8_t *) message;
-    iov.iov_len = size;
+    iov.iov_base = message;
+    iov.iov_len = sizeof(MSMessage);
 
     msgh.msg_iov = &iov;
     msgh.msg_iovlen = 1;
@@ -189,38 +235,92 @@  int multi_socket_send_fds_to(MSClient *client, int *fds, int count,
     return r;
 }
 
-int multi_socket_write_to(MSClient *client, const char *message, int size)
+static int ms_write_to(MSClient *client, MSMessage *message)
 {
-    return multi_socket_send_fds_to(client, 0, 0, message, size);
+    return ms_send_fds_to(client, 0, 0, message);
 }
 
-int multi_socket_get_fds_num_from(MSClient *client)
+static int ms_get_fds_num_from(MSClient *client)
 {
     return client->rcvfds_num;
 }
 
-int multi_socket_get_fds_from(MSClient *client, int *fds)
+static int ms_get_fds_from(MSClient *client, int *fds)
 {
     memcpy(fds, client->rcvfds, client->rcvfds_num * sizeof(int));
+    return ms_get_fds_num_from(client);
+}
 
-    return client->rcvfds_num;
+static gboolean
+ms_read(GIOChannel *channel, GIOCondition cond, void *opaque)
+{
+    MSClient *client = (MSClient *) opaque;
+    MSBackend *backend = client->backend;
+    MSMessage message;
+    MSHandler *handler;
+    int ret;
+
+    struct msghdr msg = { NULL };
+    struct iovec iov[1];
+    union {
+        struct cmsghdr cmsg;
+        char control[CMSG_SPACE(sizeof(int) * MS_MAX_FDS)];
+    } msg_control;
+    int flags = MSG_WAITALL;
+
+    iov[0].iov_base = &message;
+    iov[0].iov_len = sizeof(MSMessage);
+
+    msg.msg_iov = iov;
+    msg.msg_iovlen = 1;
+    msg.msg_control = &msg_control;
+    msg.msg_controllen = sizeof(msg_control);
+
+    ret = recvmsg(client->fd, &msg, flags);
+
+    if (ret > 0) {
+        ms_get_fds(client, msg);
+
+        /* handler callback */
+        QLIST_FOREACH(handler, &backend->handlers, next) {
+            if (!strcmp(handler->cmd, message.cmd)) {
+                handler->read(client, message.payload, handler->opaque);
+                return TRUE;
+            }
+        }
+
+        error_report("Unrecognized command: %s", message.cmd);
+    }
+
+    return FALSE;
 }
 
-static void multi_socket_add_client(MSBackend *backend, int fd)
+static void multi_socket_init_client(MSBackend *backend, MSClient *client,
+        int fd, GIOFunc handler)
 {
-    MSClient *c = g_malloc(sizeof(MSClient));
-    MSRegHandler *h;
+    client->backend = backend;
+    client->fd = fd;
+    client->chan = g_io_channel_unix_new(fd);
+    client->tag = g_io_add_watch(client->chan, G_IO_IN, handler, client);
 
-    multi_socket_init_client(backend, c, fd, multi_socket_read_handler);
-    QLIST_FOREACH(h, &backend->reg_handlers, next) {
-        h->reg(c, h->opaque);
+    g_io_channel_set_encoding(client->chan, NULL, NULL);
+    g_io_channel_set_buffered(client->chan, FALSE);
+}
+
+static void ms_add_client(MSBackend *backend, int fd)
+{
+    MSClient *client = g_malloc(sizeof(MSClient));
+    MSRegHandler *handler;
+
+    multi_socket_init_client(backend, client, fd, ms_read);
+    QLIST_FOREACH(handler, &backend->reg_handlers, next) {
+        handler->reg(client, handler->opaque);
     }
 
-    QLIST_INSERT_HEAD(&backend->clients, c, next);
+    QLIST_INSERT_HEAD(&backend->clients, client, next);
 }
 
-static gboolean
-multi_socket_accept(GIOChannel *channel, GIOCondition cond, void *opaque)
+static gboolean ms_accept(GIOChannel *channel, GIOCondition cond, void *opaque)
 {
     MSClient *client = (MSClient *) opaque;
     MSBackend *backend = client->backend;
@@ -230,11 +330,10 @@  multi_socket_accept(GIOChannel *channel, GIOCondition cond, void *opaque)
     int fd;
 
     len = sizeof(uaddr);
-
     fd = qemu_accept(backend->listener.fd, (struct sockaddr *) &uaddr, &len);
 
     if (fd > 0) {
-        multi_socket_add_client(backend, fd);
+        ms_add_client(backend, fd);
         return true;
     } else {
         perror("Error creating socket.");
@@ -242,54 +341,73 @@  multi_socket_accept(GIOChannel *channel, GIOCondition cond, void *opaque)
     }
 }
 
-static void multi_socket_init_socket(MSBackend *backend)
+static int ms_write_and_block_to(MSClient *client, MSMessage *message)
 {
-    int fd;
+    int ret;
 
-    backend->addr = g_new0(SocketAddress, 1);
-    backend->addr->kind = SOCKET_ADDRESS_KIND_UNIX;
-    backend->addr->q_unix = g_new0(UnixSocketAddress, 1);
-    /* TODO change name with real path */
-    backend->addr->q_unix->path = g_strdup(backend->path);
-
-    if (backend->listen) {
-        fd = socket_listen(backend->addr, NULL);
+    g_source_remove(client->tag);
 
-        if (fd < 0) {
-            perror("Error: Impossible to open socket.");
-            exit(-1);
-        }
-
-        multi_socket_init_client(backend, &backend->listener, fd,
-                multi_socket_accept);
-    } else {
-        fd = socket_connect(backend->addr, NULL, NULL, NULL);
+    ret = ms_write_to(client, message);
+    ms_read(client->chan, G_IO_IN, client);
 
-        if (fd < 0) {
-            perror("Error: Unavailable socket server.");
-            exit(-1);
-        }
+    g_io_add_watch(client->chan, G_IO_IN, ms_read, client);
 
-        multi_socket_init_client(backend, &backend->listener,
-                fd, multi_socket_read_handler);
-    }
+    return ret;
 }
 
 static void multi_socket_backend_complete(UserCreatable *uc, Error **errp)
 {
     MSBackend *backend = MULTI_SOCKET_BACKEND(uc);
+    GIOFunc handler;
 
     QLIST_INIT(&backend->clients);
+    QLIST_INIT(&backend->reg_handlers);
     QLIST_INIT(&backend->handlers);
 
-    multi_socket_init_socket(backend);
+    backend->addr = g_new0(SocketAddress, 1);
+
+    if (backend->path) {
+        backend->addr->kind = SOCKET_ADDRESS_KIND_UNIX;
+        backend->addr->q_unix = g_new0(UnixSocketAddress, 1);
+        backend->addr->q_unix->path = g_strdup(backend->path);
+    } else {
+        backend->addr->kind = SOCKET_ADDRESS_KIND_INET;
+        backend->addr->inet = g_new0(InetSocketAddress, 1);
+        backend->addr->inet->host = g_strdup(backend->host);
+        backend->addr->inet->port = g_strdup(backend->port);
+    }
+
+    if (backend->listen) {
+        backend->fd = socket_listen(backend->addr, NULL);
+        handler = ms_accept;
+    } else {
+        backend->fd = socket_connect(backend->addr, NULL, NULL, NULL);
+        handler = ms_read;
+    }
+
+    if (backend->fd < 0) {
+        perror("Error: Impossible to open socket.");
+        exit(-1);
+    }
+
+    multi_socket_init_client(backend, &backend->listener,
+            backend->fd, handler);
 }
 
 static void multi_socket_class_init(ObjectClass *oc, void *data)
 {
     UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
+    MSBackendClass *msc = MULTI_SOCKET_BACKEND_CLASS(oc);
 
     ucc->complete = multi_socket_backend_complete;
+
+    msc->add_reg_handler = ms_add_reg_handler;
+    msc->add_handler = ms_add_handler;
+    msc->write_to = ms_write_to;
+    msc->send_fds_to = ms_send_fds_to;
+    msc->get_fds_num_from = ms_get_fds_num_from;
+    msc->get_fds_from = ms_get_fds_from;
+    msc->write_and_block_to = ms_write_and_block_to;
 }
 
 static bool multi_socket_backend_get_listen(Object *o, Error **errp)
@@ -325,13 +443,45 @@  static void multi_socket_set_path(Object *o, const char *str, Error **errp)
     backend->path = g_strdup(str);
 }
 
+static char *multi_socket_get_host(Object *o, Error **errp)
+{
+    MSBackend *backend = MULTI_SOCKET_BACKEND(o);
+
+    return g_strdup(backend->host);
+}
+
+static void multi_socket_set_host(Object *o, const char *str, Error **errp)
+{
+    MSBackend *backend = MULTI_SOCKET_BACKEND(o);
+
+    backend->host = g_strdup(str);
+}
+
+static char *multi_socket_get_port(Object *o, Error **errp)
+{
+    MSBackend *backend = MULTI_SOCKET_BACKEND(o);
+
+    return g_strdup(backend->port);
+}
+
+static void multi_socket_set_port(Object *o, const char *str, Error **errp)
+{
+    MSBackend *backend = MULTI_SOCKET_BACKEND(o);
+
+    backend->port = g_strdup(str);
+}
+
 static void multi_socket_instance_init(Object *o)
 {
     object_property_add_bool(o, "listen",
-                        multi_socket_backend_get_listen,
-                        multi_socket_backend_set_listen, NULL);
+            multi_socket_backend_get_listen,
+            multi_socket_backend_set_listen, NULL);
     object_property_add_str(o, "path", multi_socket_get_path,
                         multi_socket_set_path, NULL);
+    object_property_add_str(o, "host", multi_socket_get_host,
+                        multi_socket_set_host, NULL);
+    object_property_add_str(o, "port", multi_socket_get_port,
+                        multi_socket_set_port, NULL);
 }
 
 static const TypeInfo multi_socket_backend_info = {
diff --git a/include/qemu/multi-socket.h b/include/qemu/multi-socket.h
index dee866a..8a43e8a 100644
--- a/include/qemu/multi-socket.h
+++ b/include/qemu/multi-socket.h
@@ -1,13 +1,14 @@ 
 /*
- * QEMU Multi Client socket
+ * QEMU multi client socket
  *
  * Copyright (C) 2015 - Virtual Open Systems
  *
  * Author: Baptiste Reynal <b.reynal@virtualopensystems.com>
  *
- * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * This work is licensed under the terms of the GNU GPL, version 2. See
  * the COPYING file in the top-level directory.
  */
+
 #ifndef QEMU_MS_H
 #define QEMU_MS_H
 
@@ -25,15 +26,23 @@ 
 #define MULTI_SOCKET_BACKEND_CLASS(klass) \
     OBJECT_CLASS_CHECK(MSBackendClass, (klass), TYPE_MULTI_SOCKET_BACKEND)
 
-#define MAX_FDS     32
-#define BUFFER_SIZE 32
+#define MS_CMD_SIZE     32
+#define MS_PAYLOAD_SIZE 256
+#define MS_MAX_FDS      32
 
 typedef struct MSBackend MSBackend;
 typedef struct MSBackendClass MSBackendClass;
 typedef struct MSClient MSClient;
+typedef struct MSHandler MSHandler;
+typedef struct MSRegHandler MSRegHandler;
+typedef struct MSMessage MSMessage;
+
+struct MSMessage {
+    char cmd[MS_CMD_SIZE];
+    char payload[MS_PAYLOAD_SIZE];
+};
 
 struct MSClient {
-    /* private */
     int fd;
     MSBackend *backend;
     GIOChannel *chan;
@@ -45,17 +54,46 @@  struct MSClient {
     QLIST_ENTRY(MSClient) next;
 };
 
+struct MSHandler {
+    char *cmd;
+    void (*read)(MSClient *client, const char *payload, void *opaque);
+    void *opaque;
+
+    QLIST_ENTRY(MSHandler) next;
+};
+
+struct MSRegHandler {
+    void (*reg)(MSClient *client, void *opaque);
+    void *opaque;
+
+    QLIST_ENTRY(MSRegHandler) next;
+};
+
 struct MSBackendClass {
-    /* private */
     ObjectClass parent_class;
+
+    void (*add_reg_handler)(MSBackend *backend,
+            void (*reg)(MSClient *client, void *opaque),
+            void *opaque);
+    void (*add_handler)(MSBackend *backend, const char *cmd,
+            void (*read)(MSClient *client, const char *payload, void *opaque),
+            void *opaque);
+    int (*write_to)(MSClient *client, MSMessage *message);
+    int (*send_fds_to)(MSClient *client, int *fds, int count,
+            MSMessage *message);
+    int (*get_fds_num_from)(MSClient *client);
+    int (*get_fds_from)(MSClient *client, int *fds);
+    int (*write_and_block_to)(MSClient *client, MSMessage *message);
 };
 
 struct MSBackend {
-    /* private */
     Object parent;
 
-    /* protected */
+    int fd;
+
     char *path;
+    char *host;
+    char *port;
     SocketAddress *addr;
 
     QLIST_HEAD(clients_head, MSClient) clients;
@@ -107,6 +145,7 @@  int multi_socket_send_fds_to(MSClient *client, int *fds, int count,
  * @size: size of the message
  */
 int multi_socket_write_to(MSClient *client, const char *message, int size);
+int multi_socket_write_message_to(MSClient *client, MSMessage *message);
 
 /* Get fds size received with the last message.
  *
@@ -121,4 +160,6 @@  int multi_socket_get_fds_num_from(MSClient *client);
  */
 int multi_socket_get_fds_from(MSClient *client, int *fds);
 
+int multi_socket_write_and_block_to(MSClient *client, MSMessage *message);
+
 #endif