If you are using 0.10.0.2, please have a look at this FAQ entry:

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Igetalockingexceptionsimilarto"Causedby:java.io.IOException:Failedtolockthestatedirectory:/tmp/kafka-streams/<app-id>/0_0".HowcanIresolvethis?

However, if possible, I would recommend upgrading to 0.10.2.0.
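
For completeness: as I understand the FAQ entry, the lock exception in 0.10.0.x can be triggered by the background state-directory cleaner racing with task creation for the directory lock, so the workaround is to push the cleanup delay very high. A minimal sketch of that idea (the class name `LockWorkaround` is mine, and I am using the plain config key `state.cleanup.delay.ms`, i.e. the same setting you already pass via `StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG`; treat this as an illustration, not the definitive fix -- upgrading is):

```java
import java.util.Properties;

public class LockWorkaround {

    // Sketch of the FAQ workaround: raise state.cleanup.delay.ms so the
    // background state-directory cleaner effectively never runs and cannot
    // race with task creation for the state directory lock.
    static Properties workaroundProps() {
        Properties props = new Properties();
        props.put("state.cleanup.delay.ms", Long.toString(Long.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(workaroundProps().getProperty("state.cleanup.delay.ms"));
    }
}
```

You would merge these properties into your existing `streamsConfiguration` before constructing `KafkaStreams`; note that your current value of 120000 ms still allows the cleaner to run.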


-Matthias


On 2/26/17 9:56 AM, Eno Thereska wrote:
> Hi Dan,
> 
> Just checking on the version: you are using 0.10.0.2 or 0.10.2.0 (i.e., 
> latest)? I ask because state locking was something that was fixed in 0.10.2.
> 
> Thanks
> Eno
> 
>> On 26 Feb 2017, at 13:37, Dan Ofir <dan.o...@personali.com> wrote:
>>
>> Hi,
>>
>> I encountered an exception while using Kafka Streams, for which I cannot
>> find any open bug or any related documentation.
>>
>> I am using Kafka Streams 0.10.0.2, with a 3-node Kafka cluster also
>> running 0.10.0.2.
>>
>> I am trying to process a huge amount of data, ~248,000,000 events.
>> My input topic has 100 partitions.
>> Running the stream application with only 1 thread against 100 partitions
>> consumes very slowly, which would mean several days to process this
>> amount of data.
>> When I increase the number of threads in the stream application to
>> 16, I get the following exception after the state store is initialized,
>> which leads to a total crash of the application.
>>
>> Instance spec (m4.xlarge):
>> 16 GB RAM
>> 4 vCPUs
>> 100 GB disk
>>
>> 2017-02-26 12:56:13 INFO  StreamThread:828 - stream-thread [StreamThread-1]
>> Creating active task 1_3 with assigned partitions [[counter_reprocess-3]]
>> 2017-02-26 12:56:14 WARN  StreamThread:1184 - Could not create task 1_3.
>> Will retry.
>> org.apache.kafka.streams.errors.LockException: task [1_3] Failed to lock
>> the state directory: /tmp/kafka-streams/counter_streamer_reprocess/1_3
>> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>> at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>> at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>> at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>> at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>> at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>
>>
>> These are the properties from within the stream application:
>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, configuration.getGroupId());
>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBrokerList());
>> streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, configuration.getZookeepr());
>> streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>> streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>> streamsConfiguration.put(StreamsConfig.POLL_MS_CONFIG, 50);
>> streamsConfiguration.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 120000);
>> streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
>> streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, configuration.getStateDir());
>> streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, Integer.valueOf(configuration.getNumOfThread()));
>> // consumer config
>> streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
>> streamsConfiguration.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
>>
>> These are the state properties:
>> StateStoreSupplier countStore = Stores.create("store")
>>                .withKeys(stringSerde)
>>                .withValues(countersSerde)
>>                .persistent()
>>                .build();
>>
>>
>> Can you please advise how to proceed from here? Any suggestions? Any
>> other logs you might need?
>>
>> Thanks
>> Dan Ofir
> 
