HoustonPutman commented on code in PR #2541:
URL: https://github.com/apache/solr/pull/2541#discussion_r1657812390


##########
solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java:
##########
@@ -46,21 +45,21 @@ public static void ensureWorkingMockito() {
 
   @BeforeClass
   public static void beforeSimpleSolrIntegrationTest() throws Exception {
-
+    System.setProperty("solr.crossdc.bootstrapServers", "doesnotmatter:9092");
+    System.setProperty("solr.crossdc.topicName", "doesnotmatter");
     cluster1 =
         configureCluster(2)
-            .addConfig("conf", getFile("configs/cloud-minimal/conf").toPath())
+            //           .addConfig("conf", 
getFile("configs/cloud-minimal/conf").toPath())

Review Comment:
   why is this commented out?



##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java:
##########
@@ -29,345 +33,369 @@
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
 import org.apache.solr.crossdc.common.CrossDcConstants;
 import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
 import org.apache.solr.crossdc.common.SolrExceptionUtil;
-import org.apache.solr.crossdc.consumer.Consumer;
+import org.apache.solr.crossdc.manager.consumer.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 /**
- * Message processor implements all the logic to process a MirroredSolrRequest.
- * It handles:
- *  1. Sending the update request to Solr
- *  2. Discarding or retrying failed requests
- *  3. Flagging requests for resubmission by the underlying consumer 
implementation.
+ * Message processor implements all the logic to process a 
MirroredSolrRequest. It handles: 1.

Review Comment:
   We can probably fix this by adding line breaks and stuff



##########
solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java:
##########
@@ -248,8 +288,10 @@ public void processDelete(final DeleteUpdateCommand cmd) 
throws IOException {
         if (log.isDebugEnabled())
           log.debug("processDelete doMirroring={} isLeader={} cmd={}", true, 
isLeader, cmd);
       } else {
-        // DBQs are sent to each shard leader, so we mirror from the original 
node to only mirror once
-        // In general there's no way to guarantee that these run identically 
on the mirror since there are no
+        // DBQs are sent to each shard leader, so we mirror from the original 
node to only mirror
+        // once
+        // In general there's no way to guarantee that these run identically 
on the mirror since
+        // there are no

Review Comment:
   Make this a little prettier



##########
solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java:
##########
@@ -28,178 +37,200 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS;
-
 public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
 
-    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-    private final KafkaCrossDcConf conf;
-    private final Producer<String, MirroredSolrRequest> producer;
-    private final KafkaConsumer<String,MirroredSolrRequest> consumer;
-    private final String mainTopic;
-    private final String dlqTopic;
-
-    public KafkaMirroringSink(final KafkaCrossDcConf conf) {
-        // Create Kafka Mirroring Sink
-        this.conf = conf;
-        this.producer = initProducer();
-        this.consumer = initConsumer();
-        this.mainTopic = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",")[0];
-        this.dlqTopic = conf.get(KafkaCrossDcConf.DLQ_TOPIC_NAME);
-
-        checkTopicsAvailability();
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final KafkaCrossDcConf conf;
+  private final Producer<String, MirroredSolrRequest<?>> producer;
+  private final KafkaConsumer<String, MirroredSolrRequest<?>> consumer;
+  private final String mainTopic;
+  private final String dlqTopic;
+
+  public KafkaMirroringSink(final KafkaCrossDcConf conf) {
+    // Create Kafka Mirroring Sink
+    this.conf = conf;
+    this.producer = initProducer();
+    this.consumer = initConsumer();
+    this.mainTopic = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",")[0];
+    this.dlqTopic = conf.get(KafkaCrossDcConf.DLQ_TOPIC_NAME);
+
+    checkTopicsAvailability();
+  }
+
+  @Override
+  public void submit(MirroredSolrRequest<?> request) throws MirroringException 
{
+    this.submitRequest(request, mainTopic);
+  }
+
+  @Override
+  public void submitToDlq(MirroredSolrRequest<?> request) throws 
MirroringException {
+    if (dlqTopic != null) {
+      this.submitRequest(request, dlqTopic);
+    } else {
+      if (log.isInfoEnabled()) {
+        log.info("- no DLQ, dropping failed {}", request);
+      }
     }
+  }
 
-    @Override
-    public void submit(MirroredSolrRequest request) throws MirroringException {
-        this.submitRequest(request, mainTopic);
-    }
+  private void checkTopicsAvailability() {
+    final Map<String, List<PartitionInfo>> topics = this.consumer.listTopics();
 
-    @Override
-    public void submitToDlq(MirroredSolrRequest request) throws 
MirroringException {
-        if (dlqTopic != null) {
-            this.submitRequest(request, dlqTopic);
-        } else {
-            if (log.isInfoEnabled()) {
-                log.info("- no DLQ, dropping failed {}", request);
-            }
-        }
+    if (mainTopic != null && !topics.containsKey(mainTopic)) {
+      throw new RuntimeException("Main topic " + mainTopic + " is not 
available");
     }
-
-    private void checkTopicsAvailability() {
-        final Map<String, List<PartitionInfo>> topics = 
this.consumer.listTopics();
-
-        if (mainTopic != null && !topics.containsKey(mainTopic)) {
-            throw new RuntimeException("Main topic " + mainTopic + " is not 
available");
-        }
-        if (dlqTopic != null && !topics.containsKey(dlqTopic)) {
-            throw new RuntimeException("DLQ topic " + dlqTopic + " is not 
available");
-        }
+    if (dlqTopic != null && !topics.containsKey(dlqTopic)) {
+      throw new RuntimeException("DLQ topic " + dlqTopic + " is not 
available");
     }
+  }
 
-    private void submitRequest(MirroredSolrRequest request, String topicName) 
throws MirroringException {
-        if (log.isDebugEnabled()) {
-            log.debug("About to submit a MirroredSolrRequest");
-        }
-
-        final long enqueueStartNanos = System.nanoTime();
-
-        // Create Producer record
-        try {
+  private void submitRequest(MirroredSolrRequest<?> request, String topicName)
+      throws MirroringException {
+    if (log.isDebugEnabled()) {
+      log.debug("About to submit a MirroredSolrRequest");
+    }
 
-            producer.send(new ProducerRecord<>(topicName, request), (metadata, 
exception) -> {
-                if (exception != null) {
-                    log.error("Failed adding update to CrossDC queue! 
request=" + request.getSolrRequest(), exception);
-                }
-            });
+    final long enqueueStartNanos = System.nanoTime();
 
-            long lastSuccessfulEnqueueNanos = System.nanoTime();
-            // Record time since last successful enqueue as 0
-            long elapsedTimeMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
-            // Update elapsed time
+    // Create Producer record
+    try {
 
-            if (elapsedTimeMillis > conf.getInt(SLOW_SUBMIT_THRESHOLD_MS)) {
-                slowSubmitAction(request, elapsedTimeMillis);
+      producer.send(
+          new ProducerRecord<>(topicName, request),
+          (metadata, exception) -> {
+            if (exception != null) {
+              log.error(
+                  "Failed adding update to CrossDC queue! request=" + 
request.getSolrRequest(),
+                  exception);
             }
-        } catch (Exception e) {
-            // We are intentionally catching all exceptions, the expected 
exception form this function is {@link MirroringException}
-            String message = "Unable to enqueue request " + request + ", 
configured retries is" + conf.getInt(KafkaCrossDcConf.NUM_RETRIES) +
-                    " and configured max delivery timeout in ms is " + 
conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS);
-            log.error(message, e);
-            throw new MirroringException(message, e);
-        }
+          });
+
+      long lastSuccessfulEnqueueNanos = System.nanoTime();
+      // Record time since last successful enqueue as 0
+      long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() 
- enqueueStartNanos);
+      // Update elapsed time
+
+      if (elapsedTimeMillis > conf.getInt(SLOW_SUBMIT_THRESHOLD_MS)) {
+        slowSubmitAction(request, elapsedTimeMillis);
+      }
+    } catch (Exception e) {
+      // We are intentionally catching all exceptions, the expected exception 
form this function is
+      // {@link MirroringException}
+      String message =
+          "Unable to enqueue request "
+              + request
+              + ", configured retries is"
+              + conf.getInt(KafkaCrossDcConf.NUM_RETRIES)
+              + " and configured max delivery timeout in ms is "
+              + conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS);
+      log.error(message, e);
+      throw new MirroringException(message, e);
     }
-
-    /**
-     * Create and init the producer using {@link this#conf}
-     * All producer configs are listed here
-     * https://kafka.apache.org/documentation/#producerconfigs
-     *
-     * @return
-     */
-    private Producer<String, MirroredSolrRequest> initProducer() {
-        // Initialize and return Kafka producer
-        Properties kafkaProducerProps = new Properties();
-
-        log.info("Starting CrossDC Producer {}", conf);
-
-        kafkaProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
-
-        kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        String retries = conf.get(KafkaCrossDcConf.NUM_RETRIES);
-        if (retries != null) {
-            kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, 
Integer.parseInt(retries));
-        }
-        kafkaProducerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.RETRY_BACKOFF_MS));
-        kafkaProducerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS));
-        kafkaProducerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES));
-        kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 
conf.getInt(KafkaCrossDcConf.BATCH_SIZE_BYTES));
-        kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 
conf.getInt(KafkaCrossDcConf.BUFFER_MEMORY_BYTES));
-        kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.LINGER_MS));
-        kafkaProducerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); // should be less than time 
that causes consumer to be kicked out
-        kafkaProducerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, 
conf.get(KafkaCrossDcConf.ENABLE_DATA_COMPRESSION));
-
-        kafkaProducerProps.put("key.serializer", 
StringSerializer.class.getName());
-        kafkaProducerProps.put("value.serializer", 
MirroredSolrRequestSerializer.class.getName());
-
-        KafkaCrossDcConf.addSecurityProps(conf, kafkaProducerProps);
-
-        kafkaProducerProps.putAll(conf.getAdditionalProperties());
-
-        if (log.isDebugEnabled()) {
-            log.debug("Kafka Producer props={}", kafkaProducerProps);
-        }
-
-        ClassLoader originalContextClassLoader = 
Thread.currentThread().getContextClassLoader();
-        Thread.currentThread().setContextClassLoader(null);
-        Producer<String, MirroredSolrRequest> producer;
-        try {
-            producer = new KafkaProducer<>(kafkaProducerProps);
-        } finally {
-            
Thread.currentThread().setContextClassLoader(originalContextClassLoader);
-        }
-        return producer;
-    }
-
-    private KafkaConsumer<String, MirroredSolrRequest> initConsumer() {
-        final Properties kafkaConsumerProperties = new Properties();
-
-        kafkaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
-        kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, 
conf.get(KafkaCrossDcConf.GROUP_ID));
-        kafkaConsumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
-        
kafkaConsumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
-        kafkaConsumerProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
-        kafkaConsumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        kafkaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
false);
-        kafkaConsumerProperties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
-        kafkaConsumerProperties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
-        kafkaConsumerProperties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
-        
kafkaConsumerProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
-        kafkaConsumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));
-        kafkaConsumerProperties.putAll(conf.getAdditionalProperties());
-
-        return new KafkaConsumer<>(kafkaConsumerProperties, new 
StringDeserializer(), new MirroredSolrRequestSerializer());
+  }
+
+  /**
+   * Create and init the producer using {@link this#conf} All producer configs 
are listed here
+   * https://kafka.apache.org/documentation/#producerconfigs
+   *
+   * @return
+   */
+  private Producer<String, MirroredSolrRequest<?>> initProducer() {
+    // Initialize and return Kafka producer
+    Properties kafkaProducerProps = new Properties();
+
+    log.info("Starting CrossDC Producer {}", conf);
+
+    kafkaProducerProps.put(
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+
+    kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+    String retries = conf.get(KafkaCrossDcConf.NUM_RETRIES);
+    if (retries != null) {
+      kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, 
Integer.parseInt(retries));
     }
-
-    private void slowSubmitAction(Object request, long elapsedTimeMillis) {
-        log.warn("Enqueuing the request to Kafka took more than {} millis. 
enqueueElapsedTime={}",
-                conf.get(KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS),
-                elapsedTimeMillis);
+    kafkaProducerProps.put(
+        ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.RETRY_BACKOFF_MS));
+    kafkaProducerProps.put(
+        ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
+        conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS));
+    kafkaProducerProps.put(
+        ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
+        conf.getInt(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES));
+    kafkaProducerProps.put(
+        ProducerConfig.BATCH_SIZE_CONFIG, 
conf.getInt(KafkaCrossDcConf.BATCH_SIZE_BYTES));
+    kafkaProducerProps.put(
+        ProducerConfig.BUFFER_MEMORY_CONFIG, 
conf.getInt(KafkaCrossDcConf.BUFFER_MEMORY_BYTES));
+    kafkaProducerProps.put(
+        ProducerConfig.LINGER_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.LINGER_MS));
+    kafkaProducerProps.put(
+        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
+        conf.getInt(
+            KafkaCrossDcConf
+                .REQUEST_TIMEOUT_MS)); // should be less than time that causes 
consumer to be kicked

Review Comment:
   Yeah this can probably be made prettier too



##########
solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java:
##########
@@ -263,58 +305,66 @@ public void processDelete(final DeleteUpdateCommand cmd) 
throws IOException {
             throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
           }
         }
-        if (log.isDebugEnabled())
+        if (log.isDebugEnabled()) {
           log.debug("processDelete doMirroring={} cmd={}", true, cmd);
+        }
       }
-
     }
   }
 
-  private static void processDBQResults(SolrClient client, String collection, 
String uniqueField,
-      QueryResponse rsp)
+  private static void processDBQResults(
+      SolrClient client, String collection, String uniqueField, QueryResponse 
rsp)
       throws SolrServerException, IOException {
     SolrDocumentList results = rsp.getResults();
     List<String> ids = new ArrayList<>(results.size());
-    results.forEach(entries -> {
-      String id = entries.getFirstValue(uniqueField).toString();
-      ids.add(id);
-    });
-    if (ids.size() > 0) {
+    results.forEach(
+        entries -> {
+          String id = entries.getFirstValue(uniqueField).toString();
+          ids.add(id);
+        });
+    if (!ids.isEmpty()) {
       client.deleteById(collection, ids);
     }
   }
 
   boolean isLeader(SolrQueryRequest req, String id, String route, 
SolrInputDocument doc) {
-    CloudDescriptor cloudDesc =
-        req.getCore().getCoreDescriptor().getCloudDescriptor();
+    CloudDescriptor cloudDesc = 
req.getCore().getCoreDescriptor().getCloudDescriptor();
     String collection = cloudDesc.getCollectionName();
     ClusterState clusterState =
         req.getCore().getCoreContainer().getZkController().getClusterState();
     DocCollection coll = clusterState.getCollection(collection);
     Slice slice = coll.getRouter().getTargetSlice(id, doc, route, 
req.getParams(), coll);
 
     if (slice == null) {
-      // No slice found.  Most strict routers will have already thrown an 
exception, so a null return is
+      // No slice found.  Most strict routers will have already thrown an 
exception, so a null
+      // return is

Review Comment:
   Make prettier



##########
solr/modules/cross-dc/src/test-files/configs/cloud-minimal.zip:
##########


Review Comment:
   Do we actually need this?



##########
solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java:
##########
@@ -39,8 +39,9 @@ public class KafkaCrossDcConf extends CrossDcConf {
   public static final String DEFAULT_MAX_REQUEST_SIZE = "5242880";
   public static final String DEFAULT_ENABLE_DATA_COMPRESSION = "none";
   private static final String DEFAULT_INDEX_UNMIRRORABLE_DOCS = "false";
-  public static final String DEFAULT_SLOW_SEND_THRESHOLD= "1000";
-  public static final String DEFAULT_NUM_RETRIES = null; // by default, we 
control retries with DELIVERY_TIMEOUT_MS_DOC
+  public static final String DEFAULT_SLOW_SEND_THRESHOLD = "1000";
+  public static final String DEFAULT_NUM_RETRIES =
+      null; // by default, we control retries with DELIVERY_TIMEOUT_MS_DOC

Review Comment:
   Lets put the comment on a separate line



##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java:
##########
@@ -29,345 +33,369 @@
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
 import org.apache.solr.crossdc.common.CrossDcConstants;
 import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
 import org.apache.solr.crossdc.common.SolrExceptionUtil;
-import org.apache.solr.crossdc.consumer.Consumer;
+import org.apache.solr.crossdc.manager.consumer.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 /**
- * Message processor implements all the logic to process a MirroredSolrRequest.
- * It handles:
- *  1. Sending the update request to Solr
- *  2. Discarding or retrying failed requests
- *  3. Flagging requests for resubmission by the underlying consumer 
implementation.
+ * Message processor implements all the logic to process a 
MirroredSolrRequest. It handles: 1.
+ * Sending the update request to Solr 2. Discarding or retrying failed 
requests 3. Flagging requests
+ * for resubmission by the underlying consumer implementation.
  */
-public class SolrMessageProcessor extends MessageProcessor implements 
IQueueHandler<MirroredSolrRequest>  {
-    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-    private final MetricRegistry metrics = 
SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);
+public class SolrMessageProcessor extends MessageProcessor
+    implements IQueueHandler<MirroredSolrRequest<?>> {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-    final CloudSolrClient client;
+  private final MetricRegistry metrics =
+      SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);
 
-    private static final String VERSION_FIELD = "_version_";
+  final CloudSolrClient client;
 
-    public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy 
resubmitBackoffPolicy) {
-        super(resubmitBackoffPolicy);
-        this.client = client;
-    }
+  private static final String VERSION_FIELD = "_version_";
 
-    @Override
-    public Result<MirroredSolrRequest> handleItem(MirroredSolrRequest 
mirroredSolrRequest) {
-        try (final MDC.MDCCloseable mdc = MDC.putCloseable("collection", 
getCollectionFromRequest(mirroredSolrRequest))) {
-            connectToSolrIfNeeded();
+  public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy 
resubmitBackoffPolicy) {
+    super(resubmitBackoffPolicy);
+    this.client = client;
+  }
 
-            // preventCircularMirroring(mirroredSolrRequest); TODO: isn't this 
handled by the mirroring handler?
+  @Override
+  @SuppressWarnings("try")
+  public Result<MirroredSolrRequest<?>> handleItem(MirroredSolrRequest<?> 
mirroredSolrRequest) {
+    try (final MDC.MDCCloseable ignored =
+        MDC.putCloseable("collection", 
getCollectionFromRequest(mirroredSolrRequest))) {
+      connectToSolrIfNeeded();
 
-            return processMirroredRequest(mirroredSolrRequest);
-        }
-    }
+      // preventCircularMirroring(mirroredSolrRequest); TODO: isn't this 
handled by the mirroring
+      // handler?
 
-    private Result<MirroredSolrRequest> 
processMirroredRequest(MirroredSolrRequest request) {
-        final Result<MirroredSolrRequest> result = handleSolrRequest(request);
-        // Back-off before returning
-        backoffIfNeeded(result, request.getType());
-        return result;
+      return processMirroredRequest(mirroredSolrRequest);
     }
+  }
 
-    private Result<MirroredSolrRequest> handleSolrRequest(MirroredSolrRequest 
mirroredSolrRequest) {
-
-        SolrRequest request = mirroredSolrRequest.getSolrRequest();
-        final SolrParams requestParams = request.getParams();
+  private Result<MirroredSolrRequest<?>> 
processMirroredRequest(MirroredSolrRequest<?> request) {
+    final Result<MirroredSolrRequest<?>> result = handleSolrRequest(request);
+    // Back-off before returning
+    backoffIfNeeded(result, request.getType());
+    return result;
+  }
 
-        if (log.isDebugEnabled()) {
-            log.debug("handleSolrRequest start params={}", requestParams);
-        }
+  private Result<MirroredSolrRequest<?>> handleSolrRequest(
+      MirroredSolrRequest<?> mirroredSolrRequest) {
 
-        // TODO: isn't this handled by the mirroring handler?
-//        final String shouldMirror = requestParams.get("shouldMirror");
-//
-//        if ("false".equalsIgnoreCase(shouldMirror)) {
-//            log.warn("Skipping mirrored request because shouldMirror is set 
to false. request={}", requestParams);

Review Comment:
   Should we just get rid of this commented block?



##########
solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java:
##########
@@ -114,12 +115,18 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
   public static final String FETCH_MAX_BYTES = "solr.crossdc.fetchMaxBytes";
 
-  // The maximum delay between invocations of poll() when using consumer group 
management. This places
-  // an upper bound on the amount of time that the consumer can be idle before 
fetching more records.
-  // If poll() is not called before expiration of this timeout, then the 
consumer is considered failed
-  // and the group will rebalance in order to reassign the partitions to 
another member. For consumers
-  // using a non-null <code>group.instance.id</code> which reach this timeout, 
partitions will not be
-  // immediately reassigned. Instead, the consumer will stop sending 
heartbeats and partitions will be
+  // The maximum delay between invocations of poll() when using consumer group 
management. This
+  // places
+  // an upper bound on the amount of time that the consumer can be idle before 
fetching more
+  // records.
+  // If poll() is not called before expiration of this timeout, then the 
consumer is considered
+  // failed
+  // and the group will rebalance in order to reassign the partitions to 
another member. For
+  // consumers
+  // using a non-null <code>group.instance.id</code> which reach this timeout, 
partitions will not
+  // be
+  // immediately reassigned. Instead, the consumer will stop sending 
heartbeats and partitions will
+  // be

Review Comment:
   Annoying, but should be easy to fix.



##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java:
##########
@@ -29,345 +33,369 @@
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
 import org.apache.solr.crossdc.common.CrossDcConstants;
 import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
 import org.apache.solr.crossdc.common.SolrExceptionUtil;
-import org.apache.solr.crossdc.consumer.Consumer;
+import org.apache.solr.crossdc.manager.consumer.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 /**
- * Message processor implements all the logic to process a MirroredSolrRequest.
- * It handles:
- *  1. Sending the update request to Solr
- *  2. Discarding or retrying failed requests
- *  3. Flagging requests for resubmission by the underlying consumer 
implementation.
+ * Message processor implements all the logic to process a 
MirroredSolrRequest. It handles: 1.
+ * Sending the update request to Solr 2. Discarding or retrying failed 
requests 3. Flagging requests
+ * for resubmission by the underlying consumer implementation.
  */
-public class SolrMessageProcessor extends MessageProcessor implements 
IQueueHandler<MirroredSolrRequest>  {
-    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-    private final MetricRegistry metrics = 
SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);
+public class SolrMessageProcessor extends MessageProcessor
+    implements IQueueHandler<MirroredSolrRequest<?>> {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-    final CloudSolrClient client;
+  private final MetricRegistry metrics =
+      SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);
 
-    private static final String VERSION_FIELD = "_version_";
+  final CloudSolrClient client;
 
-    public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy 
resubmitBackoffPolicy) {
-        super(resubmitBackoffPolicy);
-        this.client = client;
-    }
+  private static final String VERSION_FIELD = "_version_";
 
-    @Override
-    public Result<MirroredSolrRequest> handleItem(MirroredSolrRequest 
mirroredSolrRequest) {
-        try (final MDC.MDCCloseable mdc = MDC.putCloseable("collection", 
getCollectionFromRequest(mirroredSolrRequest))) {
-            connectToSolrIfNeeded();
+  public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy 
resubmitBackoffPolicy) {
+    super(resubmitBackoffPolicy);
+    this.client = client;
+  }
 
-            // preventCircularMirroring(mirroredSolrRequest); TODO: isn't this 
handled by the mirroring handler?
+  @Override
+  @SuppressWarnings("try")
+  public Result<MirroredSolrRequest<?>> handleItem(MirroredSolrRequest<?> 
mirroredSolrRequest) {
+    try (final MDC.MDCCloseable ignored =
+        MDC.putCloseable("collection", 
getCollectionFromRequest(mirroredSolrRequest))) {
+      connectToSolrIfNeeded();
 
-            return processMirroredRequest(mirroredSolrRequest);
-        }
-    }
+      // preventCircularMirroring(mirroredSolrRequest); TODO: isn't this 
handled by the mirroring

Review Comment:
   Move TODO above



##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java:
##########
@@ -70,95 +89,107 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
   private final ThreadPoolExecutor executor;
 
-  private final ExecutorService offsetCheckExecutor = 
Executors.newCachedThreadPool(r -> {
-      Thread t = new Thread(r);
-      t.setName("offset-check-thread");
-      return t;
-  });
-  private PartitionManager partitionManager;
-
-  private BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
-
+  private final ExecutorService offsetCheckExecutor =
+      ExecutorUtil.newMDCAwareCachedThreadPool(
+          r -> {

Review Comment:
   This can probably be made more succinct.



##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java:
##########
@@ -70,95 +89,107 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
   private final ThreadPoolExecutor executor;
 
-  private final ExecutorService offsetCheckExecutor = 
Executors.newCachedThreadPool(r -> {
-      Thread t = new Thread(r);
-      t.setName("offset-check-thread");
-      return t;
-  });
-  private PartitionManager partitionManager;
-
-  private BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
-
+  private final ExecutorService offsetCheckExecutor =
+      ExecutorUtil.newMDCAwareCachedThreadPool(
+          r -> {
+            Thread t = new Thread(r, "offset-check-thread");
+            return t;
+          });
+  private final PartitionManager partitionManager;
 
+  private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
 
   /**
-   * @param conf       The Kafka consumer configuration
+   * @param conf The Kafka consumer configuration
    * @param startLatch
    */
   public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch 
startLatch) {
 
     this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",");
     this.maxAttempts = conf.getInt(KafkaCrossDcConf.MAX_ATTEMPTS);
-    this.collapseUpdates = 
CrossDcConf.CollapseUpdates.getOrDefault(conf.get(CrossDcConf.COLLAPSE_UPDATES),
 CrossDcConf.CollapseUpdates.PARTIAL);
+    this.collapseUpdates =
+        CrossDcConf.CollapseUpdates.getOrDefault(
+            conf.get(CrossDcConf.COLLAPSE_UPDATES), 
CrossDcConf.CollapseUpdates.PARTIAL);
     this.maxCollapseRecords = 
conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS);
     this.startLatch = startLatch;
     final Properties kafkaConsumerProps = new Properties();
 
-    kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+    kafkaConsumerProps.put(
+        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
 
     kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
conf.get(KafkaCrossDcConf.GROUP_ID));
 
-    kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
+    kafkaConsumerProps.put(
+        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
 
-    kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
+    kafkaConsumerProps.put(
+        ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
+        conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
 
-    kafkaConsumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
+    kafkaConsumerProps.put(
+        ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
     kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
 
     kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
-    kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
-    kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
+    kafkaConsumerProps.put(
+        ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
+    kafkaConsumerProps.put(
+        ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
 
-    kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
-    kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
+    kafkaConsumerProps.put(
+        ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
+    kafkaConsumerProps.put(
+        ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
+        conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
 
-    kafkaConsumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));
+    kafkaConsumerProps.put(
+        ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));
 
     KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps);
 
     kafkaConsumerProps.putAll(conf.getAdditionalProperties());
     int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS);
 
-    executor = new ThreadPoolExecutor(threads, threads, 0L, 
TimeUnit.MILLISECONDS, queue, new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread t = new Thread(r);
-                t.setName("KafkaCrossDcConsumerWorker");
-                return t;
-            }
-    });
+    executor =
+        new ExecutorUtil.MDCAwareThreadPoolExecutor(
+            threads,
+            threads,
+            0L,
+            TimeUnit.MILLISECONDS,
+            queue,
+            r -> {

Review Comment:
   This too



##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java:
##########
@@ -70,95 +89,107 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
   private final ThreadPoolExecutor executor;
 
-  private final ExecutorService offsetCheckExecutor = 
Executors.newCachedThreadPool(r -> {
-      Thread t = new Thread(r);
-      t.setName("offset-check-thread");
-      return t;
-  });
-  private PartitionManager partitionManager;
-
-  private BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
-
+  private final ExecutorService offsetCheckExecutor =
+      ExecutorUtil.newMDCAwareCachedThreadPool(
+          r -> {
+            Thread t = new Thread(r, "offset-check-thread");
+            return t;
+          });
+  private final PartitionManager partitionManager;
 
+  private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
 
   /**
-   * @param conf       The Kafka consumer configuration
+   * @param conf The Kafka consumer configuration
    * @param startLatch
    */
   public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch 
startLatch) {
 
     this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",");
     this.maxAttempts = conf.getInt(KafkaCrossDcConf.MAX_ATTEMPTS);
-    this.collapseUpdates = 
CrossDcConf.CollapseUpdates.getOrDefault(conf.get(CrossDcConf.COLLAPSE_UPDATES),
 CrossDcConf.CollapseUpdates.PARTIAL);
+    this.collapseUpdates =
+        CrossDcConf.CollapseUpdates.getOrDefault(
+            conf.get(CrossDcConf.COLLAPSE_UPDATES), 
CrossDcConf.CollapseUpdates.PARTIAL);
     this.maxCollapseRecords = 
conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS);
     this.startLatch = startLatch;
     final Properties kafkaConsumerProps = new Properties();
 
-    kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+    kafkaConsumerProps.put(
+        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
 
     kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
conf.get(KafkaCrossDcConf.GROUP_ID));
 
-    kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
+    kafkaConsumerProps.put(
+        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
 
-    kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
+    kafkaConsumerProps.put(
+        ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
+        conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
 
-    kafkaConsumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
+    kafkaConsumerProps.put(
+        ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
     kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
 
     kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
-    kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
-    kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
+    kafkaConsumerProps.put(
+        ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
+    kafkaConsumerProps.put(
+        ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
 
-    kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
-    kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
+    kafkaConsumerProps.put(
+        ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
+    kafkaConsumerProps.put(
+        ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
+        conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
 
-    kafkaConsumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));
+    kafkaConsumerProps.put(
+        ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));
 
     KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps);
 
     kafkaConsumerProps.putAll(conf.getAdditionalProperties());
     int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS);
 
-    executor = new ThreadPoolExecutor(threads, threads, 0L, 
TimeUnit.MILLISECONDS, queue, new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread t = new Thread(r);
-                t.setName("KafkaCrossDcConsumerWorker");
-                return t;
-            }
-    });
+    executor =
+        new ExecutorUtil.MDCAwareThreadPoolExecutor(
+            threads,
+            threads,
+            0L,
+            TimeUnit.MILLISECONDS,
+            queue,
+            r -> {
+              Thread t = new Thread(r, "KafkaCrossDcConsumerWorker");
+              return t;
+            });
     executor.prestartAllCoreThreads();
 
     solrClient = createSolrClient(conf);
 
     messageProcessor = createSolrMessageProcessor();
 
-
-
     log.info("Creating Kafka consumer with configuration {}", 
kafkaConsumerProps);
     kafkaConsumer = createKafkaConsumer(kafkaConsumerProps);
     partitionManager = new PartitionManager(kafkaConsumer);
     // Create producer for resubmitting failed requests
     log.info("Creating Kafka resubmit producer");
     this.kafkaMirroringSink = createKafkaMirroringSink(conf);
     log.info("Created Kafka resubmit producer");
-
   }
 
   protected SolrMessageProcessor createSolrMessageProcessor() {
     return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L);
   }
 
-  public KafkaConsumer<String,MirroredSolrRequest> 
createKafkaConsumer(Properties properties) {
-    return new KafkaConsumer<>(properties, new StringDeserializer(), new 
MirroredSolrRequestSerializer());
+  public KafkaConsumer<String, MirroredSolrRequest<?>> 
createKafkaConsumer(Properties properties) {
+    return new KafkaConsumer<>(
+        properties, new StringDeserializer(), new 
MirroredSolrRequestSerializer());
   }
 
   /**
-   * This is where the magic happens.
-   * 1. Polls and gets the packets from the queue
-   * 2. Extract the MirroredSolrRequest objects
-   * 3. Send the request to the MirroredSolrRequestHandler that has the 
processing, retry, error handling logic.
+   * This is where the magic happens. 1. Polls and gets the packets from the 
queue 2. Extract the

Review Comment:
   Make prettier



##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java:
##########
@@ -253,18 +290,20 @@ boolean pollAndProcessRequests() {
             }
 
             // it's an update but with different params

Review Comment:
   Maybe make this less confusing? (in terms of spacing)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org
For additional commands, e-mail: issues-h...@solr.apache.org


Reply via email to