Hi Hanan,

After investigating the issue using the test case you provided, I think
there is a bug in the Parquet predicate pushdown. Currently, the pushdown
uses the type of the predicate literal to construct the FilterPredicate.
The issue happens when the data type of the literal, as inferred from SQL,
doesn't match the Parquet schema. For example, suppose foo is a long
column and foo < 1 is the predicate. The literal is recognized as an
integer, so the Parquet FilterPredicate is mistakenly created for a column
of Integer type. I created a ticket for the issue:
https://issues.apache.org/jira/browse/FLINK-14953. Please also add more
insight by commenting directly on it.
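
To make the mismatch concrete, here is a minimal sketch of the kind of
coercion I have in mind: pick the literal's Java type from the column type
in the schema rather than from the SQL literal. The class LiteralCoercion,
the enum ColumnType and the method coerce are hypothetical names for
illustration only. They are not part of Flink or Parquet and not
necessarily how the ticket will be resolved, and the sketch assumes
integral literals only.

```java
import java.util.Objects;

/** Hypothetical helper for illustration; not a Flink or Parquet API. */
final class LiteralCoercion {

    /** Simplified stand-in for the integral primitive types of a Parquet column. */
    enum ColumnType { INT32, INT64 }

    private LiteralCoercion() {
    }

    /**
     * Returns the literal boxed in the Java type that matches the column type,
     * so that a predicate built from it targets the right column type.
     * Assumes the literal is integral (Integer or Long).
     */
    static Number coerce(ColumnType columnType, Number literal) {
        Objects.requireNonNull(columnType, "columnType");
        Objects.requireNonNull(literal, "literal");
        switch (columnType) {
            case INT64: {
                // Widening an int literal to long never loses information.
                return literal.longValue();
            }
            case INT32: {
                // Narrowing is only safe when the value fits into an int.
                long value = literal.longValue();
                if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
                    throw new IllegalArgumentException(
                            "Literal " + value + " does not fit into an INT32 column");
                }
                return (int) value;
            }
            default:
                throw new IllegalStateException("Unhandled column type: " + columnType);
        }
    }
}
```

With this, the literal in "id > 222222" against a long column would be
handed on as a Long instead of an Integer, for example via
LiteralCoercion.coerce(LiteralCoercion.ColumnType.INT64, 222222).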


Best Regards
Peter Huang

On Mon, Nov 18, 2019 at 12:40 PM Hanan Yehudai <hanan.yehu...@radcom.com>
wrote:

> Hi Peter, thanks.
>
> This is my code. I used one of the Parquet/Avro tests as a reference.
>
>
>
> The code fails with:
>
> *Test testScan(ParquetTestCase) failed with:*
>
> *java.lang.UnsupportedOperationException*
>
> *               at
> org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)*
>
> *               at
> org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)*
>
> *               at
> org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)*
>
>
>
>
>
> CODE :
>
>
>
> import org.apache.avro.Schema;
>
> import org.apache.avro.generic.GenericRecord;
>
> import org.apache.avro.generic.GenericRecordBuilder;
>
> import org.apache.avro.specific.SpecificRecord;
>
> import org.apache.avro.specific.SpecificRecordBuilderBase;
>
> import org.apache.flink.api.common.typeinfo.Types;
>
> import org.apache.flink.api.java.DataSet;
>
> import org.apache.flink.api.java.ExecutionEnvironment;
>
> import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
>
> import org.apache.flink.api.java.io.TupleCsvInputFormat;
>
> import org.apache.flink.api.java.tuple.Tuple;
>
> import org.apache.flink.core.fs.FileSystem;
>
> import org.apache.flink.core.fs.Path;
>
>
>
> import org.apache.flink.formats.parquet.ParquetTableSource;
>
> import org.apache.flink.streaming.api.datastream.DataStream;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
>
> import org.apache.flink.table.api.Table;
>
> import org.apache.flink.table.api.TableEnvironment;
>
> import org.apache.flink.table.api.java.BatchTableEnvironment;
>
>
>
> import org.apache.flink.table.api.java.StreamTableEnvironment;
>
> import org.apache.flink.table.sinks.CsvTableSink;
>
> import org.apache.flink.table.sinks.TableSink;
>
> import org.apache.flink.test.util.MultipleProgramsTestBase;
>
> import org.apache.flink.types.Row;
>
>
>
> import org.apache.avro.generic.IndexedRecord;
>
> import org.apache.parquet.avro.AvroSchemaConverter;
>
> import org.apache.parquet.schema.MessageType;
>
> import org.junit.BeforeClass;
>
> import org.junit.ClassRule;
>
> import org.junit.Test;
>
> import org.junit.rules.TemporaryFolder;
>
>
>
> import java.io.IOException;
>
> import java.util.ArrayList;
>
> import java.util.List;
>
> import java.util.UUID;
>
>
>
> import static org.junit.Assert.assertEquals;
>
>
>
> import org.apache.parquet.avro.AvroParquetWriter;
>
> import org.apache.parquet.hadoop.ParquetWriter;
>
>
>
>
>
> public class  ParquetTestCase extends MultipleProgramsTestBase {
>
>
>
>     private static String avroSchema = "{\n" +
>
>             "  \"name\": \"SimpleRecord\",\n" +
>
>             "  \"type\": \"record\",\n" +
>
>             "  \"fields\": [\n" +
>
>             "    { \"default\": null, \"name\": \"timestamp_edr\", \"type\": [ \"null\", \"long\" ]},\n" +
>
>             "    { \"default\": null, \"name\": \"id\", \"type\": [ \"null\", \"long\" ]},\n" +
>
>             "    { \"default\": null, \"name\": \"recordType_\", \"type\": [ \"null\", \"string\"]}\n" +
>
>             "  ],\n" +
>
>             "  \"schema_id\": 1,\n" +
>
>             "  \"type\": \"record\"\n" +
>
>             "}";
>
>
>
>     private static final AvroSchemaConverter SCHEMA_CONVERTER = new
> AvroSchemaConverter();
>
>     private static Schema schm = new Schema.Parser().parse(avroSchema);
>
>     private static Path testPath;
>
>
>
>
>
>     public ParquetTestCase() {
>
>         super(TestExecutionMode.COLLECTION);
>
>     }
>
>
>
>
>
>     @BeforeClass
>
>     public static void setup() throws Exception {
>
>
>
>         GenericRecordBuilder genericRecordBuilder = new
> GenericRecordBuilder(schm);
>
>
>
>
>
>         List<IndexedRecord> recs = new ArrayList<>();
>
>         for (int i = 0; i < 6; i++) {
>
>             GenericRecord gr = genericRecordBuilder.set("timestamp_edr",
> System.currentTimeMillis() / 1000).set("id", 3333333L).set("recordType_",
> "Type1").build();
>
>             recs.add(gr);
>
>             GenericRecord gr2 = genericRecordBuilder.set("timestamp_edr",
> System.currentTimeMillis() / 1000).set("id", 222222L).set("recordType_",
> "Type2").build();
>
>             recs.add(gr2);
>
>         }
>
>
>
>         testPath = new Path("/tmp",  UUID.randomUUID().toString());
>
>
>
>
>
>         ParquetWriter<IndexedRecord> writer =
> AvroParquetWriter.<IndexedRecord>builder(
>
>                 new
> org.apache.hadoop.fs.Path(testPath.toUri())).withSchema(schm).build();
>
>
>
>         for (IndexedRecord record : recs) {
>
>             writer.write(record);
>
>         }
>
>         writer.close();
>
>     }
>
>
>
>
>
>     private ParquetTableSource createParquetTableSource(Path path) throws
> IOException {
>
>         MessageType nestedSchema = SCHEMA_CONVERTER.convert(schm);
>
>         ParquetTableSource parquetTableSource =
> ParquetTableSource.builder()
>
>                 .path(path.getPath())
>
>                 .forParquetSchema(nestedSchema)
>
>                 .build();
>
>         return parquetTableSource;
>
>     }
>
>
>
>     @Test
>
>     public void testScan() throws Exception {
>
>         ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>
>
>
>         BatchTableEnvironment batchTableEnvironment  =
> BatchTableEnvironment.create(env);
>
>         ParquetTableSource tableSource =
> createParquetTableSource(testPath);
>
>         batchTableEnvironment.registerTableSource("ParquetTable",
> tableSource);
>
>
>
>         Table tab = batchTableEnvironment.sqlQuery(
>                 "select id, recordType_ from ParquetTable where id > 222222");
>
>
>
>         DataSet<Row> result = batchTableEnvironment.toDataSet(tab,
> Row.class);
>
>
>
>         result.print();
>
>
>
>     }
>
>
>
>
>
> }
>
>
>
>
>
> *From:* Peter Huang <huangzhenqiu0...@gmail.com>
> *Sent:* Monday, November 18, 2019 7:22 PM
> *To:* dev <dev@flink.apache.org>
> *Cc:* u...@flink.apache.org
> *Subject:* Re: SQL for Avro GenericRecords on Parquet
>
>
>
> Hi Hanan,
>
>
>
> Thanks for reporting the issue. Would you please attach your test code
> here? I may help to investigate.
>
>
>
>
>
>
>
> Best Regards
>
> Peter Huang
>
>
>
> On Mon, Nov 18, 2019 at 2:51 AM Hanan Yehudai <hanan.yehu...@radcom.com>
> wrote:
>
> I have tried to persist generic Avro records in a Parquet file and then
> read them via ParquetTableSource, using SQL.
> It seems that the SQL is not executed properly!
>
> The persisted records are :
> Id  ,  type
> 3333333,Type1
> 222222,Type2
> 3333333,Type1
> 222222,Type2
> 3333333,Type1
> 222222,Type2
> 3333333,Type1
> 222222,Type2
> 3333333,Type1
> 222222,Type2
> 3333333,Type1
> 222222,Type2
>
> While the SQL "SELECT id, recordType_ FROM ParquetTable" returns the
> above (which is correct),
> running "SELECT id, recordType_ FROM ParquetTable where
> recordType_='Type1'"
> results in:
> 3333333,Type1
> 222222,Type1
> 3333333,Type1
> 222222,Type1
> 3333333,Type1
> 222222,Type1
> 3333333,Type1
> 222222,Type1
> 3333333,Type1
> 222222,Type1
> 3333333,Type1
> 222222,Type1
>
> It is as if the equals sign were an assignment rather than a comparison...
>
> Am I doing something wrong? Is it an issue of GenericRecord vs.
> SpecificRecord?
>
>
