Hey All,

I am running Flink in Docker containers (image tag flink:scala_2.11-java11)
on EC2.

I am able to connect to the Kinesis stream through the Kinesis connector,
but nothing is being consumed.

My commands to start the JobManager and TaskManager:

docker run --rm --volume /root/:/root/ \
    --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
    --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
    --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
    --name=jobmanager --network flink-network --publish 8081:8081 \
    flink:scala_2.11-java11 jobmanager &

docker run --rm \
    --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
    --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
    --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
    --name=taskmanager_0 --network flink-network \
    flink:scala_2.11-java11 taskmanager &

In the logs I see:

2021-08-17 22:38:01,106 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='<My Stream Name>', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 34028236692093846346337460743176821144},SequenceNumberRange: {StartingSequenceNumber: 49600280467722672235426674687631661510244124728928239618,}}'}, starting state set as sequence number LATEST_SEQUENCE_NUM

(and the same message for every shard consumer)

2021-08-17 22:38:01,107 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='web-clickstream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 34028236692093846346337460743176821144},SequenceNumberRange: {StartingSequenceNumber: 49600280467722672235426674687631661510244124728928239618,}}'} from sequence number LATEST_SEQUENCE_NUM with ShardConsumer 0

My program is a simple test that reads a DataStream from Kinesis:

FlinkKinesisConsumer<String> kinesisConsumer =
        new FlinkKinesisConsumer<>(
                "<My-StreamName>",
                new SimpleStringSchema(),
                getKafkaConsumerProperties());
env.addSource(kinesisConsumer).print();

env.execute("Read files in streaming fashion");
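
A minimal sketch of consumer properties that would rule out the starting position as the cause: both log lines show each shard starting at LATEST_SEQUENCE_NUM, which only returns records written after the job subscribes. The real contents of getKafkaConsumerProperties() are not shown above, so the class name, the method name kinesisConsumerProperties, the region argument and the chosen values are assumptions for illustration. The keys are the string values of the Flink 1.13 Kinesis connector constants AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER and ConsumerConfigConstants.STREAM_INITIAL_POSITION; in a real job the constants themselves are preferable.

```java
import java.util.Properties;

public class KinesisConsumerPropertiesSketch {

    // String values of the Flink 1.13 Kinesis connector constants
    // (org.apache.flink.streaming.connectors.kinesis.config). Plain strings are
    // used here only so the sketch compiles without the connector on the classpath.
    public static final String AWS_REGION = "aws.region";
    public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

    // Hypothetical stand-in for getKafkaConsumerProperties(); the region is a
    // parameter because the actual region is not given in the post.
    public static Properties kinesisConsumerProperties(String region) {
        Properties props = new Properties();
        props.setProperty(AWS_REGION, region);
        // AUTO resolves credentials through the AWS SDK default provider chain,
        // which includes an EC2 instance-profile role.
        props.setProperty(AWS_CREDENTIALS_PROVIDER, "AUTO");
        // TRIM_HORIZON starts at the oldest record still retained in each shard;
        // LATEST only returns records that arrive after the consumer starts.
        props.setProperty(STREAM_INITIAL_POSITION, "TRIM_HORIZON");
        return props;
    }

    public static void main(String[] args) {
        // Example region only; replace with the stream's actual region.
        Properties props = kinesisConsumerProperties("us-east-1");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

If records show up with TRIM_HORIZON, the position can be switched back to LATEST afterwards. Note also that print() writes to the TaskManager's standard output, so with the containers above the records should appear in docker logs taskmanager_0 rather than in the JobManager container's output.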

Other facts:

   1. I can see data flowing into our Kinesis stream continuously on the
   Monitoring tab in the AWS console.
   2. I was facing authorization issues accessing Kinesis in our AWS
   infrastructure, but I resolved them by moving into the same security
   group as the Kinesis deployment and creating a role with full access to
   Kinesis.

Any pointers are really appreciated!

Thanks,
Tarun
