C0urante commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1047260096
##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -37,98 +34,65 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.Map;
/**
* <p>
- * Command line utility that runs Kafka Connect as a standalone process. In
this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a
single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs
persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In
this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a
single process. This is useful for development
+ * and testing Kafka Connect on a local machine.
* </p>
*/
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
private static final Logger log =
LoggerFactory.getLogger(ConnectStandalone.class);
- public static void main(String[] args) {
-
- if (args.length < 1 || Arrays.asList(args).contains("--help")) {
- log.info("Usage: ConnectStandalone worker.properties
[connector1.properties connector2.properties ...]");
- Exit.exit(1);
- }
-
- try {
- Time time = Time.SYSTEM;
- log.info("Kafka Connect standalone worker initializing ...");
- long initStart = time.hiResClockMs();
- WorkerInfo initInfo = new WorkerInfo();
- initInfo.logAll();
-
- String workerPropsFile = args[0];
- Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
- Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) :
Collections.emptyMap();
-
- log.info("Scanning for plugin classes. This might take a moment
...");
- Plugins plugins = new Plugins(workerProps);
- plugins.compareAndSwapWithDelegatingLoader();
- StandaloneConfig config = new StandaloneConfig(workerProps);
-
- String kafkaClusterId = config.kafkaClusterId();
- log.debug("Kafka cluster ID: {}", kafkaClusterId);
+ protected ConnectStandalone(String... args) {
+ super(args);
+ }
- // Do not initialize a RestClient because the ConnectorsResource
will not use it in standalone mode.
- RestServer rest = new RestServer(config, null);
- rest.initializeServer();
+ @Override
+ protected String usage() {
+ return "ConnectStandalone worker.properties [connector1.properties
connector2.properties ...]";
+ }
- URI advertisedUrl = rest.advertisedUrl();
- String workerId = advertisedUrl.getHost() + ":" +
advertisedUrl.getPort();
+ @Override
+ protected void processExtraArgs(Herder herder, String[] extraArgs) throws
Throwable {
+ for (final String connectorPropsFile : extraArgs) {
+ Map<String, String> connectorProps =
Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+ FutureCallback<Herder.Created<ConnectorInfo>> cb = new
FutureCallback<>((error, info) -> {
+ if (error != null)
+ log.error("Failed to create connector for {}",
connectorPropsFile);
+ else
+ log.info("Created connector {}", info.result().name());
+ });
+ herder.putConnectorConfig(
+ connectorProps.get(ConnectorConfig.NAME_CONFIG),
+ connectorProps, false, cb);
+ cb.get();
Review Comment:
Can we keep the [existing error
message](https://github.com/apache/kafka/blob/526af63cfe57eccb93d9cb79542e87c16b669a15/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java#L121)
if any of these fail?
Can probably be accomplished by wrapping the body of this method in a try
block and logging the error message, stopping the `Connect` instance, and
exiting in the catch body.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -37,98 +34,65 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.Map;
/**
* <p>
- * Command line utility that runs Kafka Connect as a standalone process. In
this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a
single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs
persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In
this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a
single process. This is useful for for development
+ * and testing Kafka Connect on a local machine.
Review Comment:
I'd prefer to keep the existing wording of "ad hoc, small, or experimental
jobs". I know it's generally recommended to run Connect in distributed mode but
standalone mode is still a legitimate part of the project and we make that
clear in [our
docs](https://kafka.apache.org/33/documentation.html#connect_running):
> In standalone mode all work is performed in a single process. This
configuration is simpler to setup and get started with and may be useful in
situations where only one worker makes sense (e.g. collecting log files), but
it does not benefit from some of the features of Kafka Connect such as fault
tolerance.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -21,41 +21,51 @@
import org.apache.kafka.common.utils.Utils;
import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
/**
* <p>
- * Command line utility that runs Kafka Connect as a standalone process. In
this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a
single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs
persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In
this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a
single process. This is useful for development
+ * and testing Kafka Connect on a local machine.
* </p>
*/
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
private static final Logger log =
LoggerFactory.getLogger(ConnectStandalone.class);
+ @Override
+ protected Herder createHerder(StandaloneConfig config, String workerId,
Plugins plugins,
+ ConnectorClientConfigOverridePolicy
connectorClientConfigOverridePolicy,
+ RestServer restServer, RestClient
restClient) {
+
+ OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+ offsetBackingStore.configure(config);
+
+ Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config,
offsetBackingStore,
+ connectorClientConfigOverridePolicy);
+
+ return new StandaloneHerder(worker, config.kafkaClusterId(),
connectorClientConfigOverridePolicy);
+ }
+
+ @Override
+ protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+ return new StandaloneConfig(workerProps);
+ }
+
public static void main(String[] args) {
Review Comment:
Agreed that the integration testing framework's use of the CLI class to
instantiate workers is strange. IMO that part is no worse with these changes
than it is currently though, so we don't have to block on cleaning that up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]