[ https://issues.apache.org/jira/browse/FLINK-23850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409456#comment-17409456 ]
Matthias commented on FLINK-23850:
----------------------------------

Another test run was performed with a simpler job testing {{exactly-once}} semantics using the normal {{kafka}} source and sink.

{code:java}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20, 2000));
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.setParallelism(6);

final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Source table reading CSV records from the input topic
tableEnv.createTable("T1",
        TableDescriptor.forConnector("kafka")
                .schema(Schema.newBuilder()
                        .column("pk", DataTypes.STRING().notNull())
                        .column("x", DataTypes.STRING().notNull())
                        .build())
                .option("topic", "flink-23850-in1")
                .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
                .option("value.format", "csv")
                .option("scan.startup.mode", "earliest-offset")
                .build());

final Table resultTable =
        tableEnv.sqlQuery(
                "SELECT "
                        + "T1.pk, "
                        + "'asd', "
                        + "'foo', "
                        + "'bar' "
                        + "FROM T1");

// Sink table writing to the output topic with exactly-once delivery
tableEnv.createTable("T4",
        TableDescriptor.forConnector("kafka")
                .schema(Schema.newBuilder()
                        .column("pk", DataTypes.STRING().notNull())
                        .column("some_calculated_value", DataTypes.STRING())
                        .column("pk1", DataTypes.STRING())
                        .column("pk2", DataTypes.STRING())
                        .build())
                .option("topic", "flink-23850-out")
                .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
                .option("value.format", "csv")
                .option("sink.delivery-guarantee", "exactly-once")
                .option("sink.transactional-id-prefix", "flink-23850")
                .option("scan.startup.mode", "earliest-offset")
                .build());

resultTable.executeInsert("T4");
{code}

Setting {{sink.delivery-guarantee}} to {{exactly-once}} results in nothing being written into the output topic.
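Whether records in the output topic are visible depends on the consumer's isolation level: with {{read_committed}}, a consumer only returns records from committed Kafka transactions, while the default {{read_uncommitted}} also returns records from transactions that are still open or were aborted. A minimal sketch of the two consumer configurations, using only {{java.util.Properties}}, could look like the following. The class name, method name and group id are hypothetical and not part of the test job; the property keys are standard Kafka consumer settings.

```java
import java.util.Properties;

/** Hypothetical helper for illustration; not part of the test job above. */
public final class VerificationConsumerConfig {

    private VerificationConsumerConfig() {}

    /** Builds consumer properties that read the topic from the beginning. */
    static Properties consumerProps(String bootstrapServers, boolean readCommitted) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Hypothetical group id, chosen only for this sketch.
        props.setProperty("group.id", "flink-23850-verify");
        // Start from the earliest offset when the group has no committed offsets.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // read_committed hides records of uncommitted or aborted transactions.
        props.setProperty("isolation.level",
                readCommitted ? "read_committed" : "read_uncommitted");
        return props;
    }

    public static void main(String[] args) {
        Properties committed = consumerProps("localhost:9092", true);
        Properties uncommitted = consumerProps("localhost:9092", false);
        System.out.println("isolation.level=" + committed.getProperty("isolation.level"));
        System.out.println("isolation.level=" + uncommitted.getProperty("isolation.level"));
    }
}
```

These properties could be passed to a {{KafkaConsumer}} from the {{kafka-clients}} library (an assumed dependency, not shown here). If records only show up with {{read_uncommitted}}, that would suggest the sink writes into transactions that never get committed.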
I verified that the output is empty by reading the output topic with the consumer isolation level set to {{read_committed}}:

{code}
./kafka_2.12-2.4.1/bin/kafka-console-consumer.sh \
    --topic flink-23850-out \
    --from-beginning \
    --bootstrap-server localhost:9092 \
    --consumer-property isolation.level=read_committed
{code}

Disabling {{sink.delivery-guarantee}} makes the job output data as expected.

> Test Kafka table connector with new runtime provider
> ----------------------------------------------------
>
>                 Key: FLINK-23850
>                 URL: https://issues.apache.org/jira/browse/FLINK-23850
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Qingsheng Ren
>            Assignee: Matthias
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.14.0
>
>
> The runtime provider of the Kafka table connector has been replaced with the new
> KafkaSource and KafkaSink. The table connector needs to be tested to make
> sure nothing surprises Table/SQL API users.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)