Kafka over Satellite links

2016-03-02 Thread Jan
Hi folks; 
does anyone know of Kafka's ability to work over Satellite links. We have a IoT 
Telemetry application that uses Satellite communication to send data from 
remote sites to a Central hub. 
Any help/ input/ links/ gotchas would be much appreciated. 
Regards,Jan

Re: Kafka over Satellite links

2016-03-03 Thread Jan
Thanks for the input. 
The IoT application could have a http ingestion receiver to funnel data into 
Kafka. However, I am thinking about Kafka's ability to be able to injest 
traffic directly from IoT devices.  
Does anyone have any recommendations while using Kafka Connect between producer 
&  consumer sides over Satellite links.       Does the Satellite link really 
make any difference v/s a slow Internet link ? 
Input/ suggestions would be much appreciated.
ThanksJan
 

On Thursday, March 3, 2016 1:16 AM, Christian Csar  
wrote:
 

 I would not do that. I admit I may be a bit biased due to working for
Buddy Platform (IoT backend stuff including telemetry collection), but
you want to send the data via some protocol (HTTP? MQTT? COAP?) to the
central hub and then have those servers put the data into Kafka. Now
if you want to use Kafka there are the various HTTP front ends that
will basically put the data into Kafka for you without the client
needing to deal with the partition management part. But putting data
into Kafka directly really seems like a bad idea even if it's a large
number of messages per second per node, even if the security parts
work out for you.

Christian

On Wed, Mar 2, 2016 at 9:52 PM, Jan  wrote:
> Hi folks;
> does anyone know of Kafka's ability to work over Satellite links. We have a 
> IoT Telemetry application that uses Satellite communication to send data from 
> remote sites to a Central hub.
> Any help/ input/ links/ gotchas would be much appreciated.
> Regards,Jan


  

Re: [ERICSSON] - Trade Compliance: ECCN code for Apache Items

2017-06-27 Thread jan
I appreciate there may be a loss of subtlety traversing languages, but
this doesn't come over to politely.

I can't help you, the best I can find is
<http://www.apache.org/licenses/exports/>. This *may* be more helpful
than posting here although it covers none of the software you mention,
sorry, but maybe it's worth a look through that page.

I have to admit I'd never heard of ECCN classifications and am
surprised it even exists.

cheers

jan


On 27/06/2017, Axelle Margot  wrote:
> Hello,
>
> You were contacted as part of a new project in France.
>
> For all products you offer, HW and / or SW, we need, as usual, you provide
> some information about your products in order to better prepare the orders.
> So can you send us now the following information about your products:
>
>   *   EU ECCN Code: who define if the product is dual use or not.
>
>  This code is in the format:  Digit - Letter- Digit - Digit - Digit + an
> extension of Letter - Digit - Letter
>
> Example: 5D001.d.2.a to the SW or HW for 5A002.d.2.a
>
> Nota: Ericsson France needs the European ECCN Code, not the US ECCN Code.
>
>
>
>   *   HST code or TARIC code: corresponding to the complete description of
> the property and to define the customs taxes
>
>
>
> If you can't find the ECCN Product code:
> - If you are a reseller, you must contact your supplier as soon as possible
> to send us the information quickly.
> - If it's your equipment, the responsibility of the classification is yours.
> You can refer to Regulation (EC) No 428/2009 of 5 May 2009, or for France
> office, you can also have a look on SBDU website (Service Of Dual-use)
> http://www.entreprises.gouv.fr/biens-double -usage / Home
>
>
>
> We need the EU ECCN Code and HST code for the following family product:
>
>
> Apache
>
> Kafka 0.10.2.1
>
> Zookeper 3.4.9
>
> Puppet client
>
>
>
>
> Regarding the ECCN Code, is this is a mass market product, thanks to precise
> us.
>
>
>
> Please find attached some file who can helps you.
>
> I remind you that in our internal data system we can't record your items
> without the EU ECCN Code. This one is mandatory to valid the order.
>
>
>
> We need these information for the end of next week, the 7th of July.
>
> For Further information, please contact us.
> Best regards,
>
> Axelle MARGOT
> Trade Compliance Adviser / Controle Export
> ERICSSON FRANCE & ERICSSON MAGHREB
> 25, Avenue Carnot
> 91348 MASSY Cedex, France
> Phone : +33 1 81 87 44 11
> Mobile : +33 6 60 14 34 28
> Email : axelle.mar...@ericsson.com<mailto:axelle.mar...@ericsson.com>
> www.ericsson.com<http://www.ericsson.com/>
>
> [Description: Ericsson]
>
>
>
> Remember: a dual-use purpose:
> According to the usual definition, fall into this category "property,
> equipment - including technology, software, know-how immaterial or
> intangible - that could have both civilian and military uses or may - wholly
> or partly - contribute to the development, production, handling, operation,
> maintenance, storage, detection, identification, dissemination of weapons of
> mass destruction '' (WMD - nuclear, biological, chemical , etc.).
>
>
>
>
>
> From: Celine Heerdt
> Sent: lundi 26 juin 2017 15:36
> To: users@kafka.apache.org
> Cc: Axelle Margot 
> Subject: ECCN code for Kafka 0.10.2.1
>
> Hi,
> We in Ericsson are interested in the SW Kafka 0.10.2.1 from Apache. In order
> to begin the trade compliance process, could you please send us the ECCN
> codes for that SW.
> Best regards
> Céline Heerdt
>
>
> [Ericsson]<http://www.ericsson.com/>
> Céline Heerdt
> Project Manager
>
> Ericsson
> 25 Avenue Carnot
> 91348 Massy, France
> Mobile +33 6 71 53 92 02
> celine.hee...@ericsson.com<mailto:celine.hee...@ericsson.com>
> www.ericsson.com<http://www.ericsson.com/>
>
>


Re: [ERICSSON] - Trade Compliance: ECCN code for Apache Items

2017-06-28 Thread jan
@Martin Gainty - see my link:

"The Apache Software Foundation (ASF) is a 501(c)3 nonprofit charity
based in the United States of America. All of our products are
developed via online collaboration in public forums and distributed
from a central server within the U.S. Therefore, U.S. export laws and
regulations apply to our distributions and remain in force as products
and technology are re-exported to different parties and places around
the world. Information on export control classifications and
associated restrictions may be required for exporting, re-exporting,
record keeping, bundling/embedding of ASF products, encryption
reporting, and shipping documentation."

I agree with you, it seems bizarre, and wrong.

jan

On 28/06/2017, Martin Gainty  wrote:
>
> MG>am requesting clarification below
> 
> From: Axelle Margot 
> Sent: Tuesday, June 27, 2017 8:48 AM
> To: users@kafka.apache.org
> Cc: Celine Heerdt; Wassila BenJelloul.ext; Florent Poulain
> Subject: [ERICSSON] - Trade Compliance: ECCN code for Apache Items
>
>
> Hello,
>
> You were contacted as part of a new project in France.
>
> For all products you offer, HW and / or SW, we need, as usual, you provide
> some information about your products in order to better prepare the orders.
> So can you send us now the following information about your products:
>
>   *   EU ECCN Code: who define if the product is dual use or not.
>
>  This code is in the format:  Digit - Letter- Digit - Digit - Digit + an
> extension of Letter - Digit – Letter
>
> Example: 5D001.d.2.a to the SW or HW for 5A002.d.2.a
>
> Nota: Ericsson France needs the European ECCN Code, not the US ECCN Code.
>
>
> mg>kafka is licensed as OpenSource by ASF and not a commercial (for sale for
> profit) product  ..how is European/American ECCN applicable?
>
>
>   *   HST code or TARIC code: corresponding to the complete description of
> the property and to define the customs taxes
>   *
>
> mg>kafka is licensed as OpenSource by ASF and not a commercial (for sale for
> profit) product  ..how is HST applicable?
>
>
>
> If you can’t find the ECCN Product code:
> - If you are a reseller, you must contact your supplier as soon as possible
> to send us the information quickly.
> - If it’s your equipment, the responsibility of the classification is yours.
> You can refer to Regulation (EC) No 428/2009 of 5 May 2009, or for France
> office, you can also have a look on SBDU website (Service Of Dual-use)
> http://www.entreprises.gouv.fr/biens-double -usage / Home
>
>
>
> We need the EU ECCN Code and HST code for the following family product:
>
>
>
> Apache
>
>
> Kafka 0.10.2.1
>
>
> Zookeper 3.4.9
>
>
> Puppet client
>
>
>
>
> Regarding the ECCN Code, is this is a mass market product, thanks to precise
> us.
>
>
>
> Please find attached some file who can helps you.
>
> I remind you that in our internal data system we can’t record your items
> without the EU ECCN Code. This one is mandatory to valid the order.
>
>
>
> We need these information for the end of next week, the 7th of July.
>
> For Further information, please contact us.
> Best regards,
>
>
>
> Axelle MARGOT
>
> Trade Compliance Adviser / Controle Export
>
> ERICSSON FRANCE & ERICSSON MAGHREB
>
> 25, Avenue Carnot
> 91348 MASSY Cedex, France
>
> Phone : +33 1 81 87 44 11
>
> Mobile : +33 6 60 14 34 28
>
> Email : axelle.mar...@ericsson.com<mailto:axelle.mar...@ericsson.com>
>
> www.ericsson.com<http://www.ericsson.com/>
>
>
>
> [Description: Ericsson]
>
>
>
>
>
> Remember: a dual-use purpose:
> According to the usual definition, fall into this category "property,
> equipment - including technology, software, know-how immaterial or
> intangible - that could have both civilian and military uses or may - wholly
> or partly - contribute to the development, production, handling, operation,
> maintenance, storage, detection, identification, dissemination of weapons of
> mass destruction '' (WMD - nuclear, biological, chemical , etc.).
>
>
>
>
>
>
>
>
>
>
>
> From: Celine Heerdt
> Sent: lundi 26 juin 2017 15:36
> To: users@kafka.apache.org
> Cc: Axelle Margot 
> Subject: ECCN code for Kafka 0.10.2.1
>
>
>
> Hi,
>
> We in Ericsson are interested in the SW Kafka 0.10.2.1 from Apache. In order
> to begin the trade compliance process, could you please send us the ECCN
> codes for that SW.
>
> Best regards
>
> Céline Heerdt
>
>
>
>
>
> [Ericsson]<http://www.ericsson.com/>
>
> Céline Heerdt
>
> Project Manager
>
> Ericsson
> 25 Avenue Carnot
> 91348 Massy, France
> Mobile +33 6 71 53 92 02
> celine.hee...@ericsson.com<mailto:celine.hee...@ericsson.com>
> www.ericsson.com<http://www.ericsson.com/>
>
>
>


Re: Hello, Help!

2017-07-07 Thread jan
Hi,
I'd is this the right place to ask about cockroachDB?

(well he started it, officer...)

jan

On 07/07/2017, David Garcia  wrote:
> “…events so timely that the bearing upon of which is not immediately
> apparent and are hidden from cognitive regard; the same so tardy, they
> herald apropos”
>
> On 7/7/17, 12:06 PM, "Marcelo Vinicius"  wrote:
>
> Hello, my name is Marcelo, and I am from Brazil. I'm doing a search on
> Kafka. I would like to know if this phrase: "Somehow I struggled
> against
> sensations that contained pure abstraction and no gesture directed at
> the
> present world", is it really kafka? If so, where do I find his phrase?
> In
> what text from kafka?
> Thank you!
>
> --
> *Marcelo Vinicius*
> Universidade Estadual de Feira de Santana - UEFS
> Facebook: www.facebook.com/marcelovinicius02
>
> "Não há poema em si, mas em mim ou em ti"
>
>
>


Re: Need clarification on Kafka Usage within our product..

2017-08-01 Thread jan
Don't know if it helps but <https://kafka.apache.org/> says at the bottom

" The contents of this website are © 2016 Apache Software Foundation
under the terms of the Apache License v2. Apache Kafka, Kafka, and the
Kafka logo are either registered trademarks or trademarks of The
Apache Software Foundation in the United States and other countries. "

Also this which is the original proposition and voting for a kafka
logo. Perhaps the proposer can be contacted; he seems to be the one
that designed the logo
<https://issues.apache.org/jira/browse/KAFKA-982>

You could checkout <https://svn.apache.org/repos/asf/kafka/> and see
if it has any statements on logo use.

Also top 3 hits of <https://www.google.co.uk/search?q=use+logo+apache>
sound promising but I've not looked at them.

Best I can suggest ATM

jan

On 01/08/2017, Sunil, Rinu  wrote:
> Including another mail id which I found online.   Kindly help in addressing
> the below query.
>
>
>
> Thanks,
>
> Rinu
>
>
>
> From: Sunil, Rinu
> Sent: Monday, July 31, 2017 7:19 PM
> To: 'users@kafka.apache.org' 
> Subject: Need clarification on Kafka Usage within our product..
> Importance: High
>
>
>
> Hi,
>
>
>
> I have a question regarding the usage of Apache Kafka logo within our
> product Unisys Data Exchange WorkBench Application.Team is working on
> enhancing the product to support Kafka as Data Manage Type with XSD message
> format along with other database types like SQL Server, DMSII etc...   To
> help users easily distinguish the Kafka XSD Database in the tree view we
> have used Kafka logo with a blue overlapping strip with an "x" character to
> indicate XSD message format.  Could you please verify the below image
> highlighted with yellow border and confirm if its ok to use?  I could not
> find Kafka logo compliance guidance online.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Thanks,
>
> Rinu
>
>


Re: How to clear a particular partition?

2017-08-13 Thread jan
I can't help you here but maybe can focus the question - why would you want to?

jan

On 10/08/2017, Sven Ludwig  wrote:
> Hello,
>
> assume that all producers and consumers regarding a topic-partition have
> been shutdown.
>
> Is it possible in this situation to empty that topic-partition, while the
> other topic-partitions keep working?
>
> Like for example, is it possible to trigger a log truncation to 0 on the
> leader for that partition using some admin tool?
>
> Kind Regards,
> Sven
>
>


Re: why kafka index file use memory mapped files ,however log file doesn't

2018-02-10 Thread jan
I'm not sure I can answer your question, but may I pose another in
return: why do you feel having a memory mapped log file would be a
good thing?


On 09/02/2018, YuFeng Shen  wrote:
> Hi Experts,
>
> We know that kafka use memory mapped files for it's index files ,however
> it's log files don't use the memory mapped files technology.
>
> May I know why index files use memory mapped files, however log files don't
> use the same technology?
>
>
> Jacky
>


Re: why kafka index file use memory mapped files ,however log file doesn't

2018-02-12 Thread jan
A human-readable log file is likely to have much less activity in it
(it was a year ago I was using kafka and we could eat up gigs for the
data files but the log files were a few meg). So there's perhaps
little to gain.

Also if the power isn't pulled and the OS doesn't crash, log messages
will be, I guess, buffered by the OS then written out as a full
buffer, or perhaps every nth tick if the buffer fills up very slowly.
So it's still reasonably efficient.

Adding a few hundred context switches a second for the human log
probably isn't a big deal. I remember seeing several tens of
thousands/sec  when using kafka (although it was other processes
running on those multicore machines to be fair). I guess logging
overhead is down in the noise, though that's just a guess.

Also I remember reading a rather surprising post about mmaping. Just
found it 
<https://lists.freebsd.org/pipermail/freebsd-questions/2004-June/050371.html>.
Sniplets:
"There are major hardware related overheads to the use of mmap(), on
*ANY* operating system, that cannot be circumvented"
-and-
"you are assuming that copying is always bad (it isn't), that copying
is always horrendously expensive (it isn't), that memory mapping is
always cheap (it isn't cheap),"

A bit vague on my part, but HTH anyway

jan


On 12/02/2018, YuFeng Shen  wrote:
> Hi jan ,
>
> I think the reason is the same as why index file using  memory mapped file.
>
> As the memory mapped file can avoid the data copy between user and kernel
> buffer space, so it can improve the performance for the index file IO
> operation ,right? If it is ,why Log file cannot achieve the same performance
> improvement as memory mapped index file?
>
>
> Jacky
>
>
> 
> From: jan 
> Sent: Saturday, February 10, 2018 8:33 PM
> To: users@kafka.apache.org
> Subject: Re: why kafka index file use memory mapped files ,however log file
> doesn't
>
> I'm not sure I can answer your question, but may I pose another in
> return: why do you feel having a memory mapped log file would be a
> good thing?
>
>
> On 09/02/2018, YuFeng Shen  wrote:
>> Hi Experts,
>>
>> We know that kafka use memory mapped files for it's index files ,however
>> it's log files don't use the memory mapped files technology.
>>
>> May I know why index files use memory mapped files, however log files
>> don't
>> use the same technology?
>>
>>
>> Jacky
>>
>


Re: why kafka index file use memory mapped files ,however log file doesn't

2018-02-13 Thread jan
1. Perhaps a human-readable log, being write-only, and which may
buffer on the user side or in the kernel, may be more efficient
because small writes are accumulated in a buffer (cheap) actually
pushed to disk (less cheap)? If you mmap'd this instead, how do you
feel it would behave?

2. Did you read the link to the post about mmapping? The guy knows
more about it than I'll probably ever know and he says it's not that
simple. He's saying mmap not a magic answer to anything.
This bit may be relevant: "APPLICATION BUFFERS WHICH EASILY FIT IN THE
L2 CACHE COST VIRTUALLY NOTHING ON A MODERN CPU!" (NB. post is from
2004 so with cache+cpu VS ram/disk discrepancies growing larger, it
may be more true).
Kafka messages can be largish so perhaps that suggests why they use it
for data files.

If this comes across as bit rude that wasn't intended. I can't really
answer your question, just suggest a bit of reading and some
guesswork.

cheers

jan

On 13/02/2018, YuFeng Shen  wrote:
>  If that is like what you said , why index file use the memory mapped file?
>
> 
> From: jan 
> Sent: Monday, February 12, 2018 9:26 PM
> To: users@kafka.apache.org
> Subject: Re: why kafka index file use memory mapped files ,however log file
> doesn't
>
> A human-readable log file is likely to have much less activity in it
> (it was a year ago I was using kafka and we could eat up gigs for the
> data files but the log files were a few meg). So there's perhaps
> little to gain.
>
> Also if the power isn't pulled and the OS doesn't crash, log messages
> will be, I guess, buffered by the OS then written out as a full
> buffer, or perhaps every nth tick if the buffer fills up very slowly.
> So it's still reasonably efficient.
>
> Adding a few hundred context switches a second for the human log
> probably isn't a big deal. I remember seeing several tens of
> thousands/sec  when using kafka (although it was other processes
> running on those multicore machines to be fair). I guess logging
> overhead is down in the noise, though that's just a guess.
>
> Also I remember reading a rather surprising post about mmaping. Just
> found it
> <https://lists.freebsd.org/pipermail/freebsd-questions/2004-June/050371.html>.
> Sniplets:
> "There are major hardware related overheads to the use of mmap(), on
> *ANY* operating system, that cannot be circumvented"
> -and-
> "you are assuming that copying is always bad (it isn't), that copying
> is always horrendously expensive (it isn't), that memory mapping is
> always cheap (it isn't cheap),"
>
> A bit vague on my part, but HTH anyway
>
> jan
>
>
> On 12/02/2018, YuFeng Shen  wrote:
>> Hi jan ,
>>
>> I think the reason is the same as why index file using  memory mapped
>> file.
>>
>> As the memory mapped file can avoid the data copy between user and kernel
>> buffer space, so it can improve the performance for the index file IO
>> operation ,right? If it is ,why Log file cannot achieve the same
>> performance
>> improvement as memory mapped index file?
>>
>>
>> Jacky
>>
>>
>> 
>> From: jan 
>> Sent: Saturday, February 10, 2018 8:33 PM
>> To: users@kafka.apache.org
>> Subject: Re: why kafka index file use memory mapped files ,however log
>> file
>> doesn't
>>
>> I'm not sure I can answer your question, but may I pose another in
>> return: why do you feel having a memory mapped log file would be a
>> good thing?
>>
>>
>> On 09/02/2018, YuFeng Shen  wrote:
>>> Hi Experts,
>>>
>>> We know that kafka use memory mapped files for it's index files ,however
>>> it's log files don't use the memory mapped files technology.
>>>
>>> May I know why index files use memory mapped files, however log files
>>> don't
>>> use the same technology?
>>>
>>>
>>> Jacky
>>>
>>
>


Re: Documentation/Article/Case Study on Scala as the Kafka Backbone Language

2018-07-28 Thread jan
I'm not a scala expert and haven't touched it for 18 months, but with
respect to Mr. Singh, I'd like to clarify or question a few of his
points.

1. Statelessness is a tool; not an end in itself but a means to an
end. As someone on HackerNews says, "control your state space or die",
but the same guy is *not* saying remove it all. I've seen immutability
overused. Sometimes a bit of state makes things both more
comprehensible and faster.

2. I don't know, and 3. true,

4. @Rahul, I don't understand, can you clarify?

5. Largely true and a huge bonus when used appropriately, but it can
be overused. Sometimes it seems emphasis on "helpful" syntactic
formatting without asking whether it actually helps the programmer.

6. Sounds like you've had more experience with them than me! Perhaps I
don't know how to use them appropriately. I may be missing a trick.

7. I wouldn't argue but I'd warn that some abstractions can be
expensive and I suspect shapeless may be one. Also, for parsers may I
suggest looking at ANTLR?

Idiomatic scala code can be expensive *as curremtly implemented*. Just
understand that cost by profiling, and de-idiomise in hot code as
needed.

It's a fab language.

jan

On 23/07/2018, Rahul Singh  wrote:
> Not necessarily for Kafka, but you can definitely google “Java vs. Scala”
> and find a variety of reasons . I did a study for a client and ultimately
> here are the major reasons I found :
>
> 1. Functional programming language which leads itself to stateless systems
> 2. Better / easier to use stream processing syntax (then at that time in
> Java 8)
> 3. REPL available to quickly test functionality interactively.
> 4. Case classes which can be inferred with or without strongly typed cases.
> 5. Abilty to quickly create DSLs that seem natural to developers
> 6. Awesome partial function syntax
> 7. My personal favorite — as I was using parboiled2 to build a parser —
> libraries like shapeless
>
> Best
>
> --
> Rahul Singh
> rahul.si...@anant.us
>
> Anant Corporation
> On Jul 23, 2018, 8:40 AM -0400, M. Manna , wrote:
>> Hello,
>>
>> Is anyone aware of any links or website where I can find information/case
>> study etc. to why Scala was the best choice for kafka design? I hope this
>> is not too much of a "Naive" question since I have had a very humble
>> introduction to Scala.
>>
>> I understand that Scala is considered where distributed/scalable systems
>> need to be designed. Also, in some cases it reduces multiple complex
>> statements to be formed using a single complex statements i.e. reduce
>> incremental verbosity.
>>
>> So, as a person who has background in Java, but relatively novice in
>> Scala,
>> I wanted to understand whether a study document exists to document the
>> choice?
>>
>> Regards,
>


Re: Kafka on Windows

2018-08-07 Thread jan
I tried using it just for learning a while back and wasted 3 days
because it's not supported on windows. Even basic stuff didn't work. I
did read the docs first!

I think I've seen other people on this list have questions
about/problems for exactly the same reason, and that could be a lot of
time saved if it was in the docs - it needs to be. So how do I ask the
maintainers to put 'No, Not Windows" in there?
Serious question.

I resent losing 3 days of work because of essential missing info. It
sounds like (compared to @M. Manna) that I got off lightly.

So can we put a clear caveat in the documentation, please, right at the top?

jan

On 07/08/2018, M. Manna  wrote:
> The answer is - Absolutely not. If you don’t have Linux rack, or Kubernetes
> deployment  -it will not work on Windows as guaranteed.
>
> I know this because I have tried to make it work for the past 1 year. File
> handling always fails and crashes the cluster on Windows.
>
> Thanks,
>
>
>
> On Tue, 7 Aug 2018 at 01:54, Alew  wrote:
>
>> Hi!
>>
>> Is it recommended to use production Kafka cluster on Windows?
>>
>> Can't get it from the documentation. It is possible to start Kafka on
>> Windows, but maybe it's for development purposes only.
>>
>> Thanks.
>>
>>
>>
>>
>


Re: Kafka on Windows

2018-08-07 Thread jan
This is an excellent suggestion and I intend to do so henceforth
(thanks!), but it would be an adjunct to my request rather than the
answer; it still needs to be made clear in the docs/faq that you
*can't* use windows directly.

jan

On 07/08/2018, Rahul Singh  wrote:
> I would recommend using Docker — it would end up being run on a Linux kernel
> VM on windows and is easier to get started on with a bit of learning curve
> for Docker. Less time wasted overall and at least at that point you would
> know Docker.
>
> Rahul
> On Aug 7, 2018, 4:50 AM -0400, jan , wrote:
>> I tried using it just for learning a while back and wasted 3 days
>> because it's not supported on windows. Even basic stuff didn't work. I
>> did read the docs first!
>>
>> I think I've seen other people on this list have questions
>> about/problems for exactly the same reason, and that could be a lot of
>> time saved if it was in the docs - it needs to be. So how do I ask the
>> maintainers to put 'No, Not Windows" in there?
>> Serious question.
>>
>> I resent losing 3 days of work because of essential missing info. It
>> sounds like (compared to @M. Manna) that I got off lightly.
>>
>> So can we put a clear caveat in the documentation, please, right at the
>> top?
>>
>> jan
>>
>> On 07/08/2018, M. Manna  wrote:
>> > The answer is - Absolutely not. If you don’t have Linux rack, or
>> > Kubernetes
>> > deployment -it will not work on Windows as guaranteed.
>> >
>> > I know this because I have tried to make it work for the past 1 year.
>> > File
>> > handling always fails and crashes the cluster on Windows.
>> >
>> > Thanks,
>> >
>> >
>> >
>> > On Tue, 7 Aug 2018 at 01:54, Alew  wrote:
>> >
>> > > Hi!
>> > >
>> > > Is it recommended to use production Kafka cluster on Windows?
>> > >
>> > > Can't get it from the documentation. It is possible to start Kafka on
>> > > Windows, but maybe it's for development purposes only.
>> > >
>> > > Thanks.
>> > >
>> > >
>> > >
>> > >
>> >
>


Re: Kafka on Windows

2018-08-07 Thread jan
@Jacob Sheck: It was 18 months ago so I don't recall but I was told
clearly windows ain't supported when I reported problems.

I don't know if my problems were down to my inexperience or my use of
windows, but I basically assuming I was controlling for one variable
(n00bish inexperience) but was informed I now had two (n00bness, and
an unsupported platform).

tl;dr if it doesn't work on X, we need to say so clearly. It's just...
good manners, surely?

cheers

jan


On 07/08/2018, M. Manna  wrote:
> By fully broken, i mean not designed and tested to work on Windows.
>
> On Tue, 7 Aug 2018, 16:34 M. Manna,  wrote:
>
>> Log Cleaner functionality is fully broken... If you haven't tried that
>> already.
>>
>>
>>
>> On 7 Aug 2018 4:24 pm, "Jacob Sheck"  wrote:
>>
>> While I agree that it is less frustrating to run Kafka on Linux, I am
>> interested to hear what specific issues you are running into on windows?
>>
>>
>> On Tue, Aug 7, 2018 at 9:42 AM jan 
>> wrote:
>>
>> > This is an excellent suggestion and I intend to do so henceforth
>> > (thanks!), but it would be an adjunct to my request rather than the
>> > answer; it still needs to be made clear in the docs/faq that you
>> > *can't* use windows directly.
>> >
>> > jan
>> >
>> > On 07/08/2018, Rahul Singh  wrote:
>> > > I would recommend using Docker — it would end up being run on a Linux
>> > kernel
>> > > VM on windows and is easier to get started on with a bit of learning
>> > curve
>> > > for Docker. Less time wasted overall and at least at that point you
>> would
>> > > know Docker.
>> > >
>> > > Rahul
>> > > On Aug 7, 2018, 4:50 AM -0400, jan ,
>> > wrote:
>> > >> I tried using it just for learning a while back and wasted 3 days
>> > >> because it's not supported on windows. Even basic stuff didn't work.
>> > >> I
>> > >> did read the docs first!
>> > >>
>> > >> I think I've seen other people on this list have questions
>> > >> about/problems for exactly the same reason, and that could be a lot
>> > >> of
>> > >> time saved if it was in the docs - it needs to be. So how do I ask
>> > >> the
>> > >> maintainers to put 'No, Not Windows" in there?
>> > >> Serious question.
>> > >>
>> > >> I resent losing 3 days of work because of essential missing info. It
>> > >> sounds like (compared to @M. Manna) that I got off lightly.
>> > >>
>> > >> So can we put a clear caveat in the documentation, please, right at
>> the
>> > >> top?
>> > >>
>> > >> jan
>> > >>
>> > >> On 07/08/2018, M. Manna  wrote:
>> > >> > The answer is - Absolutely not. If you don’t have Linux rack, or
>> > >> > Kubernetes
>> > >> > deployment -it will not work on Windows as guaranteed.
>> > >> >
>> > >> > I know this because I have tried to make it work for the past 1
>> year.
>> > >> > File
>> > >> > handling always fails and crashes the cluster on Windows.
>> > >> >
>> > >> > Thanks,
>> > >> >
>> > >> >
>> > >> >
>> > >> > On Tue, 7 Aug 2018 at 01:54, Alew  wrote:
>> > >> >
>> > >> > > Hi!
>> > >> > >
>> > >> > > Is it recommended to use production Kafka cluster on Windows?
>> > >> > >
>> > >> > > Can't get it from the documentation. It is possible to start
>> > >> > > Kafka
>> > on
>> > >> > > Windows, but maybe it's for development purposes only.
>> > >> > >
>> > >> > > Thanks.
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> >
>> > >
>> >
>>
>>
>>
>


Re: Kafka on Windows

2018-09-05 Thread jan
Hi Liam,
as a DB guy  that does MSSQL (on windows, obviously) I literally have
no idea what a .sh file is,or what that would imply. I guess it's bash
so yeah.
But what's so terrifying about actually stating in the FAQ that you
should not run it as production on windows?
Why should I conclude that kafka on windows, with java (which is
supported on windows), and a filesystem (which windows has), and
reliable networking (ditto), doesn't work?
When is "just read the code" ever an acceptable answer?
Why should I lose 3 days just because someone couldn't put "don't do
it" in the FAQ (which I did read), and also have other people need to
ask here whether windows is supported?
Why?
This is just nuts.

cheers

jan


On 05/09/2018, Liam Clarke  wrote:
> Hi Jan,
>
> I'd presume that downloading an archive and seeing a bunch of .sh files
> would imply that Kafka wasn't built to run on Windows.
>
> Given that it's a cluster based technology, I'd assume that it wouldn't be
> supported for Windows given that most server based stuff isn't unless
> Microsoft built it.
>
> Kind regards,
>
> Liam Clarke
>
> On Wed, 8 Aug. 2018, 2:42 am jan,  wrote:
>
>> This is an excellent suggestion and I intend to do so henceforth
>> (thanks!), but it would be an adjunct to my request rather than the
>> answer; it still needs to be made clear in the docs/faq that you
>> *can't* use windows directly.
>>
>> jan
>>
>> On 07/08/2018, Rahul Singh  wrote:
>> > I would recommend using Docker — it would end up being run on a Linux
>> kernel
>> > VM on windows and is easier to get started on with a bit of learning
>> curve
>> > for Docker. Less time wasted overall and at least at that point you
>> > would
>> > know Docker.
>> >
>> > Rahul
>> > On Aug 7, 2018, 4:50 AM -0400, jan ,
>> wrote:
>> >> I tried using it just for learning a while back and wasted 3 days
>> >> because it's not supported on windows. Even basic stuff didn't work. I
>> >> did read the docs first!
>> >>
>> >> I think I've seen other people on this list have questions
>> >> about/problems for exactly the same reason, and that could be a lot of
>> >> time saved if it was in the docs - it needs to be. So how do I ask the
>> >> maintainers to put 'No, Not Windows" in there?
>> >> Serious question.
>> >>
>> >> I resent losing 3 days of work because of essential missing info. It
>> >> sounds like (compared to @M. Manna) that I got off lightly.
>> >>
>> >> So can we put a clear caveat in the documentation, please, right at
>> >> the
>> >> top?
>> >>
>> >> jan
>> >>
>> >> On 07/08/2018, M. Manna  wrote:
>> >> > The answer is - Absolutely not. If you don’t have Linux rack, or
>> >> > Kubernetes
>> >> > deployment -it will not work on Windows as guaranteed.
>> >> >
>> >> > I know this because I have tried to make it work for the past 1
>> >> > year.
>> >> > File
>> >> > handling always fails and crashes the cluster on Windows.
>> >> >
>> >> > Thanks,
>> >> >
>> >> >
>> >> >
>> >> > On Tue, 7 Aug 2018 at 01:54, Alew  wrote:
>> >> >
>> >> > > Hi!
>> >> > >
>> >> > > Is it recommended to use production Kafka cluster on Windows?
>> >> > >
>> >> > > Can't get it from the documentation. It is possible to start Kafka
>> on
>> >> > > Windows, but maybe it's for development purposes only.
>> >> > >
>> >> > > Thanks.
>> >> > >
>> >> > >
>> >> > >
>> >> > >
>> >> >
>> >
>>
>


Re: Kafka on Windows

2018-09-05 Thread jan
@M.Manna - thanks for your reply.
I did read that document but clearly didn't digest that bit properly.


@Liam Clarke
I don't expect code's users (either maintainers or end-users) to need
to deduce *anything*. If it needs knowing, I documented it.
I'll send you an actual example so you know it's not all talk.

>your 3 days you expended on learning the hard way that Windows is the minority 
>market share in servers

I had been accepted for a job which was to be a major career change.
For this I had to learn some basic linux (no, I've still never written
a bash script), scala (in depth) and kafka, and spark as well if I
could fit it in. IIRC I had less than 4 weeks to do it. To lose those
3 days mattered.

> you're resenting a free open source project?
Hardly, just the pointless loss of time.
I also try to add back to the community rather than just take. I had
been working on ANTLR stuff (which I'll return to when bits of me stop
hurting) and currently am trying to suss if permutation encoding can
be done from L1 cache  for large permutations in less than a single
DRAM access time. Look up cuckoo filters to see why.

jan

On 05/09/2018, Liam Clarke  wrote:
> Hi Jan,
>
> .sh is not .bat and easily googleable.
>
> Nothing is terrifying about updating the documentation, I just took umbrage
> at you "resenting" your 3 days you expended on learning the hard way that
> Windows is the minority market share in servers - you're resenting a free
> open source project? Because you don't know what a .sh file signifies?
>
> I'm sorry mate, but that's on you. I code for *nix but I know what .bat or
> .ps1 means.
>
> And given that 99.99% of FOSS is not built for Windows, you should always
> assume the worst unless you downloaded it from Codeplex or via NuGet.
>
> Consider those three days valuable learning instead.
>
> Kind regards,
>
> Liam Clarke
>
> On Wed, 5 Sep. 2018, 9:22 pm jan,  wrote:
>
>> Hi Liam,
>> as a DB guy  that does MSSQL (on windows, obviously) I literally have
>> no idea what a .sh file is,or what that would imply. I guess it's bash
>> so yeah.
>> But what's so terrifying about actually stating in the FAQ that you
>> should not run it as production on windows?
>> Why should I conclude that kafka on windows, with java (which is
>> supported on windows), and a filesystem (which windows has), and
>> reliable networking (ditto), doesn't work?
>> When is "just read the code" ever an acceptable answer?
>> Why should I lose 3 days just because someone couldn't put "don't do
>> it" in the FAQ (which I did read), and also have other people need to
>> ask here whether windows is supported?
>> Why?
>> This is just nuts.
>>
>> cheers
>>
>> jan
>>
>>
>> On 05/09/2018, Liam Clarke  wrote:
>> > Hi Jan,
>> >
>> > I'd presume that downloading an archive and seeing a bunch of .sh files
>> > would imply that Kafka wasn't built to run on Windows.
>> >
>> > Given that it's a cluster based technology, I'd assume that it wouldn't
>> be
>> > supported for Windows given that most server based stuff isn't unless
>> > Microsoft built it.
>> >
>> > Kind regards,
>> >
>> > Liam Clarke
>> >
>> > On Wed, 8 Aug. 2018, 2:42 am jan, 
>> wrote:
>> >
>> >> This is an excellent suggestion and I intend to do so henceforth
>> >> (thanks!), but it would be an adjunct to my request rather than the
>> >> answer; it still needs to be made clear in the docs/faq that you
>> >> *can't* use windows directly.
>> >>
>> >> jan
>> >>
>> >> On 07/08/2018, Rahul Singh  wrote:
>> >> > I would recommend using Docker — it would end up being run on a
>> >> > Linux
>> >> kernel
>> >> > VM on windows and is easier to get started on with a bit of learning
>> >> curve
>> >> > for Docker. Less time wasted overall and at least at that point you
>> >> > would
>> >> > know Docker.
>> >> >
>> >> > Rahul
>> >> > On Aug 7, 2018, 4:50 AM -0400, jan ,
>> >> wrote:
>> >> >> I tried using it just for learning a while back and wasted 3 days
>> >> >> because it's not supported on windows. Even basic stuff didn't
>> >> >> work.
>> I
>> >> >> did read the docs first!
>> >> >>
>> >> >> I think I

Re: Please explain Rest API

2018-11-30 Thread jan
I may have missed this (I'm missing the first few messages), so sorry
in advance if I have, but what OS are you using?
Kafka does not work well on windows, I had problems using it that
sounds a little like this (just a little though) when on win.

jan

On 30/11/2018, Satendra Pratap Singh  wrote:
> Hi Sönke,
>
> when topic got created so i tried to read the topic data using console
> consumer but it didn't worked out consumer didn't consumer a single
> message. i read the log and got this every time. i don't understand where m
> i making mistake.
>
>
> [\!�qg_z��g_z�(�"{"error":{"name":"Error","status":404,"message":"Shared
> class  \"Dia\" has no method handling GET
> /getDia","statusCode":404,"stack":"Error: Shared class  \"Dia\" has no
> method handling GET /getDia\nat restRemoteMethodNotFound
> (/Users/satendra/Api/node_modules/loopback/node_modules/strong-remoting/lib/rest-adapter.js:371:17)\n
>   at Layer.handle [as handle_request]
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/layer.js:95:5)\n
>   at trim_prefix
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:317:13)\n
>   at
> /Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:284:7\n
>   at Function.process_params
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:335:12)\n
>   at next
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:275:10)\n
>   at Function.handle
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:174:3)\n
>   at router
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:47:12)\n
>   at Layer.handle [as handle_request]
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/layer.js:95:5)\n
>   at trim_prefix
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:317:13)\n
>   at
> /Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:284:7\n
>   at Function.process_params
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:335:12)\n
>   at next
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:275:10)\n
>   at jsonParser
> (/Users/satendra/analyticsApi/node_modules/body-parser/lib/types/json.js:109:7)\n
>   at Layer.handle [as handle_request]
> (/Users/satendra/analyticsApi/node_modules/loopback/node_modules/express/lib/router/layer.js:95:5)\n
>   at trim_prefix
> (/Users/satendra/analyticsApi/node_modules/loopback/node_modules/express/lib/router/index.js:317:13)"}}@access-control-allow-credentials["true"]connection["keep-alive"]content-typeF["application/json;
> charset=utf-8"dateB["Thu, 29 Nov 2018 12:38:59
> GMT"etagR["W/\"89a-6BIHHr4YrwjSfEHOpFjIThnXbCY\""]"transfer-encoding["chunked"vary6["Origin,
> Accept-Encoding"],x-content-type-options["nosniff"]$x-download-options["noopen"]x-frame-options["DENY"]
> x-xss-protection"["1; mode=block"]
>
> On Fri, Nov 30, 2018 at 2:30 AM Sönke Liebau
>  wrote:
>
>> Hi Satendra,
>>
>> if  I understand correctly you are using the RestSourceConnector to pull
>> data into a Kafka topic and that seems to work - at least as far as the
>> topic getting created.
>>
>> But are you saying that you tried to read data with 'cat filename.log' on
>> a
>> file within your kafka data directory? I assume I misunderstood this
>> part,
>> can you please add some detail around what you did there?
>>
>> Best regards,
>> Sönke
>>
>> On Thu, Nov 29, 2018 at 7:27 PM Satendra Pratap Singh <
>> sameerp...@gmail.com>
>> wrote:
>>
>> > Hi Sönke,
>> >
>> > I have single node kafka cluster, i have setup web server locally, many
>> > apps are sending logs to this server. I have Rest API which
>> > send/receive
>> > client request to/from server.
>> >
>> > here is my connector configuration : curl -i -X POST -H
>> > "Accept:application/json"  -H  "Content-Type: application/json"
>> > http://$CONNECT_HOST_SOURCE:8083/connectors/
>> > -d '{ "name": "source_rest_telemetry_data”, "config": {
>> > "key.converter":"org.apache.kafka.connect.storage.StringConverter",
>> > "value.converter"

Re: Advice for Kafka project in Africa...

2020-09-08 Thread jan
Everything's down to requirements. I'm unclear on yours. I tried to
look at your website to see if I could pick up any clues but upsail.co
(and just in case, upsail.com) don't exist, so I dunno.

Some questions, rhetorical really:

Is there any reason a standard SQL database would not do - what does
kafka offer that you think is useful? I presume you know SQL already,

What are you trying to do, more precisely? Do you have a clear idea of
the inputs and outputs? What is the amount of data you expect to
process? Is there some tight time limit?

What is your uptime requirement? What is your budget?

>From my experience, mentioning big data at this point is a major red
flag. Make sure you really can't do it on a laptop or a server (both
in money and business requirements) before you start scaling out
because it gets messy. A decent server can crunch a *lot* of data if
well configured.

Also consider your infrastructure such as power supplies and
networking protection as Africa may not be too stable in those
regards. Also perhaps physical protection.
If you skip your own hardware and go for 'the cloud' then you are
totally in the hands of the networking gods.

These are points to mull over. Doubt I can suggest anything further. Good luck.

jan

On 02/09/2020, cedric sende lubuele  wrote:
> Let me introduce myself, my name is Cedric and I am a network engineer
> passionate about new technologies and as part of my new activity, I am
> interested in Big Data. Currently, I live in Africa (Congo) and as everyone
> knows, Africa is very late in terms of IT infrastructure (the Cloud is
> difficult, we work a lot on premise).
>
> While doing some research, I came across Kai Waehner's article (Kafka
> replace
> database?<https://www.kai-waehner.de/blog/2020/03/12/can-apache-kafka-replace-database-acid-storage-%20transactions-sql-nosql-data-lake%20/>)
> and I would like to be able to get an idea about the possibilities of
> Kafka.
>
> Let me explain, I am working on a project for integrating several databases
> (MySQL, MongoDB, Neo4j, ...) and I have to develop with my team, an alert
> system which must detect anomalies on different criteria linked to a person
> in the various departments of the company.
> Would Kafka be a good solution in order to centralize all the data and
> create several analysis scripts to detect an anomaly and send back an alert
> message such as for example a suspect wanted by the police?
>
> Thank you in advance
>
>
>
> Sende Cedric / Network IT
> sende.ced...@hotmail.com<mailto:sende.ced...@hotmail.com> / 082/8446954
>
> UPSAIL GROUP
> http://upsail.co/<https://htmlsig.com/t/01BFBBXF>
>
> [http://upsail.co/wp-content/themes/upsail/images/logo.png]
>


Re: Support for Uni-directional data-diode?

2020-12-22 Thread jan
Dunno if it helps (if in doubt, probably not) but a search for the
term gets some useful articles (inc.
<https://en.wikipedia.org/wiki/Unidirectional_network>) and a company
<https://owlcyberdefense.com/blog/what-is-data-diode-technology-how-does-it-work/>
who may be worth contacting (I'm not affiliated in any way).

The first question I'd ask myself is, would a burn-to-dvd solution
work? Failing that, basic stuff like email?
In any case, what if the data's corrupted, how can the server's detect
and re-request? What are you protecting against exactly? Stuff like
that.

jan

On 22/12/2020, Danny - Terafence  wrote:
> Hello,
>
> Merry Christmas,
>
> My name is Danny Michaeli, I am Terafence’s Technical Services Manager.
>
> One of our customers is using KAFKA to gather ICS SEIM data to collect and
> forward to AI servers.
>
> They have requested us to propose a uni-directional solution to avoid being
> exposed from the AI server site.
>
> Can you, please advise as to if and how can this be done?
>
> B. Regards,
>
> Danny Michaeli
> Technical Services Manager
> [Logo]
> Tel.: +972-73-3791191
> Cell: +972-52-882-3108
>
>


Re: Support for Uni-directional data-diode?

2020-12-24 Thread jan
It might be best to do a web search for companies that know this stuff
 and speak to them.

re. kafka over UDP I dunno but perhaps instead do normal kafka talking
to a proxy machine via TCP and have that proxy forward traffic via
UDP.
If that works, would simplify the problem I guess.

cheers

jan

On 23/12/2020, Danny - Terafence  wrote:
> Thank you Jan,
>
> The aim is to secure the sending side infrastructure and assets. Deny any
> known and unkown attacks from the "outside" while maintaining real-time data
> flowing outbound.
> Data integrity may be maintained in various ways if the forwarded protocol
> has such options.
>
> I wonder if KAFKA can run over UDP... for starters..
>
> Anyone knows?
>
> On Dec 22, 2020 23:25, jan  wrote:
> Dunno if it helps (if in doubt, probably not) but a search for the
> term gets some useful articles (inc.
> <https://en.wikipedia.org/wiki/Unidirectional_network>) and a company
> <https://owlcyberdefense.com/blog/what-is-data-diode-technology-how-does-it-work/>
> who may be worth contacting (I'm not affiliated in any way).
>
> The first question I'd ask myself is, would a burn-to-dvd solution
> work? Failing that, basic stuff like email?
> In any case, what if the data's corrupted, how can the server's detect
> and re-request? What are you protecting against exactly? Stuff like
> that.
>
> jan
>
> On 22/12/2020, Danny - Terafence  wrote:
>> Hello,
>>
>> Merry Christmas,
>>
>> My name is Danny Michaeli, I am Terafence’s Technical Services Manager.
>>
>> One of our customers is using KAFKA to gather ICS SEIM data to collect
>> and
>> forward to AI servers.
>>
>> They have requested us to propose a uni-directional solution to avoid
>> being
>> exposed from the AI server site.
>>
>> Can you, please advise as to if and how can this be done?
>>
>> B. Regards,
>>
>> Danny Michaeli
>> Technical Services Manager
>> [Logo]
>> Tel.: +972-73-3791191
>> Cell: +972-52-882-3108
>>
>>
>


possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread jan
Hi all, I'm something of a kafka n00b.
I posted the following in the  google newsgroup, haven't had a reply
or even a single read so I'll try here. My original msg, slightly
edited, was:



(windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
server, latest version of java)

I've spent several days trying to sort out unexpected behaviour
involving kafka and the kafka console producer and consumer.

 If I set  the console produced and console consumer to look at the
same topic then I can type lines into the producer window and see them
appear in the consumer window, so it works.

If I try to pipe in large amounts of data to the producer, some gets
lost and the producer reports errors eg.

[2017-04-17 18:14:05,868] ERROR Error when sending message to topic
big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
(org.apache.kafka.clients.
producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 8
record(s) expired due to timeout while requesting metadata from
brokers for big_ptns1_repl1_nozip-0

I'm using as input a file either shakespeare's full works (about 5.4
meg ascii), or a much larger file of shakespear's full works
replicated 900 times to make it about 5GB. Lines are ascii and short,
and each line should be a single record when read in by the console
producer. I need to do some benchmarking on time and space and this
was my first try.

As mentioned, data gets lost. I presume it is expected that any data
we pipe into the producer should arrive in the consumer, so if I do
this in one windows console:

kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic
big_ptns1_repl1_nozip --zookeeper localhost:2181 >
F:\Users\me\Desktop\shakespear\single_all_shakespear_OUT.txt

and this in another:

kafka-console-producer.bat --broker-list localhost:9092  --topic
big_ptns1_repl1_nozip <
F:\Users\me\Desktop\shakespear\complete_works_no_bare_lines.txt

then the output file "single_all_shakespear_OUT.txt" should be
identical to the input file "complete_works_no_bare_lines.txt" except
it's not. For the complete works (sabout 5.4 meg uncompressed) I lost
about 130K in the output.
For the replicated shakespeare, which is about 5GB, I lost about 150 meg.

This can't be right surely and it's repeatable but happens at
different places in the file when errors start to be produced, it
seems.

I've done this using all 3 versions of kafak in the 0.10.x.y branch
and I get the same problem (the above commands were using the 0.10.0.0
branch so they look a little obsolete but they are right for that
branch I think). It's cost me some days.
So, am I making a mistake, if so what?

thanks

jan


Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread jan
Hi Serega,

> data didn't reach producer. So why should data appear in consumer?

err, isn't it supposed to? Isn't the loss of data a very serious error?

> loss rate is more or less similar [...] Not so bad.

That made me laugh at least.  Is kafka intended to be a reliable
message delivery system, or is a 2% data loss officially acceptable?

I've been reading the other threads and one says windows is really not
supported, and certainly not for production. Perhaps that's the root
of it. Well I'm hoping to try it on linux shortly so I'll see if I can
replicate the issue but I would like to know whether it *should* work
in windows.

cheers

jan

On 18/04/2017, Serega Sheypak  wrote:
> Hi,
>
> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> (org.apache.kafka.clients.
> producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> record(s) expired due to timeout while requesting metadata from
> brokers for big_ptns1_repl1_nozip-0
>
> data didn't reach producer. So why should data appear in consumer?
> loss rate is more or less similar : 0.02 (130k / 5400mb) ~ 0.03% (150mb /
> 5000gb) Not so bad.
>
>
> 2017-04-18 21:46 GMT+02:00 jan :
>
>> Hi all, I'm something of a kafka n00b.
>> I posted the following in the  google newsgroup, haven't had a reply
>> or even a single read so I'll try here. My original msg, slightly
>> edited, was:
>>
>> 
>>
>> (windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
>> server, latest version of java)
>>
>> I've spent several days trying to sort out unexpected behaviour
>> involving kafka and the kafka console producer and consumer.
>>
>>  If I set  the console produced and console consumer to look at the
>> same topic then I can type lines into the producer window and see them
>> appear in the consumer window, so it works.
>>
>> If I try to pipe in large amounts of data to the producer, some gets
>> lost and the producer reports errors eg.
>>
>> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
>> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
>> (org.apache.kafka.clients.
>> producer.internals.ErrorLoggingCallback)
>> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
>> record(s) expired due to timeout while requesting metadata from
>> brokers for big_ptns1_repl1_nozip-0
>>
>> I'm using as input a file either shakespeare's full works (about 5.4
>> meg ascii), or a much larger file of shakespear's full works
>> replicated 900 times to make it about 5GB. Lines are ascii and short,
>> and each line should be a single record when read in by the console
>> producer. I need to do some benchmarking on time and space and this
>> was my first try.
>>
>> As mentioned, data gets lost. I presume it is expected that any data
>> we pipe into the producer should arrive in the consumer, so if I do
>> this in one windows console:
>>
>> kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic
>> big_ptns1_repl1_nozip --zookeeper localhost:2181 >
>> F:\Users\me\Desktop\shakespear\single_all_shakespear_OUT.txt
>>
>> and this in another:
>>
>> kafka-console-producer.bat --broker-list localhost:9092  --topic
>> big_ptns1_repl1_nozip <
>> F:\Users\me\Desktop\shakespear\complete_works_no_bare_lines.txt
>>
>> then the output file "single_all_shakespear_OUT.txt" should be
>> identical to the input file "complete_works_no_bare_lines.txt" except
>> it's not. For the complete works (sabout 5.4 meg uncompressed) I lost
>> about 130K in the output.
>> For the replicated shakespeare, which is about 5GB, I lost about 150 meg.
>>
>> This can't be right surely and it's repeatable but happens at
>> different places in the file when errors start to be produced, it
>> seems.
>>
>> I've done this using all 3 versions of kafak in the 0.10.x.y branch
>> and I get the same problem (the above commands were using the 0.10.0.0
>> branch so they look a little obsolete but they are right for that
>> branch I think). It's cost me some days.
>> So, am I making a mistake, if so what?
>>
>> thanks
>>
>> jan
>>
>


Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread jan
Thanks to both of you. Some quick points:

I'd expect there to be backpressure from the producer if the broker is
busy ie. the broker would not respond to the console producer if the
broker was too busy accept more messages, and the producer would hang
on the socket. Alternatively I'd hope the console producer would have
the sense to back off and retry but clearly(?) not.
This behaviour is actually relevant to my old job so I need to know more.

Perhaps the timeout mentioned in the error msg can just be upped?

*Is* the claimed timeout relevant?
> Batch containing 8 record(s) expired due to timeout while requesting metadata 
> from brokers for big_ptns1_repl1_nozip-0

Why is the producer expiring records?

But I'm surprised this happened because my setup is one machine with
everything running on it. No network. Also Kafka writes to the disk
without an fsync (or its equivalent on windows) which means it just
gets cached in ram before being lazily written to disk, and I've got
plenty of ram - 16GB ram vs 5GB of input file. Kafka adds its overhead
so it grows to ~8GB but still, it need not hit disk at all (and the
file goes into the windows memory, not java's).
Maybe it is GC holding things up but I dunno, GC even for a second or
two should not cause a socket failure, just delay the read, though I'm
not an expert on this *at all*.

I'll go over the answers tomorrow more carefully but thanks anyway!

cheers

jan

On 18/04/2017, Serega Sheypak  wrote:
>> err, isn't it supposed to? Isn't the loss of data a very serious error?
> Kafka can't fix networking issues like latencies, blinking, unavailability
> or any other weird stuff. Kafka promises you to persist data if data
> reaches Kafka. Data delivery responsibility to kafka is on your side. You
> fail to do it according to logs.
>
> 0.02% not 2%
> You should check broker logs to figure out what went wrong. All things
> happen on one machine as far as I understand. Maybe your brokers don't have
> enough mem and they stuck because of GC and don't respond to producer.
> Async producer fails to send data. That is why you observe data loss on
> consumer side.
>
>
> 2017-04-18 23:32 GMT+02:00 jan :
>
>> Hi Serega,
>>
>> > data didn't reach producer. So why should data appear in consumer?
>>
>> err, isn't it supposed to? Isn't the loss of data a very serious error?
>>
>> > loss rate is more or less similar [...] Not so bad.
>>
>> That made me laugh at least.  Is kafka intended to be a reliable
>> message delivery system, or is a 2% data loss officially acceptable?
>>
>> I've been reading the other threads and one says windows is really not
>> supported, and certainly not for production. Perhaps that's the root
>> of it. Well I'm hoping to try it on linux shortly so I'll see if I can
>> replicate the issue but I would like to know whether it *should* work
>> in windows.
>>
>> cheers
>>
>> jan
>>
>> On 18/04/2017, Serega Sheypak  wrote:
>> > Hi,
>> >
>> > [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
>> > big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
>> > (org.apache.kafka.clients.
>> > producer.internals.ErrorLoggingCallback)
>> > org.apache.kafka.common.errors.TimeoutException: Batch containing 8
>> > record(s) expired due to timeout while requesting metadata from
>> > brokers for big_ptns1_repl1_nozip-0
>> >
>> > data didn't reach producer. So why should data appear in consumer?
>> > loss rate is more or less similar : 0.02 (130k / 5400mb) ~ 0.03% (150mb
>> > /
>> > 5000gb) Not so bad.
>> >
>> >
>> > 2017-04-18 21:46 GMT+02:00 jan :
>> >
>> >> Hi all, I'm something of a kafka n00b.
>> >> I posted the following in the  google newsgroup, haven't had a reply
>> >> or even a single read so I'll try here. My original msg, slightly
>> >> edited, was:
>> >>
>> >> 
>> >>
>> >> (windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
>> >> server, latest version of java)
>> >>
>> >> I've spent several days trying to sort out unexpected behaviour
>> >> involving kafka and the kafka console producer and consumer.
>> >>
>> >>  If I set  the console produced and console consumer to look at the
>> >> same topic then I can type lines into the producer window and see them
>> >> appear in the consumer window, so it works.
>> >>
>> >> If I try 

Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-19 Thread jan
@Robert Quinlivan: the producer is just the kafka-console-producer
shell that comes in the kafka/bin directory (kafka/bin/windows in my
case). Nothing special.
I'll try messing with acks because this problem is somewhat incidental
to what I'm trying to do which is see how big the log directory grows.

It's possible kafkacat or other producers would do a better job than
the console producer but I'll try that on linux as getting them
working on windows, meh.

thanks all

jan


On 18/04/2017, David Garcia  wrote:
> The “NewShinyProducer” is also deprecated.
>
> On 4/18/17, 5:41 PM, "David Garcia"  wrote:
>
> The console producer in the 0.10.0.0 release uses the old producer which
> doesn’t have “backoff”…it’s really just for testing simple producing:
>
> object ConsoleProducer {
>
>   def main(args: Array[String]) {
>
> try {
> val config = new ProducerConfig(args)
> val reader =
> Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
> reader.init(System.in, getReaderProps(config))
>
> val producer =
>   if(config.useOldProducer) {
> new OldProducer(getOldProducerProps(config))
>   } else {
> new NewShinyProducer(getNewProducerProps(config))
>   }
>
>
>
> On 4/18/17, 5:31 PM, "Robert Quinlivan"  wrote:
>
> I am curious how your producer is configured. The producer maintains
> an
> internal buffer of messages to be sent over to the broker. Is it
> possible
> you are terminating the producer code in your test before the buffer
> is
> exhausted?
>
> On Tue, Apr 18, 2017 at 5:29 PM, jan 
> wrote:
>
> > Thanks to both of you. Some quick points:
> >
> > I'd expect there to be backpressure from the producer if the
> broker is
> > busy ie. the broker would not respond to the console producer if
> the
> > broker was too busy accept more messages, and the producer would
> hang
> > on the socket. Alternatively I'd hope the console producer would
> have
> > the sense to back off and retry but clearly(?) not.
> > This behaviour is actually relevant to my old job so I need to
> know more.
> >
> > Perhaps the timeout mentioned in the error msg can just be upped?
> >
> > *Is* the claimed timeout relevant?
> > > Batch containing 8 record(s) expired due to timeout while
> requesting
> > metadata from brokers for big_ptns1_repl1_nozip-0
> >
> > Why is the producer expiring records?
> >
> > But I'm surprised this happened because my setup is one machine
> with
> > everything running on it. No network. Also Kafka writes to the
> disk
> > without an fsync (or its equivalent on windows) which means it
> just
> > gets cached in ram before being lazily written to disk, and I've
> got
> > plenty of ram - 16GB ram vs 5GB of input file. Kafka adds its
> overhead
> > so it grows to ~8GB but still, it need not hit disk at all (and
> the
> > file goes into the windows memory, not java's).
> > Maybe it is GC holding things up but I dunno, GC even for a second
> or
> > two should not cause a socket failure, just delay the read, though
> I'm
> > not an expert on this *at all*.
> >
> > I'll go over the answers tomorrow more carefully but thanks
> anyway!
> >
> > cheers
> >
> > jan
> >
> > On 18/04/2017, Serega Sheypak  wrote:
> > >> err, isn't it supposed to? Isn't the loss of data a very
> serious error?
> > > Kafka can't fix networking issues like latencies, blinking,
> > unavailability
> > > or any other weird stuff. Kafka promises you to persist data if
> data
> > > reaches Kafka. Data delivery responsibility to kafka is on your
> side. You
> > > fail to do it according to logs.
> > >
> > > 0.02% not 2%
> > > You should check broker logs to figure out what went wrong. All
> things
> > > happen on one machine as far as I understand. Maybe your brokers
> don't
> > have
> > > enough mem and they stuck because of GC and don't respond to
> producer.
> > &

Re: Issue in Kafka running for few days

2017-04-30 Thread jan
I looked this up yesterday  when I read the grandparent, as my old
company ran two and I needed to know.
Your link is a bit ambiguous but it has a link to the zookeeper
Getting Started guide which says this:

"
For replicated mode, a minimum of three servers are required, and it
is strongly recommended that you have an odd number of servers. If you
only have two servers, then you are in a situation where if one of
them fails, there are not enough machines to form a majority quorum.
Two servers is inherently less stable than a single server, because
there are two single points of failure.
"

<https://zookeeper.apache.org/doc/r3.4.10/zookeeperStarted.html>

cheers

jan


On 30/04/2017, Michal Borowiecki  wrote:
> Svante, I don't share your opinion.
> Having an even number of zookeepers is not a problem in itself, it
> simply means you don't get any better resilience than if you had one
> fewer instance.
> Yes, it's not common or recommended practice, but you are allowed to
> have an even number of zookeepers and it's most likely not related to
> the problem at hand and does NOT need to be addressed first.
> https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup
>
> Abhit, I'm afraid the log snippet is not enough for me to help.
> Maybe someone else in the community with more experience can recognize
> the symptoms but in the meantime, if you haven't already done so, you
> may want to search for similar issues:
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22
>
> searching for text like "ZK expired; shut down all controller" or "No
> broker in ISR is alive for" or other interesting events form the log.
>
> Hope that helps,
> Michal
>
>
> On 26/04/17 21:40, Svante Karlsson wrote:
>> You are not supposed to run an even number of zookeepers. Fix that first
>>
>> On Apr 26, 2017 20:59, "Abhit Kalsotra"  wrote:
>>
>>> Any pointers please
>>>
>>>
>>> Abhi
>>>
>>> On Wed, Apr 26, 2017 at 11:03 PM, Abhit Kalsotra 
>>> wrote:
>>>
>>>> Hi *
>>>>
>>>> My kafka setup
>>>>
>>>>
>>>> **OS: Windows Machine*6 broker nodes , 4 on one Machine and 2 on other
>>>> Machine*
>>>>
>>>> **ZK instance on (4 broker nodes Machine) and another ZK on (2 broker
>>>> nodes machine)*
>>>> ** 2 Topics with partition size = 50 and replication factor = 3*
>>>>
>>>> I am producing on an average of around 500 messages / sec with each
>>>> message size close to 98 bytes...
>>>>
>>>> More or less the message rate stays constant throughout, but after
>>> running
>>>> the setup for close to 2 weeks , my Kafka cluster broke and this
>>>> happened
>>>> twice in a month.  Not able to understand what's the issue, Kafka gurus
>>>> please do share your inputs...
>>>>
>>>> the controlle.log file at the time of Kafka broken looks like
>>>>
>>>>
>>>>
>>>>
>>>> *[2017-04-26 12:03:34,998] INFO [Controller 0]: Broker failure callback
>>>> for 0,1,3,5,6 (kafka.controller.KafkaController)[2017-04-26
>>> 12:03:34,998]
>>>> INFO [Controller 0]: Removed ArrayBuffer() from list of shutting down
>>>> brokers. (kafka.controller.KafkaController)[2017-04-26 12:03:34,998]
>>> INFO
>>>> [Partition state machine on Controller 0]: Invoking state change to
>>>> OfflinePartition for partitions
>>>> [__consumer_offsets,19],[mytopic,11],[__consumer_
>>> offsets,30],[mytopicOLD,18],[mytopic,13],[__consumer_
>>> offsets,47],[mytopicOLD,26],[__consumer_offsets,29],[
>>> mytopicOLD,0],[__consumer_offsets,41],[mytopic,44],[
>>> mytopicOLD,38],[mytopicOLD,2],[__consumer_offsets,17],[__
>>> consumer_offsets,10],[mytopic,20],[mytopic,23],[mytopic,30],
>>> [__consumer_offsets,14],[__consumer_offsets,40],[mytopic,
>>> 31],[mytopicOLD,43],[mytopicOLD,19],[mytopicOLD,35]
>>> ,[__consumer_offsets,18],[mytopic,43],[__consumer_offsets,26],[__consumer_
>>> offsets,0],[mytopic,32],[__consumer_offsets,24],[
>>> mytopicOLD,3],[mytopic,2],[mytopic,3],[mytopicOLD,45],[
>>> mytopic,35],[__consumer_offsets,20],[mytopic,1],[
>>> mytopicOLD,33],[__consumer_offsets,5],[mytopicOLD,47],[__
>>> consumer_offsets,22],[mytopicOLD,8],[mytopic,33],[
>>> mytopic,36],[mytopicOLD,11],[mytopic,47],[myto

Re: Kafka write throughput tuning

2017-05-31 Thread jan
I'm no kafka expert and I've forgotten what little I learnt, however
there must be a bottleneck somewhere.

In your first instance of 3 partitions on 3 disks:
- Are all partitions growing?
- Are they growing about equally?

- What else might be limiting aspect...
-- what's the cpu activity like, perhaps it's cpu bound (unlikely but
please check)
-- are the disks directly attached and not sharing any write paths, or
are they virtual disks over a network? (I've actually seen virtuals
over a network - not pretty)
-- any other limiting factors you can see or imagine?

Also please in future give a fuller picture of your setup eg. OS, OS
version, memory, number of cpus, what actual hardware (PCs are very
different from servers), etc

cheers

jan

On 17/05/2017, 陈 建平Chen Jianping  wrote:
> Hi Group,
>
> Recently I am trying to turn Kafka write performance to improve throughput.
> On my Kafka broker, there are 3 disks (7200 RPM).
> For one disk, the Kafka write throughput can reach 150MB/s. In my opinion,
> if I send message to topic test_p3 (which has 3 partitions located on
> different disk in the same server), the whole write throughput can reach 450
> MB/s due to parallel writing disk. However the test result is still 150MB/s.
> Is there any reason that multiple disk doesn’t multiply the write
> throughput? And is there any bottleneck for the Kafka write throughput or I
> need some configuration to update?
>
> I also try to test sending message to two different topic (whose partition
> on different disk of that server), and the total throughput only reach 200
> MB/s instead of 300 MB/s as I expect. Below is my Kafka configuration and
> setting. Thanks for any idea or advice on it:)
>
> ##Kafka producer setting
> ./kafka-run-class org.apache.kafka.tools.ProducerPerformance --topic test_p3
> --num-records 5000 --record-size 100 --throughput -1 --producer-props
> acks=0 bootstrap.servers=localhost:9092 buffer.memory=33554432
> batch.size=16384
>
> ##OS tuning setting
> net.core.rmem_default = 124928
> net.core.rmem_max = 2048000
> net.core.wmem_default = 124928
> net.core.wmem_max = 2048000
> net.ipv4.tcp_rmem = 4096 87380 4194304
> net.ipv4.tcp_wmem = 4096 87380 4194304
> net.ipv4.tcp_max_tw_buckets = 262144
> net.ipv4.tcp_max_syn_backlog = 1024
> vm.oom_kill_allocating_task = 1
> vm.max_map_count = 20
> vm.swappiness = 1
> vm.dirty_writeback_centisecs = 500
> vm.dirty_expire_centisecs = 500
> vm.dirty_ratio = 60
> vm.dirty_background_ratio = 5
>
>
> Thanks,
> Eric
>
>


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Jan Filipiak

Hello Everyone,

I would very much appreciate if someone could provide me a real world 
examplewhere it is more convenient to implement the serializers instead 
of just making sure to provide bytearrays.


The code we came up with explicitly avoids the serializer api. I think 
it is common understanding that if you want to transport data you need 
to have it as a bytearray.


If at all I personally would like to have a serializer interface that 
takes the same types as the producer


public interface Serializer extends Configurable {
public byte[] serializeKey(K data);
public byte[] serializeValue(V data);
public void close();
}

this would avoid long serialize implementations with branches like 
"switch(topic)" or "if(isKey)". Further serializer per topic makes more 
sense in my opinion. It feels natural to have a one to one relationship 
from types to topics or at least only a few partition per type. But as 
we inherit the type from the producer we would have to create many 
producers. This would create additional unnecessary connections to the 
brokers. With the serializers we create a one type to all topics 
relationship and the only type that satisfies that is the bytearray or 
Object. Am I missing something here? As said in the beginning I would 
like to that usecase that really benefits from using the serializers. I 
think in theory they sound great but they cause real practical issues 
that may lead users to wrong decisions.


-1 for putting the serializers back in.

Looking forward to replies that can show me the benefit of serializes 
and especially how the

Type => topic relationship can be handled nicely.

Best
Jan



On 25.11.2014 02:58, Jun Rao wrote:

Hi, Everyone,

I'd like to start a discussion on whether it makes sense to add the
serializer api back to the new java producer. Currently, the new java
producer takes a byte array for both the key and the value. While this api
is simple, it pushes the serialization logic into the application. This
makes it hard to reason about what type of data is being sent to Kafka and
also makes it hard to share an implementation of the serializer. For
example, to support Avro, the serialization logic could be quite involved
since it might need to register the Avro schema in some remote registry and
maintain a schema cache locally, etc. Without a serialization api, it's
impossible to share such an implementation so that people can easily reuse.
We sort of overlooked this implication during the initial discussion of the
producer api.

So, I'd like to propose an api change to the new producer by adding back
the serializer api similar to what we had in the old producer. Specially,
the proposed api changes are the following.

First, we change KafkaProducer to take generic types K and V for the key
and the value, respectively.

public class KafkaProducer implements Producer {

 public Future send(ProducerRecord record, Callback
callback);

 public Future send(ProducerRecord record);
}

Second, we add two new configs, one for the key serializer and another for
the value serializer. Both serializers will default to the byte array
implementation.

public class ProducerConfig extends AbstractConfig {

 .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
"org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
KEY_SERIALIZER_CLASS_DOC)
 .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
"org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
VALUE_SERIALIZER_CLASS_DOC);
}

Both serializers will implement the following interface.

public interface Serializer extends Configurable {
 public byte[] serialize(String topic, T data, boolean isKey);

 public void close();
}

This is more or less the same as what's in the old producer. The slight
differences are (1) the serializer now only requires a parameter-less
constructor; (2) the serializer has a configure() and a close() method for
initialization and cleanup, respectively; (3) the serialize() method
additionally takes the topic and an isKey indicator, both of which are
useful for things like schema registration.

The detailed changes are included in KAFKA-1797. For completeness, I also
made the corresponding changes for the new java consumer api as well.

Note that the proposed api changes are incompatible with what's in the
0.8.2 branch. However, if those api changes are beneficial, it's probably
better to include them now in the 0.8.2 release, rather than later.

I'd like to discuss mainly two things in this thread.
1. Do people feel that the proposed api changes are reasonable?
2. Are there any concerns of including the api changes in the 0.8.2 final
release?

Thanks,

Jun





Re: High CPU usage of Crc32 on Kafka broker

2015-02-22 Thread Jan Filipiak
I just want to bring up that idea of no server side de/recompression 
again. Features like KAFKA-1499 
<https://issues.apache.org/jira/browse/KAFKA-1499> seem to steer the 
project into a different direction and the fact that tickets like 
KAFKA-845 <https://issues.apache.org/jira/browse/KAFKA-845> are not 
getting much attention gives the same impression. This is something my 
head keeps spinning around almost 24/7 recently.


The problem I see is that CPU's are not the cheapest part of a new 
server and if you can spare a gigahertz or some cores by just making 
sure your configs are the same across all producers I would always opt 
for the operational overhead instead of the bigger servers. I think this 
will usually decrease the tco's of kafka installations.


I am currently not familiar enough with the codebase to judge if server 
side decompression happens before acknowledge. If so, these would be 
some additional milliseconds to respond faster if we could spare 
de/recompression.


Those are my thoughts about server side de/recompression. It would be 
great if I could get some responses and thoughts back.


Jan



On 07.11.2014 00:23, Jay Kreps wrote:

I suspect it is possible to save and reuse the CRCs though it might be a
bit of an invasive change. I suspect the first usage is when we are
checking the validity of the messages and the second is from when we
rebuild the compressed message set (I'm assuming you guys are using
compression because I think we optimize this out otherwise). Technically I
think the CRCs stay the same.

An alternative approach, though, would be working to remove the need for
recompression entirely on the broker side by making the offsets in the
compressed message relative to the base offset of the message set. This is
a much more invasive change but potentially better as it would also remove
the recompression done on the broker which is also CPU heavy.

-Jay

On Thu, Nov 6, 2014 at 2:36 PM, Allen Wang 
wrote:


Sure. Here is the link to the screen shot of jmc with the JTR file loaded:

http://picpaste.com/fligh-recorder-crc.png



On Thu, Nov 6, 2014 at 2:12 PM, Neha Narkhede 
wrote:


Allen,

Apache mailing lists don't allow attachments. Could you please link to a
pastebin or something?

Thanks,
Neha

On Thu, Nov 6, 2014 at 12:02 PM, Allen Wang 
wrote:


After digging more into the stack trace got from flight recorder (which

is

attached), it seems that Kafka (0.8.1.1) can optimize the usage of

Crc32.

The stack trace shows that Crc32 is invoked twice from Log.append().

First

is from the line number 231:

val appendInfo = analyzeAndValidateMessageSet(messages)

The second time is from line 252 in the same function:

validMessages = validMessages.assignOffsets(offset, appendInfo.codec)

If one of the Crc32 invocation can be eliminated, we are looking at

saving

at least 7% of CPU usage.

Thanks,
Allen




On Wed, Nov 5, 2014 at 6:32 PM, Allen Wang  wrote:


Hi,

Using flight recorder, we have observed high CPU usage of CRC32
(kafka.utils.Crc32.update()) on Kafka broker. It uses as much as 25%

of

CPU

on an instance. Tracking down stack trace, this method is invoked by
ReplicaFetcherThread.

Is there any tuning we can do to reduce this?

Also on the topic of CPU utilization, we observed that overall CPU
utilization is proportional to AllTopicsBytesInPerSec metric. Does

this

metric include incoming replication traffic?

Thanks,
Allen






Re: Producer does not recognize new brokers

2015-04-13 Thread Jan Filipiak

Hey,

try to not have newlines \n in your jsonfile. I think the parser dies on 
those and then claims the file is empty


Best
Jan




On 13.04.2015 12:06, Ashutosh Kumar wrote:

Probably you should first try to generate proposed plan using --generate
option and then edit that if needed.
thanks


On Mon, Apr 13, 2015 at 3:12 PM, shadyxu  wrote:


Thanks guys. You are right and then here comes another problems:

I added new brokers 4, 5 and 6. Now I want to move partitions 3, 4 and
5(currently on broker 1, 2 and 3) of topic test to these brokers. I wrote
r.json file like this:

{"partitions":
[{"topic": "test","partition": 3,"replicas": [4]},
{"topic":"test","partition":4,"replicas":[5]},
{"topic":"test","partition":5,"replicas":[6]},],
"version":1}

and then ran:

 bin/kafka-reassign-partitions.sh --zookeeper [some-kafka-address]
--reassignment-json-file r.json --execute

Kafka gave me this error message:

 kafka.common.AdminCommandFailedException: Partition reassignment data
file r.json is empty

I googled, seems Kafka parse the json file but found that no partitions
were needed to be removed. Was my json file not properly configured?

2015-04-13 14:00 GMT+08:00 Ashutosh Kumar :


I think you need to re balance the cluster.
something like

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
--topics-to-move-json-file topics-to-move.json --broker-list "5,6"
--generate


On Mon, Apr 13, 2015 at 11:22 AM, shadyxu  wrote:


I added several new brokers to the cluster, there should'v been a

rebalance

but it seemed that the producer was not aware of the new brokers. Data

kept

being sent to the old brokers and there were no partitions on the new
brokers.

I configured the old brokers to the producer and did not restart the
producer or add the new brokers to the configuration.

What may be the problems?





MapDB

2013-01-10 Thread Jan Kotek
Hi,

I am author of MapDB; database engine which provides Maps, Sets, Queues and 
other collections backed by disk (or in-memory) storage. MapDB is probably the 
fastest java db engine, it can do 2 million inserts per second etc... 

I read your design paper and it seems we have common goal (high performance 
persistence). Maybe we could cooperate and share some code. 

More details about MapDB:
https://github.com/jankotek/mapdb


Regards,
Jan Kotek


MapDB

2013-01-10 Thread Jan Kotek
Hi,

I am author of MapDB; database engine which provides Maps, Sets, Queues and 
other collections backed by disk (or in-memory) storage. MapDB is probably the 
fastest java db engine, it can do 2 million inserts per second etc... 

I read your design paper and it seems we have common goal (high performance 
persistence). Maybe we could cooperate and share some code. 

More details about MapDB:
https://github.com/jankotek/mapdb


Regards,
Jan Kotek



Re: MapDB

2013-01-12 Thread Jan Kotek
No, there are no benchmarks yet. 
Also MapDB does not have TCP support. 

Jan

On Friday 11 January 2013 13:28:40 B.e. wrote:
> Have you benchmarked against java-chronicle?
> 
> Thx
> 
> Sent from my iPad
> 
> On Jan 10, 2013, at 9:11 PM, Jan Kotek  wrote:
> > Hi,
> > 
> > I am author of MapDB; database engine which provides Maps, Sets, Queues
> > and
> > other collections backed by disk (or in-memory) storage. MapDB is probably
> > the fastest java db engine, it can do 2 million inserts per second etc...
> > 
> > I read your design paper and it seems we have common goal (high
> > performance
> > persistence). Maybe we could cooperate and share some code.
> > 
> > More details about MapDB:
> > https://github.com/jankotek/mapdb
> > 
> > 
> > Regards,
> > Jan Kotek


Does C++ client support zookeeper based producer load balancing?

2013-08-07 Thread Jan Rudert
Hi,

I am starting with kafka. We use version 0.7.2 currently. Does anyone know
wether automatic producer load balancing based on zookeeper is supported by
the c++ client?

Thank you!
-- Jan


Does C++ client support zookeeper based producer load balancing?

2013-08-07 Thread Jan Rudert
Hi,

I am starting with kafka. We use version 0.7.2 currently. Does anyone know
wether automatic producer load balancing based on zookeeper is supported by
the c++ client?

Thank you!
-- Jan


commitOffsets() in multithreaded consumer process

2013-08-09 Thread Jan Rudert
Hi,

I have an consumer application where I have a message stream per topic and
one thread per stream.

I will do a commitOffsets() when a global shared message counter is
reaching a limit.

I think I need to make sure that no thread is consuming while I call
commitOffsets() to ensure that no concurrent consuming error happens in one
of the threads.

Therefor I use a CyclicBarrier in my threads and do the commitOffsets() in
the barrier action.

The problem arises in case thread A is blocked in stream.next() when there
is no traffic in the topic. When the other threads are blocked in
barrier.await() they have to wait until A receives a message. This can
possible block all consuming.

Is there a best practice on committing properly in a multithreaded consumer?

Thank you!
Jan


Re: commitOffsets() in multithreaded consumer process

2013-08-10 Thread Jan Rudert
Thank you!

So I guess, you suggest a really really small timeout so that the other
consuming threads don't get regularly blocked for the timeout period? My
consumer use case does not allow having "longer" breaks because there are
some high traffic topics.

Thanks
Jan


2013/8/10 Jun Rao 

> The consumer has a config property called consumer.timeout.ms. By setting
> the value to a positive integer, a timeout exception is thrown to the
> consumer if no message is available for consumption after the specified
> timeout value.
>
> Thanks,
> Jun
>
>
> On Fri, Aug 9, 2013 at 9:25 AM, Jan Rudert  wrote:
>
> > Hi,
> >
> > I have an consumer application where I have a message stream per topic
> and
> > one thread per stream.
> >
> > I will do a commitOffsets() when a global shared message counter is
> > reaching a limit.
> >
> > I think I need to make sure that no thread is consuming while I call
> > commitOffsets() to ensure that no concurrent consuming error happens in
> one
> > of the threads.
> >
> > Therefor I use a CyclicBarrier in my threads and do the commitOffsets()
> in
> > the barrier action.
> >
> > The problem arises in case thread A is blocked in stream.next() when
> there
> > is no traffic in the topic. When the other threads are blocked in
> > barrier.await() they have to wait until A receives a message. This can
> > possible block all consuming.
> >
> > Is there a best practice on committing properly in a multithreaded
> > consumer?
> >
> > Thank you!
> > Jan
> >
>


Re: compression performance

2013-08-16 Thread Jan Kotek
> you see that with no compression 80% of the time goes to FileChannel.write,

> But with snappy enabled only 5% goes to writing data, 50% of the time goes
> to byte copying and allocation, and only about 22% goes to actual

I had similar problem with MapDB, it was solved by using memory mapped files. 
Not sure how it applies to this case.

Regards,
Jan Kotek


On Friday 02 August 2013 22:19:34 Jay Kreps wrote:
> Chris commented in another thread about the poor compression performance in
> 0.8, even with snappy.
> 
> Indeed if I run the linear log write throughput test on my laptop I see
> 75MB/sec with no compression and 17MB/sec with snappy.
> 
> This is a little surprising as snappy claims 200MB round-trip performance
> (compress + uncompress) from java. So what is going on?
> 
> Well you may remember I actually filed a bug a while back on all the
> inefficient byte copying in the compression path (KAFKA-527). I didn't
> think too much of it, other than it is a bit sloppy, since after all
> computers are good at copying bytes, right?
> 
> Turns out not so much, if you look at a profile of the standalone log test
> you see that with no compression 80% of the time goes to FileChannel.write,
> which is reasonable since that is what a log does.
> 
> But with snappy enabled only 5% goes to writing data, 50% of the time goes
> to byte copying and allocation, and only about 22% goes to actual
> compression and decompression (with lots of misc stuff in their I haven't
> bothered to tally).
> 
> If someone was to optimize this code path I think we could take a patch in
> 0.8.1. It shouldn't be too hard, just using the backing array on the byte
> buffer and avoiding all the input streams, output streams, byte array
> output streams, and intermediate message blobs.
> 
> I summarized this along with how to reproduce the test results here:
> https://issues.apache.org/jira/browse/KAFKA-527
> 
> -Jay


Pulling Snapshots from Kafka, Log compaction last compact offset

2015-04-30 Thread Jan Filipiak

Hello Everyone,

I am quite exited about the recent example of replicating PostgresSQL 
Changes to Kafka. My view on the log compaction feature always had been 
a very sceptical one, but now with its great potential exposed to the 
wide public, I think its an awesome feature. Especially when pulling 
this data into HDFS as a Snapshot, it is (IMO) a sqoop killer. So I want 
to thank everyone who had the vision of building these kind of systems 
during a time I could not imagine those.


There is one open question that I would like people to help me with. 
When pulling a snapshot of a partition into HDFS using a camus-like 
application I feel the need of keeping a Set of all keys read so far and 
stop as soon as I find a key beeing already in my set. I use this as an 
indicator of how far the log compaction has happened already and only 
pull up to this point. This works quite well as I do not need to keep 
the messages but only their keys in memory.


The question I want to raise with the community is:

How do you prevent pulling the same record twice (in different versions) 
and would it be beneficial if the "OffsetResponse" would also return the 
last offset that got compacted so far and the application would just 
pull up to this point?


Looking forward for some recommendations and comments.

Best
Jan



Recovering from broker failure with KafkaConsumer

2015-06-17 Thread Jan Stette
I'm trying out the new KafkaConsumer client API in the trunk of the source
tree, and while I realise that this is a work in progress, I have a
question that perhaps someone can shed some light on.

I'm looking at how to handle various error scenarios for a Kafka client, in
particular what happens when trying to connect to the broker but it's not
available. The behaviour I'm seeing is that the client will retry
indefinitely (at the configurable interval), basically looping around in
Fetcher.awaitMetadataUpdate() forever.

I would like to have some way to fail the connection attempt to avoid the
calling thread being blocked forever. Is this possible with the current
version of the client? (Snapshot as of 16/6/15). If not, is that something
that's planned for the future?

Jan


Re: Log compaction not working as expected

2015-06-17 Thread Jan Filipiak

Hi,

you might want to have a look here: 
http://kafka.apache.org/documentation.html#topic-config
_segment.ms_ and _segment.bytes _ should allow you to control the 
time/size when segments are rolled.


Best
Jan

On 16.06.2015 14:05, Shayne S wrote:

Some further information, and is this a bug?  I'm using 0.8.2.1.

Log compaction will only occur on the non active segments.  Intentional or
not, it seems that the last segment is always the active segment.  In other
words, an expired segment will not be cleaned until a new segment has been
created.

As a result, a log won't be compacted until new data comes in (per
partition). Does this mean I need to send the equivalent of a pig (
https://en.wikipedia.org/wiki/Pigging) through each partition in order to
force compaction?  Or can I force the cleaning somehow?

Here are the steps to recreate:

1. Create a new topic with a 5 minute segment.ms:

kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
--replication-factor 1 --partitions 1 --config cleanup.policy=compact
--config min.cleanable.dirty.ratio=0.01 --config segment.ms=30

2. Repeatedly add messages with identical keys (3x):

echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

3. Wait 5+ minutes and confirm no log compaction.
4. Once satisfied, send a new message:

echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

5. Log compaction will occur quickly soon after.

Is my use case of infrequent logs not supported? Is this intentional
behavior? It's unnecessarily challenging to target each partition with a
dummy message to trigger compaction.

Also, I believe there is another issue with logs originally configured
without a segment timeout that lead to my original issue.  I still cannot
get those logs to compact.

Thanks!
Shayne





Re: Recovering from broker failure with KafkaConsumer

2015-06-17 Thread Jan Stette
Adding some more details to the previous question:

The indefinite wait doesn't happen on calling subscribe() on the consumer,
it happens when I (in this case) call seekToEnd().

A related problem to this is that the seekToEnd() method is synchronized
(as are the other access methods on KafkaConsumer), so the client holds a
lock while sitting in this wait. This means that if another thread tries
 to call close(), which is all synchronized, this thread will also be
blocked.

Holding locks while performing network I/O seems like a bad idea - is this
something that's planned to be fixed?

Jan



On 17 June 2015 at 10:31, Jan Stette  wrote:

> I'm trying out the new KafkaConsumer client API in the trunk of the source
> tree, and while I realise that this is a work in progress, I have a
> question that perhaps someone can shed some light on.
>
> I'm looking at how to handle various error scenarios for a Kafka client,
> in particular what happens when trying to connect to the broker but it's
> not available. The behaviour I'm seeing is that the client will retry
> indefinitely (at the configurable interval), basically looping around in
> Fetcher.awaitMetadataUpdate() forever.
>
> I would like to have some way to fail the connection attempt to avoid the
> calling thread being blocked forever. Is this possible with the current
> version of the client? (Snapshot as of 16/6/15). If not, is that something
> that's planned for the future?
>
> Jan
>
>


Re: Log compaction not working as expected

2015-06-17 Thread Jan Filipiak

Ah misread that sorry!

On 17.06.2015 14:26, Shayne S wrote:

Right, you can see I've got segment.ms set.  The trick is that they don't
actually roll over until something new arrives. If your topic is idle (not
receiving messages), it won't ever roll over to a new segment, and thus the
last segment will never be compacted.

Thanks!
Shayne

On Wed, Jun 17, 2015 at 5:58 AM, Jan Filipiak 
wrote:


Hi,

you might want to have a look here:
http://kafka.apache.org/documentation.html#topic-config
_segment.ms_ and _segment.bytes _ should allow you to control the
time/size when segments are rolled.

Best
Jan


On 16.06.2015 14:05, Shayne S wrote:


Some further information, and is this a bug?  I'm using 0.8.2.1.

Log compaction will only occur on the non active segments.  Intentional or
not, it seems that the last segment is always the active segment.  In
other
words, an expired segment will not be cleaned until a new segment has been
created.

As a result, a log won't be compacted until new data comes in (per
partition). Does this mean I need to send the equivalent of a pig (
https://en.wikipedia.org/wiki/Pigging) through each partition in order to
force compaction?  Or can I force the cleaning somehow?

Here are the steps to recreate:

1. Create a new topic with a 5 minute segment.ms:

kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
--replication-factor 1 --partitions 1 --config cleanup.policy=compact
--config min.cleanable.dirty.ratio=0.01 --config segment.ms=30

2. Repeatedly add messages with identical keys (3x):

echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

3. Wait 5+ minutes and confirm no log compaction.
4. Once satisfied, send a new message:

echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

5. Log compaction will occur quickly soon after.

Is my use case of infrequent logs not supported? Is this intentional
behavior? It's unnecessarily challenging to target each partition with a
dummy message to trigger compaction.

Also, I believe there is another issue with logs originally configured
without a segment timeout that lead to my original issue.  I still cannot
get those logs to compact.

Thanks!
Shayne






Re: Recovering from broker failure with KafkaConsumer

2015-06-19 Thread Jan Stette
Sounds good, thanks for the clarification.

Jan



On 17 June 2015 at 22:09, Jason Gustafson  wrote:

> We have a couple open tickets to address these issues (see KAFKA-1894 and
> KAFKA-2168). It's definitely something we want to fix.
>
> On Wed, Jun 17, 2015 at 4:21 AM, Jan Stette  wrote:
>
>> Adding some more details to the previous question:
>>
>> The indefinite wait doesn't happen on calling subscribe() on the consumer,
>> it happens when I (in this case) call seekToEnd().
>>
>> A related problem to this is that the seekToEnd() method is synchronized
>> (as are the other access methods on KafkaConsumer), so the client holds a
>> lock while sitting in this wait. This means that if another thread tries
>>  to call close(), which is all synchronized, this thread will also be
>> blocked.
>>
>> Holding locks while performing network I/O seems like a bad idea - is this
>> something that's planned to be fixed?
>>
>> Jan
>>
>>
>>
>> On 17 June 2015 at 10:31, Jan Stette  wrote:
>>
>> > I'm trying out the new KafkaConsumer client API in the trunk of the
>> source
>> > tree, and while I realise that this is a work in progress, I have a
>> > question that perhaps someone can shed some light on.
>> >
>> > I'm looking at how to handle various error scenarios for a Kafka client,
>> > in particular what happens when trying to connect to the broker but it's
>> > not available. The behaviour I'm seeing is that the client will retry
>> > indefinitely (at the configurable interval), basically looping around in
>> > Fetcher.awaitMetadataUpdate() forever.
>> >
>> > I would like to have some way to fail the connection attempt to avoid
>> the
>> > calling thread being blocked forever. Is this possible with the current
>> > version of the client? (Snapshot as of 16/6/15). If not, is that
>> something
>> > that's planned for the future?
>> >
>> > Jan
>> >
>> >
>>
>
>


Questions regarding Kafka-1477

2015-07-02 Thread Jan Filipiak

Hi,

just out of curiosity and because of Eugene's email, I browsed 
Kafka-1477 and it talks about SSL alot. So I thought I might throw in 
this http://tools.ietf.org/html/rfc7568 RFC. It basically says move away 
from SSL now and only do TLS. The title of the ticket still mentions TLS 
but afterwards its only SSL, haven't looked at any patches or library 
code so I can't really judge what's going on.


Further I found people starting to talk about sendfile(2) TLS support, 
here for example https://people.freebsd.org/~rrs/asiabsd_2015_tls.pd 
f. So maybe we 
can keep this door open that at some point the Kernel will be able to do 
TLS for us?






On 02.07.2015 22:24, eugene miretsky wrote:

HI,

There is some work being done on security in Kafka:
Confluence: https://cwiki.apache.org/confluence/display/KAFKA/Security
Jira: https://issues.apache.org/jira/browse/KAFKA-1682

It seems like the main blockers are KAFKA-1477
, KAFKA-1691
  and KAFKA-1690
.

Is there an anticipated road map for when all the security features will be
done and merged in to trunk?


(


Hdfs fSshell getmerge

2015-07-24 Thread Jan Filipiak

Hello hadoop users,

I have an idea about a small feature for the getmerge tool. I recently 
was in the need of using the new line option -nl because the files I 
needed to merge simply didn't had one.
I was merging all the files from one directory and unfortunately this 
directory also included empty files, which effectively led to multiple 
newlines append after some files.

I needed to remove them manually afterwards.

In this situation it is maybe good to have another argument that allows 
skipping empty files. I just wrote down 2 change one could try at the 
end. Do you guys consider this as a good improvement to the command line 
tools?


Thing one could try to implement this feature:

The call for IOUtils.copyBytes(in, out, getConf(), false); doesn't 
return the number of bytes copied which would be convenient as one could 
skip append the new line when 0 bytes where copied

Or one would check the file size before.

Please let me know If you would consider this useful and is worth a 
feature ticket in Jira.


Thank you
Jan


Re: Hdfs fSshell getmerge

2015-07-24 Thread Jan Filipiak

Sorry wrong mailing list

On 24.07.2015 16:44, Jan Filipiak wrote:

Hello hadoop users,

I have an idea about a small feature for the getmerge tool. I recently 
was in the need of using the new line option -nl because the files I 
needed to merge simply didn't had one.
I was merging all the files from one directory and unfortunately this 
directory also included empty files, which effectively led to multiple 
newlines append after some files.

I needed to remove them manually afterwards.

In this situation it is maybe good to have another argument that 
allows skipping empty files. I just wrote down 2 change one could try 
at the end. Do you guys consider this as a good improvement to the 
command line tools?


Thing one could try to implement this feature:

The call for IOUtils.copyBytes(in, out, getConf(), false); doesn't 
return the number of bytes copied which would be convenient as one 
could skip append the new line when 0 bytes where copied

Or one would check the file size before.

Please let me know If you would consider this useful and is worth a 
feature ticket in Jira.


Thank you
Jan




Re: log compaction scaling with ~100m messages

2015-10-08 Thread Jan Filipiak

Hi,

just want to pick this up again. You can always use more partitions to 
reduce the number of keys handled by a single broker and parallelize the 
compaction. So with sufficient number of machines and the ability to 
partition I don’t see you running into problems.


Jan

On 07.10.2015 05:34, Feroze Daud wrote:

hi!
We have a use case where we want to store ~100m keys in kafka. Is there any 
problem with this approach?
I have heard from some people using kafka, that kafka has a problem when doing 
log compaction with those many number of keys.
Another topic might have around 10 different K/V pairs for each key in the 
primary topic. The primary topic's keyspace is approx of 100m keys. We would 
like to store this in kafka because we are doing a lot of stream processing on 
these messages, and want to avoid writing another process to recompute data 
from snapshots.
So, in summary:
primary topic: ~100m keyssecondary topic: ~1B keys
Is it feasible to use log compaction at such a scale of data?
Thanks
feroze.




Re: Consuming "backwards"?

2015-11-09 Thread Jan Filipiak

Hi,

obviously this should be build different IMHO (unless I fail to see 
something that prevents you from doing this).

When you realize you fall behind do this:

1. remember your current
2. get the latest offset
3. fork a process to replicate from the current offset +1 to the latest 
one just fetched.

4. reset your offset to the latest +1

I fail to see the reason why a topic change would be needed. If the 
downstream app cant handle the data, you are in trouble anyways, and if 
your message processing is the bottleneck more partitions might also be 
an option.  I would try to not do any tricks and find out what the lag 
is caused by and then fix whatever causes the lag. Its 1 am in Germany 
there might be off by one errors in the algorithm above.


Best
Jan


On 04.11.2015 18:13, Otis Gospodnetić wrote:

This is an aancient thread, but I thought I'd point to
http://blog.sematext.com/2015/11/04/kafka-real-time-stream-multi-topic-catch-up-trick/
which gives a short description of how we ended up implementing this.
It seems to work well for us, but if there are better ways to do it, esp.
now with 0.9 around the corner, I'm all eyeballs!

Otis
--
Monitoring - Log Management - Alerting - Anomaly Detection
Solr & Elasticsearch Consulting Support Training - http://sematext.com/


On Fri, Dec 6, 2013 at 7:09 PM, Joe Stein  wrote:


yes, also you can read backwards on the stream if you do this in the
consumer

val maoList: Iterable[MessageAndOffset] = for(messageAndOffset <-
messageSet if(numMessagesConsumed < maxMessages)) yield messageAndOffset

for(messageAndOffset <- maoList.toList.reverse) {

this way every read is the latest before the next earliest so when you
fetch 18,19,20 you will see them coming in as 20,19,18

/***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
/


On Fri, Dec 6, 2013 at 7:02 PM, Steven Parkes 
wrote:


Right. If you're not reading contiguously, you need to remember the

ranges

that you have/haven't read. As long as you do that, it all works out, I
think.

A kafka client always has to store the last offset it read. In the
simplest "both directions" case where you start with current and read in
both directions, you just need to remember the first offset you've read

as

well.

On Dec 6, 2013, at 3:50 PM, Joe Stein  wrote:


you have to allow the fetchSize to be variable so in your example since

the

new highwatermark is 12 and the last consumsed message is 10

fetchSize = if (highwatermark - lastConsumedOffset < 3) highwatermark -
lastConsumedOffset else 3

the real trick though is missing messages having to keep track of more

than

one index

so lets say now you have 7 more published (13,14,15,16,17,18,19)

you then read 17,18,19 (and while that happens 5 more are published ...
20,21,22,23,24)

now when you read 22,23,24 ... you have to keep track of not only 22 as

the

last read so you scoop up 20 and 21 but also remember still 17 so you

get

16,15,14,13

so it can be done with some fancy logic to manage the index and offsets
(and persist them)


On Fri, Dec 6, 2013 at 6:44 PM, Otis Gospodnetic <

otis.gospodne...@gmail.com

wrote:
Hi,

On Fri, Dec 6, 2013 at 6:32 PM, Steven Parkes 
wrote:


On Dec 6, 2013, at 2:03 PM, Otis Gospodnetic <

otis.gospodne...@gmail.com

wrote:


but I think the
problem is that each time we grab we could get some of the same

messages

we

already processed

Doesn't setting the fetchSize to "how far back we need to grab"

handle

that?


I *think* it doesn't, but I'm wrong every day N times, so

I think this is what would happen:
1) imagine 10 messages in the broker m1 - m10
2) consumer grabs last N (=3): m8, m9, m10
3) while it's doing that and before consumer polls for more messages
producer publishes 2 more: m11 and m12
4) consumer now polls again. It asks broker for publisher offset and

gets

the answer: 12
5) good, says consumer, let me then fetch everything after offset

12-3=9:

m10, m11, m12

Problem: consumer got m10 again, but it was already processed in 2).

No?  Please correct me if I'm wrong anywhere.

Thanks,
Otis
--
Performance Monitoring * Log Analytics * Search Analytics
Solr & Elasticsearch Support * http://sematext.com/







Producer buffer available bytes constantly decreasing

2015-11-22 Thread Jan Algermissen
Hi,

I am writing to a Kafka Topic from within a Scala/Akka application using the 
new Producer (0.8.2.1).

While writing messages to the queue (at a very reasonable rate of a couple of 
messages per second max) the available buffer is constantly decreasing until 
the producer finally throws an exception saing the buffer is exhausted.

There must be something I do not understand that I am doing wrong, can anyone 
provde a clue?

Here is my config:

 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
 ProducerConfig.RETRIES_CONFIG -> "0",
 ProducerConfig.ACKS_CONFIG -> "1",
 ProducerConfig.COMPRESSION_TYPE_CONFIG -> "none",
 ProducerConfig.TIMEOUT_CONFIG -> new Integer(3),
// ProducerConfig.BATCH_SIZE_CONFIG -> new Integer(16384),
 ProducerConfig.BATCH_SIZE_CONFIG -> new Integer(10),
 ProducerConfig.BUFFER_MEMORY_CONFIG -> new Integer(66554432),
 ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG -> new java.lang.Boolean(false),
 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> 
"org.apache.kafka.common.serialization.StringSerializer",
 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> 
"org.apache.kafka.common.serialization.StringSerializer"


And my send code:

kafkaProducer.send(new ProducerRecord[String,String](topic, key, data),new 
Callback {
  def onCompletion(recordMetadata: RecordMetadata, e: Exception):Unit = {
if(e != null) {
  logger.error(s"Could not send $data",e)
}
logger.info("The offset of the record we just sent is: " + 
recordMetadata.offset())
()
  }

})


I am using the metrics() member to periodically look at 
"buffer-available-bytes" and I see it is constantly decreasing over time as 
messages are being sent.


Jan





Help on understanding kafka-topics.sh output

2015-11-22 Thread Jan Algermissen
Hi,

I have a topic with 16 partitions that shows the following output for 

kafka-topics.sh --zookeeper x:2181 --topic capture --describe

Can anyone explain to me what the difference in replicas means and what Leader 
of -1 means?

In the logs of my producer I see that no messages seem to be sent to the 
partitions with '-1' and th eproducer buffer becomes exhausted afetr a while 
(maybe that is related?)

Jan

Topic:capture   PartitionCount:16   ReplicationFactor:1 Configs:

Topic: capture  Partition: 0Leader: 1   Replicas: 1 Isr: 1
Topic: capture  Partition: 1Leader: 1   Replicas: 1 Isr: 1
Topic: capture  Partition: 2Leader: -1  Replicas: 2 Isr: 
Topic: capture  Partition: 3Leader: 1   Replicas: 1 Isr: 1
Topic: capture  Partition: 4Leader: -1  Replicas: 2 Isr: 
Topic: capture  Partition: 5Leader: 1   Replicas: 1 Isr: 1
Topic: capture  Partition: 6Leader: -1  Replicas: 2 Isr: 
Topic: capture  Partition: 7Leader: 1   Replicas: 1 Isr: 1
Topic: capture  Partition: 8Leader: -1  Replicas: 2 Isr: 
Topic: capture  Partition: 9Leader: 1   Replicas: 1 Isr: 1
Topic: capture  Partition: 10   Leader: -1  Replicas: 2 Isr: 
Topic: capture  Partition: 11   Leader: 1   Replicas: 1 Isr: 1
Topic: capture  Partition: 12   Leader: -1  Replicas: 2 Isr: 
Topic: capture  Partition: 13   Leader: 1   Replicas: 1 Isr: 1
Topic: capture  Partition: 14   Leader: -1  Replicas: 2 Isr: 
Topic: capture  Partition: 15   Leader: 1   Replicas: 1 Isr: 1

Re: Help on understanding kafka-topics.sh output

2015-11-22 Thread Jan Algermissen
Hi Todd,

yes, correct - thanks.

However, what I am not getting is that the KafkaProducer (see my other mail 
from today) silently accepts the messages and fills them up in the buffer until 
it is exhausted instead of saying that the broker is not reachable.

IOW, it seems from an application perspective I am unable to detect that 
messages are not being sent out. Is this normal behavior and I am simply doing 
something wrong or could it be a producer bug?

Jan

Config and code again:

ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ProducerConfig.RETRIES_CONFIG -> "0",
ProducerConfig.ACKS_CONFIG -> "1",
ProducerConfig.COMPRESSION_TYPE_CONFIG -> "none",
ProducerConfig.TIMEOUT_CONFIG -> new Integer(3),
// ProducerConfig.BATCH_SIZE_CONFIG -> new Integer(16384),
ProducerConfig.BATCH_SIZE_CONFIG -> new Integer(10),
ProducerConfig.BUFFER_MEMORY_CONFIG -> new Integer(66554432),
ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG -> new java.lang.Boolean(false),
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> 
"org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> 
"org.apache.kafka.common.serialization.StringSerializer"




kafkaProducer.send(new ProducerRecord[String,String](topic, key, data),new 
Callback {
 def onCompletion(recordMetadata: RecordMetadata, e: Exception):Unit = {
   if(e != null) {
 logger.error(s"Could not send $data",e)
   }
   logger.info("The offset of the record we just sent is: " + 
recordMetadata.offset())
   ()
 }

})


> On 22 Nov 2015, at 20:49, Todd Palino  wrote:
> 
> Replicas and Isr are both a comma separated list of broker IDs. So in this
> output, I am seeing that you have two Kafka brokers with IDs 1 and 2. You
> have a topic, capture, with 16 partitions at replication factor 1 (1
> replica per partition). The broker with ID 2 is not online, which is why it
> shows in the Replica list for some partitions (meaning that it is assigned
> to be a replica), but not in the Isr list (which would indicate that it is
> currently in-sync).
> 
> The Leader field is the broker ID which is currently the leader for that
> partition. For the partitions that are assigned to broker 1, you see that
> broker 1 is the leader. For the partitions that are assigned to broker 2,
> the leader is listed as -1, which indicates that there is no available
> leader. These partitions are considered offline and cannot be produced to
> or consumed from. When broker 2 comes back online, the controller will
> perform an unclean leader election and select broker 2 (the only replica
> available) as the leader for those partitions.
> 
> -Todd
> 
> 
> On Sun, Nov 22, 2015 at 11:39 AM, Jan Algermissen <
> algermissen1...@icloud.com> wrote:
> 
>> Hi,
>> 
>> I have a topic with 16 partitions that shows the following output for
>> 
>> kafka-topics.sh --zookeeper x:2181 --topic capture --describe
>> 
>> Can anyone explain to me what the difference in replicas means and what
>> Leader of -1 means?
>> 
>> In the logs of my producer I see that no messages seem to be sent to the
>> partitions with '-1' and th eproducer buffer becomes exhausted afetr a
>> while (maybe that is related?)
>> 
>> Jan
>> 
>>Topic:capture   PartitionCount:16   ReplicationFactor:1
>> Configs:
>> 
>>Topic: capture  Partition: 0Leader: 1   Replicas: 1
>> Isr: 1
>>Topic: capture  Partition: 1Leader: 1   Replicas: 1
>> Isr: 1
>>Topic: capture  Partition: 2Leader: -1  Replicas: 2
>> Isr:
>>Topic: capture  Partition: 3Leader: 1   Replicas: 1
>> Isr: 1
>>Topic: capture  Partition: 4Leader: -1  Replicas: 2
>> Isr:
>>Topic: capture  Partition: 5Leader: 1   Replicas: 1
>> Isr: 1
>>Topic: capture  Partition: 6Leader: -1  Replicas: 2
>> Isr:
>>Topic: capture  Partition: 7Leader: 1   Replicas: 1
>> Isr: 1
>>Topic: capture  Partition: 8Leader: -1  Replicas: 2
>> Isr:
>>Topic: capture  Partition: 9Leader: 1   Replicas: 1
>> Isr: 1
>>Topic: capture  Partition: 10   Leader: -1  Replicas: 2
>> Isr:
>>Topic: capture  Partition: 11   Leader: 1   Replicas: 1
>> Isr: 1
>>Topic: capture  Partition: 12   Leader: -1  Replicas: 2
>> Isr:
>>Topic: capture  Partition: 13   Leader: 1   Replicas: 1
>> Isr: 1
>>Topic: capture  Partition: 14   Leader: -1  Replicas: 2
>> Isr:
>>Topic: capture  Partition: 15   Leader: 1   Replicas: 1
>> Isr: 1



Re: Help on understanding kafka-topics.sh output

2015-11-22 Thread Jan Algermissen
Hi Todd, thanks very much for the explanations here and in the previous mail - 
helps a lot!

(I assumed the callback would provide an exception when a message could not be 
sent - I observe the callback being called for every message to the up broker, 
but yes, maybe the developers can clarify how to detect a broker being down)

Jan

> On 22 Nov 2015, at 21:42, Todd Palino  wrote:
> 
> Hopefully one of the developers can jump in here. I believe there is a
> future you can use to get the errors back from the producer. In addition,
> you should check the following configs on the producer:
> 
> request.required.acks - this controls whether or not your producer is going
> to wait for an acknowledgement from the broker, and how many brokers it
> waits for
> request.timeout.ms - how long the producer waits to satisfy the acks
> setting before marking the request failed
> retry.backoff.ms - how long the producer waits between retries
> message.send.max.retries - the maximum number of retries the producer will
> attempt a failed request
> 
> -Todd
> 
> 
> On Sun, Nov 22, 2015 at 12:31 PM, Jan Algermissen <
> algermissen1...@icloud.com> wrote:
> 
>> Hi Todd,
>> 
>> yes, correct - thanks.
>> 
>> However, what I am not getting is that the KafkaProducer (see my other
>> mail from today) silently accepts the messages and fills them up in the
>> buffer until it is exhausted instead of saying that the broker is not
>> reachable.
>> 
>> IOW, it seems from an application perspective I am unable to detect that
>> messages are not being sent out. Is this normal behavior and I am simply
>> doing something wrong or could it be a producer bug?
>> 
>> Jan
>> 
>> Config and code again:
>> 
>> ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
>> ProducerConfig.RETRIES_CONFIG -> "0",
>> ProducerConfig.ACKS_CONFIG -> "1",
>> ProducerConfig.COMPRESSION_TYPE_CONFIG -> "none",
>> ProducerConfig.TIMEOUT_CONFIG -> new Integer(3),
>> // ProducerConfig.BATCH_SIZE_CONFIG -> new Integer(16384),
>> ProducerConfig.BATCH_SIZE_CONFIG -> new Integer(10),
>> ProducerConfig.BUFFER_MEMORY_CONFIG -> new Integer(66554432),
>> ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG -> new java.lang.Boolean(false),
>> ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
>> "org.apache.kafka.common.serialization.StringSerializer",
>> ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
>> "org.apache.kafka.common.serialization.StringSerializer"
>> 
>> 
>> 
>> 
>> kafkaProducer.send(new ProducerRecord[String,String](topic, key, data),new
>> Callback {
>> def onCompletion(recordMetadata: RecordMetadata, e: Exception):Unit = {
>>   if(e != null) {
>> logger.error(s"Could not send $data",e)
>>   }
>>   logger.info("The offset of the record we just sent is: " +
>> recordMetadata.offset())
>>   ()
>> }
>> 
>> })
>> 
>> 
>>> On 22 Nov 2015, at 20:49, Todd Palino  wrote:
>>> 
>>> Replicas and Isr are both a comma separated list of broker IDs. So in
>> this
>>> output, I am seeing that you have two Kafka brokers with IDs 1 and 2. You
>>> have a topic, capture, with 16 partitions at replication factor 1 (1
>>> replica per partition). The broker with ID 2 is not online, which is why
>> it
>>> shows in the Replica list for some partitions (meaning that it is
>> assigned
>>> to be a replica), but not in the Isr list (which would indicate that it
>> is
>>> currently in-sync).
>>> 
>>> The Leader field is the broker ID which is currently the leader for that
>>> partition. For the partitions that are assigned to broker 1, you see that
>>> broker 1 is the leader. For the partitions that are assigned to broker 2,
>>> the leader is listed as -1, which indicates that there is no available
>>> leader. These partitions are considered offline and cannot be produced to
>>> or consumed from. When broker 2 comes back online, the controller will
>>> perform an unclean leader election and select broker 2 (the only replica
>>> available) as the leader for those partitions.
>>> 
>>> -Todd
>>> 
>>> 
>>> On Sun, Nov 22, 2015 at 11:39 AM, Jan Algermissen <
>>> algermissen1...@icloud.com> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I have a topic with 16 partitions that shows the following output for
>>>> 
>>>> kafka-topics.sh --zookeeper x:21

Kafka Rest Proxy

2016-03-01 Thread Jan Omar
Hey guys, 

Is someone using the kafka rest proxy from confluent? 

We have an issue, that all messages for a certain topic end up in the same 
partition. Has anyone faced this issue before? We're not using a custom 
  partitioner class, so it's using the default partitioner. We're sending 
messages without a specific partition and without a key, like this: 
  

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data 
'{"records":[{"value":{"foo":"bar"}}]}' "http://x.x.x.x:8082/topics/testme"; 


and yet for some reason every message ends up in partition 11... 

It's a test topic with 30 partitions on 3 brokers and our rest (producer) 
config is very simple:

id=1
zookeeper.connect=zookeeper-four.acc:2181...etc

Any help would be appreciated.

Thanks!




Re: Consumers doesn't always poll first messages

2016-03-02 Thread Jan Omar
Hi Robin,

Why would you expect it to start from the first message?

You're comitting the read offsets automatically every second. The offset is 
persisted, next time you consume again, it will start at the persisted offset 
again.

 consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
 consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

Regards

Jan

> On 2 Mar 2016, at 15:14, Péricé Robin  wrote:
> 
> Hello everybody,
> 
> I'm testing the new 0.9.0.1 API and I try to make a basic example working.
> 
> *Java code* :
> 
> *Consumer *: http://pastebin.com/YtvW0sz5
> *Producer *: http://pastebin.com/anQay9YE
> *Test* : http://pastebin.com/nniYLsHL
> 
> 
> *Kafka configuration* :
> 
> *Zookeeper propertie*s : http://pastebin.com/KC5yZdNx
> *Kafka properties* : http://pastebin.com/Psy4uAYL
> 
> But when I try to run my test and restart Kafka to see what happen. The
> Consumer doesn't always consume first messages. Sometimes it consume
> messages at offset 0 or 574 or 1292 ... The behavior of the test seems to
> be very random.
> 
> Anybody have an idea on that issue ?
> 
> Best Regards,
> 
> Robin



Re: Kafka Streams

2016-03-12 Thread Jan Filipiak

Hi,

I am very exited about all of this in general. Sadly I haven’t had the 
time to really take a deep look. One thing that is/was always a 
difficult topic to resolve many to many relationships in table x table x 
table joins is the repartitioning that has to happen at some point.


From the documentation I saw this:

"The *keys* of data records determine the partitioning of data in both 
Kafka and Kafka Streams, i.e. how data is routed to specific partitions 
within topics."


This feels unnecessarily restrictive as i can't currently imagin how to 
resolve many to many relationships with this. One can also emmit every 
record to many partitions to make up for no read replicas in kafka 
aswell as partitioning schemes that don't work like this (Shards 
processing overlapping key spaces).


I would really love to hear your thoughts on these topics. Great work! 
Google grade technologies for everyone!

I <3 logs



On 10.03.2016 22:26, Jay Kreps wrote:

Hey all,

Lot's of people have probably seen the ongoing work on Kafka Streams
happening. There is no real way to design a system like this in a vacuum,
so we put up a blog, some snapshot docs, and something you can download and
use easily to get feedback:

http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple

We'd love comments or thoughts from anyone...

-Jay





Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-07 Thread Jan Filipiak

Hi Eno,

On 07.06.2017 22:49, Eno Thereska wrote:

Comments inline:


On 5 Jun 2017, at 18:19, Jan Filipiak  wrote:

Hi

just my few thoughts

On 05.06.2017 11:44, Eno Thereska wrote:

Hi there,

Sorry for the late reply, I was out this past week. Looks like good progress 
was made with the discussions either way. Let me recap a couple of points I saw 
into one big reply:

1. Jan mentioned CRC errors. I think this is a good point. As these happen in 
Kafka, before Kafka Streams gets a chance to inspect anything, I'd like to hear 
the opinion of more Kafka folks like Ismael or Jason on this one. Currently the 
documentation is not great with what to do once a CRC check has failed. From 
looking at the code, it looks like the client gets a KafkaException (bubbled up 
from the fetcher) and currently we in streams catch this as part of poll() and 
fail. It might be advantageous to treat CRC handling in a similar way to 
serialisation handling (e.g., have the option to fail/skip). Let's see what the 
other folks say. Worst-case we can do a separate KIP for that if it proved too 
hard to do in one go.

there is no reasonable way to "skip" a crc error. How can you know the length 
you read was anything reasonable? you might be completely lost inside your response.

On the client side, every record received is checked for validity. As it 
happens, if the CRC check fails the exception is wrapped with a KafkaException 
that is thrown all the way to poll(). Assuming we change that and poll() throws 
a CRC exception, I was thinking we could treat it similarly to a deserialize 
exception and pass it to the exception handler to decide what to do. Default 
would be to fail. This might need a Kafka KIP btw and can be done separately 
from this KIP, but Jan, would you find this useful?
I don't think so. IMO you can not reasonably continue parsing when the 
checksum of a message is not correct. If you are not sure you got the 
correct length, how can you be sure to find the next record? I would 
always straight fail in all cases. Its to hard for me to understand why 
one would try to continue. I mentioned CRC's because thats the only bad 
pills I ever saw so far. But I am happy that it just stopped and I could 
check what was going on. This will also be invasive in the client code then.


If you ask me, I am always going to vote for "grind to halt" let the 
developers see what happened and let them fix it. It helps building good 
kafka experiences and better software and architectures. For me this is: 
"force the user todo the right thing". 
https://youtu.be/aAb7hSCtvGw?t=374 eg. not letting unexpected input slip 
by.  Letting unexpected input slip by is what bought us 15+years of war 
of all sorts of ingestion attacks. I don't even dare to estimate how 
many missingrecords-search-teams going be formed, maybe some hackerone 
for stream apps :D


Best Jan




At a minimum, handling this type of exception will need to involve the 
exactly-once (EoS) logic. We'd still allow the option of failing or skipping, 
but EoS would need to clean up by rolling back all the side effects from the 
processing so far. Matthias, how does this sound?

Eos will not help the record might be 5,6 repartitions down into the topology. 
I haven't followed but I pray you made EoS optional! We don't need this and we 
don't want this and we will turn it off if it comes. So I wouldn't recommend 
relying on it. The option to turn it off is better than forcing it and still 
beeing unable to rollback badpills (as explained before)

Yeah as Matthias mentioned EoS is optional.

Thanks,
Eno



6. Will add an end-to-end example as Michael suggested.

Thanks
Eno




On 4 Jun 2017, at 02:35, Matthias J. Sax  wrote:

What I don't understand is this:


 From there on its the easiest way forward: fix, redeploy, start => done

If you have many producers that work fine and a new "bad" producer
starts up and writes bad data into your input topic, your Streams app
dies but all your producers, including the bad one, keep writing.

Thus, how would you fix this, as you cannot "remove" the corrupted date
from the topic? It might take some time to identify the root cause and
stop the bad producer. Up to this point you get good and bad data into
your Streams input topic. If Streams app in not able to skip over those
bad records, how would you get all the good data from the topic? Not
saying it's not possible, but it's extra work copying the data with a
new non-Streams consumer-producer-app into a new topic and than feed
your Streams app from this new topic -- you also need to update all your
upstream producers to write to the new topic.

Thus, if you want to fail fast, you can still do this. And after you
detected and fixed the bad producer you might just reconfigure your app
to skip bad records until it reaches the good part of the data.
Afterwards, you 

Re: Reliably implementing global KeyValueStore#get

2017-06-07 Thread Jan Filipiak

Hi,

have you thought about using connect to put data into a store that is 
more reasonable for your kind of query requirements?


Best Jan

On 07.06.2017 00:29, Steven Schlansker wrote:

On Jun 6, 2017, at 2:52 PM, Damian Guy  wrote:

Steven,

In practice, data shouldn't be migrating that often. If it is then you
probably have bigger problems.

Understood and agreed, but when designing distributed systems, it usually
helps to model for the worst case rather than the "well that should never
happen" case, lest you find yourself fixing those bugs at 3am instead :)

I'd like to be able to induce extreme pain at the Kafka layer (change leader
every 3 seconds and migrate all partitions around randomly) and still have
my app behave correctly.


You should be able to use the metadata api
to find the instance the key should be on and then when you check that node
you can also check with the metadata api that the key should still be on
this host. If streams is rebalancing while you query an exception will be
raised and you'll need to retry the request once the rebalance has
completed.

Agreed here as well.  But let's assume I have a very fast replication
setup (assume it takes zero time, for the sake of argument) -- I'm fairly
sure there's still a race here as this exception only fires *during a migration*
not *after a migration that may have invalidated your metadata lookup completes*


HTH,
Damian

On Tue, 6 Jun 2017 at 18:11 Steven Schlansker 
wrote:


On Jun 6, 2017, at 6:16 AM, Eno Thereska  wrote:

Hi Steven,

Do you know beforehand if a key exists? If you know that and are getting

null() the code will have to retry by refreshing the metadata and going to
the new instance. If you don’t know beforehand if a key exists or not you
might have to check all instances of a store to make sure.
No, I am not presupposing that the key can exist -- this is a user visible
API and will
be prone to "accidents" :)

Thanks for the insight.  I worry that even checking all stores is not
truly sufficient,
as querying different all workers at different times in the presence of
migrating data
can still in theory miss it given pessimal execution.

I'm sure I've long wandered off into the hypothetical, but I dream of some
day being
cool like Jepsen :)


Eno



On Jun 5, 2017, at 10:12 PM, Steven Schlansker <

sschlans...@opentable.com> wrote:

Hi everyone, me again :)

I'm still trying to implement my "remoting" layer that allows
my clients to see the partitioned Kafka Streams state
regardless of which instance they hit.  Roughly, my lookup is:

Message get(Key key) {
  RemoteInstance instance = selectPartition(key);
  return instance.get(key); // http remoting
}

RemoteInstance.get(Key key) { // http endpoint
  return readOnlyKeyValueStore.get(key);
}

However, the mapping of partitions to instances may change.
If you call KeyValueStore.get(K) where K is on a partition you
don't own, it returns null.  This is indistinguishable from a
successful get on a key that doesn't exist.

If one instance selects a sibling instance right as the partition is

failing

off of that instance, it may get routed there and by the time it gets
the request no longer "owns" the partition -- returns a false 'null'.

You can try re-checking after you get a null value, but that's

susceptible

to the same race -- it's unlikely but possible that the data migrates

*back*

before you do this re-check.

Is there any way to correctly implement this without races?  I'd imagine
you need a new primitive like KeyValueStore#get that atomically finds
the key or throws an exception if it is not in an owned partition
at the time of lookup so you know to recheck the partition and retry.

Thoughts?

Thanks again,
Steven







Re: Reliably implementing global KeyValueStore#get

2017-06-07 Thread Jan Filipiak

Depends, embedded postgress puts you into the same spot.

But if you use your state store change log to materialize into a 
postgress; that might work out decently.
Current JDBC doesn't support delete which is an issue but writing a 
custom sink is not to hard.


Best Jan


On 07.06.2017 23:47, Steven Schlansker wrote:

I was actually considering writing my own KeyValueStore backed
by e.g. a Postgres or the like.

Is there some feature Connect gains me that would make it better
than such an approach?

thanks


On Jun 7, 2017, at 2:20 PM, Jan Filipiak  wrote:

Hi,

have you thought about using connect to put data into a store that is more 
reasonable for your kind of query requirements?

Best Jan

On 07.06.2017 00:29, Steven Schlansker wrote:

On Jun 6, 2017, at 2:52 PM, Damian Guy  wrote:

Steven,

In practice, data shouldn't be migrating that often. If it is then you
probably have bigger problems.

Understood and agreed, but when designing distributed systems, it usually
helps to model for the worst case rather than the "well that should never
happen" case, lest you find yourself fixing those bugs at 3am instead :)

I'd like to be able to induce extreme pain at the Kafka layer (change leader
every 3 seconds and migrate all partitions around randomly) and still have
my app behave correctly.


You should be able to use the metadata api
to find the instance the key should be on and then when you check that node
you can also check with the metadata api that the key should still be on
this host. If streams is rebalancing while you query an exception will be
raised and you'll need to retry the request once the rebalance has
completed.

Agreed here as well.  But let's assume I have a very fast replication
setup (assume it takes zero time, for the sake of argument) -- I'm fairly
sure there's still a race here as this exception only fires *during a migration*
not *after a migration that may have invalidated your metadata lookup completes*


HTH,
Damian

On Tue, 6 Jun 2017 at 18:11 Steven Schlansker 
wrote:


On Jun 6, 2017, at 6:16 AM, Eno Thereska  wrote:

Hi Steven,

Do you know beforehand if a key exists? If you know that and are getting

null() the code will have to retry by refreshing the metadata and going to
the new instance. If you don’t know beforehand if a key exists or not you
might have to check all instances of a store to make sure.
No, I am not presupposing that the key can exist -- this is a user visible
API and will
be prone to "accidents" :)

Thanks for the insight.  I worry that even checking all stores is not
truly sufficient,
as querying different all workers at different times in the presence of
migrating data
can still in theory miss it given pessimal execution.

I'm sure I've long wandered off into the hypothetical, but I dream of some
day being
cool like Jepsen :)


Eno



On Jun 5, 2017, at 10:12 PM, Steven Schlansker <

sschlans...@opentable.com> wrote:

Hi everyone, me again :)

I'm still trying to implement my "remoting" layer that allows
my clients to see the partitioned Kafka Streams state
regardless of which instance they hit.  Roughly, my lookup is:

Message get(Key key) {
  RemoteInstance instance = selectPartition(key);
  return instance.get(key); // http remoting
}

RemoteInstance.get(Key key) { // http endpoint
  return readOnlyKeyValueStore.get(key);
}

However, the mapping of partitions to instances may change.
If you call KeyValueStore.get(K) where K is on a partition you
don't own, it returns null.  This is indistinguishable from a
successful get on a key that doesn't exist.

If one instance selects a sibling instance right as the partition is

failing

off of that instance, it may get routed there and by the time it gets
the request no longer "owns" the partition -- returns a false 'null'.

You can try re-checking after you get a null value, but that's

susceptible

to the same race -- it's unlikely but possible that the data migrates

*back*

before you do this re-check.

Is there any way to correctly implement this without races?  I'd imagine
you need a new primitive like KeyValueStore#get that atomically finds
the key or throws an exception if it is not in an owned partition
at the time of lookup so you know to recheck the partition and retry.

Thoughts?

Thanks again,
Steven





Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-22 Thread Jan Filipiak

Hi Eno,

I am less interested in the user facing interface but more in the actual 
implementation. Any hints where I can follow the discussion on this? As 
I still want to discuss upstreaming of KAFKA-3705 with someone


Best Jan


On 21.06.2017 17:24, Eno Thereska wrote:

(cc’ing user-list too)

Given that we already have StateStoreSuppliers that are configurable using the 
fluent-like API, probably it’s worth discussing the other examples with joins 
and serdes first since those have many overloads and are in need of some TLC.

So following your example, I guess you’d have something like:
.join()
.withKeySerdes(…)
.withValueSerdes(…)
.withJoinType(“outer”)

etc?

I like the approach since it still remains declarative and it’d reduce the 
number of overloads by quite a bit.

Eno


On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:

Hi,

I'd like to get a discussion going around some of the API choices we've
made in the DLS. In particular those that relate to stateful operations
(though this could expand).
As it stands we lean heavily on overloaded methods in the API, i.e, there
are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and i
feel it is only going to get worse as we add more optional params. In
particular we've had some requests to be able to turn caching off, or
change log configs,  on a per operator basis (note this can be done now if
you pass in a StateStoreSupplier, but this can be a bit cumbersome).

So this is a bit of an open question. How can we change the DSL overloads
so that it flows, is simple to use and understand, and is easily extended
in the future?

One option would be to use a fluent API approach for providing the optional
params, so something like this:

groupedStream.count()
   .withStoreName("name")
   .withCachingEnabled(false)
   .withLoggingEnabled(config)
   .table()



Another option would be to provide a Builder to the count method, so it
would look something like this:
groupedStream.count(new
CountBuilder("storeName").withCachingEnabled(false).build())

Another option is to say: Hey we don't need this, what are you on about!

The above has focussed on state store related overloads, but the same ideas
could  be applied to joins etc, where we presently have many join methods
and many overloads.

Anyway, i look forward to hearing your opinions.

Thanks,
Damian




Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-24 Thread Jan Filipiak

I am with Gouzhang here.

I think all the suggestions are far to short-sighted. Especially this 
wired materialize(String) call is broken totally and people go nuts 
about how this will look. + Implementing more and better joins, not this 
wired one we got currently. Implementing an one to many join I couln't 
get away without 3 highly complex value mappers


   final ValueMapper 
keyExtractor,
   final ValueMapper 
joinPrefixFaker,
   final ValueMapper 
leftKeyExtractor,


in addition to the one joiner of course

   final ValueJoiner joiner,

how to specify if its outer or inner is for sure the smallest problem we 
are going to face with proper join semantics. What the resulting Key 
will be is is also highly discussable. What happens to the key is very 
complex and the API has to tell the user.


Bringing this discussion into a good direction, we would need sample 
interfaces we could mock against ( as gouzhang suggested) + We need to 
know how the implementation (of joins especially) will be later. As I 
strongly recommend stopping the usage of ChangeSerde and have "properly" 
repartitioned topic. That is just sane IMO


Best Jan




On 22.06.2017 11:54, Eno Thereska wrote:

Note that while I agree with the initial proposal (withKeySerdes, withJoinType, 
etc), I don't agree with things like .materialize(), .enableCaching(), 
.enableLogging().

The former maintain the declarative DSL, while the later break the declarative 
part by mixing system decisions in the DSL.  I think there is a difference 
between the two proposals.

Eno


On 22 Jun 2017, at 03:46, Guozhang Wang  wrote:

I have been thinking about reducing all these overloaded functions for
stateful operations (there are some other places that introduces overloaded
functions but let's focus on these only in this discussion), what I used to
have is to use some "materialize" function on the KTables, like:

---

// specifying the topology

KStream stream1 = builder.stream();
KTable table1 = stream1.groupby(...).aggregate(initializer, aggregator,
sessionMerger, sessionWindows);  // do not allow to pass-in a state store
supplier here any more

// additional specs along with the topology above

table1.materialize("queryableStoreName"); // or..
table1.materialize("queryableStoreName").enableCaching().enableLogging();
// or..
table1.materialize(stateStoreSupplier); // add the metrics / logging /
caching / windowing functionalities on top of the store, or..
table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); //
etc..

---

But thinking about it more, I feel Damian's first proposal is better since
my proposal would likely to break the concatenation (e.g. we may not be
able to do sth. like "table1.filter().map().groupBy().aggregate()" if we
want to use different specs for the intermediate filtered KTable).


But since this is a incompatibility change, and we are going to remove the
compatibility annotations soon it means we only have one chance and we
really have to make it right. So I'd call out for anyone try to rewrite
your examples / demo code with the proposed new API and see if it feel
natural, for example, if I want to use a different storage engine than the
default rockDB engine how could I easily specify that with the proposed
APIs?

Meanwhile Damian could you provide a formal set of APIs for people to
exercise on them? Also could you briefly describe how custom storage
engines could be swapped in with the above APIs?



Guozhang


On Wed, Jun 21, 2017 at 9:08 AM, Eno Thereska 
wrote:


To make it clear, it’s outlined by Damian, I just copy pasted what he told
me in person :)

Eno


On Jun 21, 2017, at 4:40 PM, Bill Bejeck  wrote:

+1 for the approach outlined above by Eno.

On Wed, Jun 21, 2017 at 11:28 AM, Damian Guy 

wrote:

Thanks Eno.

Yes i agree. We could apply this same approach to most of the operations
where we have multiple overloads, i.e., we have a single method for each
operation that takes the required parameters and everything else is
specified as you have done above.

On Wed, 21 Jun 2017 at 16:24 Eno Thereska 

wrote:

(cc’ing user-list too)

Given that we already have StateStoreSuppliers that are configurable

using

the fluent-like API, probably it’s worth discussing the other examples

with

joins and serdes first since those have many overloads and are in need

of

some TLC.

So following your example, I guess you’d have something like:
.join()
  .withKeySerdes(…)
  .withValueSerdes(…)
  .withJoinType(“outer”)

etc?

I like the approach since it still remains declarative and it’d reduce

the

number of overloads by quite a bit.

Eno


On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:

Hi,

I'd like to get a discus

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-04 Thread Jan Filipiak

Hi Damian,

I do see your point of something needs to change. But I fully agree with 
Gouzhang when he says.

---

But since this is a incompatibility change, and we are going to remove the
compatibility annotations soon it means we only have one chance and we
really have to make it right.


I fear all suggestions do not go far enough to become something that will carry 
on for very much longer.
I am currently working on KAFKA-3705 and try to find the most easy way for the 
user to give me all the required functionality. The easiest interface I could 
come up so far can be looked at here.

https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622

And its already horribly complicated. I am currently unable to find the right 
abstraction level to have everything falling into place naturally. To be honest 
I already think introducing

https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493

was unideal and makes everything a mess. The JoinType:Whatever is also not 
really flexible. 2 things come to my mind:

1. I don't think we should rule out config based decisions say configs like
streams.$applicationID.joins.$joinname.conf = value
This can allow for tremendous changes without single API change and IMO it was 
not considered enough yet.

2. Push logic from the DSL to the Callback classes. A ValueJoiner for example 
can be used to implement different join types as the user wishes.

As Gouzhang said: stopping to break users is very important. especially with 
this changes + All the plans I sadly only have in my head but hopefully the 
first link can give a glimpse.

Thanks for preparing the examples made it way clearer to me what exactly we are 
talking about. I would argue to go a bit slower and more carefull on this one. 
At some point we need to get it right. Peeking over to the hadoop guys with 
their hughe userbase. Config files really work well for them.

Best Jan





On 30.06.2017 09:31, Damian Guy wrote:

Thanks Matthias

On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax  wrote:


I am just catching up on this thread, so sorry for the long email in
advance... Also, it's to some extend a dump of thoughts and not always a
clear proposal. Still need to think about this in more detail. But maybe
it helps other to get new ideas :)



However, I don't understand your argument about putting aggregate()
after the withXX() -- all the calls to withXX() set optional parameters
for aggregate() and not for groupBy() -- but a groupBy().withXX()
indicates that the withXX() belongs to the groupBy(). IMHO, this might
be quite confusion for developers.



I see what you are saying, but the grouped stream is effectively a no-op
until you call one of the aggregate/count/reduce etc functions. So the
optional params are ones that are applicable to any of the operations you
can perform on this grouped stream. Then the final
count()/reduce()/aggregate() call has any of the params that are
required/specific to that function.


I understand your argument, but you don't share the conclusion. If we
need a "final/terminal" call, the better way might be

.groupBy().count().withXX().build()

(with a better name for build() though)



The point is that all the other calls, i.e,withBlah, windowed, etc apply
too all the aggregate functions. The terminal call being the actual type of
aggregation you want to do. I personally find this more natural than
groupBy().count().withBlah().build()



groupedStream.count(/** non windowed count**/)
groupedStream.windowed(TimeWindows.of(10L)).count(...)
groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)


I like this. However, I don't see a reason to have windowed() and
sessionWindowed(). We should have one top-level `Windows` interface that
both `TimeWindows` and `SessionWindows` implement and just have a single
windowed() method that accepts all `Windows`. (I did not like the
separation of `SessionWindows` in the first place, and this seems to be
an opportunity to clean this up. It was hard to change when we
introduced session windows)


Yes - true we should look into that.



Btw: we do you the imperative groupBy() and groupByKey(), and thus we
might also want to use windowBy() (instead of windowed()). Not sure how
important this is, but it seems to be inconsistent otherwise.



Makes sense



About joins:  I don't like .withJoinType(JoinType.LEFT) at all. I think,
defining an inner/left/outer join is not an optional argument but a
first class concept and should have a proper representation in the API
(like the current methods join(), leftJoin, outerJoin()).



Yep, i did originally have it as a required param and maybe that is what we
go with. It could have a default, but maybe that is confusing.




About the tw

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-07 Thread Jan Filipiak

It makes me want to cry.

why on earth is the DSL going to expose all its implementation details now?
especially being materialized or not.

If we want to take usefull steps in that direction maybe we are looking 
for a way to let the user switch back and forth between PAPI and DSL?


A change as the proposed would not eliminate any of my pain points while 
still being a heck of work migrating towards to.


Since I am only following this from the point where Eno CC'ed it into 
the users list:


Can someone please rephrase for me what problem this is trying to solve? 
I don't mean to be rude but It uses a problematic feature
"StateStoreSuppliers in DSL" to justify making it even worse. This helps 
us nowhere in making the configs more flexible, its just syntactic sugar.


A low effort shoot like: lets add a properties to operations that would 
otherwise become overloaded to heavy? Or pull the configs by some naming 
schema
form the overall properties. Additionally to that we get rid of 
StateStoreSuppliers in the DSL and have them also configured by said 
properties.


=> way easier to migrate to, way less risk, way more flexible in the 
future (different implementations of the same operation don't require 
code change to configure)


Line 184 makes especially no sense to me. what is a KTableKTable non 
materialized join anyways?


Hope we can discuss more on this.



On 07.07.2017 17:23, Guozhang Wang wrote:

I messed the indentation on github code repos; this would be easier to read:

https://codeshare.io/GLWW8K


Guozhang


On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang  wrote:


Hi Damian / Kyle,

I think I agree with you guys about the pros / cons of using the builder
pattern v.s. using some "secondary classes". And I'm thinking if we can
take a "mid" manner between these two. I spent some time with a slight
different approach from Damian's current proposal:

https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/
java/org/apache/kafka/streams/RefactoredAPIs.java

The key idea is to tolerate the final "table()" or "stream()" function to
"upgrade" from the secondary classes to the first citizen classes, while
having all the specs inside this function. Also this proposal includes some
other refactoring that people have been discussed about for the builder to
reduce the overloaded functions as well. WDYT?


Guozhang


On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy  wrote:


Hi Jan,

Thanks very much for the input.

On Tue, 4 Jul 2017 at 08:54 Jan Filipiak 
wrote:


Hi Damian,

I do see your point of something needs to change. But I fully agree with
Gouzhang when he says.
---

But since this is a incompatibility change, and we are going to remove

the

compatibility annotations soon it means we only have one chance and we
really have to make it right.




I think we all agree on this one! Hence the discussion.



I fear all suggestions do not go far enough to become something that

will

carry on for very much longer.
I am currently working on KAFKA-3705 and try to find the most easy way

for

the user to give me all the required functionality. The easiest

interface I

could come up so far can be looked at here.


https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2

de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/
kafka/streams/kstream/internals/KTableImpl.java#L622



And its already horribly complicated. I am currently unable to find the

right abstraction level to have everything falling into place

naturally. To

be honest I already think introducing



To be fair that is not a particularly easy problem to solve!



https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2

de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/
kafka/streams/kstream/internals/KTableImpl.java#L493

was unideal and makes everything a mess.


I'm not sure i agree that it makes everything a mess, but It could have
been done differently.

The JoinType:Whatever is also not really flexible. 2 things come to my
mind:

1. I don't think we should rule out config based decisions say configs

like

 streams.$applicationID.joins.$joinname.conf = value


Is this just for config? Or are you suggesting that we could somehow
"code"
the join in a config file?



This can allow for tremendous changes without single API change and IMO

it

was not considered enough yet.

2. Push logic from the DSL to the Callback classes. A ValueJoiner for
example can be used to implement different join types as the user

wishes.
Do you have an example of how this might look?



As Gouzhang said: stopping to break users is very important.


Of course. We want to make it as easy as possible for people to use
streams.


especially with this changes + All the plans I sadly only have in my head

but hopefully the first link can give a glimpse.

Thanks for preparing the examples made it way clearer to me 

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-08 Thread Jan Filipiak

Hi Matthias thanks,

Exactly what I was guessing.

I don't understand why custom stores in DSL? and I don't understand why 
we are not concidering a more generic config based appraoch?


StateStores in DSL => what I really think we are looking for PAPA => DSL 
=> PAPI  back and forth switcharoo capabilities.


Looking at the most overloaded that I can currently find "through()" 2 
of them come from the broken idea of "the user provides a name for the 
statestore for IQ" and custom statestores.
From the beginning I said that's madness. That is the real disease we 
need to fix IMHO. To be honest I also don't understand why through with 
statestore is particularly usefull, second Unique Key maybe?


also providing Serdes by config is neat. wouldn't even need to go into 
the code then would also save a ton. (We have the defaults one in conf 
why not override the specific ones?)


Does this makes sense to people? what pieces should i outline with code 
(time is currently sparse :( but I can pull of some smaller examples i 
guess)


Best Jan





On 08.07.2017 01:23, Matthias J. Sax wrote:

It's too issues we want to tackle

  - too many overload (for some method we have already more than 10(
  - improve custom store API

-Matthias


On 7/7/17 3:42 PM, Jan Filipiak wrote:

It makes me want to cry.

why on earth is the DSL going to expose all its implementation details now?
especially being materialized or not.

If we want to take usefull steps in that direction maybe we are looking
for a way to let the user switch back and forth between PAPI and DSL?

A change as the proposed would not eliminate any of my pain points while
still being a heck of work migrating towards to.

Since I am only following this from the point where Eno CC'ed it into
the users list:

Can someone please rephrase for me what problem this is trying to solve?
I don't mean to be rude but It uses a problematic feature
"StateStoreSuppliers in DSL" to justify making it even worse. This helps
us nowhere in making the configs more flexible, its just syntactic sugar.

A low effort shoot like: lets add a properties to operations that would
otherwise become overloaded to heavy? Or pull the configs by some naming
schema
form the overall properties. Additionally to that we get rid of
StateStoreSuppliers in the DSL and have them also configured by said
properties.

=> way easier to migrate to, way less risk, way more flexible in the
future (different implementations of the same operation don't require
code change to configure)

Line 184 makes especially no sense to me. what is a KTableKTable non
materialized join anyways?

Hope we can discuss more on this.



On 07.07.2017 17:23, Guozhang Wang wrote:

I messed the indentation on github code repos; this would be easier to
read:

https://codeshare.io/GLWW8K


Guozhang


On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang  wrote:


Hi Damian / Kyle,

I think I agree with you guys about the pros / cons of using the builder
pattern v.s. using some "secondary classes". And I'm thinking if we can
take a "mid" manner between these two. I spent some time with a slight
different approach from Damian's current proposal:

https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/

java/org/apache/kafka/streams/RefactoredAPIs.java

The key idea is to tolerate the final "table()" or "stream()"
function to
"upgrade" from the secondary classes to the first citizen classes, while
having all the specs inside this function. Also this proposal
includes some
other refactoring that people have been discussed about for the
builder to
reduce the overloaded functions as well. WDYT?


Guozhang


On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy  wrote:


Hi Jan,

Thanks very much for the input.

On Tue, 4 Jul 2017 at 08:54 Jan Filipiak 
wrote:


Hi Damian,

I do see your point of something needs to change. But I fully agree
with
Gouzhang when he says.
---

But since this is a incompatibility change, and we are going to remove

the

compatibility annotations soon it means we only have one chance and we
really have to make it right.




I think we all agree on this one! Hence the discussion.



I fear all suggestions do not go far enough to become something that

will

carry on for very much longer.
I am currently working on KAFKA-3705 and try to find the most easy way

for

the user to give me all the required functionality. The easiest

interface I

could come up so far can be looked at here.


https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2

de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/
kafka/streams/kstream/internals/KTableImpl.java#L622
And its already horribly complicated. I am currently unable to find the

right abstraction level to have everything falling into place

naturally. To

be honest I already think introducing



To be fair that is not a p

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-09 Thread Jan Filipiak
 is inline with my comment above.



also providing Serdes by config is neat. wouldn't even need to go into
the code then would also save a ton. (We have the defaults one in conf
why not override the specific ones?)

I am not sure, if Serdes are really a config? I mean, the data types are
hard coded into the code, so it does make sense to specify the Serdes
accordingly. I am also not sure how we would map Serdes from the config
to the corresponding operator?
true! maybe not an ideal case where configs help with overloading. I 
guess people are either using the global untyped one or a typed one for 
all steps.
So statestore is probably a better case. Its going to be referenced by a 
name always anyways so one could use this name to provide additional 
configs to the Statestore.

Probably also defining a factory used to build it.

Similarly a join has some sort of name, currently its 3 names, wich 
would need unifying to some degree, but then also the joins could be 
addressed with configs.
But Joins don't seem to have the to heaver overloading problem (Only 
store related :D).  But to be honest I can't judge the usefulness of 
outer and left. Not a pattern
that I came across yet for us its always inner. Maybe materialized but 
not sending old values is that what it does? Sorry can't wrap my head 
round that just now

heading towards 3am.

The example I provided was

streams.$applicationid.stores.$storename.inmemory = false
streams.$applicationid.stores.$storename.cachesize = 40k

for the configs. The Query Handle thing make sense hopefully.

Best Jan



-Matthias


On 7/8/17 2:23 AM, Jan Filipiak wrote:

Hi Matthias thanks,

Exactly what I was guessing.

I don't understand why custom stores in DSL? and I don't understand why
we are not concidering a more generic config based appraoch?

StateStores in DSL => what I really think we are looking for PAPA => DSL
=> PAPI  back and forth switcharoo capabilities.

Looking at the most overloaded that I can currently find "through()" 2
of them come from the broken idea of "the user provides a name for the
statestore for IQ" and custom statestores.
 From the beginning I said that's madness. That is the real disease we
need to fix IMHO. To be honest I also don't understand why through with
statestore is particularly usefull, second Unique Key maybe?

also providing Serdes by config is neat. wouldn't even need to go into
the code then would also save a ton. (We have the defaults one in conf
why not override the specific ones?)

Does this makes sense to people? what pieces should i outline with code
(time is currently sparse :( but I can pull of some smaller examples i
guess)

Best Jan





On 08.07.2017 01:23, Matthias J. Sax wrote:

It's too issues we want to tackle

   - too many overload (for some method we have already more than 10(
   - improve custom store API

-Matthias


On 7/7/17 3:42 PM, Jan Filipiak wrote:

It makes me want to cry.

why on earth is the DSL going to expose all its implementation
details now?
especially being materialized or not.

If we want to take usefull steps in that direction maybe we are looking
for a way to let the user switch back and forth between PAPI and DSL?

A change as the proposed would not eliminate any of my pain points while
still being a heck of work migrating towards to.

Since I am only following this from the point where Eno CC'ed it into
the users list:

Can someone please rephrase for me what problem this is trying to solve?
I don't mean to be rude but It uses a problematic feature
"StateStoreSuppliers in DSL" to justify making it even worse. This helps
us nowhere in making the configs more flexible, its just syntactic
sugar.

A low effort shoot like: lets add a properties to operations that would
otherwise become overloaded to heavy? Or pull the configs by some naming
schema
form the overall properties. Additionally to that we get rid of
StateStoreSuppliers in the DSL and have them also configured by said
properties.

=> way easier to migrate to, way less risk, way more flexible in the
future (different implementations of the same operation don't require
code change to configure)

Line 184 makes especially no sense to me. what is a KTableKTable non
materialized join anyways?

Hope we can discuss more on this.



On 07.07.2017 17:23, Guozhang Wang wrote:

I messed the indentation on github code repos; this would be easier to
read:

https://codeshare.io/GLWW8K


Guozhang


On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang 
wrote:


Hi Damian / Kyle,

I think I agree with you guys about the pros / cons of using the
builder
pattern v.s. using some "secondary classes". And I'm thinking if we
can
take a "mid" manner between these two. I spent some time with a slight
different approach from Damian's current proposal:

https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/

Re: Where to run kafka-consumer-groups.sh from?

2017-07-11 Thread Jan Filipiak

Hi,

very likely due to timing. What problem is it causing you exactly that 
you want to work around?

These differences shouldn't concern you to much I guess.

We use the tool across continents and don't worry about it to much. 
Offset Commit interval makes everything blury anyways. If you can 
specify your pain more precisely maybe we can work around it.


Best Jan

On 10.07.2017 10:31, Dmitriy Vsekhvalnov wrote:

Guys, let me up this one again. Still looking for comments about
kafka-consumer-groups.sh
tool.

Thank you.

On Fri, Jul 7, 2017 at 3:14 PM, Dmitriy Vsekhvalnov 
wrote:


I've tried 3 brokers on command line, like that:

/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server
broker:9092,broker_2:9092,broker_3:9092 --new-consumer --group
logging-svc --describe

it doesn't make any difference, still x10 times difference in figures when
running on broker host vs. remote

Here is snippet from console output (are you looking something specific in
it? it looks normal a far as i can say):


TOPICPARTITION  CURRENT-OFFSET  LOG-END-OFFSETLAG
 CONSUMER-ID

test.topic  54 4304430935
  consumer-26-21f5050c-a43c-4254-bfcf-42e17dbdb651

test.topic  40 4426443610
 consumer-21-24f3ebca-004f-4aac-a348-638c9c6a02f0

test.topic  59 4414442063
  consumer-27-ed34f1b3-1be9-422b-bb07-e3c9913195c7

test.topic  42 4389440376
 consumer-22-75c2fc0a-5d5c-472d-b27e-e873030f82b6

test.topic  27 4416442224
  consumer-18-3be20568-8dd3-4679-a008-0ca64d31083c




On Fri, Jul 7, 2017 at 2:52 PM, M. Manna  wrote:


kafka-consumer-groups.sh --bootstrap-server broker:9092 --new-consumer
--group service-group --describe

how many brokers do you have in the cluster? if you have more than one,
list them all using a comma csv with --bootstrap-server.

Also, could you paste some results from the console printout?

On 7 July 2017 at 12:47, Dmitriy Vsekhvalnov 
wrote:


Hi all,

question about lag checking. We've tried to periodically sample consumer
lag with:

kafka-consumer-groups.sh --bootstrap-server broker:9092 --new-consumer
--group service-group --describe

it's all fine, but depending on host  we run it from it gives different
results.

E.g:

   - when running from one of the broker hosts itself we getting close

to 0

figures.

   - when running from remote host, we getting 30-60 in average (i

suspect

there are multiple remote calls to broker involved, so difference due to
timing).


My question is what is correct way to use it? From broker host itself?







Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-18 Thread Jan Filipiak

Hi,

Sorry for the delay, couldn't get to answer more early. I do understand 
your point perfectly.
I just have a different perspective on what is going on. The most 
crucial piece of abstraction, the KTable is falling apart

and that materializes (no pun intended) itself into many problems.

1. To many overloads:
Currently, whenever a KTable is the result of an operation it gets and 
override with stateStoreName, and StatestoreSupplier in case people want 
to query that.
This is what produces 2/3rd of the overloaded methods right now (not 
counting methods returning KStream)


2. Code copy and pasting.
Almost all KTableProcessorSuppliers have the same block of (if(name != 
null) store.put(k,v))


3. Runtime inefficiencies.
Each querable table almost instantly causes a another store beeing 
required. Storing equivalent data of upstream KTables.


So I really see us tackeling only the first part currently. Wich in my 
opinion is to short-sighted to settle on an Public API.
This is why I want to tackle our approach to IQ-first, as it seems to me 
to be the most disruptive thing. And the cause of most problems.


The Plan:

Table from topic, kstream (don't even like this one, but probaly needed 
for some kind of enhanced flexibility) or aggregations would be the only 
KTables that would get associated with a statestore (their processors).
For these operations one can have the "statestoresupplier" overload but 
also not the "querablestatestore" overload. From this point on KTables 
abstraction would be considered restored.
All the overloads of join and through with respect to IQ would go away. 
"through" would go completely maybe the benefit added is. The method I 
would add is for a table to get a Queryhandle.
This query handle will underneath remember its tables processor name. To 
access the data form IQ we would not rely on the "per processor 
statestore" but go the usual path through ValueGetterSupplier.
*Note:* We do not necessarily have a Serde for V, especially after 
mapValues. also not for any intermediate Data types. It would be each 
KTableProccesors job to provide a serialized version of upstream Datatypes.
KTableKTabkeJoinwould need to bring a JoinInputSerializer that 
would serialize both upstream values for transport across boxes.


This first step would kill all the "Storename" based overloads + many 
Statestore overloads. It would also avoid the bloated copy pasting in 
each KTableProcessor for maintaining the store.
It would also make the runtime more efficient in a way that it does not 
store the same data twice, just for accessing from IQ. Tackeling problem 
1 but also all other three problems mentioned above.


From here ~3 or 4 (from kstream,topic or aggregate) methods would still 
be stuck with StateStoresupplier overload. For me, this is quite an 
improvement already, to reduce further overloads I am thinking
to put a nullable properties to this operations. If people want to use 
all defaults they could throw in null and it wouldn't be to painfull. 
That doesn't necessarily require
them to have config files laying around. They could if they wanted use 
property files to create such properties + we would over to look for 
configs in the streams property.
So the complexity of distributing property files is optional and the 
user might choose to fill the configs by code or files.


I think these steps can rescue the proper abstraction of a KTable. I 
believe that with the current proposals we are only sugarcoating problem 
1 and end up with a broken idea of what KTable is.
I think it will be even harder to develop further from there. Interface 
wise my proposal is like developing backwards as i am very certain we 
did a wrong turn with the IQ we shouldn't try to carry through.


I hope I could explain how this re factoring can tackle  the 3 above 
problems and especially why i don't think we can win tackiling only 
point 1 in the long run.
If anything would need an implementation draft please feel free to ask 
me to provide one. Initially the proposal hopefully would get the job 
done of just removing clutter.


Looking forward to your comments.

Best Jan



On 12.07.2017 21:27, Guozhang Wang wrote:

Hello Jan,

Thanks for your feedbacks. Let me try to clarify a few things with the 
problems that we are trying to resolve and the motivations with the 
current proposals.


As Matthias mentioned, one issue that we are trying to tackle is to 
reduce the number of overloaded functions in the DSL due to serde 
overridden / state store supplier overridden that are needed for 
repartition, or for state store materializations. Another related 
issue is that the current overridden state store supplier is not very 
natural to use, for example:


1) If a user just want to disable caching / logging etc but do not 
want to change the underlying store engine at all, she needs to learn 
to know that, for example, if a wi

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-24 Thread Jan Filipiak

Hi Damian,

thanks for taking the time. I think you read my points individually but 
you seem to not understand the bigger picture I am trying to paint.


From the three problems I mentioned - and that you agreed on to be 
problems -  you are only trying to address the first.


What I am trying to tell you is that if you focus on the later two the 
first one comes for free. On the other hand if you focus on the first
and please allow me to call it the easy part. All you going to archive 
is to break user land and sugar coat the real problems.


This takes away overloads, but still leaves it a mess to implement new 
features. I am currently trying to prep a patch for Kafka-3705 and
I do not understand why I should deal with Interactive Queries what so 
ever. My Output table has a proper ValueGetterSupplier.

That should be it!

I hope I made clear that to improve here quite some hard work has been 
done and that it would be rewariding and that just sugar coating everything

is one of the worst steps we could take from where we are at the moment.

Looking at Kafka-5581 that you mentioned. None of the points are really 
related to what I am saying really. Each of these points is tricky and

requires some carefull thinking but might work out.

Further Looking at you comment that refers to KIP vs. DISCUSS. I don't 
know what I should understand from that.


Regarding your comment mentioning that getQueryHandle() wouldn't work. 
Its the same thing as giving the user a queryable string.
It works the same way with the only difference that we have a wrapper 
object that gives the user what he wants instantly! Instead of giving 
him a String
to get a Store, we just give him a store, plus we don't hand out some 
inflexible native types that we later on don't have control over.

The whole logic about partitioners and what else does not change.

Hope this makes my points more clear.

Best Jan


On 19.07.2017 12:03, Damian Guy wrote:

Hi Jan,

Thanks for your input. Comments inline

On Tue, 18 Jul 2017 at 15:21 Jan Filipiak  wrote:


Hi,


1. To many overloads:
Currently, whenever a KTable is the result of an operation it gets and
override with stateStoreName, and StatestoreSupplier in case people want
to query that.
This is what produces 2/3rd of the overloaded methods right now (not
counting methods returning KStream)



As you state further down we are trying to address this.



2. Code copy and pasting.
Almost all KTableProcessorSuppliers have the same block of (if(name !=
null) store.put(k,v))



Yes, i agree. That is related to the KTable queryable store etc, and can
easily be addressed, but isn't necessarily part of this as it doesn't need
to be a public interface change, i.e., we can clean that up in the
background.



3. Runtime inefficiencies.
Each querable table almost instantly causes a another store beeing
required. Storing equivalent data of upstream KTables.


Agreed. Again, this is not a public interface change. We don't need another
store, i.e., we can just use a "View" on the existing store, which is
basically just using the KTableValueGetter to apply the map or filter
operation to the original store. We also have this jira
https://issues.apache.org/jira/browse/KAFKA-5581 to look into optimizing
when we do and don't need to add additional changelogs.



So I really see us tackeling only the first part currently. Wich in my
opinion is to short-sighted to settle on an Public API.


We are not settling on the public API. We do, however need to do KIPs for
public API discussions. For internal changes we don't necessarily need to
have a public discussion about it.



This is why I want to tackle our approach to IQ-first, as it seems to me
to be the most disruptive thing. And the cause of most problems.

The Plan:

Table from topic, kstream (don't even like this one, but probaly needed
for some kind of enhanced flexibility) or aggregations would be the only
KTables that would get associated with a statestore (their processors).
For these operations one can have the "statestoresupplier" overload but
also not the "querablestatestore" overload. From this point on KTables
abstraction would be considered restored.
All the overloads of join and through with respect to IQ would go away.
"through" would go completely maybe the benefit added is. The method I
would add is for a table to get a Queryhandle.
This query handle will underneath remember its tables processor name. To
access the data form IQ we would not rely on the "per processor
statestore" but go the usual path through ValueGetterSupplier.
*Note:* We do not necessarily have a Serde for V, especially after
mapValues. also not for any intermediate Data types. It would be each
KTableProccesors job to provide a serialized version of upstream Datatypes.
KTableKTabkeJoinwould need to bring a JoinInputSerializer that
would serialize both upstream values for trans

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-01 Thread Jan Filipiak

Hi all,

after some further discussions, the best thing to show my Idea of how it 
should evolve would be a bigger mock/interface description.
The goal is to reduce the store maintaining processors to only the 
Aggregators + and KTableSource. While having KTableSource optionally 
materialized.


Introducing KTable:copy() will allow users to maintain state twice if 
they really want to. KStream::join*() wasn't touched. I never personally 
used that so I didn't feel
comfortable enough touching it. Currently still making up my mind. None 
of the suggestions made it querieable so far. Gouzhangs 'Buffered' idea 
seems ideal here.


please have a look. Looking forward for your opinions.

Best Jan



On 21.06.2017 17:24, Eno Thereska wrote:

(cc’ing user-list too)

Given that we already have StateStoreSuppliers that are configurable using the 
fluent-like API, probably it’s worth discussing the other examples with joins 
and serdes first since those have many overloads and are in need of some TLC.

So following your example, I guess you’d have something like:
.join()
.withKeySerdes(…)
.withValueSerdes(…)
.withJoinType(“outer”)

etc?

I like the approach since it still remains declarative and it’d reduce the 
number of overloads by quite a bit.

Eno


On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:

Hi,

I'd like to get a discussion going around some of the API choices we've
made in the DLS. In particular those that relate to stateful operations
(though this could expand).
As it stands we lean heavily on overloaded methods in the API, i.e, there
are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and i
feel it is only going to get worse as we add more optional params. In
particular we've had some requests to be able to turn caching off, or
change log configs,  on a per operator basis (note this can be done now if
you pass in a StateStoreSupplier, but this can be a bit cumbersome).

So this is a bit of an open question. How can we change the DSL overloads
so that it flows, is simple to use and understand, and is easily extended
in the future?

One option would be to use a fluent API approach for providing the optional
params, so something like this:

groupedStream.count()
   .withStoreName("name")
   .withCachingEnabled(false)
   .withLoggingEnabled(config)
   .table()



Another option would be to provide a Builder to the count method, so it
would look something like this:
groupedStream.count(new
CountBuilder("storeName").withCachingEnabled(false).build())

Another option is to say: Hey we don't need this, what are you on about!

The above has focussed on state store related overloads, but the same ideas
could  be applied to joins etc, where we presently have many join methods
and many overloads.

Anyway, i look forward to hearing your opinions.

Thanks,
Damian



@InterfaceStability.Evolving
public interface KTable {

KTable filter(final Predicate predicate);
KTable filterNot(final Predicate predicate);
 KTable mapValues(final ValueMapper 
mapper);

KStream toStream();

KTable copy(); Inserts a new KTableSource
KTable copy(Materialized m); inserts a new KTableSource using 
toStream() as parent


   //I see why, Id rather have users using to+table
KTable through(final String topic);
KTable through(Produced p,
 final String topic);

void to(final String topic);
void to(final Produced
final String topic);

 KGroupedTable groupBy(final KeyValueMapper> selector);
 KGroupedTable groupBy(final KeyValueMapper> selector, Serialized s);

 KTable join(final KTable other,
final ValueJoiner joiner);

 KTable leftJoin(final KTable other,
final ValueJoiner joiner);

 KTable outerJoin(final KTable other,
 final ValueJoiner joiner);

UninitializedQueryHandle QueryHandle(); // causes enable sending old 
value / materialize



//Currently marked deprecated, easily reproduced by map or similiar
void writeAsText(final String filePath);
void writeAsText(final String filePath,
 final String streamName);
void  writeAsText(final String filePath,
  final Serde keySerde,
  final Serde valSerde);
void writeAsText(final String filePath,
 final String streamName,
 final Serde keySerde,
 final Serde valSerde);
void foreach(final ForeachAction action);
}


public interface UninitializedQueryHandle{

QueryHandle initialize(KafkaStreams ks);
}

public interface QueryHandle {

V get(K k);

}

public interface Produced{

Produced static with();

Produced serializer(Serialized s);

Produced partitioner(StreamPartitionier sp);

//sneaky new feature. skip

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-02 Thread Jan Filipiak

Hi Bill,

totally! So in the original discussion it was mentioned that the 
overloads are nasty when implementing new features. So we wanted to get 
rid of them. But what I felt was that the
copy & pasted code in the KTableProcessors for maintaining IQ stores was 
as big as a hurdle as the overloads.


With this proposal I try to shift things into the direction of getting 
IQ for free if
KTableValueGetterSupplier is properly implemented (like getting join for 
free then). Instead of having the code for maintaining IQ stores all the 
places. I realized I can do that while getting rid of the overloads, 
that makes me feel my proposal is superior.


Further I try to optimize by using as few stores as possible to give the 
user what he needs. That should save all sorts of resources while 
allowing faster rebalances.


The target ultimately is to only have KTableSource and the Aggregators 
maintain a Store and provide a ValueGetterSupplier.


Does this makes sense to you?

Best Jan

On 02.08.2017 18:09, Bill Bejeck wrote:

Hi Jan,

Thanks for the effort in putting your thoughts down on paper.

Comparing what I see from your proposal and what is presented in 
KIP-182, one of the main differences is the exclusion of 
an`Materialized`  instance in the `KTable` methods.


Can you go into more detail why this is so and the specific problems 
is avoids and or solves with this approach?


Thanks!
Bill

On Wed, Aug 2, 2017 at 4:19 AM, Damian Guy <mailto:damian@gmail.com>> wrote:


Hi Jan,

Thanks for taking the time to put this together, appreciated. For the
benefit of others would you mind explaining a bit about your
motivation?

Cheers,
Damian

On Wed, 2 Aug 2017 at 01:40 Jan Filipiak mailto:jan.filip...@trivago.com>> wrote:

> Hi all,
>
> after some further discussions, the best thing to show my Idea
of how it
> should evolve would be a bigger mock/interface description.
> The goal is to reduce the store maintaining processors to only the
> Aggregators + and KTableSource. While having KTableSource optionally
> materialized.
>
> Introducing KTable:copy() will allow users to maintain state
twice if
> they really want to. KStream::join*() wasn't touched. I never
personally
> used that so I didn't feel
> comfortable enough touching it. Currently still making up my
mind. None
> of the suggestions made it querieable so far. Gouzhangs
'Buffered' idea
> seems ideal here.
    >
> please have a look. Looking forward for your opinions.
>
> Best Jan
>
>
>
> On 21.06.2017 17 :24, Eno Thereska wrote:
> > (cc’ing user-list too)
> >
> > Given that we already have StateStoreSuppliers that are
configurable
> using the fluent-like API, probably it’s worth discussing the other
> examples with joins and serdes first since those have many
overloads and
> are in need of some TLC.
> >
> > So following your example, I guess you’d have something like:
> > .join()
> > .withKeySerdes(…)
> > .withValueSerdes(…)
> > .withJoinType(“outer”)
> >
> > etc?
> >
> > I like the approach since it still remains declarative and
it’d reduce
> the number of overloads by quite a bit.
> >
> > Eno
> >
> >> On Jun 21, 2017, at 3:37 PM, Damian Guy mailto:damian@gmail.com>> wrote:
> >>
> >> Hi,
> >>
> >> I'd like to get a discussion going around some of the API
choices we've
> >> made in the DLS. In particular those that relate to stateful
operations
> >> (though this could expand).
> >> As it stands we lean heavily on overloaded methods in the
API, i.e,
> there
> >> are 9 overloads for KGroupedStream.count(..)! It is becoming
noisy and i
> >> feel it is only going to get worse as we add more optional
params. In
> >> particular we've had some requests to be able to turn caching
off, or
> >> change log configs,  on a per operator basis (note this can
be done now
> if
> >> you pass in a StateStoreSupplier, but this can be a bit
cumbersome).
> >>
> >> So this is a bit of an open question. How can we change the DSL
> overloads
> >> so that it flows, is simple to use and understand, and is easily
> extended
> >> in the future?
> >>
> >> One option would be to use a fluent API approach for
providing the
> optional
> >> params, so something like this

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-04 Thread Jan Filipiak
ving them separatly will increase the readability of topologies by 
a lot IMO.

For these quick example Topologies that we have floating around in all places:
I am pretty sure one can go unbroken on them and usually the last table will be 
the one that
is needed for IQ then.


Thanks again. The second point really got me thinking, as your perspective on 
the importance
of "not break the fluent interface" was not clear to me. I hope I managed to 
line out why I
think it shouldn't have such a big weight in the discussion.

PS.: check out Hive CTE, everyone loves them and our Analytic team is crazy for 
them
because you can name them and that brings clarity. and you get rid of the 
nesting and can
split everything into logical chunks of SQL. KTable variables are the CTE of 
kafka streams.
One can probably sell this to people :)

Best Jan
Enjoyed your feedback! hope mine makes sense




On 03.08.2017 00:10, Guozhang Wang wrote:

Hello Jan,

Thanks for your proposal. As Bill mentioned the main difference is that we
extract the user-customizable materialization logic out of the topology
building DSL workflow. And the main motivations are in two folds:

1) efficiency wise, it allows some KTables to not be materialized if
unnecessary, saving one state store instance and changelog topic.

2) programming wise, it looks nicer to separate the topology construction
code from the KTable materialization for IQ uses code.


Here are my thoughts regarding these two points:

Regarding 1), I think with whichever the public APIs (either Damian's
proposal or yours), we can always apply the internal optimization to not
physically materialize the KTable. You can take a look at the internal
interface of "KTableValueGetterSupplier", which is used exactly for this
purposes such that a get call on a "logically" materialized KTable can be
traced back to its parent KTables that are physically materialized in a
state store. So following proposed APIs, for example:


stream.groupByKey(..).aggregate(.., Materializedas("store1"))//
this resulted KTable is materialized in order to complete the aggregation
operation
 .filter(Materialized.as("store2"))
// this restuled KTable is not materialized but its
GetterSupplier is implemented to get values from "store1"


Or

table1 = stream.groupByKey(..).aggregate(..);
table2 = table1.filter();

tabel1.queryHandle("store1");   // this resulted KTable is materialized
in order to complete the aggregation operation
tabel1.queryHandle("store2")// this restuled KTable is not
materialized but its GetterSupplier is implemented to get values from
"store1"



When user query a value for "store2" which is not actually materialized
into a state store, the GetterSupplier will be triggered to in turn query
the store for "store1", and then apply the filter operator on-the-fly to
return the value. So the bottom line is, we can achieve the same efficiency
optimization with either of the public APIs.


Regarding 2), I actually have proposed a similar API to yours earlier in
this discussion thread:

---

// specifying the topology, should be concise and conveniently
concatenated, no specs of materialization at all

KStream stream1 = builder.stream();
KTable table1 = stream1.groupby(...).aggregate(initializer, aggregator,
sessionMerger, sessionWindows);  // do not allow to pass-in a state store
supplier here any more

// additional code to the topology above, could be more prescriptive
than descriptive
// only advanced users would want to code in both parts above; while other
users would only code the topology as above.

table1.materialize("queryableStoreName"); // or..
table1.materialize("queryableStoreName").enableCaching().enableLogging();
// or..
table1.materialize(stateStoreSupplier); // we check type (key-value types,
windowed or not etc) at starting time and add the metrics / logging /
caching / windowing wrapper on top of the store, or..
table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); //
etc..

---

But one caveat of that, as illustrated above, is that you need to have
separate object of the KTable in order to call either "queryHandle" or
"materialize" (whatever the function name is) for the specifications of
materialization options. This can break the concatenation of the topology
construction part of the code, that you cannot simply add one operator
directly after another. So I think this is a trade-off we have to make and
the current approach looks better in this regard.



Guozhang




On Wed, Aug 2, 2017 at 2:07 PM, Jan Filipiak
wrote:


Hi Bill,

totally! So in the original discussion it was mentioned that the overloads
are nasty when implementing new features. So we wanted to get r

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-11 Thread Jan Filipiak

Inline

rather sparse for the lack of time.

Sadly I can't agree to any of your arguments and I _hate_ how its gonna 
look,

but we can't have this discussion for ever.

I think I explained everything in enough detail so my points can make sense.
if someone is interested and has specific questions, can always approach me.

Otherwise I am just going to drink the kool-aid now. :(

Best Jan

On 08.08.2017 20:37, Guozhang Wang wrote:

Hello Jan,

Thanks for your feedback. Trying to explain them a bit more here since I
think there are still a bit mis-communication here:

Here are a few things I need to clarify for KIP-182 first:

1. KIP-182 is mainly about refactoring the public APIs, NOT for making any
optimizations on the internal implementations. So we only care that these
public APIs changes do not forbid us to make the internal implementations
in the near future.

To give you a concrete example, as you mentioned that KTableValueGetterSupplier
is NOT used in IQ, and that a materialized physical store is always used
today. Yes that is true, and we do have plans to optimize this case soon;
for example, it is still doable with the proposed KIP-182 that we can
remove the physical materialized store but use KTableValueGetterSupplier to
read form a up-stream's physical store and apply the optimizations. Another
example you mentioned is stream-stream join, where each stream is
physically materialized into a store, we can definitely optimize this in
the future to remove the physical materialized store but use something
else, e.g. a in-memory buffer. Such optimizations are NOT blocked by the
updated public APIs of KIP-182.
One of the big goals of the refactoring at least was to get rid of the 
overloads
to make implementation of new features easier as one has not to take 
care about
all the overloads. Folding 2 Overloads into 1 with a Builder that has 2 
way of beeing build

wont help much here.

Having the DSL express very closely what happens will only help people 
not getting confused.
Having the store overload on every operation is just plain confusing 
right now.





2. One concern you raise that KIP-182 may actually block such
optimizations, is that if users do specify a StateStoreSupplier then we
cannot optimize that away. That is true, but that is by design: if user do
specify a state store supplier in Materialized API, that is equal to say
"forget about doing any smart things library, just use what I give to you".
In other words, the above mentioned optimizations can be applied if users
do not enforce any specific StateStoreSupplier, for example in

public static  Materialized
as(final String
storeName)

i.e. user only provide a store name, which is similar like handler token
for IQ; then the library still have the freedom to do smart things
internally which is totally hidden from the users. It is similar to, like
in RDBMS or some NoSQL stores like in HIVE / Cassandra: the store engine do
not have the freedom to do those query plan optimizations if users already
enforce the specs like join ordering, query plan generation rules, etc.
You call the same method with the builder build differently and its 
going todo
different things. That is my definition of unituitive + The code 
internally has to become
dead ugly as it needs to apply these optimisations basically in the same 
method call or
at the place the Builder is evaluated. This just cries for ugly internal 
code. There is no

way this can become pretty




3. About whether it is worthwhile to "not break the fluent interface", a
key point that we cannot just try to optimize one or two use cases, but
consider the whole user space, and ask what are the percentage of users
that may get affected. Note that in the DSL we have overloaded functions
where Materialized / Joined / other options are NOT needed so for most
normal users they do not need to worry about the specs at all.

So suppose there are only X% "advanced" users who would want to enforce
some state store suppliers, and Y% who like to use IQ, 100-X-Y percent of
normal users see no difference in terms of programming for either of these
two approaches: whether to separate the specs into a different set of APIs.
And for the Y percent of users they are most likely to just use the
simplest APIs which is `operator(..., Materialized.as(storeName))` which
does not sound too bad as to `table = operator(...);
table.materialize(storeName)`. In other words we use the first approach
then only X percent of users may have an awkward programming with complex
option specs along with the operator; if we use the second approach the X+Y
users need to break its programing fluency to call `table.materialize`
separately. And my personal guess is that

0 < X << Y < 1, and that X is very minor compared to Y. That is why I feel
this is a good trade-off.

The keypoint here is that It doesn't matter. Any sufficiently usefull 
topology
will get broken up by the u

Re: Kafka Connect Sink Connector for multiple JDBC sinks

2017-09-16 Thread Jan Filipiak

Hi,

entirely depends on how you want to serialize. You should be able to get 
everything running on Windows anyhow. Nothing expect the broker is 
really extensively using OS support for operating.


To answer your initial question: You would simply start multiple sinks 
and give each sink a different connect string. That should do what you 
want instantly


Best Jan

On 16.09.2017 22:51, M. Manna wrote:

Yes I have, I do need to build and run Schema Registry as a pre-requisite
isn't that correct? because the QuickStart seems to start AVRO - without
AVRO you need your own implementation of transformer/serdes etc.

I am only asking since my deployment platform is Windows Server 2012 - and
Confluent pkg is meant to be run on Linux. I guess there is a lot of manual
conversion I need to do here?

On 16 September 2017 at 21:43, Ted Yu  wrote:


Have you looked at https://github.com/confluentinc/kafka-connect-jdbc ?

On Sat, Sep 16, 2017 at 1:39 PM, M. Manna  wrote:


Sure. But all these are not available via Kafka open source (requires
manual coding), correct? Only Confluence seems to provide some
off-the-shelf connector but Confluent isn't compatible on Windows (yet),
also correct?



On 13 September 2017 at 18:11, Sreejith S  wrote:


This is possible. Once you have records in your put method, its up your
logic how you are redirecting it to multiple jdbc connections for
insertion.

In my use case i have implemented many to many sources and sinks.

Regards,
Srijith

On 13-Sep-2017 10:14 pm, "M. Manna"  wrote:

Hi,

I need a little help/suggestion if possible. Does anyone know if it's
possible in Kafka to develop a connector that can sink for multiple

JDBC

urls for the same topic (i.e. table) ?

The examples I can see on Confluent talks about one JDBC url

(one-to-one

sink). Would it be possible to achieve a one-to-many ?

What I am trying to do is the following:

1) Write to a topic
2) Sink it to multiple DBs (they all will have the same table).

Is this doable/correct way for Connect API?

Kindest Regards,





Re: Log Compaction Not Picking up Topic

2017-10-25 Thread Jan Filipiak

Hi,

unfortunatly there is nothing trivial you could do here.
Without upgrading your kafkas you can only bounce the partition back and 
forth

between brokers so they compact while its still small.

With upgrading you could also just cherrypick this very commit or put a 
logstatement to verify.


Given the Logsizes your dealing with, I am very confident that this is 
your issue.


Best Jan


On 25.10.2017 12:21, Elmar Weber wrote:

Hi,

On 10/25/2017 12:15 PM, Xin Li wrote:
> I think that is a bug, and  should be fixed in this task 
https://issues.apache.org/jira/browse/KAFKA-6030.
> We experience that in our kafka cluster, we just check out the 
11.0.2 version, build it ourselves.


thanks for the hint, as it looks like a calculation issue, would it be 
possible to verify this by manually changing the clean ratio or some 
other settings?


Best,
Elmar





[DISCUSS] KIP-213 Support non-key joining in KTable

2017-10-25 Thread Jan Filipiak

Hello Kafka-users,

I want to continue with the development of KAFKA-3705, which allows the 
Streams DSL to perform KTableKTable-Joins when the KTables have a 
one-to-many relationship.
To make sure we cover the requirements of as many users as possible and 
have a good solution afterwards I invite everyone to read through the 
KIP I put together and

discuss it here in this Thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
https://issues.apache.org/jira/browse/KAFKA-3705
https://github.com/apache/kafka/pull/3720

I think a public discussion and vote on a solution is exactly what is 
needed to bring this feauture into kafka-streams. I am looking forward 
to everyones opinion!


Please keep the discussion on the mailing list rather than commenting on 
the wiki (wiki discussions get unwieldy fast).


Best
Jan








Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-10-26 Thread Jan Filipiak

Thanks for the remarks. hope I didn't miss any.
Not even sure if it makes sense to introduce A and B or just stick with 
"this ktable", "other ktable"


Thank you
Jan


On 27.10.2017 06:58, Ted Yu wrote:

Do you mind addressing my previous comments ?

http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+DISCUSS+KIP+213+Support+non+key+joining+in+KTable

On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak 
wrote:


Hello everyone,

this is the new discussion thread after the ID-clash.

Best
Jan

__


Hello Kafka-users,

I want to continue with the development of KAFKA-3705, which allows the
Streams DSL to perform KTableKTable-Joins when the KTables have a
one-to-many relationship.
To make sure we cover the requirements of as many users as possible and
have a good solution afterwards I invite everyone to read through the KIP I
put together and discuss it here in this Thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+
Support+non-key+joining+in+KTable
https://issues.apache.org/jira/browse/KAFKA-3705
https://github.com/apache/kafka/pull/3720

I think a public discussion and vote on a solution is exactly what is
needed to bring this feauture into kafka-streams. I am looking forward to
everyones opinion!

Please keep the discussion on the mailing list rather than commenting on
the wiki (wiki discussions get unwieldy fast).

Best
Jan







Re: No. of Kafk Instances in Single machine

2017-11-06 Thread Jan Filipiak

Hi,

I probably would recommend you to go for 1 instance. You can bump a few 
thread configs to match your hardware better.


Best Jan

On 06.11.2017 12:23, chidigam . wrote:

Hi All,
Let say, I have big machine, which having 120GB RAM,  with lot of cores,
and very high disk capacity.

How many no. of kafka instances are recommended? Is there any general
principle I can apply, to calculate optimal no.

Any help in this regards is highly appreciated.

Regards
Bhanu





Re: Plans to extend streams?

2017-11-29 Thread Jan Filipiak

Hey,

you making some wrong assumptions here.
Kafka Streams is in no way single threaded or
limited to one physical instance.
Having connectivity issues to your brokers is IMO
a problem with the deployment and not at all
with how kafka streams is designed and works.

Kafka Streams moves hundreds of GB per day for us.

Hope this helps.

Best Jan


On 29.11.2017 15:10, Adrienne Kole wrote:

Hi,

The purpose of this email is to get overall intuition for the future  plans
of streams library.

The main question is that, will it be a single threaded application in the
long run and serve microservices use-cases, or are there any plans to
extend it to multi-node execution framework with less kafka dependency.

Currently, each streams node 'talks' with kafka cluster and they can
indirectly talk with each other again through kafka. However, especially if
kafka is not in the same network with streams nodes (actually this can
happen if they are in the same network as well) this will cause high
network overhead and inefficiency.

One solution for this (bypassing network overhead) is to deploy streams
node on kafka cluster to ensure the data locality. However, this is not
recommended as the library and kafka can affect each other's performance
and  streams does not necessarily have to know the internal data
partitioning of kafka.

Another solution would be extending streams library to have a common
runtime. IMO, preserving the current selling points of streams (like
dynamic scale in/out) with this kind of extensions can be very good
improvement.

So my question is that, will streams in the long/short run, will extend its
use-cases to massive and efficient stream processing (and compete with
spark) or stay and strengthen its current position?

Cheers,
Adrienne





Re: Plans to extend streams?

2017-11-30 Thread Jan Filipiak

hi,

I understand your point better now.

I think systems of that kind have been build plenty and I never liked 
their trade-offs.


Samza and Kafka-streams form a great alternative to what is out there in 
great numbers.


I am a big fan of how this is designed and think its really great. Maybe 
you should

give it a shot?

Just to get you interested:
With extreme detailed partition assignment and deploying stream jobs on 
broker instances
you can align all the topics so that you get basically the same kind of 
shuffle other system use.
attaching a little bit of "cruise-control" 
https://engineering.linkedin.com/blog/2017/08/open-sourcing-kafka-cruise-control
could also handle node failures. But usually this is not necessary. The 
hop across the broker is usually just to efficient

to have this kind of fuzz going on.

Hope this can convince you to try it out.


Best Jan


On 29.11.2017 20:15, Guozhang Wang wrote:

Hello Adrienne,

I think your suggested feature to to use not only Kafka as inter-process
communication but also configurable to use TCP directly, right?

There are a few people asking about this before, especially for not using
Kafka for repartitioning (think: shuffling in the batch world), but let
them go through TCP between processes. Though this is doable, I'd point out
that it may have many side-effects such as:

1) back pressure: Streams library do not worry about back pressure at all
since all communication channels are persistent (Kafka topics), using TCP
then you need to face the back pressure issue again.
2) exactly once semantics: the transactional messaging is leveraged by
Streams to achieve EOS, and extending TCP means that we need to add more
gears to handle TCP data loss / duplicates (e.g. other frameworks have been
using buffers with epoch boundaries to do that).
3) state snapshots: imagine if you are shutting down your app, we then need
to make sure all in-flight messages with TCP are drained because otherwise
we are not certain if the committed offsets are valid or not.



Guozhang


On Wed, Nov 29, 2017 at 8:26 AM, Adrienne Kole 
wrote:


Hi,

You misunderstood the focus of the post perhaps or I could not explain
properly. I am not claiming the streams is limited to single node.
Although the whole topology instance can be limited to a single node (each
node run all topology), this is sth else.
Also, I think that "moving 100s of GB data per day" claim is orthogonal
and as this is not big/fast/ enough to reason.

The thing is that, for some use-cases streams-kafka-streams connection can
be a bottleneck.  Yes, if I have 40GB/s or infiniband network bandwidth
this might not be an issue.

Consider a simple topology with operators A>B->C. (B forces to
re-partition)
  Streams nodes are s1(A), s2 (B,C) and kafka resides on cluster k, which
might be in different network switch.
So, rather than transferring data k->s1->s2, we make a round trip
k->s1->k->s2. If we know that s1 and s2 are in the same network and data
transfer is fast between two, we should not go through another intermediate
layer.


Thanks.



On Wed, Nov 29, 2017 at 4:52 PM, Jan Filipiak 
wrote:


Hey,

you making some wrong assumptions here.
Kafka Streams is in no way single threaded or
limited to one physical instance.
Having connectivity issues to your brokers is IMO
a problem with the deployment and not at all
with how kafka streams is designed and works.

Kafka Streams moves hundreds of GB per day for us.

Hope this helps.

Best Jan



On 29.11.2017 15:10, Adrienne Kole wrote:


Hi,

The purpose of this email is to get overall intuition for the future
plans
of streams library.

The main question is that, will it be a single threaded application in

the

long run and serve microservices use-cases, or are there any plans to
extend it to multi-node execution framework with less kafka dependency.

Currently, each streams node 'talks' with kafka cluster and they can
indirectly talk with each other again through kafka. However, especially
if
kafka is not in the same network with streams nodes (actually this can
happen if they are in the same network as well) this will cause high
network overhead and inefficiency.

One solution for this (bypassing network overhead) is to deploy streams
node on kafka cluster to ensure the data locality. However, this is not
recommended as the library and kafka can affect each other's performance
and  streams does not necessarily have to know the internal data
partitioning of kafka.

Another solution would be extending streams library to have a common
runtime. IMO, preserving the current selling points of streams (like
dynamic scale in/out) with this kind of extensions can be very good
improvement.

So my question is that, will streams in the long/short run, will extend
its
use-cases to massive and efficient stream processing (and compete with
spark) or stay and strengthen its current position?

Cheers,
Adrienne









Re: Joins in Kafka Streams and partitioning of the topics

2017-11-30 Thread Jan Filipiak

Hi,

Haven't checked your code. But from what you describe you should be fine.
Upgrading the version might help here and there but should still work 
with 0.10

I guess.

Best Jan


On 30.11.2017 19:16, Artur Mrozowski wrote:

Thank you Damian, it was very helpful.
I have implemented my solution in version 0.11.0.2 but there is one thing I
still wonder.
So what I try to do is what is described in KIP 150. Since it didn't make
to the release for 1.0 I do it the old fashioned way.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
First
KTable table1 =
builder.stream("topic1").groupByKey().aggregate(initializer1,
aggregator1, aggValueSerde1, storeName1);


for all the four topics and then I join the results.
And here is the thing, the topics are partitioned and I don't used global
tables, nor keyed messages and it seems to work fine.

 From Confluents documentation one could get impression that when reading
from partitoned topics you need to use global tables. But is it really
necessary in this case?
And if not then why?

Thanks again
Artur

Here is the link to my implementation

https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java

On Wed, Nov 22, 2017 at 12:10 PM, Damian Guy  wrote:


Hi Artur,

KafkaStreams 0.10.0.0 is quite old and a lot has changed and been fixed
since then. If possible i'd recommend upgrading to at least 0.11.0.2 or
1.0.
For joins you need to ensure that the topics have the same number of
partitions (which they do) and that they are keyed the same.

Thanks,
Damian

On Wed, 22 Nov 2017 at 11:02 Artur Mrozowski  wrote:


Hi,
I am joining 4 different topic with 4 partitions each using 0.10.0.0
version of Kafka Streams.  The joins are KTable to KTable. Is there
anything I should be aware of considering partitions or version of Kafka
Streams? In other words should I be expecting consistent results or do I
need to for example use Global tables.

I'd like to run that application on Kubernetes later on. Should I think

of

anything or do different instances of the same Kafka Streams application
take care of management of the state?

Grateful for any thoughts or a piece of advice

Best Regards
/Artur





Re: Joins in Kafka Streams and partitioning of the topics

2017-11-30 Thread Jan Filipiak
There are some oddities in your topology that make make we wonder if 
they are the true drivers of your question.


https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L300
Feels like it should be a KTable to begin with for example otherwise it 
is not clear how big this is supposed to grow

https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L325
Same thing for policies. KGlobalTable might be chipped in later if you 
fat up from too many repartitions as some sort of

performance optimisation, but my opinions on it are not to high.


Hope that helps, just keep the questions coming, also check if you might 
want to join confluentcommunity on slack.
Could never imaging that something like a insurance can really be 
modelled as 4 streams ;)


Best Jan





On 30.11.2017 21:07, Artur Mrozowski wrote:

what if I start two instances of that application?  Does the state migrate
between the applications? Is it then I have to use a global table?

BR
Artur

On Thu, Nov 30, 2017 at 7:40 PM, Jan Filipiak 
wrote:


Hi,

Haven't checked your code. But from what you describe you should be fine.
Upgrading the version might help here and there but should still work with
0.10
I guess.

Best Jan



On 30.11.2017 19:16, Artur Mrozowski wrote:


Thank you Damian, it was very helpful.
I have implemented my solution in version 0.11.0.2 but there is one thing
I
still wonder.
So what I try to do is what is described in KIP 150. Since it didn't make
to the release for 1.0 I do it the old fashioned way.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+
Kafka-Streams+Cogroup
First
KTable table1 =
builder.stream("topic1").groupByKey().aggregate(initializer1,
aggregator1, aggValueSerde1, storeName1);


for all the four topics and then I join the results.
And here is the thing, the topics are partitioned and I don't used global
tables, nor keyed messages and it seems to work fine.

  From Confluents documentation one could get impression that when reading
from partitoned topics you need to use global tables. But is it really
necessary in this case?
And if not then why?

Thanks again
Artur

Here is the link to my implementation

https://github.com/afuyo/KStreamsDemo/blob/master/src/main/
java/kstream.demo/CustomerStreamPipelineHDI.java

On Wed, Nov 22, 2017 at 12:10 PM, Damian Guy 
wrote:

Hi Artur,

KafkaStreams 0.10.0.0 is quite old and a lot has changed and been fixed
since then. If possible i'd recommend upgrading to at least 0.11.0.2 or
1.0.
For joins you need to ensure that the topics have the same number of
partitions (which they do) and that they are keyed the same.

Thanks,
Damian

On Wed, 22 Nov 2017 at 11:02 Artur Mrozowski  wrote:

Hi,

I am joining 4 different topic with 4 partitions each using 0.10.0.0
version of Kafka Streams.  The joins are KTable to KTable. Is there
anything I should be aware of considering partitions or version of Kafka
Streams? In other words should I be expecting consistent results or do I
need to for example use Global tables.

I'd like to run that application on Kubernetes later on. Should I think


of


anything or do different instances of the same Kafka Streams application
take care of management of the state?

Grateful for any thoughts or a piece of advice

Best Regards
/Artur






Re: Configuration: Retention and compaction

2017-12-03 Thread Jan Filipiak

Hi

the only retention time that applies for compacted topics is the 
delete.retention.ms
The duration that tombstones for deletes will be kept in the topic 
during compaction.


A very detail explaination on what is going on can be found here:

https://kafka.apache.org/documentation/#compaction

Hope this helps

Best Jan


On 03.12.2017 20:27, Dmitry Minkovsky wrote:

This is a pretty stupid question. Mostly likely I should verify these by
observation, but really I want to verify that my understanding of the
documentation is correct:

Suppose I have topic configurations like:

retention.ms=$time
cleanup.policy=compact


My questions are:

1. After $time, any offsets older than $time will be eligible for
compaction?
2. Regardless of $time, any offsets in the current segment will not be
compacted?


Thank you,
Dmitry





Re: Mirrormaker consumption slowness

2017-12-06 Thread Jan Filipiak

Hi,

two questions. Is your MirrorMaker collocated with the source or the target?
what are the send and receive buffer sizes on the connections that do span
across WAN?

Hope we can get you some help.

Best jan



On 06.12.2017 14:36, Xu, Zhaohui wrote:

Any update on this issue?

We also run into similar situation recently. The mirrormaker is leveraged to 
replicate messages between clusters in different dc. But sometimes a portion of 
partitions are with high consumer lag and tcpdump also shows similar packet 
delivery pattern. The behavior is sort of weird and is not self-explaining. 
Wondering whether it has anything to do with the fact that number of consumers 
is too large?  In our example, we have around 100 consumer connections per 
broker.

Regards,
Jeff

On 12/5/17, 10:14 AM, "tao xiao"  wrote:

 Hi,
 
 any pointer will be highly appreciated
 
 On Thu, 30 Nov 2017 at 14:56 tao xiao  wrote:
 
 > Hi There,

 >
 >
 >
 > We are running into a weird situation when using Mirrormaker to replicate
 > messages between Kafka clusters across datacenter and reach you for help 
in
 > case you also encountered this kind of problem before or have some 
insights
 > in this kind of issue.
 >
 >
 >
 > Here is the scenario. We have setup a deployment where we run 30
 > Mirrormaker instances on 30 different nodes. Each Mirrormaker instance is
 > configure with num.streams=1 thus only one consumer runs. The topics to
 > replicate is configure with 100 partitions and data is almost evenly
 > distributed across all partitions. After running a period of time, weird
 > things happened that some of the Mirrormaker instances seems to slow down
 > and consume at a relative slow speed from source Kafka cluster. The 
output
 > of tcptrack shows the consume rate of problematic instances dropped to
 > ~1MB/s, while the other healthy instances consume at a rate of  ~3MB/s. 
As
 > a result, the consumer lag for corresponding partitions are going high.
 >
 >
 >
 >
 > After triggering a tcpdump, we noticed the traffic pattern in tcp
 > connection of problematic Mirrmaker instances is very different from
 > others. Packets flowing in problematic tcp connections are relatively 
small
 > and seq and ack packets are basically coming in one after another. On the
 > other hand, the packets in healthy tcp connections are coming in a
 > different pattern, basically several seq packets comes with an ack 
packets.
 > Below screenshot shows the situation, and these two captures are got on 
the
 > same mirrormaker node.
 >
 >
 >
 > problematic connection.  ps. 10.kfk.kfk.kfk is kafka broker, 10.mm.mm.mm
 > is Mirrormaker node
 >
 > 
https://na01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fimgur.com%2FZ3odjjT&data=02%7C01%7Czhaohxu%40ebay.com%7Ca8efe84f9feb47ecb5fd08d53b85d7ac%7C46326bff992841a0baca17c16c94ea99%7C0%7C0%7C636480368398154028&sdata=2DdGcjPWD7QI7lZ7v7QDN6I53P9tsSTMzEGdw6IywmU%3D&reserved=0
 >
 >
 > healthy connection
 >
 > 
https://na01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fimgur.com%2Fw0A6qHT&data=02%7C01%7Czhaohxu%40ebay.com%7Ca8efe84f9feb47ecb5fd08d53b85d7ac%7C46326bff992841a0baca17c16c94ea99%7C0%7C0%7C636480368398154028&sdata=v52DmmY9LHN2%2F59Hb5Xo77JuLreOA3lfDyq8eHKmISQ%3D&reserved=0
 >
 >
 > If we stop the problematic Mirrormaker instance and when other instances
 > take over the lagged partitions, they can consume messages quickly and
 > catch up the lag soon. So the broker in source Kafaka cluster is supposed
 > to be good. But if Mirrormaker itself causes the issue, how can one tcp
 > connection is good but others are problematic since the connections are 
all
 > established in the same manner by Kafka library.
 >
 >
 >
 > Consumer configuration for Mirrormaker instance as below.
 >
 > auto.offset.reset=earliest
 >
 >
 > 
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
 >
 > heartbeat.interval.ms=1
 >
 > session.timeout.ms=12
 >
 > request.timeout.ms=15
 >
 > receive.buffer.bytes=1048576
 >
 > max.partition.fetch.bytes=2097152
 >
 > fetch.min.bytes=1048576
 >
 >
 >
 > Kafka version is 0.10.0.0 and we have Kafka and Mirrormaker run on Ubuntu
 > 14.04
 >
 >
 >
 > Any response is appreciated.
 >
 > Regards,
 >
 > Tao
 >
 





Re: Consuming a state store (KTable) basics - 1.0.0

2017-12-06 Thread Jan Filipiak

Hi,

you should be able to retrieve your store with

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1021

This would give you access to the store from inside your current 
application. In your Streams application your could then
expose this Store with a say REST or any other RPC interface, to let 
applications from outside your JVM query it.


So i would say the blogpost still applies quite well.

Hope this helps

Best Jan


On 07.12.2017 04:59, Peter Figliozzi wrote:

I've written a Streams application which creates a KTable like this:

val myTable: KTable[String, GenericRecord] = myStream
 .groupByKey()
 .aggregate(myInitializer, myAdder, myStore)

where myStore was configured like this:

val myStore
 : Materialized[String, GenericRecord, KeyValueStore[Bytes,
Array[Byte]]] =
 Materialized
   .as("my-store")
   .withKeySerde(Serdes.String())
   .withValueSerde(genericValueSerde)

What I'd like to do now is query (read) this store from a separate
application.  How do I query it in 1.0.0?  With a KTable constructor, using
the store string as the topic, i.e.:

public  KTable table(
java.lang.String topic,
Materialized>
materialized)

Or some other way?

I saw this blog post
<https://blog.codecentric.de/en/2017/03/interactive-queries-in-apache-kafka-streams/>
but it appears to be only applicable to the older version of Kafka (please
correct me if I'm wrong).

Thanks,

Pete





Re: Consuming a state store (KTable) basics - 1.0.0

2017-12-07 Thread Jan Filipiak

Hi Peter,

glad it helped,

these are the preferred ways indeed.




On 07.12.2017 15:58, Peter Figliozzi wrote:

Thanks Jan, super helpful!  To summarize (I hope I've got it right), there
are only two ways for external applications to access data derived from a
KTable:

1.  Inside the streams application that builds the KTable, create a
KafkaStreams.store and expose to the outside via a service.

2.  Convert the KTable to a stream and write to a new Kafka topic.  Then
external apps can just consume this feed.  If we only care about the latest
updates, make the topic log-compacted.

latest value per key or last updated might be a different story here,
in the end there is a lot of flexibility here that everyone is free to 
explore


Best Jan



Thanks,

Pete

On Thu, Dec 7, 2017 at 1:42 AM, Jan Filipiak 
wrote:


Hi,

you should be able to retrieve your store with

https://github.com/apache/kafka/blob/trunk/streams/src/main/
java/org/apache/kafka/streams/KafkaStreams.java#L1021

This would give you access to the store from inside your current
application. In your Streams application your could then
expose this Store with a say REST or any other RPC interface, to let
applications from outside your JVM query it.

So i would say the blogpost still applies quite well.

Hope this helps

Best Jan


On 07.12.2017 04:59, Peter Figliozzi wrote:


I've written a Streams application which creates a KTable like this:

val myTable: KTable[String, GenericRecord] = myStream
  .groupByKey()
  .aggregate(myInitializer, myAdder, myStore)

where myStore was configured like this:

val myStore
  : Materialized[String, GenericRecord, KeyValueStore[Bytes,
Array[Byte]]] =
  Materialized
.as("my-store")
.withKeySerde(Serdes.String())
.withValueSerde(genericValueSerde)

What I'd like to do now is query (read) this store from a separate
application.  How do I query it in 1.0.0?  With a KTable constructor,
using
the store string as the topic, i.e.:

public  KTable table(
java.lang.String topic,
Materialized>
materialized)

Or some other way?

I saw this blog post
<https://blog.codecentric.de/en/2017/03/interactive-queries-
in-apache-kafka-streams/>
but it appears to be only applicable to the older version of Kafka (please
correct me if I'm wrong).

Thanks,

Pete






Re: Broker won't exit...

2018-01-10 Thread Jan Filipiak

HI

brokers still try todo a gracefull shutdown I supose?
It would only shut down if it is not the leader of any partition anymore.

Can you verify: there are other brokers alive that took over leadership?
and the broker in question stepped down as a leader for all partitions?

Best Jan



On 10.01.2018 12:57, Ted Yu wrote:

Skip:Can you pastebin the stack trace of the stuck broker ?
Thanks
 Original message From: Skip Montanaro 
 Date: 1/10/18  3:52 AM  (GMT-08:00) To: 
users@kafka.apache.org Subject: Re: Broker won't exit...
Did you stop the broker before stoping zookeeper?


Yes. My stop script executes the server stop scripts in reverse order from
my start script. Should I have stuck in a couple second sleep between
stopping the brokers and stopping zookeeper?

I was actually running two brokers. The one my stop script stopped first
exited properly.

Skip




Re: Kafka Consumer Offsets unavailable during rebalancing

2018-02-13 Thread Jan Filipiak

I would encourage you todo so.
I also think its not reasonable behavior

On 13.02.2018 11:28, Wouter Bancken wrote:

We have upgraded our Kafka version as an attempt to solve this issue.
However, the issue is still present in Kafka 1.0.0.

Can I log a bug for this in JIRA?

Wouter

On 5 February 2018 at 09:22, Wouter Bancken 
wrote:


The consumers in consumer group 'X' do not have a regex subscription
matching the newly created topic 'C'. They simply subscribe with
the subscribe(java.util.Collection topics) method on
topics 'A' and 'B'.

Shouldn't the consumer group have a different state from "Stable" during a
rebalancing regardless of the cause? How else can we determine the consumer
lag of the group during the rebalancing?

Best regards,
Wouter

Have a look at our brand NEW job website: jobs.aca-it.be !


*ACA IT-Solutions NV*
*HQ:* Herkenrodesingel 8B 2.01 | 3500 Hasselt
T +32(0)11 26 50 10 | F +32(0)11 26 50 11
www.aca-it.be | Twitter  | Facebook
 |
Linkedin 

On 5 February 2018 at 00:13, Hans Jespersen  wrote:


Do the consumers in consumer group ‘X’ have a regex subscription that
matches the newly created topic ‘C’?

If they do then they will only discover this new topic once their ‘
metadata.max.age.ms’  metadata refresh interval has passed, which
defaults to 5 minutes.

metadata.max.age.ms The period of time in milliseconds after which
we force a refresh of metadata even if we haven't seen any partition
leadership changes to proactively discover any new brokers or partitions
-hans



On Feb 4, 2018, at 2:16 PM, Wouter Bancken 

wrote:

Hi Hans,

Thanks for the response!

However, I get this result for all topics, not just for the newly

created

topic.

Situation sketch:
1. I have a consumer group 'X' subscribed to topics 'A' and 'B' with
partition assignments and lag information. Consumer group 'X' is

"Stable".

2a. Topic 'C' is (being) created.
2b. During this creation, I do not have a partition assignment for

consumer

group 'X' for topics 'A' and 'B' but the consumer group is still

"Stable".

3. A second later: I have a partition assignment for consumer group 'X'

for

topics 'A' and 'B' again and the consumer group is still "Stable".

I expected the state of consumer group 'X' during step 2b to be
"PreparingRebalance" or "AwaitingSync".

Best regards,
Wouter


On 4 February 2018 at 21:25, Hans Jespersen  wrote:

I believe this is expected behavior.

If there are no subscriptions to a new topic, and therefor no partition
assignments, and definitely no committed offsets, then lag is an

undefined

concept. When the consumers subscribe to this new topic they may chose

to

start at the beginning or end of the commit log so the lag cannot be
predicted in advance.

-hans


On Feb 4, 2018, at 11:51 AM, Wouter Bancken 
wrote:

Can anyone clarify if this is a bug in Kafka or the expected behavior?

Best regards,
Wouter


On 30 January 2018 at 21:04, Wouter Bancken 
Hi,

I'm trying to write an external tool to monitor consumer lag on

Apache

Kafka.

For this purpose, I'm using the kafka-consumer-groups tool to fetch

the

consumer offsets.

When using this tool, partition assignments seem to be unavailable
temporarily during the creation of a new topic even if the consumer

group

has no subscription on this new topic. This seems to match the
documentation


saying *"Topic metadata changes which have no impact on subscriptions
cause resync"*.

However, when this occurs I'd expect the state of the consumer to be
"PreparingRebalance" or "AwaitingSync" but it is simply "Stable".

Is this a bug in the tooling or is there a different way to obtain

the

correct offsets for a consumer group during a rebalance?

I'm using Kafka 10.2.1 but I haven't found any related issues in

recent

changelogs.
Best regards,
Wouter







Re: How to deserialize the object without avro schema?

2016-04-19 Thread jan . omar


Hi, avro Schemas imply a pretty big overhead, if you would include them in 
every message. It's good practice to include a schema id with the message... 
Then you need a schema repository to lookup the matching schema based on the id.

Have a look at confluent.io. They offer a schema repo among other Kafka related 
tools.

Regards 

Jan



Sent from my iPhone
> On 19 Apr 2016, at 08:02, Ratha v  wrote:
> 
> Hi all;
> 
> I try to publish/consume my java objects to kafka. I use Avro schema.
> 
> My basic program works fine. In my program i use my schema in the producer
> (for encoding) and consumer (decoding).
> 
> If i publish different objects to different topics( eg: 100 topics)at the
> receiver, i do not know, what type of message i received. I would like to
> get the avro schema from the received byte and would like to use that for
> decoding. Is that right? If so, how can i retrieve from the received object?
> Or is there any better approach?
> 
> Thanks.
> -- 
> -Ratha
> http://vvratha.blogspot.com/


Re: Can we delete topic in kafka

2016-05-11 Thread Jan Omar
You have to allow topic deletion in server.properties first.

delete.topic.enable = true

Regards

Jan

> On 11 May 2016, at 09:48, Snehalata Nagaje 
>  wrote:
> 
> 
> 
> Hi , 
> 
> Can we delete certain topic in kafka? 
> 
> I have deleted using command 
> 
> ./kafka-topics.sh --delete --topic topic_billing --zookeeper localhost:2181 
> 
> It says topic marked as deletion, but it does not actually delete topic. 
> 
> Thanks, 
> Snehalata 



Is producer relying on file system?

2016-05-27 Thread Jan Algermissen

Hi,

I have a producer question: Is the producer (specifically the normal Java 
producer) using the file system in any way?

If it does so, will a producer work after loosing this file system or its 
content (for example in a containerization scenario)?

Jan

Re: Three consumers on a single partition

2016-06-14 Thread Jan Omar

Hi Rami, 

Each consumer will receive every single message if they belong to different 
consumer groups. Messages will only be distributed between consumers of the 
same consumer group.

So make sure they are in the same consumer group, beware in your case this 
means 2 of the 3 consumers will be starving. To solve this issue you need to 
increase your partition count.

Regards

Jan

> On 14 Jun 2016, at 13:07, Joris Peeters  wrote:
> 
> I suppose the consumers would also need to all belong to the same consumer 
> group for your expectation to hold. If the three consumers belong to 
> different consumer groups, I'd expect each of them to receive all the 
> messages, regardless of the number of partitions.
> So perhaps they are on different consumer groups? What client are you using?
> 
> -Original Message-
> From: thinking [mailto:letianyi...@qq.com]
> Sent: 14 June 2016 12:04
> To: users 
> Subject: Re: Three consumers on a single partition
> 
> hi,
>   1. you should check, if your topic partition really have one.
>   2. does your consumer get same message or different message.
>e.x all message is 1,2,3,4,5,6,7.  consumer1 get 1,3,7 consumer2 get 2,6, 
> consumer3 get 4,5?
> 
> 
> 
> 
> -- Original --
> From:  "Al-Isawi Rami";;
> Date:  Tue, Jun 14, 2016 06:58 PM
> To:  "users@kafka.apache.org";
> 
> Subject:  Three consumers on a single partition
> 
> 
> 
> Hi,
> 
> I have a cluster of 3 brokers and 1 topic which has 1 partition and 
> replication factor of 3. There are also 3 consumers consuming from that topic.
> 
> Now all the docs I have seen say that if number of consumers is bigger than 
> the number of partition ( like in my case 3 consumers 1 partition), then only 
> one consumer will get messages and the other two will not get any, 
> nonetheless, they are all connected.
> 
> However, in my case, all the three consumers are getting messages. Any ideas 
> why this is happening?
> 
> Regards,
> -Rami
> Disclaimer: This message and any attachments thereto are intended solely for 
> the addressed recipient(s) and may contain confidential information. If you 
> are not the intended recipient, please notify the sender by reply e-mail and 
> delete the e-mail (including any attachments thereto) without producing, 
> distributing or retaining any copies thereof. Any review, dissemination or 
> other use of, or taking of any action in reliance upon, this information by 
> persons or entities other than the intended recipient(s) is prohibited. Thank 
> you.
> 
> 
> Winton Capital Management Limited (“Winton”) is a limited company registered 
> in England and Wales with its registered offices at 16 Old Bailey, London, 
> EC4M 7EG (Registered Company No. 3311531). Winton is authorised and regulated 
> by the Financial Conduct Authority in the United Kingdom, registered as an 
> investment adviser with the US Securities and Exchange Commission, registered 
> with the US Commodity Futures Trading Commission and a member of the National 
> Futures Association in the United States.
> 
> This communication, including any attachments, is confidential and may be 
> privileged. This email is for use by the intended recipient only. If you 
> receive it in error, please notify the sender and delete it. You should not 
> copy or disclose all or any part of this email.
> 
> This email does not constitute an offer or solicitation and nothing contained 
> in this email constitutes, and should not be construed as, investment advice. 
> Prospective investors should request offering materials and consult their own 
> advisers with respect to investment decisions and inform themselves as to 
> applicable legal requirements, exchange control regulations and taxes in the 
> countries of their citizenship, residence or domicile. Past performance is 
> not indicative of future results.
> 
> Winton takes reasonable steps to ensure the accuracy and integrity of its 
> communications, including emails. However Winton accepts no liability for any 
> materials transmitted. Emails are not secure and cannot be guaranteed to be 
> error free. Winton handles personal information in accordance with its 
> privacy 
> notice.<https://www.wintoncapital.com/en/regulatory-disclosures#privacypolicy>



Kafka Streams table persistence

2016-06-16 Thread Jan Ehrhardt
Hi,

I am curious about the relationship of KTables and state store in Kafka
Streams. When I create a table with the `table` method, it uses the
original topic as a changelog, thus persisting a table changelog does not
make much sense. When I create invoke `countByKey` on a KStream, the result
is a KTable and I find a changelog topic for it created. Thus Kafka Streams
can recreate the state store. When I join two tables, this will also result
in a new KTable, but this time, there is no changelog topic created.

This is a little bit confusing. Does the `table` method read the whole
original topic as an input again after a restart, or how does it load the
whole table again? Can someone explain the rules to persist or restore a
KTable to or from a changelog?

Best regards
Jan


Re: [DISCUSS] Java 8 as a minimum requirement

2016-06-16 Thread jan . omar

Hi Ismael,

Unfortunately Java 8 doesn't play nice with FreeBSD. We have seen a lot of JVM 
crashes running our 0.9 brokers on Java 8... Java 7 on the other hand is 
totally stable. 

Until these issues have been addressed, this would cause some serious issues 
for us.

Regards 

Jan

Re: Offset Tools

2016-07-11 Thread Jan Omar
Hi Joerg,

We recently built a tool for fetching current offsets (per partition) for a 
given consumer group. Also for setting the group to a specific offset.
It even allows to reset a consumer group to a given timestamp by running bisect 
(binary search) on the messages. 

Unfortunately we're using a proprietary message format, that's why we don't 
have any plans (or capacity) to open source it at the moment.

However builiding that tool was straight forward, it shouldn't take you more 
than a day or two to build something similar. Ping me if you need some help.

Regards

Jan 

> On 11 Jul 2016, at 13:00, Jörg Wagner  wrote:
> 
> Hello!
> 
> We recently switched to Kafka 0.9.0.1 and currently I don't seem to be able 
> to figure out how to read the consumer offsets via cli. We are using the 
> 0.9.0.1 new consumer and are storing the offsets in kafka.
> 
> Status:
> kafka-consumer-offset-checker.sh is old and deprecated, points to 
> kafka-consumer-groups.sh
> kafka-consumer-groups.sh in old consumer mode shows lots of data, but unknown 
> for offsets and lag
> kafka-consumer-groups.sh in new consumer mode shows only one single consumer 
> group, not the one(s) expected. The one available though shows all the 
> correct data I would like to see for a group not shown.
> 
> Short: since 0.9.0.1 I can't see the consumer offsets anymore.
> 
> Anyone else experiencing this?
> 
> Cheers
> Jörg



KIP-33 Opt out from Time Based indexing

2016-08-22 Thread Jan Filipiak

Hello everyone,

I stumbled across KIP-33 and the time based index, while briefly 
checking the wiki and commits, I fail to find a way to opt out.
I saw it having quite some impact on when logs are rolled and was hoping 
not to have to deal with all of that. Is there a disable switch I 
overlooked?


Does anybody have a good use case where the timebase index comes in 
handy? I made a custom console consumer for me,
that can bisect a log based on time. Its just a quick probabilistic shot 
into the log but is sometimes quite useful for some debugging.


Best Jan


Re: Re: KIP-33 Opt out from Time Based indexing

2016-08-24 Thread Jan Filipiak

Hi Jun,

I copy pasted this mail from the archive, as I somehow didn't receive it per 
mail. I will sill make some comments in line,
hopefully you can find them quick enough, my apologies.

To make things more clear, you should also know, that all messages in our kafka 
setup have a common way to access their timestamp already (its encoded in the 
value the same way always)
Sometimes this is a logical time (eg same timestamp accross many different 
topics / partitions), say PHP request start time or the like. So kafkas 
internal timestamps are not really attractive
for us anyways currently.

I hope I can make a point and not waste your time.

Best Jan,

hopefully everything makes sense

----

Jan,

Currently, there is no switch to disable the time based index.

There are quite a few use cases of time based index.

1. From KIP-33's wiki, it allows us to do time-based retention accurately.
Before KIP-33, the time-based retention is based on the last modified time
of each log segment. The main issue is that last modified time can change
over time. For example, if a broker loses storage and has to re-replicate
all data, those re-replicated segments will be retained much longer since
their last modified time is more recent. Having a time-based index allows
us to retain segments based on the message time, not the last modified
time. This can also benefit KIP-71, where we want to combine time-based
retention and compaction.

/If your sparse on discspace, one could try to get by that with retention.bytes/
or, as we did, ssh into the box and rm it, which worked quite good when no one 
reads it.
Chuckles a little when its read but readers usually do an auto.offset.reset
(they are to slow any ways if they reading the last segments hrhr).

2. In KIP-58, we want to delay log compaction based on a configurable
amount of time. Time-based index allows us to do this more accurately.

/good point, seems reasonable/

3. We plan to add an api in the consumer to allow seeking to an offset
based on a timestamp. The time based index allows us to do this more
accurately and fast.

/Sure, I personally feel that you rarely want to do this. For Camus, we used 
max.pull.historic.days (or simmilliar) successfully quite often. we just gave 
it an extra day and got what we wanted
and for debugging my bisect tool works well enough. So these are the 2 usecases 
we expierenced already and found a decent way around it./

Now for the impact.

a. There is a slight change on how time-based rolling works. Before KIP-33,
rolling was based on the time when a segment was loaded in the broker.
After KIP-33, rolling is based on the time of the first message of a
segment. Not sure if this is your concern. In the common case, the two
behave more or less the same. The latter is actually more deterministic
since it's not sensitive to broker restarts.

/This is part of my main concern indeed. This is what scares me and I preffered 
to just opt out, instead of reviewing all our pipelines to check whats gonna 
happen when we put it live.
For Example the Mirrormakers, If they want to preserve create time from the 
source cluster and publish the same create time (wich they should do, if you 
don't encode your own timestamps and want
to have proper kafka-streams windowing). Then I am quite concerned when have 
problems if our cross ocian links and fall behind, say a day or two. Then I can 
think of an very up to date MirrorMaker from
one colocation and a very laggy Mirrormaker from another colocation. For me its 
not 100% clear whats gonna happen. But I can't think of sane defaults there. 
That i love kafka for.
Just tricky to be convinced that an upgrade is safe, wich was usually easy.
/
b. Time-based index potentially adds overhead to producing messages and
loading segments. Our experiments show that the impact to producing is
insignificant. The time to load segments when restarting a broker can be
doubled. However, the absolute time is still reasonable. For example,
loading 10K log segments with time-based index takes about 5 seconds.
/
//Loading should be fine/, totally agree

c Because time-based index is useful in several cases and the impact seems
small, we didn't consider making time based index optional. Finally,
although it's possible to make the time based index optional, it will add
more complexity to the code base. So, we probably should only consider it
if it's truly needed. Thanks,

/I think one can get away with an easier codebase here. The trick is not to 
have the LOG to implement all the logic,
but just have the broker maintain a Set of Indexes, that gets initialized in 
starup and passed to the LOG. One could ask each individual
index, if that logsegment should be rolled, compacted, truncated whatever.  
Once could also give that LogSegment to each index and make it rebuild
the index for example. I didn't figure out the details. But this
https://github.com/apache/kafka/blob/79d3fd2bf0e5c89

Re: KIP-33 Opt out from Time Based indexing

2016-08-24 Thread Jan Filipiak
s now feels wired to me. Gives me a 
feeling of complexity that I don't need and have a hard time figuring 
out how much other people can benefit from it. I hope that this feedback 
is useful and helps to understand my scepticism regarding this thing. 
There were some other oddities that I have a hard time recalling now. So 
i guess the index was build for a specific confluent customer, will 
there be any blogpost about their usecase? or can you share it?


Best Jan

On 24.08.2016 16:47, Jun Rao wrote:

Jan,

Thanks for the reply. I actually wasn't sure what your main concern on 
time-based rolling is. Just a couple of clarifications. (1) Time-based 
rolling doesn't control how long a segment will be retained for. For 
retention, if you use time-based, it will now be based on the 
timestamp in the message. If you use size-based, it works the same as 
before. Is your concern on time-based retention? If so, you can always 
configure the timestamp in all topics to be log append time, which 
will give you the same behavior as before. (2) The creation time of 
the segment is never exposed to the consumer and therefore is never 
preserved in MirrorMaker. In contrast, the timestamp in the message 
will be preserved in MirrorMaker. So, not sure what your concern on 
MirrorMaker is.


Jun

On Wed, Aug 24, 2016 at 5:03 AM, Jan Filipiak 
mailto:jan.filip...@trivago.com>> wrote:


Hi Jun,

I copy pasted this mail from the archive, as I somehow didn't
receive it per mail. I will sill make some comments in line,
hopefully you can find them quick enough, my apologies.

To make things more clear, you should also know, that all messages
in our kafka setup have a common way to access their timestamp
already (its encoded in the value the same way always)
Sometimes this is a logical time (eg same timestamp accross many
different topics / partitions), say PHP request start time or the
like. So kafkas internal timestamps are not really attractive
for us anyways currently.

I hope I can make a point and not waste your time.

Best Jan,

hopefully everything makes sense



Jan,

Currently, there is no switch to disable the time based index.

There are quite a few use cases of time based index.

1. From KIP-33's wiki, it allows us to do time-based retention
accurately.
Before KIP-33, the time-based retention is based on the last
modified time
of each log segment. The main issue is that last modified time can
change
over time. For example, if a broker loses storage and has to
re-replicate
all data, those re-replicated segments will be retained much
longer since
their last modified time is more recent. Having a time-based index
allows
us to retain segments based on the message time, not the last modified
time. This can also benefit KIP-71, where we want to combine
time-based
retention and compaction.

/If your sparse on discspace, one could try to get by that with
retention.bytes/
or, as we did, ssh into the box and rm it, which worked quite good
when no one reads it.
Chuckles a little when its read but readers usually do an
auto.offset.reset
(they are to slow any ways if they reading the last segments hrhr).

2. In KIP-58, we want to delay log compaction based on a configurable
amount of time. Time-based index allows us to do this more accurately.

/good point, seems reasonable/

3. We plan to add an api in the consumer to allow seeking to an offset
based on a timestamp. The time based index allows us to do this more
accurately and fast.

/Sure, I personally feel that you rarely want to do this. For
Camus, we used max.pull.historic.days (or simmilliar) successfully
quite often. we just gave it an extra day and got what we wanted
and for debugging my bisect tool works well enough. So these are
the 2 usecases we expierenced already and found a decent way
around it./

Now for the impact.

a. There is a slight change on how time-based rolling works.
Before KIP-33,
rolling was based on the time when a segment was loaded in the broker.
After KIP-33, rolling is based on the time of the first message of a
segment. Not sure if this is your concern. In the common case, the two
behave more or less the same. The latter is actually more
deterministic
since it's not sensitive to broker restarts.

/This is part of my main concern indeed. This is what scares me
and I preffered to just opt out, instead of reviewing all our
pipelines to check whats gonna happen when we put it live.
For Example the Mirrormakers, If they want to preserve create time
from the source cluster and publish the same create time (wich
they should do, if you don't encode your own timestamps and want
to have proper kafka-streams windowing). Then I am quite conc

Re: KIP-33 Opt out from Time Based indexing

2016-08-26 Thread Jan Filipiak

Hi Jun,

thanks for taking the time to answer on such a detailed level. You are 
right Log.fetchOffsetByTimestamp works, the comment is just confusing
"// Get all the segments whose largest timestamp is smaller than target 
timestamp" wich is apparently is not what takeWhile does (I am more on 
the Java end of things, so I relied on the comment).


Regarding the frequent file rolling i didn't think of Logcompaction but 
that indeed is a place where  can hit the fan pretty easy. 
especially if you don't have many updates in there and you pass the 
timestamp along in a kafka-streams application. Bootstrapping a new 
application then indeed could produce quite a few old messages kicking 
this logrolling of until a recent message appears. I guess that makes it 
a practical issue again even with the 7 days. Thanks for pointing out! 
Id like to see the appendTime as default, I am very happy that I have it 
in the backpocket for purpose of tighter sleep and not to worry to much 
about someone accidentally doing something dodgy on a weekend with our 
clusters


Regarding the usefulness, you will not be able to sell it for me. I 
don't know how people build applications with this ¯\_(ツ)_/¯ but I 
don't want to see them.

Look at the error recovery with timestamp seek:
For fixing a bug, a user needs to stop the SP, truncate all his 
downstream data perfectly based on their time window.Then restart and do 
the first fetch based
again on the perfect window timeout. From then on, he still has NO clue 
whatsoever if messages that come later now with an earlier timestamp 
need to go into the
previous window or not. (Note that there is  >>>absolutly no<<< way to 
determine this in aggregated downstream windowed stores). So the user is 
in  even though he can seek, he
can't rule out his error. IMO it helps them to build the wrong thing, 
that will just be operational pain *somewhere*


Look at the error recovery without timestamp seek:
start your application from beginning with a different output 
(version,key,partition) wait for it to fully catch up. drop the 
timewindows the error happend + confidence interval (if your data isnt 
there anymore, no seek will help) in from the old version. Stop the 
stream processor, merge the data it created, switch back to the original 
(version,key,partition) and start the SP again.
Done. As bigger you choose the confidence interval, the more correct, 
the less the index helps. usually you want maximum confidence => no 
index usage, get everything that is still there. (Maybe even redump from 
hadoop in extreme cases) ironically causing the log to roll all the time 
(as you probably publish to a new topic and have the streams application 
use both) :(


As you can see, even though the users can seek, if they want to create 
proper numbers, Billing information eg. They are in trouble, and giving 
them this index will just make them implement the wrong solution! It 
boils down to: this index is not the kafka way of doing things. The 
index can help the second approach but usually one chooses the 
confidence interval = as much as one can get.


Then the last thing. "OffsetRequest is a legacy request. It's awkward to 
use and we plan to deprecate it over time". You got to be kidding me. It 
was wired to get the byteposition back then, but getting the offsets is 
perfectly reasonable and one of the best things in the world. want to 
know how your stream looked at a specific point in time? get start and 
end offset, fetch whenever you like, you get an perfect snapshot in wall 
time. this is usefull for compacted topis aswell as streaming topics. 
Offsets are a well known thing in kafka and in no way awkward as its 
monotonically increasing property is just great.


For seeking the log based on a confidence interval (the only chance you 
get in non-key logs reprocessing) one can also bisect the log from the 
client. As the case is rare it is intensive and causes at least a few 
hundreds seeks for bigger topics. but I guess the broker does these 
extra for the new index file now.


This index, I feel is just not following the whole "kafka-way". Can you 
suggest on the proposed re-factoring? what are the chance to get it 
upstream if I could pull it off? (unlikely)


Thanks for all the effort you put in into listening to my concerns. 
highly appreciated!


Best Jan



On 25.08.2016 23:36, Jun Rao wrote:

Jan,

Thanks a lot for the feedback. Now I understood your concern better. 
The following are my comments.


The first odd thing that you pointed out could be a real concern. 
Basically, if a producer publishes messages with really old timestamp, 
our default log.roll.hours (7 days) will indeed cause the broker to 
roll a log on ever message, which would be bad. Time-based rolling is 
actually used infrequently. The only use case that I am aware of is 
that for compacted topics, rolling logs based on tim

Re: KIP-33 Opt out from Time Based indexing

2016-09-05 Thread Jan Filipiak

Hi Jun,

sorry for the late reply. Regarding B, my main concern was just 
complexity of understanding what's going on.
As you can see it took me probably some 2 days or so, to fully grab all 
the details in the implementation and what
the impacts are. Usually I prefer to turn things I don't use off, so I 
don't have to bother. Log Append time will work for me.


Rolling logs was my main concern. The producer can specify the timestamp 
and we use epoch inside the message, I'd bet money,
people in the company would have put this epoch also in the produce 
record. => rollings logs as the broker thinks its millis.
So that would probably have caused us at least one outage if a big 
producer had upgraded and done this, IMO likely mistake.


Id just hoped for a more obvious kill-switch, so I didn’t need to bother 
that much.


Best Jan




On 29.08.2016 19:36, Jun Rao wrote:

Jan,

For the usefulness of time index, it's ok if you don't plan to use it.
However, I do think there are other people who will want to use it. Fixing
an application bug always requires some additional work. Intuitively, being
able to seek back to a particular point of time for replay is going to be
much more efficient than always replaying from the very beginning,
especially when the log is retained for a long period of time. Sure, if you
want to have more confidence, you want to rewind a bit conservatively. But
being able to rewind an extra hour makes a big difference from having to
rewind all to way to 7 days or however long the retention time is.

For the OffsetRequest, I actually agree with you that it's useful. People
can use that to find the first and the last offset and the offset based on
a specific point in time. The part that's a bit awkward with OffsetRequest
is that it's based on the last modified time of the log segment, which
makes it imprecise (precision is at the segment level, not message level)
and non-deterministic (last modified time may change). Another awkwardness
is that it supports returning a list of offsets after a specified
timestamp. We did that simply because timestamp was only at the segment
level then. So, our plan is to replace OffsetRequest with a new one. It
will give you the same functionality: find the first and the last offset
and the offset based on a specific point in time. It will just be better
since it's more precise and more deterministic. For your use case, it seems
that you don't care about message creation time. Then, it's possible for
you to configure the broker with the log append time. Whether this should
be default at the Kafka level is debatable, but it won't prevent your use
case.

For your suggesting on refactoring, I still want to understand how
necessary it is. Your main concerns so far seem to be.
(a) Impact on rolling log segments.
(b) Time-based index is not useful for me.

Item (a) is a good point. Thanks for that. We will fix it. Item (b), I have
given my view on this above. Are there any other things that you think that
having a time-based index will hurt?

Thanks,

Jun

On Fri, Aug 26, 2016 at 3:41 PM, Jan Filipiak 
wrote:


Hi Jun,

thanks for taking the time to answer on such a detailed level. You are
right Log.fetchOffsetByTimestamp works, the comment is just confusing
"// Get all the segments whose largest timestamp is smaller than target
timestamp" wich is apparently is not what takeWhile does (I am more on
the Java end of things, so I relied on the comment).

Regarding the frequent file rolling i didn't think of Logcompaction but
that indeed is a place where  can hit the fan pretty easy. especially
if you don't have many updates in there and you pass the timestamp along in
a kafka-streams application. Bootstrapping a new application then indeed
could produce quite a few old messages kicking this logrolling of until a
recent message appears. I guess that makes it a practical issue again even
with the 7 days. Thanks for pointing out! Id like to see the appendTime as
default, I am very happy that I have it in the backpocket for purpose of
tighter sleep and not to worry to much about someone accidentally doing
something dodgy on a weekend with our clusters

Regarding the usefulness, you will not be able to sell it for me. I don't
know how people build applications with this ¯\_(ツ)_/¯ but I don't want to
see them.
Look at the error recovery with timestamp seek:
For fixing a bug, a user needs to stop the SP, truncate all his downstream
data perfectly based on their time window.Then restart and do the first
fetch based
again on the perfect window timeout. From then on, he still has NO clue
whatsoever if messages that come later now with an earlier timestamp need
to go into the
previous window or not. (Note that there is  >>>absolutly no<<< way to
determine this in aggregated downstream windowed stores). So the user is in
 even though he can seek, he
can&#x

Re: KIP-33 Opt out from Time Based indexing

2016-09-08 Thread Jan Filipiak

Hi Jun,

thanks a lot for the hint, Ill check it out when I get a free minute!

Best Jan

On 07.09.2016 00:35, Jun Rao wrote:

Jan,

For the time rolling issue, Jiangjie has committed a fix (
https://issues.apache.org/jira/browse/KAFKA-4099) to trunk. Perhaps you can
help test out trunk and see if there are any other issues related to
time-based index?

Thanks,

Jun

On Mon, Sep 5, 2016 at 11:52 PM, Jan Filipiak 
wrote:


Hi Jun,

sorry for the late reply. Regarding B, my main concern was just complexity
of understanding what's going on.
As you can see it took me probably some 2 days or so, to fully grab all
the details in the implementation and what
the impacts are. Usually I prefer to turn things I don't use off, so I
don't have to bother. Log Append time will work for me.

Rolling logs was my main concern. The producer can specify the timestamp
and we use epoch inside the message, I'd bet money,
people in the company would have put this epoch also in the produce
record. => rollings logs as the broker thinks its millis.
So that would probably have caused us at least one outage if a big
producer had upgraded and done this, IMO likely mistake.

Id just hoped for a more obvious kill-switch, so I didn’t need to bother
that much.

Best Jan





On 29.08.2016 19:36, Jun Rao wrote:


Jan,

For the usefulness of time index, it's ok if you don't plan to use it.
However, I do think there are other people who will want to use it. Fixing
an application bug always requires some additional work. Intuitively,
being
able to seek back to a particular point of time for replay is going to be
much more efficient than always replaying from the very beginning,
especially when the log is retained for a long period of time. Sure, if
you
want to have more confidence, you want to rewind a bit conservatively. But
being able to rewind an extra hour makes a big difference from having to
rewind all to way to 7 days or however long the retention time is.

For the OffsetRequest, I actually agree with you that it's useful. People
can use that to find the first and the last offset and the offset based on
a specific point in time. The part that's a bit awkward with OffsetRequest
is that it's based on the last modified time of the log segment, which
makes it imprecise (precision is at the segment level, not message level)
and non-deterministic (last modified time may change). Another awkwardness
is that it supports returning a list of offsets after a specified
timestamp. We did that simply because timestamp was only at the segment
level then. So, our plan is to replace OffsetRequest with a new one. It
will give you the same functionality: find the first and the last offset
and the offset based on a specific point in time. It will just be better
since it's more precise and more deterministic. For your use case, it
seems
that you don't care about message creation time. Then, it's possible for
you to configure the broker with the log append time. Whether this should
be default at the Kafka level is debatable, but it won't prevent your use
case.

For your suggesting on refactoring, I still want to understand how
necessary it is. Your main concerns so far seem to be.
(a) Impact on rolling log segments.
(b) Time-based index is not useful for me.

Item (a) is a good point. Thanks for that. We will fix it. Item (b), I
have
given my view on this above. Are there any other things that you think
that
having a time-based index will hurt?

Thanks,

Jun

On Fri, Aug 26, 2016 at 3:41 PM, Jan Filipiak 
wrote:

Hi Jun,

thanks for taking the time to answer on such a detailed level. You are
right Log.fetchOffsetByTimestamp works, the comment is just confusing
"// Get all the segments whose largest timestamp is smaller than target
timestamp" wich is apparently is not what takeWhile does (I am more on
the Java end of things, so I relied on the comment).

Regarding the frequent file rolling i didn't think of Logcompaction but
that indeed is a place where  can hit the fan pretty easy. especially
if you don't have many updates in there and you pass the timestamp along
in
a kafka-streams application. Bootstrapping a new application then indeed
could produce quite a few old messages kicking this logrolling of until a
recent message appears. I guess that makes it a practical issue again
even
with the 7 days. Thanks for pointing out! Id like to see the appendTime
as
default, I am very happy that I have it in the backpocket for purpose of
tighter sleep and not to worry to much about someone accidentally doing
something dodgy on a weekend with our clusters

Regarding the usefulness, you will not be able to sell it for me. I don't
know how people build applications with this ¯\_(ツ)_/¯ but I don't want
to
see them.
Look at the error recovery with timestamp seek:
For fixing a bug, a user needs to stop the SP, truncate all his
downstream
data perfectly based on their ti

Re: To find the Lag of consumer offset using kafka client library

2016-09-29 Thread Jan Omar

Hi Gourab,

Check this out:

https://github.com/linkedin/Burrow <https://github.com/linkedin/Burrow>

Regards

Jan

> On 29 Sep 2016, at 15:47, Gourab Chowdhury  wrote:
> 
> I can get the *Lag* of offsets with the following command:-
> 
> bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper
> localhost:2182 --describe --group DemoConsumer
> 
> I am trying to find code that uses kafka library to find the *Lag* of
> offsets in consumer?
> 
> Also is there any other documentation other than https://github.com/apache/
> kafka/tree/trunk/docs? I can't find much documentation of kafka.
> 
> Thanks,
> Gourab Chowdhury,
> Software Engg. JunctionTV Inc.



  1   2   >