海洋饼干 created PULSAR-22: -------------------------- Summary: flink消费pulsar时出现错误,但是仍可消费到数据 Key: PULSAR-22 URL: https://issues.apache.org/jira/browse/PULSAR-22 Project: Pulsar Issue Type: Bug Environment: kafka(json) -> flink(Stream) -> custom
Serializer -> kop -> pulsar -> flink(stream) keyshared consum(问题所在区域) Reporter: 海洋饼干 [Source Data Fetcher for Source: kafkaSource (4/4)#0] ERROR org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase - Error in polling message from pulsar consumer. java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException$TransactionConflictException: \{"errorMsg":"org.apache.pulsar.transaction.common.exception.TransactionConflictException: [persistent://public/default/HB0002_rdwBaseCtr_aaamore5005-test-pulsar-partition-0][flink-source] Transaction:(1,12) try to ack batch message:810:24 in pending ack status.","reqId":4357026755663389273, "remote":"tk01-bd-test-pulsar-7-139/192.168.7.139:6650", "local":"/192.168.34.54:9756"} at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.pollMessage(PulsarUnorderedPartitionSplitReader.java:98) at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:110) at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.fetch(PulsarUnorderedPartitionSplitReader.java:56) at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.pulsar.client.api.PulsarClientException$TransactionConflictException: \{"errorMsg":"org.apache.pulsar.transaction.common.exception.TransactionConflictException: [persistent://public/default/HB0002_rdwBaseCtr_aaamore5005-test-pulsar-partition-0][flink-source] Transaction:(1,12) try to ack batch message:810:24 in pending ack status.","reqId":4357026755663389273, "remote":"tk01-bd-test-pulsar-7-139/192.168.7.139:6650", "local":"/192.168.34.54:9756"} at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1177) at org.apache.pulsar.client.impl.ClientCnx.handleAckResponse(ClientCnx.java:431) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:150) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ... 1 more -- This message was sent by Atlassian Jira (v8.20.10#820010)