RxScala - Controlling observable buffering by the observable itself
I'm trying to slice an observable stream by itself, e.g.:
    val source = Observable.from(1 to 10).share
    val boundaries = source.filter(_ % 3 == 0)
    val result = source.tumblingBuffer(boundaries)
    result.subscribe((buf) => println(buf.toString))
The output is:
    Buffer()
    Buffer()
    Buffer()
    Buffer()
The problem is that source is iterated on the boundaries line, before it reaches result. So it seems to create only the boundaries, and the resulting buffers have nothing to fill them.
My approach to this is using publish/connect:
    val source2 = Observable.from(1 to 10).publish
    val boundaries2 = source2.filter(_ % 3 == 0)
    val result2 = source2.tumblingBuffer(boundaries2)
    result2.subscribe((buf) => println(buf.toString))
    source2.connect
This produces the expected output:
    Buffer(1, 2)
    Buffer(3, 4, 5)
    Buffer(6, 7, 8)
    Buffer(9, 10)
Now I just need to hide the connect from the outer world and call it when result gets subscribed (I am doing this inside a class and I don't want to expose it). Something like:
    val source3 = Observable.from(1 to 10).publish
    val boundaries3 = source3.filter(_ % 3 == 0)
    val result3 = source3
      .tumblingBuffer(boundaries3)
      .doOnSubscribe(() => source3.connect)
    result3.subscribe((buf) => println(buf.toString))
But now the doOnSubscribe action never gets called, and the published source never gets connected...
What's wrong?
You were on the right track with your publish solution. There is, however, an alternative publish operator that takes a lambda as its argument (see documentation), of type Observable[T] => Observable[R]. The argument of this lambda is the original stream, to which you can safely subscribe multiple times. Within the lambda you transform the original stream to your liking; in your case you filter the stream and buffer it on that filter.
    Observable.from(1 to 10)
      .publish(src => src.tumblingBuffer(src.filter(_ % 3 == 0)))
      .subscribe(buf => println(buf.toString()))
The best thing about this operator is that you don't need to call connect afterwards.
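Since the question mentions doing this inside a class without exposing connect, here is a minimal sketch of how that might look with the publish overload that takes a lambda. The class name Slicer, its upTo parameter and the SlicerDemo object are hypothetical names made up for illustration; only Observable.from, publish, filter, tumblingBuffer and subscribe are taken from the code above, and the sketch assumes RxScala is on the classpath.

```scala
import rx.lang.scala.Observable

// Hypothetical wrapper class: the name and the constructor parameter
// are illustrative only and not part of RxScala.
class Slicer(upTo: Int) {
  // Inside the lambda, src is the shared source, so it can be used both
  // as the boundary stream and as the buffered stream. No connect call
  // is needed, so nothing connectable leaks out of this class.
  val result =
    Observable.from(1 to upTo)
      .publish(src => src.tumblingBuffer(src.filter(_ % 3 == 0)))
}

object SlicerDemo extends App {
  // Callers only ever see a plain observable of buffers.
  new Slicer(10).result.subscribe(buf => println(buf.toString))
}
```

Callers just subscribe to result; the sharing of the source stays an implementation detail of the class.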