scala - Throttling messages from RabbitMQ using RxJava


I'm using RxJava to pull values out of RabbitMQ. Here's the code:

    val amqp = new RabbitQueue("queuename")
    // amqp.next blocks until a message is available
    val obs = Observable[String](subscr => while (true) subscr onNext amqp.next)
    obs subscribe (
      s => println(s"String from RabbitMQ: $s"),
      error => amqp.connection.close
    )

It works fine, but now I have a requirement that a value should be pulled at most once per second, while all the values should be preserved (so debounce won't do, since it drops intermediary values).

It should go like this: amqp.next blocks the thread, so we're waiting... (RabbitMQ got 2 messages in the queue) pulled the 1st message... wait 1 second... pulled the 2nd message... wait indefinitely for the next message...

How can I achieve this using Rx methods?

One option may be to use the Schedulers API in combination with a PublishSubject as your Observable.

Unfortunately, I don't know the Scala syntax, but here is the Java version, which you should be able to convert:

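    import java.util.concurrent.TimeUnit;
    import rx.Scheduler;
    import rx.functions.Action0;
    import rx.schedulers.Schedulers;
    import rx.subjects.PublishSubject;

    final RabbitQueue amqp = new RabbitQueue("queuename");
    final Scheduler.Worker worker = Schedulers.newThread().createWorker();
    final PublishSubject<String> obs = PublishSubject.create();
    // Pull one message per tick: first tick after 1 second, then every second
    worker.schedulePeriodically(new Action0() {
        @Override
        public void call() {
            obs.onNext(amqp.next()); // blocks until a message is available
        }
    }, 1, 1, TimeUnit.SECONDS);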

Your subscribe code from above will remain the same:

    obs subscribe (
      s => println(s"String from RabbitMQ: $s"),
      error => amqp.connection.close
    )
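The pacing asked for in the question (pull, wait one second, pull again, block while the queue is empty) can also be sketched with plain JDK classes, outside Rx. This is only an illustration under assumptions: `PacedPuller` and its `start` method are hypothetical names, and a `BlockingQueue` stands in for `RabbitQueue`, whose API is not shown here. It relies on `scheduleWithFixedDelay`, which counts the delay from the end of one pull to the start of the next, so a long block on an empty queue is not followed by a burst of catch-up pulls.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of RxJava or any RabbitMQ client.
final class PacedPuller {
    // Starts a single worker thread that takes one message per run and
    // waits delayMillis after each pull finishes before starting the next.
    static ScheduledExecutorService start(
            BlockingQueue<String> queue, Consumer<String> onNext, long delayMillis) {
        ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
        worker.scheduleWithFixedDelay(() -> {
            try {
                // take() blocks while the queue is empty (stand-in for amqp.next);
                // no message is dropped, it simply waits in the queue
                onNext.accept(queue.take());
            } catch (InterruptedException e) {
                // shutdownNow() interrupts a blocked take(); restore the flag and exit
                Thread.currentThread().interrupt();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        return worker;
    }
}
```

Calling `PacedPuller.start(queue, System.out::println, 1000)` would hand over at most one message per second; calling `shutdownNow()` on the returned executor stops the worker.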
