Hi Peter. Thanks. This is my code; I used one of the Parquet / Avro tests as a reference.
The code fails in testScan (ParquetTestCase) with:

Test testScan(ParquetTestCase) failed with: java.lang.UnsupportedOperationException
    at org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)
    at org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)
    at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)

CODE:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBuilderBase;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetTableSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.Row;
import org.apache.avro.generic.IndexedRecord;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

public class ParquetTestCase extends MultipleProgramsTestBase {

    private static String avroSchema = "{\n" +
            " \"name\": \"SimpleRecord\",\n" +
            " \"type\": \"record\",\n" +
            " \"fields\": [\n" +
            " { \"default\": null, \"name\": \"timestamp_edr\", \"type\": [ \"null\", \"long\" ]},\n" +
            " { \"default\": null, \"name\": \"id\", \"type\": [ \"null\", \"long\" ]},\n" +
            " { \"default\": null, \"name\": \"recordType_\", \"type\": [ \"null\", \"string\"]}\n" +
            " ],\n" +
            " \"schema_id\": 1,\n" +
            " \"type\": \"record\"\n" +
            "}";

    private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
    private static Schema schm = new Schema.Parser().parse(avroSchema);
    private static Path testPath;

    public ParquetTestCase() {
        super(TestExecutionMode.COLLECTION);
    }

    @BeforeClass
    public static void setup() throws Exception {
        GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schm);
        List<IndexedRecord> recs = new ArrayList<>();
        // 6 pairs of records: one "Type1" with id 3333333 and one "Type2" with id 222222
        for (int i = 0; i < 6; i++) {
            GenericRecord gr = genericRecordBuilder
                    .set("timestamp_edr", System.currentTimeMillis() / 1000)
                    .set("id", 3333333L)
                    .set("recordType_", "Type1")
                    .build();
            recs.add(gr);
            GenericRecord gr2 = genericRecordBuilder
                    .set("timestamp_edr", System.currentTimeMillis() / 1000)
                    .set("id", 222222L)
                    .set("recordType_", "Type2")
                    .build();
            recs.add(gr2);
        }
        testPath = new Path("/tmp", UUID.randomUUID().toString());
        ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
                new org.apache.hadoop.fs.Path(testPath.toUri())).withSchema(schm).build();
        for (IndexedRecord record : recs) {
            writer.write(record);
        }
        writer.close();
    }

    private ParquetTableSource createParquetTableSource(Path path) throws IOException {
        MessageType nestedSchema = SCHEMA_CONVERTER.convert(schm);
        ParquetTableSource parquetTableSource = ParquetTableSource.builder()
                .path(path.getPath())
                .forParquetSchema(nestedSchema)
                .build();
        return parquetTableSource;
    }

    @Test
    public void testScan() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
        ParquetTableSource tableSource = createParquetTableSource(testPath);
        batchTableEnvironment.registerTableSource("ParquetTable", tableSource);
        // the WHERE clause on id is what ends up in the failing Parquet filter path
        Table tab = batchTableEnvironment.sqlQuery("select id,recordType_ from ParquetTable where id > 222222 ");
        DataSet<Row> result = batchTableEnvironment.toDataSet(tab, Row.class);
        result.print();
    }
}
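From the stack trace it looks like the failure happens inside Parquet's record-level filtering, i.e. only once the WHERE clause is pushed down into the ParquetTableSource (the ["null","long"] unions become optional INT64 columns, which is presumably why it ends up in FilteringPrimitiveConverter.addLong). Until that is sorted out, a minimal workaround sketch (same variables as in testScan above, the filter logic is only illustrative) is to drop the predicate from the SQL and apply it on the DataSet instead:

    Table all = batchTableEnvironment.sqlQuery("select id,recordType_ from ParquetTable");
    DataSet<Row> rows = batchTableEnvironment.toDataSet(all, Row.class);
    // apply the predicate in the DataSet API so nothing is pushed into the Parquet reader
    DataSet<Row> filtered = rows.filter(row -> ((Long) row.getField(0)) > 222222L);
    filtered.print();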
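To rule out the writer side, the file can also be read back directly with parquet-avro, without Flink in the loop. A rough sketch (it additionally needs org.apache.parquet.avro.AvroParquetReader and org.apache.parquet.hadoop.ParquetReader on top of the imports above):

    org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(testPath.toUri());
    ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(hadoopPath).build();
    GenericRecord rec;
    // print id and recordType_ for every record that was persisted
    while ((rec = reader.read()) != null) {
        System.out.println(rec.get("id") + "," + rec.get("recordType_"));
    }
    reader.close();

If this prints the expected Type1/Type2 mix, the file itself is fine and the wrong results described below come from the filter handling in the table source.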
From: Peter Huang <huangzhenqiu0...@gmail.com>
Sent: Monday, November 18, 2019 7:22 PM
To: dev <dev@flink.apache.org>
Cc: u...@flink.apache.org
Subject: Re: SQL for Avro GenericRecords on Parquet

Hi Hanan,

Thanks for reporting the issue. Would you please attach your test code here? I may help to investigate.

Best Regards
Peter Huang

On Mon, Nov 18, 2019 at 2:51 AM Hanan Yehudai <hanan.yehu...@radcom.com> wrote:

I have tried to persist generic Avro records in a Parquet file and then read them back via ParquetTableSource using SQL. It seems that the SQL is not executed properly!

The persisted records are:

Id, type
3333333, Type1
222222, Type2
3333333, Type1
222222, Type2
3333333, Type1
222222, Type2
3333333, Type1
222222, Type2
3333333, Type1
222222, Type2
3333333, Type1
222222, Type2

The SQL "SELECT id, recordType_ FROM ParquetTable" returns the above (which is correct).

Running "SELECT id, recordType_ FROM ParquetTable WHERE recordType_='Type1'" results in:

3333333, Type1
222222, Type1
3333333, Type1
222222, Type1
3333333, Type1
222222, Type1
3333333, Type1
222222, Type1
3333333, Type1
222222, Type1
3333333, Type1
222222, Type1

As if the equal sign were an assignment and not a comparison... Am I doing something wrong? Is it an issue of GenericRecord vs. SpecificRecord?