Hi, When we try the same square-bracket syntax, the internal topics fail to get created. Any suggestions?

changelogConfig.put("cleanup.policy", "[compact, delete]");

org.apache.kafka.streams.errors.StreamsException: Could not create topic stream_digital_01-hrly-changelog.
        at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:137)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:655)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:463)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:358)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:520)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:822)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:802)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:827)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: Invalid value [compact for configuration cleanup.policy: String must be one of: compact, delete
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)
        at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:121)

Thanks & Regards,
Ashok

-----Original Message-----
From: Manikumar [mailto:manikumar.re...@gmail.com]
Sent: Thursday, June 07, 2018 12:11 AM
To: Users
Subject: Re: cleanup.policy - doesn't accept compact,delete

As described in the usage description, values that contain commas must be grouped with square brackets.
ex: --add-config cleanup.policy=[compact,delete]

On Thu, Jun 7, 2018 at 8:49 AM, Jayaraman, AshokKumar (CCI-Atlanta-CON) <ashokkumar.jayara...@cox.com> wrote:

> Hi,
>
> We are on Kafka version 1.0.0. Per the new feature below, a topic can
> allow both compact and delete. I tried all the combinations, but they all
> fail to accept values that are not either compact OR delete. Is this
> feature valid in updated releases, since 0.10.2? If this feature is not
> available, how can we clean up a growing compacted topic?
>
> https://issues.apache.org/jira/browse/KAFKA-4015
>
> $ ./kafka-configs.sh --zookeeper <<XXXXX>>:2181 --alter --entity-type
> topics --entity-name stream_output --add-config
> cleanup.policy=compact,delete
> Error while executing config command requirement failed: Invalid entity
> config: all configs to be added must be in the format "key=val".
> java.lang.IllegalArgumentException: requirement failed: Invalid entity
> config: all configs to be added must be in the format "key=val".
>         at scala.Predef$.require(Predef.scala:233)
>         at kafka.admin.ConfigCommand$.parseConfigsToBeAdded(ConfigCommand.scala:128)
>         at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:78)
>         at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:65)
>         at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
>
>
> $ ./kafka-configs.sh --zookeeper <<XXXXX>>:2181 --alter --entity-type
> topics --entity-name ash_stream_output --add-config
> cleanup.policy=compact_delete
> Error while executing config command Invalid value compact_delete for
> configuration cleanup.policy: String must be one of: compact, delete
> org.apache.kafka.common.config.ConfigException: Invalid value
> compact_delete for configuration cleanup.policy: String must be one of:
> compact, delete
>         at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:851)
>         at org.apache.kafka.common.config.ConfigDef$ValidList.ensureValid(ConfigDef.java:827)
>         at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:427)
>         at kafka.log.LogConfig$.validate(LogConfig.scala:331)
>         at kafka.admin.AdminUtils$.changeTopicConfig(AdminUtils.scala:524)
>         at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:90)
>         at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:65)
>         at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
>
>
> $ ./kafka-configs.sh --zookeeper <<XXXXX>>:2181 --alter --entity-type
> topics --entity-name ash_stream_output --add-config
> cleanup.policy=compact_and_delete
> Error while executing config command Invalid value compact_delete for
> configuration cleanup.policy: String must be one of: compact, delete
> org.apache.kafka.common.config.ConfigException: Invalid value
> compact_delete for configuration cleanup.policy: String must be one of:
> compact, delete
>         at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:851)
>         at org.apache.kafka.common.config.ConfigDef$ValidList.ensureValid(ConfigDef.java:827)
>         at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:427)
>         at kafka.log.LogConfig$.validate(LogConfig.scala:331)
>         at kafka.admin.AdminUtils$.changeTopicConfig(AdminUtils.scala:524)
>         at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:90)
>         at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:65)
>         at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
>
>
> Thanks & Regards,
> Ashok
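
A note on the StreamsException at the top of this thread: the square brackets appear to be grouping syntax for the kafka-configs.sh command line only, so that a value containing commas is not split as part of the --add-config list. The broker message "Invalid value [compact" suggests that a value set programmatically is split on commas as-is, brackets included. The sketch below imitates that split in plain Java to show the difference. The class name and the firstInvalid helper are hypothetical, the validation is a simplified stand-in rather than Kafka's actual ConfigDef code, and it is an assumption that the changelog config map accepts the plain string "compact,delete".

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CleanupPolicyExample {
    // Allowed values, taken from the broker error message in this thread.
    static final List<String> VALID = Arrays.asList("compact", "delete");

    // Hypothetical helper: splits the value on commas (ignoring surrounding
    // whitespace) and returns the first item that is not an allowed value,
    // or null if every item is allowed. A simplified imitation only.
    static String firstInvalid(String value) {
        for (String item : value.trim().split("\\s*,\\s*")) {
            if (!VALID.contains(item)) {
                return item;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Assumption: no brackets when the value is set from code.
        Map<String, String> changelogConfig = new HashMap<>();
        changelogConfig.put("cleanup.policy", "compact,delete");

        // The bracketed form leaves "[compact" as the first list item,
        // which matches the "Invalid value [compact" error above.
        System.out.println(firstInvalid("[compact, delete]"));

        // The plain form yields only allowed items, so this prints "null".
        System.out.println(firstInvalid(changelogConfig.get("cleanup.policy")));
    }
}
```

If this reading is right, changing the value in the changelogConfig.put call to "compact,delete" would be the first thing to try. The brackets would then be needed only when passing the value through kafka-configs.sh.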