diff mbox series

[RFC,4/8] virtio: Implement in-order handling for virtio devices

Message ID 20240321155717.1392787-5-jonah.palmer@oracle.com
State New
Headers show
Series virtio,vhost: Add VIRTIO_F_IN_ORDER support | expand

Commit Message

Jonah Palmer March 21, 2024, 3:57 p.m. UTC
Implements in-order handling for most virtio devices using the
VIRTIO_F_IN_ORDER transport feature, specifically those who call
virtqueue_push to push their used elements onto the used ring.

The logic behind this implementation is as follows:

1.) virtqueue_pop always enqueues VirtQueueElements in-order.

virtqueue_pop always retrieves one or more buffer descriptors in-order
from the available ring and converts them into a VirtQueueElement. This
means that the order in which VirtQueueElements are enqueued are
in-order by default.

By virtue, as VirtQueueElements are created, we can assign a sequential
key value to them. This preserves the order of buffers that have been
made available to the device by the driver.

As VirtQueueElements are assigned a key value, the current sequence
number is incremented.

2.) Requests can be completed out-of-order.

While most devices complete requests in the same order that they were
enqueued by default, some devices don't (e.g. virtio-blk). The goal of
this out-of-order handling is to reduce the impact of devices that
process elements in-order by default while also guaranteeing compliance
with the VIRTIO_F_IN_ORDER feature.

Below is the logic behind handling completed requests (which may or may
not be in-order).

3.) Does the incoming used VirtQueueElement preserve the correct order?

In other words, is the sequence number (key) assigned to the
VirtQueueElement the expected number that would preserve the original
order?

3a.)
If it does... immediately push the used element onto the used ring.
Then increment the next expected sequence number and check to see if
any previous out-of-order VirtQueueElements stored on the hash table
has a key that matches this next expected sequence number.

For each VirtQueueElement found on the hash table with a matching key:
push the element on the used ring, remove the key-value pair from the
hash table, and then increment the next expected sequence number. Repeat
this process until we're unable to find an element with a matching key.

Note that if the device uses batching (e.g. virtio-net), then we skip
the virtqueue_flush call and let the device call it themselves.

3b.)
If it does not... stash the VirtQueueElement, along with relevant data,
as a InOrderVQElement on the hash table. The key used is the order_key
that was assigned when the VirtQueueElement was created.

Signed-off-by: Jonah Palmer <jonah.palmer@oracle.com>
---
 hw/virtio/virtio.c         | 70 ++++++++++++++++++++++++++++++++++++--
 include/hw/virtio/virtio.h |  8 +++++
 2 files changed, 76 insertions(+), 2 deletions(-)

Comments

Eugenio Perez Martin March 22, 2024, 10:46 a.m. UTC | #1
On Thu, Mar 21, 2024 at 4:57 PM Jonah Palmer <jonah.palmer@oracle.com> wrote:
>
> Implements in-order handling for most virtio devices using the
> VIRTIO_F_IN_ORDER transport feature, specifically those who call
> virtqueue_push to push their used elements onto the used ring.
>
> The logic behind this implementation is as follows:
>
> 1.) virtqueue_pop always enqueues VirtQueueElements in-order.
>
> virtqueue_pop always retrieves one or more buffer descriptors in-order
> from the available ring and converts them into a VirtQueueElement. This
> means that the order in which VirtQueueElements are enqueued are
> in-order by default.
>
> By virtue, as VirtQueueElements are created, we can assign a sequential
> key value to them. This preserves the order of buffers that have been
> made available to the device by the driver.
>
> As VirtQueueElements are assigned a key value, the current sequence
> number is incremented.
>
> 2.) Requests can be completed out-of-order.
>
> While most devices complete requests in the same order that they were
> enqueued by default, some devices don't (e.g. virtio-blk). The goal of
> this out-of-order handling is to reduce the impact of devices that
> process elements in-order by default while also guaranteeing compliance
> with the VIRTIO_F_IN_ORDER feature.
>
> Below is the logic behind handling completed requests (which may or may
> not be in-order).
>
> 3.) Does the incoming used VirtQueueElement preserve the correct order?
>
> In other words, is the sequence number (key) assigned to the
> VirtQueueElement the expected number that would preserve the original
> order?
>
> 3a.)
> If it does... immediately push the used element onto the used ring.
> Then increment the next expected sequence number and check to see if
> any previous out-of-order VirtQueueElements stored on the hash table
> has a key that matches this next expected sequence number.
>
> For each VirtQueueElement found on the hash table with a matching key:
> push the element on the used ring, remove the key-value pair from the
> hash table, and then increment the next expected sequence number. Repeat
> this process until we're unable to find an element with a matching key.
>
> Note that if the device uses batching (e.g. virtio-net), then we skip
> the virtqueue_flush call and let the device call it themselves.
>
> 3b.)
> If it does not... stash the VirtQueueElement, along with relevant data,
> as a InOrderVQElement on the hash table. The key used is the order_key
> that was assigned when the VirtQueueElement was created.
>
> Signed-off-by: Jonah Palmer <jonah.palmer@oracle.com>
> ---
>  hw/virtio/virtio.c         | 70 ++++++++++++++++++++++++++++++++++++--
>  include/hw/virtio/virtio.h |  8 +++++
>  2 files changed, 76 insertions(+), 2 deletions(-)
>
> diff --git a/hw/virtio/virtio.c b/hw/virtio/virtio.c
> index 40124545d6..40e4377f1e 100644
> --- a/hw/virtio/virtio.c
> +++ b/hw/virtio/virtio.c
> @@ -992,12 +992,56 @@ void virtqueue_flush(VirtQueue *vq, unsigned int count)
>      }
>  }
>
> +void virtqueue_order_element(VirtQueue *vq, const VirtQueueElement *elem,
> +                             unsigned int len, unsigned int idx,
> +                             unsigned int count)
> +{
> +    InOrderVQElement *in_order_elem;
> +
> +    if (elem->order_key == vq->current_order_idx) {
> +        /* Element is in-order, push to used ring */
> +        virtqueue_fill(vq, elem, len, idx);
> +
> +        /* Batching? Don't flush */
> +        if (count) {
> +            virtqueue_flush(vq, count);

The "count" parameter is the number of heads used, but here you're
only using one head (elem). Same with the other virtqueue_flush in the
function.

Also, this function sometimes replaces virtqueue_fill and other
replaces virtqueue_fill + virtqueue_flush (both examples in patch
6/8). I have the impression the series would be simpler if
virtqueue_order_element is a static function just handling the
virtio_vdev_has_feature(vq->vdev, VIRTIO_F_IN_ORDER) path of
virtqueue_fill, so the caller does not need to know if the in_order
feature is on or off.

> +        }
> +
> +        /* Increment next expected order, search for more in-order elements */
> +        while ((in_order_elem = g_hash_table_lookup(vq->in_order_ht,
> +                        GUINT_TO_POINTER(++vq->current_order_idx))) != NULL) {
> +            /* Found in-order element, push to used ring */
> +            virtqueue_fill(vq, in_order_elem->elem, in_order_elem->len,
> +                           in_order_elem->idx);
> +
> +            /* Batching? Don't flush */
> +            if (count) {
> +                virtqueue_flush(vq, in_order_elem->count);
> +            }
> +
> +            /* Remove key-value pair from hash table */
> +            g_hash_table_remove(vq->in_order_ht,
> +                                GUINT_TO_POINTER(vq->current_order_idx));
> +        }
> +    } else {
> +        /* Element is out-of-order, stash in hash table */
> +        in_order_elem = virtqueue_alloc_in_order_element(elem, len, idx,
> +                                                         count);
> +        g_hash_table_insert(vq->in_order_ht, GUINT_TO_POINTER(elem->order_key),
> +                            in_order_elem);
> +    }
> +}
> +
>  void virtqueue_push(VirtQueue *vq, const VirtQueueElement *elem,
>                      unsigned int len)
>  {
>      RCU_READ_LOCK_GUARD();
> -    virtqueue_fill(vq, elem, len, 0);
> -    virtqueue_flush(vq, 1);
> +    if (virtio_vdev_has_feature(vq->vdev, VIRTIO_F_IN_ORDER)) {
> +        virtqueue_order_element(vq, elem, len, 0, 1);
> +    } else {
> +        virtqueue_fill(vq, elem, len, 0);
> +        virtqueue_flush(vq, 1);
> +    }
>  }
>
>  /* Called within rcu_read_lock().  */
> @@ -1478,6 +1522,18 @@ void virtqueue_map(VirtIODevice *vdev, VirtQueueElement *elem)
>                                                                          false);
>  }
>
> +void *virtqueue_alloc_in_order_element(const VirtQueueElement *elem,
> +                                       unsigned int len, unsigned int idx,
> +                                       unsigned int count)
> +{
> +    InOrderVQElement *in_order_elem = g_malloc(sizeof(InOrderVQElement));
> +    in_order_elem->elem = elem;
> +    in_order_elem->len = len;
> +    in_order_elem->idx = idx;
> +    in_order_elem->count = count;
> +    return in_order_elem;
> +}
> +
>  static void *virtqueue_alloc_element(size_t sz, unsigned out_num, unsigned in_num)
>  {
>      VirtQueueElement *elem;
> @@ -1626,6 +1682,11 @@ static void *virtqueue_split_pop(VirtQueue *vq, size_t sz)
>          elem->in_sg[i] = iov[out_num + i];
>      }
>
> +    /* Assign key for in-order processing */
> +    if (virtio_vdev_has_feature(vdev, VIRTIO_F_IN_ORDER)) {
> +        elem->order_key = vq->current_order_key++;

Since you're adding this in both split_pop and packed_pop, why not add
it in virtqueue_pop?

> +    }
> +
>      vq->inuse++;
>
>      trace_virtqueue_pop(vq, elem, elem->in_num, elem->out_num);
> @@ -1762,6 +1823,11 @@ static void *virtqueue_packed_pop(VirtQueue *vq, size_t sz)
>          vq->last_avail_wrap_counter ^= 1;
>      }
>
> +    /* Assign key for in-order processing */
> +    if (virtio_vdev_has_feature(vdev, VIRTIO_F_IN_ORDER)) {
> +        elem->order_key = vq->current_order_key++;
> +    }
> +
>      vq->shadow_avail_idx = vq->last_avail_idx;
>      vq->shadow_avail_wrap_counter = vq->last_avail_wrap_counter;
>
> diff --git a/include/hw/virtio/virtio.h b/include/hw/virtio/virtio.h
> index f83d7e1fee..eeeda397a9 100644
> --- a/include/hw/virtio/virtio.h
> +++ b/include/hw/virtio/virtio.h
> @@ -275,6 +275,14 @@ void virtio_del_queue(VirtIODevice *vdev, int n);
>
>  void virtio_delete_queue(VirtQueue *vq);
>
> +void *virtqueue_alloc_in_order_element(const VirtQueueElement *elem,
> +                                       unsigned int len, unsigned int idx,
> +                                       unsigned int count);
> +
> +void virtqueue_order_element(VirtQueue *vq, const VirtQueueElement *elem,
> +                             unsigned int len, unsigned int idx,
> +                             unsigned int count);
> +
>  void virtqueue_push(VirtQueue *vq, const VirtQueueElement *elem,
>                      unsigned int len);
>  void virtqueue_flush(VirtQueue *vq, unsigned int count);
> --
> 2.39.3
>
Jonah Palmer March 25, 2024, 5:34 p.m. UTC | #2
On 3/22/24 6:46 AM, Eugenio Perez Martin wrote:
> On Thu, Mar 21, 2024 at 4:57 PM Jonah Palmer <jonah.palmer@oracle.com> wrote:
>>
>> Implements in-order handling for most virtio devices using the
>> VIRTIO_F_IN_ORDER transport feature, specifically those who call
>> virtqueue_push to push their used elements onto the used ring.
>>
>> The logic behind this implementation is as follows:
>>
>> 1.) virtqueue_pop always enqueues VirtQueueElements in-order.
>>
>> virtqueue_pop always retrieves one or more buffer descriptors in-order
>> from the available ring and converts them into a VirtQueueElement. This
>> means that the order in which VirtQueueElements are enqueued are
>> in-order by default.
>>
>> By virtue, as VirtQueueElements are created, we can assign a sequential
>> key value to them. This preserves the order of buffers that have been
>> made available to the device by the driver.
>>
>> As VirtQueueElements are assigned a key value, the current sequence
>> number is incremented.
>>
>> 2.) Requests can be completed out-of-order.
>>
>> While most devices complete requests in the same order that they were
>> enqueued by default, some devices don't (e.g. virtio-blk). The goal of
>> this out-of-order handling is to reduce the impact of devices that
>> process elements in-order by default while also guaranteeing compliance
>> with the VIRTIO_F_IN_ORDER feature.
>>
>> Below is the logic behind handling completed requests (which may or may
>> not be in-order).
>>
>> 3.) Does the incoming used VirtQueueElement preserve the correct order?
>>
>> In other words, is the sequence number (key) assigned to the
>> VirtQueueElement the expected number that would preserve the original
>> order?
>>
>> 3a.)
>> If it does... immediately push the used element onto the used ring.
>> Then increment the next expected sequence number and check to see if
>> any previous out-of-order VirtQueueElements stored on the hash table
>> has a key that matches this next expected sequence number.
>>
>> For each VirtQueueElement found on the hash table with a matching key:
>> push the element on the used ring, remove the key-value pair from the
>> hash table, and then increment the next expected sequence number. Repeat
>> this process until we're unable to find an element with a matching key.
>>
>> Note that if the device uses batching (e.g. virtio-net), then we skip
>> the virtqueue_flush call and let the device call it themselves.
>>
>> 3b.)
>> If it does not... stash the VirtQueueElement, along with relevant data,
>> as a InOrderVQElement on the hash table. The key used is the order_key
>> that was assigned when the VirtQueueElement was created.
>>
>> Signed-off-by: Jonah Palmer <jonah.palmer@oracle.com>
>> ---
>>   hw/virtio/virtio.c         | 70 ++++++++++++++++++++++++++++++++++++--
>>   include/hw/virtio/virtio.h |  8 +++++
>>   2 files changed, 76 insertions(+), 2 deletions(-)
>>
>> diff --git a/hw/virtio/virtio.c b/hw/virtio/virtio.c
>> index 40124545d6..40e4377f1e 100644
>> --- a/hw/virtio/virtio.c
>> +++ b/hw/virtio/virtio.c
>> @@ -992,12 +992,56 @@ void virtqueue_flush(VirtQueue *vq, unsigned int count)
>>       }
>>   }
>>
>> +void virtqueue_order_element(VirtQueue *vq, const VirtQueueElement *elem,
>> +                             unsigned int len, unsigned int idx,
>> +                             unsigned int count)
>> +{
>> +    InOrderVQElement *in_order_elem;
>> +
>> +    if (elem->order_key == vq->current_order_idx) {
>> +        /* Element is in-order, push to used ring */
>> +        virtqueue_fill(vq, elem, len, idx);
>> +
>> +        /* Batching? Don't flush */
>> +        if (count) {
>> +            virtqueue_flush(vq, count);
> 
> The "count" parameter is the number of heads used, but here you're
> only using one head (elem). Same with the other virtqueue_flush in the
> function.
> 

True. This acts more as a flag than an actual count since, unless we're 
batching (which in the current setup, the device would explicitly call 
virtqueue_flush separately), this value will be either 0 or 1.

> Also, this function sometimes replaces virtqueue_fill and other
> replaces virtqueue_fill + virtqueue_flush (both examples in patch
> 6/8). I have the impression the series would be simpler if
> virtqueue_order_element is a static function just handling the
> virtio_vdev_has_feature(vq->vdev, VIRTIO_F_IN_ORDER) path of
> virtqueue_fill, so the caller does not need to know if the in_order
> feature is on or off.
> 

Originally I wanted this function to replace virtqueue_fill + 
virtqueue_flush but after looking at virtio_net_receive_rcu and 
vhost_svq_flush, where multiple virtqueue_fill's can be called before a 
single virtqueue_flush, I added this 'if (count)' conditional to handle 
both cases.

I did consider virtqueue_order_element just handling the virtqueue_fill 
path but then I wasn't sure how to handle calling virtqueue_flush when 
retrieving out-of-order data from the hash table.

For example, devices that call virtqueue_push would call virtqueue_fill 
and then virtqueue_flush afterwards. In the scenario where, say, elem1 
was found out of order and put into the hash table, and then elem0 comes 
along. For elem0 we'd call virtqueue_fill and then we should call 
virtqueue_flush to keep the order going. Then we'd find elem1 and do the 
same. I have trouble seeing how we could properly call virtqueue_flush 
after finding out-of-order elements (that are now ready to be placed on 
the used ring in-order) in the hash table.

>> +        }
>> +
>> +        /* Increment next expected order, search for more in-order elements */
>> +        while ((in_order_elem = g_hash_table_lookup(vq->in_order_ht,
>> +                        GUINT_TO_POINTER(++vq->current_order_idx))) != NULL) {
>> +            /* Found in-order element, push to used ring */
>> +            virtqueue_fill(vq, in_order_elem->elem, in_order_elem->len,
>> +                           in_order_elem->idx);
>> +
>> +            /* Batching? Don't flush */
>> +            if (count) {
>> +                virtqueue_flush(vq, in_order_elem->count);
>> +            }
>> +
>> +            /* Remove key-value pair from hash table */
>> +            g_hash_table_remove(vq->in_order_ht,
>> +                                GUINT_TO_POINTER(vq->current_order_idx));
>> +        }
>> +    } else {
>> +        /* Element is out-of-order, stash in hash table */
>> +        in_order_elem = virtqueue_alloc_in_order_element(elem, len, idx,
>> +                                                         count);
>> +        g_hash_table_insert(vq->in_order_ht, GUINT_TO_POINTER(elem->order_key),
>> +                            in_order_elem);
>> +    }
>> +}
>> +
>>   void virtqueue_push(VirtQueue *vq, const VirtQueueElement *elem,
>>                       unsigned int len)
>>   {
>>       RCU_READ_LOCK_GUARD();
>> -    virtqueue_fill(vq, elem, len, 0);
>> -    virtqueue_flush(vq, 1);
>> +    if (virtio_vdev_has_feature(vq->vdev, VIRTIO_F_IN_ORDER)) {
>> +        virtqueue_order_element(vq, elem, len, 0, 1);
>> +    } else {
>> +        virtqueue_fill(vq, elem, len, 0);
>> +        virtqueue_flush(vq, 1);
>> +    }
>>   }
>>
>>   /* Called within rcu_read_lock().  */
>> @@ -1478,6 +1522,18 @@ void virtqueue_map(VirtIODevice *vdev, VirtQueueElement *elem)
>>                                                                           false);
>>   }
>>
>> +void *virtqueue_alloc_in_order_element(const VirtQueueElement *elem,
>> +                                       unsigned int len, unsigned int idx,
>> +                                       unsigned int count)
>> +{
>> +    InOrderVQElement *in_order_elem = g_malloc(sizeof(InOrderVQElement));
>> +    in_order_elem->elem = elem;
>> +    in_order_elem->len = len;
>> +    in_order_elem->idx = idx;
>> +    in_order_elem->count = count;
>> +    return in_order_elem;
>> +}
>> +
>>   static void *virtqueue_alloc_element(size_t sz, unsigned out_num, unsigned in_num)
>>   {
>>       VirtQueueElement *elem;
>> @@ -1626,6 +1682,11 @@ static void *virtqueue_split_pop(VirtQueue *vq, size_t sz)
>>           elem->in_sg[i] = iov[out_num + i];
>>       }
>>
>> +    /* Assign key for in-order processing */
>> +    if (virtio_vdev_has_feature(vdev, VIRTIO_F_IN_ORDER)) {
>> +        elem->order_key = vq->current_order_key++;
> 
> Since you're adding this in both split_pop and packed_pop, why not add
> it in virtqueue_pop?
> 

I wanted to add this order_key to the VirtQueueElement after it was 
created. I suppose I could do this directly in virtqueue_alloc_element 
but I'd have to add another parameter to it, which might be unnecessary 
given it'd only be applicable for this specific in_order feature.

I also suppose I could just capture the VirtQueueElement being returned 
from virtqueue_packed_pop/virtqueue_split_pop and make the assignment 
there, but it felt out of place to do it in virtqueue_pop.

>> +    }
>> +
>>       vq->inuse++;
>>
>>       trace_virtqueue_pop(vq, elem, elem->in_num, elem->out_num);
>> @@ -1762,6 +1823,11 @@ static void *virtqueue_packed_pop(VirtQueue *vq, size_t sz)
>>           vq->last_avail_wrap_counter ^= 1;
>>       }
>>
>> +    /* Assign key for in-order processing */
>> +    if (virtio_vdev_has_feature(vdev, VIRTIO_F_IN_ORDER)) {
>> +        elem->order_key = vq->current_order_key++;
>> +    }
>> +
>>       vq->shadow_avail_idx = vq->last_avail_idx;
>>       vq->shadow_avail_wrap_counter = vq->last_avail_wrap_counter;
>>
>> diff --git a/include/hw/virtio/virtio.h b/include/hw/virtio/virtio.h
>> index f83d7e1fee..eeeda397a9 100644
>> --- a/include/hw/virtio/virtio.h
>> +++ b/include/hw/virtio/virtio.h
>> @@ -275,6 +275,14 @@ void virtio_del_queue(VirtIODevice *vdev, int n);
>>
>>   void virtio_delete_queue(VirtQueue *vq);
>>
>> +void *virtqueue_alloc_in_order_element(const VirtQueueElement *elem,
>> +                                       unsigned int len, unsigned int idx,
>> +                                       unsigned int count);
>> +
>> +void virtqueue_order_element(VirtQueue *vq, const VirtQueueElement *elem,
>> +                             unsigned int len, unsigned int idx,
>> +                             unsigned int count);
>> +
>>   void virtqueue_push(VirtQueue *vq, const VirtQueueElement *elem,
>>                       unsigned int len);
>>   void virtqueue_flush(VirtQueue *vq, unsigned int count);
>> --
>> 2.39.3
>>
>
Eugenio Perez Martin March 25, 2024, 7:45 p.m. UTC | #3
On Mon, Mar 25, 2024 at 6:35 PM Jonah Palmer <jonah.palmer@oracle.com> wrote:
>
>
>
> On 3/22/24 6:46 AM, Eugenio Perez Martin wrote:
> > On Thu, Mar 21, 2024 at 4:57 PM Jonah Palmer <jonah.palmer@oracle.com> wrote:
> >>
> >> Implements in-order handling for most virtio devices using the
> >> VIRTIO_F_IN_ORDER transport feature, specifically those who call
> >> virtqueue_push to push their used elements onto the used ring.
> >>
> >> The logic behind this implementation is as follows:
> >>
> >> 1.) virtqueue_pop always enqueues VirtQueueElements in-order.
> >>
> >> virtqueue_pop always retrieves one or more buffer descriptors in-order
> >> from the available ring and converts them into a VirtQueueElement. This
> >> means that the order in which VirtQueueElements are enqueued are
> >> in-order by default.
> >>
> >> By virtue, as VirtQueueElements are created, we can assign a sequential
> >> key value to them. This preserves the order of buffers that have been
> >> made available to the device by the driver.
> >>
> >> As VirtQueueElements are assigned a key value, the current sequence
> >> number is incremented.
> >>
> >> 2.) Requests can be completed out-of-order.
> >>
> >> While most devices complete requests in the same order that they were
> >> enqueued by default, some devices don't (e.g. virtio-blk). The goal of
> >> this out-of-order handling is to reduce the impact of devices that
> >> process elements in-order by default while also guaranteeing compliance
> >> with the VIRTIO_F_IN_ORDER feature.
> >>
> >> Below is the logic behind handling completed requests (which may or may
> >> not be in-order).
> >>
> >> 3.) Does the incoming used VirtQueueElement preserve the correct order?
> >>
> >> In other words, is the sequence number (key) assigned to the
> >> VirtQueueElement the expected number that would preserve the original
> >> order?
> >>
> >> 3a.)
> >> If it does... immediately push the used element onto the used ring.
> >> Then increment the next expected sequence number and check to see if
> >> any previous out-of-order VirtQueueElements stored on the hash table
> >> has a key that matches this next expected sequence number.
> >>
> >> For each VirtQueueElement found on the hash table with a matching key:
> >> push the element on the used ring, remove the key-value pair from the
> >> hash table, and then increment the next expected sequence number. Repeat
> >> this process until we're unable to find an element with a matching key.
> >>
> >> Note that if the device uses batching (e.g. virtio-net), then we skip
> >> the virtqueue_flush call and let the device call it themselves.
> >>
> >> 3b.)
> >> If it does not... stash the VirtQueueElement, along with relevant data,
> >> as a InOrderVQElement on the hash table. The key used is the order_key
> >> that was assigned when the VirtQueueElement was created.
> >>
> >> Signed-off-by: Jonah Palmer <jonah.palmer@oracle.com>
> >> ---
> >>   hw/virtio/virtio.c         | 70 ++++++++++++++++++++++++++++++++++++--
> >>   include/hw/virtio/virtio.h |  8 +++++
> >>   2 files changed, 76 insertions(+), 2 deletions(-)
> >>
> >> diff --git a/hw/virtio/virtio.c b/hw/virtio/virtio.c
> >> index 40124545d6..40e4377f1e 100644
> >> --- a/hw/virtio/virtio.c
> >> +++ b/hw/virtio/virtio.c
> >> @@ -992,12 +992,56 @@ void virtqueue_flush(VirtQueue *vq, unsigned int count)
> >>       }
> >>   }
> >>
> >> +void virtqueue_order_element(VirtQueue *vq, const VirtQueueElement *elem,
> >> +                             unsigned int len, unsigned int idx,
> >> +                             unsigned int count)
> >> +{
> >> +    InOrderVQElement *in_order_elem;
> >> +
> >> +    if (elem->order_key == vq->current_order_idx) {
> >> +        /* Element is in-order, push to used ring */
> >> +        virtqueue_fill(vq, elem, len, idx);
> >> +
> >> +        /* Batching? Don't flush */
> >> +        if (count) {
> >> +            virtqueue_flush(vq, count);
> >
> > The "count" parameter is the number of heads used, but here you're
> > only using one head (elem). Same with the other virtqueue_flush in the
> > function.
> >
>
> True. This acts more as a flag than an actual count since, unless we're
> batching (which in the current setup, the device would explicitly call
> virtqueue_flush separately), this value will be either 0 or 1.
>

If possible, I think it is better to keep consistent with the current
API: fill+flush for batching, push for a single shot.

> > Also, this function sometimes replaces virtqueue_fill and other
> > replaces virtqueue_fill + virtqueue_flush (both examples in patch
> > 6/8). I have the impression the series would be simpler if
> > virtqueue_order_element is a static function just handling the
> > virtio_vdev_has_feature(vq->vdev, VIRTIO_F_IN_ORDER) path of
> > virtqueue_fill, so the caller does not need to know if the in_order
> > feature is on or off.
> >
>
> Originally I wanted this function to replace virtqueue_fill +
> virtqueue_flush but after looking at virtio_net_receive_rcu and
> vhost_svq_flush, where multiple virtqueue_fill's can be called before a
> single virtqueue_flush, I added this 'if (count)' conditional to handle
> both cases.
>
> I did consider virtqueue_order_element just handling the virtqueue_fill
> path but then I wasn't sure how to handle calling virtqueue_flush when
> retrieving out-of-order data from the hash table.
>
> For example, devices that call virtqueue_push would call virtqueue_fill
> and then virtqueue_flush afterwards. In the scenario where, say, elem1
> was found out of order and put into the hash table, and then elem0 comes
> along. For elem0 we'd call virtqueue_fill and then we should call
> virtqueue_flush to keep the order going. Then we'd find elem1 and do the
> same. I have trouble seeing how we could properly call virtqueue_flush
> after finding out-of-order elements (that are now ready to be placed on
> the used ring in-order) in the hash table.
>

I see, that's a good point indeed. The way I thought, it is a no-op in
that case, and the later virtqueue_flush needs to check if it has
pending buffers to use.

The next question is what to do with the virtqueue_fill idx and
virtqueue_flush count parameters. More on that in the cover letter, as
the discussion goes that direction there.

> >> +        }
> >> +
> >> +        /* Increment next expected order, search for more in-order elements */
> >> +        while ((in_order_elem = g_hash_table_lookup(vq->in_order_ht,
> >> +                        GUINT_TO_POINTER(++vq->current_order_idx))) != NULL) {
> >> +            /* Found in-order element, push to used ring */
> >> +            virtqueue_fill(vq, in_order_elem->elem, in_order_elem->len,
> >> +                           in_order_elem->idx);
> >> +
> >> +            /* Batching? Don't flush */
> >> +            if (count) {
> >> +                virtqueue_flush(vq, in_order_elem->count);
> >> +            }
> >> +
> >> +            /* Remove key-value pair from hash table */
> >> +            g_hash_table_remove(vq->in_order_ht,
> >> +                                GUINT_TO_POINTER(vq->current_order_idx));
> >> +        }
> >> +    } else {
> >> +        /* Element is out-of-order, stash in hash table */
> >> +        in_order_elem = virtqueue_alloc_in_order_element(elem, len, idx,
> >> +                                                         count);
> >> +        g_hash_table_insert(vq->in_order_ht, GUINT_TO_POINTER(elem->order_key),
> >> +                            in_order_elem);
> >> +    }
> >> +}
> >> +
> >>   void virtqueue_push(VirtQueue *vq, const VirtQueueElement *elem,
> >>                       unsigned int len)
> >>   {
> >>       RCU_READ_LOCK_GUARD();
> >> -    virtqueue_fill(vq, elem, len, 0);
> >> -    virtqueue_flush(vq, 1);
> >> +    if (virtio_vdev_has_feature(vq->vdev, VIRTIO_F_IN_ORDER)) {
> >> +        virtqueue_order_element(vq, elem, len, 0, 1);
> >> +    } else {
> >> +        virtqueue_fill(vq, elem, len, 0);
> >> +        virtqueue_flush(vq, 1);
> >> +    }
> >>   }
> >>
> >>   /* Called within rcu_read_lock().  */
> >> @@ -1478,6 +1522,18 @@ void virtqueue_map(VirtIODevice *vdev, VirtQueueElement *elem)
> >>                                                                           false);
> >>   }
> >>
> >> +void *virtqueue_alloc_in_order_element(const VirtQueueElement *elem,
> >> +                                       unsigned int len, unsigned int idx,
> >> +                                       unsigned int count)
> >> +{
> >> +    InOrderVQElement *in_order_elem = g_malloc(sizeof(InOrderVQElement));
> >> +    in_order_elem->elem = elem;
> >> +    in_order_elem->len = len;
> >> +    in_order_elem->idx = idx;
> >> +    in_order_elem->count = count;
> >> +    return in_order_elem;
> >> +}
> >> +
> >>   static void *virtqueue_alloc_element(size_t sz, unsigned out_num, unsigned in_num)
> >>   {
> >>       VirtQueueElement *elem;
> >> @@ -1626,6 +1682,11 @@ static void *virtqueue_split_pop(VirtQueue *vq, size_t sz)
> >>           elem->in_sg[i] = iov[out_num + i];
> >>       }
> >>
> >> +    /* Assign key for in-order processing */
> >> +    if (virtio_vdev_has_feature(vdev, VIRTIO_F_IN_ORDER)) {
> >> +        elem->order_key = vq->current_order_key++;
> >
> > Since you're adding this in both split_pop and packed_pop, why not add
> > it in virtqueue_pop?
> >
>
> I wanted to add this order_key to the VirtQueueElement after it was
> created. I suppose I could do this directly in virtqueue_alloc_element
> but I'd have to add another parameter to it, which might be unnecessary
> given it'd only be applicable for this specific in_order feature.
>
> I also suppose I could just capture the VirtQueueElement being returned
> from virtqueue_packed_pop/virtqueue_split_pop and make the assignment
> there, but it felt out of place to do it in virtqueue_pop.
>

I see. I keep finding it simpler to do the assignment in one point
only, as it is not a specific split / packed operation. But let's see.

Thanks!

> >> +    }
> >> +
> >>       vq->inuse++;
> >>
> >>       trace_virtqueue_pop(vq, elem, elem->in_num, elem->out_num);
> >> @@ -1762,6 +1823,11 @@ static void *virtqueue_packed_pop(VirtQueue *vq, size_t sz)
> >>           vq->last_avail_wrap_counter ^= 1;
> >>       }
> >>
> >> +    /* Assign key for in-order processing */
> >> +    if (virtio_vdev_has_feature(vdev, VIRTIO_F_IN_ORDER)) {
> >> +        elem->order_key = vq->current_order_key++;
> >> +    }
> >> +
> >>       vq->shadow_avail_idx = vq->last_avail_idx;
> >>       vq->shadow_avail_wrap_counter = vq->last_avail_wrap_counter;
> >>
> >> diff --git a/include/hw/virtio/virtio.h b/include/hw/virtio/virtio.h
> >> index f83d7e1fee..eeeda397a9 100644
> >> --- a/include/hw/virtio/virtio.h
> >> +++ b/include/hw/virtio/virtio.h
> >> @@ -275,6 +275,14 @@ void virtio_del_queue(VirtIODevice *vdev, int n);
> >>
> >>   void virtio_delete_queue(VirtQueue *vq);
> >>
> >> +void *virtqueue_alloc_in_order_element(const VirtQueueElement *elem,
> >> +                                       unsigned int len, unsigned int idx,
> >> +                                       unsigned int count);
> >> +
> >> +void virtqueue_order_element(VirtQueue *vq, const VirtQueueElement *elem,
> >> +                             unsigned int len, unsigned int idx,
> >> +                             unsigned int count);
> >> +
> >>   void virtqueue_push(VirtQueue *vq, const VirtQueueElement *elem,
> >>                       unsigned int len);
> >>   void virtqueue_flush(VirtQueue *vq, unsigned int count);
> >> --
> >> 2.39.3
> >>
> >
>
diff mbox series

Patch

diff --git a/hw/virtio/virtio.c b/hw/virtio/virtio.c
index 40124545d6..40e4377f1e 100644
--- a/hw/virtio/virtio.c
+++ b/hw/virtio/virtio.c
@@ -992,12 +992,56 @@  void virtqueue_flush(VirtQueue *vq, unsigned int count)
     }
 }
 
+void virtqueue_order_element(VirtQueue *vq, const VirtQueueElement *elem,
+                             unsigned int len, unsigned int idx,
+                             unsigned int count)
+{
+    InOrderVQElement *in_order_elem;
+
+    if (elem->order_key == vq->current_order_idx) {
+        /* Element is in-order, push to used ring */
+        virtqueue_fill(vq, elem, len, idx);
+
+        /* Batching? Don't flush */
+        if (count) {
+            virtqueue_flush(vq, count);
+        }
+
+        /* Increment next expected order, search for more in-order elements */
+        while ((in_order_elem = g_hash_table_lookup(vq->in_order_ht,
+                        GUINT_TO_POINTER(++vq->current_order_idx))) != NULL) {
+            /* Found in-order element, push to used ring */
+            virtqueue_fill(vq, in_order_elem->elem, in_order_elem->len,
+                           in_order_elem->idx);
+
+            /* Batching? Don't flush */
+            if (count) {
+                virtqueue_flush(vq, in_order_elem->count);
+            }
+
+            /* Remove key-value pair from hash table */
+            g_hash_table_remove(vq->in_order_ht,
+                                GUINT_TO_POINTER(vq->current_order_idx));
+        }
+    } else {
+        /* Element is out-of-order, stash in hash table */
+        in_order_elem = virtqueue_alloc_in_order_element(elem, len, idx,
+                                                         count);
+        g_hash_table_insert(vq->in_order_ht, GUINT_TO_POINTER(elem->order_key),
+                            in_order_elem);
+    }
+}
+
 void virtqueue_push(VirtQueue *vq, const VirtQueueElement *elem,
                     unsigned int len)
 {
     RCU_READ_LOCK_GUARD();
-    virtqueue_fill(vq, elem, len, 0);
-    virtqueue_flush(vq, 1);
+    if (virtio_vdev_has_feature(vq->vdev, VIRTIO_F_IN_ORDER)) {
+        virtqueue_order_element(vq, elem, len, 0, 1);
+    } else {
+        virtqueue_fill(vq, elem, len, 0);
+        virtqueue_flush(vq, 1);
+    }
 }
 
 /* Called within rcu_read_lock().  */
@@ -1478,6 +1522,18 @@  void virtqueue_map(VirtIODevice *vdev, VirtQueueElement *elem)
                                                                         false);
 }
 
+void *virtqueue_alloc_in_order_element(const VirtQueueElement *elem,
+                                       unsigned int len, unsigned int idx,
+                                       unsigned int count)
+{
+    InOrderVQElement *in_order_elem = g_malloc(sizeof(InOrderVQElement));
+    in_order_elem->elem = elem;
+    in_order_elem->len = len;
+    in_order_elem->idx = idx;
+    in_order_elem->count = count;
+    return in_order_elem;
+}
+
 static void *virtqueue_alloc_element(size_t sz, unsigned out_num, unsigned in_num)
 {
     VirtQueueElement *elem;
@@ -1626,6 +1682,11 @@  static void *virtqueue_split_pop(VirtQueue *vq, size_t sz)
         elem->in_sg[i] = iov[out_num + i];
     }
 
+    /* Assign key for in-order processing */
+    if (virtio_vdev_has_feature(vdev, VIRTIO_F_IN_ORDER)) {
+        elem->order_key = vq->current_order_key++;
+    }
+
     vq->inuse++;
 
     trace_virtqueue_pop(vq, elem, elem->in_num, elem->out_num);
@@ -1762,6 +1823,11 @@  static void *virtqueue_packed_pop(VirtQueue *vq, size_t sz)
         vq->last_avail_wrap_counter ^= 1;
     }
 
+    /* Assign key for in-order processing */
+    if (virtio_vdev_has_feature(vdev, VIRTIO_F_IN_ORDER)) {
+        elem->order_key = vq->current_order_key++;
+    }
+
     vq->shadow_avail_idx = vq->last_avail_idx;
     vq->shadow_avail_wrap_counter = vq->last_avail_wrap_counter;
 
diff --git a/include/hw/virtio/virtio.h b/include/hw/virtio/virtio.h
index f83d7e1fee..eeeda397a9 100644
--- a/include/hw/virtio/virtio.h
+++ b/include/hw/virtio/virtio.h
@@ -275,6 +275,14 @@  void virtio_del_queue(VirtIODevice *vdev, int n);
 
 void virtio_delete_queue(VirtQueue *vq);
 
+void *virtqueue_alloc_in_order_element(const VirtQueueElement *elem,
+                                       unsigned int len, unsigned int idx,
+                                       unsigned int count);
+
+void virtqueue_order_element(VirtQueue *vq, const VirtQueueElement *elem,
+                             unsigned int len, unsigned int idx,
+                             unsigned int count);
+
 void virtqueue_push(VirtQueue *vq, const VirtQueueElement *elem,
                     unsigned int len);
 void virtqueue_flush(VirtQueue *vq, unsigned int count);