[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930174#comment-15930174 ]
ASF GitHub Bot commented on FLINK-5544: --------------------------------------- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106671203 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java --- @@ -18,43 +18,306 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Interface for working with time and timers. * * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService} * that allows to specify a key and a namespace to which timers should be scoped. * + * All d + * + * @param <K> Type of the keys in the stream * @param <N> Type of the namespace to which timers are scoped. 
*/ @Internal -public interface InternalTimerService<N> { +public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback { + + protected final ProcessingTimeService processingTimeService; + + protected final KeyContext keyContext; + + protected final int totalKeyGroups; + + protected final KeyGroupRange keyGroupRange; + + /** + * The one and only Future (if any) registered to execute the + * next {@link Triggerable} action, when its (processing) time arrives. + */ + protected ScheduledFuture<?> nextTimer; + + /** + * The local event time, as denoted by the last received + * {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}. + */ + private long currentWatermark = Long.MIN_VALUE; + + // Variables to be set when the service is started. + + protected TypeSerializer<K> keySerializer; + + protected TypeSerializer<N> namespaceSerializer; + + private InternalTimer.TimerSerializer<K, N> timerSerializer; + + protected Triggerable<K, N> triggerTarget; + + private volatile boolean isInitialized; + + public InternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + + this.totalKeyGroups = totalKeyGroups; + this.keyGroupRange = checkNotNull(keyGroupRange); + this.keyContext = checkNotNull(keyContext); + this.processingTimeService = checkNotNull(processingTimeService); + } /** Returns the current processing time. */ - long currentProcessingTime(); + public long currentProcessingTime() { + return processingTimeService.getCurrentProcessingTime(); + } /** Returns the current event-time watermark. */ - long currentWatermark(); + public long currentWatermark() { + return currentWatermark; + } /** * Registers a timer to be fired when processing time passes the given time. The namespace * you pass here will be provided when the timer fires. 
*/ - void registerProcessingTimeTimer(N namespace, long time); + abstract public void registerProcessingTimeTimer(N namespace, long time); /** * Deletes the timer for the given key and namespace. */ - void deleteProcessingTimeTimer(N namespace, long time); + abstract public void deleteProcessingTimeTimer(N namespace, long time); /** * Registers a timer to be fired when processing time passes the given time. The namespace * you pass here will be provided when the timer fires. */ - void registerEventTimeTimer(N namespace, long time); + abstract public void registerEventTimeTimer(N namespace, long time); /** * Deletes the timer for the given key and namespace. */ - void deleteEventTimeTimer(N namespace, long time); + abstract public void deleteEventTimeTimer(N namespace, long time); + + /** + * Returns the timers for the given key group. + */ + abstract public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup); + + /** + * Returns the timers for the given key group. + */ + abstract public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup); + + /** + * Restores the timers for the given key group. + */ + abstract public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers); + + /** + * Restores the timers for the given key group. + */ + abstract public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers); + + /** + * Starts the execution of the timer service + */ + abstract public void start(); + + /** + * Closes the timer service. + */ + abstract public void close(); + + public void advanceWatermark(long watermark) throws Exception { + if (watermark < currentWatermark) { + throw new IllegalStateException("The watermark is late."); + } + + currentWatermark = watermark; + + onEventTime(watermark); + } + + /** + * Snapshots the timers (both processing and event time ones) for a given {@code keyGroupIdx}. + * @param stream the stream to write to. 
+ * @param keyGroupIdx the id of the key-group to be put in the snapshot. + */ + public void snapshotTimersForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception { + InstantiationUtil.serializeObject(stream, keySerializer); + InstantiationUtil.serializeObject(stream, namespaceSerializer); + + // write the event time timers + Collection<InternalTimer<K, N>> eventTimers = getEventTimeTimersForKeyGroup(keyGroupIdx); + if (eventTimers != null) { + stream.writeInt(eventTimers.size()); + for (InternalTimer<K, N> timer : eventTimers) { + this.timerSerializer.serialize(timer, stream); + } + } else { + stream.writeInt(0); + } + + // write the processing time timers + Collection<InternalTimer<K, N>> processingTimers = getProcessingTimeTimersForKeyGroup(keyGroupIdx); + if (processingTimers != null) { + stream.writeInt(processingTimers.size()); + for (InternalTimer<K, N> timer : processingTimers) { + this.timerSerializer.serialize(timer, stream); + } + } else { + stream.writeInt(0); + } + } + + /** + * Restore the timers (both processing and event time ones) for a given {@code keyGroupIdx}. + * @param stream the stream to read from. + * @param keyGroupIdx the id of the key-group to be put in the snapshot. + * @param userCodeClassLoader the class loader that will be used to deserialize + * the local key and namespace serializers. 
+ */ + public void restoreTimersForKeyGroup(DataInputViewStreamWrapper stream, int keyGroupIdx, ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException { + TypeSerializer<K> tmpKeySerializer = InstantiationUtil.deserializeObject(stream, userCodeClassLoader); + TypeSerializer<N> tmpNamespaceSerializer = InstantiationUtil.deserializeObject(stream, userCodeClassLoader); + + if ((this.keySerializer != null && !this.keySerializer.equals(tmpKeySerializer)) || + (this.namespaceSerializer != null && !this.namespaceSerializer.equals(tmpNamespaceSerializer))) { + + throw new IllegalArgumentException("Tried to restore timers " + + "for the same service with different serializers."); + } + + this.keySerializer = tmpKeySerializer; + this.namespaceSerializer = tmpNamespaceSerializer; + + InternalTimer.TimerSerializer<K, N> timerSerializer = + new InternalTimer.TimerSerializer<>(this.keySerializer, this.namespaceSerializer); + + checkArgument(keyGroupRange.contains(keyGroupIdx), + "Key Group " + keyGroupIdx + " does not belong to the local range."); + + // read the event time timers + int sizeOfEventTimeTimers = stream.readInt(); + if (sizeOfEventTimeTimers > 0) { + List<InternalTimer<K, N>> eventTimeTimers = new ArrayList<>(); + for (int i = 0; i < sizeOfEventTimeTimers; i++) { + InternalTimer<K, N> timer = timerSerializer.deserialize(stream); + + eventTimeTimers.add(timer); + } + + restoreEventTimeTimersForKeyGroup(keyGroupIdx, eventTimeTimers); + } + + // read the processing time timers + int sizeOfProcessingTimeTimers = stream.readInt(); + if (sizeOfProcessingTimeTimers > 0) { + List<InternalTimer<K, N>> processingTimeTimers = new ArrayList<>(); + for (int i = 0; i < sizeOfProcessingTimeTimers; i++) { + InternalTimer<K, N> timer = timerSerializer.deserialize(stream); + processingTimeTimers.add(timer); + } + + restoreProcessingTimeTimersForKeyGroup(keyGroupIdx, processingTimeTimers); + } + } + + /** + * Starts the local {@link InternalTimerService} by: + * 
<ol> + * <li>Setting the {@code keySerializer} and {@code namespaceSerializer} for the timers it will contain.</li> + * <li>Setting the {@code triggerTarget} which contains the action to be performed when a timer fires.</li> + * <li>Re-registering timers that were retrieved after recovering from a node failure, if any.</li> + * </ol> + * This method can be called multiple times, as long as it is called with the same serializers. + */ + void startTimerService( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + Triggerable<K, N> triggerTarget) + { + + if (isInitialized) { --- End diff -- Either `isInitialized` does not need to be volatile, or this code is potentially broken. If you need thread safety, I suggest replacing it with an `AtomicBoolean` and using `AtomicBoolean::compareAndSet(...)`. > Implement Internal Timer Service in RocksDB > ------------------------------------------- > > Key: FLINK-5544 > URL: https://issues.apache.org/jira/browse/FLINK-5544 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing > Reporter: Xiaogang Shi > Assignee: Xiaogang Shi > > Currently, the only implementation of the internal timer service is > HeapInternalTimerService, which stores all timers in memory. In cases > where the number of keys is very large, the timer service will consume too much > memory. An implementation which stores timers in RocksDB seems well suited to > these cases. > It might be a little challenging to implement a RocksDB timer service because > the timers are accessed in different ways. When timers are triggered, we need > to access timers in timestamp order. But when performing checkpoints, > we must have a method to obtain all timers of a given key group. > A good implementation, as suggested by [~StephanEwen], follows the idea of > merge sorting. We can store timers in RocksDB with the format > {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are stored > together and are sorted. 
> We can then maintain an in-memory heap which keeps the first timer of each key > group, from which we obtain the next timer to trigger. When a key group's first timer is > updated, we can efficiently update the heap. -- This message was sent by Atlassian JIRA (v6.3.15#6346)