C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1362530005
##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##########
@@ -332,722 +186,46 @@ public Set<WorkerHandle> workers() {
return new LinkedHashSet<>(connectCluster);
}
- /**
- * Configure a connector. If the connector does not already exist, a new
one will be created and
- * the given configuration will be applied to it.
- *
- * @param connName the name of the connector
- * @param connConfig the intended configuration
- * @throws ConnectRestException if the REST api returns error status
- * @throws ConnectException if the configuration fails to be serialized or
if the request could not be sent
- */
- public String configureConnector(String connName, Map<String, String>
connConfig) {
- String url = endpointForResource(String.format("connectors/%s/config",
connName));
- return putConnectorConfig(url, connConfig);
- }
-
- /**
- * Validate a given connector configuration. If the configuration
validates or
- * has a configuration error, an instance of {@link ConfigInfos} is
returned. If the validation fails
- * an exception is thrown.
- *
- * @param connClassName the name of the connector class
- * @param connConfig the intended configuration
- * @throws ConnectRestException if the REST api returns error status
- * @throws ConnectException if the configuration fails to
serialize/deserialize or if the request failed to send
- */
- public ConfigInfos validateConnectorConfig(String connClassName,
Map<String, String> connConfig) {
- String url =
endpointForResource(String.format("connector-plugins/%s/config/validate",
connClassName));
- String response = putConnectorConfig(url, connConfig);
- ConfigInfos configInfos;
- try {
- configInfos = new ObjectMapper().readValue(response,
ConfigInfos.class);
- } catch (IOException e) {
- throw new ConnectException("Unable deserialize response into a
ConfigInfos object");
- }
- return configInfos;
- }
-
- /**
- * Execute a PUT request with the given connector configuration on the
given URL endpoint.
- *
- * @param url the full URL of the endpoint that corresponds to the
given REST resource
- * @param connConfig the intended configuration
- * @throws ConnectRestException if the REST api returns error status
- * @throws ConnectException if the configuration fails to be serialized or
if the request could not be sent
- */
- protected String putConnectorConfig(String url, Map<String, String>
connConfig) {
- ObjectMapper mapper = new ObjectMapper();
- String content;
- try {
- content = mapper.writeValueAsString(connConfig);
- } catch (IOException e) {
- throw new ConnectException("Could not serialize connector
configuration and execute PUT request");
- }
- Response response = requestPut(url, content);
- if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
- return responseToString(response);
- }
- throw new ConnectRestException(response.getStatus(),
- "Could not execute PUT request. Error response: " +
responseToString(response));
- }
-
- /**
- * Delete an existing connector.
- *
- * @param connName name of the connector to be deleted
- * @throws ConnectRestException if the REST API returns error status
- * @throws ConnectException for any other error.
- */
- public void deleteConnector(String connName) {
- String url = endpointForResource(String.format("connectors/%s",
connName));
- Response response = requestDelete(url);
- if (response.getStatus() >=
Response.Status.BAD_REQUEST.getStatusCode()) {
- throw new ConnectRestException(response.getStatus(),
- "Could not execute DELETE request. Error response: " +
responseToString(response));
- }
- }
-
- /**
- * Stop an existing connector.
- *
- * @param connName name of the connector to be stopped
- * @throws ConnectRestException if the REST API returns error status
- * @throws ConnectException for any other error.
- */
- public void stopConnector(String connName) {
- String url = endpointForResource(String.format("connectors/%s/stop",
connName));
- Response response = requestPut(url, "");
- if (response.getStatus() >=
Response.Status.BAD_REQUEST.getStatusCode()) {
- throw new ConnectRestException(response.getStatus(),
- "Could not execute PUT request. Error response: " +
responseToString(response));
- }
- }
-
- /**
- * Pause an existing connector.
- *
- * @param connName name of the connector to be paused
- * @throws ConnectRestException if the REST API returns error status
- * @throws ConnectException for any other error.
- */
- public void pauseConnector(String connName) {
- String url = endpointForResource(String.format("connectors/%s/pause",
connName));
- Response response = requestPut(url, "");
- if (response.getStatus() >=
Response.Status.BAD_REQUEST.getStatusCode()) {
- throw new ConnectRestException(response.getStatus(),
- "Could not execute PUT request. Error response: " +
responseToString(response));
- }
- }
-
- /**
- * Resume an existing connector.
- *
- * @param connName name of the connector to be resumed
- * @throws ConnectRestException if the REST API returns error status
- * @throws ConnectException for any other error.
- */
- public void resumeConnector(String connName) {
- String url = endpointForResource(String.format("connectors/%s/resume",
connName));
- Response response = requestPut(url, "");
- if (response.getStatus() >=
Response.Status.BAD_REQUEST.getStatusCode()) {
- throw new ConnectRestException(response.getStatus(),
- "Could not execute PUT request. Error response: " +
responseToString(response));
- }
- }
-
- /**
- * Restart an existing connector.
- *
- * @param connName name of the connector to be restarted
- * @throws ConnectRestException if the REST API returns error status
- * @throws ConnectException for any other error.
- */
- public void restartConnector(String connName) {
- String url =
endpointForResource(String.format("connectors/%s/restart", connName));
- Response response = requestPost(url, "", Collections.emptyMap());
- if (response.getStatus() >=
Response.Status.BAD_REQUEST.getStatusCode()) {
- throw new ConnectRestException(response.getStatus(),
- "Could not execute POST request. Error response: " +
responseToString(response));
- }
- }
-
- /**
- * Restart an existing connector and its tasks.
- *
- * @param connName name of the connector to be restarted
- * @param onlyFailed true if only failed instances should be restarted
- * @param includeTasks true if tasks should be restarted, or false if
only the connector should be restarted
- * @param onlyCallOnEmptyWorker true if the REST API call should be called
on a worker not running this connector or its tasks
- * @throws ConnectRestException if the REST API returns error status
- * @throws ConnectException for any other error.
- */
- public ConnectorStateInfo restartConnectorAndTasks(String connName,
boolean onlyFailed, boolean includeTasks, boolean onlyCallOnEmptyWorker) {
- ObjectMapper mapper = new ObjectMapper();
- String restartPath = String.format("connectors/%s/restart?onlyFailed="
+ onlyFailed + "&includeTasks=" + includeTasks, connName);
- String restartEndpoint;
- if (onlyCallOnEmptyWorker) {
- restartEndpoint =
endpointForResourceNotRunningConnector(restartPath, connName);
- } else {
- restartEndpoint = endpointForResource(restartPath);
- }
- Response response = requestPost(restartEndpoint, "",
Collections.emptyMap());
- try {
- if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
- //only the 202 status returns a body
- if (response.getStatus() ==
Response.Status.ACCEPTED.getStatusCode()) {
- return mapper.readerFor(ConnectorStateInfo.class)
- .readValue(responseToString(response));
- }
- }
- return null;
- } catch (IOException e) {
- log.error("Could not read connector state from response: {}",
- responseToString(response), e);
- throw new ConnectException("Could not not parse connector state",
e);
- }
- }
- /**
- * Get the connector names of the connectors currently running on this
cluster.
- *
- * @return the list of connector names
- * @throws ConnectRestException if the HTTP request to the REST API failed
with a valid status code.
- * @throws ConnectException for any other error.
- */
- public Collection<String> connectors() {
- ObjectMapper mapper = new ObjectMapper();
- String url = endpointForResource("connectors");
- Response response = requestGet(url);
- if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
- try {
- return
mapper.readerFor(Collection.class).readValue(responseToString(response));
- } catch (IOException e) {
- log.error("Could not parse connector list from response: {}",
- responseToString(response), e
- );
- throw new ConnectException("Could not not parse connector
list", e);
- }
- }
- throw new ConnectRestException(response.getStatus(),
- "Could not read connector list. Error response: " +
responseToString(response));
- }
-
- /**
- * Get the status for a connector running in this cluster.
- *
- * @param connectorName name of the connector
- * @return an instance of {@link ConnectorStateInfo} populated with state
information of the connector and its tasks.
- * @throws ConnectRestException if the HTTP request to the REST API failed
with a valid status code.
- * @throws ConnectException for any other error.
- */
- public ConnectorStateInfo connectorStatus(String connectorName) {
- ObjectMapper mapper = new ObjectMapper();
- String url = endpointForResource(String.format("connectors/%s/status",
connectorName));
- Response response = requestGet(url);
- try {
- if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
- return mapper.readerFor(ConnectorStateInfo.class)
- .readValue(responseToString(response));
- }
- } catch (IOException e) {
- log.error("Could not read connector state from response: {}",
- responseToString(response), e);
- throw new ConnectException("Could not not parse connector state",
e);
- }
- throw new ConnectRestException(response.getStatus(),
- "Could not read connector state. Error response: " +
responseToString(response));
- }
-
- /**
- * Get the active topics of a connector running in this cluster.
- *
- * @param connectorName name of the connector
- * @return an instance of {@link ActiveTopicsInfo} populated with the
connector's name and its active topics.
- * @throws ConnectRestException if the HTTP request to the REST API failed
with a valid status code.
- * @throws ConnectException for any other error.
- */
- public ActiveTopicsInfo connectorTopics(String connectorName) {
- ObjectMapper mapper = new ObjectMapper();
- String url = endpointForResource(String.format("connectors/%s/topics",
connectorName));
- Response response = requestGet(url);
- try {
- if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
- Map<String, Map<String, List<String>>> activeTopics = mapper
- .readerFor(new TypeReference<Map<String, Map<String,
List<String>>>>() { })
- .readValue(responseToString(response));
- return new ActiveTopicsInfo(connectorName,
- activeTopics.get(connectorName).getOrDefault("topics",
Collections.emptyList()));
- }
- } catch (IOException e) {
- log.error("Could not read connector state from response: {}",
- responseToString(response), e);
- throw new ConnectException("Could not not parse connector state",
e);
- }
- throw new ConnectRestException(response.getStatus(),
- "Could not read connector state. Error response: " +
responseToString(response));
- }
-
- /**
- * Get the info of a connector running in this cluster (retrieved via the
<code>GET /connectors/{connector}</code> endpoint).
-
- * @param connectorName name of the connector
- * @return an instance of {@link ConnectorInfo} populated with state
information of the connector and its tasks.
- */
- public ConnectorInfo connectorInfo(String connectorName) {
- ObjectMapper mapper = new ObjectMapper();
- String url = endpointForResource(String.format("connectors/%s",
connectorName));
- Response response = requestGet(url);
- try {
- if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
- return mapper.readValue(responseToString(response),
ConnectorInfo.class);
- }
- } catch (IOException e) {
- log.error("Could not read connector info from response: {}",
- responseToString(response), e);
- throw new ConnectException("Could not not parse connector info",
e);
- }
- throw new ConnectRestException(response.getStatus(),
- "Could not read connector info. Error response: " +
responseToString(response));
- }
-
- /**
- * Get the task configs of a connector running in this cluster.
- *
- * @param connectorName name of the connector
- * @return a list of task configurations for the connector
- */
- public List<TaskInfo> taskConfigs(String connectorName) {
- ObjectMapper mapper = new ObjectMapper();
- String url = endpointForResource(String.format("connectors/%s/tasks",
connectorName));
- Response response = requestGet(url);
- try {
- if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
- // We use String instead of ConnectorTaskId as the key here
since the latter can't be automatically
- // deserialized by Jackson when used as a JSON object key
(i.e., when it's serialized as a JSON string)
- return mapper.readValue(responseToString(response), new
TypeReference<List<TaskInfo>>() { });
- }
- } catch (IOException e) {
- log.error("Could not read task configs from response: {}",
- responseToString(response), e);
- throw new ConnectException("Could not not parse task configs", e);
- }
- throw new ConnectRestException(response.getStatus(),
- "Could not read task configs. Error response: " +
responseToString(response));
- }
-
- /**
- * Reset the set of active topics of a connector running in this cluster.
- *
- * @param connectorName name of the connector
- * @throws ConnectRestException if the HTTP request to the REST API failed
with a valid status code.
- * @throws ConnectException for any other error.
- */
- public void resetConnectorTopics(String connectorName) {
- String url =
endpointForResource(String.format("connectors/%s/topics/reset", connectorName));
- Response response = requestPut(url, null);
- if (response.getStatus() >=
Response.Status.BAD_REQUEST.getStatusCode()) {
- throw new ConnectRestException(response.getStatus(),
- "Resetting active topics for connector " + connectorName +
" failed. "
- + "Error response: " + responseToString(response));
- }
- }
-
- /**
- * Get the offsets for a connector via the <strong><em>GET
/connectors/{connector}/offsets</em></strong> endpoint
- *
- * @param connectorName name of the connector whose offsets are to be
retrieved
- * @return the connector's offsets
- */
- public ConnectorOffsets connectorOffsets(String connectorName) {
- String url =
endpointForResource(String.format("connectors/%s/offsets", connectorName));
- Response response = requestGet(url);
- ObjectMapper mapper = new ObjectMapper();
-
- try {
- if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
- return
mapper.readerFor(ConnectorOffsets.class).readValue(responseToString(response));
- }
- } catch (IOException e) {
- throw new ConnectException("Could not not parse connector
offsets", e);
- }
- throw new ConnectRestException(response.getStatus(),
- "Could not fetch connector offsets. Error response: " +
responseToString(response));
- }
-
- /**
- * Alter the offset for a source connector's partition via the
<strong><em>PATCH /connectors/{connector}/offsets</em></strong>
- * endpoint
- *
- * @param connectorName name of the source connector whose offset is to be
altered
- * @param partition the source partition for which the offset is to be
altered
- * @param offset the source offset to be written
- *
- * @return the API response as a {@link java.lang.String}
- */
- public String alterSourceConnectorOffset(String connectorName, Map<String,
?> partition, Map<String, ?> offset) {
- return alterConnectorOffsets(
- connectorName,
- new ConnectorOffsets(Collections.singletonList(new
ConnectorOffset(partition, offset)))
- );
- }
-
- /**
- * Alter the offset for a sink connector's topic partition via the
<strong><em>PATCH /connectors/{connector}/offsets</em></strong>
- * endpoint
- *
- * @param connectorName name of the sink connector whose offset is to be
altered
- * @param topicPartition the topic partition for which the offset is to be
altered
- * @param offset the offset to be written
- *
- * @return the API response as a {@link java.lang.String}
- */
- public String alterSinkConnectorOffset(String connectorName,
TopicPartition topicPartition, Long offset) {
- return alterConnectorOffsets(
- connectorName,
-
SinkUtils.consumerGroupOffsetsToConnectorOffsets(Collections.singletonMap(topicPartition,
new OffsetAndMetadata(offset)))
- );
- }
-
- /**
- * Alter a connector's offsets via the <strong><em>PATCH
/connectors/{connector}/offsets</em></strong> endpoint
- *
- * @param connectorName name of the connector whose offsets are to be
altered
- * @param offsets offsets to alter
- *
- * @return the API response as a {@link java.lang.String}
- */
- public String alterConnectorOffsets(String connectorName, ConnectorOffsets
offsets) {
- String url =
endpointForResource(String.format("connectors/%s/offsets", connectorName));
- ObjectMapper mapper = new ObjectMapper();
- String content;
- try {
- content = mapper.writeValueAsString(offsets);
- } catch (IOException e) {
- throw new ConnectException("Could not serialize connector offsets
and execute PATCH request");
- }
-
- Response response = requestPatch(url, content);
- if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
- return responseToString(response);
- } else {
- throw new ConnectRestException(response.getStatus(),
- "Could not alter connector offsets. Error response: " +
responseToString(response));
- }
- }
-
- /**
- * Reset a connector's offsets via the <strong><em>DELETE
/connectors/{connector}/offsets</em></strong> endpoint
- *
- * @param connectorName name of the connector whose offsets are to be reset
- */
- public String resetConnectorOffsets(String connectorName) {
- String url =
endpointForResource(String.format("connectors/%s/offsets", connectorName));
- Response response = requestDelete(url);
- if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
- return responseToString(response);
- } else {
- throw new ConnectRestException(response.getStatus(),
- "Could not reset connector offsets. Error response: " +
responseToString(response));
- }
- }
-
- /**
- * Get the full URL of the admin endpoint that corresponds to the given
REST resource
- *
- * @param resource the resource under the worker's admin endpoint
- * @return the admin endpoint URL
- * @throws ConnectException if no admin REST endpoint is available
- */
- public String adminEndpoint(String resource) {
- String url = connectCluster.stream()
- .map(WorkerHandle::adminUrl)
- .filter(Objects::nonNull)
- .findFirst()
- .orElseThrow(() -> new ConnectException("Admin endpoint is
disabled."))
- .toString();
- return url + resource;
- }
-
- /**
- * Get the full URL of the endpoint that corresponds to the given REST
resource
- *
- * @param resource the resource under the worker's REST endpoint
- * @return the endpoint URL
- * @throws ConnectException if no REST endpoint is available
- */
- public String endpointForResource(String resource) {
- String url = connectCluster.stream()
- .map(WorkerHandle::url)
- .filter(Objects::nonNull)
- .findFirst()
- .orElseThrow(() -> new ConnectException("Connect workers have
not been provisioned"))
- .toString();
- return url + resource;
- }
-
- /**
- * Get the full URL of the endpoint that corresponds to the given REST
resource using a worker
- * that is not running any tasks or connector instance for the
connectorName provided in the arguments
- *
- * @param resource the resource under the worker's REST endpoint
- * @param connectorName the name of the connector
- * @return the endpoint URL
- * @throws ConnectException if no REST endpoint is available
- */
- public String endpointForResourceNotRunningConnector(String resource,
String connectorName) {
- ConnectorStateInfo info = connectorStatus(connectorName);
- Set<String> activeWorkerUrls = new HashSet<>();
- activeWorkerUrls.add(String.format("http://%s/",
info.connector().workerId()));
- info.tasks().forEach(t ->
activeWorkerUrls.add(String.format("http://%s/", t.workerId())));
- String url = connectCluster.stream()
- .map(WorkerHandle::url)
- .filter(Objects::nonNull)
- .filter(workerUrl ->
!activeWorkerUrls.contains(workerUrl.toString()))
- .findFirst()
- .orElseThrow(() -> new ConnectException(
- String.format("Connect workers have not been
provisioned or no free worker found that is not running this connector(%s) or
its tasks", connectorName)))
- .toString();
- return url + resource;
- }
-
- private static void putIfAbsent(Map<String, String> props, String
propertyKey, String propertyValue) {
- if (!props.containsKey(propertyKey)) {
- props.put(propertyKey, propertyValue);
- }
- }
-
- /**
- * Return the handle to the Kafka cluster this Connect cluster connects to.
- *
- * @return the Kafka cluster handle
- */
- public EmbeddedKafkaCluster kafka() {
- return kafkaCluster;
- }
-
- /**
- * Execute a GET request on the given URL.
- *
- * @param url the HTTP endpoint
- * @return the response to the GET request
- * @throws ConnectException if execution of the GET request fails
- * @deprecated Use {@link #requestGet(String)} instead.
- */
- @Deprecated
- public String executeGet(String url) {
- return responseToString(requestGet(url));
- }
-
- /**
- * Execute a GET request on the given URL.
- *
- * @param url the HTTP endpoint
- * @return the response to the GET request
- * @throws ConnectException if execution of the GET request fails
- */
- public Response requestGet(String url) {
- return requestHttpMethod(url, null, Collections.emptyMap(), "GET");
- }
-
- /**
- * Execute a PUT request on the given URL.
- *
- * @param url the HTTP endpoint
- * @param body the payload of the PUT request
- * @return the response to the PUT request
- * @throws ConnectException if execution of the PUT request fails
- * @deprecated Use {@link #requestPut(String, String)} instead.
- */
- @Deprecated
- public int executePut(String url, String body) {
- return requestPut(url, body).getStatus();
- }
-
- /**
- * Execute a PUT request on the given URL.
- *
- * @param url the HTTP endpoint
- * @param body the payload of the PUT request
- * @return the response to the PUT request
- * @throws ConnectException if execution of the PUT request fails
- */
- public Response requestPut(String url, String body) {
- return requestHttpMethod(url, body, Collections.emptyMap(), "PUT");
- }
-
- /**
- * Execute a POST request on the given URL.
- *
- * @param url the HTTP endpoint
- * @param body the payload of the POST request
- * @param headers a map that stores the POST request headers
- * @return the response to the POST request
- * @throws ConnectException if execution of the POST request fails
- * @deprecated Use {@link #requestPost(String, String, java.util.Map)}
instead.
- */
- @Deprecated
- public int executePost(String url, String body, Map<String, String>
headers) {
- return requestPost(url, body, headers).getStatus();
- }
-
- /**
- * Execute a POST request on the given URL.
- *
- * @param url the HTTP endpoint
- * @param body the payload of the POST request
- * @param headers a map that stores the POST request headers
- * @return the response to the POST request
- * @throws ConnectException if execution of the POST request fails
- */
- public Response requestPost(String url, String body, Map<String, String>
headers) {
- return requestHttpMethod(url, body, headers, "POST");
- }
-
- /**
- * Execute a PATCH request on the given URL.
- *
- * @param url the HTTP endpoint
- * @param body the payload of the PATCH request
- * @return the response to the PATCH request
- * @throws ConnectException if execution of the PATCH request fails
- */
- public Response requestPatch(String url, String body) {
- return requestHttpMethod(url, body, Collections.emptyMap(), "PATCH");
- }
-
- /**
- * Execute a DELETE request on the given URL.
- *
- * @param url the HTTP endpoint
- * @return the response to the DELETE request
- * @throws ConnectException if execution of the DELETE request fails
- * @deprecated Use {@link #requestDelete(String)} instead.
- */
- @Deprecated
- public int executeDelete(String url) {
- return requestDelete(url).getStatus();
- }
-
- /**
- * Execute a DELETE request on the given URL.
- *
- * @param url the HTTP endpoint
- * @return the response to the DELETE request
- * @throws ConnectException if execution of the DELETE request fails
- */
- public Response requestDelete(String url) {
- return requestHttpMethod(url, null, Collections.emptyMap(), "DELETE");
- }
-
- /**
- * A general method that executes an HTTP request on a given URL.
- *
- * @param url the HTTP endpoint
- * @param body the payload of the request; null if there isn't one
- * @param headers a map that stores the request headers; empty if there
are no headers
- * @param httpMethod the name of the HTTP method to execute
- * @return the response to the HTTP request
- * @throws ConnectException if execution of the HTTP method fails
- */
- protected Response requestHttpMethod(String url, String body, Map<String,
String> headers,
- String httpMethod) {
- log.debug("Executing {} request to URL={}." + (body != null ? "
Payload={}" : ""),
- httpMethod, url, body);
-
- try {
- Request req = httpClient.newRequest(url);
- req.method(httpMethod);
- if (body != null) {
- headers.forEach(req::header);
- req.content(new StringContentProvider(body),
"application/json");
- }
-
- ContentResponse res = req.send();
- log.info("{} response for URL={} is {}",
- httpMethod, url, res.getContentAsString().isEmpty() ?
"empty" : res.getContentAsString());
- return
Response.status(Response.Status.fromStatusCode(res.getStatus()))
- .entity(res.getContentAsString())
- .build();
- } catch (Exception e) {
- log.error("Could not execute " + httpMethod + " request to " +
url, e);
- throw new ConnectException(e);
- }
- }
-
- private String responseToString(Response response) {
- return response == null ? "empty" : (String) response.getEntity();
- }
Review Comment:
This is also migrated directly to the `EmbeddedConnect` class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]