Hi Navina, Did we decide to push this patch to the samza-sql branch? I thought Yi was still working on this. Some Git merge-conflict markers are still present in this commit.
+<<<<<<< HEAD + * The callback object +======= + * The callback function +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of callbacks w/o inheriting and creating many sub-classes from operators Milinda On Mon, Jun 1, 2015 at 9:06 PM, <nav...@apache.org> wrote: > Yi's TopologyBuilder RB 34500 > > > Project: http://git-wip-us.apache.org/repos/asf/samza/repo > Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/45b85477 > Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/45b85477 > Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/45b85477 > > Branch: refs/heads/samza-sql > Commit: 45b854772cf36cc69e8d8cda7a51bce1be5fe576 > Parents: 41c4cd0 > Author: Navina <navi.trin...@gmail.com> > Authored: Thu May 28 18:51:30 2015 -0700 > Committer: Navina <navi.trin...@gmail.com> > Committed: Thu May 28 18:51:30 2015 -0700 > > ---------------------------------------------------------------------- > .../apache/samza/sql/api/data/EntityName.java | 41 ++- > .../org/apache/samza/sql/api/data/Table.java | 7 +- > .../samza/sql/api/operators/Operator.java | 4 + > .../sql/api/operators/OperatorCallback.java | 1 - > .../samza/sql/api/operators/OperatorRouter.java | 8 + > .../samza/sql/api/operators/OperatorSink.java | 30 ++ > .../samza/sql/api/operators/OperatorSource.java | 30 ++ > .../samza/sql/api/operators/SimpleOperator.java | 3 +- > .../samza/sql/data/IncomingMessageTuple.java | 1 - > .../sql/operators/NoopOperatorCallback.java | 53 ++++ > .../samza/sql/operators/OperatorTopology.java | 53 ++++ > .../samza/sql/operators/SimpleOperatorImpl.java | 147 ++++++++++ > .../samza/sql/operators/SimpleOperatorSpec.java | 106 +++++++ > .../samza/sql/operators/SimpleRouter.java | 141 +++++++++ > .../operators/factory/NoopOperatorCallback.java | 50 ---- > .../operators/factory/SimpleOperatorImpl.java | 136 --------- > .../operators/factory/SimpleOperatorSpec.java | 106 ------- > .../sql/operators/factory/SimpleRouter.java | 136 --------- > 
.../sql/operators/factory/TopologyBuilder.java | 284 +++++++++++++++++++ > .../sql/operators/join/StreamStreamJoin.java | 3 +- > .../operators/join/StreamStreamJoinSpec.java | 15 +- > .../sql/operators/partition/PartitionOp.java | 3 +- > .../sql/operators/partition/PartitionSpec.java | 2 +- > .../sql/operators/window/BoundedTimeWindow.java | 4 +- > .../samza/sql/operators/window/WindowSpec.java | 7 +- > .../samza/task/sql/SimpleMessageCollector.java | 37 ++- > .../task/sql/RandomWindowOperatorTask.java | 11 +- > .../apache/samza/task/sql/StreamSqlTask.java | 26 +- > .../samza/task/sql/UserCallbacksSqlTask.java | 66 ++--- > 29 files changed, 991 insertions(+), 520 deletions(-) > ---------------------------------------------------------------------- > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java > index 80ba455..df1b11b 100644 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java > @@ -49,6 +49,8 @@ public class EntityName { > */ > private final String name; > > + private final boolean isSystemEntity; > + > /** > * Static map of already allocated table names > */ > @@ -59,15 +61,19 @@ public class EntityName { > */ > private static Map<String, EntityName> streams = new HashMap<String, > EntityName>(); > > + private static final String ANONYMOUS = "anonymous"; > + > /** > * Private ctor to create entity names > * > * @param type Type of the entity name > * @param name Formatted name of the entity > + * @param isSystemEntity whether the entity is a system input/output > */ > - private EntityName(EntityType type, String 
name) { > + private EntityName(EntityType type, String name, boolean > isSystemEntity) { > this.type = type; > this.name = name; > + this.isSystemEntity = isSystemEntity; > } > > @Override > @@ -102,6 +108,10 @@ public class EntityName { > return this.type.equals(EntityType.STREAM); > } > > + public boolean isSystemEntity() { > + return this.isSystemEntity; > + } > + > /** > * Get the formatted entity name > * > @@ -111,15 +121,24 @@ public class EntityName { > return this.name; > } > > + public static EntityName getTableName(String name) { > + return getTableName(name, false); > + } > + > + public static EntityName getStreamName(String name) { > + return getStreamName(name, false); > + } > + > /** > * Static method to get the instance of {@code EntityName} with type > {@code EntityType.TABLE} > * > * @param name The formatted entity name of the relation > + * @param isSystem The boolean flag indicating whether this is a system > input/output > * @return A <code>EntityName</code> for a relation > */ > - public static EntityName getTableName(String name) { > + public static EntityName getTableName(String name, boolean isSystem) { > if (tables.get(name) == null) { > - tables.put(name, new EntityName(EntityType.TABLE, name)); > + tables.put(name, new EntityName(EntityType.TABLE, name, isSystem)); > } > return tables.get(name); > } > @@ -128,13 +147,25 @@ public class EntityName { > * Static method to get the instance of <code>EntityName</code> with > type <code>EntityType.STREAM</code> > * > * @param name The formatted entity name of the stream > + * @param isSystem The boolean flag indicating whether this is a system > input/output > * @return A <code>EntityName</code> for a stream > */ > - public static EntityName getStreamName(String name) { > + public static EntityName getStreamName(String name, boolean isSystem) { > if (streams.get(name) == null) { > - streams.put(name, new EntityName(EntityType.STREAM, name)); > + streams.put(name, new 
EntityName(EntityType.STREAM, name, > isSystem)); > } > return streams.get(name); > } > > + public static EntityName getAnonymousStream() { > + return getStreamName(ANONYMOUS); > + } > + > + public static EntityName getAnonymousTable() { > + return getTableName(ANONYMOUS); > + } > + > + public boolean isAnonymous() { > + return this.name.equals(ANONYMOUS); > + } > } > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > index 7b4d984..b4dce07 100644 > --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > +++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > @@ -19,6 +19,9 @@ > > package org.apache.samza.sql.api.data; > > +import java.util.List; > + > + > /** > * This interface defines a non-ordered {@link > org.apache.samza.sql.api.data.Relation}, which has a unique primary key > * > @@ -31,8 +34,8 @@ public interface Table<K> extends Relation<K> { > /** > * Get the primary key field name for this table > * > - * @return The name of the primary key field > + * @return The names of the primary key fields > */ > - String getPrimaryKeyName(); > + List<String> getPrimaryKeyNames(); > > } > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java > index d6f6b57..9c6eaa5 100644 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java > +++ > 
b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java > @@ -27,7 +27,11 @@ import org.apache.samza.task.TaskContext; > import org.apache.samza.task.TaskCoordinator; > > > +/** > + * This class defines the common interface for operator classes. > + */ > public interface Operator { > + > /** > * Method to initialize the operator > * > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java > index fb2aa89..5a77d95 100644 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java > @@ -23,7 +23,6 @@ import org.apache.samza.sql.api.data.Tuple; > import org.apache.samza.task.MessageCollector; > import org.apache.samza.task.TaskCoordinator; > > - > /** > * Defines the callback functions to allow customized functions to be > invoked before process and before sending the result > */ > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java > index 0759638..432e6b3 100644 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java > @@ -19,6 +19,7 @@ > > package 
org.apache.samza.sql.api.operators; > > +import java.util.Iterator; > import java.util.List; > > import org.apache.samza.sql.api.data.EntityName; > @@ -51,4 +52,11 @@ public interface OperatorRouter extends Operator { > */ > List<SimpleOperator> getNextOperators(EntityName output); > > + /** > + * This method provides an iterator to go through all operators > connected via {@code OperatorRouter} > + * > + * @return An {@link java.util.Iterator} for all operators connected > via {@code OperatorRouter} > + */ > + Iterator<SimpleOperator> iterator(); > + > } > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java > new file mode 100644 > index 0000000..e2c748c > --- /dev/null > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java > @@ -0,0 +1,30 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. 
> + */ > +package org.apache.samza.sql.api.operators; > + > +import java.util.Iterator; > + > +import org.apache.samza.sql.api.data.EntityName; > + > + > +public interface OperatorSink { > + Iterator<SimpleOperator> opIterator(); > + > + EntityName getName(); > +} > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java > new file mode 100644 > index 0000000..860c1aa > --- /dev/null > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java > @@ -0,0 +1,30 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. 
> + */ > +package org.apache.samza.sql.api.operators; > + > +import java.util.Iterator; > + > +import org.apache.samza.sql.api.data.EntityName; > + > + > +public interface OperatorSource { > + Iterator<SimpleOperator> opIterator(); > + > + EntityName getName(); > +} > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java > index c49a822..60ace9c 100644 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java > @@ -19,8 +19,6 @@ > > package org.apache.samza.sql.api.operators; > > - > - > /** > * The interface for a {@code SimpleOperator} that implements a simple > primitive relational logic operation > */ > @@ -31,4 +29,5 @@ public interface SimpleOperator extends Operator { > * @return The {@link org.apache.samza.sql.api.operators.OperatorSpec} > object that defines the configuration/parameters of the operator > */ > OperatorSpec getSpec(); > + > } > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java > index 72a59f2..af040f0 100644 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java > @@ -81,7 +81,6 @@ public class 
IncomingMessageTuple implements Tuple { > > @Override > public long getCreateTimeNano() { > - // TODO: this is wrong and just to keep as an placeholder. It should > be replaced by the message publish time when the publish timestamp is > available in the message metadata > return this.recvTimeNano; > } > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java > new file mode 100644 > index 0000000..e951737 > --- /dev/null > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java > @@ -0,0 +1,53 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. 
> + */ > +package org.apache.samza.sql.operators; > + > +import org.apache.samza.sql.api.data.Relation; > +import org.apache.samza.sql.api.data.Tuple; > +import org.apache.samza.sql.api.operators.OperatorCallback; > +import org.apache.samza.task.MessageCollector; > +import org.apache.samza.task.TaskCoordinator; > + > + > +/** > + * This is a default NOOP operator callback object that does nothing > before and after the process method > + */ > +public final class NoopOperatorCallback implements OperatorCallback { > + > + @Override > + public Tuple beforeProcess(Tuple tuple, MessageCollector collector, > TaskCoordinator coordinator) { > + return tuple; > + } > + > + @Override > + public Relation beforeProcess(Relation rel, MessageCollector collector, > TaskCoordinator coordinator) { > + return rel; > + } > + > + @Override > + public Tuple afterProcess(Tuple tuple, MessageCollector collector, > TaskCoordinator coordinator) { > + return tuple; > + } > + > + @Override > + public Relation afterProcess(Relation rel, MessageCollector collector, > TaskCoordinator coordinator) { > + return rel; > + } > + > +} > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java > new file mode 100644 > index 0000000..8b70092 > --- /dev/null > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java > @@ -0,0 +1,53 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. 
The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. > + */ > +package org.apache.samza.sql.operators; > + > +import java.util.Iterator; > + > +import org.apache.samza.sql.api.data.EntityName; > +import org.apache.samza.sql.api.operators.OperatorSink; > +import org.apache.samza.sql.api.operators.OperatorSource; > +import org.apache.samza.sql.api.operators.SimpleOperator; > + > + > +/** > + * This class implements a partially completed {@link > org.apache.samza.sql.operators.factory.TopologyBuilder} that signifies a > partially completed > + * topology that the current operator has unbounded input stream that can > be attached to other operators' output > + */ > +public class OperatorTopology implements OperatorSource, OperatorSink { > + > + private final EntityName name; > + private final SimpleRouter router; > + > + public OperatorTopology(EntityName name, SimpleRouter router) { > + this.name = name; > + this.router = router; > + } > + > + @Override > + public Iterator<SimpleOperator> opIterator() { > + return this.router.iterator(); > + } > + > + @Override > + public EntityName getName() { > + return this.name; > + } > + > +} > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java > ---------------------------------------------------------------------- > diff --git > 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java > new file mode 100644 > index 0000000..423880b > --- /dev/null > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java > @@ -0,0 +1,147 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. > + */ > + > +package org.apache.samza.sql.operators; > + > +import org.apache.samza.sql.api.data.Relation; > +import org.apache.samza.sql.api.data.Tuple; > +import org.apache.samza.sql.api.operators.OperatorCallback; > +import org.apache.samza.sql.api.operators.OperatorSpec; > +import org.apache.samza.sql.api.operators.SimpleOperator; > +import org.apache.samza.task.MessageCollector; > +import org.apache.samza.task.TaskCoordinator; > +import org.apache.samza.task.sql.SimpleMessageCollector; > + > + > +/** > + * An abstract class that encapsulate the basic information and methods > that all operator classes should implement. 
> + * It implements the interface {@link > org.apache.samza.sql.api.operators.SimpleOperator} > + */ > +public abstract class SimpleOperatorImpl implements SimpleOperator { > + /** > + * The specification of this operator > + */ > + private final OperatorSpec spec; > + > + /** > +<<<<<<< HEAD > + * The callback object > +======= > + * The callback function > +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of > callbacks w/o inheriting and creating many sub-classes from operators > + */ > + private final OperatorCallback callback; > + > + /** > + * Ctor of {@code SimpleOperatorImpl} class > + * > + * @param spec The specification of this operator > + */ > + public SimpleOperatorImpl(OperatorSpec spec) { > + this(spec, new NoopOperatorCallback()); > + } > + > + public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback callback) > { > + this.spec = spec; > + this.callback = callback; > + } > + > + @Override > + public OperatorSpec getSpec() { > + return this.spec; > + } > + > + /** > + * This method is made final s.t. the sequence of invocations between > {@link > org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation, > MessageCollector, TaskCoordinator)} > + * and real processing of the input relation is fixed. > + */ > + @Override > + final public void process(Relation deltaRelation, MessageCollector > collector, TaskCoordinator coordinator) > + throws Exception { > + Relation rel = this.callback.beforeProcess(deltaRelation, collector, > coordinator); > + if (rel == null) { > + return; > + } > + this.realProcess(rel, getCollector(collector, coordinator), > coordinator); > + } > + > + /** > + * This method is made final s.t. the sequence of invocations between > {@link > org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, > MessageCollector, TaskCoordinator)} > + * and real processing of the input tuple is fixed. 
> + */ > + @Override > + final public void process(Tuple tuple, MessageCollector collector, > TaskCoordinator coordinator) throws Exception { > + Tuple ituple = this.callback.beforeProcess(tuple, collector, > coordinator); > + if (ituple == null) { > + return; > + } > + this.realProcess(ituple, getCollector(collector, coordinator), > coordinator); > + } > + > + /** > + * This method is made final s.t. we enforce the invocation of {@code > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} before > doing anything futher > + */ > + @Override > + final public void refresh(long timeNano, MessageCollector collector, > TaskCoordinator coordinator) throws Exception { > + this.realRefresh(timeNano, getCollector(collector, coordinator), > coordinator); > + } > + > + private SimpleMessageCollector getCollector(MessageCollector collector, > TaskCoordinator coordinator) { > + if (!(collector instanceof SimpleMessageCollector)) { > + return new SimpleMessageCollector(collector, coordinator, > this.callback); > + } else { > + ((SimpleMessageCollector) collector).switchCallback(this.callback); > + return (SimpleMessageCollector) collector; > + } > + } > + > + /** > + * Method to be overriden by each specific implementation class of > operator to handle timeout event > + * > + * @param timeNano The time in nanosecond when the timeout event > occurred > + * @param collector The {@link > org.apache.samza.task.sql.SimpleMessageCollector} in the context > + * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} > in the context > + * @throws Exception Throws exception if failed to refresh the results > + */ > + protected abstract void realRefresh(long timeNano, > SimpleMessageCollector collector, TaskCoordinator coordinator) > + throws Exception; > + > + /** > + * Method to be overriden by each specific implementation class of > operator to perform relational logic operation on an input {@link > org.apache.samza.sql.api.data.Relation} > + * > + * 
@param rel The input relation > + * @param collector The {@link > org.apache.samza.task.sql.SimpleMessageCollector} in the context > + * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} > in the context > + * @throws Exception Throws exception if failed to process > + */ > + protected abstract void realProcess(Relation rel, > SimpleMessageCollector collector, TaskCoordinator coordinator) > + throws Exception; > + > + /** > + * Method to be overriden by each specific implementation class of > operator to perform relational logic operation on an input {@link > org.apache.samza.sql.api.data.Tuple} > + * > + * @param ituple The input tuple > + * @param collector The {@link > org.apache.samza.task.sql.SimpleMessageCollector} in the context > + * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} > in the context > + * @throws Exception Throws exception if failed to process > + */ > + protected abstract void realProcess(Tuple ituple, > SimpleMessageCollector collector, TaskCoordinator coordinator) > + throws Exception; > + > +} > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java > new file mode 100644 > index 0000000..691e543 > --- /dev/null > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java > @@ -0,0 +1,106 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. 
The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. > + */ > +package org.apache.samza.sql.operators; > + > +import java.util.ArrayList; > +import java.util.List; > + > +import org.apache.samza.sql.api.data.EntityName; > +import org.apache.samza.sql.api.operators.OperatorSpec; > + > + > +/** > + * An abstract class that encapsulate the basic information and methods > that all specification of operators should implement. > + * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec} > + */ > +public abstract class SimpleOperatorSpec implements OperatorSpec { > + /** > + * The identifier of the corresponding operator > + */ > + private final String id; > + > + /** > + * The list of input entity names of the corresponding operator > + */ > + private final List<EntityName> inputs = new ArrayList<EntityName>(); > + > + /** > + * The list of output entity names of the corresponding operator > + */ > + private final List<EntityName> outputs = new ArrayList<EntityName>(); > + > + /** > + * Ctor of the {@code SimpleOperatorSpec} for simple {@link > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one > output > + * > + * @param id Unique identifier of the {@link > org.apache.samza.sql.api.operators.SimpleOperator} object > + * @param input The only input entity > + * @param output The only output entity > + */ > + public SimpleOperatorSpec(String id, EntityName input, EntityName > output) 
{ > + this.id = id; > + this.inputs.add(input); > + this.outputs.add(output); > + } > + > + /** > + * Ctor of {@code SimpleOperatorSpec} with general format: m inputs and > n outputs > + * > + * @param id Unique identifier of the {@link > org.apache.samza.sql.api.operators.SimpleOperator} object > + * @param inputs The list of input entities > + * @param output The list of output entities > + */ > + public SimpleOperatorSpec(String id, List<EntityName> inputs, > EntityName output) { > + this.id = id; > + this.inputs.addAll(inputs); > + this.outputs.add(output); > + } > + > + @Override > + public String getId() { > + return this.id; > + } > + > + @Override > + public List<EntityName> getInputNames() { > + return this.inputs; > + } > + > + @Override > + public List<EntityName> getOutputNames() { > + return this.outputs; > + } > + > + /** > + * Method to get the first output entity > + * > + * @return The first output entity name > + */ > + public EntityName getOutputName() { > + return this.outputs.get(0); > + } > + > + /** > + * Method to get the first input entity > + * > + * @return The first input entity name > + */ > + public EntityName getInputName() { > + return this.inputs.get(0); > + } > +} > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java > new file mode 100644 > index 0000000..2d9a1db > --- /dev/null > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java > @@ -0,0 +1,141 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. 
See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. > + */ > + > +package org.apache.samza.sql.operators; > + > +import java.util.ArrayList; > +import java.util.HashMap; > +import java.util.HashSet; > +import java.util.Iterator; > +import java.util.List; > +import java.util.Map; > +import java.util.Set; > + > +import org.apache.samza.config.Config; > +import org.apache.samza.sql.api.data.EntityName; > +import org.apache.samza.sql.api.data.Relation; > +import org.apache.samza.sql.api.data.Tuple; > +import org.apache.samza.sql.api.operators.Operator; > +import org.apache.samza.sql.api.operators.OperatorRouter; > +import org.apache.samza.sql.api.operators.SimpleOperator; > +import org.apache.samza.task.MessageCollector; > +import org.apache.samza.task.TaskContext; > +import org.apache.samza.task.TaskCoordinator; > +import org.apache.samza.task.sql.RouterMessageCollector; > + > + > +/** > + * Example implementation of {@link > org.apache.samza.sql.api.operators.OperatorRouter} > + * > + */ > +public final class SimpleRouter implements OperatorRouter { > + /** > + * List of operators added to the {@link > org.apache.samza.sql.api.operators.OperatorRouter} > + */ > + private List<SimpleOperator> operators = new > ArrayList<SimpleOperator>(); > + > + @SuppressWarnings("rawtypes") > + /** > + * Map of 
{@link org.apache.samza.sql.api.data.EntityName} to the list > of operators associated with it > + */ > + private Map<EntityName, List> nextOps = new HashMap<EntityName, List>(); > + > + /** > + * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs to > this {@code SimpleRouter} > + */ > + private Set<EntityName> inputEntities = new HashSet<EntityName>(); > + > + /** > + * Set of entities that are not input entities to this {@code > SimpleRouter} > + */ > + private Set<EntityName> outputEntities = new HashSet<EntityName>(); > + > + @SuppressWarnings("unchecked") > + private void addOperator(EntityName input, SimpleOperator nextOp) { > + if (nextOps.get(input) == null) { > + nextOps.put(input, new ArrayList<Operator>()); > + } > + nextOps.get(input).add(nextOp); > + operators.add(nextOp); > + // get the operator spec > + for (EntityName output : nextOp.getSpec().getOutputNames()) { > + if (inputEntities.contains(output)) { > + inputEntities.remove(output); > + } > + outputEntities.add(output); > + } > + if (!outputEntities.contains(input)) { > + inputEntities.add(input); > + } > + } > + > + @Override > + @SuppressWarnings("unchecked") > + public List<SimpleOperator> getNextOperators(EntityName entity) { > + return nextOps.get(entity); > + } > + > + @Override > + public void addOperator(SimpleOperator nextOp) { > + List<EntityName> inputs = nextOp.getSpec().getInputNames(); > + for (EntityName input : inputs) { > + addOperator(input, nextOp); > + } > + } > + > + @Override > + public void init(Config config, TaskContext context) throws Exception { > + for (SimpleOperator op : this.operators) { > + op.init(config, context); > + } > + } > + > + @Override > + public void process(Tuple ituple, MessageCollector collector, > TaskCoordinator coordinator) throws Exception { > + MessageCollector opCollector = new RouterMessageCollector(collector, > coordinator, this); > + for (Iterator<SimpleOperator> iter = > 
this.getNextOperators(ituple.getEntityName()).iterator(); iter.hasNext();) { > + iter.next().process(ituple, opCollector, coordinator); > + } > + } > + > + @SuppressWarnings("rawtypes") > + @Override > + public void process(Relation deltaRelation, MessageCollector collector, > TaskCoordinator coordinator) throws Exception { > + MessageCollector opCollector = new RouterMessageCollector(collector, > coordinator, this); > + for (Iterator<SimpleOperator> iter = > this.getNextOperators(deltaRelation.getName()).iterator(); iter.hasNext();) > { > + iter.next().process(deltaRelation, opCollector, coordinator); > + } > + } > + > + @Override > + public void refresh(long nanoSec, MessageCollector collector, > TaskCoordinator coordinator) throws Exception { > + MessageCollector opCollector = new RouterMessageCollector(collector, > coordinator, this); > + for (EntityName entity : inputEntities) { > + for (Iterator<SimpleOperator> iter = > this.getNextOperators(entity).iterator(); iter.hasNext();) { > + iter.next().refresh(nanoSec, opCollector, coordinator); > + } > + } > + } > + > + @Override > + public Iterator<SimpleOperator> iterator() { > + return this.operators.iterator(); > + } > + > +} > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java > deleted file mode 100644 > index c3d2266..0000000 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java > +++ /dev/null > @@ -1,50 +0,0 @@ > -/* > - * Licensed to the Apache Software Foundation (ASF) under one > - * or more contributor license agreements. 
See the NOTICE file > - * distributed with this work for additional information > - * regarding copyright ownership. The ASF licenses this file > - * to you under the Apache License, Version 2.0 (the > - * "License"); you may not use this file except in compliance > - * with the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, > - * software distributed under the License is distributed on an > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > - * KIND, either express or implied. See the License for the > - * specific language governing permissions and limitations > - * under the License. > - */ > -package org.apache.samza.sql.operators.factory; > - > -import org.apache.samza.sql.api.data.Relation; > -import org.apache.samza.sql.api.data.Tuple; > -import org.apache.samza.sql.api.operators.OperatorCallback; > -import org.apache.samza.task.MessageCollector; > -import org.apache.samza.task.TaskCoordinator; > - > - > -public final class NoopOperatorCallback implements OperatorCallback { > - > - @Override > - public Tuple beforeProcess(Tuple tuple, MessageCollector collector, > TaskCoordinator coordinator) { > - return tuple; > - } > - > - @Override > - public Relation beforeProcess(Relation rel, MessageCollector collector, > TaskCoordinator coordinator) { > - return rel; > - } > - > - @Override > - public Tuple afterProcess(Tuple tuple, MessageCollector collector, > TaskCoordinator coordinator) { > - return tuple; > - } > - > - @Override > - public Relation afterProcess(Relation rel, MessageCollector collector, > TaskCoordinator coordinator) { > - return rel; > - } > - > -} > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java > ---------------------------------------------------------------------- > diff --git > 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java > deleted file mode 100644 > index e66451f..0000000 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java > +++ /dev/null > @@ -1,136 +0,0 @@ > -/* > - * Licensed to the Apache Software Foundation (ASF) under one > - * or more contributor license agreements. See the NOTICE file > - * distributed with this work for additional information > - * regarding copyright ownership. The ASF licenses this file > - * to you under the Apache License, Version 2.0 (the > - * "License"); you may not use this file except in compliance > - * with the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, > - * software distributed under the License is distributed on an > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > - * KIND, either express or implied. See the License for the > - * specific language governing permissions and limitations > - * under the License. > - */ > - > -package org.apache.samza.sql.operators.factory; > - > -import org.apache.samza.sql.api.data.Relation; > -import org.apache.samza.sql.api.data.Tuple; > -import org.apache.samza.sql.api.operators.OperatorCallback; > -import org.apache.samza.sql.api.operators.OperatorSpec; > -import org.apache.samza.sql.api.operators.SimpleOperator; > -import org.apache.samza.task.MessageCollector; > -import org.apache.samza.task.TaskCoordinator; > -import org.apache.samza.task.sql.SimpleMessageCollector; > - > - > -/** > - * An abstract class that encapsulate the basic information and methods > that all operator classes should implement. 
> - * It implements the interface {@link > org.apache.samza.sql.api.operators.SimpleOperator} > - * > - */ > -public abstract class SimpleOperatorImpl implements SimpleOperator { > - /** > - * The specification of this operator > - */ > - private final OperatorSpec spec; > - > - /** > - * The callback function > - */ > - private final OperatorCallback callback; > - > - /** > - * Ctor of {@code SimpleOperatorImpl} class > - * > - * @param spec The specification of this operator > - */ > - public SimpleOperatorImpl(OperatorSpec spec) { > - this(spec, new NoopOperatorCallback()); > - } > - > - public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback callback) > { > - this.spec = spec; > - this.callback = callback; > - } > - > - @Override > - public OperatorSpec getSpec() { > - return this.spec; > - } > - > - /** > - * This method is made final s.t. the sequence of invocations between > {@link > org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation, > MessageCollector, TaskCoordinator)} > - * and real processing of the input relation is fixed. > - */ > - @Override > - final public void process(Relation deltaRelation, MessageCollector > collector, TaskCoordinator coordinator) > - throws Exception { > - Relation rel = this.callback.beforeProcess(deltaRelation, collector, > coordinator); > - if (rel == null) { > - return; > - } > - this.realProcess(rel, getCollector(collector, coordinator), > coordinator); > - } > - > - /** > - * This method is made final s.t. the sequence of invocations between > {@link > org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, > MessageCollector, TaskCoordinator)} > - * and real processing of the input tuple is fixed. 
> - */ > - @Override > - final public void process(Tuple tuple, MessageCollector collector, > TaskCoordinator coordinator) throws Exception { > - Tuple ituple = this.callback.beforeProcess(tuple, collector, > coordinator); > - if (ituple == null) { > - return; > - } > - this.realProcess(ituple, getCollector(collector, coordinator), > coordinator); > - } > - > - /** > - * This method is made final s.t. we enforce the invocation of {@code > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} before > doing anything futher > - */ > - @Override > - final public void refresh(long timeNano, MessageCollector collector, > TaskCoordinator coordinator) throws Exception { > - this.realRefresh(timeNano, getCollector(collector, coordinator), > coordinator); > - } > - > - private SimpleMessageCollector getCollector(MessageCollector collector, > TaskCoordinator coordinator) { > - if (!(collector instanceof SimpleMessageCollector)) { > - return new SimpleMessageCollector(collector, coordinator, > this.callback); > - } else { > - ((SimpleMessageCollector) > collector).switchOperatorCallback(this.callback); > - return (SimpleMessageCollector) collector; > - } > - } > - > - /** > - * Method to be overriden by each specific implementation class of > operator to handle timeout event > - * > - * @param timeNano The time in nanosecond when the timeout event > occurred > - * @param collector The {@link > org.apache.samza.task.sql.SimpleMessageCollector} in the context > - * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} > in the context > - * @throws Exception Throws exception if failed to refresh the results > - */ > - protected abstract void realRefresh(long timeNano, > SimpleMessageCollector collector, TaskCoordinator coordinator) > - throws Exception; > - > - /** > - * Method to be overriden by each specific implementation class of > operator to perform relational logic operation on an input {@link > org.apache.samza.sql.api.data.Relation} > - * > 
- * @param rel The input relation > - * @param collector The {@link > org.apache.samza.task.sql.SimpleMessageCollector} in the context > - * @param coordinator The {@link org.apache.samza.task.TaskCoordinator} > in the context > - * @throws Exception > - */ > - protected abstract void realProcess(Relation rel, > SimpleMessageCollector collector, TaskCoordinator coordinator) > - throws Exception; > - > - protected abstract void realProcess(Tuple ituple, > SimpleMessageCollector collector, TaskCoordinator coordinator) > - throws Exception; > - > -} > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java > deleted file mode 100644 > index 56753b6..0000000 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java > +++ /dev/null > @@ -1,106 +0,0 @@ > -/* > - * Licensed to the Apache Software Foundation (ASF) under one > - * or more contributor license agreements. See the NOTICE file > - * distributed with this work for additional information > - * regarding copyright ownership. The ASF licenses this file > - * to you under the Apache License, Version 2.0 (the > - * "License"); you may not use this file except in compliance > - * with the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, > - * software distributed under the License is distributed on an > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > - * KIND, either express or implied. 
See the License for the > - * specific language governing permissions and limitations > - * under the License. > - */ > -package org.apache.samza.sql.operators.factory; > - > -import java.util.ArrayList; > -import java.util.List; > - > -import org.apache.samza.sql.api.data.EntityName; > -import org.apache.samza.sql.api.operators.OperatorSpec; > - > - > -/** > - * An abstract class that encapsulate the basic information and methods > that all specification of operators should implement. > - * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec} > - */ > -public abstract class SimpleOperatorSpec implements OperatorSpec { > - /** > - * The identifier of the corresponding operator > - */ > - private final String id; > - > - /** > - * The list of input entity names of the corresponding operator > - */ > - private final List<EntityName> inputs = new ArrayList<EntityName>(); > - > - /** > - * The list of output entity names of the corresponding operator > - */ > - private final List<EntityName> outputs = new ArrayList<EntityName>(); > - > - /** > - * Ctor of the {@code SimpleOperatorSpec} for simple {@link > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one > output > - * > - * @param id Unique identifier of the {@link > org.apache.samza.sql.api.operators.SimpleOperator} object > - * @param input The only input entity > - * @param output The only output entity > - */ > - public SimpleOperatorSpec(String id, EntityName input, EntityName > output) { > - this.id = id; > - this.inputs.add(input); > - this.outputs.add(output); > - } > - > - /** > - * Ctor of {@code SimpleOperatorSpec} with general format: m inputs and > n outputs > - * > - * @param id Unique identifier of the {@link > org.apache.samza.sql.api.operators.SimpleOperator} object > - * @param inputs The list of input entities > - * @param output The list of output entities > - */ > - public SimpleOperatorSpec(String id, List<EntityName> inputs, > EntityName output) { > - 
this.id = id; > - this.inputs.addAll(inputs); > - this.outputs.add(output); > - } > - > - @Override > - public String getId() { > - return this.id; > - } > - > - @Override > - public List<EntityName> getInputNames() { > - return this.inputs; > - } > - > - @Override > - public List<EntityName> getOutputNames() { > - return this.outputs; > - } > - > - /** > - * Method to get the first output entity > - * > - * @return The first output entity name > - */ > - public EntityName getOutputName() { > - return this.outputs.get(0); > - } > - > - /** > - * Method to get the first input entity > - * > - * @return The first input entity name > - */ > - public EntityName getInputName() { > - return this.inputs.get(0); > - } > -} > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java > deleted file mode 100644 > index e570897..0000000 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java > +++ /dev/null > @@ -1,136 +0,0 @@ > -/* > - * Licensed to the Apache Software Foundation (ASF) under one > - * or more contributor license agreements. See the NOTICE file > - * distributed with this work for additional information > - * regarding copyright ownership. The ASF licenses this file > - * to you under the Apache License, Version 2.0 (the > - * "License"); you may not use this file except in compliance > - * with the License. 
You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, > - * software distributed under the License is distributed on an > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > - * KIND, either express or implied. See the License for the > - * specific language governing permissions and limitations > - * under the License. > - */ > - > -package org.apache.samza.sql.operators.factory; > - > -import java.util.ArrayList; > -import java.util.HashMap; > -import java.util.HashSet; > -import java.util.Iterator; > -import java.util.List; > -import java.util.Map; > -import java.util.Set; > - > -import org.apache.samza.config.Config; > -import org.apache.samza.sql.api.data.EntityName; > -import org.apache.samza.sql.api.data.Relation; > -import org.apache.samza.sql.api.data.Tuple; > -import org.apache.samza.sql.api.operators.Operator; > -import org.apache.samza.sql.api.operators.OperatorRouter; > -import org.apache.samza.sql.api.operators.SimpleOperator; > -import org.apache.samza.task.MessageCollector; > -import org.apache.samza.task.TaskContext; > -import org.apache.samza.task.TaskCoordinator; > -import org.apache.samza.task.sql.RouterMessageCollector; > - > - > -/** > - * Example implementation of {@link > org.apache.samza.sql.api.operators.OperatorRouter} > - * > - */ > -public final class SimpleRouter implements OperatorRouter { > - /** > - * List of operators added to the {@link > org.apache.samza.sql.api.operators.OperatorRouter} > - */ > - private List<SimpleOperator> operators = new > ArrayList<SimpleOperator>(); > - > - @SuppressWarnings("rawtypes") > - /** > - * Map of {@link org.apache.samza.sql.api.data.EntityName} to the list > of operators associated with it > - */ > - private Map<EntityName, List> nextOps = new HashMap<EntityName, List>(); > - > - /** > - * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs to > this {@code 
SimpleRouter} > - */ > - private Set<EntityName> inputEntities = new HashSet<EntityName>(); > - > - /** > - * Set of entities that are not input entities to this {@code > SimpleRouter} > - */ > - private Set<EntityName> outputEntities = new HashSet<EntityName>(); > - > - @SuppressWarnings("unchecked") > - private void addOperator(EntityName input, SimpleOperator nextOp) { > - if (nextOps.get(input) == null) { > - nextOps.put(input, new ArrayList<Operator>()); > - } > - nextOps.get(input).add(nextOp); > - operators.add(nextOp); > - // get the operator spec > - for (EntityName output : nextOp.getSpec().getOutputNames()) { > - if (inputEntities.contains(output)) { > - inputEntities.remove(output); > - } > - outputEntities.add(output); > - } > - if (!outputEntities.contains(input)) { > - inputEntities.add(input); > - } > - } > - > - @Override > - @SuppressWarnings("unchecked") > - public List<SimpleOperator> getNextOperators(EntityName entity) { > - return nextOps.get(entity); > - } > - > - @Override > - public void addOperator(SimpleOperator nextOp) { > - List<EntityName> inputs = nextOp.getSpec().getInputNames(); > - for (EntityName input : inputs) { > - addOperator(input, nextOp); > - } > - } > - > - @Override > - public void init(Config config, TaskContext context) throws Exception { > - for (SimpleOperator op : this.operators) { > - op.init(config, context); > - } > - } > - > - @Override > - public void process(Tuple ituple, MessageCollector collector, > TaskCoordinator coordinator) throws Exception { > - MessageCollector opCollector = new RouterMessageCollector(collector, > coordinator, this); > - for (Iterator<SimpleOperator> iter = > this.getNextOperators(ituple.getEntityName()).iterator(); iter.hasNext();) { > - iter.next().process(ituple, opCollector, coordinator); > - } > - } > - > - @SuppressWarnings("rawtypes") > - @Override > - public void process(Relation deltaRelation, MessageCollector collector, > TaskCoordinator coordinator) throws Exception { > - 
MessageCollector opCollector = new RouterMessageCollector(collector, > coordinator, this); > - for (Iterator<SimpleOperator> iter = > this.getNextOperators(deltaRelation.getName()).iterator(); iter.hasNext();) > { > - iter.next().process(deltaRelation, opCollector, coordinator); > - } > - } > - > - @Override > - public void refresh(long nanoSec, MessageCollector collector, > TaskCoordinator coordinator) throws Exception { > - MessageCollector opCollector = new RouterMessageCollector(collector, > coordinator, this); > - for (EntityName entity : inputEntities) { > - for (Iterator<SimpleOperator> iter = > this.getNextOperators(entity).iterator(); iter.hasNext();) { > - iter.next().refresh(nanoSec, opCollector, coordinator); > - } > - } > - } > - > -} > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java > new file mode 100644 > index 0000000..62b19fc > --- /dev/null > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java > @@ -0,0 +1,284 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. 
You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. > + */ > +package org.apache.samza.sql.operators.factory; > + > +import java.util.HashMap; > +import java.util.HashSet; > +import java.util.Iterator; > +import java.util.List; > +import java.util.Map; > +import java.util.Set; > + > +import org.apache.samza.sql.api.data.EntityName; > +import org.apache.samza.sql.api.operators.OperatorRouter; > +import org.apache.samza.sql.api.operators.OperatorSink; > +import org.apache.samza.sql.api.operators.OperatorSource; > +import org.apache.samza.sql.api.operators.OperatorSpec; > +import org.apache.samza.sql.api.operators.SimpleOperator; > +import org.apache.samza.sql.api.operators.SqlOperatorFactory; > +import org.apache.samza.sql.operators.OperatorTopology; > +import org.apache.samza.sql.operators.SimpleRouter; > + > + > +/** > + * This class implements a builder to allow user to create the operators > and connect them in a topology altogether. 
> + */ > +public class TopologyBuilder { > + > + /** > + * Internal {@link org.apache.samza.sql.api.operators.OperatorRouter} > object to retain the topology being created > + */ > + private SimpleRouter router; > + > + /** > + * The {@link org.apache.samza.sql.api.operators.SqlOperatorFactory} > object used to create operators connected in the topology > + */ > + private final SqlOperatorFactory factory; > + > + /** > + * The map of unbound inputs, the value is set(input_operators) > + */ > + private Map<EntityName, Set<OperatorSpec>> unboundInputs = new > HashMap<EntityName, Set<OperatorSpec>>(); > + > + /** > + * The map of unbound outputs, the value is the operator generating the > output > + */ > + private Map<EntityName, OperatorSpec> unboundOutputs = new > HashMap<EntityName, OperatorSpec>(); > + > + /** > + * The set of entities that are intermediate entities between operators > + */ > + private Set<EntityName> interStreams = new HashSet<EntityName>(); > + > + /** > + * The current operator that may have unbound input or output > + */ > + private SimpleOperator currentOp = null; > + > + /** > + * Private constructor of {@code TopologyBuilder} > + * > + * @param factory The {@link > org.apache.samza.sql.api.operators.SqlOperatorFactory} to create operators > + */ > + private TopologyBuilder(SqlOperatorFactory factory) { > + this.router = new SimpleRouter(); > + this.factory = factory; > + } > + > + /** > + * Static method to create this {@code TopologyBuilder} w/ a customized > {@link org.apache.samza.sql.api.operators.SqlOperatorFactory} > + * > + * @param factory The {@link > org.apache.samza.sql.api.operators.SqlOperatorFactory} to create operators > + * @return The {@code TopologyBuilder} object > + */ > + public static TopologyBuilder create(SqlOperatorFactory factory) { > + return new TopologyBuilder(factory); > + } > + > + /** > + * Static method to create this {@code TopologyBuilder} > + * > + * @return The {@code TopologyBuilder} object > + */ > + 
public static TopologyBuilder create() { > + return new TopologyBuilder(new SimpleOperatorFactoryImpl()); > + } > + > + /** > + * Public method to create the next operator and attach it to the > output of the current operator > + * > + * @param spec The {@link > org.apache.samza.sql.api.operators.OperatorSpec} for the next operator > + * @return The updated {@code TopologyBuilder} object > + */ > + public TopologyBuilder operator(OperatorSpec spec) { > + // check whether it is valid to connect a new operator to the current > operator's output > + SimpleOperator nextOp = this.factory.getOperator(spec); > + return this.operator(nextOp); > + } > + > + /** > + * Public method to create the next operator and attach it to the > output of the current operator > + * > + * @param op The {@link > org.apache.samza.sql.api.operators.SimpleOperator} > + * @return The updated {@code TopologyBuilder} object > + */ > + public TopologyBuilder operator(SimpleOperator op) { > + // check whether it is valid to connect a new operator to the current > operator's output > + canAddOperator(op); > + this.addOperator(op); > + // advance the current operator position > + this.currentOp = op; > + return this; > + } > + > + /** > + * Public method to create a stream object that will be the source to > other operators > + * > + * @return The {@link > org.apache.samza.sql.api.operators.OperatorSource} that can be the source > to other operators > + */ > + public OperatorSource stream() { > + canCreateSource(); > + return new > OperatorTopology(this.unboundOutputs.keySet().iterator().next(), > this.router); > + } > + > + /** > + * Public method to create a sink object that can take input stream > from other operators > + * > + * @return The {@link org.apache.samza.sql.api.operators.OperatorSink} > that can be the downstream of other operators > + */ > + public OperatorSink sink() { > + canCreateSink(); > + return new > OperatorTopology(this.unboundInputs.keySet().iterator().next(), > 
this.router); > + } > + > + /** > + * Public method to bind the input of the current operator w/ the > {@link org.apache.samza.sql.api.operators.OperatorSource} object > + * > + * @param srcStream The {@link > org.apache.samza.sql.api.operators.OperatorSource} that the current > operator is going to be bound to > + * @return The updated {@code TopologyBuilder} object > + */ > + public TopologyBuilder bind(OperatorSource srcStream) { > + EntityName streamName = srcStream.getName(); > + if (this.unboundInputs.containsKey(streamName)) { > + this.unboundInputs.remove(streamName); > + this.interStreams.add(streamName); > + } else { > + // no input operator is waiting for the output from the srcStream > + throw new IllegalArgumentException("No operator input can be bound > to the input stream " + streamName); > + } > + // add all operators in srcStream to this topology > + for (Iterator<SimpleOperator> iter = srcStream.opIterator(); > iter.hasNext();) { > + this.addOperator(iter.next()); > + } > + return this; > + } > + > + /** > + * Public method to attach a {@link > org.apache.samza.sql.api.operators.OperatorSink} object to the output of > the current operator > + * > + * @param nextSink The {@link > org.apache.samza.sql.api.operators.OperatorSink} to be attached to the > current operator's output > + * @return The updated {@code TopologyBuilder} object > + */ > + public TopologyBuilder attach(OperatorSink nextSink) { > + EntityName streamName = nextSink.getName(); > + if (this.unboundOutputs.containsKey(streamName)) { > + this.unboundOutputs.remove(streamName); > + this.interStreams.add(streamName); > + } else { > + // no unbound output to attach to > + throw new IllegalArgumentException("No operator output found to > attach the sink " + streamName); > + } > + // add all operators in nextSink to the router > + for (Iterator<SimpleOperator> iter = nextSink.opIterator(); > iter.hasNext();) { > + this.addOperator(iter.next()); > + } > + return this; > + } > + > + /** > + 
* Public method to finalize the topology that should have all input > and output bound to system input and output > + * > + * @return The finalized {@link > org.apache.samza.sql.api.operators.OperatorRouter} object > + */ > + public OperatorRouter build() { > + canClose(); > + return router; > + } > + > + private TopologyBuilder addOperator(SimpleOperator nextOp) { > + // if input is not in the unboundOutputs and interStreams, input is > unbound > + for (EntityName in : nextOp.getSpec().getInputNames()) { > + if (this.unboundOutputs.containsKey(in)) { > + this.unboundOutputs.remove(in); > + this.interStreams.add(in); > + } > + if (!this.interStreams.contains(in) && !in.isSystemEntity()) { > + if (!this.unboundInputs.containsKey(in)) { > + this.unboundInputs.put(in, new HashSet<OperatorSpec>()); > + } > + this.unboundInputs.get(in).add(nextOp.getSpec()); > + } > + } > + // if output is not in the unboundInputs and interStreams, output is > unbound > + for (EntityName out : nextOp.getSpec().getOutputNames()) { > + if (this.unboundInputs.containsKey(out)) { > + this.unboundInputs.remove(out); > + this.interStreams.add(out); > + } > + if (!this.interStreams.contains(out) && !out.isSystemEntity()) { > + this.unboundOutputs.put(out, nextOp.getSpec()); > + } > + } > + try { > + this.router.addOperator(nextOp); > + } catch (Exception e) { > + throw new RuntimeException("Failed to add operator " + > nextOp.getSpec().getId() + " to the topology.", e); > + } > + return this; > + } > + > + private void canCreateSource() { > + if (this.unboundInputs.size() > 0) { > + throw new IllegalStateException("Can't create stream when there are > unbounded input streams in the topology"); > + } > + if (this.unboundOutputs.size() != 1) { > + throw new IllegalStateException( > + "Can't create stream when the number of unbounded outputs is > not 1 in the topology"); > + } > + } > + > + private void canCreateSink() { > + if (this.unboundOutputs.size() > 0) { > + throw new 
IllegalStateException("Can't create sink when there are > unbounded output streams in the topology"); > + } > + if (this.unboundInputs.size() != 1) { > + throw new IllegalStateException( > + "Can't create sink when the number of unbounded input streams > is not 1 in the topology"); > + } > + } > + > + private void canAddOperator(SimpleOperator op) { > + if (this.currentOp == null) { > + return; > + } > + for (EntityName name : this.currentOp.getSpec().getInputNames()) { > + if (this.unboundInputs.containsKey(name)) { > + throw new IllegalArgumentException("There are unbound input " + > name + " to the current operator " > + + this.currentOp.getSpec().getId() + ". Create a sink or call > bind instead"); > + } > + } > + List<EntityName> nextInputs = op.getSpec().getInputNames(); > + for (EntityName name : this.currentOp.getSpec().getOutputNames()) { > + if (!nextInputs.contains(name) && > this.unboundOutputs.containsKey(name)) { > + // the current operator's output is not in the next operator's > input list > + throw new IllegalArgumentException("There are unbound output " + > name + " from the current operator " > + + this.currentOp.getSpec().getId() > + + " that are not included in the next operator's inputs. > Create a stream or call attach instead"); > + } > + } > + } > + > + private void canClose() { > + if (!this.unboundInputs.isEmpty() || !this.unboundOutputs.isEmpty()) { > + throw new IllegalStateException( > + "There are input/output streams in the topology that are not > bounded. 
Can't build the topology yet."); > + } > + } > + > +} > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java > index 2854aeb..7f5b990 100644 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java > @@ -29,7 +29,7 @@ import org.apache.samza.sql.api.data.Relation; > import org.apache.samza.sql.api.data.Stream; > import org.apache.samza.sql.api.data.Tuple; > import org.apache.samza.sql.api.operators.OperatorCallback; > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl; > +import org.apache.samza.sql.operators.SimpleOperatorImpl; > import org.apache.samza.sql.operators.window.BoundedTimeWindow; > import org.apache.samza.sql.window.storage.OrderedStoreKey; > import org.apache.samza.storage.kv.Entry; > @@ -38,7 +38,6 @@ import org.apache.samza.task.TaskContext; > import org.apache.samza.task.TaskCoordinator; > import org.apache.samza.task.sql.SimpleMessageCollector; > > - > /** > * This class implements a simple stream-to-stream join > */ > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java > index cc0aca0..eecff7e 100644 > --- > 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java > @@ -19,10 +19,11 @@ > > package org.apache.samza.sql.operators.join; > > +import java.util.ArrayList; > import java.util.List; > > import org.apache.samza.sql.api.data.EntityName; > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; > +import org.apache.samza.sql.operators.SimpleOperatorSpec; > > > /** > @@ -35,4 +36,16 @@ public class StreamStreamJoinSpec extends > SimpleOperatorSpec { > // TODO Auto-generated constructor stub > } > > + @SuppressWarnings("serial") > + public StreamStreamJoinSpec(String id, List<String> inputRelations, > String output, List<String> joinKeys) { > + super(id, new ArrayList<EntityName>() { > + { > + for (String input : inputRelations) { > + add(EntityName.getStreamName(input)); > + } > + } > + }, EntityName.getStreamName(output)); > + // TODO Auto-generated constructor stub > + } > + > } > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java > index b93d789..0cba39a 100644 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java > @@ -23,7 +23,7 @@ import org.apache.samza.config.Config; > import org.apache.samza.sql.api.data.Relation; > import org.apache.samza.sql.api.data.Tuple; > import org.apache.samza.sql.api.operators.OperatorCallback; > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl; > +import 
org.apache.samza.sql.operators.SimpleOperatorImpl; > import org.apache.samza.storage.kv.Entry; > import org.apache.samza.storage.kv.KeyValueIterator; > import org.apache.samza.system.OutgoingMessageEnvelope; > @@ -32,7 +32,6 @@ import org.apache.samza.task.TaskContext; > import org.apache.samza.task.TaskCoordinator; > import org.apache.samza.task.sql.SimpleMessageCollector; > > - > /** > * This is an example build-in operator that performs a simple stream > re-partition operation. > * > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java > index c47eed9..e494bff 100644 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.partition; > > import org.apache.samza.sql.api.data.EntityName; > import org.apache.samza.sql.api.operators.OperatorSpec; > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; > +import org.apache.samza.sql.operators.SimpleOperatorSpec; > import org.apache.samza.system.SystemStream; > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java > index d81cc93..a9a83b5 100644 > --- > 
a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java > @@ -27,13 +27,12 @@ import org.apache.samza.sql.api.data.EntityName; > import org.apache.samza.sql.api.data.Relation; > import org.apache.samza.sql.api.data.Tuple; > import org.apache.samza.sql.api.operators.OperatorCallback; > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl; > +import org.apache.samza.sql.operators.SimpleOperatorImpl; > import org.apache.samza.storage.kv.KeyValueIterator; > import org.apache.samza.task.TaskContext; > import org.apache.samza.task.TaskCoordinator; > import org.apache.samza.task.sql.SimpleMessageCollector; > > - > /** > * This class defines an example build-in operator for a fixed size > window operator that converts a stream to a relation > * > @@ -86,6 +85,7 @@ public class BoundedTimeWindow extends > SimpleOperatorImpl { > * @param lengthSec The window size in seconds > * @param input The input stream name > * @param output The output relation name > + * @param callback The user callback object > */ > public BoundedTimeWindow(String wndId, int lengthSec, String input, > String output, OperatorCallback callback) { > super(new WindowSpec(wndId, EntityName.getStreamName(input), > EntityName.getStreamName(output), lengthSec), callback); > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java > index eec32ea..6c4eba8 100644 > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java > +++ > 
b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.window; > > import org.apache.samza.sql.api.data.EntityName; > import org.apache.samza.sql.api.operators.OperatorSpec; > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; > +import org.apache.samza.sql.operators.SimpleOperatorSpec; > > > /** > @@ -47,6 +47,11 @@ public class WindowSpec extends SimpleOperatorSpec > implements OperatorSpec { > this.wndSizeSec = lengthSec; > } > > + public WindowSpec(String id, int wndSize, String input) { > + super(id, EntityName.getStreamName(input), null); > + this.wndSizeSec = wndSize; > + } > + > /** > * Method to get the window state relation name > * > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java > b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java > index b29838a..6950f67 100644 > --- > a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java > +++ > b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java > @@ -22,7 +22,7 @@ package org.apache.samza.task.sql; > import org.apache.samza.sql.api.data.Relation; > import org.apache.samza.sql.api.data.Tuple; > import org.apache.samza.sql.api.operators.OperatorCallback; > -import org.apache.samza.sql.operators.factory.NoopOperatorCallback; > +import org.apache.samza.sql.operators.NoopOperatorCallback; > import org.apache.samza.storage.kv.Entry; > import org.apache.samza.storage.kv.KeyValueIterator; > import org.apache.samza.system.OutgoingMessageEnvelope; > @@ -57,25 +57,38 @@ public class SimpleMessageCollector implements > MessageCollector { > * @param coordinator 
The {@link org.apache.samza.task.TaskCoordinator} > in the context > */ > public SimpleMessageCollector(MessageCollector collector, > TaskCoordinator coordinator) { > - this.collector = collector; > - this.coordinator = coordinator; > + this(collector, coordinator, new NoopOperatorCallback()); > } > > /** > * This method swaps the {@code callback} with the new one > * > - * <p> This method allows the {@link > org.apache.samza.sql.api.operators.SimpleOperator} to be swapped when the > collector > - * is passed down into the next operator's context. Hence, under the > new operator's context, the correct {@link > org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Relation, > MessageCollector, TaskCoordinator)}, > - * and {@link > org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Tuple, > MessageCollector, TaskCoordinator)} can be invoked > + * <p> This method allows the {@link > org.apache.samza.sql.api.operators.OperatorCallback} to be swapped when the > collector > + * is passed down into the next operator's context. Hence, under the > new operator's context, the correct callback functions can be invoked > * > * @param callback The new {@link > org.apache.samza.sql.api.operators.OperatorCallback} to be set > */ > - public void switchOperatorCallback(OperatorCallback callback) { > - this.callback = callback; > + public void switchCallback(OperatorCallback callback) { > + if (callback == null) { > + this.callback = new NoopOperatorCallback(); > + } else { > + this.callback = callback; > + } > + } > + > + /** > + * Method is declared to be final s.t. we enforce that the callback > functions are called first > + */ > + @Override > + final public void send(OutgoingMessageEnvelope envelope) { > + this.collector.send(envelope); > } > > /** > * Method is declared to be final s.t. 
we enforce that the callback > functions are called first > + * > + * @param deltaRelation The relation to be sent out > + * @throws Exception Throws exception if failed to send > */ > final public void send(Relation deltaRelation) throws Exception { > Relation rel = this.callback.afterProcess(deltaRelation, collector, > coordinator); > @@ -87,6 +100,9 @@ public class SimpleMessageCollector implements > MessageCollector { > > /** > * Method is declared to be final s.t. we enforce that the callback > functions are called first > + * > + * @param tuple The tuple to be sent out > + * @throws Exception Throws exception if failed to send > */ > final public void send(Tuple tuple) throws Exception { > Tuple otuple = this.callback.afterProcess(tuple, collector, > coordinator); > @@ -106,9 +122,4 @@ public class SimpleMessageCollector implements > MessageCollector { > protected void realSend(Tuple tuple) throws Exception { > this.collector.send((OutgoingMessageEnvelope) tuple.getMessage()); > } > - > - @Override > - public void send(OutgoingMessageEnvelope envelope) { > - this.collector.send(envelope); > - } > } > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java > b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java > index 20dc701..7370af6 100644 > --- > a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java > +++ > b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java > @@ -22,6 +22,7 @@ package org.apache.samza.task.sql; > import org.apache.samza.config.Config; > import org.apache.samza.sql.api.data.Relation; > import org.apache.samza.sql.api.data.Tuple; > +import 
org.apache.samza.sql.api.operators.Operator; > import org.apache.samza.sql.api.operators.OperatorCallback; > import org.apache.samza.sql.data.IncomingMessageTuple; > import org.apache.samza.sql.operators.window.BoundedTimeWindow; > @@ -39,7 +40,7 @@ import org.apache.samza.task.WindowableTask; > * > */ > public class RandomWindowOperatorTask implements StreamTask, > InitableTask, WindowableTask { > - private BoundedTimeWindow wndOp; > + private Operator operator; > > private final OperatorCallback wndCallback = new OperatorCallback() { > > @@ -77,20 +78,20 @@ public class RandomWindowOperatorTask implements > StreamTask, InitableTask, Windo > public void process(IncomingMessageEnvelope envelope, MessageCollector > collector, TaskCoordinator coordinator) > throws Exception { > // based on tuple's stream name, get the window op and run process() > - wndOp.process(new IncomingMessageTuple(envelope), collector, > coordinator); > + operator.process(new IncomingMessageTuple(envelope), collector, > coordinator); > > } > > @Override > public void window(MessageCollector collector, TaskCoordinator > coordinator) throws Exception { > // based on tuple's stream name, get the window op and run process() > - wndOp.refresh(System.nanoTime(), collector, coordinator); > + operator.refresh(System.nanoTime(), collector, coordinator); > } > > @Override > public void init(Config config, TaskContext context) throws Exception { > // 1. 
create a fixed length 10 sec window operator > - this.wndOp = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", > "relation1", this.wndCallback); > - this.wndOp.init(config, context); > + this.operator = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", > "wndOutput", this.wndCallback); > + this.operator.init(config, context); > } > } > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java > ---------------------------------------------------------------------- > diff --git > a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java > b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java > index 9124e3c..d65892c 100644 > --- > a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java > +++ > b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java > @@ -24,7 +24,7 @@ import java.util.List; > > import org.apache.samza.config.Config; > import org.apache.samza.sql.data.IncomingMessageTuple; > -import org.apache.samza.sql.operators.factory.SimpleRouter; > +import org.apache.samza.sql.operators.SimpleRouter; > import org.apache.samza.sql.operators.join.StreamStreamJoin; > import org.apache.samza.sql.operators.partition.PartitionOp; > import org.apache.samza.sql.operators.window.BoundedTimeWindow; > @@ -51,25 +51,25 @@ import org.apache.samza.task.WindowableTask; > */ > public class StreamSqlTask implements StreamTask, InitableTask, > WindowableTask { > > - private SimpleRouter rteCntx; > + private SimpleRouter router; > > @Override > public void process(IncomingMessageEnvelope envelope, MessageCollector > collector, TaskCoordinator coordinator) > throws Exception { > - this.rteCntx.process(new IncomingMessageTuple(envelope), collector, > coordinator); > + this.router.process(new IncomingMessageTuple(envelope), collector, > coordinator); > } > > @Override > public void window(MessageCollector collector, 
TaskCoordinator > coordinator) throws Exception { > - this.rteCntx.refresh(System.nanoTime(), collector, coordinator); > + this.router.refresh(System.nanoTime(), collector, coordinator); > } > > @Override > public void init(Config config, TaskContext context) throws Exception { > // create all operators via the operator factory > // 1. create two window operators > - BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10, > "inputStream1", "fixedWndOutput1"); > - BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10, > "inputStream2", "fixedWndOutput2"); > + BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10, > "kafka:inputStream1", "fixedWndOutput1"); > + BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10, > "kafka:inputStream2", "fixedWndOutput2"); > // 2. create one join operator > @SuppressWarnings("serial") > List<String> inputRelations = new ArrayList<String>() { > @@ -86,19 +86,19 @@ public class StreamSqlTask implements StreamTask, > InitableTask, WindowableTask { > } > }; > StreamStreamJoin join = new StreamStreamJoin("joinOp", > inputRelations, "joinOutput", joinKeys); > - // 4. create a re-partition operator > + // 3. create a re-partition operator > PartitionOp par = new PartitionOp("parOp1", "joinOutput", "kafka", > "parOutputStrm1", "joinKey", 50); > > // Now, connecting the operators via the OperatorRouter > - this.rteCntx = new SimpleRouter(); > + this.router = new SimpleRouter(); > // 1. set two system input operators (i.e. two window operators) > - this.rteCntx.addOperator(wnd1); > - this.rteCntx.addOperator(wnd2); > + this.router.addOperator(wnd1); > + this.router.addOperator(wnd2); > // 2. connect join operator to both window operators > - this.rteCntx.addOperator(join); > + this.router.addOperator(join); > // 3. 
connect re-partition operator to the stream operator > - this.rteCntx.addOperator(par); > + this.router.addOperator(par); > > - this.rteCntx.init(config, context); > + this.router.init(config, context); > } > } > > -- Milinda Pathirage PhD Student | Research Assistant School of Informatics and Computing | Data to Insight Center Indiana University twitter: milindalakmal skype: milinda.pathirage blog: http://milinda.pathirage.org