Hello,

I've already tried to key my stream with "resourceId.hashCode % parallelism" (with a parallelism of 4 in my example), so all my keys are 0, 1, 2 or 3. I can then use a time window on this keyed stream and do only 4 queries to my external system. But the keys are not well distributed by the default partitioner of a keyed stream: keys 0, 1, 2 and 3 only go to operator indexes 2 and 3.

I think I should explore the custom partitioner, as you suggested, Xingcan.
Maybe my last question on this will be: can you give me more details on this point: "and simulate a window operation by yourself in a ProcessFunction"?

When I look at the documentation about the custom partitioner, I can see that the result of partitionCustom is a DataStream,
not a KeyedStream.
So the only window available to me would be windowAll (which would bring me back to a parallelism of 1, wouldn't it?).

And if I do something like "myStream.partitionCustom(<my new partitioner>, <my key>).keyBy(<my key>).window(...)", will it preserve my custom partitioner? Looking at the "KeyedStream" class, it seems that it will go back to the "KeyGroupStreamPartitioner" and forget my custom partitioner?
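For reference, the function passed to partitionCustom picks the target partition index directly, without the key-group hashing that keyBy applies. Below is a plain-Java sketch of such a function. The local Partitioner interface only mirrors the shape of Flink's org.apache.flink.api.common.functions.Partitioner (int partition(K key, int numPartitions)) so the example compiles without Flink; hashing the resource id with floorMod is just one possible choice, and the "router-N" ids are illustrative.

```java
import java.util.Arrays;

public class ResourcePartitionerSketch {

    // Local stand-in with the same shape as Flink's Partitioner<K>;
    // no Flink dependency is needed to run this sketch.
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Routes each resource id straight to a partition index. floorMod keeps the
    // result in [0, numPartitions) even when hashCode() is negative.
    static final Partitioner<String> BY_RESOURCE_ID =
            (resourceId, numPartitions) -> Math.floorMod(resourceId.hashCode(), numPartitions);

    public static void main(String[] args) {
        int[] alertsPerPartition = new int[4];
        for (int i = 1; i <= 100; i++) {
            alertsPerPartition[BY_RESOURCE_ID.partition("router-" + i, 4)]++;
        }
        // Shows how 100 hypothetical resource ids spread over 4 partitions.
        System.out.println(Arrays.toString(alertsPerPartition));
    }
}
```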

Thanks again for your feedback,

Julien.


On 19/02/2018 03:45, 周思华 wrote:
Hi Julien,
    If I do not misunderstand, I think you can key your stream on `Random.nextInt() % parallelism`; this way you can "group" together alerts from different resources and benefit from multiple parallel instances.
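One detail in this suggestion: `Random.nextInt()` can return negative numbers, and Java's `%` keeps the sign of the dividend, so the expression as written yields up to seven distinct keys (-3 to 3) for a parallelism of 4 rather than four. A small plain-Java sketch of the difference (class and method names are illustrative):

```java
import java.util.Random;

public class RandomBucket {

    // Random.nextInt() covers the whole int range, so for parallelism 4
    // this returns values from -3 to 3, not 0 to 3.
    static int naiveBucket(Random random, int parallelism) {
        return random.nextInt() % parallelism;
    }

    // nextInt(bound) is uniform over [0, bound), so there are no negative buckets.
    static int bucket(Random random, int parallelism) {
        return random.nextInt(parallelism);
    }

    public static void main(String[] args) {
        Random random = new Random();
        int negatives = 0;
        for (int i = 0; i < 1_000; i++) {
            if (naiveBucket(random, 4) < 0) {
                negatives++;
            }
        }
        System.out.println("negative naive buckets out of 1000: " + negatives);
        System.out.println("bounded bucket: " + bucket(random, 4));
    }
}
```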


On 02/19/2018 09:08, Xingcan Cui <xingc...@gmail.com> wrote:

    Hi Julien,

    sorry for my earlier misunderstanding. For now, a window can only
    be defined on a KeyedStream, or on an ordinary DataStream but with
    parallelism = 1. I'd like to suggest three options for your scenario.

    1. If your external data is static and can fit into memory,
    you can use managed state
    (<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state>)
    to cache it without having to worry about the querying problem.
    2. Or you can use a custom partitioner
    (<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning>)
    to manually distribute your alert data and simulate a window
    operation by yourself in a ProcessFunction.
    3. You may also choose to use an external system, such as an
    in-memory store, which can work as a cache for your queries.
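    Options 1 and 3 both put a cache in front of the external system. A
    plain-Java sketch of that idea, assuming the external system offers
    some batch lookup by resource id (modelled here as a hypothetical
    Function passed to the constructor, not a real client API). In Flink
    the cached data would live in managed state or an external store
    rather than in a plain HashMap as here.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class ResourceInfoCache {

    // Hypothetical batch call to the external system: a set of resource ids in,
    // the enrichment info found for them out.
    private final Function<Set<String>, Map<String, String>> batchLookup;
    private final Map<String, String> cache = new HashMap<>();
    private int externalCalls = 0;

    public ResourceInfoCache(Function<Set<String>, Map<String, String>> batchLookup) {
        this.batchLookup = batchLookup;
    }

    // Serves cached ids from memory and fetches all missing ids with one batch call.
    // Ids the external system does not return are not cached and are asked for again later.
    public Map<String, String> lookup(Collection<String> resourceIds) {
        Set<String> misses = new HashSet<>();
        for (String id : resourceIds) {
            if (!cache.containsKey(id)) {
                misses.add(id);
            }
        }
        if (!misses.isEmpty()) {
            externalCalls++;
            cache.putAll(batchLookup.apply(misses));
        }
        Map<String, String> result = new HashMap<>();
        for (String id : resourceIds) {
            String info = cache.get(id);
            if (info != null) {
                result.put(id, info);
            }
        }
        return result;
    }

    public int externalCalls() {
        return externalCalls;
    }

    public static void main(String[] args) {
        ResourceInfoCache cache = new ResourceInfoCache(ids -> {
            Map<String, String> info = new HashMap<>();
            for (String id : ids) {
                info.put(id, "info-for-" + id); // fake external data
            }
            return info;
        });
        System.out.println(cache.lookup(Arrays.asList("router-1", "switch-1")));
        System.out.println(cache.lookup(Arrays.asList("router-1", "router-2")));
        System.out.println("external calls: " + cache.externalCalls());
    }
}
```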

    Best,
    Xingcan

    On 19 Feb 2018, at 5:55 AM, Julien <jmassio...@gmail.com> wrote:

    Hi Xingcan,

    Thanks for your answer.
    Yes, I understand that point:

      * if I have 100 resource IDs with parallelism of 4, then each
        operator instance will handle about 25 keys


    The issue I have is that I want, on a given operator instance, to
    group those 25 keys together in order to do only 1 query to an
    external system per operator instance:

      * on a given operator instance, I will do 1 query for my 25 keys
      * so with the 4 operator instances, I will do 4 queries in
        parallel (with about 25 keys per query)


    I do not know how I can do that.

    If I define a window on my keyed stream (for example
    stream.keyBy(_.resourceId).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))),
    then my understanding is that the window is "associated" with the
    key. So in this case, on a given operator instance, I will have
    25 of those windows (one per key), and I will do 25 queries
    (instead of 1).
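    To make the counting concrete: with per-key windows, every key that
    received alerts during a pane fires its own window, whereas one
    buffer per operator instance turns the whole pane into one query. A
    toy plain-Java sketch (no Flink classes; names and resource ids are
    illustrative) of the two counts for a single 500 ms pane:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class QueriesPerPane {

    // keyBy(resourceId) + tumbling window: one window firing, hence one query,
    // per distinct resource id that received alerts in the pane.
    static int queriesWithPerKeyWindows(List<String> resourceIdsInPane) {
        return new HashSet<>(resourceIdsInPane).size();
    }

    // One buffer per operator instance: the whole pane becomes a single query.
    static int queriesWithPerInstanceBatch(List<String> resourceIdsInPane) {
        return resourceIdsInPane.isEmpty() ? 0 : 1;
    }

    public static void main(String[] args) {
        // Resource ids of the alerts one instance received during a pane.
        List<String> pane = Arrays.asList("router-1", "router-1", "router-2", "switch-1");
        System.out.println(queriesWithPerKeyWindows(pane));    // 3
        System.out.println(queriesWithPerInstanceBatch(pane)); // 1
    }
}
```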

    Do you see my point?
    Or am I missing something?

    I'd like to find a way on operator instance 1 to group all the
    alerts received on those 25 resource ids and do 1 query for those
    25 resource ids.
    Same thing for operator instance 2, 3 and 4.


    Thank you,
    Regards.


    On 18/02/2018 14:43, Xingcan Cui wrote:
    Hi Julien,

    the cardinality of your keys (e.g., resource ID) will not be
    restricted to the parallelism. For instance, if you have 100
    resource IDs processed by KeyedStream with parallelism 4, each
    operator instance will handle about 25 keys.

    Hope that helps.

    Best,
    Xingcan

    On 18 Feb 2018, at 8:49 PM, Julien <jmassio...@gmail.com> wrote:

    Hi,

    I am pretty new to Flink and I don't know what the best
    way to deal with the following use case would be:

      * as input, I receive alerts from a Kafka topic
          o an alert is linked to a network resource (like
            router-1, router-2, switch-1, switch-2, ...)
          o so an alert carries two main pieces of information (the
            alert id and the id of the resource on which the alert has
            been raised)
      * then I need to do a query to an external system in order to
        enrich the alert with additional information on the resource


    (A "natural" candidate for the key on this stream will be the
    resource id)

    The issue I have is that regarding the query to the external
    system:

      * I do not want to do 1 query per resource id
      * I want to do a small number of queries in parallel (for
        example 4 queries in parallel every 500ms), each query
        requesting the external system for several alerts linked to
        several resource id

    Currently, I don't know what the best way to deal with
    that would be:

      * I can key my stream on the resource id and then define a
        processing time window of 500ms, and when the trigger fires,
        do my query
          o by doing so, I will "group" several alerts in a single
            query, but they will all be linked to the same resource.
          o so I will do 1 query per resource id (which will be too
            much in my use case)
      * I can also do a windowAll on a non keyed stream
          o by doing so, I will "group" together alerts from
            different resource ids, but from what I've read in such
            a case the parallelism will always be one.
          o so in this case, I will only do 1 query whereas I'd
            like to have some parallelism

    I am thinking that a way to deal with that would be to:

      * define the resource id as the key of the stream and set a
        parallelism of 4
      * and then have a way to do a windowAll on this keyed stream
          o meaning that, on a given operator instance, I would
            "group" into the same window all the keys (i.e. all the
            resource ids) managed by this operator instance
          o with a parallelism of 4, I will do 4 queries in
            parallel (1 per operator instance, and each query will
            be for several alerts linked to several resource ids)
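    The per-instance grouping described above can be approximated by
    hand: buffer every alert an instance receives, whatever its resource
    id, and hand the whole buffer to one query once 500 ms have passed.
    A plain-Java sketch under those assumptions (the Alert type and the
    PerInstanceBatcher class are hypothetical, not Flink API). For
    simplicity the flush is only checked when a new alert arrives; a
    real operator would also need a time-based trigger so that a quiet
    instance still flushes its buffer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class PerInstanceBatcher {

    // Hypothetical alert: its own id plus the id of the resource it was raised on.
    public static final class Alert {
        final String alertId;
        final String resourceId;

        public Alert(String alertId, String resourceId) {
            this.alertId = alertId;
            this.resourceId = resourceId;
        }
    }

    private final long intervalMillis;
    private final List<Alert> buffer = new ArrayList<>();
    private long paneStart = -1; // start of the current pane, -1 before the first alert

    public PerInstanceBatcher(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Adds an alert. If the current pane is at least intervalMillis old, the alerts
    // buffered so far are returned as one batch and a new pane starts with this
    // alert; otherwise an empty list is returned.
    public List<Alert> add(Alert alert, long nowMillis) {
        List<Alert> ready = Collections.emptyList();
        if (paneStart < 0) {
            paneStart = nowMillis;
        } else if (nowMillis - paneStart >= intervalMillis) {
            ready = new ArrayList<>(buffer);
            buffer.clear();
            paneStart = nowMillis;
        }
        buffer.add(alert);
        return ready;
    }

    // Distinct resource ids in a batch, i.e. what a single query would ask for.
    public static Set<String> resourceIds(List<Alert> batch) {
        Set<String> ids = new LinkedHashSet<>();
        for (Alert a : batch) {
            ids.add(a.resourceId);
        }
        return ids;
    }

    public static void main(String[] args) {
        PerInstanceBatcher batcher = new PerInstanceBatcher(500);
        batcher.add(new Alert("a1", "router-1"), 0);
        batcher.add(new Alert("a2", "switch-1"), 120);
        batcher.add(new Alert("a3", "router-1"), 300);
        List<Alert> batch = batcher.add(new Alert("a4", "router-2"), 650);
        // prints "3 alerts, one query for [router-1, switch-1]"
        System.out.println(batch.size() + " alerts, one query for " + resourceIds(batch));
    }
}
```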

    But after looking at the documentation, I cannot see this
    ability (having a windowAll on a keyed stream).

    Am I missing something?

    What would be the best way to deal with such a use case?


    I've tried, for example, to change my key to something
    like "resourceId.hashCode % <max nb of queries in parallel>" and
    then to use a time window.

    In my example above, <max nb of queries in parallel> would
    be 4, so all my keys will be 0, 1, 2 or 3.

    The issue with this approach is that, due to the way the
    operator index is computed from the key, my processing is not
    well distributed:

      * when this partitioning logic from the
        "KeyGroupRangeAssignment" class is applied
          o /**
             * Assigns the given key to a parallel operator index.
             *
             * @param key the key to assign
             * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
             * @param parallelism the current parallelism of the operator
             * @return the index of the parallel operator to which the given key should be routed.
             */
            public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
                return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
            }

            /**
             * Assigns the given key to a key-group index.
             *
             * @param key the key to assign
             * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
             * @return the key-group to which the given key is assigned
             */
            public static int assignToKeyGroup(Object key, int maxParallelism) {
                return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
            }
          o keys 0, 1, 2 and 3 are only assigned to operators 2 and 3
            (so 2 of my 4 operators have nothing to do)
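    The two helpers called in the snippet above are not shown. As far as
    I remember Flink's KeyGroupRangeAssignment and MathUtils,
    computeKeyGroupForKeyHash is murmurHash(keyHash) % maxParallelism and
    computeOperatorIndexForKeyGroup is keyGroup * parallelism /
    maxParallelism, and the default maxParallelism for a parallelism of 4
    is 128. The self-contained sketch below reproduces that chain from
    memory (check it against the Flink version you run before relying on
    it) and adds a hypothetical keysPerOperator helper: instead of
    key % 4, it searches for one Integer key per operator index, so that
    keying by those keys spreads the work over all 4 instances, as long
    as parallelism and max parallelism stay unchanged.

```java
import java.util.Arrays;

public class KeyGroupSketch {

    // Reproduced from memory of Flink's MathUtils.murmurHash(int): a non-negative hash.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // key -> key group -> operator index, following the chain in KeyGroupRangeAssignment.
    static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // Hypothetical helper: for each operator index, the smallest non-negative
    // Integer key that the assignment above routes to that index.
    static int[] keysPerOperator(int maxParallelism, int parallelism) {
        int[] keys = new int[parallelism];
        Arrays.fill(keys, -1);
        int found = 0;
        for (int candidate = 0; found < parallelism; candidate++) {
            int idx = assignKeyToParallelOperator(candidate, maxParallelism, parallelism);
            if (keys[idx] == -1) {
                keys[idx] = candidate;
                found++;
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        int maxParallelism = 128;
        for (int key = 0; key < 4; key++) {
            System.out.println("key " + key + " -> operator "
                    + assignKeyToParallelOperator(key, maxParallelism, parallelism));
        }
        System.out.println("one key per operator: "
                + Arrays.toString(keysPerOperator(maxParallelism, parallelism)));
    }
}
```

    An alert could then be keyed by
    keys[Math.floorMod(resourceId.hashCode(), 4)] instead of by
    resourceId.hashCode() % 4 directly.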


    So, what would be the best way to deal with that?


    Thank you in advance for your support.

    Regards.


    Julien.





