Hi,

keyBy() does not work hierarchically. Each keyBy() overrides the previous
partitioning.
You can keyBy(cam, seq#), which guarantees that all records with the same
(cam, seq#) are processed by the same parallel instance.
However, Flink does not give any guarantees about how the (cam, seq#)
partitions are distributed across slots (or even physical nodes).
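
For example, a composite key could look roughly like this (just a sketch; I'm
assuming getCam()/getTs() from the code further down, with ts playing the role
of seq#):

    // any KeySelector that returns both fields gives you a composite key
    KeyedStream<CameraWithCube, String> byCamAndSeq = cameraWithCubeDataStream
            .keyBy(c -> c.getCam() + "#" + c.getTs());
    // records with the same (cam, seq#) then go to the same parallel instance
    // of the next operator, but Flink still decides which slot/node that
    // instance runs in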

Btw. why is it important that all records of the same cam are processed by
the same physical node?

Fabian

2018-06-25 21:36 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:

> I see a .slotSharingGroup for SingleOutputStreamOperator
> <https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#slotSharingGroup-java.lang.String->
> which can put parallel instances of operations in the same TM slot.
> I also see a CoLocationGroup, but I do not see a .coLocationGroup method on
> SingleOutputStreamOperator to put a task in the same slot. It seems CoLocationGroup
> <https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.html>
> is defined at the JobVertex level and has nothing to do with
> SingleOutputStreamOperator.
> A TaskManager has many slots, and each slot can run many threads.
> I want to be able to put the cam1 partition (keyBy(cam)) in one slot and then
> use a keyBy(seq#) to run on many threads within that cam1 slot.
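>
> For reference, slotSharingGroup is attached to the operator like this (just a
> sketch; CamProcessFunction stands in for the anonymous ProcessFunction from my
> code below, and the group name is made up):
>
>     SingleOutputStreamOperator<CameraWithCube> camOp = cameraWithCubeDataStream
>             .keyBy((cameraWithCube) -> cameraWithCube.getCam())
>             .process(new CamProcessFunction())   // CamProcessFunction is hypothetical
>             .slotSharingGroup("camSharingGroup") // operators in this group may share slots
>             .setParallelism(parallelCamTasks);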
>
> Vijay
>
> On Mon, Jun 25, 2018 at 10:10 AM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Thanks, Fabian.
>> Been reading your excellent book on Flink Streaming. Can't wait for more
>> chapters.
>> Attached a pic.
>>
>> [image: partition-by-cam-ts.jpg]
>>
>> I have records with seq# 1 for both cam1 and cam2, and I also have records
>> with varying seq#'s.
>> By partitioning on the cam field first (keyBy(cam)), I can get the cam1
>> partition on the same Task Manager instance/slot/vCore(???).
>> Can I then have seq# 1 and seq# 2 of the cam1 partition run in different
>> slots/threads on the same Task Manager instance (aka the cam1 partition) using
>> keyBy(seq#) & setParallelism()? Can the *forward* strategy be used to
>> achieve this?
>>
>> TIA
>>
>>
>> On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Flink distributes task instances to slots and does not expose physical
>>> machines.
>>> Records are partitioned to task instances by hash partitioning. It is
>>> also not possible to guarantee that the records in two different operators
>>> are sent to the same slot.
>>> Sharing information by side-passing it (e.g., via a file on a machine or
>>> in a static object) is an anti-pattern and should be avoided.
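>>>
>>> Conceptually, the target instance for a record is derived from a hash of its
>>> key, roughly like this (a simplified sketch only, not Flink's actual code,
>>> which also goes through key groups and the max parallelism):
>>>
>>>     // simplified illustration of hash partitioning on the key
>>>     int targetSubtask(Object key, int parallelism) {
>>>         return Math.floorMod(key.hashCode(), parallelism);
>>>     }
>>>     // which slot or physical machine that subtask runs in is decided by the
>>>     // scheduler, independently of the key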
>>>
>>> Best, Fabian
>>>
>>> 2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I need to partition by cameraWithCube.getCam() first, using
>>>> parallelCamTasks (passed in as an argument).
>>>>
>>>> Then, within each partition, I need to partition again by
>>>> cameraWithCube.getTs(), but I need to make sure each of the second-level
>>>> partitions by getTs() runs on the same physical node.
>>>>
>>>> How do I achieve that?
>>>>
>>>> DataStream<CameraWithCube> cameraWithCubeDataStream = env
>>>>         .addSource(new Source(....))
>>>>         .keyBy((cameraWithCube) -> cameraWithCube.getCam())
>>>>         .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
>>>>             public void processElement(CameraWithCube cameraWithCube,
>>>>                     Context context, Collector<CameraWithCube> collector) throws Exception {
>>>>                 // do nothing
>>>>             }
>>>>         })
>>>>         .slotSharingGroup("camSharingGroup") // TODO: how to add camera# of the partition
>>>>         .setParallelism(parallelCamTasks)
>>>>         .keyBy((cameraWithCube) -> cameraWithCube.getTs())
>>>>         .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
>>>>             public void processElement(CameraWithCube cameraWithCube,
>>>>                     Context context, Collector<CameraWithCube> collector) throws Exception {
>>>>                 // TODO: process code
>>>>             }
>>>>         })
>>>>         .setParallelism(noOfSlotsInEachPhysicalNode) // TODO: how many parallel tasks within the physical node
>>>>         .slotSharingGroup("??"); // TODO: in the same physical node
>>>>
>>>> TIA
>>>>
>>>
>>>
