Hi,
It seems that you want to use "com.hadoop.compression.lzo.LzoCodec"
instead of "com.hadoop.compression.lzo.*LzopCodec*" in the line below.
compressedStream =
factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
Regarding the "lzop: unexpected end of file" problem, kindly add
"compressedStream.flush()" to the method below so that any leftover data is
flushed before finishing.
public void finish() throws IOException {
    compressedStream.flush();
    compressedStream.finish();
}
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/serialization/BulkWriter.html#finish--
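To illustrate the flush-then-finish pattern, here is a minimal, self-contained sketch. Please treat it as a stand-in rather than your actual setup: it uses java.util.zip.GZIPOutputStream from the JDK in place of the hadoop-lzo stream, and FinishDemo, LineWriter, write and decompress are hypothetical names made up for this example. GZIP also ends with a trailer that is only written by finish(), so a stream that is never finished fails to decompress with an end-of-file error, which is similar in spirit to what lzop reports. Whether the LZO codec buffers data in the same way is an assumption this sketch does not prove.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class FinishDemo {

    /** Hypothetical writer shaped like the serializer in this thread (GZIP stands in for LZO). */
    static final class LineWriter {
        private final GZIPOutputStream compressedStream;

        LineWriter(OutputStream stream) throws IOException {
            this.compressedStream = new GZIPOutputStream(stream);
        }

        void addElement(byte[] value) throws IOException {
            compressedStream.write(value);
            compressedStream.write('\n');
        }

        void finish() throws IOException {
            // Flush pending data first, then write the trailer.
            // finish() does not close the underlying stream.
            compressedStream.flush();
            compressedStream.finish();
        }
    }

    /** Writes two records; optionally skips finish() to produce a truncated stream. */
    static byte[] write(boolean callFinish) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        LineWriter writer = new LineWriter(sink);
        writer.addElement("a".getBytes(StandardCharsets.UTF_8));
        writer.addElement("b".getBytes(StandardCharsets.UTF_8));
        if (callFinish) {
            writer.finish();
        }
        return sink.toByteArray();
    }

    /** Decompresses the whole buffer; throws EOFException if the stream is truncated. */
    static String decompress(byte[] data) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        System.out.println("finished stream round-trips: "
                + decompress(write(true)).equals("a\nb\n"));
        try {
            decompress(write(false));
            System.out.println("unfinished stream decompressed (not expected)");
        } catch (EOFException e) {
            System.out.println("unfinished stream failed: " + e.getMessage());
        }
    }
}
```

If the finished output decompresses cleanly while the unfinished one hits end-of-file, it is worth checking that finish() is really reached on every part file in your job.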
Regards,
Ravi
On Tue, Oct 22, 2019 at 4:10 AM amran dean <[email protected]> wrote:
> Hello,
> I'm using BulkWriter to write newline-delimited, LZO-compressed files. The
> logic is very straightforward (See code below).
>
> I am experiencing an issue decompressing the files created in this
> manner, consistently getting "lzop: unexpected end of file". Is this an
> issue with the caller of BulkWriter?
>
> As an aside, using com.hadoop.compression.lzo.LzoCodec instead results
> in gibberish. I'm very confused about what is going on.
>
> private final CompressionOutputStream compressedStream;
>
> public BulkRecordLZOSerializer(OutputStream stream) {
>     CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
>     try {
>         compressedStream =
>             factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream);
>     } catch (IOException e) {
>         // Keep the original exception as the cause so the failure can be diagnosed.
>         throw new IllegalStateException("Unable to create LZO OutputStream", e);
>     }
> }
>
> public void addElement(KafkaRecord record) throws IOException {
>     compressedStream.write(record.getValue());
>     compressedStream.write('\n');
> }
>
> public void finish() throws IOException {
>     compressedStream.finish();
> }
>
> public void flush() throws IOException {
>     compressedStream.flush();
> }
>
>