Hi,
Newbie question. What I am trying to do is the following:
CameraWithCubeSource emits tuples of (cameraNbr, TS).
1. Partition the data by cameraNbr.
2. Sleep for 1 sec in the task to simulate a heavy process.
3. Partition the data by TS, and finally get a DataStream that I can
connect with another DataStream.

DataStream<CameraWithCube> cameraWithCubeDataStream = env
        .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor))
        .setParallelism(parallelTasks)
        .setMaxParallelism(parallelTasks)
        // partition by cameraNbr
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null
                ? cameraWithCube.cameraKey.getCam() : new Object());

// sleep for 1 sec ???? how
((KeyedStream) cameraWithCubeDataStream).timeWindow(Time.seconds(1))
        .apply(new WindowFunction<CameraWithCube, CameraWithCube, String, TimeWindow>() {
            @Override
            public void apply(String cameraKeyCam, TimeWindow timeWindow,
                              Iterable<CameraWithCube> cameraWithCubesAssignedToWindow,
                              Collector<CameraWithCube> collector) throws Exception {
                Thread.sleep(1000);
                cameraWithCubesAssignedToWindow.forEach(
                        cameraWithCube -> collector.collect(cameraWithCube));
            }
        }) // returning void here from apply ??
        // partition by TS and return DataStream
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null
                ? cameraWithCube.cameraKey.getTS() : new Object());

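To make the intended result concrete, here is a minimal plain-Java sketch (no Flink involved) of the two partitioning steps with the simulated heavy step in between. Event, partitionBy, heavyProcess and run are names I made up for illustration; they are not Flink API, and Event only stands in for the two key fields of CameraWithCube.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class TwoStagePartitionSketch {

    /** Hypothetical stand-in for CameraWithCube: only the two key fields. */
    static final class Event {
        final String cameraNbr;
        final long ts;

        Event(String cameraNbr, long ts) {
            this.cameraNbr = cameraNbr;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "(" + cameraNbr + "," + ts + ")";
        }
    }

    /** Groups events by the given key, keeping keys in first-seen order. */
    static <K> Map<K, List<Event>> partitionBy(List<Event> events, Function<Event, K> keyOf) {
        Map<K, List<Event>> groups = new LinkedHashMap<>();
        for (Event e : events) {
            groups.computeIfAbsent(keyOf.apply(e), k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    /** Placeholder for the heavy per-camera step: sleeps, then passes events through. */
    static List<Event> heavyProcess(List<Event> perCamera, long sleepMillis) {
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
        }
        return perCamera;
    }

    /** Step 1: group by cameraNbr. Step 2: heavy step per group. Step 3: regroup by TS. */
    static Map<Long, List<Event>> run(List<Event> input, long sleepMillis) {
        List<Event> processed = new ArrayList<>();
        for (List<Event> perCamera : partitionBy(input, e -> e.cameraNbr).values()) {
            processed.addAll(heavyProcess(perCamera, sleepMillis));
        }
        return partitionBy(processed, e -> e.ts);
    }

    public static void main(String[] args) {
        List<Event> input = Arrays.asList(
                new Event("cam1", 100L), new Event("cam2", 100L), new Event("cam1", 200L));
        System.out.println(run(input, 1000L));
    }
}
```

In this sketch the cam1 and cam2 events with TS 100 end up in the same group after the second step, which is the grouping I want the final keyed stream to have before connecting it with the other DataStream.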
TIA,
Vijay
