We have a system where the Kafka partition a message goes to is a function of
a value in the message. Usually it’s value % (number of partitions), but for
some values it’s not: those values map to a specified list of partitions that
changes over time. Our “simple Java library” that produces messages for this
system also has a background thread that periodically polls an HTTP endpoint
(once per minute by default) to refresh that list of special cases.

It’s easy to create a FlinkKafkaPartitioner that does the mod operation; what 
I’m not so sure about is how to get this polling operation into the 
partitioner. I’m about to try it the obvious way (create a background thread 
that polls the URL and updates the partition map), but I wonder if that’s 
actually going to cause a bunch of problems for the Flink runtime.
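
To make the question concrete, here is a minimal sketch of the polling piece on
its own, outside Flink. PartitionMapRefresher and its fetcher argument are
hypothetical names (not part of Flink or of our library), and the fetcher
stands in for the HTTP call. The idea is a daemon thread that publishes a
fresh read-only map through a volatile field and keeps the old map if a poll
fails:

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not a Flink or library class.
final class PartitionMapRefresher implements AutoCloseable {
    private final Supplier<Map<Long, List<Integer>>> fetcher;
    private final ScheduledExecutorService executor;
    // volatile, so readers on other threads see the map published by the poller.
    private volatile Map<Long, List<Integer>> current = Collections.emptyMap();

    PartitionMapRefresher(Supplier<Map<Long, List<Integer>>> fetcher) {
        this.fetcher = fetcher;
        // Daemon thread, so the poller never keeps the JVM alive on its own.
        this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "partition-map-refresher");
            t.setDaemon(true);
            return t;
        });
    }

    // Polls once immediately, then again intervalMillis after each run ends.
    void start(long intervalMillis) {
        executor.scheduleWithFixedDelay(
                this::refreshOnce, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void refreshOnce() {
        try {
            Map<Long, List<Integer>> fresh = fetcher.get();
            if (fresh != null) {
                current = Collections.unmodifiableMap(fresh);
            }
        } catch (RuntimeException e) {
            // Keep the previous map. An exception escaping a scheduled task
            // would suppress all of its future runs.
        }
    }

    Map<Long, List<Integer>> current() {
        return current;
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

A caller would read current() once per message and work with that snapshot, so
a swap in the middle of a lookup cannot be observed.
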

Here’s the code that I have right now:
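import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Flink imports assume a Kafka connector version that still ships KafkaPartitioner.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

public class EventInsertPartitioner extends KafkaPartitioner<Tuple2<Long, String>> {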
    private final String partitionerURL;
    private final long updateIntervalInMillis;
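    // volatile: written by the polling thread, read by the thread calling partition().
    private volatile Map<Long, List<Integer>> partitionMap;
    // transient: the partitioner is serialized and shipped to the task managers;
    // the executor is only created in open().
    private transient ScheduledExecutorService executor;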

    public EventInsertPartitioner(String partitionerURL, long updateIntervalInMillis) {
        this.partitionerURL = partitionerURL;
        this.updateIntervalInMillis = updateIntervalInMillis;
        this.partitionMap = new HashMap<>();
    }

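    @Override
    public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
        // Daemon thread, so the poller never keeps the task manager JVM alive.
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "partition-map-poller");
            t.setDaemon(true);
            return t;
        });
        executor.scheduleAtFixedRate(
                this::updatePartitionMapRunnable,
                updateIntervalInMillis,
                updateIntervalInMillis,
                TimeUnit.MILLISECONDS);
    }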

    private void updatePartitionMapRunnable() {
        try {
            // Make synchronous request to partitionerURL.
            // Placeholder response in the shape of our data:
            String response = "{1:[1,2,3],2:[2]}";
            // Swap in a freshly built map. Assigning a reference is atomic, so no
            // lock is needed, but the field must be volatile for the thread that
            // calls partition() to reliably see the new map.
            this.partitionMap = convertResponseToMap(response);
        } catch (RuntimeException e) {
            // Keep the previous map. An exception escaping this method would
            // cancel all future runs of the scheduled task.
        }
    }

    private Map<Long, List<Integer>> convertResponseToMap(String response) {
        Map<Long, List<Integer>> hashMap = new HashMap<>();
        // Convert response to JSON structure and just use that?
        // or Iterate and add to local hashMap
        return hashMap;
    }

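    @Override
    public int partition(Tuple2<Long, String> next, byte[] serializedKey,
            byte[] serializedValue, int numPartitions) {
        long myKey = next.f0;

        // Look the key up once: the poller may swap the map between two lookups.
        List<Integer> partitions = partitionMap.get(myKey);
        if (partitions != null && !partitions.isEmpty()) {
            // Special case: pick one of the listed partitions at random.
            myKey = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
        }

        // floorMod keeps the result non-negative even for a negative key.
        return (int) Math.floorMod(myKey, (long) numPartitions);
    }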
}
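
For convertResponseToMap, one possible direction is sketched below. It is only
a sketch: PartitionMapParser is a hypothetical name, and the regex assumes the
response always looks like the placeholder above (non-negative numeric keys,
optionally quoted, each followed by a bracketed list of integers). For anything
less regular, a real JSON library would be the safer choice.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; handles only the flat shape shown above.
final class PartitionMapParser {
    // One entry: an optionally quoted numeric key, a colon, a bracketed list.
    private static final Pattern ENTRY =
            Pattern.compile("\"?(\\d+)\"?\\s*:\\s*\\[([^\\]]*)\\]");

    static Map<Long, List<Integer>> parse(String response) {
        Map<Long, List<Integer>> result = new HashMap<>();
        Matcher m = ENTRY.matcher(response);
        while (m.find()) {
            List<Integer> partitions = new ArrayList<>();
            for (String token : m.group(2).split(",")) {
                String trimmed = token.trim();
                if (!trimmed.isEmpty()) {
                    partitions.add(Integer.parseInt(trimmed));
                }
            }
            // Skip empty lists so partition() never has to pick from nothing.
            if (!partitions.isEmpty()) {
                result.put(Long.parseLong(m.group(1)),
                        Collections.unmodifiableList(partitions));
            }
        }
        // Read-only, since the map is shared with the thread calling partition().
        return Collections.unmodifiableMap(result);
    }
}
```

Building a new read-only map on every poll, rather than mutating the old one,
is what lets partition() read it without any locking.
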
Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcroc...@newrelic.com
M: +1 630 363 8835
