huwh commented on code in PR #21989:
URL: https://github.com/apache/flink/pull/21989#discussion_r1113866534
##########
docs/content.zh/docs/learn-flink/etl.md:
##########
@@ -156,56 +154,50 @@ keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
 ### Keyed Stream 的聚合
-以下代码为每个行程结束事件创建了一个新的包含 `startCell` 和时长(分钟)的元组流:
+以下代码为每个行程结束事件创建了一个新的包含 `startCell` 和距离的元组流:
 ```java
-import org.joda.time.Interval;
-
-DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
-    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
+DataStream<Tuple2<Integer, Double>> minutesByStartCell = enrichedNYCRides
+    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Double>>() {
         @Override
         public void flatMap(EnrichedRide ride,
-                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
-            if (!ride.isStart) {
-                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
-                Minutes duration = rideInterval.toDuration().toStandardMinutes();
-                out.collect(new Tuple2<>(ride.startCell, duration));
+                            Collector<Tuple2<Integer, Double>> out) throws Exception {
+            if (ride.isStart) {

Review Comment:
The explanation for this section says "for each end-of-ride event", so the condition here should stay `!ride.isStart`. As written, `ride.isStart` selects start-of-ride events instead.
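
For illustration, here is a minimal plain-Java sketch of the condition suggested in the review comment. It deliberately avoids Flink so it can run on its own: `Ride` is a hypothetical stand-in for the tutorial's `EnrichedRide`, and the `distance` field name is an assumption, since the changed `out.collect(...)` line is not part of the quoted hunk.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class EndOfRideSketch {

    // Hypothetical stand-in for EnrichedRide, reduced to the fields used here.
    static class Ride {
        final boolean isStart;  // true for a start-of-ride event, false for an end-of-ride event
        final int startCell;
        final double distance;  // assumed field name, not taken from the PR

        Ride(boolean isStart, int startCell, double distance) {
            this.isStart = isStart;
            this.startCell = startCell;
            this.distance = distance;
        }
    }

    // Emits one (startCell, distance) pair per end-of-ride event,
    // mirroring the shape of the flatMap body under review.
    static List<Map.Entry<Integer, Double>> distanceByStartCell(List<Ride> rides) {
        List<Map.Entry<Integer, Double>> out = new ArrayList<>();
        for (Ride ride : rides) {
            if (!ride.isStart) {  // keep end-of-ride events only
                out.add(new SimpleEntry<Integer, Double>(ride.startCell, ride.distance));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Ride> rides = List.of(
            new Ride(true, 3, 0.0),    // start event: skipped
            new Ride(false, 7, 2.5));  // end event: emitted
        System.out.println(distanceByStartCell(rides));
    }
}
```

With `ride.isStart` in place of `!ride.isStart`, the same input would emit the start event and drop the end event, which is the mismatch with the surrounding explanation.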