aljoscha commented on a change in pull request #12147:
URL: https://github.com/apache/flink/pull/12147#discussion_r426165759



##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
##########
@@ -350,92 +339,38 @@ protected abstract void doCommitInternalOffsetsToKafka(
         */
        protected void emitRecordsWithTimestamps(
                        Queue<T> records,
-                       KafkaTopicPartitionState<KPH> partitionState,
+                       KafkaTopicPartitionState<T, KPH> partitionState,
                        long offset,
                        long kafkaEventTimestamp) {
                // emit the records, using the checkpoint lock to guarantee
                // atomicity of record emission and offset state update
                synchronized (checkpointLock) {
                        T record;
                        while ((record = records.poll()) != null) {
-                               // timestamps will be of the same size as 
records.
-                               long timestamp = getTimestampForRecord(record, 
partitionState, kafkaEventTimestamp);
-                               sourceContext.collectWithTimestamp(record, 
timestamp);
-                               if (timestampWatermarkMode == 
PUNCTUATED_WATERMARKS) {
-                                       emitPunctuatedWatermark(record, 
timestamp, partitionState);
-                               }
-                       }
-                       partitionState.setOffset(offset);
-               }
-       }
-
-       private void emitPunctuatedWatermark(
-                       T record,
-                       long timestamp,
-                       KafkaTopicPartitionState<KPH> partitionState) {
-               final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> 
withWatermarksState =
-                       (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, 
KPH>) partitionState;
-
-               Watermark newWatermark = 
withWatermarksState.checkAndGetNewWatermark(record, timestamp);
-
-               // if we also have a new per-partition watermark, check if that 
is also a
-               // new cross-partition watermark
-               if (newWatermark != null) {
-                       updateMinPunctuatedWatermark(newWatermark);
-               }
-       }
-
-       protected long getTimestampForRecord(
-                       T record,
-                       KafkaTopicPartitionState<KPH> partitionState,
-                       long kafkaEventTimestamp) {
-               if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
-                       return kafkaEventTimestamp;
-               } else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-                       final KafkaTopicPartitionStateWithPeriodicWatermarks<T, 
KPH> withWatermarksState =
-                               
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
-
-                       // extract timestamp - this accesses/modifies the 
per-partition state inside the
-                       // watermark generator instance, so we need to lock the 
access on the
-                       // partition state. concurrent access can happen from 
the periodic emitter
-                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
-                       synchronized (withWatermarksState) {
-                               return 
withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
-                       }
-               } else {
-                       final 
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-                               
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
-
-                       // only one thread ever works on accessing timestamps 
and watermarks
-                       // from the punctuated extractor
-                       return 
withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
-               }
-       }
+                               long timestamp;
 
-       /**
-        *Checks whether a new per-partition watermark is also a new 
cross-partition watermark.
-        */
-       private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
-               if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
-                       long newMin = Long.MAX_VALUE;
-
-                       for (KafkaTopicPartitionState<?> state : 
subscribedPartitionStates) {
-                               @SuppressWarnings("unchecked")
-                               final 
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-                                               
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
-
-                               newMin = Math.min(newMin, 
withWatermarksState.getCurrentPartitionWatermark());
-                       }
+                               //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                               synchronized (partitionState) {
+
+                                       // You would expect that we don't have 
to do this under lock. You would be wrong:
+                                       // A WatermarkStrategy can wrap an 
old-style combined
+                                       // timestamp extractor/watermark 
assigner, in which case the TimestampAssigner and
+                                       // WatermarkGenerator wrap one and the 
same object, where extracting the timestamp
+                                       // updates the internal state of the 
assigner.
+                                       timestamp = 
partitionState.extractTimestamp(record, kafkaEventTimestamp);
+                                       partitionState.onEvent(record, 
timestamp);

Review comment:
       Ah, I forgot to remove this `partitionState.onEvent(record, timestamp)` call. I had already added the call after element emission... 😅




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to