// Parse each raw JSON string into a DataSource, stamp it with its event time in
// epoch milliseconds, and keep only records whose region/province/city/district,
// code and category1-3 ids are all positive and whose union_id is non-null.
// NOTE(review): a null parse result or an unparseable log_creation_time is not
// handled here and fails the map step - confirm upstream guarantees well-formed input.
DataStream<DataSource> skuDataStream = stream.map(
        (MapFunction<String, DataSource>) s -> {
            DataSource ret = JSONObject.parseObject(s, DataSource.class);
            // event_ts (epoch millis) is what the table schema turns into the
            // rowtime column via TO_TIMESTAMP_LTZ(event_ts, 3).
            ret.setEvent_ts(dateTime.parse(ret.getLog_creation_time(), pattern).getMillis());
            return ret;
        })
    .filter((FilterFunction<DataSource>) rec ->
        rec.getregion_id() > 0
            && rec.getprovince_id() > 0
            && rec.getCity_id() > 0
            && rec.getDistrict_id() > 0
            && rec.getCode() > 0
            && rec.getCategory1_id() > 0
            && rec.getCategory2_id() > 0
            && rec.getCategory3_id() > 0
            && rec.getUnion_id() != null);
// Table schema for skuDataStream, registered as the temporary view "source_table".
// Physical columns are declared explicitly. With an explicit physical-column list,
// undeclared POJO fields are not exposed, so event_ts must be listed for the
// row_ltz computed column to resolve it.
Schema schema = Schema.newBuilder()
    .column("code", "bigint")
    .column("flow_source", "string")
    .column("origin_sku_id", "bigint")
    .column("sku_id", "bigint")
    .column("region_id", "bigint")
    .column("province_id", "bigint")
    .column("city_id", "bigint")
    .column("district_id", "bigint")
    .column("category1_id", "bigint")
    .column("category2_id", "bigint")
    .column("category3_id", "bigint")
    .column("union_id", "string")
    .column("channel1_id", "string")
    .column("channel2_id", "string")
    // Epoch millis set by the map step (getMillis()).
    // NOTE(review): assumes the POJO field event_ts is long/Long - confirm.
    .column("event_ts", "bigint")
    // Rowtime attribute derived from event_ts (millisecond precision).
    .columnByExpression("row_ltz",
        Expressions.callSql("TO_TIMESTAMP_LTZ(event_ts, 3)"))
    // Watermark trails the max seen row_ltz by 10 seconds (allowed out-of-orderness).
    .watermark("row_ltz", "row_ltz - INTERVAL '10' SECOND")
    .build();
Table inputTable = tableEnv.fromDataStream(skuDataStream, schema);
tableEnv.createTemporaryView("source_table", inputTable);
// DDL for the print-connector sink that receives the windowed distinct-count (uv) rows.
// Window bounds are stored as strings; every dimension column is a nullable bigint.
String sink = String.join("\n",
    "CREATE TABLE sink_table (",
    " window_start string,",
    " window_end string,",
    " region_id bigint,",
    " province_id bigint,",
    " city_id bigint,",
    " district_id bigint,",
    " code bigint,",
    " category1_id bigint,",
    " category2_id bigint,",
    " category3_id bigint,",
    " uv bigint,",
    " uv1 bigint,",
    " uv2 bigint,",
    " uv3 bigint,",
    " uv4 bigint,",
    " uv5 bigint,",
    " uv6 bigint",
    ") WITH (",
    " 'connector' = 'print'",
    ")");
// Continuous INSERT into sink_table. It uses CUMULATE windows on the row_ltz rowtime
// attribute (10-minute step, 1-day max size). For each window and each grouping set
// it counts distinct union_id overall (uv) and under several channel filters (uv1-uv6).
// Dimension columns not in a row's grouping set are emitted as NULL.
// The statement has no trailing ';': executeSql() takes exactly one statement and the
// Flink SQL parser may reject the terminator.
String sql =
    "insert into sink_table \n"
        + "SELECT \n"
        + "cast(window_start as string) as window_start, \n"
        + "cast(window_end as string) as window_end, \n"
        + "region_id, \n"
        + "province_id, \n"
        + "city_id, \n"
        + "district_id, \n"
        + "code, \n"
        + "category1_id, \n"
        + "category2_id, \n"
        + "category3_id, \n"
        + "count(distinct union_id) as uv, \n"
        + "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id in ('1', '2')) AS uv1, \n"
        + "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='1') AS uv2, \n"
        + "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='3') AS uv3, \n"
        + "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='1' and channel2_id='1') AS uv4, \n"
        + "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='3' and channel2_id='1') AS uv5, \n"
        + "COUNT(DISTINCT union_id) FILTER (WHERE channel1_id ='1' and origin_sku_id = sku_id) AS uv6 \n"
        + "FROM TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_ltz), INTERVAL '10' MINUTES, INTERVAL '1' DAY)) \n"
        // window_start/window_end must appear in GROUP BY for a window aggregation.
        + "GROUP BY window_start, window_end,"
        + "GROUPING SETS (\n"
        // Grouping sets that include code.
        + "(code,region_id,province_id,city_id,district_id,category1_id,category2_id,category3_id),\n"
        + "(code,region_id,province_id,city_id,district_id,category1_id,category2_id),\n"
        + "(code,region_id,province_id,city_id,district_id,category1_id),\n"
        + "\n"
        + "(code,region_id,province_id,city_id,category1_id,category2_id,category3_id),\n"
        + "(code,region_id,province_id,category1_id,category2_id,category3_id),\n"
        + "(code,region_id,category1_id,category2_id,category3_id),\n"
        + "\n"
        + "(code,region_id,province_id,city_id,district_id),\n"
        + "(code,region_id,province_id,city_id),\n"
        + "(code,region_id,province_id),\n"
        + "(code,region_id),\n"
        + "\n"
        + "(code,category1_id,category2_id,category3_id),\n"
        + "(code,category1_id,category2_id),\n"
        + "(code,category1_id),\n"
        + "\n"
        + "(code),\n"
        + "\n"
        + "\n"
        // The same combinations without code, ending with the grand total ().
        + "(region_id,province_id,city_id,district_id,category1_id,category2_id,category3_id),\n"
        + "(region_id,province_id,city_id,district_id,category1_id,category2_id),\n"
        + "(region_id,province_id,city_id,district_id,category1_id),\n"
        + "\n"
        + "(region_id,province_id,city_id,category1_id,category2_id,category3_id),\n"
        + "(region_id,province_id,category1_id,category2_id,category3_id),\n"
        + "(region_id,category1_id,category2_id,category3_id),\n"
        + "\n"
        + "(region_id,province_id,city_id,district_id),\n"
        + "(region_id,province_id,city_id),\n"
        + "(region_id,province_id),\n"
        + "(region_id),\n"
        + "\n"
        + "(category1_id,category2_id,category3_id),\n"
        + "(category1_id,category2_id),\n"
        + "(category1_id),\n"
        + "()\n"
        + ")";
// Create the sink table before submitting the INSERT that writes into it.
tableEnv.executeSql(sink);
// NOTE(review): executeSql on an INSERT submits the job and returns without waiting;
// when running locally, the returned TableResult may need await() - confirm for this deployment.
tableEnv.executeSql(sql);
}