In Structured Streaming, the QueryProgressEvent does not seem to report the number of records finally emitted to the destination; I see only the number of input rows. I tried to get the count myself by persisting the dataset and running count as an additional action, but I get the exception below when persist or count is called on the dataset before the query is started. My sample code is below. Please suggest how to get the query running and how to obtain the final emitted count.
"Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;"

    Dataset<PropertyPageView> data = transform(kafkaTopic, SPECIFIC_AVRO, kafkaStreamSet,
        UserSessionEventJoin.class, PropertyPageView.class,
        (Function<UserSessionEventJoin, Boolean> & Serializable)
            (UserSessionEventJoin userSessionEventJoin) -> {
                UserEvent userEvent = userSessionEventJoin.getUserEvent();
                if (userEvent != null
                        && TYPE_PAGE_VIEW.equalsIgnoreCase(userEvent.getType())) {
                    if (userEvent.getPayloadMap() != null) {
                        return PAGE_TYPE_PROPERTY.equalsIgnoreCase(
                            userEvent.getPayloadMap().get(PAGE_TYPE));
                    }
                }
                return false;
            }
    );

    data.persist(StorageLevel.MEMORY_AND_DISK());
    log.info("dataset persisted");

    long emittedCount = data.count();

    Map<String,String> metricTags = new HashMap<>();
    metricTags.put("source", kafkaTopic);
    metricTags.put("destination", sinkPath);

    DataMonitorMetric recordsWrittenMetric = dataMonitorUtils
        .buildDataMonitorMetricWithValue(null, System.currentTimeMillis(),
            "numOutputRows", metricTags, Aspect.EMITTED, emittedCount);
    dataMonitorUtils.sendMetric(recordsWrittenMetric);

    StreamingQuery streamingQuery = data.writeStream().outputMode("append")
        .format("parquet")
        .option("checkpointLocation", "file:///Users/asethurathnam/Downloads/parquet/checkpoint")
        .trigger(Trigger.ProcessingTime(1000, TimeUnit.MILLISECONDS))
        .partitionBy("eventDate")
        .start("file:///Users/asethurathnam/Downloads/parquet/output-parquet");

    data.unpersist();
    log.info("dataset unpersisted");

    streamingQuery.awaitTermination();
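For reference, count() is a batch action, and calling it on a streaming Dataset before writeStream.start() is what raises the AnalysisException quoted above. One possible way around it, assuming Spark 2.4 or later, is to move the persist/count/write steps into foreachBatch, where each micro-batch arrives as an ordinary (non-streaming) Dataset. The following is only a minimal sketch under those assumptions: the class name EmittedCountSketch, the helper writeBatchAndCount, the built-in "rate" source (standing in for the Kafka source) and the temporary paths are all hypothetical, and the metric call from the code above is replaced by a println.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class EmittedCountSketch {

    // Running total of rows written by this driver across micro-batches.
    static final AtomicLong TOTAL_EMITTED = new AtomicLong();

    // Hypothetical helper: counts one micro-batch and appends it as parquet.
    // Inside foreachBatch the batch is a static Dataset, so persist() and
    // count() are allowed here.
    static long writeBatchAndCount(Dataset<Row> batch, String outputPath) {
        batch.persist();
        try {
            long emitted = batch.count();
            // The original code also used partitionBy("eventDate"); the rate
            // source has no such column, so it is left out of this sketch.
            batch.write().mode("append").parquet(outputPath);
            return emitted;
        } finally {
            batch.unpersist();
        }
    }

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
            .master("local[2]")
            .appName("emitted-count-sketch")
            .getOrCreate();

        // Assumption: the "rate" test source replaces the Kafka stream.
        Dataset<Row> data = spark.readStream()
            .format("rate")
            .option("rowsPerSecond", "5")
            .load();

        Path tmp = Files.createTempDirectory("emitted-count-sketch");
        String outputPath = tmp.resolve("output-parquet").toString();
        String checkpointPath = tmp.resolve("checkpoint").toString();

        StreamingQuery query = data.writeStream()
            .option("checkpointLocation", checkpointPath)
            .trigger(Trigger.ProcessingTime(1000, TimeUnit.MILLISECONDS))
            // The cast selects the Java overload of foreachBatch.
            .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batch, batchId) -> {
                long emitted = writeBatchAndCount(batch, outputPath);
                long total = TOTAL_EMITTED.addAndGet(emitted);
                // A metric such as numOutputRows could be sent here instead.
                System.out.println("batch " + batchId + ": emitted=" + emitted
                    + ", total=" + total);
            })
            .start();

        // Run for about ten seconds, then shut down.
        query.awaitTermination(10_000);
        query.stop();
        spark.stop();
    }
}
```

Note that a micro-batch can be re-executed after a failure, so a count taken this way may include the same batch more than once; the batchId passed to the callback could be used to deduplicate on the metrics side.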