wuwenchi created FLINK-33821:
--------------------------------

             Summary: ArrowSerializer$finishCurrentBatch consumes too much time
                 Key: FLINK-33821
                 URL: https://issues.apache.org/jira/browse/FLINK-33821
             Project: Flink
          Issue Type: Technical Debt
          Components: API / Python
    Affects Versions: 1.18.0
            Reporter: wuwenchi


We convert data into Arrow format through Flink and send it to Doris.
The data conversion looks like this: RowData --> arrow --> doris.
During testing, we found that the `ArrowSerializer` provided by flink-python 
spends a lot of time in the `finishCurrentBatch` function.
For 1.4 GB of parquet files in total, the overall conversion takes 70 seconds, 
of which `finishCurrentBatch` accounts for 40 seconds (and `writeBatch` alone 
accounts for 39 of those seconds inside `finishCurrentBatch`).
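
For context, `finishCurrentBatch` in the Flink source is roughly structured as 
follows (a paraphrased sketch rather than verbatim Flink code; it is shown only 
to locate the `writeBatch` call that dominates the time):
{code:java}
// Paraphrased sketch of org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer,
// not verbatim Flink code; the inner arrowStreamWriter.writeBatch() call is
// where we observed 39 of the 40 seconds.
public void finishCurrentBatch() throws Exception {
    arrowWriter.finish();            // seal the rows written into the current batch
    arrowStreamWriter.writeBatch();  // serialize the batch to the output stream (the hot spot)
    arrowWriter.reset();             // clear the vectors for the next batch
}
{code}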


So we compared with Spark, where the data conversion looks like this: 
InternalRow --> arrow --> doris.
Using the same parquet files, the overall conversion takes only 35 seconds, 
and `writeBatch` costs only 10 seconds.

In Spark, we use `org.apache.spark.sql.execution.arrow.ArrowWriter` to convert 
`InternalRow` into an arrowVector, and then serialize the arrowVector into 
binary through `org.apache.arrow.vector.ipc.ArrowStreamWriter$writeBatch`.
Simplified code looks like this:

{code:java}
// vectorSchemaRoot and iterator are assumed to be prepared by the caller
ArrowWriter arrowWriter = ArrowWriter.create(vectorSchemaRoot);

// --- phase1: InternalRow to arrowVector
while (iterator.hasNext()) {
    arrowWriter.write(iterator.next());
}
arrowWriter.finish();

// --- phase2: arrowVector to binary
ByteArrayOutputStream out = new ByteArrayOutputStream();
ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, null, out);
writer.writeBatch();
writer.end();

// --- phase3: get binary
out.toByteArray();
{code}

In Flink, we use 
`org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer`. This class 
is very convenient: it handles not only the conversion of RowData to an 
arrowVector, but also the serialization of the arrowVector to binary.
Simplified code looks like this:


{code:java}
// rowType describes the schema of the RowData records;
// iterator is assumed to be prepared by the caller
arrowSerializer = new ArrowSerializer(rowType, rowType);
outputStream = new ByteArrayOutputStream();
arrowSerializer.open(new ByteArrayInputStream(new byte[0]), outputStream);

// --- phase1: RowData to arrowVector
while (iterator.hasNext()) {
    arrowSerializer.write(iterator.next());
}

// --- phase2: arrowVector to binary
arrowSerializer.finishCurrentBatch();

// --- phase3: get binary
outputStream.toByteArray();
outputStream.reset();
{code}
In phase 1 and phase 3, Flink and Spark take basically the same time. In 
phase 2, Spark's `writeBatch` takes 10 seconds, but the `writeBatch` call 
inside Flink's `finishCurrentBatch` takes 40 seconds.
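
For reference, phase 2 can be timed in isolation with a simple wall-clock 
harness along these lines (a minimal sketch; `iterator` and `BATCH_SIZE` are 
hypothetical placeholders, not part of the actual benchmark):
{code:java}
// Minimal timing sketch (hypothetical harness): accumulate the time spent in
// finishCurrentBatch separately from the per-row write loop.
long finishNanos = 0L;
long rows = 0L;
while (iterator.hasNext()) {
    arrowSerializer.write(iterator.next());
    if (++rows % BATCH_SIZE == 0) {
        long start = System.nanoTime();
        arrowSerializer.finishCurrentBatch();   // phase 2
        finishNanos += System.nanoTime() - start;
        outputStream.toByteArray();             // phase 3: consume the serialized batch
        outputStream.reset();
    }
}
if (rows % BATCH_SIZE != 0) {                   // flush the final partial batch
    long start = System.nanoTime();
    arrowSerializer.finishCurrentBatch();
    finishNanos += System.nanoTime() - start;
    outputStream.reset();
}
System.out.println("finishCurrentBatch total: " + (finishNanos / 1_000_000) + " ms");
{code}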

Is there any Flink-related configuration that I am missing? Or did I use 
something incorrectly in Flink?

Looking forward to your reply! Thanks!


