[ https://issues.apache.org/jira/browse/FLINK-15563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sujun updated FLINK-15563:
--------------------------
    Description: 
This is my code:
{code:java}
import org.apache.flink.formats.parquet.ParquetTableSource
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tableEnv = StreamTableEnvironment.create(env)

    // Avro schema of the Parquet data (all fields are nullable unions).
    val schema = "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"

    val parquetTableSource: ParquetTableSource = ParquetTableSource
      .builder
      .forParquetSchema(new org.apache.parquet.avro.AvroSchemaConverter().convert(
        org.apache.avro.Schema.parse(schema, true)))
      .path("/Users/sujun/Documents/tmp/login_data")
      .build

    tableEnv.registerTableSource("source", parquetTableSource)

    // First query: filter on city, registered as an intermediate table.
    val t1 = tableEnv.sqlQuery("select log_id, city from source where city = '274'")
    tableEnv.registerTable("t1", t1)

    // Second query: filter on log_id over the intermediate table.
    val t2 = tableEnv.sqlQuery("select * from t1 where log_id = '5927070661978133'")
    t2.toAppendStream[Row].print()

    env.execute()
  }
}
{code}
When the two SQL queries each have their own WHERE condition, the main program hangs until it runs out of memory (OOM). When the filter push-down code of ParquetTableSource is removed, the program runs normally.
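A possible workaround sketch (my untested assumption, not verified against this bug): fold both predicates into a single query, so the optimizer only ever sees one WHERE clause over the Parquet source instead of two stacked filters:
{code:java}
// Untested workaround sketch: combine both conditions in one query rather
// than registering the intermediate table t1. It produces the same result
// set as the two-step version above, but only a single Filter node sits
// over the Parquet scan for the planner to push down.
val combined = tableEnv.sqlQuery(
  "select log_id, city from source " +
  "where city = '274' and log_id = '5927070661978133'")
combined.toAppendStream[Row].print()
{code}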
Through my debugging, I found that the program hangs in the org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp method.

This may be a bug in the Calcite optimizer triggered by the filter push-down code.
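For context, here is a minimal sketch of how a filter push-down can make the Volcano planner loop. This is a simplified illustration of my hypothesis, not the real ParquetTableSource code: only FilterableTableSource, applyPredicate, and isFilterPushedDown are the actual Flink 1.9 interface; SketchedParquetSource and copyWithPredicates are hypothetical names.
{code:java}
import java.util.{List => JList}

import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.sources.{FilterableTableSource, TableSource}
import org.apache.flink.types.Row

// Hypothetical sketch of the suspected failure mode. The contract of
// applyPredicate is to REMOVE every predicate the source will evaluate
// itself from the given list; whatever remains is kept as a Filter node.
abstract class SketchedParquetSource extends FilterableTableSource[Row] {

  override def applyPredicate(predicates: JList[Expression]): TableSource[Row] = {
    // BUG PATTERN: returning a new source without consuming `predicates`.
    // The Filter node above the scan is rebuilt unchanged, the push-down
    // rule matches it again on the next iteration, and
    // VolcanoPlanner.findBestExp keeps growing the search space until OOM.
    copyWithPredicates(predicates) // hypothetical copy constructor
  }

  override def isFilterPushedDown(): Boolean = true

  // Hypothetical helper standing in for the real source's copy logic.
  protected def copyWithPredicates(p: JList[Expression]): TableSource[Row]
}
{code}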
    Labels: Parquet  (was: )

> When using ParquetTableSource, the program hangs until OOM
> -----------------------------------------------------------
>
>                 Key: FLINK-15563
>                 URL: https://issues.apache.org/jira/browse/FLINK-15563
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.8.1, 1.9.1
>            Reporter: sujun
>            Priority: Critical
>              Labels: Parquet
>

--
This message was sent by Atlassian Jira (v8.3.4#803005)