[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633158#comment-15633158 ]
ASF GitHub Bot commented on FLINK-4391:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r86349629

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java ---
    @@ -195,6 +202,70 @@ public Integer map(NonSerializable value) throws Exception {
             env.execute();
         }

    +    @Test
    +    public void testAsyncWaitOperator() throws Exception {
    +        final int numElements = 10;
    +
    +        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +        DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements)).setParallelism(1);
    +
    +        AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
    +            transient ExecutorService executorService;
    +
    +            @Override
    +            public void open(Configuration parameters) throws Exception {
    +                super.open(parameters);
    +                executorService = Executors.newFixedThreadPool(numElements);
    +            }
    +
    +            @Override
    +            public void close() throws Exception {
    +                super.close();
    +                executorService.shutdown();
    +            }
    +
    +            @Override
    +            public void asyncInvoke(final Tuple2<Integer, NonSerializable> input,
    +                                    final AsyncCollector<Tuple2<Integer, NonSerializable>, Integer> collector) throws Exception {
    +                this.executorService.submit(new Runnable() {
    +                    @Override
    +                    public void run() {
    +                        // wait for while to simulate async operation here
    +                        int sleep = (int) (new Random().nextFloat() * 1000);
    --- End diff --

    Decreasing the sleep timeout will help speed up the test.

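As a rough illustration of the suggestion above: the quoted line draws a delay of up to just under one second per element, so capping the simulated delay at a few milliseconds keeps the asynchronous behaviour while shortening the test. The sketch below is standalone Java and does not use the Flink API; the class name `ShortSleepSketch`, the helpers `nextSleepMillis` and `simulateAsync`, the 10 ms bound, and the doubled return value are all assumptions made for illustration, not code from the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical standalone sketch; none of these names come from the pull request.
class ShortSleepSketch {
    // Assumed upper bound (exclusive) for the simulated delay, in milliseconds.
    // The quoted diff uses nextFloat() * 1000, i.e. up to 999 ms per element.
    static final int MAX_SLEEP_MILLIS = 10;

    // Draws a random delay in the range [0, maxExclusive).
    static int nextSleepMillis(int maxExclusive) {
        return ThreadLocalRandom.current().nextInt(maxExclusive);
    }

    // Simulates an asynchronous call: sleeps briefly on the executor,
    // then completes the future with a placeholder result (value * 2).
    static CompletableFuture<Integer> simulateAsync(final int value, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(nextSleepMillis(MAX_SLEEP_MILLIS));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and finish with the placeholder result.
                Thread.currentThread().interrupt();
            }
            return value * 2;
        }, executor);
    }

    public static void main(String[] args) {
        final int numElements = 10;
        ExecutorService executor = Executors.newFixedThreadPool(numElements);
        try {
            // Start all simulated calls first so they run concurrently.
            List<CompletableFuture<Integer>> pending = new ArrayList<>();
            for (int i = 0; i < numElements; i++) {
                pending.add(simulateAsync(i, executor));
            }
            // Wait for each result in submission order and print it.
            for (CompletableFuture<Integer> future : pending) {
                System.out.println(future.join());
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

With a bound of this size the random delays can still complete out of order, which is presumably what the test wants to exercise, while the total wait stays in the low tens of milliseconds.
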
> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
>                 Key: FLINK-4391
>                 URL: https://issues.apache.org/jira/browse/FLINK-4391
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Jamie Grier
>            Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a
> DataStream. The classic example would be joining against an external
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the
> Flink API. Ideally this could simply take the form of a new operator that
> manages async operations, keeps so many of them in flight, and then emits
> results to downstream operators as the async operations complete.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)