reswqa commented on code in PR #19974: URL: https://github.com/apache/flink/pull/19974#discussion_r899950625
########## flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/emitter/PulsarRecordEmitter.java: ########## @@ -20,26 +20,69 @@ import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.connector.base.source.reader.RecordEmitter; -import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader; import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState; +import org.apache.flink.util.Collector; + +import org.apache.pulsar.client.api.Message; /** * The {@link RecordEmitter} implementation for both {@link PulsarOrderedSourceReader} and {@link * PulsarUnorderedSourceReader}. We would always update the last consumed message id in this * emitter. */ public class PulsarRecordEmitter<T> - implements RecordEmitter<PulsarMessage<T>, T, PulsarPartitionSplitState> { + implements RecordEmitter<Message<byte[]>, T, PulsarPartitionSplitState> { + + private final PulsarDeserializationSchema<T> deserializationSchema; + private final SourceOutputWrapper<T> sourceOutputWrapper = new SourceOutputWrapper<>(); + + public PulsarRecordEmitter(PulsarDeserializationSchema<T> deserializationSchema) { + this.deserializationSchema = deserializationSchema; + } @Override public void emitRecord( - PulsarMessage<T> element, SourceOutput<T> output, PulsarPartitionSplitState splitState) + Message<byte[]> element, SourceOutput<T> output, PulsarPartitionSplitState splitState) throws Exception { - // Sink the record to source output. - output.collect(element.getValue(), element.getEventTime()); - // Update the split state. - splitState.setLatestConsumedId(element.getId()); + // Update the source output. 
+ sourceOutputWrapper.setSourceOutput(output); + sourceOutputWrapper.setTimestamp(element); + + deserializationSchema.deserialize(element, sourceOutputWrapper); + splitState.setLatestConsumedId(element.getMessageId()); + } + + private static class SourceOutputWrapper<T> implements Collector<T> { + private SourceOutput<T> sourceOutput; + private long timestamp; + + @Override + public void collect(T record) { + if (timestamp > 0) { + sourceOutput.collect(record, timestamp); + } else { + sourceOutput.collect(record); + } + } + + @Override + public void close() { + // Nothing to do here. + } + + private void setSourceOutput(SourceOutput<T> sourceOutput) { + this.sourceOutput = sourceOutput; + } + + /** + * Get the event timestamp from Pulsar. Zero means there is no event time. See {@link + * Message#getEventTime()} to get the reason why it returns zero. + */ + private void setTimestamp(Message<?> message) { + this.timestamp = message.getEventTime(); Review Comment: Why use Message as the parameter instead of passing the timestamp directly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org