diff mbox series

[LEDE-DEV] ubusd: Use linked list for queued messages

Message ID 20180502205530.9032-1-i@qbox.audio
State Changes Requested
Delegated to: John Crispin
Headers show
Series [LEDE-DEV] ubusd: Use linked list for queued messages | expand

Commit Message

Benjamin Hansmann May 2, 2018, 8:55 p.m. UTC
The fixed size array for queuing messages led to discarding messages
when it was full, using a linked list instead solves this issue.

Having the list_head link in the ubus_msg_buf itself avoids the
allocation of more memory for an independent list.

The motivation was that for a recursive "ubus list" the function
ubusd_proto.c:ubusd_handle_lookup() produces more than n messages in
one uloop cycle when n objects are registered on the bus.

Signed-off-by: Benjamin Hansmann <i@qbox.audio>
---
 ubusd.c       | 19 ++++++++-----------
 ubusd.h       |  6 +++---
 ubusd_proto.c |  1 +
 3 files changed, 12 insertions(+), 14 deletions(-)

Comments

Alexandru Ardelean May 3, 2018, 7:16 a.m. UTC | #1
On Wed, May 2, 2018 at 11:55 PM, Benjamin Hansmann <i@qbox.audio> wrote:
> The fixed size array for queuing messages led to discarding messages
> when it was full, using a linked list instead solves this issue.
>
> Having the list_head link in the ubus_msg_buf itself avoids the
> allocation of more memory for an independent list.
>
> The motivation was that for a recursive "ubus list" the function
> ubusd_proto.c:ubusd_handle_lookup() produces more than n messages in
> one uloop cycle when n objects are registered on the bus.
>

Hey,

I second this patch.
I also proposed it a while back
http://patchwork.ozlabs.org/patch/772366/

This was part of a series of ubus fixes.
I added a test that shows the issue [I was seeing]:
http://patchwork.ozlabs.org/patch/772365/

The issue [I was seeing] was]more of a fast-producer - slow-consumer issue.
This was caused mostly when running a ubus cmd on the TTY serial [
causing a slowdown which showed the issue ].
But in my case, another patch had a greater impact that the dynamic TX queue ;
this one: http://patchwork.ozlabs.org/patch/772364/
I was never hitting the 32 entries limit.

I also discarded this patch [at the time] because I was unsure whether
it causes more issues [than it solves].
And did not have time to go more in-depth with it.

But I think, that if this helps your case, it should be good.

Also [with this occasion]: thanks Felix for merging my other ubus patches.

Thanks
Alex

> Signed-off-by: Benjamin Hansmann <i@qbox.audio>
> ---
>  ubusd.c       | 19 ++++++++-----------
>  ubusd.h       |  6 +++---
>  ubusd_proto.c |  1 +
>  3 files changed, 12 insertions(+), 14 deletions(-)
>
> diff --git a/ubusd.c b/ubusd.c
> index ba1ff07..b7e1f79 100644
> --- a/ubusd.c
> +++ b/ubusd.c
> @@ -138,11 +138,8 @@ static int ubus_msg_writev(int fd, struct ubus_msg_buf *ub, int offset)
>
>  static void ubus_msg_enqueue(struct ubus_client *cl, struct ubus_msg_buf *ub)
>  {
> -       if (cl->tx_queue[cl->txq_tail])
> -               return;
> -
> -       cl->tx_queue[cl->txq_tail] = ubus_msg_ref(ub);
> -       cl->txq_tail = (cl->txq_tail + 1) % ARRAY_SIZE(cl->tx_queue);
> +       struct ubus_msg_buf *qub = ubus_msg_ref(ub);
> +       list_add_tail(&qub->queue, &cl->tx_queue);
>  }
>
>  /* takes the msgbuf reference */
> @@ -153,7 +150,7 @@ void ubus_msg_send(struct ubus_client *cl, struct ubus_msg_buf *ub)
>         if (ub->hdr.type != UBUS_MSG_MONITOR)
>                 ubusd_monitor_message(cl, ub, true);
>
> -       if (!cl->tx_queue[cl->txq_cur]) {
> +       if (list_empty(&cl->tx_queue)) {
>                 written = ubus_msg_writev(cl->sock.fd, ub, 0);
>
>                 if (written < 0)
> @@ -172,20 +169,20 @@ void ubus_msg_send(struct ubus_client *cl, struct ubus_msg_buf *ub)
>
>  static struct ubus_msg_buf *ubus_msg_head(struct ubus_client *cl)
>  {
> -       return cl->tx_queue[cl->txq_cur];
> +       if (list_empty(&cl->tx_queue))
> +               return NULL;
> +       return list_first_entry(&cl->tx_queue, struct ubus_msg_buf, queue);
>  }
>
>  static void ubus_msg_dequeue(struct ubus_client *cl)
>  {
>         struct ubus_msg_buf *ub = ubus_msg_head(cl);
> -
>         if (!ub)
>                 return;
>
> -       ubus_msg_free(ub);
> +       list_del_init(&ub->queue);
>         cl->txq_ofs = 0;
> -       cl->tx_queue[cl->txq_cur] = NULL;
> -       cl->txq_cur = (cl->txq_cur + 1) % ARRAY_SIZE(cl->tx_queue);
> +       ubus_msg_free(ub);
>  }
>
>  static void handle_client_disconnect(struct ubus_client *cl)
> diff --git a/ubusd.h b/ubusd.h
> index 4d87920..375f31f 100644
> --- a/ubusd.h
> +++ b/ubusd.h
> @@ -23,13 +23,13 @@
>  #include "ubusmsg.h"
>  #include "ubusd_acl.h"
>
> -#define UBUSD_CLIENT_BACKLOG   32
>  #define UBUS_OBJ_HASH_BITS     4
>
>  extern struct blob_buf b;
>
>  struct ubus_msg_buf {
>         uint32_t refcount; /* ~0: uses external data buffer */
> +       struct list_head queue;
>         struct ubus_msghdr hdr;
>         struct blob_attr *data;
>         int fd;
> @@ -48,8 +48,8 @@ struct ubus_client {
>
>         struct list_head objects;
>
> -       struct ubus_msg_buf *tx_queue[UBUSD_CLIENT_BACKLOG];
> -       unsigned int txq_cur, txq_tail, txq_ofs;
> +       struct list_head tx_queue;
> +       unsigned int txq_ofs;
>
>         struct ubus_msg_buf *pending_msg;
>         struct ubus_msg_buf *retmsg;
> diff --git a/ubusd_proto.c b/ubusd_proto.c
> index 2d04b5a..ac9d075 100644
> --- a/ubusd_proto.c
> +++ b/ubusd_proto.c
> @@ -495,6 +495,7 @@ struct ubus_client *ubusd_proto_new_client(int fd, uloop_fd_handler cb)
>                 goto free;
>
>         INIT_LIST_HEAD(&cl->objects);
> +       INIT_LIST_HEAD(&cl->tx_queue);
>         cl->sock.fd = fd;
>         cl->sock.cb = cb;
>         cl->pending_msg_fd = -1;
> --
> 2.11.0
>
>
> _______________________________________________
> Lede-dev mailing list
> Lede-dev@lists.infradead.org
> http://lists.infradead.org/mailman/listinfo/lede-dev
Felix Fietkau May 3, 2018, 9:07 a.m. UTC | #2
Hi Benjamin,

On 2018-05-02 22:55, Benjamin Hansmann wrote:
> The fixed size array for queuing messages led to discarding messages
> when it was full, using a linked list instead solves this issue.
> 
> Having the list_head link in the ubus_msg_buf itself avoids the
> allocation of more memory for an independent list.
> 
> The motivation was that for a recursive "ubus list" the function
> ubusd_proto.c:ubusd_handle_lookup() produces more than n messages in
> one uloop cycle when n objects are registered on the bus.
> 
> Signed-off-by: Benjamin Hansmann <i@qbox.audio>
The reason for using an array is that a message can be queued for
multiple clients. Doing that with your patch could corrupt the list in
that case.

- Felix
Alexandru Ardelean May 3, 2018, 9:46 a.m. UTC | #3
On Thu, May 3, 2018 at 12:07 PM, Felix Fietkau <nbd@nbd.name> wrote:
> Hi Benjamin,
>
> On 2018-05-02 22:55, Benjamin Hansmann wrote:
>> The fixed size array for queuing messages led to discarding messages
>> when it was full, using a linked list instead solves this issue.
>>
>> Having the list_head link in the ubus_msg_buf itself avoids the
>> allocation of more memory for an independent list.
>>
>> The motivation was that for a recursive "ubus list" the function
>> ubusd_proto.c:ubusd_handle_lookup() produces more than n messages in
>> one uloop cycle when n objects are registered on the bus.
>>
>> Signed-off-by: Benjamin Hansmann <i@qbox.audio>
> The reason for using an array is that a message can be queued for
> multiple clients. Doing that with your patch could corrupt the list in
> that case.
>

Hmm, I also forgot about this stuff.
Since ubus events rely on the sharing of messages.

It sounds like there would be 2 options to handle this.
1) Make the TX queue configurable at compile-time [default being 32 as is now]
2) Do a realloc of the queue array, and potentially a realloc() back
to a stable state after a grace period ; similar to the
`msgbuf_reduction_counter` logic in libubus-io

> - Felix
>
> _______________________________________________
> Lede-dev mailing list
> Lede-dev@lists.infradead.org
> http://lists.infradead.org/mailman/listinfo/lede-dev
Benjamin Hansmann May 3, 2018, 10:14 a.m. UTC | #4
Hi Felix,

I understand. I just sent an alternative patch which maintains an
independent linked list which I think should circumvent the corruption
of the same. This of course introduces some additional memory
allocation on the heap.

When this shouldn't be an option, what would be your suggestion to deal
with the problem of a higher number of objects and then trying to list
them? Maybe we could also wait until the written data to the socket is
flushed for each found object in that case?

Benjamin

On Thu, 2018-05-03 at 11:07 +0200, Felix Fietkau wrote:
> Hi Benjamin,
> 
> On 2018-05-02 22:55, Benjamin Hansmann wrote:
> > The fixed size array for queuing messages led to discarding
> > messages
> > when it was full, using a linked list instead solves this issue.
> > 
> > Having the list_head link in the ubus_msg_buf itself avoids the
> > allocation of more memory for an independent list.
> > 
> > The motivation was that for a recursive "ubus list" the function
> > ubusd_proto.c:ubusd_handle_lookup() produces more than n messages
> > in
> > one uloop cycle when n objects are registered on the bus.
> > 
> > Signed-off-by: Benjamin Hansmann <i@qbox.audio>
> 
> The reason for using an array is that a message can be queued for
> multiple clients. Doing that with your patch could corrupt the list
> in
> that case.
> 
> - Felix
> 
> _______________________________________________
> Lede-dev mailing list
> Lede-dev@lists.infradead.org
> http://lists.infradead.org/mailman/listinfo/lede-dev
diff mbox series

Patch

diff --git a/ubusd.c b/ubusd.c
index ba1ff07..b7e1f79 100644
--- a/ubusd.c
+++ b/ubusd.c
@@ -138,11 +138,8 @@  static int ubus_msg_writev(int fd, struct ubus_msg_buf *ub, int offset)
 
 static void ubus_msg_enqueue(struct ubus_client *cl, struct ubus_msg_buf *ub)
 {
-	if (cl->tx_queue[cl->txq_tail])
-		return;
-
-	cl->tx_queue[cl->txq_tail] = ubus_msg_ref(ub);
-	cl->txq_tail = (cl->txq_tail + 1) % ARRAY_SIZE(cl->tx_queue);
+	struct ubus_msg_buf *qub = ubus_msg_ref(ub);
+	list_add_tail(&qub->queue, &cl->tx_queue);
 }
 
 /* takes the msgbuf reference */
@@ -153,7 +150,7 @@  void ubus_msg_send(struct ubus_client *cl, struct ubus_msg_buf *ub)
 	if (ub->hdr.type != UBUS_MSG_MONITOR)
 		ubusd_monitor_message(cl, ub, true);
 
-	if (!cl->tx_queue[cl->txq_cur]) {
+	if (list_empty(&cl->tx_queue)) {
 		written = ubus_msg_writev(cl->sock.fd, ub, 0);
 
 		if (written < 0)
@@ -172,20 +169,20 @@  void ubus_msg_send(struct ubus_client *cl, struct ubus_msg_buf *ub)
 
 static struct ubus_msg_buf *ubus_msg_head(struct ubus_client *cl)
 {
-	return cl->tx_queue[cl->txq_cur];
+	if (list_empty(&cl->tx_queue))
+		return NULL;
+	return list_first_entry(&cl->tx_queue, struct ubus_msg_buf, queue);
 }
 
 static void ubus_msg_dequeue(struct ubus_client *cl)
 {
 	struct ubus_msg_buf *ub = ubus_msg_head(cl);
-
 	if (!ub)
 		return;
 
-	ubus_msg_free(ub);
+	list_del_init(&ub->queue);
 	cl->txq_ofs = 0;
-	cl->tx_queue[cl->txq_cur] = NULL;
-	cl->txq_cur = (cl->txq_cur + 1) % ARRAY_SIZE(cl->tx_queue);
+	ubus_msg_free(ub);
 }
 
 static void handle_client_disconnect(struct ubus_client *cl)
diff --git a/ubusd.h b/ubusd.h
index 4d87920..375f31f 100644
--- a/ubusd.h
+++ b/ubusd.h
@@ -23,13 +23,13 @@ 
 #include "ubusmsg.h"
 #include "ubusd_acl.h"
 
-#define UBUSD_CLIENT_BACKLOG	32
 #define UBUS_OBJ_HASH_BITS	4
 
 extern struct blob_buf b;
 
 struct ubus_msg_buf {
 	uint32_t refcount; /* ~0: uses external data buffer */
+	struct list_head queue;
 	struct ubus_msghdr hdr;
 	struct blob_attr *data;
 	int fd;
@@ -48,8 +48,8 @@  struct ubus_client {
 
 	struct list_head objects;
 
-	struct ubus_msg_buf *tx_queue[UBUSD_CLIENT_BACKLOG];
-	unsigned int txq_cur, txq_tail, txq_ofs;
+	struct list_head tx_queue;
+	unsigned int txq_ofs;
 
 	struct ubus_msg_buf *pending_msg;
 	struct ubus_msg_buf *retmsg;
diff --git a/ubusd_proto.c b/ubusd_proto.c
index 2d04b5a..ac9d075 100644
--- a/ubusd_proto.c
+++ b/ubusd_proto.c
@@ -495,6 +495,7 @@  struct ubus_client *ubusd_proto_new_client(int fd, uloop_fd_handler cb)
 		goto free;
 
 	INIT_LIST_HEAD(&cl->objects);
+	INIT_LIST_HEAD(&cl->tx_queue);
 	cl->sock.fd = fd;
 	cl->sock.cb = cb;
 	cl->pending_msg_fd = -1;