Hi Austin,

Am I correct in assuming that you answered your own question? If not, could
you please share your current progress?
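
For the archives: a streaming LEFT OUTER JOIN is a result-updating query.
When a table_1 row arrives before its table_2 match, the join first emits a
null-padded row and retracts it once the match shows up; an append-only sink
cannot express the retraction, so both rows survive and look like duplicates.
If you ever need the outer join itself, consuming the result as a retract
stream preserves those retraction flags. A minimal sketch against your
`tEnv`, using the corrected query from downthread:

import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// The outer join is result-updating: when a table_1 row arrives before its
// table_2 match, the join emits (a, b, null) and later retracts it.
Table joined = tEnv.sqlQuery(
    "SELECT table_1.a, table_1.b, table_2.c "
        + "FROM table_1 "
        + "LEFT OUTER JOIN table_2 ON table_1.b = table_2.b");

// toRetractStream yields Tuple2<Boolean, Row>: true = add, false = retract.
// A sink that honors the flag ends up with the correct final result; an
// append-only sink keeps both the null-padded row and its replacement.
tEnv.toRetractStream(joined, Row.class).print();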

Best,

Arvid

On Thu, Aug 27, 2020 at 11:41 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Ah, I think the "Result Updating" behavior is what got me -- INNER joins
> do the job!
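>
> For completeness, a sketch of the working query: the same join with INNER
> semantics, which only ever emits a row once the match exists:
>
> SELECT table_1.a, table_1.b, table_2.c
> FROM table_1
> INNER JOIN table_2 ON table_1.b = table_2.b;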
>
> On Thu, Aug 27, 2020 at 3:38 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Oops, the example query should actually be:
>>
>> SELECT table_1.a, table_1.b, table_2.c
>> FROM table_1
>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>
>> and the duplicate results should actually be:
>>
>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>> Record(a = "data a 1", b = "data b 1", c = null)
>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>> Record(a = "data a 2", b = "data b 2", c = null)
>>
>> On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
>>> reading from a few CSV files and joining some records across them into a
>>> couple of data streams (yes, this could be a batch job; I won't get into
>>> why we chose streams unless it's relevant). These joins are producing
>>> duplicate records: one with the joined field present and one with the
>>> joined field as `null`, though this happens only ~25% of the time. Reading
>>> the docs on joins[1], I thought this could be caused by too strict an Idle
>>> State Retention[2], so I increased it to a (min, max) of (15 min, 24 h),
>>> but that doesn't seem to have any effect; the problem still occurs when
>>> testing on a subset of the data that finishes processing in under a minute.
>>>
>>> The query roughly looks like:
>>>
>>> table_1 has fields a, b
>>> table_2 has fields b, c
>>>
>>> SELECT table_1.a, table_1.b, table_1.c
>>> FROM table_1
>>> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>>>
>>> Correct result:
>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>>
>>> The results seem to land anywhere between all possible duplicates and the
>>> correct result:
>>>
>>> Record(a = "data a 1", b = "data b 1", c = "data c 1")
>>> Record(a = "data a 1", b = null, c = "data c 1")
>>> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>>> Record(a = "data a 2", b = null, c = "data c 2")
>>>
>>> The CSV files are registered as Flink Tables with the following:
>>>
>>> tableEnv.connect(
>>>     new FileSystem()
>>>         .path(path) // CSV file location
>>> )
>>>     .withFormat(
>>>         new Csv()
>>>             .quoteCharacter('"')
>>>             .ignoreParseErrors() // skip malformed rows instead of failing
>>>     )
>>>     .withSchema(schema)  // column names and types, see below
>>>     .inAppendMode()      // the table only receives insert changes
>>>     .createTemporaryTable(tableName);
>>>
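>>> For reference, `schema` is a table descriptor; a hypothetical definition
>>> matching table_1 from the example, assuming both columns are strings:
>>>
>>> Schema schema = new Schema()
>>>     .field("a", DataTypes.STRING())
>>>     .field("b", DataTypes.STRING());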
>>>
>>> I'm creating my table environment like so:
>>>
>>> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>>>     .useBlinkPlanner() // Blink planner; streaming mode is the default
>>>     .build();
>>>
>>> StreamTableEnvironment tEnv =
>>>     StreamTableEnvironment.create(env, tableEnvSettings);
>>>
>>> TableConfig tConfig = tEnv.getConfig();
>>> // keep join state for at least 15 minutes and at most 24 hours
>>> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>>>
>>>
>>> Is there something I'm misconfiguring, or have I misunderstood the docs?
>>>
>>> Thanks,
>>> Austin
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
>>> [2]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>>
>>
