Hi John,

The root cause is that the collection source exits too quickly, so the
window operator also exits before its trigger has had a chance to fire.

You can verify this by waiting a second before the window is released. For
example, insert a map operator between the source and the window operator,
and block for a second or more in the "close" method of that map operator.
You will then see that the window works as expected.

Thanks,
Biao /'bɪ.aʊ/



On Wed, 18 Dec 2019 at 06:24, John Morrow <johnniemor...@hotmail.com> wrote:

> Hi All,
>
> I'm trying to test a pipeline that consists of two Flink tasks with a
> MiniCluster. The 1st task has a WindowAll operator which groups items into
> batches every second, and the 2nd task does an async operation with each
> batch and flatMaps the result.
>
> I've whittled it down to the bare bones below. There are two tests:
>
>    - testPipelineWithCountTrigger - this one works fine 🙂
>    - testPipelineWithProcessingTimeTrigger - this one doesn't give any
>    output 🙁
>
>
> It seems like a timing issue. If I step through the failing one slowly I
> can see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear
> methods do get called, and the asyncInvoke method also gets called, but
> when I run it the 2nd test fails as it produces no output. I've tried
> setting the MiniCluster timeout to 1 day, the same with my AsyncUDF
> timeout, and sleeping for 3 * window after env.execute but no difference.
> I'm running this with Flink 1.9.0 and OpenJDK8 on Ubuntu (build
> 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10).
>
>
> Any idea how I can get the 2nd test to wait to process the output?
>
>
> Thanks 🙂
>
> John.
>
>
>
>
>
>
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
> import org.apache.flink.streaming.api.datastream.AsyncDataStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import
> org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
> import
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
> import
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.test.util.MiniClusterWithClientResource;
> import org.apache.flink.util.Collector;
> import org.junit.jupiter.api.Tag;
> import org.junit.jupiter.api.Test;
>
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
> import java.util.stream.StreamSupport;
>
> import static org.junit.jupiter.api.Assertions.assertEquals;
>
>
> public class StreamTest {
>
>   @Test // :)
>   @Tag("unit")
>   public void testPipelineWithCountTrigger() throws Exception {
>     runPipeline(10, CountTrigger.of(10));
>   }
>
>   @Test // :(
>   @Tag("unit")
>   public void testPipelineWithProcessingTimeTrigger() throws Exception {
>     runPipeline(10, ProcessingTimeTrigger.create());
>   }
>
>
>   private void runPipeline(int inputSize, Trigger<Object, TimeWindow>
> trigger) throws Exception {
>
>     MiniClusterWithClientResource miniCluster = new
> MiniClusterWithClientResource(
>         new MiniClusterResourceConfiguration.Builder()
>             .setNumberSlotsPerTaskManager(1)
>             .setNumberTaskManagers(1)
>
> .setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1,
> TimeUnit.DAYS))
>             .build()
>     );
>     miniCluster.before();
>
>     StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     CollectSink.values.clear();
>
>     List<Integer> listOfNumbers = IntStream.rangeClosed(1,
> inputSize).boxed().collect(Collectors.toList());
>
>     // 1st half of pipeline
>     DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
>         .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
>         .trigger(trigger)
>         .process(new Batcher());
>
>     // 2nd half of pipeline
>     DataStream<Integer> pipeB = AsyncDataStream.unorderedWait(pipeA, new
> AsyncUDF(), 1L, TimeUnit.DAYS, 1 )
>         .flatMap((List<Integer> records, Collector<Integer> out) ->
> records.forEach(out::collect)).returns(Types.INT);
>     pipeB.addSink(new CollectSink());
>
>     env.execute();
>
>     try {
>       Thread.sleep(1000L * 3);
>     } catch (InterruptedException e) {
>       System.out.println();
>     }
>     miniCluster.after();
>
>     assertEquals(inputSize, CollectSink.values.size());
>   }
>
>
>   public static class Batcher extends ProcessAllWindowFunction<Integer,
> List<Integer>, TimeWindow> {
>     @Override
>     public void process(Context context, Iterable<Integer> elements,
> Collector<List<Integer>> out) throws Exception {
>       out.collect(StreamSupport.stream(elements.spliterator(),
> false).collect(Collectors.toList()));
>     }
>   }
>
>   private static class AsyncUDF extends RichAsyncFunction<List<Integer>,
> List<Integer>> {
>
>     private CompletableFuture<List<Integer>> doAsyncStuff(List<Integer>
> value) {
>       return CompletableFuture.supplyAsync(() -> {
>         try {
>           Thread.sleep(100);
>         } catch (InterruptedException e) {
>           e.printStackTrace();
>         }
>         return value;
>       });
>     }
>
>     @Override
>     public void asyncInvoke(List<Integer> input,
> ResultFuture<List<Integer>> resultFuture) throws Exception {
>       doAsyncStuff(input).thenAccept(stuff ->
> resultFuture.complete(Collections.singleton(stuff)));
>     }
>
>     @Override
>     public void timeout(List<Integer> input, ResultFuture<List<Integer>>
> resultFuture) throws Exception {
>       resultFuture.completeExceptionally(new RuntimeException("Timeout!"));
>     }
>   }
>
>   // create a testing sink
>   private static class CollectSink implements SinkFunction<Integer> {
>     public static final List<Integer> values = new ArrayList<>();
>
>     @Override
>     public synchronized void invoke(Integer value) throws Exception {
>       values.add(value);
>     }
>   }
>
> }
>
>

Reply via email to