This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit bbffb2e840b208b944ab3874e65842a49ed264aa Author: Oneal65 <liush...@foxmail.com> AuthorDate: Fri Sep 2 20:31:15 2022 +0800 [INLONG-5608][Sort] Reformat connector codes for reporting metrics (#5612) --- .../sort/elasticsearch6/ElasticsearchSink.java | 18 +++++----- .../table/Elasticsearch6DynamicSink.java | 18 +++++----- .../table/Elasticsearch6DynamicSinkFactory.java | 4 +-- .../sort/elasticsearch7/ElasticsearchSink.java | 18 +++++----- .../table/Elasticsearch7DynamicSink.java | 18 +++++----- .../table/Elasticsearch7DynamicSinkFactory.java | 4 +-- .../sort/elasticsearch/ElasticsearchSinkBase.java | 17 +++++----- .../table/RowElasticsearchSinkFunction.java | 20 ++++++------ .../sort/hbase/HBase2DynamicTableFactory.java | 2 +- .../org/apache/inlong/sort/hive/HiveTableSink.java | 12 +++---- .../hive/filesystem/AbstractStreamingWriter.java | 18 +++++----- .../sort/hive/filesystem/CompactFileWriter.java | 4 +-- .../sort/hive/filesystem/StreamingFileWriter.java | 4 +-- .../inlong/sort/hive/filesystem/StreamingSink.java | 8 ++--- .../sort/hive/table/HiveTableInlongFactory.java | 4 +-- .../apache/inlong/sort/iceberg/sink/FlinkSink.java | 14 ++++---- .../sort/iceberg/sink/IcebergStreamWriter.java | 18 +++++----- .../jdbc/internal/JdbcBatchingOutputFormat.java | 38 +++++++++++----------- .../jdbc/internal/TableJdbcUpsertOutputFormat.java | 8 ++--- .../jdbc/table/JdbcDynamicOutputFormatBuilder.java | 10 +++--- .../sort/jdbc/table/JdbcDynamicTableFactory.java | 4 +-- .../sort/jdbc/table/JdbcDynamicTableSink.java | 14 ++++---- .../inlong/sort/kafka/FlinkKafkaProducer.java | 30 ++++++++--------- .../apache/inlong/sort/kafka/KafkaDynamicSink.java | 14 ++++---- .../table/DynamicKafkaDeserializationSchema.java | 22 ++++++------- .../sort/kafka/table/KafkaDynamicSource.java | 10 +++--- .../sort/kafka/table/KafkaDynamicTableFactory.java | 16 ++++----- .../table/UpsertKafkaDynamicTableFactory.java | 4 +-- .../sort/cdc/mongodb/DebeziumSourceFunction.java | 3 +- .../mongodb/table/MongoDBTableSourceFactory.java | 2 +- .../sort/cdc/debezium/DebeziumSourceFunction.java | 3 +- .../mysql/table/MySqlTableInlongSourceFactory.java | 2 +- .../sort/cdc/oracle/DebeziumSourceFunction.java | 3 +- .../cdc/oracle/table/OracleTableSourceFactory.java | 2 +- .../DebeziumSourceFunction.java | 3 +- .../cdc/postgres/table/PostgreSQLTableFactory.java | 2 +- .../pulsar/table/PulsarDynamicTableFactory.java | 8 +++-- .../table/UpsertPulsarDynamicTableFactory.java | 2 +- .../sqlserver/table/DebeziumSourceFunction.java | 16 ++++----- .../cdc/sqlserver/table/SqlServerTableFactory.java | 2 +- 40 files changed, 209 insertions(+), 210 deletions(-) diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java index c30787d7c..32848dfa6 100644 --- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java +++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java @@ -70,14 +70,14 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel ElasticsearchSinkFunction<T> elasticsearchSinkFunction, ActionRequestFailureHandler failureHandler, RestClientFactory restClientFactory, - String inLongMetric) { + String inlongMetric) { super( new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory), bulkRequestsConfig, elasticsearchSinkFunction, failureHandler, - inLongMetric); + inlongMetric); } /** @@ -95,7 +95,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler(); private RestClientFactory restClientFactory = restClientBuilder -> { }; - private String inLongMetric = null; + private String inlongMetric = null; /** * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link @@ -114,10 +114,10 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel /** * set InLongMetric for reporting metrics - * @param inLongMetric + * @param inlongMetric */ - public void setInLongMetric(String inLongMetric) { - this.inLongMetric = inLongMetric; + public void setInLongMetric(String inlongMetric) { + this.inlongMetric = inlongMetric; } /** @@ -244,7 +244,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel elasticsearchSinkFunction, failureHandler, restClientFactory, - inLongMetric + inlongMetric ); } @@ -262,7 +262,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel && Objects.equals(bulkRequestsConfig, builder.bulkRequestsConfig) && Objects.equals(failureHandler, builder.failureHandler) && Objects.equals(restClientFactory, builder.restClientFactory) - && Objects.equals(inLongMetric, builder.inLongMetric); + && Objects.equals(inlongMetric, builder.inlongMetric); } @Override @@ -273,7 +273,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel bulkRequestsConfig, failureHandler, restClientFactory, - inLongMetric); + inlongMetric); } } } diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java index 2550edeb4..bff1ad476 100644 --- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java +++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java @@ -64,7 +64,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink { private final EncodingFormat<SerializationSchema<RowData>> format; private final TableSchema schema; private final Elasticsearch6Configuration config; - private final String inLongMetric; + private final String inlongMetric; private final String auditHostAndPorts; private final ElasticSearchBuilderProvider builderProvider; @@ -82,9 +82,9 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink { EncodingFormat<SerializationSchema<RowData>> format, Elasticsearch6Configuration config, TableSchema schema, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { - this(format, config, schema, (ElasticsearchSink.Builder::new), inLongMetric, auditHostAndPorts); + this(format, config, schema, (ElasticsearchSink.Builder::new), inlongMetric, auditHostAndPorts); } Elasticsearch6DynamicSink( @@ -92,13 +92,13 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink { Elasticsearch6Configuration config, TableSchema schema, ElasticSearchBuilderProvider builderProvider, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { this.format = format; this.schema = schema; this.config = config; this.builderProvider = builderProvider; - this.inLongMetric = inLongMetric; + this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; } @@ -133,7 +133,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink { KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()), RoutingExtractor.createRoutingExtractor( schema, config.getRoutingField().orElse(null)), - inLongMetric, + inlongMetric, auditHostAndPorts); final ElasticsearchSink.Builder<RowData> builder = @@ -144,7 +144,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink { builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20)); builder.setBulkFlushInterval(config.getBulkFlushInterval()); builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled()); - builder.setInLongMetric(inLongMetric); + builder.setInLongMetric(inlongMetric); config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType); config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries); config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay); @@ -198,12 +198,12 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink { && Objects.equals(schema, that.schema) && Objects.equals(config, that.config) && Objects.equals(builderProvider, that.builderProvider) - && Objects.equals(inLongMetric, that.inLongMetric); + && Objects.equals(inlongMetric, that.inlongMetric); } @Override public int hashCode() { - return Objects.hash(format, schema, config, builderProvider, inLongMetric); + return Objects.hash(format, schema, config, builderProvider, inlongMetric); } @FunctionalInterface diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java index ce1581ddd..df5937b64 100644 --- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java +++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java @@ -107,12 +107,12 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory validate(config, configuration); - String inLongMetric = helper.getOptions().get(INLONG_METRIC); + String inlongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(null); String auditHostAndPorts = helper.getOptions().getOptional(INLONG_AUDIT).orElse(null); return new Elasticsearch6DynamicSink( - format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inLongMetric, auditHostAndPorts); + format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inlongMetric, auditHostAndPorts); } private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) { diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java index eab2dd119..10b7cdfb8 100644 --- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java +++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java @@ -71,14 +71,14 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel ElasticsearchSinkFunction<T> elasticsearchSinkFunction, ActionRequestFailureHandler failureHandler, RestClientFactory restClientFactory, - String inLongMetric) { + String inlongMetric) { super( new Elasticsearch7ApiCallBridge(httpHosts, restClientFactory), bulkRequestsConfig, elasticsearchSinkFunction, failureHandler, - inLongMetric); + inlongMetric); } /** @@ -96,7 +96,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler(); private RestClientFactory restClientFactory = restClientBuilder -> { }; - private String inLongMetric = null; + private String inlongMetric = null; /** * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link @@ -115,10 +115,10 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel /** * set InLongMetric for reporting metrics - * @param inLongMetric + * @param inlongMetric */ - public void setInLongMetric(String inLongMetric) { - this.inLongMetric = inLongMetric; + public void setInLongMetric(String inlongMetric) { + this.inlongMetric = inlongMetric; } /** @@ -245,7 +245,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel elasticsearchSinkFunction, failureHandler, restClientFactory, - inLongMetric); + inlongMetric); } @Override @@ -262,7 +262,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel && Objects.equals(bulkRequestsConfig, builder.bulkRequestsConfig) && Objects.equals(failureHandler, builder.failureHandler) && Objects.equals(restClientFactory, builder.restClientFactory) - && Objects.equals(inLongMetric, builder.inLongMetric); + && Objects.equals(inlongMetric, builder.inlongMetric); } @Override @@ -273,7 +273,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel bulkRequestsConfig, failureHandler, restClientFactory, - inLongMetric); + inlongMetric); } } } diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java index 202990336..392e35dbf 100644 --- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java +++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java @@ -65,7 +65,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink { private final EncodingFormat<SerializationSchema<RowData>> format; private final TableSchema schema; private final Elasticsearch7Configuration config; - private final String inLongMetric; + private final String inlongMetric; private final String auditHostAndPorts; private final ElasticSearchBuilderProvider builderProvider; @@ -83,9 +83,9 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink { EncodingFormat<SerializationSchema<RowData>> format, Elasticsearch7Configuration config, TableSchema schema, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { - this(format, config, schema, (ElasticsearchSink.Builder::new), inLongMetric, auditHostAndPorts); + this(format, config, schema, (ElasticsearchSink.Builder::new), inlongMetric, auditHostAndPorts); } Elasticsearch7DynamicSink( @@ -93,13 +93,13 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink { Elasticsearch7Configuration config, TableSchema schema, ElasticSearchBuilderProvider builderProvider, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { this.format = format; this.schema = schema; this.config = config; this.builderProvider = builderProvider; - this.inLongMetric = inLongMetric; + this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; } @@ -134,7 +134,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink { KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()), RoutingExtractor.createRoutingExtractor( schema, config.getRoutingField().orElse(null)), - inLongMetric, + inlongMetric, auditHostAndPorts); final ElasticsearchSink.Builder<RowData> builder = @@ -145,7 +145,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink { builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20)); builder.setBulkFlushInterval(config.getBulkFlushInterval()); builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled()); - builder.setInLongMetric(inLongMetric); + builder.setInLongMetric(inlongMetric); config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType); config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries); config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay); @@ -199,13 +199,13 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink { && Objects.equals(schema, that.schema) && Objects.equals(config, that.config) && Objects.equals(builderProvider, that.builderProvider) - && Objects.equals(inLongMetric, that.inLongMetric) + && Objects.equals(inlongMetric, that.inlongMetric) && Objects.equals(auditHostAndPorts, that.auditHostAndPorts); } @Override public int hashCode() { - return Objects.hash(format, schema, config, builderProvider, inLongMetric, auditHostAndPorts); + return Objects.hash(format, schema, config, builderProvider, inlongMetric, auditHostAndPorts); } @FunctionalInterface diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java index 0f27db9f1..d29c646bd 100644 --- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java +++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java @@ -107,12 +107,12 @@ public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory validate(config, configuration); - String inLongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(null); + String inlongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(null); String auditHostAndPorts = helper.getOptions().getOptional(INLONG_AUDIT).orElse(null); return new Elasticsearch7DynamicSink( - format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inLongMetric, auditHostAndPorts); + format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inlongMetric, auditHostAndPorts); } private void validate(Elasticsearch7Configuration config, Configuration originalConfiguration) { diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java index da5b5f9ef..000e1c23a 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java @@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.inlong.sort.base.Constants.DELIMITER; /** * Base class for all Flink Elasticsearch Sinks. @@ -121,7 +122,7 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends * sink is closed. */ private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>(); - private final String inLongMetric; + private final String inlongMetric; /** * If true, the producer will wait until all outstanding action requests have been sent to * Elasticsearch. @@ -167,8 +168,8 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction, ActionRequestFailureHandler failureHandler, - String inLongMetric) { - this.inLongMetric = inLongMetric; + String inlongMetric) { + this.inlongMetric = inlongMetric; this.callBridge = checkNotNull(callBridge); this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction); this.failureHandler = checkNotNull(failureHandler); @@ -265,11 +266,11 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends @Override public void open(Configuration parameters) throws Exception { client = callBridge.createClient(userConfig); - if (inLongMetric != null && !inLongMetric.isEmpty()) { - String[] inLongMetricArray = inLongMetric.split("&"); - String groupId = inLongMetricArray[0]; - String streamId = inLongMetricArray[1]; - String nodeId = inLongMetricArray[2]; + if (inlongMetric != null && !inlongMetric.isEmpty()) { + String[] inlongMetricArray = inlongMetric.split(DELIMITER); + String groupId = inlongMetricArray[0]; + String streamId = inlongMetricArray[1]; + String nodeId = inlongMetricArray[2]; sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup()); sinkMetricData.registerMetricsForDirtyBytes(); sinkMetricData.registerMetricsForDirtyRecords(); diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java index 80abeef54..0ae93231d 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java @@ -55,7 +55,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R private final XContentType contentType; private final RequestFactory requestFactory; private final Function<RowData, String> createKey; - private final String inLongMetric; + private final String inlongMetric; private final String auditHostAndPorts; private final Function<RowData, String> createRouting; @@ -77,7 +77,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R RequestFactory requestFactory, Function<RowData, String> createKey, @Nullable Function<RowData, String> createRouting, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { this.indexGenerator = Preconditions.checkNotNull(indexGenerator); this.docType = docType; @@ -86,7 +86,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R this.requestFactory = Preconditions.checkNotNull(requestFactory); this.createKey = Preconditions.checkNotNull(createKey); this.createRouting = createRouting; - this.inLongMetric = inLongMetric; + this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; } @@ -94,11 +94,11 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R public void open(RuntimeContext ctx) { indexGenerator.open(); this.runtimeContext = ctx; - if (inLongMetric != null && !inLongMetric.isEmpty()) { - String[] inLongMetricArray = inLongMetric.split("&"); - groupId = inLongMetricArray[0]; - streamId = inLongMetricArray[1]; - String nodeId = inLongMetricArray[2]; + if (inlongMetric != null && !inlongMetric.isEmpty()) { + String[] inlongMetricArray = inlongMetric.split(DELIMITER); + groupId = inlongMetricArray[0]; + streamId = inlongMetricArray[1]; + String nodeId = inlongMetricArray[2]; sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup()); sinkMetricData.registerMetricsForNumBytesOut(); sinkMetricData.registerMetricsForNumRecordsOut(); @@ -202,7 +202,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R && contentType == that.contentType && Objects.equals(requestFactory, that.requestFactory) && Objects.equals(createKey, that.createKey) - && Objects.equals(inLongMetric, that.inLongMetric); + && Objects.equals(inlongMetric, that.inlongMetric); } @Override @@ -214,6 +214,6 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R contentType, requestFactory, createKey, - inLongMetric); + inlongMetric); } } diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java index 63e3f87a4..da14c947a 100644 --- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java +++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java @@ -104,7 +104,7 @@ public class HBase2DynamicTableFactory HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions); String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL); HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema); - String inlongMetric = tableOptions.get(INLONG_METRIC); + String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); String inlongAudit = tableOptions.get(INLONG_AUDIT); ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit); diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java index 821cfe72c..7d0048cea 100644 --- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java +++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java @@ -118,7 +118,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>(); private boolean overwrite = false; private boolean dynamicGrouping = false; - private String inLongMetric; + private String inlongMetric; private String auditHostAndPorts; public HiveTableSink( @@ -127,7 +127,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su ObjectIdentifier identifier, CatalogTable table, @Nullable Integer configuredParallelism, - final String inLongMetric, + final String inlongMetric, final String auditHostAndPorts) { this.flinkConf = flinkConf; this.jobConf = jobConf; @@ -140,7 +140,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema()); this.configuredParallelism = configuredParallelism; - this.inLongMetric = inLongMetric; + this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; } @@ -339,7 +339,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su createCompactReaderFactory(sd, tableProps), compactionSize, parallelism, - inLongMetric, + inlongMetric, auditHostAndPorts); } else { writerStream = @@ -348,7 +348,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su bucketCheckInterval, builder, parallelism, - inLongMetric, + inlongMetric, auditHostAndPorts); } @@ -492,7 +492,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su identifier, catalogTable, configuredParallelism, - inLongMetric, + inlongMetric, auditHostAndPorts); sink.staticPartitionSpec = staticPartitionSpec; sink.overwrite = overwrite; diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java index 16a7bd528..5d0ca7bf0 100644 --- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java @@ -56,7 +56,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe bucketsBuilder; @Nullable - private String inLongMetric; + private String inlongMetric; @Nullable private String auditHostAndPorts; @@ -77,11 +77,11 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe StreamingFileSink.BucketsBuilder< IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>> bucketsBuilder, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { this.bucketCheckInterval = bucketCheckInterval; this.bucketsBuilder = bucketsBuilder; - this.inLongMetric = inLongMetric; + this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -111,13 +111,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe @Override public void open() throws Exception { super.open(); - if (inLongMetric != null) { - String[] inLongMetricArray = inLongMetric.split(DELIMITER); - String inLongGroupId = inLongMetricArray[0]; - String inLongStreamId = inLongMetricArray[1]; - String nodeId = inLongMetricArray[2]; + if (inlongMetric != null) { + String[] inlongMetricArray = inlongMetric.split(DELIMITER); + String inlongGroupId = inlongMetricArray[0]; + String inlongStreamId = inlongMetricArray[1]; + String nodeId = inlongMetricArray[2]; metricData = new SinkMetricData( - inLongGroupId, inLongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts); + inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts); metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter()); metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter()); metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter()); diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/CompactFileWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/CompactFileWriter.java index 22e4b3a5d..2c368ba19 100644 --- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/CompactFileWriter.java +++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/CompactFileWriter.java @@ -36,9 +36,9 @@ public class CompactFileWriter<T> StreamingFileSink.BucketsBuilder< T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { - super(bucketCheckInterval, bucketsBuilder, inLongMetric, auditHostAndPorts); + super(bucketCheckInterval, bucketsBuilder, inlongMetric, auditHostAndPorts); } @Override diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingFileWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingFileWriter.java index 43703598a..b0c8aee26 100644 --- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingFileWriter.java +++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingFileWriter.java @@ -45,9 +45,9 @@ public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, Partiti StreamingFileSink.BucketsBuilder< IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>> bucketsBuilder, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { - super(bucketCheckInterval, bucketsBuilder, inLongMetric, auditHostAndPorts); + super(bucketCheckInterval, bucketsBuilder, inlongMetric, auditHostAndPorts); } @Override diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java index 31bd6c1b6..1c7663064 100644 --- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java +++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java @@ -65,10 +65,10 @@ public class StreamingSink { T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, int parallelism, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { StreamingFileWriter<T> fileWriter = - new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder, inLongMetric, auditHostAndPorts); + new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder, inlongMetric, auditHostAndPorts); return inputStream .transform( StreamingFileWriter.class.getSimpleName(), @@ -92,10 +92,10 @@ public class StreamingSink { CompactReader.Factory<T> readFactory, long targetFileSize, int parallelism, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { CompactFileWriter<T> writer = new CompactFileWriter<>( - bucketCheckInterval, bucketsBuilder, inLongMetric, auditHostAndPorts); + bucketCheckInterval, bucketsBuilder, inlongMetric, auditHostAndPorts); SupplierWithException<FileSystem, IOException> fsSupplier = (SupplierWithException<FileSystem, IOException> & Serializable) diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java index 9dbe065aa..5a744cd0b 100644 --- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java +++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java @@ -102,7 +102,7 @@ public class HiveTableInlongFactory implements DynamicTableSourceFactory, Dynami Integer configuredParallelism = Configuration.fromMap(context.getCatalogTable().getOptions()) .get(FileSystemOptions.SINK_PARALLELISM); - final String inLongMetric = context.getCatalogTable().getOptions() + final String inlongMetric = context.getCatalogTable().getOptions() .getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()); final String auditHostAndPorts = context.getCatalogTable().getOptions() .getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()); @@ -113,7 +113,7 @@ public class HiveTableInlongFactory implements DynamicTableSourceFactory, Dynami context.getObjectIdentifier(), context.getCatalogTable(), configuredParallelism, - inLongMetric, + inlongMetric, auditHostAndPorts); } else { return FactoryUtil.createTableSink( diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java index a7099c3cc..d86857338 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java @@ -134,7 +134,7 @@ public class FlinkSink { private boolean upsert = false; private List<String> equalityFieldColumns = null; private String uidPrefix = null; - private String inLongMetric = null; + private String inlongMetric = null; private String auditHostAndPorts = null; private Builder() { @@ -198,12 +198,12 @@ public class FlinkSink { /** * Add metric output for iceberg writer - * @param inLongMetric + * @param inlongMetric * @param auditHostAndPorts * @return */ - public Builder metric(String inLongMetric, String auditHostAndPorts) { - this.inLongMetric = inLongMetric; + public Builder metric(String inlongMetric, String auditHostAndPorts) { + this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; return this; } @@ -400,7 +400,7 @@ public class FlinkSink { } IcebergStreamWriter<RowData> streamWriter = createStreamWriter( - table, flinkRowType, equalityFieldIds, upsertMode, inLongMetric, auditHostAndPorts); + table, flinkRowType, equalityFieldIds, upsertMode, inlongMetric, auditHostAndPorts); int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism; SingleOutputStreamOperator<WriteResult> writerStream = input @@ -474,7 +474,7 @@ public class FlinkSink { RowType flinkRowType, List<Integer> equalityFieldIds, boolean upsert, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { Preconditions.checkArgument(table != null, "Iceberg table should't be null"); Map<String, String> props = table.properties(); @@ -486,7 +486,7 @@ public class FlinkSink { serializableTable, flinkRowType, targetFileSize, fileFormat, equalityFieldIds, upsert); - return new IcebergStreamWriter<>(table.name(), taskWriterFactory, inLongMetric, auditHostAndPorts); + return new IcebergStreamWriter<>(table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts); } private static FileFormat getFileFormat(Map<String, String> properties) { diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java index 118a97e15..75eca46c5 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java @@ -43,7 +43,7 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> private final String fullTableName; private final TaskWriterFactory<T> taskWriterFactory; - private final String inLongMetric; + private final String inlongMetric; private final String auditHostAndPorts; private transient TaskWriter<T> writer; @@ -55,11 +55,11 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> IcebergStreamWriter( String fullTableName, TaskWriterFactory<T> taskWriterFactory, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { this.fullTableName = fullTableName; this.taskWriterFactory = taskWriterFactory; - this.inLongMetric = inLongMetric; + this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -76,13 +76,13 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> this.writer = taskWriterFactory.create(); // Initialize metric - if (inLongMetric != null) { - String[] inLongMetricArray = inLongMetric.split(DELIMITER); - String inLongGroupId = inLongMetricArray[0]; - String inLongStreamId = inLongMetricArray[1]; - String nodeId = inLongMetricArray[2]; + if (inlongMetric != null) { + String[] inlongMetricArray = inlongMetric.split(DELIMITER); + String inlongGroupId = inlongMetricArray[0]; + String inlongStreamId = inlongMetricArray[1]; + String nodeId = inlongMetricArray[2]; metricData = new SinkMetricData( - inLongGroupId, inLongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts); + inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts); metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter()); metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter()); metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter()); diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java index 98f0f0cf6..66af78c4d 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java @@ -69,7 +69,7 @@ public class JdbcBatchingOutputFormat< private final JdbcExecutionOptions executionOptions; private final StatementExecutorFactory<JdbcExec> statementExecutorFactory; private final RecordExtractor<In, JdbcIn> jdbcRecordExtractor; - private final String inLongMetric; + private final String inlongMetric; private final String auditHostAndPorts; private transient JdbcExec jdbcStatementExecutor; private transient int batchCount = 0; @@ -80,8 +80,8 @@ public class JdbcBatchingOutputFormat< private transient RuntimeContext runtimeContext; private SinkMetricData sinkMetricData; - private String inLongGroupId; - private String inLongStreamId; + private String inlongGroupId; + private String inlongStreamId; private transient AuditImp auditImp; private Long dataSize = 0L; private Long rowSize = 0L; @@ -91,13 +91,13 @@ public class JdbcBatchingOutputFormat< @Nonnull JdbcExecutionOptions executionOptions, @Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory, @Nonnull RecordExtractor<In, JdbcIn> recordExtractor, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { super(connectionProvider); this.executionOptions = checkNotNull(executionOptions); this.statementExecutorFactory = checkNotNull(statementExecutorFactory); this.jdbcRecordExtractor = checkNotNull(recordExtractor); - this.inLongMetric = inLongMetric; + this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; } @@ -130,12 +130,12 @@ public class JdbcBatchingOutputFormat< public void open(int taskNumber, int numTasks) throws IOException { super.open(taskNumber, numTasks); this.runtimeContext = getRuntimeContext(); - if (inLongMetric != null && !inLongMetric.isEmpty()) { - String[] inLongMetricArray = inLongMetric.split(DELIMITER); - inLongGroupId = inLongMetricArray[0]; - inLongStreamId = inLongMetricArray[1]; - String nodeId = inLongMetricArray[2]; - sinkMetricData = new SinkMetricData(inLongGroupId, inLongStreamId, nodeId, runtimeContext.getMetricGroup()); + if (inlongMetric != null && !inlongMetric.isEmpty()) { + String[] inlongMetricArray = inlongMetric.split(DELIMITER); + inlongGroupId = inlongMetricArray[0]; + inlongStreamId = inlongMetricArray[1]; + String nodeId = inlongMetricArray[2]; + sinkMetricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, runtimeContext.getMetricGroup()); sinkMetricData.registerMetricsForDirtyBytes(); sinkMetricData.registerMetricsForDirtyRecords(); sinkMetricData.registerMetricsForNumBytesOut(); @@ -207,8 +207,8 @@ public class JdbcBatchingOutputFormat< if (auditImp != null) { auditImp.add( AUDIT_SORT_INPUT, - inLongGroupId, - inLongStreamId, + inlongGroupId, + inlongStreamId, System.currentTimeMillis(), 1, length); @@ -371,7 +371,7 @@ public class JdbcBatchingOutputFormat< private String[] fieldNames; private String[] keyFields; private int[] fieldTypes; - private String inLongMetric; + private String inlongMetric; private String auditHostAndPorts; private JdbcExecutionOptions.Builder executionOptionsBuilder = JdbcExecutionOptions.builder(); @@ -409,10 +409,10 @@ public class JdbcBatchingOutputFormat< } /** - * required, inLongMetric + * required, inlongMetric */ - public Builder setinLongMetric(String inLongMetric) { - this.inLongMetric = inLongMetric; + public Builder setinlongMetric(String inlongMetric) { + this.inlongMetric = inlongMetric; return this; } @@ -471,7 +471,7 @@ public class JdbcBatchingOutputFormat< new SimpleJdbcConnectionProvider(options), dml, executionOptionsBuilder.build(), - inLongMetric, + inlongMetric, auditHostAndPorts); } else { // warn: don't close over builder fields @@ -493,7 +493,7 @@ public class JdbcBatchingOutputFormat< Preconditions.checkArgument(tuple2.f0); return tuple2.f1; }, - inLongMetric, + inlongMetric, auditHostAndPorts); } } diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java index 4ef1ff2e3..4101913c9 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java @@ -58,14 +58,14 @@ class TableJdbcUpsertOutputFormat JdbcConnectionProvider connectionProvider, JdbcDmlOptions dmlOptions, JdbcExecutionOptions batchOptions, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { this( connectionProvider, batchOptions, ctx -> createUpsertRowExecutor(dmlOptions, ctx), ctx -> createDeleteExecutor(dmlOptions, ctx), - inLongMetric, + inlongMetric, auditHostAndPorts); } @@ -76,11 +76,11 @@ class TableJdbcUpsertOutputFormat StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> statementExecutorFactory, StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> deleteStatementExecutorFactory, - String inLongMetric, + String inlongMetric, String auditHostAndPorts ) { super(connectionProvider, batchOptions, statementExecutorFactory, tuple2 -> tuple2.f1, - inLongMetric, auditHostAndPorts); + inlongMetric, auditHostAndPorts); this.deleteStatementExecutorFactory = deleteStatementExecutorFactory; } diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java index 1e0369e2c..fa46f8436 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java @@ -64,7 +64,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable { private boolean appendMode; private TypeInformation<RowData> rowDataTypeInformation; private DataType[] fieldDataTypes; - private String inLongMetric; + private String inlongMetric; private String auditHostAndPorts; public JdbcDynamicOutputFormatBuilder() { @@ -236,8 +236,8 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable { return this; } - public JdbcDynamicOutputFormatBuilder setInLongMetric(String inLongMetric) { - this.inLongMetric = inLongMetric; + public JdbcDynamicOutputFormatBuilder setInLongMetric(String inlongMetric) { + this.inlongMetric = inlongMetric; return this; } @@ -264,7 +264,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable { createBufferReduceExecutor( dmlOptions, ctx, rowDataTypeInformation, logicalTypes), JdbcBatchingOutputFormat.RecordExtractor.identity(), - inLongMetric, + inlongMetric, auditHostAndPorts); } else { // append only query @@ -285,7 +285,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable { sql, rowDataTypeInformation), JdbcBatchingOutputFormat.RecordExtractor.identity(), - inLongMetric, + inlongMetric, auditHostAndPorts); } } diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java index 328f86a45..1efe0a91b 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java @@ -188,7 +188,7 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); boolean appendMode = config.get(SINK_APPEND_MODE); - String inLongMetric = config.getOptional(INLONG_METRIC).orElse(null); + String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); String auditHostAndPorts = config.getOptional(INLONG_AUDIT).orElse(null); return new JdbcDynamicTableSink( jdbcOptions, @@ -196,7 +196,7 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam getJdbcDmlOptions(jdbcOptions, physicalSchema), physicalSchema, appendMode, - inLongMetric, + inlongMetric, auditHostAndPorts); } diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java index 92e54e816..f6dd579b3 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java @@ -50,7 +50,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink { private final TableSchema tableSchema; private final String dialectName; - private final String inLongMetric; + private final String inlongMetric; private final String auditHostAndPorts; private final boolean appendMode; @@ -60,7 +60,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink { JdbcDmlOptions dmlOptions, TableSchema tableSchema, boolean appendMode, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { this.jdbcOptions = jdbcOptions; this.executionOptions = executionOptions; @@ -68,7 +68,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink { this.tableSchema = tableSchema; this.dialectName = dmlOptions.getDialect().dialectName(); this.appendMode = appendMode; - this.inLongMetric = inLongMetric; + this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; } @@ -101,7 +101,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink { builder.setJdbcExecutionOptions(executionOptions); builder.setRowDataTypeInfo(rowDataTypeInformation); builder.setFieldDataTypes(tableSchema.getFieldDataTypes()); - builder.setInLongMetric(inLongMetric); + builder.setInLongMetric(inlongMetric); builder.setAuditHostAndPorts(auditHostAndPorts); return SinkFunctionProvider.of( new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism()); @@ -110,7 +110,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink { @Override public DynamicTableSink copy() { return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions, - tableSchema, appendMode, inLongMetric, auditHostAndPorts); + tableSchema, appendMode, inlongMetric, auditHostAndPorts); } @Override @@ -132,13 +132,13 @@ public class JdbcDynamicTableSink implements DynamicTableSink { && Objects.equals(dmlOptions, that.dmlOptions) && Objects.equals(tableSchema, that.tableSchema) && Objects.equals(dialectName, that.dialectName) - && Objects.equals(inLongMetric, that.inLongMetric) + && Objects.equals(inlongMetric, that.inlongMetric) && Objects.equals(auditHostAndPorts, that.auditHostAndPorts); } @Override public int hashCode() { return Objects.hash(jdbcOptions, executionOptions, dmlOptions, tableSchema, dialectName, - inLongMetric, auditHostAndPorts); + inlongMetric, auditHostAndPorts); } } \ No newline at end of file diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java index 75b965340..b2efd2c3e 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java @@ -215,7 +215,7 @@ public class FlinkKafkaProducer<IN> /** * Metric for InLong */ - private final String inLongMetric; + private final String inlongMetric; /** * audit host and ports */ @@ -245,11 +245,11 @@ public class FlinkKafkaProducer<IN> /** * inLong groupId */ - private String inLongGroupId; + private String inlongGroupId; /** * inLong streamId */ - private String inLongStreamId; + private String inlongStreamId; /** * sink metric data */ @@ -609,7 +609,7 @@ public class FlinkKafkaProducer<IN> Properties producerConfig, FlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { this( defaultTopic, @@ -619,7 +619,7 @@ public class FlinkKafkaProducer<IN> producerConfig, semantic, kafkaProducersPoolSize, - inLongMetric, + inlongMetric, auditHostAndPorts); } @@ -659,13 +659,13 @@ public class FlinkKafkaProducer<IN> Properties producerConfig, FlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { super( new FlinkKafkaProducer.TransactionStateSerializer(), new FlinkKafkaProducer.ContextStateSerializer()); - this.inLongMetric = inLongMetric; + this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null"); @@ -905,12 +905,12 @@ public class FlinkKafkaProducer<IN> RuntimeContextInitializationContextAdapters.serializationAdapter( getRuntimeContext(), metricGroup -> metricGroup.addGroup("user"))); } - if (inLongMetric != null && !inLongMetric.isEmpty()) { - String[] inLongMetricArray = inLongMetric.split(DELIMITER); - inLongGroupId = inLongMetricArray[0]; - inLongStreamId = inLongMetricArray[1]; - String nodeId = inLongMetricArray[2]; - metricData = new SinkMetricData(inLongGroupId, inLongStreamId, nodeId, ctx.getMetricGroup()); + if (inlongMetric != null && !inlongMetric.isEmpty()) { + String[] inlongMetricArray = inlongMetric.split(DELIMITER); + inlongGroupId = inlongMetricArray[0]; + inlongStreamId = inlongMetricArray[1]; + String nodeId = inlongMetricArray[2]; + metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup()); metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter()); metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter()); metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter()); @@ -945,8 +945,8 @@ public class FlinkKafkaProducer<IN> if (auditImp != null) { auditImp.add( Constants.AUDIT_SORT_OUTPUT, - inLongGroupId, - inLongStreamId, + inlongGroupId, + inlongStreamId, System.currentTimeMillis(), 1, record.value().length); diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java index 387baee79..6f66a6e3d 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java @@ -139,7 +139,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada /** * Metric for inLong */ - private final String inLongMetric; + private final String inlongMetric; /** * audit host and ports */ @@ -172,7 +172,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada boolean upsertMode, SinkBufferFlushMode flushMode, @Nullable Integer parallelism, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { // Format attributes this.consumedDataType = @@ -200,7 +200,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada "Sink buffer flush is only supported in upsert-kafka."); } this.parallelism = parallelism; - this.inLongMetric = inLongMetric; + this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; } @@ -302,7 +302,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada upsertMode, flushMode, parallelism, - inLongMetric, + inlongMetric, auditHostAndPorts); copy.metadataKeys = metadataKeys; return copy; @@ -337,7 +337,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada && Objects.equals(upsertMode, that.upsertMode) && Objects.equals(flushMode, that.flushMode) && Objects.equals(parallelism, that.parallelism) - && Objects.equals(inLongMetric, that.inLongMetric) + && Objects.equals(inlongMetric, that.inlongMetric) && Objects.equals(auditHostAndPorts, that.auditHostAndPorts); } @@ -359,7 +359,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada upsertMode, flushMode, parallelism, - inLongMetric, + inlongMetric, auditHostAndPorts); } @@ -420,7 +420,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada properties, FlinkKafkaProducer.Semantic.valueOf(semantic.toString()), FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE, - inLongMetric, + inlongMetric, auditHostAndPorts); } diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java index ef8660280..17e92abda 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java @@ -67,11 +67,11 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro private SourceMetricData metricData; - private String inLongGroupId; + private String inlongGroupId; private String auditHostAndPorts; - private String inLongStreamId; + private String inlongStreamId; private transient AuditImp auditImp; @@ -85,7 +85,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro MetadataConverter[] metadataConverters, TypeInformation<RowData> producedTypeInfo, boolean upsertMode, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { if (upsertMode) { Preconditions.checkArgument( @@ -105,7 +105,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro upsertMode); this.producedTypeInfo = producedTypeInfo; this.upsertMode = upsertMode; - this.inlongMetric = inLongMetric; + this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; } @@ -117,11 +117,11 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro } valueDeserialization.open(context); if (inlongMetric != null && !inlongMetric.isEmpty()) { - String[] inLongMetricArray = inlongMetric.split(DELIMITER); - inLongGroupId = inLongMetricArray[0]; - inLongStreamId = inLongMetricArray[1]; - String nodeId = inLongMetricArray[2]; - metricData = new SourceMetricData(inLongGroupId, inLongStreamId, nodeId, context.getMetricGroup()); + String[] inlongMetricArray = inlongMetric.split(DELIMITER); + inlongGroupId = inlongMetricArray[0]; + inlongStreamId = inlongMetricArray[1]; + String nodeId = inlongMetricArray[2]; + metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, context.getMetricGroup()); metricData.registerMetricsForNumBytesIn(); metricData.registerMetricsForNumBytesInPerSecond(); metricData.registerMetricsForNumRecordsIn(); @@ -186,8 +186,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro if (auditImp != null) { auditImp.add( Constants.AUDIT_SORT_INPUT, - inLongGroupId, - inLongStreamId, + inlongGroupId, + inlongStreamId, System.currentTimeMillis(), 1, record.value().length); diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java index d49af2d00..f3580a8f1 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java @@ -140,7 +140,7 @@ public class KafkaDynamicSource /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. * */ protected final boolean upsertMode; - protected final String inLongMetric; + protected final String inlongMetric; protected final String auditHostAndPorts; @@ -158,7 +158,7 @@ public class KafkaDynamicSource Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis, boolean upsertMode, - final String inLongMetric, + final String inlongMetric, final String auditHostAndPorts) { // Format attributes this.physicalDataType = @@ -192,7 +192,7 @@ public class KafkaDynamicSource specificStartupOffsets, "Specific offsets must not be null."); this.startupTimestampMillis = startupTimestampMillis; this.upsertMode = upsertMode; - this.inLongMetric = inLongMetric; + this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; } @@ -214,7 +214,7 @@ public class KafkaDynamicSource final FlinkKafkaConsumer<RowData> kafkaConsumer = createKafkaConsumer(keyDeserialization, valueDeserialization, - producedTypeInfo, inLongMetric, auditHostAndPorts); + producedTypeInfo, inlongMetric, auditHostAndPorts); return SourceFunctionProvider.of(kafkaConsumer, false); } @@ -284,7 +284,7 @@ public class KafkaDynamicSource startupMode, specificStartupOffsets, startupTimestampMillis, - upsertMode, inLongMetric, auditHostAndPorts); + upsertMode, inlongMetric, auditHostAndPorts); copy.producedDataType = producedDataType; copy.metadataKeys = metadataKeys; copy.watermarkStrategy = watermarkStrategy; diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java index 8efee1317..2127886d3 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java @@ -254,7 +254,7 @@ public class KafkaDynamicTableFactory final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); - final String inLongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); + final String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null); @@ -271,7 +271,7 @@ public class KafkaDynamicTableFactory startupOptions.startupMode, startupOptions.specificOffsets, startupOptions.startupTimestampMillis, - inLongMetric, + inlongMetric, auditHostAndPorts); } @@ -307,7 +307,7 @@ public class KafkaDynamicTableFactory final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null); - final String inLongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); + final String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null); @@ -324,7 +324,7 @@ public class KafkaDynamicTableFactory getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null), getSinkSemantic(tableOptions), parallelism, - inLongMetric, + inlongMetric, auditHostAndPorts); } @@ -343,7 +343,7 @@ public class KafkaDynamicTableFactory StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { return new KafkaDynamicSource( physicalDataType, @@ -359,7 +359,7 @@ public class KafkaDynamicTableFactory specificStartupOffsets, startupTimestampMillis, false, - inLongMetric, + inlongMetric, auditHostAndPorts); } @@ -376,7 +376,7 @@ public class KafkaDynamicTableFactory FlinkKafkaPartitioner<RowData> partitioner, KafkaSinkSemantic semantic, Integer parallelism, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { return new KafkaDynamicSink( physicalDataType, @@ -394,7 +394,7 @@ public class KafkaDynamicTableFactory false, SinkBufferFlushMode.DISABLED, parallelism, - inLongMetric, + inlongMetric, auditHostAndPorts); } } diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java index 6e803cfd9..e3fa28abc 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java @@ -256,7 +256,7 @@ public class UpsertKafkaDynamicTableFactory Duration batchInterval = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL); SinkBufferFlushMode flushMode = new SinkBufferFlushMode(batchSize, batchInterval.toMillis()); - String inLongMetric = tableOptions.get(INLONG_METRIC); + String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null); // use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}. @@ -277,7 +277,7 @@ public class UpsertKafkaDynamicTableFactory true, flushMode, parallelism, - inLongMetric, + inlongMetric, auditHostAndPorts); } diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java index 450990eb4..42bb53732 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java @@ -59,7 +59,6 @@ import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.inlong.audit.AuditImp; -import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; @@ -418,7 +417,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> metricGroup.gauge( "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime()); if (StringUtils.isNotEmpty(this.inlongMetric)) { - String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER); + String[] inlongMetricArray = inlongMetric.split(DELIMITER); String groupId = inlongMetricArray[0]; String streamId = inlongMetricArray[1]; String nodeId = inlongMetricArray[2]; diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java index d235bc860..919c227e4 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java @@ -224,7 +224,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zoneId) ? ZoneId.systemDefault() : ZoneId.of(zoneId); - final String inlongMetric = config.get(INLONG_METRIC); + final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); final String inlongAudit = config.get(INLONG_AUDIT); ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit); diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java index d5b3c676b..a7eebdbcd 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java @@ -48,7 +48,6 @@ import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.inlong.audit.AuditImp; -import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer; import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeFetcher; @@ -418,7 +417,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> metricGroup.gauge( "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime()); if (StringUtils.isNotEmpty(this.inlongMetric)) { - String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER); + String[] inlongMetricArray = inlongMetric.split(DELIMITER); String groupId = inlongMetricArray[0]; String streamId = inlongMetricArray[1]; String nodeId = inlongMetricArray[2]; diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java index c3c475e92..c8a70b8bd 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java @@ -122,7 +122,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX, JdbcUrlUtils.PROPERTIES_PREFIX); final ReadableConfig config = helper.getOptions(); - final String inlongMetric = config.get(INLONG_METRIC); + final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); final String inlongAudit = config.get(INLONG_AUDIT); ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit); final String hostname = config.get(HOSTNAME); diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java index 4098cc563..1458693fb 100644 --- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java @@ -59,7 +59,6 @@ import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.inlong.audit.AuditImp; -import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; @@ -418,7 +417,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> metricGroup.gauge( "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime()); if (StringUtils.isNotEmpty(this.inlongMetric)) { - String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER); + String[] inlongMetricArray = inlongMetric.split(DELIMITER); String groupId = inlongMetricArray[0]; String streamId = inlongMetricArray[1]; String nodeId = inlongMetricArray[2]; diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java index 830bfd078..0b4471ae3 100644 --- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java +++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java @@ -112,7 +112,7 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory { int port = config.get(PORT); StartupOptions startupOptions = getStartupOptions(config); ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); - String inlongMetric = config.get(INLONG_METRIC); + String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); String inlongAudit = config.get(INLONG_AUDIT); ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit); return new OracleTableSource( diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java index ece63db62..cd78621db 100644 --- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java @@ -59,7 +59,6 @@ import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.inlong.audit.AuditImp; -import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; @@ -418,7 +417,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> metricGroup.gauge( "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime()); if (StringUtils.isNotEmpty(this.inlongMetric)) { - String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER); + String[] inlongMetricArray = inlongMetric.split(DELIMITER); String groupId = inlongMetricArray[0]; String streamId = inlongMetricArray[1]; String nodeId = inlongMetricArray[2]; diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java index e13fc49ae..a886f8aef 100644 --- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java +++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java @@ -125,7 +125,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory { String pluginName = config.get(DECODING_PLUGIN_NAME); String slotName = config.get(SLOT_NAME); ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); - String inlongMetric = config.get(INLONG_METRIC); + String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); String inlongAudit = config.get(INLONG_AUDIT); ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit); diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java index 13d6e7c21..bf74dc8d2 100644 --- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java @@ -273,7 +273,9 @@ public class PulsarDynamicTableFactory implements String adminUrl = tableOptions.get(ADMIN_URL); String serviceUrl = tableOptions.get(SERVICE_URL); - String inlongMetric = tableOptions.get(INLONG_METRIC); + + String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); + String auditHostAndPorts = tableOptions.get(INLONG_AUDIT); return createPulsarTableSource( @@ -361,7 +363,7 @@ public class PulsarDynamicTableFactory implements String adminUrl, Properties properties, PulsarTableOptions.StartupOptions startupOptions, - String inLongMetric, + String inlongMetric, String auditHostAndPorts) { return new PulsarDynamicTableSource( physicalDataType, @@ -377,7 +379,7 @@ public class PulsarDynamicTableFactory implements properties, startupOptions, false, - inLongMetric, + inlongMetric, auditHostAndPorts); } } diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java index c709daa66..ff0361b16 100644 --- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java @@ -187,7 +187,7 @@ public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactor String serverUrl = tableOptions.get(SERVICE_URL); List<String> topics = tableOptions.get(TOPIC); String topicPattern = tableOptions.get(TOPIC_PATTERN); - String inlongMetric = tableOptions.get(INLONG_METRIC); + String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); String auditHostAndPorts = tableOptions.get(INLONG_AUDIT); return new PulsarDynamicTableSource( diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java index c1cea9dba..c28115846 100644 --- a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java @@ -217,11 +217,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> private SourceMetricData metricData; - private String inLongGroupId; + private String inlongGroupId; private String auditHostAndPorts; - private String inLongStreamId; + private String inlongStreamId; private transient AuditImp auditImp; @@ -413,11 +413,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> metricGroup.gauge( "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime()); if (StringUtils.isNotEmpty(this.inlongMetric)) { - String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER); - inLongGroupId = inlongMetricArray[0]; - inLongStreamId = inlongMetricArray[1]; + String[] inlongMetricArray = inlongMetric.split(DELIMITER); + inlongGroupId = inlongMetricArray[0]; + inlongStreamId = inlongMetricArray[1]; String nodeId = inlongMetricArray[2]; - metricData = new SourceMetricData(inLongGroupId, inLongStreamId, nodeId, metricGroup); + metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, metricGroup); metricData.registerMetricsForNumRecordsIn(); metricData.registerMetricsForNumBytesIn(); metricData.registerMetricsForNumBytesInPerSecond(); @@ -511,8 +511,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> if (auditImp != null) { auditImp.add( Constants.AUDIT_SORT_INPUT, - inLongGroupId, - inLongStreamId, + inlongGroupId, + inlongStreamId, System.currentTimeMillis(), 1, record.value().toString().getBytes(StandardCharsets.UTF_8).length); diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java index e6a7e5be6..39e6a642c 100644 --- a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java +++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java @@ -114,7 +114,7 @@ public class SqlServerTableFactory implements DynamicTableSourceFactory { String schemaName = config.get(SCHEMA_NAME); String databaseName = config.get(DATABASE_NAME); String tableName = config.get(TABLE_NAME); - String inlongMetric = config.get(INLONG_METRIC); + String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); String auditHostAndPorts = config.get(INLONG_AUDIT); ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE)); int port = config.get(PORT);