rx scala - Controlling observable buffering by observable itself


I'm trying to slice an observable stream by itself, e.g.:

    import rx.lang.scala.Observable

    val source = Observable.from(1 to 10).share
    val boundaries = source.filter(_ % 3 == 0)
    val result = source.tumblingBuffer(boundaries)

    result.subscribe((buf) => println(buf.toString))

The output is:

    Buffer()
    Buffer()
    Buffer()
    Buffer()

The source is iterated on the boundaries line, before it reaches result, so it only creates the boundaries and the resulting buffers, but there is nothing left to fill them with.

My approach to this is using publish/connect:

    val source2 = Observable.from(1 to 10).publish
    val boundaries2 = source2.filter(_ % 3 == 0)
    val result2 = source2.tumblingBuffer(boundaries2)

    result2.subscribe((buf) => println(buf.toString))
    source2.connect

This produces the expected output:

    Buffer(1, 2)
    Buffer(3, 4, 5)
    Buffer(6, 7, 8)
    Buffer(9, 10)

Now I just need to hide connect from the outer world and connect when result gets subscribed (I am doing this inside a class and don't want to expose it). Something like:

    val source3 = Observable.from(1 to 10).publish
    val boundaries3 = source3.filter(_ % 3 == 0)
    val result3 = source3
      .tumblingBuffer(boundaries3)
      .doOnSubscribe(() => source3.connect)

    result3.subscribe((buf) => println(buf.toString))

But now the doOnSubscribe action never gets called, so the published source never gets connected...

What's wrong?

You were on the right track with your publish solution. There is, however, an alternative publish operator that takes a lambda as its argument (see the documentation), of type Observable[T] => Observable[R]. The argument of this lambda is the original stream, to which you can safely subscribe multiple times. Within the lambda you transform the original stream to your liking; in your case you filter the stream and buffer it on that filter.

    Observable.from(1 to 10)
      .publish(src => src.tumblingBuffer(src.filter(_ % 3 == 0)))
      .subscribe(buf => println(buf.toString()))

The best thing about this operator is that you don't need to call anything like connect afterwards.
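Since the question asks for the connection to stay hidden inside a class, here is a minimal sketch of how the selector-based publish could be wrapped. The names SelfBuffering, bufferBy and Demo are hypothetical and not part of RxScala; the sketch assumes RxScala is on the classpath and that publish(selector) and tumblingBuffer(boundary) behave as described above.

```scala
import rx.lang.scala.Observable

// Hypothetical helper (not part of RxScala): slices a stream by its own elements.
object SelfBuffering {
  // Closes the current buffer whenever an element satisfies `isBoundary`.
  // publish(selector) shares a single subscription to `source` between the
  // buffered stream and the boundary stream, so no connect call is exposed.
  def bufferBy[T](source: Observable[T])(isBoundary: T => Boolean): Observable[Seq[T]] =
    source.publish(src => src.tumblingBuffer(src.filter(isBoundary)))
}

object Demo extends App {
  // Same input and boundary condition as in the question.
  SelfBuffering
    .bufferBy(Observable.from(1 to 10))(_ % 3 == 0)
    .subscribe(buf => println(buf.toString))
}
```

Callers only see an ordinary Observable[Seq[T]], so subscribing should be all that is needed, and the buffers should match those of the publish/connect version.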

