oops, the example query should actually be:

SELECT table_1.a, table_1.b, table_2.c
FROM table_1
LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;

and duplicate results should actually be:

Record(a = "data a 1", b = "data b 1", c = "data c 1")
Record(a = "data a 1", b = "data b 1", c = null)
Record(a = "data a 2", b = "data b 2", c = "data c 2")
Record(a = "data a 2", b = "data b 2", c = null)

On Thu, Aug 27, 2020 at 3:34 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey all,
>
> I've got a Flink 1.10 Streaming SQL job using the Blink Planner that is
> reading from a few CSV files and joins some records across them into a
> couple of data streams (yes, this could be a batch job won't get into why
> we chose streams unless it's relevant). These joins are producing some
> duplicate records, one with the joined field present and one with the
> joined field as `null`, though this happens only ~25% of the time. Reading
> the docs on joins[1], I thought this could be caused by too strict Idle
> State Retention[2], so I increased that to min, max (15min, 24h) but that
> doesn't seem to have an effect, and the problem still occurs when testing
> on a subset of data that finishes processing in under a minute.
>
> The query roughly looks like:
>
> table_1 has fields a, b
> table_2 has fields b, c
>
> SELECT table_1.a, table_1.b, table_1.c
> FROM table_1
> LEFT OUTER JOIN table_2 ON table_1.b = table_2.b;
>
> Correct result:
> Record(a = "data a 1", b = "data b 1", c = "data c 1")
> Record(a = "data a 2", b = "data b 2", c = "data c 2")
>
> Results seem to be anywhere between all possible dups and the correct
> result.
>
> Record(a = "data a 1", b = "data b 1", c = "data c 1")
> Record(a = "data a 1", b = null, c = "data c 1")
> Record(a = "data a 2", b = "data b 2", c = "data c 2")
> Record(a = "data a 2", b = null, c = "data c 2")
>
> The CSV files are registered as Flink Tables with the following:
>
> tableEnv.connect(
>     new FileSystem()
>         .path(path)
> )
>     .withFormat(
>         new Csv()
>             .quoteCharacter('"')
>             .ignoreParseErrors()
>     )
>     .withSchema(schema)
>     .inAppendMode()
>     .createTemporaryTable(tableName);
>
>
> I'm creating my table environment like so:
>
> EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>     .useBlinkPlanner()
>     .build();
>
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> tableEnvSettings);
>
> TableConfig tConfig = tEnv.getConfig();
> tConfig.setIdleStateRetentionTime(Time.minutes(15), Time.hours(24));
>
>
> Is there something I'm misconfiguring or have misunderstood the docs?
>
> Thanks,
> Austin
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
>

Reply via email to