Hi all,

We recently implemented a feature in our streaming Flink job in which we build an AvroParquetWriter every time the overridden “write” method from org.apache.flink.streaming.connectors.fs.Writer gets called. We had to do this because the schema of each record is potentially different, and we first have to get the schema for the AvroParquetWriter out of the record itself. Previously the writer was built only once, in the “open” method, and from then on only the write method was called per record.
Since implementing this, our job crashes with “Connection unexpectedly closed by remote task manager ‘internal company url’. This might indicate that the remote task manager was lost.” We did not run into any issues on our test environments, so we suspect the problem only occurs under higher load, such as we have on our production environment. Unfortunately we still don’t have a proper means of reproducing that much load on our test environment for debugging. Could building the AvroParquetWriter on every write be causing the problem, and if so, why would that be the case? Any help in getting to the bottom of this issue would be really appreciated. Below is a code snippet of the class which uses the AvroParquetWriter.

Best regards,
Ivan Budincevic
Software engineer, bol.com Netherlands

package com.bol.measure.timeblocks.files;

import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

public class SlottedMeasurementsWriter implements Writer<SlottedMeasurements> {
  private transient ParquetWriter<GenericRecord> parquetWriter;
  private boolean overwrite;
  private Path path;

  public SlottedMeasurementsWriter(boolean overwrite) {
    this.overwrite = overwrite;
  }

  @Override
  public void open(FileSystem fs, Path path) throws IOException {
    this.path = path;
  }

  @Override
  public long flush() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public long getPos() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public void close() throws IOException {
    parquetWriter.close();
  }

  @Override
  public void write(SlottedMeasurements slot) throws IOException {
    final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
        AvroParquetWriter.<GenericRecord>builder(path)
            .withSchema(slot.getMeasurements().get(0).getSchema())
            .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
            .withDictionaryEncoding(true)
            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
    if (overwrite) {
      writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
    }
    parquetWriter = writerBuilder.build();
    for (GenericRecord measurement : slot.getMeasurements()) {
      parquetWriter.write(measurement);
    }
  }

  @Override
  public Writer<SlottedMeasurements> duplicate() {
    return new SlottedMeasurementsWriter(this.overwrite);
  }
}
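One thing I noticed while writing this up: in our write method, the previous parquetWriter is never closed before the field is reassigned, so only the very last writer ever gets closed. I don’t know for certain that this is what kills the task managers, but here is a self-contained sketch of that pattern (plain Java only — FakeParquetWriter and openWriters are made-up names standing in for the real Parquet classes, not anything from the actual libraries):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class WriterLeakSketch {
  // Counts stand-in writers that were created but never closed.
  static final AtomicInteger openWriters = new AtomicInteger();

  // Stand-in for ParquetWriter: "acquires a resource" on construction
  // and releases it only when close() is called.
  static class FakeParquetWriter {
    FakeParquetWriter() { openWriters.incrementAndGet(); }
    void close() { openWriters.decrementAndGet(); }
  }

  static FakeParquetWriter current;

  // Mirrors the write() in the snippet above: builds a fresh writer on
  // every call, overwriting the reference to the previous (still open) one.
  static void write() {
    current = new FakeParquetWriter();
  }

  public static void main(String[] args) {
    for (int i = 0; i < 1000; i++) {
      write();
    }
    // Mirrors the snippet's close(): only the last writer is closed.
    current.close();
    System.out.println(openWriters.get()); // prints 999: the rest leaked
  }
}
```

If the real ParquetWriter similarly holds on to buffers or file handles until closed, a thousand writes would leave a thousand minus one of them open, which could plausibly only show up under production load.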