we need to use square brackets only for command line tool. In your code, you just need to supply "compact,delete" string to props object.
On Wed, Jul 11, 2018 at 8:52 AM Jayaraman, AshokKumar (CCI-Atlanta-CON) < ashokkumar.jayara...@cox.com> wrote: > Hi Matthias, > > Kept as [compact,delete] and still got the same exception. > > Thanks & Regards, > > Ashok > > -----Original Message----- > From: Matthias J. Sax [mailto:matth...@confluent.io] > Sent: Tuesday, July 10, 2018 10:09 PM > To: users@kafka.apache.org > Subject: Re: cleanup.policy - doesn't accept compact,delete > > Try to remove the space after the comma. > > -Matthias > > On 7/10/18 10:43 AM, Jayaraman, AshokKumar (CCI-Atlanta-CON) wrote: > > Hi, > > > > When we try to use the same (square brackets), the internal topics are > failing to get created. Any suggestions? > > > > changelogConfig.put("cleanup.policy", "[compact, delete]"); > > > > > > org.apache.kafka.streams.errors.StreamsException: Could not create topic > stream_digital_01-hrly-changelog. > > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:137) > > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:655) > > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:463) > > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:358) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:520) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:822) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:802) > > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) > > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149) > > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:827) > > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784) > > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamTh > > read.java:720) Caused by: java.util.concurrent.ExecutionException: > > org.apache.kafka.common.errors.UnknownServerException: Invalid value > [compact for configuration cleanup.policy: String must be one of: compact, > delete > > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258) > > at > > org.apache.kafka.streams.processor.internals.InternalTopicManager.make > > Ready(InternalTopicManager.java:121) > > > > > > > > Thanks & Regards, > > > > Ashok > > > > -----Original Message----- > > From: Manikumar [mailto:manikumar.re...@gmail.com] > > Sent: Thursday, June 07, 2018 12:11 AM > > To: Users > > Subject: Re: cleanup.policy - doesn't accept compact,delete > > > > As described in usage description, to group the values which contain > commas, we need to use square brackets. > > > > ex: --add-config cleanup.policy=[compact,delete] > > > > On Thu, Jun 7, 2018 at 8:49 AM, Jayaraman, AshokKumar (CCI-Atlanta-CON) > < ashokkumar.jayara...@cox.com> wrote: > > > >> Hi, > >> > >> We are on Kafka version 1.0.0. Per the below new feature, a topic > >> can allow both compact and delete. I tried all the combinations, but > they all > >> fail to accept values that are not either compact OR delete. Is this > >> feature valid in updated releases, since 0.10.2? If this is not a > >> feature available, how to cleanup the growing compacted topic scenario? > >> > >> https://issues.apache.org/jira/browse/KAFKA-4015 > >> > >> $ ./kafka-configs.sh --zookeeper <<XXXXX>>:2181--alter --entity-type > >> topics --entity-name stream_output --add-config > >> cleanup.policy=compact,delete Error while executing config command > >> requirement failed: Invalid entity > >> config: all configs to be added must be in the format "key=val". > >> java.lang.IllegalArgumentException: requirement failed: Invalid > >> entity > >> config: all configs to be added must be in the format "key=val". > >> at scala.Predef$.require(Predef.scala:233) > >> at kafka.admin.ConfigCommand$.parseConfigsToBeAdded( > >> ConfigCommand.scala:128) > >> at > kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:78) > >> at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:65) > >> at kafka.admin.ConfigCommand.main(ConfigCommand.scala) > >> > >> > >> $ ./kafka-configs.sh --zookeeper <<XXXXX>>:2181 --alter --entity-type > >> topics --entity-name ash_stream_output --add-config > >> cleanup.policy=compact_delete Error while executing config command > >> Invalid value compact_delete for configuration cleanup.policy: String > >> must be one of: compact, delete > >> org.apache.kafka.common.config.ConfigException: Invalid value > >> compact_delete for configuration cleanup.policy: String must be one of: > >> compact, delete > >> at org.apache.kafka.common.config.ConfigDef$ValidString. > >> ensureValid(ConfigDef.java:851) > >> at org.apache.kafka.common.config.ConfigDef$ValidList. > >> ensureValid(ConfigDef.java:827) > >> at org.apache.kafka.common.config.ConfigDef.parse( > >> ConfigDef.java:427) > >> at kafka.log.LogConfig$.validate(LogConfig.scala:331) > >> at > kafka.admin.AdminUtils$.changeTopicConfig(AdminUtils.scala:524) > >> at > kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:90) > >> at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:65) > >> at kafka.admin.ConfigCommand.main(ConfigCommand.scala) > >> > >> > >> $ ./kafka-configs.sh --zookeeper <<XXXXX>>:2181 --alter --entity-type > >> topics --entity-name ash_stream_output --add-config > >> cleanup.policy=compact_and_delete Error while executing config > >> command Invalid value compact_delete for configuration > >> cleanup.policy: String must be one of: compact, delete > >> org.apache.kafka.common.config.ConfigException: Invalid value > >> compact_delete for configuration cleanup.policy: String must be one of: > >> compact, delete > >> at org.apache.kafka.common.config.ConfigDef$ValidString. > >> ensureValid(ConfigDef.java:851) > >> at org.apache.kafka.common.config.ConfigDef$ValidList. > >> ensureValid(ConfigDef.java:827) > >> at org.apache.kafka.common.config.ConfigDef.parse( > >> ConfigDef.java:427) > >> at kafka.log.LogConfig$.validate(LogConfig.scala:331) > >> at > kafka.admin.AdminUtils$.changeTopicConfig(AdminUtils.scala:524) > >> at > kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:90) > >> at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:65) > >> at kafka.admin.ConfigCommand.main(ConfigCommand.scala) > >> > >> > >> Thanks & Regards, > >> Ashok > >> > >> > >