java - How To Have Multiple Subscribers To A Grouped Observable in RxJava? -
this using rxjava version 0.19.6.
outside of groupby operation, 1 can create pipeline described following code to, instance, select record observable based on criteria or select first record meets alternate criteria:
observable<long> observable = observable.interval(1, timeunit.milliseconds).take(10); observable<long> filter1 = observable.filter(new func1<long, boolean>() { @override public boolean call(long along) { return 5 == along % 5; } }); observable<long> filter2 = observable.filter(new func1<long, boolean>() { @override public boolean call(long along) { return 2 == along % 5; } }); blockingobservable.from(observable.concat(filter1, filter2).first()).foreach(new action1<long>() { @override public void call(long along) { system.out.println(along); } }); ...unfortunately, due restrictions on groupedobservable, appears same kind of procedure not work operating inside of grouped context:
blockingobservable.from(observable.groupby(new func1<long, long>() { @override public long call(long along) { return along % 5; } }).flatmap(new func1<groupedobservable<long, long>, observable<long>>() { @override public observable<long> call(groupedobservable<long, long> in) { observable<long> filter1 = in.filter(new func1<long, boolean>() { @override public boolean call(long along) { return 5 == along % 5; } }); observable<long> filter2 = in.filter(new func1<long, boolean>() { @override public boolean call(long along) { return 2 == along % 5; } }); return observable.concat(filter1, filter2).first(); } })).foreach(new action1<long>() { @override public void call(long along) { system.out.println(along); } }); ...results in multiple subscriber exception (exception in thread "main" java.lang.illegalstateexception: 1 subscriber allowed!).
am missing obvious fix problem? have tried playing around connectableobservables in case give appearance of single subscriber, attempts have been failures (surely due ignorance on part).
on related note, groupbyuntil seems give reference groupedobservable giving me similar headache of complaining multiple subscribers if tried use determine when close window. here again i'm sure overlooking obvious since api expects 1 use groupedobservable!
you may able use .cache() on groupedobservable.
final observable<long> incached = in.cache(); then use resulting observable in filters.
observable<long> filter1 = incached.filter(new func1<long, boolean>() { @override public boolean call(long along) { return 5 == along % 5; } }); that way each subscriber see same items there 1 subscriber groupedobservable.
Comments
Post a Comment