This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 99947c98d [INLONG-6869][Sort] Supports dirty data side-output for elasticsearch sink (#6870) 99947c98d is described below commit 99947c98d6437a694ccd3ffd1177f88add12dd31 Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Wed Dec 14 15:23:38 2022 +0800 [INLONG-6869][Sort] Supports dirty data side-output for elasticsearch sink (#6870) --- .../inlong/sort/base/dirty/DirtySinkHelper.java | 108 ++++++++++++++++++ .../apache/inlong/sort/base/dirty/DirtyType.java | 16 +++ .../sort/elasticsearch6/ElasticsearchSink.java | 20 +++- .../table/Elasticsearch6DynamicSink.java | 16 ++- .../table/Elasticsearch6DynamicSinkFactory.java | 14 ++- .../sort/elasticsearch7/ElasticsearchSink.java | 19 +++- .../table/Elasticsearch7DynamicSink.java | 16 ++- .../table/Elasticsearch7DynamicSinkFactory.java | 14 ++- .../sort/elasticsearch/ElasticsearchSinkBase.java | 122 +++++++++++--------- .../table/RowElasticsearchSinkFunction.java | 124 ++++++++++++++------- .../sort/parser/ElasticsearchSqlParseTest.java | 10 +- 11 files changed, 369 insertions(+), 110 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java new file mode 100644 index 000000000..a962b974e --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.dirty; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.Serializable; + +/** + * Dirty sink helper, it helps dirty data sink for {@link DirtySink} + * @param <T> + */ +public class DirtySinkHelper<T> implements Serializable { + + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = LoggerFactory.getLogger(DirtySinkHelper.class); + + private DirtyOptions dirtyOptions; + private final @Nullable DirtySink<T> dirtySink; + + public DirtySinkHelper(DirtyOptions dirtyOptions, @Nullable DirtySink<T> dirtySink) { + this.dirtyOptions = Preconditions.checkNotNull(dirtyOptions, "dirtyOptions is null"); + this.dirtySink = dirtySink; + } + + /** + * Open for dirty sink + * + * @param configuration The configuration that is used for dirty sink + */ + public void open(Configuration configuration) { + if (dirtySink != null) { + try { + dirtySink.open(configuration); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Dirty data sink + * @param dirtyData The dirty data + * @param dirtyType The dirty type {@link DirtyType} + * @param e The cause of dirty data + */ + public void invoke(T dirtyData, DirtyType dirtyType, Throwable e) { + if (!dirtyOptions.ignoreDirty()) { + RuntimeException ex; + if (e instanceof RuntimeException) { + ex = (RuntimeException) e; + } else { + ex = new RuntimeException(e); + } + throw ex; + } + if (dirtySink != null) { + DirtyData.Builder<T> builder = DirtyData.builder(); + try { + builder.setData(dirtyData) + .setDirtyType(dirtyType) + .setLabels(dirtyOptions.getLabels()) + .setLogTag(dirtyOptions.getLogTag()) + .setDirtyMessage(e.getMessage()) + .setIdentifier(dirtyOptions.getIdentifier()); + dirtySink.invoke(builder.build()); + } catch (Exception ex) { + if (!dirtyOptions.ignoreSideOutputErrors()) { + throw new RuntimeException(ex); + } + LOGGER.warn("Dirty sink failed", ex); + } + } + } + + public void setDirtyOptions(DirtyOptions dirtyOptions) { + this.dirtyOptions = dirtyOptions; + } + + public DirtyOptions getDirtyOptions() { + return dirtyOptions; + } + + @Nullable + public DirtySink<T> getDirtySink() { + return dirtySink; + } +} diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java index 0637725c3..89789872a 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java @@ -88,6 +88,22 @@ public enum DirtyType { * Extract RowData error */ EXTRACT_ROWDATA_ERROR("ExtractRowDataError"), + /** + * Index generate error + */ + INDEX_GENERATE_ERROR("IndexGenerateError"), + /** + * Index id generate error + */ + INDEX_ID_GENERATE_ERROR("IndexIdGenerateError"), + /** + * Index routing error + */ + INDEX_ROUTING_ERROR("IndexRoutingError"), + /** + * Document parse error + */ + DOCUMENT_PARSE_ERROR("DocumentParseError"), ; private final String format; diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java index f33fb7fa4..5ada044cb 100644 --- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java +++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandl import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory; import org.apache.flink.util.Preconditions; import org.apache.http.HttpHost; +import org.apache.inlong.sort.base.dirty.DirtySinkHelper; import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase; import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction; import org.elasticsearch.action.ActionRequest; @@ -70,14 +71,15 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel ElasticsearchSinkFunction<T> elasticsearchSinkFunction, ActionRequestFailureHandler failureHandler, RestClientFactory restClientFactory, - String inlongMetric) { - + String inlongMetric, + DirtySinkHelper<Object> dirtySinkHelper) { super( new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory), bulkRequestsConfig, elasticsearchSinkFunction, failureHandler, - inlongMetric); + inlongMetric, + dirtySinkHelper); } /** @@ -96,6 +98,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel private RestClientFactory restClientFactory = restClientBuilder -> { }; private String inlongMetric = null; + private DirtySinkHelper<Object> dirtySinkHelper; /** * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link @@ -120,6 +123,14 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel this.inlongMetric = inlongMetric; } + /** + * Set dirty sink helper + * @param dirtySinkHelper The dirty sink helper + */ + public void setDirtySinkHelper(DirtySinkHelper<Object> dirtySinkHelper) { + this.dirtySinkHelper = dirtySinkHelper; + } + /** * Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to * disable it. @@ -244,7 +255,8 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel elasticsearchSinkFunction, failureHandler, restClientFactory, - inlongMetric); + inlongMetric, + dirtySinkHelper); } @Override diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java index 0b0a3cf58..61dfdb3cb 100644 --- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java +++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java @@ -35,6 +35,7 @@ import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.inlong.sort.base.dirty.DirtySinkHelper; import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory; import org.apache.inlong.sort.elasticsearch.table.KeyExtractor; import org.apache.inlong.sort.elasticsearch.table.RequestFactory; @@ -67,6 +68,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink { private final String inlongMetric; private final String auditHostAndPorts; private final ElasticSearchBuilderProvider builderProvider; + private final DirtySinkHelper<Object> dirtySinkHelper; // -------------------------------------------------------------- // Hack to make configuration testing possible. @@ -83,8 +85,10 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink { Elasticsearch6Configuration config, TableSchema schema, String inlongMetric, - String auditHostAndPorts) { - this(format, config, schema, (ElasticsearchSink.Builder::new), inlongMetric, auditHostAndPorts); + String auditHostAndPorts, + DirtySinkHelper<Object> dirtySinkHelper) { + this(format, config, schema, (ElasticsearchSink.Builder::new), + inlongMetric, auditHostAndPorts, dirtySinkHelper); } Elasticsearch6DynamicSink( @@ -93,13 +97,15 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink { TableSchema schema, ElasticSearchBuilderProvider builderProvider, String inlongMetric, - String auditHostAndPorts) { + String auditHostAndPorts, + DirtySinkHelper<Object> dirtySinkHelper) { this.format = format; this.schema = schema; this.config = config; this.builderProvider = builderProvider; this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; + this.dirtySinkHelper = dirtySinkHelper; } @Override @@ -134,7 +140,8 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink { RoutingExtractor.createRoutingExtractor( schema, config.getRoutingField().orElse(null)), inlongMetric, - auditHostAndPorts); + auditHostAndPorts, + dirtySinkHelper); final ElasticsearchSink.Builder<RowData> builder = builderProvider.createBuilder(config.getHosts(), upsertFunction); @@ -145,6 +152,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink { builder.setBulkFlushInterval(config.getBulkFlushInterval()); builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled()); builder.setInLongMetric(inlongMetric); + builder.setDirtySinkHelper(dirtySinkHelper); config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType); config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries); config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay); diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java index 01833cd3c..501d3ccd7 100644 --- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java +++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java @@ -32,6 +32,10 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.SerializationFormatFactory; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.util.StringUtils; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.DirtySinkHelper; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; +import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils; import org.apache.inlong.sort.elasticsearch.table.ElasticsearchValidationUtils; import java.util.Set; @@ -39,6 +43,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX; import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION; @@ -99,7 +104,7 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory final EncodingFormat<SerializationSchema<RowData>> format = helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION); - helper.validate(); + helper.validateExcept(DIRTY_PREFIX); Configuration configuration = new Configuration(); context.getCatalogTable().getOptions().forEach(configuration::setString); Elasticsearch6Configuration config = @@ -110,9 +115,12 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory String inlongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(null); String auditHostAndPorts = helper.getOptions().getOptional(INLONG_AUDIT).orElse(null); - + final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(helper.getOptions()); + final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions); + final DirtySinkHelper<Object> dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink); return new Elasticsearch6DynamicSink( - format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inlongMetric, auditHostAndPorts); + format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), + inlongMetric, auditHostAndPorts, dirtySinkHelper); } private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) { diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java index 10b7cdfb8..9dd1dd90f 100644 --- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java +++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java @@ -19,6 +19,7 @@ package org.apache.inlong.sort.elasticsearch7; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; +import org.apache.inlong.sort.base.dirty.DirtySinkHelper; import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase; import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler; @@ -71,14 +72,16 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel ElasticsearchSinkFunction<T> elasticsearchSinkFunction, ActionRequestFailureHandler failureHandler, RestClientFactory restClientFactory, - String inlongMetric) { + String inlongMetric, + DirtySinkHelper<Object> dirtySinkHelper) { super( new Elasticsearch7ApiCallBridge(httpHosts, restClientFactory), bulkRequestsConfig, elasticsearchSinkFunction, failureHandler, - inlongMetric); + inlongMetric, + dirtySinkHelper); } /** @@ -97,6 +100,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel private RestClientFactory restClientFactory = restClientBuilder -> { }; private String inlongMetric = null; + private DirtySinkHelper<Object> dirtySinkHelper; /** * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link @@ -121,6 +125,14 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel this.inlongMetric = inlongMetric; } + /** + * Set dirty sink helper + * @param dirtySinkHelper The dirty sink helper + */ + public void setDirtySinkHelper(DirtySinkHelper<Object> dirtySinkHelper) { + this.dirtySinkHelper = dirtySinkHelper; + } + /** * Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to * disable it. @@ -245,7 +257,8 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel elasticsearchSinkFunction, failureHandler, restClientFactory, - inlongMetric); + inlongMetric, + dirtySinkHelper); } @Override diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java index 29e2f1795..009283330 100644 --- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java +++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java @@ -35,6 +35,7 @@ import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.inlong.sort.base.dirty.DirtySinkHelper; import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory; import org.apache.inlong.sort.elasticsearch.table.KeyExtractor; import org.apache.inlong.sort.elasticsearch.table.RequestFactory; @@ -68,6 +69,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink { private final String inlongMetric; private final String auditHostAndPorts; private final ElasticSearchBuilderProvider builderProvider; + private final DirtySinkHelper<Object> dirtySinkHelper; // -------------------------------------------------------------- // Hack to make configuration testing possible. @@ -84,8 +86,10 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink { Elasticsearch7Configuration config, TableSchema schema, String inlongMetric, - String auditHostAndPorts) { - this(format, config, schema, (ElasticsearchSink.Builder::new), inlongMetric, auditHostAndPorts); + String auditHostAndPorts, + DirtySinkHelper<Object> dirtySinkHelper) { + this(format, config, schema, (ElasticsearchSink.Builder::new), + inlongMetric, auditHostAndPorts, dirtySinkHelper); } Elasticsearch7DynamicSink( @@ -94,13 +98,15 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink { TableSchema schema, ElasticSearchBuilderProvider builderProvider, String inlongMetric, - String auditHostAndPorts) { + String auditHostAndPorts, + DirtySinkHelper<Object> dirtySinkHelper) { this.format = format; this.schema = schema; this.config = config; this.builderProvider = builderProvider; this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; + this.dirtySinkHelper = dirtySinkHelper; } @Override @@ -135,7 +141,8 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink { RoutingExtractor.createRoutingExtractor( schema, config.getRoutingField().orElse(null)), inlongMetric, - auditHostAndPorts); + auditHostAndPorts, + dirtySinkHelper); final ElasticsearchSink.Builder<RowData> builder = builderProvider.createBuilder(config.getHosts(), upsertFunction); @@ -146,6 +153,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink { builder.setBulkFlushInterval(config.getBulkFlushInterval()); builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled()); builder.setInLongMetric(inlongMetric); + builder.setDirtySinkHelper(dirtySinkHelper); config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType); config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries); config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay); diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java index 1abbf1705..a1bfaa90e 100644 --- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java +++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java @@ -32,6 +32,10 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.SerializationFormatFactory; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.util.StringUtils; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.DirtySinkHelper; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; +import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils; import org.apache.inlong.sort.elasticsearch.table.ElasticsearchValidationUtils; import java.util.Set; @@ -39,6 +43,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX; import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION; @@ -99,7 +104,7 @@ public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory final EncodingFormat<SerializationSchema<RowData>> format = helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION); - helper.validate(); + helper.validateExcept(DIRTY_PREFIX); Configuration configuration = new Configuration(); context.getCatalogTable().getOptions().forEach(configuration::setString); Elasticsearch7Configuration config = @@ -110,9 +115,12 @@ public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory String inlongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(null); String auditHostAndPorts = helper.getOptions().getOptional(INLONG_AUDIT).orElse(null); - + final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(helper.getOptions()); + final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions); + final DirtySinkHelper<Object> dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink); return new Elasticsearch7DynamicSink( - format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inlongMetric, auditHostAndPorts); + format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), + inlongMetric, auditHostAndPorts, dirtySinkHelper); } private void validate(Elasticsearch7Configuration config, Configuration originalConfiguration) { diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java index 50fc2a8f0..c21685de0 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java @@ -28,23 +28,32 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.InstantiationUtil; +import org.apache.inlong.sort.base.dirty.DirtySinkHelper; +import org.apache.inlong.sort.base.dirty.DirtyType; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.rest.RestStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -76,6 +85,8 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSinkBase.class); + // ------------------------------------------------------------------------ // Internal bulk processor configuration // ------------------------------------------------------------------------ @@ -125,6 +136,7 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends */ private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>(); private final String inlongMetric; + private final DirtySinkHelper<Object> dirtySinkHelper; /** * If true, the producer will wait until all outstanding action requests have been sent to * Elasticsearch. @@ -170,8 +182,10 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction, ActionRequestFailureHandler failureHandler, - String inlongMetric) { + String inlongMetric, + DirtySinkHelper<Object> dirtySinkHelper) { this.inlongMetric = inlongMetric; + this.dirtySinkHelper = dirtySinkHelper; this.callBridge = checkNotNull(callBridge); this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction); this.failureHandler = checkNotNull(failureHandler); @@ -275,9 +289,9 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends if (metricOption != null) { sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); } - + dirtySinkHelper.open(parameters); callBridge.verifyClientConnection(client); - bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData)); + bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData, dirtySinkHelper)); requestIndexer = callBridge.createBulkProcessorIndexer( bulkProcessor, flushOnCheckpoint, numPendingRequests); @@ -462,10 +476,13 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends private class BulkProcessorListener implements BulkProcessor.Listener { - private SinkMetricData sinkMetricData; + private final SinkMetricData sinkMetricData; + private final DirtySinkHelper<Object> dirtySinkHelper; - public BulkProcessorListener(SinkMetricData sinkMetricData) { + public BulkProcessorListener(SinkMetricData sinkMetricData, + DirtySinkHelper<Object> dirtySinkHelper) { this.sinkMetricData = sinkMetricData; + this.dirtySinkHelper = dirtySinkHelper; } @Override @@ -477,42 +494,16 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends if (response.hasFailures()) { BulkItemResponse itemResponse; Throwable failure; - RestStatus restStatus; - DocWriteRequest actionRequest; - + int restStatus; try { for (int i = 0; i < response.getItems().length; i++) { itemResponse = response.getItems()[i]; failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse); if (failure != null) { - restStatus = itemResponse.getFailure().getStatus(); - actionRequest = request.requests().get(i); - if (sinkMetricData != null) { - sinkMetricData.invokeDirty(1, 0); - } - if (restStatus == null) { - if (actionRequest instanceof ActionRequest) { - failureHandler.onFailure( - (ActionRequest) actionRequest, - failure, - -1, - failureRequestIndexer); - } else { - throw new UnsupportedOperationException( - "The sink currently only supports ActionRequests"); - } - } else { - if (actionRequest instanceof ActionRequest) { - failureHandler.onFailure( - (ActionRequest) actionRequest, - failure, - restStatus.getStatus(), - failureRequestIndexer); - } else { - throw new UnsupportedOperationException( - "The sink currently only supports ActionRequests"); - } - } + restStatus = itemResponse.getFailure().getStatus() != null + ? itemResponse.getFailure().getStatus().getStatus() + : -1; + handleFailure(request.requests().get(i), restStatus, failure); } } } catch (Throwable t) { @@ -521,7 +512,6 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends failureThrowable.compareAndSet(null, t); } } - if (flushOnCheckpoint) { numPendingRequests.getAndAdd(-request.numberOfActions()); } @@ -530,27 +520,59 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { try { - for (DocWriteRequest writeRequest : request.requests()) { - if (sinkMetricData != null) { - sinkMetricData.invokeDirty(1, 0); - } - if (writeRequest instanceof ActionRequest) { - failureHandler.onFailure( - (ActionRequest) writeRequest, failure, -1, failureRequestIndexer); - } else { - throw new UnsupportedOperationException( - "The sink currently only supports ActionRequests"); - } + for (DocWriteRequest<?> writeRequest : request.requests()) { + handleFailure(writeRequest, -1, failure); } } catch (Throwable t) { // fail the sink and skip the rest of the items // if the failure handler decides to throw an exception failureThrowable.compareAndSet(null, t); } - if (flushOnCheckpoint) { numPendingRequests.getAndAdd(-request.numberOfActions()); } } + + private void handleFailure(DocWriteRequest<?> writeRequest, int restStatus, Throwable failure) + throws Throwable { + // Only supports dirty data sink when the failureHandler is IgnoringFailureHandler + if (failureHandler instanceof IgnoringFailureHandler) { + if (!(writeRequest instanceof ActionRequest)) { + LOGGER.error("The sink currently only supports ActionRequests"); + dirtySinkHelper.invoke(writeRequest.id(), DirtyType.UNSUPPORTED_DATA_TYPE, + new UnsupportedOperationException("The sink currently only supports ActionRequests")); + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(1, writeRequest.id().getBytes(StandardCharsets.UTF_8).length); + } + } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) { + String dirtyData; + if (writeRequest instanceof UpdateRequest) { + dirtyData = ((UpdateRequest) writeRequest).doc().source().utf8ToString(); + } else if (writeRequest instanceof IndexRequest) { + dirtyData = ((IndexRequest) writeRequest).source().utf8ToString(); + } else { + dirtyData = writeRequest.id(); + } + LOGGER.error(String.format("Elasticsearch parse exception, raw data: %s", dirtyData), failure); + dirtySinkHelper.invoke(dirtyData, DirtyType.DOCUMENT_PARSE_ERROR, failure); + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(1, dirtyData.getBytes(StandardCharsets.UTF_8).length); + } + } else { + throw failure; + } + } else { + if (writeRequest instanceof ActionRequest) { + failureHandler.onFailure( + (ActionRequest) writeRequest, + failure, + restStatus, + failureRequestIndexer); + } else { + throw new UnsupportedOperationException( + "The sink currently only supports ActionRequests"); + } + } + } } } diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java index 4ff80bcca..fe605e3c1 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java @@ -26,13 +26,14 @@ import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.inlong.sort.base.dirty.DirtySinkHelper; +import org.apache.inlong.sort.base.dirty.DirtyType; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; -import org.apache.flink.table.api.TableException; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; import org.apache.inlong.sort.base.metric.SinkMetricData; @@ -42,9 +43,12 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.xcontent.XContentType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.function.Function; @@ -57,6 +61,8 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R private static final long serialVersionUID = 1L; + private static final Logger LOGGER = LoggerFactory.getLogger(RowElasticsearchSinkFunction.class); + private final IndexGenerator indexGenerator; private final String docType; private final SerializationSchema<RowData> serializationSchema; @@ -73,6 +79,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R private transient RuntimeContext runtimeContext; private SinkMetricData sinkMetricData; + private final DirtySinkHelper<Object> dirtySinkHelper; public RowElasticsearchSinkFunction( IndexGenerator indexGenerator, @@ -83,7 +90,8 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R Function<RowData, String> createKey, @Nullable Function<RowData, String> createRouting, String inlongMetric, - String auditHostAndPorts) { + String auditHostAndPorts, + DirtySinkHelper<Object> dirtySinkHelper) { this.indexGenerator = Preconditions.checkNotNull(indexGenerator); this.docType = docType; this.serializationSchema = Preconditions.checkNotNull(serializationSchema); @@ -93,6 +101,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R this.createRouting = createRouting; this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; + this.dirtySinkHelper = dirtySinkHelper; } @Override @@ -146,55 +155,94 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R @Override public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) { + final byte[] document; + try { + document = serializationSchema.serialize(element); + } catch (Exception e) { + LOGGER.error(String.format("Serialize error, raw data: %s", element), e); + dirtySinkHelper.invoke(element, DirtyType.SERIALIZE_ERROR, e); + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(1, element.toString().getBytes(StandardCharsets.UTF_8).length); + } + return; + } + final String key; + try { + key = createKey.apply(element); + } catch (Exception e) { + LOGGER.error(String.format("Generate index id error, raw data: %s", element), e); + dirtySinkHelper.invoke(element, DirtyType.INDEX_ID_GENERATE_ERROR, e); + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(1, document.length); + } + return; + } + final String index; + try { + index = indexGenerator.generate(element); + } catch (Exception e) { + LOGGER.error(String.format("Generate index error, raw data: %s", element), e); + dirtySinkHelper.invoke(element, DirtyType.INDEX_GENERATE_ERROR, e); + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(1, document.length); + } + return; + } + addDocument(element, key, index, document, indexer); + } + + private void addDocument(RowData element, String key, String index, byte[] document, RequestIndexer indexer) { + DocWriteRequest<?> request; switch (element.getRowKind()) { case INSERT: case UPDATE_AFTER: - processUpsert(element, indexer); + if (key != null) { + request = requestFactory.createUpdateRequest(index, docType, key, contentType, document); + if (addRouting(request, element, document)) { + indexer.add((UpdateRequest) request); + sendMetrics(document); + } + } else { + request = requestFactory.createIndexRequest(index, docType, key, contentType, document); + if (addRouting(request, element, document)) { + indexer.add((IndexRequest) request); + sendMetrics(document); + } + } break; case UPDATE_BEFORE: case DELETE: - processDelete(element, indexer); + request = requestFactory.createDeleteRequest(index, docType, key); + if (addRouting(request, element, document)) { + indexer.add((DeleteRequest) request); + sendMetrics(document); + } break; default: - throw new TableException("Unsupported message kind: " + element.getRowKind()); + LOGGER.error(String.format("The type of element should be 'RowData' only, raw data: %s", element)); + dirtySinkHelper.invoke(element, DirtyType.UNSUPPORTED_DATA_TYPE, + new RuntimeException("The type of element should be 'RowData' only.")); + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(1, document.length); + } } } - private void processUpsert(RowData row, RequestIndexer indexer) { - final byte[] document = serializationSchema.serialize(row); - final String key = createKey.apply(row); - sendMetrics(document); - if (key != null) { - final UpdateRequest updateRequest = - requestFactory.createUpdateRequest( - indexGenerator.generate(row), docType, key, contentType, document); - addRouting(updateRequest, row); - indexer.add(updateRequest); - } else { - final IndexRequest indexRequest = - requestFactory.createIndexRequest( - indexGenerator.generate(row), docType, key, contentType, document); - addRouting(indexRequest, row); - indexer.add(indexRequest); - } - } - - private void processDelete(RowData row, RequestIndexer indexer) { - // the serialization is just for metrics - final byte[] document = serializationSchema.serialize(row); - sendMetrics(document); - final String key = createKey.apply(row); - final DeleteRequest deleteRequest = - requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key); - addRouting(deleteRequest, row); - indexer.add(deleteRequest); - } - - private void addRouting(DocWriteRequest<?> request, RowData row) { + private boolean addRouting(DocWriteRequest<?> request, RowData row, byte[] document) { if (null != createRouting) { - String routing = createRouting.apply(row); - request.routing(routing); + try { + String routing = createRouting.apply(row); + request.routing(routing); + } catch (Exception e) { + LOGGER.error(String.format("Routing error, raw data: %s", row), e); + dirtySinkHelper.invoke(row, DirtyType.INDEX_ROUTING_ERROR, e); + if (sinkMetricData != null) { + sinkMetricData.invokeDirty(1, document.length); + } + return false; + } } + return true; } @Override diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java index 9e7d4e74d..bf4ef0505 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java @@ -21,6 +21,7 @@ package org.apache.inlong.sort.parser; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -74,8 +75,15 @@ public abstract class ElasticsearchSqlParseTest extends AbstractTestBase { new FieldInfo("name", new StringFormatInfo()))); CsvFormat csvFormat = new CsvFormat(); csvFormat.setDisableQuoteCharacter(true); + Map<String, String> properties = new LinkedHashMap<>(); + properties.put("dirty.side-output.connector", "log"); + properties.put("dirty.ignore", "true"); + properties.put("dirty.side-output.enable", "true"); + properties.put("dirty.side-output.format", "csv"); + properties.put("dirty.side-output.labels", + "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&table=test"); return new ElasticsearchLoadNode("2", "kafka_output", fields, relations, null, null, - 2, null, + 2, properties, "test", "http://localhost:9200", "elastic", "my_password", null, "age", version); }