scala - How can I use and return Source queue to caller without materializing it? -
i'm trying use new akka streams , wonder how can use , return source queue caller without materializing in code ?
imagine have library makes number of async calls , returns results via source
. function looks this
def findarticlesbytitle(text: string): source[string, sourcequeue[string]] = { val source = source.queue[string](100, backpressure) source.mapmaterializedvalue { case queue => val url = s"http://.....&term=$text" httpclient.get(url).map(httpresponsetosprayjson[searchresponse]).map { v => v.idlist.foreach { id => queue.offer(id) } queue.complete() } } source }
and caller might use this
// there implicit actormaterializer somewhere val stream = plugin.findarticlesbytitle(title) val results = stream.runfold(list[string]())((result, article) => article :: result)
when run code within mapmaterializedvalue
never executed.
i can't understand why don't have access instance of sourcequeue
if should caller decide how materialize source.
how should implement ?
in code example you're returning source instead of return value of source.mapmaterializedvalue
(the method call doesn't mutate source object).
Comments
Post a Comment