Hello Gordon,

Thank you for your help. I have set the discovery interval to 30 seconds and
am starting the job against a clean kinesalite service (I am running it inside
Docker, so the container gets stopped and removed every time in order to start
from scratch).
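
For reference, here is a minimal sketch of how the 30-second discovery interval
can be set on the consumer properties. It assumes the connector's
"flink.shard.discovery.intervalmillis" key (exposed as
ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS); the class and method
names are only illustrative, and the endpoint and credential settings needed
for kinesalite are left out:

```java
import java.util.Properties;

public class DiscoveryIntervalConfig {
    // Assumption: same key string as ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS
    // in the Flink Kinesis connector.
    static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";

    // Builds consumer properties that carry only the shard discovery interval.
    static Properties consumerProperties(long intervalMillis) {
        Properties props = new Properties();
        props.setProperty(SHARD_DISCOVERY_INTERVAL_MILLIS, Long.toString(intervalMillis));
        return props;
    }

    public static void main(String[] args) {
        // 30 seconds, as used for the runs in this message
        Properties props = consumerProperties(30_000L);
        System.out.println(props.getProperty(SHARD_DISCOVERY_INTERVAL_MILLIS));
    }
}
```

The resulting Properties object would then be handed to the FlinkKinesisConsumer
constructor together with the stream name and the deserialization schema.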

This is the output without any data in the stream:

11/16/2016 17:59:03     Source: Custom Source -> Sink: Unnamed(1/1) switched to
RUNNING 
17:59:04,673 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher 
- Subtask 0 will be seeded with initial shard
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49567694685205596999719397165301965297537316555774230530,}}'}, starting
state set as sequence number LATEST_SEQUENCE_NUM
17:59:04,674 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher 
- Subtask 0 will start consuming seeded shard
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49567694685205596999719397165301965297537316555774230530,}}'} from sequence
number LATEST_SEQUENCE_NUM with ShardConsumer 0
17:59:04,689 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher 
- Subtask 0 has discovered a new shard
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49567694685205596999719397165301965297537316555774230530,}}'} due to
resharding, and will start consuming the shard from sequence number
EARLIEST_SEQUENCE_NUM with ShardConsumer 1
17:59:08,817 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 1 @ 1479315548815
17:59:08,835 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 1 (in 20 ms)
17:59:13,815 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 2 @ 1479315553815
17:59:13,817 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 2 (in 1 ms)
17:59:18,814 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 3 @ 1479315558814
17:59:18,815 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 3 (in 1 ms)
17:59:23,815 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 4 @ 1479315563815
17:59:23,816 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 4 (in 1 ms)
17:59:28,814 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 5 @ 1479315568813
17:59:28,814 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 5 (in 1 ms)
17:59:33,814 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 6 @ 1479315573814
17:59:33,815 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 6 (in 1 ms)
17:59:34,704 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher 
- Subtask 0 has discovered a new shard
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49567694685205596999719397165301965297537316555774230530,}}'} due to
resharding, and will start consuming the shard from sequence number
EARLIEST_SEQUENCE_NUM with ShardConsumer 2

I then restarted the kinesalite container and posted a message to the stream
before the 30-second mark. The output shows that the job right away consumes
from the 2 shards discovered initially (I initialized kinesalite with only one
shard) and then consumes again for each new shard that gets discovered at
30-second intervals. (I am posting a plain string to the stream while my job
expects a JSON document, so the parsing fails; look for the JSON output, which
I am writing by just doing a job.print()):

Thanks
Philipp

11/16/2016 18:03:30     Source: Custom Source -> Sink: Unnamed(1/1) switched to
RUNNING 
18:03:30,832 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher 
- Subtask 0 will be seeded with initial shard
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49567694816512384728667706222664274486890894391754358786,}}'}, starting
state set as sequence number LATEST_SEQUENCE_NUM
18:03:30,833 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher 
- Subtask 0 will start consuming seeded shard
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49567694816512384728667706222664274486890894391754358786,}}'} from sequence
number LATEST_SEQUENCE_NUM with ShardConsumer 0
18:03:30,847 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher 
- Subtask 0 has discovered a new shard
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49567694816512384728667706222664274486890894391754358786,}}'} due to
resharding, and will start consuming the shard from sequence number
EARLIEST_SEQUENCE_NUM with ShardConsumer 1
18:03:34,878 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            
- class de.harvee.dataspa.flink.model.HarveeEventError is not a valid POJO
type
18:03:35,093 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 1 @ 1479315815091
{"domainId":null,"title":null,"description":null,"venue":null,"message":"could
not parse message"}
{"domainId":null,"title":null,"description":null,"venue":null,"message":"could
not parse message"}
18:03:35,191 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 1 (in 100 ms)
18:03:40,003 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 2 @ 1479315820003
18:03:40,005 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 2 (in 1 ms)
18:03:45,005 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 3 @ 1479315825005
18:03:45,006 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 3 (in 1 ms)
18:03:50,007 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 4 @ 1479315830007
18:03:50,007 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 4 (in 0 ms)
18:03:55,006 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 5 @ 1479315835006
18:03:55,007 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 5 (in 1 ms)
18:04:00,007 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Triggering checkpoint 6 @ 1479315840007
18:04:00,008 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
- Completed checkpoint 6 (in 1 ms)
18:04:00,861 INFO 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher 
- Subtask 0 has discovered a new shard
KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:
shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49567694816512384728667706222664274486890894391754358786,}}'} due to
resharding, and will start consuming the shard from sequence number
EARLIEST_SEQUENCE_NUM with ShardConsumer 2
{"domainId":null,"title":null,"description":null,"venue":null,"message":"could
not parse message"}
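
In case it helps with checking output like the above, here is a small sketch
that counts how often each shard ID is reported as newly discovered. The class
and method names are hypothetical, the regular expression is only fitted to the
log format pasted here, and the sample string is an abbreviated excerpt of that
output. Since the stream has a single shard, any count above zero for
shardId-000000000000 means the already-seeded shard was reported as new again:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ShardDiscoveryCounter {
    // Matches a "discovered a new shard" entry and captures its shard ID.
    // \s+ and \s* tolerate the line wrapping of the pasted output.
    private static final Pattern DISCOVERED = Pattern.compile(
        "has discovered a new shard\\s+KinesisStreamShard\\{streamName='[^']*',\\s*"
            + "shard='\\{ShardId:\\s*(shardId-\\d+)");

    // Returns shard ID -> number of "discovered a new shard" entries in the log text.
    static Map<String, Integer> countDiscoveries(String log) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        Matcher m = DISCOVERED.matcher(log);
        while (m.find()) {
            counts.merge(m.group(1), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Abbreviated excerpt: two discovery entries cut off after the shard ID.
        String entry =
            "- Subtask 0 has discovered a new shard\n"
                + "KinesisStreamShard{streamName='TestKinesisStream', shard='{ShardId:\n"
                + "shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:\n";
        System.out.println(countDiscoveries(entry + entry));
    }
}
```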

--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-tp10133p10154.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
