Python multiprocessing synchronization
I have a function called "function" that I want to call 10 times, using 5 CPUs 2 times with multiprocessing. Therefore I need a way to synchronize the processes, as described in the code below.

Is this possible without using a multiprocessing pool? I get strange errors if I do (for example, "UnboundLocalError: local variable 'fd' referenced before assignment", even though I don't have such a variable), and the processes seem to terminate randomly.

If it's possible, I'd prefer to do this without a pool. Thanks!
    number_of_cpus = 5
    number_of_iterations = 2

    # An array for the processes.
    processing_jobs = []

    # Start 5 processes 2 times.
    for iteration in range(0, number_of_iterations):

        # TODO: SYNCHRONIZE HERE

        # Start 5 processes at a time.
        for cpu_number in range(0, number_of_cpus):

            # Calculate the offset for the current function call.
            file_offset = iteration * cpu_number * number_of_files_per_process

            p = multiprocessing.Process(target=function, args=(file_offset,))
            processing_jobs.append(p)
            p.start()

        # TODO: SYNCHRONIZE HERE
This is the (anonymized) traceback of the errors I get when I run the code in a pool:
    Process Process-5:
    Traceback (most recent call last):
      File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
        self.run()
      File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
        self._target(*self._args, **self._kwargs)
      File "python_code_3.py", line 88, in function_x
        xyz = python_code_1.function_y(args)
      File "/python_code_1.py", line 254, in __init__
        self.wk = file.wk(filename)
      File "/python_code_2.py", line 1754, in __init__
        self.__parse__(name, data, fast_load)
      File "/python_code_2.py", line 1810, in __parse__
        fd.close()
    UnboundLocalError: local variable 'fd' referenced before assignment
Most of the processes crash, but not all of them. More of them seem to crash when I increase the number of processes. I thought this might be due to memory limitations...
Here's how you can do the synchronization you're looking for without using a pool:
    import multiprocessing

    number_of_files_per_process = 10  # example value; the question doesn't define this

    def function(arg):
        print("got arg %s" % arg)

    if __name__ == "__main__":
        number_of_cpus = 5
        number_of_iterations = 2

        # An array for the processes.
        processing_jobs = []

        # Start 5 processes 2 times.
        for iteration in range(1, number_of_iterations + 1):  # Start the range from 1 so we don't multiply by zero.

            # Start 5 processes at a time.
            for cpu_number in range(1, number_of_cpus + 1):
                # Calculate the offset for the current function call.
                file_offset = iteration * cpu_number * number_of_files_per_process

                p = multiprocessing.Process(target=function, args=(file_offset,))
                processing_jobs.append(p)
                p.start()

            # Wait for all processes to finish.
            for proc in processing_jobs:
                proc.join()

            # Empty the active job list.
            del processing_jobs[:]

            # Write the file here.
            print("writing")
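Each join() call blocks until that process has exited, so the second batch of five can't start until the entire first batch has finished. Those two join points correspond to the spots marked TODO: SYNCHRONIZE HERE in your code.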
And here it is with a pool:
    import multiprocessing

    number_of_files_per_process = 10  # example value; the question doesn't define this

    def function(arg):
        print("got arg %s" % arg)

    if __name__ == "__main__":
        number_of_cpus = 5
        number_of_iterations = 2

        pool = multiprocessing.Pool(number_of_cpus)

        for i in range(1, number_of_iterations + 1):  # Start the range from 1 so we don't multiply by 0.
            file_offsets = [number_of_files_per_process * i * cpu_num
                            for cpu_num in range(1, number_of_cpus + 1)]

            # map() blocks until every call to function has returned.
            pool.map(function, file_offsets)

            # Write the file here.
            print("writing")
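One aside that's not in the original answer: pool.map() blocks until all the work is done, but the worker processes stay alive afterwards. If you want them shut down cleanly, close and join the pool when you're finished; on Python 3.3+ you can also use the pool as a context manager. A minimal sketch:

    import multiprocessing

    def function(arg):
        print("got arg %s" % arg)

    if __name__ == "__main__":
        # Python 3.3+: the context manager terminates the workers on exit.
        with multiprocessing.Pool(5) as pool:
            pool.map(function, range(10))

        # Equivalent explicit cleanup, which also works on Python 2.7:
        pool = multiprocessing.Pool(5)
        pool.map(function, range(10))
        pool.close()  # No more tasks will be submitted.
        pool.join()   # Wait for the workers to exit.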
As you can see, the Pool solution is nicer.
This doesn't solve your traceback problem, though. It's hard for me to say how to fix that without understanding what's causing it. You may need to use a multiprocessing.Lock to synchronize access to the shared resource.
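For illustration, here's a minimal sketch of that pattern, assuming (this is a guess, not something the traceback proves) that the crashes come from several processes touching the same file at once; the file name and the work done are made up:

    import multiprocessing

    def function(file_offset, lock):
        # Do the heavy work outside the lock so the processes still run in parallel.
        result = "processed offset %d" % file_offset

        # Hold the lock only around the shared-resource access.
        with lock:
            with open("results.txt", "a") as fd:  # hypothetical shared output file
                fd.write(result + "\n")

    if __name__ == "__main__":
        lock = multiprocessing.Lock()

        jobs = []
        for cpu_number in range(1, 6):
            p = multiprocessing.Process(target=function, args=(cpu_number * 10, lock))
            jobs.append(p)
            p.start()

        for p in jobs:
            p.join()

Note that this only works with Process directly: a Lock can't be passed through pool.map(), so with a Pool you'd have to hand it to the workers via the pool's initializer argument instead.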