Hey Arvid,

Yes, I was able to self-answer this one. I was just confused by the non-deterministic behavior of the FULL OUTER JOIN statement. After thinking it through and taking a harder read of the Dynamic Tables doc section[1], where "Result Updating" is hinted at, the behavior makes total sense in a streaming environment.
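In case it helps anyone reading this thread later, here's a minimal, self-contained sketch of how the updating behavior can be observed directly via toRetractStream. It assumes the same Flink 1.10 / Blink planner setup as in the quoted emails below; the in-memory example tables, field names, and class name are illustrative stand-ins, not the actual job:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractJoinExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Tiny in-memory tables standing in for the CSV sources.
        tEnv.createTemporaryView("table_1",
            tEnv.fromDataStream(env.fromElements(Tuple2.of("data a 1", "data b 1")), "a, b"));
        tEnv.createTemporaryView("table_2",
            tEnv.fromDataStream(env.fromElements(Tuple2.of("data b 1", "data c 1")), "b, c"));

        Table joined = tEnv.sqlQuery(
            "SELECT table_1.a, table_1.b, table_2.c "
                + "FROM table_1 "
                + "LEFT OUTER JOIN table_2 ON table_1.b = table_2.b");

        // toRetractStream exposes the result updates: f0 == false marks a
        // retraction. If table_1's row happens to be processed before its
        // match from table_2, you may see:
        //   (true,  data a 1, data b 1, null)      <- initial null-padded result
        //   (false, data a 1, data b 1, null)      <- retracted once the match arrives
        //   (true,  data a 1, data b 1, data c 1)  <- updated result
        // An append-only sink ignores the retraction and keeps both inserts,
        // which is exactly the "duplicate" pattern below. Whether the
        // intermediate row shows up at all depends on arrival order, hence
        // the non-determinism.
        tEnv.toRetractStream(joined, Row.class).print();

        env.execute("retract-join-example");
    }
}

Since our sink is append-only, the retractions were dropped and both inserts survived, which is why switching to an INNER join (which never emits the intermediate null-padded rows) resolved it for us.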
Thanks,
Austin

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/dynamic_tables.html

On Mon, Aug 31, 2020 at 5:16 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Austin,
>
> Do I assume correctly that you self-answered your question? If not, could you please update your current progress?
>
> Best,
>
> Arvid
>
> On Thu, Aug 27, 2020 at 11:41 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>
>> Ah, I think the "Result Updating" is what got me -- INNER joins do the job!
>>
>> On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>>
>>> Oops, the example query should actually be:
>>>
>>> SELECT table_1.a, table_1.b, table_2.c
>>> FROM table_1
>>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>>
>>> and the duplicate results should actually be:
>>>
>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>> Record(a = "data a 1", b = "data b 1", c = null)
>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>> Record(a = "data a 2", b = "data b 2", c = null)
>>>
>>> On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>>>
>>>> Hey all,
>>>>
>>>> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that reads from a few CSV files and joins some records across them into a couple of data streams (yes, this could be a batch job; I won't get into why we chose streams unless it's relevant). These joins are producing some duplicate records: one with the joined field present and one with the joined field as `null`, though this happens only ~25% of the time. Reading the docs on joins[1], I thought this could be caused by too-strict Idle State Retention[2], so I increased that to a (min, max) of (15 min, 24 h), but that doesn't seem to have an effect, and the problem still occurs when testing on a subset of data that finishes processing in under a minute.
>>>>
>>>> The query roughly looks like this:
>>>>
>>>> table_1 has fields a, b
>>>> table_2 has fields b, c
>>>>
>>>> SELECT table_1.a, table_1.b, table_1.c
>>>> FROM table_1
>>>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>>>
>>>> Correct result:
>>>>
>>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>>>
>>>> Results seem to be anywhere between all possible dups and the correct result:
>>>>
>>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>>> Record(a = "data a 1", b = null, c = "data c 1")
>>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>>> Record(a = "data a 2", b = null, c = "data c 2")
>>>>
>>>> The CSV files are registered as Flink Tables with the following:
>>>>
>>>> tableEnv.connect(
>>>>         new FileSystem()
>>>>             .path(path))
>>>>     .withFormat(
>>>>         new Csv()
>>>>             .quoteCharacter('"')
>>>>             .ignoreParseErrors())
>>>>     .withSchema(schema)
>>>>     .inAppendMode()
>>>>     .createTemporaryTable(tableName);
>>>>
>>>> I'm creating my table environment like so:
>>>>
>>>> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>>>>     .useBlinkPlanner()
>>>>     .build();
>>>>
>>>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, tableEnvSettings);
>>>>
>>>> TableConfig tConfig = tEnv.getConfig();
>>>> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>>>>
>>>> Is there something I'm misconfiguring, or have I misunderstood the docs?
>>>>
>>>> Thanks,
>>>> Austin
>>>>
>>>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
>>>> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany