We have a system where the Kafka partition a message should go to is a function of a value in the message. Usually it’s value % (number of partitions), but for some values it’s not: those go to a specified list of partitions that changes over time. Our “simple Java library” that produces messages for this system also has a background thread that periodically polls an HTTP endpoint (once per minute by default) to refresh that list of special cases.
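To make the routing rule concrete, here is a minimal sketch of it as a plain function, separate from any Flink or Kafka class. The names `PartitionRule` and `choosePartition` are made up for illustration and are not part of our library. Using `Math.floorMod` is also an assumption on my part, so that negative values still map to a valid partition.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper that only illustrates the routing rule described above. */
final class PartitionRule {

    private PartitionRule() {}

    /**
     * Returns the partition for a value: one entry picked at random from the
     * special-case list if the value has one, otherwise value mod numPartitions.
     */
    static int choosePartition(long value, Map<Long, List<Integer>> specialCases, int numPartitions) {
        List<Integer> candidates = specialCases.get(value);
        if (candidates != null && !candidates.isEmpty()) {
            int chosen = candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
            // Guard against a listed partition that is out of range for this topic.
            return Math.floorMod(chosen, numPartitions);
        }
        // floorMod keeps the result in [0, numPartitions) even for negative values.
        return (int) Math.floorMod(value, (long) numPartitions);
    }
}
```

With 4 partitions and no special case for the value, 10 goes to partition 2. A value whose special-case list is [1, 2, 3] goes to one of those three partitions, chosen at random per message.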
It’s easy to create a FlinkKafkaPartitioner that does the mod operation; what I’m not so sure about is how to get this polling operation into the partitioner. I’m about to try it the obvious way (create a background thread that polls the URL and updates the partition map), but I wonder if that’s actually going to cause a bunch of problems for the Flink runtime.

Here’s the code that I have right now:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

    public class EventInsertPartitioner extends KafkaPartitioner<Tuple2<Long, String>> {

        private final String partitionerURL;
        private final long updateIntervalInMillis;
        // volatile so the map swapped in by the polling thread is visible to partition()
        private volatile Map<Long, List<Integer>> partitionMap;
        // transient: created in open() on the task, never serialized with the partitioner
        private transient ScheduledExecutorService executor;

        public EventInsertPartitioner(String partitionerURL, long updateIntervalInMillis) {
            this.partitionerURL = partitionerURL;
            this.updateIntervalInMillis = updateIntervalInMillis;
            this.partitionMap = new HashMap<>();
        }

        @Override
        public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
            executor = Executors.newScheduledThreadPool(1);
            executor.scheduleAtFixedRate(
                () -> updatePartitionMapRunnable(),
                updateIntervalInMillis,
                updateIntervalInMillis,
                TimeUnit.MILLISECONDS);
        }

        private void updatePartitionMapRunnable() {
            // Make synchronous request to partitionerURL
            // This is a simple JSON that matches our data
            String response = "{1:[1,2,3],2:[2]}";
            // Replace current partitionMap with new HashMap from the response.
            // The field is volatile and the new map is never mutated after the swap,
            // so no further synchronization is needed.
            this.partitionMap = convertResponseToMap(response);
        }

        private Map<Long, List<Integer>> convertResponseToMap(String response) {
            Map<Long, List<Integer>> hashMap = new HashMap<>();
            // Convert response to JSON structure and just use that?
            // or Iterate and add to local hashMap
            return hashMap;
        }

        @Override
        public int partition(Tuple2<Long, String> next, byte[] serializedKey,
                             byte[] serializedValue, int numPartitions) {
            long myKey = next.f0;
            // Look the key up once, so a map swap between a containsKey() and a get()
            // cannot hand back null.
            List<Integer> partitions = partitionMap.get(myKey);
            if (partitions != null && !partitions.isEmpty()) {
                myKey = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
            }
            return (int) (myKey % numPartitions);
        }
    }

Ron
--
Ron Crocker
Principal Engineer & Architect
New Relic
rcroc...@newrelic.com
M: +1 630 363 8835