Hi Hangxiang,

Thanks for providing the steps to check whether checkpointing is getting triggered on failure recovery. I will follow them and respond back in case of any issues.
Thanks,
Elakiya

On Sat, Sep 30, 2023 at 2:34 PM Hangxiang Yu <master...@gmail.com> wrote:

> Hi, Elakiya.
>
> I think you could check:
>
> 1. The TaskManager log, to figure out whether the job is restoring from
> an existing checkpoint and the restoring checkpoint path.
> 2. Or you could check the checkpoint ID when you restart your job (if it
> is not restoring from a checkpoint, it starts from 1).
> 3. Or you could check the checkpoint section in the Flink UI; it will
> show detailed info about the restoring checkpoint (ID and path).
>
>
> On Thu, Sep 28, 2023 at 9:21 PM elakiya udhayanan <laks....@gmail.com>
> wrote:
>
>> Hi Feng,
>>
>> Thanks for your response.
>>
>> 1. We have configured checkpointing to upload to an S3 location, and we
>> see metadata files getting created there. But we are unsure whether the
>> job is actually restoring from that checkpoint in case of failure. Is
>> there a way to test this? Also, does this apply to upgrades or
>> enhancements to the job, and how can we commit the offsets in such cases?
>> 2. For savepointing, we are currently exploring it.
>> 3. I would like to know whether there are any properties that Flink
>> provides to commit the Kafka offsets.
>>
>> Thanks,
>> Elakiya
>>
>> On Thu, Sep 28, 2023 at 3:10 PM Feng Jin <jinfeng1...@gmail.com> wrote:
>>
>>> Hi Elakiya,
>>>
>>> 1. Can you confirm whether checkpoints for the job are being triggered
>>> normally?
>>>
>>> 2. Also, if you stop the job, you need to use "STOP WITH SAVEPOINT" and
>>> specify the path to the savepoint when starting the Flink job for
>>> recovery. This is necessary to continue consuming from the historical
>>> offsets correctly.
>>>
>>>
>>> Best,
>>> Feng
>>>
>>>
>>> On Thu, Sep 28, 2023 at 4:41 PM elakiya udhayanan <laks....@gmail.com>
>>> wrote:
>>>
>>>> Hi team,
>>>>
>>>> I have a Kafka topic named employee which uses a Confluent Avro schema
>>>> and emits payloads like the one below:
>>>> {
>>>>   "id": "emp_123456",
>>>>   "employee": {
>>>>     "id": "123456",
>>>>     "name": "sampleName"
>>>>   }
>>>> }
>>>> I am using the upsert-kafka connector to consume the events from the
>>>> above Kafka topic with the Flink SQL DDL statement below. The problem is
>>>> that the connector is not committing the offsets. Every time I submit the
>>>> job, it reads the Kafka events from the beginning. Please let me know if
>>>> we can commit the offsets for the Kafka events already read.
>>>>
>>>> DDL statement:
>>>> String statement = "CREATE TABLE Employee (\r\n" +
>>>> "  id STRING,\r\n" +
>>>> "  employee ROW(id STRING, name STRING\r\n" +
>>>> "  ),\r\n" +
>>>> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
>>>> ") WITH (\r\n" +
>>>> "  'connector' = 'upsert-kafka',\r\n" +
>>>> "  'topic' = 'employee',\r\n" +
>>>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>>> "  'key.format' = 'raw',\r\n" +
>>>> "  'value.format' = 'avro-confluent',\r\n" +
>>>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
>>>> ")";
>>>> Any help is appreciated. TIA.
>>>>
>>>> Thanks,
>>>> Elakiya
>>>>
>>>
>
> --
> Best,
> Hangxiang.
>
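
For reference, a minimal sketch of how checkpointing could be enabled around the Employee DDL in a Java Table API job, so that a restart from a retained checkpoint (or a savepoint taken with STOP WITH SAVEPOINT) resumes from the stored Kafka offsets instead of reading from the beginning. The S3 paths, checkpoint interval, and class name are placeholders rather than details from the job discussed above:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EmployeeJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 60s; the Kafka source offsets are stored in
        // each checkpoint, so a restore resumes reading from those offsets.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        // Placeholder checkpoint location (the thread mentions an S3 path).
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink/checkpoints");

        // Keep the latest checkpoint when the job is cancelled, so it can be
        // passed back on the next submission for manual recovery.
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // The CREATE TABLE Employee DDL from the thread goes here.
        String statement = "...";
        tEnv.executeSql(statement);

        // ... rest of the pipeline (INSERT INTO ... / queries) ...
    }
}

For a planned stop-and-restart, the CLI flow Feng described would look roughly like
"flink stop --savepointPath s3://my-bucket/flink/savepoints <jobId>" followed by
"flink run -s <savepoint-or-checkpoint-path> ..." on the next submission (paths again
placeholders); it is the "-s" restore path that makes the job continue from the
historical offsets rather than from the start of the topic.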