Hi,
Running in IntelliJ IDE on a Mac with 4 vProcessors.
Code compiles fine. It never gets into the Window5SecProcessing's
process().I am able to get data from the Kinesis Consumer and it is
deserialized properly when I debug the code. It gets into the
Window5SecProcessing.open() method for initialization.

Not sure if I am failing with no slots available ???
In main():
........ //trimmed a lot of code
*FlinkKinesisConsumer<Monitoring> kinesisConsumer =
getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ...,
...);*

*DataStream<Monitoring> kinesisStream = env*
*                .addSource(kinesisConsumer)*
*                .uid(jobName + "KinesisSource");*
*KeyedStream<Monitoring, Tuple3<String, String, String>>
enrichedComponentInstanceStream1Key = kinesisStream*
*                .keyBy(new KeySelector<Monitoring, Tuple3<String, String,
String>>() {*
*                    public Tuple3<String, String, String>
getKey(Monitoring mon) throws Exception {*
*                        return new Tuple3<String, String,
String>(mon.getComponent(), mon.getInstance(), mon.getOperation());*
*                    }});*

*        WindowedStream<Monitoring, Tuple3<String, String, String>,
TimeWindow> enrichedComponentInstanceStream1Win =
enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));*

*        DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 =
enrichedComponentInstanceStream1Win*
*                .process(new Window5SecProcessing(gameId, FIVE_SECONDS,
COMPONENT_INSTANCE_OPERATION))*
*                .uid("Component Instance Operation Key Monitoring " +
FIVE_SECONDS);*
*enrichedComponentInstanceStream1.addSink(new
SinkFunction<MonitoringGrouping>() {*
*            @Override*
*            public void invoke(MonitoringGrouping mg, Context context)
throws Exception {*
*                //TODO call ES*
*                logger.debug("In enrichedComponentInstanceStream1 Sink
received mg:{}", mg);*
*            }*
*        });*
*Window processing class*:
private static class Window5SecProcessing extends
ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String,
String, String>, TimeWindow> {
private transient Histogram fiveSecHist;
        private transient Histogram fiveMinHist;
        private transient org.apache.flink.metrics.Histogram
fiveSecHistogram;
        private transient org.apache.flink.metrics.Histogram
fiveMinHistogram;
        private transient ValueState<Long> total5SecCountState;
        private transient ValueStateDescriptor<Long>
total5SecCountValueStateDescriptor;

        public Window5SecProcessing(String gameId, String interval, String
keyType) {
            ...
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            logger.debug("Window5SecProcessing -Entered open -
parameters:{}", parameters);//gets here
            com.codahale.metrics.Histogram fiveSecHist =
                    new com.codahale.metrics.Histogram(new
SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
            this.fiveSecHistogram = new
DropwizardHistogramWrapper(fiveSecHist);
            total5SecCountValueStateDescriptor =
                    new ValueStateDescriptor<Long>("total5SecCount",
Long.class, 0L);
            total5SecCountState =
getRuntimeContext().getState(total5SecCountValueStateDescriptor);
        }
......

       * public void process(Tuple3<String, String, String> currentKey1,
Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out)
throws Exception {*
*            logger.debug("Window5SecProcessing - Entered process
");//never gets here*
*            Tuple3<String, String, String> currentKey = (Tuple3<String,
String, String>) currentKey1;*
*            ....*
*        }*

    }
At 1 point in the logs, I seem to see that there are no slots available
????? Is that the problem- how can I fix that if that is the case to test
locally on my Mac ??
*Log:*
flink-akka.actor.default-dispatcher-71 DEBUG
org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Slot Pool
Status:
status: connected to
akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
*available slots: []*
allocated slots: [[AllocatedSlot
AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
pending requests: []
sharing groups: {
-------- 5a0ae59368145d715b3cc0d39ba6c05a --------
{
groupId=5a0ae59368145d715b3cc0d39ba6c05a
unresolved={}
resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost
(dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
groupId=null, physicalSlot=AllocatedSlot
AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
group=8587a27f4c92252839400ce17054b261},
SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
group=a43726daeecb466da4d91c7b1adefb1d}]}]}
all={SlotRequestId{a3176498368d1123639f3ee94a9798b6}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
group=8587a27f4c92252839400ce17054b261},
SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}=MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
groupId=null, physicalSlot=AllocatedSlot
AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
group=8587a27f4c92252839400ce17054b261},
SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
group=a43726daeecb466da4d91c7b1adefb1d}]},
SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
group=a43726daeecb466da4d91c7b1adefb1d}}
} }

TIA,

Reply via email to