tweise commented on a change in pull request #8517: [FLINK-10921] [kinesis] 
Shard watermark synchronization in Kinesis consumer

 File path: 
 @@ -0,0 +1,268 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.util;
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+ * Emitter that handles event time synchronization between producer threads.
+ *
+ * <p>Records are organized into per producer queues that will block when 
capacity is exhausted.
+ *
+ * <p>Records are emitted by selecting the oldest available element of all 
producer queues,
+ * as long as the timestamp does not exceed the current shared watermark plus 
allowed lookahead interval.
+ *
+ * @param <T>
+ */
+public abstract class RecordEmitter<T extends TimestampedValue> implements 
Runnable {
+       private static final Logger LOG = 
+       /**
+        * The default capacity of a single queue.
+        *
+        * <p>Larger queue size can lead to higher throughput, but also to
+        * very high heap space consumption, depending on the size of elements.
+        *
+        * <p>Note that this is difficult to tune, because it does not take 
into account
+        * the size of individual objects.
+        */
+       public static final int DEFAULT_QUEUE_CAPACITY = 100;
+       private final int queueCapacity;
+       private final ConcurrentHashMap<Integer, AsyncRecordQueue<T>> queues = 
new ConcurrentHashMap<>();
+       private final ConcurrentHashMap<AsyncRecordQueue<T>, Boolean> 
emptyQueues = new ConcurrentHashMap<>();
+       private final PriorityQueue<AsyncRecordQueue<T>> heads = new 
+       private volatile boolean running = true;
+       private volatile long maxEmitTimestamp = Long.MAX_VALUE;
+       private long maxLookaheadMillis = 60 * 1000; // one minute
+       private long idleSleepMillis = 100;
+       private final Object condition = new Object();
+       public RecordEmitter(int queueCapacity) {
+               this.queueCapacity = queueCapacity;
+       }
+       private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue 
right) {
+               return, right.headTimestamp);
+       }
+       /**
+        * Accepts records from readers.
+        *
+        * @param <T>
+        */
+       public interface RecordQueue<T> {
+               void put(T record) throws InterruptedException;
+               int getQueueId();
+               int getSize();
+               T peek();
+       }
+       private class AsyncRecordQueue<T> implements RecordQueue<T> {
+               private final ArrayBlockingQueue<T> queue;
+               private final int queueId;
+               long headTimestamp;
+               private AsyncRecordQueue(int queueId) {
+                       super();
+                       this.queue = new ArrayBlockingQueue<>(queueCapacity);
+                       this.queueId = queueId;
+                       this.headTimestamp = Long.MAX_VALUE;
+               }
+               public void put(T record) throws InterruptedException {
+                       queue.put(record);
+                       // TODO: not pretty having this here
+                       synchronized (condition) {
+                               condition.notify();
+                       }
+               }
+               public int getQueueId() {
+                       return queueId;
+               }
+               public int getSize() {
+                       return queue.size();
+               }
+               public T peek() {
+                       return queue.peek();
+               }
+       }
+       /**
+        * The queue for the given producer (i.e. Kinesis shard consumer 
+        *
+        * <p>The producer may hold on to the queue for subsequent records.
+        *
+        * @param producerIndex
+        * @return
+        */
+       public RecordQueue<T> getQueue(int producerIndex) {
+               return queues.computeIfAbsent(producerIndex, (key) -> {
+                       AsyncRecordQueue<T> q = new 
+                       emptyQueues.put(q, false);
+                       return q;
+               });
+       }
+       /**
+        * How far ahead of the watermark records should be emitted.
+        *
+        * <p>Needs to account for the latency of obtaining the global 
+        *
+        * @param maxLookaheadMillis
+        */
+       public void setMaxLookaheadMillis(long maxLookaheadMillis) {
+               this.maxLookaheadMillis = maxLookaheadMillis;
+     "[setMaxLookaheadMillis] Max lookahead millis set to 
{}", maxLookaheadMillis);
+       }
+       /**
+        * Set the current watermark.
+        *
+        * <p>This watermark will be used to control which records to emit from 
the queues of pending
+        * elements. When an element timestamp is ahead of the watermark by at 
least {@link
+        * #maxLookaheadMillis}, elements in that queue will wait until the 
watermark advances.
+        *
+        * @param watermark
+        */
+       public void setCurrentWatermark(long watermark) {
+               maxEmitTimestamp = watermark + maxLookaheadMillis;
+               synchronized (condition) {
+                       condition.notify();
+               }
+                       "[setCurrentWatermark] Current watermark set to {}, 
maxEmitTimestamp = {}",
+                       watermark,
+                       maxEmitTimestamp);
+       }
+       /**
+        * Run loop, does not return unless {@link #stop()} was called.
+        */
+       @Override
+       public void run() {
 Review comment:
   That seems possible, although given the moderate size of the run method and 
the need to understand it in its entirety, I'm not sure how much we would gain. 
It would be better done as a separate change anyway.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

With regards,
Apache Git Services

Reply via email to