lajith created FLINK-38580:
------------------------------

             Summary: Inconsistent data between Table API and DataStream API 
when using datagen connector
                 Key: FLINK-38580
                 URL: https://issues.apache.org/jira/browse/FLINK-38580
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.20.2
            Reporter: lajith


When using the datagen connector with Table API and then converting to 
DataStream API using toDataStream(), the data values change unexpectedly. This 
inconsistency only occurs with the datagen connector and not with other 
connectors like Kafka.

Steps to Reproduce:

1. Create a Table using SQL with the datagen connector
2. Execute the Table and observe the generated data
3. Convert the same Table to DataStream using toDataStream()
4. Execute the DataStream and observe the data
5. Notice that the data values are different between steps 2 and 4

Sample Code :
{noformat}
        // Step 1: Set up environments
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Step 2: Create Datagen source table
        tableEnv.executeSql(    "CREATE TABLE datagen_source (" +    "  id 
INT," +    "  name STRING" +    ") WITH " +
                "(" +    "  'connector' = 'datagen'," +    "  'rows-per-second' 
= '5'," +    "  'fields.id.kind' = 'sequence'," +
                "  'fields.id.start' = '1'," +    "  'fields.id.end' = '10'," + 
   "  'fields.name.length' = '10'" +    ")");

        // Step 3: Apply SQL
        Table table = tableEnv.sqlQuery("SELECT * FROM datagen_source");
        TableResult result = table.execute();
        // Step 4: print table.
        result.print();

        // Step 5: Convert to DataStream
        DataStream<Row> dataStream = tableEnv.toDataStream(table);

        // Step 6: Print DataStream (after conversion)
        dataStream.print("After Conversion");

        // Step 7: Execute the job
        env.execute("Datagen SQL to DataStream Example");{noformat}

Observed Behavior:


{noformat}
Table results
+----+-------------+--------------------------------+
| op |          id |                           name |
+----+-------------+--------------------------------+
| +I |           1 |                     0deb105a16 |
| +I |           2 |                     badce9aae9 |
| +I |           3 |                     e47ab6e948 |
| +I |           4 |                     ce6ff2d766 |
| +I |           5 |                     375c6d4d80 |
| +I |           6 |                     32db6d87b0 |
| +I |           7 |                     9268b4b5cd |
| +I |           8 |                     026b54358c |
| +I |           9 |                     446cb47153 |
| +I |          10 |                     bc7950df1a |
+----+-------------+--------------------------------+

After conversion to DataStream. 
10 rows in set
After Conversion:1> +I[1, 63faf42e44]
After Conversion:2> +I[2, 6d5e0b4e5c]
After Conversion:3> +I[3, 53ecd6883a]
After Conversion:4> +I[4, 55eb56b2bf]
After Conversion:6> +I[6, 6d5feb43b2]
After Conversion:5> +I[5, 70daedf4f5]
After Conversion:7> +I[7, c1ee369d41]
After Conversion:8> +I[8, 14cbcf03d8]
After Conversion:9> +I[9, 707854f4ed]
After Conversion:10> +I[10, 86cf8d8c6b] {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to