Hi Navina,

Did we decide to push this patch to the samza-sql branch? I thought Yi was
still working on this. Some Git conflict markers are still present in
this commit:

+<<<<<<< HEAD
+   * The callback object
+=======
+   * The callback function
+>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of
callbacks w/o inheriting and creating many sub-classes from operators

Milinda

On Mon, Jun 1, 2015 at 9:06 PM, <nav...@apache.org> wrote:

> Yi's TopologyBuilder RB 34500
>
>
> Project: http://git-wip-us.apache.org/repos/asf/samza/repo
> Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/45b85477
> Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/45b85477
> Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/45b85477
>
> Branch: refs/heads/samza-sql
> Commit: 45b854772cf36cc69e8d8cda7a51bce1be5fe576
> Parents: 41c4cd0
> Author: Navina <navi.trin...@gmail.com>
> Authored: Thu May 28 18:51:30 2015 -0700
> Committer: Navina <navi.trin...@gmail.com>
> Committed: Thu May 28 18:51:30 2015 -0700
>
> ----------------------------------------------------------------------
>  .../apache/samza/sql/api/data/EntityName.java   |  41 ++-
>  .../org/apache/samza/sql/api/data/Table.java    |   7 +-
>  .../samza/sql/api/operators/Operator.java       |   4 +
>  .../sql/api/operators/OperatorCallback.java     |   1 -
>  .../samza/sql/api/operators/OperatorRouter.java |   8 +
>  .../samza/sql/api/operators/OperatorSink.java   |  30 ++
>  .../samza/sql/api/operators/OperatorSource.java |  30 ++
>  .../samza/sql/api/operators/SimpleOperator.java |   3 +-
>  .../samza/sql/data/IncomingMessageTuple.java    |   1 -
>  .../sql/operators/NoopOperatorCallback.java     |  53 ++++
>  .../samza/sql/operators/OperatorTopology.java   |  53 ++++
>  .../samza/sql/operators/SimpleOperatorImpl.java | 147 ++++++++++
>  .../samza/sql/operators/SimpleOperatorSpec.java | 106 +++++++
>  .../samza/sql/operators/SimpleRouter.java       | 141 +++++++++
>  .../operators/factory/NoopOperatorCallback.java |  50 ----
>  .../operators/factory/SimpleOperatorImpl.java   | 136 ---------
>  .../operators/factory/SimpleOperatorSpec.java   | 106 -------
>  .../sql/operators/factory/SimpleRouter.java     | 136 ---------
>  .../sql/operators/factory/TopologyBuilder.java  | 284 +++++++++++++++++++
>  .../sql/operators/join/StreamStreamJoin.java    |   3 +-
>  .../operators/join/StreamStreamJoinSpec.java    |  15 +-
>  .../sql/operators/partition/PartitionOp.java    |   3 +-
>  .../sql/operators/partition/PartitionSpec.java  |   2 +-
>  .../sql/operators/window/BoundedTimeWindow.java |   4 +-
>  .../samza/sql/operators/window/WindowSpec.java  |   7 +-
>  .../samza/task/sql/SimpleMessageCollector.java  |  37 ++-
>  .../task/sql/RandomWindowOperatorTask.java      |  11 +-
>  .../apache/samza/task/sql/StreamSqlTask.java    |  26 +-
>  .../samza/task/sql/UserCallbacksSqlTask.java    |  66 ++---
>  29 files changed, 991 insertions(+), 520 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
> index 80ba455..df1b11b 100644
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
> @@ -49,6 +49,8 @@ public class EntityName {
>     */
>    private final String name;
>
> +  private final boolean isSystemEntity;
> +
>    /**
>     * Static map of already allocated table names
>     */
> @@ -59,15 +61,19 @@ public class EntityName {
>     */
>    private static Map<String, EntityName> streams = new HashMap<String,
> EntityName>();
>
> +  private static final String ANONYMOUS = "anonymous";
> +
>    /**
>     * Private ctor to create entity names
>     *
>     * @param type Type of the entity name
>     * @param name Formatted name of the entity
> +   * @param isSystemEntity whether the entity is a system input/output
>     */
> -  private EntityName(EntityType type, String name) {
> +  private EntityName(EntityType type, String name, boolean
> isSystemEntity) {
>      this.type = type;
>      this.name = name;
> +    this.isSystemEntity = isSystemEntity;
>    }
>
>    @Override
> @@ -102,6 +108,10 @@ public class EntityName {
>      return this.type.equals(EntityType.STREAM);
>    }
>
> +  public boolean isSystemEntity() {
> +    return this.isSystemEntity;
> +  }
> +
>    /**
>     * Get the formatted entity name
>     *
> @@ -111,15 +121,24 @@ public class EntityName {
>      return this.name;
>    }
>
> +  public static EntityName getTableName(String name) {
> +    return getTableName(name, false);
> +  }
> +
> +  public static EntityName getStreamName(String name) {
> +    return getStreamName(name, false);
> +  }
> +
>    /**
>     * Static method to get the instance of {@code EntityName} with type
> {@code EntityType.TABLE}
>     *
>     * @param name The formatted entity name of the relation
> +   * @param isSystem The boolean flag indicating whether this is a system
> input/output
>     * @return A <code>EntityName</code> for a relation
>     */
> -  public static EntityName getTableName(String name) {
> +  public static EntityName getTableName(String name, boolean isSystem) {
>      if (tables.get(name) == null) {
> -      tables.put(name, new EntityName(EntityType.TABLE, name));
> +      tables.put(name, new EntityName(EntityType.TABLE, name, isSystem));
>      }
>      return tables.get(name);
>    }
> @@ -128,13 +147,25 @@ public class EntityName {
>     * Static method to get the instance of <code>EntityName</code> with
> type <code>EntityType.STREAM</code>
>     *
>     * @param name The formatted entity name of the stream
> +   * @param isSystem The boolean flag indicating whether this is a system
> input/output
>     * @return A <code>EntityName</code> for a stream
>     */
> -  public static EntityName getStreamName(String name) {
> +  public static EntityName getStreamName(String name, boolean isSystem) {
>      if (streams.get(name) == null) {
> -      streams.put(name, new EntityName(EntityType.STREAM, name));
> +      streams.put(name, new EntityName(EntityType.STREAM, name,
> isSystem));
>      }
>      return streams.get(name);
>    }
>
> +  public static EntityName getAnonymousStream() {
> +    return getStreamName(ANONYMOUS);
> +  }
> +
> +  public static EntityName getAnonymousTable() {
> +    return getTableName(ANONYMOUS);
> +  }
> +
> +  public boolean isAnonymous() {
> +    return this.name.equals(ANONYMOUS);
> +  }
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> index 7b4d984..b4dce07 100644
> --- a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> +++ b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> @@ -19,6 +19,9 @@
>
>  package org.apache.samza.sql.api.data;
>
> +import java.util.List;
> +
> +
>  /**
>   * This interface defines a non-ordered {@link
> org.apache.samza.sql.api.data.Relation}, which has a unique primary key
>   *
> @@ -31,8 +34,8 @@ public interface Table<K> extends Relation<K> {
>    /**
>     * Get the primary key field name for this table
>     *
> -   * @return The name of the primary key field
> +   * @return The names of the primary key fields
>     */
> -  String getPrimaryKeyName();
> +  List<String> getPrimaryKeyNames();
>
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
> index d6f6b57..9c6eaa5 100644
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
> @@ -27,7 +27,11 @@ import org.apache.samza.task.TaskContext;
>  import org.apache.samza.task.TaskCoordinator;
>
>
> +/**
> + * This class defines the common interface for operator classes.
> + */
>  public interface Operator {
> +
>    /**
>     * Method to initialize the operator
>     *
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
> index fb2aa89..5a77d95 100644
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
> @@ -23,7 +23,6 @@ import org.apache.samza.sql.api.data.Tuple;
>  import org.apache.samza.task.MessageCollector;
>  import org.apache.samza.task.TaskCoordinator;
>
> -
>  /**
>   * Defines the callback functions to allow customized functions to be
> invoked before process and before sending the result
>   */
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
> index 0759638..432e6b3 100644
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
> @@ -19,6 +19,7 @@
>
>  package org.apache.samza.sql.api.operators;
>
> +import java.util.Iterator;
>  import java.util.List;
>
>  import org.apache.samza.sql.api.data.EntityName;
> @@ -51,4 +52,11 @@ public interface OperatorRouter extends Operator {
>     */
>    List<SimpleOperator> getNextOperators(EntityName output);
>
> +  /**
> +   * This method provides an iterator to go through all operators
> connected via {@code OperatorRouter}
> +   *
> +   * @return An {@link java.util.Iterator} for all operators connected
> via {@code OperatorRouter}
> +   */
> +  Iterator<SimpleOperator> iterator();
> +
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
> new file mode 100644
> index 0000000..e2c748c
> --- /dev/null
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
> @@ -0,0 +1,30 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +package org.apache.samza.sql.api.operators;
> +
> +import java.util.Iterator;
> +
> +import org.apache.samza.sql.api.data.EntityName;
> +
> +
> +public interface OperatorSink {
> +  Iterator<SimpleOperator> opIterator();
> +
> +  EntityName getName();
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
> new file mode 100644
> index 0000000..860c1aa
> --- /dev/null
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
> @@ -0,0 +1,30 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +package org.apache.samza.sql.api.operators;
> +
> +import java.util.Iterator;
> +
> +import org.apache.samza.sql.api.data.EntityName;
> +
> +
> +public interface OperatorSource {
> +  Iterator<SimpleOperator> opIterator();
> +
> +  EntityName getName();
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
> index c49a822..60ace9c 100644
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
> @@ -19,8 +19,6 @@
>
>  package org.apache.samza.sql.api.operators;
>
> -
> -
>  /**
>   * The interface for a {@code SimpleOperator} that implements a simple
> primitive relational logic operation
>   */
> @@ -31,4 +29,5 @@ public interface SimpleOperator extends Operator {
>     * @return The {@link org.apache.samza.sql.api.operators.OperatorSpec}
> object that defines the configuration/parameters of the operator
>     */
>    OperatorSpec getSpec();
> +
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
> index 72a59f2..af040f0 100644
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
> @@ -81,7 +81,6 @@ public class IncomingMessageTuple implements Tuple {
>
>    @Override
>    public long getCreateTimeNano() {
> -    // TODO: this is wrong and just to keep as an placeholder. It should
> be replaced by the message publish time when the publish timestamp is
> available in the message metadata
>      return this.recvTimeNano;
>    }
>
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
> new file mode 100644
> index 0000000..e951737
> --- /dev/null
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
> @@ -0,0 +1,53 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +package org.apache.samza.sql.operators;
> +
> +import org.apache.samza.sql.api.data.Relation;
> +import org.apache.samza.sql.api.data.Tuple;
> +import org.apache.samza.sql.api.operators.OperatorCallback;
> +import org.apache.samza.task.MessageCollector;
> +import org.apache.samza.task.TaskCoordinator;
> +
> +
> +/**
> + * This is a default NOOP operator callback object that does nothing
> before and after the process method
> + */
> +public final class NoopOperatorCallback implements OperatorCallback {
> +
> +  @Override
> +  public Tuple beforeProcess(Tuple tuple, MessageCollector collector,
> TaskCoordinator coordinator) {
> +    return tuple;
> +  }
> +
> +  @Override
> +  public Relation beforeProcess(Relation rel, MessageCollector collector,
> TaskCoordinator coordinator) {
> +    return rel;
> +  }
> +
> +  @Override
> +  public Tuple afterProcess(Tuple tuple, MessageCollector collector,
> TaskCoordinator coordinator) {
> +    return tuple;
> +  }
> +
> +  @Override
> +  public Relation afterProcess(Relation rel, MessageCollector collector,
> TaskCoordinator coordinator) {
> +    return rel;
> +  }
> +
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
> new file mode 100644
> index 0000000..8b70092
> --- /dev/null
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
> @@ -0,0 +1,53 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +package org.apache.samza.sql.operators;
> +
> +import java.util.Iterator;
> +
> +import org.apache.samza.sql.api.data.EntityName;
> +import org.apache.samza.sql.api.operators.OperatorSink;
> +import org.apache.samza.sql.api.operators.OperatorSource;
> +import org.apache.samza.sql.api.operators.SimpleOperator;
> +
> +
> +/**
> + * This class implements a partially completed {@link
> org.apache.samza.sql.operators.factory.TopologyBuilder} that signifies a
> partially completed
> + * topology that the current operator has unbounded input stream that can
> be attached to other operators' output
> + */
> +public class OperatorTopology implements OperatorSource, OperatorSink {
> +
> +  private final EntityName name;
> +  private final SimpleRouter router;
> +
> +  public OperatorTopology(EntityName name, SimpleRouter router) {
> +    this.name = name;
> +    this.router = router;
> +  }
> +
> +  @Override
> +  public Iterator<SimpleOperator> opIterator() {
> +    return this.router.iterator();
> +  }
> +
> +  @Override
> +  public EntityName getName() {
> +    return this.name;
> +  }
> +
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
> new file mode 100644
> index 0000000..423880b
> --- /dev/null
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
> @@ -0,0 +1,147 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package org.apache.samza.sql.operators;
> +
> +import org.apache.samza.sql.api.data.Relation;
> +import org.apache.samza.sql.api.data.Tuple;
> +import org.apache.samza.sql.api.operators.OperatorCallback;
> +import org.apache.samza.sql.api.operators.OperatorSpec;
> +import org.apache.samza.sql.api.operators.SimpleOperator;
> +import org.apache.samza.task.MessageCollector;
> +import org.apache.samza.task.TaskCoordinator;
> +import org.apache.samza.task.sql.SimpleMessageCollector;
> +
> +
> +/**
> + * An abstract class that encapsulate the basic information and methods
> that all operator classes should implement.
> + * It implements the interface {@link
> org.apache.samza.sql.api.operators.SimpleOperator}
> + */
> +public abstract class SimpleOperatorImpl implements SimpleOperator {
> +  /**
> +   * The specification of this operator
> +   */
> +  private final OperatorSpec spec;
> +
> +  /**
> +<<<<<<< HEAD
> +   * The callback object
> +=======
> +   * The callback function
> +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of
> callbacks w/o inheriting and creating many sub-classes from operators
> +   */
> +  private final OperatorCallback callback;
> +
> +  /**
> +   * Ctor of {@code SimpleOperatorImpl} class
> +   *
> +   * @param spec The specification of this operator
> +   */
> +  public SimpleOperatorImpl(OperatorSpec spec) {
> +    this(spec, new NoopOperatorCallback());
> +  }
> +
> +  public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback callback)
> {
> +    this.spec = spec;
> +    this.callback = callback;
> +  }
> +
> +  @Override
> +  public OperatorSpec getSpec() {
> +    return this.spec;
> +  }
> +
> +  /**
> +   * This method is made final s.t. the sequence of invocations between
> {@link
> org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation,
> MessageCollector, TaskCoordinator)}
> +   * and real processing of the input relation is fixed.
> +   */
> +  @Override
> +  final public void process(Relation deltaRelation, MessageCollector
> collector, TaskCoordinator coordinator)
> +      throws Exception {
> +    Relation rel = this.callback.beforeProcess(deltaRelation, collector,
> coordinator);
> +    if (rel == null) {
> +      return;
> +    }
> +    this.realProcess(rel, getCollector(collector, coordinator),
> coordinator);
> +  }
> +
> +  /**
> +   * This method is made final s.t. the sequence of invocations between
> {@link
> org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple,
> MessageCollector, TaskCoordinator)}
> +   * and real processing of the input tuple is fixed.
> +   */
> +  @Override
> +  final public void process(Tuple tuple, MessageCollector collector,
> TaskCoordinator coordinator) throws Exception {
> +    Tuple ituple = this.callback.beforeProcess(tuple, collector,
> coordinator);
> +    if (ituple == null) {
> +      return;
> +    }
> +    this.realProcess(ituple, getCollector(collector, coordinator),
> coordinator);
> +  }
> +
> +  /**
> +   * This method is made final s.t. we enforce the invocation of {@code
> SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} before
> doing anything futher
> +   */
> +  @Override
> +  final public void refresh(long timeNano, MessageCollector collector,
> TaskCoordinator coordinator) throws Exception {
> +    this.realRefresh(timeNano, getCollector(collector, coordinator),
> coordinator);
> +  }
> +
> +  private SimpleMessageCollector getCollector(MessageCollector collector,
> TaskCoordinator coordinator) {
> +    if (!(collector instanceof SimpleMessageCollector)) {
> +      return new SimpleMessageCollector(collector, coordinator,
> this.callback);
> +    } else {
> +      ((SimpleMessageCollector) collector).switchCallback(this.callback);
> +      return (SimpleMessageCollector) collector;
> +    }
> +  }
> +
> +  /**
> +   * Method to be overriden by each specific implementation class of
> operator to handle timeout event
> +   *
> +   * @param timeNano The time in nanosecond when the timeout event
> occurred
> +   * @param collector The {@link
> org.apache.samza.task.sql.SimpleMessageCollector} in the context
> +   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator}
> in the context
> +   * @throws Exception Throws exception if failed to refresh the results
> +   */
> +  protected abstract void realRefresh(long timeNano,
> SimpleMessageCollector collector, TaskCoordinator coordinator)
> +      throws Exception;
> +
> +  /**
> +   * Method to be overriden by each specific implementation class of
> operator to perform relational logic operation on an input {@link
> org.apache.samza.sql.api.data.Relation}
> +   *
> +   * @param rel The input relation
> +   * @param collector The {@link
> org.apache.samza.task.sql.SimpleMessageCollector} in the context
> +   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator}
> in the context
> +   * @throws Exception Throws exception if failed to process
> +   */
> +  protected abstract void realProcess(Relation rel,
> SimpleMessageCollector collector, TaskCoordinator coordinator)
> +      throws Exception;
> +
> +  /**
> +   * Method to be overriden by each specific implementation class of
> operator to perform relational logic operation on an input {@link
> org.apache.samza.sql.api.data.Tuple}
> +   *
> +   * @param ituple The input tuple
> +   * @param collector The {@link
> org.apache.samza.task.sql.SimpleMessageCollector} in the context
> +   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator}
> in the context
> +   * @throws Exception Throws exception if failed to process
> +   */
> +  protected abstract void realProcess(Tuple ituple,
> SimpleMessageCollector collector, TaskCoordinator coordinator)
> +      throws Exception;
> +
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
> new file mode 100644
> index 0000000..691e543
> --- /dev/null
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
> @@ -0,0 +1,106 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +package org.apache.samza.sql.operators;
> +
> +import java.util.ArrayList;
> +import java.util.List;
> +
> +import org.apache.samza.sql.api.data.EntityName;
> +import org.apache.samza.sql.api.operators.OperatorSpec;
> +
> +
> +/**
> + * An abstract class that encapsulate the basic information and methods
> that all specification of operators should implement.
> + * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec}
> + */
> +public abstract class SimpleOperatorSpec implements OperatorSpec {
> +  /**
> +   * The identifier of the corresponding operator
> +   */
> +  private final String id;
> +
> +  /**
> +   * The list of input entity names of the corresponding operator
> +   */
> +  private final List<EntityName> inputs = new ArrayList<EntityName>();
> +
> +  /**
> +   * The list of output entity names of the corresponding operator
> +   */
> +  private final List<EntityName> outputs = new ArrayList<EntityName>();
> +
> +  /**
> +   * Ctor of the {@code SimpleOperatorSpec} for simple {@link
> org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one
> output
> +   *
> +   * @param id Unique identifier of the {@link
> org.apache.samza.sql.api.operators.SimpleOperator} object
> +   * @param input The only input entity
> +   * @param output The only output entity
> +   */
> +  public SimpleOperatorSpec(String id, EntityName input, EntityName
> output) {
> +    this.id = id;
> +    this.inputs.add(input);
> +    this.outputs.add(output);
> +  }
> +
> +  /**
> +   * Ctor of {@code SimpleOperatorSpec} with general format: m inputs and
> one output
> +   *
> +   * @param id Unique identifier of the {@link
> org.apache.samza.sql.api.operators.SimpleOperator} object
> +   * @param inputs The list of input entities
> +   * @param output The only output entity
> +   */
> +  public SimpleOperatorSpec(String id, List<EntityName> inputs,
> EntityName output) {
> +    this.id = id;
> +    this.inputs.addAll(inputs);
> +    this.outputs.add(output);
> +  }
> +
> +  @Override
> +  public String getId() {
> +    return this.id;
> +  }
> +
> +  @Override
> +  public List<EntityName> getInputNames() {
> +    return this.inputs;
> +  }
> +
> +  @Override
> +  public List<EntityName> getOutputNames() {
> +    return this.outputs;
> +  }
> +
> +  /**
> +   * Method to get the first output entity
> +   *
> +   * @return The first output entity name
> +   */
> +  public EntityName getOutputName() {
> +    return this.outputs.get(0);
> +  }
> +
> +  /**
> +   * Method to get the first input entity
> +   *
> +   * @return The first input entity name
> +   */
> +  public EntityName getInputName() {
> +    return this.inputs.get(0);
> +  }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java
> new file mode 100644
> index 0000000..2d9a1db
> --- /dev/null
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java
> @@ -0,0 +1,141 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package org.apache.samza.sql.operators;
> +
> +import java.util.ArrayList;
> +import java.util.HashMap;
> +import java.util.HashSet;
> +import java.util.Iterator;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Set;
> +
> +import org.apache.samza.config.Config;
> +import org.apache.samza.sql.api.data.EntityName;
> +import org.apache.samza.sql.api.data.Relation;
> +import org.apache.samza.sql.api.data.Tuple;
> +import org.apache.samza.sql.api.operators.Operator;
> +import org.apache.samza.sql.api.operators.OperatorRouter;
> +import org.apache.samza.sql.api.operators.SimpleOperator;
> +import org.apache.samza.task.MessageCollector;
> +import org.apache.samza.task.TaskContext;
> +import org.apache.samza.task.TaskCoordinator;
> +import org.apache.samza.task.sql.RouterMessageCollector;
> +
> +
> +/**
> + * Example implementation of {@link
> org.apache.samza.sql.api.operators.OperatorRouter}
> + *
> + */
> +public final class SimpleRouter implements OperatorRouter {
> +  /**
> +   * List of operators added to the {@link
> org.apache.samza.sql.api.operators.OperatorRouter}
> +   */
> +  private List<SimpleOperator> operators = new
> ArrayList<SimpleOperator>();
> +
> +  @SuppressWarnings("rawtypes")
> +  /**
> +   * Map of {@link org.apache.samza.sql.api.data.EntityName} to the list
> of operators associated with it
> +   */
> +  private Map<EntityName, List> nextOps = new HashMap<EntityName, List>();
> +
> +  /**
> +   * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs to
> this {@code SimpleRouter}
> +   */
> +  private Set<EntityName> inputEntities = new HashSet<EntityName>();
> +
> +  /**
> +   * Set of entities that are not input entities to this {@code
> SimpleRouter}
> +   */
> +  private Set<EntityName> outputEntities = new HashSet<EntityName>();
> +
> +  @SuppressWarnings("unchecked")
> +  private void addOperator(EntityName input, SimpleOperator nextOp) {
> +    if (nextOps.get(input) == null) {
> +      nextOps.put(input, new ArrayList<Operator>());
> +    }
> +    nextOps.get(input).add(nextOp);
> +    operators.add(nextOp);
> +    // get the operator spec
> +    for (EntityName output : nextOp.getSpec().getOutputNames()) {
> +      if (inputEntities.contains(output)) {
> +        inputEntities.remove(output);
> +      }
> +      outputEntities.add(output);
> +    }
> +    if (!outputEntities.contains(input)) {
> +      inputEntities.add(input);
> +    }
> +  }
> +
> +  @Override
> +  @SuppressWarnings("unchecked")
> +  public List<SimpleOperator> getNextOperators(EntityName entity) {
> +    return nextOps.get(entity);
> +  }
> +
> +  @Override
> +  public void addOperator(SimpleOperator nextOp) {
> +    List<EntityName> inputs = nextOp.getSpec().getInputNames();
> +    for (EntityName input : inputs) {
> +      addOperator(input, nextOp);
> +    }
> +  }
> +
> +  @Override
> +  public void init(Config config, TaskContext context) throws Exception {
> +    for (SimpleOperator op : this.operators) {
> +      op.init(config, context);
> +    }
> +  }
> +
> +  @Override
> +  public void process(Tuple ituple, MessageCollector collector,
> TaskCoordinator coordinator) throws Exception {
> +    MessageCollector opCollector = new RouterMessageCollector(collector,
> coordinator, this);
> +    for (Iterator<SimpleOperator> iter =
> this.getNextOperators(ituple.getEntityName()).iterator(); iter.hasNext();) {
> +      iter.next().process(ituple, opCollector, coordinator);
> +    }
> +  }
> +
> +  @SuppressWarnings("rawtypes")
> +  @Override
> +  public void process(Relation deltaRelation, MessageCollector collector,
> TaskCoordinator coordinator) throws Exception {
> +    MessageCollector opCollector = new RouterMessageCollector(collector,
> coordinator, this);
> +    for (Iterator<SimpleOperator> iter =
> this.getNextOperators(deltaRelation.getName()).iterator(); iter.hasNext();)
> {
> +      iter.next().process(deltaRelation, opCollector, coordinator);
> +    }
> +  }
> +
> +  @Override
> +  public void refresh(long nanoSec, MessageCollector collector,
> TaskCoordinator coordinator) throws Exception {
> +    MessageCollector opCollector = new RouterMessageCollector(collector,
> coordinator, this);
> +    for (EntityName entity : inputEntities) {
> +      for (Iterator<SimpleOperator> iter =
> this.getNextOperators(entity).iterator(); iter.hasNext();) {
> +        iter.next().refresh(nanoSec, opCollector, coordinator);
> +      }
> +    }
> +  }
> +
> +  @Override
> +  public Iterator<SimpleOperator> iterator() {
> +    return this.operators.iterator();
> +  }
> +
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
> deleted file mode 100644
> index c3d2266..0000000
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
> +++ /dev/null
> @@ -1,50 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing,
> - * software distributed under the License is distributed on an
> - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> - * KIND, either express or implied.  See the License for the
> - * specific language governing permissions and limitations
> - * under the License.
> - */
> -package org.apache.samza.sql.operators.factory;
> -
> -import org.apache.samza.sql.api.data.Relation;
> -import org.apache.samza.sql.api.data.Tuple;
> -import org.apache.samza.sql.api.operators.OperatorCallback;
> -import org.apache.samza.task.MessageCollector;
> -import org.apache.samza.task.TaskCoordinator;
> -
> -
> -public final class NoopOperatorCallback implements OperatorCallback {
> -
> -  @Override
> -  public Tuple beforeProcess(Tuple tuple, MessageCollector collector,
> TaskCoordinator coordinator) {
> -    return tuple;
> -  }
> -
> -  @Override
> -  public Relation beforeProcess(Relation rel, MessageCollector collector,
> TaskCoordinator coordinator) {
> -    return rel;
> -  }
> -
> -  @Override
> -  public Tuple afterProcess(Tuple tuple, MessageCollector collector,
> TaskCoordinator coordinator) {
> -    return tuple;
> -  }
> -
> -  @Override
> -  public Relation afterProcess(Relation rel, MessageCollector collector,
> TaskCoordinator coordinator) {
> -    return rel;
> -  }
> -
> -}
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
> deleted file mode 100644
> index e66451f..0000000
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
> +++ /dev/null
> @@ -1,136 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing,
> - * software distributed under the License is distributed on an
> - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> - * KIND, either express or implied.  See the License for the
> - * specific language governing permissions and limitations
> - * under the License.
> - */
> -
> -package org.apache.samza.sql.operators.factory;
> -
> -import org.apache.samza.sql.api.data.Relation;
> -import org.apache.samza.sql.api.data.Tuple;
> -import org.apache.samza.sql.api.operators.OperatorCallback;
> -import org.apache.samza.sql.api.operators.OperatorSpec;
> -import org.apache.samza.sql.api.operators.SimpleOperator;
> -import org.apache.samza.task.MessageCollector;
> -import org.apache.samza.task.TaskCoordinator;
> -import org.apache.samza.task.sql.SimpleMessageCollector;
> -
> -
> -/**
> - * An abstract class that encapsulate the basic information and methods
> that all operator classes should implement.
> - * It implements the interface {@link
> org.apache.samza.sql.api.operators.SimpleOperator}
> - *
> - */
> -public abstract class SimpleOperatorImpl implements SimpleOperator {
> -  /**
> -   * The specification of this operator
> -   */
> -  private final OperatorSpec spec;
> -
> -  /**
> -   * The callback function
> -   */
> -  private final OperatorCallback callback;
> -
> -  /**
> -   * Ctor of {@code SimpleOperatorImpl} class
> -   *
> -   * @param spec The specification of this operator
> -   */
> -  public SimpleOperatorImpl(OperatorSpec spec) {
> -    this(spec, new NoopOperatorCallback());
> -  }
> -
> -  public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback callback)
> {
> -    this.spec = spec;
> -    this.callback = callback;
> -  }
> -
> -  @Override
> -  public OperatorSpec getSpec() {
> -    return this.spec;
> -  }
> -
> -  /**
> -   * This method is made final s.t. the sequence of invocations between
> {@link
> org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation,
> MessageCollector, TaskCoordinator)}
> -   * and real processing of the input relation is fixed.
> -   */
> -  @Override
> -  final public void process(Relation deltaRelation, MessageCollector
> collector, TaskCoordinator coordinator)
> -      throws Exception {
> -    Relation rel = this.callback.beforeProcess(deltaRelation, collector,
> coordinator);
> -    if (rel == null) {
> -      return;
> -    }
> -    this.realProcess(rel, getCollector(collector, coordinator),
> coordinator);
> -  }
> -
> -  /**
> -   * This method is made final s.t. the sequence of invocations between
> {@link
> org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple,
> MessageCollector, TaskCoordinator)}
> -   * and real processing of the input tuple is fixed.
> -   */
> -  @Override
> -  final public void process(Tuple tuple, MessageCollector collector,
> TaskCoordinator coordinator) throws Exception {
> -    Tuple ituple = this.callback.beforeProcess(tuple, collector,
> coordinator);
> -    if (ituple == null) {
> -      return;
> -    }
> -    this.realProcess(ituple, getCollector(collector, coordinator),
> coordinator);
> -  }
> -
> -  /**
> -   * This method is made final s.t. we enforce the invocation of {@code
> SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} before
> doing anything futher
> -   */
> -  @Override
> -  final public void refresh(long timeNano, MessageCollector collector,
> TaskCoordinator coordinator) throws Exception {
> -    this.realRefresh(timeNano, getCollector(collector, coordinator),
> coordinator);
> -  }
> -
> -  private SimpleMessageCollector getCollector(MessageCollector collector,
> TaskCoordinator coordinator) {
> -    if (!(collector instanceof SimpleMessageCollector)) {
> -      return new SimpleMessageCollector(collector, coordinator,
> this.callback);
> -    } else {
> -      ((SimpleMessageCollector)
> collector).switchOperatorCallback(this.callback);
> -      return (SimpleMessageCollector) collector;
> -    }
> -  }
> -
> -  /**
> -   * Method to be overriden by each specific implementation class of
> operator to handle timeout event
> -   *
> -   * @param timeNano The time in nanosecond when the timeout event
> occurred
> -   * @param collector The {@link
> org.apache.samza.task.sql.SimpleMessageCollector} in the context
> -   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator}
> in the context
> -   * @throws Exception Throws exception if failed to refresh the results
> -   */
> -  protected abstract void realRefresh(long timeNano,
> SimpleMessageCollector collector, TaskCoordinator coordinator)
> -      throws Exception;
> -
> -  /**
> -   * Method to be overriden by each specific implementation class of
> operator to perform relational logic operation on an input {@link
> org.apache.samza.sql.api.data.Relation}
> -   *
> -   * @param rel The input relation
> -   * @param collector The {@link
> org.apache.samza.task.sql.SimpleMessageCollector} in the context
> -   * @param coordinator The {@link org.apache.samza.task.TaskCoordinator}
> in the context
> -   * @throws Exception
> -   */
> -  protected abstract void realProcess(Relation rel,
> SimpleMessageCollector collector, TaskCoordinator coordinator)
> -      throws Exception;
> -
> -  protected abstract void realProcess(Tuple ituple,
> SimpleMessageCollector collector, TaskCoordinator coordinator)
> -      throws Exception;
> -
> -}
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
> deleted file mode 100644
> index 56753b6..0000000
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
> +++ /dev/null
> @@ -1,106 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing,
> - * software distributed under the License is distributed on an
> - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> - * KIND, either express or implied.  See the License for the
> - * specific language governing permissions and limitations
> - * under the License.
> - */
> -package org.apache.samza.sql.operators.factory;
> -
> -import java.util.ArrayList;
> -import java.util.List;
> -
> -import org.apache.samza.sql.api.data.EntityName;
> -import org.apache.samza.sql.api.operators.OperatorSpec;
> -
> -
> -/**
> - * An abstract class that encapsulate the basic information and methods
> that all specification of operators should implement.
> - * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec}
> - */
> -public abstract class SimpleOperatorSpec implements OperatorSpec {
> -  /**
> -   * The identifier of the corresponding operator
> -   */
> -  private final String id;
> -
> -  /**
> -   * The list of input entity names of the corresponding operator
> -   */
> -  private final List<EntityName> inputs = new ArrayList<EntityName>();
> -
> -  /**
> -   * The list of output entity names of the corresponding operator
> -   */
> -  private final List<EntityName> outputs = new ArrayList<EntityName>();
> -
> -  /**
> -   * Ctor of the {@code SimpleOperatorSpec} for simple {@link
> org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one
> output
> -   *
> -   * @param id Unique identifier of the {@link
> org.apache.samza.sql.api.operators.SimpleOperator} object
> -   * @param input The only input entity
> -   * @param output The only output entity
> -   */
> -  public SimpleOperatorSpec(String id, EntityName input, EntityName
> output) {
> -    this.id = id;
> -    this.inputs.add(input);
> -    this.outputs.add(output);
> -  }
> -
> -  /**
> -   * Ctor of {@code SimpleOperatorSpec} with general format: m inputs and
> n outputs
> -   *
> -   * @param id Unique identifier of the {@link
> org.apache.samza.sql.api.operators.SimpleOperator} object
> -   * @param inputs The list of input entities
> -   * @param output The list of output entities
> -   */
> -  public SimpleOperatorSpec(String id, List<EntityName> inputs,
> EntityName output) {
> -    this.id = id;
> -    this.inputs.addAll(inputs);
> -    this.outputs.add(output);
> -  }
> -
> -  @Override
> -  public String getId() {
> -    return this.id;
> -  }
> -
> -  @Override
> -  public List<EntityName> getInputNames() {
> -    return this.inputs;
> -  }
> -
> -  @Override
> -  public List<EntityName> getOutputNames() {
> -    return this.outputs;
> -  }
> -
> -  /**
> -   * Method to get the first output entity
> -   *
> -   * @return The first output entity name
> -   */
> -  public EntityName getOutputName() {
> -    return this.outputs.get(0);
> -  }
> -
> -  /**
> -   * Method to get the first input entity
> -   *
> -   * @return The first input entity name
> -   */
> -  public EntityName getInputName() {
> -    return this.inputs.get(0);
> -  }
> -}
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
> deleted file mode 100644
> index e570897..0000000
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
> +++ /dev/null
> @@ -1,136 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *   http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing,
> - * software distributed under the License is distributed on an
> - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> - * KIND, either express or implied.  See the License for the
> - * specific language governing permissions and limitations
> - * under the License.
> - */
> -
> -package org.apache.samza.sql.operators.factory;
> -
> -import java.util.ArrayList;
> -import java.util.HashMap;
> -import java.util.HashSet;
> -import java.util.Iterator;
> -import java.util.List;
> -import java.util.Map;
> -import java.util.Set;
> -
> -import org.apache.samza.config.Config;
> -import org.apache.samza.sql.api.data.EntityName;
> -import org.apache.samza.sql.api.data.Relation;
> -import org.apache.samza.sql.api.data.Tuple;
> -import org.apache.samza.sql.api.operators.Operator;
> -import org.apache.samza.sql.api.operators.OperatorRouter;
> -import org.apache.samza.sql.api.operators.SimpleOperator;
> -import org.apache.samza.task.MessageCollector;
> -import org.apache.samza.task.TaskContext;
> -import org.apache.samza.task.TaskCoordinator;
> -import org.apache.samza.task.sql.RouterMessageCollector;
> -
> -
> -/**
> - * Example implementation of {@link
> org.apache.samza.sql.api.operators.OperatorRouter}
> - *
> - */
> -public final class SimpleRouter implements OperatorRouter {
> -  /**
> -   * List of operators added to the {@link
> org.apache.samza.sql.api.operators.OperatorRouter}
> -   */
> -  private List<SimpleOperator> operators = new
> ArrayList<SimpleOperator>();
> -
> -  @SuppressWarnings("rawtypes")
> -  /**
> -   * Map of {@link org.apache.samza.sql.api.data.EntityName} to the list
> of operators associated with it
> -   */
> -  private Map<EntityName, List> nextOps = new HashMap<EntityName, List>();
> -
> -  /**
> -   * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs to
> this {@code SimpleRouter}
> -   */
> -  private Set<EntityName> inputEntities = new HashSet<EntityName>();
> -
> -  /**
> -   * Set of entities that are not input entities to this {@code
> SimpleRouter}
> -   */
> -  private Set<EntityName> outputEntities = new HashSet<EntityName>();
> -
> -  @SuppressWarnings("unchecked")
> -  private void addOperator(EntityName input, SimpleOperator nextOp) {
> -    if (nextOps.get(input) == null) {
> -      nextOps.put(input, new ArrayList<Operator>());
> -    }
> -    nextOps.get(input).add(nextOp);
> -    operators.add(nextOp);
> -    // get the operator spec
> -    for (EntityName output : nextOp.getSpec().getOutputNames()) {
> -      if (inputEntities.contains(output)) {
> -        inputEntities.remove(output);
> -      }
> -      outputEntities.add(output);
> -    }
> -    if (!outputEntities.contains(input)) {
> -      inputEntities.add(input);
> -    }
> -  }
> -
> -  @Override
> -  @SuppressWarnings("unchecked")
> -  public List<SimpleOperator> getNextOperators(EntityName entity) {
> -    return nextOps.get(entity);
> -  }
> -
> -  @Override
> -  public void addOperator(SimpleOperator nextOp) {
> -    List<EntityName> inputs = nextOp.getSpec().getInputNames();
> -    for (EntityName input : inputs) {
> -      addOperator(input, nextOp);
> -    }
> -  }
> -
> -  @Override
> -  public void init(Config config, TaskContext context) throws Exception {
> -    for (SimpleOperator op : this.operators) {
> -      op.init(config, context);
> -    }
> -  }
> -
> -  @Override
> -  public void process(Tuple ituple, MessageCollector collector,
> TaskCoordinator coordinator) throws Exception {
> -    MessageCollector opCollector = new RouterMessageCollector(collector,
> coordinator, this);
> -    for (Iterator<SimpleOperator> iter =
> this.getNextOperators(ituple.getEntityName()).iterator(); iter.hasNext();) {
> -      iter.next().process(ituple, opCollector, coordinator);
> -    }
> -  }
> -
> -  @SuppressWarnings("rawtypes")
> -  @Override
> -  public void process(Relation deltaRelation, MessageCollector collector,
> TaskCoordinator coordinator) throws Exception {
> -    MessageCollector opCollector = new RouterMessageCollector(collector,
> coordinator, this);
> -    for (Iterator<SimpleOperator> iter =
> this.getNextOperators(deltaRelation.getName()).iterator(); iter.hasNext();)
> {
> -      iter.next().process(deltaRelation, opCollector, coordinator);
> -    }
> -  }
> -
> -  @Override
> -  public void refresh(long nanoSec, MessageCollector collector,
> TaskCoordinator coordinator) throws Exception {
> -    MessageCollector opCollector = new RouterMessageCollector(collector,
> coordinator, this);
> -    for (EntityName entity : inputEntities) {
> -      for (Iterator<SimpleOperator> iter =
> this.getNextOperators(entity).iterator(); iter.hasNext();) {
> -        iter.next().refresh(nanoSec, opCollector, coordinator);
> -      }
> -    }
> -  }
> -
> -}
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
> new file mode 100644
> index 0000000..62b19fc
> --- /dev/null
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
> @@ -0,0 +1,284 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +package org.apache.samza.sql.operators.factory;
> +
> +import java.util.HashMap;
> +import java.util.HashSet;
> +import java.util.Iterator;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Set;
> +
> +import org.apache.samza.sql.api.data.EntityName;
> +import org.apache.samza.sql.api.operators.OperatorRouter;
> +import org.apache.samza.sql.api.operators.OperatorSink;
> +import org.apache.samza.sql.api.operators.OperatorSource;
> +import org.apache.samza.sql.api.operators.OperatorSpec;
> +import org.apache.samza.sql.api.operators.SimpleOperator;
> +import org.apache.samza.sql.api.operators.SqlOperatorFactory;
> +import org.apache.samza.sql.operators.OperatorTopology;
> +import org.apache.samza.sql.operators.SimpleRouter;
> +
> +
> +/**
> + * This class implements a builder that allows users to create operators
> and connect them together in a topology.
> + */
> +public class TopologyBuilder {
> +
> +  /**
> +   * Internal {@link org.apache.samza.sql.api.operators.OperatorRouter}
> object to retain the topology being created
> +   */
> +  private SimpleRouter router;
> +
> +  /**
> +   * The {@link org.apache.samza.sql.api.operators.SqlOperatorFactory}
> object used to create operators connected in the topology
> +   */
> +  private final SqlOperatorFactory factory;
> +
> +  /**
> +   * The map of unbound inputs, the value is set(input_operators)
> +   */
> +  private Map<EntityName, Set<OperatorSpec>> unboundInputs = new
> HashMap<EntityName, Set<OperatorSpec>>();
> +
> +  /**
> +   * The map of unbound outputs, the value is the operator generating the
> output
> +   */
> +  private Map<EntityName, OperatorSpec> unboundOutputs = new
> HashMap<EntityName, OperatorSpec>();
> +
> +  /**
> +   * The set of entities that are intermediate entities between operators
> +   */
> +  private Set<EntityName> interStreams = new HashSet<EntityName>();
> +
> +  /**
> +   * The current operator that may have unbound input or output
> +   */
> +  private SimpleOperator currentOp = null;
> +
> +  /**
> +   * Private constructor of {@code TopologyBuilder}
> +   *
> +   * @param factory The {@link
> org.apache.samza.sql.api.operators.SqlOperatorFactory} to create operators
> +   */
> +  private TopologyBuilder(SqlOperatorFactory factory) {
> +    this.router = new SimpleRouter();
> +    this.factory = factory;
> +  }
> +
> +  /**
> +   * Static method to create this {@code TopologyBuilder} w/ a customized
> {@link org.apache.samza.sql.api.operators.SqlOperatorFactory}
> +   *
> +   * @param factory The {@link
> org.apache.samza.sql.api.operators.SqlOperatorFactory} to create operators
> +   * @return The {@code TopologyBuilder} object
> +   */
> +  public static TopologyBuilder create(SqlOperatorFactory factory) {
> +    return new TopologyBuilder(factory);
> +  }
> +
> +  /**
> +   * Static method to create this {@code TopologyBuilder}
> +   *
> +   * @return The {@code TopologyBuilder} object
> +   */
> +  public static TopologyBuilder create() {
> +    return new TopologyBuilder(new SimpleOperatorFactoryImpl());
> +  }
> +
> +  /**
> +   * Public method to create the next operator and attach it to the
> output of the current operator
> +   *
> +   * @param spec The {@link
> org.apache.samza.sql.api.operators.OperatorSpec} for the next operator
> +   * @return The updated {@code TopologyBuilder} object
> +   */
> +  public TopologyBuilder operator(OperatorSpec spec) {
> +    // create the operator from the spec; validity of the connection is
> checked in operator(SimpleOperator)
> +    SimpleOperator nextOp = this.factory.getOperator(spec);
> +    return this.operator(nextOp);
> +  }
> +
> +  /**
> +   * Public method to create the next operator and attach it to the
> output of the current operator
> +   *
> +   * @param op The {@link
> org.apache.samza.sql.api.operators.SimpleOperator}
> +   * @return The updated {@code TopologyBuilder} object
> +   */
> +  public TopologyBuilder operator(SimpleOperator op) {
> +    // check whether it is valid to connect a new operator to the current
> operator's output
> +    canAddOperator(op);
> +    this.addOperator(op);
> +    // advance the current operator position
> +    this.currentOp = op;
> +    return this;
> +  }
> +
> +  /**
> +   * Public method to create a stream object that will be the source to
> other operators
> +   *
> +   * @return The {@link
> org.apache.samza.sql.api.operators.OperatorSource} that can be the source
> to other operators
> +   */
> +  public OperatorSource stream() {
> +    canCreateSource();
> +    return new
> OperatorTopology(this.unboundOutputs.keySet().iterator().next(),
> this.router);
> +  }
> +
> +  /**
> +   * Public method to create a sink object that can take input stream
> from other operators
> +   *
> +   * @return The {@link org.apache.samza.sql.api.operators.OperatorSink}
> that can be the downstream of other operators
> +   */
> +  public OperatorSink sink() {
> +    canCreateSink();
> +    return new
> OperatorTopology(this.unboundInputs.keySet().iterator().next(),
> this.router);
> +  }
> +
> +  /**
> +   * Public method to bind the input of the current operator w/ the
> {@link org.apache.samza.sql.api.operators.OperatorSource} object
> +   *
> +   * @param srcStream The {@link
> org.apache.samza.sql.api.operators.OperatorSource} that the current
> operator is going to be bound to
> +   * @return The updated {@code TopologyBuilder} object
> +   */
> +  public TopologyBuilder bind(OperatorSource srcStream) {
> +    EntityName streamName = srcStream.getName();
> +    if (this.unboundInputs.containsKey(streamName)) {
> +      this.unboundInputs.remove(streamName);
> +      this.interStreams.add(streamName);
> +    } else {
> +      // no input operator is waiting for the output from the srcStream
> +      throw new IllegalArgumentException("No operator input can be bound
> to the input stream " + streamName);
> +    }
> +    // add all operators in srcStream to this topology
> +    for (Iterator<SimpleOperator> iter = srcStream.opIterator();
> iter.hasNext();) {
> +      this.addOperator(iter.next());
> +    }
> +    return this;
> +  }
> +
> +  /**
> +   * Public method to attach a {@link
> org.apache.samza.sql.api.operators.OperatorSink} object to the output of
> the current operator
> +   *
> +   * @param nextSink The {@link
> org.apache.samza.sql.api.operators.OperatorSink} to be attached to the
> current operator's output
> +   * @return The updated {@code TopologyBuilder} object
> +   */
> +  public TopologyBuilder attach(OperatorSink nextSink) {
> +    EntityName streamName = nextSink.getName();
> +    if (this.unboundOutputs.containsKey(streamName)) {
> +      this.unboundOutputs.remove(streamName);
> +      this.interStreams.add(streamName);
> +    } else {
> +      // no unbound output to attach to
> +      throw new IllegalArgumentException("No operator output found to
> attach the sink " + streamName);
> +    }
> +    // add all operators in nextSink to the router
> +    for (Iterator<SimpleOperator> iter = nextSink.opIterator();
> iter.hasNext();) {
> +      this.addOperator(iter.next());
> +    }
> +    return this;
> +  }
> +
> +  /**
> +   * Public method to finalize the topology that should have all input
> and output bound to system input and output
> +   *
> +   * @return The finalized {@link
> org.apache.samza.sql.api.operators.OperatorRouter} object
> +   */
> +  public OperatorRouter build() {
> +    canClose();
> +    return router;
> +  }
> +
> +  private TopologyBuilder addOperator(SimpleOperator nextOp) {
> +    // if input is not in the unboundOutputs and interStreams, input is
> unbound
> +    for (EntityName in : nextOp.getSpec().getInputNames()) {
> +      if (this.unboundOutputs.containsKey(in)) {
> +        this.unboundOutputs.remove(in);
> +        this.interStreams.add(in);
> +      }
> +      if (!this.interStreams.contains(in) && !in.isSystemEntity()) {
> +        if (!this.unboundInputs.containsKey(in)) {
> +          this.unboundInputs.put(in, new HashSet<OperatorSpec>());
> +        }
> +        this.unboundInputs.get(in).add(nextOp.getSpec());
> +      }
> +    }
> +    // if output is not in the unboundInputs and interStreams, output is
> unbound
> +    for (EntityName out : nextOp.getSpec().getOutputNames()) {
> +      if (this.unboundInputs.containsKey(out)) {
> +        this.unboundInputs.remove(out);
> +        this.interStreams.add(out);
> +      }
> +      if (!this.interStreams.contains(out) && !out.isSystemEntity()) {
> +        this.unboundOutputs.put(out, nextOp.getSpec());
> +      }
> +    }
> +    try {
> +      this.router.addOperator(nextOp);
> +    } catch (Exception e) {
> +      throw new RuntimeException("Failed to add operator " +
> nextOp.getSpec().getId() + " to the topology.", e);
> +    }
> +    return this;
> +  }
> +
> +  private void canCreateSource() {
> +    if (this.unboundInputs.size() > 0) {
> +      throw new IllegalStateException("Can't create stream when there are
> unbounded input streams in the topology");
> +    }
> +    if (this.unboundOutputs.size() != 1) {
> +      throw new IllegalStateException(
> +          "Can't create stream when the number of unbounded outputs is
> not 1 in the topology");
> +    }
> +  }
> +
> +  private void canCreateSink() {
> +    if (this.unboundOutputs.size() > 0) {
> +      throw new IllegalStateException("Can't create sink when there are
> unbounded output streams in the topology");
> +    }
> +    if (this.unboundInputs.size() != 1) {
> +      throw new IllegalStateException(
> +          "Can't create sink when the number of unbounded input streams
> is not 1 in the topology");
> +    }
> +  }
> +
> +  private void canAddOperator(SimpleOperator op) {
> +    if (this.currentOp == null) {
> +      return;
> +    }
> +    for (EntityName name : this.currentOp.getSpec().getInputNames()) {
> +      if (this.unboundInputs.containsKey(name)) {
> +        throw new IllegalArgumentException("There are unbound input " +
> name + " to the current operator "
> +            + this.currentOp.getSpec().getId() + ". Create a sink or call
> bind instead");
> +      }
> +    }
> +    List<EntityName> nextInputs = op.getSpec().getInputNames();
> +    for (EntityName name : this.currentOp.getSpec().getOutputNames()) {
> +      if (!nextInputs.contains(name) &&
> this.unboundOutputs.containsKey(name)) {
> +        // the current operator's output is not in the next operator's
> input list
> +        throw new IllegalArgumentException("There are unbound output " +
> name + " from the current operator "
> +            + this.currentOp.getSpec().getId()
> +            + " that are not included in the next operator's inputs.
> Create a stream or call attach instead");
> +      }
> +    }
> +  }
> +
> +  private void canClose() {
> +    if (!this.unboundInputs.isEmpty() || !this.unboundOutputs.isEmpty()) {
> +      throw new IllegalStateException(
> +          "There are input/output streams in the topology that are not
> bounded. Can't build the topology yet.");
> +    }
> +  }
> +
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> index 2854aeb..7f5b990 100644
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> @@ -29,7 +29,7 @@ import org.apache.samza.sql.api.data.Relation;
>  import org.apache.samza.sql.api.data.Stream;
>  import org.apache.samza.sql.api.data.Tuple;
>  import org.apache.samza.sql.api.operators.OperatorCallback;
> -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
> +import org.apache.samza.sql.operators.SimpleOperatorImpl;
>  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
>  import org.apache.samza.sql.window.storage.OrderedStoreKey;
>  import org.apache.samza.storage.kv.Entry;
> @@ -38,7 +38,6 @@ import org.apache.samza.task.TaskContext;
>  import org.apache.samza.task.TaskCoordinator;
>  import org.apache.samza.task.sql.SimpleMessageCollector;
>
> -
>  /**
>   * This class implements a simple stream-to-stream join
>   */
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> index cc0aca0..eecff7e 100644
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> @@ -19,10 +19,11 @@
>
>  package org.apache.samza.sql.operators.join;
>
> +import java.util.ArrayList;
>  import java.util.List;
>
>  import org.apache.samza.sql.api.data.EntityName;
> -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
> +import org.apache.samza.sql.operators.SimpleOperatorSpec;
>
>
>  /**
> @@ -35,4 +36,16 @@ public class StreamStreamJoinSpec extends
> SimpleOperatorSpec {
>      // TODO Auto-generated constructor stub
>    }
>
> +  @SuppressWarnings("serial")
> +  public StreamStreamJoinSpec(String id, List<String> inputRelations,
> String output, List<String> joinKeys) {
> +    super(id, new ArrayList<EntityName>() {
> +      {
> +        for (String input : inputRelations) {
> +          add(EntityName.getStreamName(input));
> +        }
> +      }
> +    }, EntityName.getStreamName(output));
> +    // TODO Auto-generated constructor stub
> +  }
> +
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> index b93d789..0cba39a 100644
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> @@ -23,7 +23,7 @@ import org.apache.samza.config.Config;
>  import org.apache.samza.sql.api.data.Relation;
>  import org.apache.samza.sql.api.data.Tuple;
>  import org.apache.samza.sql.api.operators.OperatorCallback;
> -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
> +import org.apache.samza.sql.operators.SimpleOperatorImpl;
>  import org.apache.samza.storage.kv.Entry;
>  import org.apache.samza.storage.kv.KeyValueIterator;
>  import org.apache.samza.system.OutgoingMessageEnvelope;
> @@ -32,7 +32,6 @@ import org.apache.samza.task.TaskContext;
>  import org.apache.samza.task.TaskCoordinator;
>  import org.apache.samza.task.sql.SimpleMessageCollector;
>
> -
>  /**
>   * This is an example built-in operator that performs a simple stream
> re-partition operation.
>   *
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> index c47eed9..e494bff 100644
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.partition;
>
>  import org.apache.samza.sql.api.data.EntityName;
>  import org.apache.samza.sql.api.operators.OperatorSpec;
> -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
> +import org.apache.samza.sql.operators.SimpleOperatorSpec;
>  import org.apache.samza.system.SystemStream;
>
>
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> index d81cc93..a9a83b5 100644
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> @@ -27,13 +27,12 @@ import org.apache.samza.sql.api.data.EntityName;
>  import org.apache.samza.sql.api.data.Relation;
>  import org.apache.samza.sql.api.data.Tuple;
>  import org.apache.samza.sql.api.operators.OperatorCallback;
> -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
> +import org.apache.samza.sql.operators.SimpleOperatorImpl;
>  import org.apache.samza.storage.kv.KeyValueIterator;
>  import org.apache.samza.task.TaskContext;
>  import org.apache.samza.task.TaskCoordinator;
>  import org.apache.samza.task.sql.SimpleMessageCollector;
>
> -
>  /**
>   * This class defines an example built-in fixed-size window operator
> that converts a stream to a relation
>   *
> @@ -86,6 +85,7 @@ public class BoundedTimeWindow extends
> SimpleOperatorImpl {
>     * @param lengthSec The window size in seconds
>     * @param input The input stream name
>     * @param output The output relation name
> +   * @param callback The user callback object
>     */
>    public BoundedTimeWindow(String wndId, int lengthSec, String input,
> String output, OperatorCallback callback) {
>      super(new WindowSpec(wndId, EntityName.getStreamName(input),
> EntityName.getStreamName(output), lengthSec), callback);
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> index eec32ea..6c4eba8 100644
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.window;
>
>  import org.apache.samza.sql.api.data.EntityName;
>  import org.apache.samza.sql.api.operators.OperatorSpec;
> -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
> +import org.apache.samza.sql.operators.SimpleOperatorSpec;
>
>
>  /**
> @@ -47,6 +47,11 @@ public class WindowSpec extends SimpleOperatorSpec
> implements OperatorSpec {
>      this.wndSizeSec = lengthSec;
>    }
>
> +  public WindowSpec(String id, int wndSize, String input) {
> +    super(id, EntityName.getStreamName(input), null);
> +    this.wndSizeSec = wndSize;
> +  }
> +
>    /**
>     * Method to get the window state relation name
>     *
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> index b29838a..6950f67 100644
> ---
> a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> +++
> b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> @@ -22,7 +22,7 @@ package org.apache.samza.task.sql;
>  import org.apache.samza.sql.api.data.Relation;
>  import org.apache.samza.sql.api.data.Tuple;
>  import org.apache.samza.sql.api.operators.OperatorCallback;
> -import org.apache.samza.sql.operators.factory.NoopOperatorCallback;
> +import org.apache.samza.sql.operators.NoopOperatorCallback;
>  import org.apache.samza.storage.kv.Entry;
>  import org.apache.samza.storage.kv.KeyValueIterator;
>  import org.apache.samza.system.OutgoingMessageEnvelope;
> @@ -57,25 +57,38 @@ public class SimpleMessageCollector implements
> MessageCollector {
>     * @param coordinator The {@link org.apache.samza.task.TaskCoordinator}
> in the context
>     */
>    public SimpleMessageCollector(MessageCollector collector,
> TaskCoordinator coordinator) {
> -    this.collector = collector;
> -    this.coordinator = coordinator;
> +    this(collector, coordinator, new NoopOperatorCallback());
>    }
>
>    /**
>     * This method swaps the {@code callback} with the new one
>     *
> -   * <p> This method allows the {@link
> org.apache.samza.sql.api.operators.SimpleOperator} to be swapped when the
> collector
> -   * is passed down into the next operator's context. Hence, under the
> new operator's context, the correct {@link
> org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Relation,
> MessageCollector, TaskCoordinator)},
> -   * and {@link
> org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Tuple,
> MessageCollector, TaskCoordinator)} can be invoked
> +   * <p> This method allows the {@link
> org.apache.samza.sql.api.operators.OperatorCallback} to be swapped when the
> collector
> +   * is passed down into the next operator's context. Hence, under the
> new operator's context, the correct callback functions can be invoked
>     *
>     * @param callback The new {@link
> org.apache.samza.sql.api.operators.OperatorCallback} to be set
>     */
> -  public void switchOperatorCallback(OperatorCallback callback) {
> -    this.callback = callback;
> +  public void switchCallback(OperatorCallback callback) {
> +    if (callback == null) {
> +      this.callback = new NoopOperatorCallback();
> +    } else {
> +      this.callback = callback;
> +    }
> +  }
> +
> +  /**
> +   * Method is declared to be final s.t. we enforce that the callback
> functions are called first
> +   */
> +  @Override
> +  final public void send(OutgoingMessageEnvelope envelope) {
> +    this.collector.send(envelope);
>    }
>
>    /**
>     * Method is declared to be final s.t. we enforce that the callback
> functions are called first
> +   *
> +   * @param deltaRelation The relation to be sent out
> +   * @throws Exception Throws exception if failed to send
>     */
>    final public void send(Relation deltaRelation) throws Exception {
>      Relation rel = this.callback.afterProcess(deltaRelation, collector,
> coordinator);
> @@ -87,6 +100,9 @@ public class SimpleMessageCollector implements
> MessageCollector {
>
>    /**
>     * Method is declared to be final s.t. we enforce that the callback
> functions are called first
> +   *
> +   * @param tuple The tuple to be sent out
> +   * @throws Exception Throws exception if failed to send
>     */
>    final public void send(Tuple tuple) throws Exception {
>      Tuple otuple = this.callback.afterProcess(tuple, collector,
> coordinator);
> @@ -106,9 +122,4 @@ public class SimpleMessageCollector implements
> MessageCollector {
>    protected void realSend(Tuple tuple) throws Exception {
>      this.collector.send((OutgoingMessageEnvelope) tuple.getMessage());
>    }
> -
> -  @Override
> -  public void send(OutgoingMessageEnvelope envelope) {
> -    this.collector.send(envelope);
> -  }
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> index 20dc701..7370af6 100644
> ---
> a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> +++
> b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> @@ -22,6 +22,7 @@ package org.apache.samza.task.sql;
>  import org.apache.samza.config.Config;
>  import org.apache.samza.sql.api.data.Relation;
>  import org.apache.samza.sql.api.data.Tuple;
> +import org.apache.samza.sql.api.operators.Operator;
>  import org.apache.samza.sql.api.operators.OperatorCallback;
>  import org.apache.samza.sql.data.IncomingMessageTuple;
>  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
> @@ -39,7 +40,7 @@ import org.apache.samza.task.WindowableTask;
>   *
>   */
>  public class RandomWindowOperatorTask implements StreamTask,
> InitableTask, WindowableTask {
> -  private BoundedTimeWindow wndOp;
> +  private Operator operator;
>
>    private final OperatorCallback wndCallback = new OperatorCallback() {
>
> @@ -77,20 +78,20 @@ public class RandomWindowOperatorTask implements
> StreamTask, InitableTask, Windo
>    public void process(IncomingMessageEnvelope envelope, MessageCollector
> collector, TaskCoordinator coordinator)
>        throws Exception {
>      // based on tuple's stream name, get the window op and run process()
> -    wndOp.process(new IncomingMessageTuple(envelope), collector,
> coordinator);
> +    operator.process(new IncomingMessageTuple(envelope), collector,
> coordinator);
>
>    }
>
>    @Override
>    public void window(MessageCollector collector, TaskCoordinator
> coordinator) throws Exception {
>      // based on tuple's stream name, get the window op and run process()
> -    wndOp.refresh(System.nanoTime(), collector, coordinator);
> +    operator.refresh(System.nanoTime(), collector, coordinator);
>    }
>
>    @Override
>    public void init(Config config, TaskContext context) throws Exception {
>      // 1. create a fixed length 10 sec window operator
> -    this.wndOp = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1",
> "relation1", this.wndCallback);
> -    this.wndOp.init(config, context);
> +    this.operator = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1",
> "wndOutput", this.wndCallback);
> +    this.operator.init(config, context);
>    }
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> ----------------------------------------------------------------------
> diff --git
> a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> index 9124e3c..d65892c 100644
> ---
> a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> +++
> b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> @@ -24,7 +24,7 @@ import java.util.List;
>
>  import org.apache.samza.config.Config;
>  import org.apache.samza.sql.data.IncomingMessageTuple;
> -import org.apache.samza.sql.operators.factory.SimpleRouter;
> +import org.apache.samza.sql.operators.SimpleRouter;
>  import org.apache.samza.sql.operators.join.StreamStreamJoin;
>  import org.apache.samza.sql.operators.partition.PartitionOp;
>  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
> @@ -51,25 +51,25 @@ import org.apache.samza.task.WindowableTask;
>   */
>  public class StreamSqlTask implements StreamTask, InitableTask,
> WindowableTask {
>
> -  private SimpleRouter rteCntx;
> +  private SimpleRouter router;
>
>    @Override
>    public void process(IncomingMessageEnvelope envelope, MessageCollector
> collector, TaskCoordinator coordinator)
>        throws Exception {
> -    this.rteCntx.process(new IncomingMessageTuple(envelope), collector,
> coordinator);
> +    this.router.process(new IncomingMessageTuple(envelope), collector,
> coordinator);
>    }
>
>    @Override
>    public void window(MessageCollector collector, TaskCoordinator
> coordinator) throws Exception {
> -    this.rteCntx.refresh(System.nanoTime(), collector, coordinator);
> +    this.router.refresh(System.nanoTime(), collector, coordinator);
>    }
>
>    @Override
>    public void init(Config config, TaskContext context) throws Exception {
>      // create all operators via the operator factory
>      // 1. create two window operators
> -    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10,
> "inputStream1", "fixedWndOutput1");
> -    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10,
> "inputStream2", "fixedWndOutput2");
> +    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10,
> "kafka:inputStream1", "fixedWndOutput1");
> +    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10,
> "kafka:inputStream2", "fixedWndOutput2");
>      // 2. create one join operator
>      @SuppressWarnings("serial")
>      List<String> inputRelations = new ArrayList<String>() {
> @@ -86,19 +86,19 @@ public class StreamSqlTask implements StreamTask,
> InitableTask, WindowableTask {
>        }
>      };
>      StreamStreamJoin join = new StreamStreamJoin("joinOp",
> inputRelations, "joinOutput", joinKeys);
> -    // 4. create a re-partition operator
> +    // 3. create a re-partition operator
>      PartitionOp par = new PartitionOp("parOp1", "joinOutput", "kafka",
> "parOutputStrm1", "joinKey", 50);
>
>      // Now, connecting the operators via the OperatorRouter
> -    this.rteCntx = new SimpleRouter();
> +    this.router = new SimpleRouter();
>      // 1. set two system input operators (i.e. two window operators)
> -    this.rteCntx.addOperator(wnd1);
> -    this.rteCntx.addOperator(wnd2);
> +    this.router.addOperator(wnd1);
> +    this.router.addOperator(wnd2);
>      // 2. connect join operator to both window operators
> -    this.rteCntx.addOperator(join);
> +    this.router.addOperator(join);
>      // 3. connect re-partition operator to the stream operator
> -    this.rteCntx.addOperator(par);
> +    this.router.addOperator(par);
>
> -    this.rteCntx.init(config, context);
> +    this.router.init(config, context);
>    }
>  }
>
>


-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org

Reply via email to