[ https://issues.apache.org/jira/browse/NIFI-5805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16680703#comment-16680703 ]

ASF GitHub Bot commented on NIFI-5805:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3160#discussion_r232119197
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java ---
    @@ -33,24 +28,34 @@
     import org.apache.nifi.serialization.record.Record;
     import org.apache.nifi.serialization.record.RecordSchema;
     
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +
     public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
         private final SchemaAccessWriter schemaAccessWriter;
         private final RecordSchema recordSchema;
         private final Schema avroSchema;
         private final BinaryEncoder encoder;
         private final OutputStream buffered;
         private final DatumWriter<GenericRecord> datumWriter;
    +    private final BlockingQueue<BinaryEncoder> recycleQueue;
     
         public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema,
    -        final SchemaAccessWriter schemaAccessWriter, final OutputStream out) throws IOException {
    +        final SchemaAccessWriter schemaAccessWriter, final OutputStream out, final BlockingQueue<BinaryEncoder> recycleQueue) {
             super(out);
             this.recordSchema = recordSchema;
             this.schemaAccessWriter = schemaAccessWriter;
             this.avroSchema = avroSchema;
             this.buffered = new BufferedOutputStream(out);
    +        this.recycleQueue = recycleQueue;
    +
    +        BinaryEncoder reusableEncoder = recycleQueue.poll();
    +        encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, reusableEncoder);
    --- End diff ---
    
    Probably, we should add a debug log here to provide information on whether the current pool size fits actual usage. If reusableEncoder is frequently null and users want to improve performance, they can increase the pool size, etc.
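A minimal sketch of the logging the reviewer suggests, using a plain stand-in class instead of Avro's BinaryEncoder so it runs without the Avro dependency; the class, method, and logger names here are illustrative, not the actual NiFi code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Logger;

public class PoolDebugLogSketch {
    private static final Logger logger = Logger.getLogger(PoolDebugLogSketch.class.getName());

    // Hypothetical stand-in for Avro's BinaryEncoder.
    static final class Encoder {}

    static Encoder borrow(final BlockingQueue<Encoder> recycleQueue) {
        final Encoder reusable = recycleQueue.poll();
        if (reusable == null) {
            // Pool miss: a new encoder (and its backing buffer) must be
            // allocated. Frequent misses hint the pool size is too small.
            logger.fine("Encoder pool empty; creating a new encoder");
            return new Encoder();
        }
        logger.fine("Reusing pooled encoder");
        return reusable;
    }

    public static void main(String[] args) {
        final BlockingQueue<Encoder> pool = new ArrayBlockingQueue<>(1);
        final Encoder e = borrow(pool);       // miss: the pool starts empty
        pool.offer(e);                        // returned when the writer closes
        final Encoder again = borrow(pool);   // hit: the same instance comes back
        System.out.println("reused: " + (again == e));
    }
}
```

With DEBUG-level logging enabled, the miss/hit lines would let a user compare pool size against actual concurrent writer usage, as the comment proposes.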


> Avro Record Writer service creates byte buffer for every Writer created
> -----------------------------------------------------------------------
>
>                 Key: NIFI-5805
>                 URL: https://issues.apache.org/jira/browse/NIFI-5805
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>
> When we use the Avro RecordSet Writer, and do not embed the schema, the 
> Writer uses the Avro BinaryEncoder object to serialize the data. This object 
> can be initialized, but instead we create a new one for each writer. This 
> results in creating a new 64 KB byte[] each time. When we are writing many 
> records to a given FlowFile, this is not a big deal. However, when used in 
> PublishKafkaRecord or similar processors, where a new writer must be created 
> for every Record, this can have a very significant performance impact.
> An improvement would be to have the user configure the maximum number of 
> BinaryEncoder objects to pool and then use a simple pooling mechanism to 
> reuse these objects.
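The pooling mechanism described above can be sketched with a bounded BlockingQueue; this uses a 64 KB byte[] as a stand-in for the BinaryEncoder (so the example runs without Avro), and the names are illustrative rather than the actual implementation:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EncoderPoolSketch {
    public static void main(String[] args) {
        // Capacity would come from the user-configured maximum pool size.
        final BlockingQueue<byte[]> recycleQueue = new ArrayBlockingQueue<>(2);

        // Borrow: poll() is non-blocking and returns null when the pool is
        // empty, in which case a fresh object (and its 64 KB buffer) is
        // allocated, as happens today for every writer.
        byte[] pooled = recycleQueue.poll();
        System.out.println("pool empty on first poll: " + (pooled == null));
        final byte[] created = new byte[64 * 1024];

        // Return: offer() is also non-blocking; when the pool is already
        // full the object is simply dropped, so the pool never holds more
        // than the configured number of buffers.
        System.out.println("returned to pool: " + recycleQueue.offer(created));

        // A later writer reuses the pooled buffer instead of allocating.
        final byte[] reused = recycleQueue.poll();
        System.out.println("reused same buffer: " + (reused == created));
    }
}
```

The non-blocking poll()/offer() pair keeps the pool a pure optimization: an empty pool degrades to today's per-writer allocation, and a full pool bounds retained memory.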



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
