Hi,

I've been investigating this SO question:
https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming

TL;DR: when using the Socket source, creating multiple streaming queries on it does
not work properly: only the first query (in start order) receives any data.

This minimal example reproduces the issue:

import org.apache.spark.sql.functions.lit  // auto-imported in spark-shell, needed in a standalone app

val lines = spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", "9999")
    .option("includeTimestamp", true)
    .load()

val q1 = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()

val q2 = lines.withColumn("foo", lit("foo")).writeStream
  .outputMode("append")
  .format("console")
  .start()
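
(To reproduce, feed the socket with a plain netcat session on the same host, nc -lk 9999, as in the Structured Streaming quick start.)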

Sample output (spark shell):

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-------------------+
|value|          timestamp|
+-----+-------------------+
|  aaa|2017-08-11 23:37:59|
+-----+-------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-------------------+
|value|          timestamp|
+-----+-------------------+
|  aaa|2017-08-11 23:38:00|
+-----+-------------------+

Only after the first query is stopped does the second one start receiving data:

q1.stop

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+-----+-------------------+---+
|value|          timestamp|foo|
+-----+-------------------+---+
|    b|2017-08-11 23:38:19|foo|
+-----+-------------------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-------------------+---+
|value|          timestamp|foo|
+-----+-------------------+---+
|    b|2017-08-11 23:38:19|foo|
+-----+-------------------+---+

This is certainly unexpected behavior. Even though the socket source is
marked "not for production", I wouldn't expect it to be so limited.

Am I right in thinking that the first running query consumes all the data in
the source, and therefore the other queries receive nothing (until the
earlier ones are stopped)?

Is this general behavior, i.e. does each query started on a Structured
Streaming job fully consume its source? And is the Kafka source usable with
multiple queries only because it can be replayed?

As a workaround, would there be a way to cache the incoming data so it can be
multiplexed to several queries? We cannot call `cache` on a streaming Dataset,
but is there maybe another way to achieve the same effect?
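
To make the question concrete, the kind of "multiplexing" I have in mind is
sketched below: relay the socket data into Kafka with a single query, then run
the actual queries against the Kafka topic, since that source can be replayed.
This is only a sketch; the broker address, topic name and checkpoint path are
made up, and it assumes the spark-sql-kafka-0-10 package is on the classpath:

// Relay the single-consumer socket source into Kafka once.
// The Kafka sink expects a string (or binary) column named "value".
val relay = lines
  .select("value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // made-up broker
  .option("topic", "lines")                              // made-up topic
  .option("checkpointLocation", "/tmp/lines-relay")      // made-up path
  .start()

// The Kafka source can serve any number of queries independently.
val fromKafka = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "lines")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

val qA = fromKafka.writeStream
  .outputMode("append")
  .format("console")
  .start()

val qB = fromKafka.withColumn("foo", lit("foo")).writeStream
  .outputMode("append")
  .format("console")
  .start()

That obviously adds an extra hop and an external dependency, which is why I'm
wondering whether something lighter (an in-memory cache/replay of the source)
is possible.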

Could I have more details on the execution model (I've read everything I could
find) and on the (near-term) future plans?

thanks!

-Gerard.
