Hi, Running in IntelliJ IDE on a Mac with 4 vProcessors. Code compiles fine. It never gets into the Window5SecProcessing's process().I am able to get data from the Kinesis Consumer and it is deserialized properly when I debug the code. It gets into the Window5SecProcessing.open() method for initialization.
Not sure if I am failing with no slots available ??? In main(): ........ //trimmed a lot of code *FlinkKinesisConsumer<Monitoring> kinesisConsumer = getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ..., ...);* *DataStream<Monitoring> kinesisStream = env* * .addSource(kinesisConsumer)* * .uid(jobName + "KinesisSource");* *KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key = kinesisStream* * .keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {* * public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {* * return new Tuple3<String, String, String>(mon.getComponent(), mon.getInstance(), mon.getOperation());* * }});* * WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));* * DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win* * .process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))* * .uid("Component Instance Operation Key Monitoring " + FIVE_SECONDS);* *enrichedComponentInstanceStream1.addSink(new SinkFunction<MonitoringGrouping>() {* * @Override* * public void invoke(MonitoringGrouping mg, Context context) throws Exception {* * //TODO call ES* * logger.debug("In enrichedComponentInstanceStream1 Sink received mg:{}", mg);* * }* * });* *Window processing class*: private static class Window5SecProcessing extends ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> { private transient Histogram fiveSecHist; private transient Histogram fiveMinHist; private transient org.apache.flink.metrics.Histogram fiveSecHistogram; private transient org.apache.flink.metrics.Histogram fiveMinHistogram; private transient ValueState<Long> total5SecCountState; private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor; public Window5SecProcessing(String gameId, String interval, String keyType) { ... } public void open(Configuration parameters) throws Exception { super.open(parameters); logger.debug("Window5SecProcessing -Entered open - parameters:{}", parameters);//gets here com.codahale.metrics.Histogram fiveSecHist = new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS)); this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist); total5SecCountValueStateDescriptor = new ValueStateDescriptor<Long>("total5SecCount", Long.class, 0L); total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor); } ...... * public void process(Tuple3<String, String, String> currentKey1, Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {* * logger.debug("Window5SecProcessing - Entered process ");//never gets here* * Tuple3<String, String, String> currentKey = (Tuple3<String, String, String>) currentKey1;* * ....* * }* } At 1 point in the logs, I seem to see that there are no slots available ????? Is that the problem- how can I fix that if that is the case to test locally on my Mac ?? *Log:* flink-akka.actor.default-dispatcher-71 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Slot Pool Status: status: connected to akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed] *available slots: []* allocated slots: [[AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]] pending requests: [] sharing groups: { -------- 5a0ae59368145d715b3cc0d39ba6c05a -------- { groupId=5a0ae59368145d715b3cc0d39ba6c05a unresolved={} resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}, allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584}, groupId=null, physicalSlot=AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0, children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261}, SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}]}]} all={SlotRequestId{a3176498368d1123639f3ee94a9798b6}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261}, SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}=MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}, allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584}, groupId=null, physicalSlot=AllocatedSlot AllocationID{e13f284707cafef978a3c59f27e7f3f3} @ 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0, children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{a3176498368d1123639f3ee94a9798b6}, group=8587a27f4c92252839400ce17054b261}, SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}]}, SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}), request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}, group=a43726daeecb466da4d91c7b1adefb1d}} } } TIA,