Hi,

I have a use case where I would like to find distinct rows over a certain
period of time. The requirement is that a new row is emitted as soon as
possible. Otherwise, the requirement is mainly to filter out duplicate data
so that downstream operators get a smaller dataset. I noticed that SELECT
DISTINCT with a state retention time of 12 hours would in theory do the
trick. You can find the code below. A few questions:

1) Why is SELECT DISTINCT creating a retract stream? In which scenarios
would we get update/delete rows?

2) If I run the code below with the example data (also below) without the
state retention config, I get the two append rows (as expected). If I run
exactly the code below (with the retention config), I get two appends and
one delete for AN1234 and then one append for AN5555. What is going on?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

StreamQueryConfig qConfig = tableEnv.queryConfig();
// set idle state retention time. min = max = 12 hours
qConfig.withIdleStateRetentionTime(Time.hours(12));

// create a TableSource
CsvTableSource csvSource = CsvTableSource
    .builder()
    .path("data.csv")
    .field("ts", Types.SQL_TIMESTAMP())
    .field("aid1", Types.STRING())
    .field("aid2", Types.STRING())
    .field("advertiser_id", Types.STRING())
    .field("platform_id", Types.STRING())
    .fieldDelimiter(",")
    .build();

tableEnv.registerTableSource("CsvTable", csvSource);

Table result = tableEnv.sqlQuery(
    "SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");

StdOutRetractStreamTableSink out = new StdOutRetractStreamTableSink(
    new String[] {"aid1", "aid2", "advertiser_id", "platform_id"},
    new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()});

result.writeToSink(out, qConfig);

env.execute();
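For comparison, the intended semantics (emit a row the first time it is seen, drop repeats for 12 hours) can be modeled in plain Java. This is a hypothetical sketch, not how Flink implements SELECT DISTINCT: `TimedDeduplicator` and `offer` are made-up names, the dedup key is assumed to be the four selected columns joined with commas, and expiry is computed from the row timestamps for determinism, whereas Flink's idle state retention is driven by processing time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the intended semantics, NOT Flink code:
// a row is emitted the first time its key is seen; repeats are dropped
// until the key has been idle for longer than the retention period.
public class TimedDeduplicator {
    private final long retentionMillis;
    // key -> timestamp (ms) of the most recent sighting of that key
    private final Map<String, Long> lastSeen = new HashMap<>();

    public TimedDeduplicator(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    /** Returns true if the row with this key should be emitted downstream. */
    public boolean offer(String key, long timestampMillis) {
        // every sighting resets the idle timer for the key
        Long previous = lastSeen.put(key, timestampMillis);
        return previous == null || timestampMillis - previous > retentionMillis;
    }

    public static void main(String[] args) {
        long twelveHours = 12L * 60 * 60 * 1000;
        TimedDeduplicator dedup = new TimedDeduplicator(twelveHours);

        // (aid1, aid2, advertiser_id, platform_id) joined as the key;
        // timestamps are offsets in ms from 2018-01-31 12:00:00
        String k1 = "AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234";
        String k2 = "AN5555,RC5555,0000-0000-0000-00001,1234-1234-1234-1234";
        String[] keys = {k1, k1, k2};
        long[] offsets = {0L, 2_000L, 2_000L};

        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            if (dedup.offer(keys[i], offsets[i])) {
                emitted.add(keys[i]);
            }
        }
        // prints the AN1234 and AN5555 keys, once each
        System.out.println(emitted);
    }
}
```

Because every sighting resets the timer, a key that recurs more often than every 12 hours is never re-emitted, which matches "idle" retention rather than a fixed 12-hour window.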


Here is a simple csv dataset of three rows:

2018-01-31 12:00:00,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
2018-01-31 12:00:02,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
2018-01-31 12:00:02,AN5555,RC5555,0000-0000-0000-00001,1234-1234-1234-1234,1234567891
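To reason about the output described in question 2, it may help to see how a retract stream nets out when a consumer materializes it. In Flink, a retract stream carries (Boolean, row) pairs where true adds a row and false retracts an earlier copy of it. The sketch below is a plain-Java model, not Flink code; `RetractMaterializer` and `Change` are hypothetical names, and the order of the three AN1234 messages is an assumption, since only their counts are given above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model (not Flink code) of materializing a retract
// stream: each message is (flag, row); flag == true adds the row and
// flag == false retracts one earlier copy of it.
public class RetractMaterializer {
    static final class Change {
        final boolean add;
        final String row;

        Change(boolean add, String row) {
            this.add = add;
            this.row = row;
        }
    }

    /** Applies the changes in order and returns row -> current multiplicity. */
    static Map<String, Integer> materialize(List<Change> changes) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Change c : changes) {
            int updated = counts.getOrDefault(c.row, 0) + (c.add ? 1 : -1);
            if (updated == 0) {
                counts.remove(c.row); // fully retracted rows disappear from the result
            } else {
                counts.put(c.row, updated);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String an1234 = "AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234";
        String an5555 = "AN5555,RC5555,0000-0000-0000-00001,1234-1234-1234-1234";

        // Without the retention config: two appends.
        List<Change> withoutRetention = Arrays.asList(
            new Change(true, an1234), new Change(true, an5555));
        // With the retention config: two appends and one delete for AN1234
        // (order assumed here), then one append for AN5555.
        List<Change> withRetention = Arrays.asList(
            new Change(true, an1234), new Change(false, an1234),
            new Change(true, an1234), new Change(true, an5555));

        // prints true: both message sequences materialize to the same two rows
        System.out.println(materialize(withoutRetention).equals(materialize(withRetention)));
    }
}
```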
