Hi, I’ve got a stream processor with 17 input topics (each with 1 partition) that is hitting a failure where one of its input topics effectively gets wedged while the other topics continue to receive messages. When this happens, network traffic between the YARN node running my container and my Kafka node shoots up to consume almost all of the bandwidth on the link (600-700 Mbps). Looking at the metrics, the KafkaSystemConsumer appears to recognize that new messages are being added to the stalled input topic, but the task metrics show that no messages from that topic are being processed. Indeed, when I run the stream processor with debug logging, I can see traffic on every input topic except the stalled one.
Has anyone seen this before? Here is the full set of metrics I’m looking at: http://pastebin.com/ppTxHHz0

Here’s a reduced set of metrics.

The minute before the topic gets stuck:

    'org.apache.samza.system.SystemConsumersMetrics': {
        'chose-null': 4252951, 'chose-object': 3926848, 'deserialization error': 0,
        'kafka-topic1-messages-chosen': 0, 'kafka-topic2-messages-chosen': 11574,
        'kafka-topic3-messages-chosen': 0, 'kafka-topic4-messages-chosen': 92927,
        'kafka-topic5-messages-chosen': 11499, 'kafka-topic6-messages-chosen': 345096,
        'kafka-topic7-messages-chosen': 24032, 'kafka-topic8-messages-chosen': 1961,
        'kafka-topic9-messages-chosen': 30024, 'kafka-topic10-messages-chosen': 910,
        'kafka-topic11-messages-chosen': 3281, 'kafka-topic12-messages-chosen': 0,
        'kafka-STUCK_TOPIC-messages-chosen': 3405298,
        'kafka-topic13-messages-chosen': 6, 'kafka-topic14-messages-chosen': 0,
        'kafka-topic15-messages-chosen': 87, 'kafka-topic16-messages-chosen': 153,
        'kafka-messages-per-poll': 4253714, 'kafka-polls': 4253715,
        'kafka-ssp-fetches-per-poll': 72312241, 'poll-timeout': 10,
        'ssps-needed-by-chooser': 1, 'unprocessed-messages': 0},
    …
    'kafka-STUCK_TOPIC-0-bytes-read': 1602713815,
    'kafka-STUCK_TOPIC-0-high-watermark': 459953744,
    'kafka-STUCK_TOPIC-0-messages-behind-high-watermark': 0,
    'kafka-STUCK_TOPIC-0-messages-read': 3405298,
    'kafka-STUCK_TOPIC-0-offset-change': 459953744,
    …
    'no-more-messages-SystemStreamPartition [kafka, STUCK_TOPIC, 0]': True,
    'metrics': {'org.apache.samza.container.TaskInstanceMetrics': {
        'commit-calls': 726, 'flush-calls': 726,
        'kafka-topic1-0-offset': None, 'kafka-topic2-0-offset': '926407',
        'kafka-topic3-0-offset': None, 'kafka-topic4-0-offset': '10560631',
        'kafka-topic5-0-offset': '918713', 'kafka-topic6-0-offset': '27402631',
        'kafka-topic7-0-offset': '1791853', 'kafka-topic8-0-offset': '1316010',
        'kafka-topic9-0-offset': '2466218', 'kafka-topic10-0-offset': '62529',
        'kafka-topic11-0-offset': '718910', 'kafka-topic12-0-offset': None,
        'kafka-STUCK_TOPIC-0-offset': '459953743',
        'kafka-topic13-0-offset': '1020', 'kafka-topic14-0-offset': None,
        'kafka-topic15-0-offset': '7385', 'kafka-topic16-0-offset': '21101',
        'messages-sent': 3789270, 'process-calls': 3926848,
        'send-calls': 3789270, 'window-calls': 4349}}},

Here’s the first minute of the failure:

    'org.apache.samza.system.SystemConsumersMetrics': {
        'chose-null': 4258829, 'chose-object': 3931043, 'deserialization error': 0,
        'kafka-topic1-messages-chosen': 0, 'kafka-topic2-messages-chosen': 11592,
        'kafka-topic3-messages-chosen': 0, 'kafka-topic4-messages-chosen': 92981,
        'kafka-topic5-messages-chosen': 11516, 'kafka-topic6-messages-chosen': 345588,
        'kafka-topic7-messages-chosen': 24065, 'kafka-topic8-messages-chosen': 1963,
        'kafka-topic9-messages-chosen': 30068, 'kafka-topic10-messages-chosen': 910,
        'kafka-topic11-messages-chosen': 3282, 'kafka-topic12-messages-chosen': 0,
        'kafka-STUCK_TOPIC-messages-chosen': 3408832,
        'kafka-topic13-messages-chosen': 6, 'kafka-topic14-messages-chosen': 0,
        'kafka-topic15-messages-chosen': 87, 'kafka-topic16-messages-chosen': 153,
        'kafka-messages-per-poll': 4259592, 'kafka-polls': 4259593,
        'kafka-ssp-fetches-per-poll': 72412167, 'poll-timeout': 10,
        'ssps-needed-by-chooser': 1, 'unprocessed-messages': 0},
    'kafka-STUCK_TOPIC-0-bytes-read': 1604389612,
    'kafka-STUCK_TOPIC-0-high-watermark': 459958997,
    'kafka-STUCK_TOPIC-0-messages-behind-high-watermark': 1719,
    'kafka-STUCK_TOPIC-0-messages-read': 3408832,
    'kafka-STUCK_TOPIC-0-offset-change': 459957278,
    'no-more-messages-SystemStreamPartition [kafka, STUCK_TOPIC, 0]': False,
    'metrics': {'org.apache.samza.container.TaskInstanceMetrics': {
        'commit-calls': 727, 'flush-calls': 727,
        'kafka-topic1-0-offset': None, 'kafka-topic2-0-offset': '926425',
        'kafka-topic3-0-offset': None, 'kafka-topic4-0-offset': '10560685',
        'kafka-topic5-0-offset': '918730', 'kafka-topic6-0-offset': '27403123',
        'kafka-topic7-0-offset': '1791886', 'kafka-topic8-0-offset': '1316012',
        'kafka-topic9-0-offset': '2466262', 'kafka-topic10-0-offset': '62529',
        'kafka-topic11-0-offset': '718911', 'kafka-topic12-0-offset': None,
        'kafka-STUCK_TOPIC-0-offset': '459957277',
        'kafka-topic13-0-offset': '1020', 'kafka-topic14-0-offset': None,
        'kafka-topic15-0-offset': '7385', 'kafka-topic16-0-offset': '21101',
        'messages-sent': 3793284, 'process-calls': 3931043,
        'send-calls': 3793284, 'window-calls': 4355}}},

Finally, the minute after the topic stalled:

    'org.apache.samza.system.SystemConsumersMetrics': {
        'chose-null': 4264724, 'chose-object': 3931759, 'deserialization error': 0,
        'kafka-topic1-messages-chosen': 0, 'kafka-topic2-messages-chosen': 11610,
        'kafka-topic3-messages-chosen': 0, 'kafka-topic4-messages-chosen': 93073,
        'kafka-topic5-messages-chosen': 11533, 'kafka-topic6-messages-chosen': 346089,
        'kafka-topic7-messages-chosen': 24099, 'kafka-topic8-messages-chosen': 1967,
        'kafka-topic9-messages-chosen': 30112, 'kafka-topic10-messages-chosen': 911,
        'kafka-topic11-messages-chosen': 3287, 'kafka-topic12-messages-chosen': 0,
        'kafka-STUCK_TOPIC-messages-chosen': 3408832,
        'kafka-topic13-messages-chosen': 6, 'kafka-topic14-messages-chosen': 0,
        'kafka-topic15-messages-chosen': 87, 'kafka-topic16-messages-chosen': 153,
        'kafka-messages-per-poll': 4265487, 'kafka-polls': 4265488,
        'kafka-ssp-fetches-per-poll': 72512382, 'poll-timeout': 10,
        'ssps-needed-by-chooser': 1, 'unprocessed-messages': 0},
    'kafka-STUCK_TOPIC-0-bytes-read': 1604389612,
    'kafka-STUCK_TOPIC-0-high-watermark': 459963957,
    'kafka-STUCK_TOPIC-0-messages-behind-high-watermark': 6679,
    'kafka-STUCK_TOPIC-0-messages-read': 3408832,
    'kafka-STUCK_TOPIC-0-offset-change': 459957278,
    'no-more-messages-SystemStreamPartition [kafka, STUCK_TOPIC, 0]': False,
    'metrics': {'org.apache.samza.container.TaskInstanceMetrics': {
        'commit-calls': 728, 'flush-calls': 728,
        'kafka-topic1-0-offset': None, 'kafka-topic2-0-offset': '926443',
        'kafka-topic3-0-offset': None, 'kafka-topic4-0-offset': '10560777',
        'kafka-topic5-0-offset': '918747', 'kafka-topic6-0-offset': '27403624',
        'kafka-topic7-0-offset': '1791920', 'kafka-topic8-0-offset': '1316016',
        'kafka-topic9-0-offset': '2466306', 'kafka-topic10-0-offset': '62530',
        'kafka-topic11-0-offset': '718916', 'kafka-topic12-0-offset': None,
        'kafka-STUCK_TOPIC-0-offset': '459957277',
        'kafka-topic13-0-offset': '1020', 'kafka-topic14-0-offset': None,
        'kafka-topic15-0-offset': '7385', 'kafka-topic16-0-offset': '21101',
        'messages-sent': 3793847, 'process-calls': 3931759,
        'send-calls': 3793847, 'window-calls': 4361}}},

From all of this, it seems like the KafkaSystemConsumer recognizes that new messages are entering the topic, but for some reason no messages from it are being chosen, so the task instance is neither processing them nor advancing its offset for that topic.

Any help would be much appreciated!

-T
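
P.S. In case it helps anyone reading the numbers, here is a minimal Python sketch (not part of Samza) that recomputes the per-minute changes for STUCK_TOPIC from the three snapshots in this message. The dictionary layout and the helper names `lag` and `chosen_delta` are made up for illustration, and treating 'offset-change' as the consumer's fetch position is an assumption on my part; the values themselves are copied from the metrics.

```python
# Values copied from the three metric snapshots for STUCK_TOPIC, partition 0.
# The key names are shortened, hypothetical labels, not Samza metric names.
snapshots = [
    {"label": "minute before", "messages_chosen": 3405298,
     "high_watermark": 459953744, "offset_change": 459953744,
     "task_offset": 459953743},
    {"label": "first minute of failure", "messages_chosen": 3408832,
     "high_watermark": 459958997, "offset_change": 459957278,
     "task_offset": 459957277},
    {"label": "minute after", "messages_chosen": 3408832,
     "high_watermark": 459963957, "offset_change": 459957278,
     "task_offset": 459957277},
]


def lag(snapshot):
    """High watermark minus 'offset-change' (assumed to be the fetch position)."""
    return snapshot["high_watermark"] - snapshot["offset_change"]


def chosen_delta(prev, curr):
    """How many STUCK_TOPIC messages the chooser handed out between snapshots."""
    return curr["messages_chosen"] - prev["messages_chosen"]


for prev, curr in zip(snapshots, snapshots[1:]):
    print(
        f"{prev['label']} -> {curr['label']}: "
        f"chosen +{chosen_delta(prev, curr)}, "
        f"task offset +{curr['task_offset'] - prev['task_offset']}, "
        f"lag now {lag(curr)}"
    )
```

The computed lag reproduces the reported 'messages-behind-high-watermark' values (0, 1719, 6679), while the chosen count and the task offset both stop moving after the first minute of the failure.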