Hi,

Any help is appreciated. I dug into this some more. *I can see the deserialized output in the logs from FlinkKinesisConsumer, but it keeps looping, pulling from the Kinesis stream, and never gets into the windowing operation's process() or apply().*

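One thing that may be worth ruling out, since the job sets TimeCharacteristic.EventTime: with Flink's default trigger, an event-time window fires only once a watermark reaches the window's last timestamp, and the code in this message never assigns timestamps or watermarks explicitly. The following is a toy model in plain Java, not Flink code, and every class and method name in it is made up for illustration. It only shows how elements can keep arriving and being buffered while nothing is ever handed to the window function until a watermark advances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a tumbling event-time window. Hypothetical names; NOT Flink's implementation. */
final class EventTimeWindowSketch {
    private final long windowSizeMs;
    // window start timestamp -> elements buffered for that window
    private final TreeMap<Long, List<String>> buffers = new TreeMap<>();
    // stand-in for calls to a window function's process()
    private final List<String> fired = new ArrayList<>();

    EventTimeWindowSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Buffers the element in its window; never fires anything (assumes timestampMs >= 0). */
    void onElement(String value, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(value);
    }

    /** Fires every buffered window whose last timestamp (end - 1) is <= the watermark. */
    void onWatermark(long watermarkMs) {
        while (!buffers.isEmpty()) {
            Map.Entry<Long, List<String>> oldest = buffers.firstEntry();
            long maxTimestamp = oldest.getKey() + windowSizeMs - 1;
            if (maxTimestamp > watermarkMs) {
                break; // this window and all later ones are still open
            }
            fired.add("window@" + oldest.getKey() + "=" + oldest.getValue());
            buffers.pollFirstEntry();
        }
    }

    List<String> fired() {
        return fired;
    }

    int bufferedWindows() {
        return buffers.size();
    }

    public static void main(String[] args) {
        EventTimeWindowSketch window = new EventTimeWindowSketch(5000L);
        window.onElement("a", 1000L);
        window.onElement("b", 4000L);
        System.out.println("fired before any watermark: " + window.fired());
        window.onWatermark(4999L);
        System.out.println("fired after watermark 4999: " + window.fired());
    }
}
```

If this is what is happening here, the source would look healthy (records read and deserialized) while process() is never called, which matches the symptom. Whether it actually applies depends on how timestamps and watermarks are produced in the real job, which this sketch does not model.
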
FlinkKinesisConsumer seems to be stuck in a loop calling the Kinesis stream, and the deserialized output never seems to reach the apply() or process() method of the windowing operation. I can see the logs of MonitoringMapKinesisSchema successfully deserializing data from Kinesis and converting it into a POJO.

Code:

*//Create environment*:
    StreamExecutionEnvironment env;
    if (local) {
        Configuration configuration = new Configuration();
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
        env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
    } else {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
    }
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

*//create FlinkKinesisConsumer*
    Properties kinesisConsumerConfig = new Properties();
    kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
    kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
    FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
            kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig); *//deserialization works fine*

    DataStream<Monitoring> kinesisStream = env
            .addSource(kinesisConsumer);

    KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
            .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
                public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
                    return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
                }
            });

    WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win =
            enrichedComponentInstanceStream1Key
                    .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));

    DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
            //.process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
            .process(new Window5SecProcessing()); *//never gets in here*

    //Gets into Window5SecProcessing.open() method during initialization but never into the process method ????????
    private static class Window5SecProcessing extends ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {

        private transient String interval;
        private transient String gameId;
        private transient String keyType;
        private transient org.apache.flink.metrics.Histogram fiveSecHistogram;
        private transient ValueState<Long> total5SecCountState;
        private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;

        public Window5SecProcessing() {
        }

        public Window5SecProcessing(String gameId, String interval, String keyType) {
            this.gameId = gameId;
            this.interval = interval;
            this.keyType = keyType;
        }

        @Override
        public void clear(Context context) throws Exception {
            super.clear(context);
            KeyedStateStore keyedStateStore = context.windowState();
            keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            logger.debug("Gets in here fine -Window5SecProcessing -Entered open - parameters:{}", parameters);
            com.codahale.metrics.Histogram fiveSecHist =
                    new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
            this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
            total5SecCountValueStateDescriptor =
                    new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L);
            total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
        }

        public void process(Tuple3<String, String, String> currentKey1, Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
            logger.debug("@@never gets here@@Window5SecProcessing - Entered process ");
            // ...
        }
    }

On Mon, Nov 5, 2018 at 4:10 PM Vijay Balakrishnan <bvija...@gmail.com> wrote:

> Hi,
> Running in IntelliJ IDE on a Mac with 4 vProcessors.
> Code compiles fine. It never gets into the Window5SecProcessing's
> process(). I am able to get data from the Kinesis Consumer and it is
> deserialized properly when I debug the code. It gets into the
> Window5SecProcessing.open() method for initialization.
>
> Not sure if I am failing with no slots available ???
> In main():
> ........ //trimmed a lot of code
> FlinkKinesisConsumer<Monitoring> kinesisConsumer =
>         getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ..., ...);
>
> DataStream<Monitoring> kinesisStream = env
>         .addSource(kinesisConsumer)
>         .uid(jobName + "KinesisSource");
> KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
>         .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
>             public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
>                 return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
>             }});
>
> WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win =
>         enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
>
> DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
>         .process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
>         .uid("Component Instance Operation Key Monitoring " + FIVE_SECONDS);
> enrichedComponentInstanceStream1.addSink(new SinkFunction<MonitoringGrouping>() {
>     @Override
>     public void invoke(MonitoringGrouping mg, Context context) throws Exception {
>         //TODO call ES
>         logger.debug("In enrichedComponentInstanceStream1 Sink received mg:{}", mg);
>     }
> });
>
> *Window processing class*:
> private static class Window5SecProcessing extends
>         ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {
>     private transient Histogram fiveSecHist;
>     private transient Histogram fiveMinHist;
>     private transient org.apache.flink.metrics.Histogram fiveSecHistogram;
>     private transient org.apache.flink.metrics.Histogram fiveMinHistogram;
>     private transient ValueState<Long> total5SecCountState;
>     private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;
>
>     public Window5SecProcessing(String gameId, String interval, String keyType) {
>         ...
>     }
>
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         logger.debug("Window5SecProcessing -Entered open - parameters:{}", parameters); //gets here
>         com.codahale.metrics.Histogram fiveSecHist =
>                 new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
>         this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
>         total5SecCountValueStateDescriptor =
>                 new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L);
>         total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
>     }
>     ......
>
>     public void process(Tuple3<String, String, String> currentKey1, Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
>         logger.debug("Window5SecProcessing - Entered process "); //never gets here
>         Tuple3<String, String, String> currentKey = (Tuple3<String, String, String>) currentKey1;
>         ....
>     }
>
> }
>
> At one point in the logs, I seem to see that there are no slots available
> ????? Is that the problem? If so, how can I fix it to test
> locally on my Mac ??
>
> *Log:*
> flink-akka.actor.default-dispatcher-71 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Slot Pool Status:
>   status: connected to akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
>   registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
>   *available slots: []*
>   allocated slots: [[AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
>   pending requests: []
>   sharing groups: {
>   -------- 5a0ae59368145d715b3cc0d39ba6c05a --------
>   {
>     groupId=5a0ae59368145d715b3cc0d39ba6c05a
>     unresolved={}
>     resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
>       allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
>       groupId=null, physicalSlot=AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
>       children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>       allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>       request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>       group=8587a27f4c92252839400ce17054b261},
>       SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>       allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>       request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>       group=a43726daeecb466da4d91c7b1adefb1d}]}]}
>     all={SlotRequestId{a3176498368d1123639f3ee94a9798b6}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>       allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>       request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>       group=8587a27f4c92252839400ce17054b261},
>       SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}=MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
>       allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
>       groupId=null, physicalSlot=AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
>       children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>       allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>       request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>       group=8587a27f4c92252839400ce17054b261},
>       SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>       allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>       request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>       group=a43726daeecb466da4d91c7b1adefb1d}]},
>       SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>       allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>       request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>       group=a43726daeecb466da4d91c7b1adefb1d}}
>   }
>   }
>
> TIA,
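
On the slots question in the quoted message: by default Flink places all operators of a job in the same slot sharing group, so a job needs roughly as many slots as its highest operator parallelism, not one slot per operator. The log above looks consistent with that: a single allocated slot holding two SingleTaskSlot children, and `pending requests: []`. A back-of-the-envelope check can be sketched in plain Java as follows. This is not Flink's scheduler; the class, the method, and the operator labels are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Rough slot estimate under slot sharing. Hypothetical helper; NOT Flink's scheduler. */
final class SlotEstimateSketch {

    /** Minimal description of an operator: a label, its slot sharing group, its parallelism. */
    static final class Operator {
        final String name;
        final String slotSharingGroup;
        final int parallelism;

        Operator(String name, String slotSharingGroup, int parallelism) {
            this.name = name;
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
        }
    }

    /** Sums, over all slot sharing groups, the highest parallelism found in each group. */
    static int requiredSlots(List<Operator> operators) {
        Map<String, Integer> maxParallelismPerGroup = new HashMap<>();
        for (Operator op : operators) {
            maxParallelismPerGroup.merge(op.slotSharingGroup, op.parallelism, Math::max);
        }
        int total = 0;
        for (int parallelism : maxParallelismPerGroup.values()) {
            total += parallelism;
        }
        return total;
    }
}
```

Under this estimate, a source, a window operator and a sink that all run with parallelism 1 in the default group need one slot in total, so an empty "available slots" list next to an empty "pending requests" list would not by itself indicate that the job is starved of slots.
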