In case you are using 0.10.0.2, please have a look at this FAQ entry: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Igetalockingexceptionsimilarto"Causedby:java.io.IOException:Failedtolockthestatedirectory:/tmp/kafka-streams/<app-id>/0_0".HowcanIresolvethis?
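One mitigation for lock contention on the state directory (an assumption on my part, not a quote from that FAQ entry) is to give each application instance on a host its own `state.dir`, so two JVMs never fight over the same task `.lock` files. A minimal sketch using plain `java.util.Properties` with the literal `state.dir` key; the `withUniqueStateDir` helper, the `instanceId` parameter, and the `/var/lib/kafka-streams` base path are all hypothetical:

```java
import java.util.Properties;

public class UniqueStateDir {
    // Hypothetical helper (not part of the Kafka API): derives a state
    // directory unique to this instance. "state.dir" is the real
    // StreamsConfig key; everything else here is illustrative.
    static Properties withUniqueStateDir(Properties base, String instanceId) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("state.dir", "/var/lib/kafka-streams/" + instanceId);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "counter_streamer_reprocess");
        Properties p = withUniqueStateDir(base, "node-1");
        System.out.println(p.getProperty("state.dir")); // prints /var/lib/kafka-streams/node-1
    }
}
```

The resulting `Properties` would then be passed to the `KafkaStreams` constructor as usual; note this only helps with cross-instance contention, not the intra-instance locking bug fixed in 0.10.2.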
However, if possible I would recommend upgrading to 0.10.2.0.

-Matthias

On 2/26/17 9:56 AM, Eno Thereska wrote:
> Hi Dan,
>
> Just checking on the version: are you using 0.10.0.2 or 0.10.2.0 (i.e.,
> the latest)? I ask because state locking was something that was fixed in
> 0.10.2.
>
> Thanks
> Eno
>
>> On 26 Feb 2017, at 13:37, Dan Ofir <dan.o...@personali.com> wrote:
>>
>> Hi,
>>
>> I encountered an exception while using Kafka Streams for which I cannot
>> find any open bug or related documentation.
>>
>> I am using Kafka Streams 0.10.0.2 against a 3-node Kafka cluster, also
>> running 0.10.0.2.
>>
>> I am trying to process a huge amount of data, ~248,000,000 events.
>> My input topic has 100 partitions.
>> Starting the stream application with only 1 thread against 100 partitions
>> consumes very slowly, so processing this amount of data would take a few
>> days.
>> When I increase the number of threads in the streams application to 16,
>> I get the following exception after the state store is initialized, which
>> crashes the application completely.
>>
>> Instance spec (m4.xl):
>> 16 GB RAM
>> 4 CPUs
>> 100 GB disk
>>
>> 2017-02-26 12:56:13 INFO StreamThread:828 - stream-thread [StreamThread-1]
>> Creating active task 1_3 with assigned partitions [[counter_reprocess-3]]
>> 2017-02-26 12:56:14 WARN StreamThread:1184 - Could not create task 1_3.
>> Will retry.
>> org.apache.kafka.streams.errors.LockException: task [1_3] Failed to lock
>> the state directory: /tmp/kafka-streams/counter_streamer_reprocess/1_3
>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>
>> These are the properties set within the streams application:
>>
>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>     configuration.getGroupId());
>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>     configuration.getBrokerList());
>> streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
>>     configuration.getZookeepr());
>> streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>     Serdes.String().getClass().getName());
>> streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>     Serdes.String().getClass().getName());
>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>> streamsConfiguration.put(StreamsConfig.POLL_MS_CONFIG, 50);
>> streamsConfiguration.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 120000);
>> streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>     WallclockTimestampExtractor.class);
>> streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
>>     configuration.getStateDir());
>> streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
>>     Integer.valueOf(configuration.getNumOfThread()));
>> // consumer config
>> streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>> streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
>> streamsConfiguration.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
>>
>> These are the state store properties:
>>
>> StateStoreSupplier countStore = Stores.create("store")
>>     .withKeys(stringSerde)
>>     .withValues(countersSerde)
>>     .persistent()
>>     .build();
>>
>> Can you please advise how to proceed from here? Any suggestions? Any
>> other logs you might need?
>>
>> Thanks,
>> Dan Ofir
>
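On the sizing question raised above: Kafka Streams creates one stream task per input-topic partition, so with 100 partitions the useful parallelism is capped at 100 threads across all instances combined. A rough back-of-the-envelope sketch of how many tasks the busiest thread ends up owning (plain arithmetic, no Kafka API involved; the method name and instance/thread split are illustrative):

```java
public class ThreadSizing {
    // Each partition of the input topic becomes one stream task, so total
    // parallelism is capped at the partition count (100 in this thread).
    static int tasksPerThread(int partitions, int instances, int threadsPerInstance) {
        int totalThreads = instances * threadsPerInstance;
        // Ceiling division: the maximum number of tasks any one thread owns.
        return (partitions + totalThreads - 1) / totalThreads;
    }

    public static void main(String[] args) {
        // One instance with 16 threads: each thread handles up to 7 of the
        // 100 tasks, versus all 100 on a single thread.
        System.out.println(tasksPerThread(100, 1, 16)); // prints 7
        System.out.println(tasksPerThread(100, 1, 1));  // prints 100
    }
}
```

Going beyond 100 threads in total would leave some threads idle, so scaling past that point requires repartitioning the input topic.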