[ https://issues.apache.org/jira/browse/SPARK-21546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112146#comment-16112146 ]
Kevin Zhang commented on SPARK-21546:
-------------------------------------

[~zsxwing] In my case I want to use the watermark only to expire state, not to use the watermark column itself when filtering duplicate elements. For example, I want to count the unique accesses to my website per day, so the dropDuplicates state only needs to be kept for one day and can be dropped the next day. At the same time, I want to deduplicate on uuid alone rather than on (uuid, eventTime) together, but dropDuplicates behaves like the latter, right? If so, how can I get the results I expect? (A sketch of the kind of query I have in mind is at the bottom of this message.)

> dropDuplicates with watermark yields RuntimeException due to binding failure
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-21546
>                 URL: https://issues.apache.org/jira/browse/SPARK-21546
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Jacek Laskowski
>            Assignee: Shixiong Zhu
>             Fix For: 2.2.1, 2.3.0
>
>
> With today's master...
> The following streaming query with watermark and {{dropDuplicates}} yields {{RuntimeException}} due to failure in binding.
> {code}
> val topic1 = spark.
>   readStream.
>   format("kafka").
>   option("subscribe", "topic1").
>   option("kafka.bootstrap.servers", "localhost:9092").
>   option("startingoffsets", "earliest").
>   load
>
> val records = topic1.
>   withColumn("eventtime", 'timestamp).  // <-- just to put the right name given the purpose
>   withWatermark(eventTime = "eventtime", delayThreshold = "30 seconds").  // <-- use the renamed eventtime column
>   dropDuplicates("value").  // dropDuplicates will use watermark
>                             // only when eventTime column exists
>                             // include the watermark column => internal design leak?
>   select('key cast "string", 'value cast "string", 'eventtime).
>   as[(String, String, java.sql.Timestamp)]
>
> scala> records.explain
> == Physical Plan ==
> *Project [cast(key#0 as string) AS key#169, cast(value#1 as string) AS value#170, eventtime#157-T30000ms]
> +- StreamingDeduplicate [value#1], StatefulOperatorStateInfo(<unknown>,93c3de98-3f85-41a4-8aef-d09caf8ea693,0,0), 0
>    +- Exchange hashpartitioning(value#1, 200)
>       +- EventTimeWatermark eventtime#157: timestamp, interval 30 seconds
>          +- *Project [key#0, value#1, timestamp#5 AS eventtime#157]
>             +- StreamingRelation kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>
> import org.apache.spark.sql.streaming.{OutputMode, Trigger}
>
> val sq = records.
>   writeStream.
>   format("console").
>   option("truncate", false).
>   trigger(Trigger.ProcessingTime("10 seconds")).
>   queryName("from-kafka-topic1-to-console").
>   outputMode(OutputMode.Update).
>   start
> {code}
> {code}
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> 17/07/27 10:28:58 ERROR Executor: Exception in task 3.0 in stage 13.0 (TID 438)
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: eventtime#157-T30000ms
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>   at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
>   at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
>   at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.bind(GeneratePredicate.scala:45)
>   at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.bind(GeneratePredicate.scala:40)
>   at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:977)
>   at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:370)
>   at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.org$apache$spark$sql$execution$streaming$WatermarkSupport$$super$newPredicate(statefulOperators.scala:350)
>   at org.apache.spark.sql.execution.streaming.WatermarkSupport$$anonfun$watermarkPredicateForKeys$1.apply(statefulOperators.scala:160)
>   at org.apache.spark.sql.execution.streaming.WatermarkSupport$$anonfun$watermarkPredicateForKeys$1.apply(statefulOperators.scala:160)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.sql.execution.streaming.WatermarkSupport$class.watermarkPredicateForKeys(statefulOperators.scala:160)
>   at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.watermarkPredicateForKeys$lzycompute(statefulOperators.scala:350)
>   at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.watermarkPredicateForKeys(statefulOperators.scala:350)
>   at org.apache.spark.sql.execution.streaming.WatermarkSupport$class.removeKeysOlderThanWatermark(statefulOperators.scala:167)
>   at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.removeKeysOlderThanWatermark(statefulOperators.scala:350)
>   at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec$$anonfun$doExecute$4$$anonfun$apply$4$$anonfun$apply$mcV$sp$1.apply$mcV$sp(statefulOperators.scala:403)
>   at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:96)
>   at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec.timeTakenMs(statefulOperators.scala:350)
>   at org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec$$anonfun$doExecute$4$$anonfun$apply$4.apply$mcV$sp(statefulOperators.scala:403)
>   at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
>   at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Couldn't find eventtime#157-T30000ms in [value#185]
>   at scala.sys.package$.error(package.scala:27)
>   at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
>   at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>   ... 49 more
> {code}
> I'm somehow convinced that watermark support leaks from {{StreamingDeduplicate}} and forces a Spark developer to include extra fields for watermark. I think filter pushdown (for the select) should not be executed for this case or should include the extra {{eventTime}} column (regardless of whether a developer uses it or not).
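
For anyone hitting the exception above before picking up the fix, here is a minimal sketch of the workaround the description hints at: include the watermarked column among the deduplication keys so the state cleanup predicate can be bound against them. This is my own sketch, not from the ticket, and I have not verified it against the affected versions; it also changes the semantics to deduplicating on (value, eventtime) rather than on value alone.

{code}
// Workaround sketch (assumption based on the description above, not from the ticket):
// reuse the repro's topic1 stream and add the watermark column to the dedup keys.
import spark.implicits._  // `spark` is the SparkSession (predefined in spark-shell)

val records = topic1.
  withColumn("eventtime", 'timestamp).
  withWatermark(eventTime = "eventtime", delayThreshold = "30 seconds").
  dropDuplicates("value", "eventtime").  // <-- watermark column included explicitly
  select('key cast "string", 'value cast "string", 'eventtime).
  as[(String, String, java.sql.Timestamp)]
{code}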
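And here is the sketch I referred to in my comment at the top: if the actual goal is a per-day unique-visitor count with bounded state, a windowed aggregation avoids dropDuplicates entirely, because the aggregation state for a day is dropped once the watermark moves past the end of that window. The input stream {{accessLog}} and the column names {{uuid}} and {{eventTime}} are assumed names for illustration only, and approx_count_distinct gives an approximate rather than exact count.

{code}
// Illustrative sketch only (assumed streaming DataFrame `accessLog` with
// columns `uuid` and `eventTime`): count distinct visitors per 1-day window.
import org.apache.spark.sql.functions.{approx_count_distinct, col, window}

val dailyUniques = accessLog.
  withWatermark("eventTime", "1 day").                     // allow a day of lateness, then expire window state
  groupBy(window(col("eventTime"), "1 day")).              // one group per day of event time
  agg(approx_count_distinct("uuid") as "unique_visitors")  // approximate distinct count of uuids
{code}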