Hi All,
I'm trying to test a pipeline that consists of two Flink tasks with a
MiniCluster. The 1st task has a WindowAll operator which groups items into
batches every second, and the 2nd task does an async operation with each batch
and flatMaps the result.
I've whittled it down to the bare bones below. There are two tests:
* testPipelineWithCountTrigger - this one works fine 🙂
* testPipelineWithProcessingTimeTrigger - this one doesn't give any output 🙁
It seems like a timing issue. If I step through the failing one slowly I can
see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear methods
do get called, and the asyncInvoke method also gets called, but when I run it
the 2nd test fails as it produces no output. I've tried setting the MiniCluster
timeout to 1 day, the same with my AsyncUDF timeout, and sleeping for 3 *
window after env.execute but no difference. I'm running this with Flink 1.9.0
and OpenJDK8 on Ubuntu (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10).
Any idea how I can get the 2nd test to wait to process the output?
Thanks 🙂
John.
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class StreamTest {
@Test // :)
@Tag("unit")
public void testPipelineWithCountTrigger() throws Exception {
runPipeline(10, CountTrigger.of(10));
}
@Test // :(
@Tag("unit")
public void testPipelineWithProcessingTimeTrigger() throws Exception {
runPipeline(10, ProcessingTimeTrigger.create());
}
private void runPipeline(int inputSize, Trigger<Object, TimeWindow> trigger)
throws Exception {
MiniClusterWithClientResource miniCluster = new
MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1,
TimeUnit.DAYS))
.build()
);
miniCluster.before();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
CollectSink.values.clear();
List<Integer> listOfNumbers = IntStream.rangeClosed(1,
inputSize).boxed().collect(Collectors.toList());
// 1st half of pipeline
DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.trigger(trigger)
.process(new Batcher());
// 2nd half of pipeline
DataStream<Integer> pipeB = AsyncDataStream.unorderedWait(pipeA, new
AsyncUDF(), 1L, TimeUnit.DAYS, 1 )
.flatMap((List<Integer> records, Collector<Integer> out) ->
records.forEach(out::collect)).returns(Types.INT);
pipeB.addSink(new CollectSink());
env.execute();
try {
Thread.sleep(1000L * 3);
} catch (InterruptedException e) {
System.out.println();
}
miniCluster.after();
assertEquals(inputSize, CollectSink.values.size());
}
public static class Batcher extends ProcessAllWindowFunction<Integer,
List<Integer>, TimeWindow> {
@Override
public void process(Context context, Iterable<Integer> elements,
Collector<List<Integer>> out) throws Exception {
out.collect(StreamSupport.stream(elements.spliterator(),
false).collect(Collectors.toList()));
}
}
private static class AsyncUDF extends RichAsyncFunction<List<Integer>,
List<Integer>> {
private CompletableFuture<List<Integer>> doAsyncStuff(List<Integer> value) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return value;
});
}
@Override
public void asyncInvoke(List<Integer> input, ResultFuture<List<Integer>>
resultFuture) throws Exception {
doAsyncStuff(input).thenAccept(stuff ->
resultFuture.complete(Collections.singleton(stuff)));
}
@Override
public void timeout(List<Integer> input, ResultFuture<List<Integer>>
resultFuture) throws Exception {
resultFuture.completeExceptionally(new RuntimeException("Timeout!"));
}
}
// create a testing sink
private static class CollectSink implements SinkFunction<Integer> {
public static final List<Integer> values = new ArrayList<>();
@Override
public synchronized void invoke(Integer value) throws Exception {
values.add(value);
}
}
}