Thanks for the quick response.

Milinda

On Tue, Jun 2, 2015 at 2:02 AM, Navina Ramesh <nram...@linkedin.com.invalid>
wrote:

> Sorry about the confusion, Milinda.
>
> Thanks for reverting, Yi!
>
> On 6/1/15, 10:53 PM, "Yi Pan" <nickpa...@gmail.com> wrote:
>
> >Hi, Milinda,
> >
> >That was an accidental mistake. I have reverted the check-in. I am still
> >working on that. Thanks!
> >
> >-Yi
> >
> >On Mon, Jun 1, 2015 at 9:34 PM, Milinda Pathirage <mpath...@umail.iu.edu>
> >wrote:
> >
> >> Hi Navina,
> >>
> >> Did we decide to push this patch to the samza-sql branch? I thought Yi is
> >> still working on this. Some Git conflict related texts are still there
> >>in
> >> this commit.
> >>
> >> +<<<<<<< HEAD
> >> +   * The callback object
> >> +=======
> >> +   * The callback function
> >> +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of
> >> callbacks w/o inheriting and creating many sub-classes from operators
> >>
> >> Milinda
> >>
> >> On Mon, Jun 1, 2015 at 9:06 PM, <nav...@apache.org> wrote:
> >>
> >> > Yi's TopologyBuilder RB 34500
> >> >
> >> >
> >> > Project: http://git-wip-us.apache.org/repos/asf/samza/repo
> >> > Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/45b85477
> >> > Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/45b85477
> >> > Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/45b85477
> >> >
> >> > Branch: refs/heads/samza-sql
> >> > Commit: 45b854772cf36cc69e8d8cda7a51bce1be5fe576
> >> > Parents: 41c4cd0
> >> > Author: Navina <navi.trin...@gmail.com>
> >> > Authored: Thu May 28 18:51:30 2015 -0700
> >> > Committer: Navina <navi.trin...@gmail.com>
> >> > Committed: Thu May 28 18:51:30 2015 -0700
> >> >
> >> > ----------------------------------------------------------------------
> >> >  .../apache/samza/sql/api/data/EntityName.java  |  41 ++-
> >> >  .../org/apache/samza/sql/api/data/Table.java    |   7 +-
> >> >  .../samza/sql/api/operators/Operator.java       |   4 +
> >> >  .../sql/api/operators/OperatorCallback.java    |   1 -
> >> >  .../samza/sql/api/operators/OperatorRouter.java |   8 +
> >> >  .../samza/sql/api/operators/OperatorSink.java   |  30 ++
> >> >  .../samza/sql/api/operators/OperatorSource.java |  30 ++
> >> >  .../samza/sql/api/operators/SimpleOperator.java |   3 +-
> >> >  .../samza/sql/data/IncomingMessageTuple.java    |   1 -
> >> >  .../sql/operators/NoopOperatorCallback.java     |  53 ++++
> >> >  .../samza/sql/operators/OperatorTopology.java   |  53 ++++
> >> >  .../samza/sql/operators/SimpleOperatorImpl.java | 147 ++++++++++
> >> >  .../samza/sql/operators/SimpleOperatorSpec.java | 106 +++++++
> >> >  .../samza/sql/operators/SimpleRouter.java       | 141 +++++++++
> >> >  .../operators/factory/NoopOperatorCallback.java |  50 ----
> >> >  .../operators/factory/SimpleOperatorImpl.java   | 136 ---------
> >> >  .../operators/factory/SimpleOperatorSpec.java   | 106 -------
> >> >  .../sql/operators/factory/SimpleRouter.java     | 136 ---------
> >> >  .../sql/operators/factory/TopologyBuilder.java  | 284
> >> +++++++++++++++++++
> >> >  .../sql/operators/join/StreamStreamJoin.java    |   3 +-
> >> >  .../operators/join/StreamStreamJoinSpec.java    |  15 +-
> >> >  .../sql/operators/partition/PartitionOp.java    |   3 +-
> >> >  .../sql/operators/partition/PartitionSpec.java  |   2 +-
> >> >  .../sql/operators/window/BoundedTimeWindow.java |   4 +-
> >> >  .../samza/sql/operators/window/WindowSpec.java  |   7 +-
> >> >  .../samza/task/sql/SimpleMessageCollector.java  |  37 ++-
> >> >  .../task/sql/RandomWindowOperatorTask.java      |  11 +-
> >> >  .../apache/samza/task/sql/StreamSqlTask.java    |  26 +-
> >> >  .../samza/task/sql/UserCallbacksSqlTask.java    |  66 ++---
> >> >  29 files changed, 991 insertions(+), 520 deletions(-)
> >> > ----------------------------------------------------------------------
> >> >
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/api/data/EntityName.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j
> >>ava
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j
> >>ava
> >> > index 80ba455..df1b11b 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j
> >>ava
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.j
> >>ava
> >> > @@ -49,6 +49,8 @@ public class EntityName {
> >> >     */
> >> >    private final String name;
> >> >
> >> > +  private final boolean isSystemEntity;
> >> > +
> >> >    /**
> >> >     * Static map of already allocated table names
> >> >     */
> >> > @@ -59,15 +61,19 @@ public class EntityName {
> >> >     */
> >> >    private static Map<String, EntityName> streams = new
> >>HashMap<String,
> >> > EntityName>();
> >> >
> >> > +  private static final String ANONYMOUS = "anonymous";
> >> > +
> >> >    /**
> >> >     * Private ctor to create entity names
> >> >     *
> >> >     * @param type Type of the entity name
> >> >     * @param name Formatted name of the entity
> >> > +   * @param isSystemEntity whether the entity is a system
> >>input/output
> >> >     */
> >> > -  private EntityName(EntityType type, String name) {
> >> > +  private EntityName(EntityType type, String name, boolean
> >> > isSystemEntity) {
> >> >      this.type = type;
> >> >      this.name = name;
> >> > +    this.isSystemEntity = isSystemEntity;
> >> >    }
> >> >
> >> >    @Override
> >> > @@ -102,6 +108,10 @@ public class EntityName {
> >> >      return this.type.equals(EntityType.STREAM);
> >> >    }
> >> >
> >> > +  public boolean isSystemEntity() {
> >> > +    return this.isSystemEntity;
> >> > +  }
> >> > +
> >> >    /**
> >> >     * Get the formatted entity name
> >> >     *
> >> > @@ -111,15 +121,24 @@ public class EntityName {
> >> >      return this.name;
> >> >    }
> >> >
> >> > +  public static EntityName getTableName(String name) {
> >> > +    return getTableName(name, false);
> >> > +  }
> >> > +
> >> > +  public static EntityName getStreamName(String name) {
> >> > +    return getStreamName(name, false);
> >> > +  }
> >> > +
> >> >    /**
> >> >     * Static method to get the instance of {@code EntityName} with
> >>type
> >> > {@code EntityType.TABLE}
> >> >     *
> >> >     * @param name The formatted entity name of the relation
> >> > +   * @param isSystem The boolean flag indicating whether this is a
> >> system
> >> > input/output
> >> >     * @return A <code>EntityName</code> for a relation
> >> >     */
> >> > -  public static EntityName getTableName(String name) {
> >> > +  public static EntityName getTableName(String name, boolean
> >>isSystem) {
> >> >      if (tables.get(name) == null) {
> >> > -      tables.put(name, new EntityName(EntityType.TABLE, name));
> >> > +      tables.put(name, new EntityName(EntityType.TABLE, name,
> >> isSystem));
> >> >      }
> >> >      return tables.get(name);
> >> >    }
> >> > @@ -128,13 +147,25 @@ public class EntityName {
> >> >     * Static method to get the instance of <code>EntityName</code>
> >>with
> >> > type <code>EntityType.STREAM</code>
> >> >     *
> >> >     * @param name The formatted entity name of the stream
> >> > +   * @param isSystem The boolean flag indicating whether this is a
> >> system
> >> > input/output
> >> >     * @return A <code>EntityName</code> for a stream
> >> >     */
> >> > -  public static EntityName getStreamName(String name) {
> >> > +  public static EntityName getStreamName(String name, boolean
> >>isSystem)
> >> {
> >> >      if (streams.get(name) == null) {
> >> > -      streams.put(name, new EntityName(EntityType.STREAM, name));
> >> > +      streams.put(name, new EntityName(EntityType.STREAM, name,
> >> > isSystem));
> >> >      }
> >> >      return streams.get(name);
> >> >    }
> >> >
> >> > +  public static EntityName getAnonymousStream() {
> >> > +    return getStreamName(ANONYMOUS);
> >> > +  }
> >> > +
> >> > +  public static EntityName getAnonymousTable() {
> >> > +    return getTableName(ANONYMOUS);
> >> > +  }
> >> > +
> >> > +  public boolean isAnonymous() {
> >> > +    return this.name.equals(ANONYMOUS);
> >> > +  }
> >> >  }
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/api/data/Table.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> >> >
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> >> > index 7b4d984..b4dce07 100644
> >> > ---
> >> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> >> > +++
> >> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> >> > @@ -19,6 +19,9 @@
> >> >
> >> >  package org.apache.samza.sql.api.data;
> >> >
> >> > +import java.util.List;
> >> > +
> >> > +
> >> >  /**
> >> >   * This interface defines a non-ordered {@link
> >> > org.apache.samza.sql.api.data.Relation}, which has a unique primary
> >>key
> >> >   *
> >> > @@ -31,8 +34,8 @@ public interface Table<K> extends Relation<K> {
> >> >    /**
> >> >     * Get the primary key field name for this table
> >> >     *
> >> > -   * @return The name of the primary key field
> >> > +   * @return The names of the primary key fields
> >> >     */
> >> > -  String getPrimaryKeyName();
> >> > +  List<String> getPrimaryKeyNames();
> >> >
> >> >  }
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/api/operators/Operator.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>r.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>r.java
> >> > index d6f6b57..9c6eaa5 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>r.java
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>r.java
> >> > @@ -27,7 +27,11 @@ import org.apache.samza.task.TaskContext;
> >> >  import org.apache.samza.task.TaskCoordinator;
> >> >
> >> >
> >> > +/**
> >> > + * This class defines the common interface for operator classes.
> >> > + */
> >> >  public interface Operator {
> >> > +
> >> >    /**
> >> >     * Method to initialize the operator
> >> >     *
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rCallback.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rCallback.java
> >> > index fb2aa89..5a77d95 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rCallback.java
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rCallback.java
> >> > @@ -23,7 +23,6 @@ import org.apache.samza.sql.api.data.Tuple;
> >> >  import org.apache.samza.task.MessageCollector;
> >> >  import org.apache.samza.task.TaskCoordinator;
> >> >
> >> > -
> >> >  /**
> >> >   * Defines the callback functions to allow customized functions to be
> >> >invoked before process and before sending the result
> >> >   */
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rRouter.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rRouter.java
> >> > index 0759638..432e6b3 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rRouter.java
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rRouter.java
> >> > @@ -19,6 +19,7 @@
> >> >
> >> >  package org.apache.samza.sql.api.operators;
> >> >
> >> > +import java.util.Iterator;
> >> >  import java.util.List;
> >> >
> >> >  import org.apache.samza.sql.api.data.EntityName;
> >> > @@ -51,4 +52,11 @@ public interface OperatorRouter extends Operator {
> >> >     */
> >> >    List<SimpleOperator> getNextOperators(EntityName output);
> >> >
> >> > +  /**
> >> > +   * This method provides an iterator to go through all operators
> >> > connected via {@code OperatorRouter}
> >> > +   *
> >> > +   * @return An {@link java.util.Iterator} for all operators
> >>connected
> >> > via {@code OperatorRouter}
> >> > +   */
> >> > +  Iterator<SimpleOperator> iterator();
> >> > +
> >> >  }
> >> >
> >> >
> >> >
> >>
> >>http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rSink.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rSink.java
> >> > new file mode 100644
> >> > index 0000000..e2c748c
> >> > --- /dev/null
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rSink.java
> >> > @@ -0,0 +1,30 @@
> >> > +/*
> >> > + * Licensed to the Apache Software Foundation (ASF) under one
> >> > + * or more contributor license agreements.  See the NOTICE file
> >> > + * distributed with this work for additional information
> >> > + * regarding copyright ownership.  The ASF licenses this file
> >> > + * to you under the Apache License, Version 2.0 (the
> >> > + * "License"); you may not use this file except in compliance
> >> > + * with the License.  You may obtain a copy of the License at
> >> > + *
> >> > + *   http://www.apache.org/licenses/LICENSE-2.0
> >> > + *
> >> > + * Unless required by applicable law or agreed to in writing,
> >> > + * software distributed under the License is distributed on an
> >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> > + * KIND, either express or implied.  See the License for the
> >> > + * specific language governing permissions and limitations
> >> > + * under the License.
> >> > + */
> >> > +package org.apache.samza.sql.api.operators;
> >> > +
> >> > +import java.util.Iterator;
> >> > +
> >> > +import org.apache.samza.sql.api.data.EntityName;
> >> > +
> >> > +
> >> > +public interface OperatorSink {
> >> > +  Iterator<SimpleOperator> opIterator();
> >> > +
> >> > +  EntityName getName();
> >> > +}
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rSource.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rSource.java
> >> > new file mode 100644
> >> > index 0000000..860c1aa
> >> > --- /dev/null
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operato
> >>rSource.java
> >> > @@ -0,0 +1,30 @@
> >> > +/*
> >> > + * Licensed to the Apache Software Foundation (ASF) under one
> >> > + * or more contributor license agreements.  See the NOTICE file
> >> > + * distributed with this work for additional information
> >> > + * regarding copyright ownership.  The ASF licenses this file
> >> > + * to you under the Apache License, Version 2.0 (the
> >> > + * "License"); you may not use this file except in compliance
> >> > + * with the License.  You may obtain a copy of the License at
> >> > + *
> >> > + *   http://www.apache.org/licenses/LICENSE-2.0
> >> > + *
> >> > + * Unless required by applicable law or agreed to in writing,
> >> > + * software distributed under the License is distributed on an
> >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> > + * KIND, either express or implied.  See the License for the
> >> > + * specific language governing permissions and limitations
> >> > + * under the License.
> >> > + */
> >> > +package org.apache.samza.sql.api.operators;
> >> > +
> >> > +import java.util.Iterator;
> >> > +
> >> > +import org.apache.samza.sql.api.data.EntityName;
> >> > +
> >> > +
> >> > +public interface OperatorSource {
> >> > +  Iterator<SimpleOperator> opIterator();
> >> > +
> >> > +  EntityName getName();
> >> > +}
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO
> >>perator.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO
> >>perator.java
> >> > index c49a822..60ace9c 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO
> >>perator.java
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleO
> >>perator.java
> >> > @@ -19,8 +19,6 @@
> >> >
> >> >  package org.apache.samza.sql.api.operators;
> >> >
> >> > -
> >> > -
> >> >  /**
> >> >   * The interface for a {@code SimpleOperator} that implements a
> >>simple
> >> > primitive relational logic operation
> >> >   */
> >> > @@ -31,4 +29,5 @@ public interface SimpleOperator extends Operator {
> >> >     * @return The {@link
> >>org.apache.samza.sql.api.operators.OperatorSpec}
> >> > object that defines the configuration/parameters of the operator
> >> >     */
> >> >    OperatorSpec getSpec();
> >> > +
> >> >  }
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT
> >>uple.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT
> >>uple.java
> >> > index 72a59f2..af040f0 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT
> >>uple.java
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageT
> >>uple.java
> >> > @@ -81,7 +81,6 @@ public class IncomingMessageTuple implements Tuple {
> >> >
> >> >    @Override
> >> >    public long getCreateTimeNano() {
> >> > -    // TODO: this is wrong and just to keep as an placeholder. It
> >>should
> >> > be replaced by the message publish time when the publish timestamp is
> >> > available in the message metadata
> >> >      return this.recvTimeNano;
> >> >    }
> >> >
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperato
> >>rCallback.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperato
> >>rCallback.java
> >> > new file mode 100644
> >> > index 0000000..e951737
> >> > --- /dev/null
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperato
> >>rCallback.java
> >> > @@ -0,0 +1,53 @@
> >> > +/*
> >> > + * Licensed to the Apache Software Foundation (ASF) under one
> >> > + * or more contributor license agreements.  See the NOTICE file
> >> > + * distributed with this work for additional information
> >> > + * regarding copyright ownership.  The ASF licenses this file
> >> > + * to you under the Apache License, Version 2.0 (the
> >> > + * "License"); you may not use this file except in compliance
> >> > + * with the License.  You may obtain a copy of the License at
> >> > + *
> >> > + *   http://www.apache.org/licenses/LICENSE-2.0
> >> > + *
> >> > + * Unless required by applicable law or agreed to in writing,
> >> > + * software distributed under the License is distributed on an
> >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> > + * KIND, either express or implied.  See the License for the
> >> > + * specific language governing permissions and limitations
> >> > + * under the License.
> >> > + */
> >> > +package org.apache.samza.sql.operators;
> >> > +
> >> > +import org.apache.samza.sql.api.data.Relation;
> >> > +import org.apache.samza.sql.api.data.Tuple;
> >> > +import org.apache.samza.sql.api.operators.OperatorCallback;
> >> > +import org.apache.samza.task.MessageCollector;
> >> > +import org.apache.samza.task.TaskCoordinator;
> >> > +
> >> > +
> >> > +/**
> >> > + * This is a default NOOP operator callback object that does nothing
> >> > before and after the process method
> >> > + */
> >> > +public final class NoopOperatorCallback implements OperatorCallback {
> >> > +
> >> > +  @Override
> >> > +  public Tuple beforeProcess(Tuple tuple, MessageCollector collector,
> >> > TaskCoordinator coordinator) {
> >> > +    return tuple;
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public Relation beforeProcess(Relation rel, MessageCollector
> >> collector,
> >> > TaskCoordinator coordinator) {
> >> > +    return rel;
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public Tuple afterProcess(Tuple tuple, MessageCollector collector,
> >> > TaskCoordinator coordinator) {
> >> > +    return tuple;
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public Relation afterProcess(Relation rel, MessageCollector
> >>collector,
> >> > TaskCoordinator coordinator) {
> >> > +    return rel;
> >> > +  }
> >> > +
> >> > +}
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTop
> >>ology.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTop
> >>ology.java
> >> > new file mode 100644
> >> > index 0000000..8b70092
> >> > --- /dev/null
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTop
> >>ology.java
> >> > @@ -0,0 +1,53 @@
> >> > +/*
> >> > + * Licensed to the Apache Software Foundation (ASF) under one
> >> > + * or more contributor license agreements.  See the NOTICE file
> >> > + * distributed with this work for additional information
> >> > + * regarding copyright ownership.  The ASF licenses this file
> >> > + * to you under the Apache License, Version 2.0 (the
> >> > + * "License"); you may not use this file except in compliance
> >> > + * with the License.  You may obtain a copy of the License at
> >> > + *
> >> > + *   http://www.apache.org/licenses/LICENSE-2.0
> >> > + *
> >> > + * Unless required by applicable law or agreed to in writing,
> >> > + * software distributed under the License is distributed on an
> >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> > + * KIND, either express or implied.  See the License for the
> >> > + * specific language governing permissions and limitations
> >> > + * under the License.
> >> > + */
> >> > +package org.apache.samza.sql.operators;
> >> > +
> >> > +import java.util.Iterator;
> >> > +
> >> > +import org.apache.samza.sql.api.data.EntityName;
> >> > +import org.apache.samza.sql.api.operators.OperatorSink;
> >> > +import org.apache.samza.sql.api.operators.OperatorSource;
> >> > +import org.apache.samza.sql.api.operators.SimpleOperator;
> >> > +
> >> > +
> >> > +/**
> >> > + * This class implements a partially completed {@link
> >> > org.apache.samza.sql.operators.factory.TopologyBuilder} that
> >>signifies a
> >> > partially completed
> >> > + * topology that the current operator has unbounded input stream that
> >> can
> >> > be attached to other operators' output
> >> > + */
> >> > +public class OperatorTopology implements OperatorSource,
> >>OperatorSink {
> >> > +
> >> > +  private final EntityName name;
> >> > +  private final SimpleRouter router;
> >> > +
> >> > +  public OperatorTopology(EntityName name, SimpleRouter router) {
> >> > +    this.name = name;
> >> > +    this.router = router;
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public Iterator<SimpleOperator> opIterator() {
> >> > +    return this.router.iterator();
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public EntityName getName() {
> >> > +    return this.name;
> >> > +  }
> >> > +
> >> > +}
> >> >
> >> >
> >> >
> >>
> >http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera
> >>torImpl.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera
> >>torImpl.java
> >> > new file mode 100644
> >> > index 0000000..423880b
> >> > --- /dev/null
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera
> >>torImpl.java
> >> > @@ -0,0 +1,147 @@
> >> > +/*
> >> > + * Licensed to the Apache Software Foundation (ASF) under one
> >> > + * or more contributor license agreements.  See the NOTICE file
> >> > + * distributed with this work for additional information
> >> > + * regarding copyright ownership.  The ASF licenses this file
> >> > + * to you under the Apache License, Version 2.0 (the
> >> > + * "License"); you may not use this file except in compliance
> >> > + * with the License.  You may obtain a copy of the License at
> >> > + *
> >> > + *   http://www.apache.org/licenses/LICENSE-2.0
> >> > + *
> >> > + * Unless required by applicable law or agreed to in writing,
> >> > + * software distributed under the License is distributed on an
> >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> > + * KIND, either express or implied.  See the License for the
> >> > + * specific language governing permissions and limitations
> >> > + * under the License.
> >> > + */
> >> > +
> >> > +package org.apache.samza.sql.operators;
> >> > +
> >> > +import org.apache.samza.sql.api.data.Relation;
> >> > +import org.apache.samza.sql.api.data.Tuple;
> >> > +import org.apache.samza.sql.api.operators.OperatorCallback;
> >> > +import org.apache.samza.sql.api.operators.OperatorSpec;
> >> > +import org.apache.samza.sql.api.operators.SimpleOperator;
> >> > +import org.apache.samza.task.MessageCollector;
> >> > +import org.apache.samza.task.TaskCoordinator;
> >> > +import org.apache.samza.task.sql.SimpleMessageCollector;
> >> > +
> >> > +
> >> > +/**
> >> > + * An abstract class that encapsulate the basic information and
> >>methods
> >> > that all operator classes should implement.
> >> > + * It implements the interface {@link
> >> > org.apache.samza.sql.api.operators.SimpleOperator}
> >> > + */
> >> > +public abstract class SimpleOperatorImpl implements SimpleOperator {
> >> > +  /**
> >> > +   * The specification of this operator
> >> > +   */
> >> > +  private final OperatorSpec spec;
> >> > +
> >> > +  /**
> >> > +<<<<<<< HEAD
> >> > +   * The callback object
> >> > +=======
> >> > +   * The callback function
> >> > +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of
> >> > callbacks w/o inheriting and creating many sub-classes from operators
> >> > +   */
> >> > +  private final OperatorCallback callback;
> >> > +
> >> > +  /**
> >> > +   * Ctor of {@code SimpleOperatorImpl} class
> >> > +   *
> >> > +   * @param spec The specification of this operator
> >> > +   */
> >> > +  public SimpleOperatorImpl(OperatorSpec spec) {
> >> > +    this(spec, new NoopOperatorCallback());
> >> > +  }
> >> > +
> >> > +  public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback
> >> callback)
> >> > {
> >> > +    this.spec = spec;
> >> > +    this.callback = callback;
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public OperatorSpec getSpec() {
> >> > +    return this.spec;
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * This method is made final s.t. the sequence of invocations
> >>between
> >> > {@link
> >> >
> >>
> >>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relatio
> >>n,
> >> > MessageCollector, TaskCoordinator)}
> >> > +   * and real processing of the input relation is fixed.
> >> > +   */
> >> > +  @Override
> >> > +  final public void process(Relation deltaRelation, MessageCollector
> >> > collector, TaskCoordinator coordinator)
> >> > +      throws Exception {
> >> > +    Relation rel = this.callback.beforeProcess(deltaRelation,
> >>collector,
> >> > coordinator);
> >> > +    if (rel == null) {
> >> > +      return;
> >> > +    }
> >> > +    this.realProcess(rel, getCollector(collector, coordinator),
> >> > coordinator);
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * This method is made final s.t. the sequence of invocations
> >>between
> >> > {@link
> >> >
> >>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple,
> >> > MessageCollector, TaskCoordinator)}
> >> > +   * and real processing of the input tuple is fixed.
> >> > +   */
> >> > +  @Override
> >> > +  final public void process(Tuple tuple, MessageCollector collector,
> >> > TaskCoordinator coordinator) throws Exception {
> >> > +    Tuple ituple = this.callback.beforeProcess(tuple, collector,
> >> > coordinator);
> >> > +    if (ituple == null) {
> >> > +      return;
> >> > +    }
> >> > +    this.realProcess(ituple, getCollector(collector, coordinator),
> >> > coordinator);
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * This method is made final s.t. we enforce the invocation of
> >>{@code
> >> > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)}
> >> before
> >> > doing anything further
> >> > +   */
> >> > +  @Override
> >> > +  final public void refresh(long timeNano, MessageCollector
> >>collector,
> >> > TaskCoordinator coordinator) throws Exception {
> >> > +    this.realRefresh(timeNano, getCollector(collector, coordinator),
> >> > coordinator);
> >> > +  }
> >> > +
> >> > +  private SimpleMessageCollector getCollector(MessageCollector
> >> collector,
> >> > TaskCoordinator coordinator) {
> >> > +    if (!(collector instanceof SimpleMessageCollector)) {
> >> > +      return new SimpleMessageCollector(collector, coordinator,
> >> > this.callback);
> >> > +    } else {
> >> > +      ((SimpleMessageCollector)
> >> collector).switchCallback(this.callback);
> >> > +      return (SimpleMessageCollector) collector;
> >> > +    }
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Method to be overriden by each specific implementation class of
> >> > operator to handle timeout event
> >> > +   *
> >> > +   * @param timeNano The time in nanosecond when the timeout event
> >> > occurred
> >> > +   * @param collector The {@link
> >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
> >> > +   * @param coordinator The {@link
> >> org.apache.samza.task.TaskCoordinator}
> >> > in the context
> >> > +   * @throws Exception Throws exception if failed to refresh the
> >>results
> >> > +   */
> >> > +  protected abstract void realRefresh(long timeNano,
> >> > SimpleMessageCollector collector, TaskCoordinator coordinator)
> >> > +      throws Exception;
> >> > +
> >> > +  /**
> >> > +   * Method to be overriden by each specific implementation class of
> >> > operator to perform relational logic operation on an input {@link
> >> > org.apache.samza.sql.api.data.Relation}
> >> > +   *
> >> > +   * @param rel The input relation
> >> > +   * @param collector The {@link
> >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
> >> > +   * @param coordinator The {@link
> >> org.apache.samza.task.TaskCoordinator}
> >> > in the context
> >> > +   * @throws Exception Throws exception if failed to process
> >> > +   */
> >> > +  protected abstract void realProcess(Relation rel,
> >> > SimpleMessageCollector collector, TaskCoordinator coordinator)
> >> > +      throws Exception;
> >> > +
> >> > +  /**
> >> > +   * Method to be overriden by each specific implementation class of
> >> > operator to perform relational logic operation on an input {@link
> >> > org.apache.samza.sql.api.data.Tuple}
> >> > +   *
> >> > +   * @param ituple The input tuple
> >> > +   * @param collector The {@link
> >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
> >> > +   * @param coordinator The {@link
> >> org.apache.samza.task.TaskCoordinator}
> >> > in the context
> >> > +   * @throws Exception Throws exception if failed to process
> >> > +   */
> >> > +  protected abstract void realProcess(Tuple ituple,
> >> > SimpleMessageCollector collector, TaskCoordinator coordinator)
> >> > +      throws Exception;
> >> > +
> >> > +}
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera
> >>torSpec.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera
> >>torSpec.java
> >> > new file mode 100644
> >> > index 0000000..691e543
> >> > --- /dev/null
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOpera
> >>torSpec.java
> >> > @@ -0,0 +1,106 @@
> >> > +/*
> >> > + * Licensed to the Apache Software Foundation (ASF) under one
> >> > + * or more contributor license agreements.  See the NOTICE file
> >> > + * distributed with this work for additional information
> >> > + * regarding copyright ownership.  The ASF licenses this file
> >> > + * to you under the Apache License, Version 2.0 (the
> >> > + * "License"); you may not use this file except in compliance
> >> > + * with the License.  You may obtain a copy of the License at
> >> > + *
> >> > + *   http://www.apache.org/licenses/LICENSE-2.0
> >> > + *
> >> > + * Unless required by applicable law or agreed to in writing,
> >> > + * software distributed under the License is distributed on an
> >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> > + * KIND, either express or implied.  See the License for the
> >> > + * specific language governing permissions and limitations
> >> > + * under the License.
> >> > + */
> >> > +package org.apache.samza.sql.operators;
> >> > +
> >> > +import java.util.ArrayList;
> >> > +import java.util.List;
> >> > +
> >> > +import org.apache.samza.sql.api.data.EntityName;
> >> > +import org.apache.samza.sql.api.operators.OperatorSpec;
> >> > +
> >> > +
> >> > +/**
> >> > + * An abstract class that encapsulate the basic information and
> >>methods
> >> > that all specification of operators should implement.
> >> > + * It implements {@link
> >>org.apache.samza.sql.api.operators.OperatorSpec}
> >> > + */
> >> > +public abstract class SimpleOperatorSpec implements OperatorSpec {
> >> > +  /**
> >> > +   * The identifier of the corresponding operator
> >> > +   */
> >> > +  private final String id;
> >> > +
> >> > +  /**
> >> > +   * The list of input entity names of the corresponding operator
> >> > +   */
> >> > +  private final List<EntityName> inputs = new
> >>ArrayList<EntityName>();
> >> > +
> >> > +  /**
> >> > +   * The list of output entity names of the corresponding operator
> >> > +   */
> >> > +  private final List<EntityName> outputs = new
> >>ArrayList<EntityName>();
> >> > +
> >> > +  /**
> >> > +   * Ctor of the {@code SimpleOperatorSpec} for simple {@link
> >> > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and
> >>one
> >> > output
> >> > +   *
> >> > +   * @param id Unique identifier of the {@link
> >> > org.apache.samza.sql.api.operators.SimpleOperator} object
> >> > +   * @param input The only input entity
> >> > +   * @param output The only output entity
> >> > +   */
> >> > +  public SimpleOperatorSpec(String id, EntityName input, EntityName
> >> > output) {
> >> > +    this.id = id;
> >> > +    this.inputs.add(input);
> >> > +    this.outputs.add(output);
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Ctor of {@code SimpleOperatorSpec} with general format: m inputs
> >> and
> >> > n outputs
> >> > +   *
> >> > +   * @param id Unique identifier of the {@link
> >> > org.apache.samza.sql.api.operators.SimpleOperator} object
> >> > +   * @param inputs The list of input entities
> >> > +   * @param output The list of output entities
> >> > +   */
> >> > +  public SimpleOperatorSpec(String id, List<EntityName> inputs,
> >> > EntityName output) {
> >> > +    this.id = id;
> >> > +    this.inputs.addAll(inputs);
> >> > +    this.outputs.add(output);
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public String getId() {
> >> > +    return this.id;
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public List<EntityName> getInputNames() {
> >> > +    return this.inputs;
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public List<EntityName> getOutputNames() {
> >> > +    return this.outputs;
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Method to get the first output entity
> >> > +   *
> >> > +   * @return The first output entity name
> >> > +   */
> >> > +  public EntityName getOutputName() {
> >> > +    return this.outputs.get(0);
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Method to get the first input entity
> >> > +   *
> >> > +   * @return The first input entity name
> >> > +   */
> >> > +  public EntityName getInputName() {
> >> > +    return this.inputs.get(0);
> >> > +  }
> >> > +}
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRoute
> >>r.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRoute
> >>r.java
> >> > new file mode 100644
> >> > index 0000000..2d9a1db
> >> > --- /dev/null
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRoute
> >>r.java
> >> > @@ -0,0 +1,141 @@
> >> > +/*
> >> > + * Licensed to the Apache Software Foundation (ASF) under one
> >> > + * or more contributor license agreements.  See the NOTICE file
> >> > + * distributed with this work for additional information
> >> > + * regarding copyright ownership.  The ASF licenses this file
> >> > + * to you under the Apache License, Version 2.0 (the
> >> > + * "License"); you may not use this file except in compliance
> >> > + * with the License.  You may obtain a copy of the License at
> >> > + *
> >> > + *   http://www.apache.org/licenses/LICENSE-2.0
> >> > + *
> >> > + * Unless required by applicable law or agreed to in writing,
> >> > + * software distributed under the License is distributed on an
> >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> > + * KIND, either express or implied.  See the License for the
> >> > + * specific language governing permissions and limitations
> >> > + * under the License.
> >> > + */
> >> > +
> >> > +package org.apache.samza.sql.operators;
> >> > +
> >> > +import java.util.ArrayList;
> >> > +import java.util.HashMap;
> >> > +import java.util.HashSet;
> >> > +import java.util.Iterator;
> >> > +import java.util.List;
> >> > +import java.util.Map;
> >> > +import java.util.Set;
> >> > +
> >> > +import org.apache.samza.config.Config;
> >> > +import org.apache.samza.sql.api.data.EntityName;
> >> > +import org.apache.samza.sql.api.data.Relation;
> >> > +import org.apache.samza.sql.api.data.Tuple;
> >> > +import org.apache.samza.sql.api.operators.Operator;
> >> > +import org.apache.samza.sql.api.operators.OperatorRouter;
> >> > +import org.apache.samza.sql.api.operators.SimpleOperator;
> >> > +import org.apache.samza.task.MessageCollector;
> >> > +import org.apache.samza.task.TaskContext;
> >> > +import org.apache.samza.task.TaskCoordinator;
> >> > +import org.apache.samza.task.sql.RouterMessageCollector;
> >> > +
> >> > +
> >> > +/**
> >> > + * Example implementation of {@link
> >> > org.apache.samza.sql.api.operators.OperatorRouter}
> >> > + *
> >> > + */
> >> > +public final class SimpleRouter implements OperatorRouter {
> >> > +  /**
> >> > +   * List of operators added to the {@link
> >> > org.apache.samza.sql.api.operators.OperatorRouter}
> >> > +   */
> >> > +  private List<SimpleOperator> operators = new
> >> > ArrayList<SimpleOperator>();
> >> > +
> >> > +  @SuppressWarnings("rawtypes")
> >> > +  /**
> >> > +   * Map of {@link org.apache.samza.sql.api.data.EntityName} to the
> >>list
> >> > of operators associated with it
> >> > +   */
> >> > +  private Map<EntityName, List> nextOps = new HashMap<EntityName,
> >> List>();
> >> > +
> >> > +  /**
> >> > +   * Set of {@link org.apache.samza.sql.api.data.EntityName} as
> >>inputs
> >> to
> >> > this {@code SimpleRouter}
> >> > +   */
> >> > +  private Set<EntityName> inputEntities = new HashSet<EntityName>();
> >> > +
> >> > +  /**
> >> > +   * Set of entities that are not input entities to this {@code
> >> > SimpleRouter}
> >> > +   */
> >> > +  private Set<EntityName> outputEntities = new HashSet<EntityName>();
> >> > +
> >> > +  @SuppressWarnings("unchecked")
> >> > +  private void addOperator(EntityName input, SimpleOperator nextOp) {
> >> > +    if (nextOps.get(input) == null) {
> >> > +      nextOps.put(input, new ArrayList<Operator>());
> >> > +    }
> >> > +    nextOps.get(input).add(nextOp);
> >> > +    operators.add(nextOp);
> >> > +    // get the operator spec
> >> > +    for (EntityName output : nextOp.getSpec().getOutputNames()) {
> >> > +      if (inputEntities.contains(output)) {
> >> > +        inputEntities.remove(output);
> >> > +      }
> >> > +      outputEntities.add(output);
> >> > +    }
> >> > +    if (!outputEntities.contains(input)) {
> >> > +      inputEntities.add(input);
> >> > +    }
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  @SuppressWarnings("unchecked")
> >> > +  public List<SimpleOperator> getNextOperators(EntityName entity) {
> >> > +    return nextOps.get(entity);
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public void addOperator(SimpleOperator nextOp) {
> >> > +    List<EntityName> inputs = nextOp.getSpec().getInputNames();
> >> > +    for (EntityName input : inputs) {
> >> > +      addOperator(input, nextOp);
> >> > +    }
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public void init(Config config, TaskContext context) throws
> >>Exception
> >> {
> >> > +    for (SimpleOperator op : this.operators) {
> >> > +      op.init(config, context);
> >> > +    }
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public void process(Tuple ituple, MessageCollector collector,
> >> > TaskCoordinator coordinator) throws Exception {
> >> > +    MessageCollector opCollector = new
> >>RouterMessageCollector(collector,
> >> > coordinator, this);
> >> > +    for (Iterator<SimpleOperator> iter =
> >> > this.getNextOperators(ituple.getEntityName()).iterator();
> >> iter.hasNext();) {
> >> > +      iter.next().process(ituple, opCollector, coordinator);
> >> > +    }
> >> > +  }
> >> > +
> >> > +  @SuppressWarnings("rawtypes")
> >> > +  @Override
> >> > +  public void process(Relation deltaRelation, MessageCollector
> >> collector,
> >> > TaskCoordinator coordinator) throws Exception {
> >> > +    MessageCollector opCollector = new
> >>RouterMessageCollector(collector,
> >> > coordinator, this);
> >> > +    for (Iterator<SimpleOperator> iter =
> >> > this.getNextOperators(deltaRelation.getName()).iterator();
> >> iter.hasNext();)
> >> > {
> >> > +      iter.next().process(deltaRelation, opCollector, coordinator);
> >> > +    }
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public void refresh(long nanoSec, MessageCollector collector,
> >> > TaskCoordinator coordinator) throws Exception {
> >> > +    MessageCollector opCollector = new
> >>RouterMessageCollector(collector,
> >> > coordinator, this);
> >> > +    for (EntityName entity : inputEntities) {
> >> > +      for (Iterator<SimpleOperator> iter =
> >> > this.getNextOperators(entity).iterator(); iter.hasNext();) {
> >> > +        iter.next().refresh(nanoSec, opCollector, coordinator);
> >> > +      }
> >> > +    }
> >> > +  }
> >> > +
> >> > +  @Override
> >> > +  public Iterator<SimpleOperator> iterator() {
> >> > +    return this.operators.iterator();
> >> > +  }
> >> > +
> >> > +}
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallbac
> >>k.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Noo
> >>pOperatorCallback.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Noo
> >>pOperatorCallback.java
> >> > deleted file mode 100644
> >> > index c3d2266..0000000
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Noo
> >>pOperatorCallback.java
> >> > +++ /dev/null
> >> > @@ -1,50 +0,0 @@
> >> > -/*
> >> > - * Licensed to the Apache Software Foundation (ASF) under one
> >> > - * or more contributor license agreements.  See the NOTICE file
> >> > - * distributed with this work for additional information
> >> > - * regarding copyright ownership.  The ASF licenses this file
> >> > - * to you under the Apache License, Version 2.0 (the
> >> > - * "License"); you may not use this file except in compliance
> >> > - * with the License.  You may obtain a copy of the License at
> >> > - *
> >> > - *   http://www.apache.org/licenses/LICENSE-2.0
> >> > - *
> >> > - * Unless required by applicable law or agreed to in writing,
> >> > - * software distributed under the License is distributed on an
> >> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> > - * KIND, either express or implied.  See the License for the
> >> > - * specific language governing permissions and limitations
> >> > - * under the License.
> >> > - */
> >> > -package org.apache.samza.sql.operators.factory;
> >> > -
> >> > -import org.apache.samza.sql.api.data.Relation;
> >> > -import org.apache.samza.sql.api.data.Tuple;
> >> > -import org.apache.samza.sql.api.operators.OperatorCallback;
> >> > -import org.apache.samza.task.MessageCollector;
> >> > -import org.apache.samza.task.TaskCoordinator;
> >> > -
> >> > -
> >> > -public final class NoopOperatorCallback implements OperatorCallback {
> >> > -
> >> > -  @Override
> >> > -  public Tuple beforeProcess(Tuple tuple, MessageCollector collector,
> >> > TaskCoordinator coordinator) {
> >> > -    return tuple;
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public Relation beforeProcess(Relation rel, MessageCollector
> >> collector,
> >> > TaskCoordinator coordinator) {
> >> > -    return rel;
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public Tuple afterProcess(Tuple tuple, MessageCollector collector,
> >> > TaskCoordinator coordinator) {
> >> > -    return tuple;
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public Relation afterProcess(Relation rel, MessageCollector
> >>collector,
> >> > TaskCoordinator coordinator) {
> >> > -    return rel;
> >> > -  }
> >> > -
> >> > -}
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.
> >>java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
> >>pleOperatorImpl.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
> >>pleOperatorImpl.java
> >> > deleted file mode 100644
> >> > index e66451f..0000000
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
> >>pleOperatorImpl.java
> >> > +++ /dev/null
> >> > @@ -1,136 +0,0 @@
> >> > -/*
> >> > - * Licensed to the Apache Software Foundation (ASF) under one
> >> > - * or more contributor license agreements.  See the NOTICE file
> >> > - * distributed with this work for additional information
> >> > - * regarding copyright ownership.  The ASF licenses this file
> >> > - * to you under the Apache License, Version 2.0 (the
> >> > - * "License"); you may not use this file except in compliance
> >> > - * with the License.  You may obtain a copy of the License at
> >> > - *
> >> > - *   http://www.apache.org/licenses/LICENSE-2.0
> >> > - *
> >> > - * Unless required by applicable law or agreed to in writing,
> >> > - * software distributed under the License is distributed on an
> >> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> > - * KIND, either express or implied.  See the License for the
> >> > - * specific language governing permissions and limitations
> >> > - * under the License.
> >> > - */
> >> > -
> >> > -package org.apache.samza.sql.operators.factory;
> >> > -
> >> > -import org.apache.samza.sql.api.data.Relation;
> >> > -import org.apache.samza.sql.api.data.Tuple;
> >> > -import org.apache.samza.sql.api.operators.OperatorCallback;
> >> > -import org.apache.samza.sql.api.operators.OperatorSpec;
> >> > -import org.apache.samza.sql.api.operators.SimpleOperator;
> >> > -import org.apache.samza.task.MessageCollector;
> >> > -import org.apache.samza.task.TaskCoordinator;
> >> > -import org.apache.samza.task.sql.SimpleMessageCollector;
> >> > -
> >> > -
> >> > -/**
> >> > - * An abstract class that encapsulate the basic information and
> >>methods
> >> > that all operator classes should implement.
> >> > - * It implements the interface {@link
> >> > org.apache.samza.sql.api.operators.SimpleOperator}
> >> > - *
> >> > - */
> >> > -public abstract class SimpleOperatorImpl implements SimpleOperator {
> >> > -  /**
> >> > -   * The specification of this operator
> >> > -   */
> >> > -  private final OperatorSpec spec;
> >> > -
> >> > -  /**
> >> > -   * The callback function
> >> > -   */
> >> > -  private final OperatorCallback callback;
> >> > -
> >> > -  /**
> >> > -   * Ctor of {@code SimpleOperatorImpl} class
> >> > -   *
> >> > -   * @param spec The specification of this operator
> >> > -   */
> >> > -  public SimpleOperatorImpl(OperatorSpec spec) {
> >> > -    this(spec, new NoopOperatorCallback());
> >> > -  }
> >> > -
> >> > -  public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback
> >> callback)
> >> > {
> >> > -    this.spec = spec;
> >> > -    this.callback = callback;
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public OperatorSpec getSpec() {
> >> > -    return this.spec;
> >> > -  }
> >> > -
> >> > -  /**
> >> > -   * This method is made final s.t. the sequence of invocations
> >>between
> >> > {@link
> >> >
> >>
> >>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relatio
> >>n,
> >> > MessageCollector, TaskCoordinator)}
> >> > -   * and real processing of the input relation is fixed.
> >> > -   */
> >> > -  @Override
> >> > -  final public void process(Relation deltaRelation, MessageCollector
> >> > collector, TaskCoordinator coordinator)
> >> > -      throws Exception {
> >> > -    Relation rel = this.callback.beforeProcess(deltaRelation,
> >>collector,
> >> > coordinator);
> >> > -    if (rel == null) {
> >> > -      return;
> >> > -    }
> >> > -    this.realProcess(rel, getCollector(collector, coordinator),
> >> > coordinator);
> >> > -  }
> >> > -
> >> > -  /**
> >> > -   * This method is made final s.t. the sequence of invocations
> >>between
> >> > {@link
> >> >
> >>org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple,
> >> > MessageCollector, TaskCoordinator)}
> >> > -   * and real processing of the input tuple is fixed.
> >> > -   */
> >> > -  @Override
> >> > -  final public void process(Tuple tuple, MessageCollector collector,
> >> > TaskCoordinator coordinator) throws Exception {
> >> > -    Tuple ituple = this.callback.beforeProcess(tuple, collector,
> >> > coordinator);
> >> > -    if (ituple == null) {
> >> > -      return;
> >> > -    }
> >> > -    this.realProcess(ituple, getCollector(collector, coordinator),
> >> > coordinator);
> >> > -  }
> >> > -
> >> > -  /**
> >> > -   * This method is made final s.t. we enforce the invocation of
> >>{@code
> >> > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)}
> >> before
> >> > doing anything futher
> >> > -   */
> >> > -  @Override
> >> > -  final public void refresh(long timeNano, MessageCollector
> >>collector,
> >> > TaskCoordinator coordinator) throws Exception {
> >> > -    this.realRefresh(timeNano, getCollector(collector, coordinator),
> >> > coordinator);
> >> > -  }
> >> > -
> >> > -  private SimpleMessageCollector getCollector(MessageCollector
> >> collector,
> >> > TaskCoordinator coordinator) {
> >> > -    if (!(collector instanceof SimpleMessageCollector)) {
> >> > -      return new SimpleMessageCollector(collector, coordinator,
> >> > this.callback);
> >> > -    } else {
> >> > -      ((SimpleMessageCollector)
> >> > collector).switchOperatorCallback(this.callback);
> >> > -      return (SimpleMessageCollector) collector;
> >> > -    }
> >> > -  }
> >> > -
> >> > -  /**
> >> > -   * Method to be overriden by each specific implementation class of
> >> > operator to handle timeout event
> >> > -   *
> >> > -   * @param timeNano The time in nanosecond when the timeout event
> >> > occurred
> >> > -   * @param collector The {@link
> >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
> >> > -   * @param coordinator The {@link
> >> org.apache.samza.task.TaskCoordinator}
> >> > in the context
> >> > -   * @throws Exception Throws exception if failed to refresh the
> >>results
> >> > -   */
> >> > -  protected abstract void realRefresh(long timeNano,
> >> > SimpleMessageCollector collector, TaskCoordinator coordinator)
> >> > -      throws Exception;
> >> > -
> >> > -  /**
> >> > -   * Method to be overriden by each specific implementation class of
> >> > operator to perform relational logic operation on an input {@link
> >> > org.apache.samza.sql.api.data.Relation}
> >> > -   *
> >> > -   * @param rel The input relation
> >> > -   * @param collector The {@link
> >> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
> >> > -   * @param coordinator The {@link
> >> org.apache.samza.task.TaskCoordinator}
> >> > in the context
> >> > -   * @throws Exception
> >> > -   */
> >> > -  protected abstract void realProcess(Relation rel,
> >> > SimpleMessageCollector collector, TaskCoordinator coordinator)
> >> > -      throws Exception;
> >> > -
> >> > -  protected abstract void realProcess(Tuple ituple,
> >> > SimpleMessageCollector collector, TaskCoordinator coordinator)
> >> > -      throws Exception;
> >> > -
> >> > -}
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.
> >>java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
> >>pleOperatorSpec.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
> >>pleOperatorSpec.java
> >> > deleted file mode 100644
> >> > index 56753b6..0000000
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
> >>pleOperatorSpec.java
> >> > +++ /dev/null
> >> > @@ -1,106 +0,0 @@
> >> > -/*
> >> > - * Licensed to the Apache Software Foundation (ASF) under one
> >> > - * or more contributor license agreements.  See the NOTICE file
> >> > - * distributed with this work for additional information
> >> > - * regarding copyright ownership.  The ASF licenses this file
> >> > - * to you under the Apache License, Version 2.0 (the
> >> > - * "License"); you may not use this file except in compliance
> >> > - * with the License.  You may obtain a copy of the License at
> >> > - *
> >> > - *   http://www.apache.org/licenses/LICENSE-2.0
> >> > - *
> >> > - * Unless required by applicable law or agreed to in writing,
> >> > - * software distributed under the License is distributed on an
> >> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> > - * KIND, either express or implied.  See the License for the
> >> > - * specific language governing permissions and limitations
> >> > - * under the License.
> >> > - */
> >> > -package org.apache.samza.sql.operators.factory;
> >> > -
> >> > -import java.util.ArrayList;
> >> > -import java.util.List;
> >> > -
> >> > -import org.apache.samza.sql.api.data.EntityName;
> >> > -import org.apache.samza.sql.api.operators.OperatorSpec;
> >> > -
> >> > -
> >> > -/**
> >> > - * An abstract class that encapsulate the basic information and
> >>methods
> >> > that all specification of operators should implement.
> >> > - * It implements {@link
> >>org.apache.samza.sql.api.operators.OperatorSpec}
> >> > - */
> >> > -public abstract class SimpleOperatorSpec implements OperatorSpec {
> >> > -  /**
> >> > -   * The identifier of the corresponding operator
> >> > -   */
> >> > -  private final String id;
> >> > -
> >> > -  /**
> >> > -   * The list of input entity names of the corresponding operator
> >> > -   */
> >> > -  private final List<EntityName> inputs = new
> >>ArrayList<EntityName>();
> >> > -
> >> > -  /**
> >> > -   * The list of output entity names of the corresponding operator
> >> > -   */
> >> > -  private final List<EntityName> outputs = new
> >>ArrayList<EntityName>();
> >> > -
> >> > -  /**
> >> > -   * Ctor of the {@code SimpleOperatorSpec} for simple {@link
> >> > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and
> >>one
> >> > output
> >> > -   *
> >> > -   * @param id Unique identifier of the {@link
> >> > org.apache.samza.sql.api.operators.SimpleOperator} object
> >> > -   * @param input The only input entity
> >> > -   * @param output The only output entity
> >> > -   */
> >> > -  public SimpleOperatorSpec(String id, EntityName input, EntityName
> >> > output) {
> >> > -    this.id = id;
> >> > -    this.inputs.add(input);
> >> > -    this.outputs.add(output);
> >> > -  }
> >> > -
> >> > -  /**
> >> > -   * Ctor of {@code SimpleOperatorSpec} with general format: m inputs
> >> and
> >> > n outputs
> >> > -   *
> >> > -   * @param id Unique identifier of the {@link
> >> > org.apache.samza.sql.api.operators.SimpleOperator} object
> >> > -   * @param inputs The list of input entities
> >> > -   * @param output The list of output entities
> >> > -   */
> >> > -  public SimpleOperatorSpec(String id, List<EntityName> inputs,
> >> > EntityName output) {
> >> > -    this.id = id;
> >> > -    this.inputs.addAll(inputs);
> >> > -    this.outputs.add(output);
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public String getId() {
> >> > -    return this.id;
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public List<EntityName> getInputNames() {
> >> > -    return this.inputs;
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public List<EntityName> getOutputNames() {
> >> > -    return this.outputs;
> >> > -  }
> >> > -
> >> > -  /**
> >> > -   * Method to get the first output entity
> >> > -   *
> >> > -   * @return The first output entity name
> >> > -   */
> >> > -  public EntityName getOutputName() {
> >> > -    return this.outputs.get(0);
> >> > -  }
> >> > -
> >> > -  /**
> >> > -   * Method to get the first input entity
> >> > -   *
> >> > -   * @return The first input entity name
> >> > -   */
> >> > -  public EntityName getInputName() {
> >> > -    return this.inputs.get(0);
> >> > -  }
> >> > -}
> >> >
> >> >
> >> >
> >>
> >>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core
> >>/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
> >>pleRouter.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
> >>pleRouter.java
> >> > deleted file mode 100644
> >> > index e570897..0000000
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Sim
> >>pleRouter.java
> >> > +++ /dev/null
> >> > @@ -1,136 +0,0 @@
> >> > -/*
> >> > - * Licensed to the Apache Software Foundation (ASF) under one
> >> > - * or more contributor license agreements.  See the NOTICE file
> >> > - * distributed with this work for additional information
> >> > - * regarding copyright ownership.  The ASF licenses this file
> >> > - * to you under the Apache License, Version 2.0 (the
> >> > - * "License"); you may not use this file except in compliance
> >> > - * with the License.  You may obtain a copy of the License at
> >> > - *
> >> > - *   http://www.apache.org/licenses/LICENSE-2.0
> >> > - *
> >> > - * Unless required by applicable law or agreed to in writing,
> >> > - * software distributed under the License is distributed on an
> >> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> > - * KIND, either express or implied.  See the License for the
> >> > - * specific language governing permissions and limitations
> >> > - * under the License.
> >> > - */
> >> > -
> >> > -package org.apache.samza.sql.operators.factory;
> >> > -
> >> > -import java.util.ArrayList;
> >> > -import java.util.HashMap;
> >> > -import java.util.HashSet;
> >> > -import java.util.Iterator;
> >> > -import java.util.List;
> >> > -import java.util.Map;
> >> > -import java.util.Set;
> >> > -
> >> > -import org.apache.samza.config.Config;
> >> > -import org.apache.samza.sql.api.data.EntityName;
> >> > -import org.apache.samza.sql.api.data.Relation;
> >> > -import org.apache.samza.sql.api.data.Tuple;
> >> > -import org.apache.samza.sql.api.operators.Operator;
> >> > -import org.apache.samza.sql.api.operators.OperatorRouter;
> >> > -import org.apache.samza.sql.api.operators.SimpleOperator;
> >> > -import org.apache.samza.task.MessageCollector;
> >> > -import org.apache.samza.task.TaskContext;
> >> > -import org.apache.samza.task.TaskCoordinator;
> >> > -import org.apache.samza.task.sql.RouterMessageCollector;
> >> > -
> >> > -
> >> > -/**
> >> > - * Example implementation of {@link
> >> > org.apache.samza.sql.api.operators.OperatorRouter}
> >> > - *
> >> > - */
> >> > -public final class SimpleRouter implements OperatorRouter {
> >> > -  /**
> >> > -   * List of operators added to the {@link
> >> > org.apache.samza.sql.api.operators.OperatorRouter}
> >> > -   */
> >> > -  private List<SimpleOperator> operators = new
> >> > ArrayList<SimpleOperator>();
> >> > -
> >> > -  @SuppressWarnings("rawtypes")
> >> > -  /**
> >> > -   * Map of {@link org.apache.samza.sql.api.data.EntityName} to the
> >>list
> >> > of operators associated with it
> >> > -   */
> >> > -  private Map<EntityName, List> nextOps = new HashMap<EntityName,
> >> List>();
> >> > -
> >> > -  /**
> >> > -   * Set of {@link org.apache.samza.sql.api.data.EntityName} as
> >>inputs
> >> to
> >> > this {@code SimpleRouter}
> >> > -   */
> >> > -  private Set<EntityName> inputEntities = new HashSet<EntityName>();
> >> > -
> >> > -  /**
> >> > -   * Set of entities that are not input entities to this {@code
> >> > SimpleRouter}
> >> > -   */
> >> > -  private Set<EntityName> outputEntities = new HashSet<EntityName>();
> >> > -
> >> > -  @SuppressWarnings("unchecked")
> >> > -  private void addOperator(EntityName input, SimpleOperator nextOp) {
> >> > -    if (nextOps.get(input) == null) {
> >> > -      nextOps.put(input, new ArrayList<Operator>());
> >> > -    }
> >> > -    nextOps.get(input).add(nextOp);
> >> > -    operators.add(nextOp);
> >> > -    // get the operator spec
> >> > -    for (EntityName output : nextOp.getSpec().getOutputNames()) {
> >> > -      if (inputEntities.contains(output)) {
> >> > -        inputEntities.remove(output);
> >> > -      }
> >> > -      outputEntities.add(output);
> >> > -    }
> >> > -    if (!outputEntities.contains(input)) {
> >> > -      inputEntities.add(input);
> >> > -    }
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  @SuppressWarnings("unchecked")
> >> > -  public List<SimpleOperator> getNextOperators(EntityName entity) {
> >> > -    return nextOps.get(entity);
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public void addOperator(SimpleOperator nextOp) {
> >> > -    List<EntityName> inputs = nextOp.getSpec().getInputNames();
> >> > -    for (EntityName input : inputs) {
> >> > -      addOperator(input, nextOp);
> >> > -    }
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public void init(Config config, TaskContext context) throws
> >>Exception
> >> {
> >> > -    for (SimpleOperator op : this.operators) {
> >> > -      op.init(config, context);
> >> > -    }
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public void process(Tuple ituple, MessageCollector collector,
> >> > TaskCoordinator coordinator) throws Exception {
> >> > -    MessageCollector opCollector = new
> >>RouterMessageCollector(collector,
> >> > coordinator, this);
> >> > -    for (Iterator<SimpleOperator> iter =
> >> > this.getNextOperators(ituple.getEntityName()).iterator();
> >> iter.hasNext();) {
> >> > -      iter.next().process(ituple, opCollector, coordinator);
> >> > -    }
> >> > -  }
> >> > -
> >> > -  @SuppressWarnings("rawtypes")
> >> > -  @Override
> >> > -  public void process(Relation deltaRelation, MessageCollector
> >> collector,
> >> > TaskCoordinator coordinator) throws Exception {
> >> > -    MessageCollector opCollector = new
> >>RouterMessageCollector(collector,
> >> > coordinator, this);
> >> > -    for (Iterator<SimpleOperator> iter =
> >> > this.getNextOperators(deltaRelation.getName()).iterator();
> >> iter.hasNext();)
> >> > {
> >> > -      iter.next().process(deltaRelation, opCollector, coordinator);
> >> > -    }
> >> > -  }
> >> > -
> >> > -  @Override
> >> > -  public void refresh(long nanoSec, MessageCollector collector,
> >> > TaskCoordinator coordinator) throws Exception {
> >> > -    MessageCollector opCollector = new
> >>RouterMessageCollector(collector,
> >> > coordinator, this);
> >> > -    for (EntityName entity : inputEntities) {
> >> > -      for (Iterator<SimpleOperator> iter =
> >> > this.getNextOperators(entity).iterator(); iter.hasNext();) {
> >> > -        iter.next().refresh(nanoSec, opCollector, coordinator);
> >> > -      }
> >> > -    }
> >> > -  }
> >> > -
> >> > -}
> >> >
> >> >
> >> >
> >>
> >>
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Top
> >>ologyBuilder.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Top
> >>ologyBuilder.java
> >> > new file mode 100644
> >> > index 0000000..62b19fc
> >> > --- /dev/null
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/Top
> >>ologyBuilder.java
> >> > @@ -0,0 +1,284 @@
> >> > +/*
> >> > + * Licensed to the Apache Software Foundation (ASF) under one
> >> > + * or more contributor license agreements.  See the NOTICE file
> >> > + * distributed with this work for additional information
> >> > + * regarding copyright ownership.  The ASF licenses this file
> >> > + * to you under the Apache License, Version 2.0 (the
> >> > + * "License"); you may not use this file except in compliance
> >> > + * with the License.  You may obtain a copy of the License at
> >> > + *
> >> > + *   http://www.apache.org/licenses/LICENSE-2.0
> >> > + *
> >> > + * Unless required by applicable law or agreed to in writing,
> >> > + * software distributed under the License is distributed on an
> >> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >> > + * KIND, either express or implied.  See the License for the
> >> > + * specific language governing permissions and limitations
> >> > + * under the License.
> >> > + */
> >> > +package org.apache.samza.sql.operators.factory;
> >> > +
> >> > +import java.util.HashMap;
> >> > +import java.util.HashSet;
> >> > +import java.util.Iterator;
> >> > +import java.util.List;
> >> > +import java.util.Map;
> >> > +import java.util.Set;
> >> > +
> >> > +import org.apache.samza.sql.api.data.EntityName;
> >> > +import org.apache.samza.sql.api.operators.OperatorRouter;
> >> > +import org.apache.samza.sql.api.operators.OperatorSink;
> >> > +import org.apache.samza.sql.api.operators.OperatorSource;
> >> > +import org.apache.samza.sql.api.operators.OperatorSpec;
> >> > +import org.apache.samza.sql.api.operators.SimpleOperator;
> >> > +import org.apache.samza.sql.api.operators.SqlOperatorFactory;
> >> > +import org.apache.samza.sql.operators.OperatorTopology;
> >> > +import org.apache.samza.sql.operators.SimpleRouter;
> >> > +
> >> > +
> >> > +/**
> >> > + * This class implements a builder to allow user to create the
> >>operators
> >> > and connect them in a topology altogether.
> >> > + */
> >> > +public class TopologyBuilder {
> >> > +
> >> > +  /**
> >> > +   * Internal {@link
> >>org.apache.samza.sql.api.operators.OperatorRouter}
> >> > object to retain the topology being created
> >> > +   */
> >> > +  private SimpleRouter router;
> >> > +
> >> > +  /**
> >> > +   * The {@link
> >>org.apache.samza.sql.api.operators.SqlOperatorFactory}
> >> > object used to create operators connected in the topology
> >> > +   */
> >> > +  private final SqlOperatorFactory factory;
> >> > +
> >> > +  /**
> >> > +   * The map of unbound inputs, the value is set(input_operators)
> >> > +   */
> >> > +  private Map<EntityName, Set<OperatorSpec>> unboundInputs = new
> >> > HashMap<EntityName, Set<OperatorSpec>>();
> >> > +
> >> > +  /**
> >> > +   * The map of unbound outputs, the value is the operator generating
> >> the
> >> > output
> >> > +   */
> >> > +  private Map<EntityName, OperatorSpec> unboundOutputs = new
> >> > HashMap<EntityName, OperatorSpec>();
> >> > +
> >> > +  /**
> >> > +   * The set of entities that are intermediate entities between
> >> operators
> >> > +   */
> >> > +  private Set<EntityName> interStreams = new HashSet<EntityName>();
> >> > +
> >> > +  /**
> >> > +   * The current operator that may have unbound input or output
> >> > +   */
> >> > +  private SimpleOperator currentOp = null;
> >> > +
> >> > +  /**
> >> > +   * Private constructor of {@code TopologyBuilder}
> >> > +   *
> >> > +   * @param factory The {@link
> >> > org.apache.samza.sql.api.operators.SqlOperatorFactory} to create
> >> operators
> >> > +   */
> >> > +  private TopologyBuilder(SqlOperatorFactory factory) {
> >> > +    this.router = new SimpleRouter();
> >> > +    this.factory = factory;
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Static method to create this {@code TopologyBuilder} w/ a
> >> customized
> >> > {@link org.apache.samza.sql.api.operators.SqlOperatorFactory}
> >> > +   *
> >> > +   * @param factory The {@link
> >> > org.apache.samza.sql.api.operators.SqlOperatorFactory} to create
> >> operators
> >> > +   * @return The {@code TopologyBuilder} object
> >> > +   */
> >> > +  public static TopologyBuilder create(SqlOperatorFactory factory) {
> >> > +    return new TopologyBuilder(factory);
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Static method to create this {@code TopologyBuilder}
> >> > +   *
> >> > +   * @return The {@code TopologyBuilder} object
> >> > +   */
> >> > +  public static TopologyBuilder create() {
> >> > +    return new TopologyBuilder(new SimpleOperatorFactoryImpl());
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to create the next operator and attach it to the
> >> > output of the current operator
> >> > +   *
> >> > +   * @param spec The {@link
> >> > org.apache.samza.sql.api.operators.OperatorSpec} for the next operator
> >> > +   * @return The updated {@code TopologyBuilder} object
> >> > +   */
> >> > +  public TopologyBuilder operator(OperatorSpec spec) {
> >> > +    // check whether it is valid to connect a new operator to the
> >> current
> >> > operator's output
> >> > +    SimpleOperator nextOp = this.factory.getOperator(spec);
> >> > +    return this.operator(nextOp);
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to create the next operator and attach it to the
> >> > output of the current operator
> >> > +   *
> >> > +   * @param op The {@link
> >> > org.apache.samza.sql.api.operators.SimpleOperator}
> >> > +   * @return The updated {@code TopologyBuilder} object
> >> > +   */
> >> > +  public TopologyBuilder operator(SimpleOperator op) {
> >> > +    // check whether it is valid to connect a new operator to the
> >> current
> >> > operator's output
> >> > +    canAddOperator(op);
> >> > +    this.addOperator(op);
> >> > +    // advance the current operator position
> >> > +    this.currentOp = op;
> >> > +    return this;
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to create a stream object that will be the source
> >>to
> >> > other operators
> >> > +   *
> >> > +   * @return The {@link
> >> > org.apache.samza.sql.api.operators.OperatorSource} that can be the
> >>source
> >> > to other operators
> >> > +   */
> >> > +  public OperatorSource stream() {
> >> > +    canCreateSource();
> >> > +    return new
> >> > OperatorTopology(this.unboundOutputs.keySet().iterator().next(),
> >> > this.router);
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to create a sink object that can take input stream
> >> > from other operators
> >> > +   *
> >> > +   * @return The {@link
> >>org.apache.samza.sql.api.operators.OperatorSink}
> >> > that can be the downstream of other operators
> >> > +   */
> >> > +  public OperatorSink sink() {
> >> > +    canCreateSink();
> >> > +    return new
> >> > OperatorTopology(this.unboundInputs.keySet().iterator().next(),
> >> > this.router);
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to bind the input of the current operator w/ the
> >> > {@link org.apache.samza.sql.api.operators.OperatorSource} object
> >> > +   *
> >> > +   * @param srcStream The {@link
> >> > org.apache.samza.sql.api.operators.OperatorSource} that the current
> >> > operator is going to be bound to
> >> > +   * @return The updated {@code TopologyBuilder} object
> >> > +   */
> >> > +  public TopologyBuilder bind(OperatorSource srcStream) {
> >> > +    EntityName streamName = srcStream.getName();
> >> > +    if (this.unboundInputs.containsKey(streamName)) {
> >> > +      this.unboundInputs.remove(streamName);
> >> > +      this.interStreams.add(streamName);
> >> > +    } else {
> >> > +      // no input operator is waiting for the output from the
> >>srcStream
> >> > +      throw new IllegalArgumentException("No operator input can be
> >>bound
> >> > to the input stream " + streamName);
> >> > +    }
> >> > +    // add all operators in srcStream to this topology
> >> > +    for (Iterator<SimpleOperator> iter = srcStream.opIterator();
> >> > iter.hasNext();) {
> >> > +      this.addOperator(iter.next());
> >> > +    }
> >> > +    return this;
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to attach a {@link
> >> > org.apache.samza.sql.api.operators.OperatorSink} object to the output
> >>of
> >> > the current operator
> >> > +   *
> >> > +   * @param nextSink The {@link
> >> > org.apache.samza.sql.api.operators.OperatorSink} to be attached to the
> >> > current operator's output
> >> > +   * @return The updated {@code TopologyBuilder} object
> >> > +   */
> >> > +  public TopologyBuilder attach(OperatorSink nextSink) {
> >> > +    EntityName streamName = nextSink.getName();
> >> > +    if (this.unboundOutputs.containsKey(streamName)) {
> >> > +      this.unboundOutputs.remove(streamName);
> >> > +      this.interStreams.add(streamName);
> >> > +    } else {
> >> > +      // no unbound output to attach to
> >> > +      throw new IllegalArgumentException("No operator output found to
> >> > attach the sink " + streamName);
> >> > +    }
> >> > +    // add all operators in nextSink to the router
> >> > +    for (Iterator<SimpleOperator> iter = nextSink.opIterator();
> >> > iter.hasNext();) {
> >> > +      this.addOperator(iter.next());
> >> > +    }
> >> > +    return this;
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Public method to finalize the topology that should have all
> >>input
> >> > and output bound to system input and output
> >> > +   *
> >> > +   * @return The finalized {@link
> >> > org.apache.samza.sql.api.operators.OperatorRouter} object
> >> > +   */
> >> > +  public OperatorRouter build() {
> >> > +    canClose();
> >> > +    return router;
> >> > +  }
> >> > +
> >> > +  private TopologyBuilder addOperator(SimpleOperator nextOp) {
> >> > +    // if input is not in the unboundOutputs and interStreams, input
> >>is
> >> > unbound
> >> > +    for (EntityName in : nextOp.getSpec().getInputNames()) {
> >> > +      if (this.unboundOutputs.containsKey(in)) {
> >> > +        this.unboundOutputs.remove(in);
> >> > +        this.interStreams.add(in);
> >> > +      }
> >> > +      if (!this.interStreams.contains(in) && !in.isSystemEntity()) {
> >> > +        if (!this.unboundInputs.containsKey(in)) {
> >> > +          this.unboundInputs.put(in, new HashSet<OperatorSpec>());
> >> > +        }
> >> > +        this.unboundInputs.get(in).add(nextOp.getSpec());
> >> > +      }
> >> > +    }
> >> > +    // if output is not in the unboundInputs and interStreams,
> >>output is
> >> > unbound
> >> > +    for (EntityName out : nextOp.getSpec().getOutputNames()) {
> >> > +      if (this.unboundInputs.containsKey(out)) {
> >> > +        this.unboundInputs.remove(out);
> >> > +        this.interStreams.add(out);
> >> > +      }
> >> > +      if (!this.interStreams.contains(out) && !out.isSystemEntity())
> >>{
> >> > +        this.unboundOutputs.put(out, nextOp.getSpec());
> >> > +      }
> >> > +    }
> >> > +    try {
> >> > +      this.router.addOperator(nextOp);
> >> > +    } catch (Exception e) {
> >> > +      throw new RuntimeException("Failed to add operator " +
> >> > nextOp.getSpec().getId() + " to the topology.", e);
> >> > +    }
> >> > +    return this;
> >> > +  }
> >> > +
> >> > +  private void canCreateSource() {
> >> > +    if (this.unboundInputs.size() > 0) {
> >> > +      throw new IllegalStateException("Can't create stream when there
> >> are
> >> > unbounded input streams in the topology");
> >> > +    }
> >> > +    if (this.unboundOutputs.size() != 1) {
> >> > +      throw new IllegalStateException(
> >> > +          "Can't create stream when the number of unbounded outputs
> >>is
> >> > not 1 in the topology");
> >> > +    }
> >> > +  }
> >> > +
> >> > +  private void canCreateSink() {
> >> > +    if (this.unboundOutputs.size() > 0) {
> >> > +      throw new IllegalStateException("Can't create sink when there
> >>are
> >> > unbounded output streams in the topology");
> >> > +    }
> >> > +    if (this.unboundInputs.size() != 1) {
> >> > +      throw new IllegalStateException(
> >> > +          "Can't create sink when the number of unbounded input
> >>streams
> >> > is not 1 in the topology");
> >> > +    }
> >> > +  }
> >> > +
> >> > +  private void canAddOperator(SimpleOperator op) {
> >> > +    if (this.currentOp == null) {
> >> > +      return;
> >> > +    }
> >> > +    for (EntityName name : this.currentOp.getSpec().getInputNames())
> >>{
> >> > +      if (this.unboundInputs.containsKey(name)) {
> >> > +        throw new IllegalArgumentException("There are unbound input
> >>" +
> >> > name + " to the current operator "
> >> > +            + this.currentOp.getSpec().getId() + ". Create a sink or
> >> call
> >> > bind instead");
> >> > +      }
> >> > +    }
> >> > +    List<EntityName> nextInputs = op.getSpec().getInputNames();
> >> > +    for (EntityName name :
> >>this.currentOp.getSpec().getOutputNames()) {
> >> > +      if (!nextInputs.contains(name) &&
> >> > this.unboundOutputs.containsKey(name)) {
> >> > +        // the current operator's output is not in the next
> >>operator's
> >> > input list
> >> > +        throw new IllegalArgumentException("There are unbound output
> >>" +
> >> > name + " from the current operator "
> >> > +            + this.currentOp.getSpec().getId()
> >> > +            + " that are not included in the next operator's inputs.
> >> > Create a stream or call attach instead");
> >> > +      }
> >> > +    }
> >> > +  }
> >> > +
> >> > +  private void canClose() {
> >> > +    if (!this.unboundInputs.isEmpty() ||
> >> !this.unboundOutputs.isEmpty()) {
> >> > +      throw new IllegalStateException(
> >> > +          "There are input/output streams in the topology that are
> >>not
> >> > bounded. Can't build the topology yet.");
> >> > +    }
> >> > +  }
> >> > +
> >> > +}
> >> >
> >> >
> >> >
> >>
> >>
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
> >>StreamJoin.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
> >>StreamJoin.java
> >> > index 2854aeb..7f5b990 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
> >>StreamJoin.java
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
> >>StreamJoin.java
> >> > @@ -29,7 +29,7 @@ import org.apache.samza.sql.api.data.Relation;
> >> >  import org.apache.samza.sql.api.data.Stream;
> >> >  import org.apache.samza.sql.api.data.Tuple;
> >> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
> >> > +import org.apache.samza.sql.operators.SimpleOperatorImpl;
> >> >  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
> >> >  import org.apache.samza.sql.window.storage.OrderedStoreKey;
> >> >  import org.apache.samza.storage.kv.Entry;
> >> > @@ -38,7 +38,6 @@ import org.apache.samza.task.TaskContext;
> >> >  import org.apache.samza.task.TaskCoordinator;
> >> >  import org.apache.samza.task.sql.SimpleMessageCollector;
> >> >
> >> > -
> >> >  /**
> >> >   * This class implements a simple stream-to-stream join
> >> >   */
> >> >
> >> >
> >> >
> >>
> >>
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
> >>StreamJoinSpec.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
> >>StreamJoinSpec.java
> >> > index cc0aca0..eecff7e 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
> >>StreamJoinSpec.java
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/Stream
> >>StreamJoinSpec.java
> >> > @@ -19,10 +19,11 @@
> >> >
> >> >  package org.apache.samza.sql.operators.join;
> >> >
> >> > +import java.util.ArrayList;
> >> >  import java.util.List;
> >> >
> >> >  import org.apache.samza.sql.api.data.EntityName;
> >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
> >> > +import org.apache.samza.sql.operators.SimpleOperatorSpec;
> >> >
> >> >
> >> >  /**
> >> > @@ -35,4 +36,16 @@ public class StreamStreamJoinSpec extends
> >> > SimpleOperatorSpec {
> >> >      // TODO Auto-generated constructor stub
> >> >    }
> >> >
> >> > +  @SuppressWarnings("serial")
> >> > +  public StreamStreamJoinSpec(String id, List<String> inputRelations,
> >> > String output, List<String> joinKeys) {
> >> > +    super(id, new ArrayList<EntityName>() {
> >> > +      {
> >> > +        for (String input : inputRelations) {
> >> > +          add(EntityName.getStreamName(input));
> >> > +        }
> >> > +      }
> >> > +    }, EntityName.getStreamName(output));
> >> > +    // TODO Auto-generated constructor stub
> >> > +  }
> >> > +
> >> >  }
> >> >
> >> >
> >> >
> >>
> >>
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
> >>artitionOp.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
> >>artitionOp.java
> >> > index b93d789..0cba39a 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
> >>artitionOp.java
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
> >>artitionOp.java
> >> > @@ -23,7 +23,7 @@ import org.apache.samza.config.Config;
> >> >  import org.apache.samza.sql.api.data.Relation;
> >> >  import org.apache.samza.sql.api.data.Tuple;
> >> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
> >> > +import org.apache.samza.sql.operators.SimpleOperatorImpl;
> >> >  import org.apache.samza.storage.kv.Entry;
> >> >  import org.apache.samza.storage.kv.KeyValueIterator;
> >> >  import org.apache.samza.system.OutgoingMessageEnvelope;
> >> > @@ -32,7 +32,6 @@ import org.apache.samza.task.TaskContext;
> >> >  import org.apache.samza.task.TaskCoordinator;
> >> >  import org.apache.samza.task.sql.SimpleMessageCollector;
> >> >
> >> > -
> >> >  /**
> >> >   * This is an example build-in operator that performs a simple stream
> >> > re-partition operation.
> >> >   *
> >> >
> >> >
> >> >
> >>
> >>
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
> >>artitionSpec.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
> >>artitionSpec.java
> >> > index c47eed9..e494bff 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
> >>artitionSpec.java
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/P
> >>artitionSpec.java
> >> > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.partition;
> >> >
> >> >  import org.apache.samza.sql.api.data.EntityName;
> >> >  import org.apache.samza.sql.api.operators.OperatorSpec;
> >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
> >> > +import org.apache.samza.sql.operators.SimpleOperatorSpec;
> >> >  import org.apache.samza.system.SystemStream;
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >>
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Boun
> >>dedTimeWindow.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Boun
> >>dedTimeWindow.java
> >> > index d81cc93..a9a83b5 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Boun
> >>dedTimeWindow.java
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Boun
> >>dedTimeWindow.java
> >> > @@ -27,13 +27,12 @@ import org.apache.samza.sql.api.data.EntityName;
> >> >  import org.apache.samza.sql.api.data.Relation;
> >> >  import org.apache.samza.sql.api.data.Tuple;
> >> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
> >> > +import org.apache.samza.sql.operators.SimpleOperatorImpl;
> >> >  import org.apache.samza.storage.kv.KeyValueIterator;
> >> >  import org.apache.samza.task.TaskContext;
> >> >  import org.apache.samza.task.TaskCoordinator;
> >> >  import org.apache.samza.task.sql.SimpleMessageCollector;
> >> >
> >> > -
> >> >  /**
> >> >   * This class defines an example build-in operator for a fixed size
> >> > window operator that converts a stream to a relation
> >> >   *
> >> > @@ -86,6 +85,7 @@ public class BoundedTimeWindow extends
> >> > SimpleOperatorImpl {
> >> >     * @param lengthSec The window size in seconds
> >> >     * @param input The input stream name
> >> >     * @param output The output relation name
> >> > +   * @param callback The user callback object
> >> >     */
> >> >    public BoundedTimeWindow(String wndId, int lengthSec, String input,
> >> > String output, OperatorCallback callback) {
> >> >      super(new WindowSpec(wndId, EntityName.getStreamName(input),
> >> > EntityName.getStreamName(output), lengthSec), callback);
> >> >
> >> >
> >> >
> >>
> >>
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Wind
> >>owSpec.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Wind
> >>owSpec.java
> >> > index eec32ea..6c4eba8 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Wind
> >>owSpec.java
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/Wind
> >>owSpec.java
> >> > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.window;
> >> >
> >> >  import org.apache.samza.sql.api.data.EntityName;
> >> >  import org.apache.samza.sql.api.operators.OperatorSpec;
> >> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
> >> > +import org.apache.samza.sql.operators.SimpleOperatorSpec;
> >> >
> >> >
> >> >  /**
> >> > @@ -47,6 +47,11 @@ public class WindowSpec extends SimpleOperatorSpec
> >> > implements OperatorSpec {
> >> >      this.wndSizeSec = lengthSec;
> >> >    }
> >> >
> >> > +  public WindowSpec(String id, int wndSize, String input) {
> >> > +    super(id, EntityName.getStreamName(input), null);
> >> > +    this.wndSizeSec = wndSize;
> >> > +  }
> >> > +
> >> >    /**
> >> >     * Method to get the window state relation name
> >> >     *
> >> >
> >> >
> >> >
> >>
> >>
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCol
> >>lector.java
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCol
> >>lector.java
> >> > index b29838a..6950f67 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCol
> >>lector.java
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCol
> >>lector.java
> >> > @@ -22,7 +22,7 @@ package org.apache.samza.task.sql;
> >> >  import org.apache.samza.sql.api.data.Relation;
> >> >  import org.apache.samza.sql.api.data.Tuple;
> >> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> >> > -import org.apache.samza.sql.operators.factory.NoopOperatorCallback;
> >> > +import org.apache.samza.sql.operators.NoopOperatorCallback;
> >> >  import org.apache.samza.storage.kv.Entry;
> >> >  import org.apache.samza.storage.kv.KeyValueIterator;
> >> >  import org.apache.samza.system.OutgoingMessageEnvelope;
> >> > @@ -57,25 +57,38 @@ public class SimpleMessageCollector implements
> >> > MessageCollector {
> >> >     * @param coordinator The {@link
> >> org.apache.samza.task.TaskCoordinator}
> >> > in the context
> >> >     */
> >> >    public SimpleMessageCollector(MessageCollector collector,
> >> > TaskCoordinator coordinator) {
> >> > -    this.collector = collector;
> >> > -    this.coordinator = coordinator;
> >> > +    this(collector, coordinator, new NoopOperatorCallback());
> >> >    }
> >> >
> >> >    /**
> >> >     * This method swaps the {@code callback} with the new one
> >> >     *
> >> > -   * <p> This method allows the {@link
> >> > org.apache.samza.sql.api.operators.SimpleOperator} to be swapped when
> >>the
> >> > collector
> >> > -   * is passed down into the next operator's context. Hence, under
> >>the
> >> > new operator's context, the correct {@link
> >> >
> >>
> >>org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Relation
> >>,
> >> > MessageCollector, TaskCoordinator)},
> >> > -   * and {@link
> >> >
> >>org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Tuple,
> >> > MessageCollector, TaskCoordinator)} can be invoked
> >> > +   * <p> This method allows the {@link
> >> > org.apache.samza.sql.api.operators.OperatorCallback} to be swapped
> >>when
> >> the
> >> > collector
> >> > +   * is passed down into the next operator's context. Hence, under
> >>the
> >> > new operator's context, the correct callback functions can be invoked
> >> >     *
> >> >     * @param callback The new {@link
> >> > org.apache.samza.sql.api.operators.OperatorCallback} to be set
> >> >     */
> >> > -  public void switchOperatorCallback(OperatorCallback callback) {
> >> > -    this.callback = callback;
> >> > +  public void switchCallback(OperatorCallback callback) {
> >> > +    if (callback == null) {
> >> > +      this.callback = new NoopOperatorCallback();
> >> > +    } else {
> >> > +      this.callback = callback;
> >> > +    }
> >> > +  }
> >> > +
> >> > +  /**
> >> > +   * Method is declared to be final s.t. we enforce that the callback
> >> > functions are called first
> >> > +   */
> >> > +  @Override
> >> > +  final public void send(OutgoingMessageEnvelope envelope) {
> >> > +    this.collector.send(envelope);
> >> >    }
> >> >
> >> >    /**
> >> >     * Method is declared to be final s.t. we enforce that the callback
> >> > functions are called first
> >> > +   *
> >> > +   * @param deltaRelation The relation to be sent out
> >> > +   * @throws Exception Throws exception if failed to send
> >> >     */
> >> >    final public void send(Relation deltaRelation) throws Exception {
> >> >      Relation rel = this.callback.afterProcess(deltaRelation,
> >>collector,
> >> > coordinator);
> >> > @@ -87,6 +100,9 @@ public class SimpleMessageCollector implements
> >> > MessageCollector {
> >> >
> >> >    /**
> >> >     * Method is declared to be final s.t. we enforce that the callback
> >> > functions are called first
> >> > +   *
> >> > +   * @param tuple The tuple to be sent out
> >> > +   * @throws Exception Throws exception if failed to send
> >> >     */
> >> >    final public void send(Tuple tuple) throws Exception {
> >> >      Tuple otuple = this.callback.afterProcess(tuple, collector,
> >> > coordinator);
> >> > @@ -106,9 +122,4 @@ public class SimpleMessageCollector implements
> >> > MessageCollector {
> >> >    protected void realSend(Tuple tuple) throws Exception {
> >> >      this.collector.send((OutgoingMessageEnvelope)
> >>tuple.getMessage());
> >> >    }
> >> > -
> >> > -  @Override
> >> > -  public void send(OutgoingMessageEnvelope envelope) {
> >> > -    this.collector.send(envelope);
> >> > -  }
> >> >  }
> >> >
> >> >
> >> >
> >>
> >>
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOper
> >>atorTask.java
> >> >
> >>
> >>b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOper
> >>atorTask.java
> >> > index 20dc701..7370af6 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOper
> >>atorTask.java
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOper
> >>atorTask.java
> >> > @@ -22,6 +22,7 @@ package org.apache.samza.task.sql;
> >> >  import org.apache.samza.config.Config;
> >> >  import org.apache.samza.sql.api.data.Relation;
> >> >  import org.apache.samza.sql.api.data.Tuple;
> >> > +import org.apache.samza.sql.api.operators.Operator;
> >> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> >> >  import org.apache.samza.sql.data.IncomingMessageTuple;
> >> >  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
> >> > @@ -39,7 +40,7 @@ import org.apache.samza.task.WindowableTask;
> >> >   *
> >> >   */
> >> >  public class RandomWindowOperatorTask implements StreamTask,
> >> > InitableTask, WindowableTask {
> >> > -  private BoundedTimeWindow wndOp;
> >> > +  private Operator operator;
> >> >
> >> >    private final OperatorCallback wndCallback = new
> >>OperatorCallback() {
> >> >
> >> > @@ -77,20 +78,20 @@ public class RandomWindowOperatorTask implements
> >> > StreamTask, InitableTask, Windo
> >> >    public void process(IncomingMessageEnvelope envelope,
> >>MessageCollector
> >> > collector, TaskCoordinator coordinator)
> >> >        throws Exception {
> >> >      // based on tuple's stream name, get the window op and run
> >>process()
> >> > -    wndOp.process(new IncomingMessageTuple(envelope), collector,
> >> > coordinator);
> >> > +    operator.process(new IncomingMessageTuple(envelope), collector,
> >> > coordinator);
> >> >
> >> >    }
> >> >
> >> >    @Override
> >> >    public void window(MessageCollector collector, TaskCoordinator
> >> > coordinator) throws Exception {
> >> >      // based on tuple's stream name, get the window op and run
> >>process()
> >> > -    wndOp.refresh(System.nanoTime(), collector, coordinator);
> >> > +    operator.refresh(System.nanoTime(), collector, coordinator);
> >> >    }
> >> >
> >> >    @Override
> >> >    public void init(Config config, TaskContext context) throws
> >>Exception
> >> {
> >> >      // 1. create a fixed length 10 sec window operator
> >> > -    this.wndOp = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1",
> >> > "relation1", this.wndCallback);
> >> > -    this.wndOp.init(config, context);
> >> > +    this.operator = new BoundedTimeWindow("wndOp1", 10,
> >>"kafka:stream1",
> >> > "wndOutput", this.wndCallback);
> >> > +    this.operator.init(config, context);
> >> >    }
> >> >  }
> >> >
> >> >
> >> >
> >>
> >>
> >> > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> >> > ----------------------------------------------------------------------
> >> > diff --git
> >> >
> >>
> >>a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.ja
> >>va
> >> >
> >>
> >>b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.ja
> >>va
> >> > index 9124e3c..d65892c 100644
> >> > ---
> >> >
> >>
> >>a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.ja
> >>va
> >> > +++
> >> >
> >>
> >>b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.ja
> >>va
> >> > @@ -24,7 +24,7 @@ import java.util.List;
> >> >
> >> >  import org.apache.samza.config.Config;
> >> >  import org.apache.samza.sql.data.IncomingMessageTuple;
> >> > -import org.apache.samza.sql.operators.factory.SimpleRouter;
> >> > +import org.apache.samza.sql.operators.SimpleRouter;
> >> >  import org.apache.samza.sql.operators.join.StreamStreamJoin;
> >> >  import org.apache.samza.sql.operators.partition.PartitionOp;
> >> >  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
> >> > @@ -51,25 +51,25 @@ import org.apache.samza.task.WindowableTask;
> >> >   */
> >> >  public class StreamSqlTask implements StreamTask, InitableTask,
> >> > WindowableTask {
> >> >
> >> > -  private SimpleRouter rteCntx;
> >> > +  private SimpleRouter router;
> >> >
> >> >    @Override
> >> >    public void process(IncomingMessageEnvelope envelope,
> >>MessageCollector
> >> > collector, TaskCoordinator coordinator)
> >> >        throws Exception {
> >> > -    this.rteCntx.process(new IncomingMessageTuple(envelope),
> >>collector,
> >> > coordinator);
> >> > +    this.router.process(new IncomingMessageTuple(envelope),
> >>collector,
> >> > coordinator);
> >> >    }
> >> >
> >> >    @Override
> >> >    public void window(MessageCollector collector, TaskCoordinator
> >> > coordinator) throws Exception {
> >> > -    this.rteCntx.refresh(System.nanoTime(), collector, coordinator);
> >> > +    this.router.refresh(System.nanoTime(), collector, coordinator);
> >> >    }
> >> >
> >> >    @Override
> >> >    public void init(Config config, TaskContext context) throws
> >>Exception
> >> {
> >> >      // create all operators via the operator factory
> >> >      // 1. create two window operators
> >> > -    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10,
> >> > "inputStream1", "fixedWndOutput1");
> >> > -    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10,
> >> > "inputStream2", "fixedWndOutput2");
> >> > +    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10,
> >> > "kafka:inputStream1", "fixedWndOutput1");
> >> > +    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10,
> >> > "kafka:inputStream2", "fixedWndOutput2");
> >> >      // 2. create one join operator
> >> >      @SuppressWarnings("serial")
> >> >      List<String> inputRelations = new ArrayList<String>() {
> >> > @@ -86,19 +86,19 @@ public class StreamSqlTask implements StreamTask,
> >> > InitableTask, WindowableTask {
> >> >        }
> >> >      };
> >> >      StreamStreamJoin join = new StreamStreamJoin("joinOp",
> >> > inputRelations, "joinOutput", joinKeys);
> >> > -    // 4. create a re-partition operator
> >> > +    // 3. create a re-partition operator
> >> >      PartitionOp par = new PartitionOp("parOp1", "joinOutput",
> >>"kafka",
> >> > "parOutputStrm1", "joinKey", 50);
> >> >
> >> >      // Now, connecting the operators via the OperatorRouter
> >> > -    this.rteCntx = new SimpleRouter();
> >> > +    this.router = new SimpleRouter();
> >> >      // 1. set two system input operators (i.e. two window operators)
> >> > -    this.rteCntx.addOperator(wnd1);
> >> > -    this.rteCntx.addOperator(wnd2);
> >> > +    this.router.addOperator(wnd1);
> >> > +    this.router.addOperator(wnd2);
> >> >      // 2. connect join operator to both window operators
> >> > -    this.rteCntx.addOperator(join);
> >> > +    this.router.addOperator(join);
> >> >      // 3. connect re-partition operator to the stream operator
> >> > -    this.rteCntx.addOperator(par);
> >> > +    this.router.addOperator(par);
> >> >
> >> > -    this.rteCntx.init(config, context);
> >> > +    this.router.init(config, context);
> >> >    }
> >> >  }
> >> >
> >> >
> >>
> >>
> >> --
> >> Milinda Pathirage
> >>
> >> PhD Student | Research Assistant
> >> School of Informatics and Computing | Data to Insight Center
> >> Indiana University
> >>
> >> twitter: milindalakmal
> >> skype: milinda.pathirage
> >> blog: http://milinda.pathirage.org
> >>
>
>


-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org

Reply via email to