Shengkai Fang created FLINK-22969: ------------------------------------- Summary: Validate the topic is not null or empty string when create kafka source/sink function Key: FLINK-22969 URL: Project: Flink Issue Type: Bug Reporter: Shengkai Fang
Add test in UpsertKafkaTableITCase {code:java} @Test public void testSourceSinkWithKeyAndPartialValue() throws Exception { // we always use a different topic name for each parameterized topic, // in order to make sure the topic can be created. final String topic = "key_partial_value_topic_" + format; createTestTopic(topic, 1, 1); // use single partition to guarantee orders in tests // ---------- Produce an event time stream into Kafka ------------------- String bootstraps = standardProps.getProperty("bootstrap.servers"); // k_user_id and user_id have different data types to verify the correct mapping, // fields are reordered on purpose final String createTable = String.format( "CREATE TABLE upsert_kafka (\n" + " `k_user_id` BIGINT,\n" + " `name` STRING,\n" + " `timestamp` TIMESTAMP(3) METADATA,\n" + " `k_event_id` BIGINT,\n" + " `user_id` INT,\n" + " `payload` STRING,\n" + " PRIMARY KEY (k_event_id, k_user_id) NOT ENFORCED" + ") WITH (\n" + " 'connector' = 'upsert-kafka',\n" + " 'topic' = '%s',\n" + " 'properties.bootstrap.servers' = '%s',\n" + " 'key.format' = '%s',\n" + " 'key.fields-prefix' = 'k_',\n" + " 'value.format' = '%s',\n" + " 'value.fields-include' = 'EXCEPT_KEY'\n" + ")", "", bootstraps, format, format); tEnv.executeSql(createTable); String initialValues = "INSERT INTO upsert_kafka\n" + "VALUES\n" + " (1, 'name 1', TIMESTAMP '2020-03-08 13:12:11.123', 100, 41, 'payload 1'),\n" + " (2, 'name 2', TIMESTAMP '2020-03-09 13:12:11.123', 101, 42, 'payload 2'),\n" + " (3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 43, 'payload 3'),\n" + " (2, 'name 2', TIMESTAMP '2020-03-11 13:12:11.123', 101, 42, 'payload')"; tEnv.executeSql(initialValues).await(); // ---------- Consume stream from Kafka ------------------- final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM upsert_kafka"), 5); final List<Row> expected = Arrays.asList( changelogRow( "+I", 1L, "name 1", LocalDateTime.parse("2020-03-08T13:12:11.123"), 100L, 41, "payload 1"), changelogRow( "+I", 2L, "name 2", LocalDateTime.parse("2020-03-09T13:12:11.123"), 101L, 42, "payload 2"), changelogRow( "+I", 3L, "name 3", LocalDateTime.parse("2020-03-10T13:12:11.123"), 102L, 43, "payload 3"), changelogRow( "-U", 2L, "name 2", LocalDateTime.parse("2020-03-09T13:12:11.123"), 101L, 42, "payload 2"), changelogRow( "+U", 2L, "name 2", LocalDateTime.parse("2020-03-11T13:12:11.123"), 101L, 42, "payload")); assertThat(result, deepEqualTo(expected, true)); // ------------- cleanup ------------------- deleteTestTopic(topic); } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)