multithreading - Python multithreaded ZeroMQ REQ-REP -
i looking implement req-rep pattern python , zeromq using multithreading.
with python, can create new thread when new client connects server. thread handle communications particular client, until socket closed:
# thread handle client's requests class clientthread(threading.thread): # implementation... def __init__(self, socket): threading.thread.__init__(self) self.socket = socket def run(self): while keep_alive: # thread can receive client data = self.socket.recv(1024) # processing... # , send reply self.socket.send(reply) while true: # server accepts incoming connection conn, addr = sock.accept() # , creates new thread handle client's requests newthread = clientthread(conn) # starting thread newthread.start()
is possible same[*] using zeromq? have seen examples of multithreading zeromq , python, in of them pool of threads created fixed number of threads @ beginning , seems more oriented load balancing.
[*] notice want keep connection between client , thread alive, thread expecting multiple req messages client , store information must kept between messages (i.e.: variable counter increments value on new req message; each thread has own variable , no other client should ever able access thread). new client = new thread.
yes, zeromq powerfull can-do toolbox
however, major surprise be, zeromq <socket>-s far more structured plain counterparts, use in sample.
{ azmqcontext -> azmqsocket -> abehavioralprimitive }
zeromq builds remarkable, abstraction-rich framework, under hood of "singleton" zmq-context
, (and shall remain) thing used "shared".
threads shall not "share" other "derived" objects, less state, there strong distributed-responsibility framework architecture implemented, both in sake of clean-design , high performance & low-latency.
for zmq-socket
-s 1 shall rather imagine smarter, layered sub-structure, 1 receives off-loaded worries i/o-activities ( managed inside zmq-context
responsibility -- keep-alive issues, timing issues , fair-queue buffering / select-polling issues simply cease visible ... ), 1 sort of formal communication pattern behaviour ( given chosen zmq-socket
-type archetype ).
finally
zeromq , nanomsg libraries rather lego-alike projects, empower you, architect & designer, more, 1 typically realises @ beginning.
one can focus on distributed-system behaviour, opposed lose time , energy on solving just-another-socket-messaging-[nightmare].
( worth have look both books pieter hintjens, co-father of zeromq. there find plenty aha!-moments on great subject. )
... , cherry on cake -- of transport-agnostic, universal environment, whether passing messages on inproc://
, other on ipc://
, in parallel listening / speaking on tcp://
layers.
edit#1
2014-08-19 17:00 [utc+0000]
kindly check comments below , further review -- both elementary , advanced -- design-options <trivial-failure-prone>-spin-off processing, <load-balanced>-rep-worker queueing, <scale-able>-distributed processing , <_faul-resilient_mode_>-rep-worker binary-start shaded processing.
no heap of mock-up sloc(s), no single code-sample one-size-fits-all.
this exponentially valid in designing distributed messaging systems.
sorry that.
hurts, true.
"""req/rep modified queue/router/dealer add-on --------------------------- multithreaded hello world server author: guillaume aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com> """ import time import threading import zmq print "zeromq version sanity-check: ", zmq.__version__ def aworker_asroutine( aworker_url, acontext = none ): """worker routine""" #context inherited or create new 1 trick------------------------------ acontext = acontext or zmq.context.instance() # socket talk dispatcher -------------------------------------------------- socket = acontext.socket( zmq.rep ) socket.connect( aworker_url ) while true: string = socket.recv() print( "received request: [ %s ]" % ( string ) ) # 'work' ----------------------------------------------------------- time.sleep(1) #send reply client, asked -------------------------------------- socket.send( b"world" ) def main(): """server routine""" url_worker = "inproc://workers" url_client = "tcp://*:5555" # prepare our context , sockets ------------------------------------------------ alocalhostcentralcontext = zmq.context.instance() # socket talk clients ------------------------------------------------------ clients = alocalhostcentralcontext.socket( zmq.router ) clients.bind( url_client ) # socket talk workers ------------------------------------------------------ workers = alocalhostcentralcontext.socket( zmq.dealer ) workers.bind( url_worker ) # --------------------------------------------------------------------||||||||||||-- # launch pool of worker threads --------------< or spin-off 1 in ondemandmode > in range(5): thread = threading.thread( target = aworker_asroutine, args = ( url_worker, ) ) thread.start() zmq.device( zmq.queue, clients, workers ) # ----------------------|||||||||||||||------------------------< fair practice >-- # never here clean anyhow clients.close() workers.close() alocalhostcentralcontext.term() if __name__ == "__main__": main()
Comments
Post a Comment