Hi Hangxiang,

Thanks for providing me the steps to check if the checkpointing is getting
triggered on failure recovery. I will follow them and respond back in case
of any issues.

Thanks,
Elakiya

On Sat, Sep 30, 2023 at 2:34 PM Hangxiang Yu <master...@gmail.com> wrote:

> Hi, Elakiya.
>
> I think you could check :
>
>    1. The TaskManager Log to figure out whether the job is restoring from
>    an existing checkpoint and the restoring checkpoint path.
>    2. Or you could check the checkpoint ID when you restart your job (If
>    not restoring from a checkpoint, it starts from 1).
>    3. Or you could check the checkpoint part in the FLINK UI, it will
>    show the detailed info about restoring checkpoint (ID and Path).
>
>
> On Thu, Sep 28, 2023 at 9:21 PM elakiya udhayanan <laks....@gmail.com>
> wrote:
>
>> Hi Feng,
>>
>> Thanks for your response.
>>
>> 1. We have configured checkpointing to upload to a s3 location, also we
>> see metadata files getting created in the s3 location. But we are unsure if
>> the job is getting triggered from that checkpoint in case of failure. Is
>> there a possible way to test this. Also does this apply for any upgrades or
>> enhancements to the job or how we can commit the offset in such cases.
>> 2. For the savepointing, we are currently exploring it.
>> 3. I would like to know if there are any properties that Flink provides
>> to do the Kafka offset commit.
>>
>> Thanks,
>> Elakiya
>>
>> On Thu, Sep 28, 2023 at 3:10 PM Feng Jin <jinfeng1...@gmail.com> wrote:
>>
>>> Hi Elakiya
>>>
>>> 1. You can confirm if the checkpoint for the task has been triggered
>>> normally?
>>>
>>> 2. Also, If you stop the job, you need to use "STOP WITH SAVEPOINT" and
>>> specify the path to the savepoint when starting the Flink job for recovery.
>>> This is necessary to continue consuming from the historical offset
>>> correctly.
>>>
>>>
>>> Best,
>>> Feng
>>>
>>>
>>> On Thu, Sep 28, 2023 at 4:41 PM elakiya udhayanan <laks....@gmail.com>
>>> wrote:
>>>
>>>> Hi team,
>>>>
>>>> I have a Kafka topic named employee which uses confluent avro schema
>>>> and will emit the payload as below:
>>>> {
>>>> "id": "emp_123456",
>>>> "employee": {
>>>> "id": "123456",
>>>> "name": "sampleName"
>>>> }
>>>> }
>>>> I am using the upsert-kafka connector to consume the events from the
>>>> above Kafka topic as below using the Flink SQL DDL statement.The problem is
>>>> the connector is not committing the offset. Everytime, I submit the job, it
>>>> reads Kafka events from the beginning. Please let me know if we can commit
>>>> the offset for the read Kafka events.
>>>>
>>>> DDL Statement:
>>>> String statement = "CREATE TABLE Employee (\r\n" +
>>>> "  id STRING,\r\n" +
>>>> "  employee  ROW(id STRING, name STRING\r\n" +
>>>> "  ),\r\n" +
>>>> "  PRIMARY KEY (i <http://employee.id/>d) NOT ENFORCED\r\n" +
>>>> ") WITH (\r\n" +
>>>> "  'connector' = 'upsert-kafka',\r\n" +
>>>> "  'topic' = 'employee',\r\n" +
>>>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>>> "  'key.format' = 'raw',\r\n" +
>>>> "  'value.format' = 'avro-confluent',\r\n" +
>>>> "  'value.avro-confluent.url' = 
>>>> 'http://kafka-cp-schema-registry:8081',\r\n"
>>>> +
>>>> ")";
>>>> Any help is appreciated TIA
>>>>
>>>> Thanks,
>>>> Elakiya
>>>>
>>>
>
> --
> Best,
> Hangxiang.
>

Reply via email to