Hi,

I encountered an exception while using Kafka Streams, and I cannot find an open bug or any related documentation for it.

I am using Kafka Streams 0.10.0.2 with a 3-node Kafka cluster, also running 0.10.0.2. I am trying to process a large amount of data, roughly 248,000,000 events. My input topic has 100 partitions. Running the streams application with only 1 thread against 100 partitions consumes very slowly, so processing this amount of data would take a few days. When I increase the number of threads in the streams application to 16, I get the following exception after the state store is initialized, and it crashes the application completely.

Instance spec (m4.xl): 16 GB RAM, 4 CPUs, 100 GB

2017-02-26 12:56:13 INFO StreamThread:828 - stream-thread [StreamThread-1] Creating active task 1_3 with assigned partitions [[counter_reprocess-3]]
2017-02-26 12:56:14 WARN StreamThread:1184 - Could not create task 1_3. Will retry.
org.apache.kafka.streams.errors.LockException: task [1_3] Failed to lock the state directory: /tmp/kafka-streams/counter_streamer_reprocess/1_3
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

These are the properties set within the streams application:

    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, configuration.getGroupId());
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBrokerList());
    streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, configuration.getZookeepr());
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
    streamsConfiguration.put(StreamsConfig.POLL_MS_CONFIG, 50);
    streamsConfiguration.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 120000);
    streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, configuration.getStateDir());
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, Integer.valueOf(configuration.getNumOfThread()));
    // consumer config
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
    streamsConfiguration.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);

This is the state store definition:

    StateStoreSupplier countStore = Stores.create("store")
        .withKeys(stringSerde)
        .withValues(countersSerde)
        .persistent()
        .build();

Can you please advise how to proceed from here? Any suggestions? Are there any other logs you might need?

Thanks,
Dan Ofir