Hi,

I am pretty new to Flink and I don't know what would be the best way to deal with the following use case:

 * as an input, I receive some alerts from a Kafka topic
     o an alert is linked to a network resource (like router-1,
       router-2, switch-1, switch-2, ...)
     o so an alert carries two main pieces of information (the alert id
       and the id of the resource on which this alert has been raised)
 * then I need to do a query to an external system in order to enrich
   the alert with additional information on the resource


(A "natural" candidate for the key on this stream would be the resource id.)

The issue I have concerns the query to the external system:

 * I do not want to do 1 query per resource id
 * I want to do a small number of queries in parallel (for example 4
   queries in parallel every 500ms), each query requesting the external
   system for several alerts linked to several resource ids

Currently, I don't know what would be the best way to handle this:

 * I can key my stream on the resource id, define a processing-time
   window of 500ms, and do my query when the window fires
     o by doing so, I will "group" several alerts in a single query,
       but they will all be linked to the same resource.
     o so I will do 1 query per resource id (which will be too much in
       my use case)
 * I can also do a windowAll on a non keyed stream
     o by doing so, I will "group" together alerts from different
       resource ids, but from what I've read in such a case the
       parallelism will always be one.
     o so in this case, I will only do 1 query whereas I'd like to have
       some parallelism

I am thinking that one way to deal with this would be to:

 * define the resource id as the key of the stream and set a parallelism of 4
 * and then have a way to do a windowAll on this keyed stream
     o that is, on a given operator instance, I would "group" in
       the same window all the keys (i.e. all the resource ids) managed
       by this operator instance
     o with a parallelism of 4, I will do 4 queries in parallel (1 per
       operator instance, and each query will be for several alerts
       linked to several resource ids)
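
To make the idea above more concrete, here is a minimal sketch in plain Java of the per-instance batching I have in mind. It is not Flink API: `Alert`, `AlertBatcher` and `flushIfDue` are hypothetical names used only for illustration, and time is passed in explicitly so the logic is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical alert type holding only the two fields described above. */
final class Alert {
    final String alertId;
    final String resourceId;

    Alert(String alertId, String resourceId) {
        this.alertId = alertId;
        this.resourceId = resourceId;
    }
}

/**
 * Buffers alerts for ALL resource ids seen by one operator instance and
 * releases them as a single batch once the interval has elapsed.
 */
final class AlertBatcher {
    private final long intervalMillis;
    private final List<Alert> buffer = new ArrayList<>();
    private long windowStart = -1L; // -1 means no window is open

    AlertBatcher(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    /** Adds an alert; opens a window at nowMillis if none is open. */
    void add(Alert alert, long nowMillis) {
        if (windowStart < 0) {
            windowStart = nowMillis;
        }
        buffer.add(alert);
    }

    /**
     * Returns the buffered alerts if the window has expired, otherwise an
     * empty list. A non-empty result would feed ONE external query.
     */
    List<Alert> flushIfDue(long nowMillis) {
        if (windowStart < 0 || nowMillis - windowStart < intervalMillis) {
            return new ArrayList<>();
        }
        List<Alert> batch = new ArrayList<>(buffer);
        buffer.clear();
        windowStart = -1L;
        return batch;
    }
}
```

With a parallelism of 4 there would be one such batcher per operator instance, so at most 4 queries every 500ms. How to drive `flushIfDue` from inside a Flink operator is exactly the part I have not figured out.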

But after looking at the documentation, I cannot see this ability (having a windowAll on a keyed stream).

Am I missing something?

What would be the best way to deal with such a use case?


I've tried, for example, to change my key to something like "resourceId.hashCode%<max nb of queries in parallel>" and then to use a time window.

In my example above, the <max nb of queries in parallel> would be 4, so all my keys would be 0, 1, 2 or 3.
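
For reference, the re-keying described above looks roughly like the sketch below. `BucketKey` and `bucketFor` are names made up for this example. I use `Math.floorMod` rather than `%` here because `hashCode()` can be negative, in which case `%` would produce negative keys outside the 0..3 range.

```java
final class BucketKey {
    /** Maps a resource id to a bucket key in the range [0, buckets). */
    static int bucketFor(String resourceId, int buckets) {
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(resourceId.hashCode(), buckets);
    }
}
```

The stream would then be keyed on `bucketFor(resourceId, 4)` instead of on the resource id itself.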

The issue with this approach is that, because of the way the operator index is computed from the key, my processing is not distributed well:

 * when this partitioning logic from the "KeyGroupRangeAssignment"
   class is applied
     o /**
        * Assigns the given key to a parallel operator index.
        *
        * @param key the key to assign
        * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
        * @param parallelism the current parallelism of the operator
        * @return the index of the parallel operator to which the given key should be routed.
        */
       public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
           return computeOperatorIndexForKeyGroup(
                   maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
       }

       /**
        * Assigns the given key to a key-group index.
        *
        * @param key the key to assign
        * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
        * @return the key-group to which the given key is assigned
        */
       public static int assignToKeyGroup(Object key, int maxParallelism) {
           return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
       }
     o keys 0, 1, 2 and 3 are only assigned to operators 2 and 3 (so 2
       of my 4 operators will have nothing to do)
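
One workaround I can imagine (not something I have confirmed is recommended) is to stop using 0..3 as keys and instead search for one key per operator index, so that each of the 4 instances gets exactly one key. The sketch below is plain Java with no Flink dependency. `BalancedKeys`, `mix`, `route` and `findKeys` are hypothetical names, and `route` is only a stand-in: its bit-mixing step and the `keyGroup * parallelism / maxParallelism` formula are my assumptions about how the routing behaves, not code taken from Flink. In a real job the router passed to `findKeys` would presumably call `KeyGroupRangeAssignment.assignKeyToParallelOperator` as quoted above.

```java
import java.util.Arrays;
import java.util.function.IntUnaryOperator;

final class BalancedKeys {

    /** Bit-mixing stand-in; NOT guaranteed to match Flink's hash function. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Stand-in router (assumption): key -> key-group -> operator index. */
    static int route(int key, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(mix(Integer.hashCode(key)), maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    /**
     * For each operator index i in [0, parallelism), finds the smallest
     * non-negative int key that the router sends to i.
     * The router must return values in [0, parallelism).
     */
    static int[] findKeys(int parallelism, IntUnaryOperator router) {
        int[] keys = new int[parallelism];
        Arrays.fill(keys, -1); // -1 marks "no key found yet"
        int found = 0;
        for (int candidate = 0; found < parallelism && candidate < 1_000_000; candidate++) {
            int target = router.applyAsInt(candidate);
            if (keys[target] < 0) {
                keys[target] = candidate;
                found++;
            }
        }
        if (found < parallelism) {
            throw new IllegalStateException("no key found for some operator index");
        }
        return keys;
    }
}
```

For example, `int[] keys = BalancedKeys.findKeys(4, k -> BalancedKeys.route(k, 128, 4));` yields an array where `keys[i]` is routed to operator `i` by the stand-in router. The stream could then be keyed on `keys[bucket]` rather than on `bucket` itself. The keys would have to be recomputed whenever the parallelism or max parallelism changes.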


So, what would be the best way to deal with that?


Thank you in advance for your support.

Regards.


Julien.

