Scala - ForkJoinPool getting blocked with just two workers
I have some code that is not performance-sensitive, and I was trying to make the stacks easier to follow by using fewer futures. This resulted in code similar to the following:
    val fut = Future {
      val r = Future.traverse(ips) { ip =>
        val httpResponse: Future[HttpResponse] = asyncHttpClient.exec(req)
        httpResponse.andThen {
          case x => logger.info(s"received response here: $x")
        }
        httpResponse.map(r => (ip, r))
      }
      r.andThen { case x => logger.info(s"final result: $x") }
      Await.result(r, 10 seconds)
    }
    fut.andThen { case x => logger.info(s"finished $x") }
    logger.info("here nonblocking")

As expected, the internal logging in the HTTP client shows that the response returns immediately, but the callbacks executing logger.info(s"received response here: $x") and logger.info(s"final result: $x") do not execute until after Await.result(r, 10 seconds) times out. Looking at the log output, which includes thread IDs, the callbacks are being executed in the same thread (ForkJoinPool-1-worker-3) that is awaiting the result, creating a deadlock.

It was my understanding that ExecutionContext.global would create extra threads on demand when it ran out of threads. Is this not the case? There appear to be only two threads from the global fork join pool producing output in the logs (1 and 3). Can anyone explain this?
As for fixes, I know that perhaps the best way is to separate blocking and nonblocking work into different thread pools, but I was hoping to avoid that extra bookkeeping by using a dynamically sized thread pool. Is there a better solution?
If you want to grow the pool (temporarily) when threads are blocked, use concurrent.blocking. Here, you've used up all the threads, which are doing I/O and then scheduling more work with map and andThen (the result of which you don't use).
More info: your "final result" is expected to execute after the traverse, so that is normal.
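As a side note on the remark that the result of andThen goes unused: andThen returns a new future that completes only after its callback has run, so chaining on that returned future is what enforces ordering. The following is a minimal sketch of that behavior, not the asker's code; the object name AndThenSketch and the events queue are hypothetical and exist only for illustration.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical example object, not part of the original question.
object AndThenSketch {
  // Returns the final value and the order in which the steps ran.
  def run(): (Int, List[String]) = {
    // Thread-safe log of what happened, in order.
    val events = new ConcurrentLinkedQueue[String]()

    val doubled: Future[Int] =
      Future(21)
        // The side effect runs first; the future returned by andThen
        // carries the same result and completes after the callback.
        .andThen { case r => events.add(s"logged $r") }
        // Chaining on the returned future runs this step after the callback.
        .map { n => events.add("mapped"); 2 * n }

    // Blocking here is only for the demo, on the calling thread.
    val value = Await.result(doubled, 5.seconds)
    (value, events.toArray(new Array[String](0)).toList)
  }
}
```

Calling andThen on a future and then mapping the original future separately, as in the question, registers two independent callbacks with no ordering between them.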
Here is an example of blocking, although there must be another Q&A about it:
    scala> import concurrent._ ; import ExecutionContext.Implicits._

    scala> val is = 1 to 100 toList

    scala> def db = s"${Thread.currentThread}"
    db: String

    scala> def f(i: Int) = Future { println(db) ; Thread.sleep(1000L) ; 2 * i }
    f: (i: Int)scala.concurrent.Future[Int]

    scala> Future.traverse(is)(f _)
    Thread[ForkJoinPool-1-worker-13,5,main]
    Thread[ForkJoinPool-1-worker-7,5,main]
    Thread[ForkJoinPool-1-worker-9,5,main]
    Thread[ForkJoinPool-1-worker-3,5,main]
    Thread[ForkJoinPool-1-worker-5,5,main]
    Thread[ForkJoinPool-1-worker-1,5,main]
    Thread[ForkJoinPool-1-worker-15,5,main]
    Thread[ForkJoinPool-1-worker-11,5,main]
    res0: scala.concurrent.Future[List[Int]] = scala.concurrent.impl.Promise$DefaultPromise@3a4b0e5d
    [etc, N at a time]

Versus overly parallel:
    scala> def f(i: Int) = Future { blocking { println(db) ; Thread.sleep(1000L) ; 2 * i }}
    f: (i: Int)scala.concurrent.Future[Int]

    scala> Future.traverse(is)(f _)
    Thread[ForkJoinPool-1-worker-13,5,main]
    Thread[ForkJoinPool-1-worker-3,5,main]
    Thread[ForkJoinPool-1-worker-1,5,main]
    res1: scala.concurrent.Future[List[Int]] = scala.concurrent.impl.Promise$DefaultPromise@759d81f3
    Thread[ForkJoinPool-1-worker-7,5,main]
    Thread[ForkJoinPool-1-worker-25,5,main]
    Thread[ForkJoinPool-1-worker-29,5,main]
    Thread[ForkJoinPool-1-worker-19,5,main]

    scala> Thread[ForkJoinPool-1-worker-23,5,main]
    Thread[ForkJoinPool-1-worker-27,5,main]
    Thread[ForkJoinPool-1-worker-21,5,main]
    Thread[ForkJoinPool-1-worker-31,5,main]
    Thread[ForkJoinPool-1-worker-17,5,main]
    Thread[ForkJoinPool-1-worker-49,5,main]
    Thread[ForkJoinPool-1-worker-45,5,main]
    Thread[ForkJoinPool-1-worker-59,5,main]
    Thread[ForkJoinPool-1-worker-43,5,main]
    Thread[ForkJoinPool-1-worker-57,5,main]
    Thread[ForkJoinPool-1-worker-37,5,main]
    Thread[ForkJoinPool-1-worker-51,5,main]
    Thread[ForkJoinPool-1-worker-35,5,main]
    Thread[ForkJoinPool-1-worker-53,5,main]
    Thread[ForkJoinPool-1-worker-63,5,main]
    Thread[ForkJoinPool-1-worker-47,5,main]
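For comparison, the alternative the question mentions (keeping blocking work on its own pool) might look roughly like the sketch below. The names DedicatedPoolSketch and slowDouble, the pool size of 16, and the 100 ms sleep are all assumptions chosen for illustration; they do not come from the original code.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical example object, not part of the original question.
object DedicatedPoolSketch {
  def run(n: Int): List[Int] = {
    // Assumed size; a fixed pool used only for calls that block.
    val executor = Executors.newFixedThreadPool(16)
    val blockingEc = ExecutionContext.fromExecutorService(executor)

    // The sleep stands in for blocking I/O and runs on the dedicated pool.
    def slowDouble(i: Int): Future[Int] =
      Future { Thread.sleep(100L); 2 * i }(blockingEc)

    try {
      // Combining the results uses the global context (imported above),
      // whose workers are never tied up by the sleeps.
      val all = Future.traverse((1 to n).toList)(slowDouble)
      Await.result(all, 10.seconds)
    } finally {
      // Release the pool's non-daemon threads so the JVM can exit.
      executor.shutdown()
    }
  }
}
```

With this arrangement the number of concurrent blocking calls is capped by the pool size instead of growing on demand, which is the bookkeeping trade-off the question was hoping to avoid.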