HoustonPutman commented on code in PR #2541: URL: https://github.com/apache/solr/pull/2541#discussion_r1657812390
########## solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java: ########## @@ -46,21 +45,21 @@ public static void ensureWorkingMockito() { @BeforeClass public static void beforeSimpleSolrIntegrationTest() throws Exception { - + System.setProperty("solr.crossdc.bootstrapServers", "doesnotmatter:9092"); + System.setProperty("solr.crossdc.topicName", "doesnotmatter"); cluster1 = configureCluster(2) - .addConfig("conf", getFile("configs/cloud-minimal/conf").toPath()) + // .addConfig("conf", getFile("configs/cloud-minimal/conf").toPath()) Review Comment: why is this commented out? ########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java: ########## @@ -29,345 +33,369 @@ import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; -import org.apache.solr.crossdc.common.ResubmitBackoffPolicy; import org.apache.solr.crossdc.common.CrossDcConstants; import org.apache.solr.crossdc.common.IQueueHandler; import org.apache.solr.crossdc.common.MirroredSolrRequest; +import org.apache.solr.crossdc.common.ResubmitBackoffPolicy; import org.apache.solr.crossdc.common.SolrExceptionUtil; -import org.apache.solr.crossdc.consumer.Consumer; +import org.apache.solr.crossdc.manager.consumer.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import java.lang.invoke.MethodHandles; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - /** - * Message processor implements all the logic to process a MirroredSolrRequest. - * It handles: - * 1. Sending the update request to Solr - * 2. Discarding or retrying failed requests - * 3. Flagging requests for resubmission by the underlying consumer implementation. + * Message processor implements all the logic to process a MirroredSolrRequest. It handles: 1. 
Review Comment: We can probably fix this by adding line breaks and stuff ########## solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java: ########## @@ -248,8 +288,10 @@ public void processDelete(final DeleteUpdateCommand cmd) throws IOException { if (log.isDebugEnabled()) log.debug("processDelete doMirroring={} isLeader={} cmd={}", true, isLeader, cmd); } else { - // DBQs are sent to each shard leader, so we mirror from the original node to only mirror once - // In general there's no way to guarantee that these run identically on the mirror since there are no + // DBQs are sent to each shard leader, so we mirror from the original node to only mirror + // once + // In general there's no way to guarantee that these run identically on the mirror since + // there are no Review Comment: Make this a little prettier ########## solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java: ########## @@ -28,178 +37,200 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import static org.apache.solr.crossdc.common.KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS; - public class KafkaMirroringSink implements RequestMirroringSink, Closeable { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private final KafkaCrossDcConf conf; - private final Producer<String, MirroredSolrRequest> producer; - private final KafkaConsumer<String,MirroredSolrRequest> consumer; - private final String mainTopic; - private final String dlqTopic; - - public KafkaMirroringSink(final KafkaCrossDcConf conf) { - // Create Kafka Mirroring Sink - this.conf = conf; - this.producer = initProducer(); - this.consumer = initConsumer(); - this.mainTopic = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",")[0]; - this.dlqTopic = conf.get(KafkaCrossDcConf.DLQ_TOPIC_NAME); - - checkTopicsAvailability(); + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final KafkaCrossDcConf conf; + private final Producer<String, MirroredSolrRequest<?>> producer; + private final KafkaConsumer<String, MirroredSolrRequest<?>> consumer; + private final String mainTopic; + private final String dlqTopic; + + public KafkaMirroringSink(final KafkaCrossDcConf conf) { + // Create Kafka Mirroring Sink + this.conf = conf; + this.producer = initProducer(); + this.consumer = initConsumer(); + this.mainTopic = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",")[0]; + this.dlqTopic = conf.get(KafkaCrossDcConf.DLQ_TOPIC_NAME); + + checkTopicsAvailability(); + } + + @Override + public void submit(MirroredSolrRequest<?> request) throws MirroringException { + this.submitRequest(request, mainTopic); + } + + @Override + public void submitToDlq(MirroredSolrRequest<?> request) throws MirroringException { + if (dlqTopic != null) { + this.submitRequest(request, dlqTopic); + } else { + if (log.isInfoEnabled()) { + log.info("- no DLQ, dropping failed {}", request); + } } + } - @Override - public void submit(MirroredSolrRequest request) throws MirroringException { - this.submitRequest(request, mainTopic); - } + private void checkTopicsAvailability() { + final Map<String, List<PartitionInfo>> topics = this.consumer.listTopics(); - @Override - public void submitToDlq(MirroredSolrRequest 
request) throws MirroringException { - if (dlqTopic != null) { - this.submitRequest(request, dlqTopic); - } else { - if (log.isInfoEnabled()) { - log.info("- no DLQ, dropping failed {}", request); - } - } + if (mainTopic != null && !topics.containsKey(mainTopic)) { + throw new RuntimeException("Main topic " + mainTopic + " is not available"); } - - private void checkTopicsAvailability() { - final Map<String, List<PartitionInfo>> topics = this.consumer.listTopics(); - - if (mainTopic != null && !topics.containsKey(mainTopic)) { - throw new RuntimeException("Main topic " + mainTopic + " is not available"); - } - if (dlqTopic != null && !topics.containsKey(dlqTopic)) { - throw new RuntimeException("DLQ topic " + dlqTopic + " is not available"); - } + if (dlqTopic != null && !topics.containsKey(dlqTopic)) { + throw new RuntimeException("DLQ topic " + dlqTopic + " is not available"); } + } - private void submitRequest(MirroredSolrRequest request, String topicName) throws MirroringException { - if (log.isDebugEnabled()) { - log.debug("About to submit a MirroredSolrRequest"); - } - - final long enqueueStartNanos = System.nanoTime(); - - // Create Producer record - try { + private void submitRequest(MirroredSolrRequest<?> request, String topicName) + throws MirroringException { + if (log.isDebugEnabled()) { + log.debug("About to submit a MirroredSolrRequest"); + } - producer.send(new ProducerRecord<>(topicName, request), (metadata, exception) -> { - if (exception != null) { - log.error("Failed adding update to CrossDC queue! request=" + request.getSolrRequest(), exception); - } - }); + final long enqueueStartNanos = System.nanoTime(); - long lastSuccessfulEnqueueNanos = System.nanoTime(); - // Record time since last successful enqueue as 0 - long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos); - // Update elapsed time + // Create Producer record + try { - if (elapsedTimeMillis > conf.getInt(SLOW_SUBMIT_THRESHOLD_MS)) { - slowSubmitAction(request, elapsedTimeMillis); + producer.send( + new ProducerRecord<>(topicName, request), + (metadata, exception) -> { + if (exception != null) { + log.error( + "Failed adding update to CrossDC queue! 
request=" + request.getSolrRequest(), + exception); } - } catch (Exception e) { - // We are intentionally catching all exceptions, the expected exception form this function is {@link MirroringException} - String message = "Unable to enqueue request " + request + ", configured retries is" + conf.getInt(KafkaCrossDcConf.NUM_RETRIES) + - " and configured max delivery timeout in ms is " + conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS); - log.error(message, e); - throw new MirroringException(message, e); - } + }); + + long lastSuccessfulEnqueueNanos = System.nanoTime(); + // Record time since last successful enqueue as 0 + long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos); + // Update elapsed time + + if (elapsedTimeMillis > conf.getInt(SLOW_SUBMIT_THRESHOLD_MS)) { + slowSubmitAction(request, elapsedTimeMillis); + } + } catch (Exception e) { + // We are intentionally catching all exceptions, the expected exception form this function is + // {@link MirroringException} + String message = + "Unable to enqueue request " + + request + + ", configured retries is" + + conf.getInt(KafkaCrossDcConf.NUM_RETRIES) + + " and configured max delivery timeout in ms is " + + conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS); + log.error(message, e); + throw new MirroringException(message, e); } - - /** - * Create and init the producer using {@link this#conf} - * All producer configs are listed here - * https://kafka.apache.org/documentation/#producerconfigs - * - * @return - */ - private Producer<String, MirroredSolrRequest> initProducer() { - // Initialize and return Kafka producer - Properties kafkaProducerProps = new Properties(); - - log.info("Starting CrossDC Producer {}", conf); - - kafkaProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); - - kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all"); - String retries = conf.get(KafkaCrossDcConf.NUM_RETRIES); - if (retries != null) { - kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retries)); - } - kafkaProducerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, conf.getInt(KafkaCrossDcConf.RETRY_BACKOFF_MS)); - kafkaProducerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS)); - kafkaProducerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES)); - kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getInt(KafkaCrossDcConf.BATCH_SIZE_BYTES)); - kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, conf.getInt(KafkaCrossDcConf.BUFFER_MEMORY_BYTES)); - kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, conf.getInt(KafkaCrossDcConf.LINGER_MS)); - kafkaProducerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); // should be less than time that causes consumer to be kicked out - kafkaProducerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, conf.get(KafkaCrossDcConf.ENABLE_DATA_COMPRESSION)); - - kafkaProducerProps.put("key.serializer", StringSerializer.class.getName()); - kafkaProducerProps.put("value.serializer", MirroredSolrRequestSerializer.class.getName()); - - KafkaCrossDcConf.addSecurityProps(conf, kafkaProducerProps); - - kafkaProducerProps.putAll(conf.getAdditionalProperties()); - - if (log.isDebugEnabled()) { - log.debug("Kafka Producer props={}", kafkaProducerProps); - } - - ClassLoader originalContextClassLoader = 
Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(null); - Producer<String, MirroredSolrRequest> producer; - try { - producer = new KafkaProducer<>(kafkaProducerProps); - } finally { - Thread.currentThread().setContextClassLoader(originalContextClassLoader); - } - return producer; - } - - private KafkaConsumer<String, MirroredSolrRequest> initConsumer() { - final Properties kafkaConsumerProperties = new Properties(); - - kafkaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); - kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, conf.get(KafkaCrossDcConf.GROUP_ID)); - kafkaConsumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS)); - kafkaConsumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS)); - kafkaConsumerProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS)); - kafkaConsumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - kafkaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - kafkaConsumerProperties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES)); - kafkaConsumerProperties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS)); - kafkaConsumerProperties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES)); - kafkaConsumerProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES)); - kafkaConsumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); - kafkaConsumerProperties.putAll(conf.getAdditionalProperties()); - - return new KafkaConsumer<>(kafkaConsumerProperties, new StringDeserializer(), new MirroredSolrRequestSerializer()); + } + + /** + * Create and init the producer using {@link this#conf} All producer configs are listed here + * https://kafka.apache.org/documentation/#producerconfigs + * + * @return + */ + private Producer<String, MirroredSolrRequest<?>> initProducer() { + // Initialize and return Kafka producer + Properties kafkaProducerProps = new Properties(); + + log.info("Starting CrossDC Producer {}", conf); + + kafkaProducerProps.put( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); + + kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all"); + String retries = conf.get(KafkaCrossDcConf.NUM_RETRIES); + if (retries != null) { + kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retries)); } - - private void slowSubmitAction(Object request, long elapsedTimeMillis) { - log.warn("Enqueuing the request to Kafka took more than {} millis. 
enqueueElapsedTime={}", - conf.get(KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS), - elapsedTimeMillis); + kafkaProducerProps.put( + ProducerConfig.RETRY_BACKOFF_MS_CONFIG, conf.getInt(KafkaCrossDcConf.RETRY_BACKOFF_MS)); + kafkaProducerProps.put( + ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, + conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS)); + kafkaProducerProps.put( + ProducerConfig.MAX_REQUEST_SIZE_CONFIG, + conf.getInt(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES)); + kafkaProducerProps.put( + ProducerConfig.BATCH_SIZE_CONFIG, conf.getInt(KafkaCrossDcConf.BATCH_SIZE_BYTES)); + kafkaProducerProps.put( + ProducerConfig.BUFFER_MEMORY_CONFIG, conf.getInt(KafkaCrossDcConf.BUFFER_MEMORY_BYTES)); + kafkaProducerProps.put( + ProducerConfig.LINGER_MS_CONFIG, conf.getInt(KafkaCrossDcConf.LINGER_MS)); + kafkaProducerProps.put( + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, + conf.getInt( + KafkaCrossDcConf + .REQUEST_TIMEOUT_MS)); // should be less than time that causes consumer to be kicked Review Comment: Yeah this can probably be made prettier too ########## solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java: ########## @@ -263,58 +305,66 @@ public void processDelete(final DeleteUpdateCommand cmd) throws IOException { throw new SolrException(SERVER_ERROR, "mirror submit failed", e); } } - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("processDelete doMirroring={} cmd={}", true, cmd); + } } - } } - private static void processDBQResults(SolrClient client, String collection, String uniqueField, - QueryResponse rsp) + private static void processDBQResults( + SolrClient client, String collection, String uniqueField, QueryResponse rsp) throws SolrServerException, IOException { SolrDocumentList results = rsp.getResults(); List<String> ids = new ArrayList<>(results.size()); - results.forEach(entries -> { - String id = entries.getFirstValue(uniqueField).toString(); - ids.add(id); - }); - if (ids.size() > 0) { + results.forEach( + entries -> { + String id = entries.getFirstValue(uniqueField).toString(); + ids.add(id); + }); + if (!ids.isEmpty()) { client.deleteById(collection, ids); } } boolean isLeader(SolrQueryRequest req, String id, String route, SolrInputDocument doc) { - CloudDescriptor cloudDesc = - req.getCore().getCoreDescriptor().getCloudDescriptor(); + CloudDescriptor cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor(); String collection = cloudDesc.getCollectionName(); ClusterState clusterState = req.getCore().getCoreContainer().getZkController().getClusterState(); DocCollection coll = clusterState.getCollection(collection); Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll); if (slice == null) { - // No slice found. Most strict routers will have already thrown an exception, so a null return is + // No slice found. Most strict routers will have already thrown an exception, so a null + // return is Review Comment: Make prettier ########## solr/modules/cross-dc/src/test-files/configs/cloud-minimal.zip: ########## Review Comment: Do we actually need this? 
########## solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java: ########## @@ -39,8 +39,9 @@ public class KafkaCrossDcConf extends CrossDcConf { public static final String DEFAULT_MAX_REQUEST_SIZE = "5242880"; public static final String DEFAULT_ENABLE_DATA_COMPRESSION = "none"; private static final String DEFAULT_INDEX_UNMIRRORABLE_DOCS = "false"; - public static final String DEFAULT_SLOW_SEND_THRESHOLD= "1000"; - public static final String DEFAULT_NUM_RETRIES = null; // by default, we control retries with DELIVERY_TIMEOUT_MS_DOC + public static final String DEFAULT_SLOW_SEND_THRESHOLD = "1000"; + public static final String DEFAULT_NUM_RETRIES = + null; // by default, we control retries with DELIVERY_TIMEOUT_MS_DOC Review Comment: Let's put the comment on a separate line ########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java: ########## @@ -29,345 +33,369 @@ import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; -import org.apache.solr.crossdc.common.ResubmitBackoffPolicy; import org.apache.solr.crossdc.common.CrossDcConstants; import org.apache.solr.crossdc.common.IQueueHandler; import org.apache.solr.crossdc.common.MirroredSolrRequest; +import org.apache.solr.crossdc.common.ResubmitBackoffPolicy; import org.apache.solr.crossdc.common.SolrExceptionUtil; -import org.apache.solr.crossdc.consumer.Consumer; +import org.apache.solr.crossdc.manager.consumer.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import java.lang.invoke.MethodHandles; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - /** - * Message processor implements all the logic to process a MirroredSolrRequest. - * It handles: - * 1. Sending the update request to Solr - * 2. Discarding or retrying failed requests - * 3. Flagging requests for resubmission by the underlying consumer implementation. + * Message processor implements all the logic to process a MirroredSolrRequest. It handles: 1. + * Sending the update request to Solr 2. Discarding or retrying failed requests 3. Flagging requests + * for resubmission by the underlying consumer implementation.
*/ -public class SolrMessageProcessor extends MessageProcessor implements IQueueHandler<MirroredSolrRequest> { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY); +public class SolrMessageProcessor extends MessageProcessor + implements IQueueHandler<MirroredSolrRequest<?>> { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - final CloudSolrClient client; + private final MetricRegistry metrics = + SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY); - private static final String VERSION_FIELD = "_version_"; + final CloudSolrClient client; - public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) { - super(resubmitBackoffPolicy); - this.client = client; - } + private static final String VERSION_FIELD = "_version_"; - @Override - public Result<MirroredSolrRequest> handleItem(MirroredSolrRequest mirroredSolrRequest) { - try (final MDC.MDCCloseable mdc = MDC.putCloseable("collection", getCollectionFromRequest(mirroredSolrRequest))) { - connectToSolrIfNeeded(); + public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) { + super(resubmitBackoffPolicy); + this.client = client; + } - // preventCircularMirroring(mirroredSolrRequest); TODO: isn't this handled by the mirroring handler? + @Override + @SuppressWarnings("try") + public Result<MirroredSolrRequest<?>> handleItem(MirroredSolrRequest<?> mirroredSolrRequest) { + try (final MDC.MDCCloseable ignored = + MDC.putCloseable("collection", getCollectionFromRequest(mirroredSolrRequest))) { + connectToSolrIfNeeded(); - return processMirroredRequest(mirroredSolrRequest); - } - } + // preventCircularMirroring(mirroredSolrRequest); TODO: isn't this handled by the mirroring + // handler? - private Result<MirroredSolrRequest> processMirroredRequest(MirroredSolrRequest request) { - final Result<MirroredSolrRequest> result = handleSolrRequest(request); - // Back-off before returning - backoffIfNeeded(result, request.getType()); - return result; + return processMirroredRequest(mirroredSolrRequest); } + } - private Result<MirroredSolrRequest> handleSolrRequest(MirroredSolrRequest mirroredSolrRequest) { - - SolrRequest request = mirroredSolrRequest.getSolrRequest(); - final SolrParams requestParams = request.getParams(); + private Result<MirroredSolrRequest<?>> processMirroredRequest(MirroredSolrRequest<?> request) { + final Result<MirroredSolrRequest<?>> result = handleSolrRequest(request); + // Back-off before returning + backoffIfNeeded(result, request.getType()); + return result; + } - if (log.isDebugEnabled()) { - log.debug("handleSolrRequest start params={}", requestParams); - } + private Result<MirroredSolrRequest<?>> handleSolrRequest( + MirroredSolrRequest<?> mirroredSolrRequest) { - // TODO: isn't this handled by the mirroring handler? -// final String shouldMirror = requestParams.get("shouldMirror"); -// -// if ("false".equalsIgnoreCase(shouldMirror)) { -// log.warn("Skipping mirrored request because shouldMirror is set to false. request={}", requestParams); Review Comment: Should we just get rid of this commented block? 
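(Re: "Let's put the comment on a separate line" on DEFAULT_NUM_RETRIES above.) Moved onto its own line, the comment no longer forces the awkward wrap — a sketch only, behavior unchanged:

```java
// By default, we control retries with DELIVERY_TIMEOUT_MS_DOC.
public static final String DEFAULT_NUM_RETRIES = null;
```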
########## solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java: ########## @@ -114,12 +115,18 @@ public class KafkaCrossDcConf extends CrossDcConf { public static final String FETCH_MAX_BYTES = "solr.crossdc.fetchMaxBytes"; - // The maximum delay between invocations of poll() when using consumer group management. This places - // an upper bound on the amount of time that the consumer can be idle before fetching more records. - // If poll() is not called before expiration of this timeout, then the consumer is considered failed - // and the group will rebalance in order to reassign the partitions to another member. For consumers - // using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not be - // immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be + // The maximum delay between invocations of poll() when using consumer group management. This + // places + // an upper bound on the amount of time that the consumer can be idle before fetching more + // records. + // If poll() is not called before expiration of this timeout, then the consumer is considered + // failed + // and the group will rebalance in order to reassign the partitions to another member. For + // consumers + // using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not + // be + // immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will + // be Review Comment: Annoying, but should be easy to fix. ########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java: ########## @@ -29,345 +33,369 @@ import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; -import org.apache.solr.crossdc.common.ResubmitBackoffPolicy; import org.apache.solr.crossdc.common.CrossDcConstants; import org.apache.solr.crossdc.common.IQueueHandler; import org.apache.solr.crossdc.common.MirroredSolrRequest; +import org.apache.solr.crossdc.common.ResubmitBackoffPolicy; import org.apache.solr.crossdc.common.SolrExceptionUtil; -import org.apache.solr.crossdc.consumer.Consumer; +import org.apache.solr.crossdc.manager.consumer.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import java.lang.invoke.MethodHandles; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - /** - * Message processor implements all the logic to process a MirroredSolrRequest. - * It handles: - * 1. Sending the update request to Solr - * 2. Discarding or retrying failed requests - * 3. Flagging requests for resubmission by the underlying consumer implementation. + * Message processor implements all the logic to process a MirroredSolrRequest. It handles: 1. + * Sending the update request to Solr 2. Discarding or retrying failed requests 3. Flagging requests + * for resubmission by the underlying consumer implementation. 
*/ -public class SolrMessageProcessor extends MessageProcessor implements IQueueHandler<MirroredSolrRequest> { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY); +public class SolrMessageProcessor extends MessageProcessor + implements IQueueHandler<MirroredSolrRequest<?>> { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - final CloudSolrClient client; + private final MetricRegistry metrics = + SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY); - private static final String VERSION_FIELD = "_version_"; + final CloudSolrClient client; - public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) { - super(resubmitBackoffPolicy); - this.client = client; - } + private static final String VERSION_FIELD = "_version_"; - @Override - public Result<MirroredSolrRequest> handleItem(MirroredSolrRequest mirroredSolrRequest) { - try (final MDC.MDCCloseable mdc = MDC.putCloseable("collection", getCollectionFromRequest(mirroredSolrRequest))) { - connectToSolrIfNeeded(); + public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) { + super(resubmitBackoffPolicy); + this.client = client; + } - // preventCircularMirroring(mirroredSolrRequest); TODO: isn't this handled by the mirroring handler? + @Override + @SuppressWarnings("try") + public Result<MirroredSolrRequest<?>> handleItem(MirroredSolrRequest<?> mirroredSolrRequest) { + try (final MDC.MDCCloseable ignored = + MDC.putCloseable("collection", getCollectionFromRequest(mirroredSolrRequest))) { + connectToSolrIfNeeded(); - return processMirroredRequest(mirroredSolrRequest); - } - } + // preventCircularMirroring(mirroredSolrRequest); TODO: isn't this handled by the mirroring Review Comment: Move TODO above ########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java: ########## @@ -70,95 +89,107 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer { private final ThreadPoolExecutor executor; - private final ExecutorService offsetCheckExecutor = Executors.newCachedThreadPool(r -> { - Thread t = new Thread(r); - t.setName("offset-check-thread"); - return t; - }); - private PartitionManager partitionManager; - - private BlockingQueue<Runnable> queue = new BlockingQueue<>(10); - + private final ExecutorService offsetCheckExecutor = + ExecutorUtil.newMDCAwareCachedThreadPool( + r -> { Review Comment: This can probably be made more succinct. 
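For example, since the Thread constructor already takes a name, the factory collapses to a one-line lambda — an untested sketch against the ExecutorUtil call used in the diff (the KafkaCrossDcConsumerWorker factory further down can be shortened the same way):

```java
private final ExecutorService offsetCheckExecutor =
    ExecutorUtil.newMDCAwareCachedThreadPool(r -> new Thread(r, "offset-check-thread"));
```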
########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java: ########## @@ -70,95 +89,107 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer { private final ThreadPoolExecutor executor; - private final ExecutorService offsetCheckExecutor = Executors.newCachedThreadPool(r -> { - Thread t = new Thread(r); - t.setName("offset-check-thread"); - return t; - }); - private PartitionManager partitionManager; - - private BlockingQueue<Runnable> queue = new BlockingQueue<>(10); - + private final ExecutorService offsetCheckExecutor = + ExecutorUtil.newMDCAwareCachedThreadPool( + r -> { + Thread t = new Thread(r, "offset-check-thread"); + return t; + }); + private final PartitionManager partitionManager; + private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10); /** - * @param conf The Kafka consumer configuration + * @param conf The Kafka consumer configuration * @param startLatch */ public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) { this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(","); this.maxAttempts = conf.getInt(KafkaCrossDcConf.MAX_ATTEMPTS); - this.collapseUpdates = CrossDcConf.CollapseUpdates.getOrDefault(conf.get(CrossDcConf.COLLAPSE_UPDATES), CrossDcConf.CollapseUpdates.PARTIAL); + this.collapseUpdates = + CrossDcConf.CollapseUpdates.getOrDefault( + conf.get(CrossDcConf.COLLAPSE_UPDATES), CrossDcConf.CollapseUpdates.PARTIAL); this.maxCollapseRecords = conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS); this.startLatch = startLatch; final Properties kafkaConsumerProps = new Properties(); - kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); + kafkaConsumerProps.put( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, conf.get(KafkaCrossDcConf.GROUP_ID)); - kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS)); + kafkaConsumerProps.put( + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS)); - kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS)); + kafkaConsumerProps.put( + ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, + conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS)); - kafkaConsumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS)); + kafkaConsumerProps.put( + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS)); kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES)); - kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS)); + kafkaConsumerProps.put( + ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES)); + kafkaConsumerProps.put( + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS)); - kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES)); - kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES)); + kafkaConsumerProps.put( + 
ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES)); + kafkaConsumerProps.put( + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, + conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES)); - kafkaConsumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); + kafkaConsumerProps.put( + ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps); kafkaConsumerProps.putAll(conf.getAdditionalProperties()); int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS); - executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("KafkaCrossDcConsumerWorker"); - return t; - } - }); + executor = + new ExecutorUtil.MDCAwareThreadPoolExecutor( + threads, + threads, + 0L, + TimeUnit.MILLISECONDS, + queue, + r -> { Review Comment: This too ########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java: ########## @@ -70,95 +89,107 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer { private final ThreadPoolExecutor executor; - private final ExecutorService offsetCheckExecutor = Executors.newCachedThreadPool(r -> { - Thread t = new Thread(r); - t.setName("offset-check-thread"); - return t; - }); - private PartitionManager partitionManager; - - private BlockingQueue<Runnable> queue = new BlockingQueue<>(10); - + private final ExecutorService offsetCheckExecutor = + ExecutorUtil.newMDCAwareCachedThreadPool( + r -> { + Thread t = new Thread(r, "offset-check-thread"); + return t; + }); + private final PartitionManager partitionManager; + private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10); /** - * @param conf The Kafka consumer configuration + * @param conf The Kafka consumer configuration * @param startLatch */ public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) { this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(","); this.maxAttempts = conf.getInt(KafkaCrossDcConf.MAX_ATTEMPTS); - this.collapseUpdates = CrossDcConf.CollapseUpdates.getOrDefault(conf.get(CrossDcConf.COLLAPSE_UPDATES), CrossDcConf.CollapseUpdates.PARTIAL); + this.collapseUpdates = + CrossDcConf.CollapseUpdates.getOrDefault( + conf.get(CrossDcConf.COLLAPSE_UPDATES), CrossDcConf.CollapseUpdates.PARTIAL); this.maxCollapseRecords = conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS); this.startLatch = startLatch; final Properties kafkaConsumerProps = new Properties(); - kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); + kafkaConsumerProps.put( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, conf.get(KafkaCrossDcConf.GROUP_ID)); - kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS)); + kafkaConsumerProps.put( + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS)); - kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS)); + kafkaConsumerProps.put( + ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, + conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS)); - 
kafkaConsumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS)); + kafkaConsumerProps.put( + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS)); kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES)); - kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS)); + kafkaConsumerProps.put( + ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES)); + kafkaConsumerProps.put( + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS)); - kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES)); - kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES)); + kafkaConsumerProps.put( + ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES)); + kafkaConsumerProps.put( + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, + conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES)); - kafkaConsumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); + kafkaConsumerProps.put( + ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps); kafkaConsumerProps.putAll(conf.getAdditionalProperties()); int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS); - executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName("KafkaCrossDcConsumerWorker"); - return t; - } - }); + executor = + new ExecutorUtil.MDCAwareThreadPoolExecutor( + threads, + threads, + 0L, + TimeUnit.MILLISECONDS, + queue, + r -> { + Thread t = new Thread(r, "KafkaCrossDcConsumerWorker"); + return t; + }); executor.prestartAllCoreThreads(); solrClient = createSolrClient(conf); messageProcessor = createSolrMessageProcessor(); - - log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProps); kafkaConsumer = createKafkaConsumer(kafkaConsumerProps); partitionManager = new PartitionManager(kafkaConsumer); // Create producer for resubmitting failed requests log.info("Creating Kafka resubmit producer"); this.kafkaMirroringSink = createKafkaMirroringSink(conf); log.info("Created Kafka resubmit producer"); - } protected SolrMessageProcessor createSolrMessageProcessor() { return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L); } - public KafkaConsumer<String,MirroredSolrRequest> createKafkaConsumer(Properties properties) { - return new KafkaConsumer<>(properties, new StringDeserializer(), new MirroredSolrRequestSerializer()); + public KafkaConsumer<String, MirroredSolrRequest<?>> createKafkaConsumer(Properties properties) { + return new KafkaConsumer<>( + properties, new StringDeserializer(), new MirroredSolrRequestSerializer()); } /** - * This is where the magic happens. - * 1. Polls and gets the packets from the queue - * 2. Extract the MirroredSolrRequest objects - * 3. Send the request to the MirroredSolrRequestHandler that has the processing, retry, error handling logic. 
+ * This is where the magic happens. 1. Polls and gets the packets from the queue 2. Extract the Review Comment: Make prettier ########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java: ########## @@ -253,18 +290,20 @@ boolean pollAndProcessRequests() { } // it's an update but with different params Review Comment: Maybe make this less confusing? (in terms of spacing)
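(Re: the two "Make prettier" comments on the collapsed Javadoc step lists, here and in SolrMessageProcessor.) Assuming this module is formatted with google-java-format/spotless, wrapping the steps in an HTML <ol> should keep the formatter from reflowing them into one paragraph — a sketch for the consumer Javadoc:

```java
/**
 * This is where the magic happens.
 *
 * <ol>
 *   <li>Polls and gets the packets from the queue
 *   <li>Extracts the MirroredSolrRequest objects
 *   <li>Sends the request to the MirroredSolrRequestHandler that has the processing, retry, and
 *       error handling logic
 * </ol>
 */
```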