[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r88844164
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Project
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.ProjectableTableSource
+
+import scala.collection.JavaConverters._
+
+class BatchTableProject(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    input: RelNode,
+    projects: util.List[_ <: RexNode],
+    projectionRowType: RelDataType)
+  extends Project(cluster, traits, input, projects, projectionRowType)
+  with DataSetRel {
+
+  override def copy(traitSet: RelTraitSet, input: RelNode,
+      projects: util.List[RexNode], scanRawType: RelDataType): Project = {
+    new BatchTableProject(cluster, traitSet, input, projects, projectionRowType)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val projectableSource = getInput.getTable.unwrap(classOf[TableSourceTable])
+      .tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+    val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
+    projectableSource.setProjection(indexes)
+
+    getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
--- End diff --

You are right. But if `setProjection` returns a copy of the TableSource, we 
should translate the TableSource to a DataStream and convert it to the expected 
type in this method.
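
A minimal sketch of the copy-based contract the comment describes, assuming a hypothetical variant of `ProjectableTableSource` (the trait and method names are illustrative, not the actual Flink API):

{code}
trait CopyingProjectableTableSource[T] {
  // Hypothetical: returns a new, projected copy instead of mutating this source.
  def withProjection(fieldIndexes: Array[Int]): CopyingProjectableTableSource[T]
}
{code}

With such a contract, `translateToPlan` could no longer just delegate to the input scan; it would have to obtain the data from the projected copy and convert it to the expected type itself.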




[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15682842#comment-15682842
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r88844164
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Project
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.ProjectableTableSource
+
+import scala.collection.JavaConverters._
+
+class BatchTableProject(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    input: RelNode,
+    projects: util.List[_ <: RexNode],
+    projectionRowType: RelDataType)
+  extends Project(cluster, traits, input, projects, projectionRowType)
+  with DataSetRel {
+
+  override def copy(traitSet: RelTraitSet, input: RelNode,
+      projects: util.List[RexNode], scanRawType: RelDataType): Project = {
+    new BatchTableProject(cluster, traitSet, input, projects, projectionRowType)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val projectableSource = getInput.getTable.unwrap(classOf[TableSourceTable])
+      .tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+    val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
+    projectableSource.setProjection(indexes)
+
+    getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
--- End diff --

You are right. But if `setProjection` returns a copy of the TableSource, we 
should translate the TableSource to a DataStream and convert it to the expected 
type in this method.


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows:
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.
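
A rough illustration of how a source might implement the proposed trait (a sketch against the interface above; the class name and its fields are assumptions, not the actual {{CsvTableSource}}):

{code}
class ExampleCsvTableSource(path: String, allFields: Array[String])
  extends ProjectableTableSource {

  // Fields the scan should actually read; defaults to the full schema.
  private var projectedFields: Array[String] = allFields

  override def setProjection(fields: Array[String]): Unit = {
    // Keep only the requested fields so the CSV reader can skip the rest.
    projectedFields = fields
  }
}
{code}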





[jira] [Created] (FLINK-5105) Improve ReduceState: value put into ReducingState should always be a copy

2016-11-21 Thread sunjincheng (JIRA)
sunjincheng created FLINK-5105:
--

 Summary: Improve ReduceState: value put into ReducingState should 
always be a copy
 Key: FLINK-5105
 URL: https://issues.apache.org/jira/browse/FLINK-5105
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: sunjincheng


 In case of overlapping sliding windows, multiple references are held to the 
same object. If we modify value1 or value2, the results are incorrect. The 
value that is put into a ReducingState should always be a copy. That would 
allow to modify and emit one of the two input values (the one which comes 
from the state).
The FoldingState has the same problem.
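
A sketch of the hazard (illustrative, not Flink internals): if the state backend hands the same object instance to several overlapping windows, an in-place update in {{reduce()}} silently corrupts the other windows' state.

{code}
import org.apache.flink.api.common.functions.ReduceFunction

class SummingReduce extends ReduceFunction[Array[Long]] {
  override def reduce(value1: Array[Long], value2: Array[Long]): Array[Long] = {
    value1(0) += value2(0)  // in-place update: only safe if value1 is a copy
    value1
  }
}
{code}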





[jira] [Created] (FLINK-5106) improving IncrementalAggregateReduceFunction

2016-11-21 Thread sunjincheng (JIRA)
sunjincheng created FLINK-5106:
--

 Summary: improving IncrementalAggregateReduceFunction
 Key: FLINK-5106
 URL: https://issues.apache.org/jira/browse/FLINK-5106
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: sunjincheng
Assignee: sunjincheng


Please refer to FLINK-4937.





[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88847319
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into aggregate buffer.
+  *
+  * @param aggregates       The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int) extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow: Row = _
+
+  /**
+    * For Incremental intermediate aggregate Rows, merge value1 and value2
+    * into aggregate buffer, return aggregate buffer.
+    *
+    * @param value1 The first value to be combined.
+    * @param value2 The second value to be combined.
+    * @return The combined value of both input values.
+    */
+  override def reduce(value1: Row, value2: Row): Row = {
+
+    if (null == accumulatorRow) {
+      accumulatorRow = new Row(intermediateRowArity)
+    }
+
+    // Initiate intermediate aggregate value.
+    aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

Okay, I have created 
[FLINK-5105](https://issues.apache.org/jira/browse/FLINK-5105) and 
[FLINK-5106](https://issues.apache.org/jira/browse/FLINK-5106).




[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15682887#comment-15682887
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88847319
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into aggregate buffer.
+  *
+  * @param aggregates       The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int) extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow: Row = _
+
+  /**
+    * For Incremental intermediate aggregate Rows, merge value1 and value2
+    * into aggregate buffer, return aggregate buffer.
+    *
+    * @param value1 The first value to be combined.
+    * @param value2 The second value to be combined.
+    * @return The combined value of both input values.
+    */
+  override def reduce(value1: Row, value2: Row): Row = {
+
+    if (null == accumulatorRow) {
+      accumulatorRow = new Row(intermediateRowArity)
+    }
+
+    // Initiate intermediate aggregate value.
+    aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

Okay, I have created 
[FLINK-5105](https://issues.apache.org/jira/browse/FLINK-5105) and 
[FLINK-5106](https://issues.apache.org/jira/browse/FLINK-5106).


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
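
A sketch of the DataStream-level incremental aggregation the description refers to (illustrative; the input type and the summing logic are assumptions):

{code}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val input: DataStream[(String, Long)] = ???  // assumed keyed payload
val sums = input
  .keyBy(0)
  .timeWindow(Time.minutes(1))
  // the ReduceFunction folds each arriving record into a partial aggregate
  // instead of buffering all records until the window fires
  .reduce((a, b) => (a._1, a._2 + b._2))
{code}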





[jira] [Assigned] (FLINK-4741) WebRuntimeMonitor does not shut down all of it's threads (EventLoopGroups) on exit.

2016-11-21 Thread Roman Maier (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Maier reassigned FLINK-4741:
--

Assignee: Roman Maier

> WebRuntimeMonitor does not shut down all of it's threads (EventLoopGroups) on 
> exit.
> ---
>
> Key: FLINK-4741
> URL: https://issues.apache.org/jira/browse/FLINK-4741
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.1.2
>Reporter: Joseph Sims
>Assignee: Roman Maier
>Priority: Minor
>
> WebRuntimeMonitor does not shut down correctly, causing the overall 
> application to hang on shutdown. It shuts down bootstrap.group 
> (EventLoopGroup) but not the bootstrap.childGroup (EventLoopGroup).
> If WebRuntimeMonitor is not used (local.start-webserver=false), this problem 
> does not occur.
> Class: WebRuntimeMonitor
> method: stop()
> Line: ~387
> Called:
> bootstrap.group().shutdownGracefully()
> Not called:
> bootstrap.childGroup().shutdownGracefully()
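
A sketch of the fix the description implies (illustrative, based on the Netty EventLoopGroup calls named above, not the actual patch):

{code}
// In WebRuntimeMonitor.stop(): shut down both event loop groups.
if (bootstrap != null) {
  if (bootstrap.group() != null) {
    bootstrap.group().shutdownGracefully()
  }
  if (bootstrap.childGroup() != null) {
    bootstrap.childGroup().shutdownGracefully()  // the previously missing call
  }
}
{code}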





[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-21 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r88849756
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Project
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.ProjectableTableSource
+
+import scala.collection.JavaConverters._
+
+class BatchTableProject(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    input: RelNode,
+    projects: util.List[_ <: RexNode],
+    projectionRowType: RelDataType)
+  extends Project(cluster, traits, input, projects, projectionRowType)
+  with DataSetRel {
+
+  override def copy(traitSet: RelTraitSet, input: RelNode,
+      projects: util.List[RexNode], scanRawType: RelDataType): Project = {
+    new BatchTableProject(cluster, traitSet, input, projects, projectionRowType)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val projectableSource = getInput.getTable.unwrap(classOf[TableSourceTable])
+      .tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+    val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
+    projectableSource.setProjection(indexes)
+
+    getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
--- End diff --

So it will turn into a scan and become the first step in the logical plan, right?




[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15682921#comment-15682921
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r88849756
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Project
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.ProjectableTableSource
+
+import scala.collection.JavaConverters._
+
+class BatchTableProject(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    input: RelNode,
+    projects: util.List[_ <: RexNode],
+    projectionRowType: RelDataType)
+  extends Project(cluster, traits, input, projects, projectionRowType)
+  with DataSetRel {
+
+  override def copy(traitSet: RelTraitSet, input: RelNode,
+      projects: util.List[RexNode], scanRawType: RelDataType): Project = {
+    new BatchTableProject(cluster, traitSet, input, projects, projectionRowType)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val projectableSource = getInput.getTable.unwrap(classOf[TableSourceTable])
+      .tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+    val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
+    projectableSource.setProjection(indexes)
+
+    getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
--- End diff --

So it will turn into a scan and become the first step in the logical plan, right?


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows:
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.





[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-21 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15682939#comment-15682939
 ] 

Anton Mushin commented on FLINK-4832:
-

Hello everyone,
I've updated the implementation according to the changes in FLINK-4263.
Could somebody please check the 
[changes|https://github.com/apache/flink/compare/master...ex00:FLINK-4832] 
and confirm whether the idea is correct?


> Count/Sum 0 elements
> 
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Anton Mushin
>
> Currently, the Table API is unable to count or sum up 0 elements. We should 
> improve DataSet aggregations for this. Maybe by unioning the original DataSet 
> with a dummy record or by using a MapPartition function. Coming up with a 
> good design for this is also part of this issue.
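
A sketch of the MapPartition idea mentioned above (one possible design, not the implemented solution): with parallelism 1 a single partition sees all records, so even an empty input emits one record with count 0.

{code}
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

def countWithZero(input: DataSet[Long]): DataSet[Long] =
  input.mapPartition { (records: Iterator[Long], out: Collector[Long]) =>
    out.collect(records.size.toLong)  // 0 for an empty partition
  }.setParallelism(1)
{code}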





[jira] [Commented] (FLINK-5031) Consecutive DataStream.split() ignored

2016-11-21 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15682957#comment-15682957
 ] 

Fabian Hueske commented on FLINK-5031:
--

Hi [~RenkaiGe], thanks for looking into this issue!
I'm not very familiar with the DataStream API internals, but I doubt that a 
union of all consecutively applied selects is intended. I'd rather think it 
should be the intersection.
[~aljoscha], what do you think?

> Consecutive DataStream.split() ignored
> --
>
> Key: FLINK-5031
> URL: https://issues.apache.org/jira/browse/FLINK-5031
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
>Assignee: Renkai Ge
> Fix For: 1.2.0
>
>
> The output of the following program 
> {code}
> static final class ThresholdSelector implements OutputSelector<Long> {
>   long threshold;
>   public ThresholdSelector(long threshold) {
>   this.threshold = threshold;
>   }
>   @Override
>   public Iterable<String> select(Long value) {
>   if (value < threshold) {
>   return Collections.singletonList("Less");
>   } else {
>   return Collections.singletonList("GreaterEqual");
>   }
>   }
> }
> public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(1);
>   SplitStream<Long> split1 = env.generateSequence(1, 11)
>   .split(new ThresholdSelector(6));
>   // stream11 should be [1,2,3,4,5]
>   DataStream<Long> stream11 = split1.select("Less");
>   SplitStream<Long> split2 = stream11
> //.map(new MapFunction<Long, Long>() {
> //@Override
> //public Long map(Long value) throws Exception {
> //return value;
> //}
> //})
>   .split(new ThresholdSelector(3));
>   DataStream<Long> stream21 = split2.select("Less");
>   // stream21 should be [1,2]
>   stream21.print();
>   env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second 
> {{split}} operation is ignored.
> The program is correctly evaluated if the identity {{MapFunction}} is added to 
> the program.





[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683011#comment-15683011
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88856847
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into aggregate buffer.
+  *
+  * @param aggregates       The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int) extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow: Row = _
+
+  /**
+    * For Incremental intermediate aggregate Rows, merge value1 and value2
+    * into aggregate buffer, return aggregate buffer.
+    *
+    * @param value1 The first value to be combined.
+    * @param value2 The second value to be combined.
+    * @return The combined value of both input values.
+    */
+  override def reduce(value1: Row, value2: Row): Row = {
+
+    if (null == accumulatorRow) {
+      accumulatorRow = new Row(intermediateRowArity)
+    }
+
+    // Initiate intermediate aggregate value.
+    aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

@wuchong, yes this is a problem with the `HeapStateBackend`. The RocksDB 
backend does not suffer from this problem. I think in the long run we should 
migrate the `HeapStateBackend` to always keep data in serialised form, then we 
also won't have this problem anymore.
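
A sketch of the "keep data in serialised form" idea (assumed, not the actual backend code): copying a value through its serializer before it enters state gives the same isolation guarantee.

{code}
import org.apache.flink.api.common.typeutils.TypeSerializer

// Defensive copy via the type's serializer; the caller can then mutate
// its own instance without corrupting what is stored in state.
def copyIntoState[T](serializer: TypeSerializer[T], value: T): T =
  serializer.copy(value)
{code}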


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.





[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88856847
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into aggregate buffer.
+  *
+  * @param aggregates       The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int) extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow: Row = _
+
+  /**
+    * For Incremental intermediate aggregate Rows, merge value1 and value2
+    * into aggregate buffer, return aggregate buffer.
+    *
+    * @param value1 The first value to be combined.
+    * @param value2 The second value to be combined.
+    * @return The combined value of both input values.
+    */
+  override def reduce(value1: Row, value2: Row): Row = {
+
+    if (null == accumulatorRow) {
+      accumulatorRow = new Row(intermediateRowArity)
+    }
+
+    // Initiate intermediate aggregate value.
+    aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

@wuchong, yes this is a problem with the `HeapStateBackend`. The RocksDB 
backend does not suffer from this problem. I think in the long run we should 
migrate the `HeapStateBackend` to always keep data in serialised form, then we 
also won't have this problem anymore.




[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-21 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683017#comment-15683017
 ] 

Fabian Hueske commented on FLINK-4832:
--

The overall approach looks good, IMO. 
Can you open a pull request?

Thanks, Fabian

> Count/Sum 0 elements
> 
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Anton Mushin
>
> Currently, the Table API is unable to count or sum up 0 elements. We should 
> improve DataSet aggregations for this. Maybe by union the original DataSet 
> with a dummy record or by using a MapPartition function. Coming up with a 
> good design for this is also part of this issue.





[jira] [Created] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts

2016-11-21 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-5107:
-

 Summary: Job Manager goes out of memory from long history of prior 
execution attempts
 Key: FLINK-5107
 URL: https://issues.apache.org/jira/browse/FLINK-5107
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Reporter: Stefan Richter
Assignee: Stefan Richter


We have observed that the job manager can run out of memory during long-running 
jobs with many vertices. Analysis of the heap dump shows that the ever-growing 
history of prior execution attempts is the culprit for this problem.

We should limit this history to the n most recent attempts. 
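
A sketch of the proposed bound (assumed, not the actual implementation): keep only the n most recent prior attempts and evict the oldest.

{code}
import scala.collection.mutable

class BoundedAttemptHistory[T](maxSize: Int) {
  private val attempts = mutable.Queue.empty[T]

  def add(attempt: T): Unit = {
    attempts.enqueue(attempt)
    if (attempts.size > maxSize) {
      attempts.dequeue()  // drop the oldest attempt to cap memory usage
    }
  }

  def all: Seq[T] = attempts.toSeq
}
{code}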





[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started

2016-11-21 Thread Andrew Efimov (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683063#comment-15683063
 ] 

Andrew Efimov commented on FLINK-4905:
--

I would suggest the following solution (see the sketch below):
- not only set {{this.zookeeperOffsetHandler = null;}} in the finally block of 
{{Kafka08Fetcher}}, but also set a {{volatile closed}} flag on 
{{ZookeeperOffsetHandler}}. Threads will check the flag before calling 
{{ZookeeperOffsetHandler.setOffsetInZooKeeper}} or 
{{ZookeeperOffsetHandler.getOffsetFromZooKeeper}}
- and create an atomic thread counter for ZookeeperOffsetHandler and perform 
the close only if the counter is 0, with a timeout of course, 
or use the {{CheckpointLock}} that is in the context

Team, what do you think?
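
A sketch of the suggested guard (the method names follow the comment above; the signature, timeout, and counter handling are assumptions):

{code}
import java.util.concurrent.atomic.AtomicInteger

class ZookeeperOffsetHandler {
  @volatile private var closed = false
  private val inFlight = new AtomicInteger(0)

  def setOffsetInZooKeeper(partition: String, offset: Long): Unit = {
    if (closed) return  // skip the call instead of hitting a stopped client
    inFlight.incrementAndGet()
    try {
      // ... talk to ZooKeeper ...
    } finally {
      inFlight.decrementAndGet()
    }
  }

  def close(): Unit = {
    closed = true
    val deadline = System.currentTimeMillis() + 5000  // timeout, as suggested
    while (inFlight.get() > 0 && System.currentTimeMillis() < deadline) {
      Thread.sleep(10)  // wait for in-flight calls to drain
    }
  }
}
{code}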

> Kafka test instability IllegalStateException: Client is not started
> ---
>
> Key: FLINK-4905
> URL: https://issues.apache.org/jira/browse/FLINK-4905
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>  Labels: test-stability
>
> The following travis build 
> (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt)  
> failed because of this error
> {code}
> 08:17:11,239 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to 
> commit offsets to Kafka) changed to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Client is not started
>   at 
> org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
>   at 
> org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
>   at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>   at 
> org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
>   at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571)
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035)
>   ... 5 more
> 08:17:11,241 INFO  org.apache.flink.runtime.taskmanager.Task  
>- Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: 
> Unnamed (1/3)
> {code}





[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88861382
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into aggregate buffer.
+  *
+  * @param aggregates       The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int) extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow: Row = _
+
+  /**
+    * For Incremental intermediate aggregate Rows, merge value1 and value2
+    * into aggregate buffer, return aggregate buffer.
+    *
+    * @param value1 The first value to be combined.
+    * @param value2 The second value to be combined.
+    * @return The combined value of both input values.
+    */
+  override def reduce(value1: Row, value2: Row): Row = {
+
+    if (null == accumulatorRow) {
+      accumulatorRow = new Row(intermediateRowArity)
+    }
+
+    // Initiate intermediate aggregate value.
+    aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

I think a lot of people would use it for production use cases. (Maybe 
you're confusing this with the old distinction between `MemStateBackend` and 
`FsStateBackend`? Internally, both memory and file backend now use a 
`HeapStateBackend` but for the file backend the contents are checkpointed to a 
FileSystem.)




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88861427
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into aggregate buffer.
+  *
+  * @param aggregates       The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int) extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow: Row = _
+
+  /**
+    * For Incremental intermediate aggregate Rows, merge value1 and value2
+    * into aggregate buffer, return aggregate buffer.
+    *
+    * @param value1 The first value to be combined.
+    * @param value2 The second value to be combined.
+    * @return The combined value of both input values.
+    */
+  override def reduce(value1: Row, value2: Row): Row = {
+
+    if (null == accumulatorRow) {
+      accumulatorRow = new Row(intermediateRowArity)
+    }
+
+    // Initiate intermediate aggregate value.
+    aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

@fhueske But you're right, we could enforce a copy there.




[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683076#comment-15683076
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88861427
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into aggregate buffer.
+  *
+  * @param aggregates       The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int) extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow: Row = _
+
+  /**
+    * For Incremental intermediate aggregate Rows, merge value1 and value2
+    * into aggregate buffer, return aggregate buffer.
+    *
+    * @param value1 The first value to be combined.
+    * @param value2 The second value to be combined.
+    * @return The combined value of both input values.
+    */
+  override def reduce(value1: Row, value2: Row): Row = {
+
+    if (null == accumulatorRow) {
+      accumulatorRow = new Row(intermediateRowArity)
+    }
+
+    // Initiate intermediate aggregate value.
+    aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

@fhueske But you're right, we could enforce a copy there.


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.





[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683074#comment-15683074
 ] 

ASF GitHub Bot commented on FLINK-4937:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88861382
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into aggregate buffer.
+  *
+  * @param aggregates       The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int) extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow: Row = _
+
+  /**
+    * For Incremental intermediate aggregate Rows, merge value1 and value2
+    * into aggregate buffer, return aggregate buffer.
+    *
+    * @param value1 The first value to be combined.
+    * @param value2 The second value to be combined.
+    * @return The combined value of both input values.
+    */
+  override def reduce(value1: Row, value2: Row): Row = {
+
+    if (null == accumulatorRow) {
+      accumulatorRow = new Row(intermediateRowArity)
+    }
+
+    // Initiate intermediate aggregate value.
+    aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

I think a lot of people would use it for production use cases. (Maybe 
you're confusing this with the old distinction between `MemStateBackend` and 
`FsStateBackend`? Internally, both memory and file backend now use a 
`HeapStateBackend` but for the file backend the contents are checkpointed to a 
FileSystem.)


> Add incremental group window aggregation for streaming Table API
> 
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.





[jira] [Commented] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts

2016-11-21 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683080#comment-15683080
 ] 

Chesnay Schepler commented on FLINK-5107:
-

Another solution would be to incrementally archive the finished executions into 
files.

> Job Manager goes out of memory from long history of prior execution attempts
> 
>
> Key: FLINK-5107
> URL: https://issues.apache.org/jira/browse/FLINK-5107
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> We have observed that the job manager can run out of memory during long-running 
> jobs with many vertices. Analysis of the heap dump shows that the ever-growing 
> history of prior execution attempts is the culprit for this 
> problem.
> We should limit this history to the n most recent attempts. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup

2016-11-21 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683082#comment-15683082
 ] 

Maximilian Michels commented on FLINK-5081:
---

I've had a second look. The issue is not that the configuration is not loaded. 
However, your finding reveals at least two other issues with our per-job YARN 
implementation:

1. When executing in non-detached job submission mode, the "Client Shutdown 
Hook" shuts down the Yarn application in case of job failures (e.g. TaskManager 
dies). We should remove the shutdown hook. It should only be active during 
deployment.

2. The per-job Yarn application is supposed to automatically shut down the 
cluster after job completion. In case of failures (e.g. TaskManager dies) the 
shutdown is apparently performed as well, although it shouldn't be.

> unable to set yarn.maximum-failed-containers with flink one-time YARN setup
> ---
>
> Key: FLINK-5081
> URL: https://issues.apache.org/jira/browse/FLINK-5081
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.1.4
>Reporter: Nico Kruber
>
> When letting Flink set up YARN for a one-time job, it apparently does not 
> deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the 
> {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml, as 
> https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn
>  suggests, also does not work.
> example:
> {code:none}
> flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 
> -Dyarn.maximum-failed-containers=100
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup

2016-11-21 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-5081:
-

Assignee: Maximilian Michels

> unable to set yarn.maximum-failed-containers with flink one-time YARN setup
> ---
>
> Key: FLINK-5081
> URL: https://issues.apache.org/jira/browse/FLINK-5081
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.1.4
>Reporter: Nico Kruber
>Assignee: Maximilian Michels
>
> When letting Flink set up YARN for a one-time job, it apparently does not 
> deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the 
> {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml, as 
> https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn
>  suggests, also does not work.
> example:
> {code:none}
> flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 
> -Dyarn.maximum-failed-containers=100
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup

2016-11-21 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-5081:
--
Affects Version/s: 1.2.0

> unable to set yarn.maximum-failed-containers with flink one-time YARN setup
> ---
>
> Key: FLINK-5081
> URL: https://issues.apache.org/jira/browse/FLINK-5081
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Nico Kruber
>Assignee: Maximilian Michels
> Fix For: 1.2.0, 1.1.4
>
>
> When letting Flink set up YARN for a one-time job, it apparently does not 
> deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the 
> {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml, as 
> https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn
>  suggests, also does not work.
> example:
> {code:none}
> flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 
> -Dyarn.maximum-failed-containers=100
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup

2016-11-21 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-5081:
--
Fix Version/s: 1.1.4
   1.2.0

> unable to set yarn.maximum-failed-containers with flink one-time YARN setup
> ---
>
> Key: FLINK-5081
> URL: https://issues.apache.org/jira/browse/FLINK-5081
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Nico Kruber
>Assignee: Maximilian Michels
> Fix For: 1.2.0, 1.1.4
>
>
> When letting Flink set up YARN for a one-time job, it apparently does not 
> deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the 
> {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml, as 
> https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn
>  suggests, also does not work.
> example:
> {code:none}
> flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 
> -Dyarn.maximum-failed-containers=100
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2837: [FLINK-5107] Introduced limit for prior execution ...

2016-11-21 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2837

[FLINK-5107] Introduced limit for prior execution attempt history

This PR addresses the problem of the JobManager going out of memory for a large 
history of prior execution attempts by pruning the history in FIFO fashion, 
keeping only a limited number of prior attempts.
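
The pruning itself can be as simple as a bounded FIFO. A hypothetical sketch 
(not the PR's actual code; `ArchivedExecution` and the limit are stand-ins):

{code}
import java.util.ArrayDeque

// Stand-in for an archived prior attempt; the field is illustrative.
case class ArchivedExecution(attemptNumber: Int)

class BoundedExecutionHistory(historyLimit: Int) {
  private val priorExecutions = new ArrayDeque[ArchivedExecution]()

  // Append the newest attempt and evict the oldest once the limit is
  // exceeded, so memory stays bounded no matter how often a task restarts.
  def add(execution: ArchivedExecution): Unit = {
    priorExecutions.addLast(execution)
    while (priorExecutions.size() > historyLimit) {
      priorExecutions.removeFirst()
    }
  }
}
{code}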



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink limit-prior-executions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2837.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2837


commit f9053060fe4d396e9a8917ce8d422b0ebbcc044f
Author: Stefan Richter 
Date:   2016-11-18T18:07:56Z

[FLINK-5107] Introduced limit for prior execution attempt history




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683092#comment-15683092
 ] 

ASF GitHub Bot commented on FLINK-5107:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2837

[FLINK-5107] Introduced limit for prior execution attempt history

This PR addresses the problem of the JobManager going out of memory for a large 
history of prior execution attempts by pruning the history in FIFO fashion, 
keeping only a limited number of prior attempts.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink limit-prior-executions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2837.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2837


commit f9053060fe4d396e9a8917ce8d422b0ebbcc044f
Author: Stefan Richter 
Date:   2016-11-18T18:07:56Z

[FLINK-5107] Introduced limit for prior execution attempt history




> Job Manager goes out of memory from long history of prior execution attempts
> 
>
> Key: FLINK-5107
> URL: https://issues.apache.org/jira/browse/FLINK-5107
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> We have observed that the job manager can run out of memory during 
> long-running jobs with many vertices. Analysis of the heap dump shows that the 
> ever-growing history of prior execution attempts is the culprit for this 
> problem.
> We should limit this history to the n most recent attempts. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts

2016-11-21 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683096#comment-15683096
 ] 

Stefan Richter commented on FLINK-5107:
---

agreed, it could be a next step to provide this alternative to pruning the 
history.

> Job Manager goes out of memory from long history of prior execution attempts
> 
>
> Key: FLINK-5107
> URL: https://issues.apache.org/jira/browse/FLINK-5107
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> We have observed that the job manager can run out of memory during 
> long-running jobs with many vertices. Analysis of the heap dump shows that the 
> ever-growing history of prior execution attempts is the culprit for this 
> problem.
> We should limit this history to the n most recent attempts. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores

2016-11-21 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683099#comment-15683099
 ] 

Maximilian Michels commented on FLINK-5071:
---

Is this 1.1.3 or 1.2-SNAPSHOT? Could you post some more details?

> YARN: yarn.containers.vcores config not respected when checking for vcores
> --
>
> Key: FLINK-5071
> URL: https://issues.apache.org/jira/browse/FLINK-5071
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Gyula Fora
> Fix For: 1.1.3
>
>
> The YarnClient validates whether the number of task slots is less than the 
> max vcores setting of YARN but seems to ignore the yarn.containers.vcores 
> Flink config, which should be used instead of the slots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2837: [FLINK-5107] Introduced limit for prior execution attempt...

2016-11-21 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2837
  
R @uce or anybody else interested.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683101#comment-15683101
 ] 

ASF GitHub Bot commented on FLINK-5107:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2837
  
R @uce or anybody else interested.


> Job Manager goes out of memory from long history of prior execution attempts
> 
>
> Key: FLINK-5107
> URL: https://issues.apache.org/jira/browse/FLINK-5107
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> We have observed that the job manager can run out of memory during 
> long-running jobs with many vertices. Analysis of the heap dump shows that the 
> ever-growing history of prior execution attempts is the culprit for this 
> problem.
> We should limit this history to the n most recent attempts. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5108) Remove ClientShutdownHook during job execution

2016-11-21 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-5108:
-

 Summary: Remove ClientShutdownHook during job execution
 Key: FLINK-5108
 URL: https://issues.apache.org/jira/browse/FLINK-5108
 Project: Flink
  Issue Type: Bug
  Components: YARN Client
Affects Versions: 1.1.3, 1.2.0
Reporter: Maximilian Michels


The behavior of the Standalone mode is to not react to client interrupts once a 
job has been deployed. We should change the Yarn client implementation to 
behave the same. This avoids accidental shutdown of the job, e.g. when the user 
sends an interrupt via CTRL-C or when the client machine shuts down.
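
A sketch of the intended lifecycle using plain JVM shutdown hooks; the names 
and the cleanup action are illustrative, not the actual client code:

{code}
object DeploymentHookSketch {
  def deploy(): Unit = {
    val hook = new Thread(new Runnable {
      // Hypothetical cleanup that would tear down the Yarn application.
      override def run(): Unit = println("shutting down Yarn application")
    })
    // The hook is active only while deployment is in flight ...
    Runtime.getRuntime.addShutdownHook(hook)
    try {
      // ... deployment work would happen here ...
    } finally {
      // ... and is removed once the job is running, so a later CTRL-C on
      // the client no longer tears down the cluster.
      Runtime.getRuntime.removeShutdownHook(hook)
    }
  }
}
{code}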



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores

2016-11-21 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683135#comment-15683135
 ] 

Gyula Fora commented on FLINK-5071:
---

Both actually.

The problem is here:
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L308

which only compares the allowed number of vcores (in the YARN config) with the 
number of slots, assuming that requested vcores == number of slots.
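
A sketch of the check the report argues for, with hypothetical names 
(`config`, `slots`, `yarnMaxVcores`); the real logic lives in 
`AbstractYarnClusterDescriptor`:

{code}
object VcoresCheckSketch {
  // Fall back to the number of slots only if yarn.containers.vcores is unset,
  // instead of always assuming requested vcores == number of slots.
  def validateVcores(config: Map[String, String], slots: Int, yarnMaxVcores: Int): Unit = {
    val requestedVcores = config.get("yarn.containers.vcores").map(_.toInt).getOrElse(slots)
    require(
      requestedVcores <= yarnMaxVcores,
      s"Requested $requestedVcores vcores, but YARN only allows $yarnMaxVcores per container.")
  }
}
{code}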

> YARN: yarn.containers.vcores config not respected when checking for vcores
> --
>
> Key: FLINK-5071
> URL: https://issues.apache.org/jira/browse/FLINK-5071
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Gyula Fora
> Fix For: 1.1.3
>
>
> The YarnClient validates whether the number of task slots is less than the 
> max vcores setting of YARN but seems to ignore the yarn.containers.vcores 
> Flink config, which should be used instead of the slots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup

2016-11-21 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683082#comment-15683082
 ] 

Maximilian Michels edited comment on FLINK-5081 at 11/21/16 10:36 AM:
--

I've had a second look. -The issue is not that the configuration is not loaded. 
However, your finding reveals at least two other issues with our per-job YARN 
implementation:-

-1. When executing in non-detached job submission mode, the "Client Shutdown 
Hook" shuts down the Yarn application in case of job failures (e.g. TaskManager 
dies). We should remove the shutdown hook. It should only be active during 
deployment.-

-2. The per-job Yarn application is supposed to automatically shut down the 
cluster after job completion. In case of failures (e.g. TaskManager dies) the 
shutdown is apparently performed as well, although it shouldn't be.-

edit: 

1) is not an issue since it only shuts down when it reaches a terminal state.
2) Is an issue but unrelated to this issue

The actual issue here is that the JobManager informs the client of the failed 
job and the client shuts down the cluster. We should differentiate between 
fatal and non-fatal failures in the client.


was (Author: mxm):
I've had a second look. The issue is not that the configuration is not loaded. 
However, your finding reveals at least two other issues with our per-job YARN 
implementation:

1. When executing in non-detached job submission mode, the "Client Shutdown 
Hook" shuts down the Yarn application in case of job failures (e.g. TaskManager 
dies). We should remove the shutdown hook. It should only be active during 
deployment.

2. The per-job Yarn application is supposed to automatically shut down the 
cluster after job completion. In case of failures (e.g. TaskManager dies) the 
shutdown is apparently performed as well, although it shouldn't be.

> unable to set yarn.maximum-failed-containers with flink one-time YARN setup
> ---
>
> Key: FLINK-5081
> URL: https://issues.apache.org/jira/browse/FLINK-5081
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Nico Kruber
>Assignee: Maximilian Michels
> Fix For: 1.2.0, 1.1.4
>
>
> When letting Flink set up YARN for a one-time job, it apparently does not 
> deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the 
> {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml, as 
> https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn
>  suggests, also does not work.
> example:
> {code:none}
> flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 
> -Dyarn.maximum-failed-containers=100
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...

2016-11-21 Thread gaborhermann
GitHub user gaborhermann opened a pull request:

https://github.com/apache/flink/pull/2838

[FLINK-4712] [FLINK-4713] [ml] Ranking recommendation & evaluation (WIP)

Please note that this is a work-in-progress PR for discussing API design 
decisions. We propose here a class hierarchy for fitting the ranking 
evaluations into the proposed evaluation framework (see 
[PR](https://github.com/apache/flink/pull/1849)).
The features are mostly working, but documentation is missing and minor 
refactoring is needed. The evaluations currently work with top 100 rankings 
(burnt-in), and we still need to fix that. We need feedback for two main 
solutions, so we can go on with the PR. Thanks for any comment!

### `RankingPredictor`

We have managed to rework the evaluation framework proposed by @thvasilo, 
so that ranking predictions would fit in. Our approach is to use separate 
`RankingPredictor` and `Predictor` traits. One main problem, however, remains: 
there is no common superclass for `RankingPredictor` and `Predictor`, so the 
pipelining mechanism might not work. A `Predictor` can only be at the end of 
the pipeline, so this should not really be a problem, but I do not know for 
sure. An alternative solution would be to have different objects `ALS` and 
`RankingALS` that give different predictions, but both extend only a 
`Predictor`. There could be implicit conversions between the two. I would 
prefer the current solution if it does not break the pipelining. @thvasilo, what 
do you think about this?

(This seems to be a problem similar to having a `predict_proba` function in 
scikit-learn classification models, where the same model for the same input 
gives two different predictions: `predict` for discrete predictions and 
`predict_proba` for giving a probability.)

### Generalizing `EvaluateDataSetOperation`

On the other hand, we seem to have solved the scoring issue. The users can 
evaluate a recommendation algorithm such as ALS by using a score operating on 
rankings (e.g. nDCG), or a score operating on ratings (e.g. RMSE). They only 
need to modify the `Score` they use in their code, and nothing else.

The main problem was that the evaluate method and 
`EvaluateDataSetOperation` were not general enough. They prepare the evaluation 
to `(trueValue, predictedValue)` pairs (i.e. a `DataSet[(PredictionType, 
PredictionType)]`), while ranking evaluations needed a more general input with 
the true ratings (`DataSet[(Int,Int,Double)]`) and the predicted rankings 
(`DataSet[(Int,Int,Int)]`).

Instead of using `EvaluateDataSetOperation`, we use a more general 
`PrepareOperation`. We rename the `Score` in the original evaluation framework 
to `PairwiseScore`. `RankingScore` and `PairwiseScore` have a common trait 
`Score`. This way the user can use both a `RankingScore` and a `PairwiseScore` 
for a certain model, and only needs to alter the score used in the code.

In case of pairwise scores (that only need true and predicted value pairs 
for evaluation) `EvaluateDataSetOperation` is used as a `PrepareOperation`. It 
prepares the evaluation by creating `(trueValue, predictedValue)` pairs from 
the test dataset. Thus, the result of preparing and the input of 
`PairwiseScore`s will be `DataSet[(PredictionType, PredictionType)]`. In case of 
rankings, the `PrepareOperation` passes the test dataset and creates the 
rankings. The result of preparing and the input of `RankingScore`s will be 
`(DataSet[(Int,Int,Double)], DataSet[(Int,Int,Int)])`. I believe this is a fairly 
acceptable solution that avoids breaking the API.
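
Condensed, the hierarchy described above might look like the following sketch 
(names follow the text of this PR; the exact signatures are assumptions):

{code}
import org.apache.flink.api.scala._

// Common super-trait, so both kinds of score plug into the same evaluation call.
trait Score[PreparedTesting] extends Serializable {
  def evaluate(prepared: PreparedTesting): DataSet[Double]
}

// Scores over (trueValue, predictedValue) pairs, e.g. RMSE.
trait PairwiseScore[Prediction] extends Score[DataSet[(Prediction, Prediction)]]

// Scores over true ratings plus predicted top-K rankings, e.g. nDCG.
trait RankingScore
  extends Score[(DataSet[(Int, Int, Double)], DataSet[(Int, Int, Int)])]
{code}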

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaborhermann/flink ranking-rec-eval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2838.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2838






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683167#comment-15683167
 ] 

ASF GitHub Bot commented on FLINK-4712:
---

GitHub user gaborhermann opened a pull request:

https://github.com/apache/flink/pull/2838

[FLINK-4712] [FLINK-4713] [ml] Ranking recommendation & evaluation (WIP)

Please note that this is a work-in-progress PR for discussing API design 
decisions. We propose here a class hierarchy for fitting the ranking 
evaluations into the proposed evaluation framework (see 
[PR](https://github.com/apache/flink/pull/1849)).
The features are mostly working, but documentation is missing and minor 
refactoring is needed. The evaluations currently work with top 100 rankings 
(burnt-in), and we still need to fix that. We need feedback for two main 
solutions, so we can go on with the PR. Thanks for any comment!

### `RankingPredictor`

We have managed to rework the evaluation framework proposed by @thvasilo, 
so that ranking predictions would fit in. Our approach is to use separate 
`RankingPredictor` and `Predictor` traits. One main problem, however, remains: 
there is no common superclass for `RankingPredictor` and `Predictor`, so the 
pipelining mechanism might not work. A `Predictor` can only be at the end of 
the pipeline, so this should not really be a problem, but I do not know for 
sure. An alternative solution would be to have different objects `ALS` and 
`RankingALS` that give different predictions, but both extend only a 
`Predictor`. There could be implicit conversions between the two. I would 
prefer the current solution if it does not break the pipelining. @thvasilo, what 
do you think about this?

(This seems to be a problem similar to having a `predict_proba` function in 
scikit-learn classification models, where the same model for the same input 
gives two different predictions: `predict` for discrete predictions and 
`predict_proba` for giving a probability.)

### Generalizing `EvaluateDataSetOperation`

On the other hand, we seem to have solved the scoring issue. The users can 
evaluate a recommendation algorithm such as ALS by using a score operating on 
rankings (e.g. nDCG), or a score operating on ratings (e.g. RMSE). They only 
need to modify the `Score` they use in their code, and nothing else.

The main problem was that the evaluate method and 
`EvaluateDataSetOperation` were not general enough. They prepare the evaluation 
to `(trueValue, predictedValue)` pairs (i.e. a `DataSet[(PredictionType, 
PredictionType)]`), while ranking evaluations needed a more general input with 
the true ratings (`DataSet[(Int,Int,Double)]`) and the predicted rankings 
(`DataSet[(Int,Int,Int)]`).

Instead of using `EvaluateDataSetOperation`, we use a more general 
`PrepareOperation`. We rename the `Score` in the original evaluation framework 
to `PairwiseScore`. `RankingScore` and `PairwiseScore` have a common trait 
`Score`. This way the user can use both a `RankingScore` and a `PairwiseScore` 
for a certain model, and only needs to alter the score used in the code.

In case of pairwise scores (that only need true and predicted value pairs 
for evaluation) `EvaluateDataSetOperation` is used as a `PrepareOperation`. It 
prepares the evaluation by creating `(trueValue, predictedValue)` pairs from 
the test dataset. Thus, the result of preparing and the input of 
`PairwiseScore`s will be `DataSet[(PredictionType, PredictionType)]`. In case of 
rankings, the `PrepareOperation` passes the test dataset and creates the 
rankings. The result of preparing and the input of `RankingScore`s will be 
`(DataSet[(Int,Int,Double)], DataSet[(Int,Int,Int)])`. I believe this is a fairly 
acceptable solution that avoids breaking the API.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaborhermann/flink ranking-rec-eval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2838.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2838






> Implementing ranking predictions for ALS
> 
>
> Key: FLINK-4712
> URL: https://issues.apache.org/jira/browse/FLINK-4712
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Domokos Miklós Kelen
>Assignee: Gábor Hermann
>
> We started working on implementing ranking predictions for recommender 
> systems. Ranking prediction means that besides predicting scores for user-item 
> pairs, the recommender system is able to recommend a top K list for the users.
> Details:
> In practice, this would mean find

[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...

2016-11-21 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r88868369
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala
 ---
@@ -18,12 +18,37 @@
 
 package org.apache.flink.ml.evaluation
 
+import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.ml._
+import org.apache.flink.ml.pipeline._
+import org.apache.flink.ml.RichNumericDataSet
+import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
 
+trait Score[
--- End diff --

`Score` is the common trait for `RankingScore` and `PairwiseScore`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683173#comment-15683173
 ] 

ASF GitHub Bot commented on FLINK-4712:
---

Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r88868369
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala
 ---
@@ -18,12 +18,37 @@
 
 package org.apache.flink.ml.evaluation
 
+import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.ml._
+import org.apache.flink.ml.pipeline._
+import org.apache.flink.ml.RichNumericDataSet
+import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
 
+trait Score[
--- End diff --

`Score` is the common trait for `RankingScore` and `PairwiseScore`.


> Implementing ranking predictions for ALS
> 
>
> Key: FLINK-4712
> URL: https://issues.apache.org/jira/browse/FLINK-4712
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Domokos Miklós Kelen
>Assignee: Gábor Hermann
>
> We started working on implementing ranking predictions for recommender 
> systems. Ranking prediction means that besides predicting scores for user-item 
> pairs, the recommender system is able to recommend a top K list for the users.
> Details:
> In practice, this would mean finding the K items for a particular user with 
> the highest predicted rating. It should also be possible to specify whether 
> to exclude the already seen items from a particular user's toplist. (See for 
> example the 'exclude_known' setting of [Graphlab Create's ranking 
> factorization 
> recommender|https://turi.com/products/create/docs/generated/graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend.html#graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend]
>  ).
> The output of the topK recommendation function could be in the form of 
> {{DataSet[(Int,Int,Int)]}}, meaning (user, item, rank), similar to Graphlab 
> Create's output. However, this is arguable: follow-up work includes 
> implementing ranking recommendation evaluation metrics (such as precision@k, 
> recall@k, ndcg@k), similar to [Spark's 
> implementations|https://spark.apache.org/docs/1.5.0/mllib-evaluation-metrics.html#ranking-systems].
>  It would be beneficial if we were able to design the API such that it could 
> be included in the proposed evaluation framework (see 
> [2157|https://issues.apache.org/jira/browse/FLINK-2157]), which makes it 
> necessary to consider the possible output type {{DataSet[(Int, 
> Array[Int])]}} or {{DataSet[(Int, Array[(Int,Double)])]}} meaning (user, 
> array of items), possibly including the predicted scores as well. See 
> [4713|https://issues.apache.org/jira/browse/FLINK-4713] for details.
> Another question arising is whether to provide this function as a member of 
> the ALS class, as a switch-kind of parameter to the ALS implementation 
> (meaning the model is either a rating or a ranking recommender model) or in 
> some other way.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...

2016-11-21 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r88868762
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
 ---
@@ -267,6 +401,21 @@ trait PredictOperation[Instance, Model, Testing, 
Prediction] extends Serializabl
   def predict(value: Testing, model: Model): Prediction
 }
 
+/**
+  * Operation for preparing a testing [[DataSet]] for evaluation.
+  *
+  * Most commonly, [[EvaluateDataSetOperation]] is used, but the evaluation of
+  * ranking recommendations needs input in a different form.
+  */
+trait PrepareOperation[Instance, Testing, Prepared] extends Serializable {
--- End diff --

`PrepareOperation` is the common trait for `EvaluateDataSetOperation` and 
preparing ranking evaluation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683179#comment-15683179
 ] 

ASF GitHub Bot commented on FLINK-4712:
---

Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r88868762
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
 ---
@@ -267,6 +401,21 @@ trait PredictOperation[Instance, Model, Testing, 
Prediction] extends Serializabl
   def predict(value: Testing, model: Model): Prediction
 }
 
+/**
+  * Operation for preparing a testing [[DataSet]] for evaluation.
+  *
+  * Most commonly, [[EvaluateDataSetOperation]] is used, but the evaluation of
+  * ranking recommendations needs input in a different form.
+  */
+trait PrepareOperation[Instance, Testing, Prepared] extends Serializable {
--- End diff --

`PrepareOperation` is the common trait for `EvaluateDataSetOperation` and 
preparing ranking evaluation.


> Implementing ranking predictions for ALS
> 
>
> Key: FLINK-4712
> URL: https://issues.apache.org/jira/browse/FLINK-4712
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Domokos Miklós Kelen
>Assignee: Gábor Hermann
>
> We started working on implementing ranking predictions for recommender 
> systems. Ranking prediction means that besides predicting scores for user-item 
> pairs, the recommender system is able to recommend a top K list for the users.
> Details:
> In practice, this would mean finding the K items for a particular user with 
> the highest predicted rating. It should also be possible to specify whether 
> to exclude the already seen items from a particular user's toplist. (See for 
> example the 'exclude_known' setting of [Graphlab Create's ranking 
> factorization 
> recommender|https://turi.com/products/create/docs/generated/graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend.html#graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend]
>  ).
> The output of the topK recommendation function could be in the form of 
> {{DataSet[(Int,Int,Int)]}}, meaning (user, item, rank), similar to Graphlab 
> Create's output. However, this is arguable: follow-up work includes 
> implementing ranking recommendation evaluation metrics (such as precision@k, 
> recall@k, ndcg@k), similar to [Spark's 
> implementations|https://spark.apache.org/docs/1.5.0/mllib-evaluation-metrics.html#ranking-systems].
>  It would be beneficial if we were able to design the API such that it could 
> be included in the proposed evaluation framework (see 
> [2157|https://issues.apache.org/jira/browse/FLINK-2157]), which makes it 
> necessary to consider the possible output type {{DataSet[(Int, 
> Array[Int])]}} or {{DataSet[(Int, Array[(Int,Double)])]}} meaning (user, 
> array of items), possibly including the predicted scores as well. See 
> [4713|https://issues.apache.org/jira/browse/FLINK-4713] for details.
> Another question arising is whether to provide this function as a member of 
> the ALS class, as a switch-kind of parameter to the ALS implementation 
> (meaning the model is either a rating or a ranking recommender model) or in 
> some other way.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2646) Rich functions should provide a method "closeAfterFailure()"

2016-11-21 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683181#comment-15683181
 ] 

Stephan Ewen commented on FLINK-2646:
-

Strictly speaking, this is API breaking, because we add a method to a public 
interface.
We assume it does not really break, because no one implements the interface 
directly, but always goes through the {{AbstractRichFunction}}, from which all 
the rich function variants inherit.

So, we would need to add it to {{AbstractRichFunction}} and define a rule 
exclusion for the {{apicmp}} plugin.

> Rich functions should provide a method "closeAfterFailure()"
> 
>
> Key: FLINK-2646
> URL: https://issues.apache.org/jira/browse/FLINK-2646
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Liang Chen
> Fix For: 1.0.0
>
>
> Right now, the {{close()}} method of rich functions is invoked in case of 
> proper completion, and in case of canceling due to an error (to allow for 
> cleanup).
> In certain cases, the user function needs to know why it is closed, whether 
> the task completed in a regular fashion, or was canceled/failed.
> I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. By 
> default, this method calls {{close()}}. The runtime is then changed to call 
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in 
> case of an irregular exit.
> Because by default all cases call {{close()}}, the change would not be API 
> breaking.
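
For illustration, the proposed default transplanted into a Scala sketch (the 
real base class is the Java {{AbstractRichFunction}}; this class is 
hypothetical):

{code}
abstract class RichFunctionSketch {
  // Regular cleanup path, called on successful completion.
  def close(): Unit = {}

  // Called on canceling/failure. Delegating to close() by default means
  // existing user functions that only override close() keep working unchanged.
  def closeAfterFailure(): Unit = close()
}
{code}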



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...

2016-11-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r88869401
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Project
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.ProjectableTableSource
+
+import scala.collection.JavaConverters._
+
+class BatchTableProject(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType)
+  extends Project(cluster, traits, input, projects, projectionRowType)
+  with DataSetRel {
+
+  override def copy(traitSet: RelTraitSet, input: RelNode,
+projects: util.List[RexNode], scanRawType: RelDataType): Project = {
+new BatchTableProject(cluster, traitSet, input, projects, 
projectionRowType)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: 
RelMetadataQuery): RelOptCost = {
+super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+tableEnv: BatchTableEnvironment,
+expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+val projectableSource = 
getInput.getTable.unwrap(classOf[TableSourceTable])
+  .tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+val indexes = 
projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
+projectableSource.setProjection(indexes)
+
+getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
--- End diff --

Yes, it will turn into a scan with projection and will skip the 
`super.translateToPlan`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683185#comment-15683185
 ] 

ASF GitHub Bot commented on FLINK-3848:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2810#discussion_r88869401
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Project
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.plan.schema.TableSourceTable
+import org.apache.flink.api.table.sources.ProjectableTableSource
+
+import scala.collection.JavaConverters._
+
+class BatchTableProject(
+cluster: RelOptCluster,
+traits: RelTraitSet,
+input: RelNode,
+projects: util.List[_ <: RexNode],
+projectionRowType: RelDataType)
+  extends Project(cluster, traits, input, projects, projectionRowType)
+  with DataSetRel {
+
+  override def copy(traitSet: RelTraitSet, input: RelNode,
+projects: util.List[RexNode], scanRawType: RelDataType): Project = {
+new BatchTableProject(cluster, traitSet, input, projects, 
projectionRowType)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: 
RelMetadataQuery): RelOptCost = {
+super.computeSelfCost(planner, mq).multiplyBy(0.8)
+  }
+
+  override def translateToPlan(
+tableEnv: BatchTableEnvironment,
+expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+val projectableSource = 
getInput.getTable.unwrap(classOf[TableSourceTable])
+  .tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+val indexes = 
projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
+projectableSource.setProjection(indexes)
+
+getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
--- End diff --

Yes, it will turn into a scan with projection and will skip the 
`super.translateToPlan`.


> Add ProjectableTableSource interface and translation rule
> -
>
> Key: FLINK-3848
> URL: https://issues.apache.org/jira/browse/FLINK-3848
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations 
> that support projection push-down.
> The interface could look as follows:
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition, we need Calcite rules to push a projection into a TableScan that 
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model 
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement 
> {{ProjectableTableSource}}.
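
For illustration, a hypothetical source implementing the proposed trait (note 
that the PR under review pushes field indexes rather than names; this follows 
only the interface sketched above):

{code}
// Hypothetical CSV-backed source; the planner calls setProjection before
// translation, so the scan can skip all non-requested columns.
class ProjectableCsvSourceSketch extends ProjectableTableSource {
  private var projectedFields: Array[String] = Array.empty

  override def setProjection(fields: Array[String]): Unit = {
    projectedFields = fields
  }
}
{code}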



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5097) The TypeExtractor is missing input type information in some Graph methods

2016-11-21 Thread Vasia Kalavri (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vasia Kalavri updated FLINK-5097:
-
Description: The TypeExtractor is called without information about the 
input type in {{mapVertices}} and {{mapEdges}} although this information can be 
easily retrieved.  (was: The TypeExtractor is called without information about 
the input type in {{mapVertices}}, {{mapVEdges}}, and {{fromDataSet}}, although 
this information can be easily retrieved.)

> The TypeExtractor is missing input type information in some Graph methods
> -
>
> Key: FLINK-5097
> URL: https://issues.apache.org/jira/browse/FLINK-5097
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> The TypeExtractor is called without information about the input type in 
> {{mapVertices}} and {{mapEdges}} although this information can be easily 
> retrieved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup

2016-11-21 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683082#comment-15683082
 ] 

Maximilian Michels edited comment on FLINK-5081 at 11/21/16 11:23 AM:
--

I've had a second look. -The issue is not that the configuration is not loaded. 
However, your finding reveals at least two other issues with our per-job YARN 
implementation:-

-1. When executing in non-detached job submission mode, the "Client Shutdown 
Hook" shuts down the Yarn application in case of job failures (e.g. TaskManager 
dies). We should remove the shutdown hook. It should only be active during 
deployment.-

-2. The per-job Yarn application is supposed to automatically shut down the 
cluster after job completion. In case of failures (e.g. TaskManager dies) the 
shutdown is apparently performed as well, although it shouldn't be.-

edit: 

1) is not an issue since it only shuts down when it reaches a terminal state.
2) Is an issue but unrelated to this issue

-The actual issue here is that the JobManager informs the client of the failed 
job and the client shuts down the cluster. We should differentiate between 
fatal and non-fatal failures in the client.-

edit2:

Not an issue either :) Probably you forgot to configure the restart strategy.

{noformat}
# defaults to none
restart-strategy: fixed-delay
# defaults to 1
restart-strategy.fixed-delay.attempts: 10
{noformat}

With that in place, we can kill TaskManagers and resume job execution after a 
container restart without problems.


was (Author: mxm):
I've had a second look. -The issue is not that the configuration is not loaded. 
However, your finding reveals at least two other issues with our per-job YARN 
implementation:-

-1. When executing in non-detached job submission mode, the "Client Shutdown 
Hook" shuts down the Yarn application in case of job failures (e.g. TaskManager 
dies). We should remove the shutdown hook. It should only be active during 
deployment.-

-2. The per-job Yarn application is supposed to automatically shut down the 
cluster after job completion. In case of failures (e.g. TaskManager dies) the 
shutdown is apparently performed as well, although it shouldn't be.-

edit: 

1) is not an issue since it only shuts down when it reaches a terminal state.
2) Is an issue but unrelated to this issue

The actual issue here is that the JobManager informs the client of the failed 
job and the client shuts down the cluster. We should differentiate between 
fatal and non-fatal failures in the client.

> unable to set yarn.maximum-failed-containers with flink one-time YARN setup
> ---
>
> Key: FLINK-5081
> URL: https://issues.apache.org/jira/browse/FLINK-5081
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Nico Kruber
>Assignee: Maximilian Michels
> Fix For: 1.2.0, 1.1.4
>
>
> When letting Flink set up YARN for a one-time job, it apparently does not 
> deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the 
> {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml, as 
> https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn
>  suggests, also does not work.
> example:
> {code:none}
> flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 
> -Dyarn.maximum-failed-containers=100
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup

2016-11-21 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-5081:
--
Fix Version/s: (was: 1.1.4)
   (was: 1.2.0)

> unable to set yarn.maximum-failed-containers with flink one-time YARN setup
> ---
>
> Key: FLINK-5081
> URL: https://issues.apache.org/jira/browse/FLINK-5081
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Nico Kruber
>Assignee: Maximilian Michels
>
> When letting Flink set up YARN for a one-time job, it apparently does not 
> deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the 
> {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml, as 
> https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn
>  suggests, also does not work.
> example:
> {code:none}
> flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 
> -Dyarn.maximum-failed-containers=100
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup

2016-11-21 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-5081.
---
Resolution: Not A Problem

Resolving this issue but feel free to re-open in case I missed anything.

> unable to set yarn.maximum-failed-containers with flink one-time YARN setup
> ---
>
> Key: FLINK-5081
> URL: https://issues.apache.org/jira/browse/FLINK-5081
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Nico Kruber
>Assignee: Maximilian Michels
> Fix For: 1.2.0, 1.1.4
>
>
> When letting Flink set up YARN for a one-time job, it apparently does not 
> deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the 
> {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml, as 
> https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn
>  suggests, also does not work.
> example:
> {code:none}
> flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 
> -Dyarn.maximum-failed-containers=100
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores

2016-11-21 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683269#comment-15683269
 ] 

Maximilian Michels commented on FLINK-5071:
---

Thanks! This has changed recently and only 1.1-SNAPSHOT and 1.2-SNAPSHOT are 
affected.

> YARN: yarn.containers.vcores config not respected when checking for vcores
> --
>
> Key: FLINK-5071
> URL: https://issues.apache.org/jira/browse/FLINK-5071
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Gyula Fora
> Fix For: 1.2.0, 1.1.4
>
>
> The YarnClient validates whether the number of task slots is less than the 
> max vcores setting of YARN but seems to ignore the yarn.containers.vcores 
> Flink config, which should be used instead of the slots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores

2016-11-21 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-5071:
--
Affects Version/s: 1.1.4
   1.2.0

> YARN: yarn.containers.vcores config not respected when checking for vcores
> --
>
> Key: FLINK-5071
> URL: https://issues.apache.org/jira/browse/FLINK-5071
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Gyula Fora
> Fix For: 1.2.0, 1.1.4
>
>
> The YarnClient validates whether the number of task slots is less than the 
> max vcores setting of YARN but seems to ignore the yarn.containers.vcores 
> Flink config, which should be used instead of the slots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores

2016-11-21 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-5071:
-

Assignee: Maximilian Michels

> YARN: yarn.containers.vcores config not respected when checking for vcores
> --
>
> Key: FLINK-5071
> URL: https://issues.apache.org/jira/browse/FLINK-5071
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Gyula Fora
>Assignee: Maximilian Michels
> Fix For: 1.2.0, 1.1.4
>
>
> The YarnClient validates whether the number of task slots is less than the 
> max vcores setting of YARN but seems to ignore the yarn.containers.vcores 
> Flink config, which should be used instead of the slots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores

2016-11-21 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-5071:
--
Fix Version/s: (was: 1.1.3)
   1.1.4
   1.2.0

> YARN: yarn.containers.vcores config not respected when checking for vcores
> --
>
> Key: FLINK-5071
> URL: https://issues.apache.org/jira/browse/FLINK-5071
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Gyula Fora
> Fix For: 1.2.0, 1.1.4
>
>
> The YarnClient validates whether the number of task slots is less than the 
> max vcores setting of YARN but seems to ignore the yarn.containers.vcores 
> Flink config, which should be used instead of the slots.





[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88700937
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators 
following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+* Max number of {@link AsyncCollector} in the buffer.
+*/
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+* Keep all {@code AsyncCollector} and their input {@link StreamElement}
+*/
+   private final Map<AsyncCollector<OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+* For the AsyncWaitOperator chained with StreamSource, the checkpoint 
thread may get the
+* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} 
while {@link AsyncCollectorBuffer#queue}
+* is full since main thread waits on this lock. The StreamElement in
+* {@link AsyncWaitOperator#processElement(StreamRecord)} should be 
treated as a part of all StreamElements
+* in its queue. It will be kept in the operator state while 
snapshotting.
+*/
+   private StreamElement extraStreamElement;
+
+   /**
+* {@link TimestampedCollector} and {@link Output} to collect results 
and watermarks.
+*/
+   private final Output<StreamRecord<OUT>> output;
+   private final TimestampedCollector<OUT> timestampedCollector;
+
+   /**
+* Checkpoint lock from {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+*/
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output<StreamRecord<OUT>> output,
+   TimestampedCollector<OUT> collector,
+   Object lock,
+   AsyncWaitOperator<IN, OUT> operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size 
should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be 
NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector 
should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be 
NULL.");
+   Preconditions.checkNotNull(operator, "Reference to 
AsyncWaitOperator should not be NULL.");
+
+   this.bufferSize = bufferSize;
+   this.mode = mode;
--- End diff --

`mode` could also be `null`. I think it's easier to follow the following 
pattern:

```
this.mode = Preconditions.checkNotNull(mode);
```
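(`Preconditions.checkNotNull` returns its argument, which is what makes the combined
assign-and-check one-liner work.)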

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88676639
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Example illustrating how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
+ */
+public class AsyncIOExample {
+
+   /**
+* A checkpointed source.
+*/
+   private static class SimpleSource implements SourceFunction<Integer>, Checkpointed<Integer> {
+   private static final long serialVersionUID = 1L;
+
+   private volatile boolean isRunning = true;
+   private int counter = 0;
+   private int start = 0;
+
+   @Override
+   public void restoreState(Integer state) throws Exception {
+   this.start = state;
+   }
+
+   @Override
+   public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+   return start;
+   }
+
+   public SimpleSource(int maxNum) {
+   this.counter = maxNum;
+   }
+
+   @Override
+   public void run(SourceContext<Integer> ctx) throws Exception {
+   while (start < counter && isRunning) {
+   synchronized (ctx.getCheckpointLock()) {
+   ctx.collect(start);
+   ++start;
+   }
+   Thread.sleep(10);
+   }
+   }
+
+   @Override
+   public void cancel() {
+   isRunning = false;
+   }
+   }
+
+
+   public static void main(String[] args) throws Exception {
+
+   // obtain execution environment and set setBufferTimeout to 1 
to enable
+   // continuous flushing of the output buffers (lowest latency)
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment()
+   .setBufferTimeout(1);
+
+   // configurations for the job
+   String statePath = args[0];
+   String cpMode = args[1];
+   int maxCount = Integer.valueOf(args[2]);
+   final int sleepFactor = Integer.valueOf(args[3]);
+   final float failRatio = Float.valueOf(args[4]);
+   String mode = args[5];
+   int taskNum = Integer.valueOf(args[6]);
+   String timeType = args[7];
+
+

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88678544
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import 
org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+/**
+ * A helper class to apply {@link AsyncFunction} to a data stream.
+ * 
+ * {@code
+ * DataStream<String> input = ...
+ * AsyncFunction<String, Tuple2<String, Long>> asyncFunc = ...
+ *
+ * AsyncDataStream.orderedWait(input, asyncFunc, 100);
+ * }
+ * 
+ */
+public class AsyncDataStream {
+   public enum OutputMode { ORDERED, UNORDERED }
+
+   private static final int DEFAULT_BUFFER_SIZE = 100;
+
+   private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
+   DataStream<IN> in,
+   AsyncFunction<IN, OUT> func,
+   int bufSize,
+   OutputMode mode) {
+   TypeInformation<OUT> outTypeInfo =
+   TypeExtractor.getUnaryOperatorReturnType((Function) func, AsyncFunction.class, false,
+   true, in.getType(), Utils.getCallLocationName(), true);
+
+   // create transform
+   AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func));
+   operator.setBufferSize(bufSize);
+   operator.setOutputMode(mode);
+
+   OneInputTransformation<IN, OUT> resultTransform = new OneInputTransformation<>(
+   in.getTransformation(),
+   "async wait operator",
+   operator,
+   outTypeInfo,
+   in.getExecutionEnvironment().getParallelism());
+
+   SingleOutputStreamOperator<OUT> returnStream =
+   new SingleOutputStreamOperator<>(in.getExecutionEnvironment(), resultTransform);
+
+   
returnStream.getExecutionEnvironment().addOperator(resultTransform);
+
+   return returnStream;
--- End diff --

Can't we replace everything from `resultTransform` via

```
return in.transform(
"async wait operator",
outTypeInfo,
operator);
```




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88676017
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Example illustrating how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
+ */
+public class AsyncIOExample {
+
+   /**
+* A checkpointed source.
+*/
+   private static class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {
+   private static final long serialVersionUID = 1L;
+
+   private volatile boolean isRunning = true;
+   private int counter = 0;
+   private int start = 0;
+
+   @Override
+   public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+   return Collections.singletonList(start);
+   }
+
+   @Override
+   public void restoreState(List<Integer> state) throws Exception {
+   for (Integer i : state)
+   this.start = i;
+   }
+
+   public SimpleSource(int maxNum) {
+   this.counter = maxNum;
+   }
+
+   @Override
+   public void run(SourceContext<Integer> ctx) throws Exception {
+   while (start < counter && isRunning) {
--- End diff --

Can we set the `counter` to `-1` to indicate an infinite input stream?
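A sketch of that proposal (the -1 convention is the reviewer's suggestion, not existing code):

```
// counter == -1 would mean "run forever".
while ((counter == -1 || start < counter) && isRunning) {
    synchronized (ctx.getCheckpointLock()) {
        ctx.collect(start);
        ++start;
    }
    Thread.sleep(10);
}
```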




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88679834
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+
+/**
+ * Rich variant of the {@link AsyncFunction}. As a {@link RichFunction}, 
it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and 
provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} 
and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> The type of the input elements.
+ * @param <OUT> The type of the returned elements.
+ */
+public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction
--- End diff --

I'm not sure whether we should offer a rich variant of `AsyncFunction`, 
because any access to state from a future callback will not be scoped to the 
correct key. I'm quite sure that people will run into this.
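To make the concern concrete, a purely illustrative sketch (the `client` field and its
CompletableFuture-based lookup are assumptions, not part of this PR). The callback runs on
a client thread after `asyncInvoke` has returned, so any keyed state read there via
`getRuntimeContext()` would be scoped to whatever key the operator happens to be
processing at that moment:

```
// assumes imports of java.util.Collections, java.util.function.Function,
// java.util.concurrent.CompletableFuture
public class EnrichFunction extends RichAsyncFunction<String, String> {
    private transient Function<String, CompletableFuture<String>> client;

    @Override
    public void asyncInvoke(String key, AsyncCollector<String, String> collector) throws Exception {
        client.apply(key).thenAccept(result ->
            // Runs outside processElement: state access here would hit the wrong key scope.
            collector.collect(Collections.singletonList(result)));
    }
}
```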




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88680106
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+
+import java.io.Serializable;
+
+/**
+ * A function to trigger Async I/O operation.
+ * 
+ * For each #asyncInvoke an async I/O operation can be triggered, and once it is done,
+ * the result can be collected by calling {@link AsyncCollector#collect}. The context of
+ * each async operation is buffered in the operator immediately after #asyncInvoke is
+ * invoked, so stream inputs are not blocked as long as the internal buffer is not full.
+ * 
+ * {@link AsyncCollector} can be passed into callbacks or futures provided by the async
+ * client to fetch result data. Any error can also be propagated to the operator via
+ * {@link AsyncCollector#collect(Throwable)}.
+ *
+ * 
+ * Typical usage for callback:
+ * {@code
+ * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
+ *   @Override
+ *   public void asyncInvoke(String row, AsyncCollector<String, String> collector) throws Exception {
+ * HBaseCallback cb = new HBaseCallback(collector);
+ * Get get = new Get(Bytes.toBytes(row));
+ * hbase.asyncGet(get, cb);
+ *   }
+ * }
+ * }
+ * 
+ *
+ * 
+ * Typical usage for {@link 
com.google.common.util.concurrent.ListenableFuture}
+ * {@code
+ * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
+ *   @Override
+ *   public void asyncInvoke(String row, final AsyncCollector<String, String> collector) throws Exception {
+ * Get get = new Get(Bytes.toBytes(row));
+ * ListenableFuture<Result> future = hbase.asyncGet(get);
+ * Futures.addCallback(future, new FutureCallback<Result>() {
+ *   public void onSuccess(Result result) {
+ * List<String> ret = process(result);
+ * collector.collect(ret);
+ *   }
+ *   public void onFailure(Throwable thrown) {
+ * collector.collect(thrown);
+ *   }
+ * });
+ *   }
+ * }
+ * }
+ * 
+ *
+ * @param <IN> The type of the input elements.
+ * @param <OUT> The type of the returned elements.
+ */
+public interface AsyncFunction<IN, OUT> extends Function, Serializable {
--- End diff --

`PublicEvolving`
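A sketch of what that would look like (assuming the standard annotation from
org.apache.flink.annotation):

```
@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
```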




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88707089
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators 
following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+* Max number of {@link AsyncCollector} in the buffer.
+*/
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+* Keep all {@code AsyncCollector} and their input {@link StreamElement}
+*/
+   private final Map<AsyncCollector<OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+* For the AsyncWaitOperator chained with StreamSource, the checkpoint 
thread may get the
+* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} 
while {@link AsyncCollectorBuffer#queue}
+* is full since main thread waits on this lock. The StreamElement in
+* {@link AsyncWaitOperator#processElement(StreamRecord)} should be 
treated as a part of all StreamElements
+* in its queue. It will be kept in the operator state while 
snapshotting.
+*/
+   private StreamElement extraStreamElement;
+
+   /**
+* {@link TimestampedCollector} and {@link Output} to collect results 
and watermarks.
+*/
+   private final Output<StreamRecord<OUT>> output;
+   private final TimestampedCollector<OUT> timestampedCollector;
+
+   /**
+* Checkpoint lock from {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+*/
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output<StreamRecord<OUT>> output,
+   TimestampedCollector<OUT> collector,
+   Object lock,
+   AsyncWaitOperator<IN, OUT> operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size 
should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be 
NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector 
should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be 
NULL.");
+   Preconditions.checkNotNull(operator, "Reference to 
AsyncWaitOperator should not be NULL.");
+
+   this.bufferSize = bufferSize;
+   this.mode = mode;
+   this.output = output;
+   this.timestampedCollector = collector;
+   this.operator = operator;
+   this.lock = 

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88681032
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators 
following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+* Max number of {@link AsyncCollector} in the buffer.
+*/
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+* Keep all {@code AsyncCollector} and their input {@link StreamElement}
+*/
+   private final Map<AsyncCollector<OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
--- End diff --

line break missing




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88707393
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators 
following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+* Max number of {@link AsyncCollector} in the buffer.
+*/
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+* Keep all {@code AsyncCollector} and their input {@link StreamElement}
+*/
+   private final Map<AsyncCollector<OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+* For the AsyncWaitOperator chained with StreamSource, the checkpoint 
thread may get the
+* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} 
while {@link AsyncCollectorBuffer#queue}
+* is full since main thread waits on this lock. The StreamElement in
+* {@link AsyncWaitOperator#processElement(StreamRecord)} should be 
treated as a part of all StreamElements
+* in its queue. It will be kept in the operator state while 
snapshotting.
+*/
+   private StreamElement extraStreamElement;
+
+   /**
+* {@link TimestampedCollector} and {@link Output} to collect results 
and watermarks.
+*/
+   private final Output<StreamRecord<OUT>> output;
+   private final TimestampedCollector<OUT> timestampedCollector;
+
+   /**
+* Checkpoint lock from {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+*/
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output<StreamRecord<OUT>> output,
+   TimestampedCollector<OUT> collector,
+   Object lock,
+   AsyncWaitOperator<IN, OUT> operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size 
should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be 
NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector 
should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be 
NULL.");
+   Preconditions.checkNotNull(operator, "Reference to 
AsyncWaitOperator should not be NULL.");
+
+   this.bufferSize = bufferSize;
+   this.mode = mode;
+   this.output = output;
+   this.timestampedCollector = collector;
+   this.operator = operator;
+   this.lock = 

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88680162
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import 
org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+/**
+ * A helper class to apply {@link AsyncFunction} to a data stream.
+ * 
+ * {@code
+ * DataStream<String> input = ...
+ * AsyncFunction<String, Tuple2<String, Long>> asyncFunc = ...
+ *
+ * AsyncDataStream.orderedWait(input, asyncFunc, 100);
+ * }
+ * 
+ */
+public class AsyncDataStream {
--- End diff --

`PublicEvolving`




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88707993
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators 
following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+* Max number of {@link AsyncCollector} in the buffer.
+*/
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+* Keep all {@code AsyncCollector} and their input {@link StreamElement}
+*/
+   private final Map<AsyncCollector<OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+* For the AsyncWaitOperator chained with StreamSource, the checkpoint 
thread may get the
+* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} 
while {@link AsyncCollectorBuffer#queue}
+* is full since main thread waits on this lock. The StreamElement in
+* {@link AsyncWaitOperator#processElement(StreamRecord)} should be 
treated as a part of all StreamElements
+* in its queue. It will be kept in the operator state while 
snapshotting.
+*/
+   private StreamElement extraStreamElement;
+
+   /**
+* {@link TimestampedCollector} and {@link Output} to collect results 
and watermarks.
+*/
+   private final Output<StreamRecord<OUT>> output;
+   private final TimestampedCollector<OUT> timestampedCollector;
+
+   /**
+* Checkpoint lock from {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+*/
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output<StreamRecord<OUT>> output,
+   TimestampedCollector<OUT> collector,
+   Object lock,
+   AsyncWaitOperator<IN, OUT> operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size 
should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be 
NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector 
should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be 
NULL.");
+   Preconditions.checkNotNull(operator, "Reference to 
AsyncWaitOperator should not be NULL.");
+
+   this.bufferSize = bufferSize;
+   this.mode = mode;
+   this.output = output;
+   this.timestampedCollector = collector;
+   this.operator = operator;
+   this.lock = 

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88701658
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators 
following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+* Max number of {@link AsyncCollector} in the buffer.
+*/
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+* Keep all {@code AsyncCollector} and their input {@link StreamElement}
+*/
+   private final Map<AsyncCollector<OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+* For the AsyncWaitOperator chained with StreamSource, the checkpoint 
thread may get the
+* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} 
while {@link AsyncCollectorBuffer#queue}
+* is full since main thread waits on this lock. The StreamElement in
+* {@link AsyncWaitOperator#processElement(StreamRecord)} should be 
treated as a part of all StreamElements
+* in its queue. It will be kept in the operator state while 
snapshotting.
+*/
+   private StreamElement extraStreamElement;
+
+   /**
+* {@link TimestampedCollector} and {@link Output} to collect results 
and watermarks.
+*/
+   private final Output<StreamRecord<OUT>> output;
+   private final TimestampedCollector<OUT> timestampedCollector;
+
+   /**
+* Checkpoint lock from {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+*/
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output<StreamRecord<OUT>> output,
+   TimestampedCollector<OUT> collector,
+   Object lock,
+   AsyncWaitOperator<IN, OUT> operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size 
should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be 
NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector 
should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be 
NULL.");
+   Preconditions.checkNotNull(operator, "Reference to 
AsyncWaitOperator should not be NULL.");
+
+   this.bufferSize = bufferSize;
+   this.mode = mode;
+   this.output = output;
+   this.timestampedCollector = collector;
+   this.operator = operator;
+   this.lock = 

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88680325
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link AsyncCollector} collects data / error in user codes while 
processing async i/o.
+ *
+ * @param <IN> Input type
+ * @param <OUT> Output type
+ */
+@Internal
+public class AsyncCollector<IN, OUT> {
+   private List<OUT> result;
+   private Throwable error;
+
+   private boolean isDone = false;
+
+   private final AsyncCollectorBuffer<IN, OUT> buffer;
+
+   public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer) {
+   Preconditions.checkNotNull(buffer, "Reference to 
AsyncCollectorBuffer should not be null");
+
+   this.buffer = buffer;
+   }
+
+   public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer, boolean isDone) {
+   this(buffer);
+   this.isDone = isDone;
--- End diff --

An `AsyncCollector` should never be done when being created, shouldn't it?
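The implied simplification, as a sketch: drop the `(buffer, isDone)` constructor and
always start in the "not done" state.

```
public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer) {
    // isDone stays false until collect() is called.
    this.buffer = Preconditions.checkNotNull(buffer, "Reference to AsyncCollectorBuffer should not be null");
}
```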




[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88675892
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Example illustrating how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
+ */
+public class AsyncIOExample {
+
+   /**
+* A checkpointed source.
+*/
+   private static class SimpleSource implements SourceFunction<Integer>, Checkpointed<Integer> {
+   private static final long serialVersionUID = 1L;
+
+   private volatile boolean isRunning = true;
+   private int counter = 0;
+   private int start = 0;
+
+   @Override
+   public void restoreState(Integer state) throws Exception {
+   this.start = state;
+   }
+
+   @Override
+   public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+   return start;
+   }
+
+   public SimpleSource(int maxNum) {
+   this.counter = maxNum;
+   }
+
+   @Override
+   public void run(SourceContext<Integer> ctx) throws Exception {
+   while (start < counter && isRunning) {
+   synchronized (ctx.getCheckpointLock()) {
+   ctx.collect(start);
+   ++start;
+   }
+   Thread.sleep(10);
+   }
+   }
+
+   @Override
+   public void cancel() {
+   isRunning = false;
+   }
+   }
+
+
+   public static void main(String[] args) throws Exception {
+
+   // obtain execution environment and set setBufferTimeout to 1 
to enable
+   // continuous flushing of the output buffers (lowest latency)
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment()
+   .setBufferTimeout(1);
+
+   // configurations for the job
+   String statePath = args[0];
+   String cpMode = args[1];
+   int maxCount = Integer.valueOf(args[2]);
+   final int sleepFactor = Integer.valueOf(args[3]);
+   final float failRatio = Float.valueOf(args[4]);
+   String mode = args[5];
+   int taskNum = Integer.valueOf(args[6]);
+   String timeType = args[7];
+
+

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88712552
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators 
following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+* Max number of {@link AsyncCollector} in the buffer.
+*/
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+* Keep all {@code AsyncCollector} and their input {@link StreamElement}
+*/
+   private final Map<AsyncCollector<OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+* For the AsyncWaitOperator chained with StreamSource, the checkpoint 
thread may get the
+* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} 
while {@link AsyncCollectorBuffer#queue}
+* is full since main thread waits on this lock. The StreamElement in
+* {@link AsyncWaitOperator#processElement(StreamRecord)} should be 
treated as a part of all StreamElements
+* in its queue. It will be kept in the operator state while 
snapshotting.
+*/
+   private StreamElement extraStreamElement;
+
+   /**
+* {@link TimestampedCollector} and {@link Output} to collect results 
and watermarks.
+*/
+   private final Output<StreamRecord<OUT>> output;
+   private final TimestampedCollector<OUT> timestampedCollector;
+
+   /**
+* Checkpoint lock from {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+*/
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output<StreamRecord<OUT>> output,
+   TimestampedCollector<OUT> collector,
+   Object lock,
+   AsyncWaitOperator<IN, OUT> operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size 
should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be 
NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector 
should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be 
NULL.");
+   Preconditions.checkNotNull(operator, "Reference to 
AsyncWaitOperator should not be NULL.");
+
+   this.bufferSize = bufferSize;
+   this.mode = mode;
+   this.output = output;
+   this.timestampedCollector = collector;
+   this.operator = operator;
+   this.lock = 
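
(A minimal sketch of the snapshot rule that the extraStreamElement Javadoc above
describes: the queued elements plus the element currently held in processElement()
are what get persisted. The helper below is illustrative only, under those
assumptions, and is not the PR's actual code:)

    import org.apache.flink.streaming.runtime.streamrecord.StreamElement;

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    public final class SnapshotSketch {
        // Everything that must go into operator state: all buffered input
        // elements, plus the extra element blocked in processElement().
        public static List<StreamElement> elementsToSnapshot(
                Collection<StreamElement> queued, StreamElement extra) {
            List<StreamElement> pending = new ArrayList<>(queued);
            if (extra != null) {
                pending.add(extra);
            }
            return pending;
        }
    }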

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88676202
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Example illustrating how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
+ */
+public class AsyncIOExample {
+
+   /**
+* A checkpointed source.
+*/
+   private static class SimpleSource implements SourceFunction<Integer>, Checkpointed<Integer> {
+   private static final long serialVersionUID = 1L;
+
+   private volatile boolean isRunning = true;
+   private int counter = 0;
+   private int start = 0;
+
+   @Override
+   public void restoreState(Integer state) throws Exception {
+   this.start = state;
+   }
+
+   @Override
+   public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+   return start;
+   }
+
+   public SimpleSource(int maxNum) {
+   this.counter = maxNum;
+   }
+
+   @Override
+   public void run(SourceContext<Integer> ctx) throws Exception {
+   while (start < counter && isRunning) {
+   synchronized (ctx.getCheckpointLock()) {
+   ctx.collect(start);
+   ++start;
+   }
+   Thread.sleep(10);
+   }
+   }
+
+   @Override
+   public void cancel() {
+   isRunning = false;
+   }
+   }
+
+
+   public static void main(String[] args) throws Exception {
+
+   // obtain the execution environment and set the buffer timeout to 1 to enable
+   // continuous flushing of the output buffers (lowest latency)
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment()
+   .setBufferTimeout(1);
+
+   // configurations for the job
+   String statePath = args[0];
+   String cpMode = args[1];
+   int maxCount = Integer.valueOf(args[2]);
+   final int sleepFactor = Integer.valueOf(args[3]);
+   final float failRatio = Float.valueOf(args[4]);
+   String mode = args[5];
+   int taskNum = Integer.valueOf(args[6]);
+   String timeType = args[7];
+
+

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88676904
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Example illustrating how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
+ */
+public class AsyncIOExample {
+
+   /**
+* A checkpointed source.
+*/
+   private static class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {
+   private static final long serialVersionUID = 1L;
+
+   private volatile boolean isRunning = true;
+   private int counter = 0;
+   private int start = 0;
+
+   @Override
+   public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+   return Collections.singletonList(start);
+   }
+
+   @Override
+   public void restoreState(List<Integer> state) throws Exception {
+   for (Integer i : state) {
+   this.start = i;
+   }
+   }
+
+   public SimpleSource(int maxNum) {
+   this.counter = maxNum;
+   }
+
+   @Override
+   public void run(SourceContext<Integer> ctx) throws Exception {
+   while (start < counter && isRunning) {
+   synchronized (ctx.getCheckpointLock()) {
+   ctx.collect(start);
+   ++start;
+   }
+   Thread.sleep(10);
+   }
+   }
+
+   @Override
+   public void cancel() {
+   isRunning = false;
+   }
+   }
+
+   private static void printUsage() {
+   System.out.println("To customize example, use: AsyncIOExample [--fsStatePath <path>] " +
+   "[--checkpointMode <exactly_once or at_least_once>] [--maxCount <max number of input>] " +
+   "[--sleepFactor <interval to sleep for each stream element>] [--failRatio <possibility to throw exception>] " +
+   "[--waitMode <ordered or unordered>] [--waitOperatorParallelism <parallelism for async wait operator>] " +
+   "[--eventType <EventTime or IngestionTime>]");
+   }
+
+   public static void main(String[] args) throws Exception {
+
+   // obtain execution environment
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   printUsage();
+
+   // parse parameters
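
(For reference, a sketch of how these flags could be read with the ParameterTool
imported above; the keys mirror printUsage(), while the default values here are
assumptions:)

    final ParameterTool params = ParameterTool.fromArgs(args);
    final String statePath = params.get("fsStatePath", null);
    final String cpMode = params.get("checkpointMode", "exactly_once");
    final int maxCount = params.getInt("maxCount", 100000);
    final int sleepFactor = params.getInt("sleepFactor", 100);
    final float failRatio = params.getFloat("failRatio", 0.001f);
    final String mode = params.get("waitMode", "ordered");
    final int taskNum = params.getInt("waitOperatorParallelism", 1);
    final String timeType = params.get("eventType", "EventTime");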

[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

2016-11-21 Thread vasia
Github user vasia commented on the issue:

https://github.com/apache/flink/pull/2564
  
Thanks @mushketyk. @greghogan, are you shepherding this PR or shall I? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683278#comment-15683278
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88681032
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The AsyncCollectorBuffer holds all {@link AsyncCollector}s in its internal buffer,
+ * and emits results from the {@link AsyncCollector}s to the operators following it by
+ * calling {@link Output#collect(Object)}.
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+* Max number of {@link AsyncCollector} in the buffer.
+*/
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+* Keeps all {@code AsyncCollector}s and their input {@link StreamElement}s.
+*/
+   private final Map<AsyncCollector<OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
--- End diff --

line break missing
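
(Presumably just a blank line between the field and the following Javadoc block,
along the lines of:)

    private final Map<AsyncCollector<OUT>, StreamElement> queue = new LinkedHashMap<>();

    /**
     * For the AsyncWaitOperator chained with a StreamSource, ...
     */
    private StreamElement extraStreamElement;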


> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps so many of them in flight, and then emits 
> results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683277#comment-15683277
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88680106
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+
+import java.io.Serializable;
+
+/**
+ * A function to trigger async I/O operations.
+ * <p>
+ * For each #asyncInvoke, an async I/O operation can be triggered, and once it has been done,
+ * the result can be collected by calling {@link AsyncCollector#collect}. The context of each
+ * async operation is buffered in the operator immediately after invoking #asyncInvoke,
+ * so no stream input is blocked as long as the internal buffer is not full.
+ * <p>
+ * The {@link AsyncCollector} can be passed into callbacks or futures provided by the async client to
+ * fetch result data. Any error can also be propagated to the operator by {@link AsyncCollector#collect(Throwable)}.
+ *
+ * <pre>
+ * Typical usage for callback:
+ * {@code
+ * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
+ *   @Override
+ *   public void asyncInvoke(String row, AsyncCollector<String> collector) throws Exception {
+ *     HBaseCallback cb = new HBaseCallback(collector);
+ *     Get get = new Get(Bytes.toBytes(row));
+ *     hbase.asyncGet(get, cb);
+ *   }
+ * }
+ * }
+ * </pre>
+ *
+ * <pre>
+ * Typical usage for {@link com.google.common.util.concurrent.ListenableFuture}
+ * {@code
+ * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
+ *   @Override
+ *   public void asyncInvoke(String row, final AsyncCollector<String> collector) throws Exception {
+ *     Get get = new Get(Bytes.toBytes(row));
+ *     ListenableFuture<Result> future = hbase.asyncGet(get);
+ *     Futures.addCallback(future, new FutureCallback<Result>() {
+ *       public void onSuccess(Result result) {
+ *         List<String> ret = process(result);
+ *         collector.collect(ret);
+ *       }
+ *       public void onFailure(Throwable thrown) {
+ *         collector.collect(thrown);
+ *       }
+ *     });
+ *   }
+ * }
+ * }
+ * </pre>
+ *
+ * @param <IN> The type of the input elements.
+ * @param <OUT> The type of the returned elements.
+ */
+public interface AsyncFunction<IN, OUT> extends Function, Serializable {
--- End diff --

`PublicEvolving`
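
(A minimal sketch of the suggested change, assuming the reviewer means marking the
interface with Flink's org.apache.flink.annotation.PublicEvolving annotation:)

    import org.apache.flink.annotation.PublicEvolving;
    import org.apache.flink.api.common.functions.Function;
    import org.apache.flink.streaming.api.operators.async.AsyncCollector;

    import java.io.Serializable;

    @PublicEvolving
    public interface AsyncFunction<IN, OUT> extends Function, Serializable {
        // Triggers the async operation for the given input; the result is handed
        // to the collector once the operation completes.
        void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
    }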


> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps so many of them in flight, and then emits 
> results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683280#comment-15683280
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88680681
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
--- End diff --

Maybe the `AsyncCollectorBuffer` belongs in the `functions` package, like the `AsyncFunction`.
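
(That is, the suggestion appears to be moving the class so that its package
declaration reads:)

    package org.apache.flink.streaming.api.functions.async;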


> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps so many of them in flight, and then emits 
> results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88714777
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683284#comment-15683284
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88701332
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88707416
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683323#comment-15683323
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88859015
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
 ---
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Tests for {@link AsyncCollectorBuffer}. These tests cover:
+ *
+ * <ul>
+ * <li>adding a new item into the buffer</li>
+ * <li>ordered mode processing</li>
+ * <li>unordered mode processing</li>
+ * <li>error handling</li>
+ * </ul>
+ */
+public class AsyncCollectorBufferTest {
+   private AsyncFunction<Integer, Integer> function;
+
+   private AsyncWaitOperator<Integer, Integer> operator;
+
+   private AsyncCollectorBuffer<Integer, Integer> buffer;
+
+   private Output<StreamRecord<Integer>> output;
+
+   private TimestampedCollector<Integer> collector;
+
+   private Object lock = new Object();
+
+   @Before
+   public void setUp() throws Exception {
+   function = new AsyncFunction<Integer, Integer>() {
+   @Override
+   public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+
+   }
+   };
+
+   operator = new AsyncWaitOperator<>(function);
+   Class<?>[] classes = AbstractStreamOperator.class.getDeclaredClasses();
+   Class<?> latencyClass = null;
+   for (Class<?> c : classes) {
+   if (c.getName().indexOf("LatencyGauge") != -1) {
+   latencyClass = c;
+   }
+   }
+
+   Constructor<?> explicitConstructor = latencyClass.getDeclaredConstructors()[0];
+   explicitConstructor.setAccessible(true);
+   Whitebox.setInternalState(operator, "latencyGauge", 
explicitConstructor.newInstance(10));
+
+   output = new FakedOutput(new ArrayList<>());
+   collector = new TimestampedCollector<>(output);
+
+   Whitebox.setInternalState(operator, "output", output);
+   }
+
+   @Test
+   public void testAdd() throws Exception {
+   buffer =
+   new AsyncCollectorBuffer<>(3, 
AsyncDataStream.OutputMode.UNORDERED, output, collector, lock, operator);
+
+   buffer.addWatermark(new Watermark(0L));
+   buffer.addLatencyMarker(new LatencyMarker(111L, 1, 1));
+   Assert.assertEquals(2, buffer.getQueue().size());
+
+   Iterator<Map.Entry<AsyncCollector<Integer>, StreamElement>> iterator =
+   buffer.getQueue().entrySet().iterator();

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683308#comment-15683308
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88715005
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683297#comment-15683297
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88712331
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683273#comment-15683273
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88676017
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Example illustrating how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
+ */
+public class AsyncIOExample {
+
+   /**
+* A checkpointed source.
+*/
+   private static class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {
+   private static final long serialVersionUID = 1L;
+
+   private volatile boolean isRunning = true;
+   private int counter = 0;
+   private int start = 0;
+
+   @Override
+   public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+   return Collections.singletonList(start);
+   }
+
+   @Override
+   public void restoreState(List<Integer> state) throws Exception {
+   for (Integer i : state) {
+   this.start = i;
+   }
+   }
+
+   public SimpleSource(int maxNum) {
+   this.counter = maxNum;
+   }
+
+   @Override
+   public void run(SourceContext<Integer> ctx) throws Exception {
+   while (start < counter && isRunning) {
--- End diff --

Can we set the `counter` to `-1` to indicate an infinite input stream?
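
(A sketch of what that could look like; treating counter == -1 as "no limit" is an
assumption here, not the PR's actual behavior:)

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        // counter == -1 signals an infinite input stream
        while ((start < counter || counter == -1) && isRunning) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(start);
                ++start;
            }
            Thread.sleep(10);
        }
    }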


> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps so many of them in flight, and then emits 
> results to downstream operators as the async operations complete.

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683272#comment-15683272
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88675892
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88712331
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators 
following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+    * Max number of {@link AsyncCollector} in the buffer.
+    */
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+    * Keep all {@code AsyncCollector} and their input {@link StreamElement}
+    */
+   private final Map<AsyncCollector<IN, OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+* For the AsyncWaitOperator chained with StreamSource, the checkpoint 
thread may get the
+* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} 
while {@link AsyncCollectorBuffer#queue}
+* is full since main thread waits on this lock. The StreamElement in
+* {@link AsyncWaitOperator#processElement(StreamRecord)} should be 
treated as a part of all StreamElements
+* in its queue. It will be kept in the operator state while 
snapshotting.
+*/
+   private StreamElement extraStreamElement;
+
+   /**
+* {@link TimestampedCollector} and {@link Output} to collect results 
and watermarks.
+*/
+   private final Output<StreamRecord<OUT>> output;
+   private final TimestampedCollector<OUT> timestampedCollector;
+
+   /**
+* Checkpoint lock from {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+*/
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output<StreamRecord<OUT>> output,
+   TimestampedCollector<OUT> collector,
+   Object lock,
+   AsyncWaitOperator<IN, OUT> operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size 
should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be 
NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector 
should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be 
NULL.");
+   Preconditions.checkNotNull(operator, "Reference to 
AsyncWaitOperator should not be NULL.");
+
+   this.bufferSize = bufferSize;
+   this.mode = mode;
+   this.output = output;
+   this.timestampedCollector = collector;
+   this.operator = operator;
+   this.lock = 
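
To make the javadoc above concrete, here is a minimal sketch (simplified
types and assumed names, not the PR's code) of why the element parked in
processElement() has to be snapshotted alongside the queued ones:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class SnapshotSketch<E> {

	private final Collection<E> queuedElements = new ArrayList<>();
	private E extraStreamElement; // element waiting for a free buffer slot

	public List<E> elementsToSnapshot() {
		List<E> elements = new ArrayList<>(queuedElements);
		if (extraStreamElement != null) {
			// Without this, an element accepted by processElement() but not
			// yet admitted to the full queue would be lost on restore.
			elements.add(extraStreamElement);
		}
		return elements;
	}
}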

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683287#comment-15683287
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88701684
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators 
following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+    * Max number of {@link AsyncCollector} in the buffer.
+    */
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+    * Keep all {@code AsyncCollector} and their input {@link StreamElement}
+    */
+   private final Map<AsyncCollector<IN, OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+* For the AsyncWaitOperator chained with StreamSource, the checkpoint 
thread may get the
+* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} 
while {@link AsyncCollectorBuffer#queue}
+* is full since main thread waits on this lock. The StreamElement in
+* {@link AsyncWaitOperator#processElement(StreamRecord)} should be 
treated as a part of all StreamElements
+* in its queue. It will be kept in the operator state while 
snapshotting.
+*/
+   private StreamElement extraStreamElement;
+
+   /**
+* {@link TimestampedCollector} and {@link Output} to collect results 
and watermarks.
+*/
+   private final Output<StreamRecord<OUT>> output;
+   private final TimestampedCollector<OUT> timestampedCollector;
+
+   /**
+* Checkpoint lock from {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+*/
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output<StreamRecord<OUT>> output,
+   TimestampedCollector<OUT> collector,
+   Object lock,
+   AsyncWaitOperator<IN, OUT> operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size 
should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be 
NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector 
should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be 
NULL.");
+   Preconditions.checkNotNull(operator, "Reference to 
AsyncWaitOperator should not be NU

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683300#comment-15683300
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88720498
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -116,7 +116,12 @@ public OperatorChain(StreamTask 
containingTask) {
 
// add head operator to end of chain
allOps.add(headOperator);
-   
+
+   // reverse the order of all operators so that the head operator is in first place.
+   // for a chain containing the async wait operator, operators after the wait
+   // operator have to wait a while until all data buffered in the wait operator
+   // has been snapshotted.
+   Collections.reverse(allOps);
--- End diff --

This won't work with emitting elements in the open method of 
`AsyncWaitOperator`, because then the downstream operators are potentially not 
yet opened when the first stream element arrives there.
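
A minimal, self-contained sketch (plain Java, not Flink's classes) of the
hazard described in the comment above: if the chain is opened head-first, an
operator that emits records from open() reaches a downstream operator before
that operator has been opened.

public class OpenOrderSketch {

	/** A stand-in for a chained stream operator; all names are illustrative. */
	static class Op {
		final String name;
		final Op downstream;   // null for the tail of the chain
		boolean opened;

		Op(String name, Op downstream) {
			this.name = name;
			this.downstream = downstream;
		}

		void open() {
			opened = true;
			// An operator like AsyncWaitOperator may re-emit restored
			// elements from open(); this is where the ordering bites.
			if (downstream != null) {
				collect("restored-element");
			}
		}

		void collect(Object element) {
			if (downstream != null && !downstream.opened) {
				throw new IllegalStateException(
					name + " emitted to " + downstream.name + " before it was opened");
			}
		}
	}

	public static void main(String[] args) {
		Op tail = new Op("tail", null);
		Op head = new Op("head", tail);

		// opening head-first (what reversing the operator list causes) fails
		try {
			head.open();
		} catch (IllegalStateException e) {
			System.out.println(e.getMessage());
		}

		// opening the downstream operator first is safe
		tail.open();
		head.open();
	}
}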


> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps so many of them in flight, and then emits 
> results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683310#comment-15683310
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88719571
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+@Internal
+public class AsyncWaitOperator<IN, OUT>
+   extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
+   implements OneInputStreamOperator<IN, OUT>
+{
+   private final int DEFAULT_BUFFER_SIZE = 1000;
+
+   private static final long serialVersionUID = 1L;
+
+   private final static String STATE_NAME = "_async_wait_operator_state_";
+
+   /**
+    * {@link TypeSerializer} for inputs while making snapshots.
+    */
+   private transient StreamElementSerializer<IN> inStreamElementSerializer;
+
+   /**
+    * input stream elements from the state
+    */
+   private transient ListState<StreamElement> recoveredStreamElements;
+
+   private transient TimestampedCollector<OUT> collector;
+
+   private transient AsyncCollectorBuffer<IN, OUT> buffer;
+
+   /**
+* Checkpoint lock from {@link StreamTask#lock}
+*/
+   private transient Object checkpointLock;
+
+   private int bufferSize = DEFAULT_BUFFER_SIZE;
+   private AsyncDataStream.OutputMode mode;
+
+   /**
+* For test only. Normally this flag is true, indicating that the 
Emitter Thread
+* in the buffer will work.
+*/
+   private boolean emitFlag = true;
+
+   /**
+* Test serializer used in unit test
+*/
+   private StreamElementSerializer<IN> inStreamElementSerializerForTest;
+
+
+   public AsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction) {
+   super(asyncFunction);
+   chainingStrategy = ChainingStrategy.ALWAYS;
+   }
+
+   public void setBufferSize(int size) {
+   Preconditions.checkArgument(size > 0, "The number of concurrent async operations should be greater than 0.");
+   bufferSize = size;
+   }
+
+   public void setOutputMode(AsyncDataStream.OutputMode mode) {
+   this.mode = mode;
+   }
+
+   @VisibleForTesting
+   public void

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683320#comment-15683320
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88868024
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+@Internal
+public class AsyncWaitOperator<IN, OUT>
+   extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
+   implements OneInputStreamOperator<IN, OUT>
+{
+   private final int DEFAULT_BUFFER_SIZE = 1000;
+
+   private static final long serialVersionUID = 1L;
+
+   private final static String STATE_NAME = "_async_wait_operator_state_";
+
+   /**
+    * {@link TypeSerializer} for inputs while making snapshots.
+    */
+   private transient StreamElementSerializer<IN> inStreamElementSerializer;
+
+   /**
+    * input stream elements from the state
+    */
+   private transient ListState<StreamElement> recoveredStreamElements;
+
+   private transient TimestampedCollector<OUT> collector;
+
+   private transient AsyncCollectorBuffer<IN, OUT> buffer;
+
+   /**
+* Checkpoint lock from {@link StreamTask#lock}
+*/
+   private transient Object checkpointLock;
+
+   private int bufferSize = DEFAULT_BUFFER_SIZE;
+   private AsyncDataStream.OutputMode mode;
+
+   /**
+* For test only. Normally this flag is true, indicating that the 
Emitter Thread
+* in the buffer will work.
+*/
+   private boolean emitFlag = true;
+
+   /**
+* Test serializer used in unit test
+*/
+   private StreamElementSerializer<IN> inStreamElementSerializerForTest;
+
+
+   public AsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction) {
+   super(asyncFunction);
+   chainingStrategy = ChainingStrategy.ALWAYS;
+   }
+
+   public void setBufferSize(int size) {
+   Preconditions.checkArgument(size > 0, "The number of concurrent async operations should be greater than 0.");
+   bufferSize = size;
+   }
+
+   public void setOutputMode(AsyncDataStream.OutputMode mode) {
+   this.mode = mode;
+   }
+
+   @VisibleForTesting
+   public void

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683321#comment-15683321
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88865433
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ---
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AsyncWaitOperator}. These test that:
+ *
+ * <ul>
+ *     <li>Process StreamRecords and Watermarks in ORDERED mode</li>
+ *     <li>Process StreamRecords and Watermarks in UNORDERED mode</li>
+ *     <li>Snapshot state and restore state</li>
+ * </ul>
+ */
+public class AsyncWaitOperatorTest {
+
+   public static class MyAsyncFunction extends RichAsyncFunction<Integer, Integer> {
+   transient final int SLEEP_FACTOR = 100;
+   transient final int THREAD_POOL_SIZE = 10;
+   transient ExecutorService executorService;
--- End diff --

Can't we share the `ExecutorService`?
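
One common way to do that, sketched here with assumed names (this is not the
PR's code): a static pool shared by all function instances, reference-counted
so that the last close() tears it down.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedPoolAsyncFunctionSketch {

	private static final int THREAD_POOL_SIZE = 10;

	private static ExecutorService executorService;
	private static int refCount;

	public void open() {
		synchronized (SharedPoolAsyncFunctionSketch.class) {
			// the first instance creates the shared pool
			if (refCount == 0) {
				executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
			}
			refCount++;
		}
	}

	public void close() throws InterruptedException {
		synchronized (SharedPoolAsyncFunctionSketch.class) {
			// the last instance shuts the shared pool down
			refCount--;
			if (refCount == 0) {
				executorService.shutdown();
				executorService.awaitTermination(10, TimeUnit.SECONDS);
			}
		}
	}
}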


> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps so many of them in flight, and then emits 
> results to downstream operators as the async operations complete.



--
This message was se

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683314#comment-15683314
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88861548
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
 ---
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Tests for {@link AsyncCollectorBuffer}. These test that:
+ *
+ * <ul>
+ *     <li>Add a new item into the buffer</li>
+ *     <li>Ordered mode processing</li>
+ *     <li>Unordered mode processing</li>
+ *     <li>Error handling</li>
+ * </ul>
+ */
+public class AsyncCollectorBufferTest {
+   private AsyncFunction<Integer, Integer> function;
+
+   private AsyncWaitOperator<Integer, Integer> operator;
+
+   private AsyncCollectorBuffer<Integer, Integer> buffer;
+
+   private Output<StreamRecord<Integer>> output;
+
+   private TimestampedCollector<Integer> collector;
+
+   private Object lock = new Object();
+
+   @Before
+   public void setUp() throws Exception {
+   function = new AsyncFunction<Integer, Integer>() {
+   @Override
+   public void asyncInvoke(Integer input, AsyncCollector<Integer, Integer> collector) throws Exception {
+
+   }
+   };
+
+   operator = new AsyncWaitOperator<>(function);
+   Class[] classes = 
AbstractStreamOperator.class.getDeclaredClasses();
+   Class latencyClass = null;
+   for (Class c : classes) {
+   if (c.getName().indexOf("LatencyGauge") != -1) {
+   latencyClass = c;
+   }
+   }
+
+   Constructor explicitConstructor = 
latencyClass.getDeclaredConstructors()[0];
+   explicitConstructor.setAccessible(true);
+   Whitebox.setInternalState(operator, "latencyGauge", 
explicitConstructor.newInstance(10));
+
+   output = new FakedOutput(new ArrayList());
+   collector = new TimestampedCollector<Integer>(output);
+
+   Whitebox.setInternalState(operator, "output", output);
+   }
+
+   @Test
+   public void testAdd() throws Exception {
+   buffer =
+   new AsyncCollectorBuffer<>(3, 
AsyncDataStream.OutputMode.UNORDERED, output, collector, lock, operator);
+
+   buffer.addWatermark(new Watermark(0l));
+   buffer.addLatencyMarker(new LatencyMarker(111L, 1, 1));
+   Assert.assertEquals(buffer.getQueue().size(), 2);
+
+   Iterator<Map.Entry<AsyncCollector<Integer, Integer>, StreamElement>> iterator =
+   buffer.getQueue().entrySet().iterator();

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683295#comment-15683295
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88714427
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators 
following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+    * Max number of {@link AsyncCollector} in the buffer.
+    */
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+    * Keep all {@code AsyncCollector} and their input {@link StreamElement}
+    */
+   private final Map<AsyncCollector<IN, OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+* For the AsyncWaitOperator chained with StreamSource, the checkpoint 
thread may get the
+* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} 
while {@link AsyncCollectorBuffer#queue}
+* is full since main thread waits on this lock. The StreamElement in
+* {@link AsyncWaitOperator#processElement(StreamRecord)} should be 
treated as a part of all StreamElements
+* in its queue. It will be kept in the operator state while 
snapshotting.
+*/
+   private StreamElement extraStreamElement;
+
+   /**
+* {@link TimestampedCollector} and {@link Output} to collect results 
and watermarks.
+*/
+   private final Output<StreamRecord<OUT>> output;
+   private final TimestampedCollector<OUT> timestampedCollector;
+
+   /**
+* Checkpoint lock from {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+*/
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output<StreamRecord<OUT>> output,
+   TimestampedCollector<OUT> collector,
+   Object lock,
+   AsyncWaitOperator<IN, OUT> operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size 
should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be 
NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector 
should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be 
NULL.");
+   Preconditions.checkNotNull(operator, "Reference to 
AsyncWaitOperator should not be NU

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683328#comment-15683328
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88872044
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators 
following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+    * Max number of {@link AsyncCollector} in the buffer.
+    */
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+    * Keep all {@code AsyncCollector} and their input {@link StreamElement}
+    */
+   private final Map<AsyncCollector<IN, OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+* For the AsyncWaitOperator chained with StreamSource, the checkpoint 
thread may get the
+* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} 
while {@link AsyncCollectorBuffer#queue}
+* is full since main thread waits on this lock. The StreamElement in
+* {@link AsyncWaitOperator#processElement(StreamRecord)} should be 
treated as a part of all StreamElements
+* in its queue. It will be kept in the operator state while 
snapshotting.
+*/
+   private StreamElement extraStreamElement;
+
+   /**
+* {@link TimestampedCollector} and {@link Output} to collect results 
and watermarks.
+*/
+   private final Output<StreamRecord<OUT>> output;
+   private final TimestampedCollector<OUT> timestampedCollector;
+
+   /**
+* Checkpoint lock from {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+*/
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output<StreamRecord<OUT>> output,
+   TimestampedCollector<OUT> collector,
+   Object lock,
+   AsyncWaitOperator<IN, OUT> operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size 
should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be 
NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector 
should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be 
NULL.");
+   Preconditions.checkNotNull(operator, "Reference to 
AsyncWaitOperator should not be NU

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683296#comment-15683296
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88712042
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java
 ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its 
internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators 
following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+    * Max number of {@link AsyncCollector} in the buffer.
+    */
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+    * Keep all {@code AsyncCollector} and their input {@link StreamElement}
+    */
+   private final Map<AsyncCollector<IN, OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+* For the AsyncWaitOperator chained with StreamSource, the checkpoint 
thread may get the
+* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} 
while {@link AsyncCollectorBuffer#queue}
+* is full since main thread waits on this lock. The StreamElement in
+* {@link AsyncWaitOperator#processElement(StreamRecord)} should be 
treated as a part of all StreamElements
+* in its queue. It will be kept in the operator state while 
snapshotting.
+*/
+   private StreamElement extraStreamElement;
+
+   /**
+* {@link TimestampedCollector} and {@link Output} to collect results 
and watermarks.
+*/
+   private final Output<StreamRecord<OUT>> output;
+   private final TimestampedCollector<OUT> timestampedCollector;
+
+   /**
+* Checkpoint lock from {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+*/
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output<StreamRecord<OUT>> output,
+   TimestampedCollector<OUT> collector,
+   Object lock,
+   AsyncWaitOperator<IN, OUT> operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size 
should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be 
NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector 
should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be 
NULL.");
+   Preconditions.checkNotNull(operator, "Reference to 
AsyncWaitOperator should not be NU

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683311#comment-15683311
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88861278
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
 ---
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Tests for {@link AsyncCollectorBuffer}. These test that:
+ *
+ * <ul>
+ *     <li>Add a new item into the buffer</li>
+ *     <li>Ordered mode processing</li>
+ *     <li>Unordered mode processing</li>
+ *     <li>Error handling</li>
+ * </ul>
+ */
+public class AsyncCollectorBufferTest {
+   private AsyncFunction<Integer, Integer> function;
+
+   private AsyncWaitOperator<Integer, Integer> operator;
+
+   private AsyncCollectorBuffer<Integer, Integer> buffer;
+
+   private Output<StreamRecord<Integer>> output;
+
+   private TimestampedCollector<Integer> collector;
+
+   private Object lock = new Object();
+
+   @Before
+   public void setUp() throws Exception {
+   function = new AsyncFunction<Integer, Integer>() {
+   @Override
+   public void asyncInvoke(Integer input, AsyncCollector<Integer, Integer> collector) throws Exception {
+
+   }
+   };
+
+   operator = new AsyncWaitOperator<>(function);
+   Class[] classes = 
AbstractStreamOperator.class.getDeclaredClasses();
+   Class latencyClass = null;
+   for (Class c : classes) {
+   if (c.getName().indexOf("LatencyGauge") != -1) {
+   latencyClass = c;
+   }
+   }
+
+   Constructor explicitConstructor = 
latencyClass.getDeclaredConstructors()[0];
+   explicitConstructor.setAccessible(true);
+   Whitebox.setInternalState(operator, "latencyGauge", 
explicitConstructor.newInstance(10));
+
+   output = new FakedOutput(new ArrayList());
+   collector = new TimestampedCollector<Integer>(output);
+
+   Whitebox.setInternalState(operator, "output", output);
+   }
+
+   @Test
+   public void testAdd() throws Exception {
+   buffer =
+   new AsyncCollectorBuffer<>(3, 
AsyncDataStream.OutputMode.UNORDERED, output, collector, lock, operator);
+
+   buffer.addWatermark(new Watermark(0l));
+   buffer.addLatencyMarker(new LatencyMarker(111L, 1, 1));
+   Assert.assertEquals(buffer.getQueue().size(), 2);
+
+   Iterator<Map.Entry<AsyncCollector<Integer, Integer>, StreamElement>> iterator =
+   buffer.getQueue().entrySet().iterator();

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683315#comment-15683315
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88865406
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 ---
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AsyncWaitOperator}. These test that:
+ *
+ * <ul>
+ *     <li>Process StreamRecords and Watermarks in ORDERED mode</li>
+ *     <li>Process StreamRecords and Watermarks in UNORDERED mode</li>
+ *     <li>Snapshot state and restore state</li>
+ * </ul>
+ */
+public class AsyncWaitOperatorTest {
+
+   public static class MyAsyncFunction extends RichAsyncFunction<Integer, Integer> {
+   transient final int SLEEP_FACTOR = 100;
+   transient final int THREAD_POOL_SIZE = 10;
--- End diff --

why are these values `transient`?
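
For context: transient only affects non-static instance fields during Java
serialization, so it buys nothing for constants. A sketch of the conventional
alternative (field names mirror the test; this is not the PR's code):

public class MyAsyncFunctionSketch {
	// compile-time constants: static final fields are never serialized,
	// so no transient is needed
	private static final int SLEEP_FACTOR = 100;
	private static final int THREAD_POOL_SIZE = 10;

	// transient is meaningful here: an ExecutorService is not Serializable
	// and must be re-created in open() after the function is deserialized
	private transient java.util.concurrent.ExecutorService executorService;
}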


> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps so many of them in flight, and then emits 
> results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683274#comment-15683274
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88676904
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Example illustrating how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
+ */
+public class AsyncIOExample {
+
+   /**
+* A checkpointed source.
+*/
+   private static class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {
+   private static final long serialVersionUID = 1L;
+
+   private volatile boolean isRunning = true;
+   private int counter = 0;
+   private int start = 0;
+
+   @Override
+   public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+   return Collections.singletonList(start);
+   }
+
+   @Override
+   public void restoreState(List<Integer> state) throws Exception {
+   for (Integer i : state) {
+   this.start = i;
+   }
+   }
+
+   public SimpleSource(int maxNum) {
+   this.counter = maxNum;
+   }
+
+   @Override
+   public void run(SourceContext<Integer> ctx) throws Exception {
+   while (start < counter && isRunning) {
+   synchronized (ctx.getCheckpointLock()) {
+   ctx.collect(start);
+   ++start;
+   }
+   Thread.sleep(10);
+   }
+   }
+
+   @Override
+   public void cancel() {
+   isRunning = false;
+   }
+   }
+
+   private static void printUsage() {
+   System.out.println("To customize example, use: AsyncIOExample 
[--fsStatePath ] " +
+   "[--checkpointMode ] 
[--maxCount ] " +
+   "[--sleepFactor ] [--failRatio ] " +
+   "[--waitMode ] 
[--waitOperatorParallelism ] " +
+   "[--eventType ]");
+   }
+
+   public static void main(String[] args) thr
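
A sketch of how these flags could be read with the ParameterTool the example
already imports; the parameter names follow printUsage() above, while the
defaults and the class name are assumptions.

import org.apache.flink.api.java.utils.ParameterTool;

public class AsyncIOExampleArgsSketch {
	public static void main(String[] args) {
		ParameterTool params = ParameterTool.fromArgs(args);
		// each flag falls back to an assumed default when absent
		String statePath = params.get("fsStatePath", null);
		String cpMode = params.get("checkpointMode", "exactly_once");
		int maxCount = params.getInt("maxCount", 100000);
		long sleepFactor = params.getLong("sleepFactor", 100);
		float failRatio = params.getFloat("failRatio", 0.001f);
		String waitMode = params.get("waitMode", "ordered");
		int parallelism = params.getInt("waitOperatorParallelism", 1);
		String eventType = params.get("eventType", "EventTime");

		System.out.printf("statePath=%s cpMode=%s maxCount=%d sleepFactor=%d%n",
			statePath, cpMode, maxCount, sleepFactor);
		System.out.printf("failRatio=%.3f waitMode=%s parallelism=%d eventType=%s%n",
			failRatio, waitMode, parallelism, eventType);
	}
}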

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683303#comment-15683303
 ] 

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88718820
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+@Internal
+public class AsyncWaitOperator<IN, OUT>
+   extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
+   implements OneInputStreamOperator<IN, OUT>
+{
+   private final int DEFAULT_BUFFER_SIZE = 1000;
+
+   private static final long serialVersionUID = 1L;
+
+   private final static String STATE_NAME = "_async_wait_operator_state_";
+
+   /**
+* {@link TypeSerializer} for inputs while making snapshots.
+*/
+   private transient StreamElementSerializer inStreamElementSerializer;
+
+   /**
+* input stream elements from the state
+*/
+   private transient ListState recoveredStreamElements;
+
+   private transient TimestampedCollector collector;
+
+   private transient AsyncCollectorBuffer buffer;
+
+   /**
+* Checkpoint lock from {@link StreamTask#lock}
+*/
+   private transient Object checkpointLock;
+
+   private int bufferSize = DEFAULT_BUFFER_SIZE;
+   private AsyncDataStream.OutputMode mode;
+
+   /**
+* For test only. Normally this flag is true, indicating that the 
Emitter Thread
+* in the buffer will work.
+*/
+   private boolean emitFlag = true;
+
+   /**
+* Test serializer used in unit test
+*/
+   private StreamElementSerializer inStreamElementSerializerForTest;
+
+
+   public AsyncWaitOperator(AsyncFunction asyncFunction) {
+   super(asyncFunction);
+   chainingStrategy = ChainingStrategy.ALWAYS;
+   }
+
+   public void setBufferSize(int size) {
+   Preconditions.checkArgument(size > 0, "The number of concurrent 
async operation should be greater than 0.");
+   bufferSize = size;
+   }
+
+   public void setOutputMode(AsyncDataStream.OutputMode mode) {
+   this.mode = mode;
+   }
+
+   @VisibleForTesting
+   public void

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683324#comment-15683324 ]

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88721238
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java ---
@@ -195,6 +202,70 @@ public Integer map(NonSerializable value) throws Exception {
env.execute();
}
 
+   @Test
+   public void testAsyncWaitOperator() throws Exception {
+   final int numElements = 10;
+
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements)).setParallelism(1);
+
+   AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
+   transient ExecutorService executorService;
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+   executorService = Executors.newFixedThreadPool(numElements);
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   executorService.shutdown();
+   }
+
+   @Override
+   public void asyncInvoke(final Tuple2<Integer, NonSerializable> input,
+   final AsyncCollector<Tuple2<Integer, NonSerializable>, Integer> collector) throws Exception {
+   this.executorService.submit(new Runnable() {
+   @Override
+   public void run() {
+   // wait for a while to simulate async operation here
+   int sleep = (int) (new Random().nextFloat() * 1000);
+   try {
+   Thread.sleep(sleep);
+   List<Integer> ret = new ArrayList<>();
+   ret.add(input.f0 + input.f0);
+   collector.collect(ret);
+   }
+   catch (InterruptedException e) {
+   collector.collect(new ArrayList<Integer>(0));
+   }
+   }
+   });
+   }
+   };
+
+   DataStream<Integer> orderedResult = AsyncDataStream.orderedWait(input, function, 2).setParallelism(1);
+   orderedResult.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
+
+   DataStream<Integer> unorderedResult = AsyncDataStream.unorderedWait(input, function, 2).setParallelism(1);
+   unorderedResult.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
--- End diff --

Writing to disk can fail on travis. Thus, it is highly recommended to rewrite 
these tests so that they store their results in memory.
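
A minimal sketch of that suggestion (names are illustrative, not part of the PR): a test
sink that appends to a static, synchronized list which the test thread can inspect after
env.execute() instead of reading files back from disk.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    // collects results in memory instead of writing files
    class MemorySink implements SinkFunction<Integer> {
        private static final long serialVersionUID = 1L;

        // static so results survive serialization of the sink instances and
        // remain visible to the test thread after env.execute()
        static final List<Integer> COLLECTED = Collections.synchronizedList(new ArrayList<Integer>());

        @Override
        public void invoke(Integer value) throws Exception {
            COLLECTED.add(value);
        }
    }

    // in the test, replace writeAsText(...) with:
    // orderedResult.addSink(new MemorySink()).setParallelism(1);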


> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps so many of them in flight, and then emits 
> results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
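
Based on the API exercised in the PR's tests, usage would look roughly like the sketch
below (the database client and its callback are illustrative placeholders, not Flink API;
AsyncCollector/AsyncDataStream signatures follow the PR and may still change):

    // hypothetical enrichment: resolve each key against an external store
    AsyncFunction<Integer, String> enrich = new AsyncFunction<Integer, String>() {
        @Override
        public void asyncInvoke(Integer key, final AsyncCollector<Integer, String> collector) throws Exception {
            // fire the request; complete the collector from the client's callback thread
            dbClient.query(key, new Callback<String>() {          // placeholder client API
                @Override
                public void onSuccess(String row) {
                    collector.collect(Collections.singletonList(row));
                }
            });
        }
    };

    // keep up to 100 requests in flight; emit results in input order
    DataStream<String> enriched = AsyncDataStream.orderedWait(input, enrich, 100);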


[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683319#comment-15683319 ]

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88857262
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java ---
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Tests for {@link AsyncCollectorBuffer}. These test that:
+ *
+ * <ul>
+ * <li>Add a new item into the buffer</li>
+ * <li>Ordered mode processing</li>
+ * <li>Unordered mode processing</li>
+ * <li>Error handling</li>
+ * </ul>
+ */
+public class AsyncCollectorBufferTest {
+   private AsyncFunction<Integer, Integer> function;
+
+   private AsyncWaitOperator<Integer, Integer> operator;
+
+   private AsyncCollectorBuffer<Integer, Integer> buffer;
+
+   private Output<StreamRecord<Integer>> output;
+
+   private TimestampedCollector<Integer> collector;
+
+   private Object lock = new Object();
+
+   @Before
+   public void setUp() throws Exception {
+   function = new AsyncFunction<Integer, Integer>() {
+   @Override
+   public void asyncInvoke(Integer input, AsyncCollector<Integer, Integer> collector) throws Exception {
+
+   }
+   };
+
+   operator = new AsyncWaitOperator<>(function);
+   Class<?>[] classes = AbstractStreamOperator.class.getDeclaredClasses();
+   Class<?> latencyClass = null;
+   for (Class<?> c : classes) {
+   if (c.getName().indexOf("LatencyGauge") != -1) {
+   latencyClass = c;
+   }
+   }
+
+   Constructor<?> explicitConstructor = latencyClass.getDeclaredConstructors()[0];
+   explicitConstructor.setAccessible(true);
+   Whitebox.setInternalState(operator, "latencyGauge", explicitConstructor.newInstance(10));
+
+   output = new FakedOutput(new ArrayList());
+   collector = new TimestampedCollector<>(output);
+
+   Whitebox.setInternalState(operator, "output", output);
--- End diff --

Can't we simply call `AsyncWaitOperator#setup` here?
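
For reference, a hedged sketch of what that could look like (assuming Mockito mocks;
whether `setup` wires everything the test needs, e.g. the latency gauge, and whether
further environment/metric mocks are required would have to be verified):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.graph.StreamConfig;
    import org.apache.flink.streaming.runtime.tasks.StreamTask;

    // drive the operator's normal wiring path instead of reflective field injection
    StreamTask<?, ?> containingTask = mock(StreamTask.class);
    when(containingTask.getCheckpointLock()).thenReturn(lock);

    // AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)
    operator.setup(containingTask, new StreamConfig(new Configuration()), output);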


> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683299#comment-15683299 ]

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88714544
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer,
+ * and emit results from {@link AsyncCollector} to the next operators following it by
+ * calling {@link Output#collect(Object)}
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+   /**
+* Max number of {@link AsyncCollector} in the buffer.
+*/
+   private final int bufferSize;
+
+   private final AsyncDataStream.OutputMode mode;
+
+   private final AsyncWaitOperator<IN, OUT> operator;
+
+   /**
+* Keep all {@code AsyncCollector} and their input {@link StreamElement}
+*/
+   private final Map<AsyncCollector<IN, OUT>, StreamElement> queue = new LinkedHashMap<>();
+   /**
+* For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the
+* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue}
+* is full since main thread waits on this lock. The StreamElement in
+* {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements
+* in its queue. It will be kept in the operator state while snapshotting.
+*/
+   private StreamElement extraStreamElement;
+
+   /**
+* {@link TimestampedCollector} and {@link Output} to collect results and watermarks.
+*/
+   private final Output<StreamRecord<OUT>> output;
+   private final TimestampedCollector<OUT> timestampedCollector;
+
+   /**
+* Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+*/
+   private final Object lock;
+
+   private final Emitter emitter;
+   private final Thread emitThread;
+
+   private IOException error;
+
+   public AsyncCollectorBuffer(
+   int bufferSize,
+   AsyncDataStream.OutputMode mode,
+   Output<StreamRecord<OUT>> output,
+   TimestampedCollector<OUT> collector,
+   Object lock,
+   AsyncWaitOperator<IN, OUT> operator) {
+   Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0.");
+   Preconditions.checkNotNull(output, "Output should not be NULL.");
+   Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL.");
+   Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL.");
+   Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL.");
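
To make the ordered-mode behaviour sketched in the javadoc above concrete, here is a
self-contained toy model (plain Java, not the PR's code; CompletableFuture stands in for
AsyncCollector): entries leave the buffer strictly in arrival order, a full buffer blocks
the producer, and all queue access happens under one shared lock, mirroring the
checkpoint-lock discipline described above.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    public class OrderedBufferSketch {
        // LinkedHashMap keeps arrival order, like the buffer's queue field
        private final Map<CompletableFuture<String>, String> queue = new LinkedHashMap<>();
        private final Object lock = new Object();
        private final int bufferSize = 3;

        // called by the main (processing) thread; blocks while the buffer is full
        public void add(String input, CompletableFuture<String> result) throws InterruptedException {
            synchronized (lock) {
                while (queue.size() >= bufferSize) {
                    lock.wait();   // backpressure on the producer
                }
                queue.put(result, input);
            }
            // wake the emitter whenever a pending result completes
            result.thenRun(() -> {
                synchronized (lock) {
                    lock.notifyAll();
                }
            });
        }

        // called by the emitter thread; ordered mode only drains finished heads
        public List<String> drainFinishedFromHead() {
            List<String> out = new ArrayList<>();
            synchronized (lock) {
                Iterator<CompletableFuture<String>> it = queue.keySet().iterator();
                while (it.hasNext()) {
                    CompletableFuture<String> head = it.next();
                    if (!head.isDone()) {
                        break;   // stop at the first unfinished entry to preserve input order
                    }
                    out.add(head.getNow(null));
                    it.remove();
                }
                lock.notifyAll();   // free slots for a blocked add()
            }
            return out;
        }
    }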

[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...

2016-11-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88716757
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+@Internal
+public class AsyncWaitOperator<IN, OUT>
+   extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
+   implements OneInputStreamOperator<IN, OUT>
+{
+   private final int DEFAULT_BUFFER_SIZE = 1000;
+
+   private static final long serialVersionUID = 1L;
+
+   private final static String STATE_NAME = "_async_wait_operator_state_";
+
+   /**
+* {@link TypeSerializer} for inputs while making snapshots.
+*/
+   private transient StreamElementSerializer<IN> inStreamElementSerializer;
+
+   /**
+* input stream elements from the state
+*/
+   private transient ListState<StreamElement> recoveredStreamElements;
+
+   private transient TimestampedCollector<OUT> collector;
+
+   private transient AsyncCollectorBuffer<IN, OUT> buffer;
+
+   /**
+* Checkpoint lock from {@link StreamTask#lock}
+*/
+   private transient Object checkpointLock;
+
+   private int bufferSize = DEFAULT_BUFFER_SIZE;
+   private AsyncDataStream.OutputMode mode;
+
+   /**
+* For test only. Normally this flag is true, indicating that the Emitter Thread
+* in the buffer will work.
+*/
+   private boolean emitFlag = true;
+
+   /**
+* Test serializer used in unit test
+*/
+   private StreamElementSerializer<IN> inStreamElementSerializerForTest;
+
+
+   public AsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction) {
+   super(asyncFunction);
+   chainingStrategy = ChainingStrategy.ALWAYS;
+   }
+
+   public void setBufferSize(int size) {
+   Preconditions.checkArgument(size > 0, "The number of concurrent async operations should be greater than 0.");
+   bufferSize = size;
+   }
+
+   public void setOutputMode(AsyncDataStream.OutputMode mode) {
+   this.mode = mode;
+   }
--- End diff --

Can't we give this information at creation time of the operator? Then we 
could also mark these fields final.
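
A sketch of the constructor-injected variant this suggests (hypothetical, not the PR's
code; keeps the existing validation and chaining behaviour):

    public AsyncWaitOperator(
            AsyncFunction<IN, OUT> asyncFunction,
            int bufferSize,
            AsyncDataStream.OutputMode mode) {
        super(asyncFunction);
        Preconditions.checkArgument(bufferSize > 0,
            "The number of concurrent async operations should be greater than 0.");
        // both fields could then be declared final
        this.bufferSize = bufferSize;
        this.mode = Preconditions.checkNotNull(mode);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }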



[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683318#comment-15683318 ]

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88858824
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java ---
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Tests for {@link AsyncCollectorBuffer}. These test that:
+ *
+ * <ul>
+ * <li>Add a new item into the buffer</li>
+ * <li>Ordered mode processing</li>
+ * <li>Unordered mode processing</li>
+ * <li>Error handling</li>
+ * </ul>
+ */
+public class AsyncCollectorBufferTest {
+   private AsyncFunction<Integer, Integer> function;
+
+   private AsyncWaitOperator<Integer, Integer> operator;
+
+   private AsyncCollectorBuffer<Integer, Integer> buffer;
+
+   private Output<StreamRecord<Integer>> output;
+
+   private TimestampedCollector<Integer> collector;
+
+   private Object lock = new Object();
+
+   @Before
+   public void setUp() throws Exception {
+   function = new AsyncFunction<Integer, Integer>() {
+   @Override
+   public void asyncInvoke(Integer input, AsyncCollector<Integer, Integer> collector) throws Exception {
+
+   }
+   };
+
+   operator = new AsyncWaitOperator<>(function);
+   Class<?>[] classes = AbstractStreamOperator.class.getDeclaredClasses();
+   Class<?> latencyClass = null;
+   for (Class<?> c : classes) {
+   if (c.getName().indexOf("LatencyGauge") != -1) {
+   latencyClass = c;
+   }
+   }
+
+   Constructor<?> explicitConstructor = latencyClass.getDeclaredConstructors()[0];
+   explicitConstructor.setAccessible(true);
+   Whitebox.setInternalState(operator, "latencyGauge", explicitConstructor.newInstance(10));
+
+   output = new FakedOutput(new ArrayList());
+   collector = new TimestampedCollector<>(output);
+
+   Whitebox.setInternalState(operator, "output", output);
+   }
+
+   @Test
+   public void testAdd() throws Exception {
+   buffer = new AsyncCollectorBuffer<>(3, AsyncDataStream.OutputMode.UNORDERED, output, collector, lock, operator);
+
+   buffer.addWatermark(new Watermark(0L));
+   buffer.addLatencyMarker(new LatencyMarker(111L, 1, 1));
+   Assert.assertEquals(buffer.getQueue().size(), 2);
+
+   Iterator<Map.Entry<AsyncCollector<Integer, Integer>, StreamElement>> iterator = buffer.getQueue().entrySet().iterator();

[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683275#comment-15683275 ]

ASF GitHub Bot commented on FLINK-4391:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2629#discussion_r88676639
  
--- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Example illustrating how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}
+ */
+public class AsyncIOExample {
+
+   /**
+* A checkpointed source.
+*/
+   private static class SimpleSource implements SourceFunction<Integer>, Checkpointed<Integer> {
+   private static final long serialVersionUID = 1L;
+
+   private volatile boolean isRunning = true;
+   private int counter = 0;
+   private int start = 0;
+
+   @Override
+   public void restoreState(Integer state) throws Exception {
+   this.start = state;
+   }
+
+   @Override
+   public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+   return start;
+   }
+
+   public SimpleSource(int maxNum) {
+   this.counter = maxNum;
+   }
+
+   @Override
+   public void run(SourceContext<Integer> ctx) throws Exception {
+   while (start < counter && isRunning) {
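+   // collect the record and advance 'start' together under the
+   // checkpoint lock, so snapshotState never sees a torn update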
+   synchronized (ctx.getCheckpointLock()) {
+   ctx.collect(start);
+   ++start;
+   }
+   Thread.sleep(10);
+   }
+   }
+
+   @Override
+   public void cancel() {
+   isRunning = false;
+   }
+   }
+
+
+   public static void main(String[] args) throws Exception {
+
+   // obtain execution environment and set setBufferTimeout to 1 to enable
+   // continuous flushing of the output buffers (lowest latency)
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+   .setBufferTimeout(1);
+
+   // configurations for the job
+   String statePath = args[0];
+   String cpMode = args[1];
+   int maxCount = Integer.valueOf(args[2]);
+   final int s
