diff mbox

[LEDE-DEV,2/3] ubusd: make `tx_queue` backlog dynamic

Message ID 1496833771-32723-2-git-send-email-ardeleanalex@gmail.com
State Rejected
Headers show

Commit Message

Alexandru Ardelean June 7, 2017, 11:09 a.m. UTC
It's not very often that the tx_queue is used,
to store backlog messages to send to a client.

And for most cases, 32 backlog messages seems to be enough.
In fact, for most cases, I've seen ~1 entry in the queue
being used every now-n-then.

The issue is more visible/present with the `ubus list` command.
I've traced the code to ubusd_handle_lookup() in:

```
    if (!attr[UBUS_ATTR_OBJPATH]) {
        avl_for_each_element(&path, obj, path)
            ubusd_send_obj(cl, ub, obj);
        return 0;
    }
```
The code-path eventually leads to `ubus_msg_send()`.
It seems that once the first element is queued, then
the condition check for `(!cl->tx_queue[cl->txq_cur])`
will queue all messages iterated in the above snippet,
without trying any writes.

This can be solved, either by making the queue dynamic
and allow it to expand above the current fixed limit (1).

Or, by forcing/allowing writes during the tx_queue-ing (2).

This patch implements (1).

Signed-off-by: Alexandru Ardelean <ardeleanalex@gmail.com>
---
 ubusd.c       | 18 +++++++++---------
 ubusd.h       |  6 +++---
 ubusd_proto.c |  1 +
 3 files changed, 13 insertions(+), 12 deletions(-)

Comments

Alexandru Ardelean June 7, 2017, 11:14 a.m. UTC | #1
On Wed, Jun 7, 2017 at 2:09 PM, Alexandru Ardelean
<ardeleanalex@gmail.com> wrote:
> It's not very often that the tx_queue is used,
> to store backlog messages to send to a client.
>
> And for most cases, 32 backlog messages seems to be enough.
> In fact, for most cases, I've seen ~1 entry in the queue
> being used every now-n-then.
>
> The issue is more visible/present with the `ubus list` command.
> I've traced the code to ubusd_handle_lookup() in:
>
> ```
>     if (!attr[UBUS_ATTR_OBJPATH]) {
>         avl_for_each_element(&path, obj, path)
>             ubusd_send_obj(cl, ub, obj);
>         return 0;
>     }
> ```
> The code-path eventually leads to `ubus_msg_send()`.
> It seems that once the first element is queued, then
> the condition check for `(!cl->tx_queue[cl->txq_cur])`
> will queue all messages iterated in the above snippet,
> without trying any writes.
>
> This can be solved, either by making the queue dynamic
> and allow it to expand above the current fixed limit (1).
>
> Or, by forcing/allowing writes during the tx_queue-ing (2).
>
> This patch implements (1).

To add to this.
I also implemented (2).
Link:
https://github.com/commodo/ubus/commit/78ac089e3664f3b0422271c84283f754119d4f4d

However, it does not seem to help with the issue.
I.e. the lua script in the previous patch, manages to fill up the
tx_queue even with
trying to send data from the tx_queue [ while queue-ing ].
Tho, I do feel that the patch (from Github) is forcing things a bit.

>
> Signed-off-by: Alexandru Ardelean <ardeleanalex@gmail.com>
> ---
>  ubusd.c       | 18 +++++++++---------
>  ubusd.h       |  6 +++---
>  ubusd_proto.c |  1 +
>  3 files changed, 13 insertions(+), 12 deletions(-)
>
> diff --git a/ubusd.c b/ubusd.c
> index f060b38..8fdb85b 100644
> --- a/ubusd.c
> +++ b/ubusd.c
> @@ -59,6 +59,7 @@ struct ubus_msg_buf *ubus_msg_new(void *data, int len, bool shared)
>                 return NULL;
>
>         ub->fd = -1;
> +       INIT_LIST_HEAD(&ub->list);
>
>         if (shared) {
>                 ub->refcount = ~0;
> @@ -138,11 +139,9 @@ static int ubus_msg_writev(int fd, struct ubus_msg_buf *ub, int offset)
>
>  static void ubus_msg_enqueue(struct ubus_client *cl, struct ubus_msg_buf *ub)
>  {
> -       if (cl->tx_queue[cl->txq_tail])
> -               return;
> -
> -       cl->tx_queue[cl->txq_tail] = ubus_msg_ref(ub);
> -       cl->txq_tail = (cl->txq_tail + 1) % ARRAY_SIZE(cl->tx_queue);
> +       ub = ubus_msg_ref(ub);
> +       if (ub)
> +               list_add_tail(&ub->list, &cl->tx_queue);
>  }
>
>  /* takes the msgbuf reference */
> @@ -153,7 +152,7 @@ void ubus_msg_send(struct ubus_client *cl, struct ubus_msg_buf *ub, bool free)
>         if (ub->hdr.type != UBUS_MSG_MONITOR)
>                 ubusd_monitor_message(cl, ub, true);
>
> -       if (!cl->tx_queue[cl->txq_cur]) {
> +       if (list_empty(&cl->tx_queue)) {
>                 written = ubus_msg_writev(cl->sock.fd, ub, 0);
>
>                 if (written < 0)
> @@ -176,7 +175,9 @@ out:
>
>  static struct ubus_msg_buf *ubus_msg_head(struct ubus_client *cl)
>  {
> -       return cl->tx_queue[cl->txq_cur];
> +       if (list_empty(&cl->tx_queue))
> +               return NULL;
> +       return list_first_entry(&cl->tx_queue, struct ubus_msg_buf, list);
>  }
>
>  static void ubus_msg_dequeue(struct ubus_client *cl)
> @@ -186,10 +187,9 @@ static void ubus_msg_dequeue(struct ubus_client *cl)
>         if (!ub)
>                 return;
>
> +       list_del(&ub->list);
>         ubus_msg_free(ub);
>         cl->txq_ofs = 0;
> -       cl->tx_queue[cl->txq_cur] = NULL;
> -       cl->txq_cur = (cl->txq_cur + 1) % ARRAY_SIZE(cl->tx_queue);
>  }
>
>  static void handle_client_disconnect(struct ubus_client *cl)
> diff --git a/ubusd.h b/ubusd.h
> index 5031ed4..6748c65 100644
> --- a/ubusd.h
> +++ b/ubusd.h
> @@ -23,12 +23,12 @@
>  #include "ubusmsg.h"
>  #include "ubusd_acl.h"
>
> -#define UBUSD_CLIENT_BACKLOG   32
>  #define UBUS_OBJ_HASH_BITS     4
>
>  extern struct blob_buf b;
>
>  struct ubus_msg_buf {
> +       struct list_head list;
>         uint32_t refcount; /* ~0: uses external data buffer */
>         struct ubus_msghdr hdr;
>         struct blob_attr *data;
> @@ -47,8 +47,8 @@ struct ubus_client {
>
>         struct list_head objects;
>
> -       struct ubus_msg_buf *tx_queue[UBUSD_CLIENT_BACKLOG];
> -       unsigned int txq_cur, txq_tail, txq_ofs;
> +       unsigned int txq_ofs;
> +       struct list_head tx_queue;
>
>         struct ubus_msg_buf *pending_msg;
>         int pending_msg_offset;
> diff --git a/ubusd_proto.c b/ubusd_proto.c
> index 72da7a7..631047b 100644
> --- a/ubusd_proto.c
> +++ b/ubusd_proto.c
> @@ -480,6 +480,7 @@ struct ubus_client *ubusd_proto_new_client(int fd, uloop_fd_handler cb)
>                 goto free;
>
>         INIT_LIST_HEAD(&cl->objects);
> +       INIT_LIST_HEAD(&cl->tx_queue);
>         cl->sock.fd = fd;
>         cl->sock.cb = cb;
>         cl->pending_msg_fd = -1;
> --
> 2.7.4
>
Felix Fietkau June 7, 2017, 1:44 p.m. UTC | #2
On 2017-06-07 13:09, Alexandru Ardelean wrote:
> It's not very often that the tx_queue is used,
> to store backlog messages to send to a client.
> 
> And for most cases, 32 backlog messages seems to be enough.
> In fact, for most cases, I've seen ~1 entry in the queue
> being used every now-n-then.
> 
> The issue is more visible/present with the `ubus list` command.
> I've traced the code to ubusd_handle_lookup() in:
> 
> ```
>     if (!attr[UBUS_ATTR_OBJPATH]) {
>         avl_for_each_element(&path, obj, path)
>             ubusd_send_obj(cl, ub, obj);
>         return 0;
>     }
> ```
> The code-path eventually leads to `ubus_msg_send()`.
> It seems that once the first element is queued, then
> the condition check for `(!cl->tx_queue[cl->txq_cur])`
> will queue all messages iterated in the above snippet,
> without trying any writes.
> 
> This can be solved, either by making the queue dynamic
> and allow it to expand above the current fixed limit (1).
> 
> Or, by forcing/allowing writes during the tx_queue-ing (2).
> 
> This patch implements (1).
> 
> Signed-off-by: Alexandru Ardelean <ardeleanalex@gmail.com>
If I remember correctly, I chose the backlog array because a message can
be referenced multiple times (e.g. in the notify/subscribe case).
This would break in your linked-list implementation if a message needs
to be queued for multiple clients.

- Felix
Felix Fietkau June 7, 2017, 1:48 p.m. UTC | #3
On 2017-06-07 15:44, Felix Fietkau wrote:
> On 2017-06-07 13:09, Alexandru Ardelean wrote:
>> It's not very often that the tx_queue is used,
>> to store backlog messages to send to a client.
>> 
>> And for most cases, 32 backlog messages seems to be enough.
>> In fact, for most cases, I've seen ~1 entry in the queue
>> being used every now-n-then.
>> 
>> The issue is more visible/present with the `ubus list` command.
>> I've traced the code to ubusd_handle_lookup() in:
>> 
>> ```
>>     if (!attr[UBUS_ATTR_OBJPATH]) {
>>         avl_for_each_element(&path, obj, path)
>>             ubusd_send_obj(cl, ub, obj);
>>         return 0;
>>     }
>> ```
>> The code-path eventually leads to `ubus_msg_send()`.
>> It seems that once the first element is queued, then
>> the condition check for `(!cl->tx_queue[cl->txq_cur])`
>> will queue all messages iterated in the above snippet,
>> without trying any writes.
>> 
>> This can be solved, either by making the queue dynamic
>> and allow it to expand above the current fixed limit (1).
>> 
>> Or, by forcing/allowing writes during the tx_queue-ing (2).
>> 
>> This patch implements (1).
>> 
>> Signed-off-by: Alexandru Ardelean <ardeleanalex@gmail.com>
> If I remember correctly, I chose the backlog array because a message can
> be referenced multiple times (e.g. in the notify/subscribe case).
> This would break in your linked-list implementation if a message needs
> to be queued for multiple clients.
I just re-checked and it seems that multiple references are no longer
used, so this implementation would probably work. I will take a closer look.

- Felix
Alexandru Ardelean June 7, 2017, 2:02 p.m. UTC | #4
On Wed, Jun 7, 2017 at 4:48 PM, Felix Fietkau <nbd@nbd.name> wrote:
> On 2017-06-07 15:44, Felix Fietkau wrote:
>> On 2017-06-07 13:09, Alexandru Ardelean wrote:
>>> It's not very often that the tx_queue is used,
>>> to store backlog messages to send to a client.
>>>
>>> And for most cases, 32 backlog messages seems to be enough.
>>> In fact, for most cases, I've seen ~1 entry in the queue
>>> being used every now-n-then.
>>>
>>> The issue is more visible/present with the `ubus list` command.
>>> I've traced the code to ubusd_handle_lookup() in:
>>>
>>> ```
>>>     if (!attr[UBUS_ATTR_OBJPATH]) {
>>>         avl_for_each_element(&path, obj, path)
>>>             ubusd_send_obj(cl, ub, obj);
>>>         return 0;
>>>     }
>>> ```
>>> The code-path eventually leads to `ubus_msg_send()`.
>>> It seems that once the first element is queued, then
>>> the condition check for `(!cl->tx_queue[cl->txq_cur])`
>>> will queue all messages iterated in the above snippet,
>>> without trying any writes.
>>>
>>> This can be solved, either by making the queue dynamic
>>> and allow it to expand above the current fixed limit (1).
>>>
>>> Or, by forcing/allowing writes during the tx_queue-ing (2).
>>>
>>> This patch implements (1).
>>>
>>> Signed-off-by: Alexandru Ardelean <ardeleanalex@gmail.com>
>> If I remember correctly, I chose the backlog array because a message can
>> be referenced multiple times (e.g. in the notify/subscribe case).
>> This would break in your linked-list implementation if a message needs
>> to be queued for multiple clients.
> I just re-checked and it seems that multiple references are no longer
> used, so this implementation would probably work. I will take a closer look.

I also need to take a closer look.
The ubus_msg_ref() incrememts the refcount for non-shared buffers.
So, there may be a loose end I may have missed somewhere.

To be honest, I'm getting lost a bit within the code sometimes, as it
seems to have been one single file, and gradually split.
And I keep having to fight some wrong assumptions.

Regarding this dynamic queue.
One idea I was having is to make the array expand [via realloc] up to
a limit [let's say 64k] if needed.
I doubt anyone would use 64k netifd interfaces in a near future.
~200 sounds almost around the corner of getting there.
Would this be an alternative ?

Another proposal I would have, is to re-organize the code of ubus
[ubusd mostly] in a series of gradual commits.
The aim would be to reduce shared stuff between files, to make the
code easier to follow.
[ The big thing I am looking at, is the shared `struct blob_buf b;` in
ubusd_proto.c ; that one really makes stuff hard to follow ]
That would maybe make ubusd take up a bit more memory [ don't think
it's too much ], because some more stuff would be alloc-ed per
client/buffer.
I would be glad to do it, if that's fine with you.

Alex

>
> - Felix
Alexandru Ardelean June 12, 2017, 9:17 a.m. UTC | #5
On Wed, Jun 7, 2017 at 5:02 PM, Alexandru Ardelean
<ardeleanalex@gmail.com> wrote:
> On Wed, Jun 7, 2017 at 4:48 PM, Felix Fietkau <nbd@nbd.name> wrote:
>> On 2017-06-07 15:44, Felix Fietkau wrote:
>>> On 2017-06-07 13:09, Alexandru Ardelean wrote:
>>>> It's not very often that the tx_queue is used,
>>>> to store backlog messages to send to a client.
>>>>
>>>> And for most cases, 32 backlog messages seems to be enough.
>>>> In fact, for most cases, I've seen ~1 entry in the queue
>>>> being used every now-n-then.
>>>>
>>>> The issue is more visible/present with the `ubus list` command.
>>>> I've traced the code to ubusd_handle_lookup() in:
>>>>
>>>> ```
>>>>     if (!attr[UBUS_ATTR_OBJPATH]) {
>>>>         avl_for_each_element(&path, obj, path)
>>>>             ubusd_send_obj(cl, ub, obj);
>>>>         return 0;
>>>>     }
>>>> ```
>>>> The code-path eventually leads to `ubus_msg_send()`.
>>>> It seems that once the first element is queued, then
>>>> the condition check for `(!cl->tx_queue[cl->txq_cur])`
>>>> will queue all messages iterated in the above snippet,
>>>> without trying any writes.
>>>>
>>>> This can be solved, either by making the queue dynamic
>>>> and allow it to expand above the current fixed limit (1).
>>>>
>>>> Or, by forcing/allowing writes during the tx_queue-ing (2).
>>>>
>>>> This patch implements (1).
>>>>
>>>> Signed-off-by: Alexandru Ardelean <ardeleanalex@gmail.com>
>>> If I remember correctly, I chose the backlog array because a message can
>>> be referenced multiple times (e.g. in the notify/subscribe case).
>>> This would break in your linked-list implementation if a message needs
>>> to be queued for multiple clients.
>> I just re-checked and it seems that multiple references are no longer
>> used, so this implementation would probably work. I will take a closer look.
>
> I also need to take a closer look.
> The ubus_msg_ref() incrememts the refcount for non-shared buffers.
> So, there may be a loose end I may have missed somewhere.
>
> To be honest, I'm getting lost a bit within the code sometimes, as it
> seems to have been one single file, and gradually split.
> And I keep having to fight some wrong assumptions.
>
> Regarding this dynamic queue.
> One idea I was having is to make the array expand [via realloc] up to
> a limit [let's say 64k] if needed.
> I doubt anyone would use 64k netifd interfaces in a near future.
> ~200 sounds almost around the corner of getting there.
> Would this be an alternative ?
>
> Another proposal I would have, is to re-organize the code of ubus
> [ubusd mostly] in a series of gradual commits.
> The aim would be to reduce shared stuff between files, to make the
> code easier to follow.
> [ The big thing I am looking at, is the shared `struct blob_buf b;` in
> ubusd_proto.c ; that one really makes stuff hard to follow ]
> That would maybe make ubusd take up a bit more memory [ don't think
> it's too much ], because some more stuff would be alloc-ed per
> client/buffer.
> I would be glad to do it, if that's fine with you.

So, let's leave this for later.
I'll start on the re-organization.
I'd prefer we not waste energy on this, since this change is still a
bit vague, and not very obvious.

Thanks
Alex

>
> Alex
>
>>
>> - Felix
diff mbox

Patch

diff --git a/ubusd.c b/ubusd.c
index f060b38..8fdb85b 100644
--- a/ubusd.c
+++ b/ubusd.c
@@ -59,6 +59,7 @@  struct ubus_msg_buf *ubus_msg_new(void *data, int len, bool shared)
 		return NULL;
 
 	ub->fd = -1;
+	INIT_LIST_HEAD(&ub->list);
 
 	if (shared) {
 		ub->refcount = ~0;
@@ -138,11 +139,9 @@  static int ubus_msg_writev(int fd, struct ubus_msg_buf *ub, int offset)
 
 static void ubus_msg_enqueue(struct ubus_client *cl, struct ubus_msg_buf *ub)
 {
-	if (cl->tx_queue[cl->txq_tail])
-		return;
-
-	cl->tx_queue[cl->txq_tail] = ubus_msg_ref(ub);
-	cl->txq_tail = (cl->txq_tail + 1) % ARRAY_SIZE(cl->tx_queue);
+	ub = ubus_msg_ref(ub);
+	if (ub)
+		list_add_tail(&ub->list, &cl->tx_queue);
 }
 
 /* takes the msgbuf reference */
@@ -153,7 +152,7 @@  void ubus_msg_send(struct ubus_client *cl, struct ubus_msg_buf *ub, bool free)
 	if (ub->hdr.type != UBUS_MSG_MONITOR)
 		ubusd_monitor_message(cl, ub, true);
 
-	if (!cl->tx_queue[cl->txq_cur]) {
+	if (list_empty(&cl->tx_queue)) {
 		written = ubus_msg_writev(cl->sock.fd, ub, 0);
 
 		if (written < 0)
@@ -176,7 +175,9 @@  out:
 
 static struct ubus_msg_buf *ubus_msg_head(struct ubus_client *cl)
 {
-	return cl->tx_queue[cl->txq_cur];
+	if (list_empty(&cl->tx_queue))
+		return NULL;
+	return list_first_entry(&cl->tx_queue, struct ubus_msg_buf, list);
 }
 
 static void ubus_msg_dequeue(struct ubus_client *cl)
@@ -186,10 +187,9 @@  static void ubus_msg_dequeue(struct ubus_client *cl)
 	if (!ub)
 		return;
 
+	list_del(&ub->list);
 	ubus_msg_free(ub);
 	cl->txq_ofs = 0;
-	cl->tx_queue[cl->txq_cur] = NULL;
-	cl->txq_cur = (cl->txq_cur + 1) % ARRAY_SIZE(cl->tx_queue);
 }
 
 static void handle_client_disconnect(struct ubus_client *cl)
diff --git a/ubusd.h b/ubusd.h
index 5031ed4..6748c65 100644
--- a/ubusd.h
+++ b/ubusd.h
@@ -23,12 +23,12 @@ 
 #include "ubusmsg.h"
 #include "ubusd_acl.h"
 
-#define UBUSD_CLIENT_BACKLOG	32
 #define UBUS_OBJ_HASH_BITS	4
 
 extern struct blob_buf b;
 
 struct ubus_msg_buf {
+	struct list_head list;
 	uint32_t refcount; /* ~0: uses external data buffer */
 	struct ubus_msghdr hdr;
 	struct blob_attr *data;
@@ -47,8 +47,8 @@  struct ubus_client {
 
 	struct list_head objects;
 
-	struct ubus_msg_buf *tx_queue[UBUSD_CLIENT_BACKLOG];
-	unsigned int txq_cur, txq_tail, txq_ofs;
+	unsigned int txq_ofs;
+	struct list_head tx_queue;
 
 	struct ubus_msg_buf *pending_msg;
 	int pending_msg_offset;
diff --git a/ubusd_proto.c b/ubusd_proto.c
index 72da7a7..631047b 100644
--- a/ubusd_proto.c
+++ b/ubusd_proto.c
@@ -480,6 +480,7 @@  struct ubus_client *ubusd_proto_new_client(int fd, uloop_fd_handler cb)
 		goto free;
 
 	INIT_LIST_HEAD(&cl->objects);
+	INIT_LIST_HEAD(&cl->tx_queue);
 	cl->sock.fd = fd;
 	cl->sock.cb = cb;
 	cl->pending_msg_fd = -1;