---------- 已转发邮件 ---------- 发件人: Xin Wang <[email protected]> 日期: 2018年3月21日 下午8:34 主题: Re: 请教rocketmq-spark问题 收件人: Yingjun Li 李颖俊 <[email protected]>, [email protected]
你好, 很抱歉错过了这封邮件。由于这是个已知问题为了减少对用户的影响,故抄送到dev mailing list。 这是RocketMQ-Spark去年已经修复的一个BUG(详情:https://github.com/apache/ rocketmq-externals/pull/36) 可以使用最新版本解决此问题。 BTW,RocketMQ-Spark近期将会进行一次优化改进,为RocketMQ用户提供一个更稳定、可靠的大数据集成组件。 谢谢。 在 2017年10月14日 下午7:53,Yingjun Li 李颖俊 <[email protected]>写道: > 你好: > 最近我在使用apache/rocketmq-externals 这个项目里面有关spark-streaming > 的插件,然后通过在github上面了解到consumer push mode是你实现合并上去的。方便打扰一下向你请教点问题吗? > 我运行的环境如附件。 > > > JavaInputDStream<Message> stream = RocketMqUtils.createJavaReliab > leMQPushStream( > jscc,pushConsumerProperties,St > orageLevel.MEMORY_AND_DISK_SER()); > > 我采用的是reliable的push mode,数据可以从rocketMQ出来存进去队列BlockingQueue<MessageSet> > queue; > 但是到了这一步 > > try { > // According to the official docs > // 'To implement a reliable receiver, you have to use > store(multiple-records) to store data' > ReliableRocketMQReceiver.this.store(messageSet); > ack(messageSet.getId()); > } catch (Exception e) { > fail(messageSet.getId()); > } > MessageSender的进程就一直卡住了,过一段时间就出现OOM错误,如下: > > java.lang.OutOfMemoryError: Java heap space > at scala.reflect.ManifestFactory$$anon$2.newArray(Manifest.scal > a:177) > at scala.reflect.ManifestFactory$$anon$2.newArray(Manifest.scal > a:176) > at org.apache.spark.util.collection.PrimitiveVector.copyArrayWi > thLength(PrimitiveVector.scala:87) > at org.apache.spark.util.collection.PrimitiveVector.resize( > PrimitiveVector.scala:74) > at org.apache.spark.util.collection.SizeTrackingVector.resize( > SizeTrackingVector.scala:35) > at org.apache.spark.util.collection.PrimitiveVector.$plus$eq( > PrimitiveVector.scala:41) > at org.apache.spark.util.collection.SizeTrackingVector.$plus$ > eq(SizeTrackingVector.scala:30) > at org.apache.spark.storage.memory.MemoryStore.putIteratorAsVal > ues(MemoryStore.scala:216) > at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator > $1.apply(BlockManager.scala:1038) > at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator > $1.apply(BlockManager.scala:1029) > at org.apache.spark.storage.BlockManager.doPut(BlockManager. > scala:969) > at org.apache.spark.storage.BlockManager.doPutIterator(BlockMan > ager.scala:1029) > at org.apache.spark.storage.BlockManager.putIterator(BlockManag > er.scala:792) > at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHa > ndler.storeBlock(ReceivedBlockHandler.scala:84) > at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.p > ushAndReportBlock(ReceiverSupervisorImpl.scala:158) > at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.p > ushIterator(ReceiverSupervisorImpl.scala:138) > at org.apache.spark.streaming.receiver.Receiver.store(Receiver. > scala:152) > at org.apache.rocketmq.spark.streaming.ReliableRocketMQReceiver > $MessageSender.run(ReliableRocketMQReceiver.java:119) > > 通过dump发现List<Object>实例过多的问题,但是我本身才往rocketMQ插入500条记录。 > num #instances #bytes class name > ---------------------------------------------- > 1: 5005 537537816 [Ljava.lang.Object; > 2: 1664 135648072 [B > 3: 30846 2588360 [C > > 后面我就暂时无解了,只是猜测会不会跟MessageSet 里面的private List<MessageExt> data有关。 > 看到邮件如果有时间方便的话非常希望能得到你的帮助,谢谢! > > 李颖俊 > > > 2017.10.14 > > > -- Thanks, Xin -- Thanks, Xin
