reswqa commented on code in PR #19974:
URL: https://github.com/apache/flink/pull/19974#discussion_r899950625


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/emitter/PulsarRecordEmitter.java:
##########
@@ -20,26 +20,69 @@
 
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import 
org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
 import 
org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
 import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
+import org.apache.flink.util.Collector;
+
+import org.apache.pulsar.client.api.Message;
 
 /**
  * The {@link RecordEmitter} implementation for both {@link 
PulsarOrderedSourceReader} and {@link
  * PulsarUnorderedSourceReader}. We would always update the last consumed 
message id in this
  * emitter.
  */
 public class PulsarRecordEmitter<T>
-        implements RecordEmitter<PulsarMessage<T>, T, 
PulsarPartitionSplitState> {
+        implements RecordEmitter<Message<byte[]>, T, 
PulsarPartitionSplitState> {
+
+    private final PulsarDeserializationSchema<T> deserializationSchema;
+    private final SourceOutputWrapper<T> sourceOutputWrapper = new 
SourceOutputWrapper<>();
+
+    public PulsarRecordEmitter(PulsarDeserializationSchema<T> 
deserializationSchema) {
+        this.deserializationSchema = deserializationSchema;
+    }
 
     @Override
     public void emitRecord(
-            PulsarMessage<T> element, SourceOutput<T> output, 
PulsarPartitionSplitState splitState)
+            Message<byte[]> element, SourceOutput<T> output, 
PulsarPartitionSplitState splitState)
             throws Exception {
-        // Sink the record to source output.
-        output.collect(element.getValue(), element.getEventTime());
-        // Update the split state.
-        splitState.setLatestConsumedId(element.getId());
+        // Update the source output.
+        sourceOutputWrapper.setSourceOutput(output);
+        sourceOutputWrapper.setTimestamp(element);
+
+        deserializationSchema.deserialize(element, sourceOutputWrapper);
+        splitState.setLatestConsumedId(element.getMessageId());
+    }
+
+    private static class SourceOutputWrapper<T> implements Collector<T> {
+        private SourceOutput<T> sourceOutput;
+        private long timestamp;
+
+        @Override
+        public void collect(T record) {
+            if (timestamp > 0) {
+                sourceOutput.collect(record, timestamp);
+            } else {
+                sourceOutput.collect(record);
+            }
+        }
+
+        @Override
+        public void close() {
+            // Nothing to do here.
+        }
+
+        private void setSourceOutput(SourceOutput<T> sourceOutput) {
+            this.sourceOutput = sourceOutput;
+        }
+
+        /**
+         * Get the event timestamp from Pulsar. Zero means there is no event 
time. See {@link
+         * Message#getEventTime()} to get the reason why it returns zero.
+         */
+        private void setTimestamp(Message<?> message) {
+            this.timestamp = message.getEventTime();

Review Comment:
   Why use Message as parameter instead of passing timestamp directly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to