Hi John, The root cause is that the collection source exits too fast. Once the source has finished, the window operator exits as well, before its processing-time trigger has had a chance to fire.
You could verify that by waiting a second before the window operator is released. For example, insert a map operator between the source and the window operator, and block for a second or more in the "close" method of this map operator. You will then see that the window works as expected. Thanks, Biao /'bɪ.aʊ/ On Wed, 18 Dec 2019 at 06:24, John Morrow <johnniemor...@hotmail.com> wrote: > Hi All, > > I'm trying to test a pipeline that consists of two Flink tasks with a > MiniCluster. The 1st task has a WindowAll operator which groups items into > batches every second, and the 2nd task does an async operation with each > batch and flatMaps the result. > > I've whittled it down to the bare bones below. There are two tests: > > - testPipelineWithCountTrigger - this one works fine 🙂 > - testPipelineWithProcessingTimeTrigger - this one doesn't give any > output 🙁 > > > It seems like a timing issue. If I step through the failing one slowly I > can see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear > methods do get called, and the asyncInvoke method also gets called, but > when I run it the 2nd test fails as it produces no output. I've tried > setting the MiniCluster timeout to 1 day, the same with my AsyncUDF > timeout, and sleeping for 3 * window after env.execute but no difference. > I'm running this with Flink 1.9.0 and OpenJDK8 on Ubuntu (build > 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10). > > > Any idea how I can get the 2nd test to wait to process the output? > > > Thanks 🙂 > > John. 
> > > > > > > import org.apache.flink.api.common.typeinfo.Types; > import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; > import org.apache.flink.streaming.api.datastream.AsyncDataStream; > import org.apache.flink.streaming.api.datastream.DataStream; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.async.ResultFuture; > import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; > import org.apache.flink.streaming.api.functions.sink.SinkFunction; > import > org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; > import > org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; > import org.apache.flink.streaming.api.windowing.time.Time; > import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; > import > org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; > import org.apache.flink.streaming.api.windowing.triggers.Trigger; > import org.apache.flink.streaming.api.windowing.windows.TimeWindow; > import org.apache.flink.test.util.MiniClusterWithClientResource; > import org.apache.flink.util.Collector; > import org.junit.jupiter.api.Tag; > import org.junit.jupiter.api.Test; > > import java.util.ArrayList; > import java.util.Collections; > import java.util.List; > import java.util.concurrent.CompletableFuture; > import java.util.concurrent.TimeUnit; > import java.util.stream.Collectors; > import java.util.stream.IntStream; > import java.util.stream.StreamSupport; > > import static org.junit.jupiter.api.Assertions.assertEquals; > > > public class StreamTest { > > @Test // :) > @Tag("unit") > public void testPipelineWithCountTrigger() throws Exception { > runPipeline(10, CountTrigger.of(10)); > } > > @Test // :( > @Tag("unit") > public void testPipelineWithProcessingTimeTrigger() throws Exception { > runPipeline(10, ProcessingTimeTrigger.create()); > } > > > private void 
runPipeline(int inputSize, Trigger<Object, TimeWindow> > trigger) throws Exception { > > MiniClusterWithClientResource miniCluster = new > MiniClusterWithClientResource( > new MiniClusterResourceConfiguration.Builder() > .setNumberSlotsPerTaskManager(1) > .setNumberTaskManagers(1) > > .setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1, > TimeUnit.DAYS)) > .build() > ); > miniCluster.before(); > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > CollectSink.values.clear(); > > List<Integer> listOfNumbers = IntStream.rangeClosed(1, > inputSize).boxed().collect(Collectors.toList()); > > // 1st half of pipeline > DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers) > .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1))) > .trigger(trigger) > .process(new Batcher()); > > // 2nd half of pipeline > DataStream<Integer> pipeB = AsyncDataStream.unorderedWait(pipeA, new > AsyncUDF(), 1L, TimeUnit.DAYS, 1 ) > .flatMap((List<Integer> records, Collector<Integer> out) -> > records.forEach(out::collect)).returns(Types.INT); > pipeB.addSink(new CollectSink()); > > env.execute(); > > try { > Thread.sleep(1000L * 3); > } catch (InterruptedException e) { > System.out.println(); > } > miniCluster.after(); > > assertEquals(inputSize, CollectSink.values.size()); > } > > > public static class Batcher extends ProcessAllWindowFunction<Integer, > List<Integer>, TimeWindow> { > @Override > public void process(Context context, Iterable<Integer> elements, > Collector<List<Integer>> out) throws Exception { > out.collect(StreamSupport.stream(elements.spliterator(), > false).collect(Collectors.toList())); > } > } > > private static class AsyncUDF extends RichAsyncFunction<List<Integer>, > List<Integer>> { > > private CompletableFuture<List<Integer>> doAsyncStuff(List<Integer> > value) { > return CompletableFuture.supplyAsync(() -> { > try { > Thread.sleep(100); > } catch (InterruptedException e) { 
> e.printStackTrace(); > } > return value; > }); > } > > @Override > public void asyncInvoke(List<Integer> input, > ResultFuture<List<Integer>> resultFuture) throws Exception { > doAsyncStuff(input).thenAccept(stuff -> > resultFuture.complete(Collections.singleton(stuff))); > } > > @Override > public void timeout(List<Integer> input, ResultFuture<List<Integer>> > resultFuture) throws Exception { > resultFuture.completeExceptionally(new RuntimeException("Timeout!")); > } > } > > // create a testing sink > private static class CollectSink implements SinkFunction<Integer> { > public static final List<Integer> values = new ArrayList<>(); > > @Override > public synchronized void invoke(Integer value) throws Exception { > values.add(value); > } > } > > } > >