[ https://issues.apache.org/jira/browse/FLINK-26642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17506782#comment-17506782 ]
goldenyang commented on FLINK-26642: ------------------------------------ [~affe] Thanks. Please take a look, we tried to use this Pulsar Sink, and we fixed a version after encountering this problem. Please see if this modification is acceptable. > Pulsar sink fails with non-partitioned topic > -------------------------------------------- > > Key: FLINK-26642 > URL: https://issues.apache.org/jira/browse/FLINK-26642 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar > Affects Versions: 1.15.0 > Reporter: goldenyang > Priority: Major > Labels: pull-request-available > Original Estimate: 72h > Remaining Estimate: 72h > > Flink support pulsar sink now in > [FLINK-20732|https://issues.apache.org/jira/browse/FLINK-20732]. I > encountered a problem when using pulsar sink in master branch, when I use > non-partitioned topic. > The current test found that both partitioned topics and non-partitioned > topics ending with -partition-i can be supported, but ordinary > non-partitioned topics without -partition-i will have problems, such as > 'test_topic'. > Reproducing the problem requires writing to a non-partitioned topic. Below is > the stack information when the exception is encountered. I briefly > communicated with [~Jianyun Zhao] , this may be a bug. > > {code:java} > 2022-03-08 21:39:13,622 - INFO - > [flink-akka.actor.default-dispatcher-13:Execution@1419] - Source: Pulsar > Source -> (Sink: Writer -> Sink: Committer, Sink: Print to Std. Out) (1/6) > (44af5e8a2b9d553952c7ed3e5d40e672) switched from RUNNING to FAILED on > 54284e57-42a9-4e2e-9c41-54b0ad559832 @ 127.0.0.1 > (dataPort=-1).java.lang.IllegalArgumentException: You should provide topics > for routing topic by message key hash.at > org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument(Preconditions.java:144)at > > org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:54)at > > org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:138)at > > org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:124)at > > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)at > > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)at > > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)at > > org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)at > > org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)at > > org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:205)at > > org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)at > > org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:41)at > > org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:33)at > > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)at > > org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106)at > > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:382)at > > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)at > > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)at > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)at > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)at > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)at > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)at > > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)at > java.lang.Thread.run(Thread.java:748) {code} > > -- This message was sent by Atlassian Jira (v8.20.1#820001)