Hi Peter, thanks.
This is my code. I used one of the Parquet/Avro tests as a reference.

The code fails with:
Test testScan(ParquetTestCase) failed with:
java.lang.UnsupportedOperationException
    at org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)
    at org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)
    at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)
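
My reading of the trace, which I have not confirmed: as far as I can tell, the typed update(...) methods on ValueInspector throw UnsupportedOperationException unless the inspector built for the predicate overrides the one matching the column type. If the predicate for "id > 222222" were built as an int comparison while the column is a long, addLong would hit the default update(long). The sketch below is a simplified model of that pattern, not Parquet's actual code; InspectorSketch, IntGreaterThan, LongGreaterThan and feedLong are hypothetical names.

```java
// Simplified model of the dispatch pattern; NOT Parquet's actual source.
class InspectorSketch {

    // Every typed update throws unless a subclass overrides it.
    static abstract class ValueInspector {
        boolean result;
        void update(int value)  { throw new UnsupportedOperationException(); }
        void update(long value) { throw new UnsupportedOperationException(); }
    }

    // Inspector for an int comparison: only update(int) is overridden.
    static final class IntGreaterThan extends ValueInspector {
        private final int bound;
        IntGreaterThan(int bound) { this.bound = bound; }
        @Override void update(int value) { result = value > bound; }
    }

    // Inspector for a long comparison: only update(long) is overridden.
    static final class LongGreaterThan extends ValueInspector {
        private final long bound;
        LongGreaterThan(long bound) { this.bound = bound; }
        @Override void update(long value) { result = value > bound; }
    }

    // Feeds a long column value to the inspector, the way a reader of a
    // long column would, and reports either the result or the failure.
    static String feedLong(ValueInspector inspector, long value) {
        try {
            inspector.update(value);
            return String.valueOf(inspector.result);
        } catch (UnsupportedOperationException e) {
            return "UnsupportedOperationException";
        }
    }

    public static void main(String[] args) {
        // Int-typed predicate against a long value: falls through to the default.
        System.out.println(feedLong(new IntGreaterThan(222222), 3333333L));
        // Long-typed predicate against a long value: evaluated normally.
        System.out.println(feedLong(new LongGreaterThan(222222L), 3333333L));
    }
}
```

If this guess is right, the failure depends on how the filter predicate is typed and not on the data that was written.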


Code:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.Row;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class ParquetTestCase extends MultipleProgramsTestBase {

    private static String avroSchema = "{\n" +
            "  \"name\": \"SimpleRecord\",\n" +
            "  \"type\": \"record\",\n" +
            "  \"fields\": [\n" +
            "    { \"default\": null, \"name\": \"timestamp_edr\", \"type\": [ \"null\", \"long\" ]},\n" +
            "    { \"default\": null, \"name\": \"id\", \"type\": [ \"null\", \"long\" ]},\n" +
            "    { \"default\": null, \"name\": \"recordType_\", \"type\": [ \"null\", \"string\"]}\n" +
            "  ],\n" +
            "  \"schema_id\": 1,\n" +
            "  \"type\": \"record\"\n" +
            "}";

    private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
    private static Schema schm = new Schema.Parser().parse(avroSchema);
    private static Path testPath;


    public ParquetTestCase() {
        super(TestExecutionMode.COLLECTION);
    }


    @BeforeClass
    public static void setup() throws Exception {

        GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schm);

        // Write 12 records: 6 x (3333333, Type1) and 6 x (222222, Type2).
        List<IndexedRecord> recs = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            GenericRecord gr = genericRecordBuilder
                    .set("timestamp_edr", System.currentTimeMillis() / 1000)
                    .set("id", 3333333L)
                    .set("recordType_", "Type1")
                    .build();
            recs.add(gr);
            GenericRecord gr2 = genericRecordBuilder
                    .set("timestamp_edr", System.currentTimeMillis() / 1000)
                    .set("id", 222222L)
                    .set("recordType_", "Type2")
                    .build();
            recs.add(gr2);
        }

        testPath = new Path("/tmp", UUID.randomUUID().toString());

        ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
                new org.apache.hadoop.fs.Path(testPath.toUri())).withSchema(schm).build();

        for (IndexedRecord record : recs) {
            writer.write(record);
        }
        writer.close();
    }


    private ParquetTableSource createParquetTableSource(Path path) throws IOException {
        MessageType nestedSchema = SCHEMA_CONVERTER.convert(schm);
        ParquetTableSource parquetTableSource = ParquetTableSource.builder()
                .path(path.getPath())
                .forParquetSchema(nestedSchema)
                .build();
        return parquetTableSource;
    }

    @Test
    public void testScan() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
        ParquetTableSource tableSource = createParquetTableSource(testPath);
        batchTableEnvironment.registerTableSource("ParquetTable", tableSource);

        Table tab = batchTableEnvironment.sqlQuery(
                "select id,recordType_ from ParquetTable where id > 222222");

        DataSet<Row> result = batchTableEnvironment.toDataSet(tab, Row.class);

        result.print();

    }


}


From: Peter Huang <huangzhenqiu0...@gmail.com>
Sent: Monday, November 18, 2019 7:22 PM
To: dev <dev@flink.apache.org>
Cc: u...@flink.apache.org
Subject: Re: SQL for Avro GenericRecords on Parquet

Hi Hanan,

Thanks for reporting the issue. Could you please attach your test code here? I
may be able to help investigate.



Best Regards
Peter Huang

On Mon, Nov 18, 2019 at 2:51 AM Hanan Yehudai <hanan.yehu...@radcom.com> wrote:
I have tried to persist generic Avro records in a Parquet file and then read
them via ParquetTableSource, using SQL.
It seems that the SQL is not executed properly!

The persisted records are:
id, type
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2

The query "SELECT id, recordType_ FROM ParquetTable" returns the rows above
(which is correct).
Running "SELECT id, recordType_ FROM ParquetTable where recordType_='Type1'"
results in:
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1

It is as if the equals sign were treated as an assignment rather than a comparison.
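
For comparison, the result I would expect from that WHERE clause can be sketched in plain Java. This is only a model of the intended semantics, with no Flink or Parquet involved; ExpectedFilterSketch, Rec, sampleRows and whereRecordTypeEquals are hypothetical names made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java model of the expected WHERE semantics; no Flink or Parquet involved.
class ExpectedFilterSketch {

    // Hypothetical stand-in for one persisted row (id, recordType_).
    static final class Rec {
        final long id;
        final String recordType;

        Rec(long id, String recordType) {
            this.id = id;
            this.recordType = recordType;
        }

        @Override
        public String toString() {
            return id + "," + recordType;
        }
    }

    // Builds the same 12 rows as the persisted data:
    // 6 x (3333333, Type1) interleaved with 6 x (222222, Type2).
    static List<Rec> sampleRows() {
        List<Rec> rows = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            rows.add(new Rec(3333333L, "Type1"));
            rows.add(new Rec(222222L, "Type2"));
        }
        return rows;
    }

    // Models: SELECT id, recordType_ FROM ParquetTable WHERE recordType_ = 'Type1'
    // Equality is a comparison that drops non-matching rows; it never rewrites a column.
    static List<Rec> whereRecordTypeEquals(List<Rec> rows, String type) {
        return rows.stream()
                .filter(r -> type.equals(r.recordType))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        whereRecordTypeEquals(sampleRows(), "Type1").forEach(System.out::println);
    }
}
```

Under this model the query should return six rows, all with id 3333333 and type Type1, and no row with id 222222.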

Am I doing something wrong? Is it an issue of GenericRecord vs.
SpecificRecord?
