[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930177#comment-15930177 ]
ASF GitHub Bot commented on FLINK-5544: --------------------------------------- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r106677685 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.api.operators; + + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * {@link InternalTimerService} that stores timers in RocksDB. + */ +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> { + + private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class); + + /** The data base where stores all timers */ + private final RocksDB db; + + /** The path where the rocksdb locates */ + private final Path dbPath; + + /** + * The in-memory heaps backed by rocksdb to retrieve the next timer to trigger. Each + * partition's leader is stored in the heap. When the timers in a partition is changed, we + * will change the partition's leader and update the heap accordingly. + */ + private final int numPartitions; + private final PersistentTimerHeap eventTimeHeap; + private final PersistentTimerHeap processingTimeHeap; + + private static int MAX_PARTITIONS = (1 << 16); + + public RocksDBInternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + Path dbPath) { + + super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService); + + this.dbPath = dbPath; + + try { + FileSystem fileSystem = this.dbPath.getFileSystem(); + if (fileSystem.exists(this.dbPath)) { + fileSystem.delete(this.dbPath, true); + } + + fileSystem.mkdirs(dbPath); + } catch (IOException e) { + throw new RuntimeException("Error while creating directory for rocksdb timer service.", e); + } + + ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions() + .setMergeOperator(new StringAppendOperator()) + .setCompactionStyle(CompactionStyle.UNIVERSAL); + ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions); + + DBOptions dbOptions = new DBOptions() + .setCreateIfMissing(true) + .setUseFsync(false) + .setDisableDataSync(true) + .setMaxOpenFiles(-1); + + List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); + + try { + this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles); + } catch (RocksDBException e) { + throw new RuntimeException("Error while creating the RocksDB instance.", e); + } + + this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS); + + ColumnFamilyHandle eventTimeColumnFamilyHandle; + ColumnFamilyHandle processingTimeColumnFamilyHandle; + try { + ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("eventTime".getBytes(), columnFamilyOptions); + ColumnFamilyDescriptor processingTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("processingTime".getBytes(), columnFamilyOptions); + eventTimeColumnFamilyHandle = db.createColumnFamily(eventTimeColumnFamilyDescriptor); + processingTimeColumnFamilyHandle = db.createColumnFamily(processingTimeColumnFamilyDescriptor); + } catch (RocksDBException e) { + throw new RuntimeException("Error while creating the column families.", e); + } + + this.processingTimeHeap = new PersistentTimerHeap(numPartitions, processingTimeColumnFamilyHandle); + this.eventTimeHeap = new PersistentTimerHeap(numPartitions, eventTimeColumnFamilyHandle); + } + + // ------------------------------------------------------------------------ + // InternalTimerService Implementation + // ------------------------------------------------------------------------ + + @Override + public void start() { + // rebuild the heaps + eventTimeHeap.initialize(); + processingTimeHeap.initialize(); + + // register the processing timer with the minimum timestamp + Tuple4<Integer, Long, K, N> headProcessingTimer = processingTimeHeap.top(); + if (headProcessingTimer != null) { + nextTimer = processingTimeService.registerTimer(headProcessingTimer.f1, this); + } + } + + @Override + public void close() { + if (db != null) { + db.close(); + } + + if (dbPath != null) { + try { + FileSystem fileSystem = dbPath.getFileSystem(); + if (fileSystem.exists(dbPath)) { + fileSystem.delete(dbPath, true); + } + } catch (IOException e) { + throw new RuntimeException("Error while cleaning directory for rocksdb timer service.", e); + } + } + } + + @Override + public void onEventTime(long timestamp) throws Exception { + List<Tuple4<Integer, Long, K, N>> timers = eventTimeHeap.peek(timestamp); + for (Tuple4<Integer, Long, K, N> timer : timers) { + keyContext.setCurrentKey(timer.f2); + triggerTarget.onEventTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3)); + } + } + + @Override + public void onProcessingTime(long timestamp) throws Exception { + nextTimer = null; + + List<Tuple4<Integer, Long, K, N>> timers = processingTimeHeap.peek(timestamp); + for (Tuple4<Integer, Long, K, N> timer : timers) { + keyContext.setCurrentKey(timer.f2); + triggerTarget.onProcessingTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3)); + } + + if (nextTimer == null) { + Tuple4<Integer, Long, K, N> headTimer = processingTimeHeap.top(); + if (headTimer != null) { + nextTimer = processingTimeService.registerTimer(headTimer.f1, this); + } + } + } + + @SuppressWarnings("unchecked") + @Override + public void registerProcessingTimeTimer(N namespace, long time) { + boolean isNewHead = processingTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time); + + if (isNewHead) { + if (nextTimer != null) { + nextTimer.cancel(false); + } + + Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top(); + if (newHeadTimer == null || newHeadTimer.f1 != time) { + throw new IllegalStateException(); + } + + nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this); + } + } + + @SuppressWarnings("unchecked") + @Override + public void deleteProcessingTimeTimer(N namespace, long time) { + boolean isCurrentHead = processingTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time); + + if (isCurrentHead) { + if (nextTimer != null) { + nextTimer.cancel(false); + } + + Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top(); + if (newHeadTimer != null) { + if (newHeadTimer.f1 < time) { + throw new IllegalStateException(); + } + + nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this); + } + } + } + + @SuppressWarnings("unchecked") + @Override + public void registerEventTimeTimer(N namespace, long time) { + eventTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time); + } + + @SuppressWarnings("unchecked") + @Override + public void deleteEventTimeTimer(N namespace, long time) { + eventTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time); + } + + @Override + public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup) { + return eventTimeHeap.getTimers(keyGroup); + } + + @Override + public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup) { + return processingTimeHeap.getTimers(keyGroup); + } + + @Override + public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) { + eventTimeHeap.restoreTimers(keyGroup, internalTimers); + } + + @Override + public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) { + processingTimeHeap.restoreTimers(keyGroup, internalTimers); + } + + @Override + public int numProcessingTimeTimers() { + return processingTimeHeap.numTimers(null); + } + + @Override + public int numEventTimeTimers() { + return eventTimeHeap.numTimers(null); + } + + @Override + public int numProcessingTimeTimers(N namespace) { + return processingTimeHeap.numTimers(namespace); + } + + @Override + public int numEventTimeTimers(N namespace) { + return eventTimeHeap.numTimers(namespace); + } + + // ------------------------------------------------------------------------ + // Partitioning Methods + // ------------------------------------------------------------------------ + + /** + * Assigns the given key group to a partition. + */ + private static int getPartitionForKeyGroup(KeyGroupRange keyGroupRange, int keyGroup, int numPartitions) { + Preconditions.checkArgument(keyGroupRange != null, "The range must not be null"); + Preconditions.checkArgument(numPartitions > 0, "Partition count must not be smaller than zero."); + + Preconditions.checkArgument(keyGroup >= keyGroupRange.getStartKeyGroup() && keyGroup <= keyGroupRange.getEndKeyGroup(), "Key group must be in the range"); + + long numKeyGroupsPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions; + long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeyGroupsPerPartition * numPartitions; + + keyGroup -= keyGroupRange.getStartKeyGroup(); + + if (keyGroup >= (numKeyGroupsPerPartition + 1L) * numFatPartitions) { + return (int)((keyGroup - (numKeyGroupsPerPartition + 1L) * numFatPartitions) / numKeyGroupsPerPartition + numFatPartitions); + } else { + return (int)(keyGroup / (numKeyGroupsPerPartition + 1L)); + } + } + + /** + * Compute the range of the given partition + */ + private static KeyGroupRange getRangeForPartition(KeyGroupRange keyGroupRange, int partitionIndex, int numPartitions) { + Preconditions.checkArgument(keyGroupRange != null, "The range must not be null"); + Preconditions.checkArgument(partitionIndex >= 0, "Partition index must be not smaller than zero."); + Preconditions.checkArgument(numPartitions > 0, "Partition count must be greater than zero."); + + long numKeysPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions; + long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeysPerPartition * numPartitions; + + if (partitionIndex >= numFatPartitions) { + int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(numFatPartitions * (numKeysPerPartition + 1L) + (partitionIndex - numFatPartitions) * numKeysPerPartition); + int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition - 1L); + + return (startKeyGroup > endKeyGroup ? null : new KeyGroupRange(startKeyGroup, endKeyGroup)); + } else { + int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(partitionIndex * (numKeysPerPartition + 1L)); + int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition); + + return new KeyGroupRange(startKeyGroup, endKeyGroup); + } + } + + // ------------------------------------------------------------------------ + // Serialization Methods + // ------------------------------------------------------------------------ + + private byte[] serializeKeyGroup(int keyGroup) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream); + + try { + IntSerializer.INSTANCE.serialize(keyGroup, outputView); + } catch (IOException e) { + throw new RuntimeException("Error while deserializing the key group.", e); + } + + return outputStream.toByteArray(); + } + + private byte[] serializeRawTimer(Tuple4<Integer, Long, K, N> rawTimer) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream); + + try { + IntSerializer.INSTANCE.serialize(rawTimer.f0, outputView); + LongSerializer.INSTANCE.serialize(rawTimer.f1, outputView); + keySerializer.serialize(rawTimer.f2, outputView); + namespaceSerializer.serialize(rawTimer.f3, outputView); + } catch (IOException e) { + throw new RuntimeException("Error while serializing the raw timer.", e); + } + + return outputStream.toByteArray(); + } + + private Tuple4<Integer, Long, K, N> deserializeRawTimer(byte[] bytes) { + ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); + DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream); + + try { + int keyGroup = IntSerializer.INSTANCE.deserialize(inputView); + long timestamp = LongSerializer.INSTANCE.deserialize(inputView); + K key = keySerializer.deserialize(inputView); + N namespace = namespaceSerializer.deserialize(inputView); + + return new Tuple4<>(keyGroup, timestamp, key, namespace); + } catch (IOException e) { + throw new RuntimeException("Error while deserializing the raw timer.", e); + } + } + + /** + * A timer store backed by RocksDB. + * + * The timers are stored in RocksDB in the order of key groups. To allow + * efficient access, the timers are partitioned and the leader of each + * partition is stored in an in-memory heap. The top of the heap is + * exactly the first timer to trigger. The heap is updated whenever the + * partition's leader is updated. + */ + private class PersistentTimerHeap { --- End diff -- I think this class could benefit from a more detailed documentation. For example, about the difference/relationship of key-groups, partitions, fatpartitions. You could also explain, that this is conceptually like a heap-of-heaps, where the outer heap holds inner heaps, one for each key-group and that the inner heaps hold the timers. Then, the outer heap is in-memory and the inner heaps are in fact completely ordered and held in RocksDB. > Implement Internal Timer Service in RocksDB > ------------------------------------------- > > Key: FLINK-5544 > URL: https://issues.apache.org/jira/browse/FLINK-5544 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing > Reporter: Xiaogang Shi > Assignee: Xiaogang Shi > > Now the only implementation of internal timer service is > HeapInternalTimerService which stores all timers in memory. In the cases > where the number of keys is very large, the timer service will cost too much > memory. A implementation which stores timers in RocksDB seems good to deal > with these cases. > It might be a little challenging to implement a RocksDB timer service because > the timers are accessed in different ways. When timers are triggered, we need > to access timers in the order of timestamp. But when performing checkpoints, > we must have a method to obtain all timers of a given key group. > A good implementation, as suggested by [~StephanEwen], follows the idea of > merge sorting. We can store timers in RocksDB with the format > {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put > together and are sorted. > Then we can deploy an in-memory heap which keeps the first timer of each key > group to get the next timer to trigger. When a key group's first timer is > updated, we can efficiently update the heap. -- This message was sent by Atlassian JIRA (v6.3.15#6346)