@@ -25,6 +25,8 @@
#include <unistd.h>
#include "socket-util.h"
#include "util.h"
+#include "latch.h"
+#include "guarded-list.h"
VLOG_DEFINE_THIS_MODULE(stopwatch);
@@ -73,6 +75,7 @@ enum stopwatch_op {
};
struct stopwatch_packet {
+ struct ovs_list list_node;
enum stopwatch_op op;
char name[32];
unsigned long long time;
@@ -82,7 +85,8 @@ static struct shash stopwatches = SHASH_INITIALIZER(&stopwatches);
static struct ovs_mutex stopwatches_lock = OVS_MUTEX_INITIALIZER;
static pthread_cond_t stopwatches_sync = PTHREAD_COND_INITIALIZER;
-static int stopwatch_pipe[2];
+static struct latch stopwatch_latch;
+static struct guarded_list stopwatch_commands;
static pthread_t stopwatch_thread_id;
static const char *unit_name[] = {
@@ -329,17 +333,33 @@ stopwatch_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
ds_destroy(&s);
}
+static struct stopwatch_packet *
+stopwatch_packet_create(enum stopwatch_op op)
+{
+ struct stopwatch_packet *pkt;
+
+ pkt = xzalloc(sizeof *pkt);
+ pkt->op = op;
+
+ return pkt;
+}
+
+static void
+stopwatch_packet_write(struct stopwatch_packet *pkt)
+{
+ guarded_list_push_back(&stopwatch_commands, &pkt->list_node, SIZE_MAX);
+ latch_set(&stopwatch_latch);
+}
+
static void
stopwatch_reset(struct unixctl_conn *conn, int argc OVS_UNUSED,
const char *argv[], void *aux OVS_UNUSED)
{
- struct stopwatch_packet pkt = {
- .op = OP_RESET,
- };
+ struct stopwatch_packet *pkt = stopwatch_packet_create(OP_RESET);
if (argc > 1) {
- ovs_strlcpy(pkt.name, argv[1], sizeof pkt.name);
+ ovs_strlcpy(pkt->name, argv[1], sizeof pkt->name);
}
- ignore(write(stopwatch_pipe[1], &pkt, sizeof pkt));
+ stopwatch_packet_write(pkt);
unixctl_command_reply(conn, "");
}
@@ -406,31 +426,34 @@ stopwatch_thread(void *ign OVS_UNUSED)
bool should_exit = false;
while (!should_exit) {
- struct stopwatch_packet pkt;
- while (read(stopwatch_pipe[0], &pkt, sizeof pkt) > 0) {
- ovs_mutex_lock(&stopwatches_lock);
- switch (pkt.op) {
+ struct ovs_list command_list;
+ struct stopwatch_packet *pkt;
+
+ guarded_list_pop_all(&stopwatch_commands, &command_list);
+ ovs_mutex_lock(&stopwatches_lock);
+ LIST_FOR_EACH_POP (pkt, list_node, &command_list) {
+ switch (pkt->op) {
case OP_START_SAMPLE:
- stopwatch_start_sample_protected(&pkt);
+ stopwatch_start_sample_protected(pkt);
break;
case OP_END_SAMPLE:
- stopwatch_end_sample_protected(&pkt);
+ stopwatch_end_sample_protected(pkt);
break;
case OP_SYNC:
xpthread_cond_signal(&stopwatches_sync);
break;
case OP_RESET:
- stopwatch_reset_protected(&pkt);
+ stopwatch_reset_protected(pkt);
break;
case OP_SHUTDOWN:
should_exit = true;
break;
}
- ovs_mutex_unlock(&stopwatches_lock);
}
+ ovs_mutex_unlock(&stopwatches_lock);
if (!should_exit) {
- poll_fd_wait(stopwatch_pipe[0], POLLIN);
+ latch_wait(&stopwatch_latch);
poll_block();
}
}
@@ -442,11 +465,8 @@ static void
stopwatch_exit(void)
{
struct shash_node *node, *node_next;
- struct stopwatch_packet pkt = {
- .op = OP_SHUTDOWN,
- };
-
- ignore(write(stopwatch_pipe[1], &pkt, sizeof pkt));
+ struct stopwatch_packet *pkt = stopwatch_packet_create(OP_SHUTDOWN);
+ stopwatch_packet_write(pkt);
xpthread_join(stopwatch_thread_id, NULL);
/* Process is exiting and we have joined the only
@@ -460,6 +480,8 @@ stopwatch_exit(void)
}
shash_destroy(&stopwatches);
ovs_mutex_destroy(&stopwatches_lock);
+ guarded_list_destroy(&stopwatch_commands);
+ latch_destroy(&stopwatch_latch);
}
static void
@@ -469,7 +491,8 @@ do_init_stopwatch(void)
stopwatch_show, NULL);
unixctl_command_register("stopwatch/reset", "[NAME]", 0, 1,
stopwatch_reset, NULL);
- xpipe_nonblocking(stopwatch_pipe);
+ guarded_list_init(&stopwatch_commands);
+ latch_init(&stopwatch_latch);
stopwatch_thread_id = ovs_thread_create(
"stopwatch", stopwatch_thread, NULL);
atexit(stopwatch_exit);
@@ -503,34 +526,27 @@ stopwatch_create(const char *name, enum stopwatch_units units)
void
stopwatch_start(const char *name, unsigned long long ts)
{
- struct stopwatch_packet pkt = {
- .op = OP_START_SAMPLE,
- .time = ts,
- };
- ovs_strlcpy(pkt.name, name, sizeof pkt.name);
- ignore(write(stopwatch_pipe[1], &pkt, sizeof pkt));
+ struct stopwatch_packet *pkt = stopwatch_packet_create(OP_START_SAMPLE);
+ ovs_strlcpy(pkt->name, name, sizeof pkt->name);
+ pkt->time = ts;
+ stopwatch_packet_write(pkt);
}
void
stopwatch_stop(const char *name, unsigned long long ts)
{
- struct stopwatch_packet pkt = {
- .op = OP_END_SAMPLE,
- .time = ts,
- };
- ovs_strlcpy(pkt.name, name, sizeof pkt.name);
- ignore(write(stopwatch_pipe[1], &pkt, sizeof pkt));
+ struct stopwatch_packet *pkt = stopwatch_packet_create(OP_END_SAMPLE);
+ ovs_strlcpy(pkt->name, name, sizeof pkt->name);
+ pkt->time = ts;
+ stopwatch_packet_write(pkt);
}
void
stopwatch_sync(void)
{
- struct stopwatch_packet pkt = {
- .op = OP_SYNC,
- };
-
+ struct stopwatch_packet *pkt = stopwatch_packet_create(OP_SYNC);
ovs_mutex_lock(&stopwatches_lock);
- ignore(write(stopwatch_pipe[1], &pkt, sizeof pkt));
+ stopwatch_packet_write(pkt);
ovs_mutex_cond_wait(&stopwatches_sync, &stopwatches_lock);
ovs_mutex_unlock(&stopwatches_lock);
}
Stopwatch was implemented using a Unix-only pipe structure. This commit changes to using a guarded list and latch in order to pass data between threads. Signed-off-by: Mark Michelson <mmichels@redhat.com> --- lib/stopwatch.c | 92 +++++++++++++++++++++++++++++++++------------------------ 1 file changed, 54 insertions(+), 38 deletions(-)