aljoscha commented on a change in pull request #12147: URL: https://github.com/apache/flink/pull/12147#discussion_r426165759
########## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ########## @@ -350,92 +339,38 @@ protected abstract void doCommitInternalOffsetsToKafka( */ protected void emitRecordsWithTimestamps( Queue<T> records, - KafkaTopicPartitionState<KPH> partitionState, + KafkaTopicPartitionState<T, KPH> partitionState, long offset, long kafkaEventTimestamp) { // emit the records, using the checkpoint lock to guarantee // atomicity of record emission and offset state update synchronized (checkpointLock) { T record; while ((record = records.poll()) != null) { - // timestamps will be of the same size as records. - long timestamp = getTimestampForRecord(record, partitionState, kafkaEventTimestamp); - sourceContext.collectWithTimestamp(record, timestamp); - if (timestampWatermarkMode == PUNCTUATED_WATERMARKS) { - emitPunctuatedWatermark(record, timestamp, partitionState); - } - } - partitionState.setOffset(offset); - } - } - - private void emitPunctuatedWatermark( - T record, - long timestamp, - KafkaTopicPartitionState<KPH> partitionState) { - final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState = - (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState; - - Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp); - - // if we also have a new per-partition watermark, check if that is also a - // new cross-partition watermark - if (newWatermark != null) { - updateMinPunctuatedWatermark(newWatermark); - } - } - - protected long getTimestampForRecord( - T record, - KafkaTopicPartitionState<KPH> partitionState, - long kafkaEventTimestamp) { - if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) { - return kafkaEventTimestamp; - } else if (timestampWatermarkMode == PERIODIC_WATERMARKS) { - final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState = - 
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState; - - // extract timestamp - this accesses/modifies the per-partition state inside the - // watermark generator instance, so we need to lock the access on the - // partition state. concurrent access can happen from the periodic emitter - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (withWatermarksState) { - return withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp); - } - } else { - final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState = - (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState; - - // only one thread ever works on accessing timestamps and watermarks - // from the punctuated extractor - return withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp); - } - } + long timestamp; - /** - *Checks whether a new per-partition watermark is also a new cross-partition watermark. - */ - private void updateMinPunctuatedWatermark(Watermark nextWatermark) { - if (nextWatermark.getTimestamp() > maxWatermarkSoFar) { - long newMin = Long.MAX_VALUE; - - for (KafkaTopicPartitionState<?> state : subscribedPartitionStates) { - @SuppressWarnings("unchecked") - final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState = - (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state; - - newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark()); - } + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (partitionState) { + + // You would expect that we don't have to do this under lock. You would be wrong: + // A WatermarkStrategy can wrap an old-style combined + // timestamp extractor/watermark assigner, in which case the TimestampAssigner and + // WatermarkGenerator wrap one and the same object, where extracting the timestamp + // updates the internal state of the assigner. 
+ timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp); + partitionState.onEvent(record, timestamp); Review comment: Ah, I forgot to remove this `onEvent()` call. I had already added the call after the element is emitted... 😅 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org