Hi Henri,
I'll try to answer your questions:
1) You are right, SELECT DISTINCT should not need a retract stream.
Internally, this is translated into an aggregation without an aggregate
function call. So this definitely needs improvement.
2) The problem is that SELECT DISTINCT is neither officially supported nor
tested. I opened an issue for this [1].
Until this issue is fixed, I would recommend implementing a custom
aggregate function that keeps track of the values seen so far [2].
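To illustrate the "keep track of values seen so far" idea, here is a minimal
sketch in plain Java with no Flink dependency, so it can be run standalone.
The class name FirstSeenSketch and the accumulator SeenAcc are hypothetical.
The method names only mirror the createAccumulator/accumulate/getValue
contract described in [2]; a real implementation would extend Flink's
AggregateFunction and be registered with the table environment, which is not
shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Flink-free sketch of a "first occurrence" aggregate. It does NOT extend
 * Flink's AggregateFunction; it only imitates the shape of that contract.
 */
public class FirstSeenSketch {

    /** Accumulator: every key seen so far, plus a flag for the latest row. */
    public static class SeenAcc {
        final Set<List<String>> seen = new HashSet<>();
        boolean lastWasNew = false;
    }

    public SeenAcc createAccumulator() {
        return new SeenAcc();
    }

    /** Records one row; the key is a copy of the row's column values. */
    public void accumulate(SeenAcc acc, String... columns) {
        // Set.add returns true only if the key was not already present.
        acc.lastWasNew = acc.seen.add(new ArrayList<>(Arrays.asList(columns)));
    }

    /** True if the most recently accumulated row had not been seen before. */
    public Boolean getValue(SeenAcc acc) {
        return acc.lastWasNew;
    }

    public static void main(String[] args) {
        FirstSeenSketch fn = new FirstSeenSketch();
        SeenAcc acc = fn.createAccumulator();

        // The four selected columns of the three example rows from the question.
        String[][] rows = {
            {"AN1234", "RC1234", "0000-0000-0000-00000", "1234-1234-1234-1234"},
            {"AN1234", "RC1234", "0000-0000-0000-00000", "1234-1234-1234-1234"},
            {"AN5555", "RC5555", "0000-0000-0000-00001", "1234-1234-1234-1234"}
        };

        List<String> emitted = new ArrayList<>();
        for (String[] row : rows) {
            fn.accumulate(acc, row);
            if (fn.getValue(acc)) {
                emitted.add(row[0]); // forward only first occurrences
            }
        }
        System.out.println(emitted); // prints [AN1234, AN5555]
    }
}
```

Note that the set in this sketch grows without bound. In a real job the
accumulator would live in Flink state, so the idle state retention time would
decide when old keys are dropped.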
Regards,
Timo
[1] https://issues.apache.org/jira/browse/FLINK-8564
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions
On 2/6/18 at 11:11 AM, Henri Heiskanen wrote:
Hi,
I have a use case where I would like to find distinct rows over a
certain period of time. The requirement is that a new row is emitted as
soon as possible. Beyond that, the goal is mainly to filter the data so
that downstream gets a smaller dataset. I noticed that SELECT DISTINCT
with a state retention time of 12 hours would in theory do the trick. You
can find the code below. A few questions:
1) Why is SELECT DISTINCT creating a retract stream? In which
scenarios would we get update/delete rows?
2) If I run the code below with the example data (also below) without
the state retention config, I get the two append rows (expected). If I
run exactly the code below (with the retention config), I get two
appends and one delete for AN1234 and then one append for AN5555. What
is going on?
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv =
    TableEnvironment.getTableEnvironment(env);

StreamQueryConfig qConfig = tableEnv.queryConfig();
// set idle state retention time. min = max = 12 hours
qConfig.withIdleStateRetentionTime(Time.hours(12));

// create a TableSource
CsvTableSource csvSource = CsvTableSource
    .builder()
    .path("data.csv")
    .field("ts", Types.SQL_TIMESTAMP())
    .field("aid1", Types.STRING())
    .field("aid2", Types.STRING())
    .field("advertiser_id", Types.STRING())
    .field("platform_id", Types.STRING())
    .fieldDelimiter(",")
    .build();

tableEnv.registerTableSource("CsvTable", csvSource);

Table result = tableEnv.sqlQuery(
    "SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");

StdOutRetractStreamTableSink out = new StdOutRetractStreamTableSink(
    new String[] {"aid1", "aid2", "advertiser_id", "platform_id"},
    new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(),
        Types.STRING()});

result.writeToSink(out, qConfig);

env.execute();
Here is a simple CSV dataset of three rows:
2018-01-31 12:00:00,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
2018-01-31 12:00:02,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
2018-01-31 12:00:02,AN5555,RC5555,0000-0000-0000-00001,1234-1234-1234-1234,1234567891