parsearchive: parallel processing

Message ID 20191018111547.4415-1-dja@axtens.net
State Superseded
Series parsearchive: parallel processing

Commit Message

Daniel Axtens Oct. 18, 2019, 11:15 a.m. UTC
Allow parsing to be done in parallel with -jN. Handy for testing
the code against parallelism problems, also faster!

Issue: #241
Signed-off-by: Daniel Axtens <dja@axtens.net>

---

I wrote this ages ago when I was working on our racy series parsing.
I wanted to pull it out to test the patch to prevent split series.
Not fussy about whether we actually merge it or not.
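
As a usage sketch (hypothetical, not part of the patch - it assumes the
command's existing infile argument for the mbox/Maildir path):

    # Roughly equivalent to: ./manage.py parsearchive -j 4 archive.mbox
    from django.core.management import call_command

    call_command('parsearchive', infile='archive.mbox', jobs=4)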
---
 patchwork/management/commands/parsearchive.py | 132 +++++++++++++-----
 1 file changed, 95 insertions(+), 37 deletions(-)

Comments

Daniel Axtens Oct. 18, 2019, 11:32 a.m. UTC | #1
Daniel Axtens <dja@axtens.net> writes:

> Allow parsing to be done in parallel with -jN. Handy for testing
> the code against parallelism problems, also faster!
>
> Issue: #241
> Signed-off-by: Daniel Axtens <dja@axtens.net>
>
> ---
>
> I wrote this ages ago when I was working on our racy series parsing.
> I wanted to pull it out to test the patch to prevent split series.
> Not fussy about whether we actually merge it or not.

On reflection, we probably don't want to merge this, or if we do we want
some big noisy warning messages. Parsing in parallel is going to lose
comments - if the patch hasn't been parsed yet, the comment will just be
dropped. Out-of-order parsing works fine for patches - by design - but
comments are only saved if there's a patch to attach them to.

Regards,
Daniel
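
To make the ordering hazard concrete, a minimal sketch of the failure mode
(hypothetical code, not patchwork's actual parser):

    # A comment can only attach to a patch that has already been saved.
    patches = {}  # stands in for the Patch table: msgid -> mail

    def handle_mail(mail):
        if mail['type'] == 'patch':
            patches[mail['msgid']] = mail
            return mail
        if mail['reply_to'] in patches:  # comment: find its parent patch
            return mail
        return None  # no parent patch yet - the comment is dropped

    patch = {'type': 'patch', 'msgid': 'p1'}
    comment = {'type': 'comment', 'msgid': 'c1', 'reply_to': 'p1'}

    handle_mail(patch)               # mbox order: the patch comes first...
    assert handle_mail(comment)      # ...so its comment attaches fine

    patches.clear()
    assert handle_mail(comment) is None  # with -jN another worker may see
                                         # the comment first: it is lost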


Patch

diff --git patchwork/management/commands/parsearchive.py patchwork/management/commands/parsearchive.py
index b7f1ea7313c2..7ffc23ec8507 100644
--- patchwork/management/commands/parsearchive.py
+++ patchwork/management/commands/parsearchive.py
@@ -7,7 +7,9 @@ import logging
 import mailbox
 import os
 import sys
+import multiprocessing
 
+from django import db
 from django.core.management.base import BaseCommand
 
 from patchwork import models
@@ -16,6 +18,16 @@ from patchwork.parser import DuplicateMailError
 
 logger = logging.getLogger(__name__)
 
+TYPE_CONVERSION = {
+    models.Patch: 0,
+    models.CoverLetter: 1,
+    models.Comment: 2,
+}
+DUPLICATE = 3
+DROPPED = 4
+ERROR = 5
+NUM_TYPES = 6
+
 
 class Command(BaseCommand):
     help = 'Parse an mbox archive file and store any patches/comments found.'
@@ -28,17 +40,12 @@ class Command(BaseCommand):
             '--list-id',
             help='mailing list ID. If not supplied, this will be '
             'extracted from the mail headers.')
+        parser.add_argument(
+            '--jobs', '-j',
+            help='process the archive in N parallel jobs',
+            type=int, default=1)
 
     def handle(self, *args, **options):
-        results = {
-            models.Patch: 0,
-            models.CoverLetter: 0,
-            models.Comment: 0,
-        }
-        duplicates = 0
-        dropped = 0
-        errors = 0
-
         verbosity = int(options['verbosity'])
         if not verbosity:
             level = logging.CRITICAL
@@ -53,6 +60,11 @@ class Command(BaseCommand):
             logger.setLevel(level)
             logging.getLogger('patchwork.parser').setLevel(level)
 
+        jobs = options['jobs']
+        if jobs < 1:
+            logger.error('Invalid number of jobs %d, must be at least 1', jobs)
+            sys.exit(1)
+
         # TODO(stephenfin): Support passing via stdin
         path = args and args[0] or options['infile']
         if not os.path.exists(path):
@@ -65,8 +77,6 @@ class Command(BaseCommand):
         else:
             mbox = mailbox.Maildir(path, create=False)
 
-        count = len(mbox)
-
         # Iterate through the mbox. This will pick up exceptions that are only
         # thrown when a broken email is found part way through. Without this
         # block, we'd get the exception thrown in enumerate(mbox) below, which
@@ -84,26 +94,35 @@ class Command(BaseCommand):
             logger.error('Broken mbox/Maildir, aborting')
             return
 
-        logger.info('Parsing %d mails', count)
-        for i, msg in enumerate(mbox):
-            try:
-                obj = parse_mail(msg, options['list_id'])
-                if obj:
-                    results[type(obj)] += 1
-                else:
-                    dropped += 1
-            except DuplicateMailError as exc:
-                duplicates += 1
-                logger.warning('Duplicate mail for message ID %s', exc.msgid)
-            except (ValueError, Exception) as exc:
-                errors += 1
-                logger.warning('Invalid mail: %s', repr(exc))
-
-            if verbosity < 3 and (i % 10) == 0:
-                self.stdout.write('%06d/%06d\r' % (i, count), ending='')
-                self.stdout.flush()
+        # we need to close the db connection so each process gets its own
+        # see e.g. https://stackoverflow.com/a/10684672
+        db.connections.close_all()
+
+        procs = []
+        processed = multiprocessing.Value('i')
+        results = multiprocessing.Array('i', NUM_TYPES)
+        for job in range(jobs):
+            proc = multiprocessing.Process(target=self.parse_mbox,
+                                           kwargs={
+                                               'path': path,
+                                               'list_id': options['list_id'],
+                                               'job': job,
+                                               'num_jobs': jobs,
+                                               'processed': processed,
+                                               'results': results
+                                           })
+            logger.debug('starting %s', proc)
+            proc.daemon = True  # this makes Ctrl-C work
+            proc.start()
+            procs.append(proc)
 
-        mbox.close()
+        count = len(mbox)
+        for proc in procs:
+            while proc.is_alive():
+                proc.join(1)
+                if verbosity < 3:
+                    self.stdout.write('%06d/%06d\r' % (processed.value, count), ending='')
+                    self.stdout.flush()
 
         if not verbosity:
             return
@@ -118,11 +137,50 @@ class Command(BaseCommand):
             '  %(errors)4d errors\n'
             'Total: %(new)s new entries' % {
                 'total': count,
-                'covers': results[models.CoverLetter],
-                'patches': results[models.Patch],
-                'comments': results[models.Comment],
-                'duplicates': duplicates,
-                'dropped': dropped,
-                'errors': errors,
-                'new': count - duplicates - dropped - errors,
+                'covers': results[TYPE_CONVERSION[models.CoverLetter]],
+                'patches': results[TYPE_CONVERSION[models.Patch]],
+                'comments': results[TYPE_CONVERSION[models.Comment]],
+                'duplicates': results[DUPLICATE],
+                'dropped': results[DROPPED],
+                'errors': results[ERROR],
+                'new': count - results[DUPLICATE] - results[DROPPED] - results[ERROR],
             })
+
+    def parse_mbox(self, path, list_id, job, num_jobs, processed, results):
+        if os.path.isfile(path):
+            mbox = mailbox.mbox(path, create=False)
+        else:
+            mbox = mailbox.Maildir(path, create=False)
+
+        count = len(mbox)
+
+        if num_jobs == 1:
+            logger.info('Parsing %d mails', count)
+        else:
+            logger.info('Parsing %d total mails, job %d of %d',
+                        count, job + 1, num_jobs)
+        for i, msg in enumerate(mbox):
+
+            if i % num_jobs != job:
+                continue
+
+            try:
+                obj = parse_mail(msg, list_id)
+                with results.get_lock():
+                    if obj:
+                        results[TYPE_CONVERSION[type(obj)]] += 1
+                    else:
+                        results[DROPPED] += 1
+            except DuplicateMailError as exc:
+                with results.get_lock():
+                    results[DUPLICATE] += 1
+                logger.warning('Duplicate mail %d for message ID %s', i, exc.msgid)
+            except (ValueError, Exception) as exc:
+                with results.get_lock():
+                    results[ERROR] += 1
+                logger.warning('Invalid mail %d: %s', i, repr(exc))
+
+            with processed.get_lock():
+                processed.value += 1
+
+        mbox.close()
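
For readers unfamiliar with the structure above, a condensed, self-contained
sketch of the same fork/stripe/shared-counter pattern (illustrative names,
not code from the patch; it assumes a configured Django project):

    import multiprocessing

    from django import db

    def worker(job, num_jobs, items, processed):
        for i, item in enumerate(items):
            if i % num_jobs != job:     # stripe: worker j takes j, j+N, ...
                continue
            # ... parse item here; the first ORM query in this child opens
            # a fresh DB connection of its own ...
            with processed.get_lock():  # Value's lock guards the update
                processed.value += 1

    def run(items, num_jobs=4):
        # Close inherited connections before forking so the parent and the
        # children never share one DB socket.
        db.connections.close_all()
        processed = multiprocessing.Value('i', 0)
        procs = []
        for job in range(num_jobs):
            proc = multiprocessing.Process(
                target=worker, args=(job, num_jobs, items, processed))
            proc.daemon = True          # children exit with the parent (Ctrl-C)
            proc.start()
            procs.append(proc)
        for proc in procs:
            proc.join()
        return processed.value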