tisonkun commented on code in PR #56:

@@ -0,0 +1,678 @@
+package org.apache.flink.connector.pulsar.table;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.connector.pulsar.table.testutils.TestingUser;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static 
+import static 
+import static org.apache.flink.util.CollectionUtil.entry;
+import static org.apache.flink.util.CollectionUtil.map;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+/**
+ * IT cases for the Pulsar table source and sink. It aims to verify runtime behaviour and certain
+ * use cases are correct and can produce/consume the desired records as the user specifies.
+ */
+public class PulsarTableITCase extends PulsarTableTestBase {
+    protected static final String JSON_FORMAT = "json"; // Format identifiers; the json/avro/csv ones feed @ValueSource and end up in the table's 'format' option.
+    protected static final String AVRO_FORMAT = "avro";
+    protected static final String CSV_FORMAT = "csv";
+    protected static final String RAW_FORMAT = "raw"; // NOTE(review): not referenced in the visible part of this class; presumably used further down.
+    @ParameterizedTest
+    @ValueSource(strings = {JSON_FORMAT, AVRO_FORMAT, CSV_FORMAT})
+    void pulsarSourceSink(String format) throws Exception { // Writes rows into a Pulsar-backed table in the given format, reads them back through a 5-second tumbling window and checks the aggregated rows.
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String topic = "test_topic_" + format + randomAlphanumeric(3);
+        createTestTopic(topic, 1); // second argument is presumably the partition count; confirm against createTestTopic's definition
+        // ---------- Produce an event time stream into Pulsar 
+        String randomTableName = randomAlphabetic(5); // presumably randomized to avoid table-name clashes between parameterized runs
+        final String createTable =
+                String.format(
+                        "create table %s (\n"
+                                + "  `computed-price` as price + 1.0,\n"
+                                + "  price decimal(38, 18),\n"
+                                + "  currency string,\n"
+                                + "  log_date date,\n"
+                                + "  log_time time(3),\n"
+                                + "  log_ts timestamp(3),\n"
+                                + "  ts as log_ts + INTERVAL '1' SECOND,\n"
+                                + "  watermark for ts as ts\n"
+                                + ") with (\n"
+                                + "  'connector' = '%s',\n"
+                                + "  'topics' = '%s',\n"
+                                + "  'service-url' = '%s',\n"
+                                + "  'admin-url' = '%s',\n"
+                                + "  'format' = '%s'\n"
+                                + ")",
+                        randomTableName,
+                        PulsarTableFactory.IDENTIFIER,
+                        topic,
+                        pulsar.operator().serviceUrl(),
+                        pulsar.operator().adminUrl(),
+                        format);
+        tableEnv.executeSql(createTable); // register the table described by the DDL above
+        String initialValues =
+                String.format(
+                        "INSERT INTO %s\n"
+                                + "SELECT CAST(price AS DECIMAL(10, 2)), 
currency, "
+                                + " CAST(d AS DATE), CAST(t AS TIME(0)), 
+                                + "FROM (VALUES (2.02,'Euro','2019-12-12', 
'00:00:01', '2019-12-12 00:00:01.001001'), \n"
+                                + "  (1.11,'US Dollar','2019-12-12', 
'00:00:02', '2019-12-12 00:00:02.002001'), \n"
+                                + "  (50,'Yen','2019-12-12', '00:00:03', 
'2019-12-12 00:00:03.004001'), \n"
+                                + "  (3.1,'Euro','2019-12-12', '00:00:04', 
'2019-12-12 00:00:04.005001'), \n"
+                                + "  (5.33,'US Dollar','2019-12-12', 
'00:00:05', '2019-12-12 00:00:05.006001'), \n"
+                                + "  (0,'DUMMY','2019-12-12', '00:00:10', 
'2019-12-12 00:00:10'))\n"
+                                + "  AS orders (price, currency, d, t, ts)",
+                        randomTableName);
+        tableEnv.executeSql(initialValues).await(); // submit the INSERT and wait on its result before issuing the query
+        String query =
+                String.format(
+                        "SELECT\n"
+                                + "  CAST(TUMBLE_END(ts, INTERVAL '5' SECOND) 
+                                + "  CAST(MAX(log_date) AS VARCHAR),\n"
+                                + "  CAST(MAX(log_time) AS VARCHAR),\n"
+                                + "  CAST(MAX(ts) AS VARCHAR),\n"
+                                + "  COUNT(*),\n"
+                                + "  CAST(MAX(price) AS DECIMAL(10, 2))\n"
+                                + "FROM %s\n"
+                                + "GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)",
+                        randomTableName);
+        DataStream<Row> result = 
+        TestingSinkFunction sink = new TestingSinkFunction(2); // 2 is presumably the expected row count (two rows are asserted below); confirm in TestingSinkFunction
+        result.addSink(sink).setParallelism(1); // the sink runs with parallelism 1
+        try {
+            env.execute("Job_2");
+        } catch (Throwable e) {
+            if (!isCausedByJobFinished(e)) { // only a failure that isCausedByJobFinished recognizes is tolerated
+                // re-throw
+                throw e;
+            }
+        }
+        List<String> expected =
+                Arrays.asList(
+                        "+I[2019-12-12 00:00:05.000, 2019-12-12, 00:00:03, 
2019-12-12 00:00:04.004, 3, 50.00]",
+                        "+I[2019-12-12 00:00:10.000, 2019-12-12, 00:00:05, 
2019-12-12 00:00:06.006, 2, 5.33]");
+        assertThat(TestingSinkFunction.rows).isEqualTo(expected); // rows is read through the class (static access), not through the local sink instance
+    }
+    @ParameterizedTest
+    @ValueSource(strings = {JSON_FORMAT, AVRO_FORMAT, CSV_FORMAT})
+    void pulsarSourceSinkWithKeyAndPartialValue(String format) throws 
Exception {

Review Comment:
   This test still doesn't pass. But maybe we don't support this use case at 

