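Hi,

You are using event time, but are you assigning watermarks [1]? I do not see that in the code. Without watermarks the event-time clock never advances, so the 5-second windows are never triggered and process() is never called, even though records are being consumed and deserialized.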
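To illustrate the point, here is a toy model in plain Java. It is not Flink code: the class EventTimeWindowDemo and its methods are hypothetical, and the firing rule is a simplification of how an event-time trigger behaves. Records are bucketed into 5-second tumbling windows (as with Time.seconds(5) in the quoted code), but a window is only emitted once the watermark reaches its last timestamp. A stream that carries records but no watermarks therefore never emits anything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of an event-time tumbling-window operator (NOT Flink code).
public class EventTimeWindowDemo {

    static final long WINDOW_SIZE_MS = 5_000L; // same size as Time.seconds(5) in the thread

    // window start -> number of records buffered in that window
    final TreeMap<Long, Integer> openWindows = new TreeMap<>();
    // description of every window that has fired, in firing order
    final List<String> fired = new ArrayList<>();
    // no watermark seen yet, like an operator whose input never carries watermarks
    long currentWatermark = Long.MIN_VALUE;

    // Start of the tumbling window containing the timestamp
    // (zero offset; non-negative timestamps assumed).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp + size) % size;
    }

    // Buffer a record in its window; nothing is emitted here.
    void processElement(long eventTimestamp) {
        openWindows.merge(windowStart(eventTimestamp, WINDOW_SIZE_MS), 1, Integer::sum);
    }

    // Advance the watermark and fire every window whose last timestamp
    // (start + size - 1) is now covered by it.
    void processWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + WINDOW_SIZE_MS - 1 <= currentWatermark) {
            Map.Entry<Long, Integer> w = openWindows.pollFirstEntry();
            fired.add("[" + w.getKey() + ", " + (w.getKey() + WINDOW_SIZE_MS) + ") count=" + w.getValue());
        }
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 2_500L, 4_999L, 6_000L, 7_200L};

        // Case 1: records flow in, but nothing emits watermarks.
        EventTimeWindowDemo noWatermarks = new EventTimeWindowDemo();
        for (long ts : timestamps) {
            noWatermarks.processElement(ts);
        }
        System.out.println("without watermarks: " + noWatermarks.fired);

        // Case 2: a bounded-out-of-orderness style assigner emits
        // (max timestamp seen - 1000 ms) after every record (simplified).
        EventTimeWindowDemo withWatermarks = new EventTimeWindowDemo();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : timestamps) {
            withWatermarks.processElement(ts);
            maxSeen = Math.max(maxSeen, ts);
            withWatermarks.processWatermark(maxSeen - 1_000L);
        }
        System.out.println("with watermarks:    " + withWatermarks.fired);
    }
}
```

Running main prints an empty list for the first case and `[[0, 5000) count=3]` for the second; the window [5000, 10000) stays open because the watermark has only reached 6200. In the real job the corresponding step would be calling `assignTimestampsAndWatermarks(...)` on `kinesisStream` before `keyBy(...)`, for example with a `BoundedOutOfOrdernessTimestampExtractor` (Flink 1.x) or `WatermarkStrategy.forBoundedOutOfOrderness(...)` (Flink 1.11 and later). The 1-second bound used here is an arbitrary assumption, and if `Monitoring` carries no event timestamp of its own, the Kinesis approximate arrival timestamp described in [1] is the fallback. Real assigners also emit watermarks periodically rather than after every record.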
Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records

On Fri, Nov 9, 2018 at 6:58 PM Vijay Balakrishnan <bvija...@gmail.com> wrote:
> Hi,
> Any help is appreciated. I dug into this. *I can see the deserialized output
> logged by the FlinkKinesisConsumer deserialization, and it keeps looping to pull
> from the Kinesis stream, but it never gets into the windowing operation's
> process() or apply().*
>
> FlinkKinesisConsumer seems to be stuck in a loop calling the Kinesis stream,
> and the deserialized output never seems to reach the apply() or process()
> method of a windowing operation. I can see the logs of
> MonitoringMapKinesisSchema successfully deserializing data from Kinesis and
> converting it into a POJO.
>
> Code:
>
> //Create environment:
> StreamExecutionEnvironment env;
> if (local) {
>     Configuration configuration = new Configuration();
>     configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
>     env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
> } else {
>     env = StreamExecutionEnvironment.getExecutionEnvironment();
> }
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> //create FlinkKinesisConsumer
> Properties kinesisConsumerConfig = new Properties();
> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
> FlinkKinesisConsumer kinesisConsumer = new FlinkKinesisConsumer<>(
>         kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig); //deserialization works fine
> DataStream<Monitoring> kinesisStream = env
>         .addSource(kinesisConsumer);
> KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
>         .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
>             public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
>                 return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
>             }
>         });
>
> WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win =
>         enrichedComponentInstanceStream1Key
>                 .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
>
> DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
>         //.process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
>         .process(new Window5SecProcessing()); //never gets in here
> //Gets into the Window5SecProcessing.open() method during initialization but never into the process method?
>
> private static class Window5SecProcessing extends
>         ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {
>
>     private transient String interval;
>     private transient String gameId;
>     private transient String keyType;
>     private transient org.apache.flink.metrics.Histogram fiveSecHistogram;
>
>     private transient ValueState<Long> total5SecCountState;
>     private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;
>
>     public Window5SecProcessing() {
>
>     }
>
>     public Window5SecProcessing(String gameId, String interval, String keyType) {
>         this.gameId = gameId;
>         this.interval = interval;
>         this.keyType = keyType;
>     }
>
>     @Override
>     public void clear(Context context) throws Exception {
>         super.clear(context);
>         KeyedStateStore keyedStateStore = context.windowState();
>         keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         logger.debug("Gets in here fine -Window5SecProcessing -Entered open - parameters:{}", parameters);
>         com.codahale.metrics.Histogram fiveSecHist =
>                 new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
>         this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
>         total5SecCountValueStateDescriptor =
>                 new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L);
>         total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
>     }
>
>     public void process(Tuple3<String, String, String> currentKey1, Context ctx,
>             Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
>         logger.debug("@@never gets here@@Window5SecProcessing - Entered process ");
>         ...
>     }
>
> On Mon, Nov 5, 2018 at 4:10 PM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>
>> Hi,
>> Running in IntelliJ IDE on a Mac with 4 vProcessors.
>> Code compiles fine.
>> It never gets into Window5SecProcessing's process(). I am able to get data from
>> the Kinesis consumer, and it is deserialized properly when I debug the code. It
>> gets into the Window5SecProcessing.open() method for initialization.
>>
>> Not sure if it is failing because no slots are available?
>> In main():
>> ........ //trimmed a lot of code
>> FlinkKinesisConsumer<Monitoring> kinesisConsumer =
>>         getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ..., ...);
>>
>> DataStream<Monitoring> kinesisStream = env
>>         .addSource(kinesisConsumer)
>>         .uid(jobName + "KinesisSource");
>> KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
>>         .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
>>             public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
>>                 return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
>>             }});
>>
>> WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win =
>>         enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
>>
>> DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
>>         .process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
>>         .uid("Component Instance Operation Key Monitoring " + FIVE_SECONDS);
>> enrichedComponentInstanceStream1.addSink(new SinkFunction<MonitoringGrouping>() {
>>     @Override
>>     public void invoke(MonitoringGrouping mg, Context context) throws Exception {
>>         //TODO call ES
>>         logger.debug("In enrichedComponentInstanceStream1 Sink received mg:{}", mg);
>>     }
>> });
>>
>> Window processing class:
>> private static class Window5SecProcessing extends
>>         ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {
>>     private transient Histogram fiveSecHist;
>>     private transient Histogram fiveMinHist;
>>     private transient org.apache.flink.metrics.Histogram fiveSecHistogram;
>>     private transient org.apache.flink.metrics.Histogram fiveMinHistogram;
>>     private transient ValueState<Long> total5SecCountState;
>>     private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;
>>
>>     public Window5SecProcessing(String gameId, String interval, String keyType) {
>>         ...
>>     }
>>
>>     public void open(Configuration parameters) throws Exception {
>>         super.open(parameters);
>>         logger.debug("Window5SecProcessing -Entered open - parameters:{}", parameters); //gets here
>>         com.codahale.metrics.Histogram fiveSecHist =
>>                 new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
>>         this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
>>         total5SecCountValueStateDescriptor =
>>                 new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L);
>>         total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
>>     }
>>     ......
>>
>>     public void process(Tuple3<String, String, String> currentKey1, Context ctx,
>>             Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
>>         logger.debug("Window5SecProcessing - Entered process "); //never gets here
>>         Tuple3<String, String, String> currentKey = (Tuple3<String, String, String>) currentKey1;
>>         ....
>>     }
>>
>> }
>> At one point in the logs, I seem to see that there are no slots available. Is that
>> the problem? If so, how can I fix it so that I can test locally on my Mac?
>> Log:
>> flink-akka.actor.default-dispatcher-71 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Slot Pool Status:
>>     status: connected to akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
>>     registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
>>     available slots: []
>>     allocated slots: [[AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
>>     pending requests: []
>>     sharing groups: {
>>     -------- 5a0ae59368145d715b3cc0d39ba6c05a --------
>>     {
>>         groupId=5a0ae59368145d715b3cc0d39ba6c05a
>>         unresolved={}
>>         resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
>>             allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584}, groupId=null,
>>             physicalSlot=AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
>>             children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261},
>>                 SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}]}]}
>>         all={SlotRequestId{a3176498368d1123639f3ee94a9798b6}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261},
>>             SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}=MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}, allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584}, groupId=null,
>>                 physicalSlot=AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
>>                 children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261},
>>                     SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}]},
>>             SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}}
>>     }
>> }
>>
>> TIA,