Hi devs,

As we make progress on some minor PRs for Structured Streaming, I'd like to call attention to the major PRs in the SS area so that they get more chances to be reviewed.
Please note that I only include existing PRs, so topics that haven't been discussed yet, like queryable state, are not in the list. I've also excluded PRs on continuous processing, as I'm not fully sure about the current direction and vision for that feature. Minor PRs are mostly excluded unless they were proposed a long time ago. Last, the curation may reflect my own bias. Let's get started!

----

A. File Source/Sink

1. [SPARK-20568][SS] Provide option to clean up completed files in streaming query
ISSUE: https://issues.apache.org/jira/browse/SPARK-20568
PR: https://github.com/apache/spark/pull/22952

Given the nature of a "stream", the input data grows indefinitely, and end users want a clear way to clean up completed files. Unlike a batch query, Structured Streaming doesn't require all input files to stay present: once they've been committed (that is, completely processed), they won't be read again by that query. This patch automatically cleans up input files when they're committed, with three options: 1) keep them as they are, 2) archive (move) them to another directory, 3) delete them. (A rough sketch of the options is at the end of this section.)

2. [SPARK-27188][SS] FileStreamSink: provide a new option to have retention on output files
ISSUE: https://issues.apache.org/jira/browse/SPARK-27188
PR: https://github.com/apache/spark/pull/24128

The file sink writes metadata recording the list of output files so that a file source reads only the correct files, which helps achieve end-to-end exactly-once. But the file sink has no idea when output files will no longer be accessed by downstream queries, so the metadata just grows indefinitely and output files can't be removed safely. This patch lets end users set a TTL on output files so that the metadata eventually excludes expired output files and end users can remove those files safely.
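As a rough illustration of the two items above, here is a minimal Scala sketch of how the proposed options might be used. The option names (cleanSource, sourceArchiveDir, retention) are my reading of the proposals and may change during review, so please treat them as assumptions rather than a final API:

  // Source side (SPARK-20568): clean up committed input files.
  // "cleanSource" could be "off" (default), "archive" or "delete".
  val input = spark.readStream
    .format("json")
    .schema(inputSchema)                           // schema of the incoming files (assumed)
    .option("cleanSource", "archive")
    .option("sourceArchiveDir", "/data/archived")  // only needed for "archive"
    .load("/data/incoming")

  // Sink side (SPARK-27188): a TTL so the sink metadata eventually drops
  // expired output files and they can be removed safely.
  val query = input.writeStream
    .format("parquet")
    .option("path", "/data/output")
    .option("checkpointLocation", "/data/checkpoint")
    .option("retention", "7d")                     // assumed option name from the PR
    .start()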
B. Kafka Source/Sink

1. [SPARK-21869][SS] A cached Kafka producer should not be closed if any task is using it - adds in-use tracking
ISSUE: https://issues.apache.org/jira/browse/SPARK-21869
PR: https://github.com/apache/spark/pull/19096

This is a long-standing bug (the JIRA issue was filed around 2 years ago): if a task uses a cached Kafka producer for longer than 10 minutes, the pool recognizes it as "timed out" and simply closes it, after which the task runs into undefined behavior. This patch adds "in-use" tracking on the producer to address this. Please note that the Kafka producer is thread-safe (whereas the Kafka consumer is not) and we allow using it concurrently, so we can't adopt Commons Pool to pool producers. (Though we could still leverage Commons Pool if we were OK with not sharing producers between threads.)

2. [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
ISSUE: https://issues.apache.org/jira/browse/SPARK-23539
PR: https://github.com/apache/spark/pull/22282

There's already a great doc rationalizing the need for Kafka header support, so I'll just let it do the explaining:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers
Please note that end users have commented on the issue asking about availability, which also shows the demand on the end users' side. (A rough sketch of what using headers might look like follows item 4 below.)

3. [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer
ISSUE: https://issues.apache.org/jira/browse/SPARK-25151
PR: https://github.com/apache/spark/pull/22138

The Kafka source has its own pooling logic for consumers, but judging from some JIRA issues around pooling we seem to agree we'd like to replace it with a well-known pool implementation that provides advanced configuration, detailed metrics, etc. This patch adopts Apache Commons Pool (which brings the advantages above) as the connection pool for consumers, respecting current behavior wherever possible. It also separates pooling of consumers from pooling of fetched data, which maximizes the efficiency of pooling consumers and also addresses the bug of unnecessary re-fetches on self-joins. (Experiment results are in the PR description.)

4. [SPARK-26848][SQL] Introduce new option to Kafka source: offset by timestamp (starting/ending)
ISSUE: https://issues.apache.org/jira/browse/SPARK-26848
PR: https://github.com/apache/spark/pull/23747

When end users want to replay records in a Kafka topic, they won't have memorized the exact offset of each partition, but Spark requires exactly that, or otherwise just starts from the earliest offset. We as humans are much more familiar with time: when we want to replay some records, we know the timestamp of the records we should start from. This patch lets end users specify offsets by timestamp (starting, ending, or both), which is transparently passed to Kafka when requesting offsets.
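As a rough illustration of items 2 and 4, here is a minimal Scala sketch of reading headers and starting from a timestamp. The option names (includeHeaders, startingOffsetsByTimestamp), the JSON shape, and the headers column are my reading of the PRs and may change during review:

  // Start from a timestamp per topic/partition (SPARK-26848) and expose
  // record headers (SPARK-23539). Broker/topic names are placeholders.
  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("subscribe", "events")
    // assumed shape: JSON map of topic -> (partition -> timestamp in millis)
    .option("startingOffsetsByTimestamp", """{"events": {"0": 1546300800000, "1": 1546300800000}}""")
    .option("includeHeaders", "true")
    .load()

  // With headers enabled, each row would carry an extra "headers" column,
  // e.g. an array of (key: string, value: binary) pairs.
  val projected = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")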
C. State

1. [SPARK-27237][SS] Introduce State schema validation among query restart
ISSUE: https://issues.apache.org/jira/browse/SPARK-27237
PR: https://github.com/apache/spark/pull/24173

Spark has no explicit mechanism to keep end users from changing their query in a non-compatible way. We document the rules for which changes make a query incompatible, but it's not easy for users to apply those rules themselves, and when the rules are violated they get an unfriendly error message or, in fact, undefined behavior. This patch introduces state schema validation, which verifies schema compatibility of state across changes to the query and gives end users an informative error message showing the previous and current state schemas. It is also a baseline for the new "state" data source, since we can leverage the stored state schema instead of requiring end users to provide it.

2. [SPARK-28191][SS] New data source - state - reader part
ISSUE: https://issues.apache.org/jira/browse/SPARK-28191
PR: https://github.com/apache/spark/pull/24990

Please read the JIRA issue below for the rationale behind a state data source; the issue description lists the cases where it can be used (e.g. schema evolution on state, offline rescaling of state, etc.).
https://issues.apache.org/jira/browse/SPARK-28190
This patch deals with the source part: it enables reading the states of a Structured Streaming query from a batch query.

3. [SPARK-28120][SS] Rocksdb state storage implementation
ISSUE: https://issues.apache.org/jira/browse/SPARK-28120
PR: https://github.com/apache/spark/pull/24922

Memory has been a huge limitation on state size. As Structured Streaming loads two versions of state in the executor by default, memory pressure becomes the real problem when dealing with large state. Scaling up executors may work, but it wastes resources unnecessarily, and it can't help once the number of executors exceeds the number of partitions. (The state data source will eventually help with repartitioning, but that requires an offline batch query.) A state store that resides outside of memory is mandatory for Structured Streaming to deal with large state, and this patch tries to address that by introducing a RocksDB state store provider.

D. Structured Streaming

1. [SPARK-24634][SS] Add a new metric regarding number of rows later than watermark
ISSUE: https://issues.apache.org/jira/browse/SPARK-24634
PR: https://github.com/apache/spark/pull/24936

Spark doesn't provide any information on late rows, which may be dropped by stateful operators. This patch adds a metric counting late rows so that end users can be aware of them. Please note that the issue originally aimed to provide the number of rows dropped due to lateness, but Spark pre-aggregates in streaming aggregation, so that wouldn't give a correct number. The current approach is less informative than the original intention, but it still brings value, for example in determining whether a query is affected by SPARK-28074.

2. [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows
ISSUE: https://issues.apache.org/jira/browse/SPARK-26154
PR: https://github.com/apache/spark/pull/23634

This is a long-standing correctness issue, and multiple end users (including me) have reported the behavior. It occurs in an edge case, but the edge case is not hard to reproduce; it's even close to the example query we provide for streaming outer joins. This patch addresses the correctness issue by changing the join state, introducing a "matched" flag.

3. [SPARK-26655][SS] Support multiple aggregates in append mode
ISSUE: https://issues.apache.org/jira/browse/SPARK-26655
PR: https://github.com/apache/spark/pull/23576

Multiple streaming aggregations have been a concern for end users: from their perspective it sounds like an essential thing to support, but Spark doesn't support it. There are many SO questions as well as mail threads asking for this feature, but we still haven't dealt with it. If we only think about append mode, technically the feature comes down to a proper definition of watermark. We haven't considered watermark calculation (and/or propagation) across multiple stages of stateful operations, but since there's a widely used concept of multi-stage watermarks, we can leverage it and focus on how to apply it to Spark. (Update mode requires retraction, which would take huge effort to adopt, so let's ignore it for now.) Please keep in mind that the lack of a watermark definition across multiple stateful stages is not only a problem for multiple streaming aggregations, but also for multiple stateful operations in general (streaming join, flatMapGroupsWithState, deduplication, etc.), which Spark doesn't technically restrict; SPARK-28074 points out this problem. This patch tries to address multiple aggregations; the patch itself may not be valid as-is, but there's a design doc we can move forward with and update the implementation. (A minimal example of the kind of query involved is at the end of this section.)

4. [SPARK-27330][SS] support task abort in foreach writer
ISSUE: https://issues.apache.org/jira/browse/SPARK-27330
PR: https://github.com/apache/spark/pull/24382

A foreach writer can leak resources when a task is aborted, as Spark doesn't call writer.close() on task abort. If the task throws an exception in the writer's process() or succeeds and commits, close() is called properly, but otherwise the close() call is missing because abort isn't handled properly. This patch fixes the bug.
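To illustrate why the missing close() call in item 4 matters, here is a minimal ForeachWriter sketch in Scala (the JDBC connection is just a stand-in for any external resource, and the URL is a placeholder): any resource acquired in open() relies on close() being invoked on every code path, including task abort, otherwise it leaks.

  import org.apache.spark.sql.{ForeachWriter, Row}

  class ConnectionWriter extends ForeachWriter[Row] {
    var conn: java.sql.Connection = _

    override def open(partitionId: Long, epochId: Long): Boolean = {
      conn = java.sql.DriverManager.getConnection("jdbc:postgresql://host/db")  // placeholder URL
      true
    }

    override def process(row: Row): Unit = {
      // write the row using conn ...
    }

    override def close(errorOrNull: Throwable): Unit = {
      // Without the fix, this is not reached when the task is aborted
      // (e.g. killed due to speculation), so the connection leaks.
      if (conn != null) conn.close()
    }
  }

  // df is a streaming DataFrame
  val q = df.writeStream.foreach(new ConnectionWriter).start()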
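And for item 3, this is roughly the kind of query involved: a second streaming aggregation on top of an aggregated streaming Dataset. Today, starting such a query fails with an error along the lines of "Multiple streaming aggregations are not supported with streaming DataFrames/Datasets"; the column names below are just for illustration.

  import org.apache.spark.sql.functions.{col, window}

  // events is a streaming DataFrame with columns (eventTime: Timestamp, user: String)
  val perMinute = events
    .withWatermark("eventTime", "10 minutes")
    .groupBy(window(col("eventTime"), "1 minute"), col("user"))
    .count()

  // Second-level aggregation over the per-minute result: this is what
  // SPARK-26655 would allow in append mode.
  val perHour = perMinute
    .groupBy(window(col("window.start"), "1 hour"))
    .count()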
5. [SPARK-28074][DOC][SS] Document caveats on using multiple stateful operations in single query
ISSUE: https://issues.apache.org/jira/browse/SPARK-28074
PR: https://github.com/apache/spark/pull/24890

As I mentioned for SPARK-26655, Spark doesn't restrict using multiple stateful operations in a single query (except multiple streaming aggregations), even though the concept of watermark isn't handled properly across multiple stateful stages. I explained this issue with an example on the dev mailing list earlier, so please refer to the link for the rationale:
https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@%3Cdev.spark.apache.org%3E
We haven't decided how to help end users avoid the issue (fixing SPARK-26655 would be best, but in the meanwhile...), and this patch tries to establish (or at least discuss) how to guide end users. SPARK-24634 would help end users determine whether their query is affected by this issue, since in append mode intermediate output should not be later than the watermark.

----

Please chime in and share your own curation if I'm missing something.

Thanks,
Jungtaek Lim (HeartSaVioR)