[ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930170#comment-15930170 ]

ASF GitHub Bot commented on FLINK-5544:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106676403
  
    --- Diff: flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
    +   
    +   private static final Logger LOG = LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +   
    +   /** The database that stores all the timers. */
    +   private final RocksDB db;
    +   
    +   /** The path where the RocksDB instance is located. */
    +   private final Path dbPath;
    +
    +   /**
    +    * The in-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger.
    +    * Each partition's leader is stored in the heap. When the timers in a partition change,
    +    * we update the partition's leader and adjust the heap accordingly.
    +    */
    +   private final int numPartitions;
    +   private final PersistentTimerHeap eventTimeHeap;
    +   private final PersistentTimerHeap processingTimeHeap;
    +   
    +   private static final int MAX_PARTITIONS = (1 << 16);
    +
    +   public RocksDBInternalTimerService(
    +                   int totalKeyGroups,
    +                   KeyGroupRange keyGroupRange,
    +                   KeyContext keyContext,
    +                   ProcessingTimeService processingTimeService,
    +                   Path dbPath) {
    +
    +           super(totalKeyGroups, keyGroupRange, keyContext, processingTimeService);
    +           
    +           this.dbPath = dbPath;
    +           
    +           try {
    +                   FileSystem fileSystem = this.dbPath.getFileSystem();
    +                   if (fileSystem.exists(this.dbPath)) {
    +                           fileSystem.delete(this.dbPath, true);
    +                   }
    +                   
    +                   fileSystem.mkdirs(dbPath);
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Error while creating directory for rocksdb timer service.", e);
    +           }
    +
    +           ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions()
    +                           .setMergeOperator(new StringAppendOperator())
    +                           .setCompactionStyle(CompactionStyle.UNIVERSAL);
    +           ColumnFamilyDescriptor defaultColumnDescriptor = new ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +           DBOptions dbOptions = new DBOptions()
    +                           .setCreateIfMissing(true)
    +                           .setUseFsync(false)
    +                           .setDisableDataSync(true)
    +                           .setMaxOpenFiles(-1);
    +
    +           List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
    +
    +           try {
    +                   this.db = RocksDB.open(dbOptions, dbPath.getPath(), Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
    +           } catch (RocksDBException e) {
    +                   throw new RuntimeException("Error while creating the RocksDB instance.", e);
    +           }
    +
    +           this.numPartitions = Math.min(keyGroupRange.getNumberOfKeyGroups(), MAX_PARTITIONS);
    +
    +           ColumnFamilyHandle eventTimeColumnFamilyHandle;
    +           ColumnFamilyHandle processingTimeColumnFamilyHandle;
    +           try {
    +                   ColumnFamilyDescriptor eventTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("eventTime".getBytes(), columnFamilyOptions);
    +                   ColumnFamilyDescriptor processingTimeColumnFamilyDescriptor = new ColumnFamilyDescriptor("processingTime".getBytes(), columnFamilyOptions);
    +                   eventTimeColumnFamilyHandle = db.createColumnFamily(eventTimeColumnFamilyDescriptor);
    +                   processingTimeColumnFamilyHandle = db.createColumnFamily(processingTimeColumnFamilyDescriptor);
    +           } catch (RocksDBException e) {
    +                   throw new RuntimeException("Error while creating the column families.", e);
    +           }
    +
    +           this.processingTimeHeap = new PersistentTimerHeap(numPartitions, processingTimeColumnFamilyHandle);
    +           this.eventTimeHeap = new PersistentTimerHeap(numPartitions, eventTimeColumnFamilyHandle);
    +   }
    +
    +   // ------------------------------------------------------------------------
    +   //  InternalTimerService Implementation
    +   // ------------------------------------------------------------------------
    +   
    +   @Override
    +   public void start() {
    +           // rebuild the heaps
    +           eventTimeHeap.initialize();
    +           processingTimeHeap.initialize();
    +           
    +           // register the processing timer with the minimum timestamp
    +           Tuple4<Integer, Long, K, N> headProcessingTimer = processingTimeHeap.top();
    +           if (headProcessingTimer != null) {
    +                   nextTimer = processingTimeService.registerTimer(headProcessingTimer.f1, this);
    +           }
    +   }
    +
    +   @Override
    +   public void close() {
    +           if (db != null) {
    +                   db.close();
    +           }
    +           
    +           if (dbPath != null) {
    +                   try {
    +                           FileSystem fileSystem = dbPath.getFileSystem();
    +                           if (fileSystem.exists(dbPath)) {
    +                                   fileSystem.delete(dbPath, true);
    +                           }
    +                   } catch (IOException e) {
    +                           throw new RuntimeException("Error while cleaning directory for rocksdb timer service.", e);
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public void onEventTime(long timestamp) throws Exception {
    +           List<Tuple4<Integer, Long, K, N>> timers = eventTimeHeap.peek(timestamp);
    +           for (Tuple4<Integer, Long, K, N> timer : timers) {
    +                   keyContext.setCurrentKey(timer.f2);
    +                   triggerTarget.onEventTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +           }
    +   }
    +
    +   @Override
    +   public void onProcessingTime(long timestamp) throws Exception {
    +           nextTimer = null;
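    +           // Clear nextTimer before firing: a triggered callback may register a new
    +           // head timer (setting nextTimer); we only re-register below if none did.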
    +
    +           List<Tuple4<Integer, Long, K, N>> timers = processingTimeHeap.peek(timestamp);
    +           for (Tuple4<Integer, Long, K, N> timer : timers) {
    +                   keyContext.setCurrentKey(timer.f2);
    +                   triggerTarget.onProcessingTime(new InternalTimer<>(timer.f1, timer.f2, timer.f3));
    +           }
    +
    +           if (nextTimer == null) {
    +                   Tuple4<Integer, Long, K, N> headTimer = processingTimeHeap.top();
    +                   if (headTimer != null) {
    +                           nextTimer = processingTimeService.registerTimer(headTimer.f1, this);
    +                   }
    +           }
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   @Override
    +   public void registerProcessingTimeTimer(N namespace, long time) {
    +           boolean isNewHead = processingTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
    +
    +           if (isNewHead) {
    +                   if (nextTimer != null) {
    +                           nextTimer.cancel(false);
    +                   }
    +
    +                   Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
    +                   if (newHeadTimer == null || newHeadTimer.f1 != time) {
    +                           throw new IllegalStateException();
    +                   }
    +
    +                   nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this);
    +           }
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   @Override
    +   public void deleteProcessingTimeTimer(N namespace, long time) {
    +           boolean isCurrentHead = processingTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time);
    +
    +           if (isCurrentHead) {
    +                   if (nextTimer != null) {
    +                           nextTimer.cancel(false);
    +                   }
    +
    +                   Tuple4<Integer, Long, K, N> newHeadTimer = processingTimeHeap.top();
    +                   if (newHeadTimer != null) {
    +                           if (newHeadTimer.f1 < time) {
    +                                   throw new IllegalStateException();
    +                           }
    +
    +                           nextTimer = processingTimeService.registerTimer(newHeadTimer.f1, this);
    +                   }
    +           }
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   @Override
    +   public void registerEventTimeTimer(N namespace, long time) {
    +           eventTimeHeap.add((K)keyContext.getCurrentKey(), namespace, time);
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   @Override
    +   public void deleteEventTimeTimer(N namespace, long time) {
    +           eventTimeHeap.remove((K)keyContext.getCurrentKey(), namespace, time);
    +   }
    +
    +   @Override
    +   public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup) {
    +           return eventTimeHeap.getTimers(keyGroup);
    +   }
    +
    +   @Override
    +   public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup) {
    +           return processingTimeHeap.getTimers(keyGroup);
    +   }
    +
    +   @Override
    +   public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) {
    +           eventTimeHeap.restoreTimers(keyGroup, internalTimers);
    +   }
    +
    +   @Override
    +   public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> internalTimers) {
    +           processingTimeHeap.restoreTimers(keyGroup, internalTimers);
    +   }
    +
    +   @Override
    +   public int numProcessingTimeTimers() {
    +           return processingTimeHeap.numTimers(null);
    +   }
    +
    +   @Override
    +   public int numEventTimeTimers() {
    +           return eventTimeHeap.numTimers(null);
    +   }
    +
    +   @Override
    +   public int numProcessingTimeTimers(N namespace) {
    +           return processingTimeHeap.numTimers(namespace);
    +   }
    +
    +   @Override
    +   public int numEventTimeTimers(N namespace) {
    +           return eventTimeHeap.numTimers(namespace);
    +   }
    +   
    +   // ------------------------------------------------------------------------
    +   //  Partitioning Methods
    +   // ------------------------------------------------------------------------
    +
    +   /**
    +    * Assigns the given key group to a partition.
    +    */
    +   private static int getPartitionForKeyGroup(KeyGroupRange keyGroupRange, int keyGroup, int numPartitions) {
    +           Preconditions.checkArgument(keyGroupRange != null, "The range must not be null.");
    +           Preconditions.checkArgument(numPartitions > 0, "Partition count must be greater than zero.");
    +
    +           Preconditions.checkArgument(keyGroup >= keyGroupRange.getStartKeyGroup() && keyGroup <= keyGroupRange.getEndKeyGroup(), "Key group must be in the range.");
    +
    +           long numKeyGroupsPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions;
    +           long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeyGroupsPerPartition * numPartitions;
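    +           // Illustrative example (numbers are hypothetical): with 10 key groups and 4 partitions,
    +           // numKeyGroupsPerPartition = 2 with a remainder of 2, so the first two ("fat") partitions
    +           // hold 3 key groups each and the remaining two hold 2 key groups each.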
    +
    +           keyGroup -= keyGroupRange.getStartKeyGroup();
    +
    +           if (keyGroup >= (numKeyGroupsPerPartition + 1L) * numFatPartitions) {
    +                   return (int)((keyGroup - (numKeyGroupsPerPartition + 1L) * numFatPartitions) / numKeyGroupsPerPartition + numFatPartitions);
    +           } else {
    +                   return (int)(keyGroup / (numKeyGroupsPerPartition + 1L));
    +           }
    +   }
    +
    +   /**
    +    * Computes the key-group range of the given partition.
    +    */
    +   private static KeyGroupRange getRangeForPartition(KeyGroupRange keyGroupRange, int partitionIndex, int numPartitions) {
    +           Preconditions.checkArgument(keyGroupRange != null, "The range must not be null.");
    +           Preconditions.checkArgument(partitionIndex >= 0, "Partition index must not be smaller than zero.");
    +           Preconditions.checkArgument(numPartitions > 0, "Partition count must be greater than zero.");
    +
    +           long numKeysPerPartition = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) / numPartitions;
    +           long numFatPartitions = (keyGroupRange.getEndKeyGroup() - keyGroupRange.getStartKeyGroup() + 1L) - numKeysPerPartition * numPartitions;
    +
    +           if (partitionIndex >= numFatPartitions) {
    +                   int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(numFatPartitions * (numKeysPerPartition + 1L) + (partitionIndex - numFatPartitions) * numKeysPerPartition);
    +                   int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition - 1L);
    +
    +                   return (startKeyGroup > endKeyGroup ? null : new KeyGroupRange(startKeyGroup, endKeyGroup));
    +           } else {
    +                   int startKeyGroup = keyGroupRange.getStartKeyGroup() + (int)(partitionIndex * (numKeysPerPartition + 1L));
    +                   int endKeyGroup = (int)(startKeyGroup + numKeysPerPartition);
    +
    +                   return new KeyGroupRange(startKeyGroup, endKeyGroup);
    +           }
    +   }
    +
    +   // ------------------------------------------------------------------------
    +   //  Serialization Methods
    +   // ------------------------------------------------------------------------
    +
    +   private byte[] serializeKeyGroup(int keyGroup) {
    +           ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    +           DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    +
    +           try {
    +                   IntSerializer.INSTANCE.serialize(keyGroup, outputView);
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Error while serializing the key group.", e);
    +           }
    +
    +           return outputStream.toByteArray();
    +   }
    +
    +   private byte[] serializeRawTimer(Tuple4<Integer, Long, K, N> rawTimer) {
    +           ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    +           DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    +
    +           try {
    +                   IntSerializer.INSTANCE.serialize(rawTimer.f0, outputView);
    +                   LongSerializer.INSTANCE.serialize(rawTimer.f1, outputView);
    +                   keySerializer.serialize(rawTimer.f2, outputView);
    +                   namespaceSerializer.serialize(rawTimer.f3, outputView);
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Error while serializing the raw timer.", e);
    --- End diff --
    
    I would discourage the use of plain `RuntimeException` for the same reason
    we use more specific subclasses of `Exception`, like `IOException`. Better
    to use a more specific subclass of `RuntimeException`, such as
    `IllegalStateException` or whatever looks more appropriate. This should be
    changed in several places in this class.
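    
    For illustration, a minimal sketch of how the catch block in
    `serializeRawTimer` could look with this suggestion applied (the concrete
    exception type is just one option; a dedicated `RuntimeException` subclass
    would work as well):
    
        private byte[] serializeRawTimer(Tuple4<Integer, Long, K, N> rawTimer) {
                ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
    
                try {
                        IntSerializer.INSTANCE.serialize(rawTimer.f0, outputView);
                        LongSerializer.INSTANCE.serialize(rawTimer.f1, outputView);
                        keySerializer.serialize(rawTimer.f2, outputView);
                        namespaceSerializer.serialize(rawTimer.f3, outputView);
                } catch (IOException e) {
                        // a more specific unchecked exception instead of plain RuntimeException
                        throw new IllegalStateException("Error while serializing the raw timer.", e);
                }
    
                return outputStream.toByteArray();
        }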


> Implement Internal Timer Service in RocksDB
> -------------------------------------------
>
>                 Key: FLINK-5544
>                 URL: https://issues.apache.org/jira/browse/FLINK-5544
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Currently the only implementation of the internal timer service is 
> HeapInternalTimerService, which stores all timers in memory. In cases where 
> the number of keys is very large, the timer service will consume too much 
> memory. An implementation that stores timers in RocksDB seems a good fit for 
> these cases.
> It may be a little challenging to implement a RocksDB timer service because 
> the timers are accessed in different ways. When timers are triggered, we need 
> to access them in timestamp order. But when performing checkpoints, we must 
> be able to obtain all timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of 
> merge sorting. We can store timers in RocksDB with the key format 
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are stored 
> together and kept sorted.
> Then we can maintain an in-memory heap that keeps the first timer of each key 
> group to determine the next timer to trigger. When a key group's first timer 
> is updated, we can efficiently update the heap.
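
For illustration, a minimal sketch (the method name and the exact layout are
assumptions, not taken from the issue or the patch) of how such a composite key
could be encoded so that RocksDB's default lexicographic byte ordering sorts
timers first by key group and then by timestamp:

    import java.nio.ByteBuffer;

    // Encodes KEY_GROUP#TIMESTAMP#KEY as bytes. Big-endian encoding keeps the
    // byte order consistent with the numeric order; flipping the sign bit of
    // the timestamp makes signed longs compare correctly as unsigned bytes.
    static byte[] timerKey(int keyGroup, long timestamp, byte[] serializedKey) {
            ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + serializedKey.length);
            buffer.putInt(keyGroup);
            buffer.putLong(timestamp ^ Long.MIN_VALUE);
            buffer.put(serializedKey);
            return buffer.array();
    }

With such a layout, iterating from a key group's prefix yields that group's
timers in timestamp order, which serves both the per-key-group access needed
for checkpoints and the heap of per-partition leaders used for triggering.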


