Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-23 Thread Peter Levart

Hi John,

Sorry, I haven't had time to prepare the minimal reproducer yet. I still
plan to do it though...


On 1/22/19 8:02 PM, John Roesler wrote:

Hi Peter,

Just to follow up on the actual bug, can you confirm whether:
* when you say "restart", do you mean orderly shutdown and restart, or
crash and restart?


I start it as a SpringBoot application from IDEA and then stop it with the
red square button. It does initiate the shutdown sequence before
exiting... So I think it is stopped via SIGTERM, which triggers the JVM shutdown hook(s).



* have you tried this with EOS enabled? I can imagine some ways that there
could be duplicates, but they should be impossible with EOS enabled.


Yes, I have EOS enabled.
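
For reference, exactly-once semantics in Kafka Streams is switched on with a single config entry; a minimal fragment (other required configs such as application.id and bootstrap.servers omitted):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// ... application.id, bootstrap.servers, default serdes, etc. ...
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
```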



Thanks for your help,
-John


Regards, Peter



On Mon, Jan 14, 2019 at 1:20 PM John Roesler  wrote:


Hi Peter,

I see your train of thought, but the actual implementation of the
window store is structured differently from your mental model.
Unlike Key/Value stores, we know that the records in a window
store will "expire" on a regular schedule, and also that every single
record will eventually expire. With this in mind, we have implemented
an optimization to avoid a lot of compaction overhead in RocksDB, as
well as saving on range scans.

Instead of storing everything in one database, we open several
databases and bucket windows into them. Then, when windows
expire, we just ignore the records (i.e., the API makes them unreachable,
but we don't actually delete them). Once all the windows in a database
are expired, we just close and delete the whole database. Then, we open
a new one for new windows. If you look in the code, these databases are
called "segments".

Thus, I don't think that you should attempt to use the built-in window
stores
as you described. Instead, it should be straightforward to implement your
own StateStore with a layout that's more favorable to your desired
behavior.

You should also be able to set up the changelog the way you need.
Explicitly removed entities would get removed from the log too, if
it's a compacted log.

Actually, what you're describing is *very* similar to the implementation
for suppress. I might actually suggest that you just copy the suppression
implementation and adapt it to your needs, or at the very least, study
how it works. In doing so, you might actually discover the cause of the
bug yourself!

I hope this helps, and thanks for your help,
-John


On Sat, Jan 12, 2019 at 5:45 AM Peter Levart 
wrote:


Hi John,

Thank you very much for explaining how WindowStore works. I have some
more questions...

On 1/10/19 5:33 PM, John Roesler wrote:

Hi Peter,

Regarding retention, I was not referring to log retention, but to the
window store retention.
Since a new window is created every second (for example), there are in
principle an unbounded number of windows (the longer the application runs,
the more windows there are, with no end).
However, we obviously can't store an infinite amount of data, so the window
definition includes a retention period. By default, this is 24 hours. After
the retention period elapses, all of the data for the window is purged to
make room for new windows.

Right. Would the following work for example:

- configure retention of WindowStore to be "infinite"
- explicitly remove records from the store when windows are flushed out
- configure WindowStore log topic for compacting

Something like the following:

  Stores
      .windowStoreBuilder(
          Stores.persistentWindowStore(
              storeName,
              Duration.of(1000L, ChronoUnit.YEARS), // retentionPeriod
              Duration.ofSeconds(10),               // windowSize
              false                                 // retainDuplicates
          ),
          keySerde, valSerde
      )
      .withCachingEnabled()
      .withLoggingEnabled(
          Map.of(
              TopicConfig.CLEANUP_POLICY_CONFIG,
              TopicConfig.CLEANUP_POLICY_COMPACT
          )
      );

In the above scenario, would:

- the on-disk WindowStore be kept bounded (there could be some very old
entries in it, but the majority will be new, depending on the activity of
particular input keys)
- the log topic be kept bounded (explicitly removed entries would be
removed from the compacted log too)

I'm moving away from DSL partly because I have some problems with
suppression (which I hope we'll be able to fix) and partly because the
DSL can't give me the complicated semantics that I need for the
application at hand. I tried to capture what I need in a custom
Transformer here:

https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f

Your knowledge of how WindowStore works would greatly help me decide if
this is a workable idea.


So what I meant was that if you buffer some key "A" in window (Monday
09:00:00) and then get no further activity for A for over 24 hours, then
when you do get that next event for A, say at
(Tuesday 11

Re: Open files clogging and KafkaStreams

2019-01-23 Thread Niklas Lönn
Hi Guozhang,

I think I went a bit ahead of myself in describing the situation. I had an
attachment with the context in detail; maybe it was filtered out. Let's try
again =)

We have a topology looking something like this:

input-topic[20 partitions, compacted]
|
use-case-repartition[20 partitions, infinite retention, segment.ms=10min]
|
use-case-changelog

We have previously hit the TooManyOpenFiles issue and "solved" it by
raising the limit to something extreme.
Later we found out that we wanted replication factor 3 on all internal topics,
so we reset the app and BOOM, now we hit a too-many-memory-mapped-files limit
instead.

The input topic contains 30 days of data, where we pretty much have records
in every 10-minute window for every partition.
This means that if nothing consumes the repartition topic, we will have
6 (10-min slots) * 24 hours * 30 days * 20 partitions * 3 (.index, .log,
.timeindex files) * 3 replication factor / 5 brokers in cluster = *155.520*
open files just to have this repartition topic in place.
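
Spelled out as a quick calculation (just restating the arithmetic above):

```java
// Restating the back-of-the-envelope open-file arithmetic from the thread.
class RepartitionFileEstimate {
    static long openFilesPerBroker(long segmentsPerPartition, long partitions,
                                   long filesPerSegment, long replicationFactor,
                                   long brokers) {
        return segmentsPerPartition * partitions * filesPerSegment
                * replicationFactor / brokers;
    }

    public static void main(String[] args) {
        long segmentsPerPartition = 6L * 24 * 30; // 10-min segments, 24 h, 30 days
        // 3 files per segment (.log, .index, .timeindex), RF 3, 5 brokers
        System.out.println(openFilesPerBroker(segmentsPerPartition, 20, 3, 3, 5)); // 155520
    }
}
```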

You might say: no problem, the segments would be deleted and you would never
reach such high numbers? But that doesn't seem to be the case.
What happened in our case is that, due to how the broker multiplexes the
topic partitions for the subscribers, the streams application piled up all
the repartition records, and only when it had caught up did all the downstream
processing start taking place. I do see this as a design flaw in some
component, probably the broker. It can't be the desired behaviour. How many
open files would I need for a year of data when resetting/reprocessing an
application?

By adding more threads than input topic partitions, I managed to force the
broker to give out these records earlier, and the issue was mitigated.

Ideally, the downstream records should be processed close in time to the
source records.

Let's take one partition containing 1.000.000 records; this is the observed
behaviour I have seen (somewhat simplified):

Time   Consumer offset   Records in    Consumer offset     Records in
       input topic       input topic   repartition topic   repartition topic
00:00          0         1.000.000             0                   0
00:01    100.000         1.000.000             0             100.000
00:02    200.000         1.000.000             0             200.000
00:03    300.000         1.000.000             0             300.000
00:04    400.000         1.000.000             0             400.000
00:05    500.000         1.000.000             0             500.000
00:06    600.000         1.000.000             0             600.000
00:07    700.000         1.000.000             0             700.000
00:08    800.000         1.000.000             0             800.000
00:09    900.000         1.000.000             0             900.000
00:10  1.000.000         1.000.000             0           1.000.000
00:11  1.000.000         1.000.000       100.000           1.000.000
00:12  1.000.000         1.000.000       200.000           1.000.000
00:13  1.000.000         1.000.000       300.000           1.000.000
00:14  1.000.000         1.000.000       400.000           1.000.000
00:15  1.000.000         1.000.000       500.000           1.000.000
00:16  1.000.000         1.000.000       600.000           1.000.000
00:17  1.000.000         1.000.000       700.000           1.000.000
00:18  1.000.000         1.000.000       800.000           1.000.000
00:19  1.000.000         1.000.000       900.000           1.000.000
00:20  1.000.000         1.000.000     1.000.000           1.000.000

As you can see, there is no parallel execution, and it's because the
broker does not giv

Re: Kafka Consumer Not Assigned Partitions

2019-01-23 Thread chinchu chinchu
Hello,
I have subscribed to a Kafka topic as below. I need to run some logic
only after the consumer has been assigned a partition. However,
consumer.assignment() comes back as an empty set no matter how long I wait.
If I do not have the while loop and just do a consumer.poll(), I do get
the records from the topic. Can anyone tell me why this is happening?

consumer.subscribe(topics);

Set<TopicPartition> assigned = Collections.emptySet();
while (isAssigned) {
    assigned = consumer.assignment();
    if (!assigned.isEmpty()) {
        isAssigned = false;
    }
}

Thanks,

On Tue, Jan 22, 2019 at 2:14 PM chinchu chinchu 
wrote:

> [...]


Kafka's cluster for thousands of clients (up to 30k)

2019-01-23 Thread Denis Romanenko (Романенко Денис Владимирович)

Hi all!

We're trying to use Kafka to deliver messages to thousands of clients
(different data for each client).
We actually want behavior much like a message broker, because we have to
work with a continuous flow of small data pieces.

We suppose we should create a partition for each client. Is that right?

We are stuck with problems of the ISR infinitely shrinking/expanding and the
"FETCH_SESSION_ID_NOT_FOUND" error.


My configuration:
5 servers (8 vCPU, 16GB RAM)

20k partitions
2000 consumers
--replication-factor 2

queued.max.requests = 2000
replica.fetch.wait.max.ms = 5000
replica.lag.time.max.ms = 2
num.replica.fetchers = 5

num.network.threads=9
num.io.threads=24

--
Best Regards, Denis Romanenko
Head of ESB Development,
PJSC Magnit



Re: Open files clogging and KafkaStreams

2019-01-23 Thread Guozhang Wang
I see.

What you described is a known issue in older versions of Kafka: some
high-traffic topics in bootstrap mode may effectively "starve" other topics
in the fetch response, since brokers used to naively fill in the bytes that
meet the max.bytes configuration and return. This is fixed in version 1.1
via incremental fetch requests:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability

The basic idea is to not always request topics in the same order, like
A,B,C; instead, if the previous request asked for topics A,B,C and got all
its data from A, then the next request would be for B,C,A, etc. So if you
are on an older version of Kafka, I'd suggest you upgrade to a newer one.
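
A rough sketch of that rotation idea (invented names, not the actual broker code, which tracks per-partition fetch sessions):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Rough sketch of the rotation behind incremental fetch ordering: topics
// fully served in the previous response move to the back of the queue,
// so no single topic can starve the rest.
class FetchRotationSketch {
    private final Deque<String> order = new ArrayDeque<>();

    FetchRotationSketch(List<String> topics) {
        order.addAll(topics);
    }

    List<String> nextOrder(List<String> fullyServedLastTime) {
        for (String topic : fullyServedLastTime) {
            if (order.remove(topic)) {
                order.addLast(topic); // rotate served topic to the back
            }
        }
        return List.copyOf(order);
    }

    public static void main(String[] args) {
        FetchRotationSketch rotation = new FetchRotationSketch(List.of("A", "B", "C"));
        System.out.println(rotation.nextOrder(List.of("A"))); // [B, C, A]
    }
}
```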

If you cannot upgrade at the moment, another suggestion, as I mentioned
above, is to change the segment sizes so that you have much larger, and
hence fewer, segment files.

Guozhang


On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn  wrote:

> [...]

Error deserializing Avro message when using SpecificSerde

2019-01-23 Thread chinchu chinchu
Hey folks,
I am getting the below error when reading data from a Kafka topic. I
serialized this data with the Confluent serializers, but when trying to
consume it using the Confluent deserializer I run into the below error.
Any idea what the issue could be here? Also, how do I skip this record and
read the next one?


10:17:32.924 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=consumer-1, groupId=cgroupId9] Sending Heartbeat request to
coordinator xxx.com:9092 (id: 2147483645 rack: null)
10:17:33.144 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=consumer-1, groupId=cgroupId9] Received successful Heartbeat
response
10:17:34.872 [pool-1-thread-1] DEBUG
io.confluent.kafka.schemaregistry.client.rest.RestService - Sending GET
with input null to http://xxx.yy.com:8081/schemas/ids/321
10:17:35.983 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=consumer-1, groupId=cgroupId9] Sending Heartbeat request to
coordinator xxx.com:9092 (id: 2147483645 rack: null)
10:17:36.203 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=consumer-1, groupId=cgroupId9] Received successful Heartbeat
response
10:17:39.039 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=consumer-1, groupId=cgroupId9] Sending Heartbeat request to
coordinator xxx.com:9092 (id: 2147483645 rack: null)
10:17:39.257 [kafka-coordinator-heartbeat-thread | cgroupId9] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
clientId=consumer-1, groupId=cgroupId9] Received successful Heartbeat
response

org.apache.kafka.common.errors.SerializationException: Error deserializing
key/value for partition Logs-0 at offset 25106200.
If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 321
Caused by: org.apache.kafka.common.errors.SerializationException: Could not
find class com.test.model.avro.Log specified in writer's schema whilst
finding reader's schema for a SpecificRecord.


//Consumer Configs:
Properties props = new Properties();
props.put("bootstrap.servers", "xxx:9092,yyy:9092");
props.put("group.id", groupId);
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "3");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://xxx:8081";);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("max.poll.records", "100");


Re: Kafka Consumer Not Assigned Partitions

2019-01-23 Thread Matthias J. Sax
Calling `consumer.subscribe()` is a local call. Only when you call
`consumer.poll()` does the consumer connect to the broker and get its
assignment. Thus, it's safe to call `poll()` directly.

`assignment()` will return the assignment only after the first `poll()`
call.


-Matthias

On 1/23/19 9:00 AM, chinchu chinchu wrote:
> [...]





Re: Error deserializing Avro message when using SpecificSerde

2019-01-23 Thread chinchu chinchu
My application did not have the com.test.model.avro.Log class at run
time. The log messages indicated this; it was just my oversight.

On Wed, Jan 23, 2019 at 10:28 AM chinchu chinchu 
wrote:

> [...]


Re: Kafka Consumer Not Assigned Partitions

2019-01-23 Thread chinchu chinchu
Thanks Matthias.

On Wed, Jan 23, 2019 at 10:55 AM Matthias J. Sax 
wrote:

> [...]
>
>


Re: Open files clogging and KafkaStreams

2019-01-23 Thread Niklas Lönn
I have to double-check what version of the broker we run in production, but
when testing and verifying the issue locally I reproduced it with both
broker and client on version 2.1.0

Kind regards
Niklas

On Wed 23. Jan 2019 at 18:24, Guozhang Wang  wrote:

> [...]

Kafka Hdfs Connect Flush Size

2019-01-23 Thread chinchu chinchu
Hey folks,
I have been going through the HDFS connector code, and I have one question.

Is the flush size in the connector config the number of records read from a
Kafka partition, or the number of records written to an HDFS path?
It looks like the recordCounter in TopicPartitionWriter is incremented for
every record received from a Kafka partition.
In that case, how does the connector handle records from the same Kafka
partition that go to two different HDFS paths, if flush size is at the
HDFS file level?
After looking through the code and running the TopicPartitionWriter test
cases, I think that the flush size is the number of records written to HDFS
from a Kafka partition. I ran the test case
testWriteRecordFieldPartitioner() in TopicPartitionWriterTest and saw the
same. Can someone clarify whether my understanding is right?

https://docs.confluent.io/current/connect/kafka-connect-hdfs/configuration_options.html


flush.size
Number of records written to store before invoking file commits.

Type: int
Importance: high
Thanks,
Chinchu
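
If the understanding above is right (the counter is per Kafka partition, so one counter can trigger commits of files on several HDFS paths), the behaviour could be sketched like this; a hypothetical model of the described counter, not the actual connector code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the behaviour described in the question: the
// record counter is per Kafka partition, so reaching flush.size commits
// every open HDFS file for that partition, regardless of path.
class PartitionWriterSketch {
    final int flushSize;
    int recordCounter = 0; // per Kafka partition, not per HDFS file
    final Map<String, List<String>> openFiles = new HashMap<>(); // hdfsPath -> buffered records
    int commits = 0;

    PartitionWriterSketch(int flushSize) {
        this.flushSize = flushSize;
    }

    void write(String hdfsPath, String record) {
        openFiles.computeIfAbsent(hdfsPath, p -> new ArrayList<>()).add(record);
        recordCounter++;
        if (recordCounter >= flushSize) {
            // commit every open file for this partition at once
            commits += openFiles.size();
            openFiles.clear();
            recordCounter = 0;
        }
    }

    public static void main(String[] args) {
        PartitionWriterSketch writer = new PartitionWriterSketch(3);
        writer.write("/logs/date=2019-01-22", "r1");
        writer.write("/logs/date=2019-01-23", "r2");
        writer.write("/logs/date=2019-01-22", "r3"); // counter reaches 3
        System.out.println(writer.commits); // 2 -> one counter committed two paths
    }
}
```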


Re: Open files clogging and KafkaStreams

2019-01-23 Thread Guozhang Wang
I see (btw, attachments are usually not allowed on the AK mailing list, but
if you have it somewhere like a gist and can share the URL, that works).

Could you let me know how many physical cores you have in total hosting
your app, and how many threads you configured? From your current
description there should be at least 40 tasks (20 reading from the source
topics and writing to the repartition topics, and 20 reading from the
repartition topics), and I'd like to know how these tasks are assigned to
threads, and how many threads may execute in parallel on the hardware.


Guozhang


On Wed, Jan 23, 2019 at 1:21 PM Niklas Lönn  wrote:

> [...]

Single consumer subscribing to multiple topics

2019-01-23 Thread Manu Jacob
Hi Experts,

I have a requirement to consume records from multiple topics and process those
messages with a similar pattern. The schema of the messages in each topic is
different, and I am consuming them as generic records. Both topics have a very
high rate of traffic (10-15K messages/second) and would have 50 partitions each
(with the partition key being fairly evenly distributed). The offsets are
committed manually for these topics to keep them in sync with the processing
checkpoints. I was wondering whether I should use a single consumer subscribing
to multiple topics or a separate consumer for each topic. Are there any
considerable performance implications of consuming from multiple topics with a
single consumer? I could not find any guidelines or much documentation on this.
I would appreciate any guidance.

Thanks,
-Manu