[ https://issues.apache.org/jira/browse/FLINK-9641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16549828#comment-16549828 ]
ASF GitHub Bot commented on FLINK-9641: --------------------------------------- Github user cckellogg commented on a diff in the pull request: https://github.com/apache/flink/pull/6200#discussion_r203863637 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.IOUtils; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Pulsar source (consumer) which receives messages from a topic and acknowledges messages. + * When checkpointing is enabled, it guarantees at least once processing semantics. + * + * <p>When checkpointing is disabled, it auto acknowledges messages based on the number of messages it has + * received. In this mode messages may be dropped. 
+ */ +class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageId> implements PulsarSourceBase<T> { + + private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerSource.class); + + private final int messageReceiveTimeoutMs = 100; + private final String serviceUrl; + private final String topic; + private final String subscriptionName; + private final DeserializationSchema<T> deserializer; + + private PulsarClient client; + private Consumer<byte[]> consumer; + + private boolean isCheckpointingEnabled; + + private final long acknowledgementBatchSize; + private long batchCount; + private long totalMessageCount; + + private transient volatile boolean isRunning; + + PulsarConsumerSource(PulsarSourceBuilder<T> builder) { + super(MessageId.class); + this.serviceUrl = builder.serviceUrl; + this.topic = builder.topic; + this.deserializer = builder.deserializationSchema; + this.subscriptionName = builder.subscriptionName; + this.acknowledgementBatchSize = builder.acknowledgementBatchSize; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + final RuntimeContext context = getRuntimeContext(); + if (context instanceof StreamingRuntimeContext) { + isCheckpointingEnabled = ((StreamingRuntimeContext) context).isCheckpointingEnabled(); + } + + client = createClient(); + consumer = createConsumer(client); + + isRunning = true; + } + + @Override + protected void acknowledgeIDs(long checkpointId, Set<MessageId> messageIds) { + if (consumer == null) { + LOG.error("null consumer unable to acknowledge messages"); + throw new RuntimeException("null pulsar consumer unable to acknowledge messages"); + } + + if (messageIds.isEmpty()) { + LOG.info("no message ids to acknowledge"); + return; + } + + Map<String, CompletableFuture<Void>> futures = new HashMap<>(messageIds.size()); + for (MessageId id : messageIds) { + futures.put(id.toString(), consumer.acknowledgeAsync(id)); + } + + 
futures.forEach((k, f) -> { + try { + f.get(); + } catch (Exception e) { + LOG.error("failed to acknowledge messageId " + k, e); + throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e); --- End diff -- @yanghua can the set passed in be modified? > Pulsar Source Connector > ----------------------- > > Key: FLINK-9641 > URL: https://issues.apache.org/jira/browse/FLINK-9641 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors > Reporter: Chris Kellogg > Priority: Minor > Labels: pull-request-available > > Pulsar (https://github.com/apache/incubator-pulsar) is a distributed pub-sub > messaging system currently in Apache incubation. It is a very active project > with committers from various companies and good adoption. This PR > will add a source function to allow Flink jobs to process messages from > Pulsar topics. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)