Hi Gary,

Bang on the money. I had not assigned a watermark, and once I added that, the code entered the process() method. Thanks a ton for your help. Life-saver!
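For anyone hitting the same symptom, here is a minimal sketch of why the missing watermark mattered. It is plain Java, not Flink code: the class EventTimeWindowSketch and its methods are hypothetical names made up for illustration, and the model ignores keys and late data. It only assumes the rule that an event-time window fires once the watermark reaches the window's last timestamp (window end minus 1 ms). Without any watermark, elements are buffered but the window never fires, which matches open() being called while process() is not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of a 5-second event-time tumbling window; not Flink code. */
final class EventTimeWindowSketch {
    static final long WINDOW_SIZE_MS = 5_000L;

    // Buffered elements, keyed by window start timestamp (ascending order).
    private final TreeMap<Long, List<String>> buffered = new TreeMap<>();
    // Windows that have fired; each entry stands in for one call to process().
    private final List<String> fired = new ArrayList<>();
    // Long.MIN_VALUE means no watermark has been seen yet.
    private long watermark = Long.MIN_VALUE;

    /** Buffers an element in the window that covers its event timestamp. */
    public void onElement(String value, long timestampMs) {
        long start = timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
        buffered.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
    }

    /** Advances event time and fires every window whose last timestamp has passed. */
    public void onWatermark(long watermarkMs) {
        watermark = Math.max(watermark, watermarkMs);
        while (!buffered.isEmpty()) {
            Map.Entry<Long, List<String>> oldest = buffered.firstEntry();
            long maxTimestamp = oldest.getKey() + WINDOW_SIZE_MS - 1;
            if (maxTimestamp > watermark) {
                break; // every later window ends even later
            }
            long end = oldest.getKey() + WINDOW_SIZE_MS;
            fired.add("[" + oldest.getKey() + ", " + end + ") " + oldest.getValue());
            buffered.pollFirstEntry();
        }
    }

    public List<String> firedWindows() {
        return fired;
    }

    public static void main(String[] args) {
        EventTimeWindowSketch window = new EventTimeWindowSketch();
        window.onElement("a", 1_000L);
        window.onElement("b", 4_000L);
        // Nothing has fired: elements alone never advance event time.
        System.out.println("before watermark: " + window.firedWindows());
        window.onWatermark(4_999L);
        // The window [0, 5000) fires now that the watermark reached 4999.
        System.out.println("after watermark:  " + window.firedWindows());
    }
}
```

In the real job, the watermarks come from the assigner passed to assignTimestampsAndWatermarks(), as in the snippet that follows.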
DataStream<Monitoring> kinesisStream = env
        .addSource(kinesisConsumer)
        .assignTimestampsAndWatermarks(new MonitoringAssigner()) //<=============

On Fri, Nov 9, 2018 at 10:02 AM Gary Yao <g...@data-artisans.com> wrote:

> Hi,
>
> You are using event time but are you assigning watermarks [1]? I do not
> see it in the code.
>
> Best,
> Gary
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records
>
> On Fri, Nov 9, 2018 at 6:58 PM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi,
>> Any help is appreciated.Dug into this. *I can see the deserialized
>> output log from FlinkKinesisConsumer deserialization but it keeps looping
>> to pull from Kinesis Stream but never gets into the Windowing operation for
>> process() or apply().*
>>
>> FlinkKinesisConsumer seems to be stuck in a loop calling a Kinesis Stream
>> and the deserialized output never seems to get into the apply() or
>> process() method of a Windowing operation. I can see the logs of
>> MonitoringMapKinesisSchema deserializing data back successfully from
>> Kinesis and converting into a POJO.
>>
>> Code:
>>
>> *//Create environment*:
>> StreamExecutionEnvironment env;
>> if (local) {
>>     Configuration configuration = new Configuration();
>>     configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
>>     env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>> } else {
>>     env = StreamExecutionEnvironment.getExecutionEnvironment();
>> }
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> *//create FlinkKinesisConsumer*
>> Properties kinesisConsumerConfig = new Properties();
>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
>> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
>> FlinkKinesisConsumer kinesisConsumer = new FlinkKinesisConsumer<>(
>>         kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig); *//deserialization works fine*
>> DataStream<Monitoring> kinesisStream = env
>>         .addSource(kinesisConsumer);
>> KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
>>         .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
>>             public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
>>                 return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
>>             }
>>         });
>>
>> WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key
>>         .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
>>
>> DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 =
>>         enrichedComponentInstanceStream1Win
>>         //.process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
>>         .process(new Window5SecProcessing()); *//never gets in here*
>> //Gets into Window5SecProcessing.open() method during initialization but never into the process method ????????
>> private static class Window5SecProcessing extends
>>         ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {
>>
>>     private transient String interval;
>>     private transient String gameId;
>>     private transient String keyType;
>>     private transient org.apache.flink.metrics.Histogram fiveSecHistogram;
>>
>>     private transient ValueState<Long> total5SecCountState;
>>     private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;
>>
>>     public Window5SecProcessing() {
>>
>>     }
>>
>>     public Window5SecProcessing(String gameId, String interval, String keyType) {
>>         this.gameId = gameId;
>>         this.interval = interval;
>>         this.keyType = keyType;
>>     }
>>
>>     @Override
>>     public void clear(Context context) throws Exception {
>>         super.clear(context);
>>         KeyedStateStore keyedStateStore = context.windowState();
>>         keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
>>     }
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>         super.open(parameters);
>>         logger.debug("Gets in here fine -Window5SecProcessing -Entered open - parameters:{}", parameters);
>>         com.codahale.metrics.Histogram fiveSecHist =
>>                 new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
>>         this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
>>         total5SecCountValueStateDescriptor =
>>                 new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L);
>>         total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
>>     }
>>
>>
>>     public void process(Tuple3<String, String, String> currentKey1, Context ctx,
>>             Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
>>         logger.debug("@@never gets here@@Window5SecProcessing - Entered process ");//
>>         ...
>>     }
>>
>>
>>
>>
>> On Mon, Nov 5, 2018 at 4:10 PM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> Running in IntelliJ IDE on a Mac with 4 vProcessors.
>>> Code compiles fine. It never gets into the Window5SecProcessing's
>>> process().I am able to get data from the Kinesis Consumer and it is
>>> deserialized properly when I debug the code. It gets into the
>>> Window5SecProcessing.open() method for initialization.
>>>
>>> Not sure if I am failing with no slots available ???
>>> In main():
>>> ........ //trimmed a lot of code
>>> FlinkKinesisConsumer<Monitoring> kinesisConsumer =
>>>         getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ..., ...);
>>>
>>> DataStream<Monitoring> kinesisStream = env
>>>         .addSource(kinesisConsumer)
>>>         .uid(jobName + "KinesisSource");
>>> KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream
>>>         .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
>>>             public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
>>>                 return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
>>>             }});
>>>
>>> WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win =
>>>         enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
>>>
>>> DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
>>>         .process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
>>>         .uid("Component Instance Operation Key Monitoring " + FIVE_SECONDS);
>>> enrichedComponentInstanceStream1.addSink(new SinkFunction<MonitoringGrouping>() {
>>>     @Override
>>>     public void invoke(MonitoringGrouping mg, Context context) throws Exception {
>>>         //TODO call ES
>>>         logger.debug("In enrichedComponentInstanceStream1 Sink received mg:{}", mg);
>>>     }
>>> });
>>> *Window processing class*:
>>> private static class Window5SecProcessing extends
>>>         ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {
>>>     private transient Histogram fiveSecHist;
>>>     private transient Histogram fiveMinHist;
>>>     private transient org.apache.flink.metrics.Histogram fiveSecHistogram;
>>>     private transient org.apache.flink.metrics.Histogram fiveMinHistogram;
>>>     private transient ValueState<Long> total5SecCountState;
>>>     private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;
>>>
>>>     public Window5SecProcessing(String gameId, String interval, String keyType) {
>>>         ...
>>>     }
>>>
>>>     public void open(Configuration parameters) throws Exception {
>>>         super.open(parameters);
>>>         logger.debug("Window5SecProcessing -Entered open - parameters:{}", parameters);//gets here
>>>         com.codahale.metrics.Histogram fiveSecHist =
>>>                 new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
>>>         this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
>>>         total5SecCountValueStateDescriptor =
>>>                 new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L);
>>>         total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
>>>     }
>>>     ......
>>>
>>>     public void process(Tuple3<String, String, String> currentKey1, Context ctx,
>>>             Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
>>>         logger.debug("Window5SecProcessing - Entered process ");//never gets here
>>>         Tuple3<String, String, String> currentKey = (Tuple3<String, String, String>) currentKey1;
>>>         ....
>>>     }
>>>
>>> }
>>> At 1 point in the logs, I seem to see that there are no slots available
>>> ????? Is that the problem- how can I fix that if that is the case to test
>>> locally on my Mac ??
>>> *Log:*
>>> flink-akka.actor.default-dispatcher-71 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Slot Pool Status:
>>> status: connected to akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
>>> registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
>>> *available slots: []*
>>> allocated slots: [[AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
>>> pending requests: []
>>> sharing groups: {
>>> -------- 5a0ae59368145d715b3cc0d39ba6c05a --------
>>> {
>>> groupId=5a0ae59368145d715b3cc0d39ba6c05a
>>> unresolved={}
>>> resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost
>>> (dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
>>> allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
>>> groupId=null, physicalSlot=AllocatedSlot
>>> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
>>> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
>>> children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>>> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>>> group=8587a27f4c92252839400ce17054b261},
>>> SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>>> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>>> group=a43726daeecb466da4d91c7b1adefb1d}]}]}
>>> all={SlotRequestId{a3176498368d1123639f3ee94a9798b6}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>>> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>>> group=8587a27f4c92252839400ce17054b261},
>>> SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}=MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
>>> allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
>>> groupId=null, physicalSlot=AllocatedSlot
>>> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
>>> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
>>> children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>>> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>>> group=8587a27f4c92252839400ce17054b261},
>>> SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>>> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>>> group=a43726daeecb466da4d91c7b1adefb1d}]},
>>> SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>>> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>>> group=a43726daeecb466da4d91c7b1adefb1d}}
>>> } }
>>>
>>> TIA,