[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2810#discussion_r88844164

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import java.util
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.Project
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rex.{RexInputRef, RexNode}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.BatchTableEnvironment
    +import org.apache.flink.api.table.plan.schema.TableSourceTable
    +import org.apache.flink.api.table.sources.ProjectableTableSource
    +
    +import scala.collection.JavaConverters._
    +
    +class BatchTableProject(
    +    cluster: RelOptCluster,
    +    traits: RelTraitSet,
    +    input: RelNode,
    +    projects: util.List[_ <: RexNode],
    +    projectionRowType: RelDataType)
    +  extends Project(cluster, traits, input, projects, projectionRowType)
    +  with DataSetRel {
    +
    +  override def copy(traitSet: RelTraitSet, input: RelNode,
    +      projects: util.List[RexNode], scanRawType: RelDataType): Project = {
    +    new BatchTableProject(cluster, traitSet, input, projects, projectionRowType)
    +  }
    +
    +  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
    +    super.computeSelfCost(planner, mq).multiplyBy(0.8)
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: BatchTableEnvironment,
    +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
    +
    +    val projectableSource = getInput.getTable.unwrap(classOf[TableSourceTable])
    +      .tableSource.asInstanceOf[ProjectableTableSource[_]]
    +
    +    val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
    +    projectableSource.setProjection(indexes)
    +
    +    getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    --- End diff --

    You are right. But if `setProjection` returns a copy of the TableSource, we should translate the TableSource to a DataSet and convert it to the expected type in this method.
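The copy-returning variant discussed in this thread could look roughly as follows. This is only a sketch; `copyWithProjection` is a hypothetical name, not API from the pull request:

{code}
// Hypothetical immutable variant of the projection push-down hook: instead of
// mutating the registered source, return a projected copy that the planner
// can translate directly.
trait ProjectableTableSource[T] {

  /** Returns a copy of this TableSource that emits only the given field indexes. */
  def copyWithProjection(fields: Array[Int]): ProjectableTableSource[T]
}
{code}

The planner would then translate the returned copy, leaving the source registered in the catalog untouched.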
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15682842#comment-15682842 ]

ASF GitHub Bot commented on FLINK-3848:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2810#discussion_r88844164

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala ---

    You are right. But if `setProjection` returns a copy of the TableSource, we should translate the TableSource to a DataSet and convert it to the expected type in this method.
> Add ProjectableTableSource interface and translation rule
> ----------------------------------------------------------
>
>                 Key: FLINK-3848
>                 URL: https://issues.apache.org/jira/browse/FLINK-3848
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Anton Solovev
>
> Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations
> that support projection push-down.
> The interface could look as follows
> {code}
> trait ProjectableTableSource {
>   def setProjection(fields: Array[String]): Unit
> }
> {code}
> In addition we need Calcite rules to push a projection into a TableScan that
> refers to a {{ProjectableTableSource}}. We might need to tweak the cost model
> as well to push the optimizer in the right direction.
> Moreover, the {{CsvTableSource}} could be extended to implement
> {{ProjectableTableSource}}.
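As an illustration of the proposed interface, a CSV-backed source might implement it along these lines. The class and helper names are assumptions for illustration, not the actual CsvTableSource code:

{code}
// A rough sketch of how a CSV-backed source might implement the proposed
// interface, keeping track of which columns the query actually needs.
trait ProjectableTableSource {
  def setProjection(fields: Array[String]): Unit
}

class ProjectableCsvTableSource(
    path: String,
    fieldNames: Array[String]) extends ProjectableTableSource {

  // Until a projection is pushed down, all fields are read.
  private var projectedFields: Array[String] = fieldNames

  override def setProjection(fields: Array[String]): Unit = {
    // Keep only the fields the query actually uses, in the requested order.
    projectedFields = fields.filter(fieldNames.contains)
  }

  /** Column indexes the CSV input format should parse. */
  def includedFieldIndexes: Array[Int] = projectedFields.map(fieldNames.indexOf)
}
{code}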
[jira] [Created] (FLINK-5105) Improve ReduceState: value put into ReducingState should always be a copy
sunjincheng created FLINK-5105:
----------------------------------

             Summary: Improve ReduceState: value put into ReducingState should always be a copy
                 Key: FLINK-5105
                 URL: https://issues.apache.org/jira/browse/FLINK-5105
             Project: Flink
          Issue Type: Bug
          Components: State Backends, Checkpointing
            Reporter: sunjincheng

In case of overlapping sliding windows, multiple references are held to the same object. If we modify value1 or value2, the results are incorrect. The value that is put into a ReducingState should therefore always be a copy. That would allow to modify and emit one of the two input values (the one which comes from the state). The FoldingState has the same problem.
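A minimal sketch of the proposed copy-on-put, assuming access to the value's TypeSerializer; the function below is hypothetical and stands in for the heap state's internal add path:

{code}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeutils.TypeSerializer

// Copy the incoming value before it is merged and stored, so window operators
// that still hold a reference to the same object cannot corrupt the state.
def addToReducingState[T](
    current: T,                        // value currently held by the state, may be null
    incoming: T,                       // value handed in by the operator
    reduceFunction: ReduceFunction[T],
    serializer: TypeSerializer[T]): T = {
  val safeIncoming = serializer.copy(incoming)   // defensive copy on every put
  if (current == null) safeIncoming
  else reduceFunction.reduce(current, safeIncoming)
}
{code}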
[jira] [Created] (FLINK-5106) improving IncrementalAggregateReduceFunction
sunjincheng created FLINK-5106:
----------------------------------

             Summary: improving IncrementalAggregateReduceFunction
                 Key: FLINK-5106
                 URL: https://issues.apache.org/jira/browse/FLINK-5106
             Project: Flink
          Issue Type: Improvement
          Components: Table API & SQL
            Reporter: sunjincheng
            Assignee: sunjincheng

Please refer to FLINK-4937.
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r88847319

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.functions.ReduceFunction
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.util.Preconditions
    +
    +/**
    + * For Incremental intermediate aggregate Rows, merge every row into aggregate buffer.
    + *
    + * @param aggregates The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
    + *                         and output Row.
    + */
    +class IncrementalAggregateReduceFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val intermediateRowArity: Int) extends ReduceFunction[Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(groupKeysMapping)
    +  @transient var accumulatorRow: Row = _
    +
    +  /**
    +   * For Incremental intermediate aggregate Rows, merge value1 and value2
    +   * into the aggregate buffer and return the aggregate buffer.
    +   *
    +   * @param value1 The first value to be combined.
    +   * @param value2 The second value to be combined.
    +   * @return The combined value of both input values.
    +   */
    +  override def reduce(value1: Row, value2: Row): Row = {
    +
    +    if (null == accumulatorRow) {
    +      accumulatorRow = new Row(intermediateRowArity)
    +    }
    +
    +    // Initiate intermediate aggregate value.
    +    aggregates.foreach(_.initiate(accumulatorRow))
    --- End diff --

    Okay, I have created [FLINK-5105](https://issues.apache.org/jira/browse/FLINK-5105) and [FLINK-5106](https://issues.apache.org/jira/browse/FLINK-5106).
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15682887#comment-15682887 ]

ASF GitHub Bot commented on FLINK-4937:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r88847319

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala ---

    Okay, I have created [FLINK-5105](https://issues.apache.org/jira/browse/FLINK-5105) and [FLINK-5106](https://issues.apache.org/jira/browse/FLINK-5106).

> Add incremental group window aggregation for streaming Table API
> -----------------------------------------------------------------
>
>                 Key: FLINK-4937
>                 URL: https://issues.apache.org/jira/browse/FLINK-4937
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an
> incremental fashion. This means that the window collects all records and
> performs the aggregation when the window is closed instead of eagerly
> updating a partial aggregate for every added record. Since records are
> buffered, non-incremental aggregation requires more storage space than
> incremental aggregation.
> The DataStream API, which is used under the hood of the streaming Table API,
> features [incremental
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
> using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
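For reference, incremental aggregation in the plain DataStream API keeps a single partial aggregate per window by applying a ReduceFunction eagerly per record. A minimal sketch, assuming a stream of (key, value) pairs (the field layout is an assumption):

{code}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[(String, Long)] =
  env.fromElements(("a", 1L), ("a", 2L), ("b", 3L))

val sums = input
  .keyBy(0)
  .timeWindow(Time.seconds(10))
  // The reduce function runs on every arriving record, so the window state is
  // one partial aggregate instead of a buffer of all records.
  .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
{code}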
[jira] [Assigned] (FLINK-4741) WebRuntimeMonitor does not shut down all of its threads (EventLoopGroups) on exit.
[ https://issues.apache.org/jira/browse/FLINK-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roman Maier reassigned FLINK-4741:
----------------------------------

    Assignee: Roman Maier

> WebRuntimeMonitor does not shut down all of its threads (EventLoopGroups) on exit.
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-4741
>                 URL: https://issues.apache.org/jira/browse/FLINK-4741
>             Project: Flink
>          Issue Type: Bug
>          Components: Webfrontend
>    Affects Versions: 1.1.2
>            Reporter: Joseph Sims
>            Assignee: Roman Maier
>            Priority: Minor
>
> WebRuntimeMonitor does not shut down correctly, causing the overall
> application to hang on shutdown. It shuts down bootstrap.group()
> (EventLoopGroup) but not bootstrap.childGroup() (EventLoopGroup).
> If WebRuntimeMonitor is not used (local.start-webserver=false), this problem
> does not occur.
> Class: WebRuntimeMonitor
> Method: stop()
> Line: ~387
> Called:
> bootstrap.group().shutdownGracefully()
> Not called:
> bootstrap.childGroup().shutdownGracefully()
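A sketch of the fix implied by the report, shutting down both event loop groups in stop(); the wrapper method and the null checks are assumptions:

{code}
import io.netty.bootstrap.ServerBootstrap

// Shut down BOTH Netty event loop groups. The child group runs non-daemon
// worker threads, so leaving it alive keeps the JVM from exiting.
def stopWebMonitor(bootstrap: ServerBootstrap): Unit = {
  if (bootstrap != null) {
    if (bootstrap.group() != null) {
      bootstrap.group().shutdownGracefully()
    }
    if (bootstrap.childGroup() != null) {
      bootstrap.childGroup().shutdownGracefully()
    }
  }
}
{code}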
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user tonycox commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2810#discussion_r88849756

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala ---

    So it will turn into a scan and become the first step in the logical plan, right?
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15682921#comment-15682921 ]

ASF GitHub Bot commented on FLINK-3848:
---------------------------------------

Github user tonycox commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2810#discussion_r88849756

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala ---

    So it will turn into a scan and become the first step in the logical plan, right?
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15682939#comment-15682939 ]

Anton Mushin commented on FLINK-4832:
-------------------------------------

Hello everyone,
I've updated the implementation according to the changes in FLINK-4263. Could somebody check the [changes|https://github.com/apache/flink/compare/master...ex00:FLINK-4832] please? Is the idea correct?

> Count/Sum 0 elements
> --------------------
>
>                 Key: FLINK-4832
>                 URL: https://issues.apache.org/jira/browse/FLINK-4832
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Anton Mushin
>
> Currently, the Table API is unable to count or sum up 0 elements. We should
> improve DataSet aggregations for this. Maybe by union the original DataSet
> with a dummy record or by using a MapPartition function. Coming up with a
> good design for this is also part of this issue.
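To make the union-with-a-dummy-record idea concrete, here is a sketch of a count that yields 0 on an empty input; designing the real solution is what this issue is about:

{code}
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val input: DataSet[String] = env.fromElements("a", "b", "c")

// Map every record to 1, union a single dummy 0, and sum everything up.
// The dummy guarantees the reduce emits a result, so the pattern returns 0
// (instead of no record at all) when the input is empty.
val count: DataSet[Long] = input
  .map(_ => 1L)
  .union(env.fromElements(0L))
  .reduce(_ + _)

count.print()
{code}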
[jira] [Commented] (FLINK-5031) Consecutive DataStream.split() ignored
[ https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15682957#comment-15682957 ]

Fabian Hueske commented on FLINK-5031:
--------------------------------------

Hi [~RenkaiGe], thanks for looking into this issue!
I'm not very familiar with the DataStream API internals, but I doubt that a union of all consecutively applied selects is intended. I'd rather think it should be the intersection.
[~aljoscha], what do you think?

> Consecutive DataStream.split() ignored
> --------------------------------------
>
>                 Key: FLINK-5031
>                 URL: https://issues.apache.org/jira/browse/FLINK-5031
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Fabian Hueske
>            Assignee: Renkai Ge
>             Fix For: 1.2.0
>
> The output of the following program
> {code}
> static final class ThresholdSelector implements OutputSelector<Long> {
>     long threshold;
>
>     public ThresholdSelector(long threshold) {
>         this.threshold = threshold;
>     }
>
>     @Override
>     public Iterable<String> select(Long value) {
>         if (value < threshold) {
>             return Collections.singletonList("Less");
>         } else {
>             return Collections.singletonList("GreaterEqual");
>         }
>     }
> }
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     SplitStream<Long> split1 = env.generateSequence(1, 11)
>         .split(new ThresholdSelector(6));
>
>     // stream11 should be [1,2,3,4,5]
>     DataStream<Long> stream11 = split1.select("Less");
>
>     SplitStream<Long> split2 = stream11
>         //.map(new MapFunction<Long, Long>() {
>         //    @Override
>         //    public Long map(Long value) throws Exception {
>         //        return value;
>         //    }
>         //})
>         .split(new ThresholdSelector(3));
>
>     DataStream<Long> stream21 = split2.select("Less");
>     // stream21 should be [1,2]
>     stream21.print();
>
>     env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second
> {{split}} operation is ignored.
> The program is correctly evaluated if the identity {{MapFunction}} is added to
> the program.
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683011#comment-15683011 ]

ASF GitHub Bot commented on FLINK-4937:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r88856847

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala ---

    @wuchong, yes this is a problem with the `HeapStateBackend`. The RocksDB backend does not suffer from this problem. I think in the long run we should migrate the `HeapStateBackend` to always keep data in serialised form; then we also won't have this problem anymore.
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r88856847

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala ---

    @wuchong, yes this is a problem with the `HeapStateBackend`. The RocksDB backend does not suffer from this problem. I think in the long run we should migrate the `HeapStateBackend` to always keep data in serialised form; then we also won't have this problem anymore.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683017#comment-15683017 ]

Fabian Hueske commented on FLINK-4832:
--------------------------------------

The overall approach looks good, IMO.
Can you open a pull request?
Thanks, Fabian
[jira] [Created] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts
Stefan Richter created FLINK-5107:
-------------------------------------

             Summary: Job Manager goes out of memory from long history of prior execution attempts
                 Key: FLINK-5107
                 URL: https://issues.apache.org/jira/browse/FLINK-5107
             Project: Flink
          Issue Type: Bug
          Components: JobManager
            Reporter: Stefan Richter
            Assignee: Stefan Richter

We have observed that the job manager can run out of memory during long running jobs with many vertices. Analysis of the heap dump shows that the ever-growing history of prior execution attempts is the culprit for this problem.
We should limit this history to the n most recent attempts.
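A bounded history could be kept with a simple size-capped deque; the class below is a hypothetical illustration, not the actual execution-graph code:

{code}
import java.util.ArrayDeque

// Keeps at most maxSize prior execution attempts, evicting the oldest first,
// so the retained history no longer grows without bound.
class BoundedExecutionHistory[T](maxSize: Int) {

  private val history = new ArrayDeque[T](maxSize)

  def add(attempt: T): Unit = {
    if (history.size() >= maxSize) {
      history.removeFirst()   // drop the oldest attempt to bound memory
    }
    history.addLast(attempt)
  }

  def size: Int = history.size()
}
{code}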
[jira] [Commented] (FLINK-4905) Kafka test instability IllegalStateException: Client is not started
[ https://issues.apache.org/jira/browse/FLINK-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683063#comment-15683063 ]

Andrew Efimov commented on FLINK-4905:
--------------------------------------

I would suggest the following solution:
- In the finally block of {{Kafka08Fetcher}}, not only set {{this.zookeeperOffsetHandler = null;}} but also set a {{volatile closed}} flag on {{ZookeeperOffsetHandler}}. Threads will check the flag before calling {{ZookeeperOffsetHandler.setOffsetInZooKeeper}} or {{ZookeeperOffsetHandler.getOffsetFromZooKeeper}}.
- In addition, create an atomic thread counter for ZookeeperOffsetHandler and perform the close only if the counter is 0 (with a timeout, of course), or use the {{checkpointLock}} that is in the context.

Team, what do you think?

> Kafka test instability IllegalStateException: Client is not started
> --------------------------------------------------------------------
>
>                 Key: FLINK-4905
>                 URL: https://issues.apache.org/jira/browse/FLINK-4905
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>              Labels: test-stability
>
> The following travis build
> (https://s3.amazonaws.com/archive.travis-ci.org/jobs/170365439/log.txt)
> failed because of this error
> {code}
> 08:17:11,239 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 33ebdc0e7c91be186d80658ce3d17069 (Read some records to commit offsets to Kafka) changed to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
> 	at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1040)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Client is not started
> 	at org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
> 	at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:113)
> 	at org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
> 	at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
> 	at org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
> 	at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
> 	at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:133)
> 	at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(ZookeeperOffsetHandler.java:93)
> 	at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.commitInternalOffsetsToKafka(Kafka08Fetcher.java:341)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:421)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:229)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:571)
> 	at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1035)
> 	... 5 more
> 08:17:11,241 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source -> Map -> Map -> Sink: Unnamed (1/3)
> {code}
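A sketch of the volatile-flag part of this suggestion; the class shape and the Curator usage are assumptions:

{code}
import org.apache.curator.framework.CuratorFramework

// Hypothetical guard inside ZookeeperOffsetHandler: reject offset commits
// that arrive after the fetcher has closed the handler.
class GuardedOffsetHandler(curatorClient: CuratorFramework) {

  @volatile private var closed: Boolean = false

  def close(): Unit = {
    closed = true            // volatile write, visible to committing threads
    curatorClient.close()
  }

  def setOffsetInZooKeeper(partition: Int, offset: Long): Unit = {
    if (closed) {
      return                 // late checkpoint notification; drop the commit
    }
    // ... existing ZooKeeper write path using curatorClient ...
  }
}
{code}

The flag alone still leaves a check-then-act race between the guard and close(), which is what the suggested thread counter or checkpoint lock would address.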
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r88861382

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala ---

    I think a lot of people would use it for production use cases. (Maybe you're confusing this with the old distinction between `MemStateBackend` and `FsStateBackend`? Internally, both the memory and the file backend now use a `HeapStateBackend`, but for the file backend the contents are checkpointed to a FileSystem.)
[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r88861427

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala ---

    @fhueske But you're right, we could enforce a copy there.
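The copy could also be enforced on read rather than on write; a small sketch using Flink's TypeSerializer (the helper itself is hypothetical):

{code}
import org.apache.flink.api.common.typeutils.TypeSerializer

// Copy on read instead of on write, so the object handed back to user code
// is never the instance kept by the heap state backend.
def getFromHeapState[T](stored: T, serializer: TypeSerializer[T]): T =
  if (stored == null) null.asInstanceOf[T]
  else serializer.copy(stored)
{code}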
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683076#comment-15683076 ]

ASF GitHub Bot commented on FLINK-4937:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r88861427

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala ---

    @fhueske But you're right, we could enforce a copy there.
[jira] [Commented] (FLINK-4937) Add incremental group window aggregation for streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683074#comment-15683074 ]

ASF GitHub Bot commented on FLINK-4937:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r88861382

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala ---

    I think a lot of people would use it for production use cases. (Maybe you're confusing this with the old distinction between `MemStateBackend` and `FsStateBackend`? Internally, both the memory and the file backend now use a `HeapStateBackend`, but for the file backend the contents are checkpointed to a FileSystem.)
[jira] [Commented] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts
[ https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683080#comment-15683080 ]

Chesnay Schepler commented on FLINK-5107:
-----------------------------------------

Another solution would be to incrementally archive the finished executions into files.
[jira] [Commented] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup
[ https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683082#comment-15683082 ]

Maximilian Michels commented on FLINK-5081:
-------------------------------------------

I've had a second look. The issue is not that the configuration is not loaded. Instead, your finding reveals at least two other issues with our per-job YARN implementation:

1. When executing in non-detached job submission mode, the "Client Shutdown Hook" shuts down the Yarn application in case of job failures (e.g. a TaskManager dies). We should remove the shutdown hook; it should only be active during deployment.

2. The per-job Yarn application is supposed to automatically shut down the cluster after job completion. In case of failures (e.g. a TaskManager dies), the shutdown apparently is performed as well, although it shouldn't be.

> unable to set yarn.maximum-failed-containers with flink one-time YARN setup
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-5081
>                 URL: https://issues.apache.org/jira/browse/FLINK-5081
>             Project: Flink
>          Issue Type: Bug
>          Components: Startup Shell Scripts
>    Affects Versions: 1.1.4
>            Reporter: Nico Kruber
>
> When letting Flink set up YARN for a one-time job, it apparently does not
> deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the
> {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml as
> https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn
> suggested also does not work.
> Example:
> {code:none}
> flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 -Dyarn.maximum-failed-containers=100
> {code}
[jira] [Assigned] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup
[ https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reassigned FLINK-5081: - Assignee: Maximilian Michels > unable to set yarn.maximum-failed-containers with flink one-time YARN setup > --- > > Key: FLINK-5081 > URL: https://issues.apache.org/jira/browse/FLINK-5081 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.1.4 >Reporter: Nico Kruber >Assignee: Maximilian Michels > > When letting flink set up YARN for a one-time job, it apparently does not > deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the > {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml as > suggested in > https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn > does not work either. > example: > {code:none} > flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 > -Dyarn.maximum-failed-containers=100 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup
[ https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-5081: -- Affects Version/s: 1.2.0 > unable to set yarn.maximum-failed-containers with flink one-time YARN setup > --- > > Key: FLINK-5081 > URL: https://issues.apache.org/jira/browse/FLINK-5081 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.2.0, 1.1.4 >Reporter: Nico Kruber >Assignee: Maximilian Michels > Fix For: 1.2.0, 1.1.4 > > > When letting flink set up YARN for a one-time job, it apparently does not > deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the > {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml as > suggested in > https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn > does not work either. > example: > {code:none} > flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 > -Dyarn.maximum-failed-containers=100 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup
[ https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-5081: -- Fix Version/s: 1.1.4 1.2.0 > unable to set yarn.maximum-failed-containers with flink one-time YARN setup > --- > > Key: FLINK-5081 > URL: https://issues.apache.org/jira/browse/FLINK-5081 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.2.0, 1.1.4 >Reporter: Nico Kruber >Assignee: Maximilian Michels > Fix For: 1.2.0, 1.1.4 > > > When letting flink set up YARN for a one-time job, it apparently does not > deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the > {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml as > suggested in > https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn > does not work either. > example: > {code:none} > flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 > -Dyarn.maximum-failed-containers=100 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2837: [FLINK-5107] Introduced limit for prior execution ...
GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/2837 [FLINK-5107] Introduced limit for prior execution attempt history This PR addresses the problem of the JobManager going out of memory for a large history of prior execution attempts by pruning the history in FIFO fashion, keeping only a limited number of recent attempts. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink limit-prior-executions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2837.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2837 commit f9053060fe4d396e9a8917ce8d422b0ebbcc044f Author: Stefan Richter Date: 2016-11-18T18:07:56Z [FLINK-5107] Introduced limit for prior execution attempt history --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts
[ https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683092#comment-15683092 ] ASF GitHub Bot commented on FLINK-5107: --- GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/2837 [FLINK-5107] Introduced limit for prior execution attempt history This PR addresses the problem of the JobManager going out of memory for a large history of prior execution attempts by pruning the history in FIFO fashion, keeping only a limited number of recent attempts. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink limit-prior-executions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2837.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2837 commit f9053060fe4d396e9a8917ce8d422b0ebbcc044f Author: Stefan Richter Date: 2016-11-18T18:07:56Z [FLINK-5107] Introduced limit for prior execution attempt history > Job Manager goes out of memory from long history of prior execution attempts > > > Key: FLINK-5107 > URL: https://issues.apache.org/jira/browse/FLINK-5107 > Project: Flink > Issue Type: Bug > Components: JobManager >Reporter: Stefan Richter >Assignee: Stefan Richter > > We have observed that the job manager can run out of memory during long > running jobs with many vertices. Analysis of the heap dump shows that the > ever-growing history of prior execution attempts is the culprit for this > problem. > We should limit this history to the n most recent attempts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts
[ https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683096#comment-15683096 ] Stefan Richter commented on FLINK-5107: --- Agreed, it could be a next step to provide this alternative to pruning the history. > Job Manager goes out of memory from long history of prior execution attempts > > > Key: FLINK-5107 > URL: https://issues.apache.org/jira/browse/FLINK-5107 > Project: Flink > Issue Type: Bug > Components: JobManager >Reporter: Stefan Richter >Assignee: Stefan Richter > > We have observed that the job manager can run out of memory during long > running jobs with many vertices. Analysis of the heap dump shows that the > ever-growing history of prior execution attempts is the culprit for this > problem. > We should limit this history to the n most recent attempts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores
[ https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683099#comment-15683099 ] Maximilian Michels commented on FLINK-5071: --- Is this 1.1.3 or 1.2-SNAPSHOT? Could you post some more details? > YARN: yarn.containers.vcores config not respected when checking for vcores > -- > > Key: FLINK-5071 > URL: https://issues.apache.org/jira/browse/FLINK-5071 > Project: Flink > Issue Type: Bug > Components: YARN Client >Reporter: Gyula Fora > Fix For: 1.1.3 > > > The YarnClient validates whether the number of task slots is less than the > max vcores setting of yarn but seems to ignore the yarn.containers.vcores > flink config, which should be used instead of the slots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2837: [FLINK-5107] Introduced limit for prior execution attempt...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/2837 R @uce or anybody else interested. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5107) Job Manager goes out of memory from long history of prior execution attempts
[ https://issues.apache.org/jira/browse/FLINK-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683101#comment-15683101 ] ASF GitHub Bot commented on FLINK-5107: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/2837 R @uce or anybody else interested. > Job Manager goes out of memory from long history of prior execution attempts > > > Key: FLINK-5107 > URL: https://issues.apache.org/jira/browse/FLINK-5107 > Project: Flink > Issue Type: Bug > Components: JobManager >Reporter: Stefan Richter >Assignee: Stefan Richter > > We have observed that the job manager can run out of memory during long > running jobs with many vertices. Analysis of the heap dump shows that the > ever-growing history of prior execution attempts is the culprit for this > problem. > We should limit this history to the n most recent attempts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5108) Remove ClientShutdownHook during job execution
Maximilian Michels created FLINK-5108: - Summary: Remove ClientShutdownHook during job execution Key: FLINK-5108 URL: https://issues.apache.org/jira/browse/FLINK-5108 Project: Flink Issue Type: Bug Components: YARN Client Affects Versions: 1.1.3, 1.2.0 Reporter: Maximilian Michels The behavior of the Standalone mode is to not react to client interrupts once a job has been deployed. We should change the Yarn client implementation to behave the same. This avoids accidental shutdown of the job, e.g. when the user sends an interrupt via CTRL-C or when the client machine shuts down. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores
[ https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683135#comment-15683135 ] Gyula Fora commented on FLINK-5071: --- Both actually. The problem is here: https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L308 which only compares the allowed number of vcores (in yarn config) with the number of slots, assuming that requested vcores == numslots. > YARN: yarn.containers.vcores config not respected when checking for vcores > -- > > Key: FLINK-5071 > URL: https://issues.apache.org/jira/browse/FLINK-5071 > Project: Flink > Issue Type: Bug > Components: YARN Client >Reporter: Gyula Fora > Fix For: 1.1.3 > > > The YarnClient validates whether the number of task slots is less than the > max vcores setting of yarn but seems to ignore the yarn.containers.vcores > flink config, which should be used instead of the slots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
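The shape of the fix can be sketched as follows. This is a hedged illustration, not the actual code in AbstractYarnClusterDescriptor; the method and parameter names are invented. The point is to compare YARN's maximum against the requested vcores, falling back to the number of slots only when yarn.containers.vcores is unset:

```scala
object VcoresCheckSketch {
  // flinkConf stands in for Flink's Configuration; slots is the number of
  // task slots per TaskManager; yarnMaxVcores comes from the YARN config.
  def validateVcores(flinkConf: Map[String, String], slots: Int, yarnMaxVcores: Int): Unit = {
    val requestedVcores =
      flinkConf.get("yarn.containers.vcores").map(_.toInt).getOrElse(slots)
    require(
      requestedVcores <= yarnMaxVcores,
      s"The number of virtual cores per node was configured with $requestedVcores " +
        s"but Yarn only has $yarnMaxVcores virtual cores available.")
  }
}
```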
[jira] [Comment Edited] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup
[ https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683082#comment-15683082 ] Maximilian Michels edited comment on FLINK-5081 at 11/21/16 10:36 AM: -- I've had a second look. -The issue is not that the configuration is not loaded. Moreover, your finding reveals at least two other issues with our per-job YARN implementation:- -1. When executing in non-detached job submission mode, the "Client Shutdown Hook" shuts down the Yarn application in case of job failures (e.g. TaskManager dies). We should remove the shutdown hook. It should only be active during deployment.- -2. The per-job Yarn application is supposed to automatically shut down the cluster after job completion. In case of failures (e.g. TaskManager dies) the shutdown apparently is performed as well although it shouldn't.- edit: 1) is not an issue since it only shuts down when it reaches a terminal state. 2) Is an issue, but unrelated to this one. The actual issue here is that the JobManager informs the client of the failed job and the client shuts down the cluster. We should differentiate between fatal and non-fatal failures in the client. was (Author: mxm): I've had a second look. The issue is not that the configuration is not loaded. Moreover, your finding reveals at least two other issues with our per-job YARN implementation: 1. When executing in non-detached job submission mode, the "Client Shutdown Hook" shuts down the Yarn application in case of job failures (e.g. TaskManager dies). We should remove the shutdown hook. It should only be active during deployment. 2. The per-job Yarn application is supposed to automatically shut down the cluster after job completion. In case of failures (e.g. TaskManager dies) the shutdown apparently is performed as well although it shouldn't. > unable to set yarn.maximum-failed-containers with flink one-time YARN setup > --- > > Key: FLINK-5081 > URL: https://issues.apache.org/jira/browse/FLINK-5081 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.2.0, 1.1.4 >Reporter: Nico Kruber >Assignee: Maximilian Michels > Fix For: 1.2.0, 1.1.4 > > > When letting flink set up YARN for a one-time job, it apparently does not > deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the > {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml as > suggested in > https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn > does not work either. > example: > {code:none} > flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 > -Dyarn.maximum-failed-containers=100 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...
GitHub user gaborhermann opened a pull request: https://github.com/apache/flink/pull/2838 [FLINK-4712] [FLINK-4713] [ml] Ranking recommendation & evaluation (WIP) Please note that this is a work-in-progress PR for discussing API design decisions. We propose here a class hierarchy for fitting the ranking evaluations into the proposed evaluation framework (see [PR](https://github.com/apache/flink/pull/1849)). The features are mostly working, but documentation is missing and minor refactoring is needed. The evaluations currently work with top 100 rankings (burnt-in), and we still need to fix that. We need feedback for two main solutions, so we can go on with the PR. Thanks for any comment! ### `RankingPredictor` We have managed to rework the evaluation framework proposed by @thvasilo, so that ranking predictions would fit in. Our approach is to use separate `RankingPredictor` and `Predictor` traits. One main problem, however, remains: there is no common superclass for `RankingPredictor` and `Predictor`, so the pipelining mechanism might not work. A `Predictor` can only be at the end of the pipeline, so this should not really be a problem, but I do not know for sure. An alternative solution would be to have different objects `ALS` and `RankingALS` that give different predictions, but both extend only a `Predictor`. There could be implicit conversions between the two. I would prefer the current solution if it does not break the pipelining. @thvasilo what do you think about this? (This seems to be a problem similar to having a `predict_proba` function in scikit-learn classification models, where the same model for the same input gives two different predictions: a `predict` for discrete predictions and `predict_proba` for giving a probability.) ### Generalizing `EvaluateDataSetOperation` On the other hand, we seem to have solved the scoring issue. The users can evaluate a recommendation algorithm such as ALS by using a score operating on rankings (e.g. nDCG), or a score operating on ratings (e.g. RMSE). They only need to modify the `Score` they use in their code, and nothing else. The main problem was that the evaluate method and `EvaluateDataSetOperation` were not general enough. They prepare the evaluation to `(trueValue, predictedValue)` pairs (i.e. a `DataSet[(PredictionType, PredictionType)]`), while ranking evaluations needed a more general input with the true ratings (`DataSet[(Int,Int,Double)]`) and the predicted rankings (`DataSet[(Int,Int,Int)]`). Instead of using `EvaluateDataSetOperation` we use a more general `PrepareOperation`. We rename the `Score` in the original evaluation framework to `PairwiseScore`. `RankingScore` and `PairwiseScore` have a common trait `Score`. This way the user can use both a `RankingScore` and a `PairwiseScore` for a certain model, and only needs to alter the score used in the code. In case of pairwise scores (that only need true and predicted value pairs for evaluation) `EvaluateDataSetOperation` is used as a `PrepareOperation`. It prepares the evaluation by creating `(trueValue, predictedValue)` pairs from the test dataset. Thus, the result of preparing and the input of `PairwiseScore`s will be `DataSet[(PredictionType,PredictionType)]`. In case of rankings the `PrepareOperation` passes the test dataset and creates the rankings. The result of preparing and the input of `RankingScore`s will be `(DataSet[Int,Int,Double], DataSet[Int,Int,Int])`. I believe this is a fairly acceptable solution that avoids breaking the API. 
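A condensed Scala sketch of the proposed hierarchy may help to visualize it. The trait shapes below are simplified stand-ins inferred from the description above, not the PR's actual signatures, which carry more type parameters and operations:

```scala
import org.apache.flink.api.scala._

// Common parent: every score evaluates some prepared input shape.
trait Score[PreparedTesting] extends Serializable {
  def evaluate(prepared: PreparedTesting): DataSet[Double]
}

// Scores over (trueValue, predictedValue) pairs, e.g. RMSE.
trait PairwiseScore[P] extends Score[DataSet[(P, P)]]

// Scores over (user, item, rating) truths plus (user, item, rank)
// predictions, e.g. nDCG or precision@k.
trait RankingScore
  extends Score[(DataSet[(Int, Int, Double)], DataSet[(Int, Int, Int)])]
```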
You can merge this pull request into a Git repository by running: $ git pull https://github.com/gaborhermann/flink ranking-rec-eval Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2838.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2838 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS
[ https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683167#comment-15683167 ] ASF GitHub Bot commented on FLINK-4712: --- GitHub user gaborhermann opened a pull request: https://github.com/apache/flink/pull/2838 [FLINK-4712] [FLINK-4713] [ml] Ranking recommendation & evaluation (WIP) Please note that this is a work-in-progress PR for discussing API design decisions. We propose here a class hierarchy for fitting the ranking evaluations into the proposed evaluation framework (see [PR](https://github.com/apache/flink/pull/1849)). The features are mostly working, but documentation is missing and minor refactoring is needed. The evaluations currently work with top 100 rankings (burnt-in), and we still need to fix that. We need feedback for two main solutions, so we can go on with the PR. Thanks for any comment! ### `RankingPredictor` We have managed to rework the evaluation framework proposed by @thvasilo, so that ranking predictions would fit in. Our approach is to use separate `RankingPredictor` and `Predictor` traits. One main problem, however, remains: there is no common superclass for `RankingPredictor` and `Predictor`, so the pipelining mechanism might not work. A `Predictor` can only be at the end of the pipeline, so this should not really be a problem, but I do not know for sure. An alternative solution would be to have different objects `ALS` and `RankingALS` that give different predictions, but both extend only a `Predictor`. There could be implicit conversions between the two. I would prefer the current solution if it does not break the pipelining. @thvasilo what do you think about this? (This seems to be a problem similar to having a `predict_proba` function in scikit-learn classification models, where the same model for the same input gives two different predictions: a `predict` for discrete predictions and `predict_proba` for giving a probability.) ### Generalizing `EvaluateDataSetOperation` On the other hand, we seem to have solved the scoring issue. The users can evaluate a recommendation algorithm such as ALS by using a score operating on rankings (e.g. nDCG), or a score operating on ratings (e.g. RMSE). They only need to modify the `Score` they use in their code, and nothing else. The main problem was that the evaluate method and `EvaluateDataSetOperation` were not general enough. They prepare the evaluation to `(trueValue, predictedValue)` pairs (i.e. a `DataSet[(PredictionType, PredictionType)]`), while ranking evaluations needed a more general input with the true ratings (`DataSet[(Int,Int,Double)]`) and the predicted rankings (`DataSet[(Int,Int,Int)]`). Instead of using `EvaluateDataSetOperation` we use a more general `PrepareOperation`. We rename the `Score` in the original evaluation framework to `PairwiseScore`. `RankingScore` and `PairwiseScore` have a common trait `Score`. This way the user can use both a `RankingScore` and a `PairwiseScore` for a certain model, and only needs to alter the score used in the code. In case of pairwise scores (that only need true and predicted value pairs for evaluation) `EvaluateDataSetOperation` is used as a `PrepareOperation`. It prepares the evaluation by creating `(trueValue, predictedValue)` pairs from the test dataset. Thus, the result of preparing and the input of `PairwiseScore`s will be `DataSet[(PredictionType,PredictionType)]`. In case of rankings the `PrepareOperation` passes the test dataset and creates the rankings. 
The result of preparing and the input of `RankingScore`s will be `(DataSet[Int,Int,Double], DataSet[Int,Int,Int])`. I believe this is a fairly acceptable solution that avoids breaking the API. You can merge this pull request into a Git repository by running: $ git pull https://github.com/gaborhermann/flink ranking-rec-eval Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2838.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2838 > Implementing ranking predictions for ALS > > > Key: FLINK-4712 > URL: https://issues.apache.org/jira/browse/FLINK-4712 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Domokos Miklós Kelen >Assignee: Gábor Hermann > > We started working on implementing ranking predictions for recommender > systems. Ranking prediction means that besides predicting scores for user-item > pairs, the recommender system is able to recommend a top K list for the users. > Details: > In practice, this would mean find
[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...
Github user gaborhermann commented on a diff in the pull request: https://github.com/apache/flink/pull/2838#discussion_r88868369 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala --- @@ -18,12 +18,37 @@ package org.apache.flink.ml.evaluation +import org.apache.flink.api.common.operators.Order import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.ml._ +import org.apache.flink.ml.pipeline._ +import org.apache.flink.ml.RichNumericDataSet +import org.apache.flink.util.Collector import scala.reflect.ClassTag +trait Score[ --- End diff -- `Score` is the common trait for `RankingScore` and `PairwiseScore`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS
[ https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683173#comment-15683173 ] ASF GitHub Bot commented on FLINK-4712: --- Github user gaborhermann commented on a diff in the pull request: https://github.com/apache/flink/pull/2838#discussion_r88868369 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala --- @@ -18,12 +18,37 @@ package org.apache.flink.ml.evaluation +import org.apache.flink.api.common.operators.Order import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.ml._ +import org.apache.flink.ml.pipeline._ +import org.apache.flink.ml.RichNumericDataSet +import org.apache.flink.util.Collector import scala.reflect.ClassTag +trait Score[ --- End diff -- `Score` is the common trait for `RankingScore` and `PairwiseScore`. > Implementing ranking predictions for ALS > > > Key: FLINK-4712 > URL: https://issues.apache.org/jira/browse/FLINK-4712 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Domokos Miklós Kelen >Assignee: Gábor Hermann > > We started working on implementing ranking predictions for recommender > systems. Ranking prediction means that besides predicting scores for user-item > pairs, the recommender system is able to recommend a top K list for the users. > Details: > In practice, this would mean finding the K items for a particular user with > the highest predicted rating. It should also be possible to specify whether > to exclude the already seen items from a particular user's toplist. (See for > example the 'exclude_known' setting of [Graphlab Create's ranking > factorization > recommender|https://turi.com/products/create/docs/generated/graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend.html#graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend] > ). > The output of the topK recommendation function could be in the form of > {{DataSet[(Int,Int,Int)]}}, meaning (user, item, rank), similar to Graphlab > Create's output. However, this is arguable: follow-up work includes > implementing ranking recommendation evaluation metrics (such as precision@k, > recall@k, ndcg@k), similar to [Spark's > implementations|https://spark.apache.org/docs/1.5.0/mllib-evaluation-metrics.html#ranking-systems]. > It would be beneficial if we were able to design the API such that it could > be included in the proposed evaluation framework (see > [FLINK-2157|https://issues.apache.org/jira/browse/FLINK-2157]), which makes it > necessary to consider the possible output type {{DataSet[(Int, > Array[Int])]}} or {{DataSet[(Int, Array[(Int,Double)])]}} meaning (user, > array of items), possibly including the predicted scores as well. See > [FLINK-4713|https://issues.apache.org/jira/browse/FLINK-4713] for details. > Another question arising is whether to provide this function as a member of > the ALS class, as a switch-kind of parameter to the ALS implementation > (meaning the model is either a rating or a ranking recommender model) or in > some other way. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...
Github user gaborhermann commented on a diff in the pull request: https://github.com/apache/flink/pull/2838#discussion_r88868762 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala --- @@ -267,6 +401,21 @@ trait PredictOperation[Instance, Model, Testing, Prediction] extends Serializabl def predict(value: Testing, model: Model): Prediction } +/** + * Operation for preparing a testing [[DataSet]] for evaluation. + * + * Most commonly, [[EvaluateDataSetOperation]] is used, but evaluation of + * ranking recommendations needs input in a different form. + */ +trait PrepareOperation[Instance, Testing, Prepared] extends Serializable { --- End diff -- `PrepareOperation` is the common trait for `EvaluateDataSetOperation` and preparing ranking evaluation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
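To illustrate how a pairwise prepare step fits this trait, here is a toy sketch. The shapes are simplified stand-ins inferred from the diff above (a model function in place of a real Predictor), not the PR's actual implementation:

```scala
import org.apache.flink.api.scala._

// A PrepareOperation turns a fitted model plus a test DataSet into whatever
// input shape a Score consumes.
trait PrepareOperationSketch[Model, Testing, Prepared] extends Serializable {
  def prepare(model: Model, testing: DataSet[Testing]): Prepared
}

// Pairwise case, mirroring what EvaluateDataSetOperation does: turn
// (feature, trueLabel) test records into (trueValue, predictedValue) pairs.
class PairwisePrepareSketch
  extends PrepareOperationSketch[Double => Double, (Double, Double), DataSet[(Double, Double)]] {

  override def prepare(
      model: Double => Double,
      testing: DataSet[(Double, Double)]): DataSet[(Double, Double)] =
    testing.map(t => (t._2, model(t._1)))
}
```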
[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS
[ https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683179#comment-15683179 ] ASF GitHub Bot commented on FLINK-4712: --- Github user gaborhermann commented on a diff in the pull request: https://github.com/apache/flink/pull/2838#discussion_r88868762 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala --- @@ -267,6 +401,21 @@ trait PredictOperation[Instance, Model, Testing, Prediction] extends Serializabl def predict(value: Testing, model: Model): Prediction } +/** + * Operation for preparing a testing [[DataSet]] for evaluation. + * + * Most commonly, [[EvaluateDataSetOperation]] is used, but evaluation of + * ranking recommendations needs input in a different form. + */ +trait PrepareOperation[Instance, Testing, Prepared] extends Serializable { --- End diff -- `PrepareOperation` is the common trait for `EvaluateDataSetOperation` and preparing ranking evaluation. > Implementing ranking predictions for ALS > > > Key: FLINK-4712 > URL: https://issues.apache.org/jira/browse/FLINK-4712 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Domokos Miklós Kelen >Assignee: Gábor Hermann > > We started working on implementing ranking predictions for recommender > systems. Ranking prediction means that besides predicting scores for user-item > pairs, the recommender system is able to recommend a top K list for the users. > Details: > In practice, this would mean finding the K items for a particular user with > the highest predicted rating. It should also be possible to specify whether > to exclude the already seen items from a particular user's toplist. (See for > example the 'exclude_known' setting of [Graphlab Create's ranking > factorization > recommender|https://turi.com/products/create/docs/generated/graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend.html#graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend] > ). > The output of the topK recommendation function could be in the form of > {{DataSet[(Int,Int,Int)]}}, meaning (user, item, rank), similar to Graphlab > Create's output. However, this is arguable: follow-up work includes > implementing ranking recommendation evaluation metrics (such as precision@k, > recall@k, ndcg@k), similar to [Spark's > implementations|https://spark.apache.org/docs/1.5.0/mllib-evaluation-metrics.html#ranking-systems]. > It would be beneficial if we were able to design the API such that it could > be included in the proposed evaluation framework (see > [FLINK-2157|https://issues.apache.org/jira/browse/FLINK-2157]), which makes it > necessary to consider the possible output type {{DataSet[(Int, > Array[Int])]}} or {{DataSet[(Int, Array[(Int,Double)])]}} meaning (user, > array of items), possibly including the predicted scores as well. See > [FLINK-4713|https://issues.apache.org/jira/browse/FLINK-4713] for details. > Another question arising is whether to provide this function as a member of > the ALS class, as a switch-kind of parameter to the ALS implementation > (meaning the model is either a rating or a ranking recommender model) or in > some other way. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2646) Rich functions should provide a method "closeAfterFailure()"
[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683181#comment-15683181 ] Stephan Ewen commented on FLINK-2646: - Strictly speaking, this is API breaking, because we add a method to a public interface. We assume it does not really break, because no one implements the interface directly, but always goes through the {{AbstractRichFunction}}, from which all the rich function variants inherit. So, we would need to add it to {{AbstractRichFunction}} and define a rule exclusion for the {{apicmp}} plugin. > Rich functions should provide a method "closeAfterFailure()" > > > Key: FLINK-2646 > URL: https://issues.apache.org/jira/browse/FLINK-2646 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Liang Chen > Fix For: 1.0.0 > > > Right now, the {{close()}} method of rich functions is invoked in case of > proper completion, and in case of cancellation or error (to allow for cleanup). > In certain cases, the user function needs to know why it is closed, whether > the task completed in a regular fashion, or was canceled/failed. > I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. By > default, this method calls {{close()}}. The runtime is then changed to call > {{close()}} as part of the regular execution and {{closeAfterFailure()}} in > case of an irregular exit. > Because by default all cases call {{close()}} the change would not be API > breaking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
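The non-breaking shape the comment describes can be sketched like this. The class name is a made-up stand-in, not Flink's actual AbstractRichFunction; the point is only that the new method defaults to the existing cleanup path, so existing user functions compile unchanged and keep their current behavior:

```scala
// Sketch of the proposed default-method pattern.
abstract class AbstractRichFunctionSketch {
  @throws[Exception]
  def close(): Unit = {}

  // Invoked by the runtime on cancellation or failure; defaults to close(),
  // so only functions that care about the exit reason need to override it.
  @throws[Exception]
  def closeAfterFailure(): Unit = close()
}
```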
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r88869401 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.Project +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.{RexInputRef, RexNode} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.ProjectableTableSource + +import scala.collection.JavaConverters._ + +class BatchTableProject( +cluster: RelOptCluster, +traits: RelTraitSet, +input: RelNode, +projects: util.List[_ <: RexNode], +projectionRowType: RelDataType) + extends Project(cluster, traits, input, projects, projectionRowType) + with DataSetRel { + + override def copy(traitSet: RelTraitSet, input: RelNode, +projects: util.List[RexNode], scanRawType: RelDataType): Project = { +new BatchTableProject(cluster, traitSet, input, projects, projectionRowType) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { +super.computeSelfCost(planner, mq).multiplyBy(0.8) + } + + override def translateToPlan( +tableEnv: BatchTableEnvironment, +expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + +val projectableSource = getInput.getTable.unwrap(classOf[TableSourceTable]) + .tableSource.asInstanceOf[ProjectableTableSource[_]] + +val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray +projectableSource.setProjection(indexes) + +getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv) --- End diff -- Yes, it will turn into a scan with projection and will skip the `super.translateToPlan`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683185#comment-15683185 ] ASF GitHub Bot commented on FLINK-3848: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r88869401 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableProject.scala --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.Project +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.{RexInputRef, RexNode} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.ProjectableTableSource + +import scala.collection.JavaConverters._ + +class BatchTableProject( +cluster: RelOptCluster, +traits: RelTraitSet, +input: RelNode, +projects: util.List[_ <: RexNode], +projectionRowType: RelDataType) + extends Project(cluster, traits, input, projects, projectionRowType) + with DataSetRel { + + override def copy(traitSet: RelTraitSet, input: RelNode, +projects: util.List[RexNode], scanRawType: RelDataType): Project = { +new BatchTableProject(cluster, traitSet, input, projects, projectionRowType) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { +super.computeSelfCost(planner, mq).multiplyBy(0.8) + } + + override def translateToPlan( +tableEnv: BatchTableEnvironment, +expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + +val projectableSource = getInput.getTable.unwrap(classOf[TableSourceTable]) + .tableSource.asInstanceOf[ProjectableTableSource[_]] + +val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray +projectableSource.setProjection(indexes) + +getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv) --- End diff -- Yes, it will turn into a scan with projection and will skip the `super.translateToPlan`. 
> Add ProjectableTableSource interface and translation rule > - > > Key: FLINK-3848 > URL: https://issues.apache.org/jira/browse/FLINK-3848 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Anton Solovev > > Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations > that support projection push-down. > The interface could look as follows > {code} > trait ProjectableTableSource { > def setProjection(fields: Array[String]): Unit > } > {code} > In addition we need Calcite rules to push a projection into a TableScan that > refers to a {{ProjectableTableSource}}. We might need to tweak the cost model > as well to push the optimizer in the right direction. > Moreover, the {{CsvTableSource}} could be extended to implement > {{ProjectableTableSource}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
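A hypothetical sketch of what such a projectable source could look like, combining the JIRA proposal with the index-based setProjection that the PR discussion uses. None of these names are the final API, and the trait here returns a copy rather than mutating in place, which matters if the planner needs to re-plan with a different projection:

```scala
// Illustrative only; not the interface ultimately merged into Flink.
trait ProjectableTableSourceSketch[T] {
  def setProjection(fields: Array[Int]): ProjectableTableSourceSketch[T]
}

class CsvSourceSketch(fieldNames: Array[String])
  extends ProjectableTableSourceSketch[Seq[String]] {

  // By default all columns are selected.
  private var selected: Array[Int] = fieldNames.indices.toArray

  // Return a fresh instance instead of mutating this one.
  override def setProjection(fields: Array[Int]): CsvSourceSketch = {
    val copy = new CsvSourceSketch(fieldNames)
    copy.selected = fields
    copy
  }

  // Only the projected columns are materialized per input line.
  def readRecord(line: String): Seq[String] = {
    val columns = line.split(",")
    selected.map(columns(_)).toSeq
  }
}
```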
[jira] [Updated] (FLINK-5097) The TypeExtractor is missing input type information in some Graph methods
[ https://issues.apache.org/jira/browse/FLINK-5097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri updated FLINK-5097: - Description: The TypeExtractor is called without information about the input type in {{mapVertices}} and {{mapEdges}}, although this information can be easily retrieved. (was: The TypeExtractor is called without information about the input type in {{mapVertices}}, {{mapVEdges}}, and {{fromDataSet}}, although this information can be easily retrieved.) > The TypeExtractor is missing input type information in some Graph methods > - > > Key: FLINK-5097 > URL: https://issues.apache.org/jira/browse/FLINK-5097 > Project: Flink > Issue Type: Bug > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > The TypeExtractor is called without information about the input type in > {{mapVertices}} and {{mapEdges}}, although this information can be easily > retrieved. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup
[ https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683082#comment-15683082 ] Maximilian Michels edited comment on FLINK-5081 at 11/21/16 11:23 AM: -- I've had a second look. -The issue is not that the configuration is not loaded. Moreover, your finding reveals at least two other issues with our per-job YARN implementation:- -1. When executing in non-detached job submission mode, the "Client Shutdown Hook" shuts down the Yarn application in case of job failures (e.g. TaskManager dies). We should remove the shutdown hook. It should only be active during deployment.- -2. The per-job Yarn application is supposed to automatically shut down the cluster after job completion. In case of failures (e.g. TaskManager dies) the shutdown apparently is performed as well although it shouldn't.- edit: 1) is not an issue since it only shuts down when it reaches a terminal state. 2) Is an issue, but unrelated to this one. -The actual issue here is that the JobManager informs the client of the failed job and the client shuts down the cluster. We should differentiate between fatal and non-fatal failures in the client.- edit2: Not an issue either :) You probably forgot to configure the restart strategy. {noformat} # defaults to none restart-strategy: fixed-delay # defaults to 1 restart-strategy.fixed-delay.attempts: 10 {noformat} With that in place, we can kill TaskManagers and resume job execution after a container restart without problems. was (Author: mxm): I've had a second look. -The issue is not that the configuration is not loaded. Moreover, your finding reveals at least two other issues with our per-job YARN implementation:- -1. When executing in non-detached job submission mode, the "Client Shutdown Hook" shuts down the Yarn application in case of job failures (e.g. TaskManager dies). We should remove the shutdown hook. It should only be active during deployment.- -2. The per-job Yarn application is supposed to automatically shut down the cluster after job completion. In case of failures (e.g. TaskManager dies) the shutdown apparently is performed as well although it shouldn't.- edit: 1) is not an issue since it only shuts down when it reaches a terminal state. 2) Is an issue, but unrelated to this one. The actual issue here is that the JobManager informs the client of the failed job and the client shuts down the cluster. We should differentiate between fatal and non-fatal failures in the client. > unable to set yarn.maximum-failed-containers with flink one-time YARN setup > --- > > Key: FLINK-5081 > URL: https://issues.apache.org/jira/browse/FLINK-5081 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.2.0, 1.1.4 >Reporter: Nico Kruber >Assignee: Maximilian Michels > Fix For: 1.2.0, 1.1.4 > > > When letting flink set up YARN for a one-time job, it apparently does not > deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the > {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml as > suggested in > https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn > does not work either. > example: > {code:none} > flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 > -Dyarn.maximum-failed-containers=100 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup
[ https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-5081: -- Fix Version/s: (was: 1.1.4) (was: 1.2.0) > unable to set yarn.maximum-failed-containers with flink one-time YARN setup > --- > > Key: FLINK-5081 > URL: https://issues.apache.org/jira/browse/FLINK-5081 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.2.0, 1.1.4 >Reporter: Nico Kruber >Assignee: Maximilian Michels > > When letting flink set up YARN for a one-time job, it apparently does not > deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the > {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml as > suggested in > https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn > does not work either. > example: > {code:none} > flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 > -Dyarn.maximum-failed-containers=100 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-5081) unable to set yarn.maximum-failed-containers with flink one-time YARN setup
[ https://issues.apache.org/jira/browse/FLINK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-5081. --- Resolution: Not A Problem Resolving this issue, but feel free to re-open in case I missed anything. > unable to set yarn.maximum-failed-containers with flink one-time YARN setup > --- > > Key: FLINK-5081 > URL: https://issues.apache.org/jira/browse/FLINK-5081 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.2.0, 1.1.4 >Reporter: Nico Kruber >Assignee: Maximilian Michels > Fix For: 1.2.0, 1.1.4 > > > When letting flink set up YARN for a one-time job, it apparently does not > deliver the {{yarn.maximum-failed-containers}} parameter to YARN as the > {{yarn-session.sh}} script does. Adding it to conf/flink-conf.yaml as > suggested in > https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn > does not work either. > example: > {code:none} > flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 .jar --parallelism 3 > -Dyarn.maximum-failed-containers=100 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores
[ https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683269#comment-15683269 ] Maximilian Michels commented on FLINK-5071: --- Thanks! This has changed recently and only 1.1-SNAPSHOT and 1.2-SNAPSHOT are affected. > YARN: yarn.containers.vcores config not respected when checking for vcores > -- > > Key: FLINK-5071 > URL: https://issues.apache.org/jira/browse/FLINK-5071 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.2.0, 1.1.4 >Reporter: Gyula Fora > Fix For: 1.2.0, 1.1.4 > > > The YarnClient validates whether the number of task slots is less than the > max vcores setting of yarn but seems to ignore the yarn.containers.vcores > flink config, which should be used instead of the slots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores
[ https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-5071: -- Affects Version/s: 1.1.4 1.2.0 > YARN: yarn.containers.vcores config not respected when checking for vcores > -- > > Key: FLINK-5071 > URL: https://issues.apache.org/jira/browse/FLINK-5071 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.2.0, 1.1.4 >Reporter: Gyula Fora > Fix For: 1.2.0, 1.1.4 > > > The YarnClient validates whether the number of task slots is less than the > max vcores setting of yarn but seems to ignore the yarn.containers.vcores > flink config, which should be used instead of the slots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores
[ https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reassigned FLINK-5071: - Assignee: Maximilian Michels > YARN: yarn.containers.vcores config not respected when checking for vcores > -- > > Key: FLINK-5071 > URL: https://issues.apache.org/jira/browse/FLINK-5071 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.2.0, 1.1.4 >Reporter: Gyula Fora >Assignee: Maximilian Michels > Fix For: 1.2.0, 1.1.4 > > > The YarnClient validates whether the number of task slots is less than the > max vcores setting of yarn but seems to ignore the yarn.containers.vcores > flink config, which should be used instead of the slots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores
[ https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-5071: -- Fix Version/s: (was: 1.1.3) 1.1.4 1.2.0 > YARN: yarn.containers.vcores config not respected when checking for vcores > -- > > Key: FLINK-5071 > URL: https://issues.apache.org/jira/browse/FLINK-5071 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.2.0, 1.1.4 >Reporter: Gyula Fora > Fix For: 1.2.0, 1.1.4 > > > The YarnClient validates whether the number of task slots is less than the > max vcores setting of yarn but seems to ignore the yarn.containers.vcores > flink config, which should be used instead of the slots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88700937 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer { + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator operator; + + /** +* Keep all {@code AsyncCollector} and their input {@link StreamElement} +*/ + private final Map, StreamElement> queue = new LinkedHashMap<>(); + /** +* For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the +* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue} +* is full since main thread waits on this lock. The StreamElement in +* {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements +* in its queue. It will be kept in the operator state while snapshotting. +*/ + private StreamElement extraStreamElement; + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. 
+*/ + private final Output> output; + private final TimestampedCollector timestampedCollector; + + /** +* Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} +*/ + private final Object lock; + + private final Emitter emitter; + private final Thread emitThread; + + private IOException error; + + public AsyncCollectorBuffer( + int bufferSize, + AsyncDataStream.OutputMode mode, + Output> output, + TimestampedCollector collector, + Object lock, + AsyncWaitOperator operator) { + Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0."); + Preconditions.checkNotNull(output, "Output should not be NULL."); + Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL."); + Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL."); + Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL."); + + this.bufferSize = bufferSize; + this.mode = mode; --- End diff -- `mode` could also be `null`. I think it's easier to follow the following pattern: ``` this.mode = Preconditions.checkNotNull(mode, "Output mode should not be NULL."); ```
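The pattern the reviewer suggests relies on the fact that Flink's org.apache.flink.util.Preconditions.checkNotNull returns its argument, so validation and field assignment collapse into one statement and no field can be assigned before it was checked. A self-contained Scala sketch with made-up class and field names:

```scala
import org.apache.flink.util.Preconditions

// Illustrative constructor following the assign-through-check pattern.
class BufferSketch(bufferSize: Int, mode: String, output: AnyRef) {
  Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0.")

  // checkNotNull validates and returns the reference in one step.
  private val checkedMode: String =
    Preconditions.checkNotNull(mode, "Output mode should not be NULL.")
  private val checkedOutput: AnyRef =
    Preconditions.checkNotNull(output, "Output should not be NULL.")
}
```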
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88676639

--- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java ---
(diff context repeats the AsyncIOExample excerpt quoted with discussion_r88676017 below; the review comment was cut off in the archive)
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88678544

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+/**
+ * A helper class to apply {@link AsyncFunction} to a data stream.
+ *
+ * {@code
+ * DataStream<String> input = ...
+ * AsyncFunction<String, Tuple2<String, String>> asyncFunc = ...
+ *
+ * AsyncDataStream.orderedWait(input, asyncFunc, 100);
+ * }
+ */
+public class AsyncDataStream {
+	public enum OutputMode { ORDERED, UNORDERED }
+
+	private static final int DEFAULT_BUFFER_SIZE = 100;
+
+	private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
+			DataStream<IN> in,
+			AsyncFunction<IN, OUT> func,
+			int bufSize,
+			OutputMode mode) {
+		TypeInformation<OUT> outTypeInfo =
+			TypeExtractor.getUnaryOperatorReturnType((Function) func, AsyncFunction.class, false,
+				true, in.getType(), Utils.getCallLocationName(), true);
+
+		// create transform
+		AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func));
+		operator.setBufferSize(bufSize);
+		operator.setOutputMode(mode);
+
+		OneInputTransformation<IN, OUT> resultTransform = new OneInputTransformation<>(
+			in.getTransformation(),
+			"async wait operator",
+			operator,
+			outTypeInfo,
+			in.getExecutionEnvironment().getParallelism());
+
+		SingleOutputStreamOperator<OUT> returnStream =
+			new SingleOutputStreamOperator<>(in.getExecutionEnvironment(), resultTransform);
+
+		returnStream.getExecutionEnvironment().addOperator(resultTransform);
+
+		return returnStream;
--- End diff --

Can't we replace everything from `resultTransform` via

```
return in.transform(
	"async wait operator",
	outTypeInfo,
	operator);
```

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
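For illustration, a sketch of `addOperator` collapsed onto `DataStream#transform`, which builds the `OneInputTransformation` and registers it with the execution environment internally (names taken from the excerpt above; a sketch of the suggestion, not the merged code):

```
private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
		DataStream<IN> in,
		AsyncFunction<IN, OUT> func,
		int bufSize,
		OutputMode mode) {
	TypeInformation<OUT> outTypeInfo =
		TypeExtractor.getUnaryOperatorReturnType((Function) func, AsyncFunction.class, false,
			true, in.getType(), Utils.getCallLocationName(), true);

	AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func));
	operator.setBufferSize(bufSize);
	operator.setOutputMode(mode);

	// DataStream#transform wires up the OneInputTransformation, creates the
	// SingleOutputStreamOperator and registers the transformation with the
	// environment, replacing the manual plumbing above.
	return in.transform("async wait operator", outTypeInfo, operator);
}
```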
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88676017

--- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}
+ */
+public class AsyncIOExample {
+
+	/**
+	 * A checkpointed source.
+	 */
+	private static class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean isRunning = true;
+		private int counter = 0;
+		private int start = 0;
+
+		@Override
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(start);
+		}
+
+		@Override
+		public void restoreState(List<Integer> state) throws Exception {
+			for (Integer i : state)
+				this.start = i;
+		}
+
+		public SimpleSource(int maxNum) {
+			this.counter = maxNum;
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			while (start < counter && isRunning) {
--- End diff --

Can we set the `counter` to `-1` to indicate an infinite input stream?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
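A sketch of how `run` could support that sentinel, assuming `counter` otherwise keeps its role as the exclusive upper bound from the excerpt above:

```
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
	// Interpret counter == -1 as "no upper bound", i.e. an infinite input stream,
	// while keeping the original bounded behaviour for non-negative values.
	while ((counter == -1 || start < counter) && isRunning) {
		synchronized (ctx.getCheckpointLock()) {
			ctx.collect(start);
			++start;
		}
		Thread.sleep(10);
	}
}
```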
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88679834

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+
+/**
+ * Rich variant of the {@link AsyncFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> The type of the input elements.
+ * @param <OUT> The type of the returned elements.
+ */
+public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction
--- End diff --

I'm not sure whether we should offer a rich variant of `AsyncFunction`, because any access to state from a future callback will not be scoped to the correct key. I'm quite sure that people will run into this.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
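The concern can be made concrete with a hypothetical sketch (`AsyncClient` and its `lookup` method are stand-ins for any non-blocking client): keyed state is resolved against the key of the record the operator thread is currently processing, so a callback that completes later may read or update state under a different key.

```
import java.util.Collections;

public class EnrichFunction extends RichAsyncFunction<String, String> {
	private static final long serialVersionUID = 1L;

	private transient AsyncClient asyncClient; // hypothetical non-blocking client

	@Override
	public void asyncInvoke(String key, AsyncCollector<String, String> collector) throws Exception {
		asyncClient.lookup(key).thenAccept(result -> {
			// By the time this callback runs, the operator thread has usually moved on
			// to other records, so keyed state accessed via getRuntimeContext() here
			// would no longer be scoped to `key` -- the pitfall described above.
			collector.collect(Collections.singletonList(result));
		});
	}
}
```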
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88680106

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.operators.async.AsyncCollector;
+
+import java.io.Serializable;
+
+/**
+ * A function to trigger Async I/O operation.
+ *
+ * For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
+ * the result can be collected by calling {@link AsyncCollector#collect}. For each async
+ * operations, their contexts are buffered in the operator immediately after invoking
+ * #asyncInvoke, leading to no blocking for each stream input as long as internal buffer is not full.
+ *
+ * {@link AsyncCollector} can be passed into callbacks or futures provided by async client to
+ * fetch result data. Any error can also be propagate to the operator by {@link AsyncCollector#collect(Throwable)}.
+ *
+ * Typical usage for callback:
+ * {@code
+ * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
+ *   @Override
+ *   public void asyncInvoke(String row, AsyncCollector<String, String> collector) throws Exception {
+ *     HBaseCallback cb = new HBaseCallback(collector);
+ *     Get get = new Get(Bytes.toBytes(row));
+ *     hbase.asyncGet(get, cb);
+ *   }
+ * }
+ * }
+ *
+ * Typical usage for {@link com.google.common.util.concurrent.ListenableFuture}
+ * {@code
+ * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
+ *   @Override
+ *   public void asyncInvoke(String row, final AsyncCollector<String, String> collector) throws Exception {
+ *     Get get = new Get(Bytes.toBytes(row));
+ *     ListenableFuture<Result> future = hbase.asyncGet(get);
+ *     Futures.addCallback(future, new FutureCallback<Result>() {
+ *       public void onSuccess(Result result) {
+ *         List<String> ret = process(result);
+ *         collector.collect(ret);
+ *       }
+ *       public void onFailure(Throwable thrown) {
+ *         collector.collect(thrown);
+ *       }
+ *     });
+ *   }
+ * }
+ * }
+ *
+ * @param <IN> The type of the input elements.
+ * @param <OUT> The type of the returned elements.
+ */
+public interface AsyncFunction<IN, OUT> extends Function, Serializable {
--- End diff --

`PublicEvolving`

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
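That is, annotating the interface with `org.apache.flink.annotation.PublicEvolving`, which marks user-facing API that may still change between releases. A sketch (the `asyncInvoke` signature is inferred from the usage examples above):

```
import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {

	/**
	 * Trigger the async operation for each stream input.
	 */
	void asyncInvoke(IN input, AsyncCollector<IN, OUT> collector) throws Exception;
}
```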
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88707089

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---
(diff context identical to the AsyncCollectorBuffer excerpt quoted above; the review comment was cut off in the archive)
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88681032

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---
(context identical to the AsyncCollectorBuffer excerpt quoted above, ending at)
+	private final Map<AsyncCollector<IN, OUT>, StreamElement> queue = new LinkedHashMap<>();
+	/**
--- End diff --

line break missing

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
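In other words, a blank line is expected between the `queue` declaration and the Javadoc of the next field:

```
	private final Map<AsyncCollector<IN, OUT>, StreamElement> queue = new LinkedHashMap<>();

	/**
	 * For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the
	 * StreamTask lock while the queue is full ...
	 */
	private StreamElement extraStreamElement;
```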
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88707393

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---
(diff context identical to the AsyncCollectorBuffer excerpt quoted above; the review comment was cut off in the archive)
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88680162

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java ---
(context identical to the AsyncDataStream excerpt quoted above for discussion_r88678544, ending at)
+public class AsyncDataStream {
--- End diff --

`PublicEvolving`

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88707993

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---
(diff context identical to the AsyncCollectorBuffer excerpt quoted above; the review comment was cut off in the archive)
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88701658

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---
(diff context identical to the AsyncCollectorBuffer excerpt quoted above; the review comment was cut off in the archive)
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88680325

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link AsyncCollector} collects data / error in user codes while processing async i/o.
+ *
+ * @param <IN> Input type
+ * @param <OUT> Output type
+ */
+@Internal
+public class AsyncCollector<IN, OUT> {
+	private List<OUT> result;
+	private Throwable error;
+
+	private boolean isDone = false;
+
+	private final AsyncCollectorBuffer<IN, OUT> buffer;
+
+	public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer) {
+		Preconditions.checkNotNull(buffer, "Reference to AsyncCollectorBuffer should not be null");
+
+		this.buffer = buffer;
+	}
+
+	public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer, boolean isDone) {
+		this(buffer);
+		this.isDone = isDone;
--- End diff --

An `AsyncCollector` should never be done when being created, shouldn't it?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
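Dropping the second constructor altogether would enforce that invariant; a sketch under that assumption, using only names from the excerpt:

```
public AsyncCollector(AsyncCollectorBuffer<IN, OUT> buffer) {
	this.buffer = Preconditions.checkNotNull(buffer, "Reference to AsyncCollectorBuffer should not be null");
	// A newly created collector has produced neither a result nor an error yet,
	// so it always starts in the not-done state; no constructor can override this.
	this.isDone = false;
}
```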
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88675892

--- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java ---
(diff context repeats the AsyncIOExample excerpt quoted above; the review comment was cut off in the archive)
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88712552

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---
(diff context identical to the AsyncCollectorBuffer excerpt quoted above; the review comment was cut off in the archive)
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r88676202

--- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java ---
(diff context repeats the AsyncIOExample excerpt quoted above; the review comment was cut off in the archive)
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88676904 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.async; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} + */ +public class AsyncIOExample { + + /** +* A checkpointed source. 
+*/ + private static class SimpleSource implements SourceFunction, ListCheckpointed { + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + private int counter = 0; + private int start = 0; + + @Override + public List snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(start); + } + + @Override + public void restoreState(List state) throws Exception { + for (Integer i : state) + this.start = i; + } + + public SimpleSource(int maxNum) { + this.counter = maxNum; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (start < counter && isRunning) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(start); + ++start; + } + Thread.sleep(10); + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + private static void printUsage() { + System.out.println("To customize example, use: AsyncIOExample [--fsStatePath ] " + + "[--checkpointMode ] [--maxCount ] " + + "[--sleepFactor ] [--failRatio ] " + + "[--waitMode ] [--waitOperatorParallelism ] " + + "[--eventType ]"); + } + + public static void main(String[] args) throws Exception { + + // obtain execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + printUsage(); + + // parse parameters
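The digest rendering strips generic type parameters from the quoted Java, so "SourceFunction, ListCheckpointed" above is really "SourceFunction<Integer>, ListCheckpointed<Integer>". A reconstruction of the revised source with the generics restored (a sketch inferred from the quoted diff, not a verbatim copy of the PR):

    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    import java.util.Collections;
    import java.util.List;

    class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {
        private static final long serialVersionUID = 1L;

        private volatile boolean isRunning = true;
        private int counter = 0;  // upper bound for emitted values
        private int start = 0;    // next value to emit; this is the checkpointed offset

        SimpleSource(int maxNum) {
            this.counter = maxNum;
        }

        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) {
            // the entire state is the single offset, wrapped in a singleton list
            return Collections.singletonList(start);
        }

        @Override
        public void restoreState(List<Integer> state) {
            for (Integer i : state) {
                this.start = i;  // expected to be a singleton; last element wins
            }
        }

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            while (start < counter && isRunning) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(start);
                    ++start;
                }
                Thread.sleep(10);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }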
[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class
Github user vasia commented on the issue: https://github.com/apache/flink/pull/2564 Thanks, @mushketyk. @greghogan are you shepherding this PR or shall I? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683278#comment-15683278 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88681032 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer { + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator operator; + + /** +* Keep all {@code AsyncCollector} and their input {@link StreamElement} +*/ + private final Map, StreamElement> queue = new LinkedHashMap<>(); + /** --- End diff -- line break missing > Provide support for asynchronous operations over streams > > > Key: FLINK-4391 > URL: https://issues.apache.org/jira/browse/FLINK-4391 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Jamie Grier >Assignee: david.wang > > Many Flink users need to do asynchronous processing driven by data from a > DataStream. The classic example would be joining against an external > database in order to enrich a stream with extra information. > It would be nice to add general support for this type of operation in the > Flink API. 
Ideally this could simply take the form of a new operator that > manages async operations, keeps so many of them in flight, and then emits > results to downstream operators as the async operations complete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
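The enrichment pattern the issue describes, written against the AsyncFunction/AsyncCollector API under review in this thread. This is only a sketch: the database lookup is simulated with a CompletableFuture, since a real asynchronous client (such as the HBase client in the javadoc examples below) would supply its own future or callback.

    import org.apache.flink.streaming.api.functions.async.AsyncFunction;
    import org.apache.flink.streaming.api.operators.async.AsyncCollector;

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;

    public class EnrichFunction implements AsyncFunction<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public void asyncInvoke(String key, AsyncCollector<String> collector) throws Exception {
            // stand-in for a non-blocking database call; the callback hands the
            // result (or the failure) back to the operator via the collector
            CompletableFuture
                .supplyAsync(() -> "enriched-" + key)
                .whenComplete((value, failure) -> {
                    if (failure != null) {
                        collector.collect(failure);                       // propagate the error
                    } else {
                        collector.collect(Collections.singletonList(value));
                    }
                });
        }
    }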
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683277#comment-15683277 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88680106 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.async; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; + +import java.io.Serializable; + +/** + * A function to trigger Async I/O operation. + * + * For each #asyncInvoke, an async io operation can be triggered, and once it has been done, + * the result can be collected by calling {@link AsyncCollector#collect}. For each async + * operations, their contexts are buffered in the operator immediately after invoking + * #asyncInvoke, leading to no blocking for each stream input as long as internal buffer is not full. + * + * {@link AsyncCollector} can be passed into callbacks or futures provided by async client to + * fetch result data. Any error can also be propagate to the operator by {@link AsyncCollector#collect(Throwable)}. + * + * + * Typical usage for callback: + * {@code + * public class HBaseAsyncFunc implements AsyncFunction { + * @Override + * public void asyncInvoke(String row, AsyncCollector collector) throws Exception { + * HBaseCallback cb = new HBaseCallback(collector); + * Get get = new Get(Bytes.toBytes(row)); + * hbase.asyncGet(get, cb); + * } + * } + * } + * + * + * + * Typical usage for {@link com.google.common.util.concurrent.ListenableFuture} + * {@code + * public class HBaseAsyncFunc implements AsyncFunction { + * @Override + * public void asyncInvoke(String row, final AsyncCollector collector) throws Exception { + * Get get = new Get(Bytes.toBytes(row)); + * ListenableFuture future = hbase.asyncGet(get); + * Futures.addCallback(future, new FutureCallback() { + * public void onSuccess(Result result) { + * List ret = process(result); + * collector.collect(ret); + * } + * public void onFailure(Throwable thrown) { + * collector.collect(thrown); + * } + * }); + * } + * } + * } + * + * + * @param The type of the input elements. + * @param The type of the returned elements. 
+ */ +public interface AsyncFunction extends Function, Serializable { --- End diff -- `PublicEvolving` > Provide support for asynchronous operations over streams > > > Key: FLINK-4391 > URL: https://issues.apache.org/jira/browse/FLINK-4391 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Jamie Grier >Assignee: david.wang > > Many Flink users need to do asynchronous processing driven by data from a > DataStream. The classic example would be joining against an external > database in order to enrich a stream with extra information. > It would be nice to add general support for this type of operation in the > Flink API. Ideally this could simply take the form of a new operator that > manages async operations, keeps so many of them in flight, and then emits > results to downstream operators as the async operations complete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
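The one-word review comment is shorthand for adding Flink's @PublicEvolving stability annotation to the new user-facing interface. Applied, the declaration would read roughly as follows (the asyncInvoke signature is taken from the javadoc examples above; the generics are restored, since the digest strips them):

    import org.apache.flink.annotation.PublicEvolving;
    import org.apache.flink.api.common.functions.Function;
    import org.apache.flink.streaming.api.operators.async.AsyncCollector;

    import java.io.Serializable;

    @PublicEvolving
    public interface AsyncFunction<IN, OUT> extends Function, Serializable {

        /**
         * Triggers the async operation for each stream input.
         */
        void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
    }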
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683280#comment-15683280 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88680681 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollector.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; --- End diff -- Maybe the `AsyncCollectorBuffer` belongs into the `functions` package as the `AsyncFunction`. > Provide support for asynchronous operations over streams > > > Key: FLINK-4391 > URL: https://issues.apache.org/jira/browse/FLINK-4391 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Jamie Grier >Assignee: david.wang > > Many Flink users need to do asynchronous processing driven by data from a > DataStream. The classic example would be joining against an external > database in order to enrich a stream with extra information. > It would be nice to add general support for this type of operation in the > Flink API. Ideally this could simply take the form of a new operator that > manages async operations, keeps so many of them in flight, and then emits > results to downstream operators as the async operations complete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88714777 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer { + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator operator; + + /** +* Keep all {@code AsyncCollector} and their input {@link StreamElement} +*/ + private final Map, StreamElement> queue = new LinkedHashMap<>(); + /** +* For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the +* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue} +* is full since main thread waits on this lock. The StreamElement in +* {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements +* in its queue. It will be kept in the operator state while snapshotting. +*/ + private StreamElement extraStreamElement; + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. 
+*/ + private final Output> output; + private final TimestampedCollector timestampedCollector; + + /** +* Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} +*/ + private final Object lock; + + private final Emitter emitter; + private final Thread emitThread; + + private IOException error; + + public AsyncCollectorBuffer( + int bufferSize, + AsyncDataStream.OutputMode mode, + Output> output, + TimestampedCollector collector, + Object lock, + AsyncWaitOperator operator) { + Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0."); + Preconditions.checkNotNull(output, "Output should not be NULL."); + Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL."); + Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL."); + Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL."); + + this.bufferSize = bufferSize; + this.mode = mode; + this.output = output; + this.timestampedCollector = collector; + this.operator = operator; + this.lock =
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683284#comment-15683284 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88701332 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer { + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator operator; + + /** +* Keep all {@code AsyncCollector} and their input {@link StreamElement} +*/ + private final Map, StreamElement> queue = new LinkedHashMap<>(); + /** +* For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the +* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue} +* is full since main thread waits on this lock. The StreamElement in +* {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements +* in its queue. It will be kept in the operator state while snapshotting. +*/ + private StreamElement extraStreamElement; + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. 
+*/ + private final Output> output; + private final TimestampedCollector timestampedCollector; + + /** +* Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} +*/ + private final Object lock; + + private final Emitter emitter; + private final Thread emitThread; + + private IOException error; + + public AsyncCollectorBuffer( + int bufferSize, + AsyncDataStream.OutputMode mode, + Output> output, + TimestampedCollector collector, + Object lock, + AsyncWaitOperator operator) { + Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0."); + Preconditions.checkNotNull(output, "Output should not be NULL."); + Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL."); + Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL."); + Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NU
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88707416 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer { + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator operator; + + /** +* Keep all {@code AsyncCollector} and their input {@link StreamElement} +*/ + private final Map, StreamElement> queue = new LinkedHashMap<>(); + /** +* For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the +* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue} +* is full since main thread waits on this lock. The StreamElement in +* {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements +* in its queue. It will be kept in the operator state while snapshotting. +*/ + private StreamElement extraStreamElement; + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. 
+*/ + private final Output> output; + private final TimestampedCollector timestampedCollector; + + /** +* Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} +*/ + private final Object lock; + + private final Emitter emitter; + private final Thread emitThread; + + private IOException error; + + public AsyncCollectorBuffer( + int bufferSize, + AsyncDataStream.OutputMode mode, + Output> output, + TimestampedCollector collector, + Object lock, + AsyncWaitOperator operator) { + Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0."); + Preconditions.checkNotNull(output, "Output should not be NULL."); + Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL."); + Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL."); + Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL."); + + this.bufferSize = bufferSize; + this.mode = mode; + this.output = output; + this.timestampedCollector = collector; + this.operator = operator; + this.lock =
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683323#comment-15683323 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88859015 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java --- @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Tests for {@link AsyncCollectorBuffer}. 
These test that: + * + * + * Add a new item into the buffer + * Ordered mode processing + * Unordered mode processing + * Error handling + * + */ +public class AsyncCollectorBufferTest { + private AsyncFunction function; + + private AsyncWaitOperator operator; + + private AsyncCollectorBuffer buffer; + + private Output> output; + + private TimestampedCollector collector; + + private Object lock = new Object(); + + @Before + public void setUp() throws Exception { + function = new AsyncFunction() { + @Override + public void asyncInvoke(Integer input, AsyncCollector collector) throws Exception { + + } + }; + + operator = new AsyncWaitOperator<>(function); + Class[] classes = AbstractStreamOperator.class.getDeclaredClasses(); + Class latencyClass = null; + for (Class c : classes) { + if (c.getName().indexOf("LatencyGauge") != -1) { + latencyClass = c; + } + } + + Constructor explicitConstructor = latencyClass.getDeclaredConstructors()[0]; + explicitConstructor.setAccessible(true); + Whitebox.setInternalState(operator, "latencyGauge", explicitConstructor.newInstance(10)); + + output = new FakedOutput(new ArrayList()); + collector =new TimestampedCollector(output); + + Whitebox.setInternalState(operator, "output", output); + } + + @Test + public void testAdd() throws Exception { + buffer = + new AsyncCollectorBuffer<>(3, AsyncDataStream.OutputMode.UNORDERED, output, collector, lock, operator); + + buffer.addWatermark(new Watermark(0l)); + buffer.addLatencyMarker(new LatencyMarker(111L, 1, 1)); + Assert.assertEquals(buffer.getQueue().size(), 2); + + Iterator, StreamElement>> iterator = + buffer.getQueue().entrySet().iterator();
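The setup above scans AbstractStreamOperator's declared inner classes for the private LatencyGauge type and injects an instance through Mockito's Whitebox. The same field injection can be done with plain java.lang.reflect, walking up the class hierarchy the way Whitebox.setInternalState does; a sketch:

    static void setInternalState(Object target, String fieldName, Object value) throws Exception {
        Class<?> clazz = target.getClass();
        while (clazz != null) {
            try {
                java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
                field.setAccessible(true);   // private fields need this
                field.set(target, value);
                return;
            } catch (NoSuchFieldException e) {
                clazz = clazz.getSuperclass();   // e.g. "output" lives on AbstractStreamOperator
            }
        }
        throw new NoSuchFieldException(fieldName);
    }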
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683308#comment-15683308 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88715005 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer { + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator operator; + + /** +* Keep all {@code AsyncCollector} and their input {@link StreamElement} +*/ + private final Map, StreamElement> queue = new LinkedHashMap<>(); + /** +* For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the +* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue} +* is full since main thread waits on this lock. The StreamElement in +* {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements +* in its queue. It will be kept in the operator state while snapshotting. +*/ + private StreamElement extraStreamElement; + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. 
+*/ + private final Output> output; + private final TimestampedCollector timestampedCollector; + + /** +* Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} +*/ + private final Object lock; + + private final Emitter emitter; + private final Thread emitThread; + + private IOException error; + + public AsyncCollectorBuffer( + int bufferSize, + AsyncDataStream.OutputMode mode, + Output> output, + TimestampedCollector collector, + Object lock, + AsyncWaitOperator operator) { + Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0."); + Preconditions.checkNotNull(output, "Output should not be NULL."); + Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL."); + Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL."); + Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NU
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683273#comment-15683273 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88676017 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.async; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} + */ +public class AsyncIOExample { + + /** +* A checkpointed source. +*/ + private static class SimpleSource implements SourceFunction, ListCheckpointed { + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + private int counter = 0; + private int start = 0; + + @Override + public List snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(start); + } + + @Override + public void restoreState(List state) throws Exception { + for (Integer i : state) + this.start = i; + } + + public SimpleSource(int maxNum) { + this.counter = maxNum; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (start < counter && isRunning) { --- End diff -- Can we set the `counter` to `-1` to indicate an infinite input stream? 
> Provide support for asynchronous operations over streams > > > Key: FLINK-4391 > URL: https://issues.apache.org/jira/browse/FLINK-4391 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Jamie Grier >Assignee: david.wang > > Many Flink users need to do asynchronous processing driven by data from a > DataStream. The classic example would be joining against an external > database in order to enrich a stream with extra information. > It would be nice to add general support for this type of operation in the > Flink API. Ideally this could simply take the form of a new operator that > manages async operations, keeps so many of them in flight, and then emits > results to downstream operators as the async operations complete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
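The reviewer's suggestion above, sketched against the quoted run() loop: treat counter == -1 as "no upper bound" so the example source can also drive an unbounded stream. This is an illustration of the review idea, not code from the PR:

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        // counter == -1 means: emit forever (until cancel() flips isRunning)
        while ((counter == -1 || start < counter) && isRunning) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(start);
                ++start;
            }
            Thread.sleep(10);
        }
    }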
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683272#comment-15683272 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88675892 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.async; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} + */ +public class AsyncIOExample { + + /** +* A checkpointed source. 
+*/ + private static class SimpleSource implements SourceFunction, Checkpointed { + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + private int counter = 0; + private int start = 0; + + @Override + public void restoreState(Integer state) throws Exception { + this.start = state; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return start; + } + + public SimpleSource(int maxNum) { + this.counter = maxNum; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (start < counter && isRunning) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(start); + ++start; + } + Thread.sleep(10); + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + + public static void main(String[] args) throws Exception { + + // obtain execution environment and set setBufferTimeout to 1 to enable + // continuous flushing of the output buffers (lowest latency) + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() + .setBufferTimeout(1); + + // configurations for the job + String statePath = args[0]; + String cpMode = args[1]; + int maxCount = Integer.valueOf(args[2]); + final int s
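This earlier revision reads its settings from positional args[i]; the later revision quoted further up switches to named flags via ParameterTool (see its printUsage()). A sketch of that style, with the flag names taken from printUsage() and defaults that are illustrative assumptions, not values from the PR:

    import org.apache.flink.api.java.utils.ParameterTool;

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);

        String statePath = params.get("fsStatePath", null);
        String cpMode    = params.get("checkpointMode", "exactly_once");
        int maxCount     = params.getInt("maxCount", 100000);
        int sleepFactor  = params.getInt("sleepFactor", 100);
        float failRatio  = params.getFloat("failRatio", 0.001f);
        String mode      = params.get("waitMode", "ordered");
        int taskNum      = params.getInt("waitOperatorParallelism", 1);
        String timeType  = params.get("eventType", "EventTime");
        // ... then build the job as in the example above
    }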
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88712331 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer { + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator operator; + + /** +* Keep all {@code AsyncCollector} and their input {@link StreamElement} +*/ + private final Map, StreamElement> queue = new LinkedHashMap<>(); + /** +* For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the +* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue} +* is full since main thread waits on this lock. The StreamElement in +* {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements +* in its queue. It will be kept in the operator state while snapshotting. +*/ + private StreamElement extraStreamElement; + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. 
+*/ + private final Output> output; + private final TimestampedCollector timestampedCollector; + + /** +* Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} +*/ + private final Object lock; + + private final Emitter emitter; + private final Thread emitThread; + + private IOException error; + + public AsyncCollectorBuffer( + int bufferSize, + AsyncDataStream.OutputMode mode, + Output> output, + TimestampedCollector collector, + Object lock, + AsyncWaitOperator operator) { + Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0."); + Preconditions.checkNotNull(output, "Output should not be NULL."); + Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL."); + Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL."); + Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL."); + + this.bufferSize = bufferSize; + this.mode = mode; + this.output = output; + this.timestampedCollector = collector; + this.operator = operator; + this.lock =
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683287#comment-15683287 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88701684 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer { + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator operator; + + /** +* Keep all {@code AsyncCollector} and their input {@link StreamElement} +*/ + private final Map, StreamElement> queue = new LinkedHashMap<>(); + /** +* For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the +* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue} +* is full since main thread waits on this lock. The StreamElement in +* {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements +* in its queue. It will be kept in the operator state while snapshotting. +*/ + private StreamElement extraStreamElement; + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. 
+*/ + private final Output> output; + private final TimestampedCollector timestampedCollector; + + /** +* Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} +*/ + private final Object lock; + + private final Emitter emitter; + private final Thread emitThread; + + private IOException error; + + public AsyncCollectorBuffer( + int bufferSize, + AsyncDataStream.OutputMode mode, + Output> output, + TimestampedCollector collector, + Object lock, + AsyncWaitOperator operator) { + Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0."); + Preconditions.checkNotNull(output, "Output should not be NULL."); + Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL."); + Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL."); + Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NU
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683300#comment-15683300 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88720498 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -116,7 +116,12 @@ public OperatorChain(StreamTask containingTask) { // add head operator to end of chain allOps.add(headOperator); - + + // reverse the order of all operators so that head operator is at the first place. + // for chained operator with async wait operator, operators after wait operator have to + // wait for while until all data in the buffer in wait operator has done snapshot. + Collections.reverse(allOps); --- End diff -- This won't work with emitting elements in the open method of `AsyncWaitOperator`, because then the downstream operators are potentially not yet opened when the first stream element arrives there. > Provide support for asynchronous operations over streams > > > Key: FLINK-4391 > URL: https://issues.apache.org/jira/browse/FLINK-4391 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Jamie Grier >Assignee: david.wang > > Many Flink users need to do asynchronous processing driven by data from a > DataStream. The classic example would be joining against an external > database in order to enrich a stream with extra information. > It would be nice to add general support for this type of operation in the > Flink API. Ideally this could simply take the form of a new operator that > manages async operations, keeps so many of them in flight, and then emits > results to downstream operators as the async operations complete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
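Schematically, the hazard the reviewer points out: the chain is normally opened so that downstream operators are ready before upstream ones may emit, and after the Collections.reverse(allOps) in the diff the head operator is opened first. Anything the head emits from open(), such as elements replayed from restored state, can then reach an operator whose own open() has not run yet. Roughly:

    // after Collections.reverse(allOps): [head, op1, op2, ..., tail]
    for (StreamOperator<?> op : allOps) {
        op.open();
        // when op == head, replayed elements are pushed downstream here,
        // but op1 ... tail have not been opened yet
    }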
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683310#comment-15683310 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88719571

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---

Excerpt of the quoted diff:

+@Internal
+public class AsyncWaitOperator<IN, OUT>
+	extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
+	implements OneInputStreamOperator<IN, OUT> {
+
+	private final int DEFAULT_BUFFER_SIZE = 1000;
+
+	private static final long serialVersionUID = 1L;
+
+	private final static String STATE_NAME = "_async_wait_operator_state_";
+
+	/**
+	 * {@link TypeSerializer} for inputs while making snapshots.
+	 */
+	private transient StreamElementSerializer<IN> inStreamElementSerializer;
+
+	/**
+	 * Input stream elements restored from the state.
+	 */
+	private transient ListState<StreamElement> recoveredStreamElements;
+
+	private transient TimestampedCollector<OUT> collector;
+
+	private transient AsyncCollectorBuffer<IN, OUT> buffer;
+
+	/**
+	 * Checkpoint lock from {@link StreamTask#lock}.
+	 */
+	private transient Object checkpointLock;
+
+	private int bufferSize = DEFAULT_BUFFER_SIZE;
+	private AsyncDataStream.OutputMode mode;

(the rest of the quoted diff, and the review remark itself, are cut off in the archive)
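The fields quoted above suggest the recovery story: queued elements are serialized into operator state on snapshot and replayed through the operator after restore. Below is a hedged sketch of that replay step, written as a private method of the operator. Only the field name comes from the quote; the dispatch helpers (isRecord(), asRecord(), isWatermark(), asWatermark()) are standard StreamElement API, and everything else is an assumption, not confirmed PR code.

// Hedged sketch, not confirmed PR code: replay elements restored from
// ListState by feeding them back through the operator's own handlers.
private void replayRecoveredElements() throws Exception {
    if (recoveredStreamElements == null) {
        return; // nothing was restored
    }
    for (StreamElement element : recoveredStreamElements.get()) {
        if (element.isRecord()) {
            processElement(element.<IN>asRecord());
        } else if (element.isWatermark()) {
            processWatermark(element.asWatermark());
        }
    }
}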
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683320#comment-15683320 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88868024

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---

(the quoted diff repeats the AsyncWaitOperator excerpt shown above; the review remark itself is cut off in the archive)
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683321#comment-15683321 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88865433

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---

Excerpt of the quoted diff:

+/**
+ * Tests for {@link AsyncWaitOperator}. These test that:
+ *
+ * <ul>
+ *     <li>Process StreamRecords and Watermarks in ORDERED mode</li>
+ *     <li>Process StreamRecords and Watermarks in UNORDERED mode</li>
+ *     <li>Snapshot state and restore state</li>
+ * </ul>
+ */
+public class AsyncWaitOperatorTest {
+
+	public static class MyAsyncFunction extends RichAsyncFunction<Integer, Integer> {
+		transient final int SLEEP_FACTOR = 100;
+		transient final int THREAD_POOL_SIZE = 10;
+		transient ExecutorService executorService;

--- End diff --

Can't we share the `ExecutorService`?
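One way to act on that remark, sketched below: keep a single reference-counted ExecutorService shared by all function instances and shut it down when the last instance closes. This is an illustration of the suggestion, not the PR's eventual fix.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Reference-counted executor shared across test function instances;
// an illustration of the review suggestion, not code from the PR.
public final class SharedExecutor {

    private static ExecutorService executor;
    private static int refCount = 0;

    private SharedExecutor() {}

    // Called from open(): create the pool on first use.
    public static synchronized ExecutorService acquire(int poolSize) {
        if (refCount == 0) {
            executor = Executors.newFixedThreadPool(poolSize);
        }
        refCount++;
        return executor;
    }

    // Called from close(): shut the pool down when the last user leaves.
    public static synchronized void release() {
        if (--refCount == 0) {
            executor.shutdown();
            executor = null;
        }
    }
}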
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683314#comment-15683314 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88861548

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java ---

Excerpt of the quoted diff:

+/**
+ * Tests for {@link AsyncCollectorBuffer}. These test that:
+ *
+ * <ul>
+ *     <li>Add a new item into the buffer</li>
+ *     <li>Ordered mode processing</li>
+ *     <li>Unordered mode processing</li>
+ *     <li>Error handling</li>
+ * </ul>
+ */

(the rest of the quoted diff, and the review remark itself, are cut off in the archive)
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683295#comment-15683295 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88714427

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---

(the quoted diff repeats the AsyncCollectorBuffer excerpt shown above; the review remark itself is cut off in the archive)
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683328#comment-15683328 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88872044

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---

(the quoted diff repeats the AsyncCollectorBuffer excerpt shown above; the review remark itself is cut off in the archive)
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683296#comment-15683296 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88712042

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java ---

(the quoted diff repeats the AsyncCollectorBuffer excerpt shown above; the review remark itself is cut off in the archive)
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683311#comment-15683311 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88861278

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java ---

(the quoted diff repeats the AsyncCollectorBufferTest excerpt shown above; the review remark itself is cut off in the archive)
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683315#comment-15683315 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88865406

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---

Excerpt of the quoted diff (same test class as above):

+	public static class MyAsyncFunction extends RichAsyncFunction<Integer, Integer> {
+		transient final int SLEEP_FACTOR = 100;
+		transient final int THREAD_POOL_SIZE = 10;

--- End diff --

Why are these values `transient`?
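The question has a crisp answer in plain Java: `transient` only controls default serialization of instance state, and since the function class is Serializable, a `transient final int` silently becomes 0 after an instance is deserialized on the cluster, because field initializers are not re-run for deserialized objects. Constants belong in `static final` fields, which are never serialized with the instance. A minimal before/after sketch (not code from the PR):

import java.io.Serializable;

// Minimal illustration of the review question; not code from the PR.
public class TransientConstants implements Serializable {

    private static final long serialVersionUID = 1L;

    // Before: excluded from serialization, so a deserialized copy sees 0.
    transient final int SLEEP_FACTOR_BROKEN = 100;

    // After: shared constant, never part of serialized instance state.
    static final int SLEEP_FACTOR = 100;
}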
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683274#comment-15683274 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88676904

--- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java ---

Excerpt of the quoted diff:

+/**
+ * Example to illustrate how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
+ */
+public class AsyncIOExample {
+
+	/**
+	 * A checkpointed source.
+	 */
+	private static class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean isRunning = true;
+		private int counter = 0;
+		private int start = 0;
+
+		@Override
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(start);
+		}
+
+		@Override
+		public void restoreState(List<Integer> state) throws Exception {
+			for (Integer i : state) {
+				this.start = i;
+			}
+		}
+
+		public SimpleSource(int maxNum) {
+			this.counter = maxNum;
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			while (start < counter && isRunning) {
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collect(start);
+					++start;
+				}
+				Thread.sleep(10);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}

(the rest of the quoted diff, including printUsage() and main(), and the review remark itself are cut off in the archive)
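For context, the example's pieces presumably assemble along these lines in main(). The orderedWait call shape matches the test code quoted later in this thread (AsyncDataStream.orderedWait(input, function, bufferSize)); sampleAsyncFunction stands in for the example's real AsyncFunction and is hypothetical, assumed here to map Integer to String.

// Hedged wiring sketch; call shapes taken from this PR's quoted tests,
// sampleAsyncFunction is a placeholder, not the example's actual function.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Integer> input = env.addSource(new SimpleSource(1000));

// keep at most 20 async requests in flight, emit results in input order
DataStream<String> result = AsyncDataStream.orderedWait(input, sampleAsyncFunction, 20);

result.print();
env.execute("Async I/O example");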
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683303#comment-15683303 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88718820

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---

(the quoted diff repeats the AsyncWaitOperator excerpt shown above; the review remark itself is cut off in the archive)
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683324#comment-15683324 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88721238

--- Diff: flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java ---

Excerpt of the quoted diff:

+	@Test
+	public void testAsyncWaitOperator() throws Exception {
+		final int numElements = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<Integer, NonSerializable>> input =
+			env.addSource(new NonSerializableTupleSource(numElements)).setParallelism(1);
+
+		AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function =
+			new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
+				transient ExecutorService executorService;
+
+				@Override
+				public void open(Configuration parameters) throws Exception {
+					super.open(parameters);
+					executorService = Executors.newFixedThreadPool(numElements);
+				}
+
+				@Override
+				public void close() throws Exception {
+					super.close();
+					executorService.shutdown();
+				}
+
+				@Override
+				public void asyncInvoke(final Tuple2<Integer, NonSerializable> input,
+						final AsyncCollector<Tuple2<Integer, NonSerializable>, Integer> collector) throws Exception {
+					this.executorService.submit(new Runnable() {
+						@Override
+						public void run() {
+							// wait for a while to simulate an async operation here
+							int sleep = (int) (new Random().nextFloat() * 1000);
+							try {
+								Thread.sleep(sleep);
+								List<Integer> ret = new ArrayList<>();
+								ret.add(input.f0 + input.f0);
+								collector.collect(ret);
+							}
+							catch (InterruptedException e) {
+								collector.collect(new ArrayList<Integer>(0));
+							}
+						}
+					});
+				}
+			};
+
+		DataStream<Integer> orderedResult = AsyncDataStream.orderedWait(input, function, 2).setParallelism(1);
+		orderedResult.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
+
+		DataStream<Integer> unorderedResult = AsyncDataStream.unorderedWait(input, function, 2).setParallelism(1);
+		unorderedResult.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);

--- End diff --

Writing to disk can fail on Travis. Thus, it is highly recommended to rewrite these tests so that they store their results in memory.
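A sketch of what the reviewer asks for: replace the writeAsText calls with a sink that appends to a static, synchronized in-memory list, and assert on that list after env.execute(). SinkFunction is long-standing Flink API; the class below is an illustration of the suggestion, not the PR's code.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Test sink that stores results in memory instead of on disk;
// an illustration of the review suggestion, not code from the PR.
public class InMemorySink<T> implements SinkFunction<T> {

    private static final long serialVersionUID = 1L;

    // static so that results survive serialization of the sink instances
    public static final List<Object> RESULTS =
            Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(T value) throws Exception {
        RESULTS.add(value);
    }
}

The test would then call, for example, `unorderedResult.addSink(new InMemorySink<Integer>())` instead of `writeAsText(...)`, and assert on `InMemorySink.RESULTS` after `env.execute()`.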
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683319#comment-15683319 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88857262

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java ---

Excerpt of the quoted diff (the setUp() method wires the operator's latency gauge and output via reflection):

+		Constructor explicitConstructor = latencyClass.getDeclaredConstructors()[0];
+		explicitConstructor.setAccessible(true);
+		Whitebox.setInternalState(operator, "latencyGauge", explicitConstructor.newInstance(10));
+
+		output = new FakedOutput(new ArrayList());
+		collector = new TimestampedCollector<>(output);
+
+		Whitebox.setInternalState(operator, "output", output);

--- End diff --

Can't we simply call `AsyncWaitOperator#setup` here?
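The alternative the reviewer points at, sketched as a replacement for the Whitebox calls in setUp(). The setup(StreamTask, StreamConfig, Output) method is long-standing AbstractStreamOperator API, while the mock wiring below is an assumption for illustration and may need a more fully stubbed task in practice.

// Replacement for: Whitebox.setInternalState(operator, "output", output);
// Let the operator wire its output (and its latency gauge) itself via setup().
// Assumes: import static org.mockito.Mockito.mock;
StreamTask<?, ?> containingTask = mock(StreamTask.class);
StreamConfig config = new StreamConfig(new Configuration());

operator.setup(containingTask, config, output);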
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683299#comment-15683299 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88714544 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer { + + /** +* Max number of {@link AsyncCollector} in the buffer. +*/ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator operator; + + /** +* Keep all {@code AsyncCollector} and their input {@link StreamElement} +*/ + private final Map, StreamElement> queue = new LinkedHashMap<>(); + /** +* For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the +* {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue} +* is full since main thread waits on this lock. The StreamElement in +* {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements +* in its queue. It will be kept in the operator state while snapshotting. +*/ + private StreamElement extraStreamElement; + + /** +* {@link TimestampedCollector} and {@link Output} to collect results and watermarks. 
+*/ + private final Output> output; + private final TimestampedCollector timestampedCollector; + + /** +* Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} +*/ + private final Object lock; + + private final Emitter emitter; + private final Thread emitThread; + + private IOException error; + + public AsyncCollectorBuffer( + int bufferSize, + AsyncDataStream.OutputMode mode, + Output> output, + TimestampedCollector collector, + Object lock, + AsyncWaitOperator operator) { + Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0."); + Preconditions.checkNotNull(output, "Output should not be NULL."); + Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL."); + Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL."); + Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NU
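The constructor quoted above fails fast on every argument. To make its contract concrete, here is an illustrative instantiation; the argument order follows the quoted diff, while the type parameters and values are assumptions, since the digest strips generics:

```java
// Illustrative only: how the owning operator might build the buffer, following
// the (bufferSize, mode, output, collector, lock, operator) signature above.
AsyncCollectorBuffer<Integer, String> buffer = new AsyncCollectorBuffer<>(
	1000,                                // max number of in-flight AsyncCollectors
	AsyncDataStream.OutputMode.ORDERED,  // emit results in input order
	output,                              // Output to the next operator
	timestampedCollector,                // TimestampedCollector wrapping the output
	checkpointLock,                      // the StreamTask's checkpoint lock
	operator);                           // back-reference to the AsyncWaitOperator
```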
[GitHub] flink pull request #2629: [FLINK-4391] Provide support for asynchronous oper...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88716757 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; + +@Internal +public class AsyncWaitOperator + extends AbstractUdfStreamOperator> + implements OneInputStreamOperator +{ + private final int DEFAULT_BUFFER_SIZE = 1000; + + private static final long serialVersionUID = 1L; + + private final static String STATE_NAME = "_async_wait_operator_state_"; + + /** +* {@link TypeSerializer} for inputs while making snapshots. +*/ + private transient StreamElementSerializer inStreamElementSerializer; + + /** +* input stream elements from the state +*/ + private transient ListState recoveredStreamElements; + + private transient TimestampedCollector collector; + + private transient AsyncCollectorBuffer buffer; + + /** +* Checkpoint lock from {@link StreamTask#lock} +*/ + private transient Object checkpointLock; + + private int bufferSize = DEFAULT_BUFFER_SIZE; + private AsyncDataStream.OutputMode mode; + + /** +* For test only. Normally this flag is true, indicating that the Emitter Thread +* in the buffer will work. 
+*/ + private boolean emitFlag = true; + + /** +* Test serializer used in unit tests. +*/ + private StreamElementSerializer inStreamElementSerializerForTest; + + + public AsyncWaitOperator(AsyncFunction asyncFunction) { + super(asyncFunction); + chainingStrategy = ChainingStrategy.ALWAYS; + } + + public void setBufferSize(int size) { + Preconditions.checkArgument(size > 0, "The number of concurrent async operations should be greater than 0."); + bufferSize = size; + } + + public void setOutputMode(AsyncDataStream.OutputMode mode) { + this.mode = mode; + } --- End diff -- Can't we pass this information at creation time of the operator? Then we could also mark these fields as final.
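A minimal sketch of the constructor injection proposed above, so that both fields can be declared final. The exact shape is an assumption rather than the PR's code, though for what it is worth, the operator that eventually shipped in Flink does take these values as constructor arguments:

```java
// Hedged sketch: pass buffer size and output mode at construction time,
// making both fields immutable for the lifetime of the operator.
private final int bufferSize;
private final AsyncDataStream.OutputMode mode;

public AsyncWaitOperator(
		AsyncFunction<IN, OUT> asyncFunction,
		int bufferSize,
		AsyncDataStream.OutputMode mode) {
	super(asyncFunction);
	Preconditions.checkArgument(bufferSize > 0,
		"The number of concurrent async operations should be greater than 0.");
	this.bufferSize = bufferSize;
	this.mode = Preconditions.checkNotNull(mode, "Output mode should not be NULL.");
	chainingStrategy = ChainingStrategy.ALWAYS;
}
```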
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683318#comment-15683318 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88858824 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java --- @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Tests for {@link AsyncCollectorBuffer}. 
These tests verify: + * + * + * Add a new item into the buffer + * Ordered mode processing + * Unordered mode processing + * Error handling + * + */ +public class AsyncCollectorBufferTest { + private AsyncFunction function; + + private AsyncWaitOperator operator; + + private AsyncCollectorBuffer buffer; + + private Output> output; + + private TimestampedCollector collector; + + private Object lock = new Object(); + + @Before + public void setUp() throws Exception { + function = new AsyncFunction() { + @Override + public void asyncInvoke(Integer input, AsyncCollector collector) throws Exception { + + } + }; + + operator = new AsyncWaitOperator<>(function); + Class[] classes = AbstractStreamOperator.class.getDeclaredClasses(); + Class latencyClass = null; + for (Class c : classes) { + if (c.getName().indexOf("LatencyGauge") != -1) { + latencyClass = c; + } + } + + Constructor explicitConstructor = latencyClass.getDeclaredConstructors()[0]; + explicitConstructor.setAccessible(true); + Whitebox.setInternalState(operator, "latencyGauge", explicitConstructor.newInstance(10)); + + output = new FakedOutput(new ArrayList()); + collector = new TimestampedCollector(output); + + Whitebox.setInternalState(operator, "output", output); + } + + @Test + public void testAdd() throws Exception { + buffer = + new AsyncCollectorBuffer<>(3, AsyncDataStream.OutputMode.UNORDERED, output, collector, lock, operator); + + buffer.addWatermark(new Watermark(0L)); + buffer.addLatencyMarker(new LatencyMarker(111L, 1, 1)); + Assert.assertEquals(buffer.getQueue().size(), 2); + + Iterator, StreamElement>> iterator = + buffer.getQueue().entrySet().iterator();
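Since the test above exercises both output modes, a short reminder of how they surface in the user-facing API. The signatures below follow AsyncDataStream as it was eventually released; the PR under review may differ, and `function` stands in for an AsyncFunction<Integer, String> like the one in the quoted test:

```java
// ORDERED preserves input order (a slow result blocks everything behind it);
// UNORDERED emits each result as soon as its future completes.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> source = env.fromElements(1, 2, 3);

DataStream<String> ordered =
	AsyncDataStream.orderedWait(source, function, 1000, TimeUnit.MILLISECONDS, 100);

DataStream<String> unordered =
	AsyncDataStream.unorderedWait(source, function, 1000, TimeUnit.MILLISECONDS, 100);
```

Even in unordered mode, results in the released implementation do not overtake watermarks, which is what makes the watermark handling in this buffer test relevant.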
[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683275#comment-15683275 ] ASF GitHub Bot commented on FLINK-4391: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r88676639 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.async; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.async.AsyncCollector; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Example illustrating how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} + */ +public class AsyncIOExample { + + /** +* A checkpointed source. 
+*/ + private static class SimpleSource implements SourceFunction, Checkpointed { + private static final long serialVersionUID = 1L; + + private volatile boolean isRunning = true; + private int counter = 0; + private int start = 0; + + @Override + public void restoreState(Integer state) throws Exception { + this.start = state; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return start; + } + + public SimpleSource(int maxNum) { + this.counter = maxNum; + } + + @Override + public void run(SourceContext ctx) throws Exception { + while (start < counter && isRunning) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(start); + ++start; + } + Thread.sleep(10); + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + + public static void main(String[] args) throws Exception { + + // obtain execution environment and set setBufferTimeout to 1 to enable + // continuous flushing of the output buffers (lowest latency) + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() + .setBufferTimeout(1); + + // configurations for the job + String statePath = args[0]; + String cpMode = args[1]; + int maxCount = Integer.valueOf(args[2]); + final int s
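The quoted example is cut off above. As a hedged sketch of the pattern it builds towards (an async function that hands each lookup to its own thread pool), with the AsyncCollector signature and all names treated as assumptions, since the digest strips generics:

```java
// Hedged sketch: the executor is created in open() because function instances
// are serialized into the job graph, and shut down again in close().
RichAsyncFunction<Integer, String> lookup = new RichAsyncFunction<Integer, String>() {
	private transient ExecutorService executor;

	@Override
	public void open(Configuration parameters) throws Exception {
		executor = Executors.newFixedThreadPool(10);
	}

	@Override
	public void asyncInvoke(final Integer input, final AsyncCollector<Integer, String> collector) {
		executor.submit(new Runnable() {
			@Override
			public void run() {
				// Hand back a single-element result list once the "lookup" finishes;
				// java.util.Collections is assumed in addition to the quoted imports.
				collector.collect(Collections.singletonList("key-" + input));
			}
		});
	}

	@Override
	public void close() throws Exception {
		executor.shutdown();
	}
};
```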