Hi Ivan,

I don't have much experience with Avro, but extracting the schema and
creating a writer for each record sounds like a pretty expensive approach.
This might result in significant load and increased GC activity.

Do all records have a different schema, or might it make sense to cache the
writers in a weak hashmap?
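
For illustration, here is a rough sketch of that caching idea inside your
SlottedMeasurementsWriter (just a sketch, not a drop-in fix: it assumes
java.util.Map/HashMap and org.apache.avro.Schema imports, and a made-up
per-schema path suffix, since a ParquetWriter writes a single file and each
schema would need its own output file):

  // Sketch only: keep one ParquetWriter per Avro schema so the builder is
  // not invoked again for schemas we have already seen.
  private final Map<Schema, ParquetWriter<GenericRecord>> writersBySchema = new HashMap<>();

  private ParquetWriter<GenericRecord> writerFor(Schema schema) throws IOException {
    ParquetWriter<GenericRecord> writer = writersBySchema.get(schema);
    if (writer == null) {
      // hypothetical: derive a per-schema output file from the base path
      Path schemaPath = path.suffix("-" + Integer.toHexString(schema.hashCode()));
      writer = AvroParquetWriter.<GenericRecord>builder(schemaPath)
          .withSchema(schema)
          .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
          .build();
      writersBySchema.put(schema, writer);
    }
    return writer;
  }

  @Override
  public void write(SlottedMeasurements slot) throws IOException {
    for (GenericRecord measurement : slot.getMeasurements()) {
      writerFor(measurement.getSchema()).write(measurement);
    }
  }

  @Override
  public void close() throws IOException {
    // close every cached writer so buffered pages are flushed to the files
    for (ParquetWriter<GenericRecord> writer : writersBySchema.values()) {
      writer.close();
    }
  }

A WeakHashMap would work similarly, but you would have to make sure evicted
writers still get closed, otherwise you leak the underlying file handles and
buffers.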

Best, Fabian


2017-11-07 19:51 GMT+01:00 Ivan Budincevic <ibudince...@bol.com>:

> Hi all,
>
>
>
> We recently implemented a feature in our streaming Flink job in which we
> have an AvroParquetWriter that we build every time the overridden “write”
> method from org.apache.flink.streaming.connectors.fs.Writer gets called.
> We had to do this because the schema of each record is potentially
> different, and we have to get the schema for the AvroParquetWriter out of
> the record itself first. Previously this builder was built only once in
> the “open” method, and from then on only the write method was called per
> record.
>
>
>
> Since implementing this, our job crashes with “Connection unexpectedly
> closed by remote task manager ‘internal company url’. This might indicate
> that the remote task manager was lost.”
>
>
>
> We did not run into any issues in our test environments, so we suspect
> this problem occurs only under higher load, such as we have in our
> production environment. Unfortunately, we still don’t have a proper means
> of reproducing that much load in our test environment to debug it.
>
>
>
> Could building the AvroParquetWriter on every write be causing the
> problem, and if so, why would that be the case?
>
>
>
> Any help in getting to the bottom of the issue would be really
> appreciated. Below is a code snippet of the class that uses the
> AvroParquetWriter.
>
>
>
> Best regards,
>
> Ivan Budincevic
>
> Software engineer, bol.com
>
> Netherlands
>
>
>
> package com.bol.measure.timeblocks.files;
>
> import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.streaming.connectors.fs.Writer;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.avro.AvroParquetWriter;
> import org.apache.parquet.column.ParquetProperties;
> import org.apache.parquet.hadoop.ParquetFileWriter;
> import org.apache.parquet.hadoop.ParquetWriter;
> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
>
> import java.io.IOException;
>
> public class SlottedMeasurementsWriter implements
> Writer<SlottedMeasurements> {
>   private transient ParquetWriter<GenericRecord> parquetWriter;
>   private boolean overwrite;
>   private Path path;
>
>   public SlottedMeasurementsWriter(boolean overwrite) {
>     this.overwrite = overwrite;
>   }
>
>   @Override
>   public void open(FileSystem fs, Path path) throws IOException {
>     this.path = path;
>   }
>
>   @Override
>   public long flush() throws IOException {
>     return parquetWriter.getDataSize();
>   }
>
>   @Override
>   public long getPos() throws IOException {
>     return parquetWriter.getDataSize();
>   }
>
>   @Override
>   public void close() throws IOException {
>     parquetWriter.close();
>   }
>
>   @Override
>   public void write(SlottedMeasurements slot) throws IOException {
>
>     final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
>       AvroParquetWriter
>         .<GenericRecord>builder(path)
>         .withSchema(slot.getMeasurements().get(0).getSchema())
>         .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
>         .withDictionaryEncoding(true)
>         .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
>     if (overwrite) {
>       writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
>     }
>
>     parquetWriter = writerBuilder.build();
>
>     for (GenericRecord measurement : slot.getMeasurements()) {
>       parquetWriter.write(measurement);
>     }
>   }
>
>
>   @Override
>   public Writer<SlottedMeasurements> duplicate() {
>     return new SlottedMeasurementsWriter(this.overwrite);
>   }
> }
>
>
>
>
>
