Hi Austin,

Do I assume correctly that you answered your own question? If not, could you please update us on your current progress?
Best,
Arvid

On Thu, Aug 27, 2020 at 11:41 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

> Ah, I think the "Result Updating" is what got me -- INNER joins do the job!
>
> On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>
>> Oops, the example query should actually be:
>>
>> SELECT table_1.a, table_1.b, table_2.c
>> FROM table_1
>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>
>> and the duplicate results should actually be:
>>
>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>> Record(a = "data a 1", b = "data b 1", c = null)
>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>> Record(a = "data a 2", b = "data b 2", c = null)
>>
>> On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is reading from a few CSV files and joining some records across them into a couple of data streams (yes, this could be a batch job; I won't get into why we chose streams unless it's relevant). These joins are producing some duplicate records, one with the joined field present and one with the joined field as `null`, though this happens only ~25% of the time. Reading the docs on joins [1], I thought this could be caused by too-strict Idle State Retention [2], so I increased that to a min/max of (15 min, 24 h), but that doesn't seem to have any effect, and the problem still occurs when testing on a subset of the data that finishes processing in under a minute.
>>>
>>> The query roughly looks like:
>>>
>>> table_1 has fields a, b
>>> table_2 has fields b, c
>>>
>>> SELECT table_1.a, table_1.b, table_1.c
>>> FROM table_1
>>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>>
>>> Correct result:
>>>
>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>>
>>> Results seem to be anywhere between all possible duplicates and the correct result, e.g.:
>>>
>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>> Record(a = "data a 1", b = null, c = "data c 1")
>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>> Record(a = "data a 2", b = null, c = "data c 2")
>>>
>>> The CSV files are registered as Flink Tables with the following:
>>>
>>> tableEnv.connect(
>>>         new FileSystem()
>>>             .path(path))
>>>     .withFormat(
>>>         new Csv()
>>>             .quoteCharacter('"')
>>>             .ignoreParseErrors())
>>>     .withSchema(schema)
>>>     .inAppendMode()
>>>     .createTemporaryTable(tableName);
>>>
>>> I'm creating my table environment like so:
>>>
>>> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>>>     .useBlinkPlanner()
>>>     .build();
>>>
>>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, tableEnvSettings);
>>>
>>> TableConfig tConfig = tEnv.getConfig();
>>> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>>>
>>> Is there something I'm misconfiguring, or have I misunderstood the docs?
>>>
>>> Thanks,
>>> Austin
>>>
>>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
>>> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
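To make the resolution concrete, here is a minimal sketch of the two options discussed in the thread, assuming the table names and Flink 1.10 Blink-planner setup from Austin's message (the CSV table registration is elided, and the class name JoinSketch and the print() sinks are illustrative only). A regular streaming INNER JOIN emits a row only once both sides have a match, so its result is append-only; the LEFT OUTER JOIN first emits a null-padded row and later retracts it when the match arrives, which is the "Result Updating" behavior that shows up as apparent duplicates when retractions are dropped (e.g., by an append-only sink).

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class JoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // ... register table_1 and table_2 here, as in the original message ...

        // Option 1 (what Austin went with): a regular INNER JOIN emits a row
        // only when a match exists, so the result is append-only and never
        // produces the retracted null-padded rows.
        Table inner = tEnv.sqlQuery(
                "SELECT table_1.a, table_1.b, table_2.c "
                        + "FROM table_1 "
                        + "INNER JOIN table_2 ON table_1.b = table_2.b");
        tEnv.toAppendStream(inner, Row.class).print();

        // Option 2: if the outer-join semantics are needed, consume the result
        // as a retract stream. The Boolean flag in each Tuple2<Boolean, Row> is
        // true for an insert and false for the retraction of a previously
        // emitted row; downstream logic must apply both to reach the final result.
        Table outer = tEnv.sqlQuery(
                "SELECT table_1.a, table_1.b, table_2.c "
                        + "FROM table_1 "
                        + "LEFT OUTER JOIN table_2 ON table_1.b = table_2.b");
        tEnv.toRetractStream(outer, Row.class).print();

        env.execute("join-sketch");
    }
}

Note that calling toAppendStream on the outer-join result would fail, since that result is not append-only; the duplicates in the original report come from the retraction messages being lost on the way to the output.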