Oops, the example query should actually be:

SELECT table_1.a, table_1.b, table_2.c
FROM table_1
LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
And the duplicate results should actually be:

Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 1", b = "data b 1", c = null)
Record(a = "data a 2", b = "data b 2", c = "data c 2")
Record(a = "data a 2", b = "data b 2", c = null)

On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

> Hey all,
>
> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
> reading from a few CSV files and joins some records across them into a
> couple of data streams (yes, this could be a batch job, but I won't get into
> why we chose streams unless it's relevant). These joins are producing some
> duplicate records, one with the joined field present and one with the
> joined field as `null`, though this happens only ~25% of the time. Reading
> the docs on joins[1], I thought this could be caused by too strict an Idle
> State Retention[2] setting, so I increased it to (min, max) = (15 min, 24 h),
> but that doesn't seem to have an effect, and the problem still occurs when
> testing on a subset of data that finishes processing in under a minute.
>
> The query roughly looks like:
>
> table_1 has fields a, b
> table_2 has fields b, c
>
> SELECT table_1.a, table_1.b, table_1.c
> FROM table_1
> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>
> Correct result:
> Record(a = "data a 1", b = "data b 1", c = "data c 1")
> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>
> Results seem to be anywhere between all possible dups and the correct
> result.
>
> Record(a = "data a 1", b = "data b 1", c = "data c 1")
> Record(a = "data a 1", b = null, c = "data c 1")
> Record(a = "data a 2", b = "data b 2", c = "data c 2")
> Record(a = "data a 2", b = null, c = "data c 2")
>
> The CSV files are registered as Flink Tables with the following:
>
> tableEnv.connect(
>     new FileSystem()
>       .path(path)
>   )
>   .withFormat(
>     new Csv()
>       .quoteCharacter('"')
>       .ignoreParseErrors()
>   )
>   .withSchema(schema)
>   .inAppendMode()
>   .createTemporaryTable(tableName);
>
> I'm creating my table environment like so:
>
> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .build();
>
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, tableEnvSettings);
>
> TableConfig tConfig = tEnv.getConfig();
> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>
> Is there something I'm misconfiguring or have misunderstood the docs?
>
> Thanks,
> Austin
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
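The thread doesn't show how the joined table is consumed, so this is only a hypothesis to check, not a diagnosis: in a streaming LEFT OUTER JOIN, a table_1 row that arrives before its table_2 match is emitted right away with c = null, and when the match arrives that row is retracted and the joined row is emitted. If the result were read with `toRetractStream` and only the add (`true`) messages were kept, each such row would show up twice, once with c = null. That is the pattern in the corrected duplicate results, and it would only happen for arrival orders where the table_1 row comes first. The stdlib-only Java sketch below models that behaviour. `Event`, `Change`, `Row`, `leftOuterJoin`, `addsOnly`, and `materialize` are hypothetical names, not Flink APIs, and the operator is a simplification of what the Blink planner actually does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stdlib-only model of a streaming LEFT OUTER JOIN changelog.
// None of these types or methods are Flink APIs.
public class LeftJoinChangelogSketch {

    // Joined output row; c is null while the table_1 row has no table_2 match.
    record Row(String a, String b, String c) {}

    // Changelog message: isAdd = true adds a row, false retracts one
    // (assumed to mirror the Boolean flag of a Flink retract stream).
    record Change(boolean isAdd, Row row) {}

    // Input event: fromLeft = true is a table_1 row (value = a),
    // false is a table_2 row (value = c); both sides are keyed on b.
    record Event(boolean fromLeft, String b, String value) {}

    static List<Change> leftOuterJoin(List<Event> events) {
        Map<String, List<String>> leftByB = new HashMap<>();  // b -> a values seen so far
        Map<String, List<String>> rightByB = new HashMap<>(); // b -> c values seen so far
        List<Change> out = new ArrayList<>();
        for (Event e : events) {
            if (e.fromLeft()) {
                leftByB.computeIfAbsent(e.b(), k -> new ArrayList<>()).add(e.value());
                List<String> cs = rightByB.getOrDefault(e.b(), List.of());
                if (cs.isEmpty()) {
                    // No match yet: emit a null-padded row right away.
                    out.add(new Change(true, new Row(e.value(), e.b(), null)));
                }
                for (String c : cs) {
                    out.add(new Change(true, new Row(e.value(), e.b(), c)));
                }
            } else {
                List<String> cs = rightByB.computeIfAbsent(e.b(), k -> new ArrayList<>());
                boolean firstMatch = cs.isEmpty();
                cs.add(e.value());
                for (String a : leftByB.getOrDefault(e.b(), List.of())) {
                    if (firstMatch) {
                        // Retract the earlier null-padded row before emitting the joined row.
                        out.add(new Change(false, new Row(a, e.b(), null)));
                    }
                    out.add(new Change(true, new Row(a, e.b(), e.value())));
                }
            }
        }
        return out;
    }

    // Reads the changelog as if it were append-only: retractions are dropped.
    static List<Row> addsOnly(List<Change> changes) {
        List<Row> rows = new ArrayList<>();
        for (Change ch : changes) {
            if (ch.isAdd()) {
                rows.add(ch.row());
            }
        }
        return rows;
    }

    // Applies the changelog properly: each retraction removes one equal row.
    static List<Row> materialize(List<Change> changes) {
        List<Row> rows = new ArrayList<>();
        for (Change ch : changes) {
            if (ch.isAdd()) {
                rows.add(ch.row());
            } else {
                rows.remove(ch.row());
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // table_1 rows arrive before their table_2 matches.
        List<Event> leftFirst = List.of(
                new Event(true, "data b 1", "data a 1"),
                new Event(true, "data b 2", "data a 2"),
                new Event(false, "data b 1", "data c 1"),
                new Event(false, "data b 2", "data c 2"));
        List<Change> changes = leftOuterJoin(leftFirst);
        System.out.println("adds only: " + addsOnly(changes));       // 4 rows, two with c = null
        System.out.println("materialized: " + materialize(changes)); // 2 rows
    }
}
```

With the table_1-first ordering in `main`, the adds-only view has four rows (two of them with c = null), while applying the retractions leaves just the two correct rows. If the table_2 rows arrive first, no null-padded rows are emitted at all, which would fit the duplicates appearing only part of the time.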