-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51346/#review147015
-----------------------------------------------------------
Fix it, then Ship it!


samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java (line 31)
<https://reviews.apache.org/r/51346/#comment213968>

How likely are we to collide with this? That's the problem with using a user-definable token. I see two options:

If null is not supported (and thus not usable by user-defined implementations), I would use that and mark it as reserved.

Otherwise, I would probably do something more to make this unlikely to collide (call me paranoid). For example, use a NUL byte as the first character and document that offsets with such an encoding are reserved.

I would also check that this sort of string doesn't make it to user code in the task.


samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java (line 60)
<https://reviews.apache.org/r/51346/#comment213969>

Shouldn't this case be flagged as an error at the point where it is detected during enqueue, and the message then dropped? The benefit is that we have fewer states to worry about at layers like these, and we also get a nice stack trace where the error originated.


samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java (line 67)
<https://reviews.apache.org/r/51346/#comment213970>

Same comment as above.


samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java (line 90)
<https://reviews.apache.org/r/51346/#comment213971>

OK, I take back the part about a nice stack trace... You could at least log the consumer that returned an end-of-stream followed by a message.


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 363 - 365)
<https://reviews.apache.org/r/51346/#comment213972>

We're distributing this end-of-stream knowledge across a few places. Is it actually needed here at all? I think you could always call task.endOfStream and let the task decide whether it needs to do anything as a result.
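A minimal sketch of the enqueue-time checks suggested in the IncomingMessageEnvelope and SystemStreamPartitionIterator comments above. It assumes a NUL-prefixed reserved offset for end-of-stream; `Envelope`, `EnqueueGuard`, and the plain string standing in for a SystemStreamPartition are hypothetical simplifications, not Samza's actual classes:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;

/** Hypothetical, simplified stand-in for IncomingMessageEnvelope (not the real Samza class). */
final class Envelope {
  // Assumed reserved encoding: a leading NUL character, as suggested in the review.
  static final char RESERVED_PREFIX = '\u0000';
  static final String END_OF_STREAM_OFFSET = RESERVED_PREFIX + "END_OF_STREAM";

  final String ssp;      // stands in for a SystemStreamPartition
  final String offset;
  final Object message;

  Envelope(String ssp, String offset, Object message) {
    this.ssp = ssp;
    this.offset = offset;
    this.message = message;
  }

  static Envelope endOfStream(String ssp) {
    return new Envelope(ssp, END_OF_STREAM_OFFSET, null);
  }

  boolean isEndOfStream() {
    return END_OF_STREAM_OFFSET.equals(offset);
  }

  // Any offset starting with the reserved prefix belongs to the framework, never to a consumer.
  static boolean isReservedOffset(String offset) {
    return offset != null && !offset.isEmpty() && offset.charAt(0) == RESERVED_PREFIX;
  }
}

/** Hypothetical enqueue-time validation: reject bad input where it is first detected. */
final class EnqueueGuard {
  private static final Logger LOG = Logger.getLogger(EnqueueGuard.class.getName());
  private final Set<String> endedPartitions = new HashSet<>();

  /** Returns true if the envelope should be enqueued, false if it was dropped. */
  boolean accept(String consumerName, Envelope env) {
    if (endedPartitions.contains(env.ssp)) {
      // Anything after end-of-stream is a consumer bug: name the consumer, then drop the envelope.
      LOG.warning("Consumer " + consumerName + " returned an envelope for " + env.ssp
          + " after end-of-stream; dropping it");
      return false;
    }
    if (env.isEndOfStream()) {
      endedPartitions.add(env.ssp);
      return true;
    }
    if (Envelope.isReservedOffset(env.offset)) {
      // A consumer-supplied offset collided with the reserved encoding; fail loudly here.
      throw new IllegalStateException(
          "Consumer " + consumerName + " produced a reserved offset for " + env.ssp);
    }
    return true;
  }
}
```

Dropping late envelopes at enqueue keeps downstream layers such as the iterator free of an extra "message after end-of-stream" state, and the log line still identifies the offending consumer.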
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala (line 174)
<https://reviews.apache.org/r/51346/#comment213973>

I would just check whether you can coerce here. I don't think adding a second representation of that state buys us much (and it would be bad for it to diverge from the truth).


samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java (line 360)
<https://reviews.apache.org/r/51346/#comment213974>

This line is a bit long. You could break it up like this, which arguably improves readability:

```
when(consumerMultiplexer.choose(false))
    .thenReturn(envelope0)
    .thenReturn(envelope1)
    ...
```


samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java (line 368)
<https://reviews.apache.org/r/51346/#comment213975>

Interesting: should end-of-stream actually count as a message?


- Chris Pettitt


On Aug. 25, 2016, 11:52 p.m., Jagadish Venkatraman wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51346/
> -----------------------------------------------------------
>
> (Updated Aug. 25, 2016, 11:52 p.m.)
>
>
> Review request for samza, Boris Shkolnik, Chris Pettitt, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
>
>
> Repository: samza
>
>
> Description
> -------
>
> Samza currently works with unbounded data sources (Kafka streams). However, for bounded data sources such as HDFS files and snapshot files, which are not infinite, we need a notion of 'end-of-stream'.
>
> This is a step towards realizing a 'finite' Samza job that terminates once data processing is complete (as opposed to an infinite stream job that keeps running).
>
> RB changes:
> - New interface for EndOfStreamListener
> - New 'end-of-stream' state in the state machine of AsyncStreamTask
>   (Invariant: when end-of-stream is reached, there are no buffered messages, no callbacks are in flight, and no window or commit call is in progress)
> - Changes to allow clean shutdowns of the tasks/container/job on end-of-stream.
>
> Design Doc and Implementation Notes:
> https://issues.apache.org/jira/secure/attachment/12825119/ProposalforEndofStreaminSamza.pdf
>
>
> Diffs
> -----
>
>   samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java cc860cf7eb4d514736913c1dceaa80534b61d71a
>   samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java a8f858aa7e4f4ce436f450cf439fe1a102983c64
>   samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java a510bb0c5914c772438930d27f100b4d360c1296
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 89f6857014489aba2db4129bc2e26dfec5b10652
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ca913dea79fecbcecdfd1010dc794318055c5764
>
> Diff: https://reviews.apache.org/r/51346/diff/
>
>
> Testing
> -------
>
> Unit tests covering in-order processing, out-of-order processing, and commit semantics.
>
>
> Thanks,
>
> Jagadish Venkatraman
>
>
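A minimal sketch combining the AsyncRunLoop and TaskInstance suggestions with the invariant from the request. The run loop always signals end-of-stream, the task is coerced with `instanceof` instead of caching a second flag, and the transition waits until nothing is buffered, nothing is in flight, and no window or commit is running. `StreamTask`, `EndOfStreamListenerTask`, `TaskState`, and `CountingTask` here are hypothetical simplifications; the interfaces in the actual diff may take additional parameters:

```java
/** Hypothetical, simplified stand-ins; the interfaces in the actual diff may take more parameters. */
interface StreamTask {
  void process(Object message);
}

interface EndOfStreamListenerTask {
  void onEndOfStream();
}

/** Hypothetical per-task state tracked by a run loop, enforcing the end-of-stream invariant. */
final class TaskState {
  private final StreamTask task;
  private int bufferedMessages;
  private int inFlightCallbacks;
  private boolean windowOrCommitInProgress;
  private boolean endOfStreamReached;

  TaskState(StreamTask task) {
    this.task = task;
  }

  void onEnqueue() { bufferedMessages++; }
  void onDispatch() { bufferedMessages--; inFlightCallbacks++; }
  void onCallbackComplete() { inFlightCallbacks--; }
  void setWindowOrCommitInProgress(boolean inProgress) { windowOrCommitInProgress = inProgress; }

  /**
   * Called unconditionally once all of the task's partitions have reached end-of-stream.
   * Returns false while the invariant does not hold yet, so the caller can retry later.
   */
  boolean tryEndOfStream() {
    if (endOfStreamReached) {
      return true;
    }
    if (bufferedMessages > 0 || inFlightCallbacks > 0 || windowOrCommitInProgress) {
      return false;
    }
    endOfStreamReached = true;
    // Coerce instead of caching a separate "is listener" flag that could diverge from the task's type.
    if (task instanceof EndOfStreamListenerTask) {
      ((EndOfStreamListenerTask) task).onEndOfStream();
    }
    return true;
  }
}

/** Example task that opts in to end-of-stream notifications. */
final class CountingTask implements StreamTask, EndOfStreamListenerTask {
  int endOfStreamCalls;

  @Override
  public void process(Object message) {
    // no-op for the example
  }

  @Override
  public void onEndOfStream() {
    endOfStreamCalls++;
  }
}
```

Returning `false` rather than throwing lets the run loop keep draining in-flight work and retry, treating the invariant as a precondition of the end-of-stream state rather than an error. Tasks that do not implement the listener still reach end-of-stream; they simply are not notified.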