From patchwork Mon Dec 21 11:55:57 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Sava Jakovljev X-Patchwork-Id: 1418999 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: ozlabs.org; spf=pass (sender SPF authorized) smtp.mailfrom=googlegroups.com (client-ip=2a00:1450:4864:20::438; helo=mail-wr1-x438.google.com; envelope-from=swupdate+bncbaabbye2ql7qkgqebd2m7ri@googlegroups.com; receiver=) Authentication-Results: ozlabs.org; dmarc=none (p=none dis=none) header.from=teufel.de Authentication-Results: ozlabs.org; dkim=pass (2048-bit key; unprotected) header.d=googlegroups.com header.i=@googlegroups.com header.a=rsa-sha256 header.s=20161025 header.b=rvk1G5j+; dkim-atps=neutral Received: from mail-wr1-x438.google.com (mail-wr1-x438.google.com [IPv6:2a00:1450:4864:20::438]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (4096 bits) server-digest SHA256) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 4Czyc71y7Wz9sTg for ; Mon, 21 Dec 2020 22:56:19 +1100 (AEDT) Received: by mail-wr1-x438.google.com with SMTP id r8sf8123818wro.22 for ; Mon, 21 Dec 2020 03:56:18 -0800 (PST) ARC-Seal: i=2; a=rsa-sha256; t=1608551776; cv=pass; d=google.com; s=arc-20160816; b=tJ5I91AH2AyLFGOlkn4D1HweBQ+F8BqdZB5L/sob7Icyalt1A/ARRMbJEs4kPr4fwe kUdwx3kqHlwVqy9ct2TjC/L4hMIm/l104OmIj/JertJCsoisegF6S10t4J6+FA/uX0dx Z31FlFbJesVzzq7S9dX3G/Iy4+IcJ2gYHAqV7RT3Iyo0npOHpdbFupObtoOWNrUodgkQ LD6PdOccE4qg9/8qYDln8dZXCqWbdZoxyph+7dl3c1bxnCAh41V0zFk8tH75eSkxZTZ2 hQ34vNyGhhaqpG8TNBTl7/yEBDvLOKyTQU5qzIYS87bd4TBsXb/xcp1vUeY25anVYG8D kheg== ARC-Message-Signature: i=2; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=list-unsubscribe:list-subscribe:list-archive:list-help:list-post :list-id:mailing-list:precedence:mime-version:message-id:date :subject:cc:to:from:sender:dkim-signature; bh=kmHBBoUun3Gz7b2bbknoDilzds80yoHKy8/o8uGCr18=; b=BOAwkopztWFpjvymIfroXzYquPMhCOmGHEpa562anSNZxF1QDhXtibJeEPfMyIm2kz 3DlS5Xbnt6HnnNGMa/iiH2E1yZzrZAQReqHIvMgtB0IRayiyzroL1Iob1IE8marLzyfj MdixsOC9q+RRwAzaR7Gn6FRpsqGHxGWue2mJ3ajkuCf4wY9bfdTT91lRaH4urLyb5GkZ 5K9zGfx9OhsMYd5hkEuoSqG+obqfyn1drY8df/svZum9mehmNuYZb4NXZGBdw1jS1NBC iXZB7QnuS3CcDEMYXf7cbb+rgaTnzZuv4eYSCp/czLr86Ic7C3Kqv5AXVJzEcAQK51Iz 62HA== ARC-Authentication-Results: i=2; gmr-mx.google.com; dkim=pass header.i=@teufel.de header.s=hse1 header.b="gY/wpceg"; spf=pass (google.com: domain of sava.jakovljev@teufel.de designates 83.246.65.159 as permitted sender) smtp.mailfrom=sava.jakovljev@teufel.de DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=googlegroups.com; s=20161025; h=sender:from:to:cc:subject:date:message-id:mime-version :x-original-sender:x-original-authentication-results:precedence :mailing-list:list-id:list-post:list-help:list-archive :list-subscribe:list-unsubscribe; bh=kmHBBoUun3Gz7b2bbknoDilzds80yoHKy8/o8uGCr18=; b=rvk1G5j+kfxYZFgtlLQawyXjgSkC/Dkgydw5aW7sdK3+dA+9t9SO/Qlq2p1MpTqdwI E+VRF9OJQOhZdouYAM/bnyqpliFoPZj9U3eV4GRodg4ekSPFC9LIOx43tWISzxo2ESAk ctnQsd8MIEbm0xxpw0GCNr16LViTYbxnRZYqXvLwwfSDkq7kvtV1SdMetBznRhCYSpfO V/CzjHwiB7qWAzSKU4uhPRR84Rvl57E65kd3aalDlnBaycvrBKOGUc6FxdiRZ1uHYxQl TegS3E81YfuC2hVIolBHM8JpmOe7yhKEw/hJjk+GLnKNLwpTm8h8knOvjt/m2PS/Qh4B RgbQ== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=sender:x-gm-message-state:from:to:cc:subject:date:message-id :mime-version:x-original-sender:x-original-authentication-results :precedence:mailing-list:list-id:x-spam-checked-in-group:list-post :list-help:list-archive:list-subscribe:list-unsubscribe; bh=kmHBBoUun3Gz7b2bbknoDilzds80yoHKy8/o8uGCr18=; b=YL/TBObZyKQbjbpUHiAWoZba3A6hFRB/h6z/lD2VRJ2c8m84mTvuXolfhojrcn9p0b kGwbkuOy4HiStWf7n6E06OR8RIDcS3iZgvla/QULvDyY3VoHCppqrjUDofV1DQL5wCbM 2wd6S09xATknpTnz353NWsDyhn9mFQvxLRt8SAzXZZtg2L84Wk6R3FFEUqVxqTJBbm7T b1TNoHc+DeQl0XHcIXhM5fToEUranUpZ7tqrioIroYpclYTMdwBElI5MiSgi3CoF1rl+ ne9sqsjFUNgpPohPmbYhX1XaHAB+wTqbiVmMgRChxTjMjhcrso+AquMVAfmT+BsgQawt pS+A== Sender: swupdate@googlegroups.com X-Gm-Message-State: AOAM533lqNXLTatitziUDZKureh0oEFHSnfG+emaJfTyqxccEKdprvop qHyqnn9ZzC94Qcxy7CTIHQ0= X-Google-Smtp-Source: ABdhPJykMpgHsXgeFAz37i+xBLZ5a+qvBE/BTBk45epXHzeHuesBtU1G7l2sRAwCcDErkw0910wDug== X-Received: by 2002:a5d:650f:: with SMTP id x15mr18231289wru.332.1608551776691; Mon, 21 Dec 2020 03:56:16 -0800 (PST) X-BeenThere: swupdate@googlegroups.com Received: by 2002:a1c:9916:: with SMTP id b22ls4610034wme.2.canary-gmail; Mon, 21 Dec 2020 03:56:15 -0800 (PST) X-Received: by 2002:a1c:bdc1:: with SMTP id n184mr16308935wmf.125.1608551775773; Mon, 21 Dec 2020 03:56:15 -0800 (PST) ARC-Seal: i=1; a=rsa-sha256; t=1608551775; cv=none; d=google.com; s=arc-20160816; b=PfdMobMTsEFUs9fFOQ+KOvR/S6iMFGw7lPK2WRNVfq9IaIJpedQKswBiDx+bf2/u5g 0RHh52VGIFU+WCAoObrssmMUrsbIFbjhYfkCAJjUZe9BXHGMqVGfnMzM5lE0vGobHz6P AS5W6fFRyt+Qlr06SuA0l56xQn6VRovLNzrxG8/43eyMV+JigGMsOCMqd6IuBwxbjcan AGzFBln8hySOgFGecdoGNCFbnHTPIKDN0Q4gCtCtMhwa3RYx46v8PUkFtGObOBcyt/BC kpUdPYcUuk7PC4d6AK7gLHtaK8qWLYVLbH5fxL0wcdeLWg2A9lKvUcniR9nnudCLUEUt JXZQ== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=dkim-signature:content-transfer-encoding:mime-version:message-id :date:subject:cc:to:from; bh=IJLahLCRdTXBPUDblY5FEeLac4bEGoiKH0EqtlrVRUg=; b=a8PCn90LjyZ7dNNX9+lhofeVuA7Lw+GwDyd0qexz5EyFOeInpDL2uGYge2R3zVoS5A Smlb8G2fHyhNbuw7DcfUpiLXNa2TG90DbWurbPBfmNbIY3gUV+9UyvAvsuahApyeGXQU jdege1EwphESFvAtJN5Iss6AMFsLbzaMk76ExJIcw9HPcTvFxu33w+3wXeUztk5Kd8rr 4Ia0izix8lqWTuRS8uf6nB6hhRLaQ9B+un5h0/hkfYO0SDh9JrRYmbZl76xdhu+tUqn1 Kd0Ndwr4/xg0WFNBQp7QfTE4JnADsA9EuLsvgpAwcmkykE+jFq5tm2PH2cknJsjoan5W 68sA== ARC-Authentication-Results: i=1; gmr-mx.google.com; dkim=pass header.i=@teufel.de header.s=hse1 header.b="gY/wpceg"; spf=pass (google.com: domain of sava.jakovljev@teufel.de designates 83.246.65.159 as permitted sender) smtp.mailfrom=sava.jakovljev@teufel.de Received: from mx-relay17-hz2.antispameurope.com (mx-relay17-hz2.antispameurope.com. [83.246.65.159]) by gmr-mx.google.com with ESMTPS id d17si581037wma.4.2020.12.21.03.56.15 for (version=TLS1_2 cipher=ECDHE-ECDSA-CHACHA20-POLY1305 bits=256/256); Mon, 21 Dec 2020 03:56:15 -0800 (PST) Received-SPF: pass (google.com: domain of sava.jakovljev@teufel.de designates 83.246.65.159 as permitted sender) client-ip=83.246.65.159; Received: from unknown ([212.91.255.190]) by mx-relay17-hz2.antispameurope.com; Mon, 21 Dec 2020 12:56:15 +0100 From: Sava Jakovljev To: CC: Sava Jakovljev Subject: [swupdate] [PATCH v4] Execute subprocess IPC in separate thread Date: Mon, 21 Dec 2020 12:55:57 +0100 Message-ID: <20201221115557.122133-1-sava.jakovljev@teufel.de> X-Mailer: git-send-email 2.26.2 MIME-Version: 1.0 X-Originating-IP: [10.10.25.44] X-ClientProxiedBy: DNS-EX-02.teufel.local (10.10.0.81) To DNS-EX-02.teufel.local (10.10.0.81) X-C2ProcessedOrg: b93e13a0-e8da-4ba4-97b8-f14375b21c41 X-cloud-security-sender: sava.jakovljev@teufel.de X-cloud-security-recipient: swupdate@googlegroups.com X-cloud-security-Virusscan: CLEAN X-cloud-security-disclaimer: This E-Mail was scanned by E-Mailservice on mx-relay17-hz2.antispameurope.com with 068BD12EDE37 X-cloud-security-connect: unknown[212.91.255.190], TLS=1, IP=212.91.255.190 X-cloud-security: scantime:.3229 X-Original-Sender: sava.jakovljev@teufel.de X-Original-Authentication-Results: gmr-mx.google.com; dkim=pass header.i=@teufel.de header.s=hse1 header.b="gY/wpceg"; spf=pass (google.com: domain of sava.jakovljev@teufel.de designates 83.246.65.159 as permitted sender) smtp.mailfrom=sava.jakovljev@teufel.de Precedence: list Mailing-list: list swupdate@googlegroups.com; contact swupdate+owners@googlegroups.com List-ID: X-Spam-Checked-In-Group: swupdate@googlegroups.com X-Google-Group-Id: 605343134186 List-Post: , List-Help: , List-Archive: , List-Unsubscribe: , Signed-off-by: Sava Jakovljev --- Subprocesses may want to alter global SWUpdate state. In order to do this, subprocess RPC must be handled with network thread being kept free, in order to avoid deadlocks. One currently exists in Suricatta activation IPC. v3 of this patch replaces printf calls with swupdate logging macros. Secondly, variable containing number of messages in the subprocess queue has been removed, since there is no need for it and the predicate for the corresponding condition variable has been modified to use SIMPLEQ_EMPTY helper. Last but not least, potential race during start-up has been resolved. v4 of this patch declares argument to send_subprocess_reply a const. core/network_thread.c | 218 ++++++++++++++++++++++++++++-------------- 1 file changed, 145 insertions(+), 73 deletions(-) -- 2.26.2 diff --git a/core/network_thread.c b/core/network_thread.c index 0e6080d..76a7ca1 100644 --- a/core/network_thread.c +++ b/core/network_thread.c @@ -56,6 +56,19 @@ static unsigned long nrmsgs = 0; static pthread_mutex_t msglock = PTHREAD_MUTEX_INITIALIZER; +struct subprocess_msg_elem { + ipc_message message; + int client; + SIMPLEQ_ENTRY(subprocess_msg_elem) next; +}; + +SIMPLEQ_HEAD(subprocess_msglist, subprocess_msg_elem); +static struct subprocess_msglist subprocess_messages; + +static pthread_t subprocess_ipc_handler_thread_id; +static pthread_mutex_t subprocess_msg_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t subprocess_wkup = PTHREAD_COND_INITIALIZER; + static bool is_selection_allowed(const char *software_set, char *running_mode, struct dict const *acceptedlist) { @@ -239,6 +252,111 @@ static void unlink_socket(void) unlink(get_ctrl_socket()); } +static void send_subprocess_reply( + const struct subprocess_msg_elem *const subprocess_msg) +{ + if (write(subprocess_msg->client, &subprocess_msg->message, + sizeof(subprocess_msg->message)) < 0) + ERROR("Error write on socket ctrl"); +} + +static void handle_subprocess_ipc(struct subprocess_msg_elem *subprocess_msg) +{ + ipc_message *msg = &subprocess_msg->message; + int pipe = pctl_getfd_from_type(msg->data.procmsg.source); + if (pipe < 0) { + ERROR("Cannot find channel for requested process"); + msg->type = NACK; + + return; + } + + TRACE("Received Message for %s", + pctl_getname_from_type(msg->data.procmsg.source)); + if (fcntl(pipe, F_GETFL) < 0 && errno == EBADF) { + ERROR("Pipe not available or closed: %d", pipe); + msg->type = NACK; + + return; + } + + /* + * Cleanup the queue to be sure there are not + * outstanding messages + */ + empty_pipe(pipe); + + int ret = write(pipe, msg, sizeof(*msg)); + if (ret != sizeof(*msg)) { + ERROR("Writing to pipe failed !"); + msg->type = NACK; + + return; + } + + /* + * Do not block forever for an answer + * This would block the whole thread + * If a message requires more time, + * the destination process should sent an + * answer back explaining this in the payload + */ + fd_set pipefds; + FD_ZERO(&pipefds); + FD_SET(pipe, &pipefds); + + struct timeval tv; + tv.tv_usec = 0; + if (!msg->data.procmsg.timeout) + tv.tv_sec = DEFAULT_INTERNAL_TIMEOUT; + else + tv.tv_sec = msg->data.procmsg.timeout; + ret = select(pipe + 1, &pipefds, NULL, NULL, &tv); + + /* + * If there is an error or timeout, + * send a NACK back + */ + if (ret <= 0 || !FD_ISSET(pipe, &pipefds)) { + msg->type = NACK; + + return; + } + + ret = read(pipe, msg, sizeof(*msg)); + if (ret != sizeof(*msg)) { + ERROR("Reading from pipe failed !"); + msg->type = NACK; + } +} + +static void *subprocess_thread (void *data) +{ + (void)data; + pthread_mutex_lock(&subprocess_msg_lock); + + while(1) { + while(!SIMPLEQ_EMPTY(&subprocess_messages)) { + struct subprocess_msg_elem *subprocess_msg; + subprocess_msg = SIMPLEQ_FIRST(&subprocess_messages); + SIMPLEQ_REMOVE_HEAD(&subprocess_messages, next); + + pthread_mutex_unlock(&subprocess_msg_lock); + + handle_subprocess_ipc(subprocess_msg); + send_subprocess_reply(subprocess_msg); + close(subprocess_msg->client); + + free(subprocess_msg); + pthread_mutex_lock(&subprocess_msg_lock); + } + + pthread_cond_wait(&subprocess_wkup, &subprocess_msg_lock); + } + + return NULL; +} + void *network_thread (void *data) { struct installer *instp = (struct installer *)data; @@ -249,10 +367,8 @@ void *network_thread (void *data) int nread; struct msg_elem *notification; int ret; - int pipe; - fd_set pipefds; - struct timeval tv; update_state_t value; + struct subprocess_msg_elem *subprocess_msg; if (!instp) { TRACE("Fatal error: Network thread aborting..."); @@ -260,8 +376,11 @@ void *network_thread (void *data) } SIMPLEQ_INIT(¬ifymsgs); + SIMPLEQ_INIT(&subprocess_messages); register_notifier(network_notifier); + subprocess_ipc_handler_thread_id = start_thread(subprocess_thread, NULL); + /* Initialize and bind to UDS */ ctrllisten = listener_create(get_ctrl_socket(), SOCK_STREAM); if (ctrllisten < 0 ) { @@ -287,7 +406,8 @@ void *network_thread (void *data) nread = read(ctrlconnfd, (void *)&msg, sizeof(msg)); if (nread != sizeof(msg)) { - TRACE("IPC message too short: fragmentation not supported"); + TRACE("IPC message too short: fragmentation not supported (read %d bytes)", + nread); close(ctrlconnfd); continue; } @@ -309,72 +429,21 @@ void *network_thread (void *data) } break; case SWUPDATE_SUBPROCESS: - /* - * this request is not for the installer, - * but for one of the subprocesses - * forward the request without checking - * the payload - */ - - pipe = pctl_getfd_from_type(msg.data.procmsg.source); - if (pipe < 0) { - ERROR("Cannot find channel for requested process"); - msg.type = NACK; - break; - } - TRACE("Received Message for %s", - pctl_getname_from_type(msg.data.procmsg.source)); - if (fcntl(pipe, F_GETFL) < 0 && errno == EBADF) { - ERROR("Pipe not available or closed: %d", pipe); - msg.type = NACK; - break; - } - - /* - * Cleanup the queue to be sure there are not - * outstanding messages - */ - empty_pipe(pipe); - - ret = write(pipe, &msg, sizeof(msg)); - if (ret != sizeof(msg)) { - ERROR("Writing to pipe failed !"); - msg.type = NACK; - break; - } - - /* - * Do not block forever for an answer - * This would block the whole thread - * If a message requires more time, - * the destination process should sent an - * answer back explaining this in the payload - */ - FD_ZERO(&pipefds); - FD_SET(pipe, &pipefds); - tv.tv_usec = 0; - if (!msg.data.procmsg.timeout) - tv.tv_sec = DEFAULT_INTERNAL_TIMEOUT; - else - tv.tv_sec = msg.data.procmsg.timeout; - ret = select(pipe + 1, &pipefds, NULL, NULL, &tv); - - /* - * If there is an error or timeout, - * send a NACK back - */ - if (ret <= 0 || !FD_ISSET(pipe, &pipefds)) { + subprocess_msg = (struct subprocess_msg_elem*)malloc( + sizeof(struct subprocess_msg_elem)); + if (subprocess_msg == NULL) { + ERROR("Cannot handle subprocess IPC because of OOM."); msg.type = NACK; break; } - ret = read(pipe, &msg, sizeof(msg)); - if (ret != sizeof(msg)) { - ERROR("Reading from pipe failed !"); - msg.type = NACK; - break; - } + subprocess_msg->client = ctrlconnfd; + subprocess_msg->message = msg; + pthread_mutex_lock(&subprocess_msg_lock); + SIMPLEQ_INSERT_TAIL(&subprocess_messages, subprocess_msg, next); + pthread_cond_signal(&subprocess_wkup); + pthread_mutex_unlock(&subprocess_msg_lock); /* * ACK/NACK was inserted by the called SUBPROCESS * It should not be touched here @@ -423,7 +492,7 @@ void *network_thread (void *data) strncpy(msg.data.status.desc, notification->msg, sizeof(msg.data.status.desc) - 1); #ifdef DEBUG_IPC - printf("GET STATUS: %s\n", msg.data.status.desc); + DEBUG("GET STATUS: %s\n", msg.data.status.desc); #endif msg.data.status.current = notification->status; msg.data.status.error = notification->error; @@ -453,12 +522,15 @@ void *network_thread (void *data) msg.type = NACK; sprintf(msg.data.msg, "Wrong request: aborting"); } - ret = write(ctrlconnfd, &msg, sizeof(msg)); - if (ret < 0) - printf("Error write on socket ctrl"); - if (msg.type != ACK) - close(ctrlconnfd); + if (msg.type == ACK || msg.type == NACK || msg.type == GET_STATUS) { + ret = write(ctrlconnfd, &msg, sizeof(msg)); + if (ret < 0) + ERROR("Error write on socket ctrl"); + + if (msg.type != ACK) + close(ctrlconnfd); + } pthread_mutex_unlock(&stream_mutex); } while (1); return (void *)0;