diff mbox

[ovs-dev,7/9] datapath-windows: Add support for multiple event queue in Event.c

Message ID 20160713233838.47648-8-vsairam@vmware.com
State Superseded
Delegated to: Guru Shetty
Headers show

Commit Message

Sairam Venugopal July 13, 2016, 11:38 p.m. UTC
Update Event.c to have multiple event queues and mechanism to retrieve the
associated queue. Introduce OvsPostCtEvent and OvsRemoveCtEventEntry
similar to OvsPostVportEvent and OvsRemoveVportEventEntry.

Signed-off-by: Sairam Venugopal <vsairam@vmware.com>
---
 datapath-windows/ovsext/Event.c | 265 +++++++++++++++++++++++++++++++++-------
 datapath-windows/ovsext/Event.h |  17 ++-
 2 files changed, 239 insertions(+), 43 deletions(-)

Comments

Paul Boca July 22, 2016, 10:52 a.m. UTC | #1
Acked-by: Paul Boca <pboca@cloudbasesolutions.com>


> -----Original Message-----

> From: dev [mailto:dev-bounces@openvswitch.org] On Behalf Of Sairam

> Venugopal

> Sent: Thursday, July 14, 2016 2:39 AM

> To: dev@openvswitch.org

> Subject: [ovs-dev] [PATCH 7/9] datapath-windows: Add support for multiple

> event queue in Event.c

> 

> Update Event.c to have multiple event queues and mechanism to retrieve the

> associated queue. Introduce OvsPostCtEvent and OvsRemoveCtEventEntry

> similar to OvsPostVportEvent and OvsRemoveVportEventEntry.

> 

> Signed-off-by: Sairam Venugopal <vsairam@vmware.com>

> ---

>  datapath-windows/ovsext/Event.c | 265

> +++++++++++++++++++++++++++++++++-------

>  datapath-windows/ovsext/Event.h |  17 ++-

>  2 files changed, 239 insertions(+), 43 deletions(-)

> 

> diff --git a/datapath-windows/ovsext/Event.c b/datapath-

> windows/ovsext/Event.c

> index 8ff0322..ffc32a6 100644

> --- a/datapath-windows/ovsext/Event.c

> +++ b/datapath-windows/ovsext/Event.c

> @@ -26,36 +26,62 @@

>  #define OVS_DBG_MOD OVS_DBG_EVENT

>  #include "Debug.h"

> 

> -LIST_ENTRY ovsEventQueue;

> -static NDIS_SPIN_LOCK eventQueueLock;

> -UINT32 ovsNumEventQueue;

> +LIST_ENTRY ovsEventQueueArr[OVS_MCAST_EVENT_ENTRIES_MAX];

> +static NDIS_SPIN_LOCK

> eventQueueLockArr[OVS_MCAST_EVENT_ENTRIES_MAX];

> +UINT32 ovsNumEventQueueArr[OVS_MCAST_EVENT_ENTRIES_MAX];

> 

>  NTSTATUS

>  OvsInitEventQueue()

>  {

> -    InitializeListHead(&ovsEventQueue);

> -    NdisAllocateSpinLock(&eventQueueLock);

> +    for (int i = 0; i < OVS_MCAST_EVENT_ENTRIES_MAX; i++) {

> +        InitializeListHead(&ovsEventQueueArr[i]);

> +        NdisAllocateSpinLock(&eventQueueLockArr[i]);

> +    }

>      return STATUS_SUCCESS;

>  }

> 

>  VOID

>  OvsCleanupEventQueue()

>  {

> -    ASSERT(IsListEmpty(&ovsEventQueue));

> -    ASSERT(ovsNumEventQueue == 0);

> -    NdisFreeSpinLock(&eventQueueLock);

> +    for (int i = 0; i < OVS_MCAST_EVENT_ENTRIES_MAX; i++) {

> +        ASSERT(IsListEmpty(&ovsEventQueueArr[i]));

> +        ASSERT(ovsNumEventQueueArr[i] == 0);

> +        NdisFreeSpinLock(&eventQueueLockArr[i]);

> +    }

>  }

> 

>  static __inline VOID

> -OvsAcquireEventQueueLock()

> +OvsAcquireEventQueueLock(int eventId)

>  {

> -    NdisAcquireSpinLock(&eventQueueLock);

> +    NdisAcquireSpinLock(&eventQueueLockArr[eventId]);

>  }

> 

>  static __inline VOID

> -OvsReleaseEventQueueLock()

> +OvsReleaseEventQueueLock(int eventId)

>  {

> -   NdisReleaseSpinLock(&eventQueueLock);

> +   NdisReleaseSpinLock(&eventQueueLockArr[eventId]);

> +}

> +

> +NDIS_STATUS

> +OvsGetMcastEventId(UINT32 protocol, UINT32 mcastMask, UINT32

> *eventId)

> +{

> +    switch (protocol) {

> +    case NETLINK_GENERIC:

> +        *eventId = OVS_MCAST_VPORT_EVENT;

> +        return NDIS_STATUS_SUCCESS;

> +    case NETLINK_NETFILTER:

> +        if ((mcastMask & OVS_EVENT_CT_NEW)

> +            || (mcastMask & OVS_EVENT_CT_DELETE)) {

> +            *eventId =  OVS_MCAST_CT_EVENT;

> +            return NDIS_STATUS_SUCCESS;

> +        }

> +        break;

> +    default:

> +        goto error;

> +    }

> +

> +error:

> +    return NDIS_STATUS_INVALID_PARAMETER;

>  }

> 

>  /*

> @@ -68,14 +94,17 @@ OvsCleanupEvent(POVS_OPEN_INSTANCE instance)

>  {

>      POVS_EVENT_QUEUE queue;

>      PIRP irp = NULL;

> +    UINT32 eventId;

>      queue = (POVS_EVENT_QUEUE)instance->eventQueue;

>      if (queue) {

>          POVS_EVENT_QUEUE_ELEM elem;

>          PLIST_ENTRY link, next;

> 

> -        OvsAcquireEventQueueLock();

> +        /* Handle the error */

> +        OvsGetMcastEventId(instance->protocol, instance->mcastMask,

> &eventId);

> +        OvsAcquireEventQueueLock(eventId);

>          RemoveEntryList(&queue->queueLink);

> -        ovsNumEventQueue--;

> +        ovsNumEventQueueArr[eventId]--;

>          if (queue->pendingIrp) {

>              PDRIVER_CANCEL cancelRoutine;

>              irp = queue->pendingIrp;

> @@ -86,7 +115,7 @@ OvsCleanupEvent(POVS_OPEN_INSTANCE instance)

>              }

>          }

>          instance->eventQueue = NULL;

> -        OvsReleaseEventQueueLock();

> +        OvsReleaseEventQueueLock(eventId);

>          if (irp) {

>              OvsCompleteIrpRequest(irp, 0, STATUS_SUCCESS);

>          }

> @@ -115,7 +144,7 @@ OvsPostVportEvent(POVS_VPORT_EVENT_ENTRY

> event)

>      POVS_EVENT_QUEUE queue;

>      PLIST_ENTRY link;

>      LIST_ENTRY list;

> -   PLIST_ENTRY entry;

> +    PLIST_ENTRY entry;

>      PIRP irp;

> 

>      InitializeListHead(&list);

> @@ -123,9 +152,9 @@ OvsPostVportEvent(POVS_VPORT_EVENT_ENTRY

> event)

>      OVS_LOG_TRACE("Enter: portNo: %#x, status: %#x", event->portNo,

>                    event->type);

> 

> -    OvsAcquireEventQueueLock();

> +    OvsAcquireEventQueueLock(OVS_MCAST_VPORT_EVENT);

> 

> -    LIST_FORALL(&ovsEventQueue, link) {

> +    LIST_FORALL(&ovsEventQueueArr[OVS_MCAST_VPORT_EVENT], link) {

>          queue = CONTAINING_RECORD(link, OVS_EVENT_QUEUE, queueLink);

>          if ((event->type & queue->mask) == 0) {

>              continue;

> @@ -137,7 +166,7 @@ OvsPostVportEvent(POVS_VPORT_EVENT_ENTRY

> event)

> 

>          if (elem == NULL) {

>              OVS_LOG_WARN("Fail to allocate memory for event");

> -            OvsReleaseEventQueueLock();

> +            OvsReleaseEventQueueLock(OVS_MCAST_VPORT_EVENT);

>              return;

>          }

> 

> @@ -157,7 +186,7 @@ OvsPostVportEvent(POVS_VPORT_EVENT_ENTRY

> event)

>              }

>          }

>      }

> -    OvsReleaseEventQueueLock();

> +    OvsReleaseEventQueueLock(OVS_MCAST_VPORT_EVENT);

>      while (!IsListEmpty(&list)) {

>          entry = RemoveHeadList(&list);

>          irp = CONTAINING_RECORD(entry, IRP, Tail.Overlay.ListEntry);

> @@ -189,17 +218,25 @@ OvsSubscribeEventIoctl(PFILE_OBJECT fileObject,

>      NTSTATUS status = STATUS_SUCCESS;

>      POVS_OPEN_INSTANCE instance;

>      POVS_EVENT_QUEUE queue = NULL;

> +    UINT32 eventId;

> 

>      OVS_LOG_TRACE("Enter: fileObject: %p, inputLength: %d", fileObject,

>                    inputLength);

> 

> -    if (inputLength < sizeof (OVS_EVENT_SUBSCRIBE) ||

> -        (request->mask & OVS_EVENT_MASK_ALL) == 0) {

> -        OVS_LOG_TRACE("Exit: subscribe failed with invalid request.");

> +    if (request->protocol == NETLINK_GENERIC) {

> +        if (inputLength < sizeof (OVS_EVENT_SUBSCRIBE) ||

> +            (request->mask & OVS_EVENT_MASK_ALL) == 0) {

> +            OVS_LOG_TRACE("Exit: subscribe failed with invalid request.");

> +            return STATUS_INVALID_PARAMETER;

> +        }

> +    }

> +

> +    status = OvsGetMcastEventId(request->protocol, request->mask,

> &eventId);

> +    if (status != NDIS_STATUS_SUCCESS) {

>          return STATUS_INVALID_PARAMETER;

>      }

> 

> -    OvsAcquireEventQueueLock();

> +    OvsAcquireEventQueueLock(eventId);

> 

>      instance = OvsGetOpenInstance(fileObject, request->dpNo);

> 

> @@ -214,9 +251,23 @@ OvsSubscribeEventIoctl(PFILE_OBJECT fileObject,

>       */

>      queue = (POVS_EVENT_QUEUE)instance->eventQueue;

>      if (request->subscribe && queue) {

> -        if (queue->mask != request->mask) {

> +        if (request->protocol == NETLINK_GENERIC

> +            && queue->mask != request->mask) {

>              status = STATUS_INVALID_PARAMETER;

> -            OVS_LOG_WARN("Can not chnage mask when the queue is

> subscribed");

> +            OVS_LOG_WARN("Can not change mask when the queue is

> subscribed");

> +            goto done_event_subscribe;

> +        }

> +        if (request->protocol == NETLINK_NETFILTER) {

> +            if (queue->mask == request->mask) {

> +                /* Resubscribing to subscribed event */

> +                status = STATUS_SUCCESS;

> +                goto done_event_subscribe;

> +            } else {

> +                /* Update the instance and queue mask to reflect this */

> +                queue->mask |= request->mask;

> +                instance->mcastMask |= request->mask;

> +            }

> +            status = STATUS_SUCCESS;

>              goto done_event_subscribe;

>          }

>      } else if (!request->subscribe && queue == NULL) {

> @@ -234,20 +285,28 @@ OvsSubscribeEventIoctl(PFILE_OBJECT fileObject,

>          }

>          InitializeListHead(&queue->elemList);

>          queue->mask = request->mask;

> +        queue->mcastEventId = eventId;

>          queue->pendingIrp = NULL;

>          queue->numElems = 0;

> -        InsertHeadList(&ovsEventQueue, &queue->queueLink);

> -        ovsNumEventQueue++;

> +        InsertHeadList(&ovsEventQueueArr[eventId], &queue->queueLink);

> +        ovsNumEventQueueArr[eventId]++;

>          instance->eventQueue = queue;

> +        instance->mcastMask = request->mask;

>          queue->instance = instance;

>      } else {

>          queue = (POVS_EVENT_QUEUE)instance->eventQueue;

> -        RemoveEntryList(&queue->queueLink);

> -        ovsNumEventQueue--;

> -        instance->eventQueue = NULL;

> +        queue->mask &= ~(request->mask);

> +        instance->mcastMask &= ~(request->mask);

> +        if (!queue->mask) {

> +            /* No other mcast group exists */

> +            RemoveEntryList(&queue->queueLink);

> +            ovsNumEventQueueArr[eventId]--;

> +            instance->eventQueue = NULL;

> +            instance->mcastMask = 0;

> +        }

>      }

>  done_event_subscribe:

> -    if (!request->subscribe && queue) {

> +    if (!request->subscribe && queue && !queue->mask) {

>          POVS_EVENT_QUEUE_ELEM elem;

>          PLIST_ENTRY link, next;

>          PIRP irp = NULL;

> @@ -260,7 +319,7 @@ done_event_subscribe:

>                  irp = NULL;

>              }

>          }

> -        OvsReleaseEventQueueLock();

> +        OvsReleaseEventQueueLock(eventId);

>          if (irp) {

>              OvsCompleteIrpRequest(queue->pendingIrp, 0, STATUS_SUCCESS);

>          }

> @@ -270,7 +329,7 @@ done_event_subscribe:

>          }

>          OvsFreeMemoryWithTag(queue, OVS_EVENT_POOL_TAG);

>      } else {

> -        OvsReleaseEventQueueLock();

> +        OvsReleaseEventQueueLock(eventId);

>      }

>      OVS_LOG_TRACE("Exit: subscribe event with status: %#x.", status);

>      return status;

> @@ -294,6 +353,8 @@ OvsCancelIrp(PDEVICE_OBJECT deviceObject,

>      PFILE_OBJECT fileObject;

>      POVS_EVENT_QUEUE queue;

>      POVS_OPEN_INSTANCE instance;

> +    UINT32 eventId;

> +    NDIS_STATUS status;

> 

>      UNREFERENCED_PARAMETER(deviceObject);

> 

> @@ -305,17 +366,30 @@ OvsCancelIrp(PDEVICE_OBJECT deviceObject,

>      if (fileObject == NULL) {

>          goto done;

>      }

> -    OvsAcquireEventQueueLock();

> +

>      instance = (POVS_OPEN_INSTANCE)fileObject->FsContext;

> -    if (instance == NULL || instance->eventQueue == NULL) {

> -        OvsReleaseEventQueueLock();

> +    if (instance == NULL) {

> +        goto done;

> +    }

> +

> +    status = OvsGetMcastEventId(instance->protocol,

> +                                instance->mcastMask,

> +                                &eventId);

> +    if (status != NDIS_STATUS_SUCCESS) {

> +        goto done;

> +    }

> +

> +    OvsAcquireEventQueueLock(eventId);

> +    if (instance->eventQueue == NULL) {

> +        OvsReleaseEventQueueLock(eventId);

>          goto done;

>      }

> +

>      queue = instance->eventQueue;

>      if (queue->pendingIrp == irp) {

>          queue->pendingIrp = NULL;

>      }

> -    OvsReleaseEventQueueLock();

> +    OvsReleaseEventQueueLock(eventId);

>  done:

>      OvsCompleteIrpRequest(irp, 0, STATUS_CANCELLED);

>  }

> @@ -346,6 +420,7 @@ OvsWaitEventIoctl(PIRP irp,

>      POVS_OPEN_INSTANCE instance;

>      BOOLEAN cancelled = FALSE;

>      PDRIVER_CANCEL cancelRoutine;

> +    UINT32 eventId;

> 

>      OVS_LOG_TRACE("Enter: inputLength: %u", inputLength);

> 

> @@ -362,7 +437,14 @@ OvsWaitEventIoctl(PIRP irp,

>          return STATUS_INVALID_PARAMETER;

>      }

> 

> -    OvsAcquireEventQueueLock();

> +    status = OvsGetMcastEventId(instance->protocol,

> +                                instance->mcastMask,

> +                                &eventId);

> +    if (status != NDIS_STATUS_SUCCESS) {

> +        return STATUS_INVALID_PARAMETER;

> +    }

> +    OvsAcquireEventQueueLock(eventId);

> +

>      queue = (POVS_EVENT_QUEUE)instance->eventQueue;

>      if (queue == NULL) {

>          OVS_LOG_TRACE("Exit: Event queue does not exist");

> @@ -388,7 +470,7 @@ OvsWaitEventIoctl(PIRP irp,

>      }

> 

>  unlock:

> -    OvsReleaseEventQueueLock();

> +    OvsReleaseEventQueueLock(eventId);

>      if (cancelled) {

>          OvsCompleteIrpRequest(irp, 0, STATUS_CANCELLED);

>          OVS_LOG_INFO("Event IRP cancelled: %p", irp);

> @@ -414,7 +496,7 @@ OvsRemoveVportEventEntry(POVS_OPEN_INSTANCE

> instance,

>      POVS_EVENT_QUEUE queue;

>      POVS_EVENT_QUEUE_ELEM elem;

> 

> -    OvsAcquireEventQueueLock();

> +    OvsAcquireEventQueueLock(OVS_MCAST_VPORT_EVENT);

> 

>      queue = (POVS_EVENT_QUEUE)instance->eventQueue;

> 

> @@ -432,6 +514,105 @@

> OvsRemoveVportEventEntry(POVS_OPEN_INSTANCE instance,

>      }

> 

>  remove_event_done:

> -    OvsReleaseEventQueueLock();

> +    OvsReleaseEventQueueLock(OVS_MCAST_VPORT_EVENT);

> +    return status;

> +}

> +

> +/*

> + * --------------------------------------------------------------------------

> + * OvsPostCtEvent - used to post a Conntrack related event

> + *

> + * Side effects: User thread may be woken up.

> + * XXX - Try to consolidate PostEvent for Vport/Ct events

> + * --------------------------------------------------------------------------

> + */

> +VOID

> +OvsPostCtEvent(POVS_CT_EVENT_ENTRY ctEvent)

> +{

> +    POVS_EVENT_QUEUE_ELEM elem;

> +    POVS_EVENT_QUEUE queue;

> +    PLIST_ENTRY link;

> +    LIST_ENTRY list;

> +    PLIST_ENTRY entry;

> +    PIRP irp;

> +

> +    InitializeListHead(&list);

> +

> +    OvsAcquireEventQueueLock(OVS_MCAST_CT_EVENT);

> +

> +    LIST_FORALL(&ovsEventQueueArr[OVS_MCAST_CT_EVENT], link) {

> +        queue = CONTAINING_RECORD(link, OVS_EVENT_QUEUE, queueLink);

> +        if ((ctEvent->type & queue->mask) == 0) {

> +            continue;

> +        }

> +        ctEvent->type &= queue->mask;

> +

> +        elem = (POVS_EVENT_QUEUE_ELEM)OvsAllocateMemoryWithTag(

> +            sizeof(*elem), OVS_EVENT_POOL_TAG);

> +

> +        if (elem == NULL) {

> +            OvsReleaseEventQueueLock(OVS_MCAST_CT_EVENT);

> +            return;

> +        }

> +

> +        RtlCopyMemory(&elem->ctEvent, ctEvent, sizeof elem->ctEvent);

> +        InsertTailList(&queue->elemList, &elem->link);

> +        queue->numElems++;

> +

> +        if (queue->pendingIrp != NULL) {

> +            PDRIVER_CANCEL cancelRoutine;

> +            irp = queue->pendingIrp;

> +            queue->pendingIrp = NULL;

> +            cancelRoutine = IoSetCancelRoutine(irp, NULL);

> +            if (cancelRoutine) {

> +                InsertTailList(&list, &irp->Tail.Overlay.ListEntry);

> +            }

> +        }

> +    }

> +

> +    OvsReleaseEventQueueLock(OVS_MCAST_CT_EVENT);

> +    while (!IsListEmpty(&list)) {

> +        entry = RemoveHeadList(&list);

> +        irp = CONTAINING_RECORD(entry, IRP, Tail.Overlay.ListEntry);

> +        OvsCompleteIrpRequest(irp, 0, STATUS_SUCCESS);

> +    }

> +}

> +

> +/*

> + *--------------------------------------------------------------------------

> + * Poll event queued in the event queue.always synchronous.

> + *

> + * Results:

> + *     STATUS_SUCCESS event was dequeued

> + *     STATUS_UNSUCCESSFUL the queue is empty.

> + * --------------------------------------------------------------------------

> + */

> +NTSTATUS

> +OvsRemoveCtEventEntry(POVS_OPEN_INSTANCE instance,

> +                      POVS_CT_EVENT_ENTRY entry)

> +{

> +    NTSTATUS status = STATUS_UNSUCCESSFUL;

> +    POVS_EVENT_QUEUE queue;

> +    POVS_EVENT_QUEUE_ELEM elem;

> +

> +    OvsAcquireEventQueueLock(OVS_MCAST_CT_EVENT);

> +

> +    queue = (POVS_EVENT_QUEUE)instance->eventQueue;

> +

> +    if (queue == NULL) {

> +        ASSERT(queue);

> +        goto remove_event_done;

> +    }

> +

> +    if (queue->numElems) {

> +        elem = (POVS_EVENT_QUEUE_ELEM)RemoveHeadList(&queue-

> >elemList);

> +        *entry = elem->ctEvent;

> +        OvsFreeMemoryWithTag(elem, OVS_EVENT_POOL_TAG);

> +        queue->numElems--;

> +        status = STATUS_SUCCESS;

> +    }

> +

> +remove_event_done:

> +    OvsReleaseEventQueueLock(OVS_MCAST_CT_EVENT);

>      return status;

>  }

> diff --git a/datapath-windows/ovsext/Event.h b/datapath-

> windows/ovsext/Event.h

> index 255594c..b579463 100644

> --- a/datapath-windows/ovsext/Event.h

> +++ b/datapath-windows/ovsext/Event.h

> @@ -16,16 +16,28 @@

> 

>  #ifndef __EVENT_H_

>  #define __EVENT_H_ 1

> +#include "Conntrack.h"

> +

> +typedef struct _OVS_CT_EVENT_ENTRY {

> +    OVS_CT_ENTRY entry;

> +    UINT8 type;

> +    UINT64 pad[10];

> +} OVS_CT_EVENT_ENTRY, *POVS_CT_EVENT_ENTRY;

> 

>  typedef struct _OVS_EVENT_QUEUE_ELEM {

>      LIST_ENTRY link;

> -    OVS_VPORT_EVENT_ENTRY vportEvent;

> +    union {

> +        OVS_VPORT_EVENT_ENTRY vportEvent;

> +        OVS_CT_EVENT_ENTRY ctEvent;

> +    };

>  } OVS_EVENT_QUEUE_ELEM, *POVS_EVENT_QUEUE_ELEM;

> 

>  typedef struct _OVS_EVENT_QUEUE {

>      LIST_ENTRY queueLink;

>      LIST_ENTRY elemList;

>      UINT32 mask;

> +    UINT32 mcastEventId;

> +    UINT32 protocol;

>      UINT16 numElems;

>      BOOLEAN pollAll;

>      PIRP pendingIrp;

> @@ -39,6 +51,7 @@ struct _OVS_OPEN_INSTANCE;

> 

>  VOID OvsCleanupEvent(struct _OVS_OPEN_INSTANCE *instance);

>  VOID OvsPostVportEvent(POVS_VPORT_EVENT_ENTRY event);

> +VOID OvsPostCtEvent(POVS_CT_EVENT_ENTRY ctEvent);

>  NTSTATUS OvsSubscribeEventIoctl(PFILE_OBJECT fileObject, PVOID

> inputBuffer,

>                                  UINT32 inputLength);

>  NTSTATUS OvsPollEventIoctl(PFILE_OBJECT fileObject, PVOID inputBuffer,

> @@ -48,5 +61,7 @@ NTSTATUS OvsWaitEventIoctl(PIRP irp, PFILE_OBJECT

> fileObject,

>                             PVOID inputBuffer, UINT32 inputLength);

>  NTSTATUS OvsRemoveVportEventEntry(POVS_OPEN_INSTANCE instance,

>                                    POVS_VPORT_EVENT_ENTRY entry);

> +NTSTATUS OvsRemoveCtEventEntry(POVS_OPEN_INSTANCE instance,

> +                               POVS_CT_EVENT_ENTRY entry);

> 

>  #endif /* __EVENT_H_ */

> --

> 2.9.0.windows.1

> 

> _______________________________________________

> dev mailing list

> dev@openvswitch.org

> http://openvswitch.org/mailman/listinfo/dev
Alin Serdean July 23, 2016, 1:10 a.m. UTC | #2
Looks good. Just a few questions from my side so I get up to speed with the changes in events.
Why do we need a lock per eventid ?

> +typedef struct _OVS_CT_EVENT_ENTRY {

> +    OVS_CT_ENTRY entry;

> +    UINT8 type;

> +    UINT64 pad[10];

> +} OVS_CT_EVENT_ENTRY, *POVS_CT_EVENT_ENTRY;


Why such a big pad?

Thanks,
Alin.

> -----Mesaj original-----

> De la: dev [mailto:dev-bounces@openvswitch.org] În numele Sairam

> Venugopal

> Trimis: Thursday, July 14, 2016 2:39 AM

> Către: dev@openvswitch.org

> Subiect: [ovs-dev] [PATCH 7/9] datapath-windows: Add support for multiple

> event queue in Event.c

> 

> Update Event.c to have multiple event queues and mechanism to retrieve

> the associated queue. Introduce OvsPostCtEvent and

> OvsRemoveCtEventEntry similar to OvsPostVportEvent and

> OvsRemoveVportEventEntry.
Sairam Venugopal July 25, 2016, 8:19 p.m. UTC | #3
I added the padding to keep the sizeof(OVS_CT_EVENT_ENTRY) ==
sizeof(OVS_VPORT_EVENT_ENTRY)


I should have sent this as part of Patch 0. This is the general idea:

1. Currently we only support subscribing/unsubscribing to Vport related
events. Even for VPORT, we only care about the VPORT-Down event.
2. Event.c maintains a single queue of Event entries that are then read by
user space. This worked because we only supported 1 event type.
3. In order to add support for multiple events, I modified the event queue
into an array (size 2). The size of the event queue array is driven by
value of enum (we explicitly add new event types here).
4. I decided to make Event Queue an array to avoid creating new ones for
every event type. This also meant smaller code changes and not having
multiple if-else in the code.
5. Each event queue can be uniquely identified based on the mcast
EventType and hence the requirement for multiple locks based on the event
id.
6. Though we may not have multiple event queues subscribed for 1 socket, I
still wanted to ensure that we have support for it if it were ever
requested.
7. Each event type will be added to the underlying enum and subtypes can
be represent by means of masks (eg: vport up/down, ct delete/add/update
etc.,)
/* Supported mcast event groups */
enum OVS_MCAST_EVENT_ENTRIES {
    OVS_MCAST_VPORT_EVENT,
    OVS_MCAST_CT_EVENT,
    __OVS_MCAST_EVENT_ENTRIES_MAX
};


I hope this clarifies the questions.

Thanks,
Sairam

On 7/22/16, 6:10 PM, "Alin Serdean" <aserdean@cloudbasesolutions.com>
wrote:

>Looks good. Just a few questions from my side so I get up to speed with

>the changes in events.

>Why do we need a lock per eventid ?

>

>> +typedef struct _OVS_CT_EVENT_ENTRY {

>> +    OVS_CT_ENTRY entry;

>> +    UINT8 type;

>> +    UINT64 pad[10];

>> +} OVS_CT_EVENT_ENTRY, *POVS_CT_EVENT_ENTRY;

>

>Why such a big pad?

>

>Thanks,

>Alin.

>

>> -----Mesaj original-----

>> De la: dev [mailto:dev-bounces@openvswitch.org] În numele Sairam

>> Venugopal

>> Trimis: Thursday, July 14, 2016 2:39 AM

>> Către: dev@openvswitch.org

>> Subiect: [ovs-dev] [PATCH 7/9] datapath-windows: Add support for

>>multiple

>> event queue in Event.c

>> 

>> Update Event.c to have multiple event queues and mechanism to retrieve

>> the associated queue. Introduce OvsPostCtEvent and

>> OvsRemoveCtEventEntry similar to OvsPostVportEvent and

>> OvsRemoveVportEventEntry.
Alin Serdean July 25, 2016, 9:50 p.m. UTC | #4
Thanks a lot for the info. It would be nice to have it in the initial cover letter just to know the broader idea about the intent :).
From the code I could see the direction you want to go with it, but I wanted some confirmation.

Acked-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions.com>


> -----Mesaj original-----

> De la: Sairam Venugopal [mailto:vsairam@vmware.com]

> Trimis: Monday, July 25, 2016 11:19 PM

> Către: Alin Serdean <aserdean@cloudbasesolutions.com>;

> dev@openvswitch.org

> Subiect: Re: [ovs-dev] [PATCH 7/9] datapath-windows: Add support for

> multiple event queue in Event.c

> 

> I added the padding to keep the sizeof(OVS_CT_EVENT_ENTRY) ==

> sizeof(OVS_VPORT_EVENT_ENTRY)

> 

> 

> I should have sent this as part of Patch 0. This is the general idea:

> 

> 1. Currently we only support subscribing/unsubscribing to Vport related

> events. Even for VPORT, we only care about the VPORT-Down event.

> 2. Event.c maintains a single queue of Event entries that are then read by

> user space. This worked because we only supported 1 event type.

> 3. In order to add support for multiple events, I modified the event queue

> into an array (size 2). The size of the event queue array is driven by value of

> enum (we explicitly add new event types here).

> 4. I decided to make Event Queue an array to avoid creating new ones for

> every event type. This also meant smaller code changes and not having

> multiple if-else in the code.

> 5. Each event queue can be uniquely identified based on the mcast

> EventType and hence the requirement for multiple locks based on the event

> id.

> 6. Though we may not have multiple event queues subscribed for 1 socket, I

> still wanted to ensure that we have support for it if it were ever requested.

[Alin Gabriel Serdean: ] That was my main concern because it can be added in the future. Good call :).
> 7. Each event type will be added to the underlying enum and subtypes can

> be represent by means of masks (eg: vport up/down, ct delete/add/update

> etc.,)

> /* Supported mcast event groups */

> enum OVS_MCAST_EVENT_ENTRIES {

>     OVS_MCAST_VPORT_EVENT,

>     OVS_MCAST_CT_EVENT,

>     __OVS_MCAST_EVENT_ENTRIES_MAX

> };

> 

> 

> I hope this clarifies the questions.

> 

> Thanks,

> Sairam

> 

> On 7/22/16, 6:10 PM, "Alin Serdean" <aserdean@cloudbasesolutions.com>

> wrote:

> 

> >Looks good. Just a few questions from my side so I get up to speed with

> >the changes in events.

> >Why do we need a lock per eventid ?

> >

> >> +typedef struct _OVS_CT_EVENT_ENTRY {

> >> +    OVS_CT_ENTRY entry;

> >> +    UINT8 type;

> >> +    UINT64 pad[10];

> >> +} OVS_CT_EVENT_ENTRY, *POVS_CT_EVENT_ENTRY;

> >

> >Why such a big pad?

> >

> >Thanks,

> >Alin.

> >

> >> -----Mesaj original-----

> >> De la: dev [mailto:dev-bounces@openvswitch.org] În numele Sairam

> >>Venugopal

> >> Trimis: Thursday, July 14, 2016 2:39 AM

> >> Către: dev@openvswitch.org

> >> Subiect: [ovs-dev] [PATCH 7/9] datapath-windows: Add support for

> >>multiple  event queue in Event.c

> >>

> >> Update Event.c to have multiple event queues and mechanism to

> >> retrieve the associated queue. Introduce OvsPostCtEvent and

> >> OvsRemoveCtEventEntry similar to OvsPostVportEvent and

> >> OvsRemoveVportEventEntry.
diff mbox

Patch

diff --git a/datapath-windows/ovsext/Event.c b/datapath-windows/ovsext/Event.c
index 8ff0322..ffc32a6 100644
--- a/datapath-windows/ovsext/Event.c
+++ b/datapath-windows/ovsext/Event.c
@@ -26,36 +26,62 @@ 
 #define OVS_DBG_MOD OVS_DBG_EVENT
 #include "Debug.h"
 
-LIST_ENTRY ovsEventQueue;
-static NDIS_SPIN_LOCK eventQueueLock;
-UINT32 ovsNumEventQueue;
+LIST_ENTRY ovsEventQueueArr[OVS_MCAST_EVENT_ENTRIES_MAX];
+static NDIS_SPIN_LOCK eventQueueLockArr[OVS_MCAST_EVENT_ENTRIES_MAX];
+UINT32 ovsNumEventQueueArr[OVS_MCAST_EVENT_ENTRIES_MAX];
 
 NTSTATUS
 OvsInitEventQueue()
 {
-    InitializeListHead(&ovsEventQueue);
-    NdisAllocateSpinLock(&eventQueueLock);
+    for (int i = 0; i < OVS_MCAST_EVENT_ENTRIES_MAX; i++) {
+        InitializeListHead(&ovsEventQueueArr[i]);
+        NdisAllocateSpinLock(&eventQueueLockArr[i]);
+    }
     return STATUS_SUCCESS;
 }
 
 VOID
 OvsCleanupEventQueue()
 {
-    ASSERT(IsListEmpty(&ovsEventQueue));
-    ASSERT(ovsNumEventQueue == 0);
-    NdisFreeSpinLock(&eventQueueLock);
+    for (int i = 0; i < OVS_MCAST_EVENT_ENTRIES_MAX; i++) {
+        ASSERT(IsListEmpty(&ovsEventQueueArr[i]));
+        ASSERT(ovsNumEventQueueArr[i] == 0);
+        NdisFreeSpinLock(&eventQueueLockArr[i]);
+    }
 }
 
 static __inline VOID
-OvsAcquireEventQueueLock()
+OvsAcquireEventQueueLock(int eventId)
 {
-    NdisAcquireSpinLock(&eventQueueLock);
+    NdisAcquireSpinLock(&eventQueueLockArr[eventId]);
 }
 
 static __inline VOID
-OvsReleaseEventQueueLock()
+OvsReleaseEventQueueLock(int eventId)
 {
-   NdisReleaseSpinLock(&eventQueueLock);
+   NdisReleaseSpinLock(&eventQueueLockArr[eventId]);
+}
+
+NDIS_STATUS
+OvsGetMcastEventId(UINT32 protocol, UINT32 mcastMask, UINT32 *eventId)
+{
+    switch (protocol) {
+    case NETLINK_GENERIC:
+        *eventId = OVS_MCAST_VPORT_EVENT;
+        return NDIS_STATUS_SUCCESS;
+    case NETLINK_NETFILTER:
+        if ((mcastMask & OVS_EVENT_CT_NEW)
+            || (mcastMask & OVS_EVENT_CT_DELETE)) {
+            *eventId =  OVS_MCAST_CT_EVENT;
+            return NDIS_STATUS_SUCCESS;
+        }
+        break;
+    default:
+        goto error;
+    }
+
+error:
+    return NDIS_STATUS_INVALID_PARAMETER;
 }
 
 /*
@@ -68,14 +94,17 @@  OvsCleanupEvent(POVS_OPEN_INSTANCE instance)
 {
     POVS_EVENT_QUEUE queue;
     PIRP irp = NULL;
+    UINT32 eventId;
     queue = (POVS_EVENT_QUEUE)instance->eventQueue;
     if (queue) {
         POVS_EVENT_QUEUE_ELEM elem;
         PLIST_ENTRY link, next;
 
-        OvsAcquireEventQueueLock();
+        /* Handle the error */
+        OvsGetMcastEventId(instance->protocol, instance->mcastMask, &eventId);
+        OvsAcquireEventQueueLock(eventId);
         RemoveEntryList(&queue->queueLink);
-        ovsNumEventQueue--;
+        ovsNumEventQueueArr[eventId]--;
         if (queue->pendingIrp) {
             PDRIVER_CANCEL cancelRoutine;
             irp = queue->pendingIrp;
@@ -86,7 +115,7 @@  OvsCleanupEvent(POVS_OPEN_INSTANCE instance)
             }
         }
         instance->eventQueue = NULL;
-        OvsReleaseEventQueueLock();
+        OvsReleaseEventQueueLock(eventId);
         if (irp) {
             OvsCompleteIrpRequest(irp, 0, STATUS_SUCCESS);
         }
@@ -115,7 +144,7 @@  OvsPostVportEvent(POVS_VPORT_EVENT_ENTRY event)
     POVS_EVENT_QUEUE queue;
     PLIST_ENTRY link;
     LIST_ENTRY list;
-   PLIST_ENTRY entry;
+    PLIST_ENTRY entry;
     PIRP irp;
 
     InitializeListHead(&list);
@@ -123,9 +152,9 @@  OvsPostVportEvent(POVS_VPORT_EVENT_ENTRY event)
     OVS_LOG_TRACE("Enter: portNo: %#x, status: %#x", event->portNo,
                   event->type);
 
-    OvsAcquireEventQueueLock();
+    OvsAcquireEventQueueLock(OVS_MCAST_VPORT_EVENT);
 
-    LIST_FORALL(&ovsEventQueue, link) {
+    LIST_FORALL(&ovsEventQueueArr[OVS_MCAST_VPORT_EVENT], link) {
         queue = CONTAINING_RECORD(link, OVS_EVENT_QUEUE, queueLink);
         if ((event->type & queue->mask) == 0) {
             continue;
@@ -137,7 +166,7 @@  OvsPostVportEvent(POVS_VPORT_EVENT_ENTRY event)
 
         if (elem == NULL) {
             OVS_LOG_WARN("Fail to allocate memory for event");
-            OvsReleaseEventQueueLock();
+            OvsReleaseEventQueueLock(OVS_MCAST_VPORT_EVENT);
             return;
         }
 
@@ -157,7 +186,7 @@  OvsPostVportEvent(POVS_VPORT_EVENT_ENTRY event)
             }
         }
     }
-    OvsReleaseEventQueueLock();
+    OvsReleaseEventQueueLock(OVS_MCAST_VPORT_EVENT);
     while (!IsListEmpty(&list)) {
         entry = RemoveHeadList(&list);
         irp = CONTAINING_RECORD(entry, IRP, Tail.Overlay.ListEntry);
@@ -189,17 +218,25 @@  OvsSubscribeEventIoctl(PFILE_OBJECT fileObject,
     NTSTATUS status = STATUS_SUCCESS;
     POVS_OPEN_INSTANCE instance;
     POVS_EVENT_QUEUE queue = NULL;
+    UINT32 eventId;
 
     OVS_LOG_TRACE("Enter: fileObject: %p, inputLength: %d", fileObject,
                   inputLength);
 
-    if (inputLength < sizeof (OVS_EVENT_SUBSCRIBE) ||
-        (request->mask & OVS_EVENT_MASK_ALL) == 0) {
-        OVS_LOG_TRACE("Exit: subscribe failed with invalid request.");
+    if (request->protocol == NETLINK_GENERIC) {
+        if (inputLength < sizeof (OVS_EVENT_SUBSCRIBE) ||
+            (request->mask & OVS_EVENT_MASK_ALL) == 0) {
+            OVS_LOG_TRACE("Exit: subscribe failed with invalid request.");
+            return STATUS_INVALID_PARAMETER;
+        }
+    }
+
+    status = OvsGetMcastEventId(request->protocol, request->mask, &eventId);
+    if (status != NDIS_STATUS_SUCCESS) {
         return STATUS_INVALID_PARAMETER;
     }
 
-    OvsAcquireEventQueueLock();
+    OvsAcquireEventQueueLock(eventId);
 
     instance = OvsGetOpenInstance(fileObject, request->dpNo);
 
@@ -214,9 +251,23 @@  OvsSubscribeEventIoctl(PFILE_OBJECT fileObject,
      */
     queue = (POVS_EVENT_QUEUE)instance->eventQueue;
     if (request->subscribe && queue) {
-        if (queue->mask != request->mask) {
+        if (request->protocol == NETLINK_GENERIC
+            && queue->mask != request->mask) {
             status = STATUS_INVALID_PARAMETER;
-            OVS_LOG_WARN("Can not chnage mask when the queue is subscribed");
+            OVS_LOG_WARN("Can not change mask when the queue is subscribed");
+            goto done_event_subscribe;
+        }
+        if (request->protocol == NETLINK_NETFILTER) {
+            if (queue->mask == request->mask) {
+                /* Resubscribing to subscribed event */
+                status = STATUS_SUCCESS;
+                goto done_event_subscribe;
+            } else {
+                /* Update the instance and queue mask to reflect this */
+                queue->mask |= request->mask;
+                instance->mcastMask |= request->mask;
+            }
+            status = STATUS_SUCCESS;
             goto done_event_subscribe;
         }
     } else if (!request->subscribe && queue == NULL) {
@@ -234,20 +285,28 @@  OvsSubscribeEventIoctl(PFILE_OBJECT fileObject,
         }
         InitializeListHead(&queue->elemList);
         queue->mask = request->mask;
+        queue->mcastEventId = eventId;
         queue->pendingIrp = NULL;
         queue->numElems = 0;
-        InsertHeadList(&ovsEventQueue, &queue->queueLink);
-        ovsNumEventQueue++;
+        InsertHeadList(&ovsEventQueueArr[eventId], &queue->queueLink);
+        ovsNumEventQueueArr[eventId]++;
         instance->eventQueue = queue;
+        instance->mcastMask = request->mask;
         queue->instance = instance;
     } else {
         queue = (POVS_EVENT_QUEUE)instance->eventQueue;
-        RemoveEntryList(&queue->queueLink);
-        ovsNumEventQueue--;
-        instance->eventQueue = NULL;
+        queue->mask &= ~(request->mask);
+        instance->mcastMask &= ~(request->mask);
+        if (!queue->mask) {
+            /* No other mcast group exists */
+            RemoveEntryList(&queue->queueLink);
+            ovsNumEventQueueArr[eventId]--;
+            instance->eventQueue = NULL;
+            instance->mcastMask = 0;
+        }
     }
 done_event_subscribe:
-    if (!request->subscribe && queue) {
+    if (!request->subscribe && queue && !queue->mask) {
         POVS_EVENT_QUEUE_ELEM elem;
         PLIST_ENTRY link, next;
         PIRP irp = NULL;
@@ -260,7 +319,7 @@  done_event_subscribe:
                 irp = NULL;
             }
         }
-        OvsReleaseEventQueueLock();
+        OvsReleaseEventQueueLock(eventId);
         if (irp) {
             OvsCompleteIrpRequest(queue->pendingIrp, 0, STATUS_SUCCESS);
         }
@@ -270,7 +329,7 @@  done_event_subscribe:
         }
         OvsFreeMemoryWithTag(queue, OVS_EVENT_POOL_TAG);
     } else {
-        OvsReleaseEventQueueLock();
+        OvsReleaseEventQueueLock(eventId);
     }
     OVS_LOG_TRACE("Exit: subscribe event with status: %#x.", status);
     return status;
@@ -294,6 +353,8 @@  OvsCancelIrp(PDEVICE_OBJECT deviceObject,
     PFILE_OBJECT fileObject;
     POVS_EVENT_QUEUE queue;
     POVS_OPEN_INSTANCE instance;
+    UINT32 eventId;
+    NDIS_STATUS status;
 
     UNREFERENCED_PARAMETER(deviceObject);
 
@@ -305,17 +366,30 @@  OvsCancelIrp(PDEVICE_OBJECT deviceObject,
     if (fileObject == NULL) {
         goto done;
     }
-    OvsAcquireEventQueueLock();
+
     instance = (POVS_OPEN_INSTANCE)fileObject->FsContext;
-    if (instance == NULL || instance->eventQueue == NULL) {
-        OvsReleaseEventQueueLock();
+    if (instance == NULL) {
+        goto done;
+    }
+
+    status = OvsGetMcastEventId(instance->protocol,
+                                instance->mcastMask,
+                                &eventId);
+    if (status != NDIS_STATUS_SUCCESS) {
+        goto done;
+    }
+
+    OvsAcquireEventQueueLock(eventId);
+    if (instance->eventQueue == NULL) {
+        OvsReleaseEventQueueLock(eventId);
         goto done;
     }
+
     queue = instance->eventQueue;
     if (queue->pendingIrp == irp) {
         queue->pendingIrp = NULL;
     }
-    OvsReleaseEventQueueLock();
+    OvsReleaseEventQueueLock(eventId);
 done:
     OvsCompleteIrpRequest(irp, 0, STATUS_CANCELLED);
 }
@@ -346,6 +420,7 @@  OvsWaitEventIoctl(PIRP irp,
     POVS_OPEN_INSTANCE instance;
     BOOLEAN cancelled = FALSE;
     PDRIVER_CANCEL cancelRoutine;
+    UINT32 eventId;
 
     OVS_LOG_TRACE("Enter: inputLength: %u", inputLength);
 
@@ -362,7 +437,14 @@  OvsWaitEventIoctl(PIRP irp,
         return STATUS_INVALID_PARAMETER;
     }
 
-    OvsAcquireEventQueueLock();
+    status = OvsGetMcastEventId(instance->protocol,
+                                instance->mcastMask,
+                                &eventId);
+    if (status != NDIS_STATUS_SUCCESS) {
+        return STATUS_INVALID_PARAMETER;
+    }
+    OvsAcquireEventQueueLock(eventId);
+
     queue = (POVS_EVENT_QUEUE)instance->eventQueue;
     if (queue == NULL) {
         OVS_LOG_TRACE("Exit: Event queue does not exist");
@@ -388,7 +470,7 @@  OvsWaitEventIoctl(PIRP irp,
     }
 
 unlock:
-    OvsReleaseEventQueueLock();
+    OvsReleaseEventQueueLock(eventId);
     if (cancelled) {
         OvsCompleteIrpRequest(irp, 0, STATUS_CANCELLED);
         OVS_LOG_INFO("Event IRP cancelled: %p", irp);
@@ -414,7 +496,7 @@  OvsRemoveVportEventEntry(POVS_OPEN_INSTANCE instance,
     POVS_EVENT_QUEUE queue;
     POVS_EVENT_QUEUE_ELEM elem;
 
-    OvsAcquireEventQueueLock();
+    OvsAcquireEventQueueLock(OVS_MCAST_VPORT_EVENT);
 
     queue = (POVS_EVENT_QUEUE)instance->eventQueue;
 
@@ -432,6 +514,105 @@  OvsRemoveVportEventEntry(POVS_OPEN_INSTANCE instance,
     }
 
 remove_event_done:
-    OvsReleaseEventQueueLock();
+    OvsReleaseEventQueueLock(OVS_MCAST_VPORT_EVENT);
+    return status;
+}
+
+/*
+ * --------------------------------------------------------------------------
+ * OvsPostCtEvent - used to post a Conntrack related event
+ *
+ * Side effects: User thread may be woken up.
+ * XXX - Try to consolidate PostEvent for Vport/Ct events
+ * --------------------------------------------------------------------------
+ */
+VOID
+OvsPostCtEvent(POVS_CT_EVENT_ENTRY ctEvent)
+{
+    POVS_EVENT_QUEUE_ELEM elem;
+    POVS_EVENT_QUEUE queue;
+    PLIST_ENTRY link;
+    LIST_ENTRY list;
+    PLIST_ENTRY entry;
+    PIRP irp;
+
+    InitializeListHead(&list);
+
+    OvsAcquireEventQueueLock(OVS_MCAST_CT_EVENT);
+
+    LIST_FORALL(&ovsEventQueueArr[OVS_MCAST_CT_EVENT], link) {
+        queue = CONTAINING_RECORD(link, OVS_EVENT_QUEUE, queueLink);
+        if ((ctEvent->type & queue->mask) == 0) {
+            continue;
+        }
+        ctEvent->type &= queue->mask;
+
+        elem = (POVS_EVENT_QUEUE_ELEM)OvsAllocateMemoryWithTag(
+            sizeof(*elem), OVS_EVENT_POOL_TAG);
+
+        if (elem == NULL) {
+            OvsReleaseEventQueueLock(OVS_MCAST_CT_EVENT);
+            return;
+        }
+
+        RtlCopyMemory(&elem->ctEvent, ctEvent, sizeof elem->ctEvent);
+        InsertTailList(&queue->elemList, &elem->link);
+        queue->numElems++;
+
+        if (queue->pendingIrp != NULL) {
+            PDRIVER_CANCEL cancelRoutine;
+            irp = queue->pendingIrp;
+            queue->pendingIrp = NULL;
+            cancelRoutine = IoSetCancelRoutine(irp, NULL);
+            if (cancelRoutine) {
+                InsertTailList(&list, &irp->Tail.Overlay.ListEntry);
+            }
+        }
+    }
+
+    OvsReleaseEventQueueLock(OVS_MCAST_CT_EVENT);
+    while (!IsListEmpty(&list)) {
+        entry = RemoveHeadList(&list);
+        irp = CONTAINING_RECORD(entry, IRP, Tail.Overlay.ListEntry);
+        OvsCompleteIrpRequest(irp, 0, STATUS_SUCCESS);
+    }
+}
+
+/*
+ *--------------------------------------------------------------------------
+ * Poll event queued in the event queue.always synchronous.
+ *
+ * Results:
+ *     STATUS_SUCCESS event was dequeued
+ *     STATUS_UNSUCCESSFUL the queue is empty.
+ * --------------------------------------------------------------------------
+ */
+NTSTATUS
+OvsRemoveCtEventEntry(POVS_OPEN_INSTANCE instance,
+                      POVS_CT_EVENT_ENTRY entry)
+{
+    NTSTATUS status = STATUS_UNSUCCESSFUL;
+    POVS_EVENT_QUEUE queue;
+    POVS_EVENT_QUEUE_ELEM elem;
+
+    OvsAcquireEventQueueLock(OVS_MCAST_CT_EVENT);
+
+    queue = (POVS_EVENT_QUEUE)instance->eventQueue;
+
+    if (queue == NULL) {
+        ASSERT(queue);
+        goto remove_event_done;
+    }
+
+    if (queue->numElems) {
+        elem = (POVS_EVENT_QUEUE_ELEM)RemoveHeadList(&queue->elemList);
+        *entry = elem->ctEvent;
+        OvsFreeMemoryWithTag(elem, OVS_EVENT_POOL_TAG);
+        queue->numElems--;
+        status = STATUS_SUCCESS;
+    }
+
+remove_event_done:
+    OvsReleaseEventQueueLock(OVS_MCAST_CT_EVENT);
     return status;
 }
diff --git a/datapath-windows/ovsext/Event.h b/datapath-windows/ovsext/Event.h
index 255594c..b579463 100644
--- a/datapath-windows/ovsext/Event.h
+++ b/datapath-windows/ovsext/Event.h
@@ -16,16 +16,28 @@ 
 
 #ifndef __EVENT_H_
 #define __EVENT_H_ 1
+#include "Conntrack.h"
+
+typedef struct _OVS_CT_EVENT_ENTRY {
+    OVS_CT_ENTRY entry;
+    UINT8 type;
+    UINT64 pad[10];
+} OVS_CT_EVENT_ENTRY, *POVS_CT_EVENT_ENTRY;
 
 typedef struct _OVS_EVENT_QUEUE_ELEM {
     LIST_ENTRY link;
-    OVS_VPORT_EVENT_ENTRY vportEvent;
+    union {
+        OVS_VPORT_EVENT_ENTRY vportEvent;
+        OVS_CT_EVENT_ENTRY ctEvent;
+    };
 } OVS_EVENT_QUEUE_ELEM, *POVS_EVENT_QUEUE_ELEM;
 
 typedef struct _OVS_EVENT_QUEUE {
     LIST_ENTRY queueLink;
     LIST_ENTRY elemList;
     UINT32 mask;
+    UINT32 mcastEventId;
+    UINT32 protocol;
     UINT16 numElems;
     BOOLEAN pollAll;
     PIRP pendingIrp;
@@ -39,6 +51,7 @@  struct _OVS_OPEN_INSTANCE;
 
 VOID OvsCleanupEvent(struct _OVS_OPEN_INSTANCE *instance);
 VOID OvsPostVportEvent(POVS_VPORT_EVENT_ENTRY event);
+VOID OvsPostCtEvent(POVS_CT_EVENT_ENTRY ctEvent);
 NTSTATUS OvsSubscribeEventIoctl(PFILE_OBJECT fileObject, PVOID inputBuffer,
                                 UINT32 inputLength);
 NTSTATUS OvsPollEventIoctl(PFILE_OBJECT fileObject, PVOID inputBuffer,
@@ -48,5 +61,7 @@  NTSTATUS OvsWaitEventIoctl(PIRP irp, PFILE_OBJECT fileObject,
                            PVOID inputBuffer, UINT32 inputLength);
 NTSTATUS OvsRemoveVportEventEntry(POVS_OPEN_INSTANCE instance,
                                   POVS_VPORT_EVENT_ENTRY entry);
+NTSTATUS OvsRemoveCtEventEntry(POVS_OPEN_INSTANCE instance,
+                               POVS_CT_EVENT_ENTRY entry);
 
 #endif /* __EVENT_H_ */