From patchwork Fri Dec 6 01:59:53 2019
X-Patchwork-Submitter: Daniel Axtens
X-Patchwork-Id: 1204896
From: Daniel Axtens
To: patchwork@lists.ozlabs.org
Subject: [PATCH v3] parsearchive: parallel processing
Date: Fri, 6 Dec 2019 12:59:53 +1100
Message-Id: <20191206015953.10324-1-dja@axtens.net>
X-Mailer: git-send-email 2.20.1
List-Id: Patchwork development

Allow parsing to be done in parallel with -jN. This is handy for testing
the code against parallelism problems, and it is also faster.

Issue: #241
Signed-off-by: Daniel Axtens

---
v2: I was having problems with one thread 'running ahead' and merging
    unrelated spins of poorly threaded series. This version imposes a
    100-message limit on how far any one thread can 'run ahead' of any
    other; 100 is arbitrary, a speed/accuracy trade-off.
v3: Make the locking clearer and more correct, and avoid
    read-modify-write issues.

I still think this is probably not worth merging, but I want to have the
code out there so people can use it if they want, especially for
development. I'm still trying to hammer out a series-parsing change that
doesn't involve arbitrary retries; I think I'm getting somewhere.
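The run-ahead limit can be sketched in isolation. The following is an illustrative, standalone model (hypothetical `worker`/`run_demo` names, trivial work standing in for real mail parsing) of the same Condition-based barrier the patch uses:

```python
import multiprocessing

RUN_AHEAD_LIMIT = 100  # same arbitrary speed/accuracy trade-off as the patch


def worker(job, num_jobs, total, barrier, latest, processed):
    # Each worker takes every num_jobs-th index, and blocks before
    # handling index i until i is within RUN_AHEAD_LIMIT of the highest
    # index any worker has finished.
    for i in range(total):
        if i % num_jobs != job:
            continue
        with barrier:
            barrier.wait_for(lambda: i - latest.value <= RUN_AHEAD_LIMIT)
        # ... the real code parses message i here ...
        with processed.get_lock():
            processed.value += 1
        with barrier:
            with latest.get_lock():
                if i > latest.value:
                    latest.value = i
            barrier.notify_all()


def run_demo(total=500, num_jobs=4):
    barrier = multiprocessing.Condition()
    latest = multiprocessing.Value('i')
    processed = multiprocessing.Value('i')
    workers = [multiprocessing.Process(
        target=worker,
        args=(job, num_jobs, total, barrier, latest, processed))
        for job in range(num_jobs)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return processed.value


if __name__ == '__main__':
    print(run_demo())  # every message processed exactly once
```

No deadlock is possible here: `latest` holds the maximum finished index, so the waiter with the smallest pending `i` always satisfies the predicate and can make progress.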
---
 patchwork/management/commands/parsearchive.py | 147 +++++++++++++-----
 1 file changed, 110 insertions(+), 37 deletions(-)

diff --git a/patchwork/management/commands/parsearchive.py b/patchwork/management/commands/parsearchive.py
index b7f1ea7313c2..45e942034812 100644
--- a/patchwork/management/commands/parsearchive.py
+++ b/patchwork/management/commands/parsearchive.py
@@ -7,7 +7,9 @@ import logging
 import mailbox
 import os
 import sys
+import multiprocessing
 
+from django import db
 from django.core.management.base import BaseCommand
 
 from patchwork import models
@@ -16,6 +18,17 @@ from patchwork.parser import DuplicateMailError
 
 logger = logging.getLogger(__name__)
 
+TYPE_CONVERSION = {
+    models.Patch: 0,
+    models.CoverLetter: 1,
+    models.Comment: 2,
+}
+DUPLICATE = 3
+DROPPED = 4
+ERROR = 5
+NUM_TYPES = 6
+
+RUN_AHEAD_LIMIT = 100
 
 class Command(BaseCommand):
     help = 'Parse an mbox archive file and store any patches/comments found.'
@@ -28,17 +41,12 @@ class Command(BaseCommand):
             '--list-id',
             help='mailing list ID. If not supplied, this will be '
             'extracted from the mail headers.')
+        parser.add_argument(
+            '--jobs', '-j',
+            help='process the archive in N parallel jobs',
+            type=int, default=1)
 
     def handle(self, *args, **options):
-        results = {
-            models.Patch: 0,
-            models.CoverLetter: 0,
-            models.Comment: 0,
-        }
-        duplicates = 0
-        dropped = 0
-        errors = 0
-
         verbosity = int(options['verbosity'])
         if not verbosity:
             level = logging.CRITICAL
@@ -53,6 +61,11 @@ class Command(BaseCommand):
         logger.setLevel(level)
         logging.getLogger('patchwork.parser').setLevel(level)
 
+        jobs = options['jobs']
+        if jobs < 1:
+            logger.error('Invalid number of jobs %d, must be at least 1', jobs)
+            sys.exit(1)
+
         # TODO(stephenfin): Support passing via stdin
         path = args and args[0] or options['infile']
         if not os.path.exists(path):
@@ -65,8 +78,6 @@ class Command(BaseCommand):
         else:
             mbox = mailbox.Maildir(path, create=False)
 
-        count = len(mbox)
-
         # Iterate through the mbox. This will pick up exceptions that are only
         # thrown when a broken email is found part way through. Without this
         # block, we'd get the exception thrown in enumerate(mbox) below, which
@@ -84,26 +95,39 @@ class Command(BaseCommand):
             logger.error('Broken mbox/Maildir, aborting')
             return
 
-        logger.info('Parsing %d mails', count)
-        for i, msg in enumerate(mbox):
-            try:
-                obj = parse_mail(msg, options['list_id'])
-                if obj:
-                    results[type(obj)] += 1
-                else:
-                    dropped += 1
-            except DuplicateMailError as exc:
-                duplicates += 1
-                logger.warning('Duplicate mail for message ID %s', exc.msgid)
-            except (ValueError, Exception) as exc:
-                errors += 1
-                logger.warning('Invalid mail: %s', repr(exc))
+        # we need to close the db connection so each process gets its own
+        # see e.g. https://stackoverflow.com/a/10684672
+        db.connections.close_all()
+
+        threads = []
+        processed = multiprocessing.Value('i')
+        results = multiprocessing.Array('i', NUM_TYPES)
+        run_ahead_barrier = multiprocessing.Condition()
+        latest_msg = multiprocessing.Value('i')
+        for job in range(jobs):
+            thread = multiprocessing.Process(target=self.parse_mbox,
+                                             kwargs={
+                                                 'path': path,
+                                                 'list_id': options['list_id'],
+                                                 'job': job,
+                                                 'num_jobs': jobs,
+                                                 'processed': processed,
+                                                 'results': results,
+                                                 'run_ahead_barrier': run_ahead_barrier,
+                                                 'latest_msg': latest_msg,
+                                             })
+            logger.debug('starting %s', thread)
+            thread.daemon = True  # this makes Ctrl-C work
+            thread.start()
+            threads += [thread]
 
-            if verbosity < 3 and (i % 10) == 0:
-                self.stdout.write('%06d/%06d\r' % (i, count), ending='')
-                self.stdout.flush()
-
-        mbox.close()
+        count = len(mbox)
+        for thread in threads:
+            while thread.is_alive():
+                thread.join(1)
+                if verbosity < 3:
+                    self.stdout.write('%06d/%06d\r' % (processed.value, count), ending='')
+                    self.stdout.flush()
 
         if not verbosity:
             return
@@ -118,11 +142,60 @@ class Command(BaseCommand):
             ' %(errors)4d errors\n'
             'Total: %(new)s new entries' % {
                 'total': count,
-                'covers': results[models.CoverLetter],
-                'patches': results[models.Patch],
-                'comments': results[models.Comment],
-                'duplicates': duplicates,
-                'dropped': dropped,
-                'errors': errors,
-                'new': count - duplicates - dropped - errors,
+                'covers': results[TYPE_CONVERSION[models.CoverLetter]],
+                'patches': results[TYPE_CONVERSION[models.Patch]],
+                'comments': results[TYPE_CONVERSION[models.Comment]],
+                'duplicates': results[DUPLICATE],
+                'dropped': results[DROPPED],
+                'errors': results[ERROR],
+                'new': count - results[DUPLICATE] - results[DROPPED] - results[ERROR],
             })
+
+    def parse_mbox(self, path, list_id, job, num_jobs, processed, results,
+                   run_ahead_barrier, latest_msg):
+        if os.path.isfile(path):
+            mbox = mailbox.mbox(path, create=False)
+        else:
+            mbox = mailbox.Maildir(path, create=False)
+
+        count = len(mbox)
+
+        if num_jobs == 1:
+            logger.info('Parsing %d mails', count)
+        else:
+            logger.info('Parsing %d total mails, job %d of %d',
+                        count, job + 1, num_jobs)
+        for i, msg in enumerate(mbox):
+
+            if i % num_jobs != job:
+                continue
+
+            with run_ahead_barrier:
+                run_ahead_barrier.wait_for(lambda: i - latest_msg.value <= RUN_AHEAD_LIMIT)
+
+            try:
+                obj = parse_mail(msg, list_id)
+                with results.get_lock():
+                    if obj:
+                        results[TYPE_CONVERSION[type(obj)]] += 1
+                    else:
+                        results[DROPPED] += 1
+            except DuplicateMailError as exc:
+                with results.get_lock():
+                    results[DUPLICATE] += 1
+                logger.warning('Duplicate mail %d for message ID %s', i, exc.msgid)
+            except (ValueError, Exception) as exc:
+                with results.get_lock():
+                    results[ERROR] += 1
+                logger.warning('Invalid mail %d: %s', i, repr(exc))
+
+            with processed.get_lock():
+                processed.value += 1
+
+            with run_ahead_barrier:
+                with latest_msg.get_lock():
+                    if i > latest_msg.value:
+                        latest_msg.value = i
+                run_ahead_barrier.notify_all()
+
+        mbox.close()