> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java, line 31
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486375#file1486375line31>
> >
> > How likely are we to collide with this? That's the problem with using a
> > user-definable token. I see two options:
> >
> > If null is not supported (and thus not usable by user-defined
> > implementations) I would use that and mark it as reserved.
> >
> > Otherwise I would probably do something more to make this unlikely to
> > collide (call me paranoid). Something like using a NUL byte as the first
> > character and documenting that offsets with such an encoding are reserved. I
> > would also check that this sort of string doesn't make it to user code in
> > the task.
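For concreteness, here is a rough sketch of how I read the NUL-byte suggestion. It is not the code in the RB: the class name `ReservedOffsets`, the constant value, and both helper methods are made up for illustration, and the only assumption taken from the comment above is that offsets whose first character is NUL are reserved.

```java
/**
 * Hypothetical helper illustrating a reserved-offset convention.
 * None of these names come from the Samza code base.
 */
final class ReservedOffsets {
    /** Assumption: offsets whose first character is NUL are reserved for internal use. */
    static final char RESERVED_PREFIX = '\0';

    /** Illustrative sentinel offset; the text after the prefix is arbitrary. */
    static final String END_OF_STREAM_OFFSET = RESERVED_PREFIX + "END_OF_STREAM";

    private ReservedOffsets() {
    }

    /** Returns true if the offset uses the reserved NUL prefix. */
    static boolean isReserved(String offset) {
        return offset != null && !offset.isEmpty() && offset.charAt(0) == RESERVED_PREFIX;
    }

    /** Rejects reserved offsets so that they cannot leak in from, or out to, user code. */
    static String requireUserOffset(String offset) {
        if (isReserved(offset)) {
            throw new IllegalArgumentException("Offsets starting with a NUL character are reserved");
        }
        return offset;
    }
}
```

A null offset is deliberately not treated as reserved here, so it stays distinguishable from the sentinel.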

Returning null is not possible, because a null offset could mean that we have no messages at the moment rather than end-of-stream. We should poll again when a consumer returns null, but we should not for the END_OF_STREAM case. Hence, I was hoping to use a special offset. I like your suggestion of using a NUL byte as the first character (and calling that out). I'll update the RB with that.


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java, line 60
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486376#file1486376line60>
> >
> > Shouldn't this case be flagged as an error at the point at which this
> > is detected during enqueue and then dropped? The benefit is that we have
> > fewer states to worry about at layers like these and we also get a nice
> > stack trace where the error originated.

I'm not sure I completely understand the part about "detecting during enqueue and dropping it". There's buffering being done at every next/hasNext call, so an enqueue of an end-of-stream is not necessarily an error.

I've refactored how the `endOfStreamReached` flag is manipulated in the new draft. Hopefully, this refactoring makes the handling of states simpler.


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java, line 90
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486376#file1486376line90>
> >
> > OK, I take back the part about a nice stack trace... You could at least
> > log the consumer that gave you back an end of stream followed by a message.

I've logged that the consumer gave back end-of-stream. Thanks for the suggestion.


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java, line 67
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486376#file1486376line67>
> >
> > Same comment as above.

I've refactored how the `endOfStreamReached` flag is manipulated in the new draft. Hopefully, this refactoring makes the handling of states simpler.


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, lines 364-366
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486378#file1486378line364>
> >
> > We're distributing this end of stream knowledge in a few places. It
> > actually looks totally unnecessary here? I think you could always call
> > task.endOfStream and let the task decide if it needs to do anything as a
> > result.

Good observation! I've fixed this one in the new draft.


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, line 174
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486379#file1486379line174>
> >
> > I would just check if you can coerce here. I don't think adding a
> > second representation of that state is buying us much (and it would be bad
> > for it to diverge from the truth).

I tried to be consistent with the rest of the Samza code base (with how we do this for Windowable, Initable, Closeable and Async tasks). We also log the type of the task in the `detailedString()` function (in the revised RB), so there may be value in having a separate function. I'm not sure how the representation could diverge, as the flags are immutable and set during construction.


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java, line 363
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486380#file1486380line363>
> >
> > This line is a bit long, you could break it up like this, which
> > arguably improves readability:
> >
> > ```
> > when(consumerMultiplexer.choose(false))
> >     .thenReturn(envelope0)
> >     .thenReturn(envelope1)
> >     ...
> > ```

This is good feedback on readability. Thanks for the input :)


> On Aug. 26, 2016, 7:51 p.m., Chris Pettitt wrote:
> > samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java, line 371
> > <https://reviews.apache.org/r/51346/diff/4/?file=1486380#file1486380line371>
> >
> > Interesting - should end of stream actually count as a message?

The `processes` metric measures how many times we invoked `process` on the `StreamTask` implementation. Since we don't pass the end-of-stream message to user code, we will not invoke `process` for those messages. End-of-stream messages will be purely internal to Samza.


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147015
-----------------------------------------------------------


On Aug. 25, 2016, 11:52 p.m., Jagadish Venkatraman wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
>
> (Updated Aug. 25, 2016, 11:52 p.m.)
>
>
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
>
>
> Repository: samza
>
>
> Description
> -------
>
> Samza currently works with unbounded data sources (kafka streams).
> However, for bounded data sources like HDFS files or snapshot files, which are
> not infinite, we need a notion of 'end-of-stream'.
>
> This is a step towards realizing a 'finite' Samza job that terminates once
> data processing is complete (as opposed to an infinite stream job that keeps
> running).
>
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state machine of AsyncStreamTask
>   (Invariant: when end-of-stream is reached, there are no buffered messages,
>   no callbacks are in flight, and no window or commit call is in progress)
> - Changes to allow clean shutdowns of the tasks/container/job for
>   end-of-stream.
>
> Design Doc and Implementation Notes:
> https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
>
>
> Diffs
> -----
>
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764
>
> Diff: https://reviews.apache.org/r/51346/diff/
>
>
> Testing
> -------
>
> Unit tests covering scenarios for in-order processing, out-of-order processing
> and commit semantics.
>
>
> Thanks,
>
> Jagadish Venkatraman
>
>