zeromq - Python Multi-Processing Question
I have a folder with 500 input files (total size of all files: ~ 500 [MB]).
I'd like to write a Python script that does the following:
(1) load all of the input files into memory
(2) initialize an empty Python list that will later be used ... see bullet (4)
(3) start 15 different (independent) processes: each of these uses the same input data [from (1)] -- yet uses a different algorithm to process it, thus generating different results
(4) I'd like all the independent processes [from step (3)] to store their output in the same Python list [the same list that was initialized in step (2)]
Once all 15 processes have completed their run, I will have one Python list that includes the results of all 15 independent processes.
My question is, is it possible to do the above efficiently in Python? If so, can you provide a scheme / sample code that illustrates how to do so? (A rough skeleton of what I have in mind follows the notes below.)
Note #1: I will be running this on a strong, multi-core server; the goal here is to use all of the processing power while sharing some memory {the input data, the output list} among all the independent processes.
Note #2: I am working in a Linux environment.
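To make the intent a bit more concrete, here is a rough skeleton of the shape I have in mind, using only the standard library's multiprocessing.Pool and relying on Linux fork() so the loaded input is shared with the child processes rather than copied per task. It is only a sketch; the names (load_input_files, algorithm_a / algorithm_b, 'input_folder') are placeholders, not real code I have:

from multiprocessing import Pool
import os

def load_input_files(folder):
    # placeholder: read every file in `folder` into memory once, in the parent
    data = {}
    for name in os.listdir(folder):
        path = os.path.join(folder, name)
        if os.path.isfile(path):
            with open(path, 'rb') as f:
                data[name] = f.read()
    return data

# placeholder algorithms -- each gets the shared data and returns its result
def algorithm_a(data):
    return ('a', sum(len(v) for v in data.values()))

def algorithm_b(data):
    return ('b', len(data))

ALGORITHMS = [algorithm_a, algorithm_b]   # imagine 15 of these

# module-level so forked workers on Linux inherit it (copy-on-write);
# the input is loaded once and read by all children, not pickled per task
INPUT_DATA = None

def run_algorithm(index):
    return ALGORITHMS[index](INPUT_DATA)

if __name__ == '__main__':
    INPUT_DATA = load_input_files('input_folder')    # (1) load input into memory
    pool = Pool(processes=len(ALGORITHMS))           # (3) one process per algorithm
    # (2) + (4): map gathers every process's return value into one list
    results = pool.map(run_algorithm, range(len(ALGORITHMS)))
    pool.close()
    pool.join()
    print(results)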
OK, I whipped this up using ZeroMQ to demonstrate a single subscriber with multiple publishers. You could probably do the same thing with queues, but you'd need to manage them a bit more. ZeroMQ sockets just work, which makes them nice for things like this, IMO.
""" demo of multiple processes doing processing , publishing results common subscriber """ multiprocessing import process class worker(process): def __init__(self, filename, bind): self._filename = filename self._bind = bind super(worker, self).__init__() def run(self): import zmq import time ctx = zmq.context() result_publisher = ctx.socket(zmq.pub) result_publisher.bind(self._bind) time.sleep(1) open(self._filename) my_input: l in my_input.readlines(): result_publisher.send(l) if __name__ == '__main__': import sys import os import zmq #assume every argument first file processed files = sys.argv[1:] # create worker each file processed if exists pass # in bind argument instructing socket communicate via ipc workers = [worker(f, "ipc://%s_%s" % (f, i)) i, f \ in enumerate((x x in files if os.path.exists(x)))] # create subscriber socket ctx = zmq.context() result_subscriber = ctx.socket(zmq.sub) result_subscriber.setsockopt(zmq.subscribe, "") # wire subscriber whatever worker bound w in workers: print w._bind result_subscriber.connect(w._bind) # start workers w in workers: print "starting workers..." w.start() result = [] # read subscriber , add result list long # @ least 1 worker alive while [w w in workers if w.is_alive()]: result.append(result_subscriber.recv()) else: # output result print result
Oh, and to get zmq, it's just
$ pip install pyzmq-static