Hi Julien,

I'd run into a similar situation, where I need to have a keyed stream, but I 
want (effectively) one key per task.

It’s possible to generate keys that will get distributed as you need, though it 
does require making assumptions about how Flink generates hashes/key groups.

And once you start talking about state, then it gets a bit harder, as you need 
to know the max parallelism, which is used to calculate “key groups”.

Below is a cheesy function I wrote to make an Integer that (if used as the key) 
will partition the record to the target operator.

I use it in a custom Map function to add a key field.

— Ken

        /**
         * Return an integer value that will get partitioned to the target 
<operatorIndex>, given the
         * workflow's <maxParallelism> (for key groups) and the operator 
<parallelism>.
         * 
         * @param maxParallelism
         * @param parallelism
         * @param operatorIndex
         * @return Integer suitable for use in a record as the key.
         */
        public static Integer makeKeyForOperatorIndex(int maxParallelism, int 
parallelism, int operatorIndex) {
                if (maxParallelism == ExecutionJobVertex.VALUE_NOT_SET) {
                        maxParallelism = 
KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
                }
                
                for (int i = 0; i < maxParallelism * 2; i++) {
                        Integer key = new Integer(i);
                        int keyGroup = 
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
                        int index = 
KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 
parallelism, keyGroup);
                        if (index == operatorIndex) {
                                return key;
                        }
                }
                
                throw new RuntimeException(String.format("Unable to find key 
for target operator index %d (max parallelism = %d, parallelism = %d", 
                                operatorIndex, maxParallelism, parallelism));
        }


> On Feb 19, 2018, at 12:34 AM, Julien <jmassio...@gmail.com> wrote:
> 
> Hello,
> 
> I've already tried to key my stream with "resourceId.hashCode%parallelism" 
> (with parallelism of 4 in my example).
> So all my keys will be either 0,1, 2 or 3. I can then benefit from a time 
> window on this keyed stream and do only 4 queries to my external system.
> But it is not well distributed with the default partitioner on keyed stream. 
> (keys 0, 1, 2 and 3 only goes to operator idx 2, 3).
> 
> I think I should explore the customer partitioner, as you suggested Xingcan.
> Maybe my last question on this will be: "can you give me more details on this 
> point "and simulate a window operation by yourself in a ProcessFunction" ?
> 
> When I look at the documentation about the custom partitioner, I can see that 
> the result of partitionCustom is a DataStream.
> It is not a KeyedStream.
> So the only window I have will be windowAll (which will bring me back to a 
> parallelism of 1, no ?).
> 
> And if I do something like "myStream.partitionCustom(<my new partitioner>,<my 
> key>).keyBy(<myKey>).window(...)", will it preserve my custom partitioner ?
> When looking at the "KeyedStream" class, it seems that it will go back to the 
> "KeyGroupStreamPartitioner" and forget my custom partitioner ?
> 
> Thanks again for your feedback,
> 
> Julien.
> 
> 
> On 19/02/2018 03:45, 周思华 wrote:
>> Hi Julien,
>>     If I am not misunderstand, I think you can key your stream on a 
>> `Random.nextInt() % parallesm`, this way  you can "group" together alerts 
>> from different and benefit from multi parallems.
>> 
>> 
>> 发自网易邮箱大师
>> 
>> On 02/19/2018 09:08,Xingcan Cui<xingc...@gmail.com> 
>> <mailto:xingc...@gmail.com> wrote: 
>> Hi Julien,
>> 
>> sorry for my misunderstanding before. For now, the window can only be 
>> defined on a KeyedStream or an ordinary DataStream but with parallelism = 1. 
>> I’d like to provide three options for your scenario.
>> 
>> 1. If your external data is static and can be fit into the memory, you can 
>> use ManagedStates 
>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state>
>>  to cache them without considering the querying problem.
>> 2. Or you can use a CustomPartitioner 
>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning>
>>  to manually distribute your alert data and simulate an window operation by 
>> yourself in a ProcessFuncton.
>> 3. You may also choose to use some external systems such as in-memory store, 
>> which can work as a cache for your queries.
>> 
>> Best,
>> Xingcan
>> 
>>> On 19 Feb 2018, at 5:55 AM, Julien <jmassio...@gmail.com 
>>> <mailto:jmassio...@gmail.com>> wrote:
>>> 
>>> Hi Xingcan,
>>> 
>>> Thanks for your answer.
>>> Yes, I understand that point:
>>> if I have 100 resource IDs with parallelism of 4, then each operator 
>>> instance will handle about 25 keys
>>> 
>>> 
>>> The issue I have is that I want, on a given operator instance, to group 
>>> those 25 keys together in order to do only 1 query to an external system 
>>> per operator instance:
>>> 
>>> on a given operator instance, I will do 1 query for my 25 keys
>>> so with the 4 operator instances, I will do 4 query in parallel (with about 
>>> 25 keys per query)
>>> 
>>> I do not know how I can do that.
>>> 
>>> If I define a window on my keyed stream (with for example 
>>> stream.key(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500))),
>>>  then my understanding is that the window is "associated" to the key. So in 
>>> this case, on a given operator instance, I will have 25 of those windows 
>>> (one per key), and I will do 25 queries (instead of 1).
>>> 
>>> Do you understand my point ?
>>> Or maybe am I missing something ?
>>> 
>>> I'd like to find a way on operator instance 1 to group all the alerts 
>>> received on those 25 resource ids and do 1 query for those 25 resource ids.
>>> Same thing for operator instance 2, 3 and 4.
>>> 
>>> 
>>> Thank you,
>>> Regards.
>>> 
>>> 
>>> On 18/02/2018 14:43, Xingcan Cui wrote:
>>>> Hi Julien,
>>>> 
>>>> the cardinality of your keys (e.g., resource ID) will not be restricted to 
>>>> the parallelism. For instance, if you have 100 resource IDs processed by 
>>>> KeyedStream with parallelism 4, each operator instance will handle about 
>>>> 25 keys. 
>>>> 
>>>> Hope that helps.
>>>> 
>>>> Best,
>>>> Xingcan
>>>> 
>>>>> On 18 Feb 2018, at 8:49 PM, Julien <jmassio...@gmail.com 
>>>>> <mailto:jmassio...@gmail.com>> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> I am pretty new to flink and I don't know what will be the best way to 
>>>>> deal with the following use case:
>>>>> 
>>>>> as an input, I recieve some alerts from a kafka topic
>>>>> an alert is linked to a network resource (like router-1, router-2, 
>>>>> switch-1, switch-2, ...)
>>>>> so an alert has two main information (the alert id and the resource id of 
>>>>> the resource on which this alert has been raised)
>>>>> then I need to do a query to an external system in order to enrich the 
>>>>> alert with additional information on the resource
>>>>> 
>>>>> (A "natural" candidate for the key on this stream will be the resource id)
>>>>> 
>>>>> The issue I have is that regarding the query to the external system:
>>>>> I do not want to do 1 query per resource id
>>>>> I want to do a small number of queries in parallel (for example 4 queries 
>>>>> in parallel every 500ms), each query requesting the external system for 
>>>>> several alerts linked to several resource id
>>>>> Currently, I don't know what will be the best way to deal with that:
>>>>> I can key my stream on the resource id and then define a processing time 
>>>>> window of 500ms and when the trigger is ok, then I do my query
>>>>> by doing so, I will "group" several alerts in a single query, but they 
>>>>> will all be linked to the same resource.
>>>>> so I will do 1 query per resource id (which will be too much in my use 
>>>>> case)
>>>>> I can also do a windowAll on a non keyed stream
>>>>> by doing so, I will "group" together alerts from different resource ids, 
>>>>> but from what I've read in such a case the parallelism will always be one.
>>>>> so in this case, I will only do 1 query whereas I'd like to have some 
>>>>> parallelism
>>>>> I am thinking that a way to deal with that will be:
>>>>> 
>>>>> define the resource id as the key of stream and put a parallelism of 4
>>>>> and then having a way to do a windowAll on this keyed stream
>>>>> which is that, on a given operator instance, I will "group" on the same 
>>>>> window all the keys (ie all the resource ids) managed by this operator 
>>>>> instance
>>>>> with a parallelism of 4, I will do 4 queries in parallel (1 per operator 
>>>>> instance, and each query will be for several alerts linked to several 
>>>>> resource ids)
>>>>> But after looking at the documentation, I cannot see this ability (having 
>>>>> a windowAll on a keyed stream).
>>>>> 
>>>>> Am I missing something?
>>>>> 
>>>>> What will be the best way to deal with such a use case?
>>>>> 
>>>>> 
>>>>> 
>>>>> I've tried for example to review my key and to do something like 
>>>>> "resourceId.hahsCode%<max nb of queries in parallel>" and then to use a 
>>>>> time window.
>>>>> 
>>>>> In my example above, the <max nb of queries in parallel> will be 4. And 
>>>>> all my keys will be 0, 1, 2 or 3.
>>>>> 
>>>>> The issue with this approach is that due to the way the operatorIdx is 
>>>>> computed based on the key, it does not distribute well my processing:
>>>>> 
>>>>> when this partitioning logic from the "KeyGroupRangeAssignment" class is 
>>>>> applied
>>>>>     /**
>>>>>      * Assigns the given key to a parallel operator index.
>>>>>      *
>>>>>      * @param key the key to assign
>>>>>      * @param maxParallelism the maximum supported parallelism, aka the 
>>>>> number of key-groups.
>>>>>      * @param parallelism the current parallelism of the operator
>>>>>      * @return the index of the parallel operator to which the given key 
>>>>> should be routed.
>>>>>      */
>>>>>     public static int assignKeyToParallelOperator(Object key, int 
>>>>> maxParallelism, int parallelism) {
>>>>>         return computeOperatorIndexForKeyGroup(maxParallelism, 
>>>>> parallelism, assignToKeyGroup(key, maxParallelism));
>>>>>     }
>>>>> 
>>>>>     /**
>>>>>      * Assigns the given key to a key-group index.
>>>>>      *
>>>>>      * @param key the key to assign
>>>>>      * @param maxParallelism the maximum supported parallelism, aka the 
>>>>> number of key-groups.
>>>>>      * @return the key-group to which the given key is assigned
>>>>>      */
>>>>>     public static int assignToKeyGroup(Object key, int maxParallelism) {
>>>>>         return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
>>>>>     }
>>>>> key 0, 1, 2 and 3 are only assigned to operator 2 and 3 (so 2 over my 4 
>>>>> operators will not have anything to do)
>>>>> 
>>>>> 
>>>>> So, what will be the best way to deal with that?
>>>>> 
>>>>> 
>>>>> 
>>>>> Thank you in advance for your support.
>>>>> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

Reply via email to