Hi Marco,

Sorry for the late reply. Have you looked into user-defined aggregate functions for SQL? I think your requirements can be implemented easily with them. You can declare multiple aggregate functions per window. There is also the built-in function LISTAGG, which might help for your use case. Beyond that, Flink SQL aggregate functions support arbitrary data types (e.g. arrays as the result type).
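
To make the idea of an aggregate with a non-scalar result more concrete, here is a minimal plain-Java sketch of the accumulator contract (createAccumulator / accumulate / getValue) that Flink's Table API `AggregateFunction<T, ACC>` follows. The class name `CollectPurchases` and its fields are hypothetical, and the sketch deliberately has no Flink dependency so that it runs on its own. To use it in SQL, the class would have to extend `org.apache.flink.table.functions.AggregateFunction` and be registered as a function; depending on the Flink version, the result type may also need to be declared explicitly.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the accumulator pattern behind a user-defined
// aggregate function. Hypothetical names; not tied to any Flink version.
final class CollectPurchases {

    /** Mutable accumulator: conceptually one instance per group and window. */
    static final class Acc {
        final List<Double> purchases = new ArrayList<>();
        double total = 0.0;
    }

    /** Called once when a new group/window starts. */
    Acc createAccumulator() {
        return new Acc();
    }

    /** Called for every input row of the group; NULL inputs are skipped. */
    void accumulate(Acc acc, Double purchase) {
        if (purchase == null) {
            return;
        }
        acc.purchases.add(purchase);
        acc.total += purchase;
    }

    /** Called when the window fires; the result is a list, not a scalar. */
    List<Double> getValue(Acc acc) {
        return new ArrayList<>(acc.purchases);
    }
}
```

With such a function, all rows of one window end up in a single result value per group, so no second window is needed just to re-combine them.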

Regarding `do I need to wait another 15 minutes to aggregate this`: This is another example of why event time is important. Ideally, you would like to process the data faster than wall-clock time allows. If your example ran in event time, a watermark would be emitted after window 1 has been processed, and this watermark would immediately trigger the second window as well, without the need to wait another 15 minutes in processing time.
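
The watermark behaviour described above can be illustrated with a small plain-Java simulation. This is only a sketch and not Flink's API: the class `EventTimeWindowSketch` and its methods are made up for illustration, and it assumes non-negative timestamps in milliseconds and 15-minute tumbling windows. A window fires as soon as the watermark reaches the window's last timestamp, independent of the wall clock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of an event-time tumbling window operator (hypothetical, not Flink).
final class EventTimeWindowSketch {
    static final long WINDOW_MS = 15 * 60 * 1000L;

    // Open windows keyed by window start; the value is the running sum.
    private final TreeMap<Long, Double> open = new TreeMap<>();

    /** Assigns the value to the tumbling window that contains eventTimeMs. */
    void add(long eventTimeMs, double value) {
        long start = eventTimeMs - (eventTimeMs % WINDOW_MS);
        open.merge(start, value, Double::sum);
    }

    /** Fires every window whose last timestamp (end - 1) is covered by the watermark. */
    List<Map.Entry<Long, Double>> advanceWatermark(long watermarkMs) {
        List<Map.Entry<Long, Double>> fired = new ArrayList<>();
        while (!open.isEmpty() && open.firstKey() + WINDOW_MS - 1 <= watermarkMs) {
            fired.add(open.pollFirstEntry());
        }
        return fired;
    }
}

final class ChainedWindowsDemo {
    public static void main(String[] args) {
        EventTimeWindowSketch first = new EventTimeWindowSketch();
        EventTimeWindowSketch second = new EventTimeWindowSketch();
        first.add(60_000L, 5.0);
        first.add(120_000L, 7.0);

        long watermark = 900_000L; // e.g. derived from a later event's timestamp
        for (Map.Entry<Long, Double> r : first.advanceWatermark(watermark)) {
            // A window result carries the window's last timestamp (end - 1).
            second.add(r.getKey() + EventTimeWindowSketch.WINDOW_MS - 1, r.getValue());
        }
        // The same watermark is forwarded downstream and fires the second window at once.
        System.out.println(second.advanceWatermark(watermark).size());
    }
}
```

In this model both stages fire from the same watermark, which is the reason no additional 15 minutes of processing time are needed for the second window.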

I hope this helps.

Regards,
Timo

On 12.12.20 01:38, Marco Villalobos wrote:
Alright, maybe my example needs to be more concrete. How about this:
In this example, I don't want to create two windows just to re-combine what was just aggregated in SQL. Is there a way to transform the aggregate results into one datastream object so that I don't have to aggregate again?


// aggregate this stream for 15 minutes
final Table employeeDailyPurchasesTable = tableEnv.sqlQuery("SELECT\n" +
        " t.organization_id, t.department_id, s.date, s.employee_id, t.fullName, t.dob, SUM(s.purchase) AS purchases\n" +
        "FROM\n" +
        " employee_purchases s\n" +
        "LEFT JOIN\n" +
        " employees FOR SYSTEM_TIME AS OF s.procTime AS t ON t.organization = s.organization AND t.department = s.department AND t.employee_id = s.employee_id\n" +
        "GROUP BY\n" +
        " TUMBLE(s.procTime, INTERVAL '15' MINUTE), t.organization_id, t.department_id, s.date, s.employee_id, t.fullName, t.dob");

// now I want everything that was just aggregated processed together,
// below gives me each row again in a stream
final DataStream<Row> employeeDailyPurchasesDataStream =
        tableEnv.toAppendStream(employeeDailyPurchasesTable, Row.class);

// so, do I need to wait another 15 minutes to aggregate this? It was just aggregated for 15 minutes above!
// how do I get the previous aggregated results into one object so that I don't have to wait and aggregate it again
final DataStream<DailyEmployeePurchases> aggregatedAgainBecauseINeedHelp =
        employeeDailyPurchasesDataStream
                .keyBy(0, 1, 2)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(15)))
                .aggregate(new AggregateFunction<Row, DailyEmployeePurchases, DailyEmployeePurchases>() {

                    @Override
                    public DailyEmployeePurchases createAccumulator() {
                        return new DailyEmployeePurchases();
                    }

                    @Override
                    public DailyEmployeePurchases add(Row value, DailyEmployeePurchases accumulator) {
                        return accumulator.add(value);
                    }

                    @Override
                    public DailyEmployeePurchases getResult(DailyEmployeePurchases accumulator) {
                        return accumulator;
                    }

                    @Override
                    public DailyEmployeePurchases merge(DailyEmployeePurchases a, DailyEmployeePurchases b) {
                        return a.merge(b);
                    }
                });

// important business logic that needs to be applied to the group of employees
aggregatedAgainBecauseINeedHelp
        .keyBy("organizationId", "departmentId")
        .process(new KeyedProcessFunction<Tuple, DailyEmployeePurchases, DailyEmployeePurchases>() {

            @Override
            public void processElement(DailyEmployeePurchases value, Context ctx,
                    Collector<DailyEmployeePurchases> out) throws Exception {
                // very important stuff here
            }
        });



