Hi Julien,

Sorry for my misunderstanding before. For now, a window can only be defined on a 
KeyedStream or on an ordinary DataStream whose parallelism is 1. I'd like to 
suggest three options for your scenario.

1. If your external data is static and can fit into memory, you can use managed 
keyed state 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state>
 to cache it and avoid the querying problem altogether.
2. Or you can use a custom partitioner 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning>
 to distribute your alert data manually and simulate a window operation yourself 
in a ProcessFunction (a sketch follows below).
3. You may also use an external system, such as an in-memory store, to work as a 
cache for your queries.
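
For option 2, here is a minimal Java sketch of the idea. It is only a starting 
point, not a complete implementation: Alert, EnrichedAlert, ResourceInfo and 
queryExternalSystem() are placeholders for your own classes and client, and it 
flushes on a count threshold because processing-time timers are only available 
on keyed streams.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // alerts: the DataStream<Alert> read from the Kafka topic, parallelism 4.
    DataStream<Alert> spread = alerts.partitionCustom(
            new Partitioner<String>() {
                @Override
                public int partition(String resourceId, int numPartitions) {
                    // The partitioner picks the target channel directly,
                    // so the alerts are spread evenly over the subtasks.
                    return Math.abs(resourceId.hashCode()) % numPartitions;
                }
            },
            new KeySelector<Alert, String>() {
                @Override
                public String getKey(Alert alert) {
                    return alert.getResourceId();
                }
            });

    DataStream<EnrichedAlert> enriched = spread.process(
            new ProcessFunction<Alert, EnrichedAlert>() {

                private transient List<Alert> buffer;

                @Override
                public void open(Configuration parameters) {
                    buffer = new ArrayList<>();
                }

                @Override
                public void processElement(Alert alert, Context ctx, Collector<EnrichedAlert> out) {
                    buffer.add(alert);
                    // Flush on a size threshold; a 500 ms time-based flush
                    // would need keyed timers or a custom operator instead.
                    if (buffer.size() >= 25) {
                        flush(out);
                    }
                }

                private void flush(Collector<EnrichedAlert> out) {
                    Set<String> resourceIds = new HashSet<>();
                    for (Alert a : buffer) {
                        resourceIds.add(a.getResourceId());
                    }
                    // One query to the external system for all buffered resource IDs.
                    Map<String, ResourceInfo> infos = queryExternalSystem(resourceIds);
                    for (Alert a : buffer) {
                        out.collect(new EnrichedAlert(a, infos.get(a.getResourceId())));
                    }
                    buffer.clear();
                }
            });

Because the Partitioner chooses the target channel directly, the load is spread 
evenly across the four subtasks, which a keyBy on a small derived key may not 
guarantee (see the key-group discussion further down in the thread).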

Best,
Xingcan

> On 19 Feb 2018, at 5:55 AM, Julien <jmassio...@gmail.com> wrote:
> 
> Hi Xingcan,
> 
> Thanks for your answer.
> Yes, I understand that point:
> if I have 100 resource IDs with a parallelism of 4, then each operator instance 
> will handle about 25 keys.
> 
> The issue I have is that I want, on a given operator instance, to group those 
> 25 keys together in order to do only 1 query to the external system per 
> operator instance:
> 
> - on a given operator instance, I will do 1 query for my 25 keys
> - so with the 4 operator instances, I will do 4 queries in parallel (with about 
> 25 keys per query)
> 
> I do not know how I can do that.
> 
> If I define a window on my keyed stream (with, for example, 
> stream.keyBy(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))), 
> then my understanding is that the window is "associated" with the key. So in 
> this case, on a given operator instance, I will have 25 of those windows (one 
> per key), and I will do 25 queries (instead of 1).
> 
> Do you understand my point?
> Or maybe am I missing something?
> 
> I'd like to find a way, on operator instance 1, to group all the alerts 
> received for those 25 resource IDs and do 1 query for those 25 resource IDs.
> Same thing for operator instances 2, 3 and 4.
> 
> 
> Thank you,
> Regards.
> 
> 
> On 18/02/2018 14:43, Xingcan Cui wrote:
>> Hi Julien,
>> 
>> The cardinality of your keys (e.g., resource IDs) will not be restricted to 
>> the parallelism. For instance, if you have 100 resource IDs processed by a 
>> KeyedStream with parallelism 4, each operator instance will handle about 25 
>> keys. 
>> 
>> Hope that helps.
>> 
>> Best,
>> Xingcan
>> 
>>> On 18 Feb 2018, at 8:49 PM, Julien <jmassio...@gmail.com 
>>> <mailto:jmassio...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> I am pretty new to Flink and I don't know what would be the best way to deal 
>>> with the following use case:
>>> 
>>> - as an input, I receive alerts from a Kafka topic
>>> - an alert is linked to a network resource (like router-1, router-2, 
>>> switch-1, switch-2, ...)
>>> - so an alert carries two main pieces of information (the alert ID and the 
>>> resource ID of the resource on which the alert has been raised)
>>> - then I need to query an external system in order to enrich the alert 
>>> with additional information on the resource
>>> 
>>> (A "natural" candidate for the key on this stream will be the resource id)
>>> 
>>> The issue I have is regarding the query to the external system:
>>> - I do not want to do 1 query per resource ID
>>> - I want to do a small number of queries in parallel (for example 4 queries 
>>> in parallel every 500 ms), each query requesting the external system for 
>>> several alerts linked to several resource IDs
>>> 
>>> Currently, I don't know what would be the best way to deal with that (a rough 
>>> sketch of both candidates follows this list):
>>> - I can key my stream on the resource ID and then define a processing-time 
>>> window of 500 ms, and when the trigger fires, I do my query
>>>   - by doing so, I will "group" several alerts in a single query, but they 
>>> will all be linked to the same resource
>>>   - so I will do 1 query per resource ID (which will be too much in my use case)
>>> - I can also do a windowAll on a non-keyed stream
>>>   - by doing so, I will "group" together alerts from different resource IDs, 
>>> but from what I've read, in such a case the parallelism will always be one
>>>   - so in this case, I will only do 1 query, whereas I'd like to have some 
>>> parallelism
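>>> 
>>> To make the two candidates concrete (in Java; the two window functions are 
>>> just placeholders for my enrichment logic):
>>> 
>>>     import org.apache.flink.api.java.functions.KeySelector;
>>>     import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
>>>     import org.apache.flink.streaming.api.windowing.time.Time;
>>> 
>>>     // Candidate 1: keyed window -- one window, and therefore one query, per resource ID
>>>     alerts
>>>         .keyBy(new KeySelector<Alert, String>() {
>>>             @Override
>>>             public String getKey(Alert alert) {
>>>                 return alert.getResourceId();
>>>             }
>>>         })
>>>         .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
>>>         .apply(new EnrichPerResourceWindowFunction());
>>> 
>>>     // Candidate 2: windowAll -- alerts from all resource IDs grouped together,
>>>     // but the windowAll operator always runs with parallelism 1
>>>     alerts
>>>         .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
>>>         .apply(new EnrichAllWindowFunction());
>>> 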
>>> I am thinking that a way to deal with that would be:
>>> 
>>> - define the resource ID as the key of the stream and set a parallelism of 4
>>> - then have a way to do a windowAll on this keyed stream, i.e., on a given 
>>> operator instance, "group" in the same window all the keys (all the resource 
>>> IDs) managed by this operator instance
>>> - with a parallelism of 4, I would do 4 queries in parallel (1 per operator 
>>> instance, and each query would be for several alerts linked to several 
>>> resource IDs)
>>> 
>>> But after looking at the documentation, I cannot see this ability (having a 
>>> windowAll on a keyed stream).
>>> 
>>> Am I missing something?
>>> 
>>> What would be the best way to deal with such a use case?
>>> 
>>> 
>>> I've tried, for example, to rework my key to something like 
>>> "resourceId.hashCode % <max nb of queries in parallel>" and then to use a 
>>> time window (a quick sketch of what I mean follows).
>>> 
>>> In my example above, the <max nb of queries in parallel> is 4, so all my keys 
>>> will be 0, 1, 2 or 3.
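>>> 
>>> Roughly, in Java (BatchEnrichWindowFunction stands for my enrichment logic):
>>> 
>>>     // Derive an artificial key in [0, QUERY_PARALLELISM) so that each window
>>>     // groups alerts from many resource IDs, not just one.
>>>     final int QUERY_PARALLELISM = 4;
>>> 
>>>     alerts
>>>         .keyBy(new KeySelector<Alert, Integer>() {
>>>             @Override
>>>             public Integer getKey(Alert alert) {
>>>                 return Math.abs(alert.getResourceId().hashCode()) % QUERY_PARALLELISM;
>>>             }
>>>         })
>>>         .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
>>>         .apply(new BatchEnrichWindowFunction());   // one query per window firing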
>>> 
>>> The issue with this approach is that, due to the way the operator index is 
>>> computed from the key, it does not distribute my processing well:
>>> 
>>> when this partitioning logic from the "KeyGroupRangeAssignment" class is 
>>> applied,
>>>     /**
>>>      * Assigns the given key to a parallel operator index.
>>>      *
>>>      * @param key the key to assign
>>>      * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>>>      * @param parallelism the current parallelism of the operator
>>>      * @return the index of the parallel operator to which the given key should be routed.
>>>      */
>>>     public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
>>>         return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
>>>     }
>>> 
>>>     /**
>>>      * Assigns the given key to a key-group index.
>>>      *
>>>      * @param key the key to assign
>>>      * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
>>>      * @return the key-group to which the given key is assigned
>>>      */
>>>     public static int assignToKeyGroup(Object key, int maxParallelism) {
>>>         return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
>>>     }
>>> keys 0, 1, 2 and 3 are only assigned to operators 2 and 3 (so 2 out of my 4 
>>> operators will have nothing to do).
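>>> 
>>> A quick way to check that assignment (assuming the default maxParallelism of 
>>> 128 when the parallelism is 4) is a small snippet like:
>>> 
>>>     import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
>>> 
>>>     public class KeyAssignmentCheck {
>>>         public static void main(String[] args) {
>>>             int parallelism = 4;
>>>             int maxParallelism = 128;  // Flink's default for small parallelism
>>>             for (int key = 0; key < parallelism; key++) {
>>>                 int operator = KeyGroupRangeAssignment.assignKeyToParallelOperator(
>>>                         key, maxParallelism, parallelism);
>>>                 System.out.println("key " + key + " -> operator instance " + operator);
>>>             }
>>>         }
>>>     }
>>> 
>>> Since each key goes through a murmur hash of its hashCode before being mapped 
>>> to one of the 128 key groups, 4 keys give no guarantee of covering all 4 
>>> operator instances.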
>>> 
>>> So, what would be the best way to deal with that?
>>> 
>>> 
>>> 
>>> Thank you in advance for your support.
>>> 
>>> Regards.
>>> 
>>> 
>>> 
>>> Julien.
>>> 
>>> 
>> 
> 
