Hi Shekar, that warning is expected during rebalances and should generally
resolve itself.
How many threads/app instances are you running?
It is impossible to tell what is happening without the full logs.
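
In case it helps, the two settings worth double-checking are the number of
stream threads and the state directory the lock is taken on. A minimal sketch
(the application id, bootstrap servers and state dir below are placeholders
for whatever you actually use):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");      // application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // broker list
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);               // threads per app instance
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");     // directory the task lock lives under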

Thanks,
Damian

On Mon, 7 Aug 2017 at 22:46 Shekar Tippur <ctip...@gmail.com> wrote:

> Damian,
>
> Thanks for pointing out the error. I had tried a different way of
> initializing the store.
>
> Now that I am able to compile, I started getting the error below. I looked
> up other suggestions for the same error and upgraded Kafka to version
> 0.11.0.0, but I still get this error :/
>
> [2017-08-07 14:40:41,264] WARN stream-thread
> [streams-pipe-b67a7ffa-5535-4311-8886-ad6362617dc5-StreamThread-1] Could
> not create task 0_0. Will retry:
> (org.apache.kafka.streams.processor.internals.StreamThread)
>
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock
> the state directory for task 0_0
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
> at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:111)
> at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
> at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
> at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
> at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>
> On Fri, Aug 4, 2017 at 4:16 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Damian,
> >
> > I am getting a syntax error. I have responded on the gist.
> > Appreciate any input.
> >
> > - Shekar
> >
> > On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy <damian....@gmail.com>
> wrote:
> >
> >> Hi,
> >>
> >> I left a comment on your gist.
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur <ctip...@gmail.com> wrote:
> >>
> >> > Damian,
> >> >
> >> > Here is a public gist:
> >> > https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
> >> >
> >> > - Shekar
> >> >
> >> > On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <damian....@gmail.com>
> >> wrote:
> >> >
> >> > > It might be easier if you make a GitHub gist with your code. It is
> >> > > quite difficult to see what is happening in an email.
> >> > >
> >> > > Cheers,
> >> > > Damian
> >> > > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ctip...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Thanks a lot Damian.
> >> > > > I am able to see that the join worked (using foreach). I tried to add
> >> > > > the logic to query the store after starting the streams, but it looks
> >> > > > like the code is not getting there. Here is the modified code:
> >> > > >
> >> > > > KafkaStreams streams = new KafkaStreams(builder, props);
> >> > > >
> >> > > > streams.start();
> >> > > >
> >> > > >
> >> > > > parser.foreach(new ForeachAction<String, JsonNode>() {
> >> > > >     @Override
> >> > > >     public void apply(String key, JsonNode value) {
> >> > > >         System.out.println(key + ": " + value);
> >> > > >         if (value == null){
> >> > > >             System.out.println("null match");
> >> > > >             ReadOnlyKeyValueStore<String, Long> keyValueStore =
> >> > > >                     null;
> >> > > >             try {
> >> > > >                 keyValueStore =
> >> > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> >> > > > QueryableStoreTypes.keyValueStore(), streams);
> >> > > >             } catch (InterruptedException e) {
> >> > > >                 e.printStackTrace();
> >> > > >             }
> >> > > >
> >> > > >             KeyValueIterator  kviterator =
> >> > > > keyValueStore.range("test_nod","test_node");
> >> > > >         }
> >> > > >     }
> >> > > > });
> >> > > >
> >> > > >
> >> > > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <
> damian....@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > > > Hi,
> >> > > > > The store won't be queryable until after you have called
> >> > > > > streams.start(). No stores have been created until the application
> >> > > > > is up and running, and they are dependent on the underlying
> >> > > > > partitions.
> >> > > > >
> >> > > > > To check that a stateful operation has produced a result you would
> >> > > > > normally add another operation after the join, i.e.,
> >> > > > > stream.join(other,...).foreach(..) or
> >> > > > > stream.join(other,...).to("topic")
> >> > > > >
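> >> > > > > A rough sketch of that pattern (the variable names, serdes and the
> >> > > > > output topic below are placeholders, not your actual topology):
> >> > > > >
> >> > > > > // 'stream' and 'table' stand in for your KStream/KTable
> >> > > > > KStream<String, JsonNode> joined = stream.join(table,
> >> > > > >         (streamValue, tableValue) -> tableValue);
> >> > > > > // print every joined record to verify the join produced output
> >> > > > > joined.foreach((key, value) -> System.out.println(key + " -> " + value));
> >> > > > > // and/or write the join result to a topic you can consume from
> >> > > > > joined.to(stringSerde, jsonSerde, "join-output");
> >> > > > >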
> >> > > > > Thanks,
> >> > > > > Damian
> >> > > > >
> >> > > > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ctip...@gmail.com>
> >> > wrote:
> >> > > > >
> >> > > > > > One more thing: how do we check whether the stateful join
> >> > > > > > operation resulted in a KStream with anything in it (the size of
> >> > > > > > the KStream)? How do we check the content of a KStream?
> >> > > > > >
> >> > > > > > - S
> >> > > > > >
> >> > > > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <
> >> ctip...@gmail.com>
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Damian,
> >> > > > > > >
> >> > > > > > > Thanks a lot for pointing that out.
> >> > > > > > >
> >> > > > > > > I got a little further, but I am kind of stuck with the
> >> > > > > > > sequencing. A couple of issues:
> >> > > > > > > 1. I cannot initialise KafkaStreams before the parser.to().
> >> > > > > > > 2. Do I need to create a new KafkaStreams object when I create a
> >> > > > > > > KeyValueStore?
> >> > > > > > > 3. How do I initialize KeyValueIterator with <String, JsonNode>?
> >> > > > > > > I seem to get an error when I try:
> >> > > > > > > *KeyValueIterator<String, JsonNode> kviterator
> >> > > > > > > = keyValueStore.range("test_nod","test_node");*
> >> > > > > > >
> >> > > > > > > /////// START CODE /////////
> >> > > > > > > //parser is a kstream as a result of join
> >> > > > > > > if (parser.toString().matches("null")){
> >> > > > > > >
> >> > > > > > >     ReadOnlyKeyValueStore<String, Long> keyValueStore =
> >> > > > > > >             null;
> >> > > > > > >     KafkaStreams newstreams = new KafkaStreams(builder,
> >> props);
> >> > > > > > >     try {
> >> > > > > > >         keyValueStore =
> >> > > > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> >> > > > > > > QueryableStoreTypes.keyValueStore(), newstreams);
> >> > > > > > >     } catch (InterruptedException e) {
> >> > > > > > >         e.printStackTrace();
> >> > > > > > >     }
> >> > > > > > > *    KeyValueIterator kviterator
> >> > > > > > > = keyValueStore.range("test_nod","test_node");*
> >> > > > > > > }else {
> >> > > > > > >
> >> > > > > > > *    parser.to(stringSerde, jsonSerde, "parser");*
> >> > > > > > > }
> >> > > > > > >
> >> > > > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> >> > > > > > > streams.start();
> >> > > > > > >
> >> > > > > > > /////// END CODE /////////
> >> > > > > > >
> >> > > > > > > - S
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <damian....@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > It is part of the ReadOnlyKeyValueStore interface:
> >> > > > > > > >
> >> > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
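> >> > > > > > > >
> >> > > > > > > > You get a handle on it from the running KafkaStreams instance,
> >> > > > > > > > roughly like this (a sketch; the value type here is a
> >> > > > > > > > placeholder for whatever your store holds):
> >> > > > > > > >
> >> > > > > > > > ReadOnlyKeyValueStore<String, JsonNode> store =
> >> > > > > > > >         streams.store("local-store", QueryableStoreTypes.keyValueStore());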
> >> > > > > > > >
> >> > > > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <
> >> ctip...@gmail.com>
> >> > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > That's cool. Is this feature part of the RocksDB store
> >> > > > > > > > > object and not the KTable?
> >> > > > > > > > >
> >> > > > > > > > > > On Jul 27, 2017, at 07:57, Damian Guy <
> >> > damian....@gmail.com>
> >> > > > > > wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > Yes they can be strings,
> >> > > > > > > > > >
> >> > > > > > > > > > so you could do something like:
> >> > > > > > > > > > store.range("test_host", "test_hosu");
> >> > > > > > > > > >
> >> > > > > > > > > > This would return an iterator containing all of the values
> >> > > > > > > > > > (inclusive) from "test_host" -> "test_hosu".
> >> > > > > > > > > >
> >> > > > > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ctip...@gmail.com>
> >> > > > > > > > > >> wrote:
> >> > > > > > > > > >>
> >> > > > > > > > > >> Can you please point me to an example? Can from and to
> >> > > > > > > > > >> be a string?
> >> > > > > > > > > >>
> >> > > > > > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy <
> >> > > damian....@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > > > > >>>
> >> > > > > > > > > >>> Hi,
> >> > > > > > > > > >>>
> >> > > > > > > > > >>> You can't use a regex, but you could use a range query,
> >> > > > > > > > > >>> i.e., keyValueStore.range(from, to)
> >> > > > > > > > > >>>
> >> > > > > > > > > >>> Thanks,
> >> > > > > > > > > >>> Damian
> >> > > > > > > > > >>>
> >> > > > > > > > > >>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <ctip...@gmail.com>
> >> > > > > > > > > >>>> wrote:
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> Hello,
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> I am able to get the kstream to ktable join to work.
> >> > > > > > > > > >>>> I have some use cases where the key is not always an
> >> > > > > > > > > >>>> exact match.
> >> > > > > > > > > >>>> I was wondering if there is a way to look up keys
> >> > > > > > > > > >>>> based on a regex.
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> For example,
> >> > > > > > > > > >>>> I have these entries for a ktable:
> >> > > > > > > > > >>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> and this for a kstream:
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> test_host,{ "source": "test_host", "custom": { "test ": { "creation_time ": "1234 " } } }
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> In this case, if the exact match does not work, I
> >> > > > > > > > > >>>> would like to look up the ktable for all entries that
> >> > > > > > > > > >>>> contain "test_host*" and have application logic
> >> > > > > > > > > >>>> determine what would be the best fit.
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> Appreciate input.
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>>> - Shekar
> >> > > > > > > > > >>>>
> >> > > > > > > > > >>
> >> > > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>
