diff mbox

[1/7] qemu-ga: move channel/transport functionality into wrapper class

Message ID 1327328505-31106-2-git-send-email-mdroth@linux.vnet.ibm.com
State New
Headers show

Commit Message

Michael Roth Jan. 23, 2012, 2:21 p.m. UTC
This is monstly in preparation for the win32 port, which won't use
GIO channels for reasons that will be made clearer later. Here the
GAChannel class is just a loose wrapper around GIOChannel
calls/callbacks, but we also roll the logic/configuration for handling
FDs and also for managing unix socket connections into it, which makes
the abstraction much more complete and further aids in the win32 port
since isa-serial/unix-listen will not be supported initially.

There's also a bit of refactoring in the main logic to consolidate the
exit paths so we can do common cleanup for things like pid files, which
weren't always cleaned up previously.

Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
---
 Makefile.objs          |    1 +
 qemu-ga.c              |  306 ++++++++++++------------------------------------
 qga/channel-posix.c    |  246 ++++++++++++++++++++++++++++++++++++++
 qga/channel.h          |   33 +++++
 qga/guest-agent-core.h |    2 +-
 5 files changed, 355 insertions(+), 233 deletions(-)
 create mode 100644 qga/channel-posix.c
 create mode 100644 qga/channel.h

Comments

Anthony Liguori Jan. 23, 2012, 8:24 p.m. UTC | #1
On 01/23/2012 08:21 AM, Michael Roth wrote:
> This is monstly in preparation for the win32 port, which won't use
> GIO channels for reasons that will be made clearer later. Here the
> GAChannel class is just a loose wrapper around GIOChannel
> calls/callbacks, but we also roll the logic/configuration for handling
> FDs and also for managing unix socket connections into it, which makes
> the abstraction much more complete and further aids in the win32 port
> since isa-serial/unix-listen will not be supported initially.
>
> There's also a bit of refactoring in the main logic to consolidate the
> exit paths so we can do common cleanup for things like pid files, which
> weren't always cleaned up previously.
>
> Signed-off-by: Michael Roth<mdroth@linux.vnet.ibm.com>

This doesn't apply very well.  Can you rebase?

Regards,

Anthony Liguori

> ---
>   Makefile.objs          |    1 +
>   qemu-ga.c              |  306 ++++++++++++------------------------------------
>   qga/channel-posix.c    |  246 ++++++++++++++++++++++++++++++++++++++
>   qga/channel.h          |   33 +++++
>   qga/guest-agent-core.h |    2 +-
>   5 files changed, 355 insertions(+), 233 deletions(-)
>   create mode 100644 qga/channel-posix.c
>   create mode 100644 qga/channel.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index f94c881..72141cc 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -417,6 +417,7 @@ common-obj-y += qmp.o hmp.o
>   # guest agent
>
>   qga-nested-y = guest-agent-commands.o guest-agent-command-state.o
> +qga-nested-y += channel-posix.o
>   qga-obj-y = $(addprefix qga/, $(qga-nested-y))
>   qga-obj-y += qemu-ga.o qemu-sockets.o module.o qemu-option.o
>   qga-obj-$(CONFIG_WIN32) += oslib-win32.o
> diff --git a/qemu-ga.c b/qemu-ga.c
> index 8e5b075..5f89ab9 100644
> --- a/qemu-ga.c
> +++ b/qemu-ga.c
> @@ -15,9 +15,7 @@
>   #include<stdbool.h>
>   #include<glib.h>
>   #include<getopt.h>
> -#include<termios.h>
>   #include<syslog.h>
> -#include "qemu_socket.h"
>   #include "json-streamer.h"
>   #include "json-parser.h"
>   #include "qint.h"
> @@ -29,19 +27,15 @@
>   #include "error_int.h"
>   #include "qapi/qmp-core.h"
>   #include "qga-qapi-types.h"
> +#include "qga/channel.h"
>
>   #define QGA_VIRTIO_PATH_DEFAULT "/dev/virtio-ports/org.qemu.guest_agent.0"
>   #define QGA_PIDFILE_DEFAULT "/var/run/qemu-ga.pid"
> -#define QGA_BAUDRATE_DEFAULT B38400 /* for isa-serial channels */
> -#define QGA_TIMEOUT_DEFAULT 30*1000 /* ms */
>
>   struct GAState {
>       JSONMessageParser parser;
>       GMainLoop *main_loop;
> -    GIOChannel *conn_channel;
> -    GIOChannel *listen_channel;
> -    const char *path;
> -    const char *method;
> +    GAChannel *channel;
>       bool virtio; /* fastpath to check for virtio to deal with poll() quirks */
>       GACommandState *command_state;
>       GLogLevelFlags log_level;
> @@ -60,7 +54,7 @@ static void quit_handler(int sig)
>       }
>   }
>
> -static void register_signal_handlers(void)
> +static gboolean register_signal_handlers(void)
>   {
>       struct sigaction sigact;
>       int ret;
> @@ -71,12 +65,14 @@ static void register_signal_handlers(void)
>       ret = sigaction(SIGINT,&sigact, NULL);
>       if (ret == -1) {
>           g_error("error configuring signal handler: %s", strerror(errno));
> -        exit(EXIT_FAILURE);
> +        return false;
>       }
>       ret = sigaction(SIGTERM,&sigact, NULL);
>       if (ret == -1) {
>           g_error("error configuring signal handler: %s", strerror(errno));
> +        return false;
>       }
> +    return true;
>   }
>
>   static void usage(const char *cmd)
> @@ -101,8 +97,6 @@ static void usage(const char *cmd)
>       , cmd, QGA_VERSION, QGA_VIRTIO_PATH_DEFAULT, QGA_PIDFILE_DEFAULT);
>   }
>
> -static void conn_channel_close(GAState *s);
> -
>   static GuestAgentSupportLevel ga_support_level;
>
>   static int ga_cmp_support_levels(GuestAgentSupportLevel a,
> @@ -250,40 +244,13 @@ fail:
>       exit(EXIT_FAILURE);
>   }
>
> -static int conn_channel_send_buf(GIOChannel *channel, const char *buf,
> -                                 gsize count)
> +static int send_response(GAState *s, QObject *payload)
>   {
> -    GError *err = NULL;
> -    gsize written = 0;
> -    GIOStatus status;
> -
> -    while (count) {
> -        status = g_io_channel_write_chars(channel, buf, count,&written,&err);
> -        g_debug("sending data, count: %d", (int)count);
> -        if (err != NULL) {
> -            g_warning("error sending newline: %s", err->message);
> -            return err->code;
> -        }
> -        if (status == G_IO_STATUS_ERROR || status == G_IO_STATUS_EOF) {
> -            return -EPIPE;
> -        }
> -
> -        if (status == G_IO_STATUS_NORMAL) {
> -            count -= written;
> -        }
> -    }
> -
> -    return 0;
> -}
> -
> -static int conn_channel_send_payload(GIOChannel *channel, QObject *payload)
> -{
> -    int ret = 0;
>       const char *buf;
>       QString *payload_qstr;
> -    GError *err = NULL;
> +    GIOStatus status;
>
> -    g_assert(payload&&  channel);
> +    g_assert(payload&&  s->channel);
>
>       payload_qstr = qobject_to_json(payload);
>       if (!payload_qstr) {
> @@ -292,24 +259,13 @@ static int conn_channel_send_payload(GIOChannel *channel, QObject *payload)
>
>       qstring_append_chr(payload_qstr, '\n');
>       buf = qstring_get_str(payload_qstr);
> -    ret = conn_channel_send_buf(channel, buf, strlen(buf));
> -    if (ret) {
> -        goto out_free;
> -    }
> -
> -    g_io_channel_flush(channel,&err);
> -    if (err != NULL) {
> -        g_warning("error flushing payload: %s", err->message);
> -        ret = err->code;
> -        goto out_free;
> -    }
> -
> -out_free:
> +    status = ga_channel_write_all(s->channel, buf, strlen(buf));
>       QDECREF(payload_qstr);
> -    if (err) {
> -        g_error_free(err);
> +    if (status != G_IO_STATUS_NORMAL) {
> +        return -EIO;
>       }
> -    return ret;
> +
> +    return 0;
>   }
>
>   static void process_command(GAState *s, QDict *req)
> @@ -321,9 +277,9 @@ static void process_command(GAState *s, QDict *req)
>       g_debug("processing command");
>       rsp = qmp_dispatch(QOBJECT(req));
>       if (rsp) {
> -        ret = conn_channel_send_payload(s->conn_channel, rsp);
> +        ret = send_response(s, rsp);
>           if (ret) {
> -            g_warning("error sending payload: %s", strerror(ret));
> +            g_warning("error sending response: %s", strerror(ret));
>           }
>           qobject_decref(rsp);
>       } else {
> @@ -373,38 +329,42 @@ static void process_event(JSONMessageParser *parser, QList *tokens)
>               qdict_put_obj(qdict, "error", error_get_qobject(err));
>               error_free(err);
>           }
> -        ret = conn_channel_send_payload(s->conn_channel, QOBJECT(qdict));
> +        ret = send_response(s, QOBJECT(qdict));
>           if (ret) {
> -            g_warning("error sending payload: %s", strerror(ret));
> +            g_warning("error sending error response: %s", strerror(ret));
>           }
>       }
>
>       QDECREF(qdict);
>   }
>
> -static gboolean conn_channel_read(GIOChannel *channel, GIOCondition condition,
> -                                  gpointer data)
> +/* false return signals GAChannel to close the current client connection */
> +static gboolean channel_event_cb(GIOCondition condition, gpointer data)
>   {
>       GAState *s = data;
> -    gchar buf[1024];
> +    gchar buf[QGA_READ_COUNT_DEFAULT+1];
>       gsize count;
>       GError *err = NULL;
> -    memset(buf, 0, 1024);
> -    GIOStatus status = g_io_channel_read_chars(channel, buf, 1024,
> -&count,&err);
> +    GIOStatus status = ga_channel_read(s->channel, buf, QGA_READ_COUNT_DEFAULT,&count);
>       if (err != NULL) {
>           g_warning("error reading channel: %s", err->message);
> -        conn_channel_close(s);
>           g_error_free(err);
>           return false;
>       }
>       switch (status) {
>       case G_IO_STATUS_ERROR:
> -        g_warning("problem");
> +        g_warning("error reading channel");
>           return false;
>       case G_IO_STATUS_NORMAL:
> +        buf[count] = 0;
>           g_debug("read data, count: %d, data: %s", (int)count, buf);
>           json_message_parser_feed(&s->parser, (char *)buf, (int)count);
> +        break;
> +    case G_IO_STATUS_EOF:
> +        g_debug("received EOF");
> +        if (!s->virtio) {
> +            return false;
> +        }
>       case G_IO_STATUS_AGAIN:
>           /* virtio causes us to spin here when no process is attached to
>            * host-side chardev. sleep a bit to mitigate this
> @@ -413,180 +373,49 @@ static gboolean conn_channel_read(GIOChannel *channel, GIOCondition condition,
>               usleep(100*1000);
>           }
>           return true;
> -    case G_IO_STATUS_EOF:
> -        g_debug("received EOF");
> -        conn_channel_close(s);
> -        if (s->virtio) {
> -            return true;
> -        }
> -        return false;
>       default:
>           g_warning("unknown channel read status, closing");
> -        conn_channel_close(s);
>           return false;
>       }
>       return true;
>   }
>
> -static int conn_channel_add(GAState *s, int fd)
> +static gboolean channel_init(GAState *s, const gchar *method, const gchar *path)
>   {
> -    GIOChannel *conn_channel;
> -    GError *err = NULL;
> +    GAChannelMethod channel_method;
>
> -    g_assert(s&&  !s->conn_channel);
> -    conn_channel = g_io_channel_unix_new(fd);
> -    g_assert(conn_channel);
> -    g_io_channel_set_encoding(conn_channel, NULL,&err);
> -    if (err != NULL) {
> -        g_warning("error setting channel encoding to binary");
> -        g_error_free(err);
> -        return -1;
> +    if (method == NULL) {
> +        method = "virtio-serial";
>       }
> -    g_io_add_watch(conn_channel, G_IO_IN | G_IO_HUP,
> -                   conn_channel_read, s);
> -    s->conn_channel = conn_channel;
> -    return 0;
> -}
>
> -static gboolean listen_channel_accept(GIOChannel *channel,
> -                                      GIOCondition condition, gpointer data)
> -{
> -    GAState *s = data;
> -    g_assert(channel != NULL);
> -    int ret, conn_fd;
> -    bool accepted = false;
> -    struct sockaddr_un addr;
> -    socklen_t addrlen = sizeof(addr);
> -
> -    conn_fd = qemu_accept(g_io_channel_unix_get_fd(s->listen_channel),
> -                             (struct sockaddr *)&addr,&addrlen);
> -    if (conn_fd == -1) {
> -        g_warning("error converting fd to gsocket: %s", strerror(errno));
> -        goto out;
> -    }
> -    fcntl(conn_fd, F_SETFL, O_NONBLOCK);
> -    ret = conn_channel_add(s, conn_fd);
> -    if (ret) {
> -        g_warning("error setting up connection");
> -        goto out;
> -    }
> -    accepted = true;
> -
> -out:
> -    /* only accept 1 connection at a time */
> -    return !accepted;
> -}
> -
> -/* start polling for readable events on listen fd, new==true
> - * indicates we should use the existing s->listen_channel
> - */
> -static int listen_channel_add(GAState *s, int listen_fd, bool new)
> -{
> -    if (new) {
> -        s->listen_channel = g_io_channel_unix_new(listen_fd);
> -    }
> -    g_io_add_watch(s->listen_channel, G_IO_IN,
> -                   listen_channel_accept, s);
> -    return 0;
> -}
> -
> -/* cleanup state for closed connection/session, start accepting new
> - * connections if we're in listening mode
> - */
> -static void conn_channel_close(GAState *s)
> -{
> -    if (strcmp(s->method, "unix-listen") == 0) {
> -        g_io_channel_shutdown(s->conn_channel, true, NULL);
> -        listen_channel_add(s, 0, false);
> -    } else if (strcmp(s->method, "virtio-serial") == 0) {
> -        /* we spin on EOF for virtio-serial, so back off a bit. also,
> -         * dont close the connection in this case, it'll resume normal
> -         * operation when another process connects to host chardev
> -         */
> -        usleep(100*1000);
> -        goto out_noclose;
> -    }
> -    g_io_channel_unref(s->conn_channel);
> -    s->conn_channel = NULL;
> -out_noclose:
> -    return;
> -}
> -
> -static void init_guest_agent(GAState *s)
> -{
> -    struct termios tio;
> -    int ret, fd;
> -
> -    if (s->method == NULL) {
> -        /* try virtio-serial as our default */
> -        s->method = "virtio-serial";
> -    }
> -
> -    if (s->path == NULL) {
> -        if (strcmp(s->method, "virtio-serial") != 0) {
> +    if (path == NULL) {
> +        if (strcmp(method, "virtio-serial") != 0) {
>               g_critical("must specify a path for this channel");
> -            exit(EXIT_FAILURE);
> +            return false;
>           }
>           /* try the default path for the virtio-serial port */
> -        s->path = QGA_VIRTIO_PATH_DEFAULT;
> +        path = QGA_VIRTIO_PATH_DEFAULT;
>       }
>
> -    if (strcmp(s->method, "virtio-serial") == 0) {
> -        s->virtio = true;
> -        fd = qemu_open(s->path, O_RDWR | O_NONBLOCK | O_ASYNC);
> -        if (fd == -1) {
> -            g_critical("error opening channel: %s", strerror(errno));
> -            exit(EXIT_FAILURE);
> -        }
> -        ret = conn_channel_add(s, fd);
> -        if (ret) {
> -            g_critical("error adding channel to main loop");
> -            exit(EXIT_FAILURE);
> -        }
> -    } else if (strcmp(s->method, "isa-serial") == 0) {
> -        fd = qemu_open(s->path, O_RDWR | O_NOCTTY);
> -        if (fd == -1) {
> -            g_critical("error opening channel: %s", strerror(errno));
> -            exit(EXIT_FAILURE);
> -        }
> -        tcgetattr(fd,&tio);
> -        /* set up serial port for non-canonical, dumb byte streaming */
> -        tio.c_iflag&= ~(IGNBRK | BRKINT | IGNPAR | PARMRK | INPCK | ISTRIP |
> -                         INLCR | IGNCR | ICRNL | IXON | IXOFF | IXANY |
> -                         IMAXBEL);
> -        tio.c_oflag = 0;
> -        tio.c_lflag = 0;
> -        tio.c_cflag |= QGA_BAUDRATE_DEFAULT;
> -        /* 1 available byte min or reads will block (we'll set non-blocking
> -         * elsewhere, else we have to deal with read()=0 instead)
> -         */
> -        tio.c_cc[VMIN] = 1;
> -        tio.c_cc[VTIME] = 0;
> -        /* flush everything waiting for read/xmit, it's garbage at this point */
> -        tcflush(fd, TCIFLUSH);
> -        tcsetattr(fd, TCSANOW,&tio);
> -        ret = conn_channel_add(s, fd);
> -        if (ret) {
> -            g_error("error adding channel to main loop");
> -        }
> -    } else if (strcmp(s->method, "unix-listen") == 0) {
> -        fd = unix_listen(s->path, NULL, strlen(s->path));
> -        if (fd == -1) {
> -            g_critical("error opening path: %s", strerror(errno));
> -            exit(EXIT_FAILURE);
> -        }
> -        ret = listen_channel_add(s, fd, true);
> -        if (ret) {
> -            g_critical("error binding/listening to specified socket");
> -            exit(EXIT_FAILURE);
> -        }
> +    if (strcmp(method, "virtio-serial") == 0) {
> +        s->virtio = true; /* virtio requires special handling in some cases */
> +        channel_method = GA_CHANNEL_VIRTIO_SERIAL;
> +    } else if (strcmp(method, "isa-serial") == 0) {
> +        channel_method = GA_CHANNEL_ISA_SERIAL;
> +    } else if (strcmp(method, "unix-listen") == 0) {
> +        channel_method = GA_CHANNEL_UNIX_LISTEN;
>       } else {
> -        g_critical("unsupported channel method/type: %s", s->method);
> -        exit(EXIT_FAILURE);
> +        g_critical("unsupported channel method/type: %s", method);
> +        return false;
>       }
>
> -    json_message_parser_init(&s->parser, process_event);
> -    s->main_loop = g_main_loop_new(NULL, false);
> +    s->channel = ga_channel_new(channel_method, path, channel_event_cb, s);
> +    if (!s->channel) {
> +        g_critical("failed to create guest agent channel");
> +        return false;
> +    }
> +
> +    return true;
>   }
>
>   int main(int argc, char **argv)
> @@ -689,9 +518,6 @@ int main(int argc, char **argv)
>
>       ga_set_support_level(default_support_level);
>       s = g_malloc0(sizeof(GAState));
> -    s->conn_channel = NULL;
> -    s->path = path;
> -    s->method = method;
>       s->log_file = log_file;
>       s->log_level = log_level;
>       g_log_set_default_handler(ga_log, s);
> @@ -700,15 +526,31 @@ int main(int argc, char **argv)
>       s->command_state = ga_command_state_new();
>       ga_command_state_init(s, s->command_state);
>       ga_command_state_init_all(s->command_state);
> +    json_message_parser_init(&s->parser, process_event);
>       ga_state = s;
> +    if (!register_signal_handlers()) {
> +        g_critical("failed to register signal handlers");
> +        goto out_bad;
> +    }
>
> -    init_guest_agent(ga_state);
> -    register_signal_handlers();
> -
> +    s->main_loop = g_main_loop_new(NULL, false);
> +    if (!channel_init(ga_state, method, path)) {
> +        g_critical("failed to initialize guest agent channel");
> +        goto out_bad;
> +    }
>       g_main_loop_run(ga_state->main_loop);
>
>       ga_command_state_cleanup_all(ga_state->command_state);
> -    unlink(pidfile);
> +    ga_channel_free(ga_state->channel);
>
> +    if (daemonize) {
> +        unlink(pidfile);
> +    }
>       return 0;
> +
> +out_bad:
> +    if (daemonize) {
> +        unlink(pidfile);
> +    }
> +    return EXIT_FAILURE;
>   }
> diff --git a/qga/channel-posix.c b/qga/channel-posix.c
> new file mode 100644
> index 0000000..40f7658
> --- /dev/null
> +++ b/qga/channel-posix.c
> @@ -0,0 +1,246 @@
> +#include<glib.h>
> +#include<termios.h>
> +#include "qemu_socket.h"
> +#include "qga/channel.h"
> +
> +#define GA_CHANNEL_BAUDRATE_DEFAULT B38400 /* for isa-serial channels */
> +
> +struct GAChannel {
> +    GIOChannel *listen_channel;
> +    GIOChannel *client_channel;
> +    GAChannelMethod method;
> +    GAChannelCallback event_cb;
> +    gpointer user_data;
> +};
> +
> +static int ga_channel_client_add(GAChannel *c, int fd);
> +
> +static gboolean ga_channel_listen_accept(GIOChannel *channel,
> +                                         GIOCondition condition, gpointer data)
> +{
> +    GAChannel *c = data;
> +    int ret, client_fd;
> +    bool accepted = false;
> +    struct sockaddr_un addr;
> +    socklen_t addrlen = sizeof(addr);
> +
> +    g_assert(channel != NULL);
> +
> +    client_fd = qemu_accept(g_io_channel_unix_get_fd(channel),
> +                            (struct sockaddr *)&addr,&addrlen);
> +    if (client_fd == -1) {
> +        g_warning("error converting fd to gsocket: %s", strerror(errno));
> +        goto out;
> +    }
> +    fcntl(client_fd, F_SETFL, O_NONBLOCK);
> +    ret = ga_channel_client_add(c, client_fd);
> +    if (ret) {
> +        g_warning("error setting up connection");
> +        goto out;
> +    }
> +    accepted = true;
> +
> +out:
> +    /* only accept 1 connection at a time */
> +    return !accepted;
> +}
> +
> +/* start polling for readable events on listen fd, new==true
> + * indicates we should use the existing s->listen_channel
> + */
> +static void ga_channel_listen_add(GAChannel *c, int listen_fd, bool create)
> +{
> +    if (create) {
> +        c->listen_channel = g_io_channel_unix_new(listen_fd);
> +    }
> +    g_io_add_watch(c->listen_channel, G_IO_IN, ga_channel_listen_accept, c);
> +}
> +
> +static void ga_channel_listen_close(GAChannel *c)
> +{
> +    g_assert(c->method == GA_CHANNEL_UNIX_LISTEN);
> +    g_assert(c->listen_channel);
> +    g_io_channel_shutdown(c->listen_channel, true, NULL);
> +    g_io_channel_unref(c->listen_channel);
> +    c->listen_channel = NULL;
> +}
> +
> +/* cleanup state for closed connection/session, start accepting new
> + * connections if we're in listening mode
> + */
> +static void ga_channel_client_close(GAChannel *c)
> +{
> +    g_assert(c->client_channel);
> +    g_io_channel_shutdown(c->client_channel, true, NULL);
> +    g_io_channel_unref(c->client_channel);
> +    c->client_channel = NULL;
> +    if (c->method == GA_CHANNEL_UNIX_LISTEN&&  c->listen_channel) {
> +        ga_channel_listen_add(c, 0, false);
> +    }
> +}
> +
> +static gboolean ga_channel_client_event(GIOChannel *channel,
> +                                        GIOCondition condition, gpointer data)
> +{
> +    GAChannel *c = data;
> +    gboolean client_cont;
> +
> +    g_assert(c);
> +    if (c->event_cb) {
> +        client_cont = c->event_cb(condition, c->user_data);
> +        if (!client_cont) {
> +            ga_channel_client_close(c);
> +            return false;
> +        }
> +    }
> +    return true;
> +}
> +
> +static int ga_channel_client_add(GAChannel *c, int fd)
> +{
> +    GIOChannel *client_channel;
> +    GError *err = NULL;
> +
> +    g_assert(c&&  !c->client_channel);
> +    client_channel = g_io_channel_unix_new(fd);
> +    g_assert(client_channel);
> +    g_io_channel_set_encoding(client_channel, NULL,&err);
> +    if (err != NULL) {
> +        g_warning("error setting channel encoding to binary");
> +        g_error_free(err);
> +        return -1;
> +    }
> +    g_io_add_watch(client_channel, G_IO_IN | G_IO_HUP,
> +                   ga_channel_client_event, c);
> +    c->client_channel = client_channel;
> +    return 0;
> +}
> +
> +static gboolean ga_channel_open(GAChannel *c, const gchar *path, GAChannelMethod method)
> +{
> +    int ret;
> +    c->method = method;
> +
> +    switch (c->method) {
> +    case GA_CHANNEL_VIRTIO_SERIAL: {
> +        int fd = qemu_open(path, O_RDWR | O_NONBLOCK | O_ASYNC);
> +        if (fd == -1) {
> +            g_critical("error opening channel: %s", strerror(errno));
> +            exit(EXIT_FAILURE);
> +        }
> +        ret = ga_channel_client_add(c, fd);
> +        if (ret) {
> +            g_critical("error adding channel to main loop");
> +            return false;
> +        }
> +        break;
> +    }
> +    case GA_CHANNEL_ISA_SERIAL: {
> +        struct termios tio;
> +        int fd = qemu_open(path, O_RDWR | O_NOCTTY | O_NONBLOCK);
> +        if (fd == -1) {
> +            g_critical("error opening channel: %s", strerror(errno));
> +            exit(EXIT_FAILURE);
> +        }
> +        tcgetattr(fd,&tio);
> +        /* set up serial port for non-canonical, dumb byte streaming */
> +        tio.c_iflag&= ~(IGNBRK | BRKINT | IGNPAR | PARMRK | INPCK | ISTRIP |
> +                         INLCR | IGNCR | ICRNL | IXON | IXOFF | IXANY |
> +                         IMAXBEL);
> +        tio.c_oflag = 0;
> +        tio.c_lflag = 0;
> +        tio.c_cflag |= GA_CHANNEL_BAUDRATE_DEFAULT;
> +        /* 1 available byte min or reads will block (we'll set non-blocking
> +         * elsewhere, else we have to deal with read()=0 instead)
> +         */
> +        tio.c_cc[VMIN] = 1;
> +        tio.c_cc[VTIME] = 0;
> +        /* flush everything waiting for read/xmit, it's garbage at this point */
> +        tcflush(fd, TCIFLUSH);
> +        tcsetattr(fd, TCSANOW,&tio);
> +        ret = ga_channel_client_add(c, fd);
> +        if (ret) {
> +            g_error("error adding channel to main loop");
> +        }
> +        break;
> +    }
> +    case GA_CHANNEL_UNIX_LISTEN: {
> +        int fd = unix_listen(path, NULL, strlen(path));
> +        if (fd == -1) {
> +            g_critical("error opening path: %s", strerror(errno));
> +            return false;
> +        }
> +        ga_channel_listen_add(c, fd, true);
> +        break;
> +    }
> +    default:
> +        g_critical("error binding/listening to specified socket");
> +        return false;
> +    }
> +
> +    return true;
> +}
> +
> +GIOStatus ga_channel_write_all(GAChannel *c, const gchar *buf, gsize size)
> +{
> +    GError *err = NULL;
> +    gsize written = 0;
> +    GIOStatus status = G_IO_STATUS_NORMAL;
> +
> +    while (size) {
> +        status = g_io_channel_write_chars(c->client_channel, buf, size,
> +&written,&err);
> +        g_debug("sending data, count: %d", (int)size);
> +        if (err != NULL) {
> +            g_warning("error writing to channel: %s", err->message);
> +            return G_IO_STATUS_ERROR;
> +        }
> +        if (status != G_IO_STATUS_NORMAL) {
> +            break;
> +        }
> +        size -= written;
> +    }
> +
> +    if (status == G_IO_STATUS_NORMAL) {
> +        status = g_io_channel_flush(c->client_channel,&err);
> +        if (err != NULL) {
> +            g_warning("error flushing channel: %s", err->message);
> +            return G_IO_STATUS_ERROR;
> +        }
> +    }
> +
> +    return status;
> +}
> +
> +GIOStatus ga_channel_read(GAChannel *c, gchar *buf, gsize size, gsize *count)
> +{
> +    return g_io_channel_read_chars(c->client_channel, buf, size, count, NULL);
> +}
> +
> +GAChannel *ga_channel_new(GAChannelMethod method, const gchar *path,
> +                          GAChannelCallback cb, gpointer opaque)
> +{
> +    GAChannel *c = g_malloc0(sizeof(GAChannel));
> +    c->event_cb = cb;
> +    c->user_data = opaque;
> +
> +    if (!ga_channel_open(c, path, method)) {
> +        g_critical("error opening channel");
> +        ga_channel_free(c);
> +        return NULL;
> +    }
> +
> +    return c;
> +}
> +
> +void ga_channel_free(GAChannel *c)
> +{
> +    if (c->method == GA_CHANNEL_UNIX_LISTEN
> +&&  c->listen_channel) {
> +        ga_channel_listen_close(c);
> +    }
> +    if (c->client_channel) {
> +        ga_channel_client_close(c);
> +    }
> +    g_free(c);
> +}
> diff --git a/qga/channel.h b/qga/channel.h
> new file mode 100644
> index 0000000..3704ea9
> --- /dev/null
> +++ b/qga/channel.h
> @@ -0,0 +1,33 @@
> +/*
> + * QEMU Guest Agent channel declarations
> + *
> + * Copyright IBM Corp. 2012
> + *
> + * Authors:
> + *  Michael Roth<mdroth@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + */
> +#ifndef QGA_CHANNEL_H
> +#define QGA_CHANNEL_H
> +
> +#include<glib.h>
> +
> +typedef struct GAChannel GAChannel;
> +
> +typedef enum {
> +    GA_CHANNEL_VIRTIO_SERIAL,
> +    GA_CHANNEL_ISA_SERIAL,
> +    GA_CHANNEL_UNIX_LISTEN,
> +} GAChannelMethod;
> +
> +typedef gboolean (*GAChannelCallback)(GIOCondition condition, gpointer opaque);
> +
> +GAChannel *ga_channel_new(GAChannelMethod method, const gchar *path,
> +                          GAChannelCallback cb, gpointer opaque);
> +void ga_channel_free(GAChannel *c);
> +GIOStatus ga_channel_read(GAChannel *c, gchar *buf, gsize size, gsize *count);
> +GIOStatus ga_channel_write_all(GAChannel *c, const gchar *buf, gsize size);
> +
> +#endif
> diff --git a/qga/guest-agent-core.h b/qga/guest-agent-core.h
> index 7327e73..891ed06 100644
> --- a/qga/guest-agent-core.h
> +++ b/qga/guest-agent-core.h
> @@ -22,7 +22,7 @@
>   #define QGA_SUPPORT_LEVEL_MAJOR_MIN 1
>   #define QGA_SUPPORT_LEVEL_MINOR_MIN 0
>   #define QGA_SUPPORT_LEVEL_MICRO_MIN 0
> -#define QGA_READ_COUNT_DEFAULT 4<<  10
> +#define QGA_READ_COUNT_DEFAULT 4096
>
>   typedef struct GAState GAState;
>   typedef struct GACommandState GACommandState;
Anthony Liguori Jan. 23, 2012, 8:39 p.m. UTC | #2
On 01/23/2012 02:24 PM, Anthony Liguori wrote:
> On 01/23/2012 08:21 AM, Michael Roth wrote:
>> This is monstly in preparation for the win32 port, which won't use
>> GIO channels for reasons that will be made clearer later. Here the
>> GAChannel class is just a loose wrapper around GIOChannel
>> calls/callbacks, but we also roll the logic/configuration for handling
>> FDs and also for managing unix socket connections into it, which makes
>> the abstraction much more complete and further aids in the win32 port
>> since isa-serial/unix-listen will not be supported initially.
>>
>> There's also a bit of refactoring in the main logic to consolidate the
>> exit paths so we can do common cleanup for things like pid files, which
>> weren't always cleaned up previously.
>>
>> Signed-off-by: Michael Roth<mdroth@linux.vnet.ibm.com>
>
> This doesn't apply very well. Can you rebase?

Your header is missing [PATCH 0/7] so i didn't see it.  How about sending a 
proper PULL request with the accumulated patches?

Regards,

Anthony Liguori

>
> Regards,
>
> Anthony Liguori
>
>> ---
>> Makefile.objs | 1 +
>> qemu-ga.c | 306 ++++++++++++------------------------------------
>> qga/channel-posix.c | 246 ++++++++++++++++++++++++++++++++++++++
>> qga/channel.h | 33 +++++
>> qga/guest-agent-core.h | 2 +-
>> 5 files changed, 355 insertions(+), 233 deletions(-)
>> create mode 100644 qga/channel-posix.c
>> create mode 100644 qga/channel.h
>>
>> diff --git a/Makefile.objs b/Makefile.objs
>> index f94c881..72141cc 100644
>> --- a/Makefile.objs
>> +++ b/Makefile.objs
>> @@ -417,6 +417,7 @@ common-obj-y += qmp.o hmp.o
>> # guest agent
>>
>> qga-nested-y = guest-agent-commands.o guest-agent-command-state.o
>> +qga-nested-y += channel-posix.o
>> qga-obj-y = $(addprefix qga/, $(qga-nested-y))
>> qga-obj-y += qemu-ga.o qemu-sockets.o module.o qemu-option.o
>> qga-obj-$(CONFIG_WIN32) += oslib-win32.o
>> diff --git a/qemu-ga.c b/qemu-ga.c
>> index 8e5b075..5f89ab9 100644
>> --- a/qemu-ga.c
>> +++ b/qemu-ga.c
>> @@ -15,9 +15,7 @@
>> #include<stdbool.h>
>> #include<glib.h>
>> #include<getopt.h>
>> -#include<termios.h>
>> #include<syslog.h>
>> -#include "qemu_socket.h"
>> #include "json-streamer.h"
>> #include "json-parser.h"
>> #include "qint.h"
>> @@ -29,19 +27,15 @@
>> #include "error_int.h"
>> #include "qapi/qmp-core.h"
>> #include "qga-qapi-types.h"
>> +#include "qga/channel.h"
>>
>> #define QGA_VIRTIO_PATH_DEFAULT "/dev/virtio-ports/org.qemu.guest_agent.0"
>> #define QGA_PIDFILE_DEFAULT "/var/run/qemu-ga.pid"
>> -#define QGA_BAUDRATE_DEFAULT B38400 /* for isa-serial channels */
>> -#define QGA_TIMEOUT_DEFAULT 30*1000 /* ms */
>>
>> struct GAState {
>> JSONMessageParser parser;
>> GMainLoop *main_loop;
>> - GIOChannel *conn_channel;
>> - GIOChannel *listen_channel;
>> - const char *path;
>> - const char *method;
>> + GAChannel *channel;
>> bool virtio; /* fastpath to check for virtio to deal with poll() quirks */
>> GACommandState *command_state;
>> GLogLevelFlags log_level;
>> @@ -60,7 +54,7 @@ static void quit_handler(int sig)
>> }
>> }
>>
>> -static void register_signal_handlers(void)
>> +static gboolean register_signal_handlers(void)
>> {
>> struct sigaction sigact;
>> int ret;
>> @@ -71,12 +65,14 @@ static void register_signal_handlers(void)
>> ret = sigaction(SIGINT,&sigact, NULL);
>> if (ret == -1) {
>> g_error("error configuring signal handler: %s", strerror(errno));
>> - exit(EXIT_FAILURE);
>> + return false;
>> }
>> ret = sigaction(SIGTERM,&sigact, NULL);
>> if (ret == -1) {
>> g_error("error configuring signal handler: %s", strerror(errno));
>> + return false;
>> }
>> + return true;
>> }
>>
>> static void usage(const char *cmd)
>> @@ -101,8 +97,6 @@ static void usage(const char *cmd)
>> , cmd, QGA_VERSION, QGA_VIRTIO_PATH_DEFAULT, QGA_PIDFILE_DEFAULT);
>> }
>>
>> -static void conn_channel_close(GAState *s);
>> -
>> static GuestAgentSupportLevel ga_support_level;
>>
>> static int ga_cmp_support_levels(GuestAgentSupportLevel a,
>> @@ -250,40 +244,13 @@ fail:
>> exit(EXIT_FAILURE);
>> }
>>
>> -static int conn_channel_send_buf(GIOChannel *channel, const char *buf,
>> - gsize count)
>> +static int send_response(GAState *s, QObject *payload)
>> {
>> - GError *err = NULL;
>> - gsize written = 0;
>> - GIOStatus status;
>> -
>> - while (count) {
>> - status = g_io_channel_write_chars(channel, buf, count,&written,&err);
>> - g_debug("sending data, count: %d", (int)count);
>> - if (err != NULL) {
>> - g_warning("error sending newline: %s", err->message);
>> - return err->code;
>> - }
>> - if (status == G_IO_STATUS_ERROR || status == G_IO_STATUS_EOF) {
>> - return -EPIPE;
>> - }
>> -
>> - if (status == G_IO_STATUS_NORMAL) {
>> - count -= written;
>> - }
>> - }
>> -
>> - return 0;
>> -}
>> -
>> -static int conn_channel_send_payload(GIOChannel *channel, QObject *payload)
>> -{
>> - int ret = 0;
>> const char *buf;
>> QString *payload_qstr;
>> - GError *err = NULL;
>> + GIOStatus status;
>>
>> - g_assert(payload&& channel);
>> + g_assert(payload&& s->channel);
>>
>> payload_qstr = qobject_to_json(payload);
>> if (!payload_qstr) {
>> @@ -292,24 +259,13 @@ static int conn_channel_send_payload(GIOChannel
>> *channel, QObject *payload)
>>
>> qstring_append_chr(payload_qstr, '\n');
>> buf = qstring_get_str(payload_qstr);
>> - ret = conn_channel_send_buf(channel, buf, strlen(buf));
>> - if (ret) {
>> - goto out_free;
>> - }
>> -
>> - g_io_channel_flush(channel,&err);
>> - if (err != NULL) {
>> - g_warning("error flushing payload: %s", err->message);
>> - ret = err->code;
>> - goto out_free;
>> - }
>> -
>> -out_free:
>> + status = ga_channel_write_all(s->channel, buf, strlen(buf));
>> QDECREF(payload_qstr);
>> - if (err) {
>> - g_error_free(err);
>> + if (status != G_IO_STATUS_NORMAL) {
>> + return -EIO;
>> }
>> - return ret;
>> +
>> + return 0;
>> }
>>
>> static void process_command(GAState *s, QDict *req)
>> @@ -321,9 +277,9 @@ static void process_command(GAState *s, QDict *req)
>> g_debug("processing command");
>> rsp = qmp_dispatch(QOBJECT(req));
>> if (rsp) {
>> - ret = conn_channel_send_payload(s->conn_channel, rsp);
>> + ret = send_response(s, rsp);
>> if (ret) {
>> - g_warning("error sending payload: %s", strerror(ret));
>> + g_warning("error sending response: %s", strerror(ret));
>> }
>> qobject_decref(rsp);
>> } else {
>> @@ -373,38 +329,42 @@ static void process_event(JSONMessageParser *parser,
>> QList *tokens)
>> qdict_put_obj(qdict, "error", error_get_qobject(err));
>> error_free(err);
>> }
>> - ret = conn_channel_send_payload(s->conn_channel, QOBJECT(qdict));
>> + ret = send_response(s, QOBJECT(qdict));
>> if (ret) {
>> - g_warning("error sending payload: %s", strerror(ret));
>> + g_warning("error sending error response: %s", strerror(ret));
>> }
>> }
>>
>> QDECREF(qdict);
>> }
>>
>> -static gboolean conn_channel_read(GIOChannel *channel, GIOCondition condition,
>> - gpointer data)
>> +/* false return signals GAChannel to close the current client connection */
>> +static gboolean channel_event_cb(GIOCondition condition, gpointer data)
>> {
>> GAState *s = data;
>> - gchar buf[1024];
>> + gchar buf[QGA_READ_COUNT_DEFAULT+1];
>> gsize count;
>> GError *err = NULL;
>> - memset(buf, 0, 1024);
>> - GIOStatus status = g_io_channel_read_chars(channel, buf, 1024,
>> -&count,&err);
>> + GIOStatus status = ga_channel_read(s->channel, buf,
>> QGA_READ_COUNT_DEFAULT,&count);
>> if (err != NULL) {
>> g_warning("error reading channel: %s", err->message);
>> - conn_channel_close(s);
>> g_error_free(err);
>> return false;
>> }
>> switch (status) {
>> case G_IO_STATUS_ERROR:
>> - g_warning("problem");
>> + g_warning("error reading channel");
>> return false;
>> case G_IO_STATUS_NORMAL:
>> + buf[count] = 0;
>> g_debug("read data, count: %d, data: %s", (int)count, buf);
>> json_message_parser_feed(&s->parser, (char *)buf, (int)count);
>> + break;
>> + case G_IO_STATUS_EOF:
>> + g_debug("received EOF");
>> + if (!s->virtio) {
>> + return false;
>> + }
>> case G_IO_STATUS_AGAIN:
>> /* virtio causes us to spin here when no process is attached to
>> * host-side chardev. sleep a bit to mitigate this
>> @@ -413,180 +373,49 @@ static gboolean conn_channel_read(GIOChannel *channel,
>> GIOCondition condition,
>> usleep(100*1000);
>> }
>> return true;
>> - case G_IO_STATUS_EOF:
>> - g_debug("received EOF");
>> - conn_channel_close(s);
>> - if (s->virtio) {
>> - return true;
>> - }
>> - return false;
>> default:
>> g_warning("unknown channel read status, closing");
>> - conn_channel_close(s);
>> return false;
>> }
>> return true;
>> }
>>
>> -static int conn_channel_add(GAState *s, int fd)
>> +static gboolean channel_init(GAState *s, const gchar *method, const gchar *path)
>> {
>> - GIOChannel *conn_channel;
>> - GError *err = NULL;
>> + GAChannelMethod channel_method;
>>
>> - g_assert(s&& !s->conn_channel);
>> - conn_channel = g_io_channel_unix_new(fd);
>> - g_assert(conn_channel);
>> - g_io_channel_set_encoding(conn_channel, NULL,&err);
>> - if (err != NULL) {
>> - g_warning("error setting channel encoding to binary");
>> - g_error_free(err);
>> - return -1;
>> + if (method == NULL) {
>> + method = "virtio-serial";
>> }
>> - g_io_add_watch(conn_channel, G_IO_IN | G_IO_HUP,
>> - conn_channel_read, s);
>> - s->conn_channel = conn_channel;
>> - return 0;
>> -}
>>
>> -static gboolean listen_channel_accept(GIOChannel *channel,
>> - GIOCondition condition, gpointer data)
>> -{
>> - GAState *s = data;
>> - g_assert(channel != NULL);
>> - int ret, conn_fd;
>> - bool accepted = false;
>> - struct sockaddr_un addr;
>> - socklen_t addrlen = sizeof(addr);
>> -
>> - conn_fd = qemu_accept(g_io_channel_unix_get_fd(s->listen_channel),
>> - (struct sockaddr *)&addr,&addrlen);
>> - if (conn_fd == -1) {
>> - g_warning("error converting fd to gsocket: %s", strerror(errno));
>> - goto out;
>> - }
>> - fcntl(conn_fd, F_SETFL, O_NONBLOCK);
>> - ret = conn_channel_add(s, conn_fd);
>> - if (ret) {
>> - g_warning("error setting up connection");
>> - goto out;
>> - }
>> - accepted = true;
>> -
>> -out:
>> - /* only accept 1 connection at a time */
>> - return !accepted;
>> -}
>> -
>> -/* start polling for readable events on listen fd, new==true
>> - * indicates we should use the existing s->listen_channel
>> - */
>> -static int listen_channel_add(GAState *s, int listen_fd, bool new)
>> -{
>> - if (new) {
>> - s->listen_channel = g_io_channel_unix_new(listen_fd);
>> - }
>> - g_io_add_watch(s->listen_channel, G_IO_IN,
>> - listen_channel_accept, s);
>> - return 0;
>> -}
>> -
>> -/* cleanup state for closed connection/session, start accepting new
>> - * connections if we're in listening mode
>> - */
>> -static void conn_channel_close(GAState *s)
>> -{
>> - if (strcmp(s->method, "unix-listen") == 0) {
>> - g_io_channel_shutdown(s->conn_channel, true, NULL);
>> - listen_channel_add(s, 0, false);
>> - } else if (strcmp(s->method, "virtio-serial") == 0) {
>> - /* we spin on EOF for virtio-serial, so back off a bit. also,
>> - * dont close the connection in this case, it'll resume normal
>> - * operation when another process connects to host chardev
>> - */
>> - usleep(100*1000);
>> - goto out_noclose;
>> - }
>> - g_io_channel_unref(s->conn_channel);
>> - s->conn_channel = NULL;
>> -out_noclose:
>> - return;
>> -}
>> -
>> -static void init_guest_agent(GAState *s)
>> -{
>> - struct termios tio;
>> - int ret, fd;
>> -
>> - if (s->method == NULL) {
>> - /* try virtio-serial as our default */
>> - s->method = "virtio-serial";
>> - }
>> -
>> - if (s->path == NULL) {
>> - if (strcmp(s->method, "virtio-serial") != 0) {
>> + if (path == NULL) {
>> + if (strcmp(method, "virtio-serial") != 0) {
>> g_critical("must specify a path for this channel");
>> - exit(EXIT_FAILURE);
>> + return false;
>> }
>> /* try the default path for the virtio-serial port */
>> - s->path = QGA_VIRTIO_PATH_DEFAULT;
>> + path = QGA_VIRTIO_PATH_DEFAULT;
>> }
>>
>> - if (strcmp(s->method, "virtio-serial") == 0) {
>> - s->virtio = true;
>> - fd = qemu_open(s->path, O_RDWR | O_NONBLOCK | O_ASYNC);
>> - if (fd == -1) {
>> - g_critical("error opening channel: %s", strerror(errno));
>> - exit(EXIT_FAILURE);
>> - }
>> - ret = conn_channel_add(s, fd);
>> - if (ret) {
>> - g_critical("error adding channel to main loop");
>> - exit(EXIT_FAILURE);
>> - }
>> - } else if (strcmp(s->method, "isa-serial") == 0) {
>> - fd = qemu_open(s->path, O_RDWR | O_NOCTTY);
>> - if (fd == -1) {
>> - g_critical("error opening channel: %s", strerror(errno));
>> - exit(EXIT_FAILURE);
>> - }
>> - tcgetattr(fd,&tio);
>> - /* set up serial port for non-canonical, dumb byte streaming */
>> - tio.c_iflag&= ~(IGNBRK | BRKINT | IGNPAR | PARMRK | INPCK | ISTRIP |
>> - INLCR | IGNCR | ICRNL | IXON | IXOFF | IXANY |
>> - IMAXBEL);
>> - tio.c_oflag = 0;
>> - tio.c_lflag = 0;
>> - tio.c_cflag |= QGA_BAUDRATE_DEFAULT;
>> - /* 1 available byte min or reads will block (we'll set non-blocking
>> - * elsewhere, else we have to deal with read()=0 instead)
>> - */
>> - tio.c_cc[VMIN] = 1;
>> - tio.c_cc[VTIME] = 0;
>> - /* flush everything waiting for read/xmit, it's garbage at this point */
>> - tcflush(fd, TCIFLUSH);
>> - tcsetattr(fd, TCSANOW,&tio);
>> - ret = conn_channel_add(s, fd);
>> - if (ret) {
>> - g_error("error adding channel to main loop");
>> - }
>> - } else if (strcmp(s->method, "unix-listen") == 0) {
>> - fd = unix_listen(s->path, NULL, strlen(s->path));
>> - if (fd == -1) {
>> - g_critical("error opening path: %s", strerror(errno));
>> - exit(EXIT_FAILURE);
>> - }
>> - ret = listen_channel_add(s, fd, true);
>> - if (ret) {
>> - g_critical("error binding/listening to specified socket");
>> - exit(EXIT_FAILURE);
>> - }
>> + if (strcmp(method, "virtio-serial") == 0) {
>> + s->virtio = true; /* virtio requires special handling in some cases */
>> + channel_method = GA_CHANNEL_VIRTIO_SERIAL;
>> + } else if (strcmp(method, "isa-serial") == 0) {
>> + channel_method = GA_CHANNEL_ISA_SERIAL;
>> + } else if (strcmp(method, "unix-listen") == 0) {
>> + channel_method = GA_CHANNEL_UNIX_LISTEN;
>> } else {
>> - g_critical("unsupported channel method/type: %s", s->method);
>> - exit(EXIT_FAILURE);
>> + g_critical("unsupported channel method/type: %s", method);
>> + return false;
>> }
>>
>> - json_message_parser_init(&s->parser, process_event);
>> - s->main_loop = g_main_loop_new(NULL, false);
>> + s->channel = ga_channel_new(channel_method, path, channel_event_cb, s);
>> + if (!s->channel) {
>> + g_critical("failed to create guest agent channel");
>> + return false;
>> + }
>> +
>> + return true;
>> }
>>
>> int main(int argc, char **argv)
>> @@ -689,9 +518,6 @@ int main(int argc, char **argv)
>>
>> ga_set_support_level(default_support_level);
>> s = g_malloc0(sizeof(GAState));
>> - s->conn_channel = NULL;
>> - s->path = path;
>> - s->method = method;
>> s->log_file = log_file;
>> s->log_level = log_level;
>> g_log_set_default_handler(ga_log, s);
>> @@ -700,15 +526,31 @@ int main(int argc, char **argv)
>> s->command_state = ga_command_state_new();
>> ga_command_state_init(s, s->command_state);
>> ga_command_state_init_all(s->command_state);
>> + json_message_parser_init(&s->parser, process_event);
>> ga_state = s;
>> + if (!register_signal_handlers()) {
>> + g_critical("failed to register signal handlers");
>> + goto out_bad;
>> + }
>>
>> - init_guest_agent(ga_state);
>> - register_signal_handlers();
>> -
>> + s->main_loop = g_main_loop_new(NULL, false);
>> + if (!channel_init(ga_state, method, path)) {
>> + g_critical("failed to initialize guest agent channel");
>> + goto out_bad;
>> + }
>> g_main_loop_run(ga_state->main_loop);
>>
>> ga_command_state_cleanup_all(ga_state->command_state);
>> - unlink(pidfile);
>> + ga_channel_free(ga_state->channel);
>>
>> + if (daemonize) {
>> + unlink(pidfile);
>> + }
>> return 0;
>> +
>> +out_bad:
>> + if (daemonize) {
>> + unlink(pidfile);
>> + }
>> + return EXIT_FAILURE;
>> }
>> diff --git a/qga/channel-posix.c b/qga/channel-posix.c
>> new file mode 100644
>> index 0000000..40f7658
>> --- /dev/null
>> +++ b/qga/channel-posix.c
>> @@ -0,0 +1,246 @@
>> +#include<glib.h>
>> +#include<termios.h>
>> +#include "qemu_socket.h"
>> +#include "qga/channel.h"
>> +
>> +#define GA_CHANNEL_BAUDRATE_DEFAULT B38400 /* for isa-serial channels */
>> +
>> +struct GAChannel {
>> + GIOChannel *listen_channel;
>> + GIOChannel *client_channel;
>> + GAChannelMethod method;
>> + GAChannelCallback event_cb;
>> + gpointer user_data;
>> +};
>> +
>> +static int ga_channel_client_add(GAChannel *c, int fd);
>> +
>> +static gboolean ga_channel_listen_accept(GIOChannel *channel,
>> + GIOCondition condition, gpointer data)
>> +{
>> + GAChannel *c = data;
>> + int ret, client_fd;
>> + bool accepted = false;
>> + struct sockaddr_un addr;
>> + socklen_t addrlen = sizeof(addr);
>> +
>> + g_assert(channel != NULL);
>> +
>> + client_fd = qemu_accept(g_io_channel_unix_get_fd(channel),
>> + (struct sockaddr *)&addr,&addrlen);
>> + if (client_fd == -1) {
>> + g_warning("error converting fd to gsocket: %s", strerror(errno));
>> + goto out;
>> + }
>> + fcntl(client_fd, F_SETFL, O_NONBLOCK);
>> + ret = ga_channel_client_add(c, client_fd);
>> + if (ret) {
>> + g_warning("error setting up connection");
>> + goto out;
>> + }
>> + accepted = true;
>> +
>> +out:
>> + /* only accept 1 connection at a time */
>> + return !accepted;
>> +}
>> +
>> +/* start polling for readable events on listen fd, new==true
>> + * indicates we should use the existing s->listen_channel
>> + */
>> +static void ga_channel_listen_add(GAChannel *c, int listen_fd, bool create)
>> +{
>> + if (create) {
>> + c->listen_channel = g_io_channel_unix_new(listen_fd);
>> + }
>> + g_io_add_watch(c->listen_channel, G_IO_IN, ga_channel_listen_accept, c);
>> +}
>> +
>> +static void ga_channel_listen_close(GAChannel *c)
>> +{
>> + g_assert(c->method == GA_CHANNEL_UNIX_LISTEN);
>> + g_assert(c->listen_channel);
>> + g_io_channel_shutdown(c->listen_channel, true, NULL);
>> + g_io_channel_unref(c->listen_channel);
>> + c->listen_channel = NULL;
>> +}
>> +
>> +/* cleanup state for closed connection/session, start accepting new
>> + * connections if we're in listening mode
>> + */
>> +static void ga_channel_client_close(GAChannel *c)
>> +{
>> + g_assert(c->client_channel);
>> + g_io_channel_shutdown(c->client_channel, true, NULL);
>> + g_io_channel_unref(c->client_channel);
>> + c->client_channel = NULL;
>> + if (c->method == GA_CHANNEL_UNIX_LISTEN&& c->listen_channel) {
>> + ga_channel_listen_add(c, 0, false);
>> + }
>> +}
>> +
>> +static gboolean ga_channel_client_event(GIOChannel *channel,
>> + GIOCondition condition, gpointer data)
>> +{
>> + GAChannel *c = data;
>> + gboolean client_cont;
>> +
>> + g_assert(c);
>> + if (c->event_cb) {
>> + client_cont = c->event_cb(condition, c->user_data);
>> + if (!client_cont) {
>> + ga_channel_client_close(c);
>> + return false;
>> + }
>> + }
>> + return true;
>> +}
>> +
>> +static int ga_channel_client_add(GAChannel *c, int fd)
>> +{
>> + GIOChannel *client_channel;
>> + GError *err = NULL;
>> +
>> + g_assert(c&& !c->client_channel);
>> + client_channel = g_io_channel_unix_new(fd);
>> + g_assert(client_channel);
>> + g_io_channel_set_encoding(client_channel, NULL,&err);
>> + if (err != NULL) {
>> + g_warning("error setting channel encoding to binary");
>> + g_error_free(err);
>> + return -1;
>> + }
>> + g_io_add_watch(client_channel, G_IO_IN | G_IO_HUP,
>> + ga_channel_client_event, c);
>> + c->client_channel = client_channel;
>> + return 0;
>> +}
>> +
>> +static gboolean ga_channel_open(GAChannel *c, const gchar *path,
>> GAChannelMethod method)
>> +{
>> + int ret;
>> + c->method = method;
>> +
>> + switch (c->method) {
>> + case GA_CHANNEL_VIRTIO_SERIAL: {
>> + int fd = qemu_open(path, O_RDWR | O_NONBLOCK | O_ASYNC);
>> + if (fd == -1) {
>> + g_critical("error opening channel: %s", strerror(errno));
>> + exit(EXIT_FAILURE);
>> + }
>> + ret = ga_channel_client_add(c, fd);
>> + if (ret) {
>> + g_critical("error adding channel to main loop");
>> + return false;
>> + }
>> + break;
>> + }
>> + case GA_CHANNEL_ISA_SERIAL: {
>> + struct termios tio;
>> + int fd = qemu_open(path, O_RDWR | O_NOCTTY | O_NONBLOCK);
>> + if (fd == -1) {
>> + g_critical("error opening channel: %s", strerror(errno));
>> + exit(EXIT_FAILURE);
>> + }
>> + tcgetattr(fd,&tio);
>> + /* set up serial port for non-canonical, dumb byte streaming */
>> + tio.c_iflag&= ~(IGNBRK | BRKINT | IGNPAR | PARMRK | INPCK | ISTRIP |
>> + INLCR | IGNCR | ICRNL | IXON | IXOFF | IXANY |
>> + IMAXBEL);
>> + tio.c_oflag = 0;
>> + tio.c_lflag = 0;
>> + tio.c_cflag |= GA_CHANNEL_BAUDRATE_DEFAULT;
>> + /* 1 available byte min or reads will block (we'll set non-blocking
>> + * elsewhere, else we have to deal with read()=0 instead)
>> + */
>> + tio.c_cc[VMIN] = 1;
>> + tio.c_cc[VTIME] = 0;
>> + /* flush everything waiting for read/xmit, it's garbage at this point */
>> + tcflush(fd, TCIFLUSH);
>> + tcsetattr(fd, TCSANOW,&tio);
>> + ret = ga_channel_client_add(c, fd);
>> + if (ret) {
>> + g_error("error adding channel to main loop");
>> + }
>> + break;
>> + }
>> + case GA_CHANNEL_UNIX_LISTEN: {
>> + int fd = unix_listen(path, NULL, strlen(path));
>> + if (fd == -1) {
>> + g_critical("error opening path: %s", strerror(errno));
>> + return false;
>> + }
>> + ga_channel_listen_add(c, fd, true);
>> + break;
>> + }
>> + default:
>> + g_critical("error binding/listening to specified socket");
>> + return false;
>> + }
>> +
>> + return true;
>> +}
>> +
>> +GIOStatus ga_channel_write_all(GAChannel *c, const gchar *buf, gsize size)
>> +{
>> + GError *err = NULL;
>> + gsize written = 0;
>> + GIOStatus status = G_IO_STATUS_NORMAL;
>> +
>> + while (size) {
>> + status = g_io_channel_write_chars(c->client_channel, buf, size,
>> +&written,&err);
>> + g_debug("sending data, count: %d", (int)size);
>> + if (err != NULL) {
>> + g_warning("error writing to channel: %s", err->message);
>> + return G_IO_STATUS_ERROR;
>> + }
>> + if (status != G_IO_STATUS_NORMAL) {
>> + break;
>> + }
>> + size -= written;
>> + }
>> +
>> + if (status == G_IO_STATUS_NORMAL) {
>> + status = g_io_channel_flush(c->client_channel,&err);
>> + if (err != NULL) {
>> + g_warning("error flushing channel: %s", err->message);
>> + return G_IO_STATUS_ERROR;
>> + }
>> + }
>> +
>> + return status;
>> +}
>> +
>> +GIOStatus ga_channel_read(GAChannel *c, gchar *buf, gsize size, gsize *count)
>> +{
>> + return g_io_channel_read_chars(c->client_channel, buf, size, count, NULL);
>> +}
>> +
>> +GAChannel *ga_channel_new(GAChannelMethod method, const gchar *path,
>> + GAChannelCallback cb, gpointer opaque)
>> +{
>> + GAChannel *c = g_malloc0(sizeof(GAChannel));
>> + c->event_cb = cb;
>> + c->user_data = opaque;
>> +
>> + if (!ga_channel_open(c, path, method)) {
>> + g_critical("error opening channel");
>> + ga_channel_free(c);
>> + return NULL;
>> + }
>> +
>> + return c;
>> +}
>> +
>> +void ga_channel_free(GAChannel *c)
>> +{
>> + if (c->method == GA_CHANNEL_UNIX_LISTEN
>> +&& c->listen_channel) {
>> + ga_channel_listen_close(c);
>> + }
>> + if (c->client_channel) {
>> + ga_channel_client_close(c);
>> + }
>> + g_free(c);
>> +}
>> diff --git a/qga/channel.h b/qga/channel.h
>> new file mode 100644
>> index 0000000..3704ea9
>> --- /dev/null
>> +++ b/qga/channel.h
>> @@ -0,0 +1,33 @@
>> +/*
>> + * QEMU Guest Agent channel declarations
>> + *
>> + * Copyright IBM Corp. 2012
>> + *
>> + * Authors:
>> + * Michael Roth<mdroth@linux.vnet.ibm.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
>> + * See the COPYING file in the top-level directory.
>> + */
>> +#ifndef QGA_CHANNEL_H
>> +#define QGA_CHANNEL_H
>> +
>> +#include<glib.h>
>> +
>> +typedef struct GAChannel GAChannel;
>> +
>> +typedef enum {
>> + GA_CHANNEL_VIRTIO_SERIAL,
>> + GA_CHANNEL_ISA_SERIAL,
>> + GA_CHANNEL_UNIX_LISTEN,
>> +} GAChannelMethod;
>> +
>> +typedef gboolean (*GAChannelCallback)(GIOCondition condition, gpointer opaque);
>> +
>> +GAChannel *ga_channel_new(GAChannelMethod method, const gchar *path,
>> + GAChannelCallback cb, gpointer opaque);
>> +void ga_channel_free(GAChannel *c);
>> +GIOStatus ga_channel_read(GAChannel *c, gchar *buf, gsize size, gsize *count);
>> +GIOStatus ga_channel_write_all(GAChannel *c, const gchar *buf, gsize size);
>> +
>> +#endif
>> diff --git a/qga/guest-agent-core.h b/qga/guest-agent-core.h
>> index 7327e73..891ed06 100644
>> --- a/qga/guest-agent-core.h
>> +++ b/qga/guest-agent-core.h
>> @@ -22,7 +22,7 @@
>> #define QGA_SUPPORT_LEVEL_MAJOR_MIN 1
>> #define QGA_SUPPORT_LEVEL_MINOR_MIN 0
>> #define QGA_SUPPORT_LEVEL_MICRO_MIN 0
>> -#define QGA_READ_COUNT_DEFAULT 4<< 10
>> +#define QGA_READ_COUNT_DEFAULT 4096
>>
>> typedef struct GAState GAState;
>> typedef struct GACommandState GACommandState;
>
>
Michael Roth Jan. 23, 2012, 8:56 p.m. UTC | #3
On 01/23/2012 02:24 PM, Anthony Liguori wrote:
> On 01/23/2012 08:21 AM, Michael Roth wrote:
>> This is monstly in preparation for the win32 port, which won't use
>> GIO channels for reasons that will be made clearer later. Here the
>> GAChannel class is just a loose wrapper around GIOChannel
>> calls/callbacks, but we also roll the logic/configuration for handling
>> FDs and also for managing unix socket connections into it, which makes
>> the abstraction much more complete and further aids in the win32 port
>> since isa-serial/unix-listen will not be supported initially.
>>
>> There's also a bit of refactoring in the main logic to consolidate the
>> exit paths so we can do common cleanup for things like pid files, which
>> weren't always cleaned up previously.
>>
>> Signed-off-by: Michael Roth<mdroth@linux.vnet.ibm.com>
>
> This doesn't apply very well. Can you rebase?

I think the issue is that it depends on the 4 patches I mentioned that 
are currently floating around on the list (the first 2 for patchability, 
the latter 2 for functional/performance issues):

[PATCH] qemu-ga: Add schema documentation for types
[PATCH] qemu-ga: add guest-set-support-level command
[PATCH] main-loop: Fix SetEvent() on uninitialized handle on win32
[PATCH] main-loop: For tools, initialize timers as part of 
qemu_init_main_loop()

I think the first 2 are more likely to get merged along with Luiz's 
guest-suspend series, so I was hesitant to re-include them here as 1 
unit (since that would break Luiz's patches).

The git repo has the whole series though. Can also resubmit in a 
different form if preferred.

>
> Regards,
>
> Anthony Liguori
>
>> ---
>> Makefile.objs | 1 +
>> qemu-ga.c | 306 ++++++++++++------------------------------------
>> qga/channel-posix.c | 246 ++++++++++++++++++++++++++++++++++++++
>> qga/channel.h | 33 +++++
>> qga/guest-agent-core.h | 2 +-
>> 5 files changed, 355 insertions(+), 233 deletions(-)
>> create mode 100644 qga/channel-posix.c
>> create mode 100644 qga/channel.h
>>
>> diff --git a/Makefile.objs b/Makefile.objs
>> index f94c881..72141cc 100644
>> --- a/Makefile.objs
>> +++ b/Makefile.objs
>> @@ -417,6 +417,7 @@ common-obj-y += qmp.o hmp.o
>> # guest agent
>>
>> qga-nested-y = guest-agent-commands.o guest-agent-command-state.o
>> +qga-nested-y += channel-posix.o
>> qga-obj-y = $(addprefix qga/, $(qga-nested-y))
>> qga-obj-y += qemu-ga.o qemu-sockets.o module.o qemu-option.o
>> qga-obj-$(CONFIG_WIN32) += oslib-win32.o
>> diff --git a/qemu-ga.c b/qemu-ga.c
>> index 8e5b075..5f89ab9 100644
>> --- a/qemu-ga.c
>> +++ b/qemu-ga.c
>> @@ -15,9 +15,7 @@
>> #include<stdbool.h>
>> #include<glib.h>
>> #include<getopt.h>
>> -#include<termios.h>
>> #include<syslog.h>
>> -#include "qemu_socket.h"
>> #include "json-streamer.h"
>> #include "json-parser.h"
>> #include "qint.h"
>> @@ -29,19 +27,15 @@
>> #include "error_int.h"
>> #include "qapi/qmp-core.h"
>> #include "qga-qapi-types.h"
>> +#include "qga/channel.h"
>>
>> #define QGA_VIRTIO_PATH_DEFAULT
>> "/dev/virtio-ports/org.qemu.guest_agent.0"
>> #define QGA_PIDFILE_DEFAULT "/var/run/qemu-ga.pid"
>> -#define QGA_BAUDRATE_DEFAULT B38400 /* for isa-serial channels */
>> -#define QGA_TIMEOUT_DEFAULT 30*1000 /* ms */
>>
>> struct GAState {
>> JSONMessageParser parser;
>> GMainLoop *main_loop;
>> - GIOChannel *conn_channel;
>> - GIOChannel *listen_channel;
>> - const char *path;
>> - const char *method;
>> + GAChannel *channel;
>> bool virtio; /* fastpath to check for virtio to deal with poll()
>> quirks */
>> GACommandState *command_state;
>> GLogLevelFlags log_level;
>> @@ -60,7 +54,7 @@ static void quit_handler(int sig)
>> }
>> }
>>
>> -static void register_signal_handlers(void)
>> +static gboolean register_signal_handlers(void)
>> {
>> struct sigaction sigact;
>> int ret;
>> @@ -71,12 +65,14 @@ static void register_signal_handlers(void)
>> ret = sigaction(SIGINT,&sigact, NULL);
>> if (ret == -1) {
>> g_error("error configuring signal handler: %s", strerror(errno));
>> - exit(EXIT_FAILURE);
>> + return false;
>> }
>> ret = sigaction(SIGTERM,&sigact, NULL);
>> if (ret == -1) {
>> g_error("error configuring signal handler: %s", strerror(errno));
>> + return false;
>> }
>> + return true;
>> }
>>
>> static void usage(const char *cmd)
>> @@ -101,8 +97,6 @@ static void usage(const char *cmd)
>> , cmd, QGA_VERSION, QGA_VIRTIO_PATH_DEFAULT, QGA_PIDFILE_DEFAULT);
>> }
>>
>> -static void conn_channel_close(GAState *s);
>> -
>> static GuestAgentSupportLevel ga_support_level;
>>
>> static int ga_cmp_support_levels(GuestAgentSupportLevel a,
>> @@ -250,40 +244,13 @@ fail:
>> exit(EXIT_FAILURE);
>> }
>>
>> -static int conn_channel_send_buf(GIOChannel *channel, const char *buf,
>> - gsize count)
>> +static int send_response(GAState *s, QObject *payload)
>> {
>> - GError *err = NULL;
>> - gsize written = 0;
>> - GIOStatus status;
>> -
>> - while (count) {
>> - status = g_io_channel_write_chars(channel, buf, count,&written,&err);
>> - g_debug("sending data, count: %d", (int)count);
>> - if (err != NULL) {
>> - g_warning("error sending newline: %s", err->message);
>> - return err->code;
>> - }
>> - if (status == G_IO_STATUS_ERROR || status == G_IO_STATUS_EOF) {
>> - return -EPIPE;
>> - }
>> -
>> - if (status == G_IO_STATUS_NORMAL) {
>> - count -= written;
>> - }
>> - }
>> -
>> - return 0;
>> -}
>> -
>> -static int conn_channel_send_payload(GIOChannel *channel, QObject
>> *payload)
>> -{
>> - int ret = 0;
>> const char *buf;
>> QString *payload_qstr;
>> - GError *err = NULL;
>> + GIOStatus status;
>>
>> - g_assert(payload&& channel);
>> + g_assert(payload&& s->channel);
>>
>> payload_qstr = qobject_to_json(payload);
>> if (!payload_qstr) {
>> @@ -292,24 +259,13 @@ static int conn_channel_send_payload(GIOChannel
>> *channel, QObject *payload)
>>
>> qstring_append_chr(payload_qstr, '\n');
>> buf = qstring_get_str(payload_qstr);
>> - ret = conn_channel_send_buf(channel, buf, strlen(buf));
>> - if (ret) {
>> - goto out_free;
>> - }
>> -
>> - g_io_channel_flush(channel,&err);
>> - if (err != NULL) {
>> - g_warning("error flushing payload: %s", err->message);
>> - ret = err->code;
>> - goto out_free;
>> - }
>> -
>> -out_free:
>> + status = ga_channel_write_all(s->channel, buf, strlen(buf));
>> QDECREF(payload_qstr);
>> - if (err) {
>> - g_error_free(err);
>> + if (status != G_IO_STATUS_NORMAL) {
>> + return -EIO;
>> }
>> - return ret;
>> +
>> + return 0;
>> }
>>
>> static void process_command(GAState *s, QDict *req)
>> @@ -321,9 +277,9 @@ static void process_command(GAState *s, QDict *req)
>> g_debug("processing command");
>> rsp = qmp_dispatch(QOBJECT(req));
>> if (rsp) {
>> - ret = conn_channel_send_payload(s->conn_channel, rsp);
>> + ret = send_response(s, rsp);
>> if (ret) {
>> - g_warning("error sending payload: %s", strerror(ret));
>> + g_warning("error sending response: %s", strerror(ret));
>> }
>> qobject_decref(rsp);
>> } else {
>> @@ -373,38 +329,42 @@ static void process_event(JSONMessageParser
>> *parser, QList *tokens)
>> qdict_put_obj(qdict, "error", error_get_qobject(err));
>> error_free(err);
>> }
>> - ret = conn_channel_send_payload(s->conn_channel, QOBJECT(qdict));
>> + ret = send_response(s, QOBJECT(qdict));
>> if (ret) {
>> - g_warning("error sending payload: %s", strerror(ret));
>> + g_warning("error sending error response: %s", strerror(ret));
>> }
>> }
>>
>> QDECREF(qdict);
>> }
>>
>> -static gboolean conn_channel_read(GIOChannel *channel, GIOCondition
>> condition,
>> - gpointer data)
>> +/* false return signals GAChannel to close the current client
>> connection */
>> +static gboolean channel_event_cb(GIOCondition condition, gpointer data)
>> {
>> GAState *s = data;
>> - gchar buf[1024];
>> + gchar buf[QGA_READ_COUNT_DEFAULT+1];
>> gsize count;
>> GError *err = NULL;
>> - memset(buf, 0, 1024);
>> - GIOStatus status = g_io_channel_read_chars(channel, buf, 1024,
>> -&count,&err);
>> + GIOStatus status = ga_channel_read(s->channel, buf,
>> QGA_READ_COUNT_DEFAULT,&count);
>> if (err != NULL) {
>> g_warning("error reading channel: %s", err->message);
>> - conn_channel_close(s);
>> g_error_free(err);
>> return false;
>> }
>> switch (status) {
>> case G_IO_STATUS_ERROR:
>> - g_warning("problem");
>> + g_warning("error reading channel");
>> return false;
>> case G_IO_STATUS_NORMAL:
>> + buf[count] = 0;
>> g_debug("read data, count: %d, data: %s", (int)count, buf);
>> json_message_parser_feed(&s->parser, (char *)buf, (int)count);
>> + break;
>> + case G_IO_STATUS_EOF:
>> + g_debug("received EOF");
>> + if (!s->virtio) {
>> + return false;
>> + }
>> case G_IO_STATUS_AGAIN:
>> /* virtio causes us to spin here when no process is attached to
>> * host-side chardev. sleep a bit to mitigate this
>> @@ -413,180 +373,49 @@ static gboolean conn_channel_read(GIOChannel
>> *channel, GIOCondition condition,
>> usleep(100*1000);
>> }
>> return true;
>> - case G_IO_STATUS_EOF:
>> - g_debug("received EOF");
>> - conn_channel_close(s);
>> - if (s->virtio) {
>> - return true;
>> - }
>> - return false;
>> default:
>> g_warning("unknown channel read status, closing");
>> - conn_channel_close(s);
>> return false;
>> }
>> return true;
>> }
>>
>> -static int conn_channel_add(GAState *s, int fd)
>> +static gboolean channel_init(GAState *s, const gchar *method, const
>> gchar *path)
>> {
>> - GIOChannel *conn_channel;
>> - GError *err = NULL;
>> + GAChannelMethod channel_method;
>>
>> - g_assert(s&& !s->conn_channel);
>> - conn_channel = g_io_channel_unix_new(fd);
>> - g_assert(conn_channel);
>> - g_io_channel_set_encoding(conn_channel, NULL,&err);
>> - if (err != NULL) {
>> - g_warning("error setting channel encoding to binary");
>> - g_error_free(err);
>> - return -1;
>> + if (method == NULL) {
>> + method = "virtio-serial";
>> }
>> - g_io_add_watch(conn_channel, G_IO_IN | G_IO_HUP,
>> - conn_channel_read, s);
>> - s->conn_channel = conn_channel;
>> - return 0;
>> -}
>>
>> -static gboolean listen_channel_accept(GIOChannel *channel,
>> - GIOCondition condition, gpointer data)
>> -{
>> - GAState *s = data;
>> - g_assert(channel != NULL);
>> - int ret, conn_fd;
>> - bool accepted = false;
>> - struct sockaddr_un addr;
>> - socklen_t addrlen = sizeof(addr);
>> -
>> - conn_fd = qemu_accept(g_io_channel_unix_get_fd(s->listen_channel),
>> - (struct sockaddr *)&addr,&addrlen);
>> - if (conn_fd == -1) {
>> - g_warning("error converting fd to gsocket: %s", strerror(errno));
>> - goto out;
>> - }
>> - fcntl(conn_fd, F_SETFL, O_NONBLOCK);
>> - ret = conn_channel_add(s, conn_fd);
>> - if (ret) {
>> - g_warning("error setting up connection");
>> - goto out;
>> - }
>> - accepted = true;
>> -
>> -out:
>> - /* only accept 1 connection at a time */
>> - return !accepted;
>> -}
>> -
>> -/* start polling for readable events on listen fd, new==true
>> - * indicates we should use the existing s->listen_channel
>> - */
>> -static int listen_channel_add(GAState *s, int listen_fd, bool new)
>> -{
>> - if (new) {
>> - s->listen_channel = g_io_channel_unix_new(listen_fd);
>> - }
>> - g_io_add_watch(s->listen_channel, G_IO_IN,
>> - listen_channel_accept, s);
>> - return 0;
>> -}
>> -
>> -/* cleanup state for closed connection/session, start accepting new
>> - * connections if we're in listening mode
>> - */
>> -static void conn_channel_close(GAState *s)
>> -{
>> - if (strcmp(s->method, "unix-listen") == 0) {
>> - g_io_channel_shutdown(s->conn_channel, true, NULL);
>> - listen_channel_add(s, 0, false);
>> - } else if (strcmp(s->method, "virtio-serial") == 0) {
>> - /* we spin on EOF for virtio-serial, so back off a bit. also,
>> - * dont close the connection in this case, it'll resume normal
>> - * operation when another process connects to host chardev
>> - */
>> - usleep(100*1000);
>> - goto out_noclose;
>> - }
>> - g_io_channel_unref(s->conn_channel);
>> - s->conn_channel = NULL;
>> -out_noclose:
>> - return;
>> -}
>> -
>> -static void init_guest_agent(GAState *s)
>> -{
>> - struct termios tio;
>> - int ret, fd;
>> -
>> - if (s->method == NULL) {
>> - /* try virtio-serial as our default */
>> - s->method = "virtio-serial";
>> - }
>> -
>> - if (s->path == NULL) {
>> - if (strcmp(s->method, "virtio-serial") != 0) {
>> + if (path == NULL) {
>> + if (strcmp(method, "virtio-serial") != 0) {
>> g_critical("must specify a path for this channel");
>> - exit(EXIT_FAILURE);
>> + return false;
>> }
>> /* try the default path for the virtio-serial port */
>> - s->path = QGA_VIRTIO_PATH_DEFAULT;
>> + path = QGA_VIRTIO_PATH_DEFAULT;
>> }
>>
>> - if (strcmp(s->method, "virtio-serial") == 0) {
>> - s->virtio = true;
>> - fd = qemu_open(s->path, O_RDWR | O_NONBLOCK | O_ASYNC);
>> - if (fd == -1) {
>> - g_critical("error opening channel: %s", strerror(errno));
>> - exit(EXIT_FAILURE);
>> - }
>> - ret = conn_channel_add(s, fd);
>> - if (ret) {
>> - g_critical("error adding channel to main loop");
>> - exit(EXIT_FAILURE);
>> - }
>> - } else if (strcmp(s->method, "isa-serial") == 0) {
>> - fd = qemu_open(s->path, O_RDWR | O_NOCTTY);
>> - if (fd == -1) {
>> - g_critical("error opening channel: %s", strerror(errno));
>> - exit(EXIT_FAILURE);
>> - }
>> - tcgetattr(fd,&tio);
>> - /* set up serial port for non-canonical, dumb byte streaming */
>> - tio.c_iflag&= ~(IGNBRK | BRKINT | IGNPAR | PARMRK | INPCK | ISTRIP |
>> - INLCR | IGNCR | ICRNL | IXON | IXOFF | IXANY |
>> - IMAXBEL);
>> - tio.c_oflag = 0;
>> - tio.c_lflag = 0;
>> - tio.c_cflag |= QGA_BAUDRATE_DEFAULT;
>> - /* 1 available byte min or reads will block (we'll set non-blocking
>> - * elsewhere, else we have to deal with read()=0 instead)
>> - */
>> - tio.c_cc[VMIN] = 1;
>> - tio.c_cc[VTIME] = 0;
>> - /* flush everything waiting for read/xmit, it's garbage at this
>> point */
>> - tcflush(fd, TCIFLUSH);
>> - tcsetattr(fd, TCSANOW,&tio);
>> - ret = conn_channel_add(s, fd);
>> - if (ret) {
>> - g_error("error adding channel to main loop");
>> - }
>> - } else if (strcmp(s->method, "unix-listen") == 0) {
>> - fd = unix_listen(s->path, NULL, strlen(s->path));
>> - if (fd == -1) {
>> - g_critical("error opening path: %s", strerror(errno));
>> - exit(EXIT_FAILURE);
>> - }
>> - ret = listen_channel_add(s, fd, true);
>> - if (ret) {
>> - g_critical("error binding/listening to specified socket");
>> - exit(EXIT_FAILURE);
>> - }
>> + if (strcmp(method, "virtio-serial") == 0) {
>> + s->virtio = true; /* virtio requires special handling in some cases */
>> + channel_method = GA_CHANNEL_VIRTIO_SERIAL;
>> + } else if (strcmp(method, "isa-serial") == 0) {
>> + channel_method = GA_CHANNEL_ISA_SERIAL;
>> + } else if (strcmp(method, "unix-listen") == 0) {
>> + channel_method = GA_CHANNEL_UNIX_LISTEN;
>> } else {
>> - g_critical("unsupported channel method/type: %s", s->method);
>> - exit(EXIT_FAILURE);
>> + g_critical("unsupported channel method/type: %s", method);
>> + return false;
>> }
>>
>> - json_message_parser_init(&s->parser, process_event);
>> - s->main_loop = g_main_loop_new(NULL, false);
>> + s->channel = ga_channel_new(channel_method, path, channel_event_cb, s);
>> + if (!s->channel) {
>> + g_critical("failed to create guest agent channel");
>> + return false;
>> + }
>> +
>> + return true;
>> }
>>
>> int main(int argc, char **argv)
>> @@ -689,9 +518,6 @@ int main(int argc, char **argv)
>>
>> ga_set_support_level(default_support_level);
>> s = g_malloc0(sizeof(GAState));
>> - s->conn_channel = NULL;
>> - s->path = path;
>> - s->method = method;
>> s->log_file = log_file;
>> s->log_level = log_level;
>> g_log_set_default_handler(ga_log, s);
>> @@ -700,15 +526,31 @@ int main(int argc, char **argv)
>> s->command_state = ga_command_state_new();
>> ga_command_state_init(s, s->command_state);
>> ga_command_state_init_all(s->command_state);
>> + json_message_parser_init(&s->parser, process_event);
>> ga_state = s;
>> + if (!register_signal_handlers()) {
>> + g_critical("failed to register signal handlers");
>> + goto out_bad;
>> + }
>>
>> - init_guest_agent(ga_state);
>> - register_signal_handlers();
>> -
>> + s->main_loop = g_main_loop_new(NULL, false);
>> + if (!channel_init(ga_state, method, path)) {
>> + g_critical("failed to initialize guest agent channel");
>> + goto out_bad;
>> + }
>> g_main_loop_run(ga_state->main_loop);
>>
>> ga_command_state_cleanup_all(ga_state->command_state);
>> - unlink(pidfile);
>> + ga_channel_free(ga_state->channel);
>>
>> + if (daemonize) {
>> + unlink(pidfile);
>> + }
>> return 0;
>> +
>> +out_bad:
>> + if (daemonize) {
>> + unlink(pidfile);
>> + }
>> + return EXIT_FAILURE;
>> }
>> diff --git a/qga/channel-posix.c b/qga/channel-posix.c
>> new file mode 100644
>> index 0000000..40f7658
>> --- /dev/null
>> +++ b/qga/channel-posix.c
>> @@ -0,0 +1,246 @@
>> +#include<glib.h>
>> +#include<termios.h>
>> +#include "qemu_socket.h"
>> +#include "qga/channel.h"
>> +
>> +#define GA_CHANNEL_BAUDRATE_DEFAULT B38400 /* for isa-serial channels */
>> +
>> +struct GAChannel {
>> + GIOChannel *listen_channel;
>> + GIOChannel *client_channel;
>> + GAChannelMethod method;
>> + GAChannelCallback event_cb;
>> + gpointer user_data;
>> +};
>> +
>> +static int ga_channel_client_add(GAChannel *c, int fd);
>> +
>> +static gboolean ga_channel_listen_accept(GIOChannel *channel,
>> + GIOCondition condition, gpointer data)
>> +{
>> + GAChannel *c = data;
>> + int ret, client_fd;
>> + bool accepted = false;
>> + struct sockaddr_un addr;
>> + socklen_t addrlen = sizeof(addr);
>> +
>> + g_assert(channel != NULL);
>> +
>> + client_fd = qemu_accept(g_io_channel_unix_get_fd(channel),
>> + (struct sockaddr *)&addr,&addrlen);
>> + if (client_fd == -1) {
>> + g_warning("error converting fd to gsocket: %s", strerror(errno));
>> + goto out;
>> + }
>> + fcntl(client_fd, F_SETFL, O_NONBLOCK);
>> + ret = ga_channel_client_add(c, client_fd);
>> + if (ret) {
>> + g_warning("error setting up connection");
>> + goto out;
>> + }
>> + accepted = true;
>> +
>> +out:
>> + /* only accept 1 connection at a time */
>> + return !accepted;
>> +}
>> +
>> +/* start polling for readable events on listen fd, new==true
>> + * indicates we should use the existing s->listen_channel
>> + */
>> +static void ga_channel_listen_add(GAChannel *c, int listen_fd, bool
>> create)
>> +{
>> + if (create) {
>> + c->listen_channel = g_io_channel_unix_new(listen_fd);
>> + }
>> + g_io_add_watch(c->listen_channel, G_IO_IN, ga_channel_listen_accept,
>> c);
>> +}
>> +
>> +static void ga_channel_listen_close(GAChannel *c)
>> +{
>> + g_assert(c->method == GA_CHANNEL_UNIX_LISTEN);
>> + g_assert(c->listen_channel);
>> + g_io_channel_shutdown(c->listen_channel, true, NULL);
>> + g_io_channel_unref(c->listen_channel);
>> + c->listen_channel = NULL;
>> +}
>> +
>> +/* cleanup state for closed connection/session, start accepting new
>> + * connections if we're in listening mode
>> + */
>> +static void ga_channel_client_close(GAChannel *c)
>> +{
>> + g_assert(c->client_channel);
>> + g_io_channel_shutdown(c->client_channel, true, NULL);
>> + g_io_channel_unref(c->client_channel);
>> + c->client_channel = NULL;
>> + if (c->method == GA_CHANNEL_UNIX_LISTEN&& c->listen_channel) {
>> + ga_channel_listen_add(c, 0, false);
>> + }
>> +}
>> +
>> +static gboolean ga_channel_client_event(GIOChannel *channel,
>> + GIOCondition condition, gpointer data)
>> +{
>> + GAChannel *c = data;
>> + gboolean client_cont;
>> +
>> + g_assert(c);
>> + if (c->event_cb) {
>> + client_cont = c->event_cb(condition, c->user_data);
>> + if (!client_cont) {
>> + ga_channel_client_close(c);
>> + return false;
>> + }
>> + }
>> + return true;
>> +}
>> +
>> +static int ga_channel_client_add(GAChannel *c, int fd)
>> +{
>> + GIOChannel *client_channel;
>> + GError *err = NULL;
>> +
>> + g_assert(c&& !c->client_channel);
>> + client_channel = g_io_channel_unix_new(fd);
>> + g_assert(client_channel);
>> + g_io_channel_set_encoding(client_channel, NULL,&err);
>> + if (err != NULL) {
>> + g_warning("error setting channel encoding to binary");
>> + g_error_free(err);
>> + return -1;
>> + }
>> + g_io_add_watch(client_channel, G_IO_IN | G_IO_HUP,
>> + ga_channel_client_event, c);
>> + c->client_channel = client_channel;
>> + return 0;
>> +}
>> +
>> +static gboolean ga_channel_open(GAChannel *c, const gchar *path,
>> GAChannelMethod method)
>> +{
>> + int ret;
>> + c->method = method;
>> +
>> + switch (c->method) {
>> + case GA_CHANNEL_VIRTIO_SERIAL: {
>> + int fd = qemu_open(path, O_RDWR | O_NONBLOCK | O_ASYNC);
>> + if (fd == -1) {
>> + g_critical("error opening channel: %s", strerror(errno));
>> + exit(EXIT_FAILURE);
>> + }
>> + ret = ga_channel_client_add(c, fd);
>> + if (ret) {
>> + g_critical("error adding channel to main loop");
>> + return false;
>> + }
>> + break;
>> + }
>> + case GA_CHANNEL_ISA_SERIAL: {
>> + struct termios tio;
>> + int fd = qemu_open(path, O_RDWR | O_NOCTTY | O_NONBLOCK);
>> + if (fd == -1) {
>> + g_critical("error opening channel: %s", strerror(errno));
>> + exit(EXIT_FAILURE);
>> + }
>> + tcgetattr(fd,&tio);
>> + /* set up serial port for non-canonical, dumb byte streaming */
>> + tio.c_iflag&= ~(IGNBRK | BRKINT | IGNPAR | PARMRK | INPCK | ISTRIP |
>> + INLCR | IGNCR | ICRNL | IXON | IXOFF | IXANY |
>> + IMAXBEL);
>> + tio.c_oflag = 0;
>> + tio.c_lflag = 0;
>> + tio.c_cflag |= GA_CHANNEL_BAUDRATE_DEFAULT;
>> + /* 1 available byte min or reads will block (we'll set non-blocking
>> + * elsewhere, else we have to deal with read()=0 instead)
>> + */
>> + tio.c_cc[VMIN] = 1;
>> + tio.c_cc[VTIME] = 0;
>> + /* flush everything waiting for read/xmit, it's garbage at this
>> point */
>> + tcflush(fd, TCIFLUSH);
>> + tcsetattr(fd, TCSANOW,&tio);
>> + ret = ga_channel_client_add(c, fd);
>> + if (ret) {
>> + g_error("error adding channel to main loop");
>> + }
>> + break;
>> + }
>> + case GA_CHANNEL_UNIX_LISTEN: {
>> + int fd = unix_listen(path, NULL, strlen(path));
>> + if (fd == -1) {
>> + g_critical("error opening path: %s", strerror(errno));
>> + return false;
>> + }
>> + ga_channel_listen_add(c, fd, true);
>> + break;
>> + }
>> + default:
>> + g_critical("error binding/listening to specified socket");
>> + return false;
>> + }
>> +
>> + return true;
>> +}
>> +
>> +GIOStatus ga_channel_write_all(GAChannel *c, const gchar *buf, gsize
>> size)
>> +{
>> + GError *err = NULL;
>> + gsize written = 0;
>> + GIOStatus status = G_IO_STATUS_NORMAL;
>> +
>> + while (size) {
>> + status = g_io_channel_write_chars(c->client_channel, buf, size,
>> +&written,&err);
>> + g_debug("sending data, count: %d", (int)size);
>> + if (err != NULL) {
>> + g_warning("error writing to channel: %s", err->message);
>> + return G_IO_STATUS_ERROR;
>> + }
>> + if (status != G_IO_STATUS_NORMAL) {
>> + break;
>> + }
>> + size -= written;
>> + }
>> +
>> + if (status == G_IO_STATUS_NORMAL) {
>> + status = g_io_channel_flush(c->client_channel,&err);
>> + if (err != NULL) {
>> + g_warning("error flushing channel: %s", err->message);
>> + return G_IO_STATUS_ERROR;
>> + }
>> + }
>> +
>> + return status;
>> +}
>> +
>> +GIOStatus ga_channel_read(GAChannel *c, gchar *buf, gsize size, gsize
>> *count)
>> +{
>> + return g_io_channel_read_chars(c->client_channel, buf, size, count,
>> NULL);
>> +}
>> +
>> +GAChannel *ga_channel_new(GAChannelMethod method, const gchar *path,
>> + GAChannelCallback cb, gpointer opaque)
>> +{
>> + GAChannel *c = g_malloc0(sizeof(GAChannel));
>> + c->event_cb = cb;
>> + c->user_data = opaque;
>> +
>> + if (!ga_channel_open(c, path, method)) {
>> + g_critical("error opening channel");
>> + ga_channel_free(c);
>> + return NULL;
>> + }
>> +
>> + return c;
>> +}
>> +
>> +void ga_channel_free(GAChannel *c)
>> +{
>> + if (c->method == GA_CHANNEL_UNIX_LISTEN
>> +&& c->listen_channel) {
>> + ga_channel_listen_close(c);
>> + }
>> + if (c->client_channel) {
>> + ga_channel_client_close(c);
>> + }
>> + g_free(c);
>> +}
>> diff --git a/qga/channel.h b/qga/channel.h
>> new file mode 100644
>> index 0000000..3704ea9
>> --- /dev/null
>> +++ b/qga/channel.h
>> @@ -0,0 +1,33 @@
>> +/*
>> + * QEMU Guest Agent channel declarations
>> + *
>> + * Copyright IBM Corp. 2012
>> + *
>> + * Authors:
>> + * Michael Roth<mdroth@linux.vnet.ibm.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>> later.
>> + * See the COPYING file in the top-level directory.
>> + */
>> +#ifndef QGA_CHANNEL_H
>> +#define QGA_CHANNEL_H
>> +
>> +#include<glib.h>
>> +
>> +typedef struct GAChannel GAChannel;
>> +
>> +typedef enum {
>> + GA_CHANNEL_VIRTIO_SERIAL,
>> + GA_CHANNEL_ISA_SERIAL,
>> + GA_CHANNEL_UNIX_LISTEN,
>> +} GAChannelMethod;
>> +
>> +typedef gboolean (*GAChannelCallback)(GIOCondition condition,
>> gpointer opaque);
>> +
>> +GAChannel *ga_channel_new(GAChannelMethod method, const gchar *path,
>> + GAChannelCallback cb, gpointer opaque);
>> +void ga_channel_free(GAChannel *c);
>> +GIOStatus ga_channel_read(GAChannel *c, gchar *buf, gsize size, gsize
>> *count);
>> +GIOStatus ga_channel_write_all(GAChannel *c, const gchar *buf, gsize
>> size);
>> +
>> +#endif
>> diff --git a/qga/guest-agent-core.h b/qga/guest-agent-core.h
>> index 7327e73..891ed06 100644
>> --- a/qga/guest-agent-core.h
>> +++ b/qga/guest-agent-core.h
>> @@ -22,7 +22,7 @@
>> #define QGA_SUPPORT_LEVEL_MAJOR_MIN 1
>> #define QGA_SUPPORT_LEVEL_MINOR_MIN 0
>> #define QGA_SUPPORT_LEVEL_MICRO_MIN 0
>> -#define QGA_READ_COUNT_DEFAULT 4<< 10
>> +#define QGA_READ_COUNT_DEFAULT 4096
>>
>> typedef struct GAState GAState;
>> typedef struct GACommandState GACommandState;
>
diff mbox

Patch

diff --git a/Makefile.objs b/Makefile.objs
index f94c881..72141cc 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -417,6 +417,7 @@  common-obj-y += qmp.o hmp.o
 # guest agent
 
 qga-nested-y = guest-agent-commands.o guest-agent-command-state.o
+qga-nested-y += channel-posix.o
 qga-obj-y = $(addprefix qga/, $(qga-nested-y))
 qga-obj-y += qemu-ga.o qemu-sockets.o module.o qemu-option.o
 qga-obj-$(CONFIG_WIN32) += oslib-win32.o
diff --git a/qemu-ga.c b/qemu-ga.c
index 8e5b075..5f89ab9 100644
--- a/qemu-ga.c
+++ b/qemu-ga.c
@@ -15,9 +15,7 @@ 
 #include <stdbool.h>
 #include <glib.h>
 #include <getopt.h>
-#include <termios.h>
 #include <syslog.h>
-#include "qemu_socket.h"
 #include "json-streamer.h"
 #include "json-parser.h"
 #include "qint.h"
@@ -29,19 +27,15 @@ 
 #include "error_int.h"
 #include "qapi/qmp-core.h"
 #include "qga-qapi-types.h"
+#include "qga/channel.h"
 
 #define QGA_VIRTIO_PATH_DEFAULT "/dev/virtio-ports/org.qemu.guest_agent.0"
 #define QGA_PIDFILE_DEFAULT "/var/run/qemu-ga.pid"
-#define QGA_BAUDRATE_DEFAULT B38400 /* for isa-serial channels */
-#define QGA_TIMEOUT_DEFAULT 30*1000 /* ms */
 
 struct GAState {
     JSONMessageParser parser;
     GMainLoop *main_loop;
-    GIOChannel *conn_channel;
-    GIOChannel *listen_channel;
-    const char *path;
-    const char *method;
+    GAChannel *channel;
     bool virtio; /* fastpath to check for virtio to deal with poll() quirks */
     GACommandState *command_state;
     GLogLevelFlags log_level;
@@ -60,7 +54,7 @@  static void quit_handler(int sig)
     }
 }
 
-static void register_signal_handlers(void)
+static gboolean register_signal_handlers(void)
 {
     struct sigaction sigact;
     int ret;
@@ -71,12 +65,14 @@  static void register_signal_handlers(void)
     ret = sigaction(SIGINT, &sigact, NULL);
     if (ret == -1) {
         g_error("error configuring signal handler: %s", strerror(errno));
-        exit(EXIT_FAILURE);
+        return false;
     }
     ret = sigaction(SIGTERM, &sigact, NULL);
     if (ret == -1) {
         g_error("error configuring signal handler: %s", strerror(errno));
+        return false;
     }
+    return true;
 }
 
 static void usage(const char *cmd)
@@ -101,8 +97,6 @@  static void usage(const char *cmd)
     , cmd, QGA_VERSION, QGA_VIRTIO_PATH_DEFAULT, QGA_PIDFILE_DEFAULT);
 }
 
-static void conn_channel_close(GAState *s);
-
 static GuestAgentSupportLevel ga_support_level;
 
 static int ga_cmp_support_levels(GuestAgentSupportLevel a,
@@ -250,40 +244,13 @@  fail:
     exit(EXIT_FAILURE);
 }
 
-static int conn_channel_send_buf(GIOChannel *channel, const char *buf,
-                                 gsize count)
+static int send_response(GAState *s, QObject *payload)
 {
-    GError *err = NULL;
-    gsize written = 0;
-    GIOStatus status;
-
-    while (count) {
-        status = g_io_channel_write_chars(channel, buf, count, &written, &err);
-        g_debug("sending data, count: %d", (int)count);
-        if (err != NULL) {
-            g_warning("error sending newline: %s", err->message);
-            return err->code;
-        }
-        if (status == G_IO_STATUS_ERROR || status == G_IO_STATUS_EOF) {
-            return -EPIPE;
-        }
-
-        if (status == G_IO_STATUS_NORMAL) {
-            count -= written;
-        }
-    }
-
-    return 0;
-}
-
-static int conn_channel_send_payload(GIOChannel *channel, QObject *payload)
-{
-    int ret = 0;
     const char *buf;
     QString *payload_qstr;
-    GError *err = NULL;
+    GIOStatus status;
 
-    g_assert(payload && channel);
+    g_assert(payload && s->channel);
 
     payload_qstr = qobject_to_json(payload);
     if (!payload_qstr) {
@@ -292,24 +259,13 @@  static int conn_channel_send_payload(GIOChannel *channel, QObject *payload)
 
     qstring_append_chr(payload_qstr, '\n');
     buf = qstring_get_str(payload_qstr);
-    ret = conn_channel_send_buf(channel, buf, strlen(buf));
-    if (ret) {
-        goto out_free;
-    }
-
-    g_io_channel_flush(channel, &err);
-    if (err != NULL) {
-        g_warning("error flushing payload: %s", err->message);
-        ret = err->code;
-        goto out_free;
-    }
-
-out_free:
+    status = ga_channel_write_all(s->channel, buf, strlen(buf));
     QDECREF(payload_qstr);
-    if (err) {
-        g_error_free(err);
+    if (status != G_IO_STATUS_NORMAL) {
+        return -EIO;
     }
-    return ret;
+
+    return 0;
 }
 
 static void process_command(GAState *s, QDict *req)
@@ -321,9 +277,9 @@  static void process_command(GAState *s, QDict *req)
     g_debug("processing command");
     rsp = qmp_dispatch(QOBJECT(req));
     if (rsp) {
-        ret = conn_channel_send_payload(s->conn_channel, rsp);
+        ret = send_response(s, rsp);
         if (ret) {
-            g_warning("error sending payload: %s", strerror(ret));
+            g_warning("error sending response: %s", strerror(ret));
         }
         qobject_decref(rsp);
     } else {
@@ -373,38 +329,42 @@  static void process_event(JSONMessageParser *parser, QList *tokens)
             qdict_put_obj(qdict, "error", error_get_qobject(err));
             error_free(err);
         }
-        ret = conn_channel_send_payload(s->conn_channel, QOBJECT(qdict));
+        ret = send_response(s, QOBJECT(qdict));
         if (ret) {
-            g_warning("error sending payload: %s", strerror(ret));
+            g_warning("error sending error response: %s", strerror(ret));
         }
     }
 
     QDECREF(qdict);
 }
 
-static gboolean conn_channel_read(GIOChannel *channel, GIOCondition condition,
-                                  gpointer data)
+/* false return signals GAChannel to close the current client connection */
+static gboolean channel_event_cb(GIOCondition condition, gpointer data)
 {
     GAState *s = data;
-    gchar buf[1024];
+    gchar buf[QGA_READ_COUNT_DEFAULT+1];
     gsize count;
     GError *err = NULL;
-    memset(buf, 0, 1024);
-    GIOStatus status = g_io_channel_read_chars(channel, buf, 1024,
-                                               &count, &err);
+    GIOStatus status = ga_channel_read(s->channel, buf, QGA_READ_COUNT_DEFAULT, &count);
     if (err != NULL) {
         g_warning("error reading channel: %s", err->message);
-        conn_channel_close(s);
         g_error_free(err);
         return false;
     }
     switch (status) {
     case G_IO_STATUS_ERROR:
-        g_warning("problem");
+        g_warning("error reading channel");
         return false;
     case G_IO_STATUS_NORMAL:
+        buf[count] = 0;
         g_debug("read data, count: %d, data: %s", (int)count, buf);
         json_message_parser_feed(&s->parser, (char *)buf, (int)count);
+        break;
+    case G_IO_STATUS_EOF:
+        g_debug("received EOF");
+        if (!s->virtio) {
+            return false;
+        }
     case G_IO_STATUS_AGAIN:
         /* virtio causes us to spin here when no process is attached to
          * host-side chardev. sleep a bit to mitigate this
@@ -413,180 +373,49 @@  static gboolean conn_channel_read(GIOChannel *channel, GIOCondition condition,
             usleep(100*1000);
         }
         return true;
-    case G_IO_STATUS_EOF:
-        g_debug("received EOF");
-        conn_channel_close(s);
-        if (s->virtio) {
-            return true;
-        }
-        return false;
     default:
         g_warning("unknown channel read status, closing");
-        conn_channel_close(s);
         return false;
     }
     return true;
 }
 
-static int conn_channel_add(GAState *s, int fd)
+static gboolean channel_init(GAState *s, const gchar *method, const gchar *path)
 {
-    GIOChannel *conn_channel;
-    GError *err = NULL;
+    GAChannelMethod channel_method;
 
-    g_assert(s && !s->conn_channel);
-    conn_channel = g_io_channel_unix_new(fd);
-    g_assert(conn_channel);
-    g_io_channel_set_encoding(conn_channel, NULL, &err);
-    if (err != NULL) {
-        g_warning("error setting channel encoding to binary");
-        g_error_free(err);
-        return -1;
+    if (method == NULL) {
+        method = "virtio-serial";
     }
-    g_io_add_watch(conn_channel, G_IO_IN | G_IO_HUP,
-                   conn_channel_read, s);
-    s->conn_channel = conn_channel;
-    return 0;
-}
 
-static gboolean listen_channel_accept(GIOChannel *channel,
-                                      GIOCondition condition, gpointer data)
-{
-    GAState *s = data;
-    g_assert(channel != NULL);
-    int ret, conn_fd;
-    bool accepted = false;
-    struct sockaddr_un addr;
-    socklen_t addrlen = sizeof(addr);
-
-    conn_fd = qemu_accept(g_io_channel_unix_get_fd(s->listen_channel),
-                             (struct sockaddr *)&addr, &addrlen);
-    if (conn_fd == -1) {
-        g_warning("error converting fd to gsocket: %s", strerror(errno));
-        goto out;
-    }
-    fcntl(conn_fd, F_SETFL, O_NONBLOCK);
-    ret = conn_channel_add(s, conn_fd);
-    if (ret) {
-        g_warning("error setting up connection");
-        goto out;
-    }
-    accepted = true;
-
-out:
-    /* only accept 1 connection at a time */
-    return !accepted;
-}
-
-/* start polling for readable events on listen fd, new==true
- * indicates we should use the existing s->listen_channel
- */
-static int listen_channel_add(GAState *s, int listen_fd, bool new)
-{
-    if (new) {
-        s->listen_channel = g_io_channel_unix_new(listen_fd);
-    }
-    g_io_add_watch(s->listen_channel, G_IO_IN,
-                   listen_channel_accept, s);
-    return 0;
-}
-
-/* cleanup state for closed connection/session, start accepting new
- * connections if we're in listening mode
- */
-static void conn_channel_close(GAState *s)
-{
-    if (strcmp(s->method, "unix-listen") == 0) {
-        g_io_channel_shutdown(s->conn_channel, true, NULL);
-        listen_channel_add(s, 0, false);
-    } else if (strcmp(s->method, "virtio-serial") == 0) {
-        /* we spin on EOF for virtio-serial, so back off a bit. also,
-         * dont close the connection in this case, it'll resume normal
-         * operation when another process connects to host chardev
-         */
-        usleep(100*1000);
-        goto out_noclose;
-    }
-    g_io_channel_unref(s->conn_channel);
-    s->conn_channel = NULL;
-out_noclose:
-    return;
-}
-
-static void init_guest_agent(GAState *s)
-{
-    struct termios tio;
-    int ret, fd;
-
-    if (s->method == NULL) {
-        /* try virtio-serial as our default */
-        s->method = "virtio-serial";
-    }
-
-    if (s->path == NULL) {
-        if (strcmp(s->method, "virtio-serial") != 0) {
+    if (path == NULL) {
+        if (strcmp(method, "virtio-serial") != 0) {
             g_critical("must specify a path for this channel");
-            exit(EXIT_FAILURE);
+            return false;
         }
         /* try the default path for the virtio-serial port */
-        s->path = QGA_VIRTIO_PATH_DEFAULT;
+        path = QGA_VIRTIO_PATH_DEFAULT;
     }
 
-    if (strcmp(s->method, "virtio-serial") == 0) {
-        s->virtio = true;
-        fd = qemu_open(s->path, O_RDWR | O_NONBLOCK | O_ASYNC);
-        if (fd == -1) {
-            g_critical("error opening channel: %s", strerror(errno));
-            exit(EXIT_FAILURE);
-        }
-        ret = conn_channel_add(s, fd);
-        if (ret) {
-            g_critical("error adding channel to main loop");
-            exit(EXIT_FAILURE);
-        }
-    } else if (strcmp(s->method, "isa-serial") == 0) {
-        fd = qemu_open(s->path, O_RDWR | O_NOCTTY);
-        if (fd == -1) {
-            g_critical("error opening channel: %s", strerror(errno));
-            exit(EXIT_FAILURE);
-        }
-        tcgetattr(fd, &tio);
-        /* set up serial port for non-canonical, dumb byte streaming */
-        tio.c_iflag &= ~(IGNBRK | BRKINT | IGNPAR | PARMRK | INPCK | ISTRIP |
-                         INLCR | IGNCR | ICRNL | IXON | IXOFF | IXANY |
-                         IMAXBEL);
-        tio.c_oflag = 0;
-        tio.c_lflag = 0;
-        tio.c_cflag |= QGA_BAUDRATE_DEFAULT;
-        /* 1 available byte min or reads will block (we'll set non-blocking
-         * elsewhere, else we have to deal with read()=0 instead)
-         */
-        tio.c_cc[VMIN] = 1;
-        tio.c_cc[VTIME] = 0;
-        /* flush everything waiting for read/xmit, it's garbage at this point */
-        tcflush(fd, TCIFLUSH);
-        tcsetattr(fd, TCSANOW, &tio);
-        ret = conn_channel_add(s, fd);
-        if (ret) {
-            g_error("error adding channel to main loop");
-        }
-    } else if (strcmp(s->method, "unix-listen") == 0) {
-        fd = unix_listen(s->path, NULL, strlen(s->path));
-        if (fd == -1) {
-            g_critical("error opening path: %s", strerror(errno));
-            exit(EXIT_FAILURE);
-        }
-        ret = listen_channel_add(s, fd, true);
-        if (ret) {
-            g_critical("error binding/listening to specified socket");
-            exit(EXIT_FAILURE);
-        }
+    if (strcmp(method, "virtio-serial") == 0) {
+        s->virtio = true; /* virtio requires special handling in some cases */
+        channel_method = GA_CHANNEL_VIRTIO_SERIAL;
+    } else if (strcmp(method, "isa-serial") == 0) {
+        channel_method = GA_CHANNEL_ISA_SERIAL;
+    } else if (strcmp(method, "unix-listen") == 0) {
+        channel_method = GA_CHANNEL_UNIX_LISTEN;
     } else {
-        g_critical("unsupported channel method/type: %s", s->method);
-        exit(EXIT_FAILURE);
+        g_critical("unsupported channel method/type: %s", method);
+        return false;
     }
 
-    json_message_parser_init(&s->parser, process_event);
-    s->main_loop = g_main_loop_new(NULL, false);
+    s->channel = ga_channel_new(channel_method, path, channel_event_cb, s);
+    if (!s->channel) {
+        g_critical("failed to create guest agent channel");
+        return false;
+    }
+
+    return true;
 }
 
 int main(int argc, char **argv)
@@ -689,9 +518,6 @@  int main(int argc, char **argv)
 
     ga_set_support_level(default_support_level);
     s = g_malloc0(sizeof(GAState));
-    s->conn_channel = NULL;
-    s->path = path;
-    s->method = method;
     s->log_file = log_file;
     s->log_level = log_level;
     g_log_set_default_handler(ga_log, s);
@@ -700,15 +526,31 @@  int main(int argc, char **argv)
     s->command_state = ga_command_state_new();
     ga_command_state_init(s, s->command_state);
     ga_command_state_init_all(s->command_state);
+    json_message_parser_init(&s->parser, process_event);
     ga_state = s;
+    if (!register_signal_handlers()) {
+        g_critical("failed to register signal handlers");
+        goto out_bad;
+    }
 
-    init_guest_agent(ga_state);
-    register_signal_handlers();
-
+    s->main_loop = g_main_loop_new(NULL, false);
+    if (!channel_init(ga_state, method, path)) {
+        g_critical("failed to initialize guest agent channel");
+        goto out_bad;
+    }
     g_main_loop_run(ga_state->main_loop);
 
     ga_command_state_cleanup_all(ga_state->command_state);
-    unlink(pidfile);
+    ga_channel_free(ga_state->channel);
 
+    if (daemonize) {
+        unlink(pidfile);
+    }
     return 0;
+
+out_bad:
+    if (daemonize) {
+        unlink(pidfile);
+    }
+    return EXIT_FAILURE;
 }
diff --git a/qga/channel-posix.c b/qga/channel-posix.c
new file mode 100644
index 0000000..40f7658
--- /dev/null
+++ b/qga/channel-posix.c
@@ -0,0 +1,246 @@ 
+#include <glib.h>
+#include <termios.h>
+#include "qemu_socket.h"
+#include "qga/channel.h"
+
+#define GA_CHANNEL_BAUDRATE_DEFAULT B38400 /* for isa-serial channels */
+
+struct GAChannel {
+    GIOChannel *listen_channel;
+    GIOChannel *client_channel;
+    GAChannelMethod method;
+    GAChannelCallback event_cb;
+    gpointer user_data;
+};
+
+static int ga_channel_client_add(GAChannel *c, int fd);
+
+static gboolean ga_channel_listen_accept(GIOChannel *channel,
+                                         GIOCondition condition, gpointer data)
+{
+    GAChannel *c = data;
+    int ret, client_fd;
+    bool accepted = false;
+    struct sockaddr_un addr;
+    socklen_t addrlen = sizeof(addr);
+
+    g_assert(channel != NULL);
+
+    client_fd = qemu_accept(g_io_channel_unix_get_fd(channel),
+                            (struct sockaddr *)&addr, &addrlen);
+    if (client_fd == -1) {
+        g_warning("error converting fd to gsocket: %s", strerror(errno));
+        goto out;
+    }
+    fcntl(client_fd, F_SETFL, O_NONBLOCK);
+    ret = ga_channel_client_add(c, client_fd);
+    if (ret) {
+        g_warning("error setting up connection");
+        goto out;
+    }
+    accepted = true;
+
+out:
+    /* only accept 1 connection at a time */
+    return !accepted;
+}
+
+/* start polling for readable events on listen fd, new==true
+ * indicates we should use the existing s->listen_channel
+ */
+static void ga_channel_listen_add(GAChannel *c, int listen_fd, bool create)
+{
+    if (create) {
+        c->listen_channel = g_io_channel_unix_new(listen_fd);
+    }
+    g_io_add_watch(c->listen_channel, G_IO_IN, ga_channel_listen_accept, c);
+}
+
+static void ga_channel_listen_close(GAChannel *c)
+{
+    g_assert(c->method == GA_CHANNEL_UNIX_LISTEN);
+    g_assert(c->listen_channel);
+    g_io_channel_shutdown(c->listen_channel, true, NULL);
+    g_io_channel_unref(c->listen_channel);
+    c->listen_channel = NULL;
+}
+
+/* cleanup state for closed connection/session, start accepting new
+ * connections if we're in listening mode
+ */
+static void ga_channel_client_close(GAChannel *c)
+{
+    g_assert(c->client_channel);
+    g_io_channel_shutdown(c->client_channel, true, NULL);
+    g_io_channel_unref(c->client_channel);
+    c->client_channel = NULL;
+    if (c->method == GA_CHANNEL_UNIX_LISTEN && c->listen_channel) {
+        ga_channel_listen_add(c, 0, false);
+    }
+}
+
+static gboolean ga_channel_client_event(GIOChannel *channel,
+                                        GIOCondition condition, gpointer data)
+{
+    GAChannel *c = data;
+    gboolean client_cont;
+
+    g_assert(c);
+    if (c->event_cb) {
+        client_cont = c->event_cb(condition, c->user_data);
+        if (!client_cont) {
+            ga_channel_client_close(c);
+            return false;
+        }
+    }
+    return true;
+}
+
+static int ga_channel_client_add(GAChannel *c, int fd)
+{
+    GIOChannel *client_channel;
+    GError *err = NULL;
+
+    g_assert(c && !c->client_channel);
+    client_channel = g_io_channel_unix_new(fd);
+    g_assert(client_channel);
+    g_io_channel_set_encoding(client_channel, NULL, &err);
+    if (err != NULL) {
+        g_warning("error setting channel encoding to binary");
+        g_error_free(err);
+        return -1;
+    }
+    g_io_add_watch(client_channel, G_IO_IN | G_IO_HUP,
+                   ga_channel_client_event, c);
+    c->client_channel = client_channel;
+    return 0;
+}
+
+static gboolean ga_channel_open(GAChannel *c, const gchar *path, GAChannelMethod method)
+{
+    int ret;
+    c->method = method;
+
+    switch (c->method) {
+    case GA_CHANNEL_VIRTIO_SERIAL: {
+        int fd = qemu_open(path, O_RDWR | O_NONBLOCK | O_ASYNC);
+        if (fd == -1) {
+            g_critical("error opening channel: %s", strerror(errno));
+            exit(EXIT_FAILURE);
+        }
+        ret = ga_channel_client_add(c, fd);
+        if (ret) {
+            g_critical("error adding channel to main loop");
+            return false;
+        }
+        break;
+    }
+    case GA_CHANNEL_ISA_SERIAL: {
+        struct termios tio;
+        int fd = qemu_open(path, O_RDWR | O_NOCTTY | O_NONBLOCK);
+        if (fd == -1) {
+            g_critical("error opening channel: %s", strerror(errno));
+            exit(EXIT_FAILURE);
+        }
+        tcgetattr(fd, &tio);
+        /* set up serial port for non-canonical, dumb byte streaming */
+        tio.c_iflag &= ~(IGNBRK | BRKINT | IGNPAR | PARMRK | INPCK | ISTRIP |
+                         INLCR | IGNCR | ICRNL | IXON | IXOFF | IXANY |
+                         IMAXBEL);
+        tio.c_oflag = 0;
+        tio.c_lflag = 0;
+        tio.c_cflag |= GA_CHANNEL_BAUDRATE_DEFAULT;
+        /* 1 available byte min or reads will block (we'll set non-blocking
+         * elsewhere, else we have to deal with read()=0 instead)
+         */
+        tio.c_cc[VMIN] = 1;
+        tio.c_cc[VTIME] = 0;
+        /* flush everything waiting for read/xmit, it's garbage at this point */
+        tcflush(fd, TCIFLUSH);
+        tcsetattr(fd, TCSANOW, &tio);
+        ret = ga_channel_client_add(c, fd);
+        if (ret) {
+            g_error("error adding channel to main loop");
+        }
+        break;
+    }
+    case GA_CHANNEL_UNIX_LISTEN: {
+        int fd = unix_listen(path, NULL, strlen(path));
+        if (fd == -1) {
+            g_critical("error opening path: %s", strerror(errno));
+            return false;
+        }
+        ga_channel_listen_add(c, fd, true);
+        break;
+    }
+    default:
+        g_critical("error binding/listening to specified socket");
+        return false;
+    }
+
+    return true;
+}
+
+GIOStatus ga_channel_write_all(GAChannel *c, const gchar *buf, gsize size)
+{
+    GError *err = NULL;
+    gsize written = 0;
+    GIOStatus status = G_IO_STATUS_NORMAL;
+
+    while (size) {
+        status = g_io_channel_write_chars(c->client_channel, buf, size,
+                                          &written, &err);
+        g_debug("sending data, count: %d", (int)size);
+        if (err != NULL) {
+            g_warning("error writing to channel: %s", err->message);
+            return G_IO_STATUS_ERROR;
+        }
+        if (status != G_IO_STATUS_NORMAL) {
+            break;
+        }
+        size -= written;
+    }
+
+    if (status == G_IO_STATUS_NORMAL) {
+        status = g_io_channel_flush(c->client_channel, &err);
+        if (err != NULL) {
+            g_warning("error flushing channel: %s", err->message);
+            return G_IO_STATUS_ERROR;
+        }
+    }
+
+    return status;
+}
+
+GIOStatus ga_channel_read(GAChannel *c, gchar *buf, gsize size, gsize *count)
+{
+    return g_io_channel_read_chars(c->client_channel, buf, size, count, NULL);
+}
+
+GAChannel *ga_channel_new(GAChannelMethod method, const gchar *path,
+                          GAChannelCallback cb, gpointer opaque)
+{
+    GAChannel *c = g_malloc0(sizeof(GAChannel));
+    c->event_cb = cb;
+    c->user_data = opaque;
+
+    if (!ga_channel_open(c, path, method)) {
+        g_critical("error opening channel");
+        ga_channel_free(c);
+        return NULL;
+    }
+
+    return c;
+}
+
+void ga_channel_free(GAChannel *c)
+{
+    if (c->method == GA_CHANNEL_UNIX_LISTEN
+        && c->listen_channel) {
+        ga_channel_listen_close(c);
+    }
+    if (c->client_channel) {
+        ga_channel_client_close(c);
+    }
+    g_free(c);
+}
diff --git a/qga/channel.h b/qga/channel.h
new file mode 100644
index 0000000..3704ea9
--- /dev/null
+++ b/qga/channel.h
@@ -0,0 +1,33 @@ 
+/*
+ * QEMU Guest Agent channel declarations
+ *
+ * Copyright IBM Corp. 2012
+ *
+ * Authors:
+ *  Michael Roth      <mdroth@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#ifndef QGA_CHANNEL_H
+#define QGA_CHANNEL_H
+
+#include <glib.h>
+
+typedef struct GAChannel GAChannel;
+
+typedef enum {
+    GA_CHANNEL_VIRTIO_SERIAL,
+    GA_CHANNEL_ISA_SERIAL,
+    GA_CHANNEL_UNIX_LISTEN,
+} GAChannelMethod;
+
+typedef gboolean (*GAChannelCallback)(GIOCondition condition, gpointer opaque);
+
+GAChannel *ga_channel_new(GAChannelMethod method, const gchar *path,
+                          GAChannelCallback cb, gpointer opaque);
+void ga_channel_free(GAChannel *c);
+GIOStatus ga_channel_read(GAChannel *c, gchar *buf, gsize size, gsize *count);
+GIOStatus ga_channel_write_all(GAChannel *c, const gchar *buf, gsize size);
+
+#endif
diff --git a/qga/guest-agent-core.h b/qga/guest-agent-core.h
index 7327e73..891ed06 100644
--- a/qga/guest-agent-core.h
+++ b/qga/guest-agent-core.h
@@ -22,7 +22,7 @@ 
 #define QGA_SUPPORT_LEVEL_MAJOR_MIN 1
 #define QGA_SUPPORT_LEVEL_MINOR_MIN 0
 #define QGA_SUPPORT_LEVEL_MICRO_MIN 0
-#define QGA_READ_COUNT_DEFAULT 4 << 10
+#define QGA_READ_COUNT_DEFAULT 4096
 
 typedef struct GAState GAState;
 typedef struct GACommandState GACommandState;