Re: Avro Serialization & Schema Registry ..

2017-07-19 Thread Michael Noll
In short, Avro serializers/deserializers provided by Confluent always
integrate with (and thus require) Confluent Schema Registry.  That's why
you must set the `schema.registry.url` configuration for them.
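
As a minimal sketch of that setting, using only `java.util.Properties` so it runs without Kafka on the classpath: the registry address `http://localhost:8081` and the broker address are assumptions for a local setup, not values from this thread, and `SerdeSettings` is a hypothetical helper name.

```java
import java.util.Properties;

public class SerdeSettings {
    // Builds the settings that would be handed to StreamsConfig and the Confluent serdes.
    // Both addresses are placeholders (assumed local defaults); replace with your own.
    public static Properties build(String brokers, String registryUrl) {
        Properties settings = new Properties();
        settings.put("application.id", "kstream-log-processing-avro");
        settings.put("bootstrap.servers", brokers);
        // Without this entry the Confluent Avro serializers fail with
        // 'Missing required configuration "schema.registry.url"'.
        settings.put("schema.registry.url", registryUrl);
        return settings;
    }

    public static void main(String[] args) {
        Properties p = build("localhost:9092", "http://localhost:8081");
        System.out.println(p.getProperty("schema.registry.url"));
    }
}
```

The same key/value pair can be added to the existing `settings` object next to the serde class settings.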

If you want to use Avro but without a schema registry, you'd need to work
with the Avro API directly.  You can also implement your own "no schema
registry" Avro serializers/deserializers for more convenience, of course.

Best wishes,
Michael



On Mon, Jul 17, 2017 at 8:51 PM, Debasish Ghosh 
wrote:

> I am using the class io.confluent.kafka.serializers.KafkaAvroSerializer as
> one of the base abstractions for Avro serialization. From the stack trace I
> see that the instantiation of this class needs set up of
> KafkaAvroSerializerConfig which needs a value for the schema registry url
> ..
>
> regards.
>
> On Tue, Jul 18, 2017 at 12:02 AM, Richard L. Burton III <
> mrbur...@gmail.com>
> wrote:
>
> > For your first question: no, you can use the Avro API directly.
> >
> >
> >
> > On Mon, Jul 17, 2017 at 2:29 PM Debasish Ghosh
> > wrote:
> >
> >> Hi -
> >>
> >> I am using Avro Serialization in a Kafka Streams application through the
> >> following dependency ..
> >>
> >> "io.confluent"  % "kafka-avro-serializer" % "3.2.2"
> >>
> >> My question is: Is schema registry mandatory for using Avro Serialization?
> >> Because when I run the application I get the following exception, where it
> >> complains that there is no default value for "schema.registry.url". My
> >> current settings for StreamsConfig are the following ..
> >>
> >>   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-log-processing-avro")
> >>   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
> >>   settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
> >>   settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[SpecificAvroSerde[LogRecordAvro]])
> >>
> >> .. and the exception ..
> >>
> >> 23:49:34.054 TKD [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group kstream-log-processing-avro failed on partition assignment
> >> org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerde
> >> at org.apache.kafka.streams.StreamsConfig.valueSerde(StreamsConfig.java:594)
> >> at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:58)
> >> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:41)
> >> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:137)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> >> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> >> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> >> at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> >> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> >> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> >> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> >> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >> Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
> >> at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:241)
> >> at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:76)
> >> at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:51)
> >> at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:33)

Re: KTable-KTable Join Semantics on NULL Key

2017-09-14 Thread Michael Noll
Perhaps a clarification to what Damian said:

It is shown in the (HTML) table at the link you shared [1] what happens
when you get null values for a key.

We also have slightly better join documentation at [2], the content/text of
which we are currently migrating over to the official Apache Kafka
documentation for the Streams API (under
kafka.apache.org/documentation/streams).

[1]
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KTable-KTableJoin
[2]
https://docs.confluent.io/current/streams/developer-guide.html#kstream-kstream-join


On Fri, Sep 8, 2017 at 3:19 PM, Damian Guy  wrote:

> It is shown in the table what happens when you get null values for a key.
>
> On Fri, 8 Sep 2017 at 12:31 Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > Hi Kafka Users,
> >
> > KTable-KTable Join Semantics is explained in detail [here][1]. But it's
> > not clear why, when the input record has a NULL value, output records
> > are sometimes generated and in some cases are not.
> >
> > It would be helpful if someone could explain how the output records are
> > generated for all 3 types of joins on receiving a record with a NULL
> > value.
> >
> > [1]: https://cwiki.apache.org/confluence/display/KAFKA/
> > Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KTable-KTableJoin
> >
> > -- Kamal
> >
>


Re: left join between PageViews(KStream) & UserProfile (KTable)

2017-10-23 Thread Michael Noll
> *What key should the join on ? *

The message key, in both cases, should contain the user ID in String format.

> *There seems to be no common key (eg. user) between the 2 classes - PageView
> and UserProfile*

The user ID is the common key, but the user ID is stored in the respective
message *keys*, whereas PageView and UserProfile values are stored in the
message *values* in Kafka.
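
To make the key/value split concrete, here is a rough plain-Java stand-in for the left join. It is not the Kafka Streams API: a `Map` plays the role of the UserProfile KTable, a list of `{userId, page}` pairs plays the role of the PageView KStream, and the class name, user IDs, pages and regions are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedLeftJoinSketch {
    // profilesByUser: message key (user ID) -> region, i.e. the "table" side.
    // pageViews: each entry is {message key (user ID), page}, i.e. the "stream" side.
    public static List<String> leftJoin(List<String[]> pageViews,
                                        Map<String, String> profilesByUser) {
        List<String> joined = new ArrayList<>();
        for (String[] view : pageViews) {
            String userId = view[0];
            String page = view[1];
            // The lookup uses the message key; a missing profile still yields output,
            // which is what makes this a left join.
            String region = profilesByUser.getOrDefault(userId, "UNKNOWN");
            joined.add(userId + "," + page + "," + region);
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> profiles = new HashMap<>();
        profiles.put("alice", "europe");
        List<String[]> views = new ArrayList<>();
        views.add(new String[] {"alice", "index.html"});
        views.add(new String[] {"bob", "news.html"});
        for (String line : leftJoin(views, profiles)) {
            System.out.println(line);
        }
    }
}
```

So sample input for both topics needs the user ID as the message key; no surrogate id inside the values is required for the join itself.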

Btw, you might also want to take a look at
https://github.com/confluentinc/kafka-streams-examples, which has many more
examples, including a PageView demo called `PageViewRegionLambdaExample`
that is similar to the one you shared above.  `PageViewRegionLambdaExample`
ships with a data generator for the example, and instructions for how to
run it.

It would be great if we had the same setup for the Apache Kafka page view
example, of course.  Pull requests are very welcome. :-)

Hope this helps,
Michael






On Sun, Oct 22, 2017 at 9:30 PM, karan alang  wrote:

> Hello all -
> I'm trying to run the sample PageView Kafka streams example,
> (ref -
> https://github.com/apache/kafka/tree/0.11.0/streams/
> examples/src/main/java/org/apache/kafka/streams/examples/pageview
> )
> and have a question ..
>
> There is a leftJoin between PageView (Kstream) and UserProfile(Ktable) as
> shown below... The join should give - PageViewByRegions
>
> *What key should the join on ? *
>
> *There seems to be no common key (eg. user) between the 2 classes -
> PageView and UserProfile*
>
> *Also, what should a sample input data in the PageView & UserProfile
> specific topics be ?*
> Do we need to add surrogate (id) key to the input for these, to enable the
> left join ?
>
> Code :
>
> *- static Classes  *
>
> static public class PageView {
>     public String user;
>     public String page;
>     public Long timestamp;
> }
>
> static public class UserProfile {
>     public String region;
>     public Long timestamp;
> }
>
> *--Join between the views (i.e. KStream - PageView) & users (KTable
> - UserProfile) *
>
> KStream<String, PageViewByRegion> kstream1 = views.leftJoin(users,
>     // PageView - first value type
>     // UserProfile - 2nd value type
>     // PageViewByRegion - joined value
>     new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
>         @Override
>         public PageViewByRegion apply(PageView view, UserProfile profile) {
>             PageViewByRegion viewByRegion = new PageViewByRegion();
>             viewByRegion.user = view.user;
>             viewByRegion.page = view.page;
>             System.out.println(" viewByRegion.user " + viewByRegion.user);
>             System.out.println(" viewByRegion.page " + viewByRegion.page);
>
>             // profile is null when no profile exists for the view's key
>             if (profile != null) {
>                 viewByRegion.region = profile.region;
>             } else {
>                 viewByRegion.region = "UNKNOWN";
>             }
>
>             System.out.println(" viewByRegion.region " + viewByRegion.region);
>
>             return viewByRegion;
>         }
>     }
> );
>


Re: Kafka 11 | Stream Application crashed the brokers

2017-12-01 Thread Michael Noll
Thanks for reporting back, Sameer!


On Fri, Dec 1, 2017 at 2:46 AM, Guozhang Wang  wrote:

> Thanks for confirming Sameer.
>
>
> Guozhang
>
> On Thu, Nov 30, 2017 at 3:52 AM, Sameer Kumar 
> wrote:
>
> > Just wanted to let everyone know that this issue got fixed in Kafka
> > 1.0.0. I recently migrated to it and didn't find the issue any longer.
> >
> > -Sameer.
> >
> > On Thu, Sep 14, 2017 at 5:50 PM, Sameer Kumar 
> > wrote:
> >
> > > Ok. I will inspect this further and keep everyone posted on this.
> > >
> > > -Sameer.
> > >
> > > On Thu, Sep 14, 2017 at 1:46 AM, Guozhang Wang 
> > wrote:
> > >
> > >> When exactly_once is turned on, the transactional id is set
> > >> automatically by the Streams client.
> > >>
> > >> Given the "*TimeoutException*", what I'd inspect is the health of the
> > >> brokers. If you have metrics on the broker servers regarding request
> > >> handler thread idleness / request queue length / request rate etc., you
> > >> can monitor those and see what could be causing the broker
> > >> unavailability.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Wed, Sep 13, 2017 at 8:26 AM, Sameer Kumar
> > >> wrote:
> > >>
> > >> > Adding more info:-
> > >> >
> > >> > Hi Guozhang,
> > >> >
> > >> > I was using exactly_once processing here, I can see this in the
> > >> > client logs; however, I am not setting a transaction id.
> > >> >
> > >> > application.id = c-7-e6
> > >> > application.server =
> > >> > bootstrap.servers = [172.29.65.190:9092, 172.29.65.192:9092, 172.29.65.193:9092]
> > >> > buffered.records.per.partition = 1
> > >> > cache.max.bytes.buffering = 2097152000
> > >> > client.id =
> > >> > commit.interval.ms = 5000
> > >> > connections.max.idle.ms = 54
> > >> > default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> > >> > default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> > >> > default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> > >> > key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> > >> > metadata.max.age.ms = 6
> > >> > metric.reporters = []
> > >> > metrics.num.samples = 2
> > >> > metrics.recording.level = INFO
> > >> > metrics.sample.window.ms = 3
> > >> > num.standby.replicas = 0
> > >> > num.stream.threads = 15
> > >> > partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
> > >> > poll.ms = 100
> > >> > processing.guarantee = exactly_once
> > >> > receive.buffer.bytes = 32768
> > >> > reconnect.backoff.max.ms = 1000
> > >> > reconnect.backoff.ms = 50
> > >> > replication.factor = 1
> > >> > request.timeout.ms = 4
> > >> > retry.backoff.ms = 100
> > >> > rocksdb.config.setter = null
> > >> > security.protocol = PLAINTEXT
> > >> > send.buffer.bytes = 131072
> > >> > state.cleanup.delay.ms = 4611686018427386903
> > >> > state.dir = /data/streampoc/
> > >> > timestamp.extractor = class org.apache.kafka.streams.processor.WallclockTimestampExtractor
> > >> > value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> > >> > windowstore.changelog.additional.retention.ms = 8640
> > >> > zookeeper.connect =
> > >> >
> > >> >
> > >> > On Wed, Sep 13, 2017 at 12:16 PM, Sameer Kumar <
> > sam.kum.w...@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > Hi Guozhang,
> > >> > >
> > >> > > The producer sending data to this topic is not running concurrently
> > >> > > with the stream processing. I first ingested the data from another
> > >> > > cluster and then ran the stream processing on it. The producer code
> > >> > > is written by me and it doesn't have transactions on by default.
> > >> > >
> > >> > > I will double check whether someone else has transactions turned on,
> > >> > > but this is quite unlikely. Is there some way to verify it through
> > >> > > the logs?
> > >> > >
> > >> > > All of this works fine when the brokers run on Kafka 10; this might
> > >> > > be because transactions are only available on Kafka 11. I suspect
> > >> > > that too much processing might be causing one of the brokers to
> > >> > > crash. The timeouts indicate that it is taking a long time to send
> > >> > > data.
> > >> > >
> > >> > > I have also tried this on another cluster that I use exclusively
> > >> > > for myself and found the same behavior there as well.
> > >> > >
> > >> > > What do you think should be our next step so that we can get to the
> > >> > > root of the issue?
> > >> > >
> > >> > > -Sameer.
> > >> > >
> > >> > > On Wed, Sep 13, 2017 at 6:14 AM, Guozhang Wang <
> wangg...@gmail.com>
> > >> > wrote:
> > >> > >
> > >> > >> Hi Sameer,
> > >> > >>
> > >> > >> If no clients has transactions turned on the
> `__transaction_state`
>

Re: Kafka-streams: mix Processor API with windowed grouping

2018-04-10 Thread Michael Noll
Also, if you want (or can tolerate) probabilistic counting, with the option
to also do TopN in that manner, we also have an example that uses Count Min
Sketch:
https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala

The link/branch above is the code variant for Kafka 1.0.

The example implements a custom (fault-tolerant) state store backed by CMS,
which is then used in a Transformer.  The Transformer is then plugged into
the DSL for a mix-and-match setup of DSL and Processor API.
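
For readers who have not met the data structure before, here is a rough plain-Java sketch of a Count-Min Sketch. It is not the code from the linked example and it has no Kafka state store or fault tolerance behind it; the class name, the hash mixing and the depth/width/seed parameters are illustrative assumptions.

```java
import java.util.Random;

public class CountMinSketch {
    private final int depth;      // number of hash rows
    private final int width;      // counters per row
    private final long[][] table;
    private final int[] seeds;    // one hash seed per row

    public CountMinSketch(int depth, int width, long seed) {
        this.depth = depth;
        this.width = width;
        this.table = new long[depth][width];
        this.seeds = new int[depth];
        Random random = new Random(seed);
        for (int i = 0; i < depth; i++) {
            seeds[i] = random.nextInt();
        }
    }

    private int bucket(String item, int row) {
        int h = item.hashCode() ^ seeds[row];
        h ^= (h >>> 16);          // cheap bit mixing; illustrative, not a tuned hash
        h *= 0x45d9f3b;
        h ^= (h >>> 16);
        return Math.floorMod(h, width);
    }

    // Adds a non-negative count for the item to one counter in every row.
    public void add(String item, long count) {
        for (int row = 0; row < depth; row++) {
            table[row][bucket(item, row)] += count;
        }
    }

    // Minimum over rows: never below the true count, may overestimate on collisions.
    public long estimate(String item) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < depth; row++) {
            min = Math.min(min, table[row][bucket(item, row)]);
        }
        return min;
    }
}
```

The trade-off is fixed memory (depth times width counters) in exchange for estimates that can be too high but never too low, as long as only non-negative counts are added.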


On Mon, Apr 9, 2018 at 9:34 PM, Dmitriy Vsekhvalnov 
wrote:

> Thanks again,
>
> yeah we saw that example for sure :)
>
> Ok, gonna try low-level Transformer and see how it goes.
>
>
> On Mon, Apr 9, 2018 at 9:17 PM, Matthias J. Sax 
> wrote:
>
> > For (1), no.
> >
> > If you want to do manual put/get you should use a Transformer and
> > implement a custom operator.
> >
> > Btw: here is an example of TopN:
> > https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java
> >
> >
> >
> > -Matthias
> >
> > On 4/9/18 4:46 AM, Dmitriy Vsekhvalnov wrote:
> > > Hi Matthias, thanks
> > >
> > > clarifications below:
> > >
> > > 1. .aggregate( () -> .. ,
> > >        (k, v, agg) -> {
> > >            // Can I access the KV store here for manual put/get?
> > >        });
> > >
> > > 2. TopN is not hard; we are using pretty much the same approach you
> > > describe, just with a bounded priority queue. The problematic part is
> > > the 'remaining count': everything else that is not in the topN records.
> > > It appeared to be quite complex in the streaming world (or we missed
> > > something). I'll try to illustrate, assuming a simplified event flow:
> > >
> > >  - acme.com: 100 hits -> too small, not in TopN, we add it to the
> > >    remaining count
> > >  - ... some time later
> > >  - acme.com: 150 hits -> still too small, we add it to the remaining count
> > >
> > > Problem: we added 250 hits to remaining, but actually we had to add only
> > > 150 hits. We have to subtract the previous count, which means we need to
> > > keep them all somewhere. That's where we hope access to a KV store can help.
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Sat, Apr 7, 2018 at 10:11 PM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > >>> ok, question then - is it possible to use state store with
> > >>> .aggregate()?
> > >>
> > >> Not sure what exactly you mean by this. An aggregation always uses a
> > >> store; it's a stateful operation and cannot be computed without a store.
> > >>
> > >> For TopN, if you get the hit-count as input, you can use a
> > >> `.aggregate()` operator that uses an array or list as output -- this
> > >> list contains the topN, and each time aggregate() is called, you check
> > >> whether the new count replaces an existing count in the array/list.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 4/6/18 10:36 AM, Dmitriy Vsekhvalnov wrote:
> > >>> Thanks guys,
> > >>>
> > >>> ok, question then - is it possible to use state store with
> > >>> .aggregate()?
> > >>>
> > >>> Here are some details on counting, we are basically looking for TopN +
> > >>> Remaining calculation.
> > >>>
> > >>> Example:
> > >>>
> > >>> - incoming data: api url -> hit count
> > >>>
> > >>> - we want output: Top 20 urls per each domain per hour + remaining
> > >>> count per domain (e.g. sum of all other urls hits that do not belong
> > >>> to top 10 per each domain per hour).
> > >>>
> > >>> With some grouping variations.
> > >>>
> > >>> Make some sense? Always open for better ideas :)
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang wrote:
> > >>>
> > >>>> Hello Dmitriy,
> > >>>>
> > >>>> You can "simulate" a lower-level Processor API by 1) adding the stores
> > >>>> you need via builder#addStore(); 2) doing a manual "through" call after
> > >>>> "selectKey" (the selected key will be the same as in your original
> > >>>> groupBy call), and then adding the `transform()` operator to the
> > >>>> repartitioned stream to do manual windowed counting.
> > >>>>
> > >>>> But before you really go down this route, I'd first like to validate
> > >>>> whether the provided `Aggregate`, `Initialize` functions really cannot
> > >>>> meet your "overcomplicated version of record counting". Could you
> > >>>> elaborate a bit more on this logic, so maybe we can still work around
> > >>>> it with the pure high-level DSL?
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>> On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <
> > >>>> dvsekhval...@gmail.com>
> > >>>> wrote:
> > > Hey, good day everyone,
> > >
> > > another kafka-streams friday question.
> > >
> > > We hit the wal

Re: Question about applicability of Kafka Streams

2016-05-30 Thread Michael Noll
Kim,

yes, Kafka Streams is very suitable for this kind of application.

The code example that Tobias linked to should be a good starting point for
you (thanks, Tobias!).

Best,
Michael



On Fri, May 27, 2016 at 4:35 AM, BYEONG-GI KIM  wrote:

> Thank you very much for the information!
> I'll look into it.
>
> Best regards
>
> KIM
>
> 2016-05-27 11:31 GMT+09:00 Tobias Adamson :
>
> > Hi Kim
> > Would maybe this example work for you?
> >
> > https://github.com/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview
> >
> > It includes JSON -> POJO -> JSON steps and could probably be adapted for
> > your case?
> >
> > Regards
> > Toby
> > > On 27 May 2016, at 10:20 AM, BYEONG-GI KIM  wrote:
> > >
> > > Hello.
> > >
> > > First, I thank the devs so much for making a great tool as open-source
> > > software.
> > >
> > > I'm considering applying a new feature of Kafka, Kafka Streams, to my
> > > simple handler application, which receives monitoring data from Collectd
> > > and produces transformed messages to the Kafka broker(s). For example,
> > > I'd want to change a message collected from Collectd like
> > >
> > > [{"values":[1901474177],"dstypes":["counter"],"dsnames":["value"],"time":1280959128,"interval":10,"host":"leeloo.octo.it","plugin":"cpu","plugin_instance":"0","type":"cpu","type_instance":"idle"}]
> > >
> > > to my customized alarm message like
> > >
> > > {"alarmMsgType":"threshold", "time":145943640, "host":"leeloo.octo.it","category":"CPU","type":"IDLE","detail":"0","alarmLevel":"critical","message":"cpu error","value":"1901474177"}
> > >
> > > of course, the re-produced message must be sent to Kafka Broker(s).
> > >
> > > The problem is that the message(s) from Collectd are JSON-formatted, so
> > > it seems the Kafka Streams processing would become complicated, i.e.,
> > > each message has to be parsed from String to JSON and vice versa after
> > > the transform.
> > >
> > > Is it suitable to use Kafka Streams for this kind of application?
> > >
> > > Any better ideas or comments would also be really helpful for me. Thanks
> > > in advance!
> > >
> > > Best regards
> > >
> > > KIM
> >
> >
>


Re: Avro deserialization

2016-05-30 Thread Michael Noll
Rick,


Is your code really importing the correct ConsumerConfig objects?

It should be:

import kafka.consumer.ConsumerConfig;

If you are using your IDE's auto-import feature, you might however end up
with the following import, which will give you the "ConsumerConfig is not a
public class" compile error:

import org.apache.kafka.clients.consumer.ConsumerConfig;

Lastly, it looks as if you need to update the following line as well:

// Note that this constructs from props (j.u.Properties), not vProps (VerifiableProperties)
ConsumerConnector consumer =
    kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

Let us know if this solves your error.  The CP 3.0.0 docs might need a fix
then (to change vProps to props).

Best,
Michael



On Sun, May 29, 2016 at 2:49 PM, Rick Mangi  wrote:

> Hello all,
>
> I’m trying to use the new schema registry to read avro encoded messages I
> created with kafka connect as described here:
> http://docs.confluent.io/3.0.0/schema-registry/docs/serializer-formatter.html
>
> The example code is obviously not correct, but beyond the obvious, I can’t
> seem to figure out how to register KafkaAvroDecoder with a consumer. The
> example given
>
> ConsumerConnector consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(
> vProps));
>
> Is illegal, ConsumerConfig is a private class and can’t be instantiated.
> It also seems that KafkaAvroDecoder does not implement Deserializer, and
> thus can’t be used in the normal way deserializers are registered.
>
> Has anyone gotten this stuff to work?
>
> Thanks,
>
> Rick
>
>


-- 
Best regards,
Michael Noll



*Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download
<http://www.confluent.io/download>*


Re: Avro deserialization

2016-05-31 Thread Michael Noll
FYI: I fixed the docs of schema registry (vProps -> props).

Best, Michael


On Tue, May 31, 2016 at 2:05 AM, Rick Mangi  wrote:

> That was exactly the problem, I found the example here to be very helpful
> -
> https://github.com/confluentinc/examples/blob/master/kafka-clients/specific-avro-consumer/src/main/java/io/confluent/examples/consumer/AvroClicksSessionizer.java
>
> IMHO it’s confusing having the same class names in different packages when
> most people probably rely on an IDE to manage their imports.
>
> Thanks!
>
> Rick
>
>
> On May 30, 2016, at 5:44 AM, Michael Noll  wrote:
>
> Rick,
>
>
> Is your code really importing the correct ConsumerConfig objects?
>
> It should be:
>
>import kafka.consumer.ConsumerConfig;
>
> If you are using your IDE's auto-import feature, you might however end up
> with the following import, which will give you the "ConsumerConfig is not a
> public class" compile error:
>
>import org.apache.kafka.clients.consumer.ConsumerConfig;
>
> Lastly, it looks as if you need to update the following line as well:
>
>    // Note that this constructs from props (j.u.Properties), not vProps (VerifiableProperties)
>    ConsumerConnector consumer =
>        kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>
> Let us know if this solves your error.  The CP 3.0.0 docs might need a fix
> then (to change vProps to props).
>
> Best,
> Michael
>
>
>
> On Sun, May 29, 2016 at 2:49 PM, Rick Mangi  wrote:
>
> Hello all,
>
> I’m trying to use the new schema registry to read avro encoded messages I
> created with kafka connect as described here:
>
> http://docs.confluent.io/3.0.0/schema-registry/docs/serializer-formatter.html
>
> The example code is obviously not correct, but beyond the obvious, I can’t
> seem to figure out how to register KafkaAvroDecoder with a consumer. The
> example given
>
> ConsumerConnector consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(
> vProps));
>
> Is illegal, ConsumerConfig is a private class and can’t be instantiated.
> It also seems that KafkaAvroDecoder does not implement Deserializer, and
> thus can’t be used in the normal way deserializers are registered.
>
> Has anyone gotten this stuff to work?
>
> Thanks,
>
> Rick
>
>
>
>
> --
> Best regards,
> Michael Noll
>
>
>
> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860Download
> Apache Kafka and Confluent Platform: www.confluent.io/download
> <http://www.confluent.io/download>*
>
>
>




Re: Best monitoring tool for Kafka in production

2016-06-02 Thread Michael Noll
Hafsa,

since you specifically asked about non-free Kafka monitoring options as
well:  As of version 3.0.0, the Confluent Platform provides a commercial
monitoring tool for Kafka called Confluent Control Center.  (Disclaimer: I
work for Confluent.)

Quoting from the product page at
http://www.confluent.io/product/control-center:

"Know where your messages are at every step between source and destination.
Identify slow brokers, delivery failures, and sleuth the truth out of
unexpected latency in your network. Confluent Control Center delivers
end-to-end stream monitoring. Unlike other monitoring tools, this one is
purpose-built for your Kafka environment. Instead of identifying the
throughput in your data center or other “clocks and cables” types of
monitors, it tracks messages."

Best wishes,
Michael




On Wed, May 25, 2016 at 12:42 PM, Hafsa Asif 
wrote:

> Hello,
>
> What is the best monitoring tool for Kafka in production, preferably a free
> tool? If there is no free tool, then please also mention efficient non-free
> monitoring tools.
>
> We are having a lot of problems without a monitoring tool. Sometimes brokers
> go down or a consumer stops working, and we are not informed.
>
> Best Regards,
> Hafsa
>


Re: Does the Kafka Streams DSL support non-Kafka sources/sinks?

2016-06-03 Thread Michael Noll
Avi,

just adding a bit to what Gwen and Eno said, and providing a few pointers.

If you are using the DSL, you can use the `process()` method to "do
whatever you want".  See "Applying a custom processor" in the Kafka Streams
DSL chapter of the Developer Guide at
http://docs.confluent.io/3.0.0/streams/developer-guide.html#applying-a-custom-processor
.

Alternatively, you can also use the low-level Processor API directly.
Here, you'd implement the `Processor` interface, where the most notable
method is (again) called `process()`.  See the Processor API section
in the Developer Guide at
http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-processor-api
.

Hope this helps!
Michael




On Fri, Jun 3, 2016 at 3:09 AM, Avi Flax  wrote:

> On 6/2/16, 07:03, "Eno Thereska"  wrote:
>
> > Using the low-level streams API you can definitely read or write to
> > arbitrary locations inside the process() method.
>
> Ah, good to know — thank you!
>
> > However, back to your original question: even with the low-level streams
> > API the sources and sinks can only be Kafka topics for now. So, as Gwen
> > mentioned, Connect would be the way to go to bring the data to a Kafka
> > Topic first.
>
> Got it — thank you!
>
>




Re: Python kafka client benchmarks

2016-06-16 Thread Michael Noll
Thanks a lot for the investigation and sharing the results, John!

-Michael


On Thu, Jun 16, 2016 at 7:59 AM, Dana Powers  wrote:

> Very nice!
>
> On Wed, Jun 15, 2016 at 6:40 PM, John Dennison 
> wrote:
> > My team has published a post comparing python kafka clients. Might be of
> > interest to python users.
> >
> >
> http://activisiongamescience.github.io/2016/06/15/Kafka-Client-Benchmarking/
>


Re: Groupby Operator

2016-06-16 Thread Michael Noll
Davood,

you are reading the input topic into a KTable, which means that subsequent
records for the same key (such as the key `1`, which appears twice in the
input messages/records) will be considered as updates to any previous
records for that key.  So I think what you actually want to do is read the
input as a KStream instead of a KTable?

The following code works for me and looks like what you're trying to do.
Note that I am reading the input data into a KStream, not a KTable.

Input:
  new KeyValue<>(1, "message1"),
  new KeyValue<>(1, "message1"),
  new KeyValue<>(2, "message2"),
  new KeyValue<>(3, "message3"),
  new KeyValue<>(4, "message4")

Streams topology:

  KStream<Integer, String> input =
      builder.stream(Serdes.Integer(), Serdes.String(), inputTopic);
  KTable<String, Long> counted = input
      .map((key, value) -> KeyValue.pair(value, value))
      .countByKey(Serdes.String(), "counted");
  counted.to(Serdes.String(), Serdes.Long(), outputTopic);

Output:
  new KeyValue<>("message1", 1L),
  new KeyValue<>("message1", 2L),
  new KeyValue<>("message2", 1L),
  new KeyValue<>("message3", 1L),
  new KeyValue<>("message4", 1L)
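
To illustrate why the two readings of the same input differ, here is a rough plain-Java simulation of the two counting semantics. It is only a sketch and not how Kafka Streams implements them; the class and method names are made up, and the subtract-then-add step is my reading of what a table update for an existing key amounts to.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamVsTableCount {
    // Record-stream semantics: every record is an independent event, so each one counts.
    public static Map<String, Long> countAsStream(List<Map.Entry<Integer, String>> records) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> r : records) {
            counts.merge(r.getValue(), 1L, Long::sum);
        }
        return counts;
    }

    // Changelog (table) semantics: a record with an already-seen key replaces the old
    // value, so the old value's count is decremented before the new one is incremented.
    public static Map<String, Long> countAsTable(List<Map.Entry<Integer, String>> records) {
        Map<Integer, String> latest = new LinkedHashMap<>();
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> r : records) {
            String previous = latest.put(r.getKey(), r.getValue());
            if (previous != null) {
                counts.merge(previous, -1L, Long::sum);
            }
            counts.merge(r.getValue(), 1L, Long::sum);
        }
        return counts;
    }
}
```

With the five input records above, the stream-style count for "message1" ends at 2, while the table-style count ends at 1 because the second record for key `1` only replaces the first.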

Does that help?
Michael




On Thu, Jun 16, 2016 at 4:20 PM, Davood Rafiei 
wrote:

> Hi,
>
>
> I am trying to use groupby operator in simple example. However, I get
> strange results.
>
> I have inputs on the "test" topic like: (Long, String)
> 1 Message_1
> 1 Message_1
> 2 Message_2
> 3 Message_3
> 4 Message_4
>
> I want to get counts of each value. So:
> Message_1 2
> Message_1 1
> Message_2 1
> Message_3 1
> Message_4 1
>
> Because there is not any operator like groupby (fieldIndex), I assume that
> groupby works always on keys.
>
> So, my program is:
>
>   KTable<Long, String> source = builder.table(longSerde, stringSerde, "test");
>   KTable<String, Long> counts = source.groupBy(
>       new KeyValueMapper<Long, String, KeyValue<String, String>>() {
>           @Override
>           public KeyValue<String, String> apply(Long key, String value) {
>               // Re-key each record by its value so that count() groups by value
>               return KeyValue.pair(value, value);
>           }
>       }, Serdes.String(), Serdes.String()).count("count");
>   counts.print();
>
> And I get this output as a result:
>
> Message_1 1
> Message_1 0
> Message_1 1
> Message_1 0
> Message_2 1
> Message_2 0
> Message_3 1
> Message_3 0
> Message_4 1
> Message_4 0
>
> I couldn't  understand this behavior.
>
>
> Cheers
> Davood
>


Re: WordCount example errors

2016-06-16 Thread Michael Noll
Jeyhun,

just to confirm for you: Kafka Streams only works with Kafka 0.10 brokers
[1].

Best,
Michael

[1]
http://docs.confluent.io/3.0.0/streams/faq.html#can-i-use-kafka-streams-with-kafka-clusters-running-0-9-0-8-or-0-7



On Tue, Jun 14, 2016 at 3:03 PM, Jeyhun Karimov 
wrote:

> Thanks for reply. I am using the recent version of kafka and streams
> library. I forked them from github. @Peter, yes probably this is due to
> versions.
> The problem was that, I already installed kafka on OSX with brew and when I
> tried to run, probably it linked to that library instead of the one I
> forked from github. I deleted the installed library, now it works both from
> IDE and terminal.
>
> Thanks
>
> On Tue, Jun 14, 2016 at 2:47 PM Peter Davis  wrote:
>
> > This looks like the error that occurs when you use the 0.10 client with
> an
> > 0.9 broker.  The broker needs to be upgraded first.  Jeyhun, what
> versions
> > are you running?
> >
> > (I sincerely hope this error message will be improved next time there are
> > wire protocol changes!)
> >
> > -Peter
> >
> >
> > > On Jun 14, 2016, at 4:19 AM, Eno Thereska 
> > wrote:
> > >
> > > HI Jeyhun,
> > >
> > > What version of Kafka are you using?
> > >
> > > I haven't run this using Eclipse, but could you try building and
> running
> > from the command line (instead of from within Eclipse) as described in
> that
> > quickstart document? Does that work?
> > >
> > > Thanks
> > > Eno
> > >
> > >> On 13 Jun 2016, at 20:27, Jeyhun Karimov 
> wrote:
> > >>
> > >> Hi Community,
> > >>
> > >> I am new to kafka-streams and trying to run WordCount example.
> > >> I get this error whatever I do:
> > >>
> > >> Exception in thread "StreamThread-1"
> > >> org.apache.kafka.common.protocol.types.SchemaException: Error reading
> > field
> > >> 'topic_metadata': Error reading array of size 1209204, only 46 bytes
> > >> available
> > >>
> > >> at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
> > >>
> > >> at org.apache.kafka.clients.NetworkClient.parseResponse(
> > >> NetworkClient.java:380)
> > >>
> > >> at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(
> > >> NetworkClient.java:449)
> > >>
> > >> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
> > >>
> > >> at
> > >>
> >
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(
> > >> ConsumerNetworkClient.java:360)
> > >>
> > >> at
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> > >> ConsumerNetworkClient.java:224)
> > >>
> > >> at
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> > >> ConsumerNetworkClient.java:192)
> > >>
> > >> at
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> > >> ConsumerNetworkClient.java:163)
> > >>
> > >> at
> > >>
> >
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(
> > >> AbstractCoordinator.java:179)
> > >>
> > >> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(
> > >> KafkaConsumer.java:973)
> > >>
> > >> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > >> KafkaConsumer.java:937)
> > >>
> > >> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > >> StreamThread.java:295)
> > >>
> > >> at org.apache.kafka.streams.processor.internals.StreamThread.run(
> > >> StreamThread.java:218)
> > >>
> > >>
> > >> I am running exactly the same code WordCountDemo from github. I
> > downloaded
> > >> all libraries, build with gradle for eclipse, imported them to eclipse
> > and
> > >> run from there.
> > >> I initialized the zookeper and kafka-server and created kafka brooker
> as
> > >> described in http://docs.confluent.io/3.0.0/streams/quickstart.html.
> > >>
> > >> I am using Eclipse Scala IDE 3.0 .
> > >>
> > >>
> > >> Cheers
> > >> Jeyhun
> > >> --
> > >> -Cheers
> > >>
> > >> Jeyhun
> > >
> >
> --
> -Cheers
>
> Jeyhun
>


Re: Kafka without Storm/Spark

2016-07-01 Thread Michael Noll
Numan,

you may also want to take a look at Kafka Streams, which is a new stream
processing library that's included in Apache Kafka since version 0.10.
Kafka Streams is definitely more convenient and quicker to implement than
the "normal" Kafka producer/consumer clients.  Also, Kafka Streams does not
require a separate processing cluster like Spark or Storm -- you only need
the Kafka cluster to read data from / to write data to.

> My question is: Is it possible to consume messages from a Kafka topic in
> real-time and write directly into another topic without using any streaming
> technology such as storm or spark? If yes, do you have any examples to do
> that in Java?

There's one specific example that does exactly this:
https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/PassThroughIntegrationTest.java

Here are the key snippets that you would be using:

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
"pass-through-integration-test");
//...more configs here...

KStreamBuilder builder = new KStreamBuilder();
// Write the input data as-is to the output topic.
builder.stream("my-input-topic").to("my-output-topic");
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

Of course you can also transform the messages easily prior to writing them
to the output topic.  See the other examples/references down below.

Hope this helps,
Michael


Kafka Streams docs:
http://kafka.apache.org/documentation.html#streams
http://docs.confluent.io/3.0.0/streams/index.html

Some Kafka Streams examples:
https://github.com/confluentinc/examples/tree/master/kafka-streams

Some Kafka Streams articles:
http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple



On Fri, Jul 1, 2016 at 4:29 PM, numangoceri 
wrote:

> Hi,
>
> Thanks for your answer. I meant actually if we can verify the data
> reliability without using Storm or Spark. Because i know that by using
> Storm, you can guarantee the messages (depending on the type of the
> Topology) such as exactly once, at least once.
> If i simply use kafka consumer and another producer to forward the
> messages, could the data tranfer completely be guaranteed as well?
>
>
> Numan Göceri
>
> ---
>
> Rakesh Vidyadharan  wrote:
>
> >Definitely.  You can read off kafka using the samples shown in
> KafkaConsumer javadoc, transform if necessary and publish to the
> destination topic.
> >
> >
> >
> >
> >On 01/07/2016 03:24, "numan goceri" 
> wrote:
> >
> >>Hello everyone,
> >>I've a quick question:I'm using Apache Kafka producer to write the
> messages into a topic. My source at the moment a csv file but in the future
> i am supposed to read the messages from another kafka topic.My question
> is:Is it possible to consume messages from a Kafka topic in real-time and
> write directly into another topic without using any streaming technology
> such as storm or spark? If yes, do you have any examples to do that in Java?
> >>To sum up, it should be looking like this:Kafka reads from topic
> "kafkaSource" and writes into the topic "kafkaResult".
> >>
> >>Thanks in advance and Best Regards, Numan
>



-- 
Best regards,
Michael Noll



*Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download
<http://www.confluent.io/download>*


Re: Kafka without Storm/Spark

2016-07-01 Thread Michael Noll
> Because i know that by using Storm, you can guarantee the messages
> (depending on the type of the Topology) such as exactly once, at least
> once.  If i simply use kafka consumer and another producer to forward the
> messages, could the data tranfer completely be guaranteed as well?

Addendum:  If you use Kafka Streams, you have at-least-once processing
semantics.  So you do not lose data in the face of failures.
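
To illustrate what "at-least-once" means in practice, here is a toy model in
plain Java (no Kafka involved; all names are made up).  It assumes the usual
pattern of processing a record first and committing its offset afterwards,
and it simulates a single crash between those two steps.  It is a sketch of
the guarantee, not of how Kafka Streams works internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model: process first, commit afterwards; a crash in between causes a replay.
final class AtLeastOnceSketch {

    // Processes all records, "crashing" once right after processing index crashAt
    // (before the commit).  Pass a negative crashAt for a run without failures.
    static List<String> run(List<String> records, int crashAt) {
        List<String> processed = new ArrayList<>();
        int committed = 0;       // next offset to read after a (re)start
        boolean crashed = false;
        while (committed < records.size()) {
            int offset = committed;             // resume from the last committed offset
            processed.add(records.get(offset)); // the processing side effect happens first
            if (offset == crashAt && !crashed) {
                crashed = true;                 // crash before commit: offset stays uncommitted
                continue;                       // "restart": loop resumes at the same offset
            }
            committed = offset + 1;             // commit only after processing succeeded
        }
        return processed;
    }

    public static void main(String[] args) {
        // Record "b" is processed twice, but no record is lost.
        System.out.println(run(Arrays.asList("a", "b", "c"), 1));
    }
}
```

The replayed record is the price of not losing data: downstream consumers
may see duplicates after a failure and should be able to tolerate them.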

On Fri, Jul 1, 2016 at 4:29 PM, numangoceri 
wrote:

> Hi,
>
> Thanks for your answer. I meant actually if we can verify the data
> reliability without using Storm or Spark. Because i know that by using
> Storm, you can guarantee the messages (depending on the type of the
> Topology) such as exactly once, at least once.
> If i simply use kafka consumer and another producer to forward the
> messages, could the data tranfer completely be guaranteed as well?
>
>
> Numan Göceri
>
> ---
>
> Rakesh Vidyadharan  wrote:
>
> >Definitely.  You can read off kafka using the samples shown in
> KafkaConsumer javadoc, transform if necessary and publish to the
> destination topic.
> >
> >
> >
> >
> >On 01/07/2016 03:24, "numan goceri" 
> wrote:
> >
> >>Hello everyone,
> >>I've a quick question:I'm using Apache Kafka producer to write the
> messages into a topic. My source at the moment a csv file but in the future
> i am supposed to read the messages from another kafka topic.My question
> is:Is it possible to consume messages from a Kafka topic in real-time and
> write directly into another topic without using any streaming technology
> such as storm or spark? If yes, do you have any examples to do that in Java?
> >>To sum up, it should be looking like this:Kafka reads from topic
> "kafkaSource" and writes into the topic "kafkaResult".
> >>
> >>Thanks in advance and Best Regards, Numan
>


Re: documenting Kafka Streams PageView examples

2016-07-06 Thread Michael Noll
Phil,

I suggest asking this question on the Confluent Platform mailing list
because you're referring to code under
https://github.com/confluentinc/examples (i.e. code that is not part of the
Apache Kafka project).

Best,
Michael


On Tue, Jul 5, 2016 at 5:34 PM, Philippe Derome  wrote:

> Would anyone with a good understanding of serialization be available to
> enhance documentation of the Apache Streams examples? I mean specifically:
> PageViewTypedDemo, PageViewUntypedDemo in package org
> .apache.kafka.streams.examples.pageview
>
> I'd be happy to run them with Confluent Platform 3 producer/consumer shell
> scripts but would need guidance as to how to invoke them and how to specify
> some input file (or stdin format).
>
> That would help me better understand how to get serialization and streams
> to work together.
>
> Thanks,
>
> Phil Derome
>



-- 
Best regards,
Michael Noll





Re: documenting Kafka Streams PageView examples

2016-07-06 Thread Michael Noll
Correction:  Just realized that I misread your message.  You are indeed
referring to the code examples in Apache Kafka. ;-)

On Wed, Jul 6, 2016 at 11:35 AM, Michael Noll  wrote:

> Phil,
>
> I suggest to ask this question in the Confluent Platform mailing list
> because you're referring to code under
> https://github.com/confluentinc/examples (i.e. code that is not part of
> the Apache Kafka project).
>
> Best,
> Michael
>
>
> On Tue, Jul 5, 2016 at 5:34 PM, Philippe Derome 
> wrote:
>
>> Would anyone with a good understanding of serialization be available to
>> enhance documentation of the Apache Streams examples? I mean specifically:
>> PageViewTypedDemo, PageViewUntypedDemo in package org
>> .apache.kafka.streams.examples.pageview
>>
>> I'd be happy to run them with Confluent Platform 3 producer/consumer shell
>> scripts but would need guidance as to how to invoke them and how to
>> specify
>> some input file (or stdin format).
>>
>> That would help me better understand how to get serialization and streams
>> to work together.
>>
>> Thanks,
>>
>> Phil Derome
>>
>
>
>
> --
> Best regards,
> Michael Noll
>
>
>
> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> <%2B1%20650.453.5860>Download Apache Kafka and Confluent Platform:
> www.confluent.io/download <http://www.confluent.io/download>*
>



-- 
Best regards,
Michael Noll





Re: Question on partitions while consuming multiple topics

2016-07-06 Thread Michael Noll
Snehal beat me to it, as my suggestion would have also been to take a look
at Kafka Streams. :-)  Kafka Streams should be the easiest way to achieve
what you're describing.  Snehal's links are good starting points.

Further pointers are:

https://github.com/confluentinc/examples/blob/master/kafka-streams/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java

https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/JoinLambdaIntegrationTest.java

Both of these examples demonstrate how to work on topics that have the same
key (here: a user id).

-Michael


On Wed, Jul 6, 2016 at 8:44 AM, Snehal Nagmote 
wrote:

> Just an update, as I was reading about Kafka Streams, this functionality is
> by default supported with Kafka Streams Library.
> Following links are really helpful
>
>
> http://docs.confluent.io/3.0.0/streams/developer-guide.html#partition-grouper
>
> https://github.com/apache/kafka/blob/0.10.0/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
>
> (Kafka Stream is supported with 0.10.0)
>
> Thanks,
> Snehal
>
> On 5 July 2016 at 11:47, Snehal Nagmote  wrote:
>
> > Hello Yardena ,
> >
> > You may want to take a look at manual assignment for partitions section
> > mentioned here ,
> >
> >
> http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client
> >  .
> >
> > However I have not tried using this for multiple topics , but looking at
> > api , it should be doable.
> >
> > You have to use same partitioning method which was used by producer to
> > determine the correct partition for consumer process for multiple topics.
> >
> > Note that , you would lose the ordering guarantee with this approach
> since
> > Kafka guarantees ordering within partition for a single topic ,
> >
> > Thanks,
> > Snehal
> >
> >
> > On 4 July 2016 at 07:50, Yardena Meymann  wrote:
> >
> >> Hi,
> >>
> >> We have several topics, same number of partitions for each, same key
> used
> >> for all topics.
> >> We also have several processes consuming the topics (one consumer
> group).
> >> What we wish would happen is that messages with the same key would end
> up
> >> consumed by the same process, regardless of the topic.
> >> Can it be achieved with Kafka? What is needed for that?
> >>
> >> Thanks in advance,
> >>   Yardena
> >>
> >
> >
>


Re: Question on partitions while consuming multiple topics

2016-07-06 Thread Michael Noll
PS:  The previous example links that I shared are for the latest `trunk`
version of Kafka.  If you want to use the latest official release instead
(Kafka 0.10.0.0), which most probably is what you want, then please use the
following links to these examples.  Note the `kafka-0.10.0.0-cp-3.0.0`
branch identifier in the urls, which stands for Kafka 0.10.0.0 release and
Confluent Platform 3.0.0 release.

https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java

https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/confluent/examples/streams/JoinLambdaIntegrationTest.java

-Michael



On Wed, Jul 6, 2016 at 11:49 AM, Michael Noll  wrote:

> Snehal beat me to it, as my suggestion would have also been to take a look
> at Kafka Streams. :-)  Kafka Streams should be the easiest way to achieve
> what you're describing.  Snehal's links are good starting points.
>
> Further pointers are:
>
>
> https://github.com/confluentinc/examples/blob/master/kafka-streams/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java
>
>
> https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/JoinLambdaIntegrationTest.java
>
> Both of these examples demonstrate how to work on topics that have the
> same key (here: a user id).
>
> -Michael
>
>
> On Wed, Jul 6, 2016 at 8:44 AM, Snehal Nagmote 
> wrote:
>
>> Just an update, as I was reading about Kafka Streams, this functionality
>> is
>> by default supported with Kafka Streams Library.
>> Following links are really helpful
>>
>>
>> http://docs.confluent.io/3.0.0/streams/developer-guide.html#partition-grouper
>>
>> https://github.com/apache/kafka/blob/0.10.0/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
>>
>> (Kafka Stream is supported with 0.10.0)
>>
>> Thanks,
>> Snehal
>>
>> On 5 July 2016 at 11:47, Snehal Nagmote  wrote:
>>
>> > Hello Yardena ,
>> >
>> > You may want to take a look at manual assignment for partitions section
>> > mentioned here ,
>> >
>> >
>> http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client
>> >  .
>> >
>> > However I have not tried using this for multiple topics , but looking at
>> > api , it should be doable.
>> >
>> > You have to use same partitioning method which was used by producer to
>> > determine the correct partition for consumer process for multiple
>> topics.
>> >
>> > Note that , you would lose the ordering guarantee with this approach
>> since
>> > Kafka guarantees ordering within partition for a single topic ,
>> >
>> > Thanks,
>> > Snehal
>> >
>> >
>> > On 4 July 2016 at 07:50, Yardena Meymann  wrote:
>> >
>> >> Hi,
>> >>
>> >> We have several topics, same number of partitions for each, same key
>> used
>> >> for all topics.
>> >> We also have several processes consuming the topics (one consumer
>> group).
>> >> What we wish would happen is that messages with the same key would end
>> up
>> >> consumed by the same process, regardless of the topic.
>> >> Can it be achieved with Kafka? What is needed for that?
>> >>
>> >> Thanks in advance,
>> >>   Yardena
>> >>
>> >
>> >
>>
>
>
>


-- 
Best regards,
Michael Noll





Re: Kafka Streams : Old Producer

2016-07-08 Thread Michael Noll
Vivek,

in this case you should manually embed a timestamp within the payload of
the produced messages (e.g. as a Long field in an Avro-encoded message
value).  This would need to be done by the producer.

Then, in Kafka Streams, you'd need to implement a custom
TimestampExtractor that can retrieve this timestamp from the message
payload. And you need to configure your StreamsConfig to use that custom
timestamp extractor.
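
As a rough sketch of the extraction step only: assume (hypothetically) that
the producer prepends the event time as an 8-byte big-endian long to each
message value.  The helper below, with made-up names, writes and reads that
prefix using only the JDK.  A custom TimestampExtractor would call something
like `extract` on the record value and return the result; with Avro you
would instead read the Long field from the decoded record.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical payload layout: [8-byte big-endian timestamp][actual message bytes].
final class EmbeddedTimestampSketch {

    // Producer side: prepend the timestamp to the original payload.
    static byte[] embed(long timestampMs, byte[] payload) {
        return ByteBuffer.allocate(Long.BYTES + payload.length)
                .putLong(timestampMs)
                .put(payload)
                .array();
    }

    // Streams side: read the timestamp back; returns -1 if the value is too short.
    static long extract(byte[] value) {
        if (value == null || value.length < Long.BYTES) {
            return -1L;
        }
        return ByteBuffer.wrap(value).getLong();
    }

    public static void main(String[] args) {
        byte[] value = embed(1467936000000L, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(extract(value));
    }
}
```

How to treat values without a usable timestamp (the -1 case here) is a
decision the extractor has to make explicitly.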

Hope this helps,
Michael



On Thursday, July 7, 2016, vivek thakre  wrote:

> Thats right Ismael, I am looking for work arounds either on 0.9.0.1
> Producer side or on the Kafka Streams side so that I can process messages
> produced by 0.9.0.1 producer using Kafka Streams Library.
>
> Thanks,
> Vivek
>
> On Thu, Jul 7, 2016 at 9:05 AM, Ismael Juma  > wrote:
>
> > Hi,
> >
> > Matthias, I think Vivek's question is not whether Kafka Streams can be
> used
> > with a Kafka 0.9 broker (which it cannot). The question is whether Kafka
> > Streams can process messages produced with a 0.9.0.1 producer into a
> > 0.10.0.0 broker. Is that right? If so, would a custom TimestampExtractor
> > work?
> >
> > Ismael
> >
> > On Thu, Jul 7, 2016 at 12:29 PM, Matthias J. Sax  >
> > wrote:
> >
> > > Hi Vivek,
> > >
> > > Kafka Streams works only with Kafka 0.10 (but not with 0.9).
> > >
> > > I am not aware of any work around to allow for 0.9 usage.
> > >
> > >
> > > -Matthias
> > >
> > > On 07/07/2016 05:37 AM, vivek thakre wrote:
> > > > Can kafka streams library work with the messages produced by 0.9.0.1
> > > > producer?
> > > > I guess not since the old producer would not add timestamp. ( I am
> > > getting
> > > > invalid timestamp exception)
> > > >
> > > > As I cannot change our producer application setup, I have to use
> > 0.9.0.1
> > > > producer.
> > > > Is there a workaround that I can try to use Kafka Streams?
> > > >
> > > > Thanks,
> > > > Vivek
> > > >
> > >
> > >
> >
>


-- 
Best regards,
Michael Noll





Re: Contribution : KafkaStreams CEP library

2016-07-11 Thread Michael Noll
Thanks for sharing, Florian!

-Michael


On Fri, Jul 8, 2016 at 6:48 PM, Florian Hussonnois 
wrote:

> Hi All,
>
> Since a few weeks I'm working for fun on a CEP library on top of
> KafkaStreams.
> There is some progress and I think my project start to look something, or
> at least I hope ;)
>
> https://github.com/fhussonnois/kafkastreams-cep
>
> So I'm pleased to share it with you (I already shared it with dev mailing
> list but I just realised that I've forgotten to add the user list ^^ ).
>
> Currently, I'm looking to test my library against real use-cases. If some
> of you test it please I would appreciate any feedback.
>
> Any contribution is welcome. I'm sure this project can be improved in many
> ways.
>
> Thank in advance,
>
> --
> Florian HUSSONNOIS
>


Re: Kafka Consumer for Real-Time Application?

2016-07-12 Thread Michael Noll
To explain what you're seeing:  After you have run a consumer application
once, it will have stored its latest consumer offsets in Kafka.  Upon
restart -- e.g. after a brief shutdown time as you mentioned -- the
consumer application will continue processing from the point of these
stored consumer offsets.

The auto.offset.reset setting that Snehal mentioned above takes effect if
and only if there are *no* consumer offsets stored in Kafka yet (i.e. the
typical situation where auto.offset.reset does take effect is if you are
starting a consumer application for the very first time).  This means that
setting auto.offset.reset=latest won't be sufficient to solve your problem.

To solve your problem you also need to do one of the following, in addition
to setting auto.offset.reset=latest:

1. Delete the consumer offsets / the group (= group.id) of your consumer
application and start fresh.  Kafka's `kafka-consumer-groups.sh` command
allows you to delete the stored consumer offsets / the group (if you are
using the Confluent Platform, the command is called `kafka-consumer-groups`,
i.e. it does not have the `.sh` suffix).  This is the approach that I would
recommend.

2. Alternatively, as a crude workaround, you could also change the group.id
setting of your consumer application whenever you restart it.  Changing the
group.id is, in this case, a workaround to starting the processing "from
scratch", because using a new, never-used-before group.id implies that
there are no stored consumer offsets in Kafka from previous runs.
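
For illustration, workaround 2 could look roughly like the following when
building the consumer configuration.  Only `java.util.Properties` is used
here; the class name, the `my-app` prefix and the broker address are
placeholders, while `bootstrap.servers`, `group.id` and `auto.offset.reset`
are the standard consumer setting names.

```java
import java.util.Properties;
import java.util.UUID;

// Sketch only: builds consumer settings with a never-used-before group.id on every start.
final class FreshGroupConfigSketch {

    static Properties freshConsumerConfig(String bootstrapServers, String groupPrefix) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // A random suffix means no committed offsets exist for this group yet ...
        props.put("group.id", groupPrefix + "-" + UUID.randomUUID());
        // ... so auto.offset.reset applies and the consumer starts at the latest offsets.
        props.put("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = freshConsumerConfig("localhost:9092", "my-app");
        System.out.println(props.getProperty("group.id"));
    }
}
```

The resulting Properties object would then be handed to the consumer
together with the usual deserializer settings.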

For your convenience I copy-pasted the help display of
`kafka-consumer-groups.sh` below.  If your consumer application uses
Kafka's "new" consumer client, you must set the `--bootstrap-server` CLI
option.  If you are using the old consumer client, you must set the
`--zookeeper` CLI option.

Hope this helps,
Michael


$ ./kafka-consumer-groups
List all consumer groups, describe a consumer group, or delete consumer
group info.
Option                          Description
------                          -----------
--bootstrap-server <server to   REQUIRED (only when using new-consumer):
  connect to>                     The server to connect to.
--command-config <command       Property file containing configs to be
  config property file>           passed to Admin Client and Consumer.
--delete                        Pass in groups to delete topic partition
                                  offsets and ownership information over
                                  the entire consumer group. For instance
                                  --group g1 --group g2
                                Pass in groups with a single topic to
                                  just delete the given topic's partition
                                  offsets and ownership information for
                                  the given consumer groups. For instance
                                  --group g1 --group g2 --topic t1
                                Pass in just a topic to delete the given
                                  topic's partition offsets and ownership
                                  information for every consumer group.
                                  For instance --topic t1
                                WARNING: Group deletion only works for
                                  old ZK-based consumer groups, and one
                                  has to use it carefully to only delete
                                  groups that are not active.
--describe                      Describe consumer group and list offset
                                  lag related to given group.
--group <consumer group>        The consumer group we wish to act on.
--list                          List all consumer groups.
--new-consumer                  Use new consumer.
--topic <topic>                 The topic whose consumer group
                                  information should be deleted.
--zookeeper <urls>              REQUIRED (unless new-consumer is used):
                                  The connection string for the zookeeper
                                  connection in the form host:port.
                                  Multiple URLS can be given to allow
                                  fail-over.



On Tue, Jul 12, 2016 at 3:40 AM, BYEONG-GI KIM  wrote:

> Thank you for the reply.
>
> I thought that was what I found, but unfortunately wasn't.
>
> The previous messages still be consumed while the consumer has been
> re-executed with a few shutdown time...
>
>
>
> 2016-07-12 9:54 GMT+09:00 Snehal Nagmote :
>
> > Hello *,*
> >
> > If I understand your question correctly , what you are looking for is a
> > setting in consumer which will only read latest messages .
> >
> > auto.offset.reset =

Re: NetFlow metrics to Kafka

2016-07-13 Thread Michael Noll
Mathieu,

yes, this is possible.  In a past project of mine we have been doing this,
though I wasn't directly involved with coding the Cisco-Kafka part.  As far
as I know there aren't ready-to-use Netflow connectors available (for Kafka
Connect), so you most probably have to write your own connector and/or
application to bridge Cisco and Kafka.

-Michael


On Tue, Jul 12, 2016 at 4:50 PM, OZERAY MATHIEU 
wrote:

> Hello,
>
>
> I have a question about Kafka.
>
> Actually, I produce NetFlow metrics on my Cisco router. I want know if
> it's possible to send NetFlow metrics to Kafka broker to resend this in
> Logstash server ?
>
> Thanks for your answer.
>
> Have a nice day.
>
>
> Mathieu OZERAY
>





Re: KTable DSL join

2016-07-14 Thread Michael Noll
Srikant,

> Its not a case for join() as the keys don't match. Its more a lookup
> table.

Yes, the semantics of streaming joins in Kafka Streams are a bit different
from joins in a traditional RDBMS.
See
http://docs.confluent.io/3.0.0/streams/developer-guide.html#joining-streams.

Also, a heads up:  It turned out that user questions around joins in Kafka
Streams are pretty common.  We are currently working on improving the
documentation for joins to make this more clear.
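
To pin down the result you are after, here is the lookup from your example
modelled in plain Java, without any Kafka classes (the class and method
names are made up).  It only shows the intended semantics: the join
attribute is the *value* of Table1, which is why the keys of the two tables
do not line up as they are.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of the desired result; not a Kafka Streams join.
final class LookupJoinSketch {

    // For each (id -> code) entry in table1, look up code in table2 and emit (id -> match).
    static Map<Integer, Integer> lookupJoin(Map<Integer, String> table1,
                                            Map<String, Integer> table2) {
        Map<Integer, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> entry : table1.entrySet()) {
            // The lookup uses table1's VALUE as the key into table2.
            Integer match = table2.get(entry.getValue());
            if (match != null) {
                result.put(entry.getKey(), match);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> table1 = new LinkedHashMap<>();
        table1.put(111, "aaa");
        table1.put(222, "bbb");
        table1.put(333, "aaa");
        Map<String, Integer> table2 = new LinkedHashMap<>();
        table2.put("aaa", 999);
        table2.put("bbb", 888);
        table2.put("ccc", 777);
        // Yields 111 -> 999, 222 -> 888, 333 -> 999, as in the quoted message.
        System.out.println(lookupJoin(table1, table2));
    }
}
```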

Best wishes,
Michael



On Thu, Jul 14, 2016 at 10:12 AM, Matthias J. Sax 
wrote:

> I would recommend re-partitioning as described in Option 2.
>
> -Matthias
>
> On 07/13/2016 10:53 PM, Srikanth wrote:
> > Hello,
> >
> > I'm trying the following join using KTable. There are two change log
> tables.
> > Table1
> >   111 -> aaa
> >   222 -> bbb
> >   333 -> aaa
> >
> > Table2
> >   aaa -> 999
> >   bbb -> 888
> >   ccc -> 777
> >
> > My result table should be
> >   111 -> 999
> >   222 -> 888
> >   333 -> 999
> >
> > Its not a case for join() as the keys don't match. Its more a lookup
> table.
> >
> > Option1 is to use a Table1.toStream().process(ProcessSupplier(),
> > "storeName")
> > punctuate() will use regular kafka consumer that reads updates from
> Table2
> > and updates a private map.
> > Process() will do a key-value lookup.
> > This has an advantage when Table1 is much larger than Table2.
> > Each instance of the processor will have to hold entire Table2.
> >
> > Option2 is to re-partition Table1 using through(StreamPartitioner) and
> > partition using value.
> > This will ensure co-location. Then join with Table2. This part might be
> > tricky??
> >
> > Your comments and suggestions are welcome!
> >
> > Srikanth
> >
>
>




Re: Kafka Streams reducebykey and tumbling window - need final windowed KTable values

2016-07-14 Thread Michael Noll
Srikanth,

> This would be useful in place where we use a key-value store just to
> duplicate a KTable for get() operations.
> Any rough idea when this is targeted for release?

We are aiming to add the queryable state feature into the next release of
Kafka.


> Its still not clear how to use this for the case this thread was started
> for.
> Does Kafka Stream keep windows alive forever?
> At some point we need to "complete" a window rt?

Kafka Streams keeps windows alive until the so-called window retention
period expires.

Excerpt from
http://docs.confluent.io/3.0.0/streams/developer-guide.html#windowing-a-stream
:

    [For the DSL only]: A local state store is usually needed for a
    windowing operation to store recently received records based on the
    window interval, while old records in the store are purged after the
    specified window retention period.  The retention time can be set via
    `Windows#until()`.

Excerpt from
http://docs.confluent.io/3.0.0/streams/concepts.html#streams-concepts-windowing
:

    Windowing operations are available in the Kafka Streams DSL, where
    users can specify a retention period for the window. This allows Kafka
    Streams to retain old window buckets for a period of time in order to
    wait for the late arrival of records whose timestamps fall within the
    window interval. If a record arrives after the retention period has
    passed, the record cannot be processed and is dropped.


> Either based on processing time or event time + watermark, etc.

The time semantics are based on the timestamp extractor you have configured
for your application.  The default timestamp extractor is
`ConsumerRecordTimestampExtractor`, which yields event-time semantics.  If
you want processing-time semantics, you need to configure your application
to use the `WallclockTimestampExtractor`.
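
A small sketch of that configuration step, using plain `java.util.Properties`.
It assumes the setting name `timestamp.extractor` (the key behind
`StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG`) and the fully qualified
class name `org.apache.kafka.streams.processor.WallclockTimestampExtractor`;
please double-check both against the Kafka version you are running.  The
class name `ProcessingTimeConfigSketch` and the application id are
placeholders.

```java
import java.util.Properties;

// Sketch only: switches a Streams configuration to processing-time semantics.
final class ProcessingTimeConfigSketch {

    // Copies the given settings and overrides the timestamp extractor.
    static Properties withProcessingTime(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("timestamp.extractor",
                "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "my-windowed-app"); // placeholder application id
        System.out.println(withProcessingTime(base).getProperty("timestamp.extractor"));
    }
}
```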

Hope this helps,
Michael




On Wed, Jul 13, 2016 at 8:19 PM, Srikanth  wrote:

> Thanks.
>
> This would be useful in place where we use a key-value store just to
> duplicate a KTable for get() operations.
> Any rough idea when this is targeted for release?
>
> Its still not clear how to use this for the case this thread was started
> for.
> Does Kafka Stream keep windows alive forever?
> At some point we need to "complete" a window rt? Either based on processing
> time or event time + watermark, etc.
> How can we tie internal state store query with window completion? i.e, get
> the final value.
>
> Srikanth
>
> On Thu, Jul 7, 2016 at 2:05 PM, Eno Thereska 
> wrote:
>
> > Hi Srikanth, Clive,
> >
> > Today we just added some example code usage in the KIP after feedback
> from
> > the community. There is code that shows how to access a WindowStore (in
> > read-only mode).
> >
> > Thanks
> > Eno
> >
> >
> > > On 7 Jul 2016, at 15:57, Srikanth  wrote:
> > >
> > > Eno,
> > >
> > > I was also looking for something similar. To output aggregate value
> once
> > > the window is "complete".
> > > I'm not sure getting individual update for an aggregate operator is
> that
> > > useful.
> > >
> > > With KIP-67, will we have access to Windowed[key]( key + timestamp) and
> > > value?
> > > Does until() clear this store when time passes?
> > >
> > > Srikanth
> > >
> > > On Thu, Jun 30, 2016 at 4:27 AM, Clive Cox
>  > >
> > > wrote:
> > >
> > >> Hi Eno,
> > >> I've looked at KIP-67. It looks good but its not clear what calls I
> > would
> > >> make to do what I presently need: Get access to each windowed store at
> > some
> > >> time soon after window end time. I can then use the methods specified
> to
> > >> iterate over keys and values. Can you point me to the relevant
> > >> method/technique for this?
> > >>
> > >> Thanks,
> > >> Clive
> > >>
> > >>
> > >>On Tuesday, 28 June 2016, 12:47, Eno Thereska <
> > eno.there...@gmail.com>
> > >> wrote:
> > >>
> > >>
> > >> Hi Clive,
> > >>
> > >> As promised, here is the link to the KIP that just went out today.
> > >> Feedback welcome:
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
> > >> <
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67:+Queryable+state+for+Kafka+Streams
> > >>>
> > >>
> > >> Thanks
> > >> Eno
> > >>
> > >>> On 27 Jun 2016, at 20:56, Eno Thereska 
> wrote:
> > >>>
> > >>> Hi Clive,
> > >>>
> > >>> We are working on exposing the state store behind a KTable as part of
> > >> allowing for queries to the structures currently hidden behind the
> > language
> > >> (DSL). The KIP should be out today or tomorrow for you to have a look.
> > You
> > >> can probably do what you need using the low-level processor API but
> then
> > >> you'd lose the benefits of the DSL and would have to maintain your own
> > >> structures.
> > >>>
> > >>> Thanks,
> > >>> Eno
> > >>>
> >  On 26 Jun 2016, at 18:42, Clive Cox 
> > >> wrote:
> > 
> >  Following on from this thread, if I want to iterate over a KTable at
> > >> the end of its hopping/tumbling Time Window how can I do 

Re: KStream-to-KStream Join Example

2016-07-14 Thread Michael Noll
Vivek,

another option for you is to replace the `map()` calls with `mapValues()`.
Can you give that a try?

Background:  You are calling `map()` on your two input streams, but in
neither of the two cases are you actually changing the key (it is always
the userId before the map, and it stays the userId after the map), see
below.  However, Kafka Streams can't deduce that you are effectively not
changing the key during the `map()`.  If you use `mapValues()`, then by
definition you are only changing values but not modifying the keys.

Idea:

  userClickStream
.filter((userId,record)-> userId != null)
.map((userId,record) -> new KeyValue<>(userId,1l));

  userEventStream
.filter((userId,record)-> userId != null)
.map((userId,record) -> new KeyValue<>(userId,JsonPath.read(record,
"$.event.page.channel").toString()));

should become something like:

  userClickStream
.filter((userId,record)-> userId != null)
.mapValues(record -> 1L);

  userEventStream
.filter((userId,record)-> userId != null)
.mapValues(record -> JsonPath.read(record,
"$.event.page.channel").toString());


Best,
Michael



On Thu, Jul 14, 2016 at 10:24 AM, Matthias J. Sax 
wrote:

> Hi,
>
> Both streams need to be co-partitioned, ie, if you change the key of one
> join input, you need to re-partitioned this stream on the new key via
> .through(). You should create the topic you use in through() manually,
> before you start your Kafka Streams application.
>
> (For future release, this re-partitioning will happen automatically.)
>
> In your case, if I did not miss anything, you need to re-partition both
> streams as you apply map() on both before the join.
>
> KStream streamA = stream1.map(...).through(...);
> KStream streamB = stream2.map(...).through(...);
>
> KStream joinResult = streamA.join(streamB, ...);
>
>
> -Matthias
>
> On 07/14/2016 03:26 AM, vivek thakre wrote:
> > Yes, there are same number of partitions to both the topic, also same
> > partition key i.e userId
> > If I just join the streams without applying the map functions (in this
> > case userClickStream
> > and userEventSrtream) , it works.
> >
> > Thanks,
> > Vivek
> >
> >
> > On Wed, Jul 13, 2016 at 4:53 PM, Philippe Derome 
> wrote:
> >
> >> Did you specify same number of partitions for the two input topics you
> are
> >> joining? I think that this is usually the first thing people ask to
> verify
> >> with errors similar to yours.
> >>
> >> If you are experimenting with learning some concepts, it is simpler to
> >> always use one partition for your topics.
> >> On 13 Jul 2016 7:40 p.m., "vivek thakre" 
> wrote:
> >>
> >>> Hello,
> >>>
> >>> I want to join 2 Topics (KStreams)
> >>>
> >>>
> >>> Stream 1
> >>> Topic :  userIdClicks
> >>> Key : userId
> >>> Value : JSON String with event details
> >>>
> >>> Stream 2
> >>> Topic :  userIdChannel
> >>> Key : userId
> >>> Value : JSON String  with event details and has channel value
> >>>
> >>> I could not find any examples with KStream-to-KStream Join.
> >>>
> >>> Here is my code
> >>>
> >>> //build stream userIdClicks
>  KStream userClickStream = builder.stream(stringSerde,
> >>> stringSerde,
>  "userClicks");
> 
> >>>
> >>>
>  //create stream -> < userId, 1 (count) >
>  KStream *userClickCountStream* =
> userClickStream.filter((
>  userId,record)-> userId != null) .map((userId,record) -> new
> >> KeyValue<>(
>  userId,1l));
> 
> >>>
> >>>
>  //build stream userChannelStream
>  KStream userEventStream = builder.stream(stringSerde,
>  stringSerde, "userEvents");
> 
> >>>
> >>>
>  //create stream  : extract channel value from json
> >>> string
>  KStream *userChannelStream* =  userEventStream
>  .filter((userId,record)-> userId != null)
>  .map((userId,record) -> new KeyValue<>(userId
>  ,JsonPath.read(record, "$.event.page.channel").toString()));
> 
> >>>
> >>>
>  //join *userClickCountStream* with
>  *userChannelStream*KTable clicksPerChannel =
>  userClickCountStream
>  .join(userChannelStream, new ValueJoiner  ChannelWithClicks>() {
>   @Override
>   public ChannelWithClicks apply(Long clicks, String
> >> channel)
> >>> {
>  return new ChannelWithClicks(channel == null ?
> >> "UNKNOWN"
>  : channel, clicks);
>   }
>   },
> >>> JoinWindows.of("ClicksPerChannelwindowed").after(3).before(3))
>  //30 secs before and after
>  .map((user, channelWithClicks) -> new
> >>> KeyValue<>(channelWithClicks
>  .getChannel(), channelWithClicks.getClicks()))
>  .reduceByKey(
>  (firstClicks, secondClicks) -> firstClicks +
>  secondClicks,
>   stringSerde, longSerde,
> >> "ClicksPerChannelUnwindowed"
>  );
> >>>
> >>> When I run this topology, I get an exception
> >>>
> >>> Invalid topology building: KSTREAM-MA

Re: Building API to make Kafka reactive

2016-07-22 Thread Michael Noll
Shekar,

you mentioned:

> The API should give different status at each part of the pipeline.
> At the ingestion, the API responds with "submitted"
> During the progression, the API returns "in progress"
> After successful completion, the API returns "Success"

May I ask what your motivation is to know the status of each part of the
pipeline?  Do you need this information to, say, detect/prevent message
loss?

-Michael





On Wed, Jun 29, 2016 at 8:39 AM, Shekar Tippur  wrote:

> I am looking at building a reactive api on top of Kafka.
> This API produces event to Kafka topic. I want to add a unique session id
> into the payload.
> The data gets transformed as it goes through different stages of a
> pipeline. I want to specify a final topic where I want the api to know that
> the processing was successful.
> The API should give different status at each part of the pipeline.
> At the ingestion, the API responds with "submitted"
> During the progression, the API returns "in progress"
> After successful completion, the API returns "Success"
>
> Couple of questions:
> 1. Is this feasible?
> 2. I was looking at project reactor (https://projectreactor.io) where the
> docs talk about event bus. I wanted to see if I can implement a consumer
> that points to the "end" topic and throws an event into the event bus.
> Since I would know the session ID, I can process the request accordingly.
>
> Appreciate your inputs.
>
> - Shekar
>


Re: Kafka Streams Latency

2016-07-22 Thread Michael Noll
The SimpleBenchmark included in Apache Kafka [1] is, as Eno mentioned,
quite rudimentary.  It does some simple benchmark runs of Kafka's
standard/native producer and consumer clients, and then some Kafka Streams
specific ones.  So you can compare somewhat between the native clients and
Kafka Streams (which uses the producer and consumer clients behind the
scenes).

By default, the SimpleBenchmark assumes that you have a local ZK instance
running on port 2181 plus a local Kafka broker running on port 9092.  But
note that, in practice, ZooKeeper, Kafka (broker/brokers), and a Kafka
Streams application are usually on different machines, so throughput
numbers from an environment where you co-located these three components on
a single machine (as is assumed by default by SimpleBenchmark) need to be
taken with a grain of salt.

One way to run SimpleBenchmark is as follows.

- Checkout, build, and locally install Kafka's trunk version.

$ git clone https://github.com/apache/kafka.git
$ cd kafka

# May or may not be needed: bootstrap gradle wrapper (to give you
`gradlew`)
$ gradle

# Note: perhaps e.g. `./gradlew jarAll` would be sufficient, I can't
remember off the top of my head
$ ./gradlew installAll


- Run a local Zookeeper instance (from the cloned repo top-level dir):

$ bin/zookeeper-server-start.sh config/zookeeper.properties

- Then, in another terminal, run a local Kafka broker (from the cloned repo
top-level dir):

$ bin/kafka-server-start.sh config/server.properties

- Then, in another terminal or from within your IDE, run SimpleBenchmark.
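If you would rather take a rough measurement inside your own application instead, the underlying arithmetic is just "records processed divided by elapsed time".  Here is a minimal sketch of that idea using only the JDK; the class name `ThroughputProbe` and its methods are made up for illustration and are not part of Kafka or SimpleBenchmark:

```java
import java.util.function.IntConsumer;

// Hypothetical helper, not part of Kafka: times a loop and reports records/sec.
public class ThroughputProbe {

    /** Converts a record count and an elapsed time in nanoseconds into records per second. */
    static double recordsPerSecond(long records, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            throw new IllegalArgumentException("elapsedNanos must be positive");
        }
        return records * 1_000_000_000.0 / elapsedNanos;
    }

    /** Runs `work` once per record index and returns the observed throughput. */
    static double measure(int records, IntConsumer work) {
        long start = System.nanoTime();
        for (int i = 0; i < records; i++) {
            work.accept(i);
        }
        // Guard against a zero reading on very fast runs or coarse clocks.
        long elapsed = Math.max(1L, System.nanoTime() - start);
        return recordsPerSecond(records, elapsed);
    }

    public static void main(String[] args) {
        long[] sink = new long[1]; // keeps the loop body from being optimized away
        double rate = measure(1_000_000, i -> sink[0] += i);
        System.out.printf("%.0f records/sec%n", rate);
    }
}
```

The same caveat applies as for SimpleBenchmark: numbers taken on a single machine, without JVM warm-up, are only a rough indication.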


Hope this helps!
Michael



[1]
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java


On Fri, Jul 22, 2016 at 4:30 PM, Eno Thereska 
wrote:

> Hi Adrienne,
>
> Also you might want to have a look at the SimpleBenchmark.java file
> included with Kafka Streams (org.apache.kafka.streams.perf.
> SimpleBenchmark). It does some simple measurements of consumer, producer
> and Kafka Streams throughput.
>
> Thanks
> Eno
>
> > On 22 Jul 2016, at 07:21, David Garcia  wrote:
> >
> > You should probably just put reporting in your app.  Dropwizard,
> logs…etc.  You can also look at Kafka JMX consumer metrics (assuming you
> don’t have too many consumers).
> >
> > -David
> >
> > On 7/22/16, 9:13 AM, "Adrienne Kole"  wrote:
> >
> >Hi,
> >
> > How can I measure the latency and throughput in Kafka Streams?
> >
> >Cheers
> >Adrienne
> >
> >
>
>


-- 

*Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download*


Re: How to get the Global IP address of the Producer?

2016-07-28 Thread Michael Noll
One option is to implement the producer applications in such a way that
each producer includes its own IP address in the payload of the messages it
produces.
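A minimal sketch of that idea, using only the JDK.  The class name `IpTaggedPayload` and the `<ip>|<message>` envelope format are assumptions made up for this example; a real application would more likely add a field to its JSON/Avro payload.  Also note that `InetAddress.getLocalHost()` returns the address the host sees for itself, which may be a private address behind NAT rather than a "global" IP:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: prefixes each message with the producer's own IP address.
public class IpTaggedPayload {

    /** Builds the envelope "<ip>|<message>" (format is an assumption for this sketch). */
    static String tag(String ip, String message) {
        return ip + "|" + message;
    }

    /** Splits an envelope back into { ip, message } at the first '|'. */
    static String[] untag(String payload) {
        int sep = payload.indexOf('|');
        if (sep < 0) {
            throw new IllegalArgumentException("payload has no IP prefix");
        }
        return new String[] { payload.substring(0, sep), payload.substring(sep + 1) };
    }

    /** Best-effort lookup of this host's own address; may be private behind NAT. */
    static String localAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            return "unknown";
        }
    }

    public static void main(String[] args) {
        // The tagged string is what the producer would send as the message value.
        String payload = tag(localAddress(), "{\"event\":\"click\"}");
        System.out.println(payload);
    }
}
```

On the consumer side (Spark in your case), `untag` recovers the address and the original message.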

-Michael


On Tue, Jul 26, 2016 at 10:15 AM, Gourab Chowdhury 
wrote:

> Hi,
>
> I am working on a project that includes Apache Kafka. My consumer is a
> Apache Spark and I am try to log the Global IP from where the message was
> generated, to verify from which device/system?
>
> Ofcourse Kafka know the IP of the producer, since it can read the packets.
> Can it append it in the message?
>
> Thanks,
> Gourab
>


Re: Questions about Kafka Streams Partitioning & Deployment

2016-07-29 Thread Michael Noll
Michael,

> Guozhang, in (2) above did you mean "some keys *may be* hashed to
different
> partitions and the existing local state stores will not be valid?"
> That fits with out understanding.

Yes, that's what Guozhang meant.

Corrected version:

When you increase the number of input partitions and hence number of
processors / local stores, however, some keys may be hashed to
different partitions and the existing local state stores will not be
valid in this case. [...]
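To illustrate why, here is a small sketch of "hash the key, then take it modulo the number of partitions".  Kafka's default partitioner hashes the serialized key bytes with murmur2; the sketch below uses `String.hashCode()` as a stand-in purely for illustration, and the class name `PartitionRemap` is made up:

```java
// Simplified stand-in for key-based partitioning (not Kafka's actual hash function).
public class PartitionRemap {

    /** Maps a key to a partition in [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String[] keys = { "a", "b", "c", "d" };
        for (String key : keys) {
            int before = partitionFor(key, 4); // topic with 4 partitions
            int after = partitionFor(key, 6);  // same topic grown to 6 partitions
            System.out.println(key + ": " + before + " -> " + after
                    + (before == after ? "" : "  (moved)"));
        }
    }
}
```

With this stand-in hash, key "d" moves from partition 0 to partition 4 while "a" stays on partition 1.  Any state previously built up for "d" lives in the local store that belongs to partition 0, which is no longer where new records for "d" arrive.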


Hope this helps,
Michael



On Wed, Jul 20, 2016 at 11:13 PM, Michael Ruberry 
wrote:

> Thank you both for your replies. This is incredibly helpful.
>
> Guozhang, in (2) above did you mean "some keys* may be* hashed to different
> partitions and the existing local state stores will not be valid?" That
> fits with out understanding.
>
> As to your caveats in (3) and (4), we are trying to be sure that our
> datastore will be "loaded" properly before we begin processing. Would you
> say the promise when we request a store value for a key given in
> process(key, ...) is that we get the most up date value for that key? Is
> this promise true if we restart the app or create a new app consuming the
> same local store? I believe that's the case but want to double check now.
>
> On Wed, Jul 20, 2016 at 1:14 PM, Guozhang Wang  wrote:
>
> > Hi Michael,
> >
> > 1. Kafka Streams always tries to colocate the local stores with the
> > processing nodes based on the partition key. For example, if you want to
> do
> > an aggregation based on key K1, but the input topic is not keyed on K1
> and
> > hence not partitioned on that. The library then will auto-repartition
> into
> > an intermediate topic based on K1 to make sure that the local stores used
> > for storing the aggregates based on K1 will be co-located with the
> > processor that gets partitions hashed on K1 as well.
> >
> > 2. When you increase the number of input partitions and hence number of
> > processors / local stores, however, some keys may not be hashed to
> > different partitions and the existing local state stores will not be
> valid
> > in this case. In practice, we recommend users to over-partition their
> input
> > topics (note that multiple partitions can be assigned to the same
> > processor) so that when they increase the number of streams instances
> > later, they do not need to add more partitions.
> >
> > 3. If you change your code, again the existing state stores may not be
> > valid (note colocating is still guaranteed) anymore depending on how you
> > modified the computational logic. In this case either you treat the new
> > code as a new application with a different application id so that
> > everything can be restarted from scratch, or you can "wipe out" the
> > existing invalid processing state, for which we have provided some tools
> > for this purpose and are writing a tutorial about how to do
> "re-processing"
> > in general.
> >
> > 4. About bootstrapping, currently Kafka Streams does not have a
> "bootstrap
> > stream" concept so that it can be processed completely before processing
> > other streams. Instead, we are currently relying on using the record
> > timestamp to "synchronize streams" (similar to the message chooser
> > functionality in Samza) and you can find more details here:
> >
> >
> >
> http://docs.confluent.io/3.0.0/streams/architecture.html#flow-control-with-timestamps
> >
> > And we are currently working on having finer flow control mechanisms as
> > well:
> >
> > https://issues.apache.org/jira/browse/KAFKA-3478
> >
> >
> > Hope these help.
> >
> > Guozhang
> >
> >
> >
> > On Wed, Jul 20, 2016 at 12:36 PM, Eno Thereska 
> > wrote:
> >
> > > Hi Michael,
> > >
> > > These are good questions and I can confirm that the system works in the
> > > way you hope it works, if you use the DSL and don't make up keys
> > > arbitrarily. In other words, there is nothing currently that prevents
> you
> > > from shooting yourself in the foot e.g., by making up keys and using
> the
> > > same made-up key in different processing nodes.
> > >
> > > However, if you use the Kafka Streams primitives, then such bad
> > situations
> > > are not supposed to happen.
> > >
> > > Thanks
> > > Eno
> > >
> > > > On 18 Jul 2016, at 11:28, Michael Ruberry 
> > wrote:
> > > >
> > > > Hi all,
> > > >
> > > > My company, Taboola, has been looking at Kafka Streams and we are
> still
> > > > confused about some details of partitions and store persistence.
> > > >
> > > > So let's say we have an app consuming a single topic and we're using
> an
> > > > embedded and persisted key-value store:
> > > >
> > > >
> > > >   1. If we restart the app, will each node finish loading values from
> > the
> > > >   store before it begins processing? (Analogous to Samza's boot-strap
> > > >   streams?)
> > > >   2. How are are the store's entries partitioned among the nodes? Is
> > > there
> > > >   a promise that if an entry in the store and the stream have the
> same
> > > key
> > > >

Re: Reactive Kafka performance

2016-08-02 Thread Michael Noll
David,

you wrote:

> Each task would effectively block on DB-io for every history-set
retrieval;
> obviously we would use a TTL cache (KTable could be useful here, but it
> wouldn’t be able to hold “all” of the history for every user)

Can you elaborate a bit why you think a KTable wouldn't be able to hold the
full history for every user?  (Not implying I have a different opinion,
simply trying to understand.)

One advantage of using a KTable is data locality for any such lookups
during your stream-table join, i.e. when processing a new incoming record,
you wouldn't incur increased per-record processing latency because of
network round trips (while talking to a remote DB).  I'd expect this would
significantly reduce the time needed to finish processing of one new
incoming message.  There are further advantages for this setup, see [1].
So if a KTable-based approach would be suitable for your use case, I'd
consider giving that a try.


[1] http://www.confluent.io/blog/elastic-scaling-in-kafka-streams



On Sat, Jul 30, 2016 at 5:44 PM, David Garcia  wrote:

> Hey Guozhang, our basic road block is asynchronous processing (this is
> actually related to my previous post about asynchronous processing).  Here
> is the simplest use-case:
>
> The streaming job receives messages.  Each message is a user-event and
> needs to randomly look up that user’s history (for 100’s-of-thousands of
> users, and each user can have 10’s-of-thousands of events in their
> history).  Once the history is retrieved, processing can continue.  We need
> processing to be as fast as possible and we need the ability to easily
> accommodate increases in incoming message traffic.  Here are the two
> designs (with KStreams, and then with Reactive Kafka)
>
> KStreams Approach:
>
> KStreams depth-first approach requires finishing processing of one message
> before the next one becomes available.  So, we would have to first estimate
> the average input message rate (and measure the performance of our app) and
> then partition our topic/s appropriately.  Each task would effectively
> block on DB-io for every history-set retrieval; obviously we would use a
> TTL cache (KTable could be useful here, but it wouldn’t be able to hold
> “all” of the history for every user).  If we need to “scale” our
> application, we would add more partitions and application processing
> instances.  Please suggest any other design choice we could go with.  I’m
> might be missing something.
>
> Reactive Kafka Approach:
>
> Reactive Kafka allows out-of-order processing.  So, while we are fetching
> history for event-1, we can start processing event-2.  In a nutshell
> Reactive-Kafka parallelism is not tightly-coupled to the number of
> partitions in the topic (obviously this doesn’t apply to the input…we can
> only receive events as fast as current partition configuration allows…but
> we don’t’ have to block on io before we receive the next message)
>
>
> I’m new to both technologies, so any and all suggestions are welcome.
>
> -David
>
> On 7/30/16, 9:24 AM, "Guozhang Wang"  wrote:
>
> Hello David,
>
> I'd love to learn details about the "flexibility" of Reactive Kafka
> compared with KStreams in order to see if KStreams can improve on that
> end.
> Would you like to elaborate a bit more on that end?
>
> Guozhang
>
>
> On Thu, Jul 28, 2016 at 12:16 PM, David Garcia 
> wrote:
>
> > Our team is evaluating KStreams and Reactive Kafka (version
> 0.11-M4)  on a
> > confluent 3.0 cluster.  Our testing is very simple (pulling from one
> topic,
> > doing a simple transform) and then writing out to another topic.
> >
> > The performance for the two systems is night and day. Both
> applications
> > were running on a laptop and connecting to kafka over a wifi
> network.  Here
> > are the numbers:
> >
> > KStreams: ~14K messages per second
> > Reactive Kafka: ~110 messages per second
> >
> > Both the input, and output topic had 54 partitions.  I’m fairly
> certain
> > I’m not using Reactive kafka with good configuration.  Here is some
> stubbed
> > out code:
> https://gist.github.com/anduill/2e17cd7a40d4a86fefe19870d1270f5b
> >
> > One note, I am using the confluent stack (hence the
> > CachedSchemaRegistryClient)
> >
> > I like the flexibility of Reactive Kafka, so we’d really like to use
> > it…but if performance is going to be like this, I can’t really
> justify it.
> > I’m a scala/akka/streaming-akka newbie, so I’m sure there are better
> ways
> > to use the API.  Any help is appreciated.
> >
> > -David
> >
>
>
>
> --
> -- Guozhang
>
>
>


Re: Kafka Streams on Windows?

2016-08-05 Thread Michael Noll
Thanks a lot for investigating and also for sharing back your findings, Mathieu!

-Michael


On Fri, Aug 5, 2016 at 3:10 PM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> I took that approach.  It was painful, but, ultimately did get me a working
> Windows development environment.
>
> To any who follow in my footsteps, here is my trail:
>
>1. Upgrade to at least Kafka Streams 0.10.0.1 (currently only in RC).
>   - This is necessary because .1 bumps the rocksdb dependency to 4.8.0,
>   where the previous 4.4 dependency did not yet support loading a
> Windows JNI
>   library.
>   2. Follow the instructions here (
>https://github.com/facebook/rocksdb/blob/v4.8/CMakeLists.txt) to build
>rocksdb.
>   - That link is for the v4.8 tag; be sure to match this with the
>   version of rocksdb that Kafka Streams depends upon in the
> future.  0.10.0.1
>   -> v4.8, but future releases of Kafka Streams will likely depend on
> newer
>   versions.
>   - Be sure to include the "-DJNI=1" compile option to build the JNI
>   wrapper.
>   - None of the third-party dependencies (eg. snappy, jemalloc, etc.)
>   seem to be required to get something functional, but, it
> probably isn't the
>   most efficient build.
>   - Ensure that "javac" and "javah" are on your path when running
>   cmake; if there are any errors in the cmake output your JNI wrapper
>   probably won't build.
>   - You may or may not need to make minor patches to make the project
>   build in Windows.  It appears that the Windows build is often
> broken; for
>   example, I had to apply this patch:
>   https://github.com/facebook/rocksdb/pull/1223/files
>   - Phew, that should give you a build\java\Debug\rocksdbjni.dll.  So
>   close to the summit... just a horrible hack left...
>3. Copy rocksdbjni.dll to librocksdbjni-win64.dll.
>4. Insert librocksdbjni-win64.dll into your rocksdbjni-4.8.0.jar.
>   - Ugh, this is the horrible hack.  rocksdbjni seems to only look
>   inside its own jar for its native libraries, so, this is where
> it needs to
>   be.
>   - If you're a gradle user on Windows, you'd find this jar file
>   in C:\Users\...your-windows-user...\.gradle\caches\modules-2\
> files-2.1\org.rocksdb\rocksdbjni\4.8.0\b543fc4ea5b52ad790730dee376ba0
> df06d9f5f7.
>
> And there you go, almost a working Kafka Streams app in Windows.  One other
> detail is that the default state storage directory doesn't seem to be
> created on demand, so I had to mkdir C:\tmp\kafka-streams myself before my
> app would work.
>
> Mathieu
>
>
> On Fri, Aug 5, 2016 at 3:13 AM, Eno Thereska 
> wrote:
>
> > Hi Mathieu,
> >
> > It is true that the DSL currently does not support configuration of the
> > stores.
> >
> > Sounds like it might be worth trying to build RocksDb and dropping into
> > classpath for now.
> >
> > Eno
> >
> > > On 4 Aug 2016, at 17:42, Mathieu Fenniak  >
> > wrote:
> > >
> > > Hi Eno,
> > >
> > > Yes, I've looked at that.  RocksDB can be built and run in Windows,
> but,
> > > the JNI wrapper does not include Windows binarie (
> > > https://github.com/facebook/rocksdb/issues/703).  rocksdbjni-4.4.1.jar
> > > includes librocksdbjni-linux32.so, librocksdbjni-linux64.so, and
> > > librocksdbjni-osx.jnilib, so only supports Linux x86 & x64 and OS X.
> It
> > is
> > > probably possible for me to build it myself and drop it in my
> classpath,
> > > but, I'm looking for a lower friction approach if one exists. :-)
> > >
> > > It looks like this was discussed recently on the Confluent Platform
> > mailing
> > > list (https://groups.google.com/forum/#!topic/confluent-
> > platform/Z1rsfSNrVJk)
> > > and the conclusion there was that high-level streams DSL doesn't
> support
> > > configuration of the stores.
> > >
> > > Mathieu
> > >
> > >
> > > On Thu, Aug 4, 2016 at 10:28 AM, Eno Thereska 
> > > wrote:
> > >
> > >> Hi Mathieu,
> > >>
> > >> Have you had a chance to look at http://rocksdb.org/blog/2033/
> > >> rocksdb-is-now-available-in-windows-platform/? <
> > >> http://rocksdb.org/blog/2033/rocksdb-is-now-available-in-
> > windows-platform/?>
> > >> Curious to hear your and other's comments on whether that worked.
> > >>
> > >> It is possible to have Kafka Streams use an in-memory store (included
> > with
> > >> Kafka Streams) for development purposes. In that scenario RocksDb
> would
> > not
> > >> be needed.
> > >>
> > >> Eno
> > >>
> > >>
> > >>> On 4 Aug 2016, at 16:14, Mathieu Fenniak <
> mathieu.fenn...@replicon.com
> > >
> > >> wrote:
> > >>>
> > >>> Hey all,
> > >>>
> > >>> Is it anyone developing Kafka Streams applications on Windows?
> > >>>
> > >>> It seems like the RocksDB Java library doesn't include a native JNI
> > >> library
> > >>> for Windows, which prevents a Kafka Streams app from running on
> > >> Windows.  I
> > >>> was just wondering if others have run into this, and if so, what
> > approach
> > >>> you took to resol

Re: Automated Testing w/ Kafka Streams

2016-08-15 Thread Michael Noll
Mathieu,

follow-up question:  Are you also doing or considering integration testing
by spawning a local Kafka cluster and then reading/writing to that cluster
(often called embedded or in-memory cluster)?  This approach would be in
the middle between ProcessorTopologyTestDriver (that does not spawn a Kafka
cluster) and your system-level testing (which I suppose is running against
a "real" test Kafka cluster).

-Michael





On Mon, Aug 15, 2016 at 3:44 PM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hey all,
>
> At my workplace, we have a real focus on software automated testing.  I'd
> love to be able to test the composition of a TopologyBuilder with
> org.apache.kafka.test.ProcessorTopologyTestDriver
>  317b95efa4/streams/src/test/java/org/apache/kafka/test/
> ProcessorTopologyTestDriver.java>;
> has there ever been any thought given to making this part of the public API
> of Kafka Streams?
>
> For some background, here are some details on the automated testing plan
> that I have in mind for a Kafka Streams application.  Our goal is to enable
> continuous deployment of any new development we do, so, it has to be
> rigorously tested with complete automation.
>
> As part of our pre-commit testing, we'd first have these gateways; no code
> would reach our master branch without passing these tests:
>
>- At the finest level, unit tests covering individual pieces like a
>Serde, ValueMapper, ValueJoiner, aggregate adder/subtractor, etc.  These
>pieces are very isolated, very easy to unit test.
>- At a higher level, I'd like to have component tests of the composition
>of the TopologyBuilder; this is where ProcessorTopologyTestDriver would
> be
>valuable.  There'd be far fewer of these tests than the lower-level
> tests.
>There are no external dependencies to these tests, so they'd be very
> fast.
>
> Having passed that level of testing, we'd deploy the Kafka Streams
> application to an integration testing area where the rest of our
> application is kept up-to-date, and proceed with these integration tests:
>
>- Systems-level tests where we synthesize inputs to the Kafka topics,
>wait for the Streams app to process the data, and then inspect the
> output
>that it pushes into other Kafka topics.  These tests will be fewer in
>nature than the above tests, but they serve to ensure that the
> application
>is well-configured, executing, and handling inputs & outputs as
> expected.
>- UI-level tests where we verify behaviors that are expected from the
>system as a whole.  As our application is a web app, we'd be using
> Selenium
>to drive a web browser and verifying interactions and outputs that are
>expected from the Streams application matching our real-world use-cases.
>These tests are even fewer in nature than the above.
>
> This is an adaptation of the automated testing scaffold that we currently
> use for microservices; I'd love any input on the plan as a whole.
>
> Thanks,
>
> Mathieu
>


Re: Automated Testing w/ Kafka Streams

2016-08-16 Thread Michael Noll
Mathieu,

FWIW here are some pointers to run embedded Kafka/ZK instances for
integration testing.  The second block of references below uses Curator's
TestingServer for running embedded ZK instances.  See also the relevant
pom.xml for how the integration tests are being run (e.g. disabled JVM
reusage to ensure test isolation).

Unfortunately, Apache Kafka does not publish these testing facilities as
maven artifacts -- that's why everyone is rolling their own.

In Apache Kafka:

Helper classes (e.g. embedded Kafka)

https://github.com/apache/kafka/tree/trunk/streams/src/test/java/org/apache/kafka/streams/integration/utils

Integration test example:

https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java

Also, for kafka.utils.TestUtils usage:

https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala

In confluentinc/examples:

Helper classes (e.g. embedded Kafka, embedded Confluent Schema Registry
for Avro testing)

https://github.com/confluentinc/examples/tree/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/confluent/examples/streams/kafka

Some more sophisticated integration tests:

https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/confluent/examples/streams/WordCountLambdaIntegrationTest.java

https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java

Best,
Michael




On Tue, Aug 16, 2016 at 3:36 PM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hi Radek,
>
> No, I'm not familiar with these tools.  I see that Curator's TestingServer
> looks pretty straight-forward, but, I'm not really sure what
> kafka.util.TestUtils
> is.  I can't find any documentation referring to this, and it doesn't seem
> to be a part of any published maven artifacts in the Kafka project; can you
> point me at what you're using a little more specifically?
>
> Mathieu
>
>
> On Mon, Aug 15, 2016 at 2:39 PM, Radoslaw Gruchalski  >
> wrote:
>
> > Out of curiosity, are you aware of kafka.util.TestUtils and Apache
> Curator
> > TestingServer?
> > I’m using this successfully to test publis / consume scenarios with
> things
> > like Flink, Spark and custom apps.
> > What would stop you from taking the same approach?
> >
> > –
> > Best regards,
> > Radek Gruchalski
> > ra...@gruchalski.com
> >
> >
> > On August 15, 2016 at 9:41:37 PM, Mathieu Fenniak (
> > mathieu.fenn...@replicon.com) wrote:
> >
> > Hi Michael,
> >
> > It would definitely be an option. I am not currently doing any testing
> > like that; it could replace the ProcessorTopologyTestDriver-style
> testing
> > that I'd like to do, but there are some trade-offs to consider:
> >
> > - I can't do an isolated test of just the TopologyBuilder; I'd be
> > bringing in configuration management code (eg. configuring where to
> access
> > ZK + Kafka).
> > - Tests using a running Kafka server wouldn't have a clear end-point; if
> > something in the toplogy doesn't publish a message where I expected it
> to,
> > my test can only fail via a timeout.
> > - Tests are likely to be slower; this might not be significant, but a
> > small difference in test speed has a big impact in productivity after a
> > few
> > months of development
> > - Tests will be more complex & fragile; some additional component needs
> > to manage starting up that Kafka server, making sure it's ready-to-go,
> > running tests, and then tearing it down
> > - Tests will have to be cautious of state existing in Kafka. eg. two
> > test suites that touch the same topics could be influenced by state of a
> > previous test. Either you take a "destroy the world" approach between
> test
> > cases (or test suites), which probably makes test speed much worse, or,
> > you
> > find another way to isolate test's state.
> >
> > I'd have to face all these problems at the higher level that I'm calling
> > "systems-level tests", but, I think it would be better to do the majority
> > of the automated testing at a lower level that doesn't bring these
> > considerations into play.
> >
> > Mathieu
> >
> >
> > On Mon, Aug 15, 2016 at 12:13 PM, Michael Noll 
> > wrote:
> >
> > > Mathieu,
> > >
> > > follow-up question: Are you also doing or considering integration
> > testing
> > &

Re: Automated Testing w/ Kafka Streams

2016-08-16 Thread Michael Noll
Addendum:

> Unfortunately, Apache Kafka does not publish these testing facilities as
> maven artifacts -- that's why everyone is rolling their own.

Some testing facilities (like kafka.utils.TestUtils) are published via
maven, but other helpful testing facilities are not.

Since Radek provided a snippet showing how to pull in the artifact that includes
k.u.TestUtils, here's the same snippet for Maven/pom.xml, with dependency
scope set to `test`:

  
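  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.0</version>
    <classifier>test</classifier>
    <scope>test</scope>
  </dependency>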
  



On Tue, Aug 16, 2016 at 7:14 PM, Michael Noll  wrote:

> Mathieu,
>
> FWIW here are some pointers to run embedded Kafka/ZK instances for
> integration testing.  The second block of references below uses Curator's
> TestingServer for running embedded ZK instances.  See also the relevant
> pom.xml for how the integration tests are being run (e.g. disabled JVM
> reusage to ensure test isolation).
>
> Unfortunately, Apache Kafka does not publish these testing facilities as
> maven artifacts -- that's why everyone is rolling their own.
>
> In Apache Kafka:
>
> Helper classes (e.g. embedded Kafka)
> https://github.com/apache/kafka/tree/trunk/streams/src/
> test/java/org/apache/kafka/streams/integration/utils
>
> Integration test example:
> https://github.com/apache/kafka/blob/trunk/streams/src/
> test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
>
> Also, for kafka.utils.TestUtils usage:
> https://github.com/apache/kafka/blob/trunk/core/src/
> test/scala/integration/kafka/api/IntegrationTestHarness.scala
>
> In confluentinc/examples:
>
> Helper classes (e.g. embedded Kafka, embedded Confluent Schema
> Registry for Avro testing)
> https://github.com/confluentinc/examples/tree/
> kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/
> confluent/examples/streams/kafka
>
> Some more sophisticated integration tests:
> https://github.com/confluentinc/examples/blob/
> kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/
> confluent/examples/streams/WordCountLambdaIntegrationTest.java
> https://github.com/confluentinc/examples/blob/
> kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/
> confluent/examples/streams/SpecificAvroIntegrationTest.java
>
> Best,
> Michael
>
>
>
>
> On Tue, Aug 16, 2016 at 3:36 PM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
>> Hi Radek,
>>
>> No, I'm not familiar with these tools.  I see that Curator's TestingServer
>> looks pretty straight-forward, but, I'm not really sure what
>> kafka.util.TestUtils
>> is.  I can't find any documentation referring to this, and it doesn't seem
>> to be a part of any published maven artifacts in the Kafka project; can
>> you
>> point me at what you're using a little more specifically?
>>
>> Mathieu
>>
>>
>> On Mon, Aug 15, 2016 at 2:39 PM, Radoslaw Gruchalski <
>> ra...@gruchalski.com>
>> wrote:
>>
>> > Out of curiosity, are you aware of kafka.util.TestUtils and Apache
>> Curator
>> > TestingServer?
>> > I’m using this successfully to test publish / consume scenarios with
>> things
>> > like Flink, Spark and custom apps.
>> > What would stop you from taking the same approach?
>> >
>> > –
>> > Best regards,
>> > Radek Gruchalski
>> > ra...@gruchalski.com
>> >
>> >
>> > On August 15, 2016 at 9:41:37 PM, Mathieu Fenniak (
>> > mathieu.fenn...@replicon.com) wrote:
>> >
>> > Hi Michael,
>> >
>> > It would definitely be an option. I am not currently doing any testing
>> > like that; it could replace the ProcessorTopologyTestDriver-style
>> testing
>> > that I'd like to do, but there are some trade-offs to consider:
>> >
>> > - I can't do an isolated test of just the TopologyBuilder; I'd be
>> > bringing in configuration management code (eg. configuring where to
>> access
>> > ZK + Kafka).
>> > - Tests using a running Kafka server wouldn't have a clear end-point; if
>> > something in the topology doesn't publish a message where I expected it
>> to,
>> > my test can only fail via a timeout.
>> > - Tests are likely to be slower; this might not be significant, but a
>> > small difference in test speed has a big impact in productivity after a
>> > few
>> > months of development
>> > - Tests will be more complex & fragile; some additional component needs
>> > to manage starting up that Kafka server, 

Re: Joining Streams with Kafka Streams

2016-08-26 Thread Michael Noll
First a follow-up question, just in case (sorry if that was obvious to you
already):  Have you considered using Kafka Streams DSL, which has much more
convenient join functionality built-in out of the box?  The reason I am
asking is that you didn't specifically mention that you did try using the
DSL and/or you need to use the Processor API because of other reasons as
well.

-Michael


On Fri, Aug 26, 2016 at 1:51 AM, Caleb Welton  wrote:

> Hello,
>
> I'm trying to understand best practices related to joining streams using
> the Kafka Streams API.
>
> I can configure the topology such that two sources feed into a single
> processor:
>
> topologyBuilder
> .addSource("A", stringDeserializer, itemDeserializer, "a-topic")
> .addSource("B", stringDeserializer, itemDeserializer, "b-topic")
> .addProcessor("hello-join", HelloJoin::new, "A", "B")...
>
> And within my processor I can determine which topic a given message came
> from:
>
> public void process(String key, String value) {
>  if (context.topic().equals("a-topic")) {
>  ...
>  } else {
>  ...
>  }
> }
>
> This allows for a crude form of cross stream join with the following
> issues/limitations:
>
>   i.  A string compare on topic name to decide which stream a message came
> from.  Having actual access to the TopicPartition could lead to more
> efficient validation.  Priority low, as this is just a small performance
> hit, but it is a per message performance hit so would be nice to eliminate.
>
>   ii. This requires "a-topic" and "b-topic" to have the same message
> format, which for general join handling is a pretty big limitation.  What
> would be the recommended way to handle the case of different message
> formats, e.g. needing different deserializers for different input topics?
>
> E.g. how would I define my Processor if the topology was:
>
> topologyBuilder
> .addSource("A", stringDeserializer, itemADeserializer, "a-topic")
> .addSource("B", stringDeserializer, itemBDeserializer, "b-topic")
> .addProcessor("hello-join", HelloJoin::new, "A", "B")...
>
> where itemADeserializer and itemBDeserializer return different classes?
>
> Thanks,
>   Caleb
>


Re: How distributed countByKey works in KStream ?

2016-08-29 Thread Michael Noll
In Kafka Streams, data is partitioned according to the keys of the
key-value records, and operations such as countByKey operate on these
stream partitions.  When reading data from Kafka, these stream partitions
map to the partitions of the Kafka input topic(s), but these may change
once you add processing operations.

To your question:  The first step, if the data isn't already keyed as
needed, is to select the key you want to count by, which results in 1+
output stream partitions.  Here, data may get shuffled across the network
(but it won't if there's no need to, e.g. when the data is already keyed as
needed).  Then the count operation is performed for each stream partition,
which is similar to the sort-and-reduce phase in Hadoop.
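
To make the two steps concrete, here is a minimal plain-Java sketch (no Kafka involved) of what happens conceptually: records are first redistributed so that equal keys land in the same partition, and then each partition is counted on its own.  The class name, the `partitionFor` helper and the hash-modulo routing are assumptions made for illustration only; Kafka's actual partitioner and state handling differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class KeyedCountSketch {

    // Hypothetical stand-in for a partitioner: route a key to one of numPartitions.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Step 1: split lines into words, use the word as the key, and redistribute
    // ("shuffle") so that equal words always end up in the same partition.
    static List<List<String>> repartitionByWord(List<List<String>> inputPartitions, int numPartitions) {
        List<List<String>> shuffled = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            shuffled.add(new ArrayList<>());
        }
        for (List<String> partition : inputPartitions) {
            for (String line : partition) {
                for (String word : line.toLowerCase().split("\\W+")) {
                    if (!word.isEmpty()) {
                        shuffled.get(partitionFor(word, numPartitions)).add(word);
                    }
                }
            }
        }
        return shuffled;
    }

    // Step 2: count within each partition independently.  The per-partition
    // results are merged here only so they can be printed together.
    static Map<String, Long> countPerPartition(List<List<String>> partitions) {
        Map<String, Long> all = new TreeMap<>();
        for (List<String> partition : partitions) {
            Map<String, Long> local = new TreeMap<>();
            for (String word : partition) {
                local.merge(word, 1L, Long::sum);
            }
            all.putAll(local); // safe: after step 1 a word lives in exactly one partition
        }
        return all;
    }

    public static void main(String[] args) {
        // Two input partitions holding one line each.
        List<List<String>> input = Arrays.asList(
            Arrays.asList("a b"),
            Arrays.asList("a b c"));
        System.out.println(countPerPartition(repartitionByWord(input, 2)));
        // prints {a=2, b=2, c=1}
    }
}
```

The point of the sketch is that the per-partition counts are only correct because of the redistribution step; counting the two input partitions as they are would see "a" and "b" once in each of them.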

On Mon, Aug 29, 2016 at 5:31 PM, Tommy Go  wrote:

> Hi,
>
> For "word count" example in Hadoop, there are shuffle-sort-and-reduce
> phases that handles outputs from different mappers, how does it work in
> KStream ?
>


Re: kafka-streams project compiles using maven but failed using sbt

2016-08-29 Thread Michael Noll
Most probably because, in your build.sbt, you didn't enable the
-Xexperimental compiler flag for Scala.  This is required when using Scala
2.11 (as you do) to enable SAM for Java 8 lambda support.  Because this
compiler flag is not set, your build fails: the compiler cannot translate
`_.toUpperCase()` into a Java 8 lambda.

See
https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/pom.xml#L209-L214
.

Also, note that your build.sbt is not equivalent to your pom.xml.  For
example, the org.apache.kafka:kafka-clients dependency is missing in
build.sbt.


On Sat, Aug 27, 2016 at 2:01 PM, Tommy Go  wrote:

> Hi
>
> I am playing with kafka-streams using Scala, but found some strange issue,
> the following project compiles using mvn but failed using sbt:
>
> https://github.com/deeplambda/kstream-debug
>
> [error] /Users/tommy/tmp/kstream-debug/src/main/scala/kafka/
> streams/WordCountDemo.scala:49:
> missing parameter type for expanded function ((x$1) =>
> x$1.toUpperCase())
> [error] val uppercasedWithMapValues: KStream[Array[Byte], String]
> = textLines.mapValues(_.toUpperCase())
> [error]
>  ^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
>
>
>
> Any ideas why the demo failed compiling using sbt ?
>
> Thanks,
> Tommy
>


Re: Kafka Streaming Join for range of gps coordinates

2016-08-29 Thread Michael Noll
Quick reply only, since I am on my mobile.  Not an exact answer to your
problem but still somewhat related:
http://www.infolace.com/blog/2016/07/14/simple-spatial-windowing-with-kafka-streams/
(perhaps you have seen this already).

-Michael



On Sun, Aug 28, 2016 at 4:55 AM, Farhon Zaharia 
wrote:

> Hello friends,
>
> I am designing a new streaming component and am looking at how to use Kafka
> Streams.
>
> I need some guidance with the appropriate flow.
>
> *Problem to solve:*
> The problem I am working on is I have a large group of riders and drivers.
> I would like to match available riders to nearby drivers.  (For simplicity
> think of Uber or Lyft)
>
> The goal is to have the drivers within a certain delta of gps coordinates
> be notified of a potential rider.
>
> For example, a rider requests to be picked up at location:
> 37.788517, -122.406812
>
> I would like to select the nearby drivers to send notifications of an
> available match by selecting nearby drivers within a range
>
> latitude < 37.798517 && latitude > 37.778517 && longitude > -122.416812 &&
> longitude < -122.396812
>
> *Note this is a POC and would prefer to select the most nearby drivers,
> then after lookup the address and use my own graph for streets and
> calculate the shortest path on my own.
>
> I would like to have 3 initial topics:  riders, drivers, and paired-onride
>
> What is the best way to do this with Kafka Streams?
>
> *What I have tried or considered:*
> I was considering storing drivers in a Ktable and having riders in a
> KStream and joining them.  But I don't think this will work because the
> join is dependent on the key, which in this case I was looking more for a
> select statement to look for a range of gps coordinates as noted above.
> The drivers location will be updated periodically.
>
> I was also thinking of filtering the KStream based on the gps range and
> making a smaller subselection of available drivers within a certain
> distance to a rider.
>
> At this point I am seeking some guidance and if this is not an ideal
> use-case that is also ok.  Thanks for any information or direction you can
> provide.
>
>
> -Farhon
>


Re: How distributed countByKey works in KStream ?

2016-08-31 Thread Michael Noll
Can you double-check whether the results in wc-out are not rather:

a 1
b 1
a 2
b 2
c 1

?

On Wed, Aug 31, 2016 at 5:47 AM, Tommy Q  wrote:

> Tried the word count example as discussed, the result in wc-out is wrong:
>
> a 1
> b 1
> a 1
> b 1
> c 1
>
>
> The expected result should be:
>
> a 2
> b 2
> c 1
>
>
> Kafka version is 0.10.0.1
>
>
> On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax 
> wrote:
>
> > No. It does not support hidden topics.
> >
> > The only explanation might be, that there is no repartitioning step. But
> > then the question would be, if there is a bug in Kafka Streams, because
> > between map() and countByKey() repartitioning is required.
> >
> > Can you verify that the result is correct?
> >
> > -Matthias
> >
> > On 08/30/2016 03:24 PM, Tommy Q wrote:
> > > Does Kafka support hidden topics ? (Since all the topics infos are
> stored
> > > in ZK, this probably not the case )
> > >
> > > On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > >> Hi Tommy,
> > >>
> > >> yes, you do understand Kafka Streams correctly. And yes, for
> shuffling,
> > >> an internal topic will be created under the hood. It should be named
> > >> "-something-repartition". I am not sure, why it is
> not
> > >> listed via bin/kafka-topics.sh
> > >>
> > >> The internal topic "-counts-changelog" you see is
> > >> created to back the state of countByKey() operator.
> > >>
> > >> See
> > >> https://cwiki.apache.org/confluence/display/KAFKA/
> > >> Kafka+Streams%3A+Internal+Data+Management
> > >>
> > >> and
> > >>
> > >> http://www.confluent.io/blog/data-reprocessing-with-kafka-
> > >> streams-resetting-a-streams-application
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 08/30/2016 06:55 AM, Tommy Q wrote:
> > >>> Michael, Thanks for your help.
> > >>>
> > >>> Take the word count example, I am trying to walk through the code
> based
> > >> on
> > >>> your explanation:
> > >>>
> > >>> val textLines: KStream[String, String] =
> > >> builder.stream("input-topic")
> > >>> val wordCounts: KStream[String, JLong] = textLines
> > >>>   .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
> > >>>   .map((key: String, word: String) => new KeyValue(word, word))
> > >>>   .countByKey("counts")
> > >>>   .toStream
> > >>>
> > >>> wordCounts.to(stringSerde, longSerde, "wc-out")
> > >>>
> > >>> Suppose the input-topic has two partitions and each partition has a
> > >> string
> > >>> record produced into:
> > >>>
> > >>> input-topic_0 : "a b"
> > >>>> input-topic_1 : "a b c"
> > >>>
> > >>>
> > >>> Suppose we started two instance of the stream topology ( task_0 and
> > >>> task_1). So after flatMapValues & map executed, they should have the
> > >>> following task state:
> > >>>
> > >>> task_0 :  [ (a, "a"), (b, "b") ]
> > >>>> task_1 :  [ (a, "a"), (b: "b"),  (c: "c") ]
> > >>>
> > >>>
> > >>> Before the execution of  countByKey, the kafka-stream framework
> should
> > >>> insert a invisible shuffle phase internally:
> > >>>
> > >>> shuffled across the network :
> > >>>>
> > >>>
> > >>>
> > >>>> _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
> > >>>> _internal_topic_shuffle_1 :  [ (b, "b"), (b: "b"),  (c: "c") ]
> > >>>
> > >>>
> > >>> countByKey (reduce) :
> > >>>
> > >>> task_0 (counts-changelog_0) :  [ (a, 2) ]
> > >>>
> > >>> task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
> > >>>
> > >>>
> > >>> And after the execution of `wordCounts.to(stringSerde, longSerde,
> > &g

Re: How distributed countByKey works in KStream ?

2016-09-01 Thread Michael Noll
FYI:  We updated the 0.10.0.x demos for Kafka Streams at
https://github.com/confluentinc/examples to use #partitions >1 and include
`through()`.

See for example [1].

Hope this helps!
Michael


[1]
https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java



On Thu, Sep 1, 2016 at 10:50 AM, Tommy Q  wrote:

> It works after calling through() before countByKey. Many 0.10.0.1
> examples on the web are missing the `through()` call and will fail to produce
> the right output when the input topic has more than one partition.
>
> Thanks very much all ! Finally got the correct results.
>
> On Thu, Sep 1, 2016 at 4:52 AM, Matthias J. Sax 
> wrote:
>
> > Hi Tommy,
> >
> > I did checkout your github project and can verify the "issue". As you
> > are using Kafka 0.10.0.1 the automatic repartitioning step is not
> > available.
> >
> > If you use "trunk" version, your program will run as expected. If you
> > want to stay with 0.10.0.1, you need to repartition the data after map()
> > explicitly, via a call to through():
> >
> > > val wordCounts: KStream[String, JLong] = textLines
> > >   .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
> > >   .map((key: String, word: String) => new KeyValue(word, word))
> > >   .through("my-repartitioing-topic")
> > >   .countByKey("counts")
> > >   .toStream
> >
> > Keep in mind, that it is recommended to create all user topics manually.
> > Thus, you should create your repartitioning topic you specify in
> > through() before you start your Kafka Streams application.
> >
> >
> > -Matthias
> >
> >
> > On 08/31/2016 09:07 PM, Guozhang Wang wrote:
> > > Hello Tommy,
> > >
> > > Which version of Kafka are you using?
> > >
> > > Guozhang
> > >
> > > On Wed, Aug 31, 2016 at 4:41 AM, Tommy Q  wrote:
> > >
> > >> I cleaned up all the zookeeper & kafka states and run the
> WordCountDemo
> > >> again, the results in wc-out is still wrong:
> > >>
> > >> a 1
> > >>> b 1
> > >>> a 1
> > >>> b 1
> > >>> c 1
> > >>
> > >>
> > >>
> > >> On Wed, Aug 31, 2016 at 5:32 PM, Michael Noll 
> > >> wrote:
> > >>
> > >>> Can you double-check whether the results in wc-out are not rather:
> > >>>
> > >>> a 1
> > >>> b 1
> > >>> a 2
> > >>> b 2
> > >>> c 1
> > >>>
> > >>> ?
> > >>>
> > >>> On Wed, Aug 31, 2016 at 5:47 AM, Tommy Q 
> wrote:
> > >>>
> > >>>> Tried the word count example as discussed, the result in wc-out is
> > >> wrong:
> > >>>>
> > >>>> a 1
> > >>>>> b 1
> > >>>>> a 1
> > >>>>> b 1
> > >>>>> c 1
> > >>>>
> > >>>>
> > >>>> The expected result should be:
> > >>>>
> > >>>> a 2
> > >>>>> b 2
> > >>>>> c 1
> > >>>>
> > >>>>
> > >>>> Kafka version is 0.10.0.1
> > >>>>
> > >>>>
> > >>>> On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax <
> > >> matth...@confluent.io
> > >>>>
> > >>>> wrote:
> > >>>>
> > >>>>> No. It does not support hidden topics.
> > >>>>>
> > >>>>> The only explanation might be, that there is no repartitioning
> step.
> > >>> But
> > >>>>> than the question would be, if there is a bug in Kafka Streams,
> > >> because
> > >>>>> between map() and countByKey() repartitioning is required.
> > >>>>>
> > >>>>> Can you verify that the result is correct?
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 08/30/2016 03:24 PM, Tommy Q wrote:
> > >>>>>> Does Kafka support hidden topics ? (Since all the topics infos are
> > >>>> stored
> > >>>>>> in ZK, this probably not the case )
> > >>>&g

Re: Kafka Streams: joins without windowing (KStream) and without being KTables

2016-09-06 Thread Michael Noll
Also, another upcoming feature is (slightly simplified description follows)
"global" state/KTable.  Today, a KTable is always partitioned/sharded.  One
advantage of global state/tables will be for use cases where you need to
perform non-key joins, similar to what Guillermo described previously in
this thread.  Keep in mind that global tables imply that each
corresponding stream task will get a full, replicated copy of the table, so
you must pay attention to the memory/disk footprint.
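
As a rough illustration of the non-key join idea, here is a plain-Java sketch (no Kafka APIs) in which every processing instance holds a full local copy of the table, indexed by the join field rather than by its primary key, and looks up each incoming stream record against it.  The record shapes loosely follow the ObjectX / ObjectY description in this thread; the class names, field values and the `|` output format are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReplicatedTableJoinSketch {

    // Hypothetical stream record: time-based, carries the shared field as a value.
    static final class X {
        final String sameValue;
        final String date;
        final String valueX;
        X(String sameValue, String date, String valueX) {
            this.sameValue = sameValue;
            this.date = date;
            this.valueX = valueX;
        }
    }

    // Hypothetical table record: keyed by uniqueId, also carries the shared field.
    static final class Y {
        final String uniqueId;
        final String sameValue;
        final String valueY;
        Y(String uniqueId, String sameValue, String valueY) {
            this.uniqueId = uniqueId;
            this.sameValue = sameValue;
            this.valueY = valueY;
        }
    }

    // Build the full local copy, indexed by the join field instead of the primary key.
    static Map<String, List<Y>> indexBySameValue(List<Y> table) {
        Map<String, List<Y>> index = new HashMap<>();
        for (Y y : table) {
            index.computeIfAbsent(y.sameValue, k -> new ArrayList<>()).add(y);
        }
        return index;
    }

    // For each stream record, look up all table rows sharing the same field value.
    static List<String> join(List<X> stream, Map<String, List<Y>> index) {
        List<String> joined = new ArrayList<>();
        for (X x : stream) {
            for (Y y : index.getOrDefault(x.sameValue, Collections.<Y>emptyList())) {
                joined.add(x.valueX + "|" + y.valueY);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, List<Y>> index = indexBySameValue(Arrays.asList(
            new Y("y1", "s1", "left"),
            new Y("y2", "s2", "right")));
        List<X> stream = Arrays.asList(
            new X("s1", "2016-09-06", "first"),
            new X("s3", "2016-09-06", "unmatched"));
        System.out.println(join(stream, index)); // prints [first|left]
    }
}
```

Because the whole index lives on every instance, its size is exactly the memory/disk footprint concern mentioned above.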

On Tue, Sep 6, 2016 at 10:31 AM, Matthias J. Sax 
wrote:

> Hi,
>
> currently, in DSL only primary-key joins are supported. However in your
> case, you have a non-primary-key join. There is already a JIRA to add
> support for this: https://issues.apache.org/jira/browse/KAFKA-3705
>
> Currently, you will need to use Processor API. For the non-time-based
> input (if not too large) you could replicate it to all application
> instances and build a hash table (a simple in-memory HashMap might be
> sufficient). If input is too large to fit into memory, you need to build
> a more sophisticated hash table that also uses the disk.
>
> For replication, you will need an additional KafkaConsumer that assigns
> all partitions manually to itself and does never commit its offset
> (offset commits only work correctly if each partition is only assigned
> once -- but in your case, it would be assigned multiple times, depending
> on your overall parallelism of your Streams app).
>
> For the time-based input, you can just read it regularly, and for each
> record you do a look-up in the HashTable to compute the join.
>
> Does this make sense?
>
>
> -Matthias
>
> On 09/05/2016 11:28 PM, Guillermo Lammers Corral wrote:
> > Hi Matthias,
> >
> > Good question... the main problem is related with the kind of my data.
> The
> > first source of data is time based and the second one not but both have a
> > field with the same value (I don't know how to use it in the join without
> > being key. It can't, let me explain why):
> >
> > ObjectX (sameValue, date, valueX)
> > ObjectY (uniqueId, sameValue, valueY)
> >
> > I want to create a result object based on X and Y using sameValue as
> "key"
> > but there are some problems here:
> >
> >- sameValue of ObjectX cannot be key because I must take care of date
> >- sameValue of ObjectY cannot be key because sameValue is not key of
> >ObjectX (we couldn't join anything)
> >- uniqueId of ObjectY cannot be key because does not exists in ObjectX
> >(we couldn't join anything)
> >- I couldn't use as key something like someValue_date because date
> does
> >not exists in ObjectY (we couldn't join anything)
> >
> > So, actually I don't know how to implement this using Kafka Streams. I
> need
> > join data using a value field of each message (sameValue but not as key)
> > and do it indefinitely because I don't know when data will be sent
> whereas
> > the process will always be creating new result objects.
> >
> > Basically, I want to use streaming with Kafka Stream to make joins
> between
> > two sources of data but we cannot use KTable (key problems) and we cannot
> > use windowed KStream (or yes but with memory issues as you said) because
> I
> > don't know when data will arrive and I cannot lose data (any matching
> > between both sources).
> >
> > Do you see any solution? Will I have to use Processor API instead of DSL
> to
> > spill data to disk as you said?
> >
> > Thanks in advance!
> >
> > 2016-09-05 20:00 GMT+02:00 Matthias J. Sax :
> >
> >> Hey,
> >>
> >> are you sure, you want to join everything? This will result in a huge
> >> memory footprint of your application. You are right, that you cannot use
> >> KTable, however, windowed KStream joins would work -- you only need to
> >> specify a huge window (ie, use Long.MAX_VALUE; this will effectively be
> >> "infinitely large") thus that all data falls into a single window.
> >>
> >> The issue will be, that all data will be buffered in memory, thus, if
> >> your application run very long, it will eventually fail (I would
> >> assume). Thus, again my initial question: are you sure, you want to join
> >> everything? (It's stream processing, not batch processing...)
> >>
> >> If the answer is still yes, and you hit a memory issue, you will need to
> >> fall back to use Processor API instead of DSL to spill data to disk if
> >> it does not fit into memory any more (ie, you will need to implement
> >> your own version of a symmetric hash join that spills to disk). Of
> >> course, the disk usage will also be huge. Eventually, your disc might
> >> also become too small...
> >>
> >> Can you clarify, why you want to join everything? This does not sound
> >> like a good idea. Very large windows are handleable, but "infinite"
> >> windows are very problematic in stream processing.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 09/05/2016 06:25 PM, Guillermo Lammers Corral wrote:
> >>> Hi,
> >>>
> >>> I've been thinking how to solve with Kafka Strea

Re: enhancing KStream DSL

2016-09-09 Thread Michael Noll
Ara,

you have shared this code snippet:

>allRecords.branch(
>(imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
>(imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
>(imsi, callRecord) -> true
>);

The branch() operation partitions the allRecords KStream into three
disjoint streams.

I'd suggest the following.

First, update the third predicate in your `branch()` step to be "everything
but VOICE and DATA", i.e. the remainder of allRecords once VOICE and DATA
records are removed:


KStream[] branches = allRecords
    .branch(
        (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
        (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
        (imsi, callRecord) -> !(callRecord.getCallCommType().equalsIgnoreCase("VOICE")
            || callRecord.getCallCommType().equalsIgnoreCase("DATA"))
    );

This would give you:

KStream voiceRecords = branches[0];
KStream dataRecords = branches[1];
KStream recordsThatAreNeitherVoiceNorData = branches[2];

Then, to count "everything" (VOICE + DATA + everything else), simply reuse
the original `allRecords` stream.
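
To illustrate why a catch-all third predicate only ever sees the remainder, here is a small plain-Java sketch that mimics the first-match-wins routing shown in the KStreamBranchProcessor code quoted below.  It is not the Kafka Streams implementation: the `branch` helper, the class name, and the use of plain strings in place of call records are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class BranchSketch {

    // Each record goes to the branch of the FIRST matching predicate only;
    // records that match no predicate are dropped.
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> records, Predicate<T>... predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    branches.get(i).add(record);
                    break; // first match wins, later predicates are not consulted
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        // Hypothetical call types standing in for callRecord.getCallCommType().
        List<String> allRecords = Arrays.asList("VOICE", "data", "SMS", "voice");
        List<List<String>> branches = branch(allRecords,
            type -> "VOICE".equalsIgnoreCase(type),
            type -> "DATA".equalsIgnoreCase(type),
            type -> true);
        System.out.println(branches.get(0)); // prints [VOICE, voice]
        System.out.println(branches.get(1)); // prints [data]
        System.out.println(branches.get(2)); // prints [SMS]
        // Counting "everything" works on the original, un-branched collection.
        System.out.println(allRecords.size()); // prints 4
    }
}
```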

-Michael





On Thu, Sep 8, 2016 at 10:20 PM, Ara Ebrahimi 
wrote:

> Let’s say I have this:
>
>
> KStream[] branches = allRecords
> .branch(
> (imsi, callRecord) -> "VOICE".equalsIgnoreCase(
> callRecord.getCallCommType()),
> (imsi, callRecord) -> "DATA".equalsIgnoreCase(
> callRecord.getCallCommType()),
> (imsi, callRecord) -> true
> );
> KStream callRecords = branches[0];
> KStream dataRecords = branches[1];
> KStream callRecordCounter = branches[2];
>
> callRecordCounter
> .map((imsi, callRecord) -> new KeyValue<>("", ""))
> .countByKey(
> UnlimitedWindows.of("counter-window"),
> stringSerde
> )
> .print();
>
> Here I have 3 branches. Branch 0 is triggered if data is VOICE, branch 1 if
> data is DATA. Branch 2 is supposed to get triggered regardless of type
> so that I can then count stuff for a time window. BUT the problem
> is branch is implemented like this:
>
> private class KStreamBranchProcessor extends AbstractProcessor {
> @Override
> public void process(K key, V value) {
> for (int i = 0; i < predicates.length; i++) {
> if (predicates[i].test(key, value)) {
> // use forward with childIndex here and then break the loop
> // so that no record is going to be piped to multiple streams
> context().forward(key, value, i);
> break;
> }
> }
> }
> }
>
> Note the break. So the counter branch is never reached. I’d like to change
> the behavior of branch so that all predicates are checked and no break
> happens, in say a branchAll() method. What’s the easiest way to add this
> functionality to the DSL? I tried process() but it doesn’t return KStream.
>
> Ara.
>
>
>
>
> 
>
>
> 
>


Re: enhancing KStream DSL

2016-09-09 Thread Michael Noll
Oh, my bad.

Updating the third predicate in `branch()` may not even be needed.

You could simply do:

KStream[] branches = allRecords
    .branch(
        (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
        (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType())
        // Any callRecords that aren't matching any of the two
        // predicates above will be dropped.
    );

This would give you two branched streams instead of three:

KStream voiceRecords = branches[0];
KStream dataRecords = branches[1];
// No third branched stream like before.

Then, to count "everything" (VOICE + DATA + everything else), simply reuse
the original `allRecords` stream.



On Fri, Sep 9, 2016 at 2:23 PM, Michael Noll  wrote:

> Ara,
>
> you have shared this code snippet:
>
> >allRecords.branch(
> >(imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
> >(imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
> >(imsi, callRecord) -> true
> >);
>
> The branch() operation partitions the allRecords KStream into three
> disjoint streams.
>
> I'd suggest the following.
>
> First, update the third predicate in your `branch()` step to be "everything
> but VOICE and DATA", i.e. the remainder of allRecords once VOICE and DATA
> records are removed:
>
>
> KStream[] branches = allRecords
> .branch(
> (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
> (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
> (imsi, callRecord) -> !(callRecord.getCallCommType().equalsIgnoreCase("VOICE")
>     || callRecord.getCallCommType().equalsIgnoreCase("DATA"))
> );
>
> This would give you:
>
> KStream voiceRecords = branches[0];
> KStream dataRecords = branches[1];
> KStream recordsThatAreNeitherVoiceNorData =
> branches[2];
>
> Then, to count "everything" (VOICE + DATA + everything else), simply
> reuse the original `allRecords` stream.
>
> -Michael
>
>
>
>
>
> On Thu, Sep 8, 2016 at 10:20 PM, Ara Ebrahimi  > wrote:
>
>> Let’s say I have this:
>>
>>
>> KStream[] branches = allRecords
>> .branch(
>> (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
>> (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
>> (imsi, callRecord) -> true
>> );
>> KStream callRecords = branches[0];
>> KStream dataRecords = branches[1];
>> KStream callRecordCounter = branches[2];
>>
>> callRecordCounter
>> .map((imsi, callRecord) -> new KeyValue<>("", ""))
>> .countByKey(
>> UnlimitedWindows.of("counter-window"),
>> stringSerde
>> )
>> .print();
>>
>> Here I have 3 branches. Branch 0 is triggered if data is VOICE, branch 1
>> if data is DATA. Branch 2 is supposed to get triggered regardless of type
>> so that I can then count stuff for a time window. BUT the
>> problem is branch is implemented like this:
>>
>> private class KStreamBranchProcessor extends AbstractProcessor {
>> @Override
>> public void process(K key, V value) {
>> for (int i = 0; i < predicates.length; i++) {
>> if (predicates[i].test(key, value)) {
>> // use forward with childIndex here and then break the loop
>> // so that no record is going to be piped to multiple streams
>> context().forward(key, value, i);
>> break;
>> }
>> }
>> }
>> }
>>
>> Note the break. So the counter branch is never reached. I’d like to
>> change the behavior of branch so that all predicates are checked and no
>> break happens, in say a branchAll() method. What’s the easiest way to add this
>> functionality to the DSL? I tried process() but it doesn’t return KStream.
>>
>> Ara.
>>
>>
>>
>>
>> 
>>
>>
>> 
>>
>
>
>


Re: what's the relationship between Zookeeper and Kafka ?

2016-09-13 Thread Michael Noll
Eric,

the latest versions of Kafka use ZooKeeper only on the side of the Kafka
brokers, i.e. the servers in a Kafka cluster.

Background:
In older versions of Kafka, the Kafka consumer API required client
applications (that would read data from Kafka) to also talk to ZK.  Why
would they need to do that:  because ZK was used, in the old Kafka consumer
API, to track which data records they had already consumed, to rewind
reading from Kafka in case of failures like client machine crashes, and so
on.  In other words, consumption-related metadata was managed in ZK.
However, no "actual" data was ever routed through ZK.

The latest versions of Kafka have an improved consumer API that no longer
needs to talk to ZK -- any information that was previously maintained in ZK
(by these client apps) is now stored directly in Kafka.

Going back to your Spark programs:  They are using these older consumer API
versions of Kafka that still require talking to ZooKeeper, hence the need
to set things like "zoo1:2181".

> Does the kafka data actually get routed out of zookeeper before delivering
> the payload onto Spark ?

This was never the case (old API vs. new API).  Otherwise this would have
been a significant bottleneck. :-)  Data has always been served through the
Kafka brokers only.

Hope this helps,
Michael





On Sat, Sep 10, 2016 at 4:22 PM, Valerio Bruno  wrote:

> AFAIK Kafka uses ZooKeeper to coordinate the Kafka cluster (the set of
> brokers).
>
> Consumers usually connect to ZooKeeper to retrieve the list of brokers, then
> connect to the brokers.
>
> *Valerio*
>
> On 10 September 2016 at 22:11, Eric Ho  wrote:
>
> > I notice that some Spark programs would contact something like
> 'zoo1:2181'
> > when trying to suck data out of Kafka.
> >
> > Does the kafka data actually get routed out of zookeeper before
> delivering
> > the payload onto Spark ?
> >
> >
> >
> > --
> >
> > -eric ho
> >
>
>
>
> --
> *Valerio Bruno*
>


Re: micro-batching in kafka streams

2016-09-26 Thread Michael Noll
Ara,

may I ask why you need to use micro-batching in the first place?

Reason why I am asking: Typically, when people talk about micro-batching,
they are referring to the way some originally batch-based stream processing
tools "bolt on" real-time processing by making their batch sizes really
small.  Here, micro-batching belongs to the realm of the inner workings of
the stream processing tool.

Orthogonally to that, you have features/operations such as windowing,
triggers, etc. that -- unlike micro-batching -- allow you as the user of
the stream processing tool to define which exact computation logic you
need.  Whether or not, say, windowing is computed via
micro-batching behind the scenes should (at least in an ideal world) be of
no concern to the user.

-Michael





On Mon, Sep 5, 2016 at 9:10 PM, Ara Ebrahimi 
wrote:

> Hi,
>
> What’s the best way to do micro-batching in Kafka Streams? Any plans for a
> built-in mechanism? Perhaps StateStore could act as the buffer? What
> exactly are ProcessorContext.schedule()/punctuate() for? They don’t seem
> to be used anywhere?
>
> http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/
>
> Ara.
>
>
>
>


Re: Kafka Streams dynamic partitioning

2016-10-05 Thread Michael Noll
> So, in this case I should know the max number of possible keys so that
> I can create that number of partitions.

Assuming I understand your original question correctly, then you would not
need to do/know this.  Rather, pick the number of partitions in a way that
matches your needs to process the data in parallel (e.g. if you expect that
you require 10 machines in order to process the incoming data, then you'd
need 10 partitions).  Also, as a general recommendation:  It's often a good
idea to over-partition your topics.  For example, even if today 10 machines
(and thus 10 partitions) would be sufficient, pick a higher number of
partitions (say, 50) so you have some wiggle room to add more machines
(11...50) later if need be.
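
To make the sizing advice concrete, here is a minimal sketch in plain Java
(no Kafka dependency) of how a fixed number of partitions spreads over a
growing number of machines.  The round-robin `assign` helper is a
hypothetical stand-in for the consumer group's partition assignment, not the
real assignor; it only illustrates that the partition count caps the usable
parallelism.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSpread {

    // Hypothetical stand-in for partition assignment: hands out partitions
    // 0..numPartitions-1 to instances in round-robin order.
    static List<List<Integer>> assign(int numPartitions, int numInstances) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < numInstances; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numInstances).add(p);
        }
        return assignment;
    }

    // Number of instances that received at least one partition.
    static long busyInstances(int numPartitions, int numInstances) {
        return assign(numPartitions, numInstances).stream()
                .filter(partitions -> !partitions.isEmpty())
                .count();
    }

    public static void main(String[] args) {
        for (int instances : new int[] {10, 25, 50, 60}) {
            System.out.println(instances + " instances, 50 partitions -> "
                    + busyInstances(50, instances) + " instances with work");
        }
    }
}
```

In this model, 50 partitions keep up to 50 instances busy, and any instance
beyond that gets no partition at all, which is why leaving headroom in the
partition count is cheap insurance.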



On Wed, Oct 5, 2016 at 9:34 AM, Adrienne Kole 
wrote:

> Hi Guozhang,
>
> So, in this case I should know the max number of possible keys so that I
> can create that number of partitions.
>
> Thanks
>
> Adrienne
>
> On Wed, Oct 5, 2016 at 1:00 AM, Guozhang Wang  wrote:
>
> > By default the partitioner will use murmur hash on the key, modulo the
> > current num.partitions, to determine which partition to go to, so records
> > with the same key will be assigned to the same partition. Would that be OK
> > for your case?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Oct 4, 2016 at 3:00 PM, Adrienne Kole 
> > wrote:
> >
> > > Hi,
> > >
> > > From Streams documentation, I can see that each Streams instance is
> > > processing data independently (from other instances), reads from topic
> > > partition(s) and writes to specified topic.
> > >
> > >
> > > So here, the partitions of topic should be determined beforehand and
> > should
> > > remain static.
> > > In my usecase I want to create partitioned/keyed (time) windows and
> > > aggregate them.
> > > I can partition the incoming data to specified topic's partitions and
> > each
> > > Stream instance can do windowed aggregations.
> > >
> > > However, if I don't know the number of possible keys (to partition),
> then
> > > what should I do?
> > >
> > > Thanks
> > > Adrienne
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: Kafka Streams dynamic partitioning

2016-10-06 Thread Michael Noll
> I think this should be ' pick number of partitions that matches max number
> of possible keys in stream to be partitioned '.
> At least in my usecase , in which I am trying to partition stream by key
> and make windowed aggregations, if there are less number of topic
> partitions than possible keys,  then application will not work correctly.

As I said above, this is actually not needed -- which (I hope) means good
news for you. :-)
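
A minimal sketch of why this works, in plain Java without any Kafka
dependency.  Assumptions: `partitionFor` uses `String.hashCode()` modulo the
partition count purely as an illustrative stand-in for the default
partitioner (which hashes the serialized key with murmur2), and `countByKey`
is a hypothetical helper that mimics a per-key aggregation inside each
partition.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedCounts {

    // Illustrative stand-in for the default partitioner: hash(key) mod partitions.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts records per key, grouped by the partition each key lands in.
    static Map<Integer, Map<String, Long>> countByKey(String[] keys, int numPartitions) {
        Map<Integer, Map<String, Long>> perPartition = new HashMap<>();
        for (String key : keys) {
            int partition = partitionFor(key, numPartitions);
            perPartition
                    .computeIfAbsent(partition, p -> new HashMap<>())
                    .merge(key, 1L, Long::sum);
        }
        return perPartition;
    }

    public static void main(String[] args) {
        // Three distinct keys, but only two partitions.
        String[] keys = {"a", "b", "c", "a", "c", "c"};
        System.out.println(countByKey(keys, 2));
    }
}
```

Keys "a" and "c" end up in the same partition here, yet each still gets its
own count, because the aggregate is kept per key rather than per partition.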



On Wed, Oct 5, 2016 at 11:27 PM, Adrienne Kole 
wrote:

> Thanks, I got the point. That solves my problem.
>
>
>
> On Wed, Oct 5, 2016 at 10:58 PM, Matthias J. Sax 
> wrote:
>
> >
> > Hi,
> >
> > even if you have more distinct keys than partitions (i.e., different keys
> > go to the same partition), if you do "aggregate by key" Streams will
> > automatically separate the keys and compute an aggregate per key.
> > Thus, you do not need to worry about which key is hashed to which
> > partition.
> >
> > -Matthias
> >
> > On 10/5/16 1:37 PM, Adrienne Kole wrote:
> > > Hi,
> > >
> > > @Ali IMO, Yes. That is the job of kafka server to assign kafka
> > > instances partition(s) to process. Each instance can process more
> > > than one partition but one partition cannot be processed by more
> > > than one instance.
> > >
> > > @Michael, Thanks for reply.
> > >> Rather, pick the number of partitions in a way that matches your
> > >> needs to
> > > process the data in parallel I think this should be ' pick number
> > > of partitions that matches max number of possible keys in stream to
> > > be partitioned '. At least in my usecase , in which I am trying to
> > > partition stream by key and make windowed aggregations, if there
> > > are less number of topic partitions than possible keys,  then
> > > application will not work correctly.
> > >
> > > That is, if the number of topic partitions is less than possible
> > > stream keys, then different keyed stream tuples will be assigned to
> > > same topic. That was the problem that I was trying to solve and it
> > > seems the only solution is to estimate max number of possible keys
> > > and assign accordingly.
> > >
> > > Thanks Adrienne
> > >
> > >
> > >
> > >
> > >
> > > On Wed, Oct 5, 2016 at 5:55 PM, Ali Akhtar 
> > > wrote:
> > >
> > >>> It's often a good
> > >> idea to over-partition your topics.  For example, even if today
> > >> 10 machines (and thus 10 partitions) would be sufficient, pick a
> > >> higher number of partitions (say, 50) so you have some wiggle
> > >> room to add more machines (11...50) later if need be.
> > >>
> > >> If you create e.g 30 partitions, but only have e.g 5 instances of
> > >> your program, all on the same consumer group, all using kafka
> > >> streams to consume the topic, do you still receive all the data
> > >> posted to the topic, or will you need to have the same instances
> > >> of the program as there are partitions?
> > >>
> > >> (If you have 1 instance, 30 partitions, will the same rules
> > >> apply, i.e it will receive all data?)
> > >>

Re: Printing to stdin from KStreams?

2016-10-07 Thread Michael Noll
Ali,

adding to what Matthias said:

Kafka 0.10 changed the message format to add so-called "embedded
timestamps" into each Kafka message.  The Java producer included in Kafka
0.10 includes such embedded timestamps into any generated message as
expected.

However, other clients (like the Go Kafka plugin you are using) may not
have been updated yet to be compatible with the new 0.10 message format.
That's the root cause of why you see these "-1" negative timestamps.  (The same
negative timestamp problem also happens if you attempt to read messages
that were generated with pre-0.10 versions of Kafka's Java producer.)

FYI: Kafka Streams' default timestamp extractor attempts to read those new
embedded timestamps.  If there are no such embedded timestamps, you run
into these "negative timestamps" errors.

Now, how to fix your problem?

- Fix the root cause: Check if there's a newer version of your Go kafka
plugin that generates messages in the new Kafka 0.10 format.  If there is
no such version, ask the maintainers for an update. :-)

- Work around the problem:  As Matthias said, you can also tell Kafka
Streams to not use its default timestamp extractor.  You can fall back to
the WallclockTimestampExtractor, though this means your application will
not use event-time but processing-time when processing your data, which is
probably not what you want (but it does prevent the -1 timestamp errors).
If your data (generated by the Go Kafka plugin) *does* contain timestamp
information in the message payload, then the better option is to write a
custom timestamp extractor that inspects each message, extracts the timestamp
from the payload, and returns it to Kafka Streams.  The Timestamp Extractor
section in [1] explains how to write a custom one and how to configure your
app to use it.
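
As a rough sketch of the payload-based option, the core logic could look like
the plain-Java helper below.  Everything about the payload is an assumption
for illustration: it pretends each message value is a string of the form
"<epoch-millis>,<rest>", and `extractTimestamp` is a hypothetical helper, not
part of Kafka.  In a real application you would call something like it from
the `extract()` method of your own class implementing Kafka Streams'
`TimestampExtractor` interface, passing the current wall-clock time as the
fallback.

```java
public class PayloadTimestamp {

    // Hypothetical payload layout: "<epoch-millis>,<rest of message>".
    // Returns the embedded timestamp, or fallbackMillis if the payload
    // carries no usable (non-negative, numeric) timestamp.
    static long extractTimestamp(String payload, long fallbackMillis) {
        if (payload == null) {
            return fallbackMillis;
        }
        int comma = payload.indexOf(',');
        if (comma <= 0) {
            return fallbackMillis;
        }
        try {
            long timestamp = Long.parseLong(payload.substring(0, comma).trim());
            return timestamp >= 0 ? timestamp : fallbackMillis;
        } catch (NumberFormatException e) {
            return fallbackMillis;
        }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(extractTimestamp("1475800000000,user-login", now));
        System.out.println(extractTimestamp("no timestamp here", now) == now);
    }
}
```

Falling back to wall-clock time for malformed records is one possible design
choice; depending on your semantics you may prefer to log and skip such
records instead.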

Hope this helps,
Michael



[1]
http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters






On Fri, Oct 7, 2016 at 5:17 AM, Matthias J. Sax 
wrote:

>
> If you restart your application, it will resume where it left off
> (same as any other Kafka consumer that does use group management and
> commits offsets).
>
> If you want to reprocess data from scratch, you need to reset your
> application using bin/kafka-streams-application-reset.sh
>
> See also
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#application-reset-tool
>
> and
> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
>
>
> About the timestamp issue: it seems that your Go client does not
> assign valid timestamps when writing the data. As you already said,
> you need to provide a custom TimestampExtractor (or use
> WallclockTimestampExtractor if your semantics permit) instead of the default
> ConsumerRecordTimestampExtractor.
>
>
> -Matthias
>
> On 10/6/16 7:53 PM, Ali Akhtar wrote:
> > Thanks.
> >
> > I'm encountering a strange issue.
> >
> > If I create messages thru console-producer.sh on a new topic,
> > things work fine.
> >
> > But on the topic that I need to consume, the messages are being
> > produced via the go kafka plugin.
> >
> > On this topic, at first, nothing happens when the stream starts
> > (i.e it doesn't process the messages which are already in there)
> >
> > Then, if I produce new messages, then my exception handler is
> > called with the exception that timestamp is negative.
> >
> > I'm pretty sure I'm on kafka 0.10. I downloaded it 2-3 weeks ago.
> >
> > I'm going to write a new timestamp extractor, but any ideas why
> > nothing happens with the old messages which are in the topic, it
> > only responds if i push new messages to this topic?
> >
> > On Fri, Oct 7, 2016 at 7:25 AM, Matthias J. Sax
> >  wrote:
> >
> > Sure.
> >
> > Just use #print() or #writeAsText()
> >
> >
> > -Matthias
> >
> > On 10/6/16 6:25 PM, Ali Akhtar wrote:
>  What the subject says. For dev, it would be a lot easier if
>  debugging info can be printed to stdin instead of another
>  topic, where it will persist.
> 
>  Any ideas if this is possible?
> 
> >>
> >

Re: kafka stream to new topic based on message key

2016-10-07 Thread Michael Noll
Gary,

adding to what Guozhang said:  Yes, you can programmatically create a new
Kafka topic from within your application.  But how you'd do that varies a
bit between current Kafka versions and the upcoming 0.10.1 release.

As of today (Kafka versions before the upcoming 0.10.1 release), you would
need to create your topic manually.  This can be on the CLI (which doesn't
help you in your scenario) or programmatically.  Right now programmatically
means you must directly talk to ZooKeeper, e.g. via zkclient.  If you are
looking for an example, the code at [1] may be helpful.  That code creates
a topic in ZK by using zkclient and Kafka's `AdminUtils`.

Looking ahead, Kafka's upcoming 0.10.1 release introduces an admin client
for creating/deleting topics (this new functionality is part of the
not-yet-fully-completed work on KIP-4).  This would give you a new
programmatic approach to create a topic without having to communicate with
ZooKeeper directly.

Lastly, keep in mind that it takes a while for a Kafka topic to be
created.  So you may run into race-condition-like situations.  You may
therefore want to double-check that the newly created topic is actually
ready to use before beginning to write to it or read from it.
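
One simple way to guard against that race is to poll a readiness check with a
timeout before using the topic.  The sketch below is plain Java and
deliberately leaves the actual check abstract: `waitUntil` is a hypothetical
helper, and the `BooleanSupplier` you pass in is where your own "does the
topic exist yet?" lookup would go.

```java
import java.util.function.BooleanSupplier;

public class WaitUntilReady {

    // Polls `ready` every pollMs until it returns true or timeoutMs elapses.
    // Returns true if the condition was met, false on timeout or interrupt.
    static boolean waitUntil(BooleanSupplier ready, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (ready.getAsBoolean()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical readiness check: pretend the topic shows up on the third poll.
        int[] polls = {0};
        boolean ready = waitUntil(() -> ++polls[0] >= 3, 1000, 10);
        System.out.println("ready=" + ready + " after " + polls[0] + " polls");
    }
}
```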

Hope this helps!
Michael




[1]
https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/kafka/KafkaEmbedded.java#L133-L160







On Fri, Oct 7, 2016 at 8:12 AM, Guozhang Wang  wrote:

> If you can create a ZK client inside your processor implementation then you
> can definitely create any topics by talking to ZK directly; it's just
> that the Kafka Streams public interface does not expose any efficient ways
> beyond that for now.
>
> Note that in KIP-4 we are trying to introduce the admin client for
> tasks such as creating / deleting topics. The corresponding requests have been
> added in the upcoming 0.10.1.0 release, but the full implementation is yet to be
> completed.
>
>
> Guozhang
>
>
> On Thu, Oct 6, 2016 at 12:48 PM, Gary Ogden  wrote:
>
> > Thanks Guozhang. I've gotten an example to work using your tips.
> >
> > So, is there no other way in streams to create a topic if
> > "auto.create.topics.enable"
> > is set to false?  Maybe by creating a record in zookeeper for that topic?
> >
> >
> >
> > On 5 October 2016 at 17:20, Guozhang Wang  wrote:
> >
> > > Hello Gary,
> > >
> > >
> > > 1. The InternalTopicManager is only used by the Streams-instantiated
> > > PartitionAssignor to create internal topics for auto-repartitioning and
> > > changelog.
> > >
> > > 2. About "RecordCollector.Supplier": you are right, and as I wrote in the
> > > above email you have to force-cast it to RecordCollector.Supplier;
> > > theoretically this is not safe, but the internal Impl is always used.
> > >
> > >
> > > If you know beforehand all the possible topics that you would want to
> > > send to based on the key-value pair, you can then use KStream.branch() to
> > > branch the source stream into multiple ones based on the content, with each
> > > branched stream going to a different topic.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Oct 5, 2016 at 7:48 AM, Gary Ogden  wrote:
> > >
> > > > Guozhang. I was just looking at the source for this, and it looks like the
> > > > RecordCollector.Supplier is part of the internal ProcessorContextImpl
> > > > class.  I don't think that's exposed to me, is it?
> > > >
> > > > If I create a processor class that extends AbstractProcessor, it only has
> > > > access to the ProcessorContext interface, which doesn't expose the
> > > > Supplier.
> > > >
> > > > On 5 October 2016 at 09:42, Gary Ogden  wrote:
> > > >
> > > > > What if we were to use kafka connect instead of streams? Does it
> have
> > > the
> > > > > ability to specify partitions, rf, segment size etc?
> > > > >
> > > > > On 5 October 2016 at 09:42, Gary Ogden  wrote:
> > > > >
> > > > >> Thanks Guozhang.
> > > > >>
> > > > >> So there's no way we could also use InternalTopicManager to
> specify
> > > the
> > > > >> number of partitions and RF?
> > > > >>
> > > > >> https://github.com/apache/kafka/blob/0.10.1/streams/src/main
> > > > >> /java/org/apache/kafka/streams/processor/internals/InternalT
> > > > >> opicManager.java
> > > > >>
> > > > >> On 4 October 2016 at 19:34, Guozhang Wang 
> > wrote:
> > > > >>
> > > > >>> Hello Gary,
> > > > >>>
> > > > >>> This is also doable in the Processor API, you can use the record
> > > > >>> collector
> > > > >>> from ProcessorContext to send data to arbitrary topics, i.e.:
> > > > >>>
> > > > >>> RecordCollector collector = ((RecordCollector.Supplier)
> > > > >>> context).recordCollector();
> > > > >>> collector.send(new ProducerRecord<>(topic, *...*), keySerializer,
> > > > >>> valSerializer, partitioner);
> > > > >>>
> > > > >>>
> > > > > >>> But note that if the new topic, e.g. "123456_lv2" is not created, then
> > > > > >>> the send call will throw an exception unless the broker-side config
> > > >

Re: kafka stream to new topic based on message key

2016-10-07 Thread Michael Noll
Great, happy to hear that, Gary!

On Fri, Oct 7, 2016 at 3:30 PM, Gary Ogden  wrote:

> Thanks for all the help gents. I really appreciate it. It's exactly what I
> needed.

Re: Printing to stdin from KStreams?

2016-10-07 Thread Michael Noll
> Is it possible to have kafka-streams-reset be automatically called during
> development? Something like streams.cleanUp() but which also does reset?

Unfortunately this isn't possible (yet), Ali.  I am also not aware of any
plan to add such a feature in the short-term.



On Fri, Oct 7, 2016 at 1:36 PM, Ali Akhtar  wrote:

> Is it possible to have kafka-streams-reset be automatically called during
> development? Something like streams.cleanUp() but which also does reset?

Re: Printing to stdin from KStreams?

2016-10-07 Thread Michael Noll
Nice idea. :-)

Happy to hear it works for you, and thanks for sharing your workaround with
the mailing list.

On Fri, Oct 7, 2016 at 5:33 PM, Ali Akhtar  wrote:

> Thank you.
>
> I've resolved this by adding a run config in Intellij for running
> streams-reset, and using the same application id in all applications in
> development (transparently reading the application id from environment
> variables, so in my kubernetes config I can specify different app ids for
> production)

Re: puncutuate() never called

2016-10-07 Thread Michael Noll
David,

punctuate() is still data-driven at this point, even when you're using the
WallClock timestamp extractor.

To use an example: Imagine you have configured punctuate() to be run every
5 seconds.  If there's no data being received for a minute, then punctuate
won't be called -- even though you probably would have expected this to
happen 12 times during this 1 minute.
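
To make the example above concrete, here is a small plain-Java simulation.  It is
not Kafka Streams code: the class `DataDrivenPunctuateSketch` and its methods are
made up for illustration, and the rescheduling rule (fire once, then wait one
interval from the record's timestamp) is a simplifying assumption.  The only
point it shows is that the schedule is checked when a record arrives, so an idle
minute produces zero punctuations.

```java
public class DataDrivenPunctuateSketch {
    private final long intervalMs;
    private long nextPunctuateMs;
    private int firedCount = 0;

    DataDrivenPunctuateSketch(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.nextPunctuateMs = startMs + intervalMs;
    }

    // Called once per incoming record.  This is the only place where the
    // schedule is checked, so without records nothing ever fires.
    void onRecord(long recordTimestampMs) {
        if (recordTimestampMs >= nextPunctuateMs) {
            firedCount++; // stand-in for calling punctuate()
            // Assumption: reschedule relative to the record that triggered us.
            nextPunctuateMs = recordTimestampMs + intervalMs;
        }
    }

    int firedCount() {
        return firedCount;
    }

    public static void main(String[] args) {
        DataDrivenPunctuateSketch task = new DataDrivenPunctuateSketch(5_000L, 0L);

        // A full minute passes without any record: onRecord() is never invoked.
        System.out.println("fired during idle minute: " + task.firedCount());

        // The first record after the idle minute finally triggers a punctuation.
        task.onRecord(60_000L);
        System.out.println("fired after first record: " + task.firedCount());
    }
}
```

A purely wall-clock-driven scheduler would instead use a timer thread that fires
regardless of whether records arrive, which is what many users intuitively
expect.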

(FWIW, there's an ongoing discussion to improve punctuate(), part of which
is motivated by the current behavior that arguably is not very intuitive to
many users.)

Could this be the problem you're seeing?  See also the related discussion
at
http://stackoverflow.com/questions/39535201/kafka-problems-with-timestampextractor
.






On Fri, Oct 7, 2016 at 6:07 PM, David Garcia  wrote:

> Hello, I’m sure this question has been asked many times.
> We have a test-cluster (confluent 3.0.0 release) of 3 aws m4.xlarges.  We
> have an application that needs to use the punctuate() function to do some
> work on a regular interval.  We are using the WallClock extractor.
> Unfortunately, the method is never called.  I have checked the
> filedescriptor setting for both the user as well as the process, and
> everything seems to be fine.  Is this a known bug, or is there something
> obvious I’m missing?
>
> One note, the application used to work on this cluster, but now it’s not
> working.  Not really sure what is going on?
>
> -David
>


Re: In Kafka Streaming, Serdes should use Optionals

2016-10-07 Thread Michael Noll
Ali, the Apache Kafka project still targets Java 7, which means we can't
use Java 8 features just yet.

FYI: There's an ongoing conversation about when Kafka would move from Java
7 to Java 8.

On Fri, Oct 7, 2016 at 6:14 PM, Ali Akhtar  wrote:

> Since we're using Java 8 in most cases anyway, Serdes / Serialiazers should
> use options, to avoid having to deal with the lovely nulls.
>


Re: Kafka null keys - OK or a problem?

2016-10-09 Thread Michael Noll
FYI: For Kafka's new Java producer (which ships with Kafka), the behavior is as
follows:  If no partition is explicitly specified (to send the message to)
AND the key is null, then the DefaultPartitioner [1] will assign messages
to topic partitions in a round-robin fashion.  See the javadoc and also the
little bit of code in [1] for details.
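
As a rough illustration of that rule, here is a plain-Java sketch.  It is not the
code from [1]: the class name `NullKeyPartitioningSketch` is made up, the counter
starts at 0 for readability, `Arrays.hashCode` is only a stand-in for the hash
function the real partitioner uses, and partition availability is ignored.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class NullKeyPartitioningSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Picks a partition for a record when the caller did not specify one.
    int partition(byte[] key, int numPartitions) {
        if (key == null) {
            // No key: cycle through the partitions in round-robin order.
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
        // Key present: the same key always maps to the same partition.
        // Assumption: Arrays.hashCode stands in for the real hash function.
        return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        NullKeyPartitioningSketch partitioner = new NullKeyPartitioningSketch();
        for (int i = 0; i < 4; i++) {
            System.out.println("null key -> partition " + partitioner.partition(null, 3));
        }
        byte[] key = "user-42".getBytes();
        System.out.println("keyed    -> partition " + partitioner.partition(key, 3));
    }
}
```

So null keys are fine as far as load spreading goes; what you give up is the
guarantee that related messages land in the same partition.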

Not sure which Go client you're using exactly so I can't tell whether your
Go client follows the behavior of Kafka's Java producer.

-Michael




[1]
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java


On Mon, Oct 10, 2016 at 7:53 AM, Ali Akhtar  wrote:

> If keys are null, what happens in terms of partitioning, is the load spread
> evenly..?
>
> On Mon, Oct 10, 2016 at 7:59 AM, Gwen Shapira  wrote:
>
> > Kafka itself supports null keys. I'm not sure about the Go client you
> > use, but Confluent's Go client also supports null keys
> > (https://github.com/confluentinc/confluent-kafka-go/).
> >
> > If you decide to generate keys and you want even spread, a random
> > number generator is probably your best bet.
> >
> > Gwen
> >
> > On Sun, Oct 9, 2016 at 6:05 PM, Ali Akhtar  wrote:
> > > A kafka producer written elsewhere that I'm using, which uses the Go
> > kafka
> > > driver, is sending messages where the key is null.
> > >
> > > Is this OK - or will this cause issues due to partitioning not
> happening
> > > correctly?
> > >
> > > What would be a good way to generate keys in this case, to ensure even
> > > partition spread?
> > >
> > > Thanks.
> >
> >
> >
> > --
> > Gwen Shapira
> > Product Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter | blog
> >
>


Re: puncutuate() never called

2016-10-09 Thread Michael Noll
> We have run the application (and have confirmed data is being received)
> for over 30 mins…with a 60-second timer.

Ok, so your app does receive data but punctuate() still isn't being called.
:-(


> So, do we need to just rebuild our cluster with bigger machines?

That's worth trying out.  See
http://www.confluent.io/blog/design-and-deployment-considerations-for-deploying-apache-kafka-on-aws/
for some EC2 instance type recommendations.

But I'd also suggest looking into the logs of (1) your application, (2) the
Kafka broker(s), and (3) ZooKeeper to see whether anything looks suspicious.

Sorry for not being able to provide more actionable feedback at this
point.  Typically we have seen such issues mostly (but not exclusively) in
cases where there have been problems in the environment in which the
application is running and/or the environment of the Kafka cluster.
Unfortunately these environment problems are a bit tricky to debug remotely
via the mailing list.

-Michael





On Fri, Oct 7, 2016 at 8:11 PM, David Garcia  wrote:

> Yeah, this is possible.  We have run the application (and have confirmed
> data is being received) for over 30 mins…with a 60-second timer.  So, do we
> need to just rebuild our cluster with bigger machines?
>
> -David
>
> On 10/7/16, 11:18 AM, "Michael Noll"  wrote:
>
> David,
>
> punctuate() is still data-driven at this point, even when you're using
> the
> WallClock timestamp extractor.
>
> To use an example: Imagine you have configured punctuate() to be run
> every
> 5 seconds.  If there's no data being received for a minute, then
> punctuate
> won't be called -- even though you probably would have expected this to
> happen 12 times during this 1 minute.
>
> (FWIW, there's an ongoing discussion to improve punctuate(), part of
> which
> is motivated by the current behavior that arguably is not very
> intuitive to
> many users.)
>
> Could this be the problem you're seeing?  See also the related
> discussion
> at
> http://stackoverflow.com/questions/39535201/kafka-problems-with-
> timestampextractor
> .
>
>
>
>
>
>
> On Fri, Oct 7, 2016 at 6:07 PM, David Garcia 
> wrote:
>
> > Hello, I’m sure this question has been asked many times.
> > We have a test-cluster (confluent 3.0.0 release) of 3 aws
> m4.xlarges.  We
> > have an application that needs to use the punctuate() function to do
> some
> > work on a regular interval.  We are using the WallClock extractor.
> > Unfortunately, the method is never called.  I have checked the
> > filedescriptor setting for both the user as well as the process, and
> > everything seems to be fine.  Is this a known bug, or is there
> something
> > obvious I’m missing?
> >
> > One note, the application used to work on this cluster, but now it’s
> not
> > working.  Not really sure what is going on?
> >
> > -David
> >
>
>
>


Re: Kafka null keys - OK or a problem?

2016-10-10 Thread Michael Noll
Depends on which partitioner you are using, see [1] and [2].  From what I
understand, the `NewHashPartitioner` comes closest to the behavior of Kafka's
Java producer, but instead of going round-robin for null-keyed messages it
picks a partition at random.



[1] https://godoc.org/github.com/Shopify/sarama#Partitioner
[2] https://github.com/Shopify/sarama/blob/master/partitioner.go



On Mon, Oct 10, 2016 at 8:51 AM, Ali Akhtar  wrote:

> Hey Michael,
>
> We're using this one: https://github.com/Shopify/sarama
>
> Any ideas how that one works?
>
> On Mon, Oct 10, 2016 at 11:48 AM, Michael Noll 
> wrote:
>
> > FYI: Kafka's new Java producer (which ships with Kafka) the behavior is
> as
> > follows:  If no partition is explicitly specified (to send the message
> to)
> > AND the key is null, then the DefaultPartitioner [1] will assign messages
> > to topic partitions in a round-robin fashion.  See the javadoc and also
> the
> > little bit of code in [1] for details.
> >
> > Not sure which Go client you're using exactly so I can't tell whether
> your
> > Go client follows the behavior of Kafka's Java producer.
> >
> > -Michael
> >
> >
> >
> >
> > [1]
> > https://github.com/apache/kafka/blob/trunk/clients/src/
> > main/java/org/apache/kafka/clients/producer/internals/
> > DefaultPartitioner.java
> >
> >
> > On Mon, Oct 10, 2016 at 7:53 AM, Ali Akhtar 
> wrote:
> >
> > > If keys are null, what happens in terms of partitioning, is the load
> > spread
> > > evenly..?
> > >
> > > On Mon, Oct 10, 2016 at 7:59 AM, Gwen Shapira 
> wrote:
> > >
> > > > Kafka itself supports null keys. I'm not sure about the Go client you
> > > > use, but Confluent's Go client also supports null keys
> > > > (https://github.com/confluentinc/confluent-kafka-go/).
> > > >
> > > > If you decide to generate keys and you want even spread, a random
> > > > number generator is probably your best bet.
> > > >
> > > > Gwen
> > > >
> > > > On Sun, Oct 9, 2016 at 6:05 PM, Ali Akhtar 
> > wrote:
> > > > > A kafka producer written elsewhere that I'm using, which uses the
> Go
> > > > kafka
> > > > > driver, is sending messages where the key is null.
> > > > >
> > > > > Is this OK - or will this cause issues due to partitioning not
> > > happening
> > > > > correctly?
> > > > >
> > > > > What would be a good way to generate keys in this case, to ensure
> > even
> > > > > partition spread?
> > > > >
> > > > > Thanks.
> > > >
> > > >
> > > >
> > > > --
> > > > Gwen Shapira
> > > > Product Manager | Confluent
> > > > 650.450.2760 | @gwenshap
> > > > Follow us: Twitter | blog
> > > >
> > >
> >
>


Re: Convert a KStream to KTable

2016-10-10 Thread Michael Noll
Elias,

yes, that is correct.

I also want to explain why:

One can always convert a KTable to a KStream (note: this is the opposite
direction of what you want to do) because one only needs to iterate through
the table to generate the stream.

To convert a KStream into a KTable (what you want to do) the user must tell
Kafka how multiple values of the same key should be processed to end up
with just a single value for that key (a KTable has only a single value for
each key).  This step -- the semantics desired by the user of how to
"squash" multiple values into a single value -- is something that Kafka
cannot second-guess.  Some users want to compute a sum and thus add all the
values for a key into a final "sum" value;  others may want to compute
MAX(), and so on.  This explains why there's no KStream#toTable()
equivalent to KTable#toStream().
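
To illustrate the point with plain Java collections (this is not the Kafka
Streams API; `StreamToTableSketch`, `toTable` and `sampleStream` are made-up
names), the same sequence of key-value records yields different tables
depending on which reducer the user supplies:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class StreamToTableSketch {

    // Collapses a sequence of records into one value per key.  The reducer
    // carries the user's semantics (sum, max, latest value, ...).
    static Map<String, Integer> toTable(List<Map.Entry<String, Integer>> stream,
                                        BinaryOperator<Integer> reducer) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> record : stream) {
            table.merge(record.getKey(), record.getValue(), reducer);
        }
        return table;
    }

    static List<Map.Entry<String, Integer>> sampleStream() {
        List<Map.Entry<String, Integer>> stream = new ArrayList<>();
        stream.add(new AbstractMap.SimpleEntry<>("alice", 3));
        stream.add(new AbstractMap.SimpleEntry<>("bob", 5));
        stream.add(new AbstractMap.SimpleEntry<>("alice", 1));
        return stream;
    }

    public static void main(String[] args) {
        // Same input, two different "squash" semantics, two different tables.
        System.out.println("sum: " + toTable(sampleStream(), Integer::sum));
        System.out.println("max: " + toTable(sampleStream(), Integer::max));
    }
}
```

With the sum reducer the two "alice" records are added together; with the max
reducer only the larger one survives.  Neither result is more correct than the
other, which is why the choice has to come from the user.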

Hope this helps!
Michael





On Sat, Oct 8, 2016 at 5:03 AM, Elias Levy 
wrote:

> I am correct in assuming there is no way to convert a KStream into a
> KTable, similar to KTable.toStream() but in the reverse direction, other
> than using KSteam.reduceByKey and a Reducer or looping back through Kafka
> and using KStreamBuilder.table?
>


Re: sFlow/NetFlow/Pcap Plugin for Kafka Producer

2016-10-10 Thread Michael Noll
Aris,

even today you can already use Kafka to deliver Netflow/Pcap/etc. messages,
and people are already using it for that (I did that in previous projects
of mine, too).

Simply encode your Pcap/... messages appropriately (I'd recommend taking a
look at Avro, which allows you to structure your data similar to e.g.
Pcap's native format [1]), and then write the encoded messages to Kafka.
Your downstream applications can then read the encoded messages back from
Kafka, decode, and commence processing.
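
As a rough sketch of the encode/decode round trip, the following uses a
hand-rolled binary layout via `java.nio.ByteBuffer` purely as a stand-in for
Avro (which needs an external library and a schema).  The fields mirror the
per-packet record header described in [1] (ts_sec, ts_usec, incl_len,
orig_len); the class and method names are hypothetical, and the actual Kafka
producer/consumer calls are left out.

```java
import java.nio.ByteBuffer;

public class PacketRecordCodecSketch {

    // One captured packet: header fields plus the captured bytes.
    static final class PacketRecord {
        final long tsSec;   // capture time, seconds
        final long tsUsec;  // capture time, microseconds part
        final long origLen; // length of the packet on the wire
        final byte[] data;  // captured bytes (incl_len == data.length)

        PacketRecord(long tsSec, long tsUsec, long origLen, byte[] data) {
            this.tsSec = tsSec;
            this.tsUsec = tsUsec;
            this.origLen = origLen;
            this.data = data;
        }
    }

    // Producer side: turn a record into the byte[] that becomes the message value.
    static byte[] encode(PacketRecord r) {
        ByteBuffer buf = ByteBuffer.allocate(16 + r.data.length);
        buf.putInt((int) r.tsSec);
        buf.putInt((int) r.tsUsec);
        buf.putInt(r.data.length); // incl_len
        buf.putInt((int) r.origLen);
        buf.put(r.data);
        return buf.array();
    }

    // Consumer side: rebuild the record from the message value.
    static PacketRecord decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long tsSec = buf.getInt() & 0xFFFFFFFFL;  // read as unsigned 32-bit
        long tsUsec = buf.getInt() & 0xFFFFFFFFL;
        int inclLen = buf.getInt();
        long origLen = buf.getInt() & 0xFFFFFFFFL;
        byte[] data = new byte[inclLen];
        buf.get(data);
        return new PacketRecord(tsSec, tsUsec, origLen, data);
    }

    public static void main(String[] args) {
        PacketRecord original = new PacketRecord(1476093300L, 123456L, 1500L, new byte[] {1, 2, 3, 4});
        PacketRecord copy = decode(encode(original));
        System.out.println(copy.tsSec + "." + copy.tsUsec + " captured " + copy.data.length
                + " of " + copy.origLen + " bytes");
    }
}
```

With Avro you would describe the same fields in a schema instead, which gives
your downstream consumers a structured, evolvable format.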

That was a brief summary to get you started, feel free to take a look at
the Apache Kafka docs at kafka.apache.org and/or ask further questions here.

-Michael




[1] https://wiki.wireshark.org/Development/LibpcapFileFormat

On Mon, Oct 10, 2016 at 11:19 AM, Aris Risdianto  wrote:

> ​Hello,
>
>
> ​Is there any plan or implementation to use Kafka for delivering
> sFlow/NetFlow/Pcap messages?
>
>
> Best Regards,
> Aris.
>


Re: sFlow/NetFlow/Pcap Plugin for Kafka Producer

2016-10-10 Thread Michael Noll
Aris,

I am not aware of an out-of-the-box tool for Pcap->Kafka ingestion (in my
case back then we wrote our own).  Maybe others know.



On Monday, October 10, 2016, Aris Risdianto  wrote:

> Thank you for answer Michael.
>
> Actually, I have made a simple producer from Pcap to Kafka. Since it is not
> structured, so it is difficult for further processing by a consumer. But, I
> will take a look at Avro as you mentioned.
>
> I just wondering, if there are any proper implementation for this
> requirement, because I couldn't find any tool in the kafka ecosystem page.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem
>
>
> Best Regards,
> Aris.
>
>
> On Mon, Oct 10, 2016 at 6:55 PM, Michael Noll  > wrote:
>
> > Aris,
> >
> > even today you can already use Kafka to deliver Netflow/Pcap/etc.
> messages,
> > and people are already using it for that (I did that in previous projects
> > of mine, too).
> >
> > Simply encode your Pcap/... messages appropriately (I'd recommend to
> take a
> > look at Avro, which allows you to structure your data similar to e.g.
> > Pcap's native format [1]), and then write the encoded messages to Kafka.
> > Your downstream applications can then read the encoded messages back from
> > Kafka, decode, and commence processing.
> >
> > That was a brief summary to get you started, feel free to take a look at
> > the Apache Kafka docs at kafka.apache.org and/or ask further questions
> > here.
> >
> > -Michael
> >
> >
> >
> >
> > [1] https://wiki.wireshark.org/Development/LibpcapFileFormat
> >
> > On Mon, Oct 10, 2016 at 11:19 AM, Aris Risdianto  > wrote:
> >
> > > ​Hello,
> > >
> > >
> > > ​Is there any plan or implementation to use Kafka for delivering
> > > sFlow/NetFlow/Pcap messages?
> > >
> > >
> > > Best Regards,
> > > Aris.
> > >
> >
>


-- 
*Michael G. Noll*
Product Manager | Confluent
+1 650 453 5860 | @miguno <https://twitter.com/miguno>
Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
<http://www.confluent.io/blog>


Re: KafkaStream Merging two topics is not working fro custom datatypes

2016-10-11 Thread Michael Noll
Ratha,

if you based your problematic code on the PipeDemo example, then you should
have these two lines in your code (which most probably you haven't changed):

props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());

This configures your application to interpret (= encode/decode), by
default, the keys and values of any messages it reads from Kafka as
strings.  This works for the PipeDemo example because the keys and values
are actually strings.

In your application, however, you do:

   KStream<String, KafkaPayload> kafkaPayloadStream =
builder.stream(sourceTopics);

This won't work, because `builder.stream()`, when called without
explicit serdes, will use the default serdes configured for your
application.  So `builder.stream(sourceTopics)` will give you
`KStream<String, String>`, not `KStream<String, KafkaPayload>`.  Also, you
can't just cast a String to KafkaPayload to "fix" the problem;  if you
attempt to do so you run into the ClassCastException that you reported
below.

What you need to do to fix your problem is:

1. Provide a proper serde for `KafkaPayload`.  See
http://docs.confluent.io/current/streams/developer-guide.html#implementing-custom-serializers-deserializers-serdes.
There are also example implementations of such custom serdes at [1] and [2].

Once you have that, you can e.g. write:

final Serde<String> stringSerde = Serdes.String(); // provided by Kafka
final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be provided by you!

2. Call `builder.stream()` with explicit serdes to override the default
serdes.  stringSerde is for the keys, kafkaPayloadSerde is for the values.

KStream<String, KafkaPayload> kafkaPayloadStream =
builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);

That should do it.

Lastly, you must think about serialization also when calling `to()` or
`through()`:

kafkaPayloadStream.to(targetTopic);

If you haven't changed to default key and value serdes, then `to()` will
fail because it will by default (in your app configuration) interpret
message values still as strings rather than KafkaPayload.  To fix this you
should call:

kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);

You need to override the default serdes whenever the data must be written
with, well, non-default serdes.
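
To see why the serde choice matters, here is a self-contained plain-Java sketch
(no Kafka classes involved).  `Payload` is a hypothetical stand-in for your
`KafkaPayload`, and the helper methods are made up for illustration.  The same
bytes come back as a `Payload` when read with the matching deserializer, but as
a `String` when read with a string deserializer, and casting that `String`
fails exactly as in your stack trace.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class SerdeMismatchSketch {

    // Hypothetical stand-in for the KafkaPayload POJO.
    static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String body;

        Payload(String body) {
            this.body = body;
        }
    }

    // "Serializer" half of a custom serde, using plain Java serialization.
    static byte[] serializePayload(Payload payload) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(payload);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // "Deserializer" half of the same custom serde.
    static Payload deserializePayload(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Payload) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // What a string deserializer does with any bytes: it always yields a String.
    static Object deserializeAsString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Returns true if casting the value to Payload throws ClassCastException.
    static boolean castFails(Object value) {
        try {
            Payload ignored = (Payload) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serializePayload(new Payload("hello"));
        System.out.println("matching serde: " + deserializePayload(bytes).body);
        System.out.println("string serde, cast fails: " + castFails(deserializeAsString(bytes)));
    }
}
```

The explicit serdes passed to `builder.stream()` and `to()` play the role of
`serializePayload`/`deserializePayload` here; the configured defaults play the
role of `deserializeAsString`.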

I'd recommend reading
http://docs.confluent.io/current/streams/developer-guide.html#data-types-and-serialization
to better understand how this works.


Hope this helps,
Michael



[1]
http://docs.confluent.io/current/streams/developer-guide.html#available-serializers-deserializers-serdes
[2]
https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams/utils




On Tue, Oct 11, 2016 at 7:38 AM, Ratha v  wrote:

> I checked my target topic and I see few messages than the source topic. (If
> source topic have 5 messages, I see 2 messages in my target topic) What
> settings I need to do ?
>
> And, when I try to consume message from the target topic, I get ClassCast
> Exception.
>
> java.lang.ClassCastException: java.lang.String cannot be cast to
> xx.yy.core.kafkamodels.KafkaPayload;
>
> * receivedPayload = (KafkaPayload) consumerRecord.value();*
>
>
> I Merge two topics like;
>
> * KStreamBuilder builder = new KStreamBuilder();*
>
> * KStream kafkaPayloadStream =
> builder.stream(sourceTopics);*
>
> * kafkaPayloadStream.to(targetTopic);*
>
> * streams = new KafkaStreams(builder, properties);*
>
> * streams.start();*
>
>
> Why do I see classcast exception when consuming the message?
>
>
> On 11 October 2016 at 15:19, Ratha v  wrote:
>
> > Hi all;
> > I have custom datatype defined (a pojo class).
> > I copy  messages from one topic to another topic.
> > I do not see any messages in my target topic.
> > This works fro string messages, but not for my custom message.
> > Waht might be the cause?
> > I followed this sample [1]
> > [1]
> > https://github.com/apache/kafka/blob/trunk/streams/
> > examples/src/main/java/org/apache/kafka/streams/examples/
> > pipe/PipeDemo.java
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>


Re: KafkaStream Merging two topics is not working fro custom datatypes

2016-10-11 Thread Michael Noll
When I wrote:

"If you haven't changed to default key and value serdes, then `to()`
will fail because [...]"

it should have read:

"If you haven't changed the default key and value serdes, then `to()`
will fail because [...]"



On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll  wrote:

> Ratha,
>
> if you based your problematic code on the PipeDemo example, then you
> should have these two lines in your code (which most probably you haven't
> changed):
>
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass());
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass());
>
> This configures your application to interpret (= encode/decode), by
> default, the keys and values of any messages it reads from Kafka as
> strings.  This works for the PipeDemo example because the keys and values
> are actually strings.
>
> In your application, however, you do:
>
>KStream kafkaPayloadStream =
> builder.stream(sourceTopics);
>
> This won't work, because `builder.stream()`, when calling it without
> explicit serdes, will use the default serdes configured for your
> application.  So `builder.stream(sourceTopics)` will give you
> `KStream`, not `KStream`.  Also, you
> can't just cast a String to KafkaPayload to "fix" the problem;  if you
> attempt to do so you run into the ClassCastException that you reported
> below.
>
> What you need to do fix your problem is:
>
> 1. Provide a proper serde for `KafkaPayload`.  See
> http://docs.confluent.io/current/streams/developer-
> guide.html#implementing-custom-serializers-deserializers-serdes.  There
> are also example implementations of such custom serdes at [1] and [2].
>
> Once you have that, you can e.g. write:
>
> final Serde stringSerde = Serdes.String(); // provided by Kafka
> final Serde kafkaPayloadSerde = ...; // must be provided
> by you!
>
> 2.  Call `builder.stream()` with explicit serdes to overrides the default
> serdes.  stringSerde is for the keys, kafkaPayloadSerde is for the values.
>
> KStream kafkaPayloadStream =
> builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>
> That should do it.
>
> Lastly, you must think about serialization also when calling `to()` or
> `through()`:
>
> kafkaPayloadStream.to(targetTopic);
>
> If you haven't changed to default key and value serdes, then `to()` will
> fail because it will by default (in your app configuration) interpret
> message values still as strings rather than KafkaPayload.  To fix this you
> should call:
>
> kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
>
> You need to override the default serdes whenever the data must be written
> with, well, non-default serdes.
>
> I'd recommend reading http://docs.confluent.io/current/streams/developer-
> guide.html#data-types-and-serialization to better understand how this
> works.
>
>
> Hope this helps,
> Michael
>
>
>
> [1] http://docs.confluent.io/current/streams/developer-
> guide.html#available-serializers-deserializers-serdes
> [2] https://github.com/confluentinc/examples/tree/
> kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/
> confluent/examples/streams/utils
>
>
>
>
> On Tue, Oct 11, 2016 at 7:38 AM, Ratha v  wrote:
>
>> I checked my target topic and I see few messages than the source topic.
>> (If
>> source topic have 5 messages, I see 2 messages in my target topic) What
>> settings I need to do ?
>>
>> And, when I try to consume message from the target topic, I get ClassCast
>> Exception.
>>
>> java.lang.ClassCastException: java.lang.String cannot be cast to
>> xx.yy.core.kafkamodels.KafkaPayload;
>>
>> * receivedPayload = (KafkaPayload) consumerRecord.value();*
>>
>>
>> I Merge two topics like;
>>
>> * KStreamBuilder builder = new KStreamBuilder();*
>>
>> * KStream kafkaPayloadStream =
>> builder.stream(sourceTopics);*
>>
>> * kafkaPayloadStream.to(targetTopic);*
>>
>> * streams = new KafkaStreams(builder, properties);*
>>
>> * streams.start();*
>>
>>
>> Why do I see classcast exception when consuming the message?
>>
>>
>> On 11 October 2016 at 15:19, Ratha v  wrote:
>>
>> > Hi all;
>> > I have custom datatype defined (a pojo class).
>> > I copy  messages from one topic to another topic.
>> > I do not see any messages in my target topic.
>> > This works fro string messages, but not for my custom message.
>> > Waht might be the cause?
>> > I followed this sample [1]
>> > [1]
>> > https://github.com/apache/kafka/blob/trunk/streams/
>> > examples/src/main/java/org/apache/kafka/streams/examples/
>> > pipe/PipeDemo.java
>> >
>> >
>> > --
>> > -Ratha
>> > http://vvratha.blogspot.com/
>> >
>>
>>
>>
>> --
>> -Ratha
>> http://vvratha.blogspot.com/
>>
>
>
>


Re: puncutuate() never called

2016-10-11 Thread Michael Noll
Thanks for the follow-up and the bug report, David.

We're taking a look at that.



On Mon, Oct 10, 2016 at 4:36 PM, David Garcia  wrote:

> Thx for the responses.  I was able to identify a bug in how the times are
> obtained (offsets resolved as unknown cause the issue):
>
> “Actually, I think the bug is more subtle.  What happens when a consumed
> topic stops receiving messages?  The smallest timestamp will always be the
> static timestamp of this topic.
>
> -David
>
> On 10/7/16, 5:03 PM, "David Garcia"  wrote:
>
> Ok I found the bug.  Basically, if there is an empty topic (in the
> list of topics being consumed), any partition-group with partitions from
> the topic will always return -1 as the smallest timestamp (see
> PartitionGroup.java).
>
> To reproduce, simply start a kstreams consumer with one or more empty
> topics.  Punctuate will never be called.
>
> -David ”
>
> On 10/10/16, 1:55 AM, "Michael Noll"  wrote:
>
> > We have run the application (and have confirmed data is being
> received)
> for over 30 mins…with a 60-second timer.
>
> Ok, so your app does receive data but punctuate() still isn't being
> called.
> :-(
>
>
> > So, do we need to just rebuild our cluster with bigger machines?
>
> That's worth trying out.  See
> http://www.confluent.io/blog/design-and-deployment-
> considerations-for-deploying-apache-kafka-on-aws/
> for some EC2 instance types recommendations.
>
> But I'd also suggest to look into the logs of (1) your application,
> (2) the
> log files of the Kafka broker(s), and (3) the log files of ZooKeeper
> to see
> whether you see anything suspicious?
>
> Sorry for not being able to provide more actionable feedback at this
> point.  Typically we have seen such issues only (but not exclusively)
> in
> cases where there have been problems in the environment in which your
> application is running and/or the environment of the Kafka clusters.
> Unfortunately these environment problems are a bit tricky to debug
> remotely
> via the mailing list.
>
> -Michael
>
>
>
>
>
> On Fri, Oct 7, 2016 at 8:11 PM, David Garcia 
> wrote:
>
> > Yeah, this is possible.  We have run the application (and have
> confirmed
> > data is being received) for over 30 mins…with a 60-second timer.
> So, do we
> > need to just rebuild our cluster with bigger machines?
> >
> > -David
> >
> > On 10/7/16, 11:18 AM, "Michael Noll"  wrote:
> >
> > David,
> >
> > punctuate() is still data-driven at this point, even when you're
> using
> > the
> > WallClock timestamp extractor.
> >
> > To use an example: Imagine you have configured punctuate() to be
> run
> > every
> > 5 seconds.  If there's no data being received for a minute, then
> > punctuate
> > won't be called -- even though you probably would have expected
> this to
> > happen 12 times during this 1 minute.
> >
> > (FWIW, there's an ongoing discussion to improve punctuate(),
> part of
> > which
> > is motivated by the current behavior that arguably is not very
> > intuitive to
> > many users.)
> >
> > Could this be the problem you're seeing?  See also the related
> > discussion
> > at
> > http://stackoverflow.com/questions/39535201/kafka-problems-with-
> > timestampextractor
> > .
> >
> >
> >
> >
> >
> >
> > On Fri, Oct 7, 2016 at 6:07 PM, David Garcia <
> dav...@spiceworks.com>
> > wrote:
> >
> > > Hello, I’m sure this question has been asked many times.
> > > We have a test-cluster (confluent 3.0.0 release) of 3 aws
> > m4.xlarges.  We
> > > have an application that needs to use the punctuate() function
> to do
> > some
> > > work on a regular interval.  We are using the WallClock
> extractor.
> > > Unfortunately, the method is never called.  I have checked the
> > > filedescriptor setting for both the user as well as the
> process, and
> > > everything seems to be fine.  Is this a known bug, or is there
> > something
> > > obvious I’m missing?
> > >
> > > One note, the application used to work on this cluster, but
> now it’s
> > not
> > > working.  Not really sure what is going on?
> > >
> > > -David
> > >
> >
> >
> >
>
>
>


Re: Support for Kafka

2016-10-11 Thread Michael Noll
Regarding the JVM, we recommend running the latest version of JDK 1.8 with
the G1 garbage collector:
http://docs.confluent.io/current/kafka/deployment.html#jvm

And yes, Kafka does run on Ubuntu 16.04, too.

(Confluent provides .deb packages [1] for Apache Kafka if you are looking
for these to install Kafka on Ubuntu.)

Hope this helps,
Michael



[1] http://docs.confluent.io/current/installation.html




On Mon, Oct 10, 2016 at 1:38 PM, Jens Rantil  wrote:

> Hi Syed,
>
> Apache Kafka runs on a JVM. I think the question you should ask is -- which
> JVM does Apache Kafka require in production*? It doesn't really depend on
> anything on a specific Linux distribution.
>
> * ...and I don't have that answer ;-)
>
> Cheers,
> Jens
>
> On Wednesday, October 5, 2016, Syed Hussaini <
> syed.hussa...@theexchangelab.com> wrote:
>
> > Dear Kafka team.
> >
> > I am in the Implementation stage of Kafka cluster and looking to find
> > out does Apache Kafka supported for Ubuntu 16.04 LTS – Xenial.
> >
> >
> >
> > Would be great if you please let us know.
> >
> >
> >
> >
> >
> > [image: The Exchange Lab] 
> >
> > *Syed Hussaini*
> > Infrastructure Engineer
> >
> > 1 Neathouse Place
> > 5th Floor
> > London, England, SW1V 1LH
> >
> >
> > syed.hussa...@theexchangelab.com
> > 
> >
> > T 0203 701 3177
> >
> >
> > --
> >
> > Follow us on Twitter: @exchangelab  |
> Visit
> > us on LinkedIn: The Exchange Lab
> > 
> >
> >
> >
> >
> >
>
>
> --
> Jens Rantil
> Backend engineer
> Tink AB
>
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
>
>


Re: Support for Kafka

2016-10-11 Thread Michael Noll
Actually, I wanted to include the following link for the JVM docs (the
information matches what's written in the earlier link I shared):
http://kafka.apache.org/documentation#java


On Tue, Oct 11, 2016 at 11:21 AM, Michael Noll  wrote:

> Regarding the JVM, we recommend running the latest version of JDK 1.8 with
> the G1 garbage collector:
> http://docs.confluent.io/current/kafka/deployment.html#jvm
>
> And yes, Kafka does run on Ubuntu 16.04, too.
>
> (Confluent provides .deb packages [1] for Apache Kafka if you are looking
> for these to install Kafka on Ubuntu.)
>
> Hope this helps,
> Michael
>
>
>
> [1] http://docs.confluent.io/current/installation.html
>
>
>
>
> On Mon, Oct 10, 2016 at 1:38 PM, Jens Rantil  wrote:
>
>> Hi Syed,
>>
>> Apache Kafka runs on a JVM. I think the question you should ask is --
>> which
>> JVM does Apache Kafka require in production*? It doesn't really depend on
>> anything on a specific Linux distribution.
>>
>> * ...and I don't have that answer ;-)
>>
>> Cheers,
>> Jens
>>
>> On Wednesday, October 5, 2016, Syed Hussaini <
>> syed.hussa...@theexchangelab.com> wrote:
>>
>> > Dear Kafka team.
>> >
>> > I am in the Implementation stage of Kafka cluster and looking to
>> find
>> > out does Apache Kafka supported for Ubuntu 16.04 LTS – Xenial.
>> >
>> >
>> >
>> > Would be great if you please let us know.
>> >
>> >
>> >
>> >
>> >
>> > [image: The Exchange Lab] <http://www.theexchangelab.com/>
>> >
>> > *Syed Hussaini*
>> > Infrastructure Engineer
>> >
>> > 1 Neathouse Place
>> > 5th Floor
>> > London, England, SW1V 1LH
>> >
>> >
>> > syed.hussa...@theexchangelab.com
>> > 
>> >
>> > T 0203 701 3177
>> >
>> >
>> > --
>> >
>> > Follow us on Twitter: @exchangelab <https://twitter.com/exchangelab> |
>> Visit
>> > us on LinkedIn: The Exchange Lab
>> > <https://www.linkedin.com/company/the-exchange-lab>
>> >
>> >
>> >
>> >
>> >
>>
>>
>> --
>> Jens Rantil
>> Backend engineer
>> Tink AB
>>
>> Email: jens.ran...@tink.se
>> Phone: +46 708 84 18 32
>> Web: www.tink.se
>>
>> Facebook <https://www.facebook.com/#!/tink.se> Linkedin
>> <http://www.linkedin.com/company/2735919?trk=vsrp_companies_
>> res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVS
>> RPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
>>  Twitter <https://twitter.com/tink>
>>
>
>
>
>


Re: KafkaStream Merging two topics is not working fro custom datatypes

2016-10-12 Thread Michael Noll
> >> streams.close();
> >>
> >>  }
> >>
> >> }
> >>
> >> }
> >>
> >>
> >>
> >>
> >> *SERDE*
> >>
> >>
> >> public class KafkaPayloadSerdes {
> >>
> >>     static private class WrapperSerde<T> implements Serde<T> {
> >>         final private Serializer<T> serializer;
> >>         final private Deserializer<T> deserializer;
> >>
> >>         public WrapperSerde(Serializer<T> serializer,
> >>                 Deserializer<T> deserializer) {
> >>             this.serializer = serializer;
> >>             this.deserializer = deserializer;
> >>         }
> >>
> >>         @Override
> >>         public void configure(Map<String, ?> configs, boolean isKey) {
> >>             serializer.configure(configs, isKey);
> >>             deserializer.configure(configs, isKey);
> >>         }
> >>
> >>         @Override
> >>         public void close() {
> >>             serializer.close();
> >>             deserializer.close();
> >>         }
> >>
> >>         @Override
> >>         public Serializer<T> serializer() {
> >>             return serializer;
> >>         }
> >>
> >>         @Override
> >>         public Deserializer<T> deserializer() {
> >>             return deserializer;
> >>         }
> >>     }
> >>
> >>     static public final class KafkaPayloadSerde extends
> >>             WrapperSerde<KafkaPayload> {
> >>         public KafkaPayloadSerde() {
> >>             super(new KafkaPayloadSerializer(), new KafkaPayloadSerializer());
> >>         }
> >>     }
> >>
> >>     /**
> >>      * A serde for nullable {@code KafkaPayload} type.
> >>      */
> >>     static public Serde<KafkaPayload> KafkaPayload() {
> >>         return new KafkaPayloadSerde();
> >>     }
> >>
> >> }
> >>
> >>
> >> *Serializer/Deserializer*
> >>
> >>
> >>
> >> public class KafkaPayloadSerializer implements Serializer<KafkaPayload>,
> >> Deserializer<KafkaPayload> {
> >>
> >> private static final Logger log = org.apache.logging.log4j.LogManager
> >> .getLogger(MethodHandles.lookup().lookupClass().getCanonicalName());
> >>
> >> @Override
> >> public KafkaPayload deserialize(String topic, byte[] arg1) {
> >> ByteArrayInputStream bis = new ByteArrayInputStream(arg1);
> >> ObjectInput in = null;
> >> Object obj = null;
> >> try {
> >> in = new ObjectInputStream(bis);
> >> obj = in.readObject();
> >> } catch (IOException e) {
> >> log.error(e);
> >> } catch (ClassNotFoundException e) {
> >> log.error(e);
> >> } finally {
> >> try {
> >> bis.close();
> >> if (in != null) {
> >> in.close();
> >> }
> >> } catch (IOException ex) {
> >> log.error(ex);
> >> }
> >> }
> >> return (KafkaPayload) obj;
> >> }
> >>
> >> @Override
> >> public void close() {
> >> // TODO Auto-generated method stub
> >>
> >> }
> >>
> >> @Override
> >> public byte[] serialize(String topic, KafkaPayload kpayload) {
> >> ByteArrayOutputStream bos = new ByteArrayOutputStream();
> >> ObjectOutput out = null;
> >> byte[] payload = null;
> >> try {
> >> out = new ObjectOutputStream(bos);
> >> out.writeObject(kpayload);
> >> payload = bos.toByteArray();
> >>
> >> } catch (IOException e) {
> >> e.printStackTrace();
> >> } finally {
> >> try {
> >> if (out != null) {
> >> out.close();
> >> bos.close();
> >> }
> >> } catch (Exception ex) {
> >> log.error(ex);
> >> }
> >> }
> >> return payload;
> >> }
> >>
> >> @Override
> >> public void configure(Map<String, ?> configs, boolean isKey) {
> >> // TODO Auto-generated method stub
> >>
> >> }
> >>
> >> }
> >>
> >>
> >>
> >> On 11 October 2016 at 20:13, Michael Noll  wrote:
> >>
> >>> When I wrote:
> >>>
> >>> "If you haven't changed to default key and value serdes, then `to()`
> >>> will fail because [...]"
> >>>
> >>> it should have read:
> >>>
> >>> "If you haven't changed the default key and value serdes, then `to()`
> >>> will fail because [...]"
> >>>
> >>>
> >>>
> >>> On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll 
> >>> wrote:
> >>>
> >>> > Ratha,

Re: Understanding out of order message processing w/ Streaming

2016-10-13 Thread Michael Noll
> But if they arrive out of order, I have to detect / process that myself in
> the processor logic.

Yes -- if your processing logic depends on the specific ordering of
messages (which is the case for you), then you must manually implement this
ordering-specific logic at the moment.
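
A minimal sketch of what such ordering-specific logic could look like,
assuming every bid carries its own event timestamp.  The class and method
names below are made up for illustration and are not part of the Kafka
Streams API; the plain HashMap stands in for whatever state your processor
keeps.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not a Kafka Streams class): remembers the newest bid per product.
final class LatestBidStore {

    // A bid price together with the event timestamp it was created with.
    static final class TimestampedBid {
        final double price;
        final long timestampMs;

        TimestampedBid(double price, long timestampMs) {
            this.price = price;
            this.timestampMs = timestampMs;
        }
    }

    private final Map<String, TimestampedBid> latest = new HashMap<>();

    // Applies the bid only if it is strictly newer than the stored one.
    // Returns true if the stored price changed, false if the bid was late and ignored.
    boolean offer(String productId, double price, long timestampMs) {
        TimestampedBid current = latest.get(productId);
        if (current != null && current.timestampMs >= timestampMs) {
            return false; // out-of-order (late) record: keep the newer price
        }
        latest.put(productId, new TimestampedBid(price, timestampMs));
        return true;
    }

    // Returns the current price, or null if no bid has been seen for the product.
    Double priceOf(String productId) {
        TimestampedBid bid = latest.get(productId);
        return bid == null ? null : bid.price;
    }
}
```

In a real Streams application the map would have to live in a state store
rather than in memory, so that the stored timestamps survive a restart (see
Matthias' remarks on this further down).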

Other use cases may not need to do that and "just work" even with
out-of-order data.  If, for example, you are counting objects or are
computing the sum of numbers, then you do not need to do anything special.





On Wed, Oct 12, 2016 at 10:22 PM, Ali Akhtar  wrote:

> Thanks Matthias.
>
> So, if I'm understanding this right, Kafka will not discard messages
> which arrive out of order.
>
> What it will do is show messages in the order in which they arrive.
>
> But if they arrive out of order, I have to detect / process that myself in
> the processor logic.
>
> Is that correct?
>
> Thanks.
>
> On Wed, Oct 12, 2016 at 11:37 PM, Matthias J. Sax 
> wrote:
>
> > -BEGIN PGP SIGNED MESSAGE-
> > Hash: SHA512
> >
> > Last question first: A KTable is basically an infinite window over the
> > whole stream providing a single result (that gets updated when new
> > data arrives). If you use windows, you cut the overall stream into
> > finite subsets and get a result per window. Thus, I guess you do not
> > need windows (if I understood your use case correctly).
> >
> > However, in the current state of the Kafka Streams DSL, you will not be
> > able to use KTable (directly -- see suggestion to fix this below) because
> > it does (currently) not allow you to access the timestamp of the current
> > record (thus, you cannot know if a record is late or not). You will
> > need to use the Processor API, which allows you to access the current
> > record's timestamp via the Context object given in init().
> >
> > Your reasoning about partitions and Streams instances is correct.
> > However, the following two statements are not:
> >
> > > - Because I'm using a KTable, the timestamp of the messages is
> > > extracted, and I'm not shown the older bid because I've already
> > > processed the later bid. The older bid is ignored.
> >
> > and
> >
> > > - Because of this, the replica already knows which timestamps it
> > > has processed, and is able to ignore the older messages.
> >
> > Late arriving records are not dropped but processed regularly. Thus,
> > your KTable aggregate function will be called for the late arriving
> > record, too (but as described above, you currently have no way to know
> > it is a late record).
> >
> >
> > Last but not least, your last statement is a valid concern:
> >
> > > Also, what will happen if bid 2 arrived and got processed, and then
> > > the particular replica crashed, and was restarted. The restarted
> > > replica won't have any memory of which timestamps it has previously
> > > processed.
> > >
> > > So if bid 2 got processed, replica crashed and restarted, and then
> > > bid 1 arrived, what would happen in that case?
> >
> > In order to make this work, you would need to store the timestamp in
> > your store next to the actual data. Thus, you can compare the timestamp
> > of the latest result (safely stored in operator state) with the
> > timestamp of the current record.
> >
> > Does this make sense?
> >
> > To fix your issue, you could add a .transformValues() before your KTable,
> > which allows you to access the timestamp of a record. If you add this
> > timestamp to your value and pass it to KTable afterwards, you can
> > access it and it also gets stored reliably.
> >
> > <key, value> => transformValues => <key, {value, timestamp}> => aggregate
> >
> > Hope this helps.
> >
> > - -Matthias
> >
> >
> > On 10/11/16 9:12 PM, Ali Akhtar wrote:
> > > P.S, does my scenario require using windows, or can it be achieved
> > > using just KTable?
> > >
> > > On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar 
> > > wrote:
> > >
> > >> Heya,
> > >>
> > >> Say I'm building a live auction site, with different products.
> > >> Different users will bid on different products. And each time
> > >> they do, I want to update the product's price, so it should
> > >> always have the latest price in place.
> > >>
> > >> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on
> > >> the same product 100 ms later.
> > >>
> > >> The second bid arrives first and the price is updated to $5. Then
> > >> the first bid arrives. I want the price to not be updated in this
> > >> case, as this bid is older than the one I've already processed.
> > >>
> > >> Here's my understanding of how I can achieve this with Kafka
> > >> Streaming - is my understanding correct?
> > >>
> > >> - I have a topic for receiving bids. The topic has N partitions,
> > >> and I have N replicas of my application which hooks up w/ Kafka
> > >> Streaming, up and running.
> > >>
> > >> - I assume each replica of my app will listen to a different
> > >> partition of the topic.
> > >>
> > >> - A user makes a bid on product A.
> > >>
> > >> - This is pushed to the topic with the key bid_a
> > >>
> > >> - Another user makes a bid

Re: How to detect an old consumer

2016-10-17 Thread Michael Noll
Old consumers use ZK to store their offsets.  Could you leverage the
timestamps of the corresponding znodes [1] for this?


[1]
https://zookeeper.apache.org/doc/r3.4.5/zookeeperProgrammers.html#sc_zkDataModel_znodes
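
A rough sketch of how that check could look, assuming the old consumer's
offset znodes live under /consumers/<group>/offsets/<topic>/<partition> and
that you read each znode's modification time (mtime, e.g. via Stat.getMtime()
in the ZooKeeper Java client) yourself.  The class and method names below are
hypothetical, and the ZooKeeper lookup itself is deliberately left out.

```java
import java.time.Duration;

// Hypothetical helper: decides whether an old (ZooKeeper-based) consumer looks stale.
final class StaleConsumerCheck {

    // Builds the znode path where the old consumer keeps the offset of one partition.
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    // True if the znode was last modified more than maxAge before nowMs.
    // znodeMtimeMs is the znode's mtime in milliseconds since the epoch.
    static boolean isStale(long znodeMtimeMs, long nowMs, Duration maxAge) {
        return nowMs - znodeMtimeMs > maxAge.toMillis();
    }
}
```

Keep in mind that the mtime reflects the last offset write, so it is only a
proxy for when the consumer was last active.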


On Mon, Oct 17, 2016 at 4:45 PM, Fernando Bugni 
wrote:

> Hello,
>
> I want to detect old consumers in my kafka servers. Is there any tool to
> see the last date when they connected? I tried in kafka-manager but I only
> have the consumer group and its offset, which is not useful to detect that
> problem...
>
> Thanks in advance!
> --
> Fernando Bugni
>


Re: kafka streams metadata request fails on 0.10.0.1 broker/topic from 0.10.1.0 client

2016-10-19 Thread Michael Noll
Apps built with Kafka Streams 0.10.1 only work against Kafka clusters
running 0.10.1+.  This explains your error message above.

Unfortunately, Kafka's current upgrade story means you need to upgrade your
cluster in this situation.  Moving forward, we're planning to improve the
upgrade/compatibility story of Kafka so that you could, for example, run a
newer version of Kafka Streams (or any other Kafka client) against an older
version of Kafka.



On Tue, Oct 18, 2016 at 10:56 PM, saiprasad mishra <
saiprasadmis...@gmail.com> wrote:

> Hi All
>
> Was testing with 0.10.1.0 rc3 build for my new streams app
>
> Seeing issues starting my Kafka Streams app (0.10.1.0) on the old version
> broker 0.10.0.1. I don't know if it is supposed to work as is. Will upgrade
> the broker to the same version and see whether it goes away
>
> client side issues
>
> ==
>
> java.io.EOFException
>
> at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(
> NetworkReceive.java:83)
> ~[kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.common.network.NetworkReceive.
> readFrom(NetworkReceive.java:71)
> ~[kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.common.network.KafkaChannel.receive(
> KafkaChannel.java:154)
> ~[kafka-clients-0.10.1.0.jar!/:?]
>
> at org.apache.kafka.common.network.KafkaChannel.read(
> KafkaChannel.java:135)
> ~[kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.
> java:343)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:232)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:209)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> awaitMetadataUpdate(ConsumerNetworkClient.java:148)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> awaitMetadataUpdate(ConsumerNetworkClient.java:136)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureCoordinatorReady(AbstractCoordinator.java:197)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:248)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1013)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:979)
> [kafka-clients-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:407)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
> [kafka-streams-0.10.1.0.jar!/:?]
>
>
>
> On the broker side the following message appears
>
> =
>
> kafka.network.InvalidRequestException: Error getting request for apiKey: 3
> and apiVersion: 2
>
> at
> kafka.network.RequestChannel$Request.liftedTree2$1(
> RequestChannel.scala:95)
>
> at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:87)
>
> at
> kafka.network.Processor$$anonfun$processCompletedReceives$1.
> apply(SocketServer.scala:488)
>
> at
> kafka.network.Processor$$anonfun$processCompletedReceives$1.
> apply(SocketServer.scala:483)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>
> at kafka.network.Processor.processCompletedReceives(
> SocketServer.scala:483)
>
> at kafka.network.Processor.run(SocketServer.scala:413)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.IllegalArgumentException: Invalid version for API key
> 3: 2
>
> at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(
> ProtoUtils.java:31)
>
> at
> org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:
> 44)
>
> at
> org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:
> 60)
>
> at
> org.apache.kafka.common.requests.MetadataRequest.
> parse(MetadataRequest.java:96)
>
> at
> org.apache.kafka.common.requests.AbstractRequest.
> getRequest(AbstractRequest.java:48)
>
> at
> kafka.network.RequestChannel$Request.liftedTree2$1(
> RequestChannel.scala:92)
>
> Regards
>
> Sai
>


Re: kafka streams metadata request fails on 0.10.0.1 broker/topic from 0.10.1.0 client

2016-10-20 Thread Michael Noll
Absolutely, Sai.  That's exactly why we want to improve the
upgrade/compatibility story.


On Thu, Oct 20, 2016 at 12:28 AM, saiprasad mishra <
saiprasadmis...@gmail.com> wrote:

> Thanks Michael
> Hopefully the upgrade story evolves as 0.10.1+ advances to maturity
>
> Just my 2 cents
>
> Decoupling Kafka Streams from the core Kafka changes will help, so that
> the broker can be upgraded without notice and streaming apps can evolve to
> newer streaming features at their own pace
>
> Regards
> Sai
>
>
> On Wednesday, October 19, 2016, Michael Noll  wrote:
>
> > Apps built with Kafka Streams 0.10.1 only work against Kafka clusters
> > running 0.10.1+.  This explains your error message above.
> >
> > Unfortunately, Kafka's current upgrade story means you need to upgrade
> your
> > cluster in this situation.  Moving forward, we're planning to improve the
> > upgrade/compatibility story of Kafka so that you could, for example, run
> a
> > newer version of Kafka Streams (or any other Kafka client) against an
> older
> > version of Kafka.
> >
> >
> >
> > On Tue, Oct 18, 2016 at 10:56 PM, saiprasad mishra <
> > saiprasadmis...@gmail.com> wrote:
> >
> > > Hi All
> > >
> > > Was testing with 0.10.1.0 rc3 build for my new streams app
> > >
> > > Seeing issues starting my Kafka Streams app (0.10.1.0) on the old
> > > version broker 0.10.0.1. I don't know if it is supposed to work as
> > > is. Will upgrade the broker to the same version and see whether it
> > > goes away
> > >
> > > client side issues
> > >
> > > ==
> > >
> > > java.io.EOFException
> > >
> > > at
> > > org.apache.kafka.common.network.NetworkReceive.
> readFromReadableChannel(
> > > NetworkReceive.java:83)
> > > ~[kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.common.network.NetworkReceive.
> > > readFrom(NetworkReceive.java:71)
> > > ~[kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.common.network.KafkaChannel.receive(
> > > KafkaChannel.java:154)
> > > ~[kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at org.apache.kafka.common.network.KafkaChannel.read(
> > > KafkaChannel.java:135)
> > > ~[kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.
> > > java:343)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(
> > > ConsumerNetworkClient.java:232)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(
> > > ConsumerNetworkClient.java:209)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> > > awaitMetadataUpdate(ConsumerNetworkClient.java:148)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> > > awaitMetadataUpdate(ConsumerNetworkClient.java:136)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > ensureCoordinatorReady(AbstractCoordinator.java:197)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> > > ConsumerCoordinator.java:248)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.KafkaConsumer.
> > > pollOnce(KafkaConsumer.java:1013)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > KafkaConsumer.java:979)
> > > [kafka-clients-0.10.1.0.jar!/:?]
> > >
> > > at
> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > Stre

Re: How to block tests of Kafka Streams until messages processed?

2016-10-20 Thread Michael Noll
Ali,

my main feedback is similar to what Eno and Dave have already said.  In
your situation, options like these are what you'd currently need to do
since you are writing directly from your Kafka Streams app to Cassandra,
rather than writing from your app to Kafka and then using Kafka Connect to
ingest into Cassandra.
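
The "check in a loop until found or timed out" approach that Dave describes
could be sketched roughly as follows.  This is only an illustration: the
class and method names are made up, and the condition you pass in (e.g. a
query against your Cassandra table) is up to your test.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical test helper; the class and method names are illustrative only.
final class PollingWait {

    // Re-checks the condition every pollInterval until it holds or the timeout elapses.
    // Returns true if the condition became true in time, false otherwise.
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without seeing the expected result
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

In the test, Step 3 would then assert that waitUntil(...) returns true, with
a timeout of, say, 30 seconds and a poll interval of 1 second, instead of
relying on a fixed Thread.sleep(2000).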



On Wed, Oct 19, 2016 at 11:03 PM, Ali Akhtar  wrote:

> Yeah, I did think to use that method, but as you said, it writes to a dummy
> output topic, which means I'd have to put in magic code just for the tests
> to pass (the actual code writes to cassandra and not to a dummy topic).
>
>
> On Thu, Oct 20, 2016 at 2:00 AM, Tauzell, Dave <
> dave.tauz...@surescripts.com
> > wrote:
>
> > For similar queue related tests we put the check in a loop.  Check every
> > second until either the result is found or a timeout  happens.
> >
> > -Dave
> >
> > -Original Message-
> > From: Ali Akhtar [mailto:ali.rac...@gmail.com]
> > Sent: Wednesday, October 19, 2016 3:38 PM
> > To: users@kafka.apache.org
> > Subject: How to block tests of Kafka Streams until messages processed?
> >
> > I'm using Kafka Streams, and I'm attempting to write integration tests
> for
> > a stream processor.
> >
> > The processor listens to a topic, processes incoming messages, and writes
> > some data to Cassandra tables.
> >
> > I'm attempting to write a test which produces some test data, and then
> > checks whether or not the expected data was written to Cassandra.
> >
> > It looks like this:
> >
> > - Step 1: Produce data in the test
> > - Step 2: Kafka stream gets triggered
> > - Step 3: Test checks whether cassandra got populated
> >
> > The problem is, Step 3 is occurring before Step 2, and as a result, the
> > test fails as it doesn't find the data in the table.
> >
> > I've resolved this by adding a Thread.sleep(2000) call after Step 1,
> which
> > ensures that Step 2 gets triggered before Step 3.
> >
> > However, I'm wondering if there's a more reliable way of blocking the
> test
> > until Kafka stream processor gets triggered?
> >
> > At the moment, I'm using 1 thread for the processor. If I increase that
> to
> > 2 threads, will that achieve what I want?
> >
>


Re: How to block tests of Kafka Streams until messages processed?

2016-10-20 Thread Michael Noll
> Would there be any advantage to using the kafka connect method?

The advantage is to decouple the data processing (which you do in your app)
from the responsibility of making the processing results available to one
or more downstream systems, like Cassandra.

For example, what will your application (that uses Kafka Streams) do if
Cassandra is unavailable or slow?  Will you retry, and if so -- for how
long?  Retrying writes to external systems means that the time spent doing
this will not be spent on processing the next input records, thus
increasing the latency of your processing topology.  At this point you have
coupled your app to the availability of both Kafka and Cassandra.
Upgrading or doing maintenance on Cassandra will now also mean there's
potential impact on your app.



On Thu, Oct 20, 2016 at 9:39 AM, Ali Akhtar  wrote:

> Michael,
>
> Would there be any advantage to using the kafka connect method? Seems like
> it'd just add an extra step of overhead?
>
> On Thu, Oct 20, 2016 at 12:35 PM, Michael Noll 
> wrote:
>
> > Ali,
> >
> > my main feedback is similar to what Eno and Dave have already said.  In
> > your situation, options like these are what you'd currently need to do
> > since you are writing directly from your Kafka Streams app to Cassandra,
> > rather than writing from your app to Kafka and then using Kafka Connect
> to
> > ingest into Cassandra.
> >
> >
> >
> > On Wed, Oct 19, 2016 at 11:03 PM, Ali Akhtar 
> wrote:
> >
> > > Yeah, I did think to use that method, but as you said, it writes to a
> > dummy
> > > output topic, which means I'd have to put in magic code just for the
> > tests
> > > to pass (the actual code writes to cassandra and not to a dummy topic).
> > >
> > >
> > > On Thu, Oct 20, 2016 at 2:00 AM, Tauzell, Dave <
> > > dave.tauz...@surescripts.com
> > > > wrote:
> > >
> > > > For similar queue related tests we put the check in a loop.  Check
> > every
> > > > second until either the result is found or a timeout  happens.
> > > >
> > > > -Dave
> > > >
> > > > -Original Message-
> > > > From: Ali Akhtar [mailto:ali.rac...@gmail.com]
> > > > Sent: Wednesday, October 19, 2016 3:38 PM
> > > > To: users@kafka.apache.org
> > > > Subject: How to block tests of Kafka Streams until messages
> processed?
> > > >
> > > > I'm using Kafka Streams, and I'm attempting to write integration
> tests
> > > for
> > > > a stream processor.
> > > >
> > > > The processor listens to a topic, processes incoming messages, and
> > writes
> > > > some data to Cassandra tables.
> > > >
> > > > I'm attempting to write a test which produces some test data, and
> then
> > > > checks whether or not the expected data was written to Cassandra.
> > > >
> > > > It looks like this:
> > > >
> > > > - Step 1: Produce data in the test
> > > > - Step 2: Kafka stream gets triggered
> > > > - Step 3: Test checks whether cassandra got populated
> > > >
> > > > The problem is, Step 3 is occurring before Step 2, and as a result,
> the
> > > > test fails as it doesn't find the data in the table.
> > > >
> > > > I've resolved this by adding a Thread.sleep(2000) call after Step 1,
> > > which
> > > > ensures that Step 2 gets triggered before Step 3.
> > > >
> > > > However, I'm wondering if there's a more reliable way of blocking the
> > > test
> > > > until Kafka stream processor gets triggered?
> > > >
> > > > At the moment, I'm using 1 thread for the processor. If I increase
> that
> > > to
> > > > 2 threads, will that achieve what I want?
> > > >
> > >
> >
>


Re: Dismissing late messages in Kafka Streams

2016-10-20 Thread Michael Noll
Nicolas,

> I set the maintain duration of the window to 30 days.
> If it consumes a message older than 30 days, then a new aggregate is
> created for this old window.

I assume you mean:  If a message should have been included in the original
("old") window but that message happens to arrive late (after the
"original" 30 days), then a new aggregate is created for this old window?
I wanted to ask this first because answering your questions depends on what
exactly you mean here.


> The problem is that this old windowed aggregate is of course incomplete
> and will overwrite a record in the final database.

Not sure I understand -- why would the old windowed aggregate be
incomplete?  Could you explain a bit more what you mean?


> By the way, is there any article about replaying old messages. Some tips
> and tricks, like "you'd better do that in another deployment of your
> topology", and/or "you'd better use topics dedicated to repair".

I am not aware of a deep dive article or docs on that just yet.  There's a
first blog post [1] about Kafka's new Application Reset Tool that goes in
this direction, but this is only a first step toward replaying/reprocessing
old messages.  Do you have specific questions here that we can help you
with in the meantime?

[1]
http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application






On Thu, Oct 20, 2016 at 9:40 AM, Nicolas Fouché  wrote:

> Hi,
>
> I aggregate some data with `aggregateByKey` and a `TimeWindows`.
>
> I set the maintain duration of the window to 30 days.
> If it consumes a message older than 30 days, then a new aggregate is
> created for this old window.
> The problem is that this old windowed aggregate is of course incomplete and
> will overwrite a record in the final database.
>
> So is there a way to dismiss these old messages ?
>
> I only see the point of accepting old messages when the topology is
> launched in "repair" mode.
> By the way, is there any article about replaying old messages. Some tips
> and tricks, like "you'd better do that in another deployment of your
> topology", and/or "you'd better use topics dedicated to repair".
>
> Thanks
> Nicolas
>


Re: Kafka Streaming

2016-10-20 Thread Michael Noll
I suspect you are running Kafka 0.10.0.x on Windows?  If so, this is a
known issue that is fixed in Kafka 0.10.1, which was just released today.

Also: which examples are you referring to?  And, to confirm: which git
branch, Kafka version, and OS are you using, in case my guess above is wrong?


On Thursday, October 20, 2016, Mohit Anchlia  wrote:

> I am trying to run the examples from git. While running the wordcount
> example I see this error:
>
> Caused by: *java.lang.RuntimeException*: librocksdbjni-win64.dll was not
> found inside JAR.
>
>
> Am I expected to include this jar locally?
>


-- 
*Michael G. Noll*
Product Manager | Confluent
+1 650 453 5860 | @miguno 



Re: [VOTE] KIP-657: Add Customized Kafka Streams Logo

2020-08-19 Thread Michael Noll
Hi all!

Great to see we are in the process of creating a cool logo for Kafka
Streams.  First, I apologize for sharing feedback so late -- I just learned
about it today. :-)

Here's my *personal, subjective* opinion on the two current logo
candidates for Kafka Streams.

TL;DR: Sorry, but I really don't like either of the proposed "otter" logos.
Let me try to explain why.

   - The choice to use an animal, regardless of which specific animal,
   seems random and doesn't fit Kafka. (What's the purpose? To show that
   KStreams is 'cute'?) In comparison, the O’Reilly books always have an
   animal cover, that’s their style, and it is very recognizable.  Kafka
   however has its own, different style.  The Kafka logo has clear, simple
   lines to achieve an abstract and ‘techy’ look, which also alludes nicely to
   its architectural simplicity. Its logo is also a smart play on the
   Kafka-identifying letter “K” and alludes to it being a distributed system
   (the circles and links that make the K).
   - The proposed logos, however, make it appear as if KStreams is a
   third-party technology that was bolted onto Kafka. They certainly, for me,
   do not convey the message "Kafka Streams is an official part of Apache
   Kafka".
   - I, too, don't like the way the main Kafka logo is obscured (a concern
   already voiced in this thread). Also, the Kafka 'logo' embedded in the
   proposed KStreams logos is not the original one.
   - None of the proposed KStreams logos visually match the Kafka logo.
   They have a totally different style, font, line art, and color scheme.
   - Execution-wise, the main Kafka logo looks great at all sizes.  The
   style of the otter logos, in comparison, becomes undecipherable at smaller
   sizes.

What I would suggest is to first agree on what the KStreams logo is
supposed to convey to the reader.  Here's my personal take:

Objective 1: First and foremost, the KStreams logo should make it clear and
obvious that KStreams is an official and integral part of Apache Kafka.
This applies to both what is depicted and how it is depicted (like font,
line art, colors).
Objective 2: The logo should allude to the role of KStreams in the Kafka
project, which is the processing part.  That is, "doing something useful to
the data in Kafka".

The "circling arrow" aspect of the current otter logos does allude to
"continuous processing", which is going in the direction of (2), but the
logos do not meet (1) in my opinion.

-Michael




On Tue, Aug 18, 2020 at 10:34 PM Matthias J. Sax  wrote:

> Adding the user mailing list -- I think we should accept votes on both
> lists for this special case, as it's not a technical decision.
>
> @Boyang: as mentioned by Bruno, can we maybe add black/white options for
> both proposals, too?
>
> I also agree that Design B is not ideal with regard to the Kafka logo.
> Would it be possible to change Design B accordingly?
>
> I am not a font expert, but the fonts in both designs are different and I
> am wondering if there is an official Apache Kafka font that we should
> reuse to make sure that the logos align -- I would expect that both
> logos (including "Apache Kafka" and "Kafka Streams" names) will be used
> next to each other and it would look awkward if the font differs.
>
>
> -Matthias
>
> On 8/18/20 11:28 AM, Navinder Brar wrote:
> > Hi,
> > Thanks for the KIP, really like the idea. I am +1(non-binding) on A
> mainly because I felt like you have to tilt your head to realize the
> otter's head in B.
> > Regards,Navinder
> >
> > On Tuesday, 18 August, 2020, 11:44:20 pm IST, Guozhang Wang <
> wangg...@gmail.com> wrote:
> >
> >  I'm leaning towards design B primarily because it reminds me of the
> Firefox
> > logo which I like a lot. But I also share Adam's concern that it should
> > better not obscure the Kafka logo --- so if we can tweak a bit to fix it
> my
> > vote goes to B, otherwise A :)
> >
> >
> > Guozhang
> >
> > On Tue, Aug 18, 2020 at 9:48 AM Bruno Cadonna 
> wrote:
> >
> >> Thanks for the KIP!
> >>
> >> I am +1 (non-binding) for A.
> >>
> >> I would also like to hear opinions whether the logo should be colorized
> >> or just black and white.
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >> On 15.08.20 16:05, Adam Bellemare wrote:
> >>> I prefer Design B, but given that I missed the discussion thread, I
> think
> >>> it would be better without the Otter obscuring any part of the Kafka
> >> logo.
> >>>
> >>> On Thu, Aug 13, 2020 at 6:31 PM Boyang Chen <
> reluctanthero...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hello everyone,
> >>>>
> >>>> I would like to start a vote thread for KIP-657:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-657%3A+Add+Customized+Kafka+Streams+Logo
> >>>>
> >>>> This KIP is aiming to add a new logo for the Kafka Streams library. And we
> >>>> prepared two candidates with a cute otter. You could look up the KIP to
> >>>> find those logos.
> >>>>
> >>>>
> >>>> Please post your vote against these two

Re: [VOTE] KIP-657: Add Customized Kafka Streams Logo

2020-08-19 Thread Michael Noll
For what it's worth, here is an example sketch that I came up with. The
point is to show an alternative direction for the KStreams logo.

https://ibb.co/bmZxDCg

Thinking process:

   - It shows much more clearly (I hope) that KStreams is an official part
   of Kafka.
   - The Kafka logo is still front and center, and KStreams orbits around
   it like electrons around the Kafka core/nucleus. That’s important because
   we want users to adopt all of Kafka, not just bits and pieces.
   - It uses and builds upon the same ‘simple is beautiful’ style of the
   original Kafka logo. That also has the nice side-effect that it alludes to
   Kafka’s and KStreams’ architectural simplicity.
   - It picks up the good idea in the original logo candidates to convey
   the movement and flow of stream processing.
   - Execution-wise, and like the main Kafka logo, this logo candidate
   works well in smaller size, too, because of its simple and clear lines.
   (Logo types like the otter ones tend to become undecipherable at smaller
   sizes.)
   - It uses the same color scheme of the revamped AK website for brand
   consistency.

I am sure we can come up with even better logo candidates.  But the
suggestion above is, in my book, certainly a better option than the otters.

-Michael



On Wed, Aug 19, 2020 at 11:09 PM Boyang Chen 
wrote:

> Hey Ben,
>
> that otter was supposed to be a river-otter to connect to "streams". And of
> course, it's cute :)
>
> On Wed, Aug 19, 2020 at 12:41 PM Philip Schmitt <
> philip.schm...@outlook.com>
> wrote:
>
> > Hi,
> >
> > I’m with Robin and Michael here.
> >
> > What this decision needs is a good design brief.
> > This article seems decent:
> >
> https://yourcreativejunkie.com/logo-design-brief-the-ultimate-guide-for-designers/
> >
> > Robin is right about the usage requirements.
> > It goes a bit beyond resolution. How does the logo work when it’s on a
> > sticker on someone’s laptop? Might there be some cases, where you want to
> > print it in black and white?
> > And how would it look if you put the Kafka, ksqlDB, and Streams stickers
> > on a laptop?
> >
> > Of the two, I prefer the first option.
> > The brown on black is a bit subdued – it might not work well on a t-shirt
> > or a laptop sticker. Maybe that could be improved by using a bolder
> color,
> > but once it gets smaller or lower-resolution, it may not work any longer.
> >
> >
> > Regards,
> > Philip
> >
> >
> > P.S.:
> > Another article about what makes a good logo:
> > https://vanschneider.com/what-makes-a-good-logo
> >
> > P.P.S.:
> >
> > If I were to pick a logo for Streams, I’d choose something that fits well
> > with Kafka and ksqlDB.
> >
> > ksqlDB has the rocket.
> > I can’t remember (or find) the reasoning behind the Kafka logo (aside
> from
> > representing a K). Was there something about planets orbiting the sun? Or
> > was it the atom?
> >
> > So I might stick with a space/science metaphor.
> > Could Streams be a comet? UFO? Star? Eclipse? ...
> > Maybe a satellite logo for Connect.
> >
> > Space inspiration: https://thenounproject.com/term/space/
> >
> >
> >
> >
> > 
> > From: Robin Moffatt 
> > Sent: Wednesday, August 19, 2020 6:24 PM
> > To: users@kafka.apache.org 
> > Cc: d...@kafka.apache.org 
> > Subject: Re: [VOTE] KIP-657: Add Customized Kafka Streams Logo
> >
> > I echo what Michael says here.
> >
> > Another consideration is that logos are often shrunk (when used on
> slides)
> > and need to work at lower resolution (think: printing swag, stitching
> > socks, etc) and so whatever logo we come up with needs to not be too
> fiddly
> > in the level of detail - something that I think both the current proposed
> > options will fall foul of IMHO.
> >
> >
> > On Wed, 19 Aug 2020 at 15:33, Michael Noll  wrote:
> >
> > > Hi all!
> > >
> > > Great to see we are in the process of creating a cool logo for Kafka
> > > Streams.  First, I apologize for sharing feedback so late -- I just
> > learned
> > > about it today. :-)
> > >
> > > Here's my *personal, subjective* opinion on the two current logo
> > > candidates for Kafka Streams.
> > >
> > > TL;DR: Sorry, but I really don't like either of the proposed "otter"
> > logos.
> > > Let me try to explain why.
> > >
> > >- The choice to use an animal, regardless of which specific animal,
> > >seems random and doesn

Re: Is it possible to resubcribe KafkaStreams in runtime to different set of topics?

2016-11-09 Thread Michael Noll
This is not possible at the moment.  However, depending on your use case,
you might be able to leverage regex topic subscriptions (think: "b*" to
read from all topics starting with letter `b`).
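
A note on the pattern syntax: written as a Java regular expression, "all
topics starting with `b`" is `b.*`, whereas a bare `b*` only matches names
made up entirely of the letter `b`.  Here is a minimal plain-Java sketch of
the difference.  It assumes the subscription matches whole topic names, and
the class, method, and topic names are made up for illustration; they are not
part of Kafka.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class TopicPatternSketch {

    // Returns the topic names that the pattern matches in full
    // (assumption: the subscription matches whole topic names).
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic names, for illustration only.
        List<String> topics = Arrays.asList("billing", "b", "bb", "accounts", "backups");

        // "b.*": the letter b followed by anything, i.e. all topics starting with b.
        System.out.println(matchingTopics(Pattern.compile("b.*"), topics));

        // "b*": zero or more b's, so only names made up entirely of b's match.
        System.out.println(matchingTopics(Pattern.compile("b*"), topics));
    }
}
```

Checking a pattern like this against your real topic list before wiring it
into the application is a cheap way to avoid subscribing to too much or too
little.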

On Wed, Nov 9, 2016 at 10:56 AM, Timur Yusupov  wrote:

> Hello,
>
> In our system it is possible to add/remove topics in runtime and we are
> trying to use KafkaStreams for incoming messages processing.
>
> Is it possible to resubscribe a KafkaStreams instance to an updated set of
> topics?
>
> For now, the only way I see is to shut down the existing KafkaStreams instance
> and start a new one, but sometimes that takes up to 30-40 seconds...
>
> Thanks, Timur.
>


Re: Is it possible to resubcribe KafkaStreams in runtime to different set of topics?

2016-11-09 Thread Michael Noll
I am not aware of any short-term plans to support that, but perhaps others
in the community / mailing list are.

On Wed, Nov 9, 2016 at 11:15 AM, Timur Yusupov  wrote:

> Are there any near-term plans to support that?
>
> On Wed, Nov 9, 2016 at 1:11 PM, Michael Noll  wrote:
>
> > This is not possible at the moment.  However, depending on your use case,
> > you might be able to leverage regex topic subscriptions (think: "b*" to
> > read from all topics starting with letter `b`).
> >
> > On Wed, Nov 9, 2016 at 10:56 AM, Timur Yusupov 
> > wrote:
> >
> > > Hello,
> > >
> > > In our system it is possible to add/remove topics in runtime and we are
> > > trying to use KafkaStreams for incoming messages processing.
> > >
> > > Is it possible to resubscribe a KafkaStreams instance to an updated set of
> > > topics?
> > >
> > > For now, the only way I see is to shut down the existing KafkaStreams
> > > instance and start a new one, but sometimes that takes up to 30-40 seconds...
> > >
> > > Thanks, Timur.
> > >
> >
>


Re: Process KTable on Predicate

2016-11-15 Thread Michael Noll
Nick,

if I understand you correctly you can already do this today:

Think: KTable.toStream().filter().foreach() (or just
KTable.filter().foreach(), depending on what you are aiming to do)

Would that work for you?



On Sun, Nov 13, 2016 at 12:12 AM, Nick DeCoursin 
wrote:

> Feature proposal:
>
> KTable when(predicate);
>
> I have a KTable, but I'd only like to trigger a stream processor on certain
> conditions. I'd like to do something like:
>
> myKtable.when(
>  (key, value) -> some predicate
> );
>
> The result is just the same KTable. The predicate function is called on any
> new event.
>
> Thank you,
> Nick DeCoursin
>


Re: Kafka Streams internal topic naming

2016-11-16 Thread Michael Noll
Srikanth,

no, there isn't any API to control the naming of internal topics.

Is the reason you're asking for such functionality only/mostly about
multi-tenancy issues (as you mentioned in your first message)?

-Michael



On Wed, Nov 16, 2016 at 8:20 PM, Srikanth  wrote:

> Hello,
>
> Does Kafka Streams provide an API to control how internal topics are named?
> Right now it uses appId, operator name, etc.
> In a shared Kafka cluster it's common to have a naming convention that may
> require some prefix/suffix.
>
> Srikanth
>


Re: Kafka Streams internal topic naming

2016-11-18 Thread Michael Noll
Srikanth,

as Matthias said, you can achieve some namespacing effects through the use
of (your own in-house) conventions of defining `application.id` across
teams.  The id is used as the prefix for topics, see
http://docs.confluent.io/current/streams/developer-guide.html#required-configuration-parameters
for further details.
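
To make the prefixing concrete, here is a small plain-Java sketch.  It
assumes the `<application.id>-<store name>-changelog` pattern for internal
changelog topics; the team/application convention and all names in it are
hypothetical examples, not anything Kafka prescribes.

```java
class InternalTopicNames {

    // Hypothetical in-house convention: "<team>-<application>" as application.id.
    static String applicationId(String team, String application) {
        return team + "-" + application;
    }

    // Name of the changelog topic that backs a state store, assuming the
    // "<application.id>-<store name>-changelog" pattern.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Made-up team, application, and store names.
        String appId = applicationId("payments", "log-processing");
        System.out.println(appId);
        System.out.println(changelogTopic(appId, "counts-store"));
    }
}
```

Because the id ends up at the front of every internal topic name, a
per-team prefix in `application.id` gives you a prefix-based namespace, but
not a suffix-based one.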

Best,
Michael



On Thu, Nov 17, 2016 at 6:30 PM, Matthias J. Sax 
wrote:

> The only way to influence the naming is via application.id which you can
> set as you wish. Hope this is good enough to meet your naming conventions.
>
> As Michael mentioned, there is no way to manually specify internal topic
> names right now.
>
> -Matthias
>
> On 11/17/16 8:45 AM, Srikanth wrote:
> > That is right Michael. Most teams that use kafka library can adhere to
> > certain naming convention.
> > Using streams API will break that.
> >
> > Srikanth
> >
> > On Wed, Nov 16, 2016 at 2:32 PM, Michael Noll 
> wrote:
> >
> >> Srikanth,
> >>
> >> no, there isn't any API to control the naming of internal topics.
> >>
> >> Is the reason you're asking for such functionality only/mostly about
> >> multi-tenancy issues (as you mentioned in your first message)?
> >>
> >> -Michael
> >>
> >>
> >>
> >> On Wed, Nov 16, 2016 at 8:20 PM, Srikanth 
> wrote:
> >>
> >>> Hello,
> >>>
> >>> Does Kafka Streams provide an API to control how internal topics are
> >>> named?
> >>> Right now it uses appId, operator name, etc.
> >>> In a shared Kafka cluster it's common to have a naming convention that may
> >>> require some prefix/suffix.
> >>>
> >>> Srikanth
> >>>
> >>
> >
>
>


-- 
*Michael G. Noll*
Product Manager | Confluent
+1 650 453 5860 | @miguno <https://twitter.com/miguno>
Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
<http://www.confluent.io/blog>


Re: Kafka windowed table not aggregating correctly

2016-11-21 Thread Michael Noll
On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal  wrote:

> I am using kafka_2.10-0.10.0.1.
> Say I am having a window of 60 minutes advanced by 15 minutes.
> If the stream app using timestamp extractor puts the message in one or more
> bucket(s), it will get aggregated in those buckets.
> I assume this statement is correct.
>

Yes.
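
To illustrate the bucket assignment: with a 60-minute window that advances
by 15 minutes, a record can belong to up to four overlapping windows.  The
plain-Java sketch below computes the window start times for a given record
timestamp.  It assumes windows are aligned to the epoch and have an inclusive
start and exclusive end; the class and method names are made up, and this is
not the Kafka Streams implementation itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

class HoppingWindowSketch {

    // Start times (ms) of all windows [start, start + sizeMs) that contain
    // the given timestamp, for windows that begin every advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned window start that can still cover the timestamp.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = TimeUnit.MINUTES.toMillis(60);
        long advance = TimeUnit.MINUTES.toMillis(15);
        long timestamp = TimeUnit.MINUTES.toMillis(70); // example record timestamp
        for (long start : windowStartsFor(timestamp, size, advance)) {
            System.out.println("window starting at minute " + TimeUnit.MILLISECONDS.toMinutes(start));
        }
    }
}
```

Under these assumptions a record stamped at minute 70 lands in the windows
starting at minutes 15, 30, 45, and 60, while a record stamped at minute 10
lands only in the window starting at minute 0.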



>
> Also say when I restart the streams application then bucket aggregation
> will resume from last point of halt.
> I hope this is also correct.
>

Yes.



> What I noticed is that once a message is placed in one bucket, that bucket
> was not getting new messages.
>

This should not happen...


> However when I ran a small test case replicating that, it is working
> properly. There may be some issues in application reset.
>

...and apparently it works (as expected) in your small test case.

Do you have any further information that you could share with us so we can
help you better?  What's the difference, for example, between your "normal"
use case and the small test case you have been referring to?


-Michael


Re: Kafka Streaming message loss

2016-11-21 Thread Michael Noll
Please don't take this comment the wrong way, but have you double-checked
whether your counting code is working correctly?  (I'm not implying this
could be the only reason for what you're observing.)
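
Related to the compaction question Eno raises in the message quoted below:
a compacted topic with non-unique keys can look like message loss to a
counter, because only the latest value per key survives.  Here is a
deliberately simplified plain-Java model of that end state.  Real compaction
runs in the background and is not instantaneous, and the class, method, and
record contents are invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CompactionSketch {

    // Simulates the end state of log compaction: for each key, only the
    // most recently written value is retained.
    static Map<String, String> compact(String[][] records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : records) {
            latest.put(record[0], record[1]); // record = {key, value}
        }
        return latest;
    }

    public static void main(String[] args) {
        // Three records written, two of them sharing the key "k1".
        String[][] written = {{"k1", "a"}, {"k2", "b"}, {"k1", "c"}};
        Map<String, String> retained = compact(written);
        System.out.println("written:  " + written.length);
        System.out.println("retained: " + retained.size() + " " + retained);
    }
}
```

If the number of distinct keys in a test run is smaller than the number of
messages sent, a gap between "sent" and "seen" counts is at least consistent
with this effect.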

-Michael


On Fri, Nov 18, 2016 at 4:52 PM, Eno Thereska 
wrote:

> Hi Ryan,
>
> Perhaps you could share some of your code so we can have a look? One thing
> I'd check is if you are using compacted Kafka topics. If so, and if you
> have non-unique keys, compaction happens automatically and you might only
> see the latest value for a key.
>
> Thanks
> Eno
> > On 18 Nov 2016, at 13:49, Ryan Slade  wrote:
> >
> > Hi
> >
> > I'm trialling Kafka Streaming for a large stream processing job, however
> > I'm seeing message loss even in the simplest scenarios.
> >
> > I've tried to boil it down to the simplest scenario where I see loss
> which
> > is the following:
> > 1. Ingest messages from an input stream (String, String)
> > 2. Decode message into a type from JSON
> > 3. If successful, send to a second stream and update an atomic counter.
> > (String, CustomType)
> > 4. A foreach on the second stream that updates an AtomicCounter each
> time a
> > message arrives.
> >
> > I would expect that since we have at least once guarantees that the
> second
> > stream would see at least as many messages as were sent to it from the
> > first, however I consistently see message loss.
> >
> > I've tested multiple times sending around 200k messages. I don't see
> losses
> > every time, maybe around 1 in 5 runs with the same data. The losses are
> > small, around 100 messages, but I would expect none.
> >
> > I'm running version 0.10.1.0 with Zookeeper, Kafka and the Stream
> Consumer
> > all running on the same machine in order to mitigate packet loss.
> >
> > I'm running Ubuntu 16.04 with OpenJDK.
> >
> > Any advice would be greatly appreciated as I can't move forward with
> Kafka
> > Streams as a solution if messages are consistently lost between stream on
> > the same machine.
> >
> > Thanks
> > Ryan
>
>


Re: Kafka Streaming message loss

2016-11-21 Thread Michael Noll
Also:  Since your testing is purely local, feel free to share the code you
have been using so that we can try to reproduce what you're observing.

-Michael



On Mon, Nov 21, 2016 at 4:04 PM, Michael Noll  wrote:

> Please don't take this comment the wrong way, but have you double-checked
> whether your counting code is working correctly?  (I'm not implying this
> could be the only reason for what you're observing.)
>
> -Michael
>
>
> On Fri, Nov 18, 2016 at 4:52 PM, Eno Thereska 
> wrote:
>
>> Hi Ryan,
>>
>> Perhaps you could share some of your code so we can have a look? One
>> thing I'd check is if you are using compacted Kafka topics. If so, and if
>> you have non-unique keys, compaction happens automatically and you might
>> only see the latest value for a key.
>>
>> Thanks
>> Eno
>> > On 18 Nov 2016, at 13:49, Ryan Slade  wrote:
>> >
>> > Hi
>> >
>> > I'm trialling Kafka Streaming for a large stream processing job, however
>> > I'm seeing message loss even in the simplest scenarios.
>> >
>> > I've tried to boil it down to the simplest scenario where I see loss
>> which
>> > is the following:
>> > 1. Ingest messages from an input stream (String, String)
>> > 2. Decode message into a type from JSON
>> > 3. If successful, send to a second stream and update an atomic counter.
>> > (String, CustomType)
>> > 4. A foreach on the second stream that updates an AtomicCounter each
>> time a
>> > message arrives.
>> >
>> > I would expect that since we have at least once guarantees that the
>> second
>> > stream would see at least as many messages as were sent to it from the
>> > first, however I consistently see message loss.
>> >
>> > I've tested multiple times sending around 200k messages. I don't see
>> losses
>> > every time, maybe around 1 in 5 runs with the same data. The losses are
>> > small, around 100 messages, but I would expect none.
>> >
>> > I'm running version 0.10.1.0 with Zookeeper, Kafka and the Stream
>> Consumer
>> > all running on the same machine in order to mitigate packet loss.
>> >
>> > I'm running Ubuntu 16.04 with OpenJDK.
>> >
>> > Any advice would be greatly appreciated as I can't move forward with
>> Kafka
>> > Streams as a solution if messages are consistently lost between stream
>> on
>> > the same machine.
>> >
>> > Thanks
>> > Ryan
>>
>>
>


Re: KafkaStreams KTable#through not creating changelog topic

2016-11-23 Thread Michael Noll
Mikael,

regarding your second question:

> 2) Regarding the use case, the topology looks like this:
>
> .stream(...)
> .aggregate(..., "store-1")
> .mapValues(...)
> .through(..., "store-2")

The last operator above would, without the "..." ellipsis, be something like
`KTable#through("through-topic", "store-2")`.  Here, "through-topic" is the
changelog topic for both the KTable and the state store "store-2".  So this
is the changelog topic name that you want to know.

- If you want the "through" topic to have a `-changelog` suffix, then you'd
need to add that yourself in the call to `through(...)`.

- If you wonder why `through()` doesn't add a `-changelog` suffix
automatically:  That's because `through()` -- like `to()`, `stream()`, and
`table()` -- requires you to explicitly provide a topic name, and of course
Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
added when Kafka creates internal changelog topics behind the scenes for
you.)   Unfortunately, the Javadoc of `KTable#through()` is incorrect
because it refers to `-changelog`;  we'll fix that as mentioned above.

- Also, in case you want to do some shenanigans (like for some tooling
you're building around state stores/changelogs/interactive queries) such as
detecting all state store changelogs by doing the equivalent of `ls
*-changelog`, then this will miss changelogs of KTables that are created by
`through()` and `to()` (unless you come up with a naming convention that
your tooling can assume to be in place, e.g. by always adding `-changelog`
to topic names when you call `through()`).
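
As a rough plain-Java sketch of that last point: a suffix-based scan finds
the internal changelog topic and a `through()` topic that follows a
`-changelog` naming convention, but misses a `through()` topic with an
arbitrary name.  All topic names, as well as the class and method, are
hypothetical and only serve the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ChangelogDiscoverySketch {

    // The equivalent of `ls *-changelog` over a list of topic names.
    static List<String> changelogTopics(List<String> allTopics) {
        return allTopics.stream()
                .filter(topic -> topic.endsWith("-changelog"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic names, for illustration only.
        List<String> topics = Arrays.asList(
                "my-app-id-store-1-changelog", // internal topic created by Kafka Streams
                "through-topic",               // name passed to through(); also a changelog
                "store-2-changelog");          // through() topic named by in-house convention
        System.out.println(changelogTopics(topics));
    }
}
```

Here "through-topic" is a changelog in every sense that matters to the
tooling, yet the scan cannot tell, which is why the convention has to be
enforced by whoever calls `through()`.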

I hope this helps!
Michael




On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist 
wrote:

> Hi Eno,
>
> 1) Great :)
>
> 2) Yes, we are using the Interactive Queries to access the state stores. In
> addition, we access the changelogs to subscribe to updates. For this reason
> we need to know the changelog topic name.
>
> Thanks,
> Mikael
>
> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska 
> wrote:
>
> > HI Mikael,
> >
> > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is looking
> > into fixing it. I agree that it can be confusing to have topic names that
> > are not what one would expect.
> >
> > 2) If your goal is to query/read from the state stores, you can use
> > Interactive Queries to do that (you don't need to worry about the
> > changelog topic name and such). Interactive Queries is a new feature in
> > 0.10.1 (blog here:
> > https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/).
> >
> > Thanks
> > Eno
> >
> >
> > > On 22 Nov 2016, at 19:27, Mikael Högqvist  wrote:
> > >
> > > Sorry for being unclear, i'll try again :)
> > >
> > > 1) The JavaDoc for through is not correct, it states that a changelog
> > topic
> > > will be created for the state store. That is, if I would call it with
> > > through("topic", "a-store"), I would expect a kafka topic
> > > "my-app-id-a-store-changelog" to be created.
> > >
> > > 2) Regarding the use case, the topology looks like this:
> > >
> > > .stream(...)
> > > .aggregate(..., "store-1")
> > > .mapValues(...)
> > > .through(..., "store-2")
> > >
> > > Basically, I want to materialize both the result from the aggregate
> > method
> > > and the result from mapValues, which is materialized using .through().
> > > Later, I will access both the tables (store-1 and store-2) to a) get
> the
> > > current state of the aggregate, b) subscribe to future updates. This
> > works
> > > just fine. The only issue is that I assumed to have a changelog topic
> for
> > > > store-2 created automatically, which didn't happen.
> > >
> > > Since I want to access the changelog topic, it helps if the naming is
> > > consistent. So either we enforce the same naming pattern as kafka when
> > > calling .through() or alternatively the Kafka Streams API can provide a
> > > method to materialize tables which creates a topic name according to
> the
> > > naming pattern. E.g. .through() without the topic parameter.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Mikael
> > >
> > > On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax  >
> > > wrote:
> > >
> > > >> I cannot completely follow what you want to achieve.
> > >>
> > >> However, the JavaDoc for through() seems not to be correct to me.
> Using
> > >> through() will not create an extra internal changelog topic with the
> > >> described naming schema, because the topic specified in through() can
> be
> > >> used for this (there is no point in duplicating the data).
> > >>
> > >> If you have a KTable and apply a mapValues(), this will not write data
> > >> to any topic. The derived KTable is in-memory because you can easily
> > >> recreate it from its base KTable.
> > >>
> > >> What is the missing part you want to get?
> > >>
> > >> Btw: the internally created changelog topics are only used for
> recovery
> > >> in case

Re: KafkaStreams KTable#through not creating changelog topic

2016-11-23 Thread Michael Noll
> - Also, in case you want to do some shenanigans (like for some tooling
you're building
> around state stores/changelogs/interactive queries) such detecting all
state store changelogs
> by doing the equivalent of `ls *-changelog`, then this will miss
changelogs of KTables that are
> created by `through()` and `to()` [...]

Addendum: And that's because the topic that is created by
`KTable#through()` and `KTable#to()` is, by definition, a changelog of that
KTable and the associated state store.



On Wed, Nov 23, 2016 at 4:15 PM, Michael Noll  wrote:

> Mikael,
>
> regarding your second question:
>
> > 2) Regarding the use case, the topology looks like this:
> >
> > .stream(...)
> > .aggregate(..., "store-1")
> > .mapValues(...)
> > .through(..., "store-2")
>
> The last operator above would, without the "..." ellipsis, be something like
> `KTable#through("through-topic", "store-2")`.  Here, "through-topic" is
> the changelog topic for both the KTable and the state store "store-2".  So
> this is the changelog topic name that you want to know.
>
> - If you want the "through" topic to have a `-changelog` suffix, then
> you'd need to add that yourself in the call to `through(...)`.
>
> - If you wonder why `through()` doesn't add a `-changelog` suffix
> automatically:  That's because `through()` -- like `to()`, `stream()`, and
> `table()` -- requires you to explicitly provide a topic name, and of course
> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
> added when Kafka creates internal changelog topics behind the scenes for
> you.)   Unfortunately, the Javadoc of `KTable#through()` is incorrect
> because it refers to `-changelog`;  we'll fix that as mentioned above.
>
> - Also, in case you want to do some shenanigans (like for some tooling
> you're building around state stores/changelogs/interactive queries) such as
> detecting all state store changelogs by doing the equivalent of `ls
> *-changelog`, then this will miss changelogs of KTables that are created by
> `through()` and `to()` (unless you come up with a naming convention that
> your tooling can assume to be in place, e.g. by always adding `-changelog`
> to topic names when you call `through()`).
>
> I hope this helps!
> Michael
>
>
>
>
> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist 
> wrote:
>
>> Hi Eno,
>>
>> 1) Great :)
>>
>> 2) Yes, we are using the Interactive Queries to access the state stores.
>> In
>> addition, we access the changelogs to subscribe to updates. For this
>> reason
>> we need to know the changelog topic name.
>>
>> Thanks,
>> Mikael
>>
>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska 
>> wrote:
>>
>> > HI Mikael,
>> >
>> > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
>> looking
>> > into fixing it. I agree that it can be confusing to have topic names
>> that
>> > are not what one would expect.
>> >
>> > 2) If your goal is to query/read from the state stores, you can use
>> > Interactive Queries to do that (you don't need to worry about the
>> > changelog topic name and such). Interactive Queries is a new feature in
>> > 0.10.1 (blog here:
>> > https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/).
>> >
>> > Thanks
>> > Eno
>> >
>> >
>> > > On 22 Nov 2016, at 19:27, Mikael Högqvist 
>> wrote:
>> > >
>> > > Sorry for being unclear, i'll try again :)
>> > >
>> > > 1) The JavaDoc for through is not correct, it states that a changelog
>> > topic
>> > > will be created for the state store. That is, if I would call it with
>> > > through("topic", "a-store"), I would expect a kafka topic
>> > > "my-app-id-a-store-changelog" to be created.
>> > >
>> > > 2) Regarding the use case, the topology looks like this:
>> > >
>> > > .stream(...)
>> > > .aggregate(..., "store-1")
>> > > .mapValues(...)
>> > > .through(..., "store-2")
>> > >
>> > > Basically, I want to materialize both the result from the aggregate
>> > method
>> > > and the result from mapValues, 

Re: KafkaStreams KTable#through not creating changelog topic

2016-11-25 Thread Michael Noll
> > > > > My point is that it would help me (and maybe others), if the API of
> > > KTable
> > > > was extended to have a new method that does two things that is not
> part
> > > of
> > > > the implementation of .through(). 1) Create a state store AND the
> > > changelog
> > > > topic 2) follow the Kafka Streams naming convention for changelog
> > topics.
> > > > Basically, I want to have a method that does what .through() is
> > supposed
> > > to
> > > > do according to the documentation, but without the "topic" parameter.
> > > >
> > > > What do you think, would it be possible to extend the API with a
> method
> > > > like that?
> > > >
> > > > Thanks,
> > > > Mikael
> > > >
> > > > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll 
> > > wrote:
> > > >
> > > >> Mikael,
> > > >>
> > > >> regarding your second question:
> > > >>
> > > >>> 2) Regarding the use case, the topology looks like this:
> > > >>>
> > > >>> .stream(...)
> > > >>> .aggregate(..., "store-1")
> > > >>> .mapValues(...)
> > > >>> .through(..., "store-2")
> > > >>
> > > > >> The last operator above would, without the "..." ellipsis, be something like
> > > >> `KTable#through("through-topic", "store-2")`.  Here,
> "through-topic"
> > is
> > > the
> > > >> changelog topic for both the KTable and the state store "store-2".
> So
> > > this
> > > >> is the changelog topic name that you want to know.
> > > >>
> > > >> - If you want the "through" topic to have a `-changelog` suffix,
> then
> > > you'd
> > > >> need to add that yourself in the call to `through(...)`.
> > > >>
> > > >> - If you wonder why `through()` doesn't add a `-changelog` suffix
> > > >> automatically:  That's because `through()` -- like `to()` or
> > `stream()`,
> > > >> `table()` -- require you to explicitly provide a topic name, and of
> > > course
> > > >> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is
> > > only
> > > >> added when Kafka creates internal changelog topics behind the scenes
> > for
> > > >> you.)   Unfortunately, the javadocs of `KTable#through()` is
> incorrect
> > > >> because it refers to `-changelog`;  we'll fix that as mentioned
> above.
> > > >>
> > > >> - Also, in case you want to do some shenanigans (like for some
> tooling
> > > >> you're building around state stores/changelogs/interactive queries)
> > > such as
> > > >> detecting all state store changelogs by doing the equivalent of `ls
> > > >> *-changelog`, then this will miss changelogs of KTables that are
> > > created by
> > > >> `through()` and `to()` (unless you come up with a naming convention
> > that
> > > >> your tooling can assume to be in place, e.g. by always adding
> > > `-changelog`
> > > >> to topic names when you call `through()`).
> > > >>
> > > >> I hope this helps!
> > > >> Michael
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <
> hoegqv...@gmail.com
> > >
> > > >> wrote:
> > > >>
> > > >>> Hi Eno,
> > > >>>
> > > >>> 1) Great :)
> > > >>>
> > > >>> 2) Yes, we are using the Interactive Queries to access the state
> > > stores.
> > > >> In
> > > >>> addition, we access the changelogs to subscribe to updates. For
> this
> > > >> reason
> > > >>> we need to know the changelog topic name.
> > > >>>
> > > >>> Thanks,
> > > >>> Mikael
> > > >>>
> > > >>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <
> eno.there...@gmail.com
> > >
> > > >>> wrote:
> > > >>>
> > > >>>> HI Mikael,
> > > >>>>
> > > >>>> 1) The JavaDoc looks inc

Re: Data (re)processing with Kafka (new wiki page)

2016-11-25 Thread Michael Noll
Thanks a lot, Matthias!

I have already begun to provide feedback.

-Michael



On Wed, Nov 23, 2016 at 11:41 PM, Matthias J. Sax 
wrote:

> Hi,
>
> we added a new wiki page that is supposed to collect data (re)processing
> scenarios with Kafka:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Data+(Re)Processing+Scenarios
>
> We already added a couple of scenarios we think might be common and want
> to invite all of you to add more. This helps to get a better overview of
> requirements to enable new use cases.
>
> We are looking forward to your feedback!
>
>
> -Matthias
>
>


Re: Interactive Queries

2016-11-28 Thread Michael Noll
There are also some examples/demo applications at
https://github.com/confluentinc/examples that demonstrate the use of
interactive queries:

-
https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java

-
https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java

Note: The `3.1.x` branch is for Kafka 0.10.1.

-Michael




On Sun, Nov 27, 2016 at 3:35 AM, David Garcia  wrote:

> I would start here: http://docs.confluent.io/3.1.0/streams/index.html
>
>
> On 11/26/16, 8:27 PM, "Alan Kash"  wrote:
>
> Hi,
>
> New to Kafka land.
>
> I am looking into the Interactive Queries feature, which transforms topics
> into tables with history, neat!
>
> 1. What kind of queries can we run on the store?  Point or range?
> 2. Is indexing supported?  Primary or secondary?
> 3. Query language - SQL?  Custom Java native query?
>
> I see rocksdb is the persistent layer.
>
> Did the team look at JCache API (JSR 107) -
> https://jcp.org/en/jsr/detail?id=107 ?
>
> Thanks,
> Alan
>
>
>


Re: I need some help with the production server architecture

2016-12-01 Thread Michael Noll
+1 to what Dave said.



On Thu, Dec 1, 2016 at 4:29 PM, Tauzell, Dave 
wrote:

> For low volume zookeeper doesn't seem to use many resources.   I would put
> it on nodejs server as that will have less IO and heavy IO could impact
> zookeeper.  Or, you could put some ZK nodes on nodejs and some on DB
> servers to hedge your bets.   As always, you'll find out a lot once you
> actually start running it in production.
>
> -Dave
>
> -Original Message-
> From: Sachin Mittal [mailto:sjmit...@gmail.com]
> Sent: Thursday, December 1, 2016 6:03 AM
> To: users@kafka.apache.org
> Subject: Re: I need some help with the production server architecture
>
> Folks any help on this.
>
> Just to put it in simple terms, since we have limited resources available
> to us what is better option 1. run zookeeper on servers running the nodejs
> web server or db server.
> 2. what about kafka brokers.
>
> Thanks
> Sachin
>
>
> On Tue, Nov 29, 2016 at 1:06 PM, Sachin Mittal  wrote:
>
> > Hi,
> > Some time back I was informed on the group that in production we should
> > never run kafka on the same physical machine. So based on that I have a
> > question on how to divide the server nodes we have to run zookeeper and
> > kafka brokers.
> >
> > I have a following setup
> > Data center 1
> > Lan 1 (3 VMs)
> > 192.168.xx.yy1
> > 192.168.xx.yy2
> > 192.168.xx.yy3
> > Right now here we are running a cluster of 3 nodejs web servers.
> > These collect data from web and write to kafka queue. Each VM has 70
> > GB of space.
> >
> > Lan 2 (3 VMs)
> > 192.168.zz.aa1
> > 192.168.zz.aa2
> > 192.168.zz.aa3
> > These serve as the cluster of our database servers. Each VM has 400
> > GB of space.
> >
> > Data center 2
> > Lan 1 (3 VMs)
> > 192.168.yy.bb1
> > 192.168.yy.bb2
> > 192.168.yy.bb3
> > Three new machines where we plan to run a cluster of new database to
> > be served as sink of kafka stream applications. Each VM has 400 GB of
> space.
> > These have connectivity only between Lan 2 of Data center 1 with a
> > 100MBs of data transfer rate.
> >
> > Each VM has a 4 core processor and 16 GB of RAM. They all run linux.
> >
> > Now I would like my topics to be replicated with a factor of 3. Since
> > we don't foresee much volume of data, I don't want it to be partitioned.
> >
> > Also we would like one server to be used as streaming application
> > server, where we can run one or more kafka stream applications to
> > process the topics and write to the new database.
> >
> >  So please let me know what is a suitable division to run brokers and
> > zookeeper.
> >
> >
> > Thanks
> > Sachin
> >
> >
> >
> >


Re: Kafka Logo as HighRes or Vectorgraphics

2016-12-02 Thread Michael Noll
Jan,

Here are vector files for the logo. One of our teammates went ahead and
resized it to fit nicely onto a 2x4m board with 15cm of margin all
around.

Note: I was told to kindly remind you (and other readers of this) to follow
the Apache branding guidelines for the logo, and please not manipulate the
vector in any way other than proportionally scaling it as needed.

Minimal version:
https://drive.google.com/file/d/0Bxy-BmVSBalmNFY5YzdnY001ckU/view?usp=sharing
Full version (with "Apache"):
https://drive.google.com/file/d/0Bxy-BmVSBalmdHRUVHgxRHl2ejQ/view?usp=sharing

Hope this helps,
Michael




On Fri, Dec 2, 2016 at 10:04 AM, Jan Filipiak 
wrote:

> Hi,
>
> I was just pointed to this. https://www.vectorlogo.zone/logos/apache_kafka/
> if someone else is looking for the same thing! thanks a lot
>
> Best Jan
>
>
> On 01.12.2016 13:05, Jan Filipiak wrote:
>
>> Hi Everyone,
>>
>> we want to print some big banners of the Kafka logo to decorate our
>> offices. Can anyone help me find a version
>> of the kafka logo that would still look nice printed onto 2x4m flags?
>> Highly appreciated!
>>
>> Best Jan
>>
>
>


Re: kafka streams consumer partition assignment is uneven

2017-01-09 Thread Michael Noll
What does the processing topology of your Kafka Streams application look
like, and what's the exact topic and partition configuration?  You say you
have 12 partitions in your cluster, presumably across 7 topics -- that
means that most topics have just a single partition.  Depending on your
topology (e.g. if you have defined that single-partition topics A, B, C
must be joined), Kafka Streams is forced to let one of your three Streams
nodes process "more" topics/partitions than the other two nodes.
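To make that constraint concrete, here is a rough, dependency-free Java
sketch (it is not Kafka code). It assumes that, for topics processed together
in one part of the topology, the number of stream tasks equals the largest
partition count among those topics, and that a task cannot be split across
nodes. The class name, the topic names A/B/C, and the round-robin hand-out
are illustrative assumptions; the real assignment logic is more involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TaskAssignmentSketch {

    // Topics that are processed together yield as many tasks as the
    // largest partition count among them.
    public static int taskCount(Map<String, Integer> partitionsPerTopic) {
        int max = 0;
        for (int partitions : partitionsPerTopic.values()) {
            max = Math.max(max, partitions);
        }
        return max;
    }

    // Hand out tasks round-robin (an assumption for illustration) and
    // return how many tasks each node ends up with.
    public static List<Integer> tasksPerNode(int tasks, int nodes) {
        List<Integer> counts = new ArrayList<>();
        for (int n = 0; n < nodes; n++) {
            counts.add(0);
        }
        for (int t = 0; t < tasks; t++) {
            counts.set(t % nodes, counts.get(t % nodes) + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Three single-partition topics that must be joined.
        Map<String, Integer> joined = new LinkedHashMap<>();
        joined.put("A", 1);
        joined.put("B", 1);
        joined.put("C", 1);

        int tasks = taskCount(joined);
        // One task for all three topics: a single node does all the work.
        System.out.println(tasks + " task(s), per node: " + tasksPerNode(tasks, 3));
    }
}
```

With a single 12-partition topic instead, the same sketch spreads 12 tasks
evenly over 3 nodes, which is why the exact topic and partition layout
matters for the answer.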

-Michael



On Mon, Jan 9, 2017 at 6:52 PM, Ara Ebrahimi 
wrote:

> Hi,
>
> I have 3 kafka brokers, each with 4 disks. I have 12 partitions. I have 3
> kafka streams nodes. Each is configured to have 4 streaming threads. My
> topology is quite complex and I have 7 topics and lots of joins and states.
>
> What I have noticed is that each of the 3 kafka streams nodes gets
> configured to process a variable number of partitions of a topic. One node
> is assigned to process 2 partitions of topic a and another one gets
> assigned 5. Hence I end up with nonuniform throughput across these nodes.
> One node ends up processing more data than the other.
>
> What’s going on? How can I make sure partitions assignment to kafka
> streams nodes is uniform?
>
> On a similar topic, is there a way to make sure partition assignment to
> disks across kafka brokers is also uniform? Even if I use a round-robin one
> to pin partitions to broker, but there doesn’t seem to be a way to
> uniformly pin partitions to disks. Or maybe I’m missing something here? I
> end up with 2 partitions of topic a on disk 1 and 3 partitions of topic a
> on disk 2. It’s a bit variable. Not totally random, but it’s not uniformly
> distributed either.
>
> Ara.
>
>
>
>


Re: [ANNOUNCE] New committer: Grant Henke

2017-01-16 Thread Michael Noll
My congratulations, Grant -- more work's awaiting you then. ;-)

Best wishes,
Michael


On Fri, Jan 13, 2017 at 2:50 PM, Jeff Holoman  wrote:

> Well done Grant!  Congrats!
>
> On Thu, Jan 12, 2017 at 1:13 PM, Joel Koshy  wrote:
>
> > Hey Grant - congrats!
> >
> > On Thu, Jan 12, 2017 at 10:00 AM, Neha Narkhede 
> wrote:
> >
> > > Congratulations, Grant. Well deserved!
> > >
> > > On Thu, Jan 12, 2017 at 7:51 AM Grant Henke 
> wrote:
> > >
> > > > Thanks everyone!
> > > >
> > > > On Thu, Jan 12, 2017 at 2:58 AM, Damian Guy 
> > > wrote:
> > > >
> > > > > Congratulations!
> > > > >
> > > > > On Thu, 12 Jan 2017 at 03:35 Jun Rao  wrote:
> > > > >
> > > > > > Grant,
> > > > > >
> > > > > > Thanks for all your contribution! Congratulations!
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > > On Wed, Jan 11, 2017 at 2:51 PM, Gwen Shapira wrote:
> > > > > >
> > > > > > > The PMC for Apache Kafka has invited Grant Henke to join as a
> > > > > > > committer and we are pleased to announce that he has accepted!
> > > > > > >
> > > > > > > > Grant contributed 88 patches, 90 code reviews, countless great
> > > > > > > > comments on discussions, a much-needed cleanup to our protocol
> > > > > > > > and the on-going and critical work on the Admin protocol.
> > > > > > > > Throughout this, he displayed great technical judgment,
> > > > > > > > high-quality work and willingness to contribute where needed
> > > > > > > > to make Apache Kafka awesome.
> > > > > > >
> > > > > > > Thank you for your contributions, Grant :)
> > > > > > >
> > > > > > > --
> > > > > > > Gwen Shapira
> > > > > > > Product Manager | Confluent
> > > > > > > 650.450.2760 <(650)%20450-2760> <(650)%20450-2760> | @gwenshap
> > > > > > > Follow us: Twitter | blog
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Grant Henke
> > > > Software Engineer | Cloudera
> > > > > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
> > > >
> > > --
> > > Thanks,
> > > Neha
> > >
> >
>


Re: Kafka Streams: from a KStream, aggregating records with the same key and updated metrics ?

2017-01-16 Thread Michael Noll
Nicolas,

quick feedback on timestamps:

> In our system, clients send data to an HTTP API. This API produces the
> records in Kafka. I can't rely on the clock of the clients sending the
> original data, (so the records' timestamps are set by the servers ingesting
> the records in Kafka), but I can rely on a time difference. The client only
> gives information about the time spent since the first version of the
> record was sent. Via a custom timestamp extractor, I just need to subtract
> the time spent to the record's timestamp to ensure that it will fall in
> same window.

Alternatively, you can also let the HTTP API handle the timestamp
calculations, and then embed the "final" timestamp in the message payload
(e.g. in the message value).  Then, in your downstream application, you'd
define a custom timestamp extractor that returns this embedded timestamp.

One advantage of the approach I outlined above is that other consumers of
the same data (who may or may not be aware of how you need to compute a
timestamp diff to get the "real" timestamp) can simply re-use the timestamp
embedded in the payload without having to know/worry about the custom
calculation.  It might also be easier for Ops personnel to have access to a
ready-to-use timestamp in case they need to debug or troubleshoot.
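
As a minimal sketch of the arithmetic involved, assuming hypothetical names
throughout (`finalTimestamp`, `windowStart`, and the class itself are made up
for illustration): the HTTP API subtracts the client-reported elapsed time
from its own ingestion time, and the result is what would be embedded in the
payload. The sketch deliberately leaves out the Kafka Streams wiring; in a
real application the downstream extractor would implement Kafka Streams'
`TimestampExtractor` interface and simply return the embedded field.

```java
public class EmbeddedTimestampSketch {

    static final long HOUR_MS = 60L * 60L * 1000L;

    // Producer side (the HTTP API): server ingestion time minus the time the
    // client says has passed since the first version of the record.
    public static long finalTimestamp(long ingestTimeMs, long elapsedSinceFirstVersionMs) {
        return ingestTimeMs - elapsedSinceFirstVersionMs;
    }

    // Start of the epoch-aligned tumbling window containing a timestamp.
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long firstSeen = 59L * 60L * 1000L;   // 00:59, first version ingested
        long secondSeen = 61L * 60L * 1000L;  // 01:01, updated version ingested
        long elapsed = 2L * 60L * 1000L;      // client reports 2 minutes since first version

        long t1 = finalTimestamp(firstSeen, 0L);
        long t2 = finalTimestamp(secondSeen, elapsed);

        // With the embedded timestamp, both versions share one hourly window.
        System.out.println(windowStart(t1, HOUR_MS) == windowStart(t2, HOUR_MS));
        // With raw ingestion times, they would land in different windows.
        System.out.println(windowStart(firstSeen, HOUR_MS) == windowStart(secondSeen, HOUR_MS));
    }
}
```

Doing the subtraction once, at produce time, means every consumer sees the
same already-corrected value.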

-Michael




On Sun, Jan 15, 2017 at 11:10 PM, Nicolas Fouché  wrote:

> Hi Eno,
>
> 2. Well, records could arrive out of order. But it should happen rarely,
> and it's no big deal anyway. So let's forget about the version number if it
> makes things easier !
>
> 3. I completely missed out on KTable aggregations. Thanks a lot for the
> pointer, that opens new perspectives.
>
> ... a few hours pass ...
>
> Ok, in my case, since my input is an infinite stream of new records, I
> would have to "window" my KTables, right ?
> With `KStream.groupBy().reduce()`, I can generate a windowed KTable of
> records, and even use the reducer function to compare the version numbers.
> Next, I use `KTable.groupBy().aggregate()` to benefit from the `adder` and
> `substractor` mechanisms [1].
>
> The last problem is about the record timestamp. If I work on a one-hour
> window, and records are sent between let's say 00:59 and 01:01, they would
> live in two different KTables and this would create duplicates.
> To deal with this, I could mess with the records timestamps, so each new
> record version is considered by Kafka Streams having the same timestamp
> than the first version seen by the producer.
> Here is my idea:
> In our system, clients send data to an HTTP API. This API produces the
> records in Kafka. I can't rely on the clock of the clients sending the
> original data, (so the records' timestamps are set by the servers ingesting
> the records in Kafka), but I can rely on a time difference. The client only
> gives information about the time spent since the first version of the
> record was sent. Via a custom timestamp extractor, I just need to subtract
> the time spent to the record's timestamp to ensure that it will fall in
> same window.
> Long text, small code:
> https://gist.github.com/nfo/6df4d1076af9da5fd1c29b0ad4564f2a . What do you
> think ?
>
> About the windowed KTables in the first step, I guess I should avoid making
> them too long, since they store the whole records. We usually aggregate
> with windows size from 1 hour to 1 month. I should compute all the
> aggregates covering more than 1 hour from the 1-hour aggregates, right ?
>
> [1]
> http://docs.confluent.io/3.1.1/streams/javadocs/org/apache/kafka/streams/kstream/KGroupedTable.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String)
>
> Thanks (a lot).
> Nicolas
>
>
> 2017-01-13 17:32 GMT+01:00 Eno Thereska :
>
> > Hi Nicolas,
> >
> > There is a lot here, so let's try to split the concerns around some
> > themes:
> >
> > 1. The Processor API is flexible and can definitely do what you want, but
> > as you mentioned, at the cost of you having to manually craft the code.
> > 2. Why are the versions used? I sense there is concern about records
> > arriving out of order so the versions give each record with the same ID
> > an order. Is that correct?
> > 3. If you didn't have the version and the count requirement I'd say using
> > a KTable to interpret the stream and then aggregating on that would be
> > sufficient. There might be a way to do that with a mixture of the DSL and
> > the processor API.
> >
> > Another alternative might be to use the Interactive Query APIs (
> > https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> > ) to first get all your data in KTables and then query it periodically
> > (you can decide on the frequency manually)

Re: Kafka Streams: from a KStream, aggregating records with the same key and updated metrics ?

2017-01-17 Thread Michael Noll
Thanks for sharing back your findings/code, Nicolas!

-Michael


On Mon, Jan 16, 2017 at 11:15 PM, Nicolas Fouché  wrote:

> If anyone is interested, here is my custom timestamp extractor:
> https://gist.github.com/nfo/54d5830720e163d2e7e848b6e4baac20 .
>
> 2017-01-16 15:52 GMT+01:00 Nicolas Fouché :
>
> > Hi Michael,
> >
> > got it. I understand that it would be less error-prone to generate the
> > final "altered" timestamp on the Producer side, instead of trying to
> > compute it each time the record is consumed.
> >
> > Thanks.
> > Nicolas.
> >
> > 2017-01-16 10:03 GMT+01:00 Michael Noll :
> >
> >> Nicolas,
> >>
> >> quick feedback on timestamps:
> >>
> >> > In our system, clients send data to an HTTP API. This API produces the
> >> > records in Kafka. I can't rely on the clock of the clients sending the
> >> > original data, (so the records' timestamps are set by the servers
> >> ingesting
> >> > the records in Kafka), but I can rely on a time difference. The client
> >> only
> >> > gives information about the time spent since the first version of the
> >> > record was sent. Via a custom timestamp extractor, I just need to
> >> subtract
> >> > the time spent to the record's timestamp to ensure that it will fall
> in
> >> > same window.
> >>
> >> Alternatively, you can also let the HTTP API handle the timestamp
> >> calculations, and then embed the "final" timestamp in the message
> payload
> >> (like the message value).  Then, in your downstream application, you'd
> >> define a custom timestamp extractor that returns this embedded
> timestamp.
> >>
> >> One advantage of the approach I outlined above is that other consumers
> of
> >> the same data (who may or may not be aware of how you need to compute a
> >> timestamp diff to get the "real" timestamp) can simply re-use the
> >> timestamp
> >> embedded in the payload without having to know/worry about the custom
> >> calculation.  It might also be easier for Ops personnel to have access
> to
> >> a
> >> ready-to-use timestamp in case they need to debug or troubleshoot.
> >>
> >> -Michael
> >>
> >>
> >>
> >>
> >> On Sun, Jan 15, 2017 at 11:10 PM, Nicolas Fouché 
> >> wrote:
> >>
> >> > Hi Eno,
> >> >
> >> > 2. Well, records could arrive out of order. But it should happen
> rarely,
> >> > and it's no big deal anyway. So let's forget about the version number
> >> if it
> >> > makes things easier !
> >> >
> >> > 3. I completely missed out on KTable aggregations. Thanks a lot for
> the
> >> > pointer, that opens new perspectives.
> >> >
> >> > ... a few hours pass ...
> >> >
> >> > Ok, in my case, since my input is an infinite stream of new records, I
> >> > would have to "window" my KTables, right ?
> >> > With `KStream.groupBy().reduce()`, I can generate a windowed KTable of
> >> > records, and even use the reducer function to compare the version
> >> numbers.
> >> > Next, I use `KTable.groupBy().aggregate()` to benefit from the `adder`
> >> and
> >> > `substractor` mechanisms [1].
> >> >
> >> > The last problem is about the record timestamp. If I work on a
> one-hour
> >> > window, and records are sent between let's say 00:59 and 01:01, they
> >> would
> >> > live in two different KTables and this would create duplicates.
> >> > To deal with this, I could mess with the records timestamps, so each
> new
> >> > record version is considered by Kafka Streams having the same
> timestamp
> >> > than the first version seen by the producer.
> >> > Here is my idea:
> >> > In our system, clients send data to an HTTP API. This API produces the
> >> > records in Kafka. I can't rely on the clock of the clients sending the
> >> > original data, (so the records' timestamps are set by the servers
> >> ingesting
> >> > the records in Kafka), but I can rely on a time difference. The client
> >> only
> >> > gives information about the time spent since the first version of the
> >> > record was sent. Via a custom timestamp extractor, I just need to
> >> subtract
> >> > the time spent 

Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

2017-01-18 Thread Michael Noll
Nicolas,

if I understand your question correctly you'd like to add further
operations after having called `KStream#process()`, which -- as you report
-- doesn't work because `process()` returns void.

If that's indeed the case, +1 to Damian's suggestion to use
`KStream.transform()` instead of `KStream.process()`.

-Michael




On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy  wrote:

> You could possibly also use KStream.transform(...)
>
> On Wed, 18 Jan 2017 at 14:22 Damian Guy  wrote:
>
> > Hi Nicolas,
> >
> > Good question! I'm not sure why it is a terminal operation, maybe one of
> > the original authors can chip in. However, you could probably work around
> > it by using TopologyBuilder.addProcessor(...) rather than
> > KStream.process
> >
> > Thanks,
> > Damian
> >
> > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché  wrote:
> >
> > Hi,
> >
> > as far as I understand, calling `KStream.process` prevents the developer
> > from adding further operations to a `KStreamBuilder` [1], because its
> > return type is `void`. Good.
> >
> > But it also prevents the developer from adding operations to its
> > superclass `TopologyBuilder`. In my case I wanted to add a sink, and the
> > parent of this sink would be the name of the Processor that is created by
> > `KStream.process`. Is there any reason why this method does not return
> > the processor name [2] ? Is it because it would be a bad idea continuing
> > building my topology with the low-level API ?
> >
> > [1]
> >
> > https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java%23L56
> > [2]
> >
> > https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e2435985d9101/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L391
> >
> >
> > Thanks.
> > Nicolas.
> >
> >
>


Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`

2017-01-18 Thread Michael Noll
Nicolas,

here's some information I shared on StackOverflow (perhaps a bit outdated
by now, was back in Aug 2016) about how you can add a state store when
using KStreamBuilder: http://stackoverflow.com/a/39086805/1743580

-Michael




On Wed, Jan 18, 2017 at 5:18 PM, Nicolas Fouché  wrote:

> The reason I would not use `KStream.transform()` is that I want to call
> `ProcessorContext.forward()` several times, to different children. These
> children are sinks.
> My use case: I need to route my beacons to different topics. Right now, I
> use a series of `KStream.branch()` calls [1]. But would it be more
> "elegant" to be able to add 5 sinks to a topology, and forward my records
> to them in a custom processor ?
>
> Damian: About `TopologyBuilder.addProcessor(...)`, as far as I know, I have
> to give a parent processor. But the parent processor was generated by a
> high-level topology. And names of processors created by `KStreamBuilder`
> are not accessible. (unless by inspecting the topology nodes I guess)
>
> [1] https://gist.github.com/nfo/c4936a24601352db23b18653a8ccc352
>
> Thanks.
> Nicolas
>
>
> 2017-01-18 15:56 GMT+01:00 Michael Noll :
>
> > Nicolas,
> >
> > if I understand your question correctly you'd like to add further
> > operations after having called `KStream#process()`, which -- as you
> report
> > -- doesn't work because `process()` returns void.
> >
> > If that's indeed the case, +1 to Damian's suggestion to use
> > `KStream.transform()` instead of `KStream.process()`.
> >
> > -Michael
> >
> >
> >
> >
> > On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy 
> wrote:
> >
> > > You could possibly also use KStream.transform(...)
> > >
> > > On Wed, 18 Jan 2017 at 14:22 Damian Guy  wrote:
> > >
> > > > Hi Nicolas,
> > > >
> > > > Good question! I'm not sure why it is a terminal operation, maybe one
> > of
> > > > the original authors can chip in. However, you could probably work
> > around
> > > > it by using TopologyBuilder.addProcessor(...) rather then
> > > KStream.process
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché 
> > wrote:
> > > >
> > > > Hi,
> > > >
> > > > as far as I understand, calling `KStream.process` prevents the
> > developer
> > > > from adding further operations to a `KStreamBuilder` [1], because its
> > > > return type is `void`. Good.
> > > >
> > > > But it also prevents the developer from adding operations to its
> > > superclass
> > > > `TopologyBuilder`. In my case I wanted to add a sink, and the parent
> of
> > > > this sink would be the name of the Processor that is created by
> > > > `KStream.process`. Is there any reason why this method does not
> return
> > > the
> > > > processor name [2] ? Is it because it would be a bad idea continuing
> > > > building my topology with the low-level API ?
> > > >
> > > > [1]
> > > >
> > > > https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java%23L56
> > > > [2]
> > > >
> > > > https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e2435985d9101/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L391
> > > >
> > > >
> > > > Thanks.
> > > > Nicolas.
> > > >
> > > >
> > >
> >
>


Re: Streams: Global state & topic multiplication questions

2017-01-20 Thread Michael Noll
As Eno said I'd use the interactive queries API for Q2.

Demo apps:
-
https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java
-
https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java

Further docs:
http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries

(FYI: We have begun moving the info in these docs to the Apache Kafka docs,
too, but it will take a while.)
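
For Q2, the "export all pixels of one image on demand" part could look
roughly like the following sketch. It is plain Java, not Kafka Streams code:
a `TreeMap` stands in for the local state store (the read-only key-value
store exposed via interactive queries offers a comparable range lookup), and
the keys follow the "imageid,x,y" format Peter described. The class name,
`pixelsOf`, and the sample data are assumptions made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class ImageStateSketch {

    // Collect all "imageId,x,y" -> color entries for one image from a store
    // whose keys are kept in sorted order.
    public static Map<String, String> pixelsOf(TreeMap<String, String> store, String imageId) {
        String prefix = imageId + ",";
        Map<String, String> result = new LinkedHashMap<>();
        // Keys are sorted, so all keys sharing the prefix are contiguous:
        // start at the prefix and stop at the first key that does not match.
        for (Map.Entry<String, String> entry : store.tailMap(prefix, true).entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                break;
            }
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        store.put("img1,0,0", "red");
        store.put("img1,0,1", "blue");
        store.put("img2,0,0", "green");

        // Only the entries of img1 are returned; img2 is never touched.
        System.out.println(pixelsOf(store, "img1"));
    }
}
```

A newcomer would get this snapshot first and then follow the update stream
filtered to the same image id.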

-Michael




On Thu, Jan 19, 2017 at 5:23 PM, Eno Thereska 
wrote:

> For Q2: one way to export the state on demand would be to use the
> Interactive Queries API
> (https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/).
> That can be seen as a current, materialized view of the state. There is
> some example code in the blog.
>
> Eno
>
>
> > On 19 Jan 2017, at 16:11, Peter Kopias  wrote:
> >
> > Q1: Thank you, the branch() is what I'm looking for, I just missed it
> > somehow.
> >
> > Q2:
> >   I receive something like "imageid,x,y" as key, and a color as value. I
> > aggregate this to something like average color for example.
> >  So technically I do not have images, I have colored pixels with 3
> > dimensions one being the image...
> >  And then my newcomer user wants to join the fun, so we'd need to serve
> > him an image with the latest current state (all pixels of imageid=x), and
> > of course everything that comes later (updates on the output of the
> > aggregate, but filtered to that imageid).
> >
> > Problem is that how to get the stream api node to "export" part of it's
> > current state on demand. (All imageid=x keys with values).
> >
> > There could be a "request topic" that I could join together with the
> > aggregated ktable maybe?
> >
> > Other problem is following the updates without getting the details of
> > other images (99% of records are not interesting for the specific user).
> >
> > Thanks,
> >
> > Peter
> >
> >
> >
> > On Thu, Jan 19, 2017 at 4:55 PM, Eno Thereska 
> > wrote:
> >
> >> Hi Peter,
> >>
> >> About Q1: The DSL has the "branch" API, where one stream is branched to
> >> several streams, based on a predicate. I think that could help.
> >>
> >> About Q2: I'm not entirely sure I understand the problem space. What is
> >> the definition of a "full image"?
> >>
> >> Thanks
> >> Eno
> >>> On 19 Jan 2017, at 12:07, Peter Kopias  wrote:
> >>>
> >>> Greetings Everyone,
> >>>
> >>> I'm just getting into the kafka world with a sample project, and I've
> got
> >>> two conceptional issues, you might have a trivial answer already at
> hand
> >> to.
> >>>
> >>> Scenario: multiuser painting webapp, with N user working on M images
> >>> simultaneously.   The "brush" events go to one single kafka topic, in a
> >>> format: imageid,x,y -> brushevent  , that I aggregate to imageid,x,y
> >>>
> >>> Q1:
> >>> It would be nice to separate the stream to M output topics, so that
> would
> >>> work nice as "partitioning", and also we could just subscribe to update
> >>> events of a specific image maybe. How can I fan out the records to
> >>> different (maybe not yet existing) topics by using DSL?
> >>>
> >>> Is that a good idea? (If I can solve every processing in a common
> >>> processing graph that would be the best, but then I'd need some high
> >>> performance solution of filtering out the noise, as the subscribers are
> >>> only interested in a very small subset of the soup.)
> >>>
> >>> Q2:
> >>> - When a new user comes, I'd like give him the latest full image?
> >>> (I could do a "fullimages" output topic, but then also comes the
> problem
> >>> of serious overhead on each incoming update, and also the newcomer
> should
> >>> somehow only get the image he's interested in, not read all the images,
> >> and
> >>> ignore the others.)
> >>>
> >>> I know I'm still new to this, but I'd like to learn the best practices
> >> you
> >>> might already tried.
> >>>
> >>> Thank you,
> >>>
> >>> Peter
> >>
> >>
>
>


Re: Fwd: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-27 Thread Michael Noll
>>>>>>>> ons should be updated accordingly. For example, 1)
>>>>>>>> KStreamBuilder.table(..) has a state store name parameter, and we
>>>>>>>> will always materialize the KTable unless its state store name is
>>>>>>>> set to null; 2) KTable.agg requires the result KTable to be
>>>>>>>> materialized, and hence it also have a state store name; 3)
>>>>>>>> KTable.join requires the joining table to be materialized. And
>>>>>>>> today we do not actually have a mechanism to enforce that, but will
>>>>>>>> only throw an exception at runtime if it is not (e.g. if you have
>>>>>>>> "builder.table("topic", null).join()" a RTE will be thrown).
>>>>>>>>
>>>>>>>> I'd make an extended proposal just to kick off the discussion here:
>>>>>>>> let's remove all the state store params in other KTable functions,
>>>>>>>> and if in some cases KTable have to be materialized (e.g. KTable
>>>>>>>> resulted from KXX.agg) and users do not call materialize(), then we
>>>>>>>> treat it as "users are not interested in querying it at all" and
>>>>>>>> hence use an internal name generated for the materialized KTable;
>>>>>>>> i.e. although it is materialized the state store is not exposed to
>>>>>>>> users. And if users call materialize() afterwards but we have
>>>>>>>> already decided to materialize it, we can replace the internal name
>>>>>>>> with the user's provided names. Then from a user's point-view, if
>>>>>>>> they ever want to query a KTable, they have to call materialize()
>>>>>>>> with a given state store name. This approach has one awkwardness
>>>>>>>> though, that serdes and state store names param are not separated
>>>>>>>> and could be overlapped (see detailed comment #2 below).
>>>>>>>>
>>>>>>>>
>>>>>>>> 2. This step does not need to be included in this KIP, but just as a
>>>>>>>> reference / future work: as we have discussed before, we may enforce
>>>>>>>> materialize KTable.join resulted KTables as well in the future. If
>>>>>>>> we do that, then:
>>>>>>>>
>>>>>>>> a) KXX.agg resulted KTables are always materialized;
>>>>>>>> b) KTable.agg requires the aggregating KTable to always be
>>>>>>>> materialized (otherwise we would not know the old value);
>>>>>>>> c) KTable.join resulted KTables are always materialized, and so are
>>>>>>>> the joining KTables to always be materialized.
>>>>>>>> d) KTable.filter/mapValues resulted KTables materialization depend
>>>>>>>> on its
