Python - multi-threaded script hangs at the end
Hello: I am trying to get this script to work. Depending on the number of users (the example below shows 3, but it can be increased), the script does not exit. All the jobs get done, but the script hangs without exiting. I think my approach to exiting the `while True` loop in the worker is the problem, but I do not know of an alternative. Any ideas?
    import datetime, logging, os.path, queue, random, threading, time

    script = os.path.basename(__file__)
    logging.basicConfig(level=logging.DEBUG,
                        format="%(asctime)-4s %(thread)6s %(message)s",
                        datefmt="%m-%d %H:%M:%S",
                        filename="%s_%s.log" % (script[:script.find(".")],
                                                datetime.datetime.today().strftime("%Y%m%d-%H%M%S")))

    class User(object):
        def __init__(self, id, ndelay, mind, maxd):
            self.id = id
            self.numdelay = ndelay   # number of delays
            self.mind = mind         # min delay
            self.maxd = maxd         # max delay
            self.currdelaynum = 0    # index of next delay

        def hasdelay(self):
            if self.currdelaynum >= 0 and self.currdelaynum < self.numdelay:
                return True

        def runnextdelay(self):
            delay = round(self.mind + random.random() * (self.maxd - self.mind))
            logging.info("%s beg (delay=%d)" % (self.id, delay))
            time.sleep(delay)
            logging.info("%s end" % self.id)
            self.currdelaynum += 1

    def worker(unext, udone):
        while True:
            if unext.qsize() > 0:
                m = unext.get()
                users_all[m].runnextdelay()
                if users_all[m].hasdelay():
                    unext.put(m)
                else:
                    udone.put(m)
            else:
                if udone.qsize() >= len(users_all):
                    break

    if __name__ == '__main__':
        random.seed(10)
        # global users_all
        users_all = list()
        users_all.append(User("aa", 2, 3, 9))
        users_all.append(User("bb", 3, 2, 4))
        users_all.append(User("cc", 1, 4, 5))
        users_next = queue.Queue()
        users_done = queue.Queue()
        for n in range(len(users_all)):
            users_next.put(n)
        threads = [threading.Thread(target=worker, args=(users_next, users_done)) for n in range(2)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

Most multithreaded Python examples I have seen have a queue of jobs that is known upfront. I am writing a script that tests the response times of queries running in parallel on a database. To make the above example self-contained, I have replaced the ODBC query part with a sleep. I would appreciate any comments on a better implementation.
Updated version based on comments:
    def worker(unext):
        while True:
            try:
                m = unext.get_nowait()
                users_all[m].runnextdelay()
                if users_all[m].hasdelay():
                    unext.put(m)
            except queue.Empty:
                break
As commented by univerio, there are race conditions. In general, when working with objects shared between multiple threads, ask yourself this question: what would happen if this thread were interrupted at this point and another thread were allowed to run? The case outlined by univerio is that the qsize() call may return non-zero in thread A, then thread B runs and pulls an item off the same queue. By the time thread A runs again to perform the get(), its assumption that there is an item in the queue is wrong, and the get() may block.
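To make the interleaving concrete, here is a small single-threaded sketch that replays the same sequence of steps by hand. It only simulates what thread B would do (the "stolen" get) and is an illustration, not code from the question:

```python
import queue

q = queue.Queue()
q.put("job")

# Check-then-act: thread A reads the size now and acts on it later.
size_seen = q.qsize()      # thread A sees one item

# Simulate thread B running in between and taking that item.
stolen = q.get()

# A plain q.get() by thread A at this point would block, because the
# queue is empty even though size_seen says otherwise.

# Asking for the item and handling the empty case in one step avoids
# relying on the stale size.
try:
    item = q.get_nowait()
except queue.Empty:
    item = None            # nothing left; the caller decides what to do

print(size_seen, stolen, item)
```

The point is that `size_seen` is already out of date by the time it is used, whereas `get_nowait()` combines the check and the retrieval so no other thread can slip in between them.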
Here's some untested code that may be used as a guide for your final implementation:
    def worker(unext, udone):
        while True:
            try:
                m = unext.get_nowait()
                users_all[m].runnextdelay()
                if users_all[m].hasdelay():
                    unext.put(m)
                else:
                    udone.put(m)
            except queue.Empty:
                # Nothing queued right now; stop only once every user is done.
                if udone.qsize() >= len(users_all):
                    break

This is still not an ideal implementation: when the unext queue is empty but other threads have not finished processing, the while loop will be spinning furiously in one or more of the threads, waiting for the last thread to finish.
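One possible way to soften the spinning, offered as a sketch rather than a tested fix for the original script, is to block on the queue with a short timeout so that an idle worker sleeps between checks. The `process` callback, the `total` argument and the `poll` interval are hypothetical names introduced here; `process(m)` stands in for `runnextdelay()` followed by `hasdelay()`.

```python
import queue
import threading

def worker(unext, udone, total, process, poll=0.05):
    """process(m) is a hypothetical callback: True means item m needs another round."""
    while True:
        try:
            # Blocks for up to `poll` seconds instead of looping at full speed.
            m = unext.get(timeout=poll)
        except queue.Empty:
            if udone.qsize() >= total:
                break          # every item has been marked done
            continue           # another thread may still re-queue an item
        if process(m):
            unext.put(m)
        else:
            udone.put(m)

# Stand-in workload: item m must be processed remaining[m] times.
remaining = {0: 2, 1: 3, 2: 1}

def process(m):
    remaining[m] -= 1          # an item is held by only one thread at a time
    return remaining[m] > 0

unext, udone = queue.Queue(), queue.Queue()
for m in remaining:
    unext.put(m)

threads = [threading.Thread(target=worker, args=(unext, udone, len(remaining), process))
           for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The idle threads still wake up periodically, so this trades a busy loop for a small polling delay at shutdown rather than removing the wait altogether.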
It may be better to have the threads simply do their work and exit when there is no more work left, and have the main thread wait for the udone.qsize() >= len(users_all) condition to become true.
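A minimal sketch of that arrangement might look like the following. It assumes the main thread waits by joining the workers and then checks the done count; `run_all`, `rounds` and `process` are hypothetical names used only for this illustration, with `process(m)` standing in for the per-user delay logic.

```python
import queue
import threading

def worker(unext, udone, process):
    while True:
        try:
            m = unext.get_nowait()
        except queue.Empty:
            return             # no work visible: exit instead of polling
        if process(m):
            unext.put(m)       # item needs another round
        else:
            udone.put(m)       # item is finished

def run_all(rounds, num_threads=2):
    """rounds[m] is how many times item m must be processed (hypothetical input)."""
    remaining = list(rounds)
    unext, udone = queue.Queue(), queue.Queue()
    for m in range(len(remaining)):
        unext.put(m)

    def process(m):
        remaining[m] -= 1      # an item is held by only one thread at a time
        return remaining[m] > 0

    threads = [threading.Thread(target=worker, args=(unext, udone, process))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:          # the main thread does the waiting
        t.join()
    return udone.qsize(), remaining

done, left = run_all([2, 3, 1])
```

One trade-off to be aware of: a worker that finds the queue momentarily empty exits even if another worker is about to re-queue an item, so the number of active threads can shrink towards the end. The work still completes, because the thread that re-queues an item keeps looping and will either pick it up itself or find that another live thread already has.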