Hi Julien,
If I understand correctly, I think you can key your stream on 
`Random.nextInt() % parallelism`; this way you can "group" together alerts from 
different resource IDs and still benefit from the parallelism.






On 02/19/2018 09:08,Xingcan Cui<xingc...@gmail.com> wrote:
Hi Julien,


Sorry for my earlier misunderstanding. For now, a window can only be defined 
on a KeyedStream, or on an ordinary DataStream but then with parallelism = 1. 
I'd like to suggest three options for your scenario.


1. If your external data is static and fits into memory, you can use 
managed state to cache it, which avoids the querying problem altogether.
2. Or you can use a custom partitioner to manually distribute your alert data 
and simulate a window operation yourself in a ProcessFunction.
3. You may also choose to use an external system, such as an in-memory store, 
which can work as a cache for your queries.
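
A minimal sketch of the "simulate a window yourself" idea in option 2, written as plain Java rather than as a Flink operator. The `Alert` and `AlertBatcher` classes, the `batchLookup` function standing in for the external system, and the output format are all hypothetical names introduced for illustration. The point is only that alerts for many resource IDs are buffered per operator instance and enriched with a single query per flush; what triggers the flush (for example a 500 ms timer) is left open.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical alert type holding the two fields described in the thread. */
final class Alert {
    final String alertId;
    final String resourceId;

    Alert(String alertId, String resourceId) {
        this.alertId = alertId;
        this.resourceId = resourceId;
    }
}

/** Buffers the alerts seen by one operator instance and enriches them in batches. */
final class AlertBatcher {
    // Hypothetical client: takes a set of resource IDs, returns info per resource ID.
    private final Function<Set<String>, Map<String, String>> batchLookup;
    private final List<Alert> buffer = new ArrayList<>();
    int queriesIssued = 0;

    AlertBatcher(Function<Set<String>, Map<String, String>> batchLookup) {
        this.batchLookup = batchLookup;
    }

    /** Called once per incoming alert, whatever its resource ID. */
    void add(Alert alert) {
        buffer.add(alert);
    }

    /** Called when the batching interval ends: one query covers every buffered resource ID. */
    List<String> flush() {
        List<String> enriched = new ArrayList<>();
        if (buffer.isEmpty()) {
            return enriched; // nothing buffered, so no query is issued
        }
        Set<String> resourceIds = new LinkedHashSet<>();
        for (Alert alert : buffer) {
            resourceIds.add(alert.resourceId);
        }
        Map<String, String> info = batchLookup.apply(resourceIds);
        queriesIssued++;
        for (Alert alert : buffer) {
            enriched.add(alert.alertId + "@" + alert.resourceId + ":" + info.get(alert.resourceId));
        }
        buffer.clear();
        return enriched;
    }
}
```

With one such batcher per parallel instance, a parallelism of 4 would give at most 4 queries per interval, regardless of how many resource IDs each instance receives.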


Best,
Xingcan



On 19 Feb 2018, at 5:55 AM, Julien <jmassio...@gmail.com> wrote:


Hi Xingcan,

Thanks for your answer.
Yes, I understand that point:

if I have 100 resource IDs with parallelism of 4, then each operator instance 
will handle about 25 keys




The issue I have is that I want, on a given operator instance, to group those 
25 keys together in order to do only 1 query to an external system per operator 
instance:

on a given operator instance, I will do 1 query for my 25 keys
so with the 4 operator instances, I will do 4 queries in parallel (with about 25 
keys per query)


I do not know how I can do that.

If I define a window on my keyed stream, for example with 
stream.keyBy(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500))), 
then my understanding is that the window is "associated" with the key. So in 
this case, on a given operator instance, I will have 25 of those windows (one 
per key), and I will do 25 queries (instead of 1).

Do you see my point?
Or am I missing something?

I'd like to find a way on operator instance 1 to group all the alerts received 
on those 25 resource ids and do 1 query for those 25 resource ids.
Same thing for operator instance 2, 3 and 4.


Thank you,
Regards.


On 18/02/2018 14:43, Xingcan Cui wrote:

Hi Julien,


The cardinality of your keys (e.g., resource ID) is not restricted by the 
parallelism. For instance, if you have 100 resource IDs processed by a 
KeyedStream with parallelism 4, each operator instance will handle about 25 
keys.


Hope that helps.


Best,
Xingcan


On 18 Feb 2018, at 8:49 PM, Julien <jmassio...@gmail.com> wrote:



Hi,

I am pretty new to Flink and I don't know what would be the best way to deal 
with the following use case:

- as input, I receive alerts from a Kafka topic
- an alert is linked to a network resource (like router-1, router-2, switch-1, 
  switch-2, ...)
- so an alert carries two main pieces of information (the alert id and the 
  resource id of the resource on which the alert was raised)
- then I need to query an external system in order to enrich the alert with 
  additional information on the resource

(A "natural" candidate for the key on this stream will be the resource id)

The issue I have concerns the query to the external system:

- I do not want to do 1 query per resource id
- I want to do a small number of queries in parallel (for example 4 queries in 
  parallel every 500ms), each query asking the external system about several 
  alerts linked to several resource ids

Currently, I don't know what the best way to deal with that would be:

- I can key my stream on the resource id, define a processing-time window of 
  500ms, and do my query when the trigger fires
  - by doing so, I will "group" several alerts in a single query, but they 
    will all be linked to the same resource
  - so I will do 1 query per resource id (which will be too many in my use 
    case)
- I can also do a windowAll on a non-keyed stream
  - by doing so, I will "group" together alerts from different resource ids, 
    but from what I've read, in such a case the parallelism will always be one
  - so in this case, I will only do 1 query at a time, whereas I'd like to 
    have some parallelism

I am thinking that a way to deal with that would be:

- define the resource id as the key of the stream and set a parallelism of 4
- and then have a way to do a windowAll on this keyed stream
  - that is, on a given operator instance, I would "group" in the same window 
    all the keys (i.e. all the resource ids) managed by this operator instance
  - with a parallelism of 4, I would do 4 queries in parallel (1 per operator 
    instance, each query covering several alerts linked to several resource 
    ids)

But after looking at the documentation, I cannot see this ability (having a 
windowAll on a keyed stream).

Am I missing something?

What will be the best way to deal with such a use case?




I've tried, for example, changing my key to something like 
"resourceId.hashCode % <max nb of queries in parallel>" and then using a time 
window.

In my example above, the <max nb of queries in parallel> will be 4. And all my 
keys will be 0, 1, 2 or 3.
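
One detail worth keeping in mind with a modulo-based key: on the JVM, `hashCode()` can be negative, and the `%` operator keeps the sign of its left operand, so a plain `hashCode % 4` can also produce -1, -2 or -3. A minimal sketch, assuming a hypothetical helper named `bucketFor`, that always yields a bucket in 0..3:

```java
/** Hypothetical helper for deriving a small synthetic key from a resource ID. */
final class BucketKey {
    /**
     * Maps a resource ID to a bucket in [0, buckets).
     * Math.floorMod is used instead of % because, for example,
     * -7 % 4 is -3 whereas Math.floorMod(-7, 4) is 1.
     */
    static int bucketFor(String resourceId, int buckets) {
        return Math.floorMod(resourceId.hashCode(), buckets);
    }
}
```

This only fixes the range of the synthetic key; it does not by itself change how those keys are spread over the operator instances.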

The issue with this approach is that, due to the way the operator index is 
computed from the key, it does not distribute my processing well:

when this partitioning logic from the "KeyGroupRangeAssignment" class is applied

    /**
     * Assigns the given key to a parallel operator index.
     *
     * @param key the key to assign
     * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
     * @param parallelism the current parallelism of the operator
     * @return the index of the parallel operator to which the given key should be routed.
     */
    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    /**
     * Assigns the given key to a key-group index.
     *
     * @param key the key to assign
     * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
     * @return the key-group to which the given key is assigned
     */
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

keys 0, 1, 2 and 3 are only assigned to operators 2 and 3 (so 2 of my 4 
operators will have nothing to do)
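
The uneven placement comes from the two-step mapping (key to key group, then key group to operator index): a handful of keys is hashed, so nothing guarantees that 4 keys land on 4 different operators. One possible workaround is to search for synthetic keys that do cover every operator index and use those instead of 0, 1, 2, 3. The sketch below is self-contained and makes several assumptions: the `mix` function is only a stand-in for whatever hash Flink applies in `computeKeyGroupForKeyHash`, the range formula in `operatorIndexFor` is my assumption about `computeOperatorIndexForKeyGroup`, and the value 128 for maxParallelism is a guess. In a real job, one would call the `assignKeyToParallelOperator` method quoted above instead of these stand-ins.

```java
import java.util.Arrays;

/** Hypothetical helper: finds one synthetic key per parallel operator index. */
final class KeyPlacement {
    /** Stand-in hash (assumption, not Flink's exact function); result is non-negative. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE;
    }

    /** Step 1: key -> key group in [0, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    /** Step 2 (assumed formula): contiguous ranges of key groups per operator. */
    static int operatorIndexFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    /** Tries 0, 1, 2, ... until every operator index has exactly one key assigned. */
    static int[] oneKeyPerOperator(int maxParallelism, int parallelism) {
        int[] keys = new int[parallelism];
        Arrays.fill(keys, -1); // -1 marks "no key found yet"
        int found = 0;
        for (int candidate = 0; found < parallelism; candidate++) {
            if (candidate > 1_000_000) {
                throw new IllegalStateException("no covering key set found");
            }
            int index = operatorIndexFor(candidate, maxParallelism, parallelism);
            if (keys[index] == -1) {
                keys[index] = candidate;
                found++;
            }
        }
        return keys;
    }
}
```

For example, `KeyPlacement.oneKeyPerOperator(128, 4)` returns an array whose entry `i` is a key routed to operator `i` under the assumed mapping; alerts could then be keyed on `keys[bucket]` rather than on `bucket` itself.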




So, what will be the best way to deal with that?




Thank you in advance for your support.

Regards.




Julien.










