Copilot commented on code in PR #9610: URL: https://github.com/apache/seatunnel/pull/9610#discussion_r2224654891
########## seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchAuthIT.java: ########## @@ -0,0 +1,688 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.elasticsearch; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seatunnel.shade.com.google.common.collect.Lists; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth.AuthenticationProvider; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.auth.AuthenticationProviderFactory; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.TrustAllStrategy; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.util.EntityUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.time.Duration; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +@Slf4j +public class ElasticsearchAuthIT extends TestSuiteBase implements TestResource { + + private static final String ELASTICSEARCH_IMAGE = "elasticsearch:8.9.0"; + private static final long INDEX_REFRESH_DELAY = 2000L; + + // Test data constants + private static final String TEST_INDEX = "auth_test_index"; + private static final String VALID_USERNAME = "elastic"; + private static final String VALID_PASSWORD = "elasticsearch"; + private static final String INVALID_USERNAME = "wrong_user"; + private static final String INVALID_PASSWORD = "wrong_password"; + + // API Key test constants - will be set dynamically after container starts + private String validApiKeyId; + private String validApiKeySecret; + private String validEncodedApiKey; + private static final String INVALID_API_KEY_ID = "invalid-key-id"; + private static final String INVALID_API_KEY_SECRET = "invalid-key-secret"; + + private ElasticsearchContainer elasticsearchContainer; + private EsRestClient esRestClient; + private ObjectMapper objectMapper = new ObjectMapper(); + private CloseableHttpClient httpClient; + + @BeforeEach + @Override + public void startUp() throws Exception { + // Initialize HTTP client with SSL trust all strategy + initializeHttpClient(); + + // Start Elasticsearch container + elasticsearchContainer = + new ElasticsearchContainer( + DockerImageName.parse(ELASTICSEARCH_IMAGE) + .asCompatibleSubstituteFor( + "docker.elastic.co/elasticsearch/elasticsearch")) + .withNetwork(NETWORK) + .withEnv("cluster.routing.allocation.disk.threshold_enabled", "false") + .withEnv("xpack.security.authc.api_key.enabled", "true") + .withNetworkAliases("elasticsearch") + .withPassword("elasticsearch") + .withStartupAttempts(5) + .withStartupTimeout(Duration.ofMinutes(5)) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger("elasticsearch:8.9.0"))); + Startables.deepStart(Stream.of(elasticsearchContainer)).join(); + log.info("Elasticsearch container started"); + + // Wait for Elasticsearch to be ready and create real API keys + waitForElasticsearchReady(); + createRealApiKeys(); + + // Initialize ES client for test data setup + Map<String, Object> configMap = new HashMap<>(); + configMap.put( + "hosts", + Lists.newArrayList("https://" + elasticsearchContainer.getHttpHostAddress())); + configMap.put("username", "elastic"); + configMap.put("password", "elasticsearch"); + configMap.put("tls_verify_certificate", false); + configMap.put("tls_verify_hostname", false); + ReadonlyConfig config = ReadonlyConfig.fromMap(configMap); + esRestClient = EsRestClient.createInstance(config); + createTestIndex(); + insertTestData(); + } + + @AfterEach + @Override + public void tearDown() throws Exception { + if (esRestClient != null) { + esRestClient.close(); + } + if (httpClient != null) { + httpClient.close(); + } + if (elasticsearchContainer != null) { + elasticsearchContainer.stop(); + } + } + + /** Initialize HTTP client with SSL trust all strategy for testing */ + private void initializeHttpClient() + throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + httpClient = + HttpClients.custom() + .setSSLContext( + SSLContextBuilder.create() + .loadTrustMaterial(TrustAllStrategy.INSTANCE) + .build()) + .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE) + .build(); + log.info("HTTP client initialized with SSL trust all strategy"); + } + + /** Wait for Elasticsearch to be ready */ + private void waitForElasticsearchReady() throws IOException, InterruptedException { + String elasticsearchUrl = "https://" + elasticsearchContainer.getHttpHostAddress(); + String healthUrl = elasticsearchUrl + "/_cluster/health"; + + log.info("Waiting for Elasticsearch to be ready at: {}", healthUrl); + + for (int i = 0; i < 30; i++) { + try { + HttpGet request = new HttpGet(healthUrl); + String auth = + Base64.getEncoder() + .encodeToString( + (VALID_USERNAME + ":" + VALID_PASSWORD) + .getBytes(StandardCharsets.UTF_8)); + request.setHeader("Authorization", "Basic " + auth); + + HttpResponse response = httpClient.execute(request); + if (response.getStatusLine().getStatusCode() == 200) { + log.info("Elasticsearch is ready"); + return; + } + } catch (Exception e) { + log.debug("Elasticsearch not ready yet, attempt {}/30: {}", i + 1, e.getMessage()); + } + + TimeUnit.SECONDS.sleep(2); + } + + throw new RuntimeException("Elasticsearch failed to become ready within timeout"); + } + + /** Create real API keys using Elasticsearch API */ + private void createRealApiKeys() throws IOException { + String elasticsearchUrl = "https://" + elasticsearchContainer.getHttpHostAddress(); + String apiKeyUrl = elasticsearchUrl + "/_security/api_key"; + + log.info("Creating real API key at: {}", apiKeyUrl); + + // Create API key request body + String requestBody = + "{\n" + + " \"name\": \"seatunnel-test-api-key\",\n" + + " \"role_descriptors\": {\n" + + " \"seatunnel_test_role\": {\n" + + " \"cluster\": [\"monitor\", \"manage_index_templates\"],\n" + + " \"indices\": [\n" + + " {\n" + + " \"names\": [\"" + + TEST_INDEX + + "\", \"test_*\"],\n" + + " \"privileges\": [\"all\"]\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"metadata\": {\n" + + " \"application\": \"seatunnel-test\",\n" + + " \"environment\": \"integration-test\"\n" + + " }\n" + + "}"; + + HttpPost request = new HttpPost(apiKeyUrl); + String auth = + Base64.getEncoder() + .encodeToString( + (VALID_USERNAME + ":" + VALID_PASSWORD) + .getBytes(StandardCharsets.UTF_8)); + request.setHeader("Authorization", "Basic " + auth); + request.setHeader("Content-Type", "application/json"); + request.setEntity(new StringEntity(requestBody, StandardCharsets.UTF_8)); + + HttpResponse response = httpClient.execute(request); + String responseBody = EntityUtils.toString(response.getEntity()); + + if (response.getStatusLine().getStatusCode() != 200) { + throw new RuntimeException("Failed to create API key: " + responseBody); + } + + // Parse response to extract API key details + try { + JsonNode jsonResponse = objectMapper.readTree(responseBody); + validApiKeyId = jsonResponse.get("id").asText(); + validApiKeySecret = jsonResponse.get("api_key").asText(); + validEncodedApiKey = + Base64.getEncoder() + .encodeToString( + (validApiKeyId + ":" + validApiKeySecret) + .getBytes(StandardCharsets.UTF_8)); + + log.info( + "API Key created successfully - ID: {}, Secret: {}, Encoded: {}", + validApiKeyId, + validApiKeySecret, + validEncodedApiKey); + + // Verify the API key works + verifyApiKey(); + + } catch (Exception e) { + throw new RuntimeException("Failed to parse API key response: " + responseBody, e); + } + } + + /** Verify that the created API key works */ + private void verifyApiKey() throws IOException { + String elasticsearchUrl = "https://" + elasticsearchContainer.getHttpHostAddress(); + String authUrl = elasticsearchUrl + "/_security/_authenticate"; + + HttpGet request = new HttpGet(authUrl); + request.setHeader("Authorization", "ApiKey " + validEncodedApiKey); + + HttpResponse response = httpClient.execute(request); + String responseBody = EntityUtils.toString(response.getEntity()); + + if (response.getStatusLine().getStatusCode() == 200) { + log.info("API Key verification successful: {}", responseBody); + } else { + throw new RuntimeException("API Key verification failed: " + responseBody); + } + } + + private void createTestIndex() throws Exception { + String mapping = + "{" + + "\"mappings\": {" + + "\"properties\": {" + + "\"id\": {\"type\": \"integer\"}," + + "\"name\": {\"type\": \"text\"}," + + "\"value\": {\"type\": \"double\"}" + + "}" + + "}" + + "}"; + + log.info("Creating test index: {}", TEST_INDEX); + + try { + esRestClient.createIndex(TEST_INDEX, mapping); + log.info("Test index '{}' created successfully", TEST_INDEX); + } catch (Exception e) { + log.error("Failed to create test index: {}", e.getMessage(), e); + throw new RuntimeException("Failed to create test index: " + TEST_INDEX, e); + } + } + + private void insertTestData() throws Exception { + StringBuilder requestBody = new StringBuilder(); + String indexHeader = "{\"index\":{\"_index\":\"" + TEST_INDEX + "\"}}\n"; + + for (int i = 1; i <= 3; i++) { + Map<String, Object> doc = new HashMap<>(); + doc.put("id", i); + doc.put("name", "test_" + i); + doc.put("value", i * 10.5); + + requestBody.append(indexHeader); + requestBody.append(objectMapper.writeValueAsString(doc)); + requestBody.append("\n"); + } + + log.info("Inserting test data into index: {}", TEST_INDEX); + + try { + BulkResponse response = esRestClient.bulk(requestBody.toString()); + if (response.isErrors()) { + log.error("Bulk insert had errors: {}", response.getResponse()); + throw new RuntimeException("Failed to insert test data: " + response.getResponse()); + } + + Thread.sleep(INDEX_REFRESH_DELAY); Review Comment: Using Thread.sleep() in tests can lead to flaky tests. Consider using a more reliable waiting mechanism like polling with a timeout or using Elasticsearch's refresh API. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
