Just some advice: do not use sleep to simulate a heavy task. Use real or
generated data to produce a realistic load instead. From a software quality
point of view, a sleep call is garbage, and it is often forgotten and left in
the code.
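
As a rough illustration of what "generated data" could look like, here is a
minimal sketch in plain Java. The class and method names (HeavyTaskSimulator,
generatePayload, process) are made up for this example and are not part of
Flink or of your job; the payload size and round count are arbitrary
assumptions you would tune to your real workload.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Random;

// Hypothetical helper: does real CPU work on generated data instead of sleeping.
class HeavyTaskSimulator {

    // Builds a reproducible pseudo-random payload; a fixed seed keeps runs comparable.
    static byte[] generatePayload(int sizeBytes, long seed) {
        byte[] payload = new byte[sizeBytes];
        new Random(seed).nextBytes(payload);
        return payload;
    }

    // Burns CPU by hashing repeatedly: the first round hashes the payload,
    // every later round hashes the previous 32-byte digest.
    static byte[] process(byte[] payload, int rounds) {
        try {
            MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
            byte[] digest = payload;
            for (int i = 0; i < rounds; i++) {
                digest = sha256.digest(digest);
            }
            return digest;
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be supported by every Java platform implementation.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = generatePayload(1 << 20, 42L); // 1 MiB of generated data
        long start = System.nanoTime();
        byte[] result = process(payload, 200_000);      // arbitrary round count
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("digest bytes: " + result.length + ", elapsed ms: " + elapsedMs);
    }
}
```

Something like process(...) could then be called where Thread.sleep(1000) is
today. Unlike sleep, it actually occupies a CPU core, so it should behave more
like the real task under parallelism.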

> On 16. May 2018, at 22:32, Vijay Balakrishnan <[email protected]> wrote:
> 
> Hi,
> Newbie question - What I am trying to do is the following:
> CameraWithCubeSource source sends data-containing tuples of (cameraNbr,TS).
> 1. Need to partition data by cameraNbr.
> 2. Then sleep for 1 sec to simulate a heavy process in the task.
> 3. Then need to partition data by TS and finally get the DataStream to 
> connect with another DataStream.
> 
> DataStream<CameraWithCube> cameraWithCubeDataStream = env
>                 .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor))
>                 .setParallelism(parallelTasks)
>                 .setMaxParallelism(parallelTasks)
>                 .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr
>                         cameraWithCube.cameraKey.getCam() : new Object());
>                 //sleep for 1 sec ???? how
>                 ((KeyedStream) cameraWithCubeDataStream).timeWindow(Time.seconds(1))
>                     .apply(new WindowFunction<CameraWithCube, CameraWithCube, String, TimeWindow>() {
>                         @Override
>                         public void apply(String cameraKeyCam, TimeWindow timeWindow,
>                                           Iterable<CameraWithCube> cameraWithCubesAssignedToWindow,
>                                           Collector<CameraWithCube> collector) throws Exception {
>                             Thread.sleep(1000);
>                             cameraWithCubesAssignedToWindow.forEach(cameraWithCube -> collector.collect(cameraWithCube));
>                         }
>                     })//returning void here from apply ??
>                     //partition by TS and return DataStream
>                     .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr
>                             cameraWithCube.cameraKey.getTS() : new Object());
>                     ;
>                                       
> TIA,
> Vijay
