Hello there, I have a quick question about how to share data (a small data collection) between a Kafka producer and consumer using Spark Structured Streaming (Spark 2.2):
(A) The data published by the Kafka producer is received in order on the Kafka consumer side (see (a) below).

(B) However, collect() or cache() on a streaming DataFrame does not seem to be supported (see the links in (b) below). I got this:

Exception in thread "DataProducer" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

(C) My questions are:
--- How can I use the collection data that arrives on the consumer side (as a streaming DataFrame), e.g. convert it to an array of objects?
--- Alternatively, is there another quick way to use Kafka for sharing static data (instead of streaming) between two Spark application services (without any common Spark context, session, etc.)? I have copied a code snippet in (c), and at the very end I have sketched the kind of thing I am hoping is possible.

It seems like a very simple use case, sharing a global collection between a Spark producer and consumer, but I have spent an entire day trying various options and going through online resources (Google, the Apache Spark docs, Stack Overflow, Cloudera, etc.).

Any help would be very much appreciated!
Thanks!
Peter

(a) Streaming data (DataFrame) received on the consumer side (console sink):

root
 |-- ad_id: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------+------------------------------------+-----------------------+
|ad_id                               |campaign_id                         |timestamp              |
+------------------------------------+------------------------------------+-----------------------+
|b5629b58-376e-462c-9e65-726184390c84|bf412fa4-aeaa-4148-8467-1e1e2a6e0945|2018-04-27 14:35:45.475|
|64e93f73-15bb-478c-9f96-fd38f6a24da2|bf412fa4-aeaa-4148-8467-1e1e2a6e0945|2018-04-27 14:35:45.475|
|05fa1349-fcb3-432e-9b58-2bb0559859a2|060810fd-0430-444f-808c-8a177613226a|2018-04-27 14:35:45.478|
|ae0a176e-236a-4d3a-acb9-141157e81568|42b68023-6a3a-4618-a54a-e6f71df26710|2018-04-27 14:35:45.484|

(b) Online discussions on unsupported operations on streaming DataFrames:

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations
https://stackoverflow.com/questions/42062092/why-does-using-cache-on-streaming-datasets-fail-with-analysisexception-queries

(c) Code snippet:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

OK:

val rawDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("startingOffsets", "earliest")
  .option("subscribe", Variables.CAMPAIGNS_TOPIC)
  .load()

OK:

val mySchema = StructType(Array(
  StructField("ad_id", StringType),
  StructField("campaign_id", StringType)))

// Kafka's value column is binary, so cast it to string before parsing the JSON
val campaignsDf2 = rawDf
  .select(from_json($"value".cast("string"), mySchema).as("data"), $"timestamp")
  .select("data.*", "timestamp")

OK:

campaignsDf2.writeStream
  .format("console")
  .option("truncate", "false")
  .trigger(org.apache.spark.sql.streaming.Trigger.Once()) // trigger once since this is one-time static data
  .start()
  .awaitTermination()

Exception:

val campaignsArrayRows = campaignsDf2.collect() // <==== not supported ====> AnalysisException!
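For the "static data instead of streaming" route, this is the kind of thing I am hoping is possible: a minimal sketch (not working code I have verified) that reads the topic once as a plain batch DataFrame via the batch Kafka source, so that collect() would be allowed. kafkaCluster and Variables are my own helpers from the snippet in (c):

// Sketch only: read the topic as a bounded, non-streaming DataFrame
// (assuming the batch Kafka source available in Spark 2.2).
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

val staticDf = spark
  .read                                    // batch read instead of readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("subscribe", Variables.CAMPAIGNS_TOPIC)
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")       // bounded range, so this is not a streaming source
  .load()

val mySchema = StructType(Array(
  StructField("ad_id", StringType),
  StructField("campaign_id", StringType)))

val campaignsArray = staticDf
  .select(from_json($"value".cast("string"), mySchema).as("data"))
  .select("data.*")
  .collect()                               // should be fine here, since the DataFrame is not streaming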
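The other variant I can imagine, keeping the streaming read from (c) but landing the result in an in-memory table that can then be queried with the batch API. Again just a sketch under my assumptions; "campaigns" is an arbitrary query name I picked:

// Sketch only: write the streaming DataFrame to the in-memory sink,
// then collect from the resulting temp table with the batch API.
import org.apache.spark.sql.streaming.Trigger

campaignsDf2.writeStream
  .format("memory")               // in-memory table sink, intended for small data sets
  .queryName("campaigns")         // registers a temp table named "campaigns"
  .trigger(Trigger.Once())        // process what is currently on the topic, then stop
  .start()
  .awaitTermination()

// The registered temp table is a normal (non-streaming) DataFrame, so collect() works on it.
val campaignsArrayRows = spark.table("campaigns").collect()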