I'd suggest checking that shutdown() in close() always completes:

    @Override
    public void close() {
        this.redisson.shutdown();
        log.info(String.format("Shut down redisson instance in close method, RedissonRxClient shutdown is %s", redisson.isShutdown()));
    }

for example by also logging in open() and then comparing the number of open and close log lines.
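
A minimal sketch of that comparison, in case it helps. LifecycleCounter is a hypothetical helper, not part of Flink or Redisson, and it assumes the function keeps it in a static field so that all task attempts loaded by the same classloader share one set of counters:

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not a Flink or Redisson class): counts how often
 * open() and close() ran so the two numbers can be compared in the logs.
 */
final class LifecycleCounter {
    private final AtomicInteger opened = new AtomicInteger();
    private final AtomicInteger closed = new AtomicInteger();

    /** Call at the end of open(); returns the running open count. */
    int onOpen() {
        return opened.incrementAndGet();
    }

    /** Call in close() after shutdown() has returned; returns the running close count. */
    int onClose() {
        return closed.incrementAndGet();
    }

    /** Clients opened but not yet closed; a steadily growing value hints at a leak. */
    int outstanding() {
        return opened.get() - closed.get();
    }

    /** One-line summary suitable for a log statement. */
    String summary() {
        return String.format("opened=%d closed=%d outstanding=%d",
                opened.get(), closed.get(), outstanding());
    }
}
```

In ExceptionTestFunction this could be held as "private static final LifecycleCounter COUNTER = new LifecycleCounter();", with COUNTER.onOpen() called in open(), COUNTER.onClose() called in close(), and COUNTER.summary() logged in both. Static state is per classloader and per TaskManager, so the counts may reset between restarts and have to be compared per TaskManager log.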

The shutdown may be interrupted, time out, or never be called (as a result of a bug). It probably also makes sense to log isShuttingDown() to see whether the shutdown has completed or is still in progress.

Regards,
Roman

On Thu, Sep 23, 2021 at 4:29 AM a773807943 <a773807...@gmail.com> wrote:
>
> I encountered a problem while integrating Flink and Redisson. When the
> task hits exceptions and keeps retrying, the number of Redis clients
> fluctuates (sometimes it increases, sometimes it decreases, but the
> overall trend is growth). Even if I shut down the Redisson instance by
> overriding the close function, the number of Redis clients keeps growing,
> and eventually it reaches the upper limit and an error is thrown.
> Moreover, this only occurs when running on a Flink cluster; in local mode
> the number of Redis clients remains stable. The test code is below. I
> wonder if you can provide specific reasons and solutions for this
> situation, thank you.
>
> flink version: 1.13.2
> redisson version: 3.16.1
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
>
> import java.util.Properties;
> import java.util.Random;
>
> public class ExceptionTest {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
>         env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>         env.enableCheckpointing(1000 * 60);
>         DataStream<String> mock = createDataStream(env);
>         mock.keyBy(x -> 1)
>             .process(new ExceptionTestFunction())
>             .uid("batch-query-key-process")
>             .filter(x -> x != null)
>             .print();
>         env.execute("Exception-Test");
>     }
>
>     private static DataStream<String> createDataStream(StreamExecutionEnvironment env) {
>         String topic = "test_topic_xhb03";
>         Properties test = new Properties();
>         test.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
>         test.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");
>
>         FlinkKafkaConsumer<String> consumer =
>             new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), test);
>         consumer.setStartFromLatest();
>
>         DataStream<String> source = env.addSource(consumer);
>         return source;
>     }
> }
>
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> import org.redisson.Redisson;
> import org.redisson.api.RedissonRxClient;
> import org.redisson.config.Config;
>
> @Slf4j
> public class ExceptionTestFunction extends KeyedProcessFunction<Integer, String, String> {
>     private RedissonRxClient redisson;
>
>     @Override
>     public void close() {
>         this.redisson.shutdown();
>         log.info(String.format("Shut down redisson instance in close method, RedissonRxClient shutdown is %s", redisson.isShutdown()));
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>         String prefix = "redis://";
>         Config config = new Config();
>         config.useSingleServer()
>             .setClientName("xhb-redisson-main")
>             .setTimeout(5000)
>             .setConnectTimeout(10000)
>             .setConnectionPoolSize(4)
>             .setConnectionMinimumIdleSize(2)
>             .setIdleConnectionTimeout(10000)
>             .setAddress("127.0.0.1:6379")
>             .setDatabase(0)
>             .setPassword(null);
>         this.redisson = Redisson.create(config).rxJava();
>     }
>
>     @Override
>     public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
>         throw new NullPointerException("Null Pointer in ProcessElement");
>     }
> }