multithreading - Python multithreaded ZeroMQ REQ-REP -


i looking implement req-rep pattern python , zeromq using multithreading.

with python, can create new thread when new client connects server. thread handle communications particular client, until socket closed:

# thread handle client's requests class clientthread(threading.thread):     # implementation...     def __init__(self, socket):         threading.thread.__init__(self)         self.socket = socket     def run(self):         while keep_alive:             # thread can receive client             data = self.socket.recv(1024)             # processing...             # , send reply             self.socket.send(reply)  while true:     # server accepts incoming connection     conn, addr = sock.accept()     # , creates new thread handle client's requests     newthread = clientthread(conn)     # starting thread     newthread.start() 

is possible same[*] using zeromq? have seen examples of multithreading zeromq , python, in of them pool of threads created fixed number of threads @ beginning , seems more oriented load balancing.

[*] notice want keep connection between client , thread alive, thread expecting multiple req messages client , store information must kept between messages (i.e.: variable counter increments value on new req message; each thread has own variable , no other client should ever able access thread). new client = new thread.

yes, zeromq powerfull can-do toolbox

however, major surprise be, zeromq <socket>-s far more structured plain counterparts, use in sample.

{ azmqcontext -> azmqsocket -> abehavioralprimitive }

zeromq builds remarkable, abstraction-rich framework, under hood of "singleton" zmq-context, (and shall remain) thing used "shared".

threads shall not "share" other "derived" objects, less state, there strong distributed-responsibility framework architecture implemented, both in sake of clean-design , high performance & low-latency.

for zmq-socket-s 1 shall rather imagine smarter, layered sub-structure, 1 receives off-loaded worries i/o-activities ( managed inside zmq-context responsibility -- keep-alive issues, timing issues , fair-queue buffering / select-polling issues simply cease visible ... ), 1 sort of formal communication pattern behaviour ( given chosen zmq-socket-type archetype ).

finally

zeromq , nanomsg libraries rather lego-alike projects, empower you, architect & designer, more, 1 typically realises @ beginning.

enter image description here

one can focus on distributed-system behaviour, opposed lose time , energy on solving just-another-socket-messaging-[nightmare].

( worth have look both books pieter hintjens, co-father of zeromq. there find plenty aha!-moments on great subject. )

... , cherry on cake -- of transport-agnostic, universal environment, whether passing messages on inproc://, other on ipc:// , in parallel listening / speaking on tcp:// layers.

edit#12014-08-19 17:00 [utc+0000]

kindly check comments below , further review -- both elementary , advanced -- design-options <trivial-failure-prone>-spin-off processing, <load-balanced>-rep-worker queueing, <scale-able>-distributed processing , <_faul-resilient_mode_>-rep-worker binary-start shaded processing.

no heap of mock-up sloc(s), no single code-sample one-size-fits-all.

this exponentially valid in designing distributed messaging systems.

sorry that.

hurts, true.

"""req/rep modified queue/router/dealer add-on ---------------------------     multithreaded hello world server     author: guillaume aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>  """ import time import threading import zmq  print "zeromq version sanity-check: ", zmq.__version__  def aworker_asroutine( aworker_url, acontext = none ):     """worker routine"""     #context inherited or create new 1 trick------------------------------     acontext = acontext or zmq.context.instance()      # socket talk dispatcher --------------------------------------------------     socket = acontext.socket( zmq.rep )      socket.connect( aworker_url )      while true:          string  = socket.recv()          print( "received request: [ %s ]" % ( string ) )          # 'work' -----------------------------------------------------------         time.sleep(1)          #send reply client, asked --------------------------------------         socket.send( b"world" )  def main():     """server routine"""      url_worker = "inproc://workers"     url_client = "tcp://*:5555"      # prepare our context , sockets ------------------------------------------------     alocalhostcentralcontext = zmq.context.instance()      # socket talk clients ------------------------------------------------------     clients = alocalhostcentralcontext.socket( zmq.router )     clients.bind( url_client )      # socket talk workers ------------------------------------------------------     workers = alocalhostcentralcontext.socket( zmq.dealer )     workers.bind( url_worker )      # --------------------------------------------------------------------||||||||||||--     # launch pool of worker threads --------------< or spin-off 1 in ondemandmode >     in range(5):         thread = threading.thread( target = aworker_asroutine, args = ( url_worker, ) )         thread.start()      zmq.device( zmq.queue, clients, workers )      # ----------------------|||||||||||||||------------------------< fair practice >--     # never here clean anyhow     clients.close()     workers.close()     alocalhostcentralcontext.term()  if __name__ == "__main__":     main() 

Comments

Popular posts from this blog

javascript - Jquery show_hide, what to add in order to make the page scroll to the bottom of the hidden field once button is clicked -

python - Django-cities exits with "killed" -

python - How to get a widget position inside it's layout in Kivy? -