fsk119 commented on code in PR #20609: URL: https://github.com/apache/flink/pull/20609#discussion_r954471863
########## flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java: ########## @@ -537,4 +455,85 @@ public void copyLogsTo(Path targetDirectory) throws IOException { Files.createDirectories(targetDirectory); TestUtils.copyDirectory(log, targetDirectory); } + + /** This rest client is used to submit SQL strings to Rest Endpoint of Sql Gateway. */ + private static class TestSqlGatewayRestClient { + + private final String host; + private final String port; + private final String sessionHandle; + + public TestSqlGatewayRestClient(Configuration configuration) throws Exception { + Map<String, String> configMap = configuration.toMap(); + host = + configMap.getOrDefault( + "sql-gateway.endpoint.rest.address", + InetAddress.getByName("localhost").getHostAddress()); + port = configMap.getOrDefault("sql-gateway.endpoint.rest.port", "8083"); + sessionHandle = openSession(); + } + + private String openSession() throws Exception { + FormBody.Builder builder = new FormBody.Builder(); + FormBody requestBody = builder.build(); + final Request request = + new Request.Builder() + .post(requestBody) + .url(String.format("http://%s:%s/v1/sessions/", host, port)) + .build(); + final JsonNode jsonNode = OBJECT_MAPPER.readTree(sendRequest(request)); + return jsonNode.get("sessionHandle").asText(); + } + + public String executeStatement(String sql) throws Exception { + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("statement", sql); + RequestBody requestBody = + RequestBody.create( + MediaType.parse("application/json; charset=utf-8"), + OBJECT_MAPPER.writeValueAsString(objectNode)); + final Request request = + new Request.Builder() + .post(requestBody) + .url( + String.format( + "http://%s:%s/v1/sessions/%s/statements", + host, port, sessionHandle)) + .build(); + final JsonNode jsonNode = OBJECT_MAPPER.readTree(sendRequest(request)); + return jsonNode.get("operationHandle").asText(); + } + + public void waitUntilOperationTerminate(String operationHandle) throws Exception { + String status; + do { + final Request request = + new Request.Builder() + .get() + .url( + String.format( + "http://%s:%s/v1/sessions/%s/operations/%s/status", + host, port, sessionHandle, operationHandle)) + .build(); + final JsonNode jsonNode = OBJECT_MAPPER.readTree(sendRequest(request)); + status = jsonNode.get("status").asText(); + } while (!Objects.equals(status, "FINISHED") && !Objects.equals(status, "ERROR")); + } + + private String sendRequest(Request request) throws Exception { + final OkHttpClient client = new OkHttpClient(); Review Comment: nit: Can we reuse this client? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org