> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > Here's a first round.
> >
> > Some JavaDoc or a reference to any existing design doc would help the
> > reviewers. Also, the Jira says there are examples in this patch but I
> > didn't see them. Can you point them out? They may be a good starting place.
> >
> > It seems that this patch builds on another patch, so we can't apply this
> > one on master. Please direct us to the other patch.
> >
> > What are the plans for using reactive streams? Is it just to formalize the
> > DAG of operators or is there more to it than that?

This patch is based on the samza-sql branch. Please check out the open-source samza-sql branch, as indicated in the RB. The following two patches are built on top of this one. I included reactive streams for an experimental implementation of the operators, for two purposes: a) exactly as you pointed out, to formalize the implementation classes of the operators; b) I see potential for using the reactive streams library to help us implement flow control / back-pressure. However, it is not strictly needed for now, and I think I will remove it to keep this review clear.

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/exception/OperatorException.java,
> > line 33
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314357#file1314357line33>
> >
> > A valid UID should be generated

Sure. Will add that later. The OperatorException class was removed in the later patches and deferred until the real implementation.

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/exception/OperatorException.java,
> > line 52
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314357#file1314357line52>
> >
> > It might be worth renaming these static constructors to distinguish
> > them from getError().
> >
> > Two options come to mind:
> > 1. createInputDisabledException() and similar for TooOld
> > 2. Considering the user perspective, I think
> > OperatorException.inputDisabled() reads well.

This is a stub class just to add specific errors from operators. We can definitely rename it now, but these static constructors are only examples to give an idea; the real implementation will change the names quite a bit. I have removed this class in the later patches to reduce the scope of the change for now.
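The two OperatorException suggestions above (a valid UID and caller-oriented static constructors) can be illustrated with a small sketch. It assumes that "a valid UID" means the serialVersionUID of the exception, that the exception is unchecked, and it invents two hypothetical error codes (INPUT_DISABLED, INPUT_TOO_OLD) and message formats. None of these names come from the patch, which has since dropped the class.

```java
/**
 * Hypothetical sketch of OperatorException with caller-oriented static
 * factories. Error codes and message formats are illustrative assumptions.
 */
class OperatorException extends RuntimeException {
  // Explicit serialVersionUID: a fixed value such as 1L works, or one can be
  // generated with the JDK serialver tool.
  private static final long serialVersionUID = 1L;

  /** Hypothetical error categories an operator might report. */
  enum ErrorCode {
    INPUT_DISABLED,
    INPUT_TOO_OLD
  }

  private final ErrorCode error;

  private OperatorException(ErrorCode error, String message) {
    super(message);
    this.error = error;
  }

  /** Accessor for the error category; the factories are named differently on purpose. */
  ErrorCode getError() {
    return error;
  }

  /** Reads naturally at the call site: throw OperatorException.inputDisabled("orders"); */
  static OperatorException inputDisabled(String input) {
    return new OperatorException(ErrorCode.INPUT_DISABLED, "Input disabled: " + input);
  }

  /** Hypothetical counterpart for the "TooOld" case mentioned in the review. */
  static OperatorException inputTooOld(String input, long timestampMs) {
    return new OperatorException(
        ErrorCode.INPUT_TOO_OLD, "Input too old: " + input + " at " + timestampMs);
  }
}
```

Naming the factories after the condition (option 2 in the review) keeps them visually distinct from the getError() accessor, while the private constructor forces callers through the factories.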

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/factory/DataStream.java,
> > line 30
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314358#file1314358line30>
> >
> > Doc missing from this entire class, actually most of the patch. It's
> > definitely needed before submitting, but would make review much easier too.

I split the sequence of changes into three RBs, and the Javadoc was added in the last set of changes, in the third RB. Adding those changes on top of this one is a bit tricky; I would rather include all the changes in one RB, if that makes the review easier. We can sync up in person on Monday.

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/scan/Scanner.java,
> > line 24
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314359#file1314359line24>
> >
> > It's unclear to me what a scanner is.
> >
> > I can see that it operates on a SSP, but does it filter the SSP,
> > duplicate it, act as a cursor?

Scanner decorates the incoming messages based on user-defined timestamps and message keys.

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/scan/Scanner.java,
> > line 32
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314359#file1314359line32>
> >
> > Is this method filtering based on a key or is the key a timestamp? It's
> > unclear to me why the method is called timestamp and why it needs to be
> > chainable (I assume that's why it's stubbed to return "this")
> >
> > Same comment for messageKey() below

The later patches have the implementation of those two methods. The idea is to allow the user to define a custom timestamp field in the message, with the field specified by the key. The same goes for messageKey().
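To make the Scanner replies above more concrete, here is a minimal sketch of how a chainable Scanner might decorate messages. It assumes messages arrive as Map<String, Object> records and invents a DecoratedMessage holder and a decorate() method; only timestamp() and messageKey() are named in the review. The fallback to the system clock when no timestamp field is chosen follows the reply about systemTimestamp later in this thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for a message decorated with its key and timestamp. */
final class DecoratedMessage {
  final Object key;          // null when no key field was configured
  final long timestampMs;    // event time, or system time as a fallback
  final Map<String, Object> fields;

  DecoratedMessage(Object key, long timestampMs, Map<String, Object> fields) {
    this.key = key;
    this.timestampMs = timestampMs;
    this.fields = fields;
  }
}

/**
 * Hypothetical Scanner: the user names the message field that carries the
 * event timestamp and the one that carries the key; decorate() then extracts
 * both from every incoming message. Nothing is filtered out.
 */
final class Scanner {
  private String timestampField; // null means "not specified"
  private String keyField;       // null means "not specified"

  // Both configuration methods return this so that calls can be chained.
  Scanner timestamp(String field) {
    this.timestampField = field;
    return this;
  }

  Scanner messageKey(String field) {
    this.keyField = field;
    return this;
  }

  DecoratedMessage decorate(Map<String, Object> message) {
    long ts;
    if (timestampField == null) {
      // No timestamp field chosen: fall back to the system timestamp.
      ts = System.currentTimeMillis();
    } else {
      Object raw = message.get(timestampField);
      if (!(raw instanceof Number)) {
        throw new IllegalArgumentException("No numeric timestamp in field " + timestampField);
      }
      ts = ((Number) raw).longValue();
    }
    Object key = keyField == null ? null : message.get(keyField);
    return new DecoratedMessage(key, ts, Collections.unmodifiableMap(new HashMap<>(message)));
  }
}
```

Returning `this` from both methods is what allows `new Scanner().timestamp("eventTime").messageKey("memberId")`, which is presumably why the stubs in the patch return `this`.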

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java, line
> > 38
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314375#file1314375line38>
> >
> > The terminology is a bit mixed here.
> >
> > Tuple<K, T>:
> > K suggests "Key"
> > T suggests "Tuple"
> >
> > But then that's a Tuple containing a Tuple, which is awkward.
> >
> > And then on this line we have T getMessage() implying T is a message.
> >
> > I think these constructs need to be clarified.
> >
> > Perhaps this interface is KeyedTuple<K, T> and the method is T
> > getTuple()?

Good point. It would be better to change it to Tuple<K, M>, in which M represents the message. I will make the change.

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java, line
> > 66
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314375#file1314375line66>
> >
> > I think it'll be useful to always qualify timestamps as
> > "eventTimeStamp" or "systemTimeStamp"

systemTimestamp does not make a lot of sense to the programmer, since it is system-specific information and changes on each run. The current user-defined windowing method uses it and suffers from exactly this uncertainty. I can't think of a good use case for the user to use systemTimestamp in windowing. In addition, since Scanner allows users to specify the timestamp field for the tuple, users will make an explicit choice of the timestamp they want to use for the input tuples. If the user does not specify the timestamp field, we would default to systemTimestamp. We can sync up in person on Monday. Thanks!

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java,
> > line 22
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314380#file1314380line22>
> >
> > JoinCondition is spelled out below.
> > For consistency, I think
> > JoinFunction should be spelled out here.

Sure. It would be better to keep it consistent.

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java,
> > line 26
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314380#file1314380line26>
> >
> > "JoinCondition" no 's'

Consider it done. Thanks!

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java,
> > line 39
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314391#file1314391line39>
> >
> > I don't have an intuition for what this method does or what a
> > Subscriber is. Does it just register a downstream listener?

Yes. This is inherited from the reactiveio library. We may or may not need to use it. The reactiveio library may be useful when we start implementing flow control and back-pressure in the operator pipelines, but we do not strictly need it now. If it creates too much confusion, I can remove it from the current implementation. Since it does not change the user programming APIs, we can always swap in reactiveio-based operator implementation classes later, when implementing flow control and back-pressure in the operator pipelines.

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java,
> > line 37
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314402#file1314402line37>
> >
> > Are there going to be other, non-Stream pipelines?
> > If not, I think we can drop "Stream" from the class name

Good point. I will do that!
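To make the Subscriber question on NoopOperatorCallback and the proposed Tuple<K, M> naming more concrete, here is a minimal observer-style sketch: subscribe() only registers a downstream listener, and onNext() applies an operator's logic and forwards the result. Tuple, Listener, and OperatorNode are hypothetical names, not classes from the patch or from the reactive streams library. The sketch deliberately omits the Subscription and request(n) demand signaling that Reactive Streams uses for back-pressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical Tuple<K, M>: K is the key type, M is the message type. */
final class Tuple<K, M> {
  private final K key;
  private final M message;

  Tuple(K key, M message) {
    this.key = key;
    this.message = message;
  }

  K getKey() {
    return key;
  }

  M getMessage() {
    return message;
  }
}

/** Hypothetical downstream listener, loosely modeled on a Reactive Streams Subscriber. */
interface Listener<T> {
  void onNext(T item);
}

/**
 * Hypothetical operator node. subscribe() just records a downstream listener;
 * onNext() applies this node's function and forwards the result to every
 * registered listener. No demand signaling, so no back-pressure.
 */
final class OperatorNode<I, O> implements Listener<I> {
  private final Function<I, O> fn;
  private final List<Listener<O>> downstream = new ArrayList<>();

  OperatorNode(Function<I, O> fn) {
    this.fn = fn;
  }

  void subscribe(Listener<O> listener) {
    downstream.add(listener);
  }

  @Override
  public void onNext(I item) {
    O out = fn.apply(item);
    for (Listener<O> listener : downstream) {
      listener.onNext(out);
    }
  }
}
```

For example, subscribing a "render" node to a "doubler" node and feeding the doubler Tuple("a", 3) would deliver "a=6" to the render node's listeners. A Reactive Streams based implementation could later replace OperatorNode by adding demand signaling, without changing how the pipeline is described.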

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java,
> > line 45
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314402#file1314402line45>
> >
> > Curious why this class uses a message collector rather than
> > output/chaining

This is actually the implementation class of a connected operator topology and won't be seen by users. The idea is to let programmers describe the pipeline with output chaining, while the system automatically creates the implementation objects, which implement onNext() to execute the logic. Hence, this class is not part of the user-facing programming API and does not follow the output chaining pattern.

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java,
> > line 22
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314403#file1314403line22>
> >
> > Will Batch need to be templated with generics? Or is it really just a
> > window cardinality?

I changed it to the Limit class in a later patch. The idea is that it describes a counter-threshold-based triggering condition.

> On May 20, 2016, 11:56 p.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java, line
> > 20
> > <https://reviews.apache.org/r/45324/diff/1/?file=1314405#file1314405line20>
> >
> > Alt package:
> > org.apache.samza.stream

Agreed, it seems to be a better place. It could even go in org.apache.samza.operator.api.data, since Offset classes are features describing the input Tuple. Thanks!

- Yi


-----------------------------------------------------------
This is an automatically generated e-mail.
To reply, visit:
https://reviews.apache.org/r/45324/#review128249
-----------------------------------------------------------


On March 28, 2016, 6:59 p.m., Yi Pan (Data Infrastructure) wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45324/
> -----------------------------------------------------------
>
> (Updated March 28, 2016, 6:59 p.m.)
>
>
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake
> Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
>
>
> Bugs: SAMZA-914
>     https://issues.apache.org/jira/browse/SAMZA-914
>
>
> Repository: samza
>
>
> Description
> -------
>
> SAMZA-914: Initial draft for Java programming APIs on operators supporting
> DAGs
>
>
> Diffs
> -----
>
>   build.gradle 16facbbf4dff378c561461786ff186bd9e0000ed
>   gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb
>   samza-operator/src/main/java/org/apache/samza/operators/exception/OperatorException.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/factory/DataStream.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/scan/Scanner.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/window/SessionWindow.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/window/Timeout.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/window/Window.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/task/DataStreamTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/DataStreamJoinTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/DataStreamSplitTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/DataStreamUserTask.java PRE-CREATION
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java PRE-CREATION
>   samza-sql-core/README.md PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION
>   settings.gradle 4c1aa107a11d413777e69bc4e48847b811aff7d2
>
> Diff: https://reviews.apache.org/r/45324/diff/
>
>
> Testing
> -------
>
> Built locally via ./gradlew clean build
>
>
> Thanks,
>
> Yi Pan (Data Infrastructure)
>
>