Hi Gary,
Just posted the code.Pls let me know if that clarifies the problem. Have
been digging into how the FlinkKinesisConsumer deserialized output gets
passed into the process() or apply() method to no avail. The coding pattern
I used matches all the fink-examples I have seen for Flink 1.6.1
TIA,
Vijay

On Fri, Nov 9, 2018 at 9:53 AM Gary Yao <g...@data-artisans.com> wrote:

> Hi,
>
> If the job is actually running and consuming from Kinesis, the log you
> posted
> is unrelated to your problem. To understand why the process function is not
> invoked, we would need to see more of your code, or you would need to
> provide
> an executable example. The log only shows that all offered slots are
> occupied
> by tasks of your job.
>
> Best,
> Gary
>
> On Tue, Nov 6, 2018 at 1:10 AM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi,
>> Running in IntelliJ IDE on a Mac with 4 vProcessors.
>> Code compiles fine. It never gets into the Window5SecProcessing's
>> process().I am able to get data from the Kinesis Consumer and it is
>> deserialized properly when I debug the code. It gets into the
>> Window5SecProcessing.open() method for initialization.
>>
>> Not sure if I am failing with no slots available ???
>> In main():
>> ........ //trimmed a lot of code
>> *FlinkKinesisConsumer<Monitoring> kinesisConsumer =
>> getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ...,
>> ...);*
>>
>> *DataStream<Monitoring> kinesisStream = env*
>> *                .addSource(kinesisConsumer)*
>> *                .uid(jobName + "KinesisSource");*
>> *KeyedStream<Monitoring, Tuple3<String, String, String>>
>> enrichedComponentInstanceStream1Key = kinesisStream*
>> *                .keyBy(new KeySelector<Monitoring, Tuple3<String,
>> String, String>>() {*
>> *                    public Tuple3<String, String, String>
>> getKey(Monitoring mon) throws Exception {*
>> *                        return new Tuple3<String, String,
>> String>(mon.getComponent(), mon.getInstance(), mon.getOperation());*
>> *                    }});*
>>
>> *        WindowedStream<Monitoring, Tuple3<String, String, String>,
>> TimeWindow> enrichedComponentInstanceStream1Win =
>> enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));*
>>
>> *        DataStream<MonitoringGrouping> enrichedComponentInstanceStream1
>> = enrichedComponentInstanceStream1Win*
>> *                .process(new Window5SecProcessing(gameId, FIVE_SECONDS,
>> COMPONENT_INSTANCE_OPERATION))*
>> *                .uid("Component Instance Operation Key Monitoring " +
>> FIVE_SECONDS);*
>> *enrichedComponentInstanceStream1.addSink(new
>> SinkFunction<MonitoringGrouping>() {*
>> *            @Override*
>> *            public void invoke(MonitoringGrouping mg, Context context)
>> throws Exception {*
>> *                //TODO call ES*
>> *                logger.debug("In enrichedComponentInstanceStream1 Sink
>> received mg:{}", mg);*
>> *            }*
>> *        });*
>> *Window processing class*:
>> private static class Window5SecProcessing extends
>> ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String,
>> String, String>, TimeWindow> {
>> private transient Histogram fiveSecHist;
>>         private transient Histogram fiveMinHist;
>>         private transient org.apache.flink.metrics.Histogram
>> fiveSecHistogram;
>>         private transient org.apache.flink.metrics.Histogram
>> fiveMinHistogram;
>>         private transient ValueState<Long> total5SecCountState;
>>         private transient ValueStateDescriptor<Long>
>> total5SecCountValueStateDescriptor;
>>
>>         public Window5SecProcessing(String gameId, String interval,
>> String keyType) {
>>             ...
>>         }
>>
>>         public void open(Configuration parameters) throws Exception {
>>             super.open(parameters);
>>             logger.debug("Window5SecProcessing -Entered open -
>> parameters:{}", parameters);//gets here
>>             com.codahale.metrics.Histogram fiveSecHist =
>>                     new com.codahale.metrics.Histogram(new
>> SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
>>             this.fiveSecHistogram = new
>> DropwizardHistogramWrapper(fiveSecHist);
>>             total5SecCountValueStateDescriptor =
>>                     new ValueStateDescriptor<Long>("total5SecCount",
>> Long.class, 0L);
>>             total5SecCountState =
>> getRuntimeContext().getState(total5SecCountValueStateDescriptor);
>>         }
>> ......
>>
>>        * public void process(Tuple3<String, String, String> currentKey1,
>> Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out)
>> throws Exception {*
>> *            logger.debug("Window5SecProcessing - Entered process
>> ");//never gets here*
>> *            Tuple3<String, String, String> currentKey = (Tuple3<String,
>> String, String>) currentKey1;*
>> *            ....*
>> *        }*
>>
>>     }
>> At 1 point in the logs, I seem to see that there are no slots available
>> ????? Is that the problem- how can I fix that if that is the case to test
>> locally on my Mac ??
>> *Log:*
>> flink-akka.actor.default-dispatcher-71 DEBUG
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Slot Pool
>> Status:
>> status: connected to
>> akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
>> registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
>> *available slots: []*
>> allocated slots: [[AllocatedSlot
>> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
>> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
>> pending requests: []
>> sharing groups: {
>> -------- 5a0ae59368145d715b3cc0d39ba6c05a --------
>> {
>> groupId=5a0ae59368145d715b3cc0d39ba6c05a
>> unresolved={}
>> resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost
>> (dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
>> allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
>> groupId=null, physicalSlot=AllocatedSlot
>> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
>> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
>> children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>> group=8587a27f4c92252839400ce17054b261},
>> SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>> group=a43726daeecb466da4d91c7b1adefb1d}]}]}
>> all={SlotRequestId{a3176498368d1123639f3ee94a9798b6}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>> group=8587a27f4c92252839400ce17054b261},
>> SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}=MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
>> allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
>> groupId=null, physicalSlot=AllocatedSlot
>> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
>> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
>> children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>> group=8587a27f4c92252839400ce17054b261},
>> SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>> group=a43726daeecb466da4d91c7b1adefb1d}]},
>> SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>> group=a43726daeecb466da4d91c7b1adefb1d}}
>> } }
>>
>> TIA,
>>
>>

Reply via email to