[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16186256#comment-16186256 ]
ASF GitHub Bot commented on FLINK-5544: --------------------------------------- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3359#discussion_r141944634 --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java --- @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.api.operators; + + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.KeyContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * {@link InternalTimerService} that stores timers in RocksDB. + */ +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> { + + private static Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class); + + /** The data base where stores all timers */ + private final RocksDB db; + + /** The path where the rocksdb locates */ + private final Path dbPath; + + /** + * The in-memory heaps backed by rocksdb to retrieve the next timer to trigger. Each + * partition's leader is stored in the heap. When the timers in a partition is changed, we + * will change the partition's leader and update the heap accordingly. + */ + private final int numPartitions; + private final PersistentTimerHeap eventTimeHeap; + private final PersistentTimerHeap processingTimeHeap; + + private static int MAX_PARTITIONS = (1 << 16); + + public RocksDBInternalTimerService( + int totalKeyGroups, + KeyGroupRange keyGroupRange, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + Path dbPath) { + + super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService); + + this.dbPath = dbPath; + + try { + FileSystem fileSystem = this.dbPath.getFileSystem(); + if (fileSystem.exists(this.dbPath)) { + fileSystem.delete(this.dbPath, true); + } + + fileSystem.mkdirs(dbPath); + } catch (IOException e) { + throw new RuntimeException("Error while creating directory for rocksdb timer service.", e); + } + + ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions() + .setMergeOperator(new StringAppendOperator()) + .setCompactionStyle(CompactionStyle.UNIVERSAL); + ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions); + + DBOptions dbOptions = new DBOptions() + .setCreateIfMissing(true) + .setUseFsync(false) + .setDisableDataSync(true) + .setMaxOpenFiles(-1); + + List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); + + try { + this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles); + } catch (RocksDBException e) { + throw new RuntimeException("Error while creating the RocksDB instance.", e); + } + + this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS); + + ColumnFamilyHandle eventTimeColumnFamilyHandle; + ColumnFamilyHandle processingTimeColumnFamilyHandle; + try { + ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("eventTime".getBytes(), columnFamilyOptions); + ColumnFamilyDescriptor processingTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("processingTime".getBytes(), columnFamilyOptions); + eventTimeColumnFamilyHandle = db.createColumnFamily(eventTimeColumnFamilyDescriptor); + processingTimeColumnFamilyHandle = db.createColumnFamily(processingTimeColumnFamilyDescriptor); + } catch (RocksDBException e) { + throw new RuntimeException("Error while creating the column families.", e); + } + + this.processingTimeHeap = new PersistentTimerHeap(numPartitions, processingTimeColumnFamilyHandle); + this.eventTimeHeap = new PersistentTimerHeap(numPartitions, eventTimeColumnFamilyHandle); + } + + // ------------------------------------------------------------------------ + // InternalTimerService Implementation + // ------------------------------------------------------------------------ + + @Override + public void start() { + // rebuild the heaps + eventTimeHeap.initialize(); + processingTimeHeap.initialize(); + + // register the processing timer with the minimum timestamp + Tuple4<Integer, Long, K, N> headProcessingTimer = processingTimeHeap.top(); + if (headProcessingTimer != null) { + nextTimer = processingTimeService.registerTimer(headProcessingTimer.f1, this); + } + } + + @Override + public void close() { + if (db != null) { + db.close(); + } + + if (dbPath != null) { + try { + FileSystem fileSystem = dbPath.getFileSystem(); + if (fileSystem.exists(dbPath)) { + fileSystem.delete(dbPath, true); + } + } catch (IOException e) { + throw new RuntimeException("Error while cleaning directory for rocksdb timer service.", e); --- End diff -- I'd suggest creating a `FlinkRocksDBException` to wrap `RocksDBException` and throw it > Implement Internal Timer Service in RocksDB > ------------------------------------------- > > Key: FLINK-5544 > URL: https://issues.apache.org/jira/browse/FLINK-5544 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing > Reporter: Xiaogang Shi > Assignee: Xiaogang Shi > > Now the only implementation of internal timer service is > HeapInternalTimerService which stores all timers in memory. In the cases > where the number of keys is very large, the timer service will cost too much > memory. A implementation which stores timers in RocksDB seems good to deal > with these cases. > It might be a little challenging to implement a RocksDB timer service because > the timers are accessed in different ways. When timers are triggered, we need > to access timers in the order of timestamp. But when performing checkpoints, > we must have a method to obtain all timers of a given key group. > A good implementation, as suggested by [~StephanEwen], follows the idea of > merge sorting. We can store timers in RocksDB with the format > {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put > together and are sorted. > Then we can deploy an in-memory heap which keeps the first timer of each key > group to get the next timer to trigger. When a key group's first timer is > updated, we can efficiently update the heap. -- This message was sent by Atlassian JIRA (v6.4.14#64029)