yashmayya commented on code in PR #12947: URL: https://github.com/apache/kafka/pull/12947#discussion_r1047409620
########## connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java: ########## @@ -37,98 +34,65 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URI; -import java.util.Arrays; -import java.util.Collections; import java.util.Map; /** * <p> - * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not - * distributed. Instead, all the normal Connect machinery works within a single process. This is - * useful for ad hoc, small, or experimental jobs. - * </p> - * <p> - * By default, no job configs or offset data is persistent. You can make jobs persistent and - * fault tolerant by overriding the settings to use file storage for both. + * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not + * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for development + * and testing Kafka Connect on a local machine. * </p> */ -public class ConnectStandalone { +public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> { private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class); - public static void main(String[] args) { - - if (args.length < 1 || Arrays.asList(args).contains("--help")) { - log.info("Usage: ConnectStandalone worker.properties [connector1.properties connector2.properties ...]"); - Exit.exit(1); - } - - try { - Time time = Time.SYSTEM; - log.info("Kafka Connect standalone worker initializing ..."); - long initStart = time.hiResClockMs(); - WorkerInfo initInfo = new WorkerInfo(); - initInfo.logAll(); - - String workerPropsFile = args[0]; - Map<String, String> workerProps = !workerPropsFile.isEmpty() ? - Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap(); - - log.info("Scanning for plugin classes. 
This might take a moment ..."); - Plugins plugins = new Plugins(workerProps); - plugins.compareAndSwapWithDelegatingLoader(); - StandaloneConfig config = new StandaloneConfig(workerProps); - - String kafkaClusterId = config.kafkaClusterId(); - log.debug("Kafka cluster ID: {}", kafkaClusterId); + protected ConnectStandalone(String... args) { + super(args); + } - // Do not initialize a RestClient because the ConnectorsResource will not use it in standalone mode. - RestServer rest = new RestServer(config, null); - rest.initializeServer(); + @Override + protected String usage() { + return "ConnectStandalone worker.properties [connector1.properties connector2.properties ...]"; + } - URI advertisedUrl = rest.advertisedUrl(); - String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); + @Override + protected void processExtraArgs(Herder herder, String[] extraArgs) throws Throwable { + for (final String connectorPropsFile : extraArgs) { + Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile)); + FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> { + if (error != null) + log.error("Failed to create connector for {}", connectorPropsFile); + else + log.info("Created connector {}", info.result().name()); + }); + herder.putConnectorConfig( + connectorProps.get(ConnectorConfig.NAME_CONFIG), + connectorProps, false, cb); + cb.get(); Review Comment: Hm, this is already handled here - https://github.com/apache/kafka/blob/86df7c561029f44f7c2a1292f6eaba17e5d7d8d7/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java#L148-L154 Is your suggestion to move that handling from `AbstractConnectCli` to its implementations by also passing the `Connect` instance to `processExtraArgs` so that we can have more relevant error messages than `Stopping Connect due to an error while processing CLI arguments`? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org