Hi Shekar,

That warning is expected during rebalances and should generally resolve itself. How many threads/app instances are you running? It is impossible to tell what is happening without the full logs.

Thanks,
Damian

On Mon, 7 Aug 2017 at 22:46 Shekar Tippur <ctip...@gmail.com> wrote:

> Damien,
>
> Thanks for pointing out the error. I had tried a different version of initializing the store.
>
> Now that I am able to compile, I started to get the below error. I looked up other suggestions for the same error and followed up to upgrade Kafka to 0.11.0.0 version. I still get this error :/
>
> [2017-08-07 14:40:41,264] WARN stream-thread [streams-pipe-b67a7ffa-5535-4311-8886-ad6362617dc5-StreamThread-1] Could not create task 0_0. Will retry: (org.apache.kafka.streams.processor.internals.StreamThread)
>
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory for task 0_0
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:111)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>
> On Fri, Aug 4, 2017 at 4:16 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Damian,
> >
> > I am getting a syntax error. I have responded on gist.
> > Appreciate any inputs.
> >
> > - Shekar
> >
> > On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy <damian....@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I left a comment on your gist.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Fri, 28 Jul 2017 at 21:50 Shekar Tippur <ctip...@gmail.com> wrote:
> > >
> > > > Damien,
> > > >
> > > > Here is a public gist:
> > > > https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
> > > >
> > > > - Shekar
> > > >
> > > > On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <damian....@gmail.com> wrote:
> > > >
> > > > > It might be easier if you make a github gist with your code. It is quite difficult to see what is happening in an email.
> > > > >
> > > > > Cheers,
> > > > > Damian
> > > > >
> > > > > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ctip...@gmail.com> wrote:
> > > > >
> > > > > > Thanks a lot Damien.
> > > > > > I am able to get to see if the join worked (using foreach). I tried to add the logic to query the store after starting the streams:
> > > > > > Looks like the code is not getting there. Here is the modified code:
> > > > > >
> > > > > > KafkaStreams streams = new KafkaStreams(builder, props);
> > > > > >
> > > > > > streams.start();
> > > > > >
> > > > > > parser.foreach(new ForeachAction<String, JsonNode>() {
> > > > > >     @Override
> > > > > >     public void apply(String key, JsonNode value) {
> > > > > >         System.out.println(key + ": " + value);
> > > > > >         if (value == null){
> > > > > >             System.out.println("null match");
> > > > > >             ReadOnlyKeyValueStore<String, Long> keyValueStore = null;
> > > > > >             try {
> > > > > >                 keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable("local-store", QueryableStoreTypes.keyValueStore(), streams);
> > > > > >             } catch (InterruptedException e) {
> > > > > >                 e.printStackTrace();
> > > > > >             }
> > > > > >
> > > > > >             KeyValueIterator kviterator = keyValueStore.range("test_nod","test_node");
> > > > > >         }
> > > > > >     }
> > > > > > });
> > > > > >
> > > > > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <damian....@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > The store won't be queryable until after you have called streams.start(). No stores have been created until the application is up and running and they are dependent on the underlying partitions.
> > > > > > >
> > > > > > > To check that a stateful operation has produced a result you would normally add another operation after the join, i.e., stream.join(other,...).foreach(..) or stream.join(other,...).to("topic")
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Damian
> > > > > > >
> > > > > > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ctip...@gmail.com> wrote:
> > > > > > >
> > > > > > > > One more thing.. How do we check if the stateful join operation resulted in a kstream of some value in it (size of kstream)? How do we check the content of a kstream?
> > > > > > > >
> > > > > > > > - S
> > > > > > > >
> > > > > > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Damien,
> > > > > > > > >
> > > > > > > > > Thanks a lot for pointing out.
> > > > > > > > >
> > > > > > > > > I got a little further. I am kind of stuck with the sequencing. Couple of issues:
> > > > > > > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > > > > > > 2. Do I need to create a new KafkaStreams object when I create a KeyValueStore?
> > > > > > > > > 3. How do I initialize KeyValueIterator with <String, JsonNode> I seem to get a error when I try:
> > > > > > > > > *KeyValueIterator <String,JsonNode> kviterator = keyValueStore.range("test_nod","test_node");*
> > > > > > > > >
> > > > > > > > > /////// START CODE /////////
> > > > > > > > > //parser is a kstream as a result of join
> > > > > > > > > if (parser.toString().matches("null")){
> > > > > > > > >
> > > > > > > > >     ReadOnlyKeyValueStore<String, Long> keyValueStore = null;
> > > > > > > > >     KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > > > > > > >     try {
> > > > > > > > >         keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable("local-store", QueryableStoreTypes.keyValueStore(), newstreams);
> > > > > > > > >     } catch (InterruptedException e) {
> > > > > > > > >         e.printStackTrace();
> > > > > > > > >     }
> > > > > > > > > *    KeyValueIterator kviterator = keyValueStore.range("test_nod","test_node");*
> > > > > > > > > }else {
> > > > > > > > >
> > > > > > > > > *    parser.to(stringSerde, jsonSerde, "parser");*}
> > > > > > > > >
> > > > > > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > > > > > > > streams.start();
> > > > > > > > >
> > > > > > > > > /////// END CODE /////////
> > > > > > > > >
> > > > > > > > > - S
> > > > > > > > >
> > > > > > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <damian....@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > It is part of the ReadOnlyKeyValueStore interface:
> > > > > > > > > >
> > > > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
> > > > > > > > > >
> > > > > > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <ctip...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > That's cool. This feature is a part of rocksdb object and not ktable?
> > > > > > > > > > >
> > > > > > > > > > > Sent from my iPhone
> > > > > > > > > > >
> > > > > > > > > > > On Jul 27, 2017, at 07:57, Damian Guy <damian....@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Yes they can be strings,
> > > > > > > > > > > >
> > > > > > > > > > > > so you could do something like:
> > > > > > > > > > > > store.range("test_host", "test_hosu");
> > > > > > > > > > > >
> > > > > > > > > > > > This would return an iterator containing all of the values (inclusive) from "test_host" -> "test_hosu".
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ctip...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Can you please point me to an example? Can from and to be a string?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Sent from my iPhone
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Jul 27, 2017, at 04:04, Damian Guy <damian....@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > You can't use a regex, but you could use a range query.
> > > > > > > > > > > > > > i.e, keyValueStore.range(from, to)
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Damian
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <ctip...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hello,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I am able to get the kstream to ktable join work. I have some use cases where the key is not always a exact match.
> > > > > > > > > > > > > > > I was wondering if there is a way to lookup keys based on regex.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > For example,
> > > > > > > > > > > > > > > I have these entries for a ktable:
> > > > > > > > > > > > > > > test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > blah,{ "source": "blah_host", "UL1": "blah_l3" }
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > and this for a kstream:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > test_host,{ "source": "test_host", "custom": { "test ": { "creation_time ": "1234 " } } }
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > In this case, if the exact match does not work, I would like to lookup ktable for all entries that contains "test_host*" in it and have application logic to determine what would be the best fit.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Appreciate input.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > - Shekar
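
For anyone reading this thread later, here is a rough sketch of how the range(from, to) suggestion quoted above can act as a prefix lookup. It is only an illustration under some assumptions: a java.util.TreeMap stands in for the state store (a real application would call range() on the ReadOnlyKeyValueStore instead), keys are plain ASCII strings, and the class and helper names (PrefixRangeSketch, upperBound, keysWithPrefix) are made up for this example and are not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a sorted key-value store; not Kafka Streams code.
class PrefixRangeSketch {

    // Returns a string that sorts after every key starting with `prefix`,
    // e.g. "test_host" -> "test_hosu". Assumes a non-empty prefix whose
    // last character is not Character.MAX_VALUE.
    static String upperBound(String prefix) {
        int last = prefix.length() - 1;
        return prefix.substring(0, last) + (char) (prefix.charAt(last) + 1);
    }

    // Collects all keys that start with `prefix` using one range scan.
    static List<String> keysWithPrefix(NavigableMap<String, String> store, String prefix) {
        List<String> matches = new ArrayList<>();
        // Both bounds are inclusive, mirroring the range behaviour described in the thread.
        for (String key : store.subMap(prefix, true, upperBound(prefix), true).keySet()) {
            // The inclusive upper bound itself ("test_hosu") is not a prefix match, so filter it out.
            if (key.startsWith(prefix)) {
                matches.add(key);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        store.put("test_host1", "{ \"source\": \"test_host\", \"UL1\": \"test1_l1\" }");
        store.put("test_host2", "{ \"source\": \"test_host2\", \"UL1\": \"test2_l2\" }");
        store.put("test_host3", "{ \"source\": \"test_host3\", \"UL1\": \"test3_l3\" }");
        store.put("blah", "{ \"source\": \"blah_host\", \"UL1\": \"blah_l3\" }");

        // prints [test_host1, test_host2, test_host3]
        System.out.println(keysWithPrefix(store, "test_host"));
    }
}
```

The application logic that picks the "best fit" would then run over the returned candidates. Whether a real store iterates keys in the same order as a TreeMap depends on how the keys are serialized, so treat the ordering here as an assumption.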