scala - Throttling messages from RabbitMQ using RxJava -
i'm using rxjava pull out values rabbitmq. here's code:
val amqp = new rabbitqueue("queuename") val obs = observable[string](subscr => while (true) subscr onnext amqp.next) obs subscribe ( s => println(s"string rabbitmq: $s"), error => amqp.connection.close )
it works fine have requirement value should pulled @ once per second while values should preserved (so debounce
won't since drops intermediary values).
it should amqp.next blocks thread we're waiting... (rabbitmq got 2 messages in queue) pulled 1st message... wait 1 second... pulled 2nd message... wait indefinitely next message...
how can achieve using rx
methods?
one option may use schedulers api in combination publishsubject observable.
unfortunately, don't know scala syntax here java version should able convert:
rabbitqueue amqp = new rabbitqueue("queuename"); scheduler.worker worker = schedulers.newthread().createworker(); publishsubject<string> obs = publishsubject.create(); worker.scheduleperiodically(new action0() { @override public void call() { obs.onnext(amqp.next); } }, 1, 1, timeunit.seconds);
your subscribe code above remain same:
obs subscribe ( s => println(s"string rabbitmq: $s"), error => amqp.connection.close )
Comments
Post a Comment