Hi Son

I think it might be because operator IDs were not assigned to your Filter and 
Map functions; you can refer to [1] to assign IDs in your application. 
Moreover, if you have removed some operators, please consider adding the 
--allowNonRestoredState [2] option.
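
For reference, assigning the IDs usually looks roughly like the snippet below 
(a minimal sketch; the source, functions and ID strings are only illustrative, 
not taken from your job):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UidExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // every operator gets a stable, explicit ID so that state in a
            // savepoint can be matched to the same operator after the job changes
            DataStream<String> input = env
                    .socketTextStream("localhost", 9999)   // placeholder source
                    .uid("source");

            input
                    .filter(value -> !value.isEmpty()).uid("non-empty-filter")
                    .map(value -> value.toUpperCase())
                        .returns(Types.STRING).uid("upper-map")
                    .print().uid("print-sink");

            env.execute("uid-example");
        }
    }

If you then drop an operator and restore from a savepoint of the old job, the 
restore command would look roughly like this (savepoint path and jar are 
placeholders), so that the state of the removed operator is skipped instead of 
failing the restore:

    bin/flink run -s <savepointPath> --allowNonRestoredState my-job.jar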

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state

Best
Yun Tang

________________________________
From: Son Mai <hongson1...@gmail.com>
Sent: Tuesday, March 26, 2019 9:51
To: Konstantin Knauf
Cc: user
Subject: Re: Reserving Kafka offset in Flink after modifying app

Hi Konstantin,

Thanks for the response. What still concerns me is:

  1.  Am I able to recover from checkpoints even if I change my program (for 
example, changing Filter and Map functions, data objects, etc.)? I was not able 
to recover from savepoints when I changed my program.

On Mon, Mar 25, 2019 at 3:19 PM Konstantin Knauf 
<konstan...@ververica.com> wrote:
Hi Son,

yes, this is possible, but your sink needs to play its part in Flink's 
checkpointing mechanism. Depending on the implementation of the sink, you should 
either:

* implement CheckpointedFunction and flush all records to BigQuery in 
snapshotState. This way, in case of a failure/restart of the job, all records up 
to the last successful checkpoint will have been written to BigQuery and all 
other records will be replayed.
* use managed operator state to store all pending records in the sink, so that 
they are persisted in snapshotState. This way, in case of a failure/restart of 
the job, all records up to the last successful checkpoint which have not yet 
been written to BigQuery will be restored in the sink, and all other records 
will be replayed (see the sketch below this list).
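
A rough sketch of the second variant (buffering pending records in managed 
operator state) could look like the following. It is only meant to illustrate 
the idea; writeBatchToBigQuery is a placeholder for your own BigQuery client 
code and the batch size is made up:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class BufferingBigQuerySink extends RichSinkFunction<String>
            implements CheckpointedFunction {

        private final int batchSize = 500;

        // records accepted but not yet written to BigQuery
        private final List<String> pending = new ArrayList<>();

        private transient ListState<String> checkpointedPending;

        @Override
        public void invoke(String value, Context context) throws Exception {
            pending.add(value);
            if (pending.size() >= batchSize) {
                writeBatchToBigQuery(pending);  // placeholder for the streaming insert
                pending.clear();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // persist everything that has not been written yet, so it is
            // restored (and not lost) if the job fails or is restarted
            checkpointedPending.clear();
            for (String record : pending) {
                checkpointedPending.add(record);
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedPending = context.getOperatorStateStore().getListState(
                    new ListStateDescriptor<>("pending-records", String.class));

            if (context.isRestored()) {
                for (String record : checkpointedPending.get()) {
                    pending.add(record);
                }
            }
        }

        private void writeBatchToBigQuery(List<String> batch) {
            // hypothetical call into a BigQuery client; replace with your own code
        }
    }

Such a sink only drops records from its buffer after they have been handed to 
BigQuery, so replaying from the last checkpoint can at most write duplicates, 
never lose records.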

In both cases, you might write the same record to BigQuery twice.

If in doubt whether your sink fulfills the criteria above, feel free to share it.

Cheers,

Konstantin



On Mon, Mar 25, 2019 at 7:50 AM Son Mai 
<hongson1...@gmail.com> wrote:
Hello,

I have a topic in Kafka that Flink reads from. I parse the messages in this 
topic and write them to BigQuery using streaming inserts, in batches of 500 
messages collected with a count window in Flink.
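
In code, the pipeline looks roughly like this (a minimal sketch; the topic, 
group id and the BigQuery part are placeholders, and connector class names 
depend on the Flink/Kafka connector version):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;

    public class KafkaToBigQueryJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // checkpoint every minute

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "bq-writer");

            DataStream<String> messages = env.addSource(
                    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props));

            messages
                    .countWindowAll(500) // collect batches of 500 messages
                    .apply(new AllWindowFunction<String, List<String>, GlobalWindow>() {
                        @Override
                        public void apply(GlobalWindow window, Iterable<String> values,
                                          Collector<List<String>> out) {
                            List<String> batch = new ArrayList<>();
                            values.forEach(batch::add);
                            out.collect(batch);
                        }
                    })
                    .addSink(new SinkFunction<List<String>>() {
                        @Override
                        public void invoke(List<String> batch, Context context) {
                            // placeholder: call the BigQuery streaming-insert API here
                        }
                    });

            env.execute("kafka-to-bigquery");
        }
    }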

Problem: I want to commit offsets manually, and only after a batch has been 
written successfully to BigQuery.

Reason:
I saw that Flink's Kafka consumer does not rely on committing offsets to Kafka 
but uses its own checkpointing. I don't know how Flink checkpointing works, and 
I'm worried that it does not cover the following situation:
- let's say I have a Flink job running and processing a batch of 500 messages 
at Kafka offsets 1000-1500.
- I stop this job before it writes the batch to BigQuery and make some 
modifications to the program. Savepoints did not work when I tried, because 
they seemed to require that the operator code does not change.

What I want is that when I start the modified app, it starts again from offsets 
1000-1500 in Kafka, because these messages have not been written to BigQuery.

Is there any way to achieve this in Flink?

Thanks,
SM


--

Konstantin Knauf | Solutions Architect

+49 160 91394525

https://www.ververica.com/


Follow us @VervericaData

--

Join Flink Forward<https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
