ng]. My question is how do I
inject the proper Serde here ? map does not take any Serde as an argument.
The initial topic that I read from has ByteArray and a custom Serde, and it
works fine. But then the map blows up ..
Any help will be appreciated ..
regards.
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
On Tue, Jun 20, 2017 at 7:03 PM, Damian Guy wrote:
> m.groupBy(mapper, Serdes.String(), Serdes.String()).count("HostAggregateCount")
>
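For context, that overload takes the Serdes explicitly, so nothing needs to
be injected into map itself. A minimal sketch, assuming `m` is a
KStream[String, String] and `mapper` re-keys each record by host (both names
are hypothetical here):

    // Serdes are passed to groupBy directly, not to map
    val counts: KTable[String, java.lang.Long] =
      m.groupBy(mapper, Serdes.String(), Serdes.String())
       .count("HostAggregateCount")   // materializes a queryable store of counts
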
Thanks! It works ..
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
q)
localStore.get(key)
For a custom store type, how do I create an instance of
QueryableStoreType[...] ?
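For reference, a sketch of what such an instance could look like, assuming a
custom store class named BFStore and a read-only query interface (all names
below are hypothetical):

    import org.apache.kafka.streams.processor.StateStore
    import org.apache.kafka.streams.state.QueryableStoreType
    import org.apache.kafka.streams.state.internals.StateStoreProvider

    // hypothetical read-only view exposed to interactive queries
    trait ReadableBFStore[T] {
      def mightContain(value: T): Boolean
    }

    class BFStoreType[T] extends QueryableStoreType[ReadableBFStore[T]] {
      // accept only instances of the custom store
      override def accepts(stateStore: StateStore): Boolean =
        stateStore.isInstanceOf[BFStore[_]]

      // wrap the matching local store(s) behind the read-only view;
      // CompositeReadableBFStore is a hypothetical composite wrapper
      override def create(provider: StateStoreProvider, storeName: String): ReadableBFStore[T] =
        new CompositeReadableBFStore[T](provider, this, storeName)
    }

Such an instance would then be passed to KafkaStreams#store(storeName, new
BFStoreType[String]).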
Any help will be appreciated ..
regards.
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
related to
https://issues.apache.org/jira/browse/KAFKA-5528. It comes from the class
BFStoreChangeLogger in the line I marked above with //**//.
Any help / workaround will be appreciated ..
regards.
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
HTH,
> Damian
>
> On Fri, 30 Jun 2017 at 13:00 Debasish Ghosh
> wrote:
>
>> Hi -
>>
>> I have a custom state store in my Kafka Streams application. I have
>> developed the whole Topology and the Processor abstractions. I have the
>> custom state
PM, Debasish Ghosh
wrote:
> Hi -
>
> I have implemented a custom state store named BFStore with a change
> logger as follows:
>
> class BFStoreChangeLogger[K, V](val storeName: String,
>                                 val context: ProcessorContext,
>
Damian
>
> On Sat, 1 Jul 2017 at 19:14 Debasish Ghosh
> wrote:
>
>> Just to give some more information, the ProcessorContext that gets passed
>> to the init method of the custom store has a null RecordContext. Gave the
>> following debug statement ..
>>
>> p
should log the change when you write to the store, i.e., I think when
> you do:
> bfStore + r.host
>
>
> Does that help?
>
> Thanks,
> Damian
>
>
> On Mon, 3 Jul 2017 at 10:12 Debasish Ghosh
> wrote:
>
>> The only place where I am doing comm
move the `logChange` from `flush` and do it when you write to the store.
>
> i.e, in the BFStore + function
>
>
> On Mon, 3 Jul 2017 at 10:43, Debasish Ghosh
> wrote:
>
>> Ok, so I make the following change .. Is this the change that you suggested
>> ?
>>
>>
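For the record, the change under discussion (moving the changelog write out
of flush() and into the write path) would look roughly like this; the BFStore
internals shown are hypothetical:

    // inside BFStore: log the change as part of the write itself
    def +(item: String): Unit = {
      bf += item                                // update the Bloom filter
      changeLogger.logChange(changelogKey, bf)  // record the new state now,
                                                // instead of deferring to flush()
    }
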
1)
Looks like some internal processing failed and control went to an
unexpected path. I have 2 questions ..
1. Any idea why this could happen ? I don't think it's related to Mesos
DC/OS though - maybe some concurrency issue ?
2. How do I recover from such errors ? The stream
you, then?
>
> Thanks,
> Damian
>
> On Tue, 4 Jul 2017 at 16:40 Debasish Ghosh
> wrote:
>
> > Hi -
> >
> > I have been running a streaming application on some data set. Things
> > usually run ok. Today I was trying to run the same application on Kafka
Hi Damian -
Just 1 question .. by "terminate the process" do you mean System.exit(..) ?
Because streams.close() will not terminate the process - right ?
regards.
On Tue, Jul 4, 2017 at 9:36 PM, Debasish Ghosh
wrote:
> Hi Damian -
>
> I also thought so .. yes, I wil
Thanks!
On Tue, Jul 4, 2017 at 10:28 PM, Damian Guy wrote:
> Yes, System.exit(..)
> streams.close(..) just attempts to stop any running stream threads.
>
> On Tue, 4 Jul 2017 at 17:49 Debasish Ghosh
> wrote:
>
>> Hi Damian -
>>
>> Just 1 question .. by
Connect APIs ?
My use case is as follows :-
I have a Kafka Streams app from which I plan to use an HDFS sink connector
to write to HDFS, using the embedded Kafka Connect API.
regards.
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
>
>
>
>
> On 7/12/17, 8:27 AM, "Debasish Ghosh" wrote:
>
> Hi -
>
> I would like to use the embedded API of Kafka Connect as per
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767.
> But cannot find enough
accordingly and (c) is this recommended ?
regards.
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
topics (e.g.,
> by merging two topics into one, or vice versa by taking one topic and
> splitting it into more topics), then
> that would work fine.
>
> Eno
>
> > On 13 Jul 2017, at 23:49, Debasish Ghosh
> wrote:
> >
> > Hi -
> >
> > I have a ques
would help if we
> know more about what you
> > are trying to do. For example, if behind the scenes you add or remove
> partitions that would not work
> > well with Kafka Streams. However, if you use the Kafka Streams itself to
> create new topics (e.g.,
> > by merging two t
at KafkaAvroSerializer.configure(KafkaAvroSerializer.java:49)
at com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerializer.configure(SpecificAvroSerializer.scala:21)
at com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerde.configure(SpecificAvroSerde.scala:18)
at org.apache.kafka.streams.StreamsConfig.valueSerde(StreamsConfig.java:591)
...
On Tue, Jul 18, 2017 at 12:02 AM, Richard L. Burton III
wrote:
> For your first question: no, you can use the Avro API.
>
>
>
> On Mon, Jul 17, 2017 at 2:29 PM Debasish Ghosh
> wrote:
>
>> Hi -
>>
>> I am using Avro Serialization in a Kafka Streams applicat
parseHierarchical(URI.java:3106)
> ...
Why is the Kafka Connect program going into an infinite loop ? How can I
prevent it ?
I am using Confluent 3.2.2 for the schema registry and Avro serialization
part, and Apache Kafka 0.10.2.1 for the Kafka Streams client and the broker part.
Help ?
regards.
--
Debasish Ghosh
Unless you have a single-partition topic, you need to up this number for
>> better parallelism.
>>
>> HTH,
>> Abdoulaye
>>
>>
>> On Tue, Jul 18, 2017 at 11:12 AM, Debasish Ghosh <
>> ghosh.debas...@gmail.com> wrote:
>>
>>>
> > Kafka Streams processor the order1 ProductReserved event to update
> > ProductsStore before the order2 OrderCreated is processed. Then in that case
> > ProductService will generate a ProductReserved for order2 incorrectly,
> > generating an inconsistency.
> >
> > IMPORTANT: You can find the detailed description, with code and the
> events
> > that are published and consumed in the previously referenced
> stackoverflow
> > question.
> >
> > After so much thinking and looking up online I haven't found a single
> > place that gives a clear way to deal with Event Sourcing with Kafka +
> > Kafka Streams while solving the problem of atomicity.
> >
> > I'd really appreciate it if someone could propose a solution for this.
> >
> > Regards
> > Jose
> >
>
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
istry-in-dc-os
on Stack Overflow says the same thing. Just wondering if there has been any progress
on this front ..
regards.
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
The stackoverflow thread is quite out of date, as it mentions Confluent
> Platform 2.0 (the latest version as of now is 3.2.2).
>
>
> On Mon, Jul 24, 2017 at 2:16 PM, Debasish Ghosh
> wrote:
>
>> Hi -
>>
>> Is it possible to run schema registry service on DC/OS ? I checked the
ma-registry/docs/intro.html#quickstart
>
>
> On Tue, Jul 25, 2017 at 2:09 AM, Debasish Ghosh
> wrote:
>
>> Thanks a lot .. I found it from the community supported packages on the
>> DC/OS UI. Installed it and it runs ok. One question - is there any CLI for
>
has not yet been implemented. It
> likely requires some additional design work as only a prototype API was
> proposed in the KIP describing the framework as a whole.
>
> -Ewen
>
> On Wed, Jul 12, 2017 at 10:45 AM, Debasish Ghosh wrote:
>
>> Thanks .. I found th
  } finally {
    System.exit(-1)
  }
})
Ideally the application should terminate and Mesos should have restarted
it. But I see that the application doesn't terminate though I have a
System.exit(-1) in the finally clause. Any idea what's happening or how can
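One plausible explanation (an assumption, not something confirmed in this
thread) is that System.exit runs JVM shutdown hooks, and a shutdown hook that
calls streams.close() without a timeout can block waiting for the very stream
thread that is executing the handler. A sketch of a workaround under that
assumption:

    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = {
        logger.error("Stream terminated because of uncaught exception", e)
        // exit from a separate thread so the dying stream thread can finish
        // and any shutdown hook joining on it can complete; alternatively,
        // Runtime.getRuntime.halt(-1) skips shutdown hooks entirely
        new Thread(new Runnable { def run(): Unit = System.exit(-1) }).start()
      }
    })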
restart the application
>> streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
>>   override def uncaughtException(t: Thread, e: Throwable): Unit = try {
>>     logger.error(s"Stream terminated because of uncaught excep
verify that it exists?
> Also, I'm assuming you are on 0.10.2.x?
>
> On Wed, 26 Jul 2017 at 10:54 Debasish Ghosh
> wrote:
>
> > Thanks Damian .. this worked. But now after the application restarts, I
> > see the following exception ..
> >
> > 09:41:26.51
that there is currently no leader for the partition, i.e.,
> leader -1. Also there are no replicas? Something up with your brokers?
>
> Thanks,
> Damian
>
> On Wed, 26 Jul 2017 at 12:34 Debasish Ghosh
> wrote:
>
>> Hi Damian -
>>
>> Yes, it exists .. It
Thanks!
On Thu, Jul 27, 2017 at 4:12 PM, Damian Guy wrote:
>
> On Wed, 26 Jul 2017 at 15:53 Debasish Ghosh
> wrote:
>
>> One of the brokers died. The good thing is that it's not a production
>> cluster, it's just a demo cluster. I have no replicas. But I
fc7306945cdd27bda3b94/repo/packages/C/confluent-schema-registry/5
> >
> > The stackoverflow thread is quite out of date, as it mentions Confluent
> > Platform 2.0 (the latest version as of now is 3.2.2).
> >
> >
> > On Mon, Jul 24, 2017 at 2:16 PM, Debasi
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
BTW the application runs fine when I have 2 instances running on the same
host (my laptop) or 1 instance on the cluster. The problem surfaces only
when I start the second instance.
Any help / pointer / area to look into ?
--
Debasish Ghosh
hosts?
>
> The second stack trace is this bug:
> https://issues.apache.org/jira/browse/KAFKA-5556
>
> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh
> wrote:
>
> > Hi -
> >
> > In my Kafka Streams application, I have a state store resulting from a
> > stateful st
I am setting APPLICATION_SERVER_CONFIG, which is possibly what you are
referring to. Just now I noticed that I may also need to set
REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (default is 1).
Anything else that I may be missing ?
regards.
On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh
be set to 2 (default is 1).
>>> Anything else that I may be missing ?
>>>
>>>
>>> regards.
>>>
>>> On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh <
>>> ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>> > Hi Damian -
not going to work as both nodes will have the same
> setting for StreamsConfig.APPLICATION_SERVER_PORT, i.e., 0.0.0.0:7070
>
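A sketch of the fix implied here, with each instance advertising its own
reachable endpoint (the host-discovery line is just one possible approach):

    // "application.server" must be this instance's reachable host:port,
    // not 0.0.0.0, and must differ across instances
    val host = java.net.InetAddress.getLocalHost.getHostAddress
    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"$host:7070")
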
> On Fri, 28 Jul 2017 at 16:02 Debasish Ghosh
> wrote:
>
>> The log file is a huge one. I can send it to you though. Before that let
>> me confirm one point ..
metadata ?
- Should I check KafkaStreams.state() and only assume that I have got
the correct metadata when the state() call returns Running ? If it
returns Rebalancing, then I should re-query. Is this the correct approach ?
regards.
--
Debasish Ghosh
http://manning.com/ghosh2
http
the rebalancing is done or I exceed some max retry count, then I think
I should be good.
Or am I missing something ?
regards.
On Tue, Aug 1, 2017 at 1:10 PM, Damian Guy wrote:
> Hi,
>
> On Tue, 1 Aug 2017 at 08:34 Debasish Ghosh
> wrote:
>
>> Hi -
>>
>> I have
ion
> if you want to know that the metadata needs refreshing,
>
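A sketch of such a listener; the refreshMetadata() callback stands in for
whatever re-query logic applies:

    streams.setStateListener(new KafkaStreams.StateListener {
      override def onChange(newState: KafkaStreams.State, oldState: KafkaStreams.State): Unit =
        // once a rebalance completes, previously fetched store metadata may be stale
        if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING)
          refreshMetadata()
    })
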
> On Tue, 1 Aug 2017 at 13:25 Debasish Ghosh
> wrote:
>
>> Regarding the last point, do I need to set up the listener ?
>>
>> All I want is to do a query from the store. For that I need to
t-sourcet>
> But the artifacts referred to there can’t be found at Maven Central:
> https://mvnrepository.com/artifact/io.confluent.kafka
>
> Thanks in advance!
>
>
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Thr
protocol between the version you're writing with and the version of your
> cluster.
>
> Hopefully that gives you something to look at.
>
> On Thu, Aug 3, 2017 at 7:29 AM, Debasish Ghosh
> wrote:
>
> > I installed the Confluent Connect package on DC/OS. It started a w
oop cluster (in the hadoop cluster
> config and make sure the port is open, reachable, and listening). If its
> all running locally, make sure any port forwarding is setup correctly, if
> applicable.
>
> On Thu, Aug 3, 2017 at 9:16 AM, Debasish Ghosh
> wrote:
>
> > Thanks
ure out.
Any suggestions as to what I should look for ?
regards.
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
o 1.0.0 ?
regards.
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
The inner store is of type
> `KeyValueStore[Bytes, Array[Byte]]`, however the outer store will be
> `KeyValueStore[String, Long]`.
> It should work fine.
>
> Thanks,
> Damian
>
> On Wed, 15 Nov 2017 at 08:37 Debasish Ghosh
> wrote:
>
>> Hello -
>>
>> In my Kafka Streams 0.11 application I have
, KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE)
>   .withKeySerde(Serdes.String())
>
> Thanks,
> Damian
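Spelled out fully, the suggested call presumably expands to something like
the following sketch; the result types and the Long value serde are
assumptions, and `hostStream` is hypothetical:

    val counts: KTable[String, java.lang.Long] =
      hostStream.groupByKey()
        .count(Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](
            ACCESS_COUNT_PER_HOST_STORE)
          .withKeySerde(Serdes.String())
          .withValueSerde(Serdes.Long()))
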
>
>
> On Wed, 15 Nov 2017 at 10:51 Debasish Ghosh
> wrote:
>
> > It's not working fine .. I get the following exception during run
Hello -
The above API gives me the range of values between fromKey and toKey for a
local state store.
Suppose I have an application running in distributed mode (multiple nodes
same application id). How does this API translate to multiple nodes ? I
know the basic implementation is for a local nod
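The usual pattern, sketched below: a range query sees only the partitions
hosted on the local instance, so covering the whole store means asking every
instance that hosts it and merging the results (queryLocal, queryRemote and
thisInstance are hypothetical):

    import scala.collection.JavaConverters._

    val results = streams.allMetadataForStore(storeName).asScala.flatMap { meta =>
      if (meta.hostInfo() == thisInstance)
        queryLocal(fromKey, toKey)                             // partitions hosted here
      else
        queryRemote(meta.host(), meta.port(), fromKey, toKey)  // RPC to the peer instance
    }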
/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L112-L113.
My question is how does this serialization work here ? I mean how does the
tuple get serialized with the default serializers ? And leftJoin only works
with default serializers ..
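For what it's worth, one reading (an assumption, pending the answers below)
is that the join result is not serialized at the join itself; a Serde for
the tuple only matters once the result is written out, where it can be
supplied explicitly (tupleSerde here is hypothetical):

    // the join happens in memory; the (region, clicks) tuple needs no Serde yet
    val joined = userClicksStream.leftJoin(userRegionsTable,
      (clicks: java.lang.Long, region: String) =>
        (if (region == null) "UNKNOWN" else region, clicks))

    // an explicit Serde is only needed when writing the result to a topic
    joined.to(stringSerde, tupleSerde, outputTopic)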
regards.
--
Debasish Ghosh
http
the input
> for the (stringSerde, stringSerde) specified on line 142:
>
> .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if
> (region == null) "UNKNOWN" else region, clicks))
>
> FYI
>
> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh wrote:
m/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java
> >
> > and correlate with the sample code.
> >
> > Thanks
> >
> > On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <
> ghosh.debas...@gmail.com>
> > wrote:
>
> `#stream()`
>
> The default Serdes are used when reading/writing from/to a topic/store
> (including repartition or changelog) and if the operator does not
> overwrite the default Serdes via passed-in parameters.
>
>
> -Matthias
>
> On 2/10/18 10:34 PM, Debasish Ghosh wrote:
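Concretely, the two sources of Serdes that Matthias describes look like this
(the topic name and customSerde are hypothetical):

    // defaults from config: used by any operator not given explicit Serdes
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass)

    // per-operator override, e.g. on stream(), via the Consumed parameter
    val s = builder.stream(inputTopic, Consumed.`with`(Serdes.String(), customSerde))
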
l` and set the Serde for each
> operator individually.
>
>
> -Matthias
>
> On 2/12/18 2:16 AM, Debasish Ghosh wrote:
> > Thanks a lot for the clear answer.
> >
> > One of the concerns that I have is that it's not always obvious when the
> > default serializers are
e for the KTable and for the resulting
> type. We don't need the Serde for the stream at this point as the value has
> already been deserialized.
>
> HTH,
> Damian
>
> On Tue, 13 Feb 2018 at 05:39 Debasish Ghosh
> wrote:
>
> > Regarding “has an according overload”
/ pointer will be appreciated ..
regards.
--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
.
On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
wrote:
> Hello -
>
> I am a beginner in Kafka .. with my first Kafka Streams application ..
>
> I have a streams application that reads from a topic, does some
> transformation on the data and writes to another topic. The record t
Java KafkaProducer does verify the
> timestamp locally before sending the message to the broker, thus on -1
> there will be the client side exception you did observe(
> 4. I assume that you do consume different topics with different TS
> fields in your records.
>
> Also have a
using 0.10 bin/kafka-console-producer.sh
> and thus all works fine.
>
>
> - -Matthias
>
>
> On 10/28/16 11:00 PM, Debasish Ghosh wrote:
> > Hello Matthias -
> >
> > Thanks a lot for the response. I think what may be happening is a
> > version mismatch b
producer.sh does use the 0.9 producer. Brokers are
> > backward compatible, thus a 0.9 producer can write to a 0.10 broker
> > (and in this case the record TS would be invalid). While I assume that
> > in your local environment you are using 0.10
> > bin/kafka-console-producer.sh an
Hello Matthias -
Regarding ..
> In case you do have 0.10 brokers, it might however happen, that
> bin/kafka-console-producer.sh does use the 0.9 producer.
How can I check this ?
Thanks!
On Sat, Oct 29, 2016 at 12:23 PM, Debasish Ghosh
wrote:
> I agree .. the problem is DC/OS still ships the
argument.
>
> You should see the classpath printed out. Look for
> 'kafka-clients-XXX.jar' -- XXX will be the version number.
>
>
> - -Matthias
>
>
> On 10/29/16 12:11 AM, Debasish Ghosh wrote:
> > Hello Matthias -
> >
> > Regarding ..
> >
oducer behavior, a valid TS will be
> set for all (newly) written records. Keep in mind that this is not a global
> but a per-topic broker setting.
>
> - -Matthias
>
>
> On 10/30/16 1:15 AM, Debasish Ghosh wrote:
> > I think I found out what happened .. I was installing Kafka
Which version of Scala are you using ?
On Sun, 9 Sep 2018 at 10:44 AM, Michael Eugene wrote:
> Hi,
>
> I am using kafka-streams-scala
> https://github.com/lightbend/kafka-streams-scala, and I am trying to
> implement something very simple and I am getting a compilation error from the
> "aggregate"
Sep 9, 2018, at 12:13 PM, Debasish Ghosh
> wrote:
> >
> > Which version of Scala are you using ?
> >
> >> On Sun, 9 Sep 2018 at 10:44 AM, Michael Eugene
> wrote:
> >>
> >> Hi,
> >>
> >> I am using kafka-streams-scala