Hi, Xinyu,

Thanks for the proposal. I took a quick pass and had the following questions/comments:

- message shuffling ==> data shuffling?
- The proposal is for all types of control messages, not just for end-of-stream, right? It would be better to define the scope and lay out the common requirements of control message delivery.
- The dropped option should go to “Rejected alternatives”.
- “Samza finds out the following intermediate streams that all the inputs have been end-of-stream”: what does it mean? Do the tasks consuming the input stream(s) reconcile all EoS from all input stream partitions and then propagate EoS messages to all partitions in the intermediate streams? This is not super clear to me.
- In step 3, how does the consumer of intermediate streams know how many EOS messages should be received? We should make it clear that there is one EOS per producer, and that the downstream consumer counts the number of unique EOS messages from all upstream producers.
- In the comparison table, “checkpoint the control messages received” ==> is it referring to the partially accumulated upstream EOS messages?
- Please give a clear definition of “Watermark” and “EndOfStream”. Why are they different? Are they both control messages that require the same delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)? If yes, should we make “watermark” vs “EndOfStream” a sub-category of control message?
- As for the serde for the intermediate stream, I assume that we will need an Avro envelope serde to wrap the user message and control message in? So the user-defined serde now only applies to the “UserMessage”? And what’s the message key in the message format?
- A big question regarding watermark propagation: “When Samza receives watermark messages, it will emit a watermark with the earliest event time across all the stream partitions. No emission if the earliest event time doesn’t change.” Does the watermark propagation require synchronization/coordination between all producers at the source?
  Say, if the task taking one input source emits watermarks at a 1min interval and the task taking another input source emits watermarks at a 5min interval, how does the downstream consumer reconcile the watermarks?
- In the checkpoint message format, it seems that it is only designed for watermark messages? Is there any streamId info that EoS carries over?

Thanks a lot!

-Yi

On Tue, May 30, 2017 at 9:46 AM, xinyu liu <xinyuliu...@gmail.com> wrote:

> Makes sense. I noticed that too and I dropped the ControlMessage type in my
> code. I also moved taskName, taskCount to the parent ControlMessage class.
> Just updated the SEP-6. Please take a look again.
>
> Thanks,
> Xinyu
>
> On Tue, May 30, 2017 at 9:12 AM, Chris Pettitt
> <cpett...@linkedin.com.invalid> wrote:
>
> > MessageType and ControlMessage.Type look redundant. You could either use
> > "ControlMessage" as the type in MessageType or drop ControlMessage.Type.
> >
> > On Fri, May 26, 2017 at 5:14 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
> >
> > > Thanks a lot for the comments. I updated the SEP with more details and
> > > clarification. Please let me know if you have further questions.
> > >
> > > Thanks,
> > > Xinyu
> > >
> > > On Thu, May 25, 2017 at 11:19 AM, Prateek Maheshwari
> > > <pmaheshw...@linkedin.com.invalid> wrote:
> > >
> > > > Hi Xinyu,
> > > >
> > > > Thanks for the proposal. Some requests for clarifications. Let's update
> > > > the SEP directly instead of replying here.
> > > >
> > > > E.g., in "For any following intermediate stream whose input streams are
> > > > all end-of-stream, it will be marked as pending EOS" - Should clarify
> > > > that (IIUC) something is injecting EOS messages in all intermediate
> > > > stream partitions once it receives EOS from all input stream partitions
> > > > it's consuming. Should also clarify what is that something.
> > > > Same for "declare end of stream once all the EOS messages have been
> > > > received." - What does this declaration involve and who is doing this?
> > > >
> > > > In pro for approach 2: Not clear what this means - "The watermark can
> > > > conclude the input messages before this watermark have been complete."
> > > >
> > > > For the cons of approach 2: "Complicated failure scenario of the second
> > > > job. It needs to checkpoint all the watermark messages received, so when
> > > > it recovered from failure, it can still count." - How is this related to
> > > > EOS? How is this related to the checkpoint watermark section?
> > > > Also, what is the "more messages required to write.. " referring to?
> > > >
> > > > "Samza needs to reconcile based on the task counts." - Please explain
> > > > what reconciliation means, why it needs to happen, and why we need to
> > > > track the producer task and total task count in the watermark message to
> > > > do this.
> > > >
> > > > Checkpoint watermarks section is also unclear. What problem are we
> > > > trying to solve here?
> > > >
> > > > Should also move the message format and the watermark message interface
> > > > sections to the bottom, since they depend on details in the event time
> > > > and checkpoint watermark sections.
> > > >
> > > > Thanks,
> > > > Prateek
> > > >
> > > >
> > > > On Wed, May 24, 2017 at 11:30 AM, xinyu liu <xinyuliu...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I created SEP-6 for SAMZA-1260
> > > > > <https://issues.apache.org/jira/browse/SAMZA-1260>: Support Watermark
> > > > > Across Intermediate Streams for Batch Processing. The link to the SEP
> > > > > is here:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-6+Support+Watermark+Across+Intermediate+Streams+for+Batch+Processing
> > > > >
> > > > > Please review and comments are welcome!
> > > > >
> > > > > Thanks,
> > > > > Xinyu
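
P.S. To make the questions about EOS counting and watermark reconciliation more concrete, here is a rough sketch of how a downstream consumer might reconcile control messages per upstream producer task. This is only an illustration under my own assumptions, not what the SEP specifies: the class name ControlMessageReconciler and its methods are hypothetical, and it assumes each control message carries the producer's taskName and the total taskCount mentioned in the thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

/** Hypothetical helper (not a Samza class): reconciles control messages per producer task. */
class ControlMessageReconciler {
    // Assumed to come from the taskCount field carried in each control message.
    private final int expectedProducers;
    private final Map<String, Long> latestWatermark = new HashMap<>();
    private final Set<String> eosSeen = new HashSet<>();
    private long lastEmitted = Long.MIN_VALUE;

    ControlMessageReconciler(int expectedProducers) {
        this.expectedProducers = expectedProducers;
    }

    /**
     * Records a watermark from one producer task. Returns the new aggregate
     * watermark (the minimum across all producers) only when every producer
     * has reported at least once and the minimum has advanced.
     */
    OptionalLong onWatermark(String taskName, long eventTime) {
        latestWatermark.merge(taskName, eventTime, Math::max);
        if (latestWatermark.size() < expectedProducers) {
            return OptionalLong.empty();
        }
        long min = Collections.min(latestWatermark.values());
        if (min <= lastEmitted) {
            return OptionalLong.empty(); // earliest event time did not change
        }
        lastEmitted = min;
        return OptionalLong.of(min);
    }

    /**
     * Records an EOS from one producer task. Duplicates from the same task are
     * counted once; returns true once every producer has sent EOS.
     */
    boolean onEndOfStream(String taskName) {
        eosSeen.add(taskName);
        return eosSeen.size() == expectedProducers;
    }
}
```

With two producers, one emitting watermarks every 1min and the other every 5min, a consumer built this way would only advance its own watermark as fast as the slower producer allows, without the producers having to coordinate. Whether that is the intended behavior is exactly what I would like the SEP to spell out.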