Re: best approach to rotate the logs and archive on Kubernetes
I think if you just log to the console, Kubernetes will manage the log rotation for you:
https://kubernetes.io/docs/concepts/cluster-administration/logging/
You can use the "kubectl logs" command to fetch the logs, or use a logging agent to ship them to a central location.

On Tue, Aug 4, 2020 at 8:15 PM Srinivas Seema wrote:
> Hi All,
>
> We have a Kafka cluster deployed in Kubernetes, running with the docker image
> (solsson/kafka:2.4.0).
>
> I have the logging configuration below: config/log4j.properties
>
> # Unspecified loggers and loggers with additivity=true output to server.log and stdout
> # Note that INFO only applies to unspecified loggers; the log level of the child logger is used otherwise
> log4j.rootLogger=INFO, stdout, kafkaAppender
>
> log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
>
> log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
> log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
> log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
> log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
>
> log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
> log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
> log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
> log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
>
> log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
> log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
> log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
> log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
>
> log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
> log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
> log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
> log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
>
> log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
> log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
> log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
> log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
>
> log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
> log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
> log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
> log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
>
> # Change the line below to adjust ZK client logging
> log4j.logger.org.apache.zookeeper=INFO
>
> # Change the two lines below to adjust the general broker logging level (output to server.log and stdout)
> log4j.logger.kafka=INFO
> log4j.logger.org.apache.kafka=INFO
>
> # Change to DEBUG or TRACE to enable request logging
> log4j.logger.kafka.request.logger=WARN, requestAppender
> log4j.additivity.kafka.request.logger=false
>
> # Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output
> # related to the handling of requests
> #log4j.logger.kafka.network.Processor=TRACE, requestAppender
> #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
> #log4j.additivity.kafka.server.KafkaApis=false
> log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
> log4j.additivity.kafka.network.RequestChannel$=false
>
> log4j.logger.kafka.controller=TRACE, controllerAppender
> log4j.additivity.kafka.controller=false
>
> log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
> log4j.additivity.kafka.log.LogCleaner=false
>
> log4j.logger.state.change.logger=TRACE, stateChangeAppender
> log4j.additivity.state.change.logger=false
>
> # Access denials are logged at INFO level, change to DEBUG to also log allowed accesses
> log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
> log4j.additivity.kafka.authorizer.logger=false
>
> Problem Statement:
> Logs are piling up and consuming more disk space, and pods are going into an
> evicted state.
>
> I would like to know the best approach to rotate and archive the logs (per
> Kafka component, and at a given size, e.g. 10 MB), so that I can analyze the
> logs and save disk space.
>
> ~Srinivas
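For the size-based, per-component rotation asked about above, one option is to switch the file appenders from DailyRollingFileAppender to log4j's RollingFileAppender, which rolls by size and keeps a bounded number of archives. A minimal sketch for the server log only; the 10 MB limit and backup count are illustrative values, and the same pattern applies to the other appenders:

log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
# Roll when the file reaches 10 MB and keep at most 10 archived files
log4j.appender.kafkaAppender.MaxFileSize=10MB
log4j.appender.kafkaAppender.MaxBackupIndex=10
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

Alternatively, drop the file appenders entirely and keep only stdout, letting the kubelet rotate the container log as described in the reply above.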
Re: Streams, Kafka windows
You can try converting the final resultant stream to a table. Check this page for more info:
https://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step
That way the table will always contain the latest (single) record for a given key.

Sachin

On Tue, Jan 14, 2020 at 10:11 PM Viktor Markvardt <viktor.markva...@gmail.com> wrote:
> Hi,
>
> My name is Viktor. I'm currently working with Kafka Streams and have
> several questions about Kafka, and I cannot find answers in the official
> docs.
>
> 1. Why does the suppress functionality not work with hopping windows? How
> can I make it work?
>
> Example of the code:
>
> KStream finalStream = source
>     .groupByKey()
>     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
>     .reduce((aggValue, newValue) -> newValue, Materialized.with(Serdes.String(), Serdes.String()))
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream();
>
> finalStream.print(Printed.toSysOut());
> finalStream.to(outputTopic);
>
> After I run the code above, the output stream is empty. There were no
> errors/exceptions.
> NOTE: With a tumbling window the code works as expected.
> Maybe I am simply using it incorrectly?
>
> 2. Why are there duplicates in the output stream with hopping windows
> (without suppress)?
> E.g., I send one record into the input KStream with a hopping window
> (duration=30s, advanceBy=2s) but get two identical records (duplicates) in
> the output KStream.
> Is that expected behavior? If so, how can I filter out or switch off these
> duplicates?
>
> 3. Mainly I'm trying to solve this problem:
> I have a KStream with events inside, and events can be repeated (duplicates).
> In the output KStream I would like to receive only unique events for the
> last 24 hours (window duration), advanced by 1 hour (window advanceBy).
> Could you recommend any examples of code or docs please?
> I have already read the official docs and examples but it was not enough to
> get a full understanding of how I can achieve this.
>
> Best regards,
> Viktor Markvardt
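A minimal sketch of that suggestion, assuming finalStream is a KStream<Windowed<String>, String> as in the snippet above (variable names are illustrative): strip the window from the key, then keep only the latest value per original key in a KTable.

// Assumed imports: org.apache.kafka.streams.KeyValue, org.apache.kafka.streams.kstream.*,
// org.apache.kafka.common.serialization.Serdes
KTable<String, String> latestPerKey = finalStream
    // drop the window part of the key so records from overlapping windows share one key
    .map((Windowed<String> windowedKey, String value) -> KeyValue.pair(windowedKey.key(), value))
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    // keep only the most recent value per key
    .reduce((oldValue, newValue) -> newValue,
            Materialized.with(Serdes.String(), Serdes.String()));

latestPerKey.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

The table then holds a single, latest value per original key instead of one record per overlapping window pane.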
[jira] [Created] (KAFKA-4848) Stream thread getting into deadlock state while trying to get rocksdb lock in retryWithBackoff
Sachin Mittal created KAFKA-4848:

Summary: Stream thread getting into deadlock state while trying to get rocksdb lock in retryWithBackoff
Key: KAFKA-4848
URL: https://issues.apache.org/jira/browse/KAFKA-4848
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.2.0
Reporter: Sachin Mittal
Attachments: thr-1

We see a deadlock state when a streams thread takes longer than MAX_POLL_INTERVAL_MS_CONFIG to process a task. In this case the thread's partitions are assigned to some other thread, including the rocksdb lock. When it tries to process the next task it cannot get the rocksdb lock and simply keeps waiting for that lock forever.

In retryWithBackoff for AbstractTaskCreator we have backoffTimeMs = 50L. If it does not get the lock, we simply increase the time by 10x and keep trying inside the while(true) loop.

We need to have an upper bound for this backoffTimeMs. If the time is greater than MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the lock, that means this thread's partitions have been moved somewhere else and it may not get the lock again.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
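A simplified sketch of the proposed bound (the createTask call and surrounding names are hypothetical, not the actual AbstractTaskCreator code): keep the existing 10x growth, but stop retrying once the backoff exceeds the max.poll.interval.ms value, since at that point the partitions have almost certainly been reassigned.

// Sketch only; createTask() stands in for the code that acquires the RocksDB lock.
void createTaskWithBoundedRetry(final TaskId taskId, final long maxPollIntervalMs)
        throws InterruptedException {
    long backoffTimeMs = 50L;
    while (true) {
        try {
            createTask(taskId);
            return;
        } catch (final LockException e) {
            if (backoffTimeMs > maxPollIntervalMs) {
                // The lock is likely held by the thread that now owns the partition;
                // give up so this thread can rejoin the group and get new partitions.
                throw e;
            }
            Thread.sleep(backoffTimeMs);
            backoffTimeMs *= 10;
        }
    }
}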
[jira] [Commented] (KAFKA-4848) Stream thread getting into deadlock state while trying to get rocksdb lock in retryWithBackoff
[ https://issues.apache.org/jira/browse/KAFKA-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897462#comment-15897462 ]

Sachin Mittal commented on KAFKA-4848:
--

The deadlock issue is like this. If a thread has two partitions and processing the first partition takes more than MAX_POLL_INTERVAL_MS_CONFIG, the thread is evicted from the group and both partitions are migrated to some other thread. When it then tries to process the second partition it tries to get the rocksdb lock. It won't get the lock, since that partition has moved to another thread. So it keeps increasing the backoffTimeMs and keeps trying to get the lock forever, thus reaching a deadlock.

To fix this we need an upper bound on how long it keeps trying to get that lock. That upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG, because if it has not got the lock by then, we know this thread was evicted from the group and needs to rejoin to get new partitions.

See in the attached file:

DEBUG 2017-03-01 18:17:42,465 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] creating new task 0_4
DEBUG 2017-03-01 18:24:19,202 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] creating new task 0_8

Note from the above 2 lines that it took more than 5 minutes to process task 0_4. As a result the partitions moved to a different thread. Next see the following entries for 0_8:

WARN 2017-03-01 18:24:20,205 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:24:21,257 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:24:22,360 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:24:23,563 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:24:24,966 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:24:26,768 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:24:29,371 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:24:34,435 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:24:41,837 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:24:55,640 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:25:22,242 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:26:14,445 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:27:57,848 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:31:23,689 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:38:14,294 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 18:51:54,497 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 19:19:13,900 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 20:13:53,014 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-01 22:03:07,629 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-02 01:41:35,831 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.
WARN 2017-03-02 08:58:31,234 [StreamThread-1]: org.apache.kafka.streams.processor.internals.StreamThread - Could not create task 0_8. Will retry.

From 2017-03-01 18:24:20,205 to 2017-03-02 08:58:31,234 it kept trying to get the lock, hence the deadlock.

> Stream thread getting into deadlock state while trying to get rocksdb lock in
> retryWithBackoff
[jira] [Commented] (KAFKA-4848) Stream thread getting into deadlock state while trying to get rocksdb lock in retryWithBackoff
[ https://issues.apache.org/jira/browse/KAFKA-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897463#comment-15897463 ]

Sachin Mittal commented on KAFKA-4848:
--

Please refer to https://github.com/apache/kafka/pull/2642 as a potential fix for the issue.

> Stream thread getting into deadlock state while trying to get rocksdb lock in
> retryWithBackoff
> --
>
> Key: KAFKA-4848
> URL: https://issues.apache.org/jira/browse/KAFKA-4848
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Sachin Mittal
> Attachments: thr-1
>
> We see a deadlock state when a streams thread takes longer than
> MAX_POLL_INTERVAL_MS_CONFIG to process a task. In this case the thread's
> partitions are assigned to some other thread, including the rocksdb lock.
> When it tries to process the next task it cannot get the rocksdb lock and
> simply keeps waiting for that lock forever.
> In retryWithBackoff for AbstractTaskCreator we have backoffTimeMs = 50L.
> If it does not get the lock, we simply increase the time by 10x and keep
> trying inside the while(true) loop.
> We need to have an upper bound for this backoffTimeMs. If the time is
> greater than MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the lock,
> that means this thread's partitions have been moved somewhere else and it
> may not get the lock again.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (KAFKA-4848) Stream thread getting into deadlock state while trying to get rocksdb lock in retryWithBackoff
[ https://issues.apache.org/jira/browse/KAFKA-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935744#comment-15935744 ]

Sachin Mittal commented on KAFKA-4848:
--

[~guozhang] We ran only one instance of the streams application per machine. So if two threads accessed the same partition on the same machine, this issue would arise. I am not sure what would happen if threads of different instances (processes) on the same machine tried to get the lock for the same partition.

> Stream thread getting into deadlock state while trying to get rocksdb lock in
> retryWithBackoff
> --
>
> Key: KAFKA-4848
> URL: https://issues.apache.org/jira/browse/KAFKA-4848
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Sachin Mittal
> Assignee: Sachin Mittal
> Fix For: 0.11.0.0
>
> Attachments: thr-1
>
> We see a deadlock state when a streams thread takes longer than
> MAX_POLL_INTERVAL_MS_CONFIG to process a task. In this case the thread's
> partitions are assigned to some other thread, including the rocksdb lock.
> When it tries to process the next task it cannot get the rocksdb lock and
> simply keeps waiting for that lock forever.
> In retryWithBackoff for AbstractTaskCreator we have backoffTimeMs = 50L.
> If it does not get the lock, we simply increase the time by 10x and keep
> trying inside the while(true) loop.
> We need to have an upper bound for this backoffTimeMs. If the time is
> greater than MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the lock,
> that means this thread's partitions have been moved somewhere else and it
> may not get the lock again.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (KAFKA-4848) Stream thread getting into deadlock state while trying to get rocksdb lock in retryWithBackoff
[ https://issues.apache.org/jira/browse/KAFKA-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15953777#comment-15953777 ]

Sachin Mittal commented on KAFKA-4848:
--

Please let me know if this will be done in the 0.10.2 branch. Do I need to issue a PR for it? Also note that some fixes that are in trunk, like catching the commit-failed exception on offset commits, are not in that branch, and they would be a prerequisite for this fix. So let me know how we are planning the 0.10.2.1 release.

> Stream thread getting into deadlock state while trying to get rocksdb lock in
> retryWithBackoff
> --
>
> Key: KAFKA-4848
> URL: https://issues.apache.org/jira/browse/KAFKA-4848
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Sachin Mittal
> Assignee: Sachin Mittal
> Fix For: 0.11.0.0, 0.10.2.1
>
> Attachments: thr-1
>
> We see a deadlock state when a streams thread takes longer than
> MAX_POLL_INTERVAL_MS_CONFIG to process a task. In this case the thread's
> partitions are assigned to some other thread, including the rocksdb lock.
> When it tries to process the next task it cannot get the rocksdb lock and
> simply keeps waiting for that lock forever.
> In retryWithBackoff for AbstractTaskCreator we have backoffTimeMs = 50L.
> If it does not get the lock, we simply increase the time by 10x and keep
> trying inside the while(true) loop.
> We need to have an upper bound for this backoffTimeMs. If the time is
> greater than MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the lock,
> that means this thread's partitions have been moved somewhere else and it
> may not get the lock again.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)