I see a .slotSharingGroup() method on SingleOutputStreamOperator
<https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#slotSharingGroup-java.lang.String->
which can put parallel instances of operations in the same TaskManager slot.
I also see a CoLocationGroup, but I do not see a .coLocationGroup() method
on SingleOutputStreamOperator to put a task in the same slot. It seems
CoLocationGroup
<https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.html>
is defined at the JobVertex level and has nothing to do with
SingleOutputStreamOperator.

A TaskManager has many slots, and a slot can have many threads within it.
I want to be able to put the cam1 partition (keyBy(cam)) in one slot and then
use keyBy(seq#) to run on many threads within that cam1 slot.
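
To make my understanding of the hash partitioning concrete, here is a rough
sketch in plain Java (no Flink dependency). It assumes that keyBy maps a key
to a key group and then maps the key group to a parallel subtask index. The
class name KeyGroupSketch, the mix() function, and the values 128 and 4 are
made up for illustration; mix() is only a stand-in and is not Flink's actual
hash function.

```java
// Illustrative sketch only: mimics the shape of hash partitioning by key,
// not Flink's real implementation.
class KeyGroupSketch {

    // Hypothetical stand-in hash that spreads bits and returns a non-negative int.
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x45d9f3b;
        h ^= (h >>> 16);
        return h & 0x7fffffff;
    }

    // Key -> key group in [0, maxParallelism).
    static int keyGroup(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Key group -> subtask index in [0, parallelism).
    static int subtaskIndex(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for the sketch
        int parallelism = 4;      // assumed value for the sketch

        // First keyBy: the camera id alone decides the subtask.
        for (String cam : new String[] {"cam1", "cam2"}) {
            System.out.println(cam + " -> subtask "
                    + subtaskIndex(cam, maxParallelism, parallelism));
        }

        // Second keyBy: the seq# alone decides the subtask, regardless of
        // which subtask handled the record after the first keyBy.
        for (int seq = 1; seq <= 3; seq++) {
            System.out.println("seq# " + seq + " -> subtask "
                    + subtaskIndex(seq, maxParallelism, parallelism));
        }
    }
}
```

If this is right, the subtask chosen by keyBy(seq#) depends only on the hash
of seq# and not on where the cam1 records were processed before, which is why
I am looking for some other way to keep them in one slot.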

Vijay

On Mon, Jun 25, 2018 at 10:10 AM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Thanks, Fabian.
> Been reading your excellent book on Flink Streaming. Can't wait for more
> chapters.
> Attached a pic.
>
> [image: partition-by-cam-ts.jpg]
>
> I have records with seq# 1 for both cam1 and cam2. I also have records with
> varying seq#'s.
> By partitioning on the cam field first (keyBy(cam)), I can get the cam1
> partition on the same task manager instance/slot/vCore(???).
> Can I then have seq# 1 and seq# 2 for the cam1 partition run in different
> slots/threads on the same Task Manager instance (aka the cam1 partition) using
> keyBy(seq#) & setParallelism()? Can the *forward* strategy be used to
> achieve this?
>
> TIA
>
>
> On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Flink distributes task instances to slots and does not expose physical
>> machines.
>> Records are partitioned to task instances by hash partitioning. It is
>> also not possible to guarantee that the records in two different operators
>> are sent to the same slot.
>> Sharing information by side-passing it (e.g., via a file on a machine or
>> in a static object) is an anti-pattern and should be avoided.
>>
>> Best, Fabian
>>
>> 2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:
>>
>>> Hi,
>>>
>>> I need to partition by cameraWithCube.getCam() first, using
>>> parallelCamTasks (passed in as args).
>>>
>>> Then, within each partition, I need to partition again by
>>> cameraWithCube.getTs(), but I need to make sure each of the second-level
>>> partitions by getTs() runs on the same physical node.
>>>
>>> How do I achieve that?
>>>
>>> DataStream<CameraWithCube> cameraWithCubeDataStream = env
>>>             .addSource(new Source(....))
>>>             .keyBy((cameraWithCube) -> cameraWithCube.getCam())
>>>             .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
>>>                 public void processElement(CameraWithCube cameraWithCube,
>>>                         Context context, Collector<CameraWithCube> collector) throws Exception {
>>>                     //do nothing
>>>                 }
>>>             })
>>>             .slotSharingGroup("camSharingGroup") //TODO: how to add camera# of the partition
>>>             .setParallelism(parallelCamTasks)
>>>             .keyBy((cameraWithCube) -> cameraWithCube.getTs())
>>>             .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
>>>                 public void processElement(CameraWithCube cameraWithCube,
>>>                         Context context, Collector<CameraWithCube> collector) throws Exception {
>>>                     //TODO: process code
>>>                 }
>>>             })
>>>             .setParallelism(noOfSlotsInEachPhysicalNode) //TODO: how many parallel tasks within physical node
>>>             .slotSharingGroup("??"); //TODO: in same physical node
>>>
>>> TIA
>>>
>>
>>
