Hi, exactly. You have to make sure that writing data for the same ID multiple times is safe, i.e. that the write is idempotent. Exactly-once in Flink is only guaranteed for registered state. So if you have a flatMap() with a "counter" variable that is held in a "ValueState", this counter will always be in sync with the number of elements consumed from the Kafka topic, because on a failure the counter is rolled back to the last successful checkpoint together with the Kafka offsets.
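For illustration, here is a minimal sketch of such a flatMap with its counter kept in a ValueState (the class and state names are just examples; keyed state like this is only available after a keyBy()):

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Counter held in registered (checkpointed) state: on failure it is
    // restored from the last checkpoint together with the Kafka offsets,
    // so replayed elements are never double-counted.
    public class CountingFlatMap extends RichFlatMapFunction<String, Tuple2<String, Long>> {

        private transient ValueState<Long> counter;

        @Override
        public void open(Configuration parameters) {
            counter = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("counter", Long.class));
        }

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
            Long current = counter.value();
            long updated = (current == null ? 0L : current) + 1;
            counter.update(updated);
            out.collect(Tuple2.of(value, updated));
        }
    }

It would be applied after a keyBy(), e.g. stream.keyBy(...).flatMap(new CountingFlatMap()).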
On Tue, Feb 21, 2017 at 4:04 PM, Y. Sakamoto <phonypian...@gmail.com> wrote:
> Thank you for your reply.
>
> Under my understanding, Map / Filter Functions operate with "at least once"
> when a failure occurs, and it is necessary to code it so that data is saved
> (overwritten) in Elasticsearch under the same ID even if duplicate data arrives.
> Is that correct?
> (Sorry, I cannot understand how to "write changes to Flink's state to
> Elastic".)
>
> Regards,
> Yuichiro
>
> On 2017/02/21 3:56, Stephan Ewen wrote:
>> Hi!
>>
>> Exactly-once end-to-end requires sinks that support that kind of behavior
>> (typically some form of transaction support).
>>
>> Kafka currently does not have the mechanisms in place to support
>> exactly-once sinks, but the Kafka project is working on that feature.
>> For ElasticSearch, it is also not simply possible (because of missing
>> transactions), but you can use Flink's state as the "authoritative" state
>> (it is exactly-once) and then write changes to Flink's state to Elastic.
>> That way the writes to ElasticSearch become "idempotent", which means
>> duplicates simply make no additional changes.
>>
>> Hope that helps!
>>
>> Stephan
>>
>> On Mon, Feb 20, 2017 at 5:53 PM, Y. Sakamoto <phonypian...@gmail.com> wrote:
>>
>>     Hi,
>>     I'm using Flink 1.2.0 and trying to do "exactly once" data transfer
>>     from Kafka to Elasticsearch, but I cannot.
>>     (Scala 2.11, Kafka 0.10, without YARN)
>>
>>     There are 2 Flink TaskManager nodes, and while processing
>>     with parallelism 2, I shut one of them down (simulating node failure).
>>
>>     Using flink-connector-kafka, I wrote the following code:
>>
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment
>>                 .getExecutionEnvironment();
>>         env.enableCheckpointing(1000L);
>>         env.setParallelism(2);
>>
>>         Properties kafkaProp = new Properties();
>>         kafkaProp.setProperty("bootstrap.servers", "192.168.97.42:9092");
>>         kafkaProp.setProperty("zookeeper.connect", "192.168.97.42:2181");
>>         kafkaProp.setProperty("group.id", "id");
>>
>>         DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
>>                 "topic", new SimpleStringSchema(), kafkaProp));
>>
>>     I found duplicated data transfer in the map function.
>>     Data from the checkpoint before the node failure seems to be duplicated.
>>
>>     Is there any way to achieve "exactly once" on failure?
>>
>>     Thanks.
>>     Yuichiro
>
> --
> ☆ ─────────────── ─ ─ - -
> Yuichiro SAKAMOTO
> ks...@muc.biglobe.ne.jp
> phonypian...@gmail.com
> http://phonypianist.sakura.ne.jp
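For reference, here is a minimal sketch of the idempotent-write idea described above, assuming the flink-connector-elasticsearch2 connector that goes with Flink 1.2; the index, type and field names are only examples, and in practice the document ID should come from a stable business key of the record rather than a hash:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;

    // Sketch: write every record under a deterministic document ID, so a
    // record replayed after a failure overwrites the same Elasticsearch
    // document instead of creating a duplicate.
    public class IdempotentEsSink implements ElasticsearchSinkFunction<String> {

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            Map<String, Object> json = new HashMap<>();
            json.put("data", element);

            // Deterministic ID derived from the record (a real business key
            // is preferable to a hash, which can collide).
            String docId = Integer.toHexString(element.hashCode());

            IndexRequest request = Requests.indexRequest()
                    .index("my-index")   // example index name
                    .type("my-type")     // example type name
                    .id(docId)           // same record -> same ID -> overwrite
                    .source(json);

            indexer.add(request);
        }
    }

Wired into the job with something like stream.addSink(new ElasticsearchSink<>(userConfig, transportAddresses, new IdempotentEsSink())), this makes the Elasticsearch side tolerant of the replays that happen between the last checkpoint and the failure.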