[ https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16506081#comment-16506081 ]

ASF GitHub Bot commented on FLINK-9168:
---------------------------------------

Github user XiaoZYang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5845#discussion_r194076473
  
    --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java ---
    @@ -0,0 +1,304 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.pulsar;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.serialization.SerializationSchema;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
    +import org.apache.flink.util.SerializableObject;
    +
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.MessageBuilder;
    +import org.apache.pulsar.client.api.MessageId;
    +import org.apache.pulsar.client.api.Producer;
    +import org.apache.pulsar.client.api.ProducerConfiguration;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.function.Function;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Flink Sink to produce data into a Pulsar topic.
    + */
    +public class FlinkPulsarProducer<IN>
    +           extends RichSinkFunction<IN>
    +           implements CheckpointedFunction {
    +
    +   private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class);
    +
    +   /**
    +    * The pulsar service url.
    +    */
    +   protected final String serviceUrl;
    +
    +   /**
    +    * User defined configuration for the producer.
    +    */
    +   protected final ProducerConfiguration producerConfig;
    +
    +   /**
    +    * The name of the default topic this producer is writing data to.
    +    */
    +   protected final String defaultTopicName;
    +
    +   /**
    +    * (Serializable) SerializationSchema for turning objects used with Flink into
    +    * byte[] for Pulsar.
    +    */
    +   protected final SerializationSchema<IN> schema;
    +
    +   /**
    +    * User-provided key extractor for assigning a key to a pulsar message.
    +    */
    +   protected final PulsarKeyExtractor<IN> flinkPulsarKeyExtractor;
    +
    +   /**
    +    * Produce Mode.
    +    */
    +   protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONE;
    +
    +   /**
    +    * If true, the producer will wait until all outstanding records have been sent to the broker.
    +    */
    +   protected boolean flushOnCheckpoint;
    +
    +   // -------------------------------- Runtime fields ------------------------------------------
    +
    +   /** Pulsar Producer instance. */
    +   protected transient Producer producer;
    +
    +   /** The callback that handles error propagation or logging. */
    +   protected transient Function<MessageId, MessageId> successCallback = msgId -> {
    +           acknowledgeMessage();
    +           return msgId;
    +   };
    +
    +   protected transient Function<Throwable, MessageId> failureCallback;
    +
    +   /** Errors encountered in the async producer are stored here. */
    +   protected transient volatile Exception asyncException;
    +
    +   /** Lock for accessing the pending records. */
    +   protected final SerializableObject pendingRecordsLock = new SerializableObject();
    +
    +   /** Number of unacknowledged records. */
    +   protected long pendingRecords;
    +
    +   public FlinkPulsarProducer(String serviceUrl,
    +                                                   String defaultTopicName,
    +                                                   SerializationSchema<IN> serializationSchema,
    +                                                   ProducerConfiguration producerConfig,
    +                                                   PulsarKeyExtractor<IN> keyExtractor) {
    +           this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
    +           this.defaultTopicName = checkNotNull(defaultTopicName, "TopicName not set");
    +           this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
    +           this.producerConfig = checkNotNull(producerConfig, "Producer Config is not set");
    +           this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
    +           ClosureCleaner.ensureSerializable(serializationSchema);
    +   }
    +
    +   // ---------------------------------- Properties --------------------------
    +
    +
    +   /**
    +    * @return pulsar key extractor.
    +    */
    +   public PulsarKeyExtractor<IN> getKeyExtractor() {
    +           return flinkPulsarKeyExtractor;
    +   }
    +
    +   /**
    +    * Gets this producer's operating mode.
    +    */
    +   public PulsarProduceMode getProduceMode() {
    +           return this.produceMode;
    +   }
    +
    +   /**
    +    * Sets this producer's operating mode.
    +    *
    +    * @param produceMode The mode of operation.
    +    */
    +   public void setProduceMode(PulsarProduceMode produceMode) {
    +           this.produceMode = checkNotNull(produceMode);
    +   }
    +
    +   /**
    +    * If set to true, the Flink producer will wait for all outstanding messages in the Pulsar buffers
    +    * to be acknowledged by the Pulsar producer on a checkpoint.
    +    * This way, the producer can guarantee that messages in the Pulsar buffers are part of the checkpoint.
    +    *
    +    * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
    +    */
    +   public void setFlushOnCheckpoint(boolean flush) {
    +           this.flushOnCheckpoint = flush;
    +   }
    +
    +   // ----------------------------------- Sink Methods --------------------------
    +
    +   @SuppressWarnings("unchecked")
    +   private static final <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtractor<T> extractor) {
    +           if (null == extractor) {
    +                   return PulsarKeyExtractor.NULL;
    +           } else {
    +                   return extractor;
    +           }
    +   }
    +
    +   private Producer createProducer(ProducerConfiguration configuration) throws Exception {
    +           PulsarClient client = PulsarClient.create(serviceUrl);
    +           return client.createProducer(defaultTopicName, configuration);
    +   }
    +
    +   /**
    +    * Initializes the connection to pulsar.
    +    *
    +    * @param parameters configuration used for initialization
    +    * @throws Exception
    +    */
    +   @Override
    +   public void open(Configuration parameters) throws Exception {
    +           this.producer = createProducer(producerConfig);
    +
    +           RuntimeContext ctx = getRuntimeContext();
    +
    +           LOG.info("Starting FlinkPulsarProducer ({}/{}) to produce into pulsar topic {}",
    +                   ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicName);
    +
    +           if (flushOnCheckpoint && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
    +                   LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
    +                   flushOnCheckpoint = false;
    +           }
    +
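    +           // Choose the failure callback according to the produce mode: AT_MOST_ONCE only logs
    +           // the error and drops the record, while AT_LEAST_ONE stores it in asyncException so
    +           // that the next invoke()/snapshotState() call fails via checkErroneous().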
    +           if (PulsarProduceMode.AT_MOST_ONCE == produceMode) {
    +                   this.failureCallback = cause -> {
    +                           LOG.error("Error while sending record to Pulsar: " + cause.getMessage(), cause);
    +                           return null;
    +                   };
    +           } else if (PulsarProduceMode.AT_LEAST_ONE == produceMode) {
    +                   this.failureCallback = cause -> {
    +                           if (null == asyncException) {
    +                                   if (cause instanceof Exception) {
    +                                           asyncException = (Exception) cause;
    +                                   } else {
    +                                           asyncException = new Exception(cause);
    +                                   }
    +                           }
    +                           return null;
    +                   };
    +           } else {
    +                   throw new UnsupportedOperationException("Unsupported produce mode " + produceMode);
    +           }
    +   }
    +
    +   @Override
    +   public void invoke(IN value, Context context) throws Exception {
    +           checkErroneous();
    +
    +           byte[] serializedValue = schema.serialize(value);
    +
    +           MessageBuilder msgBuilder = MessageBuilder.create();
    +           if (null != context.timestamp()) {
    +                   msgBuilder = msgBuilder.setEventTime(context.timestamp());
    +           }
    +           String msgKey = flinkPulsarKeyExtractor.getKey(value);
    +           if (null != msgKey) {
    +                   msgBuilder = msgBuilder.setKey(msgKey);
    +           }
    +           Message message = msgBuilder
    +                   .setContent(serializedValue)
    +                   .build();
    +
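    +           // When flushing on checkpoint is enabled, count the record as in-flight; the success
    +           // callback decrements the counter again in acknowledgeMessage().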
    +           if (flushOnCheckpoint) {
    +                   synchronized (pendingRecordsLock) {
    +                           pendingRecords++;
    +                   }
    +           }
    +           producer.sendAsync(message)
    +                   .thenApply(successCallback)
    +                   .exceptionally(failureCallback);
    +   }
    +
    +   @Override
    +   public void close() throws Exception {
    +           if (producer != null) {
    +                   producer.close();
    +           }
    +
    +           // make sure we propagate pending errors
    +           checkErroneous();
    +   }
    +
    +   // ------------------- Logic for handling checkpoint flushing -------------------------- //
    +
    +   private void acknowledgeMessage() {
    +           if (flushOnCheckpoint) {
    +                   synchronized (pendingRecordsLock) {
    +                           pendingRecords--;
    +                           if (pendingRecords == 0) {
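    +                                           // wake up snapshotState(), which waits on this lock until all records are acknowledged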
    +                                   pendingRecordsLock.notifyAll();
    +                           }
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +           // check for asynchronous errors and fail the checkpoint if necessary
    +           checkErroneous();
    +
    +           if (flushOnCheckpoint) {
    +                   // wait until all the messages are acknowledged
    +                   synchronized (pendingRecordsLock) {
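    +                           // block the checkpoint until acknowledgeMessage() has drained the counter;
    +                           // the timed wait re-checks the condition even if a notification is missed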
    +                           while (pendingRecords > 0) {
    +                                   pendingRecordsLock.wait(100);
    +                           }
    +                   }
    +
    +                   // if the flushed requests have errors, propagate them as well and fail the checkpoint
    +                   checkErroneous();
    +           }
    +   }
    +
    +   @Override
    +   public void initializeState(FunctionInitializationContext context) throws Exception {
    +           // nothing to do
    +   }
    +
    +   // ----------------------------------- Utilities --------------------------
    +
    +   protected void checkErroneous() throws Exception {
    +           Exception e = asyncException;
    +           if (e != null) {
    +                   // prevent double throwing
    +                   asyncException = null;
    +                   throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
    --- End diff --
    
    Yes, thanks for pointing out this mistake; it has been corrected in the newest commit.
    ping @surryr 
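    
    For reference, a minimal usage sketch of the sink as proposed in this diff (the service URL, topic name, and schema below are placeholders, and the producer configuration is left at its defaults):
    
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.pulsar.client.api.ProducerConfiguration;
    
        public class PulsarSinkExample {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.enableCheckpointing(5000); // checkpointing must be on for flushOnCheckpoint to take effect
    
                DataStream<String> stream = env.fromElements("hello", "pulsar");
    
                // placeholder service URL and topic; a null key extractor falls back to PulsarKeyExtractor.NULL
                FlinkPulsarProducer<String> producer = new FlinkPulsarProducer<>(
                        "pulsar://localhost:6650",
                        "persistent://sample/standalone/ns1/flink-topic",
                        new SimpleStringSchema(),
                        new ProducerConfiguration(),
                        null);
                producer.setFlushOnCheckpoint(true);
    
                stream.addSink(producer);
                env.execute("Pulsar sink example");
            }
        }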


> Pulsar Sink Connector
> ---------------------
>
>                 Key: FLINK-9168
>                 URL: https://issues.apache.org/jira/browse/FLINK-9168
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>    Affects Versions: 1.6.0
>            Reporter: Zongyang Xiao
>            Priority: Minor
>             Fix For: 1.6.0
>
>
> Flink does not provide a sink connector for Pulsar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
