[ https://issues.apache.org/jira/browse/FLINK-3035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140946#comment-15140946 ]
ASF GitHub Bot commented on FLINK-3035: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1617#discussion_r52468882 --- Diff: flink-contrib/flink-statebackend-redis/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRedisState.java --- @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.contrib.streaming.state; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.KvStateSnapshot; +import org.apache.flink.util.HDFSCopyFromLocal; +import org.apache.flink.util.HDFSCopyToLocal; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.BinaryJedis; +import redis.clients.jedis.exceptions.JedisConnectionException; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URI; +import java.util.Arrays; +import java.util.List; + +/** + * Base class for {@link State} implementations that store state in a Redis database. + * + * <p>This base class is responsible for setting up the Redis database, for + * checkpointing/restoring the database and for disposal in the {@link #dispose()} method. The + * concrete subclasses just use the Redis handle to store/retrieve state. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <S> The type of {@link State}. + * @param <SD> The type of {@link StateDescriptor}. + * @param <Backend> The type of the backend that snapshots this key/value state. 
+ */ +public abstract class AbstractRedisState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> + implements KvState<K, N, S, SD, Backend>, State { + + /** Serializer for the keys */ + protected final TypeSerializer<K> keySerializer; + + /** Serializer for the namespace */ + protected final TypeSerializer<N> namespaceSerializer; + + /** The current key, which the next value methods will refer to */ + protected K currentKey; + + /** The current namespace, which the next value methods will refer to */ + protected N currentNamespace; + + /** Store it so that we can clean up in dispose() */ + protected final File dbPath; + + protected final String checkpointPath; + + /** Our Redis instance */ + protected Process redisServerProcess; + protected final BinaryJedis db; + + /** + * Creates a new Redis backed state. + * + * @param redisExecPath The path on the local system where Redis executable file can be found. + * @param port The port to start Redis server + * @param keySerializer The serializer for the keys. + * @param namespaceSerializer The serializer for the namespace. + * @param dbPath The path on the local system where Redis data should be stored. + */ + protected AbstractRedisState(String redisExecPath, int port, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, File dbPath, String checkpointPath) { + this.keySerializer = keySerializer; + this.namespaceSerializer = namespaceSerializer; + this.dbPath = dbPath; + this.checkpointPath = checkpointPath; + + if (!dbPath.exists()) { + if (!dbPath.mkdirs()) { + throw new RuntimeException("Could not create Redis data directory."); + } + } + + try { + startRedisServer(redisExecPath, "--port", String.valueOf(port), "--dir", dbPath.getAbsolutePath()); --- End diff -- Not sure about this one. But I don't think it can work the way it is setup now. All the redis servers would try to start on the same port and the state all use the same port. 
This would not work when several instances of an operator that uses this state run on the same machine. > Redis as State Backend > ---------------------- > > Key: FLINK-3035 > URL: https://issues.apache.org/jira/browse/FLINK-3035 > Project: Flink > Issue Type: New Feature > Components: Streaming > Reporter: Matthias J. Sax > Assignee: Subhobrata Dey > Priority: Minor > > Add Redis as a state backend for distributed snapshots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)