java - How To Have Multiple Subscribers To A Grouped Observable in RxJava? -


this using rxjava version 0.19.6.

outside of groupby operation, 1 can create pipeline described following code to, instance, select record observable based on criteria or select first record meets alternate criteria:

observable<long> observable = observable.interval(1, timeunit.milliseconds).take(10); observable<long> filter1 = observable.filter(new func1<long, boolean>() {     @override     public boolean call(long along) {         return 5 == along % 5;     } }); observable<long> filter2 = observable.filter(new func1<long, boolean>() {     @override     public boolean call(long along) {         return 2 == along % 5;     } }); blockingobservable.from(observable.concat(filter1, filter2).first()).foreach(new action1<long>() {     @override     public void call(long along) {         system.out.println(along);     } }); 

...unfortunately, due restrictions on groupedobservable, appears same kind of procedure not work operating inside of grouped context:

blockingobservable.from(observable.groupby(new func1<long, long>() {     @override     public long call(long along) {         return along % 5;     } }).flatmap(new func1<groupedobservable<long, long>, observable<long>>() {     @override     public observable<long> call(groupedobservable<long, long> in) {         observable<long> filter1 = in.filter(new func1<long, boolean>() {             @override             public boolean call(long along) {                 return 5 == along % 5;             }         });         observable<long> filter2 = in.filter(new func1<long, boolean>() {             @override             public boolean call(long along) {                 return 2 == along % 5;             }         });         return observable.concat(filter1, filter2).first();     } })).foreach(new action1<long>() {     @override     public void call(long along) {         system.out.println(along);     } }); 

...results in multiple subscriber exception (exception in thread "main" java.lang.illegalstateexception: 1 subscriber allowed!).

am missing obvious fix problem? have tried playing around connectableobservables in case give appearance of single subscriber, attempts have been failures (surely due ignorance on part).


on related note, groupbyuntil seems give reference groupedobservable giving me similar headache of complaining multiple subscribers if tried use determine when close window. here again i'm sure overlooking obvious since api expects 1 use groupedobservable!

you may able use .cache() on groupedobservable.

final observable<long> incached = in.cache(); 

then use resulting observable in filters.

observable<long> filter1 = incached.filter(new func1<long, boolean>() {         @override         public boolean call(long along) {             return 5 == along % 5;         }     }); 

that way each subscriber see same items there 1 subscriber groupedobservable.


Comments

Popular posts from this blog

java - How to specify maven bin in eclipse maven plugin? -

single sign on - Logging into Plone site with credentials passed through HTTP -

php - Why does AJAX not process login form? -