merlimat commented on a change in pull request #538: Make broker configurable to own non-persistent topic. URL: https://github.com/apache/incubator-pulsar/pull/538#discussion_r129213798
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java ########## @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.nonpersistent; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.util.Rate; +import org.apache.pulsar.broker.service.Replicator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.persistent.PersistentReplicator; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.SendCallback; +import org.apache.pulsar.common.policies.data.ReplicatorStats; + +import io.netty.buffer.ByteBuf; +import 
io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; + +public class NonPersistentReplicator implements Replicator { + + private final BrokerService brokerService; + private final NonPersistentTopic topic; + private final String topicName; + private final String localCluster; + private final String remoteCluster; + private final PulsarClientImpl client; + + private volatile ProducerImpl producer; + + private final Rate msgOut = new Rate(); + private final Rate msgExpired = new Rate(); + + private static final ProducerConfiguration producerConfiguration = new ProducerConfiguration().setSendTimeout(0, + TimeUnit.SECONDS); + + private final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES); + + private final ReplicatorStats stats = new ReplicatorStats(); + + public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster, + BrokerService brokerService) { + this.brokerService = brokerService; + this.topic = topic; + this.topicName = topic.getName(); + this.localCluster = localCluster; + this.remoteCluster = remoteCluster; + this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster); + this.producer = null; + STATE_UPDATER.set(this, State.Stopped); + + producerConfiguration + .setMaxPendingMessages(brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize()); + producerConfiguration.setBlockIfQueueFull(false); + + startProducer(); + } + + public String getRemoteCluster() { + return remoteCluster; + } + + enum State { + Stopped, Starting, Started, Stopping + } + + private static final AtomicReferenceFieldUpdater<NonPersistentReplicator, State> STATE_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(NonPersistentReplicator.class, State.class, "state"); + private volatile State state = State.Stopped; + + // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer + // the end result can be disconnect. 
+ public synchronized void startProducer() { + if (STATE_UPDATER.get(this) == State.Stopping) { + long waitTimeMs = backOff.next(); + if (log.isDebugEnabled()) { + log.debug( + "[{}][{} -> {}] waiting for producer to close before attempting to reconnect, retrying in {} s", + topicName, localCluster, remoteCluster, waitTimeMs / 1000.0); + } + // BackOff before retrying + brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); + return; + } + State state = STATE_UPDATER.get(this); + if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) { + if (state == State.Started) { + // Already running + if (log.isDebugEnabled()) { + log.debug("[{}][{} -> {}] Replicator was already running", topicName, localCluster, remoteCluster); + } + } else { + log.info("[{}][{} -> {}] Replicator already being started. Replicator state: {}", topicName, + localCluster, remoteCluster, state); + } + + return; + } + + log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster); + client.createProducerAsync(topicName, producerConfiguration, + getReplicatorName(topic.replicatorPrefix, localCluster)).thenAccept(producer -> { + + this.producer = (ProducerImpl) producer; + + if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) { + log.info("[{}][{} -> {}] Created replicator producer", topicName, localCluster, remoteCluster); + backOff.reset(); + } else { + log.info( + "[{}][{} -> {}] Replicator was stopped while creating the producer. Closing it. 
Replicator state: {}", + topicName, localCluster, remoteCluster, STATE_UPDATER.get(this)); + STATE_UPDATER.set(this, State.Stopping); + closeProducerAsync(); + return; + } + }).exceptionally(ex -> { + if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { + long waitTimeMs = backOff.next(); + log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName, + localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0); + + // BackOff before retrying + brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); + } else { + log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName, + localCluster, remoteCluster, STATE_UPDATER.get(this), ex); + } + return null; + }); + + } + + public void sendMessage(Entry entry) { Review comment: Is it possible to share any logic with the other replicator? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services