lajith created FLINK-38580:
------------------------------
Summary: Inconsistent data between Table API and DataStream API
when using datagen connector
Key: FLINK-38580
URL: https://issues.apache.org/jira/browse/FLINK-38580
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.20.2
Reporter: lajith
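When a Table backed by the datagen connector is converted to the DataStream API
with toDataStream(), the generated values differ from those returned by
executing the Table directly. In the output below, the sequence field (id)
matches in both results, but every value of the random field (name) is
different. This inconsistency only occurs with the datagen connector and not
with other connectors like Kafka.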
Steps to Reproduce:
1. Create a Table using SQL with the datagen connector
2. Execute the Table and observe the generated data
3. Convert the same Table to DataStream using toDataStream()
4. Execute the DataStream and observe the data
5. Notice that the data values are different between steps 2 and 4
Sample Code:
{noformat}
// Step 1: Set up environments
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Step 2: Create the datagen source table
tableEnv.executeSql(
    "CREATE TABLE datagen_source ("
        + "  id INT,"
        + "  name STRING"
        + ") WITH ("
        + "  'connector' = 'datagen',"
        + "  'rows-per-second' = '5',"
        + "  'fields.id.kind' = 'sequence',"
        + "  'fields.id.start' = '1',"
        + "  'fields.id.end' = '10',"
        + "  'fields.name.length' = '10'"
        + ")");

// Step 3: Run a SQL query against the table
Table table = tableEnv.sqlQuery("SELECT * FROM datagen_source");
TableResult result = table.execute();

// Step 4: Print the Table result
result.print();

// Step 5: Convert to DataStream
DataStream<Row> dataStream = tableEnv.toDataStream(table);

// Step 6: Print the DataStream (after conversion)
dataStream.print("After Conversion");

// Step 7: Execute the job
env.execute("Datagen SQL to DataStream Example");
{noformat}
Observed Behavior:
{noformat}
Table results:
+----+-------------+--------------------------------+
| op |          id |                           name |
+----+-------------+--------------------------------+
| +I |           1 |                     0deb105a16 |
| +I |           2 |                     badce9aae9 |
| +I |           3 |                     e47ab6e948 |
| +I |           4 |                     ce6ff2d766 |
| +I |           5 |                     375c6d4d80 |
| +I |           6 |                     32db6d87b0 |
| +I |           7 |                     9268b4b5cd |
| +I |           8 |                     026b54358c |
| +I |           9 |                     446cb47153 |
| +I |          10 |                     bc7950df1a |
+----+-------------+--------------------------------+
10 rows in set

After conversion to DataStream:
After Conversion:1> +I[1, 63faf42e44]
After Conversion:2> +I[2, 6d5e0b4e5c]
After Conversion:3> +I[3, 53ecd6883a]
After Conversion:4> +I[4, 55eb56b2bf]
After Conversion:6> +I[6, 6d5feb43b2]
After Conversion:5> +I[5, 70daedf4f5]
After Conversion:7> +I[7, c1ee369d41]
After Conversion:8> +I[8, 14cbcf03d8]
After Conversion:9> +I[9, 707854f4ed]
After Conversion:10> +I[10, 86cf8d8c6b]
{noformat}