tsreaper commented on a change in pull request #17520:
URL: https://github.com/apache/flink/pull/17520#discussion_r738223176



##########
File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AbstractAvroBulkFormat.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.IteratorResultIterator;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.utils.FSDataInputStreamWrapper;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/** Provides a {@link BulkFormat} for Avro records. */
+@Internal
+public abstract class AbstractAvroBulkFormat<A, T, SplitT extends 
FileSourceSplit>
+        implements BulkFormat<T, SplitT> {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final Schema readerSchema;
+
+    protected AbstractAvroBulkFormat(Schema readerSchema) {
+        this.readerSchema = readerSchema;
+    }
+
+    @Override
+    public AvroReader createReader(Configuration config, SplitT split) throws 
IOException {
+        open(split);
+        return createReader(split);
+    }
+
+    @Override
+    public AvroReader restoreReader(Configuration config, SplitT split) throws 
IOException {
+        open(split);
+        return createReader(split);
+    }
+
+    @Override
+    public boolean isSplittable() {
+        return true;
+    }
+
+    private AvroReader createReader(SplitT split) throws IOException {
+        long end = split.offset() + split.length();
+        if (split.getReaderPosition().isPresent()) {
+            CheckpointedPosition position = split.getReaderPosition().get();
+            return new AvroReader(
+                    split.path(),
+                    split.offset(),
+                    end,
+                    position.getOffset(),
+                    position.getRecordsAfterOffset());
+        } else {
+            return new AvroReader(split.path(), split.offset(), end, -1, 0);
+        }
+    }
+
+    protected void open(SplitT split) {}
+
+    protected abstract T convert(A record);
+
+    protected abstract A createReusedAvroRecord();
+
+    private class AvroReader implements BulkFormat.Reader<T> {
+
+        private final DataFileReader<A> reader;
+
+        private final long end;
+        private final Pool<A> pool;
+
+        private long currentBlockStart;
+        private long currentRecordsToSkip;
+
+        private AvroReader(Path path, long offset, long end, long blockStart, 
long recordsToSkip)
+                throws IOException {
+            A reuse = createReusedAvroRecord();
+
+            this.reader = createReaderFromPath(path);
+            if (blockStart >= 0) {
+                reader.seek(blockStart);
+            } else {
+                reader.sync(offset);
+            }
+            for (int i = 0; i < recordsToSkip; i++) {
+                reader.next(reuse);
+            }
+
+            this.end = end;
+            this.pool = new Pool<>(1);
+            this.pool.add(reuse);
+
+            this.currentBlockStart = reader.previousSync();
+            this.currentRecordsToSkip = recordsToSkip;
+        }
+
+        private DataFileReader<A> createReaderFromPath(Path path) throws 
IOException {
+            FileSystem fileSystem = path.getFileSystem();
+            DatumReader<A> datumReader = new GenericDatumReader<>(null, 
readerSchema);
+            SeekableInput in =
+                    new FSDataInputStreamWrapper(
+                            fileSystem.open(path), 
fileSystem.getFileStatus(path).getLen());
+            return (DataFileReader<A>) DataFileReader.openReader(in, 
datumReader);
+        }
+
+        @Nullable
+        @Override
+        public RecordIterator<T> readBatch() throws IOException {
+            A reuse;
+            try {
+                reuse = pool.pollEntry();

Review comment:
       This is not the case. There are two steps for avro readers to perform 
before producing a record.
   1. Avro reader reads a block from the file, decompress it and store inflated 
bytes in memory.
   2. Avro reader deserialize the inflated bytes in memory one by one to 
produce each record.
   
   What I have done is to move step 1 into the fetcher thread, while step 2 is 
still in the reader thread. This is OK because the actual heavy disk IO is 
performed only in step 1. `ParquetVectorizedInputFormat` is also doing the same 
thing. It returns a `VectorizedColumnBatch` for each batch and deserialize each 
column when they are read by the readers. So
   
   > Now that you read on the producer thread, you don't need the sync 
primitives anymore
   
   We still need the reader to be open because inflated bytes are stored there, 
and only the avro readers know how to deserialize them.
   
   > now you pay the thread to thread overhead for each record
   
   I'm still passing batches between threads instead of records. Records are 
deserialized from batches in reader threads in step 2.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to