Hi Hanan,

I created a fix for the problem. Would you please try it from your side?
https://github.com/apache/flink/pull/10371

Best Regards
Peter Huang
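For context, the root cause discussed in the quoted messages below is that the literal type inferred from the SQL predicate can differ from the physical type of the Parquet column. The following is only an illustrative sketch of aligning the two when building a FilterPredicate with Parquet's FilterApi; it is not the code in the pull request, and the class and method names are made up for the example.

import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;

public class PredicateLiteralAlignment {

    // Hypothetical helper: build a "greater than" predicate whose literal type is
    // taken from the Parquet schema rather than from the SQL literal, so an INT64
    // column compared against an int literal still gets a long-typed predicate.
    static FilterPredicate greaterThan(MessageType parquetSchema, String fieldName, Number literal) {
        PrimitiveType.PrimitiveTypeName physicalType =
                parquetSchema.getType(fieldName).asPrimitiveType().getPrimitiveTypeName();
        switch (physicalType) {
            case INT64:
                return FilterApi.gt(FilterApi.longColumn(fieldName), literal.longValue());
            case INT32:
                return FilterApi.gt(FilterApi.intColumn(fieldName), literal.intValue());
            case DOUBLE:
                return FilterApi.gt(FilterApi.doubleColumn(fieldName), literal.doubleValue());
            case FLOAT:
                return FilterApi.gt(FilterApi.floatColumn(fieldName), literal.floatValue());
            default:
                throw new IllegalArgumentException("Unsupported column type for this sketch: " + physicalType);
        }
    }
}

With the schema used in the test case further down, greaterThan(parquetSchema, "id", 222222) would produce a predicate over a long column rather than an int column, because "id" is stored as INT64.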
On Tue, Nov 26, 2019 at 8:07 AM Peter Huang <huangzhenqiu0...@gmail.com> wrote:

> Hi Hanan,
>
> After investigating the issue by using the test case you provided, I think
> there is a bug in it. Currently, the Parquet predicate push-down uses the
> predicate literal type to construct the FilterPredicate. The issue happens
> when the data type of the predicate value inferred from SQL doesn't match
> the Parquet schema. For example, if foo is a long column and foo < 1 is the
> predicate, the literal will be recognized as an Integer. This causes the
> Parquet FilterPredicate to be mistakenly created for a column of Integer
> type. I created a ticket for the issue:
> https://issues.apache.org/jira/browse/FLINK-14953. Please also add more
> insight by commenting directly on it.
>
> Best Regards
> Peter Huang
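To make the mismatch concrete: in the test case below, "id" is declared as an Avro long and stored as an INT64 Parquet column, so the pushed-down predicate id > 222222 needs a long column and a long literal. If the literal is inferred as an Integer instead, the FilterPredicate is typed for an int column, and the record-level filter fails when the reader delivers a long value, which would explain the addLong / ValueInspector.update stack trace below. A small illustration using Parquet's FilterApi directly; the class name is made up for the example.

import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

public class PredicateTypeMismatchExample {

    public static void main(String[] args) {
        // Predicate built from an Integer literal: typed for an int column,
        // even though "id" is an INT64 column in the file.
        FilterPredicate mismatched = FilterApi.gt(FilterApi.intColumn("id"), 222222);

        // Predicate the INT64 column actually needs: long column, long literal.
        FilterPredicate aligned = FilterApi.gt(FilterApi.longColumn("id"), 222222L);

        System.out.println(mismatched);
        System.out.println(aligned);
    }
}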
> On Mon, Nov 18, 2019 at 12:40 PM Hanan Yehudai <hanan.yehu...@radcom.com> wrote:
>
>> Hi Peter, thanks.
>>
>> This is my code; I used one of the Parquet/Avro tests as a reference.
>>
>> The code fails with:
>>
>> Test testScan(ParquetTestCase) failed with:
>> java.lang.UnsupportedOperationException
>>     at org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)
>>     at org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)
>>     at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)
>>
>> CODE:
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.generic.GenericRecordBuilder;
>> import org.apache.avro.generic.IndexedRecord;
>> import org.apache.avro.specific.SpecificRecord;
>> import org.apache.avro.specific.SpecificRecordBuilderBase;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
>> import org.apache.flink.api.java.io.TupleCsvInputFormat;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.core.fs.FileSystem;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.formats.parquet.ParquetTableSource;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableEnvironment;
>> import org.apache.flink.table.api.java.BatchTableEnvironment;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import org.apache.flink.table.sinks.CsvTableSink;
>> import org.apache.flink.table.sinks.TableSink;
>> import org.apache.flink.test.util.MultipleProgramsTestBase;
>> import org.apache.flink.types.Row;
>> import org.apache.parquet.avro.AvroParquetWriter;
>> import org.apache.parquet.avro.AvroSchemaConverter;
>> import org.apache.parquet.hadoop.ParquetWriter;
>> import org.apache.parquet.schema.MessageType;
>> import org.junit.BeforeClass;
>> import org.junit.ClassRule;
>> import org.junit.Test;
>> import org.junit.rules.TemporaryFolder;
>>
>> import java.io.IOException;
>> import java.util.ArrayList;
>> import java.util.List;
>> import java.util.UUID;
>>
>> import static org.junit.Assert.assertEquals;
>>
>> public class ParquetTestCase extends MultipleProgramsTestBase {
>>
>>     private static String avroSchema = "{\n" +
>>             "    \"name\": \"SimpleRecord\",\n" +
>>             "    \"type\": \"record\",\n" +
>>             "    \"fields\": [\n" +
>>             "        { \"default\": null, \"name\": \"timestamp_edr\", \"type\": [ \"null\", \"long\" ]},\n" +
>>             "        { \"default\": null, \"name\": \"id\", \"type\": [ \"null\", \"long\" ]},\n" +
>>             "        { \"default\": null, \"name\": \"recordType_\", \"type\": [ \"null\", \"string\"]}\n" +
>>             "    ],\n" +
>>             "    \"schema_id\": 1,\n" +
>>             "    \"type\": \"record\"\n" +
>>             "}";
>>
>>     private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
>>     private static Schema schm = new Schema.Parser().parse(avroSchema);
>>     private static Path testPath;
>>
>>     public ParquetTestCase() {
>>         super(TestExecutionMode.COLLECTION);
>>     }
>>
>>     @BeforeClass
>>     public static void setup() throws Exception {
>>         GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schm);
>>
>>         // Write six pairs of records: id 3333333 with recordType_ "Type1"
>>         // and id 222222 with recordType_ "Type2".
>>         List<IndexedRecord> recs = new ArrayList<>();
>>         for (int i = 0; i < 6; i++) {
>>             GenericRecord gr = genericRecordBuilder
>>                     .set("timestamp_edr", System.currentTimeMillis() / 1000)
>>                     .set("id", 3333333L)
>>                     .set("recordType_", "Type1")
>>                     .build();
>>             recs.add(gr);
>>             GenericRecord gr2 = genericRecordBuilder
>>                     .set("timestamp_edr", System.currentTimeMillis() / 1000)
>>                     .set("id", 222222L)
>>                     .set("recordType_", "Type2")
>>                     .build();
>>             recs.add(gr2);
>>         }
>>
>>         testPath = new Path("/tmp", UUID.randomUUID().toString());
>>
>>         ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
>>                 new org.apache.hadoop.fs.Path(testPath.toUri())).withSchema(schm).build();
>>
>>         for (IndexedRecord record : recs) {
>>             writer.write(record);
>>         }
>>         writer.close();
>>     }
>>
>>     private ParquetTableSource createParquetTableSource(Path path) throws IOException {
>>         MessageType nestedSchema = SCHEMA_CONVERTER.convert(schm);
>>         ParquetTableSource parquetTableSource = ParquetTableSource.builder()
>>                 .path(path.getPath())
>>                 .forParquetSchema(nestedSchema)
>>                 .build();
>>         return parquetTableSource;
>>     }
>>
>>     @Test
>>     public void testScan() throws Exception {
>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>         BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
>>         ParquetTableSource tableSource = createParquetTableSource(testPath);
>>         batchTableEnvironment.registerTableSource("ParquetTable", tableSource);
>>
>>         // The predicate on the long column "id" is pushed down to the Parquet source.
>>         Table tab = batchTableEnvironment.sqlQuery(
>>                 "select id, recordType_ from ParquetTable where id > 222222");
>>
>>         DataSet<Row> result = batchTableEnvironment.toDataSet(tab, Row.class);
>>
>>         result.print();
>>     }
>> }
>>
>> From: Peter Huang <huangzhenqiu0...@gmail.com>
>> Sent: Monday, November 18, 2019 7:22 PM
>> To: dev <d...@flink.apache.org>
>> Cc: user@flink.apache.org
>> Subject: Re: SQL for Avro GenericRecords on Parquet
>>
>> Hi Hanan,
>>
>> Thanks for reporting the issue. Would you please attach your test code here?
>> I may help to investigate.
>> Best Regards
>> Peter Huang
>>
>> On Mon, Nov 18, 2019 at 2:51 AM Hanan Yehudai <hanan.yehu...@radcom.com> wrote:
>>
>> I have tried to persist generic Avro records in a Parquet file and then read
>> them back via ParquetTableSource using SQL.
>> It seems that the SQL is not executed properly!
>>
>> The persisted records are:
>>
>> Id, type
>> 3333333,Type1
>> 222222,Type2
>> 3333333,Type1
>> 222222,Type2
>> 3333333,Type1
>> 222222,Type2
>> 3333333,Type1
>> 222222,Type2
>> 3333333,Type1
>> 222222,Type2
>> 3333333,Type1
>> 222222,Type2
>>
>> SELECT id, recordType_ FROM ParquetTable returns the above (which is correct).
>>
>> Running "SELECT id, recordType_ FROM ParquetTable where recordType_='Type1'"
>> results in:
>>
>> 3333333,Type1
>> 222222,Type1
>> 3333333,Type1
>> 222222,Type1
>> 3333333,Type1
>> 222222,Type1
>> 3333333,Type1
>> 222222,Type1
>> 3333333,Type1
>> 222222,Type1
>> 3333333,Type1
>> 222222,Type1
>>
>> As if the equals sign were an assignment rather than a comparison...
>>
>> Am I doing something wrong? Is it an issue of GenericRecord vs. SpecificRecord?
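One way to check whether the unexpected WHERE behaviour comes from the Parquet filter itself or from Flink's predicate push-down is to read the written file back with a filter applied directly through parquet-avro, bypassing Flink entirely. This is only a rough sketch under that assumption; the file path is passed as a program argument and the class name is made up for the example.

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.io.api.Binary;

public class DirectParquetFilterCheck {

    public static void main(String[] args) throws Exception {
        // Path of the Parquet file written by the test above.
        Path file = new Path(args[0]);

        // Apply recordType_ = 'Type1' at the Parquet level, without Flink in the loop.
        FilterCompat.Filter filter = FilterCompat.get(
                FilterApi.eq(FilterApi.binaryColumn("recordType_"), Binary.fromString("Type1")));

        try (ParquetReader<GenericRecord> reader =
                     AvroParquetReader.<GenericRecord>builder(file).withFilter(filter).build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                // Only rows whose recordType_ is Type1 should appear here.
                System.out.println(record.get("id") + "," + record.get("recordType_"));
            }
        }
    }
}

If this direct read returns only the Type1 rows while the SQL query above returns the mixed result, the problem is more likely in the push-down translation than in the Parquet filter itself.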