Hi Stephen

If the window has never been triggered, maybe you could investigate the 
watermark: an event-time window only fires once the watermark passes the end 
of the window, so if the watermark never advances, the window will never fire. 
Maybe the docs [1][2] can be helpful.
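
For example, one quick way to check is to log the current watermark next to 
each element, right after assignTimestampsAndWatermarks(...). A minimal sketch 
(the class name and the System.out logging are just illustrative, not from 
your code):

    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    /** Debugging aid: prints the current watermark as each element flows past. */
    public class WatermarkLogger extends ProcessFunction<ObjectNode, ObjectNode> {
        @Override
        public void processElement(ObjectNode value, Context ctx,
                                   Collector<ObjectNode> out) {
            System.out.println("element timestamp=" + ctx.timestamp()
                    + ", current watermark=" + ctx.timerService().currentWatermark());
            out.collect(value); // pass the element through unchanged
        }
    }

You could attach it with .process(new WatermarkLogger()) just after the 
timestamp assigner. If the printed watermark stays at Long.MIN_VALUE, the 
event-time windows will never fire.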

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#interaction-of-watermarks-and-windows
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks

Best, Congxian
On Feb 19, 2019, 21:31 +0800, Stephen Connolly 
<stephen.alan.conno...@gmail.com>, wrote:
> Hmmm my suspicions are now quite high. I created a file source that just 
> replays the events straight through, and then I get more results....
>
> > On Tue, 19 Feb 2019 at 11:50, Stephen Connolly 
> > <stephen.alan.conno...@gmail.com> wrote:
> > > Hmmm after expanding the dataset such that there was additional data that 
> > > ended up on shard-0 (everything in my original dataset was coincidentally 
> > > landing on shard-1) I am now getting output... should I expect this kind 
> > > of behaviour if no data arrives at shard-0 ever?
> > >
> > > > On Tue, 19 Feb 2019 at 11:14, Stephen Connolly 
> > > > <stephen.alan.conno...@gmail.com> wrote:
> > > > > Hi, I’m having a strange situation and I would like to know where I 
> > > > > should start trying to debug.
> > > > >
> > > > > I have set up a configurable swap in source, with three 
> > > > > implementations:
> > > > >
> > > > > 1. A mock implementation
> > > > > 2. A Kafka consumer implementation
> > > > > 3. A Kinesis consumer implementation
> > > > >
> > > > > From injecting a log and no-op map function I can see that all three 
> > > > > sources pass through the events correctly.
> > > > >
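> > > > > (The log + no-op map was nothing fancy, roughly along these lines - 
> > > > > simplified, and the class name here is just illustrative:)
> > > > >
> > > > >     import com.fasterxml.jackson.databind.node.ObjectNode;
> > > > >     import org.apache.flink.api.common.functions.MapFunction;
> > > > >     import org.slf4j.Logger;
> > > > >     import org.slf4j.LoggerFactory;
> > > > >
> > > > >     /** Pass-through map that only logs each event it sees. */
> > > > >     public class LoggingNoOpMap implements MapFunction<ObjectNode, ObjectNode> {
> > > > >         private static final Logger LOG = LoggerFactory.getLogger(LoggingNoOpMap.class);
> > > > >
> > > > >         @Override
> > > > >         public ObjectNode map(ObjectNode event) {
> > > > >             LOG.info("source emitted: {}", event);
> > > > >             return event; // no-op: forward the event unchanged
> > > > >         }
> > > > >     }
> > > > >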
> > > > > I then have a window based on event timestamps… and from inspecting 
> > > > > the aggregation function I can see that the data is getting 
> > > > > aggregated… I’m using the `.aggregate(AggregateFunction, 
> > > > > WindowFunction)` variant so that I can retrieve the key.
> > > > >
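> > > > > (For reference, the window-function side is roughly like this - a 
> > > > > simplified sketch, the real KeyTagger is a bit more involved:)
> > > > >
> > > > >     import org.apache.flink.api.java.tuple.Tuple2;
> > > > >     import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> > > > >     import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> > > > >     import org.apache.flink.util.Collector;
> > > > >
> > > > >     /** Attaches the window key to the pre-aggregated result. */
> > > > >     public class KeyTagger<V> implements WindowFunction<V, Tuple2<String, V>, String, TimeWindow> {
> > > > >         @Override
> > > > >         public void apply(String key, TimeWindow window, Iterable<V> aggregated,
> > > > >                           Collector<Tuple2<String, V>> out) {
> > > > >             // the Iterable holds exactly one element: the AggregateFunction's result
> > > > >             for (V value : aggregated) {
> > > > >                 out.collect(Tuple2.of(key, value));
> > > > >             }
> > > > >         }
> > > > >     }
> > > > >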
> > > > > Here’s the strange thing, I only change the source (and each source 
> > > > > uses the same deserialization function) but:
> > > > >
> > > > >
> > > > > • When I use either Kafka or my Mock source, the WindowFunction gets 
> > > > > called as events pass the end of the window
> > > > > • When I use the Kinesis source, however, the window function never 
> > > > > gets called. I have even tried injecting events into Kinesis with 
> > > > > really high timestamps to flush the watermarks in my 
> > > > > BoundedOutOfOrdernessTimestampExtractor (sketched below)... but nothing
> > > > >
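> > > > > (The timestamp extractor is essentially the stock 
> > > > > BoundedOutOfOrdernessTimestampExtractor reading a field out of the 
> > > > > JSON, something like this - simplified:)
> > > > >
> > > > >     import com.fasterxml.jackson.databind.node.ObjectNode;
> > > > >     import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> > > > >     import org.apache.flink.streaming.api.windowing.time.Time;
> > > > >
> > > > >     /** Extracts an epoch-millis timestamp from a named JSON field. */
> > > > >     public class ObjectNodeBoundedOutOfOrdernessTimestampExtractor
> > > > >             extends BoundedOutOfOrdernessTimestampExtractor<ObjectNode> {
> > > > >         private final String field;
> > > > >
> > > > >         public ObjectNodeBoundedOutOfOrdernessTimestampExtractor(String field, Time maxOutOfOrderness) {
> > > > >             super(maxOutOfOrderness);
> > > > >             this.field = field;
> > > > >         }
> > > > >
> > > > >         @Override
> > > > >         public long extractTimestamp(ObjectNode element) {
> > > > >             return element.get(field).asLong(); // assumes the field holds epoch millis
> > > > >         }
> > > > >     }
> > > > >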
> > > > > I cannot see how this source switching could result in such a 
> > > > > different behaviour:
> > > > >
> > > > >         Properties sourceProperties = new Properties();
> > > > >         ConsumerFactory sourceFactory;
> > > > >         String sourceName = configParams.getRequired("source");
> > > > >         switch (sourceName.toLowerCase(Locale.ENGLISH)) {
> > > > >             case "kinesis":
> > > > >                 sourceFactory = FlinkKinesisConsumer::new;
> > > > >                 copyOptionalArg(configParams, "aws-region", sourceProperties, AWSConfigConstants.AWS_REGION);
> > > > >                 copyOptionalArg(configParams, "aws-endpoint", sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
> > > > >                 copyOptionalArg(configParams, "aws-access-key", sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
> > > > >                 copyOptionalArg(configParams, "aws-secret-key", sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
> > > > >                 copyOptionalArg(configParams, "aws-profile", sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
> > > > >                 break;
> > > > >             case "kafka":
> > > > >                 sourceFactory = FlinkKafkaConsumer010::new;
> > > > >                 copyRequiredArg(configParams, "bootstrap-server", sourceProperties, "bootstrap.servers");
> > > > >                 copyOptionalArg(configParams, "group-id", sourceProperties, "group.id");
> > > > >                 break;
> > > > >             case "mock":
> > > > >                 sourceFactory = MockSourceFunction::new;
> > > > >                 break;
> > > > >             default:
> > > > >                 throw new RuntimeException("Unknown source '" + sourceName + '\'');
> > > > >         }
> > > > >
> > > > >         // set up the streaming execution environment
> > > > >         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> > > > >
> > > > >         // poll watermark every second because using BoundedOutOfOrdernessTimestampExtractor
> > > > >         env.getConfig().setAutoWatermarkInterval(1000L);
> > > > >         env.enableCheckpointing(5000);
> > > > >
> > > > >         SplitStream<JsonNode> eventsByType = env.addSource(sourceFactory.create(
> > > > >                 configParams.getRequired("topic"),
> > > > >                 new ObjectNodeDeserializationSchema(),
> > > > >                 sourceProperties
> > > > >         ))
> > > > >                 .returns(ObjectNode.class) // the use of ConsumerFactory erases the type info so add it back
> > > > >                 .name("raw-events")
> > > > >                 .assignTimestampsAndWatermarks(
> > > > >                         new ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp", Time.seconds(5))
> > > > >                 )
> > > > >                 .split(new JsonNodeOutputSelector("eventType"));
> > > > > ...
> > > > >         eventsByType.select(...)
> > > > >                 .keyBy(new JsonNodeStringKeySelector("_key"))
> > > > >                 .window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),
> > > > >                         (KeySelector<JsonNode, Time>) TasksMain::offsetPerMaster))
> > > > >                 .trigger(EventTimeTrigger.create())
> > > > >                 .aggregate(new CountsAggregator<>(), new KeyTagger<>()) // <==== The CountsAggregator is seeing the data
> > > > >                 .print() // <==== HERE is where we get no output from Kinesis... but Kafka and my Mock are just fine!
> > > > >
> > > > >
