Hi,

You are using event time but are you assigning watermarks [1]? I do not see
it
in the code.

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records

On Fri, Nov 9, 2018 at 6:58 PM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Hi,
> Any help is appreciated.Dug into this. *I can see the deserialized output
> log from FlinkKinesisConsumer deserialization but it keeps looping to pull
> from Kinesis Stream but never gets into the Windowing operation for
> process() or apply().*
>
> FlinkKinesisConsumer seems to be stuck in a loop calling a Kinesis Stream
> and the deserialized output never seems to get into the apply() or
> process() method of a Windowing operation. I can see the logs of
> MonitoringMapKinesisSchema deserializing data back successfully from
> Kinesis and converting into a POJO.
>
> Code:
>
> *//Create environment*:
> StreamExecutionEnvironment env;
> if (local) {
>     Configuration configuration = new Configuration();
>     configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
>     env = StreamExecutionEnvironment.createLocalEnvironment(1,
> configuration);
> } else {
>     env = StreamExecutionEnvironment.getExecutionEnvironment();
> }
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> *//create FlinkKinesisConsumer*
> Properties kinesisConsumerConfig = new Properties();
> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
> "AUTO");
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
> "10000");
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
> "2000");
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
> "TRIM_HORIZON");
> FlinkKinesisConsumer kinesisConsumer = new FlinkKinesisConsumer<>(
>         kinesisTopicRead, new MonitoringMapKinesisSchema(),
> kinesisConsumerConfig);*//deserialization works fine*
> DataStream<Monitoring> kinesisStream = env
>                 .addSource(kinesisConsumer);
> KeyedStream<Monitoring, Tuple3<String, String, String>>
> enrichedComponentInstanceStream1Key = kinesisStream
>                 .keyBy(new KeySelector<Monitoring, Tuple3<String, String,
> String>>() {
>                     public Tuple3<String, String, String>
> getKey(Monitoring mon) throws Exception {
>                         return new Tuple3<String, String,
> String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
>                     }
>                 });
>
> WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow>
> enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key
>
> .timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
>
> DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 =
> enrichedComponentInstanceStream1Win
>                 //.process(new Window5SecProcessing(gameId, FIVE_SECONDS,
> COMPONENT_INSTANCE_OPERATION))
>                 .process(new Window5SecProcessing());*//never gets in
> here*
> //Gets into Window5SecProcessing.open() method during initialization but
> never into the process method ????????
> private static class Window5SecProcessing extends
> ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String,
> String, String>, TimeWindow> {
>
>         private transient String interval;
>         private transient String gameId;
>         private transient String keyType;
>         private transient org.apache.flink.metrics.Histogram
> fiveSecHistogram;
>
>         private transient ValueState<Long> total5SecCountState;
>         private transient ValueStateDescriptor<Long>
> total5SecCountValueStateDescriptor;
>         public Window5SecProcessing() {
>
>         }
>
>         public Window5SecProcessing(String gameId, String interval, String
> keyType) {
>             this.gameId = gameId;
>             this.interval = interval;
>             this.keyType = keyType;
>         }
>
>         @Override
>         public void clear(Context context) throws Exception {
>             super.clear(context);
>             KeyedStateStore keyedStateStore = context.windowState();
>
> keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
>         }
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             super.open(parameters);
>             logger.debug("Gets in here fine -Window5SecProcessing -Entered
> open - parameters:{}", parameters);
>             com.codahale.metrics.Histogram fiveSecHist =
>                     new com.codahale.metrics.Histogram(new
> SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
>             this.fiveSecHistogram = new
> DropwizardHistogramWrapper(fiveSecHist);
>             total5SecCountValueStateDescriptor =
>                     new ValueStateDescriptor<Long>("total5SecCount",
> Long.class, 0L);
>             total5SecCountState =
> getRuntimeContext().getState(total5SecCountValueStateDescriptor);
>         }
>
>
>         public void process(Tuple3<String, String, String> currentKey1,
> Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out)
> throws Exception {
>             logger.debug("@@never gets here@@Window5SecProcessing -
> Entered process ");//
> ...
> }
>
>
>
>
> On Mon, Nov 5, 2018 at 4:10 PM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi,
>> Running in IntelliJ IDE on a Mac with 4 vProcessors.
>> Code compiles fine. It never gets into the Window5SecProcessing's
>> process().I am able to get data from the Kinesis Consumer and it is
>> deserialized properly when I debug the code. It gets into the
>> Window5SecProcessing.open() method for initialization.
>>
>> Not sure if I am failing with no slots available ???
>> In main():
>> ........ //trimmed a lot of code
>> *FlinkKinesisConsumer<Monitoring> kinesisConsumer =
>> getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ...,
>> ...);*
>>
>> *DataStream<Monitoring> kinesisStream = env*
>> *                .addSource(kinesisConsumer)*
>> *                .uid(jobName + "KinesisSource");*
>> *KeyedStream<Monitoring, Tuple3<String, String, String>>
>> enrichedComponentInstanceStream1Key = kinesisStream*
>> *                .keyBy(new KeySelector<Monitoring, Tuple3<String,
>> String, String>>() {*
>> *                    public Tuple3<String, String, String>
>> getKey(Monitoring mon) throws Exception {*
>> *                        return new Tuple3<String, String,
>> String>(mon.getComponent(), mon.getInstance(), mon.getOperation());*
>> *                    }});*
>>
>> *        WindowedStream<Monitoring, Tuple3<String, String, String>,
>> TimeWindow> enrichedComponentInstanceStream1Win =
>> enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));*
>>
>> *        DataStream<MonitoringGrouping> enrichedComponentInstanceStream1
>> = enrichedComponentInstanceStream1Win*
>> *                .process(new Window5SecProcessing(gameId, FIVE_SECONDS,
>> COMPONENT_INSTANCE_OPERATION))*
>> *                .uid("Component Instance Operation Key Monitoring " +
>> FIVE_SECONDS);*
>> *enrichedComponentInstanceStream1.addSink(new
>> SinkFunction<MonitoringGrouping>() {*
>> *            @Override*
>> *            public void invoke(MonitoringGrouping mg, Context context)
>> throws Exception {*
>> *                //TODO call ES*
>> *                logger.debug("In enrichedComponentInstanceStream1 Sink
>> received mg:{}", mg);*
>> *            }*
>> *        });*
>> *Window processing class*:
>> private static class Window5SecProcessing extends
>> ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String,
>> String, String>, TimeWindow> {
>> private transient Histogram fiveSecHist;
>>         private transient Histogram fiveMinHist;
>>         private transient org.apache.flink.metrics.Histogram
>> fiveSecHistogram;
>>         private transient org.apache.flink.metrics.Histogram
>> fiveMinHistogram;
>>         private transient ValueState<Long> total5SecCountState;
>>         private transient ValueStateDescriptor<Long>
>> total5SecCountValueStateDescriptor;
>>
>>         public Window5SecProcessing(String gameId, String interval,
>> String keyType) {
>>             ...
>>         }
>>
>>         public void open(Configuration parameters) throws Exception {
>>             super.open(parameters);
>>             logger.debug("Window5SecProcessing -Entered open -
>> parameters:{}", parameters);//gets here
>>             com.codahale.metrics.Histogram fiveSecHist =
>>                     new com.codahale.metrics.Histogram(new
>> SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
>>             this.fiveSecHistogram = new
>> DropwizardHistogramWrapper(fiveSecHist);
>>             total5SecCountValueStateDescriptor =
>>                     new ValueStateDescriptor<Long>("total5SecCount",
>> Long.class, 0L);
>>             total5SecCountState =
>> getRuntimeContext().getState(total5SecCountValueStateDescriptor);
>>         }
>> ......
>>
>>        * public void process(Tuple3<String, String, String> currentKey1,
>> Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out)
>> throws Exception {*
>> *            logger.debug("Window5SecProcessing - Entered process
>> ");//never gets here*
>> *            Tuple3<String, String, String> currentKey = (Tuple3<String,
>> String, String>) currentKey1;*
>> *            ....*
>> *        }*
>>
>>     }
>> At 1 point in the logs, I seem to see that there are no slots available
>> ????? Is that the problem- how can I fix that if that is the case to test
>> locally on my Mac ??
>> *Log:*
>> flink-akka.actor.default-dispatcher-71 DEBUG
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Slot Pool
>> Status:
>> status: connected to
>> akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
>> registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
>> *available slots: []*
>> allocated slots: [[AllocatedSlot
>> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
>> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
>> pending requests: []
>> sharing groups: {
>> -------- 5a0ae59368145d715b3cc0d39ba6c05a --------
>> {
>> groupId=5a0ae59368145d715b3cc0d39ba6c05a
>> unresolved={}
>> resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost
>> (dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
>> allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
>> groupId=null, physicalSlot=AllocatedSlot
>> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
>> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
>> children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>> group=8587a27f4c92252839400ce17054b261},
>> SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>> group=a43726daeecb466da4d91c7b1adefb1d}]}]}
>> all={SlotRequestId{a3176498368d1123639f3ee94a9798b6}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>> group=8587a27f4c92252839400ce17054b261},
>> SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}=MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
>> allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
>> groupId=null, physicalSlot=AllocatedSlot
>> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
>> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
>> children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
>> group=8587a27f4c92252839400ce17054b261},
>> SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>> group=a43726daeecb466da4d91c7b1adefb1d}]},
>> SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
>> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
>> group=a43726daeecb466da4d91c7b1adefb1d}}
>> } }
>>
>> TIA,
>>
>>

Reply via email to