Ah, I think the "Result Updating" is what got me -- INNER joins do the job!
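
A rough sketch of what "Result Updating" can mean for this query, as a toy model in plain Java rather than Flink code. The class `JoinChangelogSketch`, its `Change` type, and the `appendOnly`/`materialize` consumers are hypothetical names made up for illustration. The model assumes that a streaming LEFT OUTER JOIN emits a null-padded row for a left record with no match yet and retracts it once a match arrives, so a consumer that ignores retractions keeps both rows. An INNER join never emits the null-padded row, so its output only ever appends.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types come from Flink.
final class JoinChangelogSketch {

    // One changelog message: add == true inserts a row, add == false retracts it.
    static final class Change {
        final boolean add;
        final String row;
        Change(boolean add, String row) { this.add = add; this.row = row; }
    }

    static String row(String a, String b, String c) {
        return "Record(a = " + a + ", b = " + b + ", c = " + c + ")";
    }

    // Toy streaming join on b. Each event is {side, b, value}:
    // side "L" carries table_1.a, side "R" carries table_2.c.
    // With outer == true, an unmatched left row is emitted null-padded
    // and retracted when its first match arrives.
    static List<Change> join(String[][] events, boolean outer) {
        Map<String, List<String>> left = new HashMap<>();
        Map<String, List<String>> right = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String[] e : events) {
            String b = e[1], value = e[2];
            if (e[0].equals("L")) {
                left.computeIfAbsent(b, k -> new ArrayList<>()).add(value);
                List<String> matches = right.getOrDefault(b, Collections.emptyList());
                if (matches.isEmpty() && outer) out.add(new Change(true, row(value, b, null)));
                for (String c : matches) out.add(new Change(true, row(value, b, c)));
            } else {
                List<String> seen = right.computeIfAbsent(b, k -> new ArrayList<>());
                boolean firstMatch = seen.isEmpty();
                seen.add(value);
                for (String a : left.getOrDefault(b, Collections.emptyList())) {
                    if (firstMatch && outer) out.add(new Change(false, row(a, b, null)));
                    out.add(new Change(true, row(a, b, value)));
                }
            }
        }
        return out;
    }

    // A consumer that keeps every insert and drops retractions (append-only view).
    static List<String> appendOnly(List<Change> changes) {
        List<String> rows = new ArrayList<>();
        for (Change c : changes) if (c.add) rows.add(c.row);
        return rows;
    }

    // A consumer that applies retractions (updating view).
    static List<String> materialize(List<Change> changes) {
        List<String> rows = new ArrayList<>();
        for (Change c : changes) {
            if (c.add) rows.add(c.row); else rows.remove(c.row);
        }
        return rows;
    }

    public static void main(String[] args) {
        String[][] leftFirst = {{"L", "b1", "a1"}, {"R", "b1", "c1"}};
        String[][] rightFirst = {{"R", "b1", "c1"}, {"L", "b1", "a1"}};
        System.out.println(appendOnly(join(leftFirst, true)));   // outer join, retractions ignored
        System.out.println(materialize(join(leftFirst, true)));  // outer join, retractions applied
        System.out.println(appendOnly(join(rightFirst, true)));  // outer join, match already present
        System.out.println(appendOnly(join(leftFirst, false)));  // inner join
    }
}
```

In this model the null-padded duplicate appears only when the table_1 record is processed before its table_2 match, which would be consistent with duplicates showing up for only some of the records.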

On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
> oops, the example query should actually be:
>
> SELECT table_1.a, table_1.b, table_2.c
> FROM table_1
> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>
> and duplicate results should actually be:
>
> Record(a = "data a 1", b = "data b 1", c = "data c 1")
> Record(a = "data a 1", b = "data b 1", c = null)
> Record(a = "data a 2", b = "data b 2", c = "data c 2")
> Record(a = "data a 2", b = "data b 2", c = null)
>
> On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>
>> Hey all,
>>
>> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that
>> reads from a few CSV files and joins some records across them into a
>> couple of data streams (yes, this could be a batch job; I won't get into
>> why we chose streams unless it's relevant). These joins are producing some
>> duplicate records, one with the joined field present and one with the
>> joined field as `null`, though this happens only ~25% of the time. Reading
>> the docs on joins[1], I thought this could be caused by too strict an Idle
>> State Retention[2], so I increased that to (min, max) = (15min, 24h), but
>> that doesn't seem to have an effect, and the problem still occurs when
>> testing on a subset of data that finishes processing in under a minute.
>>
>> The query roughly looks like:
>>
>> table_1 has fields a, b
>> table_2 has fields b, c
>>
>> SELECT table_1.a, table_1.b, table_1.c
>> FROM table_1
>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>
>> Correct result:
>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>
>> Results seem to be anywhere between all possible dups and the correct
>> result.
>>
>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>> Record(a = "data a 1", b = null, c = "data c 1")
>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>> Record(a = "data a 2", b = null, c = "data c 2")
>>
>> The CSV files are registered as Flink Tables with the following:
>>
>> tableEnv.connect(
>>     new FileSystem()
>>         .path(path)
>> )
>>     .withFormat(
>>         new Csv()
>>             .quoteCharacter('"')
>>             .ignoreParseErrors()
>>     )
>>     .withSchema(schema)
>>     .inAppendMode()
>>     .createTemporaryTable(tableName);
>>
>> I'm creating my table environment like so:
>>
>> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>>     .useBlinkPlanner()
>>     .build();
>>
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, tableEnvSettings);
>>
>> TableConfig tConfig = tEnv.getConfig();
>> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>>
>> Is there something I'm misconfiguring, or have I misunderstood the docs?
>>
>> Thanks,
>> Austin
>>
>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
>> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time