[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350685#comment-15350685 ]
ASF GitHub Bot commented on FLINK-3034: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r68545039 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a single Redis server or to Redis sentinels + * If want to connect to a single Redis server, plz use the first constructor {@link #RedisContainer(JedisPool)}. 
+ * If want to connect to a Redis sentinels, Plz use the second constructor {@link #RedisContainer(JedisSentinelPool)} + */ +public class RedisContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); + + private JedisPool jedisPool; + private JedisSentinelPool jedisSentinelPool; + + + /** + * Use this constructor if to connect with single Redis server. + * + * @param jedisPool JedisPool which actually manages Jedis instances + */ + public RedisContainer(JedisPool jedisPool) { + Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null"); + this.jedisPool = jedisPool; + } + + /** + * Use this constructor if Redis environment is clustered with sentinels. + * + * @param sentinelPool SentinelPool which actually manages Jedis instances + */ + public RedisContainer(final JedisSentinelPool sentinelPool) { + Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null"); + this.jedisSentinelPool = sentinelPool; + } + + /** + * Closes the Jedis instances. + */ + @Override + public void close() throws IOException { + if (this.jedisPool != null) { + this.jedisPool.close(); + } + if (this.jedisSentinelPool != null) { + this.jedisSentinelPool.close(); + } + } + + @Override + public void hset(final String hashName, final String key, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.hset(hashName, key, value); --- End diff -- The argument order here is wrong. Jedis takes the arguments for hset in this order: `(key, hashField, value)`. The primary key goes first, then the secondary key (the hash field). > Redis Sink Connector > -------------------- > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors > Reporter: Matthias J. 
Sax > Assignee: Subhankar Biswas > Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)