We have experienced a production issue on Kafka 0.11.0.1 in which, we believe, the contents of the connect-offsets topic (and hence the in-memory map in KafkaOffsetBackingStore) grow without bound for source connectors that use source partitions with an unbounded key space.
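To illustrate the growth we suspect, here is a toy model in Java. It is not Kafka code. As a simplification, it assumes the worker's offset map behaves like a HashMap keyed by source partition, where each commit overwrites the previous offset for that partition, just as compaction keeps only the latest record per key. The real store works on serialized keys. The OffsetMapGrowth class, the file names and the table names are all hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT Kafka code. The real KafkaOffsetBackingStore works on serialized
// keys; here the in-memory map is keyed directly by the source-partition map.
class OffsetMapGrowth {
    private final Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();

    // Committing overwrites the previous offset for the same source partition,
    // mirroring compaction, which keeps only the latest record per key.
    void commit(Map<String, Object> partition, Map<String, Object> offset) {
        offsets.put(partition, offset);
    }

    int size() {
        return offsets.size();
    }

    // Returns {file-connector entries, table-connector entries} after `files` files.
    static int[] simulate(int files, List<String> tables) {
        OffsetMapGrowth fileStore = new OffsetMapGrowth();
        OffsetMapGrowth tableStore = new OffsetMapGrowth();
        for (int i = 0; i < files; i++) {
            // Hypothetical stand-in for a timestamped file name: every file is a new key.
            String filename = "events-" + i + ".log";
            fileStore.commit(Collections.singletonMap("filename", filename),
                    Collections.singletonMap("position", 4096L));
            // A table-based connector keeps reusing the same few keys.
            for (String table : tables) {
                tableStore.commit(Collections.singletonMap("table", table),
                        Collections.singletonMap("last_id", (long) i));
            }
        }
        return new int[] {fileStore.size(), tableStore.size()};
    }

    public static void main(String[] args) {
        int[] sizes = simulate(1000, Arrays.asList("customers", "orders", "invoices"));
        System.out.println("file connector entries: " + sizes[0]);  // 1000
        System.out.println("table connector entries: " + sizes[1]); // 3
    }
}
```

Running it prints 1000 entries for the file-style connector and 3 for the table-style one. Overwriting per key only bounds the map when the key space itself is bounded.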
(1) Compacted Topics for Source Offsets

The Connect documentation specifies the use of compacted topics for source offsets:

> offset.storage.topic (default connect-offsets) - topic to use for storing offsets; this topic should have many partitions, be replicated, and be configured for compaction

(2) Source Partitions in an Infinite Key Space

In addition, the documentation includes example source code for a file-oriented source connector that uses the filename as the source partition (see the sourcePartition assignment in the excerpt below). Our source connectors set source partitions in the same way.

```java
@Override
public List<SourceRecord> poll() throws InterruptedException {
    try {
        ArrayList<SourceRecord> records = new ArrayList<>();
        while (streamValid(stream) && records.isEmpty()) {
            LineAndOffset line = readToNextLine(stream);
            if (line != null) {
                // One source partition per file name: every new file is a new key in the offsets topic.
                Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
                Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
                records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
            } else {
                Thread.sleep(1);
            }
        }
        return records;
    } catch (IOException e) {
        // Underlying stream was killed, probably as a result of calling stop. Allow to return
        // null, and driving thread will handle any shutdown if necessary.
    }
    return null;
}
```

If the filenames contain, say, a timestamp, then every new file creates a new source partition, and the number of source partitions grows without bound.

So the combination of (1) and (2) seems problematic: compaction keeps the latest offset for every distinct key, so it never removes the entries for files that will never be read again. Indeed, our Connect workers have shown slow, steady growth in memory usage.

One other note: changing the topic's cleanup.policy from "compact" to "delete" seems wrong as well. We have other connectors (e.g., RDBMS connectors) that use a small, finite set of source partitions, and time-based deletion could remove their offsets if they are not rewritten within the retention period.

Do you have any advice for me? Did I miss something obvious?
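For reference, here is a toy model of the dilemma. It is not Kafka's log cleaner. It contrasts the two cleanup policies on a simplified offsets log, under these assumptions: each record carries an append time in hours; compaction keeps the latest value per key (ignoring the active segment and tombstones); and deletion drops any record older than the retention window regardless of key. The key names are hypothetical, and the 168-hour window is Kafka's default log.retention.hours.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the two cleanup policies, NOT Kafka's log cleaner.
class CleanupPolicyDemo {
    // One record in the simplified offsets log: key, offset value, append time in hours.
    static final class Rec {
        final String key;
        final long value;
        final long hour;

        Rec(String key, long value, long hour) {
            this.key = key;
            this.value = value;
            this.hour = hour;
        }
    }

    // Simplified cleanup.policy=compact: keep the latest value per key, however old.
    static Map<String, Long> compact(List<Rec> log) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.put(r.key, r.value);
        }
        return latest;
    }

    // Simplified cleanup.policy=delete: drop every record older than the retention
    // window regardless of key, then read the latest surviving value per key.
    static Map<String, Long> delete(List<Rec> log, long nowHour, long retentionHours) {
        List<Rec> kept = new ArrayList<>();
        for (Rec r : log) {
            if (nowHour - r.hour <= retentionHours) {
                kept.add(r);
            }
        }
        return compact(kept);
    }

    static List<Rec> buildLog() {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("table:archive", 42L, 0)); // hypothetical rarely-updated table
        for (long hour = 1; hour <= 200; hour++) {
            log.add(new Rec("file:events-" + hour + ".log", 4096L, hour)); // new key per file
            log.add(new Rec("table:orders", hour, hour));                  // same key every hour
        }
        return log;
    }

    public static void main(String[] args) {
        List<Rec> log = buildLog();
        System.out.println("compact keeps " + compact(log).size() + " keys"); // 202
        System.out.println("delete keeps table:archive? "
                + delete(log, 200, 168).containsKey("table:archive"));  // false
    }
}
```

Under compaction, all 202 keys survive, including every file ever read. Under deletion, the old file keys age out, but so does the offset for the rarely updated table. That second effect is what would hurt the RDBMS connectors.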
Best regards,
Dallas Wrege
Evolving Systems, Inc.