Re: Producer request latency increase after client 0.10 upgrade

2016-09-05 Thread Ismael Juma
Hi Yifan,

Just to make sure I understand, the comparison is between 0.8.2.1 and
0.10.0.0 clients (both) connecting to a 0.10.0.0 cluster? We haven't
received reports of increased average producer latency and I'm not aware of
a change that could cause such a dramatic difference. Is compression being
used and did you paste the full producer config?

Ismael

On Fri, Sep 2, 2016 at 6:31 PM, Yifan Ying  wrote:

> The load before and after the upgrade is pretty similar, and all these
> configs stayed the same too.
>
> Also, please correct me if I am wrong: I think request latency =
> response received time - request created time, and request created time is
> set when the records are ready to be sent, after 'lingering'.
>
> Yifan
>
> On Thu, Sep 1, 2016 at 11:01 PM, Gerard Klijs 
> wrote:
>
> > With a linger of 5 seconds, 2-3 seconds would make sense when the load is
> > smaller. Are you sure the measurements with 0.8.2.1 were taken under the
> > same load, and/or that linger worked correctly there?
> >
> > On Fri, Sep 2, 2016 at 1:12 AM Yifan Ying  wrote:
> >
> > > We tried to upgrade the Kafka clients dependency from 0.8.2.1 to
> > 0.10.0.0.
> > > As I understand, the producer client doesn't have major changes in
> 0.10,
> > so
> > > we kept the same producer config in the upgrade:
> > >
> > > retries 3
> > > retry.backoff.ms 5000
> > > timeout.ms 1
> > > block.on.buffer.full false
> > > linger.ms 5000
> > > metadata.fetch.timeout.ms 3
> > >
> > > However, after the upgrade, the avg producer request latency increased
> > > by 5×, from a few hundred milliseconds to 2-3 seconds. Has anyone seen
> > > this issue? Is there any change in 0.9/0.10 that would cause this?
> > >
> > > Thanks!
> > >
> > > --
> > > Yifan
> > >
> >
>
>
>
> --
> Yifan
>


Re: Using kafka as a "message bus" for an event store

2016-09-05 Thread Tom Crayford
inline

On Mon, Sep 5, 2016 at 12:00 AM, F21  wrote:

> Hi all,
>
> I am currently looking at using Kafka as a "message bus" for an event
> store. I plan to have all my events written into HBase for permanent
> storage and then have a reader/writer that reads from HBase to push them
> into kafka.


> In terms of kafka, I plan to set it to keep all messages indefinitely.
> That way, if any consumers need to rebuild their views or if new consumers
> are created, they can just read from the stream to rebuild the views.
>

Kafka isn't designed at all for permanent message storage, except for
compacted topics. I suggest you rethink this, unless compacted topics work
for you (Kafka is not designed to keep unbounded amounts of data for
unbounded amounts of time, simply to provide messaging and replay over
short, bounded windows).
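Tom's caveat about compacted topics can be made concrete: compaction retains only the latest record per key, so a compacted topic only works as permanent storage when the current value per key is all consumers ever need to rebuild. A toy Python model of compaction (not Kafka code):

```python
def compact(log):
    """Model of Kafka log compaction: keep only the last record per key,
    preserving the order in which surviving records appear in the log."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)
    # Survivors keep their original relative order in the log.
    return [(key, value) for key, (offset, value) in
            sorted(latest.items(), key=lambda kv: kv[1][0])]

log = [("cust-1", "created"), ("cust-2", "created"),
       ("cust-1", "updated"), ("cust-1", "deleted")]
print(compact(log))  # only the last event per key survives
```

Intermediate events (like "created" then "updated" for cust-1) are gone after compaction, which is why an event-sourced view rebuild cannot rely on it.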


>
> I plan to use domain-driven design and will use the concept of aggregates
> in the system. An example of an aggregate might be a customer. All events
> for a given aggregate needs to be delivered in order. In the case of kafka,
> I would need to over partition the system by a lot, as any changes in the
> number of partitions could result in messages that were bound for a given
> partition being pushed into a newly created partition. Are there any issues
> if I create a new partition every time an aggregate is created? In a system
> with a large amount of aggregates, this will result in millions or hundreds
> of millions of partitions. Will this cause performance issues?
>

Yes.

Kafka is designed to support hundreds to thousands of partitions per
machine, not millions (and there is an upper bound per cluster which is
well below one million). I suggest you rethink this and likely use a
standard "hash based partitioning" scheme.
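The "hash based partitioning" Tom suggests gives per-aggregate ordering without a partition per aggregate: pick a fixed partition count up front and hash the aggregate ID to choose a partition, so all events for one aggregate land on one partition in order. A sketch in Python (Kafka's Java client defaults to murmur2 on the key bytes; crc32 is used here only to keep the sketch stdlib-only):

```python
import zlib

NUM_PARTITIONS = 50  # sized for throughput up front, not one per aggregate

def partition_for(aggregate_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Any stable hash of the key works; Kafka's default partitioner
    # uses murmur2, crc32 stands in here to stay stdlib-only.
    return zlib.crc32(aggregate_id.encode("utf-8")) % num_partitions

# Every event for the same aggregate maps to the same partition,
# so per-aggregate ordering is preserved.
p = partition_for("customer-42")
assert all(partition_for("customer-42") == p for _ in range(100))
print(f"customer-42 -> partition {p}")
```

Millions of aggregates then share a few dozen or hundred partitions, which is within what the brokers are designed for.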


>
> Cheers,
>
> Francis
>
>


Re: Producer request latency increase after client 0.10 upgrade

2016-09-05 Thread Ismael Juma
One thing that changed in 0.9.0.0 is the introduction of `max.block.ms`,
which is 60 seconds by default. Since you have "block.on.buffer.full=false",
you may want to tweak `max.block.ms`.
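For readers making the same migration, the relationship between the old and new settings can be sketched as below. This reflects my reading of the 0.9 producer changes (KIP-19, as I recall) and should be verified against the config docs for your exact version:

```python
# Deprecated 0.8.x new-producer settings and the 0.9+ configs that
# replaced them (illustrative notes, not authoritative documentation).
old_to_new = {
    "block.on.buffer.full":      ("max.block.ms", "send() now blocks up to this long; 60000 ms default"),
    "metadata.fetch.timeout.ms": ("max.block.ms", "metadata waits fall under the same bound"),
    "timeout.ms":                ("request.timeout.ms", "how long to await a broker ack"),
}

for old, (new, note) in sorted(old_to_new.items()):
    print(f"{old:26s} -> {new:20s}  {note}")
```

The practical upshot of Ismael's point: with block.on.buffer.full gone, a producer that used to fail fast on a full buffer may now block for up to max.block.ms instead.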

Ismael

On Mon, Sep 5, 2016 at 1:54 PM, Ismael Juma  wrote:

> Hi Yifan,
>
> Just to make sure I understand, the comparison is between 0.8.2.1 and
> 0.10.0.0 clients (both) connecting to a 0.10.0.0 cluster? We haven't
> received reports of increased average producer latency and I'm not aware of
> a change that could cause such a dramatic difference. Is compression being
> used and did you paste the full producer config?
>
> Ismael
>
> On Fri, Sep 2, 2016 at 6:31 PM, Yifan Ying  wrote:
>
>> The load before and after the upgrade is pretty similar, and all these
>> configs stayed the same too.
>>
>> Also, please correct me if I am wrong: I think request latency =
>> response received time - request created time, and request created time is
>> set when the records are ready to be sent, after 'lingering'.
>>
>> Yifan
>>
>> On Thu, Sep 1, 2016 at 11:01 PM, Gerard Klijs 
>> wrote:
>>
>> > With a linger of 5 seconds, 2-3 seconds would make sense when the load
>> > is smaller. Are you sure the measurements with 0.8.2.1 were taken under
>> > the same load, and/or that linger worked correctly there?
>> >
>> > On Fri, Sep 2, 2016 at 1:12 AM Yifan Ying  wrote:
>> >
>> > > We tried to upgrade the Kafka clients dependency from 0.8.2.1 to
>> > 0.10.0.0.
>> > > As I understand, the producer client doesn't have major changes in
>> 0.10,
>> > so
>> > > we kept the same producer config in the upgrade:
>> > >
>> > > retries 3
>> > > retry.backoff.ms 5000
>> > > timeout.ms 1
>> > > block.on.buffer.full false
>> > > linger.ms 5000
>> > > metadata.fetch.timeout.ms 3
>> > >
>> > > However, after the upgrade, the avg producer request latency increased
>> > > by 5×, from a few hundred milliseconds to 2-3 seconds. Has anyone seen
>> > > this issue? Is there any change in 0.9/0.10 that would cause this?
>> > >
>> > > Thanks!
>> > >
>> > > --
>> > > Yifan
>> > >
>> >
>>
>>
>>
>> --
>> Yifan
>>
>
>


Re: Issue adding server (0.10.0.0)

2016-09-05 Thread Wannes De Smet
Hi all

We keep having this issue. After increasing the fetch threads, we cleared
the entire cluster, upgraded to 0.10.0.1, started all nodes, and all was
well. We cannot reduce the fetch size, as it is equal to our
max.message.size. Increasing the number of replica threads further raised
memory usage too much, causing countless out-of-heap-space / out-of-direct-
buffer-memory exceptions. We have now set two fetch threads, which leaves
some headroom. Our full config is pasted below [2].
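The memory pressure described above is roughly predictable: each partition in a replica fetch response may carry up to the configured fetch size, so total buffering scales with partition count (extra fetcher threads mostly add concurrency and more in-flight responses). A back-of-envelope sketch with illustrative numbers (none of these figures are from the thread):

```python
def worst_case_fetch_buffer(partitions: int, fetch_max_bytes: int) -> int:
    # Each partition in a fetch response may return up to fetch_max_bytes,
    # so the worst-case buffering bound grows with partition count.
    return partitions * fetch_max_bytes

# Hypothetical: 1000 replicated partitions with a 100 MiB fetch size.
bound = worst_case_fetch_buffer(1000, 100 * 1024 * 1024)
print(f"~{bound / 2**30:.0f} GiB worst case")
```

This is why a replica fetch size pinned to a large max message size becomes expensive as partition counts grow.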

Today, to make sure this issue was resolved we tried adding a fourth server
to the cluster, and then reassigned all partitions. Unfortunately, the
fourth node will not sync up. This is a snippet from its log file:

...
[2016-09-05 16:13:52,296] WARN [ReplicaFetcherThread-0-2], Error in fetch
kafka.server.ReplicaFetcherThread$FetchRequest@318e6f11
(kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field
'responses': Error reading field 'partition_responses': Error reading field
'record_set': Error reading bytes of size 104856899, only 19862997 bytes
available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
at
org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
at
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at
kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:136)
at
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
at
kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
at
kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-05 16:13:58,227] WARN [ReplicaFetcherThread-0-0], Error in fetch
kafka.server.ReplicaFetcherThread$FetchRequest@5dfb502b
(kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 0 was disconnected before the response
was read
at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
at scala.Option.foreach(Option.scala:257)
at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
at
kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
at
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
at
kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
at
kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-05 16:14:03,228] WARN [ReplicaFetcherThread-1-0], Error in fetch
kafka.server.ReplicaFetcherThread$FetchRequest@1831418c
(kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 0 was disconnected before the response
was read
at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
at scala.Option.foreach(Option.scala:257)
at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
at
kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
at
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
at
kafka.utils.NetworkClientBlockingOps$.blockingSendAndR

Kafka related queries

2016-09-05 Thread Gauri Padbidri
Hi All,

 

I am very new to Kafka. I am still on the verge of deciding between Kafka
and Kinesis for our requirement. It may take me more time to come up with a
foolproof answer, hence I thought of parallelizing my search. Following are
a few queries around Kafka:

 

- Does Kafka support sending streaming records over raw TCP?

- Does Kafka expect the packet to be Base64 encoded?

- How much egress and ingress capacity does Kafka support per partition?

- We would be using Kafka ONLY for data ingest; do read replicas exist
for partitions, and do they get replicated to other nodes in the Kafka
cluster?

 

Having these queries answered will help me a lot to freeze on Kafka. Look
forward to your quick response. Thanks !

 

Best Regards,

Gauri Padbidri



After restart Kafka does not see Topics

2016-09-05 Thread Vasilij Syc
Hello,
I am running Kafka 0.9.0.1 with 1 broker and around 5 topics, each with 1
partition. I manage Kafka via the KafkaManager web UI tool. Periodically
something happens with Kafka and it stops seeing topics, despite the
process running (without errors) and the topics being available on disk.
In order to get everything back, I have to go via KafkaManager and add the
cluster and topics again; only then does my publisher see and publish
messages.

In the error logs I can see only the following warnings:

[2016-09-05 13:46:58,617] WARN Found a corrupted index file,
/usr/local/share/kafka-logs/lesson_finish_stream_production-0/2331.index,
deleting and rebuilding index... (kafka.log.Log)


Has anyone had the same issue? How can I overcome it?

Thank you.
Sincerely,
Vasilij


How to set the offset of a topic:partition for a specific consumer group to replay / reconsume messages?

2016-09-05 Thread Vincent Dautremont
Hi,
This seems like a basic question but I can't find the answer:

I'm trying to find the right tool (in kafka/bin) to set an offset value of
a topic:partition for a specific consumer group, in order to replay
consumed messages.

This link tells how to get the offset of the topic:partition of a
consumer group:
http://kafka.apache.org/documentation.html#basic_ops_consumer_lag
I now want to set it.
Note that this is with the high-level consumer, with consumer group
offsets stored in Kafka (not ZK).
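As of 0.10.0 there is no shipped CLI for overwriting a Kafka-stored group offset (kafka-consumer-groups.sh can describe lag but not set it; offset-reset tooling arrived in a later release, if I recall correctly). The usual workaround is a small program using the new consumer, run while the real group is stopped: assign() the partition, seek() to the target offset, commit, and exit. A Python sketch of that pattern against a stand-in consumer object (the real thing would use a Kafka client library exposing the same calls):

```python
class StubConsumer:
    """Stand-in with the same assign/seek/commit surface as the new consumer."""
    def __init__(self, group_id):
        self.group_id = group_id
        self.committed = {}   # (topic, partition) -> committed offset
        self._position = {}
    def assign(self, topic_partitions):
        self._assigned = list(topic_partitions)
    def seek(self, tp, offset):
        self._position[tp] = offset
    def commit_sync(self):
        self.committed.update(self._position)
    def close(self):
        pass

def rewind_group(consumer, topic, partition, target_offset):
    tp = (topic, partition)
    consumer.assign([tp])             # manual assignment: no rebalance protocol
    consumer.seek(tp, target_offset)  # move the in-memory position...
    consumer.commit_sync()            # ...and persist it as the group's offset
    consumer.close()

c = StubConsumer(group_id="my-group")
rewind_group(c, "orders", 0, 1200)
assert c.committed[("orders", 0)] == 1200
```

Run with the same group.id as the real consumers, while they are stopped, so the committed offset is not overwritten by a live member.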

Thank you,
Vincent.


Kafka Streams: joins without windowing (KStream) and without being KTables

2016-09-05 Thread Guillermo Lammers Corral
Hi,

I've been thinking how to solve with Kafka Streams one of my business
process without success for the moment. Hope someone can help me.

I am reading from two topics events like that (I'll simplify the problem at
this point):

ObjectX
Key: String
Value: String

ObjectY
Key: String
Value: String

I want to do some kind of "join" for all events without windowing but also
without being KTables...

Example:

==

ObjectX("0001", "a") -> TopicA

Expected output TopicResult:

nothing

==

ObjectX("0001", "b") -> Topic A

Expected output TopicResult:

nothing

==

ObjectY("0001", "d") -> Topic B:

Expected output TopicResult:

ObjectZ("0001", ("a", "d"))
ObjectZ("0001", ("b", "d"))

==

==

ObjectY("0001", "e") -> Topic B:

Expected output TopicResult:

ObjectZ("0001", ("a", "e"))
ObjectZ("0001", ("b", "e"))

==

TopicResult at the end:

ObjectZ("0001", ("a", "d"))
ObjectZ("0001", ("b", "d"))
ObjectZ("0001", ("a", "e"))
ObjectZ("0001", ("b", "e"))

==

I think I can't use a KTable-KTable join because I want to match all the
events from the beginning of time. Likewise, I can't use a KStream-KStream
join because it forces me to use windowing. Same for KStream-KTable join...
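The behaviour in the example is exactly that of an unwindowed stream-stream join: each arriving record joins against everything seen so far on the other side. A toy Python model reproducing the expected TopicResult above (note the per-key state only ever grows, which is the core objection raised in the replies):

```python
from collections import defaultdict

seen_x = defaultdict(list)  # key -> all ObjectX values seen so far
seen_y = defaultdict(list)  # key -> all ObjectY values seen so far

def on_x(key, value):
    seen_x[key].append(value)
    return [(key, (value, y)) for y in seen_y[key]]

def on_y(key, value):
    seen_y[key].append(value)
    return [(key, (x, value)) for x in seen_x[key]]

out = []
out += on_x("0001", "a")   # nothing: no Y side yet
out += on_x("0001", "b")   # nothing
out += on_y("0001", "d")   # joins against both buffered X values
out += on_y("0001", "e")
print(out)
```

The output matches the TopicResult listed above, which confirms the required semantics are "join against all history", not a windowed join.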

Any expert using Kafka Streams could help me with some tips?

Thanks in advance.


Re: Kafka related queries

2016-09-05 Thread Marko Bonaći
Hi Gauri,
I'll take a stab at your questions (others will undoubtedly correct me if
I'm wrong).

Kafka does work over TCP (what else :)
No format expectations (binary).
End to end latency depends on various parameters, like network, memory, but
it is (if you consume straight away; i.e. no consumer lag; i.e. lag can fit
in the OS page cache so you're not even hitting disk when consuming)
measured in low 10s of ms.
No read replicas. You only read from partition's master. I.e. replicas are
used to achieve redundancy.


Marko Bonaći
Monitoring | Alerting | Anomaly Detection | Centralized Log Management
Solr & Elasticsearch Support
Sematext  | Contact


On Mon, Sep 5, 2016 at 9:17 AM, Gauri Padbidri 
wrote:

> Hi All,
>
>
>
> I am very new to Kafka. I am still on the verge of deciding between Kafka
> and Kinesis for our requirement. It may take me more time to come up with
> a foolproof answer, hence I thought of parallelizing my search. Following
> are a few queries around Kafka:
>
>
>
> - Does Kafka support sending streaming records over raw TCP?
>
> - Does Kafka expect the packet to be Base64 encoded?
>
> - How much egress and ingress capacity does Kafka support per partition?
>
> - We would be using Kafka ONLY for data ingest; do read replicas exist
> for partitions, and do they get replicated to other nodes in the Kafka
> cluster?
>
>
>
> Having these queries answered will help me a lot to freeze on Kafka. Look
> forward to your quick response. Thanks !
>
>
>
> Best Regards,
>
> Gauri Padbidri
>
>


Re: Kafka related queries

2016-09-05 Thread Marko Bonaći
BTW regarding latency:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

Marko Bonaći
Monitoring | Alerting | Anomaly Detection | Centralized Log Management
Solr & Elasticsearch Support
Sematext  | Contact


On Mon, Sep 5, 2016 at 7:13 PM, Marko Bonaći 
wrote:

> Hi Gauri,
> I'll take a stab at your questions (others will undoubtedly correct me if
> I'm wrong).
>
> Kafka does work over TCP (what else :)
> No format expectations (binary).
> End to end latency depends on various parameters, like network, memory,
> but it is (if you consume straight away; i.e. no consumer lag; i.e. lag can
> fit in the OS page cache so you're not even hitting disk when consuming)
> measured in low 10s of ms.
> No read replicas. You only read from partition's master. I.e. replicas are
> used to achieve redundancy.
>
>
> Marko Bonaći
> Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> Solr & Elasticsearch Support
> Sematext  | Contact
> 
>
> On Mon, Sep 5, 2016 at 9:17 AM, Gauri Padbidri  > wrote:
>
>> Hi All,
>>
>>
>>
>> I am very new to Kafka. I am still on the verge of deciding between
>> Kafka and Kinesis for our requirement. It may take me more time to come
>> up with a foolproof answer, hence I thought of parallelizing my search.
>> Following are a few queries around Kafka:
>>
>>
>>
>> - Does Kafka support sending streaming records over raw TCP?
>>
>> - Does Kafka expect the packet to be Base64 encoded?
>>
>> - How much egress and ingress capacity does Kafka support per partition?
>>
>> - We would be using Kafka ONLY for data ingest; do read replicas exist
>> for partitions, and do they get replicated to other nodes in the Kafka
>> cluster?
>>
>>
>>
>> Having these queries answered will help me a lot to freeze on Kafka. Look
>> forward to your quick response. Thanks !
>>
>>
>>
>> Best Regards,
>>
>> Gauri Padbidri
>>
>>
>


Re: Kafka Streams: joins without windowing (KStream) and without being KTables

2016-09-05 Thread Matthias J. Sax
Hey,

are you sure you want to join everything? This will result in a huge
memory footprint for your application. You are right that you cannot use
KTable; however, windowed KStream joins would work -- you only need to
specify a huge window (i.e., use Long.MAX_VALUE; this will effectively be
"infinitely large") so that all data falls into a single window.

The issue will be that all data will be buffered in memory, thus, if
your application runs for very long, it will eventually fail (I would
assume). Thus, again my initial question: are you sure you want to join
everything? (It's stream processing, not batch processing...)

If the answer is still yes, and you hit a memory issue, you will need to
fall back to the Processor API instead of the DSL to spill data to disk
if it does not fit into memory anymore (i.e., you will need to implement
your own version of a symmetric hash join that spills to disk). Of
course, the disk usage will also be huge. Eventually, your disk might
also become too small...
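A symmetric hash join keeps one hash table per input and, on each arrival, inserts into its own side's table and probes the other side's. Spilling to disk then just means backing the tables with a disk store. A minimal sketch using the stdlib's shelve as the disk-backed store (illustrative only; a production version would need partitioned spill files, proper serialization, and cleanup):

```python
import os
import shelve
import tempfile

class SymmetricHashJoin:
    def __init__(self, spill_dir):
        # Disk-backed hash tables, one per side, so state can exceed RAM.
        self.left = shelve.open(os.path.join(spill_dir, "left"))
        self.right = shelve.open(os.path.join(spill_dir, "right"))

    def _insert_and_probe(self, build, probe, key, value, make_pair):
        build[key] = build.get(key, []) + [value]  # remember on the build side
        return [make_pair(value, other) for other in probe.get(key, [])]

    def on_left(self, key, value):
        return self._insert_and_probe(self.left, self.right, key, value,
                                      lambda l, r: (key, (l, r)))

    def on_right(self, key, value):
        return self._insert_and_probe(self.right, self.left, key, value,
                                      lambda r, l: (key, (l, r)))

    def close(self):
        self.left.close()
        self.right.close()

d = tempfile.mkdtemp()
j = SymmetricHashJoin(d)
assert j.on_left("0001", "a") == []                      # no match yet
assert j.on_right("0001", "d") == [("0001", ("a", "d"))]  # probes the left side
j.close()
```

As the reply notes, the disk footprint still grows without bound; spilling only moves the problem from RAM to disk.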

Can you clarify why you want to join everything? This does not sound
like a good idea. Very large windows are manageable, but "infinite"
windows are very problematic in stream processing.


-Matthias


On 09/05/2016 06:25 PM, Guillermo Lammers Corral wrote:
> Hi,
> 
> I've been thinking how to solve with Kafka Streams one of my business
> process without success for the moment. Hope someone can help me.
> 
> I am reading from two topics events like that (I'll simplify the problem at
> this point):
> 
> ObjectX
> Key: String
> Value: String
> 
> ObjectY
> Key: String
> Value: String
> 
> I want to do some kind of "join" for all events without windowing but also
> without being KTables...
> 
> Example:
> 
> ==
> 
> ObjectX("0001", "a") -> TopicA
> 
> Expected output TopicResult:
> 
> nothing
> 
> ==
> 
> ObjectX("0001", "b") -> Topic A
> 
> Expected output TopicResult:
> 
> nothing
> 
> ==
> 
> ObjectY("0001", "d") -> Topic B:
> 
> Expected output TopicResult:
> 
> ObjectZ("0001", ("a", "d"))
> ObjectZ("0001", ("b", "d"))
> 
> ==
> 
> ==
> 
> ObjectY("0001", "e") -> Topic B:
> 
> Expected output TopicResult:
> 
> ObjectZ("0001", ("a", "e"))
> ObjectZ("0001", ("b", "e"))
> 
> ==
> 
> TopicResult at the end:
> 
> ObjectZ("0001", ("a", "d"))
> ObjectZ("0001", ("b", "d"))
> ObjectZ("0001", ("a", "e"))
> ObjectZ("0001", ("b", "e"))
> 
> ==
> 
> > I think I can't use a KTable-KTable join because I want to match all the
> > events from the beginning of time. Likewise, I can't use a KStream-KStream
> > join because it forces me to use windowing. Same for KStream-KTable join...
> 
> Any expert using Kafka Streams could help me with some tips?
> 
> Thanks in advance.
> 





Authorization with Topic Wildcards

2016-09-05 Thread Derar Alassi
Hi all,

Although the documentation mentions that one can use wildcards with topic
ACLs, I couldn't get that to work. Essentially, I want to set an Allow
Read/Write ACL on topics com.domain.xyz.* to a certain user. This would
give this user Read/Write access to topics com.domain.xyz.abc and
com.domain.xyz.def .

I set an ACL using this command:
./kafka-acls.sh --authorizer-properties zookeeper.connect=
--add --allow-principal User:"user01"   --topic com.domain.xyz.* --group
group01 --operation read

When I try to consume from the topic com.domain.xyz.abc  using the same
user ID and group, I get NOT_AUTHORIZED error.

Anything I am missing?

Thanks,
Derar


Re: Authorization with Topic Wildcards

2016-09-05 Thread Tom Crayford
If you're running that in bash or a similar shell, you need to quote the
"*" so that bash doesn't expand it as a glob:

./kafka-acls.sh --authorizer-properties zookeeper.connect=
--add --allow-principal User:"user01"   --topic 'com.domain.xyz.*' --group
group01 --operation read

It may be instructive to look at what data is in zookeeper for the acls to
debug this.
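The quoting issue is easy to reproduce: bash expands an unquoted * against filenames in the current directory before kafka-acls.sh ever runs, so the tool can receive a matching filename instead of the literal pattern. A Python illustration of what the shell does (the filenames here are hypothetical):

```python
import fnmatch

# Pretend these files sit in the directory where the command is run.
files_in_cwd = ["com.domain.xyz.notes.txt", "README.md"]

pattern = "com.domain.xyz.*"
# Unquoted in bash: if anything in the cwd matches, the word is replaced
# by the matching filenames before the command starts.
expansion = fnmatch.filter(files_in_cwd, pattern) or [pattern]
print(expansion)  # the ACL tool would see a filename, not the pattern

# Quoted ('com.domain.xyz.*'): the literal pattern is passed through intact.
```

If nothing in the cwd matches, bash passes the word through unchanged, which is why the symptom can appear and disappear depending on where the command is run.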

On Mon, Sep 5, 2016 at 7:38 PM, Derar Alassi  wrote:

> Hi all,
>
> Although the documentation mentions that one can use wildcards with topic
> ACLs, I couldn't get that to work. Essentially, I want to set an Allow
> Read/Write ACL on topics com.domain.xyz.* to a certain user. This would
> give this user Read/Write access to topics com.domain.xyz.abc and
> com.domain.xyz.def .
>
> I set an ACL using this command:
> ./kafka-acls.sh --authorizer-properties zookeeper.connect=
> --add --allow-principal User:"user01"   --topic com.domain.xyz.* --group
> group01 --operation read
>
> When I try to consume from the topic com.domain.xyz.abc  using the same
> user ID and group, I get NOT_AUTHORIZED error.
>
> Anything I am missing?
>
> Thanks,
> Derar
>


micro-batching in kafka streams

2016-09-05 Thread Ara Ebrahimi
Hi,

What’s the best way to do micro-batching in Kafka Streams? Any plans for a 
built-in mechanism? Perhaps StateStore could act as the buffer? What exactly 
are ProcessorContext.schedule()/punctuate() for? They don’t seem to be used 
anywhere?

http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/

Ara.





This message is for the designated recipient only and may contain privileged, 
proprietary, or otherwise confidential information. If you have received it in 
error, please notify the sender immediately and delete the original. Any other 
use of the e-mail by you is prohibited. Thank you in advance for your 
cooperation.




Re: Authorization with Topic Wildcards

2016-09-05 Thread Derar Alassi
Yes, I am running it from the command line. ZooKeeper has com.domain.xyz.*
under the /kafka-acl node, so it looks like it is being added correctly. I
actually allowed some time for ACL propagation to the Kafka brokers.



On Mon, Sep 5, 2016 at 11:42 AM, Tom Crayford  wrote:

> If you're running that in bash or a similar shell, you need to quote the
> "*" so that bash doesn't expand it as a glob:
>
> ./kafka-acls.sh --authorizer-properties zookeeper.connect=
> --add --allow-principal User:"user01"   --topic 'com.domain.xyz.*' --group
> group01 --operation read
>
> It may be instructive to look at what data is in zookeeper for the acls to
> debug this.
>
> On Mon, Sep 5, 2016 at 7:38 PM, Derar Alassi 
> wrote:
>
> > Hi all,
> >
> > Although the documentation mentions that one can use wildcards with topic
> > ACLs, I couldn't get that to work. Essentially, I want to set an Allow
> > Read/Write ACL on topics com.domain.xyz.* to a certain user. This would
> > give this user Read/Write access to topics com.domain.xyz.abc and
> > com.domain.xyz.def .
> >
> > I set an ACL using this command:
> > ./kafka-acls.sh --authorizer-properties zookeeper.connect=
> > --add --allow-principal User:"user01"   --topic com.domain.xyz.* --group
> > group01 --operation read
> >
> > When I try to consume from the topic com.domain.xyz.abc  using the same
> > user ID and group, I get NOT_AUTHORIZED error.
> >
> > Anything I am missing?
> >
> > Thanks,
> > Derar
> >
>


Re: Authorization with Topic Wildcards

2016-09-05 Thread Ismael Juma
Hi Derar,

The support for wildcards is limited to `*` at this point. Sorry for the
confusion. If you're interested to submit a PR to clarify the
documentation, that would be great. :)

Ismael

On Mon, Sep 5, 2016 at 7:38 PM, Derar Alassi  wrote:

> Hi all,
>
> Although the documentation mentions that one can use wildcards with topic
> ACLs, I couldn't get that to work. Essentially, I want to set an Allow
> Read/Write ACL on topics com.domain.xyz.* to a certain user. This would
> give this user Read/Write access to topics com.domain.xyz.abc and
> com.domain.xyz.def .
>
> I set an ACL using this command:
> ./kafka-acls.sh --authorizer-properties zookeeper.connect=
> --add --allow-principal User:"user01"   --topic com.domain.xyz.* --group
> group01 --operation read
>
> When I try to consume from the topic com.domain.xyz.abc  using the same
> user ID and group, I get NOT_AUTHORIZED error.
>
> Anything I am missing?
>
> Thanks,
> Derar
>


Re: Kafka Streams: joins without windowing (KStream) and without being KTables

2016-09-05 Thread Guillermo Lammers Corral
Hi Matthias,

Good question... the main problem is related to the nature of my data. The
first source of data is time based and the second one is not, but both have
a field with the same value. I don't know how to use it in the join without
it being the key; it can't be, let me explain why:

ObjectX (sameValue, date, valueX)
ObjectY (uniqueId, sameValue, valueY)

I want to create a result object based on X and Y using sameValue as "key"
but there are some problems here:

   - sameValue of ObjectX cannot be the key because I must take care of date
   - sameValue of ObjectY cannot be the key because sameValue is not the key
   of ObjectX (we couldn't join anything)
   - uniqueId of ObjectY cannot be the key because it does not exist in
   ObjectX (we couldn't join anything)
   - I couldn't use as key something like sameValue_date because date does
   not exist in ObjectY (we couldn't join anything)

So, actually I don't know how to implement this using Kafka Streams. I need
to join data using a value field of each message (sameValue, but not as the
key) and do it indefinitely, because I don't know when data will be sent,
whereas the process will always be creating new result objects.

Basically, I want to use streaming with Kafka Streams to do joins between
two sources of data, but we cannot use KTable (key problems) and we cannot
use a windowed KStream (or yes, but with memory issues as you said) because
I don't know when data will arrive and I cannot lose data (any matching
between both sources).
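One thing worth checking before giving up on the DSL: the join key does not have to be the original message key. Both streams can be re-keyed by the shared value field (a map/selectKey step plus a repartition through an intermediate topic) so that sameValue becomes the key on both sides. A Python sketch of just the re-keying step, using the field names from the mail (the surrounding join is omitted):

```python
# ObjectX (sameValue, date, valueX) and ObjectY (uniqueId, sameValue, valueY)
# arrive keyed arbitrarily; re-key both streams by sameValue so they share
# a key space and can be joined.

def rekey_x(record):
    same_value, date, value_x = record
    return same_value, (date, value_x)       # new key: the shared field

def rekey_y(record):
    unique_id, same_value, value_y = record
    return same_value, (unique_id, value_y)  # same key space as the X side

kx = rekey_x(("cust-7", "2016-09-05", "a"))
ky = rekey_y(("evt-123", "cust-7", "d"))
assert kx[0] == ky[0]  # both sides now carry the join key
```

The date and uniqueId fields move into the value, so nothing is lost; the unbounded-history concern from the earlier reply still applies after re-keying.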

Do you see any solution? Will I have to use Processor API instead of DSL to
spill data to disk as you said?

Thanks in advance!

2016-09-05 20:00 GMT+02:00 Matthias J. Sax :

> Hey,
>
> are you sure you want to join everything? This will result in a huge
> memory footprint for your application. You are right that you cannot use
> KTable; however, windowed KStream joins would work -- you only need to
> specify a huge window (i.e., use Long.MAX_VALUE; this will effectively be
> "infinitely large") so that all data falls into a single window.
>
> The issue will be that all data will be buffered in memory, thus, if
> your application runs for very long, it will eventually fail (I would
> assume). Thus, again my initial question: are you sure you want to join
> everything? (It's stream processing, not batch processing...)
>
> If the answer is still yes, and you hit a memory issue, you will need to
> fall back to the Processor API instead of the DSL to spill data to disk
> if it does not fit into memory anymore (i.e., you will need to implement
> your own version of a symmetric hash join that spills to disk). Of
> course, the disk usage will also be huge. Eventually, your disk might
> also become too small...
>
> Can you clarify why you want to join everything? This does not sound
> like a good idea. Very large windows are manageable, but "infinite"
> windows are very problematic in stream processing.
>
>
> -Matthias
>
>
> On 09/05/2016 06:25 PM, Guillermo Lammers Corral wrote:
> > Hi,
> >
> > I've been thinking how to solve with Kafka Streams one of my business
> > process without success for the moment. Hope someone can help me.
> >
> > I am reading from two topics events like that (I'll simplify the problem
> at
> > this point):
> >
> > ObjectX
> > Key: String
> > Value: String
> >
> > ObjectY
> > Key: String
> > Value: String
> >
> > I want to do some kind of "join" for all events without windowing but
> also
> > without being KTables...
> >
> > Example:
> >
> > ==
> >
> > ObjectX("0001", "a") -> TopicA
> >
> > Expected output TopicResult:
> >
> > nothing
> >
> > ==
> >
> > ObjectX("0001", "b") -> Topic A
> >
> > Expected output TopicResult:
> >
> > nothing
> >
> > ==
> >
> > ObjectY("0001", "d") -> Topic B:
> >
> > Expected output TopicResult:
> >
> > ObjectZ("0001", ("a", "d"))
> > ObjectZ("0001", ("b", "d"))
> >
> > ==
> >
> > ==
> >
> > ObjectY("0001", "e") -> Topic B:
> >
> > Expected output TopicResult:
> >
> > ObjectZ("0001", ("a", "e"))
> > ObjectZ("0001", ("b", "e"))
> >
> > ==
> >
> > TopicResult at the end:
> >
> > ObjectZ("0001", ("a", "d"))
> > ObjectZ("0001", ("b", "d"))
> > ObjectZ("0001", ("a", "e"))
> > ObjectZ("0001", ("b", "e"))
> >
> > ==
> >
> > I think I can't use a KTable-KTable join because I want to match all the
> > events from the beginning of time. Likewise, I can't use a KStream-KStream
> > join because it forces me to use windowing. Same for KStream-KTable join...
> >
> > Any expert using Kafka Streams could help me with some tips?
> >
> > Thanks in advance.
> >
>
>


Re: Using kafka as a "message bus" for an event store

2016-09-05 Thread F21

Hi Tom,

Thank you so much for your response. I had a feeling that approach would 
run into scalability problems, so thank you for confirming that.


Another approach would be to have each service request a subscription 
from the event store. The event store then creates a unique kafka topic 
for each service. If multiple instances of a service requests a 
subscription, the event store should only create the topic once and 
return the name of the topic to the service.


A reader/writer would read from HBase and push new messages into each topic.

In this case, I would set my topics to retain messages for, say, 5 days 
in the event that a service goes down and we need to bring it back up.


The event store would also query kafka to see which topics have not been 
read from for say 30 days and delete them. This would be for cases where 
a service is decommissioned. Does kafka provide a way to check when the 
topic was last read from?


Does this sound like a saner way?

Cheers,
Francis

On 5/09/2016 11:00 PM, Tom Crayford wrote:

inline

On Mon, Sep 5, 2016 at 12:00 AM, F21  wrote:


Hi all,

I am currently looking at using Kafka as a "message bus" for an event
store. I plan to have all my events written into HBase for permanent
storage and then have a reader/writer that reads from HBase to push them
into kafka.



In terms of kafka, I plan to set it to keep all messages indefinitely.
That way, if any consumers need to rebuild their views or if new consumers
are created, they can just read from the stream to rebuild the views.


Kafka isn't designed at all for permanent message storage, except for
compacted topics. I suggest you rethink this, unless compacted topics work
for you (Kafka is not designed to keep unbounded amounts of data for
unbounded amounts of time, simply to provide messaging and replay over
short, bounded windows).
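For the compacted-topic option Tom mentions, a hedged example of creating one with the 0.10-era tooling (topic name and sizing are made up; flags may differ by version). Compaction keeps the latest event per key indefinitely, rather than every event forever:

```shell
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --topic customer-events --partitions 8 --replication-factor 3 \
  --config cleanup.policy=compact
```

Note this changes the semantics: replaying a compacted topic rebuilds the latest state per key, not the full event history.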



I plan to use domain-driven design and will use the concept of aggregates
in the system. An example of an aggregate might be a customer. All events
for a given aggregate needs to be delivered in order. In the case of kafka,
I would need to over partition the system by a lot, as any changes in the
number of partitions could result in messages that were bound for a given
partition being pushed into a newly created partition. Are there any issues
if I create a new partition every time an aggregate is created? In a system
with a large amount of aggregates, this will result in millions or hundreds
of millions of partitions. Will this cause performance issues?


Yes.

Kafka is designed to support hundreds to thousands of partitions per
machine, not millions (and there is an upper bound per cluster which is
well below one million). I suggest you rethink this and likely use a
standard "hash based partitioning" scheme.
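The hash-based partitioning Tom suggests can be sketched as follows (illustrative only — Kafka's default partitioner actually applies murmur2 to the serialized key; `md5` here is just a stand-in for a stable hash):

```python
import hashlib

def partition_for(aggregate_id: str, num_partitions: int) -> int:
    # Stable hash of the aggregate id, reduced modulo the partition count.
    digest = hashlib.md5(aggregate_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Every event for one aggregate maps to the same partition, preserving
# per-aggregate ordering without needing one partition per aggregate.
p = partition_for("customer-42", 96)
print(p)
```

This is why ordering per aggregate survives with only a fixed, modest number of partitions — the caveat being that changing `num_partitions` later remaps keys, as the original message notes.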



Cheers,

Francis






Re: Question: Data Loss and Data Duplication in Kafka

2016-09-05 Thread Jayesh Thakrar
Thanks Radha Krishna!

So from what I understand, data loss can happen at the producer due to
BufferExhaustedException, failure to close/terminate the producer, and
communication errors (first figure below).

And at the broker during unclean leader election (i.e. electing a leader that
was not in the ISR), or if a leader crashes while the min.insync.replicas
configuration for the topic/broker was 1, in which case there is a potential
loss of the small number of messages produced within the
replica.lag.time.max.ms window.

Also wondering if you and the community can validate these two pictures below?









  From: R Krishna 
 To: users@kafka.apache.org; Jayesh Thakrar  
 Sent: Tuesday, August 30, 2016 2:02 AM
 Subject: Re: Question: Data Loss and Data Duplication in Kafka
   
Experimenting with Kafka myself, I found that timeouts/batch expiry (with
valid and invalid configurations) and max retries can also drop messages
unless you handle and log them gracefully. There are also a bunch of
exceptions in the org.apache.kafka.common.KafkaException hierarchy, some of
which are thrown for valid reasons but also drop messages (message size,
buffer size, etc.).

On Sun, Aug 28, 2016 at 1:55 AM, Jayesh Thakrar  
wrote:

I am looking at ways one might have data loss and duplication in a Kafka
cluster and need some help/pointers/discussions.

So far, here's what I have come up with:

Loss at producer-side: Since the data send call is actually adding data to a
cache/buffer, a crash of the producer can potentially result in data loss.
Another scenario for data loss is a producer exiting without closing the
producer connection.

Loss at broker-side: I think there are several situations here - all of which
are triggered by a broker or controller crash or network issues with
zookeepers (kind of simulating broker crashes).

If I understand correctly, KAFKA-1211
(https://issues.apache.org/jira/browse/KAFKA-1211) implies that when acks is
set to 0/1 and the leader crashes, there is a probability of data loss.
Hopefully the implementation of leader generation will help avoid this
(https://issues.apache.org/jira/browse/KAFKA-1211?focusedCommentId=15402622&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15402622).

And a unique situation as described in KAFKA-3410
(https://issues.apache.org/jira/browse/KAFKA-3410) can cause broker or cluster
shutdown leading to data loss as described in KAFKA-3924 (resolved in
0.10.0.1).

And data duplication can be attributed primarily to consumer offset
management, which is done at batch/periodic intervals.

Can anyone think of or know of any other scenarios?

Thanks,
Jayesh
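The duplication-from-periodic-offset-commits scenario can be sketched in plain Python (illustrative only — the function names and the commit-interval model are made up, not the consumer API). A consumer that crashes after processing past its last committed offset reprocesses the gap on restart:

```python
def consume(log, start, commit_interval, crash_after=None):
    """Process messages from `start`; commit the offset every
    `commit_interval` messages; optionally 'crash' mid-run."""
    processed, committed = [], start
    for offset in range(start, len(log)):
        processed.append(log[offset])
        if (offset + 1) % commit_interval == 0:
            committed = offset + 1          # periodic commit
        if crash_after is not None and len(processed) == crash_after:
            return processed, committed     # crash before the next commit
    return processed, committed

log = list(range(10))
# First run crashes after processing 8 messages; last commit was at offset 5.
first, committed = consume(log, 0, commit_interval=5, crash_after=8)
# Restart resumes from the committed offset, not from where processing stopped.
second, _ = consume(log, committed, commit_interval=5)
duplicates = [m for m in second if m in first]
print(duplicates)   # -> [5, 6, 7]: at-least-once delivery, duplicated work
```

Shrinking the commit interval narrows the duplication window but never closes it, which is why deduplication typically has to live downstream.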







-- 
Radha Krishna Proddaturi
253-234-5657

   

Re: Monitoring Kafka client

2016-09-05 Thread Span Marius
Hello Otis,

Thank you for your reply. Sorry for not being very explicit. For this
particular case, the failed application was on the consumer side; however,
monitoring the producer in the same way would be desired as well. I had a
look into SPM. It looks good; however, I'm looking for a way to check the
health status programmatically so that I can integrate this into my
existing monitoring infrastructure.

Marius

On Sat, Sep 3, 2016 at 7:23 AM, Otis Gospodnetić  wrote:

> Hi,
>
> By "kafka client" I assume you mean you Kafka producer and/or consumers?
> If so, any decent Kafka monitoring solution should let you monitor that.
> See https://sematext.com/spm/integrations/kafka-monitoring/ for an
> example.
>
> Otis
> --
> Monitoring - Log Management - Alerting - Anomaly Detection
> Solr & Elasticsearch Consulting Support Training - http://sematext.com/
>
>
> On Thu, Sep 1, 2016 at 8:15 AM, Span Marius  wrote:
>
> > Hi,
> >
> > My application recently experience a network connectivity issue which
> lead
> > into getting the client (0.8.2.2) disconnected. After the network was
> > restored the client failed to reconnect because while trying to do this,
> > resolving the Zookeeper Server hostname to an IP failed as well (DNS
> > failure).
> >
> > Is there a reliable way in checking the health status of the client? I
> > would like to integrate this into the application's healthcheck service,
> > therefore a programmatic approach would fit best.
> >
> > Marius Span
> >
>


Producer/Consumer config for length Kafka broker maintenance

2016-09-05 Thread Harald Kirsch

Hi all,

there are so many timeouts to tweak mentioned in the documentation that 
I wonder what the correct configuration for producer and consumer is to 
survive a, say, 1 hour, broker shutdown.


With "survive" I mean that the processes are idle or blocked and keep 
trying to send their data, and just pick up work shortly after the 
broker appears again.


I have the following suspects to tweak:

PRODUCER:
connections.max.idle.ms: (9 minutes) Does a lost connection count as idle?

max.block.ms: (default 1 minute) Seems a definite candidate to raise to 
1 hour.


request.timeout.ms: (default 1/2 minute)
metadata.fetch.timeout.ms: (default 1 minute)
retry.backoff.ms: (default 100ms)

CONSUMER:
connections.max.idle.ms: (same as above I guess)
request.timeout.ms

What would be a good combination of settings?
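One hedged starting point (these values are assumptions for the stated goal, not verified recommendations — the 0.10-era names below are real client configs, but the right numbers depend on your workload):

```properties
# producer: let sends queue and retry across a ~1 hour outage
max.block.ms=3900000          # allow send()/metadata fetches to block ~65 min
retries=1000000               # keep retrying retriable errors
retry.backoff.ms=1000         # back off between retries
request.timeout.ms=30000      # per-attempt timeout can stay moderate

# consumer: reconnection is automatic as long as poll() keeps being called;
# connections.max.idle.ms only closes idle sockets and does not stop
# reconnection attempts.
```

The trade-off: with a very large max.block.ms, a misconfigured bootstrap list also blocks for that long before surfacing an error.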

Harald.


Re: Kafka bootup exception while recovering log file

2016-09-05 Thread Jaikiran Pai
I'm not from the Kafka dev team, so I won't be able to comment on whether 
this is an expected way to fail or whether this needs to be handled in a 
cleaner/more robust manner (at the very least, probably with a better 
exception message). Since you have put in the effort to write a test case 
and narrow it down to this specific flow, maybe you can send a mail to 
their dev mailing list and/or create a JIRA to report this.


-Jaikiran

On Tuesday 30 August 2016 12:07 PM, Gaurav Agarwal wrote:

Kafka version: 0.10.0

Exception Trace

java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:37)
at kafka.log.LogSegment.recover(LogSegment.scala:189)
at kafka.log.Log.recoverLog(Log.scala:268)
at kafka.log.Log.loadSegments(Log.scala:243)
at kafka.log.Log.(Log.scala:101)
at kafka.log.LogTest.testCorruptLog(LogTest.scala:830)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Test Code (the same exception trace is seen in broker logs as well, on prod
machines with exactly the same log files as given in this mini test)
-

val logProps = new Properties()
logProps.put(LogConfig.MaxMessageBytesProp, 15 * 1024 * 1024: java.lang.Integer)
val config = LogConfig(logProps)
val cp = new File("/Users/gaurav/Downloads/corrupt/gaurav/kafka-logs/Topic3-12")
var log = new Log(cp, config, 0, time.scheduler, time


On Tue, Aug 30, 2016 at 11:37 AM, Jaikiran Pai 
wrote:


Can you paste the entire exception stacktrace please?

-Jaikiran

On Tuesday 30 August 2016 11:23 AM, Gaurav Agarwal wrote:


Hi there, just wanted to bump up the thread one more time to check if
someone can point us in the right direction... This one was quite a
serious
failure that took down many of our kafka brokers..

On Sat, Aug 27, 2016 at 2:11 PM, Gaurav Agarwal wrote:

We are facing a weird problem where the Kafka broker fails to start due to an
unhandled exception while 'recovering' a log segment. I have been able to
isolate the problem to a single record; details below:

During Kafka restart, if index files are corrupted or they don't exist,
kafka broker is trying to 'recover' a LogSegment and rebuild the indexes
-
LogSegment:recover()
In the main while loop here, which iterates over the entries in the log:
while(iter.hasNext) { val entry = iter.next }, I get an entry whose
complete underlying byte buffer is as follows:

[82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102,
10,
39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101,
108,
46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46,
114,
101, 102, 101, 114, 101, 110, 99, 101, 16, -120,

Re: Producer request latency increase after client 0.10 upgrade

2016-09-05 Thread Yifan Ying
Hi Ismael,

Thanks for replying.

Yes. It's the comparison between 0.8.2.1 and 0.10.0.0 clients connecting to
0.10.0.0 brokers(log.message.format.version=0.8.2.1). No compression is
used and that's all the producer config.

From the Kafka code, I think the request-latency-avg metric does not
include the blocking time, but only measures the time between creating
requests and getting responses from brokers. Blocking on a full buffer
happens before that. Please correct me if I am wrong.

Yifan


On Mon, Sep 5, 2016 at 5:59 AM, Ismael Juma  wrote:

> One thing that changed in 0.9.0.0 is the introduction of `max.block.ms`,
> which is 60 seconds by default. Since you have
> "block.on.buffer.full=false",
> you may want to tweak `max.block.ms`.
>
> Ismael
>
> On Mon, Sep 5, 2016 at 1:54 PM, Ismael Juma  wrote:
>
> > Hi Yifan,
> >
> > Just to make sure I understand, the comparison is between 0.8.2.1 and
> > 0.10.0.0 clients (both) connecting to a 0.10.0.0 cluster? We haven't
> > received reports of increased average producer latency and I'm not aware
> of
> > a change that could cause such a dramatic difference. Is compression
> being
> > used and did you paste the full producer config?
> >
> > Ismael
> >
> > On Fri, Sep 2, 2016 at 6:31 PM, Yifan Ying  wrote:
> >
> >> The load before and after the upgrade are pretty similar. And all these
> >> configs stayed same too.
> >>
> >> Also please correct me if I am wrong. I think the request latency =
> >> response received time - request created time. And request created time
> is
> >> created when the records are ready to be sent after 'lingering'.
> >>
> >> Yifan
> >>
> >> On Thu, Sep 1, 2016 at 11:01 PM, Gerard Klijs 
> >> wrote:
> >>
> >> > With a linger of 5 seconds, 2-3 seconds would make sense when the
> >> > load is smaller, but are you sure the measurements with 0.8.2.1 were
> >> > with the same load and/or that linger worked correctly there?
> >> >
> >> > On Fri, Sep 2, 2016 at 1:12 AM Yifan Ying  wrote:
> >> >
> >> > > We tried to upgrade the Kafka clients dependency from 0.8.2.1 to
> >> > 0.10.0.0.
> >> > > As I understand, the producer client doesn't have major changes in
> >> 0.10,
> >> > so
> >> > > we kept the same producer config in the upgrade:
> >> > >
> >> > > retries 3
> >> > > retry.backoff.ms 5000
> >> > > timeout.ms 1
> >> > > block.on.buffer.full false
> >> > > linger.ms 5000
> >> > > metadata.fetch.timeout.ms 3
> >> > >
> >> > > However, after the upgrade, the avg producer request latency
> >> increased by
> >> > > 5×, from a few hundreds milliseconds to 2-3 seconds. Anyone has seen
> >> this
> >> > > issue? Is there any change in 0.9/0.10 that would cause this?
> >> > >
> >> > > Thanks!
> >> > >
> >> > > --
> >> > > Yifan
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> Yifan
> >>
> >
> >
>



-- 
Yifan


Re: kafka broker is dropping the messages after acknowledging librdkafka

2016-09-05 Thread Mazhar Shaikh
Hi Jun / Kafka Team,

Do we have any solution for this issue ?

"During zookeeper re-init the kafka broker truncates messages and ends up
losing records"

I'm ok with duplicate messages being stored instead of dropping them.

Is there any configuration in kafka where the follower broker replicates
these messages to the leader instead of dropping them?

Thank you.

Regards,
Mazhar Shaikh.
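For reference, a hedged sketch of the configuration that narrows the truncation window on leader failover (these are real 0.8.2+ settings, but they reduce rather than eliminate the window — verify defaults for your broker version):

```properties
# broker/topic: never elect a replica outside the ISR as leader
unclean.leader.election.enable=false
# broker/topic: a write must reach at least 2 in-sync replicas
min.insync.replicas=2

# producer: wait for all in-sync replicas to acknowledge each write
# (librdkafka equivalent: request.required.acks=-1)
acks=all
```

With acks=0/1, messages acknowledged only by the old leader can still be truncated when a follower takes over; acks=all plus min.insync.replicas closes most of that gap at the cost of latency.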



On Tue, Aug 30, 2016 at 6:42 PM, Mazhar Shaikh 
wrote:

> Hi Jun,
>
> Yes, the data is lost during leader broker failure.
> But the leader broker failed due to zookeeper session expiry.
> GC logs doesn't show any error/warns during this period.
>
> It's not easy to reproduce. During a long run (>12hrs) with 30k msg/sec
> load balanced across 96 partitions, this failure is noticed somewhere in
> between (once or twice).
>
> Looks like this issue is similar to
> "https://issues.apache.org/jira/browse/KAFKA-1211".
>
> But here the leader broker could have synced/committed all the existing
> data to the replica before the replica was elected as leader.
>
> below are few log around this time.
>
> broker b2 was controller.
>
> b2:
> Server.log:
> [2016-08-26 16:15:49,701] INFO re-registering broker info in ZK for broker
> 1 (kafka.server.KafkaHealthcheck)
> [2016-08-26 16:15:49,738] INFO Registered broker 1 at path /brokers/ids/1
> with address b2.broker.com:9092. (kafka.utils.ZkUtils$)
> [2016-08-26 16:15:49,739] INFO done re-registering broker (kafka.server.
> KafkaHealthcheck)
> [2016-08-26 16:15:49,740] INFO Subscribing to /brokers/topics path to
> watch for new topics (kafka.server.KafkaHealthcheck)
> [2016-08-26 16:15:50,055] INFO New leader is 0 (kafka.server.
> ZookeeperLeaderElector$LeaderChangeListener)
> [2016-08-26 16:15:50,538] WARN [KafkaApi-1] Produce request with
> correlation id 422786 from client rdkafka on partition [topic,92] failed
> due to Leader not local for partition [topic,92] on broker 1
> (kafka.server.KafkaApis)
> [2016-08-26 16:15:50,544] INFO Truncating log topic-92 to offset 1746617.
> (kafka.log.Log)
> [2016-08-26 16:15:50,562] WARN [KafkaApi-1] Produce request with
> correlation id 422793 from client rdkafka on partition [topic,92] failed
> due to Leader not local for partition [topic,92] on broker 1
> (kafka.server.KafkaApis)
> [2016-08-26 16:15:50,578] WARN [KafkaApi-1] Produce request with
> correlation id 422897 from client rdkafka on partition [topic,92] failed
> due to Leader not local for partition [topic,92] on broker 1
> (kafka.server.KafkaApis)
> [2016-08-26 16:15:50,719] ERROR Closing socket for /169.254.2.116 because
> of error (kafka.network.Processor)
> kafka.common.KafkaException: Size of FileMessageSet
> /data/kafka/broker-b2/topic-66/.log has been
> truncated during write: old size 1048576, new size 0
> at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)
> at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
> at kafka.network.MultiSend.writeTo(Transmission.scala:101)
> at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
> at kafka.network.MultiSend.writeTo(Transmission.scala:101)
> at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
> at kafka.network.Processor.write(SocketServer.scala:472)
> at kafka.network.Processor.run(SocketServer.scala:342)
> at java.lang.Thread.run(Thread.java:744)
> [2016-08-26 16:15:50,729] ERROR Closing socket for /169.254.2.116 because
> of error (kafka.network.Processor)
> kafka.common.KafkaException: Size of FileMessageSet
> /data/kafka/broker-b2/topic-68/.log has been
> truncated during write: old size 1048576, new size 0
> at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)
> at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
> at kafka.network.MultiSend.writeTo(Transmission.scala:101)
> at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
> at kafka.network.MultiSend.writeTo(Transmission.scala:101)
> at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
> at kafka.network.Processor.write(SocketServer.scala:472)
> at kafka.network.Processor.run(SocketServer.scala:342)
> at java.lang.Thread.run(Thread.java:744)
>
>
> kafkaServer-gc.log:
> 2016-08-26T16:14:47.123+0530: 6123.763: [GC2016-08-26T16:14:47.123+0530:
> 6123.763: [ParNew: 285567K->5992K(314560K), 0.0115200 secs]
> 648907K->369981K(1013632K), 0.0116800 secs] [Times: user=0.19 sys=0.00,
> real=0.01 secs]
> 2016-08-26T16:14:56.327+0530: 6132.967: [GC2016-08-26T16:14:56.327+0530:
> 6132.967: [ParNew: 285608K->5950K(314560K), 0.0105600 secs]
> 649597K->370626K(1013632K), 0.0107550 secs] [Times: user=0.15 sys=0.01,
> real=0.01 secs]
> 2016-08-26T16:15:06.615+0530: 6143.255: [GC2016-08-26T16:15:06.615+0530:
> 6143.255: [ParNew: 285566K->6529K(314560K), 0.0214330 secs]
> 650242K->371864K(1013632K), 0.0216380 secs] [Times: user=0.30 sys=0.03,
> real=0.02

Re: KIP-33 Opt out from Time Based indexing

2016-09-05 Thread Jan Filipiak

Hi Jun,

sorry for the late reply. Regarding B, my main concern was just the 
complexity of understanding what's going on. As you can see, it took me 
some 2 days or so to fully grasp all the details of the implementation and 
what the impacts are. Usually I prefer to turn things I don't use off, so I 
don't have to bother. Log append time will work for me.


Rolling logs was my main concern. The producer can specify the timestamp, 
and we use an epoch timestamp inside the message; I'd bet money people in 
the company would have put this epoch in the produce record as well => 
rolling logs, as the broker thinks it's millis. So that would probably have 
caused us at least one outage if a big producer had upgraded and done this 
- IMO a likely mistake.


I'd just hoped for a more obvious kill-switch, so I didn't need to bother 
that much.


Best Jan




On 29.08.2016 19:36, Jun Rao wrote:

Jan,

For the usefulness of time index, it's ok if you don't plan to use it.
However, I do think there are other people who will want to use it. Fixing
an application bug always requires some additional work. Intuitively, being
able to seek back to a particular point of time for replay is going to be
much more efficient than always replaying from the very beginning,
especially when the log is retained for a long period of time. Sure, if you
want to have more confidence, you want to rewind a bit conservatively. But
being able to rewind an extra hour makes a big difference from having to
rewind all to way to 7 days or however long the retention time is.

For the OffsetRequest, I actually agree with you that it's useful. People
can use that to find the first and the last offset and the offset based on
a specific point in time. The part that's a bit awkward with OffsetRequest
is that it's based on the last modified time of the log segment, which
makes it imprecise (precision is at the segment level, not message level)
and non-deterministic (last modified time may change). Another awkwardness
is that it supports returning a list of offsets after a specified
timestamp. We did that simply because timestamp was only at the segment
level then. So, our plan is to replace OffsetRequest with a new one. It
will give you the same functionality: find the first and the last offset
and the offset based on a specific point in time. It will just be better
since it's more precise and more deterministic. For your use case, it seems
that you don't care about message creation time. Then, it's possible for
you to configure the broker with the log append time. Whether this should
be default at the Kafka level is debatable, but it won't prevent your use
case.

For your suggestion on refactoring, I still want to understand how
necessary it is. Your main concerns so far seem to be:
(a) Impact on rolling log segments.
(b) The time-based index is not useful for me.

Item (a) is a good point. Thanks for that. We will fix it. Item (b), I have
given my view on this above. Are there any other things that you think that
having a time-based index will hurt?

Thanks,

Jun

On Fri, Aug 26, 2016 at 3:41 PM, Jan Filipiak 
wrote:


Hi Jun,

thanks for taking the time to answer at such a detailed level. You are
right, Log.fetchOffsetByTimestamp works; the comment is just confusing:
"// Get all the segments whose largest timestamp is smaller than target
timestamp", which apparently is not what takeWhile does (I am more on
the Java end of things, so I relied on the comment).

Regarding the frequent file rolling, I didn't think of log compaction, but
that indeed is a place where things can hit the fan pretty easily,
especially if you don't have many updates in there and you pass the
timestamp along in a kafka-streams application. Bootstrapping a new
application then indeed could produce quite a few old messages, kicking
this log rolling off until a recent message appears. I guess that makes it
a practical issue again, even with the 7 days. Thanks for pointing it out!
I'd like to see append time as the default; I am very happy that I have it
in the back pocket, for the purpose of tighter sleep and not worrying too
much about someone accidentally doing something dodgy on a weekend with
our clusters.

Regarding the usefulness, you will not be able to sell it to me. I don't
know how people build applications with this ¯\_(ツ)_/¯ but I don't want to
see them.

Look at the error recovery with timestamp seek: to fix a bug, a user needs
to stop the SP, truncate all his downstream data perfectly based on their
time window, then restart and do the first fetch based again on the perfect
window timeout. From then on, he still has NO clue whatsoever whether
messages that come later now with an earlier timestamp need to go into the
previous window or not. (Note that there is >>>absolutely no<<< way to
determine this in aggregated downstream windowed stores.) So the user is in
trouble: even though he can seek, he can't rule out his error. IMO it helps
them build the wrong thing, which will just be operational pain *somewhere*
Look