xiaoweige1101 opened a new issue, #300:
URL: https://github.com/apache/rocketmq-streams/issues/300
1. First, I ran the demo code below:
`
public class WordCount {
    public static void main(String[] args) {
        StreamBuilder builder = new StreamBuilder("wordCount");
        builder.source("sourceTopic", total -> {
                    String value = new String(total, StandardCharsets.UTF_8);
                    return new Pair<>(null, value);
                })
                .flatMap((ValueMapperAction<String, List<String>>) value -> {
                    String[] splits = value.toLowerCase().split(",");
                    return Arrays.asList(splits);
                })
                .keyBy(value -> value)
                .count()
                .toRStream()
                .print();

        TopologyBuilder topologyBuilder = builder.build();
        Properties properties = new Properties();
        properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);

        Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {
            @Override
            public void run() {
                rocketMQStream.stop();
            }
        });
        rocketMQStream.start();
    }
}
`
2. Then I sent a message from the RocketMQ Dashboard with the content "a,b,c".
3. The demo program then logged an error. The details are as follows:
`
2023-06-26 23:41:49.945 [ROCKETMQ_STREAMS_wordCount_0] INFO org.apache.rocketmq.streams.core.util.RocketMQUtil - topic[wordCount-ROCKETMQ-COUNT-00004-shuffleTopic] already exist.
2023-06-26 23:41:50.138 [ROCKETMQ_STREAMS_wordCount_0] INFO o.a.rocketmq.streams.core.running.WorkerThread - worker thread=[ROCKETMQ_STREAMS_wordCount_0], start task success, jobId:wordCount
2023-06-26 23:41:53.917 [RebalanceService] INFO o.a.r.s.core.running.MessageQueueListenerWrapper - recover messageQueue finish, addQueue: [[]], removeQueue:[[]].
2023-06-26 23:42:13.974 [ROCKETMQ_STREAMS_wordCount_0] ERROR o.a.rocketmq.streams.core.running.WorkerThread - ignore error, jobId=[wordCount], skip this data.
java.lang.NullPointerException: null
    at org.apache.rocketmq.streams.core.running.WorkerThread$PlanetaryEngine.runInLoop(WorkerThread.java:194)
    at org.apache.rocketmq.streams.core.running.WorkerThread.run(WorkerThread.java:104)
`
I don't understand why the demo program throws a NullPointerException. Has
anyone else run into this problem?
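
For reference, here is a minimal sketch of the result the topology should produce for the message "a,b,c". It uses plain Java streams rather than RocketMQ Streams, so it only mirrors the flatMap, keyBy and count steps for a single message body and says nothing about the cause of the exception. The class name `ExpectedWordCount` and the method `countWords` are made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class ExpectedWordCount {

    // Hypothetical helper: lower-cases the body, splits it on commas,
    // and counts how often each word occurs (insertion order is kept).
    static Map<String, Long> countWords(String body) {
        return Arrays.stream(body.toLowerCase().split(","))
                .collect(Collectors.groupingBy(
                        Function.identity(),
                        LinkedHashMap::new,
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        // Same message body as the one sent from the dashboard.
        System.out.println(countWords("a,b,c")); // prints {a=1, b=1, c=1}
    }
}
```

So for this input I would expect the demo to print a count of 1 for each of a, b and c instead of skipping the record.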