Hi,

If possible, kindly share one output file to inspect. In the meanwhile, you could also give a try with "org.apache.hadoop.io.compress.GzipCodec".
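A minimal sketch of the gzip suggestion above. Since Hadoop's GzipCodec emits the standard gzip container, plain java.util.zip can stand in for it here to show the same newline-delimited record layout without the Hadoop dependency; the class and method names below are illustrative placeholders, not from this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Sketch: newline-delimited records compressed with plain gzip, mirroring
// the BulkWriter shape from the thread but without the Hadoop classes.
public class GzipBulkSketch {

    // Compress record payloads, one per line, ending with the gzip trailer.
    static byte[] compress(byte[][] records) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(sink);
        for (byte[] record : records) {
            gzip.write(record);
            gzip.write('\n');
        }
        gzip.finish(); // writes the gzip trailer; without it the file is truncated
        return sink.toByteArray();
    }

    // Inflate the whole stream back into a UTF-8 string.
    static String decompress(byte[] data) throws IOException {
        GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = gzip.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return out.toString("UTF-8");
    }

    public static void main(String[] args) throws IOException {
        byte[] compressed = compress(new byte[][]{"a".getBytes(), "b".getBytes()});
        System.out.println(decompress(compressed)); // "a\nb\n"
    }
}
```

The output is valid gzip, so it can be checked from the shell with `gunzip` the same way the thread checks LZO files with `lzop -d`.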
Regards,
Ravi

On Tue, Oct 22, 2019 at 7:25 PM amran dean <adfs54545...@gmail.com> wrote:

> Hello,
> These changes result in the following error:
>
> $ lzop -d part-1-0
> lzop: part-1-0: not a lzop file
>
> public class BulkRecordLZOSerializer implements BulkWriter<KafkaRecord> {
>
>     private final CompressionOutputStream compressedStream;
>
>     public BulkRecordLZOSerializer(OutputStream stream) {
>         CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
>         try {
>             compressedStream = factory.getCodecByClassName("com.hadoop.compression.lzo.LzoCodec").createOutputStream(stream);
>         } catch (IOException e) {
>             throw new IllegalStateException("Unable to create LZO OutputStream");
>         }
>     }
>
>     public void addElement(KafkaRecord record) throws IOException {
>         compressedStream.write(record.getValue());
>         compressedStream.write('\n');
>     }
>
>     public void finish() throws IOException {
>         compressedStream.flush();
>         compressedStream.finish();
>     }
>
>     public void flush() throws IOException {
>         compressedStream.flush();
>     }
> }
>
> On Mon, Oct 21, 2019 at 11:17 PM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote:
>
>> Hi,
>>
>> It seems you want to use "com.hadoop.compression.lzo.LzoCodec" instead of "com.hadoop.compression.lzo.LzopCodec" in the line below:
>>
>> compressedStream = factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>>
>> Regarding the "lzop: unexpected end of file" problem, kindly add "compressedStream.flush()" in the method below to flush any leftover data before finishing.
>>
>> public void finish() throws IOException {
>>     compressedStream.flush();
>>     compressedStream.finish();
>> }
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/serialization/BulkWriter.html#finish--
>>
>> Regards,
>> Ravi
>>
>> On Tue, Oct 22, 2019 at 4:10 AM amran dean <adfs54545...@gmail.com> wrote:
>>
>>> Hello,
>>> I'm using BulkWriter to write newline-delimited, LZO-compressed files. The logic is very straightforward (see the code below).
>>>
>>> I am experiencing an issue decompressing files created in this manner, consistently getting "lzop: unexpected end of file". Is this an issue with the caller of BulkWriter?
>>>
>>> (As an aside, using com.hadoop.compression.lzo.LzoCodec instead results in gibberish.) I'm very confused about what is going on.
>>>
>>> private final CompressionOutputStream compressedStream;
>>>
>>> public BulkRecordLZOSerializer(OutputStream stream) {
>>>     CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
>>>     try {
>>>         compressedStream = factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>>>     } catch (IOException e) {
>>>         throw new IllegalStateException("Unable to create LZO OutputStream");
>>>     }
>>> }
>>>
>>> public void addElement(KafkaRecord record) throws IOException {
>>>     compressedStream.write(record.getValue());
>>>     compressedStream.write('\n');
>>> }
>>>
>>> public void finish() throws IOException {
>>>     compressedStream.finish();
>>> }
>>>
>>> public void flush() throws IOException {
>>>     compressedStream.flush();
>>> }
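The "unexpected end of file" symptom discussed in this thread is the generic failure mode of a compression stream that is never properly finished: buffered data and the trailer are never written, so the decompressor sees a truncated file. A small sketch of this, using java.util.zip's gzip as a stand-in for the LZO codec (the class and method names are illustrative, not from the thread):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Sketch of the failure mode: a compression stream that is never
// finish()ed leaves data buffered and omits the trailer, so the
// decompressor treats the output as truncated.
public class TruncatedStreamDemo {

    static byte[] writeRecords(boolean callFinish) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(sink);
        gzip.write("payload\n".getBytes());
        if (callFinish) {
            gzip.finish(); // flushes buffered compressed data and writes the trailer
        }
        return sink.toByteArray(); // without finish(), most data is still buffered
    }

    static boolean decompressOk(byte[] data) {
        try {
            GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
            while (in.read() != -1) { /* drain */ }
            return true;
        } catch (IOException e) { // e.g. "Unexpected end of ZLIB input stream"
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(decompressOk(writeRecords(true)));  // true
        System.out.println(decompressOk(writeRecords(false))); // false: truncated
    }
}
```

This mirrors why BulkWriter.finish() must terminate the compressed stream before the sink finalizes the part file.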