I recently converted a fairly large object-oriented Python program to use the standard multiprocessing library. I ran into a few minor issues along the way, so I thought I would highlight them and how I was able to work around each one. The first limitation I ran into is that I did not want to completely rewrite my program around queues or semaphores. I had already designed my methods to return the data I wanted, and I simply wanted to be able to call those methods in parallel. To work around this I set up a multiprocessing pool: with the pool you can map a function over a list of inputs and collect the return values from all of the worker processes.

Here is a simple example of using multiprocessing with a function that has a return statement.

pool.py

import multiprocessing

def return_method(i):
   return [i, i*2]

inputs = list(range(5))
pool = multiprocessing.Pool(processes=4)
outputs = pool.map(return_method, inputs)
pool.close()
pool.join()
print 'outputs: %s' % outputs

Output

$ python pool.py 
outputs: [[0, 0], [1, 2], [2, 4], [3, 6], [4, 8]]
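
Note that pool.map blocks until all of the workers have finished and returns the results in the same order as the inputs. If you would rather fire the calls off and collect the results later, apply_async hands back an AsyncResult for each call. Here is a minimal sketch of the same example using that approach (the __main__ guard is only strictly needed on platforms that spawn rather than fork their workers):

import multiprocessing

def return_method(i):
   return [i, i*2]

if __name__ == '__main__':
   pool = multiprocessing.Pool(processes=4)
   # each call returns immediately with an AsyncResult handle
   results = [pool.apply_async(return_method, (i,)) for i in range(5)]
   pool.close()
   pool.join()
   # get() retrieves the return value (and re-raises any worker exception)
   print 'outputs: %s' % [r.get() for r in results]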

The second issue that I ran into was calling methods that are part of a class. multiprocessing uses pickle to ship work out to the worker processes, and pickle cannot serialize instance methods, since they are bound to self. Because my program was designed using an object-oriented approach, all of my methods were divided into classes. To work around this I created an intermediate function outside of any class that unwraps the arguments and forwards them to the desired method. It is important that self is one of the arguments that gets passed, since the unwrapping function needs an instance to call the method on.

Here is an example showing how to use an external unwrapping function.

unwrap.py

import multiprocessing

def unwrap_method(arg, **kwarg):
   # arg is a (self, i) tuple, so this rebuilds the bound method call
   return MyClass.return_method(*arg, **kwarg)

class MyClass:
   def return_method(self, i):
      return [i, i*3]

   def start_threads(self):
      inputs = list(range(5))
      pool = multiprocessing.Pool(processes=4)
      # pair self with every input so each worker gets an instance to call
      outputs = pool.map(unwrap_method, zip([self]*len(inputs), inputs))
      pool.close()
      pool.join()
      return outputs

m = MyClass()
outputs = m.start_threads()
print 'outputs: %s' % outputs

Output

$ python unwrap.py
outputs: [[0, 0], [1, 3], [2, 6], [3, 9], [4, 12]]
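
As an aside, another workaround I have seen for this limitation is to teach pickle how to handle bound methods itself, by registering a custom reducer with the copy_reg module. The following is a minimal sketch of that trick, not what I used above; once the reducer is registered, bound methods can be passed straight to pool.map with no unwrapping function:

import copy_reg
import types
import multiprocessing

def _pickle_method(method):
   # reduce a bound method to a (name, instance) pair that pickle understands
   return _unpickle_method, (method.im_func.__name__, method.im_self)

def _unpickle_method(func_name, obj):
   # look the method back up on the unpickled instance
   return getattr(obj, func_name)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class MyClass:
   def return_method(self, i):
      return [i, i*3]

m = MyClass()
pool = multiprocessing.Pool(processes=4)
# the bound method itself is now picklable
outputs = pool.map(m.return_method, list(range(5)))
pool.close()
pool.join()
print 'outputs: %s' % outputs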

The third issue that I ran into was that I had integrated logging into all of my Python programs. Logger objects can't be pickled (among other things, their handlers hold thread locks that pickle refuses), so classes carrying a logger couldn't be shipped to the workers. To get around this you can override the __getstate__ and __setstate__ methods to remove the logger from the instance dict before pickling and recreate it after unpickling.

The following shows an easy way to remove and restore the logger for a class involved in multiprocessing.

logger.py

import multiprocessing
import logging

def unwrap_method(arg, **kwarg):
   return MyClass.return_method(*arg, **kwarg)

class MyClass:
   def __init__(self, mode, format):
      self.logger = logging.getLogger('my_class')
      self.MODE = mode
      self.format = format
      self.logger.setLevel(self.MODE)
      console = logging.StreamHandler()
      formatter = logging.Formatter(self.format)
      console.setFormatter(formatter)
      self.logger.addHandler(console)
      self.logger.info('Logger Created')

   def __getstate__(self):
      # drop the unpicklable logger from the instance dict before pickling
      d = dict(self.__dict__)
      self.logger.info('Logger getting deleted')
      del d['logger']
      return d

   def __setstate__(self, d):
      # restore the pickled attributes, then reattach the logger; in a forked
      # worker getLogger returns the already-configured 'my_class' logger,
      # so no new handler needs to be added
      self.__dict__.update(d)
      self.logger = logging.getLogger('my_class')
      self.logger.setLevel(self.MODE)
      self.logger.debug('Logger Recreated')

   def return_method(self, i):
      self.logger.info('Input: %d, Output: %d', i, i*4)
      return [i, i*4]

   def start_threads(self):
      inputs = list(range(5))
      pool = multiprocessing.Pool(processes=4)
      outputs = pool.map(unwrap_method, zip([self]*len(inputs), inputs))
      pool.close()
      pool.join()
      return outputs

format = '%(levelname)s - %(message)s'
m = MyClass(logging.DEBUG, format)
outputs = m.start_threads()
print 'outputs: %s' % outputs

Output

$ python logger.py 
INFO - Logger Created
INFO - Logger getting deleted
INFO - Logger getting deleted
INFO - Logger getting deleted
INFO - Logger getting deleted
INFO - Logger getting deleted
DEBUG - Logger Recreated
INFO - Input: 0, Output: 0
DEBUG - Logger Recreated
INFO - Input: 1, Output: 4
DEBUG - Logger Recreated
INFO - Input: 2, Output: 8
DEBUG - Logger Recreated
INFO - Input: 3, Output: 12
DEBUG - Logger Recreated
INFO - Input: 4, Output: 16
outputs: [[0, 0], [1, 4], [2, 8], [3, 12], [4, 16]]
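
One more trick worth mentioning: if the logger can live at the module level instead of on the instance, the pickling problem disappears entirely, and you can configure logging once per worker via the pool's initializer argument. This is a hedged sketch of that alternative rather than what I used above (init_worker and the handler guard are my own illustration):

import multiprocessing
import logging

# a module-level logger is never stored on the instance, so pickling the
# instance never touches it
logger = logging.getLogger('my_class')

def init_worker(level, fmt):
   # runs once in each worker process when the pool starts; the guard keeps
   # forked workers, which inherit the parent's handler, from adding a second one
   logger.setLevel(level)
   if not logger.handlers:
      console = logging.StreamHandler()
      console.setFormatter(logging.Formatter(fmt))
      logger.addHandler(console)

def unwrap_method(arg, **kwarg):
   return MyClass.return_method(*arg, **kwarg)

class MyClass:
   def return_method(self, i):
      logger.info('Input: %d, Output: %d', i, i*4)
      return [i, i*4]

fmt = '%(levelname)s - %(message)s'
init_worker(logging.DEBUG, fmt)   # configure the parent process as well
inputs = list(range(5))
pool = multiprocessing.Pool(processes=4, initializer=init_worker,
                            initargs=(logging.DEBUG, fmt))
m = MyClass()
outputs = pool.map(unwrap_method, zip([m]*len(inputs), inputs))
pool.close()
pool.join()
print 'outputs: %s' % outputs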

Hopefully this helps you avoid some of the limitations of the Python multiprocessing library. If you know of any other cool tricks or workarounds with multiprocessing, feel free to add them below.
