Hello,
These changes result in the following error:

$ lzop -d part-1-0
lzop: part-1-0: not a lzop file
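One quick way to see why lzop rejects the file is to check whether it begins with the lzop container header at all: LzoCodec emits a raw LZO stream with no lzop header, while the lzop tool only accepts the lzop file format. The sketch below is illustrative (the class and method names LzopMagicCheck/looksLikeLzop are made up here; the magic bytes are those of the lzop file format):

```java
import java.util.Arrays;

public class LzopMagicCheck {
    // The lzop file format begins with this 9-byte magic sequence.
    // Raw LZO streams produced by LzoCodec lack it, which is why the
    // lzop tool reports "not a lzop file".
    static final byte[] LZOP_MAGIC = {
        (byte) 0x89, 'L', 'Z', 'O', 0x00, 0x0D, 0x0A, 0x1A, 0x0A
    };

    // Returns true if the given header bytes start with the lzop magic.
    static boolean looksLikeLzop(byte[] header) {
        return header.length >= LZOP_MAGIC.length
            && Arrays.equals(Arrays.copyOf(header, LZOP_MAGIC.length), LZOP_MAGIC);
    }

    public static void main(String[] args) {
        // First bytes of a genuine lzop file vs. a raw LZO block stream.
        byte[] lzopHeader = { (byte) 0x89, 'L', 'Z', 'O', 0x00, 0x0D, 0x0A, 0x1A, 0x0A, 0x10 };
        byte[] rawLzo = { 0x00, 0x00, 0x12, 0x34 };
        System.out.println(looksLikeLzop(lzopHeader)); // true
        System.out.println(looksLikeLzop(rawLzo));     // false
    }
}
```

Reading the first few bytes of part-1-0 (e.g. with `xxd part-1-0 | head -1`) and comparing against this magic would confirm which format the sink actually wrote.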
public class BulkRecordLZOSerializer implements BulkWriter<KafkaRecord> {

  private final CompressionOutputStream compressedStream;

  public BulkRecordLZOSerializer(OutputStream stream) {
    CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
    try {
      compressedStream = factory.getCodecByClassName("com.hadoop.compression.lzo.LzoCodec").createOutputStream(stream);
    } catch (IOException e) {
      throw new IllegalStateException("Unable to create LZO OutputStream");
    }
  }

  public void addElement(KafkaRecord record) throws IOException {
    compressedStream.write(record.getValue());
    compressedStream.write('\n');
  }

  public void finish() throws IOException {
    compressedStream.flush();
    compressedStream.finish();
  }

  public void flush() throws IOException {
    compressedStream.flush();
  }
}

On Mon, Oct 21, 2019 at 11:17 PM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> Hi,
>
> It seems that you want to use "com.hadoop.compression.lzo.LzoCodec"
> instead of "com.hadoop.compression.lzo.*LzopCodec*" in the line below.
>
> compressedStream =
> factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>
> Regarding the "lzop: unexpected end of file" problem, kindly add
> "compressedStream.flush()" in the method below to flush any leftover data
> before finishing.
>
> public void finish() throws IOException {
>   compressedStream.flush();
>   compressedStream.finish();
> }
>
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/serialization/BulkWriter.html#finish--
>
> Regards,
> Ravi
>
> On Tue, Oct 22, 2019 at 4:10 AM amran dean <adfs54545...@gmail.com> wrote:
>
>> Hello,
>> I'm using BulkWriter to write newline-delimited, LZO-compressed files.
>> The logic is very straightforward (see code below).
>>
>> I am experiencing an issue decompressing the files created in this
>> manner, consistently getting "lzop: unexpected end of file". Is this an
>> issue with the caller of BulkWriter?
>>
>> (As an aside, using com.hadoop.compression.lzo.LzoCodec instead results
>> in gibberish. I'm very confused about what is going on.)
>>
>> private final CompressionOutputStream compressedStream;
>>
>> public BulkRecordLZOSerializer(OutputStream stream) {
>>   CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
>>   try {
>>     compressedStream =
>> factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>>   } catch (IOException e) {
>>     throw new IllegalStateException("Unable to create LZO OutputStream");
>>   }
>> }
>>
>> public void addElement(KafkaRecord record) throws IOException {
>>   compressedStream.write(record.getValue());
>>   compressedStream.write('\n');
>> }
>>
>> public void finish() throws IOException {
>>   compressedStream.finish();
>> }
>>
>> public void flush() throws IOException {
>>   compressedStream.flush();
>> }
>>
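The flush-before-finish ordering Ravi recommends can be exercised with a self-contained stand-in: the sketch below uses java.util.zip.GZIPOutputStream instead of the Hadoop LZO codec stream (which needs the native LZO libraries), assuming the ordering concern is analogous. FinishOrderSketch and writeAndFinish are illustrative names, not part of the code under discussion:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class FinishOrderSketch {
    // Writes one newline-delimited record, then flushes buffered data
    // before finalizing the compressed stream, mirroring the suggested
    // finish() implementation.
    public static byte[] writeAndFinish(byte[] payload) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        GZIPOutputStream compressed = new GZIPOutputStream(sink);
        compressed.write(payload);
        compressed.write('\n');
        compressed.flush();   // push any buffered bytes downstream
        compressed.finish();  // then write the compressed trailer
        return sink.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] out = writeAndFinish("hello".getBytes());
        // Decompress to confirm the stream is complete (no truncated trailer).
        GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(out));
        byte[] buf = new byte[16];
        int n = in.read(buf);
        System.out.println(new String(buf, 0, n)); // the round-tripped record
    }
}
```

If finish() never finalizes the compression stream (or data is left buffered), the decompressor sees a truncated stream, which is consistent with the "unexpected end of file" symptom in the thread.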