Hi Stephen,

If the window has never been triggered, maybe you could investigate the watermark; the docs [1][2] may be helpful.
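One quick way to check is to log the current watermark next to each element, for example with a small pass-through ProcessFunction placed right after assignTimestampsAndWatermarks. This is just a sketch; the class name and the println-style logging are only for illustration, and the ObjectNode import should match whatever your deserialization schema actually emits:

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import com.fasterxml.jackson.databind.node.ObjectNode; // adjust to the Jackson variant your schema produces

    // Pass-through function that prints the current watermark for every element.
    public class WatermarkLogger extends ProcessFunction<ObjectNode, ObjectNode> {
        @Override
        public void processElement(ObjectNode value, Context ctx, Collector<ObjectNode> out) {
            // The watermark stays at Long.MIN_VALUE until every parallel source
            // instance has emitted one; an event-time window can only fire once
            // the watermark passes the end of the window.
            System.out.println("watermark=" + ctx.timerService().currentWatermark()
                    + ", element timestamp=" + ctx.timestamp());
            out.collect(value);
        }
    }

You could chain it in with `.process(new WatermarkLogger())`. If the logged watermark never moves past the window end, the window cannot fire; and since a downstream operator's watermark is the minimum of the watermarks of all its parallel inputs, a source instance that never receives any data (e.g. an idle shard) can hold the whole watermark back, which would match what you describe for shard-0.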
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#interaction-of-watermarks-and-windows
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks

Best,
Congxian

On Feb 19, 2019, 21:31 +0800, Stephen Connolly <stephen.alan.conno...@gmail.com>, wrote:
> Hmmm my suspicions are now quite high. I created a file source that just
> replays the events straight, and then I get more results....
>
> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly
> <stephen.alan.conno...@gmail.com> wrote:
> > Hmmm after expanding the dataset such that there was additional data that
> > ended up on shard-0 (everything in my original dataset was coincidentally
> > landing on shard-1) I am now getting output... should I expect this kind
> > of behaviour if no data ever arrives at shard-0?
> >
> > On Tue, 19 Feb 2019 at 11:14, Stephen Connolly
> > <stephen.alan.conno...@gmail.com> wrote:
> > > Hi, I'm having a strange situation and I would like to know where I
> > > should start trying to debug.
> > >
> > > I have set up a configurable swap-in source, with three
> > > implementations:
> > >
> > > 1. A mock implementation
> > > 2. A Kafka consumer implementation
> > > 3. A Kinesis consumer implementation
> > >
> > > From injecting a log and a no-op map function I can see that all three
> > > sources pass through the events correctly.
> > >
> > > I then have a window based on event timestamps… and from inspecting
> > > the aggregation function I can see that the data is getting
> > > aggregated… I'm using the
> > > `.aggregate(AggregateFunction, WindowFunction)` variant so that I can
> > > retrieve the key.
> > >
> > > Here's the strange thing: I only change the source (and each source
> > > uses the same deserialization function) but:
> > >
> > > • When I use either Kafka or my Mock source, the WindowFunction gets
> > >   called as events pass the end of the window.
> > > • When I use the Kinesis source, however, the window function never
> > >   gets called. I have even tried injecting events into Kinesis with
> > >   really high timestamps to flush the watermarks in my
> > >   BoundedOutOfOrdernessTimestampExtractor... but nothing.
> > >
> > > I cannot see how this source switching could result in such a
> > > different behaviour:
> > >
> > >     Properties sourceProperties = new Properties();
> > >     ConsumerFactory sourceFactory;
> > >     String sourceName = configParams.getRequired("source");
> > >     switch (sourceName.toLowerCase(Locale.ENGLISH)) {
> > >         case "kinesis":
> > >             sourceFactory = FlinkKinesisConsumer::new;
> > >             copyOptionalArg(configParams, "aws-region",
> > >                     sourceProperties, AWSConfigConstants.AWS_REGION);
> > >             copyOptionalArg(configParams, "aws-endpoint",
> > >                     sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
> > >             copyOptionalArg(configParams, "aws-access-key",
> > >                     sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
> > >             copyOptionalArg(configParams, "aws-secret-key",
> > >                     sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
> > >             copyOptionalArg(configParams, "aws-profile",
> > >                     sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
> > >             break;
> > >         case "kafka":
> > >             sourceFactory = FlinkKafkaConsumer010::new;
> > >             copyRequiredArg(configParams, "bootstrap-server",
> > >                     sourceProperties, "bootstrap.servers");
> > >             copyOptionalArg(configParams, "group-id",
> > >                     sourceProperties, "group.id");
> > >             break;
> > >         case "mock":
> > >             sourceFactory = MockSourceFunction::new;
> > >             break;
> > >         default:
> > >             throw new RuntimeException("Unknown source '" + sourceName + '\'');
> > >     }
> > >
> > >     // set up the streaming execution environment
> > >     final StreamExecutionEnvironment env =
> > >             StreamExecutionEnvironment.getExecutionEnvironment();
> > >
> > >     // poll watermark every second because we are using
> > >     // BoundedOutOfOrdernessTimestampExtractor
> > >     env.getConfig().setAutoWatermarkInterval(1000L);
> > >     env.enableCheckpointing(5000);
> > >
> > >     SplitStream<JsonNode> eventsByType = env.addSource(sourceFactory.create(
> > >                     configParams.getRequired("topic"),
> > >                     new ObjectNodeDeserializationSchema(),
> > >                     sourceProperties
> > >             ))
> > >             .returns(ObjectNode.class) // the use of ConsumerFactory erases the type info so add it back
> > >             .name("raw-events")
> > >             .assignTimestampsAndWatermarks(
> > >                     new ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp", Time.seconds(5))
> > >             )
> > >             .split(new JsonNodeOutputSelector("eventType"));
> > >     ...
> > >     eventsByType.select(...)
> > >             .keyBy(new JsonNodeStringKeySelector("_key"))
> > >             .window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),
> > >                     (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
> > >             .trigger(EventTimeTrigger.create())
> > >             .aggregate(new CountsAggregator<>(), new KeyTagger<>()) // <==== The CountsAggregator is seeing the data
> > >             .print() // <==== HERE is where we get no output from Kinesis... but Kafka and my Mock are just fine!
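For what it's worth, the ObjectNodeBoundedOutOfOrdernessTimestampExtractor itself is not shown in the snippet above. A minimal sketch of what such an extractor typically looks like follows; the field semantics (epoch milliseconds) and the Jackson import are assumptions, not something stated in the thread:

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import com.fasterxml.jackson.databind.node.ObjectNode; // adjust to the Jackson variant your schema produces

    // Sketch only: the real implementation is not part of this thread.
    public class ObjectNodeBoundedOutOfOrdernessTimestampExtractor
            extends BoundedOutOfOrdernessTimestampExtractor<ObjectNode> {

        private final String timestampField;

        public ObjectNodeBoundedOutOfOrdernessTimestampExtractor(String timestampField, Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
            this.timestampField = timestampField;
        }

        @Override
        public long extractTimestamp(ObjectNode element) {
            // Assumes the field holds epoch milliseconds; if it is seconds or a
            // string, the extracted timestamps (and therefore the watermark) will
            // not advance the way you expect.
            return element.get(timestampField).asLong();
        }
    }

With a bounded-out-of-orderness extractor the watermark only advances when elements with higher timestamps arrive on every parallel instance, so it is worth double-checking the extracted values on the Kinesis path as well.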