Scala - Spark 1.5 MLlib LDA - getting topic distributions for new documents
Not a duplicate of this question, because I'm asking what the input should be, not how to make the function call; see below.
I followed this guide to create an LDA model in Spark 1.5. I saw in this question that to get the topic distribution of a new document you need to use LocalLDAModel's topicDistributions function, which takes an RDD[(Long, Vector)].

Should the new document vector be a term-count vector? That is the type of vector LDA was trained with. My code compiles and runs, but I'd like to know whether this is the intended use of the topicDistributions function.
    import org.apache.spark.rdd._
    import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel, LocalLDAModel}
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import scala.collection.mutable

    val input = Seq("this document", "this document", "these training, not tests", "here final file (document)")
    val corpus: RDD[Array[String]] = sc.parallelize(input.map { doc => doc.split("\\s") })

    val termCounts: Array[(String, Long)] =
      corpus.flatMap(_.map(_ -> 1L)).reduceByKey(_ + _).collect().sortBy(-_._2)
    val vocabArray: Array[String] = termCounts.takeRight(termCounts.size).map(_._1)
    val vocab: Map[String, Int] = vocabArray.zipWithIndex.toMap

    // Convert documents into term count vectors
    val documents: RDD[(Long, Vector)] =
      corpus.zipWithIndex.map { case (tokens, id) =>
        val counts = new mutable.HashMap[Int, Double]()
        tokens.foreach { term =>
          if (vocab.contains(term)) {
            val idx = vocab(term)
            counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
          }
        }
        (id, Vectors.sparse(vocab.size, counts.toSeq))
      }

    // Set LDA parameters and train the model
    val numTopics = 10
    val ldaModel: DistributedLDAModel =
      new LDA().setK(numTopics).setMaxIterations(20).run(documents).asInstanceOf[DistributedLDAModel]

    // Create test input, convert it to a term count vector, and get its topic distribution
    val test_input = Seq("this test document")
    val test_document: RDD[(Long, Vector)] =
      sc.parallelize(test_input.map(doc => doc.split("\\s"))).zipWithIndex.map { case (tokens, id) =>
        val counts = new mutable.HashMap[Int, Double]()
        tokens.foreach { term =>
          if (vocab.contains(term)) {
            val idx = vocab(term)
            counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
          }
        }
        (id, Vectors.sparse(vocab.size, counts.toSeq))
      }
    println("test_document: " + test_document.first._2.toArray.mkString(", "))

    val localLDAModel: LocalLDAModel = ldaModel.toLocal
    val topicDistributions = localLDAModel.topicDistributions(test_document)
    println("first topic distribution: " + topicDistributions.first._2.toArray.mkString(", "))
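As a sanity check (not part of the original code), each vector returned by topicDistributions should be a distribution over the numTopics topics for that document, so it has numTopics entries that sum to roughly 1.0. A minimal sketch, continuing from the variables above:

    // Hypothetical sanity check, continuing from the code above:
    // each topic distribution has `numTopics` weights that sum to ~1.0.
    topicDistributions.collect().foreach { case (docId, dist) =>
      require(dist.size == numTopics)
      require(math.abs(dist.toArray.sum - 1.0) < 1e-6)
      println(s"doc $docId most likely topic: " + dist.toArray.zipWithIndex.maxBy(_._1)._2)
    }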
Looking at the Spark source, I noticed the following comment regarding the documents parameter:
    * @param documents:
    *   RDD of documents, which are term (word) count vectors paired with IDs.
    *   The term count vectors are "bags of words" with a fixed-size vocabulary
    *   (where the vocabulary size is the length of the vector).
    *   This must use the same vocabulary (ordering of term counts) as in training.
    *   Document IDs must be unique and >= 0.
So the answer is yes, the new document vector should be a term-count vector. Further, the vector's ordering must be the same as was used in training.
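For reference, here is a minimal sketch of that idea as a reusable helper. The toTermCountVector name is my own, and the usage assumes the sc, vocab, and localLDAModel values from the code above; the key point is that the new vector is built from the training vocabulary so the term indices line up with what the model saw during training.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import scala.collection.mutable

    // Hypothetical helper (not from the original post): build a term-count vector
    // for a new document using the *training* vocabulary, so the term ordering
    // matches training. Terms outside the vocabulary are simply dropped.
    def toTermCountVector(tokens: Array[String], vocab: Map[String, Int]): Vector = {
      val counts = new mutable.HashMap[Int, Double]()
      tokens.foreach { term =>
        vocab.get(term).foreach { idx =>
          counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
        }
      }
      Vectors.sparse(vocab.size, counts.toSeq)
    }

    // Usage, assuming sc, vocab, and localLDAModel from the question's code:
    val newDocs: RDD[(Long, Vector)] = sc.parallelize(Seq("this test document"))
      .map(_.split("\\s"))
      .zipWithIndex
      .map { case (tokens, id) => (id, toTermCountVector(tokens, vocab)) }

    localLDAModel.topicDistributions(newDocs).collect().foreach { case (id, dist) =>
      println(s"doc $id: " + dist.toArray.mkString(", "))
    }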