[v3] parsearchive: parallel processing
diff mbox series

Message ID 20191206015953.10324-1-dja@axtens.net
State New
Headers show
Series
  • [v3] parsearchive: parallel processing
Related show

Commit Message

Daniel Axtens Dec. 6, 2019, 1:59 a.m. UTC
Allow parsing to be done in parallel with -jN. Handy for testing
the code against parallelism problems, also faster!

Issue: #241
Signed-off-by: Daniel Axtens <dja@axtens.net>

---

v2: I was having problems with one thread 'running ahead' and merging
unrelated spins of poorly threaded series. This gives a 100-message
limit to how far any one thread can 'run ahead' of any other. 100 is
arbitrary; it's a speed/accuracy trade-off.

v3: make the locking a bit clearer and more correct, and avoid
read-modify-write (RMW) issues, derp.
I still think it's probably not worth merging, but I want to have the
code out there so people can use it if they want, especially for development.

I'm still trying to hammer out a series parsing change that doesn't
involve arbitrary retries. I think I'm getting somewhere.
---
 patchwork/management/commands/parsearchive.py | 147 +++++++++++++-----
 1 file changed, 110 insertions(+), 37 deletions(-)

Patch
diff mbox series

diff --git a/patchwork/management/commands/parsearchive.py b/patchwork/management/commands/parsearchive.py
index b7f1ea7313c2..45e942034812 100644
--- a/patchwork/management/commands/parsearchive.py
+++ b/patchwork/management/commands/parsearchive.py
@@ -7,7 +7,9 @@  import logging
 import mailbox
 import os
 import sys
+import multiprocessing
 
+from django import db
 from django.core.management.base import BaseCommand
 
 from patchwork import models
@@ -16,6 +18,17 @@  from patchwork.parser import DuplicateMailError
 
 logger = logging.getLogger(__name__)
 
+TYPE_CONVERSION = {
+    models.Patch: 0,
+    models.CoverLetter: 1,
+    models.Comment: 2,
+}
+DUPLICATE = 3
+DROPPED = 4
+ERROR = 5
+NUM_TYPES = 6
+
+RUN_AHEAD_LIMIT = 100
 
 class Command(BaseCommand):
     help = 'Parse an mbox archive file and store any patches/comments found.'
@@ -28,17 +41,12 @@  class Command(BaseCommand):
             '--list-id',
             help='mailing list ID. If not supplied, this will be '
             'extracted from the mail headers.')
+        parser.add_argument(
+            '--jobs', '-j',
+            help='process the archive in N parallel jobs',
+            type=int, default=1)
 
     def handle(self, *args, **options):
-        results = {
-            models.Patch: 0,
-            models.CoverLetter: 0,
-            models.Comment: 0,
-        }
-        duplicates = 0
-        dropped = 0
-        errors = 0
-
         verbosity = int(options['verbosity'])
         if not verbosity:
             level = logging.CRITICAL
@@ -53,6 +61,11 @@  class Command(BaseCommand):
             logger.setLevel(level)
             logging.getLogger('patchwork.parser').setLevel(level)
 
+        jobs = options['jobs']
+        if jobs < 1:
+            logger.error('Invalid number of jobs %d, must be at least 1')
+            sys.exit(1)
+
         # TODO(stephenfin): Support passing via stdin
         path = args and args[0] or options['infile']
         if not os.path.exists(path):
@@ -65,8 +78,6 @@  class Command(BaseCommand):
         else:
             mbox = mailbox.Maildir(path, create=False)
 
-        count = len(mbox)
-
         # Iterate through the mbox. This will pick up exceptions that are only
         # thrown when a broken email is found part way through. Without this
         # block, we'd get the exception thrown in enumerate(mbox) below, which
@@ -84,26 +95,39 @@  class Command(BaseCommand):
             logger.error('Broken mbox/Maildir, aborting')
             return
 
-        logger.info('Parsing %d mails', count)
-        for i, msg in enumerate(mbox):
-            try:
-                obj = parse_mail(msg, options['list_id'])
-                if obj:
-                    results[type(obj)] += 1
-                else:
-                    dropped += 1
-            except DuplicateMailError as exc:
-                duplicates += 1
-                logger.warning('Duplicate mail for message ID %s', exc.msgid)
-            except (ValueError, Exception) as exc:
-                errors += 1
-                logger.warning('Invalid mail: %s', repr(exc))
+        # we need to close the db connection so each process gets its own
+        # see e.g. https://stackoverflow.com/a/10684672
+        db.connections.close_all()
+
+        threads = []
+        processed = multiprocessing.Value('i')
+        results = multiprocessing.Array('i', NUM_TYPES)
+        run_ahead_barrier = multiprocessing.Condition()
+        latest_msg = multiprocessing.Value('i')
+        for job in range(jobs):
+            thread = multiprocessing.Process(target=self.parse_mbox,
+                                             kwargs={
+                                                 'path': path,
+                                                 'list_id': options['list_id'],
+                                                 'job': job,
+                                                 'num_jobs': jobs,
+                                                 'processed': processed,
+                                                 'results': results,
+                                                 'run_ahead_barrier': run_ahead_barrier,
+                                                 'latest_msg': latest_msg,
+                                             })
+            print("starting", thread)
+            thread.daemon = True  # this makes Ctrl-C work
+            thread.start()
+            threads += [thread]
 
-            if verbosity < 3 and (i % 10) == 0:
-                self.stdout.write('%06d/%06d\r' % (i, count), ending='')
-                self.stdout.flush()
-
-        mbox.close()
+        count = len(mbox)
+        for thread in threads:
+            while thread.is_alive():
+                thread.join(1)
+                if True or verbosity < 3:
+                    self.stdout.write('%06d/%06d\r' % (processed.value, count), ending='')
+                    self.stdout.flush()
 
         if not verbosity:
             return
@@ -118,11 +142,60 @@  class Command(BaseCommand):
             '  %(errors)4d errors\n'
             'Total: %(new)s new entries' % {
                 'total': count,
-                'covers': results[models.CoverLetter],
-                'patches': results[models.Patch],
-                'comments': results[models.Comment],
-                'duplicates': duplicates,
-                'dropped': dropped,
-                'errors': errors,
-                'new': count - duplicates - dropped - errors,
+                'covers': results[TYPE_CONVERSION[models.CoverLetter]],
+                'patches': results[TYPE_CONVERSION[models.Patch]],
+                'comments': results[TYPE_CONVERSION[models.Comment]],
+                'duplicates': results[DUPLICATE],
+                'dropped': results[DROPPED],
+                'errors': results[ERROR],
+                'new': count - results[DUPLICATE] - results[DROPPED] - results[ERROR],
             })
+
+    def parse_mbox(self, path, list_id, job, num_jobs, processed, results,
+                   run_ahead_barrier, latest_msg):
+        if os.path.isfile(path):
+            mbox = mailbox.mbox(path, create=False)
+        else:
+            mbox = mailbox.Maildir(path, create=False)
+
+        count = len(mbox)
+
+        if num_jobs == 1:
+            logger.info('Parsing %d mails', count)
+        else:
+            logger.info('Parsing %d total mails, job %d of %d',
+                        count, job + 1, num_jobs)
+        for i, msg in enumerate(mbox):
+
+            if i % num_jobs != job:
+                continue
+
+            with run_ahead_barrier:
+                run_ahead_barrier.wait_for(lambda: i - latest_msg.value <= RUN_AHEAD_LIMIT)
+
+            try:
+                obj = parse_mail(msg, list_id)
+                with results.get_lock():
+                    if obj:
+                        results[TYPE_CONVERSION[type(obj)]] += 1
+                    else:
+                        results[DROPPED] += 1
+            except DuplicateMailError as exc:
+                with results.get_lock():
+                    results[DUPLICATE] += 1
+                logger.warning('Duplicate mail %d for message ID %s', i, exc.msgid)
+            except (ValueError, Exception) as exc:
+                with results.get_lock():
+                    results[ERROR] += 1
+                logger.warning('Invalid mail %d: %s', i, repr(exc))
+
+            with processed.get_lock():
+                processed.value += 1
+
+            with run_ahead_barrier:
+                with latest_msg.get_lock():
+                    if i > latest_msg.value:
+                        latest_msg.value = i
+                        run_ahead_barrier.notify_all()
+
+        mbox.close()