Hello, I have a large compressed file that contains lines of text (delimited by \n). I would like to send the file to a topic with Kafka Connect.
I tried writing a custom Converter, but it's not clear how it is supposed to split the data on the \n characters. As I see it, the converter receives an array of bytes and that's all. The size of the array seems to be determined by Kafka, and of course the \n character will never land on the last element, so the decompression will never work. Here's some sample code I wrote:

public byte[] fromConnectData(String topic, Schema schema, Object value) {
    try {
        byte[] arr = value.toString().getBytes();
        return MyLib.compress(arr);
    } catch (Exception ex) {
        throw new DataException("Failed to compress data.", ex);
    }
}

public SchemaAndValue toConnectData(String topic, byte[] data) {
    try {
        byte[] decompressed = MyLib.decompress(data);
        return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, new String(decompressed));
    } catch (Exception ex) {
        throw new DataException("Failed to decompress data.", ex);
    }
}

I'm able to do the decompression if I use plain Java InputStreams. Here's a naive implementation that works:

FileInputStream is = new FileInputStream(new File("input_file.txt"));
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int n;
while ((n = is.read()) != -1) {
    buffer.write(n);
    if (n == '\n') {
        // a full line has been accumulated; decompress it and send it as one record
        String str = decompress(buffer.toByteArray());
        kafkaProducer.send(new ProducerRecord<>("topic", str));
        buffer.reset();
    }
}

How can I write a connector that does the same as the above?

Thanks,
Csaba
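P.S. To make the question more concrete, here is the rough shape I imagine a SourceTask taking. It is only a sketch, not working code: it reuses the same read-until-\n loop from above, wraps each decompressed line in a SourceRecord, assumes hypothetical "file" and "topic" config keys, reuses my own MyLib.decompress helper, and omits offset tracking entirely.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class CompressedLineSourceTask extends SourceTask {

    private InputStream in;
    private String topic;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    @Override
    public void start(Map<String, String> props) {
        // "file" and "topic" are hypothetical config keys for this sketch
        topic = props.get("topic");
        try {
            in = new FileInputStream(props.get("file"));
        } catch (IOException ex) {
            throw new ConnectException("Failed to open input file.", ex);
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        try {
            int n;
            while ((n = in.read()) != -1) {
                buffer.write(n);
                if (n == '\n') {
                    // same per-line decompression as in the standalone loop;
                    // MyLib.decompress is my own helper
                    String line = new String(MyLib.decompress(buffer.toByteArray()));
                    buffer.reset();
                    // empty partition/offset maps: offset tracking omitted for brevity
                    return Collections.singletonList(new SourceRecord(
                            Collections.emptyMap(), Collections.emptyMap(),
                            topic, Schema.OPTIONAL_STRING_SCHEMA, line));
                }
            }
        } catch (Exception ex) {
            throw new ConnectException("Failed to read/decompress line.", ex);
        }
        return null; // end of file: nothing to emit for now
    }

    @Override
    public void stop() {
        try {
            if (in != null) {
                in.close();
            }
        } catch (IOException ex) {
            // ignore on shutdown
        }
    }

    @Override
    public String version() {
        return "0.0.1";
    }
}

If I'm reading the API right, the framework would then run each returned value through the configured converter one record at a time, so the converter itself would no longer need to find the \n boundaries. Is this the right direction?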