[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633158#comment-15633158
 ] 

ASF GitHub Bot commented on FLINK-4391:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r86349629
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java ---
    @@ -195,6 +202,70 @@ public Integer map(NonSerializable value) throws Exception {
                env.execute();
        }
     
    +   @Test
    +   public void testAsyncWaitOperator() throws Exception {
    +           final int numElements = 10;
    +
    +           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +           DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements)).setParallelism(1);
    +
    +           AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
    +                   transient ExecutorService executorService;
    +
    +                   @Override
    +                   public void open(Configuration parameters) throws Exception {
    +                           super.open(parameters);
    +                           executorService = Executors.newFixedThreadPool(numElements);
    +                   }
    +
    +                   @Override
    +                   public void close() throws Exception {
    +                           super.close();
    +                           executorService.shutdown();
    +                   }
    +
    +                   @Override
    +                   public void asyncInvoke(final Tuple2<Integer, NonSerializable> input,
    +                                                                   final AsyncCollector<Tuple2<Integer, NonSerializable>, Integer> collector) throws Exception {
    +                           this.executorService.submit(new Runnable() {
    +                                   @Override
    +                                   public void run() {
    +                                           // wait for while to simulate async operation here
    +                                           int sleep = (int) (new Random().nextFloat() * 1000);
    --- End diff --
    
    Decreasing the sleep timeout would help speed up the test.
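    
    One way to do this, as a rough sketch only: draw the simulated delay from a
    much smaller range. The class `SimulatedLatency`, the method
    `nextSleepMillis`, and the bound of 10 are hypothetical and not part of the
    pull request; the sketch also assumes the value is used as a delay in
    milliseconds, which the excerpt above does not show.

```java
import java.util.Random;

// Hypothetical helper, not part of the pull request: bounds the simulated latency.
final class SimulatedLatency {
    // Assumed upper bound for the delay; the diff above multiplies by 1000 instead.
    static final int MAX_SLEEP_MILLIS = 10;

    private SimulatedLatency() {
    }

    // Returns a pseudo-random delay in the range [0, MAX_SLEEP_MILLIS).
    static int nextSleepMillis(Random random) {
        return random.nextInt(MAX_SLEEP_MILLIS);
    }
}
```

    With ten elements this keeps the worst-case simulated delay per element
    below 10 instead of 1000, while still letting completion order vary.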


> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
>                 Key: FLINK-4391
>                 URL: https://issues.apache.org/jira/browse/FLINK-4391
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Jamie Grier
>            Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps so many of them in flight, and then emits 
> results to downstream operators as the async operations complete.
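
The behaviour described above (a bounded number of async operations in flight, with results emitted as they complete) can be sketched in plain Java. This is only an illustration of the idea under stated assumptions, not Flink's API and not the operator from the pull request: `BoundedAsyncSketch`, `process`, `lookup`, and `maxInFlight` are hypothetical names, and a `Semaphore` stands in for whatever back-pressure mechanism a real operator would use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical illustration only; not Flink code.
final class BoundedAsyncSketch {

    private BoundedAsyncSketch() {
    }

    // Applies `lookup` to every input asynchronously, keeping at most
    // `maxInFlight` calls pending, and returns results in completion order.
    // Assumes `lookup` never returns null (ConcurrentLinkedQueue rejects nulls).
    static <IN, OUT> List<OUT> process(List<IN> inputs,
                                       Function<IN, OUT> lookup,
                                       int maxInFlight) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(maxInFlight);
        Semaphore permits = new Semaphore(maxInFlight);
        Queue<OUT> emitted = new ConcurrentLinkedQueue<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (IN in : inputs) {
                // Blocks the caller once maxInFlight operations are pending.
                permits.acquire();
                pending.add(CompletableFuture
                        .supplyAsync(() -> lookup.apply(in), pool)
                        // "Emit" each result as soon as its operation completes.
                        .thenAccept(emitted::add)
                        // Free the slot whether the operation succeeded or failed.
                        .whenComplete((ignored, error) -> permits.release()));
            }
            // Wait for the remaining operations; rethrows the first failure, if any.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(emitted);
    }
}
```

For example, `BoundedAsyncSketch.process(keys, key -> queryDatabase(key), 10)` would keep at most ten lookups pending at a time, where `queryDatabase` is a placeholder for the external call. Results come back in completion order, so a caller that needs input order would have to reorder them.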


