Hi, I’ve got a stream processor with 17 input topics (each with 1 partition)
that is experiencing a failure where one of its input topics effectively gets
wedged while the other topics continue to receive messages. When this condition
occurs, network traffic between the YARN node running my container and my Kafka
node shoots up to consume almost all of the bandwidth on the link (600-700 Mbps).
The metrics suggest that the KafkaSystemConsumer recognizes that new messages
are being added to the stalled input topic, but the task metrics show that no
messages from that topic are being processed. Indeed, when I run the stream
processor with debug logging, I see traffic on all input topics except the
stalled one.

Has anyone seen this before? Here is the full set of metrics I’m looking at:
http://pastebin.com/ppTxHHz0

Here’s a reduced set of metrics:

The minute before the topic gets stuck:

 'org.apache.samza.system.SystemConsumersMetrics': {'chose-null': 4252951,
   'chose-object': 3926848,
   'deserialization error': 0,
   'kafka-topic1-messages-chosen': 0,
   'kafka-topic2-messages-chosen': 11574,
   'kafka-topic3-messages-chosen': 0,
   'kafka-topic4-messages-chosen': 92927,
   'kafka-topic5-messages-chosen': 11499,
   'kafka-topic6-messages-chosen': 345096,
   'kafka-topic7-messages-chosen': 24032,
   'kafka-topic8-messages-chosen': 1961,
   'kafka-topic9-messages-chosen': 30024,
   'kafka-topic10-messages-chosen': 910,
   'kafka-topic11-messages-chosen': 3281,
   'kafka-topic12-messages-chosen': 0,
   'kafka-STUCK_TOPIC-messages-chosen': 3405298,
   'kafka-topic13-messages-chosen': 6,
   'kafka-topic14-messages-chosen': 0,
   'kafka-topic15-messages-chosen': 87,
   'kafka-topic16-messages-chosen': 153,
   'kafka-messages-per-poll': 4253714,
   'kafka-polls': 4253715,
   'kafka-ssp-fetches-per-poll': 72312241,
   'poll-timeout': 10,
   'ssps-needed-by-chooser': 1,
   'unprocessed-messages': 0},

…
   'kafka-STUCK_TOPIC-0-bytes-read': 1602713815,
   'kafka-STUCK_TOPIC-0-high-watermark': 459953744,
   'kafka-STUCK_TOPIC-0-messages-behind-high-watermark': 0,
   'kafka-STUCK_TOPIC-0-messages-read': 3405298,
   'kafka-STUCK_TOPIC-0-offset-change': 459953744,
…

   'no-more-messages-SystemStreamPartition [kafka, STUCK_TOPIC, 0]': True,

 'metrics': {'org.apache.samza.container.TaskInstanceMetrics': {'commit-calls': 726,
   'flush-calls': 726,
   'kafka-topic1-0-offset': None,
   'kafka-topic2-0-offset': '926407',
   'kafka-topic3-0-offset': None,
   'kafka-topic4-0-offset': '10560631',
   'kafka-topic5-0-offset': '918713',
   'kafka-topic6-0-offset': '27402631',
   'kafka-topic7-0-offset': '1791853',
   'kafka-topic8-0-offset': '1316010',
   'kafka-topic9-0-offset': '2466218',
   'kafka-topic10-0-offset': '62529',
   'kafka-topic11-0-offset': '718910',
   'kafka-topic12-0-offset': None,
   'kafka-STUCK_TOPIC-0-offset': '459953743',
   'kafka-topic13-0-offset': '1020',
   'kafka-topic14-0-offset': None,
   'kafka-topic15-0-offset': '7385',
   'kafka-topic16-0-offset': '21101',
   'messages-sent': 3789270,
   'process-calls': 3926848,
   'send-calls': 3789270,
   'window-calls': 4349}}},
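
As a sanity check on how the baseline STUCK_TOPIC numbers relate, the small Python sketch below plugs them in. The two relationships it tests (lag equals the high watermark minus the consumer's offset-change value, and the task's checkpointed offset sits one below that value) are inferences from these snapshots, not documented Samza semantics.

```python
# Baseline ("minute before") values for STUCK_TOPIC, copied from the metrics above.
baseline = {
    "high-watermark": 459953744,
    "offset-change": 459953744,
    "messages-behind-high-watermark": 0,
    # TaskInstanceMetrics reports this offset as a string; stored as an int here.
    "task-offset": 459953743,
}


def lag(snap):
    """Assumed: lag = broker high watermark - consumer's offset-change value."""
    return snap["high-watermark"] - snap["offset-change"]


def checkpoint_gap(snap):
    """Distance between the consumer's offset-change value and the task offset."""
    return snap["offset-change"] - snap["task-offset"]


print(lag(baseline), checkpoint_gap(baseline))  # 0 1
```

The same two checks also hold for the two later snapshots (lag of 1719 and 6679, checkpoint gap of 1 in both).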

Here’s the first minute of the failure:


  'org.apache.samza.system.SystemConsumersMetrics': {'chose-null': 4258829,
   'chose-object': 3931043,
   'deserialization error': 0,
   'kafka-topic1-messages-chosen': 0,
   'kafka-topic2-messages-chosen': 11592,
   'kafka-topic3-messages-chosen': 0,
   'kafka-topic4-messages-chosen': 92981,
   'kafka-topic5-messages-chosen': 11516,
   'kafka-topic6-messages-chosen': 345588,
   'kafka-topic7-messages-chosen': 24065,
   'kafka-topic8-messages-chosen': 1963,
   'kafka-topic9-messages-chosen': 30068,
   'kafka-topic10-messages-chosen': 910,
   'kafka-topic11-messages-chosen': 3282,
   'kafka-topic12-messages-chosen': 0,
   'kafka-STUCK_TOPIC-messages-chosen': 3408832,
   'kafka-topic13-messages-chosen': 6,
   'kafka-topic14-messages-chosen': 0,
   'kafka-topic15-messages-chosen': 87,
   'kafka-topic16-messages-chosen': 153,
   'kafka-messages-per-poll': 4259592,
   'kafka-polls': 4259593,
   'kafka-ssp-fetches-per-poll': 72412167,
   'poll-timeout': 10,
   'ssps-needed-by-chooser': 1,
   'unprocessed-messages': 0},


   'kafka-STUCK_TOPIC-0-bytes-read': 1604389612,
   'kafka-STUCK_TOPIC-0-high-watermark': 459958997,
   'kafka-STUCK_TOPIC-0-messages-behind-high-watermark': 1719,
   'kafka-STUCK_TOPIC-0-messages-read': 3408832,
   'kafka-STUCK_TOPIC-0-offset-change': 459957278,

   'no-more-messages-SystemStreamPartition [kafka, STUCK_TOPIC, 0]': False,

 'metrics': {'org.apache.samza.container.TaskInstanceMetrics': {'commit-calls': 727,
   'flush-calls': 727,
   'kafka-topic1-0-offset': None,
   'kafka-topic2-0-offset': '926425',
   'kafka-topic3-0-offset': None,
   'kafka-topic4-0-offset': '10560685',
   'kafka-topic5-0-offset': '918730',
   'kafka-topic6-0-offset': '27403123',
   'kafka-topic7-0-offset': '1791886',
   'kafka-topic8-0-offset': '1316012',
   'kafka-topic9-0-offset': '2466262',
   'kafka-topic10-0-offset': '62529',
   'kafka-topic11-0-offset': '718911',
   'kafka-topic12-0-offset': None,
   'kafka-STUCK_TOPIC-0-offset': '459957277',
   'kafka-topic13-0-offset': '1020',
   'kafka-topic14-0-offset': None,
   'kafka-topic15-0-offset': '7385',
   'kafka-topic16-0-offset': '21101',
   'messages-sent': 3793284,
   'process-calls': 3931043,
   'send-calls': 3793284,
   'window-calls': 4355}}},
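
Comparing this minute with the previous one, the lag that appears on STUCK_TOPIC is exactly the new traffic the consumer did not deliver. The sketch below uses only numbers copied from the two snapshots; the dictionary keys are shortened, hypothetical names rather than Samza metric names.

```python
# STUCK_TOPIC values from the "minute before" and "first minute of failure" snapshots.
before = {"chosen": 3405298, "task_offset": 459953743, "high_watermark": 459953744}
failing = {"chosen": 3408832, "task_offset": 459957277, "high_watermark": 459958997}

chosen = failing["chosen"] - before["chosen"]                   # messages handed to the task
processed = failing["task_offset"] - before["task_offset"]      # how far the task offset moved
arrived = failing["high_watermark"] - before["high_watermark"]  # new messages on the broker

# Baseline lag was 0, so any shortfall between arrived and chosen shows up as new lag.
print(chosen, processed, arrived, arrived - chosen)  # 3534 3534 5253 1719
```

So during this minute the task still processed everything it was handed (chosen equals the offset advance), and the 1719-message shortfall matches 'messages-behind-high-watermark'.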


Finally, the minute after the topic was stalled:

'org.apache.samza.system.SystemConsumersMetrics': {'chose-null': 4264724,
   'chose-object': 3931759,
   'deserialization error': 0,
   'kafka-topic1-messages-chosen': 0,
   'kafka-topic2-messages-chosen': 11610,
   'kafka-topic3-messages-chosen': 0,
   'kafka-topic4-messages-chosen': 93073,
   'kafka-topic5-messages-chosen': 11533,
   'kafka-topic6-messages-chosen': 346089,
   'kafka-topic7-messages-chosen': 24099,
   'kafka-topic8-messages-chosen': 1967,
   'kafka-topic9-messages-chosen': 30112,
   'kafka-topic10-messages-chosen': 911,
   'kafka-topic11-messages-chosen': 3287,
   'kafka-topic12-messages-chosen': 0,
   'kafka-STUCK_TOPIC-messages-chosen': 3408832,
   'kafka-topic13-messages-chosen': 6,
   'kafka-topic14-messages-chosen': 0,
   'kafka-topic15-messages-chosen': 87,
   'kafka-topic16-messages-chosen': 153,
   'kafka-messages-per-poll': 4265487,
   'kafka-polls': 4265488,
   'kafka-ssp-fetches-per-poll': 72512382,
   'poll-timeout': 10,
   'ssps-needed-by-chooser': 1,
   'unprocessed-messages': 0},

   'kafka-STUCK_TOPIC-0-bytes-read': 1604389612,
   'kafka-STUCK_TOPIC-0-high-watermark': 459963957,
   'kafka-STUCK_TOPIC-0-messages-behind-high-watermark': 6679,
   'kafka-STUCK_TOPIC-0-messages-read': 3408832,
   'kafka-STUCK_TOPIC-0-offset-change': 459957278,

   'no-more-messages-SystemStreamPartition [kafka, STUCK_TOPIC, 0]': False,

 'metrics': {'org.apache.samza.container.TaskInstanceMetrics': {'commit-calls': 728,
   'flush-calls': 728,
   'kafka-topic1-0-offset': None,
   'kafka-topic2-0-offset': '926443',
   'kafka-topic3-0-offset': None,
   'kafka-topic4-0-offset': '10560777',
   'kafka-topic5-0-offset': '918747',
   'kafka-topic6-0-offset': '27403624',
   'kafka-topic7-0-offset': '1791920',
   'kafka-topic8-0-offset': '1316016',
   'kafka-topic9-0-offset': '2466306',
   'kafka-topic10-0-offset': '62530',
   'kafka-topic11-0-offset': '718916',
   'kafka-topic12-0-offset': None,
   'kafka-STUCK_TOPIC-0-offset': '459957277',
   'kafka-topic13-0-offset': '1020',
   'kafka-topic14-0-offset': None,
   'kafka-topic15-0-offset': '7385',
   'kafka-topic16-0-offset': '21101',
   'messages-sent': 3793847,
   'process-calls': 3931759,
   'send-calls': 3793847,
   'window-calls': 4361}}},
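
Diffing this snapshot against the previous one makes the stall explicit: every STUCK_TOPIC counter except the high watermark stays flat, while a healthy topic such as topic6 keeps its chosen count and task offset moving in lockstep. The `deltas` helper below is a hypothetical illustration over hand-copied values, not part of Samza's metrics tooling.

```python
# Hand-copied values from the "first minute of failure" (snap2) and "minute after" (snap3) snapshots.
snap2 = {
    "STUCK_TOPIC": {"chosen": 3408832, "task-offset": 459957277,
                    "bytes-read": 1604389612, "high-watermark": 459958997},
    "topic6": {"chosen": 345588, "task-offset": 27403123},
}
snap3 = {
    "STUCK_TOPIC": {"chosen": 3408832, "task-offset": 459957277,
                    "bytes-read": 1604389612, "high-watermark": 459963957},
    "topic6": {"chosen": 346089, "task-offset": 27403624},
}


def deltas(old, new):
    """Return {topic: {metric: new - old}} for every topic and metric present in both."""
    return {
        topic: {k: new[topic][k] - v for k, v in metrics.items() if k in new[topic]}
        for topic, metrics in old.items()
        if topic in new
    }


for topic, d in deltas(snap2, snap3).items():
    print(topic, d)
# STUCK_TOPIC {'chosen': 0, 'task-offset': 0, 'bytes-read': 0, 'high-watermark': 4960}
# topic6 {'chosen': 501, 'task-offset': 501}
```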

From all of this, it appears that the KafkaSystemConsumer recognizes that new
messages are entering the topic (its high watermark keeps rising), but no
messages from it are being chosen, so the task instance is neither processing
them nor updating its offset for that topic.
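
The same pattern can be expressed as a rough heuristic: flag a partition when its messages-behind-high-watermark grows between two snapshots while its messages-chosen count does not move. The sketch below is a hypothetical check (the `PartitionSnapshot` type and `looks_stalled` function are made up for illustration), fed with the three STUCK_TOPIC readings above.

```python
from dataclasses import dataclass


@dataclass
class PartitionSnapshot:
    messages_chosen: int
    messages_behind_high_watermark: int


def looks_stalled(prev: PartitionSnapshot, curr: PartitionSnapshot) -> bool:
    """Hypothetical heuristic: lag grew, yet no new messages were chosen."""
    nothing_chosen = curr.messages_chosen == prev.messages_chosen
    lag_growing = curr.messages_behind_high_watermark > prev.messages_behind_high_watermark
    return nothing_chosen and lag_growing


snapshots = [
    PartitionSnapshot(3405298, 0),     # minute before
    PartitionSnapshot(3408832, 1719),  # first minute of failure
    PartitionSnapshot(3408832, 6679),  # minute after
]
flags = [looks_stalled(a, b) for a, b in zip(snapshots, snapshots[1:])]
print(flags)  # [False, True]
```

The first interval is not flagged because the task was still receiving STUCK_TOPIC messages, just not fast enough; only the second interval shows growing lag with zero messages chosen.
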
Any help would be much appreciated!
—T
