Hi,
Any help is appreciated.Dug into this. *I can see the deserialized output
log from FlinkKinesisConsumer deserialization but it keeps looping to pull
from Kinesis Stream but never gets into the Windowing operation for
process() or apply().*

FlinkKinesisConsumer seems to be stuck in a loop calling a Kinesis Stream
and the deserialized output never seems to get into the apply() or
process() method of a Windowing operation. I can see the logs of
MonitoringMapKinesisSchema deserializing data back successfully from
Kinesis and converting into a POJO.

Code:

*//Create environment*:
StreamExecutionEnvironment env;
if (local) {
    Configuration configuration = new Configuration();
    configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
    env = StreamExecutionEnvironment.createLocalEnvironment(1,
configuration);
} else {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
*//create FlinkKinesisConsumer*
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
"AUTO");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
"10000");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
"2000");
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON");
FlinkKinesisConsumer kinesisConsumer = new FlinkKinesisConsumer<>(
        kinesisTopicRead, new MonitoringMapKinesisSchema(),
kinesisConsumerConfig);*//deserialization works fine*
DataStream<Monitoring> kinesisStream = env
                .addSource(kinesisConsumer);
KeyedStream<Monitoring, Tuple3<String, String, String>>
enrichedComponentInstanceStream1Key = kinesisStream
                .keyBy(new KeySelector<Monitoring, Tuple3<String, String,
String>>() {
                    public Tuple3<String, String, String> getKey(Monitoring
mon) throws Exception {
                        return new Tuple3<String, String,
String>(mon.getComponent(), mon.getInstance(), mon.getOperation());
                    }
                });

WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow>
enrichedComponentInstanceStream1Win = enrichedComponentInstanceStream1Key

.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));

DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 =
enrichedComponentInstanceStream1Win
                //.process(new Window5SecProcessing(gameId, FIVE_SECONDS,
COMPONENT_INSTANCE_OPERATION))
                .process(new Window5SecProcessing());*//never gets in here*
//Gets into Window5SecProcessing.open() method during initialization but
never into the process method ????????
private static class Window5SecProcessing extends
ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String,
String, String>, TimeWindow> {

        private transient String interval;
        private transient String gameId;
        private transient String keyType;
        private transient org.apache.flink.metrics.Histogram
fiveSecHistogram;

        private transient ValueState<Long> total5SecCountState;
        private transient ValueStateDescriptor<Long>
total5SecCountValueStateDescriptor;
        public Window5SecProcessing() {

        }

        public Window5SecProcessing(String gameId, String interval, String
keyType) {
            this.gameId = gameId;
            this.interval = interval;
            this.keyType = keyType;
        }

        @Override
        public void clear(Context context) throws Exception {
            super.clear(context);
            KeyedStateStore keyedStateStore = context.windowState();

keyedStateStore.getState(total5SecCountValueStateDescriptor).clear();
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            logger.debug("Gets in here fine -Window5SecProcessing -Entered
open - parameters:{}", parameters);
            com.codahale.metrics.Histogram fiveSecHist =
                    new com.codahale.metrics.Histogram(new
SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
            this.fiveSecHistogram = new
DropwizardHistogramWrapper(fiveSecHist);
            total5SecCountValueStateDescriptor =
                    new ValueStateDescriptor<Long>("total5SecCount",
Long.class, 0L);
            total5SecCountState =
getRuntimeContext().getState(total5SecCountValueStateDescriptor);
        }


        public void process(Tuple3<String, String, String> currentKey1,
Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out)
throws Exception {
            logger.debug("@@never gets here@@Window5SecProcessing - Entered
process ");//
...
}




On Mon, Nov 5, 2018 at 4:10 PM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Hi,
> Running in IntelliJ IDE on a Mac with 4 vProcessors.
> Code compiles fine. It never gets into the Window5SecProcessing's
> process().I am able to get data from the Kinesis Consumer and it is
> deserialized properly when I debug the code. It gets into the
> Window5SecProcessing.open() method for initialization.
>
> Not sure if I am failing with no slots available ???
> In main():
> ........ //trimmed a lot of code
> *FlinkKinesisConsumer<Monitoring> kinesisConsumer =
> getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ...,
> ...);*
>
> *DataStream<Monitoring> kinesisStream = env*
> *                .addSource(kinesisConsumer)*
> *                .uid(jobName + "KinesisSource");*
> *KeyedStream<Monitoring, Tuple3<String, String, String>>
> enrichedComponentInstanceStream1Key = kinesisStream*
> *                .keyBy(new KeySelector<Monitoring, Tuple3<String, String,
> String>>() {*
> *                    public Tuple3<String, String, String>
> getKey(Monitoring mon) throws Exception {*
> *                        return new Tuple3<String, String,
> String>(mon.getComponent(), mon.getInstance(), mon.getOperation());*
> *                    }});*
>
> *        WindowedStream<Monitoring, Tuple3<String, String, String>,
> TimeWindow> enrichedComponentInstanceStream1Win =
> enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));*
>
> *        DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 =
> enrichedComponentInstanceStream1Win*
> *                .process(new Window5SecProcessing(gameId, FIVE_SECONDS,
> COMPONENT_INSTANCE_OPERATION))*
> *                .uid("Component Instance Operation Key Monitoring " +
> FIVE_SECONDS);*
> *enrichedComponentInstanceStream1.addSink(new
> SinkFunction<MonitoringGrouping>() {*
> *            @Override*
> *            public void invoke(MonitoringGrouping mg, Context context)
> throws Exception {*
> *                //TODO call ES*
> *                logger.debug("In enrichedComponentInstanceStream1 Sink
> received mg:{}", mg);*
> *            }*
> *        });*
> *Window processing class*:
> private static class Window5SecProcessing extends
> ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String,
> String, String>, TimeWindow> {
> private transient Histogram fiveSecHist;
>         private transient Histogram fiveMinHist;
>         private transient org.apache.flink.metrics.Histogram
> fiveSecHistogram;
>         private transient org.apache.flink.metrics.Histogram
> fiveMinHistogram;
>         private transient ValueState<Long> total5SecCountState;
>         private transient ValueStateDescriptor<Long>
> total5SecCountValueStateDescriptor;
>
>         public Window5SecProcessing(String gameId, String interval, String
> keyType) {
>             ...
>         }
>
>         public void open(Configuration parameters) throws Exception {
>             super.open(parameters);
>             logger.debug("Window5SecProcessing -Entered open -
> parameters:{}", parameters);//gets here
>             com.codahale.metrics.Histogram fiveSecHist =
>                     new com.codahale.metrics.Histogram(new
> SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
>             this.fiveSecHistogram = new
> DropwizardHistogramWrapper(fiveSecHist);
>             total5SecCountValueStateDescriptor =
>                     new ValueStateDescriptor<Long>("total5SecCount",
> Long.class, 0L);
>             total5SecCountState =
> getRuntimeContext().getState(total5SecCountValueStateDescriptor);
>         }
> ......
>
>        * public void process(Tuple3<String, String, String> currentKey1,
> Context ctx, Iterable<Monitoring> input, Collector<MonitoringGrouping> out)
> throws Exception {*
> *            logger.debug("Window5SecProcessing - Entered process
> ");//never gets here*
> *            Tuple3<String, String, String> currentKey = (Tuple3<String,
> String, String>) currentKey1;*
> *            ....*
> *        }*
>
>     }
> At 1 point in the logs, I seem to see that there are no slots available
> ????? Is that the problem- how can I fix that if that is the case to test
> locally on my Mac ??
> *Log:*
> flink-akka.actor.default-dispatcher-71 DEBUG
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Slot Pool
> Status:
> status: connected to
> akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
> registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
> *available slots: []*
> allocated slots: [[AllocatedSlot
> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
> pending requests: []
> sharing groups: {
> -------- 5a0ae59368145d715b3cc0d39ba6c05a --------
> {
> groupId=5a0ae59368145d715b3cc0d39ba6c05a
> unresolved={}
> resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost
> (dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
> allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
> groupId=null, physicalSlot=AllocatedSlot
> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
> children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
> group=8587a27f4c92252839400ce17054b261},
> SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
> group=a43726daeecb466da4d91c7b1adefb1d}]}]}
> all={SlotRequestId{a3176498368d1123639f3ee94a9798b6}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
> group=8587a27f4c92252839400ce17054b261},
> SlotRequestId{a5fd4a1b7478661f62350df3bea3695f}=MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
> allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
> groupId=null, physicalSlot=AllocatedSlot
> AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
> 52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
> children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
> request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
> group=8587a27f4c92252839400ce17054b261},
> SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
> group=a43726daeecb466da4d91c7b1adefb1d}]},
> SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9}=SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
> allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
> request=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
> group=a43726daeecb466da4d91c7b1adefb1d}}
> } }
>
> TIA,
>
>

Reply via email to