Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r141944569
  
    --- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---
    @@ -0,0 +1,797 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.streaming.api.operators.InternalTimer;
    +import org.apache.flink.streaming.api.operators.InternalTimerService;
    +import org.apache.flink.streaming.api.operators.KeyContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ColumnFamilyOptions;
    +import org.rocksdb.CompactionStyle;
    +import org.rocksdb.DBOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.StringAppendOperator;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * {@link InternalTimerService} that stores timers in RocksDB.
    + */
    +public class RocksDBInternalTimerService<K, N> extends 
InternalTimerService<K, N> {
    +   
    +   private static Logger LOG = 
LoggerFactory.getLogger(RocksDBInternalTimerService.class);
    +   
    +   /** The data base where stores all timers */
    +   private final RocksDB db;
    +   
    +   /** The path where the rocksdb locates */
    +   private final Path dbPath;
    +
    +   /**
    +    * The in-memory heaps backed by rocksdb to retrieve the next timer to 
trigger. Each
    +    * partition's leader is stored in the heap. When the timers in a 
partition is changed, we
    +    * will change the partition's leader and update the heap accordingly.
    +    */
    +   private final int numPartitions;
    +   private final PersistentTimerHeap eventTimeHeap;
    +   private final PersistentTimerHeap processingTimeHeap;
    +   
    +   private static int MAX_PARTITIONS = (1 << 16);
    +
    +   public RocksDBInternalTimerService(
    +                   int totalKeyGroups,
    +                   KeyGroupRange keyGroupRange,
    +                   KeyContext keyContext,
    +                   ProcessingTimeService processingTimeService,
    +                   Path dbPath) {
    +
    +           super(totalKeyGroups, keyGroupRange, keyContext, 
processingTimeService);
    +           
    +           this.dbPath = dbPath;
    +           
    +           try {
    +                   FileSystem fileSystem = this.dbPath.getFileSystem();
    +                   if (fileSystem.exists(this.dbPath)) {
    +                           fileSystem.delete(this.dbPath, true);
    +                   }
    +                   
    +                   fileSystem.mkdirs(dbPath);
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Error while creating 
directory for rocksdb timer service.", e);
    +           }
    +
    +           ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions()
    +                           .setMergeOperator(new StringAppendOperator())
    +                           .setCompactionStyle(CompactionStyle.UNIVERSAL);
    +           ColumnFamilyDescriptor defaultColumnDescriptor = new 
ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions);
    +
    +           DBOptions dbOptions = new DBOptions()
    +                           .setCreateIfMissing(true)
    +                           .setUseFsync(false)
    +                           .setDisableDataSync(true)
    +                           .setMaxOpenFiles(-1);
    +
    +           List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
    +
    +           try {
    +                   this.db = RocksDB.open(dbOptions, dbPath.getPath(), 
Collections.singletonList(defaultColumnDescriptor), columnFamilyHandles);
    +           } catch (RocksDBException e) {
    +                   throw new RuntimeException("Error while creating the 
RocksDB instance.", e);
    --- End diff --
    
    I'd suggest creating a `FlinkRocksDBException` to wrap `RocksDBException` 
and throw it


---

Reply via email to