eskabetxe commented on code in PR #67:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/67#discussion_r1397647438


##########
flink-connector-jdbc/pom.xml:
##########
@@ -38,10 +38,12 @@ under the License.
                <scala.binary.version>2.12</scala.binary.version>
                <scala-library.version>2.12.7</scala-library.version>
                <assertj.version>3.23.1</assertj.version>
+               <jackson.version>2.15.2</jackson.version>
                <postgres.version>42.5.1</postgres.version>
                <oracle.version>21.8.0.0</oracle.version>
                <trino.version>418</trino.version>
                <byte-buddy.version>1.12.10</byte-buddy.version>
+               <elasticsearch.version>8.8.1</elasticsearch.version>

Review Comment:
   why this version? last version is 8.11.1



##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchDatabase.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.elasticsearch;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.time.Duration;
+
+import static 
org.apache.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchMetadata.PASSWORD;
+import static 
org.apache.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchMetadata.USERNAME;
+
+/** Elasticsearch database for testing. */
+public class ElasticsearchDatabase extends DatabaseExtension implements 
ElasticsearchImages {
+
+    private static final ElasticsearchContainer CONTAINER =
+            new ElasticsearchContainer(ELASTICSEARCH_8)
+                    .waitingFor(
+                            Wait.forHttp("/")
+                                    .withBasicCredentials(USERNAME, PASSWORD)
+                                    .withReadTimeout(Duration.ofMinutes(2)))
+                                    .withStartupTimeout(Duration.ofMinutes(5));
+
+    private static ElasticsearchMetadata metadata;
+    private static ElasticsearchRestClient client;
+
+    public static ElasticsearchMetadata getMetadata() {
+        if (!CONTAINER.isRunning()) {
+            throw new FlinkRuntimeException("Container is stopped.");
+        }
+        if (metadata == null) {
+            metadata = new ElasticsearchMetadata(CONTAINER);
+        }
+        return metadata;
+    }
+
+    private static ElasticsearchRestClient getClient() {
+        if (!CONTAINER.isRunning()) {
+            throw new FlinkRuntimeException("Container is stopped.");
+        }
+        if (client == null) {
+            client = new ElasticsearchRestClient(getMetadata());
+        }
+        return client;
+    }
+
+    @Override
+    protected DatabaseMetadata startDatabase() throws Exception {
+        CONTAINER.withEnv("xpack.security.enabled", "true");
+        CONTAINER.withEnv("ELASTIC_PASSWORD", PASSWORD);
+        CONTAINER.withEnv("ES_JAVA_OPTS", "-Xms1g -Xmx1g");
+        CONTAINER.start();
+
+        // JDBC plugin is available only in Platinum and Enterprise licenses 
or in trial.
+        if (!getClient().trialEnabled()) {

Review Comment:
   @MartijnVisser can you validate this? I dont know if by license we can do 
this.



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialect.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.elasticsearch.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+
+/** JDBC dialect for Elastic. */
+@Internal
+public class ElasticsearchDialect extends AbstractDialect {
+
+    private static final long serialVersionUID = 1L;
+
+    // Define MAX/MIN precision of TIMESTAMP type according to Elastic docs:
+    // 
https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
+    private static final int MIN_TIMESTAMP_PRECISION = 0;
+    private static final int MAX_TIMESTAMP_PRECISION = 9;
+
+    @Override
+    public String dialectName() {
+        return "Elasticsearch";
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("org.elasticsearch.xpack.sql.jdbc.EsDriver");
+    }
+
+    @Override
+    public Set<LogicalTypeRoot> supportedTypes() {
+        // The list of types supported by Elastic SQL.
+        // 
https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
+        return EnumSet.of(
+                LogicalTypeRoot.BIGINT,
+                LogicalTypeRoot.BOOLEAN,
+                LogicalTypeRoot.DATE,
+                LogicalTypeRoot.DOUBLE,
+                LogicalTypeRoot.INTEGER,
+                LogicalTypeRoot.FLOAT,
+                LogicalTypeRoot.SMALLINT,
+                LogicalTypeRoot.TINYINT,
+                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+                LogicalTypeRoot.VARBINARY,
+                LogicalTypeRoot.VARCHAR);
+    }
+
+    @Override
+    public Optional<Range> timestampPrecisionRange() {
+        return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, 
MAX_TIMESTAMP_PRECISION));
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter(RowType rowType) {
+        return new ElasticsearchRowConverter(rowType);
+    }
+
+    @Override
+    public String getLimitClause(long limit) {
+        return "LIMIT " + limit;
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return '"' + identifier + '"';
+    }
+
+    @Override
+    public Optional<String> getUpsertStatement(

Review Comment:
   if read is the only option for ES, this should be on documentation



##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchImages.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.elasticsearch;
+
+/** Elasticsearch docker images. */
+public interface ElasticsearchImages {
+
+    String ELASTICSEARCH_8 = 
"docker.elastic.co/elasticsearch/elasticsearch:8.8.1";

Review Comment:
   any reason for that specific version? current on 8.11.1



##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchMetadata.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.elasticsearch;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import javax.sql.XADataSource;
+
+/** Elasticsearch metadata. */
+public class ElasticsearchMetadata implements DatabaseMetadata {
+
+    static final int ELASTIC_PORT = 9200;
+    static final String USERNAME = "elastic";
+    static final String PASSWORD = "password";
+
+    private final String username;
+    private final String password;
+    private final String jdbcUrl;
+    private final String driver;
+    private final String version;
+    private final String containerHost;
+    private final int containerPort;
+
+    public ElasticsearchMetadata(ElasticsearchContainer container) {
+        this.containerHost = container.getHost();
+        this.containerPort = container.getMappedPort(ELASTIC_PORT);
+        this.username = USERNAME;

Review Comment:
   can we obtain this from container?



##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchRestClient.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.elasticsearch;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RestClient;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.IOException;
+
+import static java.lang.String.format;
+
+/** Elasticsearch REST API client. */
+public class ElasticsearchRestClient {
+
+    private static final ObjectMapper OBJECT_MAPPER =
+            new 
ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+
+    private final RestClient restClient;
+
+    public ElasticsearchRestClient(ElasticsearchMetadata metadata) {
+        this(
+                metadata.getContainerHost(),
+                metadata.getContainerPort(),
+                metadata.getUsername(),
+                metadata.getPassword());
+    }
+
+    public ElasticsearchRestClient(String host, int port, String username, 
String password) {
+        final CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
+        credentialsProvider.setCredentials(
+                AuthScope.ANY, new UsernamePasswordCredentials(username, 
password));
+        this.restClient =
+                RestClient.builder(new HttpHost(host, port, "http"))
+                        .setHttpClientConfigCallback(
+                                builder ->
+                                        
builder.setDefaultCredentialsProvider(credentialsProvider))
+                        .build();
+    }
+
+    public boolean trialEnabled() throws Exception {
+        Request request = new Request("GET", "/_license");
+        ElasticLicenseResponse response = executeRequest(request, 
ElasticLicenseResponse.class);
+        return response != null
+                && response.license.status.equals("active")
+                && response.license.type.equals("trial");
+    }
+
+    public void enableTrial() throws Exception {

Review Comment:
   @MartijnVisser here comes the real action



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to