[ https://issues.apache.org/jira/browse/FLINK-35324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

SuDewei updated FLINK-35324:
----------------------------
    Description: 
AvroFormatFactory.java#createDecodingFormat returns a ProjectableDecodingFormat, which implies that the Avro format deserializer supports projection pushdown. In practice, however, the Avro format appears unable to push down a projection of an arbitrary subset of fields.

For example, consider the following schema and sample data in Kafka:
{code:sql}
-- schema
CREATE TABLE kafka (
    `user_id` BIGINT,
    `name` STRING,
    `timestamp` TIMESTAMP(3) METADATA,
    `event_id` BIGINT,
    `payload` STRING NOT NULL
) WITH (
    'connector' = 'kafka',
    ...
)

-- sample data
(3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 'payload 3') {code}
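The timestamp column is declared as METADATA, so it is not part of the physical data type that projection indexes refer to. The physical indexes are therefore 0 = user_id, 1 = name, 2 = event_id and 3 = payload. This mapping agrees with the {0,1,2} result shown further down. Below is a minimal sketch of the mapping. It uses a hypothetical Column class, not a Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the table's columns; not a Flink API.
class PhysicalIndexes {

    static final class Column {
        final String name;
        final boolean metadata;

        Column(String name, boolean metadata) {
            this.name = name;
            this.metadata = metadata;
        }
    }

    // Physical columns keep their declaration order and METADATA columns are skipped,
    // so a field's position in the returned list is the index a projection refers to.
    static List<String> physicalFields(List<Column> columns) {
        List<String> physical = new ArrayList<>();
        for (Column c : columns) {
            if (!c.metadata) {
                physical.add(c.name);
            }
        }
        return physical;
    }

    public static void main(String[] args) {
        List<Column> columns = List.of(
                new Column("user_id", false),
                new Column("name", false),
                new Column("timestamp", true),
                new Column("event_id", false),
                new Column("payload", false));
        System.out.println(physicalFields(columns));
    }
}
```

Running main prints [user_id, name, event_id, payload].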
Deserialization succeeds with the following projection:
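{code:java}
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

// Inside the connector's source: context, format and this.physicalDataType come from the surrounding code.
Projection physicalProjections = Projection.of(new int[] {0, 1, 2});

// Projected (produced) type; it is not passed to the decoder below.
DataType physicalFormatDataType =
    physicalProjections.project(this.physicalDataType);

DeserializationSchema<RowData> deserializer =
    (DeserializationSchema<RowData>) ((ProjectableDecodingFormat) format)
        .createRuntimeDecoder(context, this.physicalDataType, physicalProjections.toNestedIndexes()); {code}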
The resulting row is:
{code:java}
+I(3,name 3,102) {code}
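Independently of Avro, a projectable decoder is expected to return one physical field per projected index, in projection order. The dependency-free sketch below illustrates this with hypothetical helpers. Its toNestedIndexes only mirrors the shape of Projection#toNestedIndexes (one single-element path per top-level field) and is not the Flink implementation. Under this reading, {1, 2} should yield (name 3, 102).

```java
import java.util.Arrays;

// Hypothetical, dependency-free sketch of the result a projectable decoder should produce.
// None of these helpers are Flink APIs.
class ProjectionSketch {

    // Turn top-level field indexes such as {1, 2} into nested paths {{1}, {2}}.
    static int[][] toNestedIndexes(int[] topLevel) {
        int[][] nested = new int[topLevel.length][];
        for (int i = 0; i < topLevel.length; i++) {
            nested[i] = new int[] {topLevel[i]};
        }
        return nested;
    }

    // Select the projected fields, in projection order, from a fully decoded physical row.
    static Object[] project(Object[] physicalRow, int[][] projections) {
        Object[] out = new Object[projections.length];
        for (int i = 0; i < projections.length; i++) {
            if (projections[i].length != 1) {
                throw new IllegalArgumentException("nested paths are not handled in this sketch");
            }
            out[i] = physicalRow[projections[i][0]];
        }
        return out;
    }

    public static void main(String[] args) {
        // Physical row of the sample record (the METADATA column is not physical).
        Object[] row = {3L, "name 3", 102L, "payload 3"};
        System.out.println(Arrays.toString(project(row, toNestedIndexes(new int[] {0, 1, 2}))));
        System.out.println(Arrays.toString(project(row, toNestedIndexes(new int[] {1, 2}))));
    }
}
```

Running main prints [3, name 3, 102] followed by [name 3, 102].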
However, when the projected indexes do not start from 0, the data cannot be parsed. For example:
{code:java}
// Only the index array differs from the working example above.
Projection physicalProjections = Projection.of(new int[] {1, 2});

DataType physicalFormatDataType =
    physicalProjections.project(this.physicalDataType);

DeserializationSchema<RowData> deserializer =
    (DeserializationSchema<RowData>) ((ProjectableDecodingFormat) format)
        .createRuntimeDecoder(context, this.physicalDataType, physicalProjections.toNestedIndexes()); {code}
The following exception is thrown:
{code:java}
Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
        at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
        ... 19 more {code}
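One possible explanation follows. It is an assumption, not a confirmed diagnosis. Avro binary data carries no field names or tags, so it can only be decoded positionally against the schema it was written with. Suppose the decoder were built with the projected row type as its writer schema. A prefix projection such as {0, 1, 2} would then read the leading fields correctly and leave the trailing bytes unread. A projection of {1, 2}, by contrast, would interpret the bytes of user_id as name.

The dependency-free sketch below tests this idea. It hand-encodes the sample record following the Avro binary encoding rules:
- longs are zig-zag varints;
- strings are length-prefixed;
- unions are written as a branch index followed by the value.

It also assumes that nullable columns map to ["null", T] unions and that NOT NULL columns map to plain types. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hand-written illustration of Avro binary encoding; no Avro or Flink dependency.
// Assumes nullable columns are ["null", T] unions and NOT NULL columns are plain T.
class AvroPositionalDecoding {

    // Avro writes int/long values as zig-zag varints.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63); // zig-zag
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    // Strings are a length (as a long) followed by UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Full physical record (user_id, name, event_id, payload); fields in order, no tags.
    static byte[] encodeFullRecord() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, 1); writeLong(out, 3);          // user_id: union branch 1 (long)
        writeLong(out, 1); writeString(out, "name 3"); // name: union branch 1 (string)
        writeLong(out, 1); writeLong(out, 102);        // event_id: union branch 1 (long)
        writeString(out, "payload 3");                 // payload: NOT NULL string
        return out.toByteArray();
    }

    // Cursor-based reader that interprets bytes purely by position.
    static final class Reader {
        final byte[] buf;
        int pos;

        Reader(byte[] buf) { this.buf = buf; }

        long readLong() {
            long n = 0;
            int shift = 0;
            int b;
            do {
                b = buf[pos++] & 0xFF;
                n |= (long) (b & 0x7F) << shift;
                shift += 7;
            } while ((b & 0x80) != 0);
            return (n >>> 1) ^ -(n & 1); // undo zig-zag
        }

        String readString() {
            int len = (int) readLong();
            String s = new String(buf, pos, len, StandardCharsets.UTF_8);
            pos += len;
            return s;
        }
    }

    // Prefix projection {0, 1, 2}: reading the leading fields in writer order works.
    static String decodePrefix(byte[] data) {
        Reader r = new Reader(data);
        r.readLong();                  // user_id union branch
        long userId = r.readLong();
        r.readLong();                  // name union branch
        String name = r.readString();
        r.readLong();                  // event_id union branch
        long eventId = r.readLong();
        return userId + "," + name + "," + eventId;
    }

    // Projection {1, 2} decoded as if (name, event_id) were the writer schema;
    // returns the union branch index that gets read for event_id.
    static long eventIdBranchWithProjectedLayout(byte[] data) {
        Reader r = new Reader(data);
        r.readLong();        // "name" branch: actually user_id's branch
        r.readString();      // "name" value: length 3 taken from user_id's value
        return r.readLong(); // "event_id" branch: lands inside the bytes of "name 3"
    }

    public static void main(String[] args) {
        byte[] data = encodeFullRecord();
        System.out.println(decodePrefix(data));
        System.out.println(eventIdBranchWithProjectedLayout(data));
    }
}
```

Under these assumptions, the prefix read returns 3,name 3,102. The union index read for event_id is -49, the same value reported in the exception above.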
It seems that the Avro format does not support projection pushdown for arbitrary fields. Is my understanding correct?

If so, I think the Avro format should not implement the ProjectableDecodingFormat interface, since it provides only very limited pushdown capabilities.

This problem may block connectors from implementing projection pushdown, because a connector decides whether projection pushdown can be performed by checking whether the format implements the ProjectableDecodingFormat interface.
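The decision described above can be sketched as follows. The interfaces are hypothetical, simplified stand-ins for DecodingFormat and ProjectableDecodingFormat. The real Flink interfaces create runtime decoders from a DynamicTableSource.Context and a DataType rather than decoding rows directly. The sketch makes two points. First, the connector branches only on the format's type. Second, a format that does not advertise projection support can still yield correct rows through a connector-side fallback. That fallback is an assumption for illustration, not necessarily what existing connectors do.

```java
import java.util.Arrays;

class ConnectorPushdownSketch {

    // Hypothetical stand-ins for Flink's format interfaces; not the real signatures.
    interface SimpleDecodingFormat {
        Object[] decodeAll(Object[] record);
    }

    interface SimpleProjectableDecodingFormat extends SimpleDecodingFormat {
        Object[] decodeProjected(Object[] record, int[] projection);
    }

    // The connector decides on pushdown purely from the type of the format.
    static boolean canPushDownProjection(SimpleDecodingFormat format) {
        return format instanceof SimpleProjectableDecodingFormat;
    }

    static Object[] read(SimpleDecodingFormat format, Object[] record, int[] projection) {
        if (canPushDownProjection(format)) {
            // Trust the format to return exactly the projected fields.
            return ((SimpleProjectableDecodingFormat) format).decodeProjected(record, projection);
        }
        // Fallback: decode the full physical row, then project in the connector.
        Object[] full = format.decodeAll(record);
        Object[] out = new Object[projection.length];
        for (int i = 0; i < projection.length; i++) {
            out[i] = full[projection[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] record = {3L, "name 3", 102L, "payload 3"};
        SimpleDecodingFormat plain = r -> r.clone(); // does not advertise projection support
        System.out.println(canPushDownProjection(plain));
        System.out.println(Arrays.toString(read(plain, record, new int[] {1, 2})));
    }
}
```

Running main prints false and then [name 3, 102].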

 



> Avro format can not perform projection pushdown for specific fields
> -------------------------------------------------------------------
>
>                 Key: FLINK-35324
>                 URL: https://issues.apache.org/jira/browse/FLINK-35324
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.17.0
>            Reporter: SuDewei
>            Priority: Blocker



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
