Hi Ivan,

I don't have much experience with Avro, but extracting the schema and creating a writer for each record sounds like a pretty expensive approach. This might result in significant load and increased GC activity.
Do all records have a different schema, or might it make sense to cache the writers in a weak hashmap, keyed by schema? (A rough sketch of that idea follows after the quoted message below.)

Best,
Fabian

2017-11-07 19:51 GMT+01:00 Ivan Budincevic <ibudince...@bol.com>:

> Hi all,
>
> We recently implemented a feature in our streaming Flink job in which we
> have an AvroParquetWriter which we build every time the overridden “write”
> method from org.apache.flink.streaming.connectors.fs.Writer gets called.
> We had to do this because the schema of each record is potentially
> different, and we have to get the schema for the AvroParquetWriter out of
> the record itself first. Previously this builder was built only once in
> the “open” method, and from then on only the write method was called per
> record.
>
> Since implementing this, our job crashes with “Connection unexpectedly
> closed by remote task manager ‘internal company url’. This might indicate
> that the remote task manager was lost.”
>
> We did not run into any issues on our test environments, so we suspect
> this problem occurs only under higher loads such as we have on our
> production environment. Unfortunately we still don’t have a proper means
> of reproducing this much load on our test environment to debug.
>
> Could building the AvroParquetWriter on every write be causing the
> problem, and if so, why would that be the case?
>
> Any help in getting to the bottom of the issue would be really
> appreciated. Below is a code snippet of the class which uses the
> AvroParquetWriter.
>
> Best regards,
>
> Ivan Budincevic
> Software engineer, bol.com
> Netherlands
>
> package com.bol.measure.timeblocks.files;
>
> import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.streaming.connectors.fs.Writer;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.avro.AvroParquetWriter;
> import org.apache.parquet.column.ParquetProperties;
> import org.apache.parquet.hadoop.ParquetFileWriter;
> import org.apache.parquet.hadoop.ParquetWriter;
> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
>
> import java.io.IOException;
>
> public class SlottedMeasurementsWriter implements Writer<SlottedMeasurements> {
>
>   private transient ParquetWriter<GenericRecord> parquetWriter;
>   private boolean overwrite;
>   private Path path;
>
>   public SlottedMeasurementsWriter(boolean overwrite) {
>     this.overwrite = overwrite;
>   }
>
>   @Override
>   public void open(FileSystem fs, Path path) throws IOException {
>     this.path = path;
>   }
>
>   @Override
>   public long flush() throws IOException {
>     return parquetWriter.getDataSize();
>   }
>
>   @Override
>   public long getPos() throws IOException {
>     return parquetWriter.getDataSize();
>   }
>
>   @Override
>   public void close() throws IOException {
>     parquetWriter.close();
>   }
>
>   @Override
>   public void write(SlottedMeasurements slot) throws IOException {
>     final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
>         AvroParquetWriter.<GenericRecord>builder(path)
>             .withSchema(slot.getMeasurements().get(0).getSchema())
>             .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
>             .withDictionaryEncoding(true)
>             .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
>     if (overwrite) {
>       writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
>     }
>
>     parquetWriter = writerBuilder.build();
>
>     for (GenericRecord measurement : slot.getMeasurements()) {
>       parquetWriter.write(measurement);
>     }
>   }
>   @Override
>   public Writer<SlottedMeasurements> duplicate() {
>     return new SlottedMeasurementsWriter(this.overwrite);
>   }
> }
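As a rough illustration of the caching idea, something along these lines might work. All class, field, and file names here are just placeholders, and it assumes one output file per distinct schema; Avro's Schema implements equals/hashCode, so it can serve as a map key, and a WeakHashMap could replace the HashMap if writers should become collectable once their schema is no longer referenced elsewhere:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class PerSchemaWriterCache {

  // One writer per distinct Avro schema, created lazily on first use.
  // A WeakHashMap<Schema, ParquetWriter<GenericRecord>> would also work here.
  private final Map<Schema, ParquetWriter<GenericRecord>> writers = new HashMap<>();
  private final Path baseDir;

  public PerSchemaWriterCache(Path baseDir) {
    this.baseDir = baseDir;
  }

  // Look up (or build once) the writer for this schema instead of
  // building a new AvroParquetWriter on every write call.
  private ParquetWriter<GenericRecord> writerFor(Schema schema) throws IOException {
    ParquetWriter<GenericRecord> writer = writers.get(schema);
    if (writer == null) {
      // each schema gets its own output file; the naming scheme is illustrative
      Path file = new Path(baseDir, "part-" + writers.size() + ".parquet");
      writer = AvroParquetWriter.<GenericRecord>builder(file)
          .withSchema(schema)
          .build();
      writers.put(schema, writer);
    }
    return writer;
  }

  public void write(GenericRecord record) throws IOException {
    writerFor(record.getSchema()).write(record);
  }

  public void close() throws IOException {
    for (ParquetWriter<GenericRecord> writer : writers.values()) {
      writer.close();
    }
    writers.clear();
  }
}

Whether something like this is enough depends on how many distinct schemas actually occur in the stream: with only a handful, the cache stays small and each writer is built once, whereas an unbounded number of schemas would need explicit eviction and closing.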