Hey all, I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is reading from a few CSV files and joins some records across them into a couple of data streams (yes, this could be a batch job won't get into why we chose streams unless it's relevant). These joins are producing some duplicate records, one with the joined field present and one with the joined field as `null`, though this happens only ~25% of the time. Reading the docs on joins[1], I thought this could be caused by too strict Idle State Retention[2], so I increased that to min, max (15min, 24h) but that doesn't seem to have an effect, and the problem still occurs when testing on a subset of data that finishes processing in under a minute.
The query roughly looks like: table_1 has fields a, b table_2 has fields b, c SELECT table_1.a, table_1.b, table_1.c FROM table_1 LEFT OUTER JOIN table_2 ON table_1.b = table_2.b; Correct result: Record(a = "data a 1", b = "data b 1", c = "data c 1") Record(a = "data a 2", b = "data b 2", c = "data c 2") Results seem to be anywhere between all possible dups and the correct result. Record(a = "data a 1", b = "data b 1", c = "data c 1") Record(a = "data a 1", b = null, c = "data c 1") Record(a = "data a 2", b = "data b 2", c = "data c 2") Record(a = "data a 2", b = null, c = "data c 2") The CSV files are registered as Flink Tables with the following: tableEnv.connect( new FileSystem() .path(path) ) .withFormat( new Csv() .quoteCharacter('"') .ignoreParseErrors() ) .withSchema(schema) .inAppendMode() .createTemporaryTable(tableName); I'm creating my table environment like so: EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, tableEnvSettings); TableConfig tConfig = tEnv.getConfig(); tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24)); Is there something I'm misconfiguring or have misunderstood the docs? Thanks, Austin [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time