[ https://issues.apache.org/jira/browse/FLINK-23850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409456#comment-17409456 ]

Matthias commented on FLINK-23850:
----------------------------------

Another test run was performed with a simpler job that tests {{exactly-once}} semantics using the regular {{kafka}} table source and sink.
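{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();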
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20, 2000));
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.setParallelism(6);

final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.createTable("T1",
        TableDescriptor.forConnector("kafka")
                .schema(Schema.newBuilder()
                        .column("pk", DataTypes.STRING().notNull())
                        .column("x", DataTypes.STRING().notNull())
                        .build())
                .option("topic", "flink-23850-in1")
                .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
                .option("value.format", "csv")
                .option("scan.startup.mode", "earliest-offset")
                .build());

final Table resultTable =
        tableEnv.sqlQuery(
                "SELECT "
                        + "T1.pk, "
                        + "'asd', "
                        + "'foo', "
                        + "'bar' "
                        + "FROM T1");

tableEnv.createTable("T4",
        TableDescriptor.forConnector("kafka")
                .schema(Schema.newBuilder()
                        .column("pk", DataTypes.STRING().notNull())
                        .column("some_calculated_value", DataTypes.STRING())
                        .column("pk1", DataTypes.STRING())
                        .column("pk2", DataTypes.STRING())
                        .build())
                .option("topic", "flink-23850-out")
                .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
                .option("value.format", "csv")
                // exactly-once writes go through Kafka transactions, which the sink
                // commits once the enclosing checkpoint has completed
                .option("sink.delivery-guarantee", "exactly-once")
                .option("sink.transactional-id-prefix", "flink-23850")
                .option("scan.startup.mode", "earliest-offset")
                .build());

resultTable.executeInsert("T4");
{code}

Setting {{sink.delivery-guarantee}} to {{exactly-once}} results in nothing being written to the output topic. I verified this by reading the output topic with the consumer isolation level set to {{read_committed}}:
{code}
./kafka_2.12-2.4.1/bin/kafka-console-consumer.sh --topic flink-23850-out \
    --from-beginning --bootstrap-server localhost:9092 \
    --consumer-property isolation.level=read_committed
{code}
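For readers unfamiliar with the {{read_committed}} check above: records written inside a Kafka transaction only become visible to a {{read_committed}} consumer once that transaction commits, and with {{exactly-once}} the Kafka sink commits its transactions only after a checkpoint completes. The following is a toy, in-memory sketch of that visibility rule. {{IsolationLevelModel}} and its methods are hypothetical names, not the Kafka client API, and the model deliberately ignores aborted transactions and last-stable-offset blocking.

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Toy in-memory model of one topic partition with transactional writes.
 * Hypothetical class, NOT the Kafka client API. It ignores aborted
 * transactions and last-stable-offset blocking.
 */
public class IsolationLevelModel {

    /** A record tagged with the id of the transaction that wrote it. */
    static final class Entry {
        final String value;
        final int txnId;

        Entry(String value, int txnId) {
            this.value = value;
            this.txnId = txnId;
        }
    }

    private final List<Entry> log = new ArrayList<>();
    private final Set<Integer> committedTxns = new HashSet<>();

    /** Appends a record inside a still-open transaction. */
    void write(int txnId, String value) {
        log.add(new Entry(value, txnId));
    }

    /** Commits a transaction (the exactly-once sink does this after a completed checkpoint). */
    void commit(int txnId) {
        committedTxns.add(txnId);
    }

    /** isolation.level=read_uncommitted: every appended record is returned. */
    List<String> readUncommitted() {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            out.add(e.value);
        }
        return out;
    }

    /** isolation.level=read_committed: only records of committed transactions are returned. */
    List<String> readCommitted() {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (committedTxns.contains(e.txnId)) {
                out.add(e.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        IsolationLevelModel topic = new IsolationLevelModel();
        topic.write(1, "r1");
        topic.write(1, "r2");
        System.out.println(topic.readUncommitted()); // prints [r1, r2]
        System.out.println(topic.readCommitted());   // prints [] (transaction 1 still open)
        topic.commit(1);
        System.out.println(topic.readCommitted());   // prints [r1, r2]
    }
}
{code}

Dropping {{--consumer-property isolation.level=read_committed}} from the console consumer command falls back to Kafka's default isolation level, {{read_uncommitted}}, which corresponds to {{readUncommitted()}} in this model.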

Removing the {{sink.delivery-guarantee}} option makes the job write data to the output topic as expected.

> Test Kafka table connector with new runtime provider
> ----------------------------------------------------
>
>                 Key: FLINK-23850
>                 URL: https://issues.apache.org/jira/browse/FLINK-23850
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Qingsheng Ren
>            Assignee: Matthias
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.14.0
>
>
> The runtime provider of the Kafka table connector has been replaced with the new 
> KafkaSource and KafkaSink. The table connector needs to be tested to make sure 
> nothing comes as a surprise to Table/SQL API users. 


