Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi John,

Sorry I haven't had time to prepare the minimal reproducer yet. I still have plans to do it though...

On 1/22/19 8:02 PM, John Roesler wrote:
> Hi Peter,
>
> Just to follow up on the actual bug, can you confirm whether:
>
> * when you say "restart", do you mean orderly shutdown and restart, or
>   crash and restart?

I start it as a SpringBoot application from IDEA and then stop it with the red square button. It does initiate the shutdown sequence before exiting... So I think it is SIGTERM, which triggers the JVM shutdown hook(s).

> * have you tried this with EOS enabled? I can imagine some ways that
>   there could be duplicates, but they should be impossible with EOS
>   enabled.

Yes, I have EOS enabled.

> Thanks for your help,
> -John

Regards, Peter

On Mon, Jan 14, 2019 at 1:20 PM John Roesler wrote:

Hi Peter,

I see your train of thought, but the actual implementation of the window store is structured differently from your mental model. Unlike key/value stores, we know that the records in a window store will "expire" on a regular schedule, and also that every single record will eventually expire. With this in mind, we have implemented an optimization to avoid a lot of compaction overhead in RocksDB, as well as saving on range scans.

Instead of storing everything in one database, we open several databases and bucket windows into them. Then, when windows expire, we just ignore the records (i.e., the API makes them unreachable, but we don't actually delete them). Once all the windows in a database are expired, we just close and delete the whole database. Then, we open a new one for new windows. If you look in the code, these databases are called "segments".

Thus, I don't think that you should attempt to use the built-in window stores as you described. Instead, it should be straightforward to implement your own StateStore with a layout that's more favorable to your desired behavior. You should also be able to set up the changelog the way you need; explicitly removed entities would get removed from the log as well, if it's a compacted log.

Actually, what you're describing is *very* similar to the implementation of suppress. I might actually suggest that you just copy the suppression implementation and adapt it to your needs, or at the very least, study how it works. In doing so, you might actually discover the cause of the bug yourself!

I hope this helps, and thanks for your help,
-John

On Sat, Jan 12, 2019 at 5:45 AM Peter Levart wrote:

Hi John,

Thank you very much for explaining how WindowStore works. I have some more questions...

On 1/10/19 5:33 PM, John Roesler wrote:
> Hi Peter,
>
> Regarding retention, I was not referring to log retention, but to the
> window store retention. Since a new window is created every second (for
> example), there are in principle an unbounded number of windows (the
> longer the application runs, the more windows there are, with no end).
> However, we obviously can't store an infinite amount of data, so the
> window definition includes a retention period. By default, this is 24
> hours. After the retention period elapses, all of the data for the
> window is purged to make room for new windows.

Right.
Would the following work, for example:

- configure retention of the WindowStore to be "infinite"
- explicitly remove records from the store when windows are flushed out
- configure the WindowStore log topic for compaction

Something like the following:

    Stores
        .windowStoreBuilder(
            Stores.persistentWindowStore(
                storeName,
                Duration.of(1000L, ChronoUnit.YEARS), // retentionPeriod
                Duration.ofSeconds(10),               // windowSize
                false                                 // retainDuplicates
            ),
            keySerde,
            valSerde
        )
        .withCachingEnabled()
        .withLoggingEnabled(
            Map.of(
                TopicConfig.CLEANUP_POLICY_CONFIG,
                TopicConfig.CLEANUP_POLICY_COMPACT
            )
        );

Would, in the above scenario:

- the on-disk WindowStore be kept bounded (there could be some very old entries in it, but the majority will be new, depending on the activity of particular input keys)
- the log topic be kept bounded (explicitly removed entries would be removed from the compacted log too)

I'm moving away from the DSL partly because I have some problems with suppression (which I hope we'll be able to fix) and partly because the DSL can't give me the complicated semantics that I need for the application at hand. I tried to capture what I need in a custom Transformer here:

https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f

Your knowledge of how WindowStore works would greatly help me decide if this is a workable idea.

So what I meant was that if you buffer some key "A" in window (Monday 09:00:00) and then get no further activity for A for over 24 hours, then when you do get that next event for A, say at (Tuesday 11
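[Editor's note: the explicit-removal idea above could look roughly like the following inside a custom Transformer. This is only a sketch: the store name, window size, and flush trigger are placeholders, and it assumes (to be verified) that put(key, null, windowStart) is written through to the compacted changelog as a tombstone.]

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.WindowStore;
    import org.apache.kafka.streams.state.WindowStoreIterator;

    public class BufferingTransformer
            implements Transformer<String, Long, KeyValue<String, Long>> {

        private ProcessorContext context;
        private WindowStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.store = (WindowStore<String, Long>) context.getStateStore("buffer-store");
        }

        @Override
        public KeyValue<String, Long> transform(String key, Long value) {
            // buffer into the 10s window that the record's timestamp falls into
            long windowStart = context.timestamp() - (context.timestamp() % 10_000L);
            store.put(key, value, windowStart);
            return null; // nothing is emitted until the window is flushed
        }

        // called from a punctuator (not shown) once a window is final:
        // forward the buffered value, then tombstone the entry so both the
        // store and its compacted changelog stay bounded
        private void flush(String key, long windowStart) {
            try (WindowStoreIterator<Long> iter = store.fetch(key, windowStart, windowStart)) {
                while (iter.hasNext()) {
                    context.forward(key, iter.next().value);
                }
            }
            store.put(key, null, windowStart);
        }

        @Override
        public void close() {}
    }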
Re: Open files clogging and KafkaStreams
Hi Guozhang,

I think I went a bit ahead of myself in describing the situation, I had an attachment with the context in detail, maybe it was filtered out. Lets try again =)

We have a topology looking something like this:

    input-topic [20 partitions, compacted]
        |
    use-case-repartition [20 partitions, infinite retention, segment.ms=10min]
        |
    use-case-changelog

We have previously hit the TooManyOpenFiles issue and "solved" it by raising the limit to something extreme. Later we found out that we wanted replication factor 3 on all internal topics, so we reset the app and BOOM, now we hit a too-many-memory-mapped-files limit instead.

The input topic contains 30 days of data, where we pretty much have records in every 10-minute window for every partition. This means that if nothing consumes the repartition topic, we will have 6 (10-min slots per hour) * 24 hours * 30 days * 20 partitions * 3 (.index, .log, .timeindex files) * 3 (replication factor) / 5 (brokers in cluster) = 155.520 open files just to have this repartition topic in place.

You would say, yeah, but no problem, as it would be deleted and you would not reach such high numbers? But that doesn't seem to be the case. What happened in our case is that, due to how the broker multiplexes the topic partitions for the subscribers, the streams application piled up all the repartition records, and only when caught up did all the downstream processes start taking place. I do see this as a design flaw in some component, probably the broker. It can't be the desired behaviour. How many open files do I need to be able to have open in a year of data when resetting/reprocessing an application?

By adding more threads than input topic partitions, I managed to force the broker to give out these records earlier, and the issue was mitigated. Ideally the downstream records should be processed somewhere near in time to the source record.

Let's take one partition, containing 1.000.000 records. This is the observed behaviour I have seen (somewhat simplified):

    Time   Consumer offset,  Records in    Consumer offset,    Records in
           input topic       input topic   repartition topic   repartition topic
    00:00  0                 1.000.000     0                   0
    00:01  100.000           1.000.000     0                   100.000
    00:02  200.000           1.000.000     0                   200.000
    00:03  300.000           1.000.000     0                   300.000
    00:04  400.000           1.000.000     0                   400.000
    00:05  500.000           1.000.000     0                   500.000
    00:06  600.000           1.000.000     0                   600.000
    00:07  700.000           1.000.000     0                   700.000
    00:08  800.000           1.000.000     0                   800.000
    00:09  900.000           1.000.000     0                   900.000
    00:10  1.000.000         1.000.000     0                   1.000.000
    00:11  1.000.000         1.000.000     100.000             1.000.000
    00:12  1.000.000         1.000.000     200.000             1.000.000
    00:13  1.000.000         1.000.000     300.000             1.000.000
    00:14  1.000.000         1.000.000     400.000             1.000.000
    00:15  1.000.000         1.000.000     500.000             1.000.000
    00:16  1.000.000         1.000.000     600.000             1.000.000
    00:17  1.000.000         1.000.000     700.000             1.000.000
    00:18  1.000.000         1.000.000     800.000             1.000.000
    00:19  1.000.000         1.000.000     900.000             1.000.000
    00:20  1.000.000         1.000.000     1.000.000           1.000.000

As you can see, there is no parallel execution, and it's because the broker does not give out the repartition records until the input topic is fully caught up.
Re: Kafka Consumer Not Assigned Partitions
Hello,

I have subscribed to a kafka topic as below. I need to run some logic only after the consumer has been assigned a partition. However, consumer.assignment() comes back as an empty set no matter how long I wait. If I do not have the while loop and just do a consumer.poll(), I do get the records from the topic. Can anyone tell me why this is happening?

    consumer.subscribe(topics);

    Set<TopicPartition> assigned = Collections.emptySet();
    while (isAssigned) {
        assigned = consumer.assignment();
        if (!assigned.isEmpty()) {
            isAssigned = false;
        }
    }

Thanks,
Chinchu
Kafka's cluster for thousands of clients (up to 30k)
Hi all!

We're trying to use Kafka to deliver messages to thousands of clients (different data for each client). Actually, we want behavior pretty much like a message broker, because we have to work with a continuous flow of small data pieces. We suppose we should create a partition for each client. Is that right? We are stuck with problems of the ISR infinitely shrinking/expanding and the "FETCH_SESSION_ID_NOT_FOUND" error.

My configuration:

    5 servers (8 vCPU, 16GB RAM)
    20k partitions
    2000 consumers
    replication-factor = 2
    queued.max.requests = 2000
    replica.fetch.wait.max.ms = 5000
    replica.lag.time.max.ms = 2
    num.replica.fetchers = 5
    num.network.threads = 9
    num.io.threads = 24

--
Best Regards,
Denis Romanenko
Head of ESB Development, PJSC Magnit
Re: Open files clogging and KafkaStreams
I see. What you described is a known issue in older versions of Kafka: some high-traffic topics in bootstrap mode may effectively "starve" other topics in the fetch response, since brokers used to naively fill in the bytes that meet the max.bytes configuration and return. This is fixed in version 1.1 via incremental fetch requests:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability

The basic idea is to not always request topics in the same order, like A, B, C; instead, if the previous request asked for topics A, B, C and got all its data from A, then the next request would ask for B, C, A, etc. So if you are on an older version of Kafka, I'd suggest you upgrade to a newer one.

If you cannot upgrade at the moment, another suggestion, as I mentioned above, is to change the segment sizes so you have much larger, and hence fewer, segment files.

Guozhang

On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn wrote:
> Hi Guozhang,
>
> I think I went a bit ahead of myself in describing the situation, I had
> an attachment with the context in detail, maybe it was filtered out.
> [...]
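[Editor's note: a hedged sketch of the segment-size route Guozhang suggests, using the Java AdminClient. The topic name and values are illustrative, and note that alterConfigs is not incremental: it replaces the topic's dynamic config set wholesale.]

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
        ConfigResource topic =
            new ConfigResource(ConfigResource.Type.TOPIC, "use-case-repartition");
        Config config = new Config(Arrays.asList(
            new ConfigEntry("segment.ms", "86400000"),      // roll at most daily, not every 10 min
            new ConfigEntry("segment.bytes", "1073741824")  // 1 GiB segments -> fewer files
        ));
        // caution: this overwrites all dynamic configs of the topic
        admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
    }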
Error deserializing Avro message when using SpecificSerde
Hey folks,

I am getting the below error when reading data from a kafka topic. I used the Confluent serializers to serialize this data, but when trying to consume using the Confluent deserializer I run into the below error. Any idea what the issue could be here? Also, how do I skip this record and read the next one?

    10:17:32.924 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=cgroupId9] Sending Heartbeat request to coordinator xxx.com:9092 (id: 2147483645 rack: null)
    10:17:33.144 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=cgroupId9] Received successful Heartbeat response
    10:17:34.872 [pool-1-thread-1] DEBUG io.confluent.kafka.schemaregistry.client.rest.RestService - Sending GET with input null to http://xxx.yy.com:8081/schemas/ids/321
    [...further heartbeat request/response lines trimmed...]

    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Logs-0 at offset 25106200. If needed, please seek past the record to continue consumption.
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 321
    Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class com.test.model.avro.Log specified in writer's schema whilst finding reader's schema for a SpecificRecord.

    // Consumer configs:
    Properties props = new Properties();
    props.put("bootstrap.servers", "xxx:9092,yyy:9092");
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "false");
    props.put("session.timeout.ms", "3");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("schema.registry.url", "http://xxx:8081");
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("max.poll.records", "100");
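[Editor's note: for the "skip this record" part of the question, the exception text itself gives the recipe: catch the SerializationException thrown by poll() and seek past the failing offset. A rough sketch, using the partition and offset from the stack trace above:]

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.SerializationException;

    while (true) {
        try {
            ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(500));
            records.forEach(record -> handle(record)); // handle() is a placeholder
        } catch (SerializationException e) {
            // partition/offset hard-coded from the error above; in practice
            // they would need to be parsed out of e.getMessage()
            consumer.seek(new TopicPartition("Logs", 0), 25106200L + 1);
        }
    }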
Re: Kafka Consumer Not Assigned Partitions
Calling `consumer.subscribe()` is a local call. Only when you call `consumer.poll()` will the consumer connect to the broker to get its assignment. Thus, it's safe to call `poll()` directly.

`assignment()` will return the assignment only after the first `poll()` call.

-Matthias

On 1/23/19 9:00 AM, chinchu chinchu wrote:
> Hello,
> I have subscribed to a kafka topic as below. I need to run some logic
> only after the consumer has been assigned a partition. However,
> consumer.assignment() comes back as an empty set no matter how long I
> wait. If I do not have the while loop and then do a consumer.poll() I do
> get the records from the topic. Can anyone tell me why this is happening?
> [...]
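[Editor's note: a minimal sketch of Matthias' point: poll() drives the group protocol, so wait for the assignment by polling, optionally with a ConsumerRebalanceListener. Topic list and timeouts are illustrative.]

    import java.time.Duration;
    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    consumer.subscribe(topics, new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // runs inside poll() once the group coordinator has assigned partitions
            System.out.println("Assigned: " + partitions);
        }
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
    });

    while (consumer.assignment().isEmpty()) {
        consumer.poll(Duration.ofMillis(100)); // no assignment happens without poll()
    }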
Re: Error deserializing Avro message when using SpecificSerde
My application did not have the com.test.model.avro.Log class on the classpath at runtime. The log messages indicated this; it was just my oversight.

On Wed, Jan 23, 2019 at 10:28 AM chinchu chinchu wrote:
> Hey folks,
> I am getting the below error when reading data from a kafka topic. I
> used the Confluent serializers to serialize this data, but when trying
> to consume using the Confluent deserializer I run into the below error.
> Any idea what the issue could be here? Also, how do I skip this record
> and read the next one?
> [...]
Re: Kafka Consumer Not Assigned Partitions
Thanks Matthias.

On Wed, Jan 23, 2019 at 10:55 AM Matthias J. Sax wrote:
> Calling `consumer.subscribe()` is a local call. Only when you call
> `consumer.poll()` will the consumer connect to the broker to get its
> assignment. Thus, it's safe to call `poll()` directly.
> [...]
Re: Open files clogging and KafkaStreams
I have to double-check what version of broker we run in production, but when testing and verifying the issue locally I did reproduce it with both broker and client version 2.1.0.

Kind regards
Niklas

On Wed 23. Jan 2019 at 18:24, Guozhang Wang wrote:
> I see.
>
> What you described is a known issue in older versions of Kafka: some
> high-traffic topics in bootstrap mode may effectively "starve" other
> topics in the fetch response, since brokers used to naively fill in the
> bytes that meet the max.bytes configuration and return.
> [...]
Kafka Hdfs Connect Flush Size
Hey folks,

I have been going through the hdfs connector code and I have one question: is the flush.size in the connector config the number of records read from a kafka partition, or the number of records written to an hdfs path? It looks like the recordCounter in TopicPartitionWriter is incremented for every record received from a kafka partition. In that case, how does this connector handle records from the same kafka partition that go to two different hdfs paths, if flush size is at the hdfs file level?

After looking through the code and running the TopicPartitionWriter test cases, I think that the flush size is the number of records written to hdfs from a kafka partition. I ran the test case testWriteRecordFieldPartitioner() in TopicPartitionWriterTest and saw the same. Can someone clarify if my understanding is right?

https://docs.confluent.io/current/connect/kafka-connect-hdfs/configuration_options.html

flush.size
  Number of records written to store before invoking file commits.
  Type: int
  Importance: high

Thanks,
Chinchu
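[Editor's note: for context, flush.size lives in the sink connector's config. A minimal standalone config might look like the following sketch; connector name, topic, and hdfs.url are placeholders.]

    name=hdfs-sink
    connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
    tasks.max=1
    topics=logs
    hdfs.url=hdfs://namenode:8020
    # number of records written to store before invoking file commits
    flush.size=1000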
Re: Open files clogging and KafkaStreams
I see (btw, attachments are usually not allowed in the AK mailing list, but if you have it somewhere like gitcode and can share the URL, that works).

Could you let me know how many physical cores you have in total hosting your app, and how many threads you configured? From your current description there should be at least 40 tasks (20 reading from source topics and writing to repartition topics, and 20 reading from repartition topics), and I'd like to know how these tasks are assigned to threads, and how many threads may be executed in parallel on the hardware.

Guozhang

On Wed, Jan 23, 2019 at 1:21 PM Niklas Lönn wrote:
> I have to double check what version of broker we run in production but
> when testing and verifying the issue locally I did reproduce it with
> both broker and client version 2.1.0
> [...]
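[Editor's note: as a concrete illustration of the knob in question (values made up), the thread count Niklas raised as a mitigation is a plain Streams config; the ~40 tasks Guozhang mentions are distributed across the threads of all running instances.]

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "use-case-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // more threads than input partitions, per the mitigation above
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 24);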
Single consumer subscribing to multiple topics
Hi Experts,

I have a requirement to consume records from multiple topics and process those messages with a similar pattern. The schema of the messages in each topic is different, and I am consuming them as generic records. Both topics have a very high rate of traffic (10-15K messages/second) and would have 50 partitions each (with the partition key being pretty well distributed). The offsets are committed manually for these topics to keep them in sync with the processing checkpoints.

I was wondering whether I should use a single consumer subscribing to multiple topics or have a separate consumer for each topic. Are there any considerable performance implications to consuming from multiple topics with a single consumer? I could not find any guidelines or much documentation on this. Would appreciate any guidance.

Thanks,
-Manu
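[Editor's note: a minimal sketch of the single-consumer, multi-topic pattern described above, with manual commits per partition. Topic names and the process() call are placeholders.]

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    consumer.subscribe(Arrays.asList("topic-a", "topic-b"));
    while (true) {
        ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(500));
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            long lastOffset = -1L;
            for (ConsumerRecord<String, GenericRecord> record : records.records(tp)) {
                // dispatch on tp.topic(), since each topic has its own schema
                process(record);
                lastOffset = record.offset();
            }
            toCommit.put(tp, new OffsetAndMetadata(lastOffset + 1));
        }
        if (!toCommit.isEmpty()) {
            consumer.commitSync(toCommit); // manual commit, in sync with processing
        }
    }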