Scala Spark DataFrame: Task not serializable exception even with broadcast variables
This works (df: DataFrame):
val filteredRdd = df.rdd.zipWithIndex.collect { case (r, i) if i >= 10 => r }
This doesn't:
val start = 10
val filteredRdd = df.rdd.zipWithIndex.collect { case (r, i) if i >= start => r }
I tried using broadcast variables, but that didn't work either:
val start = sc.broadcast(1)
val filteredRdd = df.rdd.zipWithIndex.collect { case (r, i) if i >= start.value => r }
I am getting a Task not serializable exception. Can anyone explain why this fails even with broadcast variables?
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$collect$2.apply(RDD.scala:959)
    at org.apache.spark.rdd.RDD$$anonfun$collect$2.apply(RDD.scala:958)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:958)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$fa17825793f04f8d2edd8765c45e2a6c$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:172)
    at $iwC
The basic constructs you are using are solid. Here is a similar code snippet that does work. Note that it creates a broadcast variable and uses the broadcast value inside the map method - here is the code:
scala> val dat = sc.parallelize(List(1, 2, 3))
dat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val br = sc.broadcast(10)
br: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(2)

scala> dat.map(br.value * _)
res2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:29

scala> res2.collect
res3: Array[Int] = Array(10, 20, 30)
So that may serve as verification of the general approach.
I suspect the problem lies with other variables in your script. Try stripping them out first in a new spark-shell session and find the culprit by process of elimination.
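If the culprit does turn out to be something non-serializable sitting in the same scope, one common workaround is to copy the value the closure needs into a local val inside a method, so the closure captures only that local value rather than the whole enclosing script or REPL object. A minimal sketch, not from the original post; the filterFrom helper name and signature are my own illustration:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

// Hypothetical helper for illustration. Copying `start` into a local val means
// the partial function below only captures a plain Long, not the enclosing
// (possibly non-serializable) object that `start` would otherwise be read from.
def filterFrom(df: DataFrame, start: Long): RDD[Row] = {
  val localStart = start
  df.rdd.zipWithIndex.collect { case (row, idx) if idx >= localStart => row }
}

val filteredRdd = filterFrom(df, 10L)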