Ozgur created KAFKA-7628: ---------------------------- Summary: KafkaStream is not closing Key: KAFKA-7628 URL: https://issues.apache.org/jira/browse/KAFKA-7628 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.11.0.1 Environment: Macbook Pro Reporter: Ozgur
I'm closing a KafkaStream when I need based on a certain condition: Closing: {code:java} if(kafkaStream == null) { logger.info("KafkaStream already closed?"); } else { boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS); if(closed) { kafkaStream = null; logger.info("KafkaStream closed"); } else { logger.info("KafkaStream could not closed"); } } {code} Starting: {code:java} if(kafkaStream == null) { logger.info("KafkaStream is starting"); kafkaStream = KafkaManager.getInstance().getStream(this.getConfigFilePath(), this, this.getTopic() ); kafkaStream.start(); logger.info("KafkaStream is started"); } {code} In my implementation of Processor, {{process(String key, byte[] value)}} is still called although successfully closing stream: {code:java} // code placeholder public abstract class BaseKafkaProcessor implements Processor<String, byte[]> { private static Logger logger = LogManager.getLogger(BaseKafkaProcessor.class); private ProcessorContext context; private ProcessorContext getContext() { return context; } @Override public void init(ProcessorContext context) { this.context = context; this.context.schedule(1000); } @Override public void process(String key, byte[] value) { try { String topic = key.split("-")[0]; byte[] uncompressed = GzipCompressionUtil.uncompress(value); String json = new String(uncompressed, "UTF-8"); processRecord(topic, json); this.getContext().commit(); } catch (Exception e) { logger.error("Error processing json", e); } } protected abstract void processRecord(String topic, String json); @Override public void punctuate(long timestamp) { this.getContext().commit(); } @Override public void close() { this.getContext().commit(); } } {code} My configuration for KafkaStreams: {code:java} application.id=dv_ws_in_app_activity_dev4 bootstrap.servers=VLXH1 auto.offset.reset=latest num.stream.threads=1 key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde poll.ms = 100 commit.interval.ms=1000 state.dir=../../temp/kafka-state-dir {code} Version: *0.11.0.1* I'm witnessing that after closing() the streams, these ports are still listening: {code:java} $ sudo lsof -i -n -P | grep 9092 java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED) java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED) java 29457 ozgur 135u IPv6 0x531e55051a789903 0t0 TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED) java 29457 ozgur 136u IPv6 0x531e55051a78aa43 0t0 TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED) java 29457 ozgur 140u IPv6 0x531e55051a78c703 0t0 TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED) java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)