Hi Ian,

Thanks for reporting this. I had a look at the stack trace and the code,
and the whole situation is quite confusing. The exception itself is
expected, but we have a try-catch block that should swallow it, so it
should never bubble up:

In
  AbstractTaskCreator.retryWithBackoff

a call to
  TaskCreator.createTask

is made (cf. your stack trace). This call is guarded against a
LockException (cf. the StreamThread.java code):

> try {
>     createTask(taskId, partitions);
>     it.remove();
> } catch (final LockException e) {
>     // ignore and retry
>     log.warn("Could not create task {}. Will retry.", taskId, e);
> }
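For reference, here is a simplified, self-contained sketch of the retry
pattern described above. It is NOT the actual StreamThread code: the
LockException class, the TaskFactory interface, the String task IDs, the
doubling back-off and the pass limit are hypothetical stand-ins chosen for
illustration. It only shows why a LockException thrown by createTask() is
expected to be logged and retried rather than propagated:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class RetryWithBackoffSketch {

    // Hypothetical stand-in for org.apache.kafka.streams.errors.LockException.
    static class LockException extends RuntimeException {
        LockException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for TaskCreator.createTask(taskId, partitions).
    interface TaskFactory {
        void createTask(String taskId, Set<Integer> partitions);
    }

    // Tries to create every pending task. Tasks whose creation fails with a
    // LockException stay in the map and are retried after an exponentially
    // growing back-off. Returns the number of passes that were made.
    static int retryWithBackoff(Map<String, Set<Integer>> pending, TaskFactory factory,
                                long initialBackoffMs, int maxPasses) {
        long backoffMs = initialBackoffMs;
        int passes = 0;
        while (!pending.isEmpty() && passes < maxPasses) {
            passes++;
            Iterator<Map.Entry<String, Set<Integer>>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Set<Integer>> entry = it.next();
                try {
                    factory.createTask(entry.getKey(), entry.getValue());
                    it.remove(); // created, so no retry needed for this task
                } catch (LockException e) {
                    // Swallowed on purpose: the task stays pending for the next pass.
                    System.out.println("Could not create task " + entry.getKey() + ". Will retry.");
                }
            }
            if (!pending.isEmpty() && passes < maxPasses) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // restore the flag and give up
                    break;
                }
                backoffMs *= 2;
            }
        }
        return passes;
    }
}
```

In a structure like this, a LockException from createTask() never leaves
retryWithBackoff(); at worst, the task is still pending after the last pass.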


Can you verify that you loaded the correct jar file when running the
test? I.e., that there is no caching issue that loads old code, etc.

Another theory is about class loading. Do you use custom class loaders?
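To rule out a stale jar or a class-loader issue, one option is to print,
from inside the application, where the Streams classes were actually loaded
from. The sketch below uses only standard JDK reflection
(Class#getProtectionDomain, CodeSource#getLocation, Class#getClassLoader);
the class name WhereLoaded is made up, and by default it checks StreamThread,
the class from your stack trace:

```java
import java.security.CodeSource;

public class WhereLoaded {

    // Describes where a class came from: the jar/directory URL and the loader.
    static String describe(Class<?> cls) {
        ClassLoader loader = cls.getClassLoader();
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        // JDK (bootstrap) classes have a null loader and no code source.
        String location = (source == null || source.getLocation() == null)
                ? "<no code source>"
                : source.getLocation().toString();
        String loaderName = (loader == null) ? "<bootstrap>" : loader.toString();
        return cls.getName() + " loaded from " + location + " by " + loaderName;
    }

    public static void main(String[] args) {
        // Default to the class from the stack trace; pass other names as arguments.
        String[] names = args.length > 0
                ? args
                : new String[] {"org.apache.kafka.streams.processor.internals.StreamThread"};
        for (String name : names) {
            try {
                // initialize=false: look the class up without running static initializers.
                Class<?> cls = Class.forName(name, false, WhereLoaded.class.getClassLoader());
                System.out.println(describe(cls));
            } catch (ClassNotFoundException e) {
                System.out.println(name + " is not on the classpath");
            }
        }
    }
}
```

If the printed location is not the 0.10.2 jar you expect, or the class shows
up under an unexpected class loader, that would point to one of the two
theories above.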

One more thing you can try is to delete the local application state
directory. This gives you a clean restart, at the cost of recreating the
state (for the first start only). Afterward, stop and restart your app to
see if the issue is resolved.
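Deleting the directory by hand is enough. As a sketch, assuming the default
state.dir (/tmp/kafka-streams) and the application ID text_pipeline_id
implied by the path in your stack trace, the following plain-JDK program
removes it recursively. ResetLocalState is a made-up name; only run it while
all instances of the app are stopped. Alternatively, KafkaStreams#cleanUp()
deletes the local state for the configured application ID, but it may only
be called before start() or after close().

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ResetLocalState {

    // Recursively deletes dir and everything below it.
    // Returns the number of deleted entries (0 if dir does not exist).
    static int deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return 0;
        }
        List<Path> all;
        try (Stream<Path> paths = Files.walk(dir)) {
            // A child path always sorts after its parent, so reverse order
            // deletes files and subdirectories before the directory holding them.
            all = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : all) {
            Files.delete(p);
        }
        return all.size();
    }

    public static void main(String[] args) throws IOException {
        // Assumed path: default state.dir plus the application ID from the stack trace.
        Path appStateDir = Paths.get(args.length > 0 ? args[0] : "/tmp/kafka-streams/text_pipeline_id");
        System.out.println("Deleted " + deleteRecursively(appStateDir) + " entries under " + appStateDir);
    }
}
```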


Right now, I cannot reproduce the problem.


-Matthias



On 2/10/17 9:47 AM, Ian Duffy wrote:
> Seeing the following failure when using multi-threaded streams
> 
> Feb 10 17:21:15 ip-172-31-137-57 docker/43e65fe123cd[826]:
> org.apache.kafka.streams.errors.LockException: task [0_21] Failed to lock the state directory: /tmp/kafka-streams/text_pipeline_id/0_21
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> 
> On restarting the process, it just hangs while attempting to rejoin the
> consumer group.
> I saw the same issue on 0.10.1 but thought it was due to be fixed in the
> 0.10.2 release.
> 
> Thanks,
> Ian.
> 
> On 10 February 2017 at 16:51, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
> 
>> Hello Kafka users, developers and client-developers,
>>
>> This is RC1 for release of Apache Kafka 0.10.2.0.
>>
>> This is a minor version release of Apache Kafka. It includes 19 new KIPs.
>> See the release notes and release plan
>> (https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.2.0)
>> for more details. A few
>> feature highlights: SASL-SCRAM support, improved client compatibility to
>> allow use of clients newer than the broker, session windows and global
>> tables in the Kafka Streams API, single message transforms in the Kafka
>> Connect framework.
>>
>> Important note: in addition to the artifacts generated using JDK7 for Scala
>> 2.10 and 2.11, this release also includes experimental artifacts built
>> using JDK8 for Scala 2.12.
>>
>> Important code changes since RC0 (non-docs, non system tests):
>>
>> * KAFKA-4728; KafkaConsumer#commitSync should copy its input
>> * KAFKA-4441; Monitoring incorrect during topic creation and deletion
>> * KAFKA-4734; Trim the time index on old segments
>> * KAFKA-4725; Stop leaking messages in produce request body when requests
>> are delayed
>> * KAFKA-4716: Fix case when controller cannot be reached
>>
>> Release notes for the 0.10.2.0 release:
>> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/RELEASE_NOTES.html
>>
>> *** Please download, test and vote by Monday, Feb 13, 5pm PT ***
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> http://kafka.apache.org/KEYS
>>
>> * Release artifacts to be voted upon (source and binary):
>> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/
>>
>> * Maven artifacts to be voted upon:
>> https://repository.apache.org/content/groups/staging/
>>
>> * Javadoc:
>> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/javadoc/
>>
>> * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.0 tag:
>> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=e825b7994bf8c8c4871d1e0973e287e6d31c7ae4
>>
>>
>> * Documentation:
>> http://kafka.apache.org/0102/documentation.html
>>
>> * Protocol:
>> http://kafka.apache.org/0102/protocol.html
>>
>> * Successful Jenkins builds for the 0.10.2 branch:
>> Unit/integration tests: https://builds.apache.org/job/kafka-0.10.2-jdk7/74/
>> System tests: https://jenkins.confluent.io/job/system-test-kafka-0.10.2/25/
>>
>> /**************************************
>>
>> Thanks,
>> Ewen
>>
> 

