scala - ForkJoinPool getting blocked with just two workers


I have code that is not performance-sensitive, and I was trying to make the stacks easier to follow by using fewer futures. This resulted in code similar to the following:

  val fut = Future {
    val r = Future.traverse(ips) { ip =>
      val httpResponse: Future[HttpResponse] = asyncHttpClient.exec(req)
      httpResponse.andThen {
        case x => logger.info(s"received response here: $x")
      }
      httpResponse.map(r => (ip, r))
    }
    r.andThen { case x => logger.info(s"final result: $x") }
    Await.result(r, 10 seconds)
  }
  fut.andThen { x => logger.info(s"finished $x") }
  logger.info("here nonblocking")

As expected, the internal logging in the HTTP client shows that the response returns immediately, but the callbacks executing logger.info(s"received response here: $x") and logger.info(s"final result: $x") do not run until after Await.result(r, 10 seconds) times out. Looking at the log output, which includes thread IDs, the callbacks are being executed in the same thread (ForkJoinPool-1-worker-3) that is awaiting the result, creating a deadlock. It was my understanding that ExecutionContext.global would create threads on demand when it ran out of threads. Is this not the case? There appear to be only two threads from the global fork join pool producing output in the logs (1 and 3). Can anyone explain this?

As for fixes, I know that perhaps the best way is to separate blocking and nonblocking work onto different thread pools, but I was hoping to avoid that bookkeeping by using a dynamically sized thread pool. Is there a better solution?

If you want to grow the pool (temporarily) when threads are blocked, use concurrent.blocking. Here, you've used up the threads, doing I/O and then scheduling more work with map and andThen (the result of which you don't use).

More info: your "final result" is expected to execute after the traverse, so that is normal.
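A minimal sketch of one way to act on the two remarks above, not the asker's actual code: since andThen returns a new future, the callbacks can be chained so their results are used, and Await.result can be moved out of the pool onto the calling thread. The fetch function and the IP strings are hypothetical stand-ins for the HTTP client call in the question, and println stands in for the logger.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ChainedCallbacks {
  // Hypothetical stand-in for the asynchronous HTTP call in the question.
  def fetch(ip: String): Future[String] = Future(s"response from $ip")

  // Builds the whole pipeline without blocking any pool thread.
  // Each andThen result is kept in the chain instead of being discarded.
  def run(ips: List[String]): Future[List[(String, String)]] =
    Future.traverse(ips) { ip =>
      fetch(ip)
        .andThen { case x => println(s"received response here: $x") }
        .map(r => (ip, r))
    }.andThen { case x => println(s"final result: $x") }

  def main(args: Array[String]): Unit = {
    // The only blocking call runs on the main thread, outside the pool.
    val result = Await.result(run(List("10.0.0.1", "10.0.0.2")), 10.seconds)
    println(s"finished $result")
  }
}
```

With this shape no worker thread sits in Await.result while the callbacks it is waiting for are queued behind it.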

An example for blocking, although there must be a Q&A for it already:

  scala> import concurrent._ ; import ExecutionContext.Implicits._

  scala> val is = 1 to 100 toList

  scala> def db = s"${Thread.currentThread}"
  db: String

  scala> def f(i: Int) = Future { println(db) ; Thread.sleep(1000L) ; 2 * i }
  f: (i: Int)scala.concurrent.Future[Int]

  scala> Future.traverse(is)(f _)
  Thread[ForkJoinPool-1-worker-13,5,main]
  Thread[ForkJoinPool-1-worker-7,5,main]
  Thread[ForkJoinPool-1-worker-9,5,main]
  Thread[ForkJoinPool-1-worker-3,5,main]
  Thread[ForkJoinPool-1-worker-5,5,main]
  Thread[ForkJoinPool-1-worker-1,5,main]
  Thread[ForkJoinPool-1-worker-15,5,main]
  Thread[ForkJoinPool-1-worker-11,5,main]
  res0: scala.concurrent.Future[List[Int]] = scala.concurrent.impl.Promise$DefaultPromise@3a4b0e5d
  [etc, N at a time]

Versus the overly parallel version, where blocking lets the pool grow:

  scala> def f(i: Int) = Future { blocking { println(db) ; Thread.sleep(1000L) ; 2 * i }}
  f: (i: Int)scala.concurrent.Future[Int]

  scala> Future.traverse(is)(f _)
  Thread[ForkJoinPool-1-worker-13,5,main]
  Thread[ForkJoinPool-1-worker-3,5,main]
  Thread[ForkJoinPool-1-worker-1,5,main]
  res1: scala.concurrent.Future[List[Int]] = scala.concurrent.impl.Promise$DefaultPromise@759d81f3
  Thread[ForkJoinPool-1-worker-7,5,main]
  Thread[ForkJoinPool-1-worker-25,5,main]
  Thread[ForkJoinPool-1-worker-29,5,main]
  Thread[ForkJoinPool-1-worker-19,5,main]

  scala> Thread[ForkJoinPool-1-worker-23,5,main]
  Thread[ForkJoinPool-1-worker-27,5,main]
  Thread[ForkJoinPool-1-worker-21,5,main]
  Thread[ForkJoinPool-1-worker-31,5,main]
  Thread[ForkJoinPool-1-worker-17,5,main]
  Thread[ForkJoinPool-1-worker-49,5,main]
  Thread[ForkJoinPool-1-worker-45,5,main]
  Thread[ForkJoinPool-1-worker-59,5,main]
  Thread[ForkJoinPool-1-worker-43,5,main]
  Thread[ForkJoinPool-1-worker-57,5,main]
  Thread[ForkJoinPool-1-worker-37,5,main]
  Thread[ForkJoinPool-1-worker-51,5,main]
  Thread[ForkJoinPool-1-worker-35,5,main]
  Thread[ForkJoinPool-1-worker-53,5,main]
  Thread[ForkJoinPool-1-worker-63,5,main]
  Thread[ForkJoinPool-1-worker-47,5,main]
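For comparison, the alternative raised in the question (separate pools for blocking and nonblocking work) might look roughly like the sketch below. The pool size of 4, the object name SeparatePools, and slowDouble are illustrative assumptions, not anything from the original code; Thread.sleep simulates a blocking call.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SeparatePools {
  // Assumption: a fixed pool of 4 threads reserved for blocking calls.
  private val blockingExecutor = Executors.newFixedThreadPool(4)
  val blockingEc: ExecutionContext =
    ExecutionContext.fromExecutorService(blockingExecutor)

  // Simulated blocking work, run explicitly on the dedicated pool.
  def slowDouble(i: Int): Future[Int] =
    Future { Thread.sleep(100L); 2 * i }(blockingEc)

  // Nonblocking composition (traverse, map) stays on the global pool.
  def run(is: List[Int]): Future[Int] = {
    import ExecutionContext.Implicits.global
    Future.traverse(is)(slowDouble).map(_.sum)
  }

  // The dedicated pool uses non-daemon threads, so it must be shut down.
  def shutdown(): Unit = blockingExecutor.shutdown()

  def main(args: Array[String]): Unit =
    try println(Await.result(run((1 to 8).toList), 10.seconds)) // prints 72
    finally shutdown()
}
```

The trade-off is the bookkeeping the question hoped to avoid: the extra pool has to be sized, passed explicitly, and shut down, whereas blocking leaves sizing to the global pool.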
