> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java,
> >  line 147
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487016#file1487016line147>
> >
> >     I think it would be useful to call out why SinkFunction is the only one 
> > that needs a MessageCollector and TaskCoordinator. 
> >     
> >     I believe the reason for TaskCoordinator is because Sink is terminal, 
> > so that is the only place you need to commit(). I'm not sure about the 
> > MessageCollector though. It seems like it should be consistent with 
> > StreamOperator. Either both should have collectors or neither... unless I'm 
> > missing something.

Sink needs the MessageCollector because it is terminal and has to send the 
messages to a Kafka topic. StreamOperators generate in-memory outputs only, 
so they have no need for a MessageCollector.


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java,
> >  line 152
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487016#file1487016line152>
> >
> >     It's odd that the Java Function class is used for the 1 parameter case 
> > but we define a completely independent interface for the multi-parameter 
> > case. 
> >     
> >     Maybe there should be a new @FunctionalInterface for this case.
> >     
> > http://stackoverflow.com/questions/27872387/can-a-java-lambda-have-more-than-1-parameter

Thanks for the suggestion. Using @FunctionalInterface does make the 
SinkFunction more generic.
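
As a rough sketch of what a three-argument @FunctionalInterface could look 
like next to the one-argument java.util.function.Function case: Collector and 
Coordinator below are hypothetical stand-ins for MessageCollector and 
TaskCoordinator, and the names and signatures are assumptions, not the ones in 
the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class SinkFunctionSketch {
  // Hypothetical stand-ins for Samza's MessageCollector and TaskCoordinator,
  // reduced to one method each so this sketch compiles on its own.
  interface Collector { void send(String topic, Object message); }
  interface Coordinator { void commit(); }

  // Assumed shape of a three-argument functional interface for the sink.
  @FunctionalInterface
  interface SinkFunction<M> {
    void apply(M message, Collector collector, Coordinator coordinator);
  }

  // The map step produces an in-memory result; only the terminal sink
  // touches the collector and the coordinator.
  static <M, R> void mapThenSink(M input, Function<M, R> mapper,
      SinkFunction<R> sink, Collector collector, Coordinator coordinator) {
    R mapped = mapper.apply(input);
    sink.apply(mapped, collector, coordinator);
  }

  static List<String> demo() {
    List<String> log = new ArrayList<>();
    Collector collector = (topic, msg) -> log.add("send " + topic + ": " + msg);
    Coordinator coordinator = () -> log.add("commit");
    // A lambda with three parameters works once the interface is functional.
    SinkFunction<String> sink = (msg, c, k) -> {
      c.send("output-topic", msg);
      k.commit();
    };
    mapThenSink("hello", s -> s.toUpperCase(), sink, collector, coordinator);
    return log;
  }

  public static void main(String[] args) {
    demo().forEach(System.out::println);
  }
}
```

The point of the sketch is only that the sink lambda receives the collector 
and coordinator while the intermediate step does not.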


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java,
> >  line 385
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487016#file1487016line385>
> >
> >     Should this have generics so users can have specific types at the very 
> > beginning or is that not possible until at least one user-defined op has 
> > processed and declared the types?

The issue is that IncomingMessageEnvelope is not typed, so the framework 
cannot determine the key and value types of an incoming message before it 
creates the SystemMessageStream object and passes it to the user. The Data and 
Schema classes are meant to provide an abstract data representation and access 
layer that hides the serde from the user. There is more detailed discussion in 
SAMZA-848, SAMZA-842, and SAMZA-429.

Ideally, the SystemMessageStream would come with Message<K extends Data, M 
extends Data>, so that we could access the fields in K and M through Data 
without needing to know the serde. With the current implementation, however, 
that approach also has drawbacks: a) instead of an indirect dependency on Avro 
via the serde implementation class, we depend on Avro directly, which brings 
in dependency management issues with the serde implementation; b) every serde 
now needs an implementation of Data to wrap it.

I have started to think that this may be better incorporated into the serde 
class interface APIs instead of the current intermediate approach. For now, I 
am not going to use this class; I will always assume that the incoming message 
is Message<Object, Object> and require the user to convert it to a known 
deserialized class (as we do in user code today).
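
To illustrate the user-side conversion, here is a minimal sketch. The Message 
interface below is a simplified, hypothetical stand-in for the one in the 
operator API, and of() and typed() are helper names invented for this example 
only.

```java
class UntypedMessageSketch {
  // Hypothetical, simplified stand-in for the operator API's Message.
  interface Message<K, M> {
    K getKey();
    M getMessage();
  }

  // Small factory so the sketch does not need a concrete message class.
  static <K, M> Message<K, M> of(K key, M message) {
    return new Message<K, M>() {
      @Override public K getKey() { return key; }
      @Override public M getMessage() { return message; }
    };
  }

  // The user supplies the types the framework cannot know up front.
  // Class.cast throws ClassCastException if the runtime type does not match.
  static <K, M> Message<K, M> typed(Message<Object, Object> raw,
      Class<K> keyType, Class<M> messageType) {
    return of(keyType.cast(raw.getKey()), messageType.cast(raw.getMessage()));
  }

  static String demo() {
    // What the framework would hand over: key and value typed as Object.
    Message<Object, Object> raw = UntypedMessageSketch.<Object, Object>of("user-1", 42);
    Message<String, Integer> msg = typed(raw, String.class, Integer.class);
    return msg.getKey() + "=" + (msg.getMessage() + 1);
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

A wrong guess about the deserialized class only surfaces at runtime as a 
ClassCastException, which is the cost of leaving the stream untyped for now.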


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/Scan.java, line 
> > 28
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487017#file1487017line28>
> >
> >     Naming:
> >     Maybe it's just me and maybe I'm not seeing the big picture of use 
> > cases, but I still don't find this class name intuitive. Scan is a verb 
> > that makes some sense if the input is a table but I've never heard someone 
> > describe a "scan" of a stream. Further, to me, scan is a "read" operation 
> > and it would be unintuitive for it to have side effects like extracting 
> > keys and timestamps.   
> >     
> >     Also, it seems like this will only be used at the entry points of the 
> > DAG. If so, it should be the antithesis of "sink" so I'd call it "source"
> >     Source.createWithExtractors(te, ke)
> >     Source.createWithKeyExtractor(ke)
> >     Source.createWithTimestampExtractor(te)
> >     
> >     Alternatively, if this could be used in the middle of the DAG, it could 
> > be 
> >     MessageStream.createWithExtractors(te, ke)
> >     MessageStream.createWithKeyExtractor(ke)
> >     MessageStream.createWithTimestampExtractor(te)
> >     
> >     Neither of the above are verbs so here are some other options:
> >     Preprocess
> >     Normalize
> >     Decorate
> >     Annotate
> >     Extract
> >     Enrich (or any of its synonyms)
> >     
> >     I know they may be less concise, but they all feel clearer to me. 
> > Normalize and Enrich are my current favorites.

Chris had a similar comment, and I have removed the Scan class completely from 
the API. Thanks for pointing it out as well!


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java, 
> > lines 51-63
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487053#file1487053line51>
> >
> >     I know this isn't part of the current patch but this smells funky. All 
> > the @Overrides are Unsupported..., and then there are a bunch of static 
> > equivalents. 
> >     
> >     Seems like the @Overrides could delegate to the static methods or they 
> > should be removed, but as it stands, it seems messy.

Yeah, as I mentioned in the replies above, I would rather open another JIRA to 
address this Data/AvroData and Schema implementation issue.


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java,
> >  line 39
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487055#file1487055line39>
> >
> >     Naming:
> >     Class is in the operators package but has Sql in the name.

Same as my response above.


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java,
> >  line 38
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487056#file1487056line38>
> >
> >     Naming:
> >     Class is in the operators package but has Sql in the name.

Good catch. That was missed when converting samza-sql-core to samza-operator 
module. I will open another JIRA to fix it.


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java,
> >  line 28
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487057#file1487057line28>
> >
> >     Naming:
> >     Class is in the operators package but has Sql in the name.

Same here.


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java,
> >  line 28
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487058#file1487058line28>
> >
> >     Naming:
> >     Class is in the operators package but has Sql in the name.

Same here.


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java,
> >  line 33
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487061#file1487061line33>
> >
> >     Since this extends Message, I expected to see some @Override 
> > annotations, unless Message is an empty abstract class.

Message is actually an interface.


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java,
> >  line 40
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487078#file1487078line40>
> >
> >     Naming:
> >     Since this has been moved out of sql-core, the class should probably 
> > also be renamed to just "AvroSerdeTest"

Sure. Tracking separately.


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review147209
-----------------------------------------------------------


On Aug. 26, 2016, 8:43 p.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> -----------------------------------------------------------
> 
> (Updated Aug. 26, 2016, 8:43 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake 
> Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-914
>     https://issues.apache.org/jira/browse/SAMZA-914
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-914: initial draft of operator programming API. Design doc attached to 
> SAMZA-914: 
> https://issues.apache.org/jira/secure/attachment/12821524/SAMZA-914_%20operator%20Java%20programming%20API%20-%20Google%20Docs.pdf
> 
> 
> Diffs
> -----
> 
>   build.gradle 16facbbf4dff378c561461786ff186bd9e0000ed 
>   gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb 
>   samza-api/src/test/java/org/apache/samza/config/TestConfig.java 
> 5d066c5867e9df9e94e60bde825dedf10703b399 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
>  PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/Scan.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/data/window/WindowOutput.java
>  PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/join/Join.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/window/SessionWindow.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/window/Trigger.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/window/Window.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorBaseImpl.java
>  PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/impl/Pipeline.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
>  PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/api/TestScanner.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/api/data/TestDataStream.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/task/AssembleCallGraphTask.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java 
> PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
>  PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java 
> PRE-CREATION 
>   
> samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
>  PRE-CREATION 
>   samza-sql-core/README.md PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
>  PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java
>  PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
>  PRE-CREATION 
>   
> samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java
>  PRE-CREATION 
>   
> samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
>  PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 
> PRE-CREATION 
>   
> samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
>  PRE-CREATION 
>   settings.gradle 4c1aa107a11d413777e69bc4e48847b811aff7d2 
> 
> Diff: https://reviews.apache.org/r/47835/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>
