zhangbin created FLINK-27418:
--------------------------------
Summary: Flink SQL TopN result is wrong
Key: FLINK-27418
URL: https://issues.apache.org/jira/browse/FLINK-27418
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.14.3, 1.12.2
Environment: Flink 1.12.2 and Flink 1.14.3 (on both versions, test results are sometimes wrong)
Reporter: zhangbin
Running the same Flink SQL TopN query several times produces different results: sometimes the result is correct and sometimes it is incorrect. Example:
@Test public void testFlinkSqlJoinRetract() { EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); streamEnv.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings); tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000)); RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo(); RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo(); SourceFunction<Row> waybillSourceFunction = buildWaybillStreamSource(waybillTableTypeInfo); SourceFunction<Row> itemSourceFunction = buildItemStreamSource(itemTableTypeInfo); String waybillTable = "waybill"; String itemTable = "item"; DataStreamSource<Row> waybillStream = streamEnv.addSource( waybillSourceFunction, waybillTable, waybillTableTypeInfo); DataStreamSource<Row> itemStream = streamEnv.addSource( itemSourceFunction, itemTable, itemTableTypeInfo); Expression[] waybillFields = ExpressionParser .parseExpressionList(String.join(",", waybillTableTypeInfo.getFieldNames()) + ",proctime.proctime").toArray(new Expression[0]); Expression[] itemFields = ExpressionParser .parseExpressionList( String.join(",", itemTableTypeInfo.getFieldNames()) + ",proctime.proctime") .toArray(new Expression[0]); tableEnv.createTemporaryView(waybillTable, waybillStream, waybillFields); tableEnv.createTemporaryView(itemTable, itemStream, itemFields); String sql = "select \n" + " city_id, \n" + " count(*) as cnt\n" + "from (\n" + " select id,city_id\n" + " from (\n" + " select \n" + " id,\n" + " city_id,\n" + " row_number() over(partition by id order by utime desc ) as rno \n" + " from (\n" + " select \n" + " waybill.id as id,\n" + " coalesce(item.city_id, waybill.city_id) as city_id,\n" + " 
waybill.utime as utime \n" + " from waybill left join item \n" + " on waybill.id = item.id \n" + " ) \n" + " )\n" + " where rno =1\n" + ")\n" + "group by city_id"; StatementSet statementSet = tableEnv.createStatementSet(); Table table = tableEnv.sqlQuery(sql); DataStream<Tuple2<Boolean, Row>> rowDataStream = tableEnv.toRetractStream(table, Row.class); rowDataStream.printToErr(); try { streamEnv.execute(); } catch (Exception e) { e.printStackTrace(); } } private static RowTypeInfo buildWaybillTableTypeInfo() { TypeInformation[] types = new TypeInformation[]{Types.INT(), Types.STRING(), Types.LONG(), Types.LONG()}; String[] fields = new String[]{"id", "city_id", "rider_id", "utime"}; return new RowTypeInfo(types, fields); } private static RowTypeInfo buildItemTableTypeInfo() { TypeInformation[] types = new TypeInformation[]{Types.INT(), Types.STRING(), Types.LONG()}; String[] fields = new String[]{"id", "city_id", "utime"}; return new RowTypeInfo(types, fields); } //id,city_id,rider_id,utime private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo rowTypeInfo) { return new SourceFunction<Row>() { private volatile boolean stopped = false; int count = 0; int[] ids = {111, 222, 333, 111}; String[] cityIds = {"A", "A", "B", "A"}; @Override public void run(SourceContext<Row> ctx) throws Exception { while (!stopped) { int id = ids[count % ids.length]; String cityId = cityIds[count % cityIds.length]; Row row = new Row(4); row.setField(0, id); row.setField(1, cityId); row.setField(2, (long) RandomUtils.nextInt(1000, 2000)); row.setField(3, System.currentTimeMillis()); printRow(rowTypeInfo, row); ctx.collect(row); if (++count > 3) { stopped = true; } } } @Override public void cancel() { stopped = true; } }; } //id,city_id,utime private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo rowTypeInfo) { return new SourceFunction<Row>() { private volatile boolean stopped = false; int count = 0; int[] ids = {111, 333}; String[] cityIds = 
{"C", "D"}; @Override public void run(SourceContext<Row> ctx) throws Exception { while (!stopped) { Thread.sleep(RandomUtils.nextInt(1000, 2000)); int id = ids[count % ids.length]; String cityId = cityIds[count % cityIds.length]; Row row = new Row(3); row.setField(0, id); row.setField(1, cityId); //row.setField(2, System.currentTimeMillis()); printRow(rowTypeInfo, row); ctx.collect(row); if (++count >= 2) { stopped = true; } } } @Override public void cancel() { stopped = true; } }; } public static void printRow(RowTypeInfo rowTypeInfo, Row row) { String prefix = ""; for (int i = 0; i < rowTypeInfo.getArity(); ++i) { prefix = i > 0 ? "," : ""; System.out.print(prefix + rowTypeInfo.getFieldNames()[i] + ":" + row.getField(i)); } System.out.println(); }
------------------------------------------------------------
Wrong result:
id:111,city_id:A,rider_id:1137,utime:1650979957702 id:222,city_id:A,rider_id:1976,utime:1650979957725 id:333,city_id:B,rider_id:1916,utime:1650979957725 id:111,city_id:A,rider_id:1345,utime:1650979957725 (true,A,1) (false,A,1) (true,A,2) (true,B,1) (false,A,2) (true,A,1) (false,A,1) (true,A,2) id:111,city_id:C,utime:null (false,A,2) (true,A,1) (true,C,1) (false,A,1) (false,C,1) (true,C,2) id:333,city_id:D,utime:null (false,B,1) (true,D,1)
The final result C,2 D,1 is wrong: after item 111/C arrives, the count for city A is retracted to zero and city C is counted twice.
------------------------------------------
Correct result:
------------------------------------------
id:111,city_id:A,rider_id:1155,utime:1650980662019 id:222,city_id:A,rider_id:1875,utime:1650980662042 id:333,city_id:B,rider_id:1430,utime:1650980662042 id:111,city_id:A,rider_id:1308,utime:1650980662042 (true,A,1) (false,A,1) (true,A,2) (true,B,1) (false,A,2) (true,A,1) (false,A,1) (true,A,2) id:111,city_id:C,utime:null (false,A,2) (true,A,1) (false,A,1) (true,A,2) (false,A,2) (true,A,1) (true,C,1) id:333,city_id:D,utime:null (false,B,1) (true,D,1)
The final result A,1 C,1 D,1 is correct. (The report originally stated "A,1 C,2 D,1", but the changelog above nets C to 1.) After deduplicating by id, id 111 maps to C (from item), id 222 keeps A (no matching item row), and id 333 maps to D (from item), so cities A, C and D should each have a count of 1.
-- This message was sent by Atlassian Jira (v8.20.7#820007)