Hi, Newbie question - What I am trying to do is the following: CameraWithCubeSource source sends data-containing tuples of (cameraNbr,TS). 1. Need to partition data by cameraNbr. *2. Then sleep for 1 sec to simulate a heavy process in the task.* *3. Then need to partition data by TS and finally get the DataStream to connect with another DataStream.*
DataStream<CameraWithCube> cameraWithCubeDataStream = env .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor)) .setParallelism(parallelTasks) .setMaxParallelism(parallelTasks) .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr cameraWithCube.cameraKey.getCam() : new Object()); //sleep for 1 sec ???? how *((KeyedStream) cameraWithCubeDataStream).timeWindow(Time.seconds(1))* * .apply(new WindowFunction<CameraWithCube, CameraWithCube, String, TimeWindow>() {* * @Override* * public void apply(String cameraKeyCam, TimeWindow timeWindow,* * Iterable<CameraWithCube> cameraWithCubesAssignedToWindow,* * Collector<CameraWithCube> collector) throws Exception {* * Thread.sleep(1000);* * cameraWithCubesAssignedToWindow.forEach(cameraWithCube -> collector.collect(cameraWithCube));* * }* * })//returning void here from apply ??* * //partition by TS and return DataStream* * .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr* * cameraWithCube.cameraKey.getTS() : new Object());* ; TIA, Vijay