[ https://issues.apache.org/jira/browse/PULSAR-22?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
海洋饼干 closed PULSAR-22. ---------------------- Resolution: Cannot Reproduce > flink消费pulsar时出现错误,但是仍可消费到数据 > ---------------------------- > > Key: PULSAR-22 > URL: https://issues.apache.org/jira/browse/PULSAR-22 > Project: Pulsar > Issue Type: Bug > Environment: kafka(json) -> flink(Stream) -> custom > Serializer -> kop -> pulsar -> flink(stream) keyshared consum(问题所在区域) > Reporter: 海洋饼干 > Priority: Major > > [Source Data Fetcher for Source: kafkaSource (4/4)#0] ERROR > org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase > - Error in polling message from pulsar consumer. > java.util.concurrent.ExecutionException: > org.apache.pulsar.client.api.PulsarClientException$TransactionConflictException: > > \{"errorMsg":"org.apache.pulsar.transaction.common.exception.TransactionConflictException: > > [persistent://public/default/HB0002_rdwBaseCtr_aaamore5005-test-pulsar-partition-0][flink-source] > Transaction:(1,12) try to ack batch message:810:24 in pending ack > status.","reqId":4357026755663389273, > "remote":"tk01-bd-test-pulsar-7-139/192.168.7.139:6650", > "local":"/192.168.34.54:9756"} > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at > org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.pollMessage(PulsarUnorderedPartitionSplitReader.java:98) > at > org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:110) > at > org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.fetch(PulsarUnorderedPartitionSplitReader.java:56) > at > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:750) > Caused by: > org.apache.pulsar.client.api.PulsarClientException$TransactionConflictException: > > \{"errorMsg":"org.apache.pulsar.transaction.common.exception.TransactionConflictException: > > [persistent://public/default/HB0002_rdwBaseCtr_aaamore5005-test-pulsar-partition-0][flink-source] > Transaction:(1,12) try to ack batch message:810:24 in pending ack > status.","reqId":4357026755663389273, > "remote":"tk01-bd-test-pulsar-7-139/192.168.7.139:6650", > "local":"/192.168.34.54:9756"} > at > org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1177) > at > org.apache.pulsar.client.impl.ClientCnx.handleAckResponse(ClientCnx.java:431) > at > org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:150) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) > at > org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) > at > org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) > at > org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) > at > org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) > at > org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) > at > org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) > at > org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) > at > org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) > at > org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) > ... 1 more -- This message was sent by Atlassian Jira (v8.20.10#820010)