Scala Spark DataFrame: Task not serializable exception even with broadcast variables


This works (df: DataFrame):

val filteredRdd = df.rdd.zipWithIndex.collect { case (r, i) if i >= 10 => r }

This doesn't:

val start = 10
val filteredRdd = df.rdd.zipWithIndex.collect { case (r, i) if i >= start => r }

I tried using broadcast variables, but that didn't work either:

val start = sc.broadcast(1)
val filteredRdd = df.rdd.zipWithIndex.collect { case (r, i) if i >= start.value => r }

I'm getting a Task not serializable exception. Can someone explain why it fails even with broadcast variables?

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$collect$2.apply(RDD.scala:959)
    at org.apache.spark.rdd.RDD$$anonfun$collect$2.apply(RDD.scala:958)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:958)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC...(<console>:172)
    ...

The basic constructs you're using are solid. Here is a similar code snippet that does work. Note it uses a broadcast variable and reads the broadcast value inside the map method, just like your code:

scala> val dat = sc.parallelize(List(1, 2, 3))
dat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val br = sc.broadcast(10)
br: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(2)

scala> dat.map(br.value * _)
res2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:29

scala> res2.collect
res3: Array[Int] = Array(10, 20, 30)

So that may serve as verification of the general approach.
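To connect it back to your snippet, here is a sketch (filterFromIndex is a name I made up, untested against your session) that wraps the filter in a method. Inside a method the threshold is a plain local parameter, so the closure captures just a Long rather than the shell's wrapper object:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

// `start` is a method parameter (a true local), so the partial function
// passed to `collect` closes over a Long, not the REPL wrapper object.
def filterFromIndex(df: DataFrame, start: Long): RDD[Row] =
  df.rdd.zipWithIndex.collect { case (r, i) if i >= start => r }

val filteredRdd = filterFromIndex(df, 10)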

I suspect the problem is some other variable in your script. In spark-shell, a literal like 10 is compiled directly into the closure, but a top-level val lives as a field on the REPL's wrapper object, so referencing it can drag that wrapper, and everything else defined in the session, into the closure. Try stripping the code down in a fresh spark-shell session and find the culprit by process of elimination.
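If you want to see that failure mode in isolation, here is a minimal sketch (class and variable names are invented for illustration) of how an unrelated, non-serializable value defined earlier in the same session can poison a closure that never mentions it:

// Deliberately NOT Serializable -- stands in for a connection, logger,
// or config object defined earlier in the shell session.
class Helper(val offset: Int)

val helper = new Helper(5)   // stored as a field of the REPL wrapper object
val start  = 10              // so is this

// The guard only mentions `start`, but serializing the closure can pull in
// the wrapper that also holds `helper`, raising Task not serializable.
val filtered = sc.parallelize(1 to 20).filter(_ >= start).collect()

If a stripped-down session runs clean, reintroduce your other definitions one at a time; marking values you can't remove as @transient is another common workaround.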

