Thanks for the quick response. Milinda
On Tue, Jun 2, 2015 at 2:02 AM, Navina Ramesh <nram...@linkedin.com.invalid> wrote: > Sorry about the confusion, Milinda. > > Thanks for reverting, Yi! > > On 6/1/15, 10:53 PM, "Yi Pan" <nickpa...@gmail.com> wrote: > > >Hi, Milinda, > > > >That was an accidental mistake. I have reverted the check-in. I am still > >working on that. Thanks! > > > >-Yi > > > >On Mon, Jun 1, 2015 at 9:34 PM, Milinda Pathirage <mpath...@umail.iu.edu> > >wrote: > > > >> Hi Navina, > >> > >> Did we decide to push this patch to samza-sql branch? I thought Yi is > >> still working on this. Some Git conflict related texts are still there > >>in > >> this commit. > >> > >> +<<<<<<< HEAD > >> + * The callback object > >> +======= > >> + * The callback function > >> +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of > >> callbacks w/o inheriting and creating many sub-classes from operators > >> > >> Milinda > >> > >> On Mon, Jun 1, 2015 at 9:06 PM, <nav...@apache.org> wrote: > >> > >> > Yi's TopologyBuilder RB 34500 > >> > > >> > > >> > Project: http://git-wip-us.apache.org/repos/asf/samza/repo > >> > Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/45b85477 > >> > Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/45b85477 > >> > Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/45b85477 > >> > > >> > Branch: refs/heads/samza-sql > >> > Commit: 45b854772cf36cc69e8d8cda7a51bce1be5fe576 > >> > Parents: 41c4cd0 > >> > Author: Navina <navi.trin...@gmail.com> > >> > Authored: Thu May 28 18:51:30 2015 -0700 > >> > Committer: Navina <navi.trin...@gmail.com> > >> > Committed: Thu May 28 18:51:30 2015 -0700 > >> > > >> > ---------------------------------------------------------------------- > >> > .../apache/samza/sql/api/data/EntityName.java | 41 ++- > >> > .../org/apache/samza/sql/api/data/Table.java | 7 +- > >> > .../samza/sql/api/operators/Operator.java | 4 + > >> > .../sql/api/operators/OperatorCallback.java | 1 - > >> > 
.../samza/sql/api/operators/OperatorRouter.java | 8 + > >> > .../samza/sql/api/operators/OperatorSink.java | 30 ++ > >> > .../samza/sql/api/operators/OperatorSource.java | 30 ++ > >> > .../samza/sql/api/operators/SimpleOperator.java | 3 +- > >> > .../samza/sql/data/IncomingMessageTuple.java | 1 - > >> > .../sql/operators/NoopOperatorCallback.java | 53 ++++ > >> > .../samza/sql/operators/OperatorTopology.java | 53 ++++ > >> > .../samza/sql/operators/SimpleOperatorImpl.java | 147 ++++++++++ > >> > .../samza/sql/operators/SimpleOperatorSpec.java | 106 +++++++ > >> > .../samza/sql/operators/SimpleRouter.java | 141 +++++++++ > >> > .../operators/factory/NoopOperatorCallback.java | 50 ---- > >> > .../operators/factory/SimpleOperatorImpl.java | 136 --------- > >> > .../operators/factory/SimpleOperatorSpec.java | 106 ------- > >> > .../sql/operators/factory/SimpleRouter.java | 136 --------- > >> > .../sql/operators/factory/TopologyBuilder.java | 284 > >> +++++++++++++++++++ > >> > .../sql/operators/join/StreamStreamJoin.java | 3 +- > >> > .../operators/join/StreamStreamJoinSpec.java | 15 +- > >> > .../sql/operators/partition/PartitionOp.java | 3 +- > >> > .../sql/operators/partition/PartitionSpec.java | 2 +- > >> > .../sql/operators/window/BoundedTimeWindow.java | 4 +- > >> > .../samza/sql/operators/window/WindowSpec.java | 7 +- > >> > .../samza/task/sql/SimpleMessageCollector.java | 37 ++- > >> > .../task/sql/RandomWindowOperatorTask.java | 11 +- > >> > .../apache/samza/task/sql/StreamSqlTask.java | 26 +- > >> > .../samza/task/sql/UserCallbacksSqlTask.java | 66 ++--- > >> > 29 files changed, 991 insertions(+), 520 deletions(-) > >> > ---------------------------------------------------------------------- > >> > > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/api/data/EntityName.java > >> > ---------------------------------------------------------------------- > >> > diff 
--git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j > >>ava > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j > >>ava > >> > index 80ba455..df1b11b 100644 > >> > --- > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j > >>ava > >> > +++ > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j > >>ava > >> > @@ -49,6 +49,8 @@ public class EntityName { > >> > */ > >> > private final String name; > >> > > >> > + private final boolean isSystemEntity; > >> > + > >> > /** > >> > * Static map of already allocated table names > >> > */ > >> > @@ -59,15 +61,19 @@ public class EntityName { > >> > */ > >> > private static Map<String, EntityName> streams = new > >>HashMap<String, > >> > EntityName>(); > >> > > >> > + private static final String ANONYMOUS = "anonymous"; > >> > + > >> > /** > >> > * Private ctor to create entity names > >> > * > >> > * @param type Type of the entity name > >> > * @param name Formatted name of the entity > >> > + * @param isSystemEntity whether the entity is a system > >>input/output > >> > */ > >> > - private EntityName(EntityType type, String name) { > >> > + private EntityName(EntityType type, String name, boolean > >> > isSystemEntity) { > >> > this.type = type; > >> > this.name = name; > >> > + this.isSystemEntity = isSystemEntity; > >> > } > >> > > >> > @Override > >> > @@ -102,6 +108,10 @@ public class EntityName { > >> > return this.type.equals(EntityType.STREAM); > >> > } > >> > > >> > + public boolean isSystemEntity() { > >> > + return this.isSystemEntity; > >> > + } > >> > + > >> > /** > >> > * Get the formatted entity name > >> > * > >> > @@ -111,15 +121,24 @@ public class EntityName { > >> > return this.name; > >> > } > >> > > >> > + public static EntityName getTableName(String name) { > >> > + return getTableName(name, false); > >> > + } > >> > + > >> > + public static EntityName 
getStreamName(String name) { > >> > + return getStreamName(name, false); > >> > + } > >> > + > >> > /** > >> > * Static method to get the instance of {@code EntityName} with > >>type > >> > {@code EntityType.TABLE} > >> > * > >> > * @param name The formatted entity name of the relation > >> > + * @param isSystem The boolean flag indicating whether this is a > >> system > >> > input/output > >> > * @return A <code>EntityName</code> for a relation > >> > */ > >> > - public static EntityName getTableName(String name) { > >> > + public static EntityName getTableName(String name, boolean > >>isSystem) { > >> > if (tables.get(name) == null) { > >> > - tables.put(name, new EntityName(EntityType.TABLE, name)); > >> > + tables.put(name, new EntityName(EntityType.TABLE, name, > >> isSystem)); > >> > } > >> > return tables.get(name); > >> > } > >> > @@ -128,13 +147,25 @@ public class EntityName { > >> > * Static method to get the instance of <code>EntityName</code> > >>with > >> > type <code>EntityType.STREAM</code> > >> > * > >> > * @param name The formatted entity name of the stream > >> > + * @param isSystem The boolean flag indicating whether this is a > >> system > >> > input/output > >> > * @return A <code>EntityName</code> for a stream > >> > */ > >> > - public static EntityName getStreamName(String name) { > >> > + public static EntityName getStreamName(String name, boolean > >>isSystem) > >> { > >> > if (streams.get(name) == null) { > >> > - streams.put(name, new EntityName(EntityType.STREAM, name)); > >> > + streams.put(name, new EntityName(EntityType.STREAM, name, > >> isSystem)); > >> > } > >> > return streams.get(name); > >> > } > >> > > >> > + public static EntityName getAnonymousStream() { > >> > + return getStreamName(ANONYMOUS); > >> > + } > >> > + > >> > + public static EntityName getAnonymousTable() { > >> > + return getTableName(ANONYMOUS); > >> > + } > >> > + > >> > + public boolean isAnonymous() { > >> > + return this.name.equals(ANONYMOUS); > >> > + } > 
>> > } > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/api/data/Table.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > >> > > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > >> > index 7b4d984..b4dce07 100644 > >> > --- > >> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > >> > +++ > >> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > >> > @@ -19,6 +19,9 @@ > >> > > >> > package org.apache.samza.sql.api.data; > >> > > >> > +import java.util.List; > >> > + > >> > + > >> > /** > >> > * This interface defines a non-ordered {@link > >> > org.apache.samza.sql.api.data.Relation}, which has a unique primary > >>key > >> > * > >> > @@ -31,8 +34,8 @@ public interface Table<K> extends Relation<K> { > >> > /** > >> > * Get the primary key field name for this table > >> > * > >> > - * @return The name of the primary key field > >> > + * @return The names of the primary key fields > >> > */ > >> > - String getPrimaryKeyName(); > >> > + List<String> getPrimaryKeyNames(); > >> > > >> > } > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/api/operators/Operator.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>r.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>r.java > >> > index d6f6b57..9c6eaa5 100644 > >> > --- > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>r.java > >> > +++ > >> > > >> > 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>r.java > >> > @@ -27,7 +27,11 @@ import org.apache.samza.task.TaskContext; > >> > import org.apache.samza.task.TaskCoordinator; > >> > > >> > > >> > +/** > >> > + * This class defines the common interface for operator classes. > >> > + */ > >> > public interface Operator { > >> > + > >> > /** > >> > * Method to initialize the operator > >> > * > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rCallback.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rCallback.java > >> > index fb2aa89..5a77d95 100644 > >> > --- > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rCallback.java > >> > +++ > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rCallback.java > >> > @@ -23,7 +23,6 @@ import org.apache.samza.sql.api.data.Tuple; > >> > import org.apache.samza.task.MessageCollector; > >> > import org.apache.samza.task.TaskCoordinator; > >> > > >> > - > >> > /** > >> > * Defines the callback functions to allow customized functions to be > >> > invoked before process and before sending the result > >> > */ > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rRouter.java > >> > > >> > 
>>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rRouter.java > >> > index 0759638..432e6b3 100644 > >> > --- > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rRouter.java > >> > +++ > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rRouter.java > >> > @@ -19,6 +19,7 @@ > >> > > >> > package org.apache.samza.sql.api.operators; > >> > > >> > +import java.util.Iterator; > >> > import java.util.List; > >> > > >> > import org.apache.samza.sql.api.data.EntityName; > >> > @@ -51,4 +52,11 @@ public interface OperatorRouter extends Operator { > >> > */ > >> > List<SimpleOperator> getNextOperators(EntityName output); > >> > > >> > + /** > >> > + * This method provides an iterator to go through all operators > >> > connected via {@code OperatorRouter} > >> > + * > >> > + * @return An {@link java.util.Iterator} for all operators > >>connected > >> > via {@code OperatorRouter} > >> > + */ > >> > + Iterator<SimpleOperator> iterator(); > >> > + > >> > } > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rSink.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rSink.java > >> > new file mode 100644 > >> > index 0000000..e2c748c > >> > --- /dev/null > >> > +++ > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rSink.java > >> > @@ -0,0 +1,30 @@ > >> > +/* > >> > + * Licensed to the Apache Software Foundation (ASF) under one > >> > + * or more contributor license agreements. 
See the NOTICE file > >> > + * distributed with this work for additional information > >> > + * regarding copyright ownership. The ASF licenses this file > >> > + * to you under the Apache License, Version 2.0 (the > >> > + * "License"); you may not use this file except in compliance > >> > + * with the License. You may obtain a copy of the License at > >> > + * > >> > + * http://www.apache.org/licenses/LICENSE-2.0 > >> > + * > >> > + * Unless required by applicable law or agreed to in writing, > >> > + * software distributed under the License is distributed on an > >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > >> > + * KIND, either express or implied. See the License for the > >> > + * specific language governing permissions and limitations > >> > + * under the License. > >> > + */ > >> > +package org.apache.samza.sql.api.operators; > >> > + > >> > +import java.util.Iterator; > >> > + > >> > +import org.apache.samza.sql.api.data.EntityName; > >> > + > >> > + > >> > +public interface OperatorSink { > >> > + Iterator<SimpleOperator> opIterator(); > >> > + > >> > + EntityName getName(); > >> > +} > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rSource.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rSource.java > >> > new file mode 100644 > >> > index 0000000..860c1aa > >> > --- /dev/null > >> > +++ > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato > >>rSource.java > >> > @@ -0,0 +1,30 @@ > >> > +/* > >> > + * Licensed to the Apache Software Foundation (ASF) under one > >> > + * or more contributor license agreements. 
See the NOTICE file > >> > + * distributed with this work for additional information > >> > + * regarding copyright ownership. The ASF licenses this file > >> > + * to you under the Apache License, Version 2.0 (the > >> > + * "License"); you may not use this file except in compliance > >> > + * with the License. You may obtain a copy of the License at > >> > + * > >> > + * http://www.apache.org/licenses/LICENSE-2.0 > >> > + * > >> > + * Unless required by applicable law or agreed to in writing, > >> > + * software distributed under the License is distributed on an > >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > >> > + * KIND, either express or implied. See the License for the > >> > + * specific language governing permissions and limitations > >> > + * under the License. > >> > + */ > >> > +package org.apache.samza.sql.api.operators; > >> > + > >> > +import java.util.Iterator; > >> > + > >> > +import org.apache.samza.sql.api.data.EntityName; > >> > + > >> > + > >> > +public interface OperatorSource { > >> > + Iterator<SimpleOperator> opIterator(); > >> > + > >> > + EntityName getName(); > >> > +} > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO > >>perator.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO > >>perator.java > >> > index c49a822..60ace9c 100644 > >> > --- > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO > >>perator.java > >> > +++ > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO > >>perator.java > >> > @@ -19,8 +19,6 @@ > >> > > >> > package org.apache.samza.sql.api.operators; > >> > > >> > - > 
>> > - > >> > /** > >> > * The interface for a {@code SimpleOperator} that implements a > >>simple > >> > primitive relational logic operation > >> > */ > >> > @@ -31,4 +29,5 @@ public interface SimpleOperator extends Operator { > >> > * @return The {@link > >>org.apache.samza.sql.api.operators.OperatorSpec} > >> > object that defines the configuration/parameters of the operator > >> > */ > >> > OperatorSpec getSpec(); > >> > + > >> > } > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT > >>uple.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT > >>uple.java > >> > index 72a59f2..af040f0 100644 > >> > --- > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT > >>uple.java > >> > +++ > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT > >>uple.java > >> > @@ -81,7 +81,6 @@ public class IncomingMessageTuple implements Tuple { > >> > > >> > @Override > >> > public long getCreateTimeNano() { > >> > - // TODO: this is wrong and just to keep as an placeholder. 
It > >>should > >> > be replaced by the message publish time when the publish timestamp is > >> > available in the message metadata > >> > return this.recvTimeNano; > >> > } > >> > > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperato > >>rCallback.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperato > >>rCallback.java > >> > new file mode 100644 > >> > index 0000000..e951737 > >> > --- /dev/null > >> > +++ > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperato > >>rCallback.java > >> > @@ -0,0 +1,53 @@ > >> > +/* > >> > + * Licensed to the Apache Software Foundation (ASF) under one > >> > + * or more contributor license agreements. See the NOTICE file > >> > + * distributed with this work for additional information > >> > + * regarding copyright ownership. The ASF licenses this file > >> > + * to you under the Apache License, Version 2.0 (the > >> > + * "License"); you may not use this file except in compliance > >> > + * with the License. You may obtain a copy of the License at > >> > + * > >> > + * http://www.apache.org/licenses/LICENSE-2.0 > >> > + * > >> > + * Unless required by applicable law or agreed to in writing, > >> > + * software distributed under the License is distributed on an > >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > >> > + * KIND, either express or implied. See the License for the > >> > + * specific language governing permissions and limitations > >> > + * under the License. 
> >> > + */ > >> > +package org.apache.samza.sql.operators; > >> > + > >> > +import org.apache.samza.sql.api.data.Relation; > >> > +import org.apache.samza.sql.api.data.Tuple; > >> > +import org.apache.samza.sql.api.operators.OperatorCallback; > >> > +import org.apache.samza.task.MessageCollector; > >> > +import org.apache.samza.task.TaskCoordinator; > >> > + > >> > + > >> > +/** > >> > + * This is a default NOOP operator callback object that does nothing > >> > before and after the process method > >> > + */ > >> > +public final class NoopOperatorCallback implements OperatorCallback { > >> > + > >> > + @Override > >> > + public Tuple beforeProcess(Tuple tuple, MessageCollector collector, > >> > TaskCoordinator coordinator) { > >> > + return tuple; > >> > + } > >> > + > >> > + @Override > >> > + public Relation beforeProcess(Relation rel, MessageCollector > >> collector, > >> > TaskCoordinator coordinator) { > >> > + return rel; > >> > + } > >> > + > >> > + @Override > >> > + public Tuple afterProcess(Tuple tuple, MessageCollector collector, > >> > TaskCoordinator coordinator) { > >> > + return tuple; > >> > + } > >> > + > >> > + @Override > >> > + public Relation afterProcess(Relation rel, MessageCollector > >>collector, > >> > TaskCoordinator coordinator) { > >> > + return rel; > >> > + } > >> > + > >> > +} > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTop > >>ology.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTop > >>ology.java > >> > new file mode 100644 > >> > index 0000000..8b70092 > >> > --- /dev/null > >> > +++ > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTop > 
>>ology.java > >> > @@ -0,0 +1,53 @@ > >> > +/* > >> > + * Licensed to the Apache Software Foundation (ASF) under one > >> > + * or more contributor license agreements. See the NOTICE file > >> > + * distributed with this work for additional information > >> > + * regarding copyright ownership. The ASF licenses this file > >> > + * to you under the Apache License, Version 2.0 (the > >> > + * "License"); you may not use this file except in compliance > >> > + * with the License. You may obtain a copy of the License at > >> > + * > >> > + * http://www.apache.org/licenses/LICENSE-2.0 > >> > + * > >> > + * Unless required by applicable law or agreed to in writing, > >> > + * software distributed under the License is distributed on an > >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > >> > + * KIND, either express or implied. See the License for the > >> > + * specific language governing permissions and limitations > >> > + * under the License. > >> > + */ > >> > +package org.apache.samza.sql.operators; > >> > + > >> > +import java.util.Iterator; > >> > + > >> > +import org.apache.samza.sql.api.data.EntityName; > >> > +import org.apache.samza.sql.api.operators.OperatorSink; > >> > +import org.apache.samza.sql.api.operators.OperatorSource; > >> > +import org.apache.samza.sql.api.operators.SimpleOperator; > >> > + > >> > + > >> > +/** > >> > + * This class implements a partially completed {@link > >> > org.apache.samza.sql.operators.factory.TopologyBuilder} that > >>signifies a > >> > partially completed > >> > + * topology that the current operator has unbounded input stream that > >> can > >> > be attached to other operators' output > >> > + */ > >> > +public class OperatorTopology implements OperatorSource, > >>OperatorSink { > >> > + > >> > + private final EntityName name; > >> > + private final SimpleRouter router; > >> > + > >> > + public OperatorTopology(EntityName name, SimpleRouter router) { > >> > + this.name = name; > >> > + this.router = 
router; > >> > + } > >> > + > >> > + @Override > >> > + public Iterator<SimpleOperator> opIterator() { > >> > + return this.router.iterator(); > >> > + } > >> > + > >> > + @Override > >> > + public EntityName getName() { > >> > + return this.name; > >> > + } > >> > + > >> > +} > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera > >>torImpl.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera > >>torImpl.java > >> > new file mode 100644 > >> > index 0000000..423880b > >> > --- /dev/null > >> > +++ > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera > >>torImpl.java > >> > @@ -0,0 +1,147 @@ > >> > +/* > >> > + * Licensed to the Apache Software Foundation (ASF) under one > >> > + * or more contributor license agreements. See the NOTICE file > >> > + * distributed with this work for additional information > >> > + * regarding copyright ownership. The ASF licenses this file > >> > + * to you under the Apache License, Version 2.0 (the > >> > + * "License"); you may not use this file except in compliance > >> > + * with the License. You may obtain a copy of the License at > >> > + * > >> > + * http://www.apache.org/licenses/LICENSE-2.0 > >> > + * > >> > + * Unless required by applicable law or agreed to in writing, > >> > + * software distributed under the License is distributed on an > >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > >> > + * KIND, either express or implied. See the License for the > >> > + * specific language governing permissions and limitations > >> > + * under the License. 
> >> > + */ > >> > + > >> > +package org.apache.samza.sql.operators; > >> > + > >> > +import org.apache.samza.sql.api.data.Relation; > >> > +import org.apache.samza.sql.api.data.Tuple; > >> > +import org.apache.samza.sql.api.operators.OperatorCallback; > >> > +import org.apache.samza.sql.api.operators.OperatorSpec; > >> > +import org.apache.samza.sql.api.operators.SimpleOperator; > >> > +import org.apache.samza.task.MessageCollector; > >> > +import org.apache.samza.task.TaskCoordinator; > >> > +import org.apache.samza.task.sql.SimpleMessageCollector; > >> > + > >> > + > >> > +/** > >> > + * An abstract class that encapsulate the basic information and > >>methods > >> > that all operator classes should implement. > >> > + * It implements the interface {@link > >> > org.apache.samza.sql.api.operators.SimpleOperator} > >> > + */ > >> > +public abstract class SimpleOperatorImpl implements SimpleOperator { > >> > + /** > >> > + * The specification of this operator > >> > + */ > >> > + private final OperatorSpec spec; > >> > + > >> > + /** > >> > +<<<<<<< HEAD > >> > + * The callback object > >> > +======= > >> > + * The callback function > >> > +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of > >> > callbacks w/o inheriting and creating many sub-classes from operators > >> > + */ > >> > + private final OperatorCallback callback; > >> > + > >> > + /** > >> > + * Ctor of {@code SimpleOperatorImpl} class > >> > + * > >> > + * @param spec The specification of this operator > >> > + */ > >> > + public SimpleOperatorImpl(OperatorSpec spec) { > >> > + this(spec, new NoopOperatorCallback()); > >> > + } > >> > + > >> > + public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback > >> callback) > >> > { > >> > + this.spec = spec; > >> > + this.callback = callback; > >> > + } > >> > + > >> > + @Override > >> > + public OperatorSpec getSpec() { > >> > + return this.spec; > >> > + } > >> > + > >> > + /** > >> > + * This method is made final s.t. 
the sequence of invocations > >>between > >> > {@link > >> > > >> > >>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relatio > >>n, > >> > MessageCollector, TaskCoordinator)} > >> > + * and real processing of the input relation is fixed. > >> > + */ > >> > + @Override > >> > + final public void process(Relation deltaRelation, MessageCollector > >> > collector, TaskCoordinator coordinator) > >> > + throws Exception { > >> > + Relation rel = this.callback.beforeProcess(deltaRelation, > >>collector, > >> > coordinator); > >> > + if (rel == null) { > >> > + return; > >> > + } > >> > + this.realProcess(rel, getCollector(collector, coordinator), > >> > coordinator); > >> > + } > >> > + > >> > + /** > >> > + * This method is made final s.t. the sequence of invocations > >>between > >> > {@link > >> > > >>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, > >> > MessageCollector, TaskCoordinator)} > >> > + * and real processing of the input tuple is fixed. > >> > + */ > >> > + @Override > >> > + final public void process(Tuple tuple, MessageCollector collector, > >> > TaskCoordinator coordinator) throws Exception { > >> > + Tuple ituple = this.callback.beforeProcess(tuple, collector, > >> > coordinator); > >> > + if (ituple == null) { > >> > + return; > >> > + } > >> > + this.realProcess(ituple, getCollector(collector, coordinator), > >> > coordinator); > >> > + } > >> > + > >> > + /** > >> > + * This method is made final s.t. 
we enforce the invocation of > >>{@code > >> > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} > >> before > >> > doing anything further > >> > + */ > >> > + @Override > >> > + final public void refresh(long timeNano, MessageCollector > >>collector, > >> > TaskCoordinator coordinator) throws Exception { > >> > + this.realRefresh(timeNano, getCollector(collector, coordinator), > >> > coordinator); > >> > + } > >> > + > >> > + private SimpleMessageCollector getCollector(MessageCollector > >> collector, > >> > TaskCoordinator coordinator) { > >> > + if (!(collector instanceof SimpleMessageCollector)) { > >> > + return new SimpleMessageCollector(collector, coordinator, > >> > this.callback); > >> > + } else { > >> > + ((SimpleMessageCollector) > >> collector).switchCallback(this.callback); > >> > + return (SimpleMessageCollector) collector; > >> > + } > >> > + } > >> > + > >> > + /** > >> > + * Method to be overriden by each specific implementation class of > >> > operator to handle timeout event > >> > + * > >> > + * @param timeNano The time in nanosecond when the timeout event > >> > occurred > >> > + * @param collector The {@link > >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context > >> > + * @param coordinator The {@link > >> org.apache.samza.task.TaskCoordinator} > >> > in the context > >> > + * @throws Exception Throws exception if failed to refresh the > >>results > >> > + */ > >> > + protected abstract void realRefresh(long timeNano, > >> > SimpleMessageCollector collector, TaskCoordinator coordinator) > >> > + throws Exception; > >> > + > >> > + /** > >> > + * Method to be overriden by each specific implementation class of > >> > operator to perform relational logic operation on an input {@link > >> > org.apache.samza.sql.api.data.Relation} > >> > + * > >> > + * @param rel The input relation > >> > + * @param collector The {@link > >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context > >> > + * @param 
coordinator The {@link > >> org.apache.samza.task.TaskCoordinator} > >> > in the context > >> > + * @throws Exception Throws exception if failed to process > >> > + */ > >> > + protected abstract void realProcess(Relation rel, > >> > SimpleMessageCollector collector, TaskCoordinator coordinator) > >> > + throws Exception; > >> > + > >> > + /** > >> > + * Method to be overriden by each specific implementation class of > >> > operator to perform relational logic operation on an input {@link > >> > org.apache.samza.sql.api.data.Tuple} > >> > + * > >> > + * @param ituple The input tuple > >> > + * @param collector The {@link > >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context > >> > + * @param coordinator The {@link > >> org.apache.samza.task.TaskCoordinator} > >> > in the context > >> > + * @throws Exception Throws exception if failed to process > >> > + */ > >> > + protected abstract void realProcess(Tuple ituple, > >> > SimpleMessageCollector collector, TaskCoordinator coordinator) > >> > + throws Exception; > >> > + > >> > +} > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera > >>torSpec.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera > >>torSpec.java > >> > new file mode 100644 > >> > index 0000000..691e543 > >> > --- /dev/null > >> > +++ > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera > >>torSpec.java > >> > @@ -0,0 +1,106 @@ > >> > +/* > >> > + * Licensed to the Apache Software Foundation (ASF) under one > >> > + * or more contributor license agreements. 
See the NOTICE file > >> > + * distributed with this work for additional information > >> > + * regarding copyright ownership. The ASF licenses this file > >> > + * to you under the Apache License, Version 2.0 (the > >> > + * "License"); you may not use this file except in compliance > >> > + * with the License. You may obtain a copy of the License at > >> > + * > >> > + * http://www.apache.org/licenses/LICENSE-2.0 > >> > + * > >> > + * Unless required by applicable law or agreed to in writing, > >> > + * software distributed under the License is distributed on an > >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > >> > + * KIND, either express or implied. See the License for the > >> > + * specific language governing permissions and limitations > >> > + * under the License. > >> > + */ > >> > +package org.apache.samza.sql.operators; > >> > + > >> > +import java.util.ArrayList; > >> > +import java.util.List; > >> > + > >> > +import org.apache.samza.sql.api.data.EntityName; > >> > +import org.apache.samza.sql.api.operators.OperatorSpec; > >> > + > >> > + > >> > +/** > >> > + * An abstract class that encapsulate the basic information and > >>methods > >> > that all specification of operators should implement. 
> >> > + * It implements {@link > >>org.apache.samza.sql.api.operators.OperatorSpec} > >> > + */ > >> > +public abstract class SimpleOperatorSpec implements OperatorSpec { > >> > + /** > >> > + * The identifier of the corresponding operator > >> > + */ > >> > + private final String id; > >> > + > >> > + /** > >> > + * The list of input entity names of the corresponding operator > >> > + */ > >> > + private final List<EntityName> inputs = new > >>ArrayList<EntityName>(); > >> > + > >> > + /** > >> > + * The list of output entity names of the corresponding operator > >> > + */ > >> > + private final List<EntityName> outputs = new > >>ArrayList<EntityName>(); > >> > + > >> > + /** > >> > + * Ctor of the {@code SimpleOperatorSpec} for simple {@link > >> > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and > >>one > >> > output > >> > + * > >> > + * @param id Unique identifier of the {@link > >> > org.apache.samza.sql.api.operators.SimpleOperator} object > >> > + * @param input The only input entity > >> > + * @param output The only output entity > >> > + */ > >> > + public SimpleOperatorSpec(String id, EntityName input, EntityName > >> > output) { > >> > + this.id = id; > >> > + this.inputs.add(input); > >> > + this.outputs.add(output); > >> > + } > >> > + > >> > + /** > >> > + * Ctor of {@code SimpleOperatorSpec} with general format: m inputs > >> and > >> > n outputs > >> > + * > >> > + * @param id Unique identifier of the {@link > >> > org.apache.samza.sql.api.operators.SimpleOperator} object > >> > + * @param inputs The list of input entities > >> > + * @param output The list of output entities > >> > + */ > >> > + public SimpleOperatorSpec(String id, List<EntityName> inputs, > >> > EntityName output) { > >> > + this.id = id; > >> > + this.inputs.addAll(inputs); > >> > + this.outputs.add(output); > >> > + } > >> > + > >> > + @Override > >> > + public String getId() { > >> > + return this.id; > >> > + } > >> > + > >> > + @Override > >> > + public 
List<EntityName> getInputNames() { > >> > + return this.inputs; > >> > + } > >> > + > >> > + @Override > >> > + public List<EntityName> getOutputNames() { > >> > + return this.outputs; > >> > + } > >> > + > >> > + /** > >> > + * Method to get the first output entity > >> > + * > >> > + * @return The first output entity name > >> > + */ > >> > + public EntityName getOutputName() { > >> > + return this.outputs.get(0); > >> > + } > >> > + > >> > + /** > >> > + * Method to get the first input entity > >> > + * > >> > + * @return The first input entity name > >> > + */ > >> > + public EntityName getInputName() { > >> > + return this.inputs.get(0); > >> > + } > >> > +} > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRoute > >>r.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRoute > >>r.java > >> > new file mode 100644 > >> > index 0000000..2d9a1db > >> > --- /dev/null > >> > +++ > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRoute > >>r.java > >> > @@ -0,0 +1,141 @@ > >> > +/* > >> > + * Licensed to the Apache Software Foundation (ASF) under one > >> > + * or more contributor license agreements. See the NOTICE file > >> > + * distributed with this work for additional information > >> > + * regarding copyright ownership. The ASF licenses this file > >> > + * to you under the Apache License, Version 2.0 (the > >> > + * "License"); you may not use this file except in compliance > >> > + * with the License. 
You may obtain a copy of the License at > >> > + * > >> > + * http://www.apache.org/licenses/LICENSE-2.0 > >> > + * > >> > + * Unless required by applicable law or agreed to in writing, > >> > + * software distributed under the License is distributed on an > >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > >> > + * KIND, either express or implied. See the License for the > >> > + * specific language governing permissions and limitations > >> > + * under the License. > >> > + */ > >> > + > >> > +package org.apache.samza.sql.operators; > >> > + > >> > +import java.util.ArrayList; > >> > +import java.util.HashMap; > >> > +import java.util.HashSet; > >> > +import java.util.Iterator; > >> > +import java.util.List; > >> > +import java.util.Map; > >> > +import java.util.Set; > >> > + > >> > +import org.apache.samza.config.Config; > >> > +import org.apache.samza.sql.api.data.EntityName; > >> > +import org.apache.samza.sql.api.data.Relation; > >> > +import org.apache.samza.sql.api.data.Tuple; > >> > +import org.apache.samza.sql.api.operators.Operator; > >> > +import org.apache.samza.sql.api.operators.OperatorRouter; > >> > +import org.apache.samza.sql.api.operators.SimpleOperator; > >> > +import org.apache.samza.task.MessageCollector; > >> > +import org.apache.samza.task.TaskContext; > >> > +import org.apache.samza.task.TaskCoordinator; > >> > +import org.apache.samza.task.sql.RouterMessageCollector; > >> > + > >> > + > >> > +/** > >> > + * Example implementation of {@link > >> > org.apache.samza.sql.api.operators.OperatorRouter} > >> > + * > >> > + */ > >> > +public final class SimpleRouter implements OperatorRouter { > >> > + /** > >> > + * List of operators added to the {@link > >> > org.apache.samza.sql.api.operators.OperatorRouter} > >> > + */ > >> > + private List<SimpleOperator> operators = new > >> > ArrayList<SimpleOperator>(); > >> > + > >> > + @SuppressWarnings("rawtypes") > >> > + /** > >> > + * Map of {@link 
org.apache.samza.sql.api.data.EntityName} to the > >>list > >> > of operators associated with it > >> > + */ > >> > + private Map<EntityName, List> nextOps = new HashMap<EntityName, > >> List>(); > >> > + > >> > + /** > >> > + * Set of {@link org.apache.samza.sql.api.data.EntityName} as > >>inputs > >> to > >> > this {@code SimpleRouter} > >> > + */ > >> > + private Set<EntityName> inputEntities = new HashSet<EntityName>(); > >> > + > >> > + /** > >> > + * Set of entities that are not input entities to this {@code > >> > SimpleRouter} > >> > + */ > >> > + private Set<EntityName> outputEntities = new HashSet<EntityName>(); > >> > + > >> > + @SuppressWarnings("unchecked") > >> > + private void addOperator(EntityName input, SimpleOperator nextOp) { > >> > + if (nextOps.get(input) == null) { > >> > + nextOps.put(input, new ArrayList<Operator>()); > >> > + } > >> > + nextOps.get(input).add(nextOp); > >> > + operators.add(nextOp); > >> > + // get the operator spec > >> > + for (EntityName output : nextOp.getSpec().getOutputNames()) { > >> > + if (inputEntities.contains(output)) { > >> > + inputEntities.remove(output); > >> > + } > >> > + outputEntities.add(output); > >> > + } > >> > + if (!outputEntities.contains(input)) { > >> > + inputEntities.add(input); > >> > + } > >> > + } > >> > + > >> > + @Override > >> > + @SuppressWarnings("unchecked") > >> > + public List<SimpleOperator> getNextOperators(EntityName entity) { > >> > + return nextOps.get(entity); > >> > + } > >> > + > >> > + @Override > >> > + public void addOperator(SimpleOperator nextOp) { > >> > + List<EntityName> inputs = nextOp.getSpec().getInputNames(); > >> > + for (EntityName input : inputs) { > >> > + addOperator(input, nextOp); > >> > + } > >> > + } > >> > + > >> > + @Override > >> > + public void init(Config config, TaskContext context) throws > >>Exception > >> { > >> > + for (SimpleOperator op : this.operators) { > >> > + op.init(config, context); > >> > + } > >> > + } > >> > + > >> > + @Override > 
>> > + public void process(Tuple ituple, MessageCollector collector, > >> > TaskCoordinator coordinator) throws Exception { > >> > + MessageCollector opCollector = new > >>RouterMessageCollector(collector, > >> > coordinator, this); > >> > + for (Iterator<SimpleOperator> iter = > >> > this.getNextOperators(ituple.getEntityName()).iterator(); > >> iter.hasNext();) { > >> > + iter.next().process(ituple, opCollector, coordinator); > >> > + } > >> > + } > >> > + > >> > + @SuppressWarnings("rawtypes") > >> > + @Override > >> > + public void process(Relation deltaRelation, MessageCollector > >> collector, > >> > TaskCoordinator coordinator) throws Exception { > >> > + MessageCollector opCollector = new > >>RouterMessageCollector(collector, > >> > coordinator, this); > >> > + for (Iterator<SimpleOperator> iter = > >> > this.getNextOperators(deltaRelation.getName()).iterator(); > >> iter.hasNext();) > >> > { > >> > + iter.next().process(deltaRelation, opCollector, coordinator); > >> > + } > >> > + } > >> > + > >> > + @Override > >> > + public void refresh(long nanoSec, MessageCollector collector, > >> > TaskCoordinator coordinator) throws Exception { > >> > + MessageCollector opCollector = new > >>RouterMessageCollector(collector, > >> > coordinator, this); > >> > + for (EntityName entity : inputEntities) { > >> > + for (Iterator<SimpleOperator> iter = > >> > this.getNextOperators(entity).iterator(); iter.hasNext();) { > >> > + iter.next().refresh(nanoSec, opCollector, coordinator); > >> > + } > >> > + } > >> > + } > >> > + > >> > + @Override > >> > + public Iterator<SimpleOperator> iterator() { > >> > + return this.operators.iterator(); > >> > + } > >> > + > >> > +} > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallbac > >>k.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > 
>>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Noo > >>pOperatorCallback.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Noo > >>pOperatorCallback.java > >> > deleted file mode 100644 > >> > index c3d2266..0000000 > >> > --- > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Noo > >>pOperatorCallback.java > >> > +++ /dev/null > >> > @@ -1,50 +0,0 @@ > >> > -/* > >> > - * Licensed to the Apache Software Foundation (ASF) under one > >> > - * or more contributor license agreements. See the NOTICE file > >> > - * distributed with this work for additional information > >> > - * regarding copyright ownership. The ASF licenses this file > >> > - * to you under the Apache License, Version 2.0 (the > >> > - * "License"); you may not use this file except in compliance > >> > - * with the License. You may obtain a copy of the License at > >> > - * > >> > - * http://www.apache.org/licenses/LICENSE-2.0 > >> > - * > >> > - * Unless required by applicable law or agreed to in writing, > >> > - * software distributed under the License is distributed on an > >> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > >> > - * KIND, either express or implied. See the License for the > >> > - * specific language governing permissions and limitations > >> > - * under the License. 
> >> > - */ > >> > -package org.apache.samza.sql.operators.factory; > >> > - > >> > -import org.apache.samza.sql.api.data.Relation; > >> > -import org.apache.samza.sql.api.data.Tuple; > >> > -import org.apache.samza.sql.api.operators.OperatorCallback; > >> > -import org.apache.samza.task.MessageCollector; > >> > -import org.apache.samza.task.TaskCoordinator; > >> > - > >> > - > >> > -public final class NoopOperatorCallback implements OperatorCallback { > >> > - > >> > - @Override > >> > - public Tuple beforeProcess(Tuple tuple, MessageCollector collector, > >> > TaskCoordinator coordinator) { > >> > - return tuple; > >> > - } > >> > - > >> > - @Override > >> > - public Relation beforeProcess(Relation rel, MessageCollector > >> collector, > >> > TaskCoordinator coordinator) { > >> > - return rel; > >> > - } > >> > - > >> > - @Override > >> > - public Tuple afterProcess(Tuple tuple, MessageCollector collector, > >> > TaskCoordinator coordinator) { > >> > - return tuple; > >> > - } > >> > - > >> > - @Override > >> > - public Relation afterProcess(Relation rel, MessageCollector > >>collector, > >> > TaskCoordinator coordinator) { > >> > - return rel; > >> > - } > >> > - > >> > -} > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl. 
> >>java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim > >>pleOperatorImpl.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim > >>pleOperatorImpl.java > >> > deleted file mode 100644 > >> > index e66451f..0000000 > >> > --- > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim > >>pleOperatorImpl.java > >> > +++ /dev/null > >> > @@ -1,136 +0,0 @@ > >> > -/* > >> > - * Licensed to the Apache Software Foundation (ASF) under one > >> > - * or more contributor license agreements. See the NOTICE file > >> > - * distributed with this work for additional information > >> > - * regarding copyright ownership. The ASF licenses this file > >> > - * to you under the Apache License, Version 2.0 (the > >> > - * "License"); you may not use this file except in compliance > >> > - * with the License. You may obtain a copy of the License at > >> > - * > >> > - * http://www.apache.org/licenses/LICENSE-2.0 > >> > - * > >> > - * Unless required by applicable law or agreed to in writing, > >> > - * software distributed under the License is distributed on an > >> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > >> > - * KIND, either express or implied. See the License for the > >> > - * specific language governing permissions and limitations > >> > - * under the License. 
> >> > - */ > >> > - > >> > -package org.apache.samza.sql.operators.factory; > >> > - > >> > -import org.apache.samza.sql.api.data.Relation; > >> > -import org.apache.samza.sql.api.data.Tuple; > >> > -import org.apache.samza.sql.api.operators.OperatorCallback; > >> > -import org.apache.samza.sql.api.operators.OperatorSpec; > >> > -import org.apache.samza.sql.api.operators.SimpleOperator; > >> > -import org.apache.samza.task.MessageCollector; > >> > -import org.apache.samza.task.TaskCoordinator; > >> > -import org.apache.samza.task.sql.SimpleMessageCollector; > >> > - > >> > - > >> > -/** > >> > - * An abstract class that encapsulate the basic information and > >>methods > >> > that all operator classes should implement. > >> > - * It implements the interface {@link > >> > org.apache.samza.sql.api.operators.SimpleOperator} > >> > - * > >> > - */ > >> > -public abstract class SimpleOperatorImpl implements SimpleOperator { > >> > - /** > >> > - * The specification of this operator > >> > - */ > >> > - private final OperatorSpec spec; > >> > - > >> > - /** > >> > - * The callback function > >> > - */ > >> > - private final OperatorCallback callback; > >> > - > >> > - /** > >> > - * Ctor of {@code SimpleOperatorImpl} class > >> > - * > >> > - * @param spec The specification of this operator > >> > - */ > >> > - public SimpleOperatorImpl(OperatorSpec spec) { > >> > - this(spec, new NoopOperatorCallback()); > >> > - } > >> > - > >> > - public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback > >> callback) > >> > { > >> > - this.spec = spec; > >> > - this.callback = callback; > >> > - } > >> > - > >> > - @Override > >> > - public OperatorSpec getSpec() { > >> > - return this.spec; > >> > - } > >> > - > >> > - /** > >> > - * This method is made final s.t. 
the sequence of invocations > >>between > >> > {@link > >> > > >> > >>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relatio > >>n, > >> > MessageCollector, TaskCoordinator)} > >> > - * and real processing of the input relation is fixed. > >> > - */ > >> > - @Override > >> > - final public void process(Relation deltaRelation, MessageCollector > >> > collector, TaskCoordinator coordinator) > >> > - throws Exception { > >> > - Relation rel = this.callback.beforeProcess(deltaRelation, > >>collector, > >> > coordinator); > >> > - if (rel == null) { > >> > - return; > >> > - } > >> > - this.realProcess(rel, getCollector(collector, coordinator), > >> > coordinator); > >> > - } > >> > - > >> > - /** > >> > - * This method is made final s.t. the sequence of invocations > >>between > >> > {@link > >> > > >>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, > >> > MessageCollector, TaskCoordinator)} > >> > - * and real processing of the input tuple is fixed. > >> > - */ > >> > - @Override > >> > - final public void process(Tuple tuple, MessageCollector collector, > >> > TaskCoordinator coordinator) throws Exception { > >> > - Tuple ituple = this.callback.beforeProcess(tuple, collector, > >> > coordinator); > >> > - if (ituple == null) { > >> > - return; > >> > - } > >> > - this.realProcess(ituple, getCollector(collector, coordinator), > >> > coordinator); > >> > - } > >> > - > >> > - /** > >> > - * This method is made final s.t. 
we enforce the invocation of > >>{@code > >> > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} > >> before > >> > doing anything futher > >> > - */ > >> > - @Override > >> > - final public void refresh(long timeNano, MessageCollector > >>collector, > >> > TaskCoordinator coordinator) throws Exception { > >> > - this.realRefresh(timeNano, getCollector(collector, coordinator), > >> > coordinator); > >> > - } > >> > - > >> > - private SimpleMessageCollector getCollector(MessageCollector > >> collector, > >> > TaskCoordinator coordinator) { > >> > - if (!(collector instanceof SimpleMessageCollector)) { > >> > - return new SimpleMessageCollector(collector, coordinator, > >> > this.callback); > >> > - } else { > >> > - ((SimpleMessageCollector) > >> > collector).switchOperatorCallback(this.callback); > >> > - return (SimpleMessageCollector) collector; > >> > - } > >> > - } > >> > - > >> > - /** > >> > - * Method to be overriden by each specific implementation class of > >> > operator to handle timeout event > >> > - * > >> > - * @param timeNano The time in nanosecond when the timeout event > >> > occurred > >> > - * @param collector The {@link > >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context > >> > - * @param coordinator The {@link > >> org.apache.samza.task.TaskCoordinator} > >> > in the context > >> > - * @throws Exception Throws exception if failed to refresh the > >>results > >> > - */ > >> > - protected abstract void realRefresh(long timeNano, > >> > SimpleMessageCollector collector, TaskCoordinator coordinator) > >> > - throws Exception; > >> > - > >> > - /** > >> > - * Method to be overriden by each specific implementation class of > >> > operator to perform relational logic operation on an input {@link > >> > org.apache.samza.sql.api.data.Relation} > >> > - * > >> > - * @param rel The input relation > >> > - * @param collector The {@link > >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context > >> > - * 
@param coordinator The {@link > >> org.apache.samza.task.TaskCoordinator} > >> > in the context > >> > - * @throws Exception > >> > - */ > >> > - protected abstract void realProcess(Relation rel, > >> > SimpleMessageCollector collector, TaskCoordinator coordinator) > >> > - throws Exception; > >> > - > >> > - protected abstract void realProcess(Tuple ituple, > >> > SimpleMessageCollector collector, TaskCoordinator coordinator) > >> > - throws Exception; > >> > - > >> > -} > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec. > >>java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim > >>pleOperatorSpec.java > >> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim > >>pleOperatorSpec.java > >> > deleted file mode 100644 > >> > index 56753b6..0000000 > >> > --- > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim > >>pleOperatorSpec.java > >> > +++ /dev/null > >> > @@ -1,106 +0,0 @@ > >> > -/* > >> > - * Licensed to the Apache Software Foundation (ASF) under one > >> > - * or more contributor license agreements. See the NOTICE file > >> > - * distributed with this work for additional information > >> > - * regarding copyright ownership. The ASF licenses this file > >> > - * to you under the Apache License, Version 2.0 (the > >> > - * "License"); you may not use this file except in compliance > >> > - * with the License. 
You may obtain a copy of the License at > >> > - * > >> > - * http://www.apache.org/licenses/LICENSE-2.0 > >> > - * > >> > - * Unless required by applicable law or agreed to in writing, > >> > - * software distributed under the License is distributed on an > >> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > >> > - * KIND, either express or implied. See the License for the > >> > - * specific language governing permissions and limitations > >> > - * under the License. > >> > - */ > >> > -package org.apache.samza.sql.operators.factory; > >> > - > >> > -import java.util.ArrayList; > >> > -import java.util.List; > >> > - > >> > -import org.apache.samza.sql.api.data.EntityName; > >> > -import org.apache.samza.sql.api.operators.OperatorSpec; > >> > - > >> > - > >> > -/** > >> > - * An abstract class that encapsulate the basic information and > >>methods > >> > that all specification of operators should implement. > >> > - * It implements {@link > >>org.apache.samza.sql.api.operators.OperatorSpec} > >> > - */ > >> > -public abstract class SimpleOperatorSpec implements OperatorSpec { > >> > - /** > >> > - * The identifier of the corresponding operator > >> > - */ > >> > - private final String id; > >> > - > >> > - /** > >> > - * The list of input entity names of the corresponding operator > >> > - */ > >> > - private final List<EntityName> inputs = new > >>ArrayList<EntityName>(); > >> > - > >> > - /** > >> > - * The list of output entity names of the corresponding operator > >> > - */ > >> > - private final List<EntityName> outputs = new > >>ArrayList<EntityName>(); > >> > - > >> > - /** > >> > - * Ctor of the {@code SimpleOperatorSpec} for simple {@link > >> > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and > >>one > >> > output > >> > - * > >> > - * @param id Unique identifier of the {@link > >> > org.apache.samza.sql.api.operators.SimpleOperator} object > >> > - * @param input The only input entity > >> > - * @param output The only 
output entity > >> > - */ > >> > - public SimpleOperatorSpec(String id, EntityName input, EntityName > >> > output) { > >> > - this.id = id; > >> > - this.inputs.add(input); > >> > - this.outputs.add(output); > >> > - } > >> > - > >> > - /** > >> > - * Ctor of {@code SimpleOperatorSpec} with general format: m inputs > >> and > >> > n outputs > >> > - * > >> > - * @param id Unique identifier of the {@link > >> > org.apache.samza.sql.api.operators.SimpleOperator} object > >> > - * @param inputs The list of input entities > >> > - * @param output The list of output entities > >> > - */ > >> > - public SimpleOperatorSpec(String id, List<EntityName> inputs, > >> > EntityName output) { > >> > - this.id = id; > >> > - this.inputs.addAll(inputs); > >> > - this.outputs.add(output); > >> > - } > >> > - > >> > - @Override > >> > - public String getId() { > >> > - return this.id; > >> > - } > >> > - > >> > - @Override > >> > - public List<EntityName> getInputNames() { > >> > - return this.inputs; > >> > - } > >> > - > >> > - @Override > >> > - public List<EntityName> getOutputNames() { > >> > - return this.outputs; > >> > - } > >> > - > >> > - /** > >> > - * Method to get the first output entity > >> > - * > >> > - * @return The first output entity name > >> > - */ > >> > - public EntityName getOutputName() { > >> > - return this.outputs.get(0); > >> > - } > >> > - > >> > - /** > >> > - * Method to get the first input entity > >> > - * > >> > - * @return The first input entity name > >> > - */ > >> > - public EntityName getInputName() { > >> > - return this.inputs.get(0); > >> > - } > >> > -} > >> > > >> > > >> > > >> > >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core > >>/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java > >> > ---------------------------------------------------------------------- > >> > diff --git > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim > >>pleRouter.java > 
>> > > >> > >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim > >>pleRouter.java > >> > deleted file mode 100644 > >> > index e570897..0000000 > >> > --- > >> > > >> > >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim > >>pleRouter.java > >> > +++ /dev/null > >> > @@ -1,136 +0,0 @@ > >> > -/* > >> > - * Licensed to the Apache Software Foundation (ASF) under one > >> > - * or more contributor license agreements. See the NOTICE file > >> > - * distributed with this work for additional information > >> > - * regarding copyright ownership. The ASF licenses this file > >> > - * to you under the Apache License, Version 2.0 (the > >> > - * "License"); you may not use this file except in compliance > >> > - * with the License. You may obtain a copy of the License at > >> > - * > >> > - * http://www.apache.org/licenses/LICENSE-2.0 > >> > - * > >> > - * Unless required by applicable law or agreed to in writing, > >> > - * software distributed under the License is distributed on an > >> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > >> > - * KIND, either express or implied. See the License for the > >> > - * specific language governing permissions and limitations > >> > - * under the License. 
> >> > - */ > >> > - > >> > -package org.apache.samza.sql.operators.factory; > >> > - > >> > -import java.util.ArrayList; > >> > -import java.util.HashMap; > >> > -import java.util.HashSet; > >> > -import java.util.Iterator; > >> > -import java.util.List; > >> > -import java.util.Map; > >> > -import java.util.Set; > >> > - > >> > -import org.apache.samza.config.Config; > >> > -import org.apache.samza.sql.api.data.EntityName; > >> > -import org.apache.samza.sql.api.data.Relation; > >> > -import org.apache.samza.sql.api.data.Tuple; > >> > -import org.apache.samza.sql.api.operators.Operator; > >> > -import org.apache.samza.sql.api.operators.OperatorRouter; > >> > -import org.apache.samza.sql.api.operators.SimpleOperator; > >> > -import org.apache.samza.task.MessageCollector; > >> > -import org.apache.samza.task.TaskContext; > >> > -import org.apache.samza.task.TaskCoordinator; > >> > -import org.apache.samza.task.sql.RouterMessageCollector; > >> > - > >> > - > >> > -/** > >> > - * Example implementation of {@link > >> > org.apache.samza.sql.api.operators.OperatorRouter} > >> > - * > >> > - */ > >> > -public final class SimpleRouter implements OperatorRouter { > >> > - /** > >> > - * List of operators added to the {@link > >> > org.apache.samza.sql.api.operators.OperatorRouter} > >> > - */ > >> > - private List<SimpleOperator> operators = new > >> > ArrayList<SimpleOperator>(); > >> > - > >> > - @SuppressWarnings("rawtypes") > >> > - /** > >> > - * Map of {@link org.apache.samza.sql.api.data.EntityName} to the > >>list > >> > of operators associated with it > >> > - */ > >> > - private Map<EntityName, List> nextOps = new HashMap<EntityName, > >> List>(); > >> > - > >> > - /** > >> > - * Set of {@link org.apache.samza.sql.api.data.EntityName} as > >>inputs > >> to > >> > this {@code SimpleRouter} > >> > - */ > >> > - private Set<EntityName> inputEntities = new HashSet<EntityName>(); > >> > - > >> > - /** > >> > - * Set of entities that are not input entities to this 
{@code SimpleRouter}
> >> > -   */
> >> > -  private Set<EntityName> outputEntities = new HashSet<EntityName>();
> >> > -
> >> > -  @SuppressWarnings("unchecked")
> >> > -  private void addOperator(EntityName input, SimpleOperator nextOp) {
> >> > -    if (nextOps.get(input) == null) {
> >> > -      nextOps.put(input, new ArrayList<Operator>());
> >> > -    }
> >> > -    nextOps.get(input).add(nextOp);
> >> > -    operators.add(nextOp);
> >> > -    // get the operator spec
> >> > -    for (EntityName output : nextOp.getSpec().getOutputNames()) {
> >> > -      if (inputEntities.contains(output)) {
> >> > -        inputEntities.remove(output);
> >> > -      }
> >> > -      outputEntities.add(output);
> >> > -    }
> >> > -    if (!outputEntities.contains(input)) {
> >> > -      inputEntities.add(input);
> >> > -    }
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  @SuppressWarnings("unchecked")
> >> > -  public List<SimpleOperator> getNextOperators(EntityName entity) {
> >> > -    return nextOps.get(entity);
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public void addOperator(SimpleOperator nextOp) {
> >> > -    List<EntityName> inputs = nextOp.getSpec().getInputNames();
> >> > -    for (EntityName input : inputs) {
> >> > -      addOperator(input, nextOp);
> >> > -    }
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public void init(Config config, TaskContext context) throws Exception {
> >> > -    for (SimpleOperator op : this.operators) {
> >> > -      op.init(config, context);
> >> > -    }
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public void process(Tuple ituple, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
> >> > -    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
> >> > -    for (Iterator<SimpleOperator> iter = this.getNextOperators(ituple.getEntityName()).iterator(); iter.hasNext();) {
> >> > -      iter.next().process(ituple, opCollector, coordinator);
> >> > -    }
> >> > -  }
> >> > -
> >> > -  @SuppressWarnings("rawtypes")
> >> > -  @Override
> >> > -  public void process(Relation deltaRelation, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
> >> > -    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
> >> > -    for (Iterator<SimpleOperator> iter = this.getNextOperators(deltaRelation.getName()).iterator(); iter.hasNext();) {
> >> > -      iter.next().process(deltaRelation, opCollector, coordinator);
> >> > -    }
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public void refresh(long nanoSec, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
> >> > -    MessageCollector opCollector = new RouterMessageCollector(collector, coordinator, this);
> >> > -    for (EntityName entity : inputEntities) {
> >> > -      for (Iterator<SimpleOperator> iter = this.getNextOperators(entity).iterator(); iter.hasNext();) {
> >> > -        iter.next().refresh(nanoSec, opCollector, coordinator);
> >> > -      }
> >> > -    }
> >> > -  }
> >> > -
> >> > -}
> >> >
> >> >
> >> >
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
> >> > ----------------------------------------------------------------------
> >> > diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
> >> > new file mode 100644
> >> > index 0000000..62b19fc
> >> > --- /dev/null
> >> > +++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
> >> > @@ -0,0 +1,284 @@
> >> > +/*
> >> > + * Licensed to the Apache Software Foundation (ASF) under one
> >> > + * or more contributor license agreements.
See the NOTICE file
> >> > + * distributed with this work for additional information
> >> > + * regarding copyright ownership. The ASF licenses this file
> >> > + * to you under the Apache License, Version 2.0 (the
> >> > + * "License"); you may not use this file except in compliance
> >> > + * with the License. You may obtain a copy of the License at
> >> > + *
> >> > + *   http://www.apache.org/licenses/LICENSE-2.0
> >> > + *
> >> > + * Unless required by applicable law or agreed to in writing,
> >> > + * software distributed under the License is distributed on an
> >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> > + * KIND, either express or implied. See the License for the
> >> > + * specific language governing permissions and limitations
> >> > + * under the License.
> >> > + */
> >> > +package org.apache.samza.sql.operators.factory;
> >> > +
> >> > +import java.util.HashMap;
> >> > +import java.util.HashSet;
> >> > +import java.util.Iterator;
> >> > +import java.util.List;
> >> > +import java.util.Map;
> >> > +import java.util.Set;
> >> > +
> >> > +import org.apache.samza.sql.api.data.EntityName;
> >> > +import org.apache.samza.sql.api.operators.OperatorRouter;
> >> > +import org.apache.samza.sql.api.operators.OperatorSink;
> >> > +import org.apache.samza.sql.api.operators.OperatorSource;
> >> > +import org.apache.samza.sql.api.operators.OperatorSpec;
> >> > +import org.apache.samza.sql.api.operators.SimpleOperator;
> >> > +import org.apache.samza.sql.api.operators.SqlOperatorFactory;
> >> > +import org.apache.samza.sql.operators.OperatorTopology;
> >> > +import org.apache.samza.sql.operators.SimpleRouter;
> >> > +
> >> > +
> >> > +/**
> >> > + * This class implements a builder to allow user to create the operators and connect them in a topology altogether.
> >> > + */
> >> > +public class TopologyBuilder {
> >> > +
> >> > +  /**
> >> > +   * Internal {@link org.apache.samza.sql.api.operators.OperatorRouter} object to retain the topology being created
> >> > +   */
> >> > +  private SimpleRouter router;
> >> > +
> >> > +  /**
> >> > +   * The {@link org.apache.samza.sql.api.operators.SqlOperatorFactory} object used to create operators connected in the topology
> >> > +   */
> >> > +  private final SqlOperatorFactory factory;
> >> > +
> >> > +  /**
> >> > +   * The map of unbound inputs, the value is set(input_operators)
> >> > +   */
> >> > +  private Map<EntityName, Set<OperatorSpec>> unboundInputs = new HashMap<EntityName, Set<OperatorSpec>>();
> >> > +
> >> > +  /**
> >> > +   * The map of unbound outputs, the value is the operator generating the output
> >> > +   */
> >> > +  private Map<EntityName, OperatorSpec> unboundOutputs = new HashMap<EntityName, OperatorSpec>();
> >> > +
> >> > +  /**
> >> > +   * The set of entities that are intermediate entities between operators
> >> > +   */
> >> > +  private Set<EntityName> interStreams = new HashSet<EntityName>();
> >> > +
> >> > +  /**
> >> > +   * The current operator that may have unbound input or output
> >> > +   */
> >> > +  private SimpleOperator currentOp = null;
> >> > +
> >> > +  /**
> >> > +   * Private constructor of {@code TopologyBuilder}
> >> > +   *
> >> > +   * @param factory The {@link org.apache.samza.sql.api.operators.SqlOperatorFactory} to create operators
> >> > +   */
> >> > +  private TopologyBuilder(SqlOperatorFactory factory) {
> >> > +    this.router = new SimpleRouter();
> >> > +    this.factory = factory;
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Static method to create this {@code TopologyBuilder} w/ a customized {@link org.apache.samza.sql.api.operators.SqlOperatorFactory}
> >> > +   *
> >> > +   * @param factory The {@link org.apache.samza.sql.api.operators.SqlOperatorFactory} to create operators
> >> > +   * @return The {@code TopologyBuilder} object
> >> > +   */
> >> > +  public static TopologyBuilder create(SqlOperatorFactory factory) {
> >> > +    return new TopologyBuilder(factory);
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Static method to create this {@code TopologyBuilder}
> >> > +   *
> >> > +   * @return The {@code TopologyBuilder} object
> >> > +   */
> >> > +  public static TopologyBuilder create() {
> >> > +    return new TopologyBuilder(new SimpleOperatorFactoryImpl());
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to create the next operator and attach it to the output of the current operator
> >> > +   *
> >> > +   * @param spec The {@link org.apache.samza.sql.api.operators.OperatorSpec} for the next operator
> >> > +   * @return The updated {@code TopologyBuilder} object
> >> > +   */
> >> > +  public TopologyBuilder operator(OperatorSpec spec) {
> >> > +    // check whether it is valid to connect a new operator to the current operator's output
> >> > +    SimpleOperator nextOp = this.factory.getOperator(spec);
> >> > +    return this.operator(nextOp);
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to create the next operator and attach it to the output of the current operator
> >> > +   *
> >> > +   * @param op The {@link org.apache.samza.sql.api.operators.SimpleOperator}
> >> > +   * @return The updated {@code TopologyBuilder} object
> >> > +   */
> >> > +  public TopologyBuilder operator(SimpleOperator op) {
> >> > +    // check whether it is valid to connect a new operator to the current operator's output
> >> > +    canAddOperator(op);
> >> > +    this.addOperator(op);
> >> > +    // advance the current operator position
> >> > +    this.currentOp = op;
> >> > +    return this;
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to create a stream object that will be the source to other operators
> >> > +   *
> >> > +   * @return The {@link org.apache.samza.sql.api.operators.OperatorSource} that can be the
source to other operators
> >> > +   */
> >> > +  public OperatorSource stream() {
> >> > +    canCreateSource();
> >> > +    return new OperatorTopology(this.unboundOutputs.keySet().iterator().next(), this.router);
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to create a sink object that can take input stream from other operators
> >> > +   *
> >> > +   * @return The {@link org.apache.samza.sql.api.operators.OperatorSink} that can be the downstream of other operators
> >> > +   */
> >> > +  public OperatorSink sink() {
> >> > +    canCreateSink();
> >> > +    return new OperatorTopology(this.unboundInputs.keySet().iterator().next(), this.router);
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to bind the input of the current operator w/ the {@link org.apache.samza.sql.api.operators.OperatorSource} object
> >> > +   *
> >> > +   * @param srcStream The {@link org.apache.samza.sql.api.operators.OperatorSource} that the current operator is going to be bound to
> >> > +   * @return The updated {@code TopologyBuilder} object
> >> > +   */
> >> > +  public TopologyBuilder bind(OperatorSource srcStream) {
> >> > +    EntityName streamName = srcStream.getName();
> >> > +    if (this.unboundInputs.containsKey(streamName)) {
> >> > +      this.unboundInputs.remove(streamName);
> >> > +      this.interStreams.add(streamName);
> >> > +    } else {
> >> > +      // no input operator is waiting for the output from the srcStream
> >> > +      throw new IllegalArgumentException("No operator input can be bound to the input stream " + streamName);
> >> > +    }
> >> > +    // add all operators in srcStream to this topology
> >> > +    for (Iterator<SimpleOperator> iter = srcStream.opIterator(); iter.hasNext();) {
> >> > +      this.addOperator(iter.next());
> >> > +    }
> >> > +    return this;
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to attach a {@link org.apache.samza.sql.api.operators.OperatorSink} object to the
output of the current operator
> >> > +   *
> >> > +   * @param nextSink The {@link org.apache.samza.sql.api.operators.OperatorSink} to be attached to the current operator's output
> >> > +   * @return The updated {@code TopologyBuilder} object
> >> > +   */
> >> > +  public TopologyBuilder attach(OperatorSink nextSink) {
> >> > +    EntityName streamName = nextSink.getName();
> >> > +    if (this.unboundOutputs.containsKey(streamName)) {
> >> > +      this.unboundOutputs.remove(streamName);
> >> > +      this.interStreams.add(streamName);
> >> > +    } else {
> >> > +      // no unbound output to attach to
> >> > +      throw new IllegalArgumentException("No operator output found to attach the sink " + streamName);
> >> > +    }
> >> > +    // add all operators in nextSink to the router
> >> > +    for (Iterator<SimpleOperator> iter = nextSink.opIterator(); iter.hasNext();) {
> >> > +      this.addOperator(iter.next());
> >> > +    }
> >> > +    return this;
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to finalize the topology that should have all input and output bound to system input and output
> >> > +   *
> >> > +   * @return The finalized {@link org.apache.samza.sql.api.operators.OperatorRouter} object
> >> > +   */
> >> > +  public OperatorRouter build() {
> >> > +    canClose();
> >> > +    return router;
> >> > +  }
> >> > +
> >> > +  private TopologyBuilder addOperator(SimpleOperator nextOp) {
> >> > +    // if input is not in the unboundOutputs and interStreams, input is unbound
> >> > +    for (EntityName in : nextOp.getSpec().getInputNames()) {
> >> > +      if (this.unboundOutputs.containsKey(in)) {
> >> > +        this.unboundOutputs.remove(in);
> >> > +        this.interStreams.add(in);
> >> > +      }
> >> > +      if (!this.interStreams.contains(in) && !in.isSystemEntity()) {
> >> > +        if (!this.unboundInputs.containsKey(in)) {
> >> > +          this.unboundInputs.put(in, new HashSet<OperatorSpec>());
> >> > +        }
> >> > +        this.unboundInputs.get(in).add(nextOp.getSpec());
> >> > +      }
> >> > +    }
> >> > +    // if output is not in the unboundInputs and interStreams, output is unbound
> >> > +    for (EntityName out : nextOp.getSpec().getOutputNames()) {
> >> > +      if (this.unboundInputs.containsKey(out)) {
> >> > +        this.unboundInputs.remove(out);
> >> > +        this.interStreams.add(out);
> >> > +      }
> >> > +      if (!this.interStreams.contains(out) && !out.isSystemEntity()) {
> >> > +        this.unboundOutputs.put(out, nextOp.getSpec());
> >> > +      }
> >> > +    }
> >> > +    try {
> >> > +      this.router.addOperator(nextOp);
> >> > +    } catch (Exception e) {
> >> > +      throw new RuntimeException("Failed to add operator " + nextOp.getSpec().getId() + " to the topology.", e);
> >> > +    }
> >> > +    return this;
> >> > +  }
> >> > +
> >> > +  private void canCreateSource() {
> >> > +    if (this.unboundInputs.size() > 0) {
> >> > +      throw new IllegalStateException("Can't create stream when there are unbounded input streams in the topology");
> >> > +    }
> >> > +    if (this.unboundOutputs.size() != 1) {
> >> > +      throw new IllegalStateException(
> >> > +          "Can't create stream when the number of unbounded outputs is not 1 in the topology");
> >> > +    }
> >> > +  }
> >> > +
> >> > +  private void canCreateSink() {
> >> > +    if (this.unboundOutputs.size() > 0) {
> >> > +      throw new IllegalStateException("Can't create sink when there are unbounded output streams in the topology");
> >> > +    }
> >> > +    if (this.unboundInputs.size() != 1) {
> >> > +      throw new IllegalStateException(
> >> > +          "Can't create sink when the number of unbounded input streams is not 1 in the topology");
> >> > +    }
> >> > +  }
> >> > +
> >> > +  private void canAddOperator(SimpleOperator op) {
> >> > +    if (this.currentOp == null) {
> >> > +      return;
> >> > +    }
> >> > +    for (EntityName name : this.currentOp.getSpec().getInputNames()) {
> >> > +      if (this.unboundInputs.containsKey(name)) {
> >> > +        throw new IllegalArgumentException("There are unbound input " +
name + " to the current operator "
> >> > +            + this.currentOp.getSpec().getId() + ". Create a sink or call bind instead");
> >> > +      }
> >> > +    }
> >> > +    List<EntityName> nextInputs = op.getSpec().getInputNames();
> >> > +    for (EntityName name : this.currentOp.getSpec().getOutputNames()) {
> >> > +      if (!nextInputs.contains(name) && this.unboundOutputs.containsKey(name)) {
> >> > +        // the current operator's output is not in the next operator's input list
> >> > +        throw new IllegalArgumentException("There are unbound output " + name + " from the current operator "
> >> > +            + this.currentOp.getSpec().getId()
> >> > +            + " that are not included in the next operator's inputs. Create a stream or call attach instead");
> >> > +      }
> >> > +    }
> >> > +  }
> >> > +
> >> > +  private void canClose() {
> >> > +    if (!this.unboundInputs.isEmpty() || !this.unboundOutputs.isEmpty()) {
> >> > +      throw new IllegalStateException(
> >> > +          "There are input/output streams in the topology that are not bounded.
Can't build the topology yet.");
> >> > +    }
> >> > +  }
> >> > +
> >> > +}
> >> >
> >> >
> >> >
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> >> > ----------------------------------------------------------------------
> >> > diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> >> > index 2854aeb..7f5b990 100644
> >> > --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> >> > +++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> >> > @@ -29,7 +29,7 @@ import org.apache.samza.sql.api.data.Relation;
> >> >  import org.apache.samza.sql.api.data.Stream;
> >> >  import org.apache.samza.sql.api.data.Tuple;
> >> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
> >> > +import org.apache.samza.sql.operators.SimpleOperatorImpl;
> >> >  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
> >> >  import org.apache.samza.sql.window.storage.OrderedStoreKey;
> >> >  import org.apache.samza.storage.kv.Entry;
> >> > @@ -38,7 +38,6 @@ import org.apache.samza.task.TaskContext;
> >> >  import org.apache.samza.task.TaskCoordinator;
> >> >  import org.apache.samza.task.sql.SimpleMessageCollector;
> >> >
> >> > -
> >> >  /**
> >> >   * This class implements a simple stream-to-stream join
> >> >   */
> >> >
> >> >
> >> >
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> >> > ----------------------------------------------------------------------
> >> > diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> >> > index cc0aca0..eecff7e 100644
> >> > --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> >> > +++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> >> > @@ -19,10 +19,11 @@
> >> >
> >> >  package org.apache.samza.sql.operators.join;
> >> >
> >> > +import java.util.ArrayList;
> >> >  import java.util.List;
> >> >
> >> >  import org.apache.samza.sql.api.data.EntityName;
> >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
> >> > +import org.apache.samza.sql.operators.SimpleOperatorSpec;
> >> >
> >> >
> >> >  /**
> >> > @@ -35,4 +36,16 @@ public class StreamStreamJoinSpec extends SimpleOperatorSpec {
> >> >      // TODO Auto-generated constructor stub
> >> >    }
> >> >
> >> > +  @SuppressWarnings("serial")
> >> > +  public StreamStreamJoinSpec(String id, List<String> inputRelations, String output, List<String> joinKeys) {
> >> > +    super(id, new ArrayList<EntityName>() {
> >> > +      {
> >> > +        for (String input : inputRelations) {
> >> > +          add(EntityName.getStreamName(input));
> >> > +        }
> >> > +      }
> >> > +    }, EntityName.getStreamName(output));
> >> > +    // TODO Auto-generated constructor stub
> >> > +  }
> >> > +
> >> >  }
> >> >
> >> >
> >> >
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> >> > ----------------------------------------------------------------------
> >> > diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> >> > index b93d789..0cba39a 100644
> >> > --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> >> > +++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> >> > @@ -23,7 +23,7 @@ import org.apache.samza.config.Config;
> >> >  import org.apache.samza.sql.api.data.Relation;
> >> >  import org.apache.samza.sql.api.data.Tuple;
> >> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
> >> > +import org.apache.samza.sql.operators.SimpleOperatorImpl;
> >> >  import org.apache.samza.storage.kv.Entry;
> >> >  import org.apache.samza.storage.kv.KeyValueIterator;
> >> >  import org.apache.samza.system.OutgoingMessageEnvelope;
> >> > @@ -32,7 +32,6 @@ import org.apache.samza.task.TaskContext;
> >> >  import org.apache.samza.task.TaskCoordinator;
> >> >  import org.apache.samza.task.sql.SimpleMessageCollector;
> >> >
> >> > -
> >> >  /**
> >> >   * This is an example build-in operator that performs a simple stream re-partition operation.
> >> >   *
> >> >
> >> >
> >> >
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> >> > ----------------------------------------------------------------------
> >> > diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> >> > index c47eed9..e494bff 100644
> >> > --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> >> > +++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> >> > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.partition;
> >> >
> >> >  import org.apache.samza.sql.api.data.EntityName;
> >> >  import org.apache.samza.sql.api.operators.OperatorSpec;
> >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
> >> > +import org.apache.samza.sql.operators.SimpleOperatorSpec;
> >> >  import org.apache.samza.system.SystemStream;
> >> >
> >> >
> >> >
> >> >
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> >> > ----------------------------------------------------------------------
> >> > diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> >> > index d81cc93..a9a83b5 100644
> >> > --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> >> > +++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> >> > @@ -27,13 +27,12 @@ import org.apache.samza.sql.api.data.EntityName;
> >> >  import org.apache.samza.sql.api.data.Relation;
> >> >  import org.apache.samza.sql.api.data.Tuple;
> >> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
> >> > +import org.apache.samza.sql.operators.SimpleOperatorImpl;
> >> >  import org.apache.samza.storage.kv.KeyValueIterator;
> >> >  import org.apache.samza.task.TaskContext;
> >> >  import org.apache.samza.task.TaskCoordinator;
> >> >  import org.apache.samza.task.sql.SimpleMessageCollector;
> >> >
> >> > -
> >> >  /**
> >> >   * This class defines an example build-in operator for a fixed size window operator that converts a stream to a relation
> >> >   *
> >> > @@ -86,6 +85,7 @@ public class BoundedTimeWindow extends SimpleOperatorImpl {
> >> >     * @param lengthSec The window size in seconds
> >> >     * @param input The input stream name
> >> >     * @param output The output relation name
> >> > +   * @param callback The user callback object
> >> >     */
> >> >    public BoundedTimeWindow(String wndId, int lengthSec, String input, String output, OperatorCallback callback) {
> >> >      super(new WindowSpec(wndId, EntityName.getStreamName(input), EntityName.getStreamName(output), lengthSec), callback);
> >> >
> >> >
> >> >
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> >> > ----------------------------------------------------------------------
> >> > diff --git a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> >> > index eec32ea..6c4eba8 100644
> >> > --- a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> >> > +++ b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> >> > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.window;
> >> >
> >> >  import org.apache.samza.sql.api.data.EntityName;
> >> >  import org.apache.samza.sql.api.operators.OperatorSpec;
> >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
> >> > +import org.apache.samza.sql.operators.SimpleOperatorSpec;
> >> >
> >> >
> >> >  /**
> >> > @@ -47,6 +47,11 @@ public class WindowSpec extends SimpleOperatorSpec implements OperatorSpec {
> >> >      this.wndSizeSec = lengthSec;
> >> >    }
> >> >
> >> > +  public WindowSpec(String id, int wndSize, String input) {
> >> > +    super(id, EntityName.getStreamName(input), null);
> >> > +    this.wndSizeSec = wndSize;
> >> > +  }
> >> > +
> >> >    /**
> >> >     * Method to get the window state relation name
> >> >     *
> >> >
> >> >
> >> >
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> >> > ----------------------------------------------------------------------
> >> > diff --git a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> >> > index b29838a..6950f67 100644
> >> > --- a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> >> > +++ b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> >> > @@ -22,7 +22,7 @@ package org.apache.samza.task.sql;
> >> >  import org.apache.samza.sql.api.data.Relation;
> >> >  import org.apache.samza.sql.api.data.Tuple;
> >> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> >> > -import org.apache.samza.sql.operators.factory.NoopOperatorCallback;
> >> > +import org.apache.samza.sql.operators.NoopOperatorCallback;
> >> >  import org.apache.samza.storage.kv.Entry;
> >> >  import org.apache.samza.storage.kv.KeyValueIterator;
> >> >  import org.apache.samza.system.OutgoingMessageEnvelope;
> >> > @@ -57,25 +57,38 @@ public class SimpleMessageCollector implements MessageCollector {
> >> >     * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} in the context
> >> >     */
> >> >    public SimpleMessageCollector(MessageCollector collector, TaskCoordinator coordinator) {
> >> > -    this.collector = collector;
> >> > -    this.coordinator = coordinator;
> >> > +    this(collector, coordinator, new NoopOperatorCallback());
> >> >    }
> >> >
> >> >    /**
> >> >     * This method swaps the {@code callback} with the new one
> >> >     *
> >> > -   * <p> This method allows the {@link org.apache.samza.sql.api.operators.SimpleOperator} to be swapped when the collector
> >> > -   * is passed down into the next operator's context. Hence, under the new operator's context, the correct {@link org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Relation, MessageCollector, TaskCoordinator)},
> >> > -   * and {@link org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Tuple, MessageCollector, TaskCoordinator)} can be invoked
> >> > +   * <p> This method allows the {@link org.apache.samza.sql.api.operators.OperatorCallback} to be swapped when the collector
> >> > +   * is passed down into the next operator's context.
Hence, under the new operator's context, the correct callback functions can be invoked
> >> >     *
> >> >     * @param callback The new {@link org.apache.samza.sql.api.operators.OperatorCallback} to be set
> >> >     */
> >> > -  public void switchOperatorCallback(OperatorCallback callback) {
> >> > -    this.callback = callback;
> >> > +  public void switchCallback(OperatorCallback callback) {
> >> > +    if (callback == null) {
> >> > +      this.callback = new NoopOperatorCallback();
> >> > +    } else {
> >> > +      this.callback = callback;
> >> > +    }
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Method is declared to be final s.t. we enforce that the callback functions are called first
> >> > +   */
> >> > +  @Override
> >> > +  final public void send(OutgoingMessageEnvelope envelope) {
> >> > +    this.collector.send(envelope);
> >> >    }
> >> >
> >> >    /**
> >> >     * Method is declared to be final s.t. we enforce that the callback functions are called first
> >> > +   *
> >> > +   * @param deltaRelation The relation to be sent out
> >> > +   * @throws Exception Throws exception if failed to send
> >> >     */
> >> >    final public void send(Relation deltaRelation) throws Exception {
> >> >      Relation rel = this.callback.afterProcess(deltaRelation, collector, coordinator);
> >> > @@ -87,6 +100,9 @@ public class SimpleMessageCollector implements MessageCollector {
> >> >
> >> >    /**
> >> >     * Method is declared to be final s.t.
we enforce that the callback functions are called first
> >> > +   *
> >> > +   * @param tuple The tuple to be sent out
> >> > +   * @throws Exception Throws exception if failed to send
> >> >     */
> >> >    final public void send(Tuple tuple) throws Exception {
> >> >      Tuple otuple = this.callback.afterProcess(tuple, collector, coordinator);
> >> > @@ -106,9 +122,4 @@ public class SimpleMessageCollector implements MessageCollector {
> >> >    protected void realSend(Tuple tuple) throws Exception {
> >> >      this.collector.send((OutgoingMessageEnvelope) tuple.getMessage());
> >> >    }
> >> > -
> >> > -  @Override
> >> > -  public void send(OutgoingMessageEnvelope envelope) {
> >> > -    this.collector.send(envelope);
> >> > -  }
> >> >  }
> >> >
> >> >
> >> >
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> >> > ----------------------------------------------------------------------
> >> > diff --git a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> >> > index 20dc701..7370af6 100644
> >> > --- a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> >> > +++ b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> >> > @@ -22,6 +22,7 @@ package org.apache.samza.task.sql;
> >> >  import org.apache.samza.config.Config;
> >> >  import org.apache.samza.sql.api.data.Relation;
> >> >  import org.apache.samza.sql.api.data.Tuple;
> >> > +import org.apache.samza.sql.api.operators.Operator;
> >> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> >> >  import org.apache.samza.sql.data.IncomingMessageTuple;
> >> >  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
> >> > @@ -39,7 +40,7 @@ import org.apache.samza.task.WindowableTask;
> >> >   *
> >> >   */
> >> >  public class RandomWindowOperatorTask implements StreamTask, InitableTask, WindowableTask {
> >> > -  private BoundedTimeWindow wndOp;
> >> > +  private Operator operator;
> >> >
> >> >    private final OperatorCallback wndCallback = new OperatorCallback() {
> >> >
> >> > @@ -77,20 +78,20 @@ public class RandomWindowOperatorTask implements StreamTask, InitableTask, Windo
> >> >    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
> >> >        throws Exception {
> >> >      // based on tuple's stream name, get the window op and run process()
> >> > -    wndOp.process(new IncomingMessageTuple(envelope), collector, coordinator);
> >> > +    operator.process(new IncomingMessageTuple(envelope), collector, coordinator);
> >> >
> >> >    }
> >> >
> >> >    @Override
> >> >    public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
> >> >      // based on tuple's stream name, get the window op and run process()
> >> > -    wndOp.refresh(System.nanoTime(), collector, coordinator);
> >> > +    operator.refresh(System.nanoTime(), collector, coordinator);
> >> >    }
> >> >
> >> >    @Override
> >> >    public void init(Config config, TaskContext context) throws Exception {
> >> >      // 1.
create a fixed length 10 sec window operator
> >> > -    this.wndOp = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", "relation1", this.wndCallback);
> >> > -    this.wndOp.init(config, context);
> >> > +    this.operator = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", "wndOutput", this.wndCallback);
> >> > +    this.operator.init(config, context);
> >> >    }
> >> >  }
> >> >
> >> >
> >> >
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> >> > ----------------------------------------------------------------------
> >> > diff --git a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> >> > index 9124e3c..d65892c 100644
> >> > --- a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> >> > +++ b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> >> > @@ -24,7 +24,7 @@ import java.util.List;
> >> >
> >> >  import org.apache.samza.config.Config;
> >> >  import org.apache.samza.sql.data.IncomingMessageTuple;
> >> > -import org.apache.samza.sql.operators.factory.SimpleRouter;
> >> > +import org.apache.samza.sql.operators.SimpleRouter;
> >> >  import org.apache.samza.sql.operators.join.StreamStreamJoin;
> >> >  import org.apache.samza.sql.operators.partition.PartitionOp;
> >> >  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
> >> > @@ -51,25 +51,25 @@ import org.apache.samza.task.WindowableTask;
> >> >   */
> >> >  public class StreamSqlTask implements StreamTask, InitableTask, WindowableTask {
> >> >
> >> > -  private SimpleRouter rteCntx;
> >> > +  private SimpleRouter router;
> >> >
> >> >    @Override
> >> >    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator
coordinator)
> >> >        throws Exception {
> >> > -    this.rteCntx.process(new IncomingMessageTuple(envelope), collector, coordinator);
> >> > +    this.router.process(new IncomingMessageTuple(envelope), collector, coordinator);
> >> >    }
> >> >
> >> >    @Override
> >> >    public void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
> >> > -    this.rteCntx.refresh(System.nanoTime(), collector, coordinator);
> >> > +    this.router.refresh(System.nanoTime(), collector, coordinator);
> >> >    }
> >> >
> >> >    @Override
> >> >    public void init(Config config, TaskContext context) throws Exception {
> >> >      // create all operators via the operator factory
> >> >      // 1. create two window operators
> >> > -    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10, "inputStream1", "fixedWndOutput1");
> >> > -    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10, "inputStream2", "fixedWndOutput2");
> >> > +    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10, "kafka:inputStream1", "fixedWndOutput1");
> >> > +    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10, "kafka:inputStream2", "fixedWndOutput2");
> >> >      // 2. create one join operator
> >> >      @SuppressWarnings("serial")
> >> >      List<String> inputRelations = new ArrayList<String>() {
> >> > @@ -86,19 +86,19 @@ public class StreamSqlTask implements StreamTask, InitableTask, WindowableTask {
> >> >        }
> >> >      };
> >> >      StreamStreamJoin join = new StreamStreamJoin("joinOp", inputRelations, "joinOutput", joinKeys);
> >> > -    // 4. create a re-partition operator
> >> > +    // 3. create a re-partition operator
> >> >      PartitionOp par = new PartitionOp("parOp1", "joinOutput", "kafka", "parOutputStrm1", "joinKey", 50);
> >> >
> >> >      // Now, connecting the operators via the OperatorRouter
> >> > -    this.rteCntx = new SimpleRouter();
> >> > +    this.router = new SimpleRouter();
> >> >      // 1.
set two system input operators (i.e. two window operators)
> >> > -    this.rteCntx.addOperator(wnd1);
> >> > -    this.rteCntx.addOperator(wnd2);
> >> > +    this.router.addOperator(wnd1);
> >> > +    this.router.addOperator(wnd2);
> >> >      // 2. connect join operator to both window operators
> >> > -    this.rteCntx.addOperator(join);
> >> > +    this.router.addOperator(join);
> >> >      // 3. connect re-partition operator to the stream operator
> >> > -    this.rteCntx.addOperator(par);
> >> > +    this.router.addOperator(par);
> >> >
> >> > -    this.rteCntx.init(config, context);
> >> > +    this.router.init(config, context);
> >> >    }
> >> >  }
> >> >
> >> >
> >>
> >>
> >> --
> >> Milinda Pathirage
> >>
> >> PhD Student | Research Assistant
> >> School of Informatics and Computing | Data to Insight Center
> >> Indiana University
> >>
> >> twitter: milindalakmal
> >> skype: milinda.pathirage
> >> blog: http://milinda.pathirage.org
> >>
> >

--
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org