fsk119 commented on a change in pull request #14017: URL: https://github.com/apache/flink/pull/14017#discussion_r524060554
##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,204 @@
+---
+title: "Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in upsert fashion.
+
+As a source, the upsert-kafka connector produces a changelog stream, where each data record represents
+an update or delete event. More precisely, the value in a data record is interpreted as an UPDATE of
+the last value for the same key, if any (if a corresponding key doesn't exist yet, the update is
+considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted
+as an UPSERT, aka INSERT/UPDATE, because any existing row with the same key is overwritten. Also, null
+values are interpreted in a special way: a record with a null value represents a "DELETE".
+
+As a sink, the upsert-kafka connector can consume a changelog stream. It will write INSERT/UPDATE_AFTER
+data as normal Kafka message values, and write DELETE data as Kafka messages with null values
+(indicating a tombstone for the key). Flink guarantees message ordering on the primary key by
+partitioning data on the values of the primary key columns, so update/delete messages on the same
+key fall into the same partition.
+
+Dependencies
+------------
+
+In order to set up the upsert-kafka connector, the following table provides dependency information for
+both projects using a build automation tool (such as Maven or SBT) and for SQL Client with SQL JAR bundles.
+
+{% assign connector = site.data.sql-connectors['upsert-kafka'] %}
+{% include sql-connector-download-table.html
+  connector=connector
+%}
+
+Full Example
+----------------
+
+The example below shows how to create and use an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE pageviews_per_region (
+  region STRING,
+  pv BIGINT,
+  uv BIGINT,
+  PRIMARY KEY (region) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'pageviews_per_region',
+  'properties.bootstrap.servers' = '...',
+  'key.format' = 'csv',

Review comment:
   I will check it.
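For context on how such a table is used once defined, here is a minimal usage sketch: an aggregating query whose per-key updates the upsert-kafka sink writes as upserts keyed on the primary key. The source table `pageviews` and its `user_id` column are assumptions for illustration and are not part of the diff above.

```sql
-- Illustrative sketch only: 'pageviews' and its columns are assumed,
-- not taken from the PR. Each update to a region's aggregate is written
-- to the 'pageviews_per_region' topic as an upsert keyed by region.
INSERT INTO pageviews_per_region
SELECT
  region,
  COUNT(*) AS pv,                -- total page views per region
  COUNT(DISTINCT user_id) AS uv  -- distinct visitors per region
FROM pageviews
GROUP BY region;
```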