[ https://issues.apache.org/jira/browse/FLINK-22190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-22190:
-----------------------------------
    Labels: auto-deprioritized-major stale-minor  (was: auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor but is unassigned and neither itself nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized.

> no guarantee on Flink exactly_once sink to Kafka
> -------------------------------------------------
>
>                 Key: FLINK-22190
>                 URL: https://issues.apache.org/jira/browse/FLINK-22190
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.12.2
>         Environment: *flink: 1.12.2*
> *kafka: 2.7.0*
>            Reporter: Spongebob
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor
>
> When I tested Flink's exactly_once sink to Kafka, I found that it does not
> behave as expected. Here is the pipeline of the Flink applications:
> raw data (flink app0) -> kafka topic1 -> flink app1 -> kafka topic2 ->
> flink app2. The Flink tasks may randomly fail with a "/ by zero" exception.
> The code is shown below:
> {code:java}
> // code placeholder
> // raw data, flink app0:
> class SimpleSource1 extends SourceFunction[String] {
>   var switch = true
>   val students: Array[String] = Array("Tom", "Jerry", "Gory")
>   override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
>     var i = 0
>     while (switch) {
>       sourceContext.collect(s"${students(Random.nextInt(students.length))},$i")
>       i += 1
>       Thread.sleep(5000)
>     }
>   }
>   override def cancel(): Unit = switch = false
> }
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val dataStream = streamEnv.addSource(new SimpleSource1)
> dataStream.addSink(new FlinkKafkaProducer[String]("xfy:9092",
>   "single-partition-topic-2", new SimpleStringSchema()))
> streamEnv.execute("sink kafka")
>
> // flink-app1:
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamEnv.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
> val prop = new Properties()
> prop.setProperty("bootstrap.servers", "xfy:9092")
> prop.setProperty("group.id", "test")
> val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
>   "single-partition-topic-2",
>   new SimpleStringSchema,
>   prop
> ))
> val resultStream = dataStream.map(x => {
>   val data = x.split(",")
>   (data(0), data(1), data(1).toInt / Random.nextInt(5)).toString()
> }
> )
> resultStream.print().setParallelism(1)
> val propProducer = new Properties()
> propProducer.setProperty("bootstrap.servers", "xfy:9092")
> propProducer.setProperty("transaction.timeout.ms", s"${1000 * 60 * 5}")
> resultStream.addSink(new FlinkKafkaProducer[String](
>   "single-partition-topic",
>   new MyKafkaSerializationSchema("single-partition-topic"),
>   propProducer,
>   Semantic.EXACTLY_ONCE))
> streamEnv.execute("sink kafka")
>
> // flink-app2:
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val prop = new Properties()
> prop.setProperty("bootstrap.servers", "xfy:9092")
> prop.setProperty("group.id", "test")
> prop.setProperty("isolation_level", "read_committed")
> val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
>   "single-partition-topic",
>   new SimpleStringSchema,
>   prop
> ))
> dataStream.print().setParallelism(1)
> streamEnv.execute("consumer kafka"){code}
>
> flink app1 prints some duplicate numbers. I expected flink app2 to
> deduplicate them, but in practice it does not.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
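
One detail in the flink-app2 snippet may be worth checking, offered as a possible contributing factor rather than a confirmed diagnosis. Kafka's consumer setting is spelled `isolation.level` (with a dot) and defaults to `read_uncommitted`. A key written as `isolation_level` is not a known consumer config, so the consumer would most likely ignore it and still read records from aborted transactions. The sketch below builds the consumer properties with the dotted key. The object name `ReadCommittedProps` and its `build` method are hypothetical helpers introduced only for illustration, and the sketch uses nothing beyond `java.util.Properties`.

```scala
import java.util.Properties

// Hypothetical helper (not part of the original report): builds the
// consumer properties for flink-app2 using Kafka's dotted config key.
object ReadCommittedProps {
  def build(bootstrapServers: String, groupId: String): Properties = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", bootstrapServers)
    prop.setProperty("group.id", groupId)
    // Kafka's consumer key is "isolation.level"; its default value is
    // "read_uncommitted", so it has to be set explicitly.
    prop.setProperty("isolation.level", "read_committed")
    prop
  }
}
```

Assuming the rest of flink-app2 stays as posted, the result of `ReadCommittedProps.build("xfy:9092", "test")` would be passed as the third argument to `FlinkKafkaConsumer` in place of `prop`.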