[
https://issues.apache.org/jira/browse/KAFKA-8646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
jmhostalet resolved KAFKA-8646.
-------------------------------
Resolution: Not A Bug
> Materialized.withLoggingDisabled() does not disable changelog topics creation
> -----------------------------------------------------------------------------
>
> Key: KAFKA-8646
> URL: https://issues.apache.org/jira/browse/KAFKA-8646
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.3.0
> Reporter: jmhostalet
> Assignee: Bill Bejeck
> Priority: Minor
>
> I have a cluster with 3 brokers running version 0.11
> My kafka-streams app was using kafka-client 0.11.0.1 but recently I've
> migrated to 2.3.0
> I have no executed any migration as my data is disposable, therefore I have
> deleted all intermediate topics, except input and output topics.
> My streams config is:
> {code:java}
> application.id = consumer-id-v1.00
> application.server =
> bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 524288000
> client.id =
> commit.interval.ms = 30000
> connections.max.idle.ms = 540000
> default.deserialization.exception.handler = class
> org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> default.key.serde = class
> org.apache.kafka.common.serialization.Serdes$StringSerde
> default.production.exception.handler = class
> org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class com.acme.stream.TimeExtractor
> default.value.serde = class com.acme.serde.MyDtoSerde
> max.task.idle.ms = 0
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> num.standby.replicas = 0
> num.stream.threads = 25
> partition.grouper = class
> org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = at_least_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 1
> request.timeout.ms = 40000
> retries = 0
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /tmp/kafka-streams
> topology.optimization = none
> upgrade.from = null
> windowstore.changelog.additional.retention.ms = 86400000
> {code}
> in my stream I am using withLoggingDisabled
> {code:java}
> stream.filter((key, val) -> val!=null)
> .selectKey((key, val) -> getId(val))
> .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new
> MyDtoSerde()))
> .windowedBy(TimeWindows.of(aggregationWindowSizeDuration)
> .grace(windowRetentionPeriodDuration))
> .aggregate(MyDto::new,
> new MyUpdater(),
> Materialized.as("aggregation-updater")
> .withLoggingDisabled()
> .with(Serdes.String(), new MyDtoSerde()))
> .toStream((k, v) -> k.key())
> .mapValues(val -> { ...
> {code}
> but changelog topics are created (KSTREAM-AGGREGATE-STATE-STORE), no matter
> if I delete them before running again the app or if I change the
> application.id
> With a new application.id, topics are recreated with the new prefix.
>
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)