sujun created FLINK-15563:
-----------------------------

Summary: When using ParquetTableSource, The program hangs until OOM
Key: FLINK-15563
URL: https://issues.apache.org/jira/browse/FLINK-15563
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem
Affects Versions: 1.9.1, 1.8.1
Reporter: sujun
/** Reproduction program attached to FLINK-15563 (reported symptom: the program
  * hangs until OOM when using `ParquetTableSource`).
  *
  * Registers a Parquet-backed table source whose layout is given by an Avro
  * record schema, runs filtered SQL queries over it, and prints the result as
  * an append stream.
  *
  * @param args command-line arguments; not read by this program
  */
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val tableEnv = StreamTableEnvironment.create(env)

  // Avro record schema (JSON) for the Parquet files. Split into one literal per
  // field for readability; the concatenation yields the same single-line JSON
  // text as the report, with the mail-markup escapes (`\{`) removed.
  val schema =
    """{"type":"record","name":"root","fields":[""" +
    """{"name":"log_id","type":["null","string"],"default":null},""" +
    """{"name":"city","type":["null","string"],"default":null},""" +
    """{"name":"log_from","type":["null","string"],"default":null},""" +
    """{"name":"ip","type":["null","string"],"default":null},""" +
    """{"name":"type","type":["null","string"],"default":null},""" +
    """{"name":"data_source","type":["null","string"],"default":null},""" +
    """{"name":"is_scan","type":["null","string"],"default":null},""" +
    """{"name":"result","type":["null","string"],"default":null},""" +
    """{"name":"timelong","type":["null","long"],"default":null},""" +
    """{"name":"is_sec","type":["null","string"],"default":null},""" +
    """{"name":"event_name","type":["null","string"],"default":null},""" +
    """{"name":"id","type":["null","string"],"default":null},""" +
    """{"name":"time_string","type":["null","string"],"default":null},""" +
    """{"name":"device","type":["null","string"],"default":null},""" +
    """{"name":"timestamp_string","type":["null","string"],"default":null},""" +
    """{"name":"occur_time","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null},""" +
    """{"name":"row_time","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null}]}"""

  // Convert the Avro schema to a Parquet schema and build the table source
  // over the data directory.
  val parquetTableSource: ParquetTableSource = ParquetTableSource
    .builder
    .forParquetSchema(
      new org.apache.parquet.avro.AvroSchemaConverter()
        .convert(org.apache.avro.Schema.parse(schema, true)))
    .path("/path/to/login_data")
    .build
  tableEnv.registerTableSource("source", parquetTableSource)

  val t1 = tableEnv.sqlQuery("select log_id,city from source where city = '274' ")
  tableEnv.registerTable("t1", t1)
  // NOTE(review): t4 is created but never consumed; the stream printed below is
  // built from t1. Left as reported -- confirm whether t4 was meant to be printed.
  val t4 = tableEnv.sqlQuery("select * from t1 where log_id='5927070661978133'")
  t1.toAppendStream[Row].print()
  env.execute()
}

-- This message was sent by Atlassian Jira (v8.3.4#803005)