[ 
https://issues.apache.org/jira/browse/FLINK-3035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140905#comment-15140905
 ] 

ASF GitHub Bot commented on FLINK-3035:
---------------------------------------

Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1617#discussion_r52466186
  
    --- Diff: 
flink-contrib/flink-statebackend-redis/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRedisState.java
 ---
    @@ -0,0 +1,374 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.io.IOUtils;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.KvState;
    +import org.apache.flink.runtime.state.KvStateSnapshot;
    +import org.apache.flink.util.HDFSCopyFromLocal;
    +import org.apache.flink.util.HDFSCopyToLocal;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import redis.clients.jedis.BinaryJedis;
    +import redis.clients.jedis.exceptions.JedisConnectionException;
    +
    +import java.io.BufferedReader;
    +import java.io.ByteArrayOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.net.URI;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +/**
    + * Base class for {@link State} implementations that store state in a 
Redis database.
    + *
    + * <p>This base class is responsible for setting up the Redis database, for
    + * checkpointing/restoring the database and for disposal in the {@link 
#dispose()} method. The
    + * concrete subclasses just use the Redis handle to store/retrieve state.
    + *
    + * @param <K> The type of the key.
    + * @param <N> The type of the namespace.
    + * @param <S> The type of {@link State}.
    + * @param <SD> The type of {@link StateDescriptor}.
    + * @param <Backend> The type of the backend that snapshots this key/value 
state.
    + */
    +public abstract class AbstractRedisState<K, N, S extends State, SD extends 
StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
    +   implements KvState<K, N, S, SD, Backend>, State {
    +
    +   /** Serializer for the keys */
    +   protected final TypeSerializer<K> keySerializer;
    +
    +   /** Serializer for the namespace */
    +   protected final TypeSerializer<N> namespaceSerializer;
    +
    +   /** The current key, which the next value methods will refer to */
    +   protected K currentKey;
    +
    +   /** The current namespace, which the next value methods will refer to */
    +   protected N currentNamespace;
    +
    +   /** Store it so that we can clean up in dispose() */
    +   protected final File dbPath;
    +
    +   protected final String checkpointPath;
    +
    +   /** Our Redis instance */
    +   protected Process redisServerProcess;
    +   protected final BinaryJedis db;
    +
    +   /**
    +    * Creates a new Redis backed state.
    +    *
    +    * @param redisExecPath The path on the local system where Redis 
executable file can be found.
    +    * @param port The port to start Redis server
    +    * @param keySerializer The serializer for the keys.
    +    * @param namespaceSerializer The serializer for the namespace.
    +    * @param dbPath The path on the local system where Redis data should 
be stored.
    +    */
    +   protected AbstractRedisState(String redisExecPath, int port, 
TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, File 
dbPath, String checkpointPath) {
    +           this.keySerializer = keySerializer;
    +           this.namespaceSerializer = namespaceSerializer;
    +           this.dbPath = dbPath;
    +           this.checkpointPath = checkpointPath;
    +
    +           if (!dbPath.exists()) {
    +                   if (!dbPath.mkdirs()) {
    +                           throw new RuntimeException("Could not create 
Redis data directory.");
    +                   }
    +           }
    +
    +           try {
    +                   startRedisServer(redisExecPath, "--port", 
String.valueOf(port), "--dir", dbPath.getAbsolutePath());
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Could not start Redis 
server", e);
    +           }
    +
    +           try {
    +                   db = new BinaryJedis("localhost", port);
    +                   db.ping();
    +           } catch (JedisConnectionException e) {
    +                   throw new RuntimeException("Error while opening 
connection to Redis server", e);
    +           }
    +   }
    +
    +   /**
    +    * Creates a new Redis backed state and restores from the given backup 
directory. After
    +    * restoring the backup directory is deleted.
    +    *
    +    * @param redisExecPath The path on the local system where Redis 
executable file can be found.
    +    * @param port The port to start Redis server
    +    * @param keySerializer The serializer for the keys.
    +    * @param namespaceSerializer The serializer for the namespace.
    +    * @param dbPath The path on the local system where RocksDB data should 
be stored.
    +    * @param restorePath The path to a backup directory from which to 
restore Redis database.
    +    */
    +   protected AbstractRedisState(String redisExecPath, int port, 
TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, File 
dbPath, String checkpointPath, String restorePath) {
    +           try {
    +                   HDFSCopyToLocal.copyToLocal(new URI(restorePath + 
"/dump.rdb"), dbPath);
    +           } catch (Exception e) {
    +                   throw new RuntimeException("Error while restoring Redis 
state from " + restorePath, e);
    +           }
    +
    +           try {
    +                   startRedisServer(redisExecPath, "--port", 
String.valueOf(port), "--dir", dbPath.getAbsolutePath());
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Could not start Redis 
server");
    +           }
    +
    +           this.keySerializer = keySerializer;
    +           this.namespaceSerializer = namespaceSerializer;
    +           this.dbPath = dbPath;
    +           this.checkpointPath = checkpointPath;
    +
    +           if (!dbPath.exists()) {
    +                   if (!dbPath.mkdirs()) {
    +                           throw new RuntimeException("Could not create 
Redis data directory.");
    +                   }
    +           }
    +
    +           try {
    +                   db = new BinaryJedis("localhost", port);
    +                   db.ping();
    +           } catch (JedisConnectionException e) {
    +                   throw new RuntimeException("Error while opening 
connection to Redis server", e);
    +           }
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   final public void clear() {
    +           ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +           DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
    +           try {
    +                   writeKeyAndNamespace(out);
    +                   byte[] key = baos.toByteArray();
    +                   db.del(key);
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Error while removing entry 
from Redis", e);
    +           }
    +   }
    +
    +   protected void writeKeyAndNamespace(DataOutputView out) throws 
IOException {
    +           keySerializer.serialize(currentKey, out);
    +           out.writeByte(42);
    +           namespaceSerializer.serialize(currentNamespace, out);
    +   }
    +
    +   @Override
    +   public void setCurrentKey(K currentKey) {
    +           this.currentKey = currentKey;
    +   }
    +
    +   @Override
    +   public void setCurrentNamespace(N currentNamespace) {
    +           this.currentNamespace = currentNamespace;
    +   }
    +
    +   protected abstract KvStateSnapshot<K, N, S, SD, Backend> 
createRedisSnapshot(URI backupUri, long checkpointId);
    +
    +   @Override
    +   public KvStateSnapshot<K, N, S, SD, Backend> snapshot(long 
checkpointId, long timestamp) throws Exception {
    +           boolean success = false;
    +
    +           final File localBackupPath = new File(dbPath, "backup-" + 
checkpointId);
    +           final URI backupUri = new URI(checkpointPath + "/chk-" + 
checkpointId);
    +
    +           try {
    +                   if (!localBackupPath.exists()) {
    +                           if (!localBackupPath.mkdirs()) {
    +                                   throw new RuntimeException("Could not 
create local backup path " + localBackupPath);
    +                           }
    +                   }
    +
    +                   db.configSet("dir".getBytes(), 
localBackupPath.getAbsolutePath().getBytes());
    +                   db.save();
    +
    +                   HDFSCopyFromLocal.copyFromLocal(localBackupPath, 
backupUri);
    +                   KvStateSnapshot<K, N, S, SD, Backend> result = 
createRedisSnapshot(backupUri, checkpointId);
    +                   success = true;
    +                   return result;
    +           }finally {
    +                   FileUtils.deleteDirectory(localBackupPath);
    +                   if (!success) {
    +                           FileSystem fs = FileSystem.get(backupUri, new 
Configuration());
    +                           fs.delete(new Path(backupUri), true);
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public void dispose() {
    +           db.disconnect();
    +           stopRedisServer();
    +           try {
    +                   FileUtils.deleteDirectory(dbPath);
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Error disposing Redis data 
directory.", e);
    +           }
    +   }
    +
    +   /**
    +    * Starts the Redis server
    +    *
    +    * @param args
    +    * @throws IOException
    +     */
    +   private synchronized void startRedisServer(String... args) throws 
IOException {
    +           List<String> argsList = Arrays.asList(args);
    +           File executable = new File(argsList.get(0));
    +           ProcessBuilder pb = new ProcessBuilder(argsList);
    +           pb.directory(executable.getParentFile());
    +
    +           redisServerProcess = pb.start();
    +
    +           BufferedReader reader = new BufferedReader(new 
InputStreamReader(redisServerProcess.getInputStream()));
    +           try {
    +                   String outputLine;
    +                   do {
    +                           outputLine = reader.readLine();
    +                           if (outputLine == null) {
    +                                   //Something goes wrong. Stream is ended 
before server was activated.
    +                                   throw new RuntimeException("Can't start 
redis server. Check logs for details.");
    +                           }
    +                   } while (!outputLine.matches(".*The server is now ready 
to accept connections on port.*"));
    +           } finally {
    +                   IOUtils.closeQuietly(reader);
    +           }
    +   }
    +
    +   /**
    +    * Stops the Redis server
    +    */
    +   private synchronized void stopRedisServer() {
    +           try {
    +                   redisServerProcess.destroy();
    +                   redisServerProcess.waitFor();
    +           } catch (InterruptedException e) {
    +                   throw new RuntimeException("Failed to stop redis 
instance", e);
    +           }
    +   }
    +
    +   public static abstract class AbstractRedisSnapshot<K, N, S extends 
State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> 
implements KvStateSnapshot<K, N, S, SD, Backend> {
    +           private static final long serialVersionUID = 1L;
    +
    +           private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRedisSnapshot.class);
    +
    +           // 
------------------------------------------------------------------------
    +           //  Ctor parameters for Redis state
    +           // 
------------------------------------------------------------------------
    +
    +           /** Store it so that we can clean up in dispose() */
    +           protected final File dbPath;
    +
    +           /** Where we should put Redis backups */
    +           protected final String checkpointPath;
    +
    +           // 
------------------------------------------------------------------------
    +           //  Info about this checkpoint
    +           // 
------------------------------------------------------------------------
    +
    +           protected final URI backupUri;
    +
    +           protected long checkpointId;
    +
    +           // 
------------------------------------------------------------------------
    +           //  For sanity checks
    +           // 
------------------------------------------------------------------------
    +
    +           /** Key serializer */
    +           protected final TypeSerializer<K> keySerializer;
    +
    +           /** Namespace serializer */
    +           protected final TypeSerializer<N> namespaceSerializer;
    +
    +           /** Hash of the StateDescriptor, for sanity checks */
    +           protected final SD stateDesc;
    +
    +           public AbstractRedisSnapshot(File dbPath, String 
checkpointPath, URI backupUri, long checkpointId, TypeSerializer<K> 
keySerializer, TypeSerializer<N> namespaceSerializer, SD stateDesc) {
    +                   this.dbPath = dbPath;
    +                   this.checkpointPath = checkpointPath;
    +                   this.backupUri = backupUri;
    +                   this.checkpointId = checkpointId;
    +
    +                   this.stateDesc = stateDesc;
    +                   this.keySerializer = keySerializer;
    +                   this.namespaceSerializer = namespaceSerializer;
    +           }
    +
    +           protected abstract KvState<K, N, S, SD, Backend> 
createRedisState(TypeSerializer<K> keySerializer, TypeSerializer<N> 
namespaceSerializer, SD stateDesc, File dbPath, String backupPath, String 
restorePath) throws Exception;
    +
    +           @Override
    +           public KvState<K, N, S, SD, Backend> restoreState(Backend 
stateBackend, TypeSerializer<K> keySerializer, ClassLoader classLoader, long 
recoveryTimestamp) throws Exception {
    +
    +                   // validity checks
    +                   if (!this.keySerializer.equals(keySerializer)) {
    +                           throw new IllegalArgumentException(
    +                                   "Cannot restore the state from the 
snapshot with the given serializers. " +
    +                                           "State (K/V) was serialized 
with " +
    +                                           "(" + keySerializer + ") " +
    +                                           "now is (" + keySerializer + 
")");
    +                   }
    +
    +                   if (!dbPath.exists()) {
    +                           if (!dbPath.mkdirs()) {
    +                                   throw new RuntimeException("Could not 
create Redis base path " + dbPath);
    +                           }
    +                   }
    +
    +                   FileSystem fs = FileSystem.get(backupUri, new 
Configuration());
    --- End diff --
    
    Variable `fs` is never used...


> Redis as State Backend
> ----------------------
>
>                 Key: FLINK-3035
>                 URL: https://issues.apache.org/jira/browse/FLINK-3035
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Matthias J. Sax
>            Assignee: Subhobrata Dey
>            Priority: Minor
>
> Add Redis as a state backend for distributed snapshots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to