[ https://issues.apache.org/jira/browse/FLINK-3035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140914#comment-15140914 ]
ASF GitHub Bot commented on FLINK-3035: --------------------------------------- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1617#discussion_r52466745 --- Diff: flink-contrib/flink-statebackend-redis/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRedisState.java --- @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.contrib.streaming.state; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.KvStateSnapshot; +import org.apache.flink.util.HDFSCopyFromLocal; +import org.apache.flink.util.HDFSCopyToLocal; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.BinaryJedis; +import redis.clients.jedis.exceptions.JedisConnectionException; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URI; +import java.util.Arrays; +import java.util.List; + +/** + * Base class for {@link State} implementations that store state in a Redis database. + * + * <p>This base class is responsible for setting up the Redis database, for + * checkpointing/restoring the database and for disposal in the {@link #dispose()} method. The + * concrete subclasses just use the Redis handle to store/retrieve state. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <S> The type of {@link State}. + * @param <SD> The type of {@link StateDescriptor}. + * @param <Backend> The type of the backend that snapshots this key/value state. 
+ */ +public abstract class AbstractRedisState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> + implements KvState<K, N, S, SD, Backend>, State { + + /** Serializer for the keys */ + protected final TypeSerializer<K> keySerializer; + + /** Serializer for the namespace */ + protected final TypeSerializer<N> namespaceSerializer; + + /** The current key, which the next value methods will refer to */ + protected K currentKey; + + /** The current namespace, which the next value methods will refer to */ + protected N currentNamespace; + + /** Store it so that we can clean up in dispose() */ + protected final File dbPath; + + protected final String checkpointPath; + + /** Our Redis instance */ + protected Process redisServerProcess; + protected final BinaryJedis db; + + /** + * Creates a new Redis backed state. + * + * @param redisExecPath The path on the local system where Redis executable file can be found. + * @param port The port to start Redis server + * @param keySerializer The serializer for the keys. + * @param namespaceSerializer The serializer for the namespace. + * @param dbPath The path on the local system where Redis data should be stored. + */ + protected AbstractRedisState(String redisExecPath, int port, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, File dbPath, String checkpointPath) { + this.keySerializer = keySerializer; --- End diff -- I guess none of them can be `null`. Please check for it. > Redis as State Backend > ---------------------- > > Key: FLINK-3035 > URL: https://issues.apache.org/jira/browse/FLINK-3035 > Project: Flink > Issue Type: New Feature > Components: Streaming > Reporter: Matthias J. Sax > Assignee: Subhobrata Dey > Priority: Minor > > Add Redis as a state backend for distributed snapshots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)