This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new abc0175  [HUDI-1290] [RFC-39] Deltastreamer avro source for Debezium 
CDC (#4048)
abc0175 is described below

commit abc0175cf7a3577ac5078f6a4cd43569105ccf7e
Author: rmahindra123 <[email protected]>
AuthorDate: Wed Nov 24 17:31:34 2021 -0800

    [HUDI-1290] [RFC-39] Deltastreamer avro source for Debezium CDC (#4048)
    
    * Add RFC entry for deltastreamer source for debezium
    
    * Add RFC for debezium source
    
    * Add RFC for debezium source
    
    * Add RFC for debezium source
    
    * fix hyperlink issue and rebase
    
    * Update progress
    
    Co-authored-by: Rajesh Mahindra <[email protected]>
---
 rfc/README.md        |   2 +-
 rfc/rfc-39/arch.png  | Bin 0 -> 321790 bytes
 rfc/rfc-39/rfc-39.md | 105 +++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 106 insertions(+), 1 deletion(-)

diff --git a/rfc/README.md b/rfc/README.md
index 6fe6827..c61dc24 100644
--- a/rfc/README.md
+++ b/rfc/README.md
@@ -62,5 +62,5 @@ The list of all RFCs can be found here.
 | 36 | [HUDI Metastore 
Server](https://cwiki.apache.org/confluence/display/HUDI/%5BWIP%5D+RFC-36%3A+HUDI+Metastore+Server)
 | `UNDER REVIEW` |
 | 37 | [Hudi metadata based bloom index] | `UNDER REVIEW` |
 | 38 | [Spark Datasource V2 Integration] | `UNDER REVIEW` |
-| 39 | [Incremental source for Debezium] | `UNDER REVIEW` |
+| 39 | [Incremental source for Debezium](./rfc-39/rfc-39.md) | `IN PROGRESS` |
 | 40 | [Hudi Connector for Trino] | `UNDER REVIEW` |
diff --git a/rfc/rfc-39/arch.png b/rfc/rfc-39/arch.png
new file mode 100644
index 0000000..8864f43
Binary files /dev/null and b/rfc/rfc-39/arch.png differ
diff --git a/rfc/rfc-39/rfc-39.md b/rfc/rfc-39/rfc-39.md
new file mode 100644
index 0000000..9dc6335
--- /dev/null
+++ b/rfc/rfc-39/rfc-39.md
@@ -0,0 +1,105 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-[39]: Deltastreamer avro-based source for Debezium CDC
+
+
+
+## Proposers
+- @rmahindra
+- @vbalaji
+
+## Approvers
+ - @vinoth
+
+## Status
+
+JIRA: 
[https://issues.apache.org/jira/browse/HUDI-1290](https://issues.apache.org/jira/browse/HUDI-1290)
+
+## Abstract
+
+We intend to implement a source for ingesting Debezium Change Data Capture 
(CDC) logs into Deltastreamer/Hudi. With this capability, we can continuously 
capture row-level changes that insert, update and delete records that were 
committed to a database. While Debezium supports multiple databases, we 
focus on PostgreSQL in this RFC. At the end, we explain how it can be 
extended to support MySQL.
+
+## Background
+The architecture of Debezium is shown in the figure below. 
[Debezium](https://debezium.io/documentation/reference/stable/connectors/postgresql.html)
 is implemented as a Kafka Connect source that reads change logs from 
databases ([logical 
decoding](https://www.postgresql.org/docs/current/logicaldecoding-explanation.html)
 in PostgreSQL and `binlog` in MySQL) and ingests them into a Kafka topic. 
Debezium uses a single Kafka topic per table in the source database.
+
+
+
+The first time it connects to a PostgreSQL server or cluster, the connector 
takes a consistent snapshot of all schemas. After that snapshot is complete, 
the connector continuously captures row-level changes that insert, update, and 
delete records. The connector generates data change event records and streams them to 
Kafka topics. For each table, the default behavior is that the connector 
streams all generated events to a separate Kafka topic for that table. 
Applications and services consume dat [...]
+
+
+
+A Debezium event consists of the fields `before`, `after`, `source`, `op` 
and `ts_ms`. The `before` field contains the values of the row before the 
operation took place. The `after` field contains the values of the 
database row after the operation took place. The operation is specified in the `op` 
field, which can be either `r` (initial snapshot), `c` (insert), `u` (update) 
or `d` (delete). In case of an insert, the `before` field will be null, while for a 
delete, the `after` field w [...]
+
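The envelope described above can be pictured with a small sketch. The column names (`id`, `name`), the LSN values and the timestamps are made up for illustration, and real events carry more metadata in `source` than shown here; whether `before` is populated for updates also depends on the table's replica identity.

```python
# Illustrative Debezium-style change events for a single row (values are made up).
insert_event = {"before": None, "after": {"id": 1, "name": "a"},
                "source": {"lsn": 100}, "op": "c", "ts_ms": 1637800000000}
update_event = {"before": {"id": 1, "name": "a"}, "after": {"id": 1, "name": "b"},
                "source": {"lsn": 101}, "op": "u", "ts_ms": 1637800001000}
delete_event = {"before": {"id": 1, "name": "b"}, "after": None,
                "source": {"lsn": 102}, "op": "d", "ts_ms": 1637800002000}

# Meaning of the `op` codes listed in the text.
OP_NAMES = {"r": "snapshot", "c": "insert", "u": "update", "d": "delete"}


def row_image(event):
    """Return the row image that identifies the record for this event.

    Deletes only carry the last known state in `before`;
    all other operations carry the new state in `after`.
    """
    return event["before"] if event["op"] == "d" else event["after"]
```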
+
+
+There are other ways to deploy Debezium, such as Debezium Server, which can 
write events to other streaming systems, such as Pulsar, Kinesis, Google Pub/Sub, 
etc. However, this RFC focuses on the Debezium source in Deltastreamer, which 
assumes Kafka as the source of the change log events.
+
+![](arch.png)
+
+## Implementation
+
+As shown in the figure above, in order to ingest the rows from the database 
into Hudi and maintain the change operations done on the database in Hudi, we need 
to perform two steps: (i) bootstrap the initial data from the database, 
and (ii) incrementally consume the change logs to insert or update the records 
in Hudi.
+
+
+
+To bootstrap the initial rows from the database, we can do a full fetch 
directly from the database using JDBC or a similar mechanism, and then incrementally pull the 
change logs from the appropriate checkpoint. The other option is to let the 
Debezium connector perform an initial _consistent snapshot_ of the database up 
to a specific checkpoint. Subsequently, Debezium publishes change logs on top of 
the initial snapshot, which Deltastreamer can read incrementally.
+
+
+
+To incrementally ingest the change logs from the Debezium connector, we propose 
to implement a few classes. `DebeziumAvroSource.java` implements the source 
class that reads the Kafka change log events. We reuse `KafkaOffsetGen.java`, 
which helps read events from Kafka incrementally. 
`DebeziumAvroSource.java` pulls the latest schema from the schema registry, 
applies the schema to a batch of incoming Avro records of the change logs, and 
transforms the records to extract the actual field [...]
+
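As a rough illustration of the transformation step, the sketch below flattens one change event into a single row. It is not the proposed Java implementation: the event is modelled as a plain dictionary, and the metadata column names `_op` and `_ts_ms` are hypothetical (only `_source_lsn` is named in this RFC, as the source ordering field).

```python
def flatten_change_event(event):
    """Flatten a Debezium-style envelope into one row plus change metadata."""
    # Deletes have no `after` image, so fall back to `before` to keep the key columns.
    image = event["after"] if event["after"] is not None else event["before"]
    row = dict(image)
    row["_op"] = event["op"]                     # hypothetical column name
    row["_ts_ms"] = event["ts_ms"]               # hypothetical column name
    row["_source_lsn"] = event["source"]["lsn"]  # ordering field named in this RFC
    return row


# Example: an update event for a row with made-up columns `id` and `name`.
example = {"before": {"id": 1, "name": "a"}, "after": {"id": 1, "name": "b"},
           "source": {"lsn": 101}, "op": "u", "ts_ms": 1637800001000}
flat = flatten_change_event(example)
```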
+Since we change the schema of the incoming record in the source class, we have 
to provide a schema for the target record. We propose to implement 
`DebeziumAvroSource.java` as a `RowSource` and allow Spark to infer the schema of 
the transformed record. An alternative approach is to implement a 
`DebeziumSchemaRegistryProvider.java` class that extends the current 
`SchemaRegistryProvider.java` and implements the method `getTargetSchema`. It 
constructs the target schema from the original schema by in [...]
+
+
+To ensure proper de-duplication, merging, and hard deletes of the records, we 
implement a custom AvroPayload class for Debezium: `DebeziumAvroPayload.java`. 
During writes, if the `op` field of the record is `d`, we return an 
empty payload to ensure the record is deleted in storage. In the case of 
`preCombine` or `combineAndGetUpdateValue` (merge handling of records), we 
return the existing stored record if the `LSN` (in the case of PostgreSQL) of the 
existing record is higher than the new [...]
+
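The merge rule can be sketched as follows. This is a simplified model, not the Java payload class: records are plain dictionaries, `_op` is a hypothetical column name, and the order in which the two checks are applied (LSN comparison first, then the delete check) is an assumption.

```python
def merge(stored, incoming):
    """Return the record to keep, or None to signal a hard delete."""
    # Keep the stored record when it is newer than the incoming one.
    if stored is not None and stored["_source_lsn"] > incoming["_source_lsn"]:
        return stored
    # An incoming delete that is not stale removes the record (empty payload).
    if incoming["_op"] == "d":
        return None
    # Otherwise the incoming insert or update wins.
    return incoming
```

For example, merging a stored record at LSN 5 with a late-arriving update at LSN 3 keeps the stored record, while an incoming delete at LSN 7 yields `None`.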
+### Handling merges with Postgres TOAST columns
+
+[TOAST](https://www.postgresql.org/docs/current/storage-toast.html) (The 
Oversized-Attribute Storage Technique) is a mechanism in Postgres which stores 
large column values in multiple physical rows, circumventing the page size 
limit of 8 KB.
+
+
+
+Typically, TOAST storage is transparent to the user. There’s an exception, 
though: if a table row has changed, any _unchanged_ values that were stored 
using the TOAST mechanism are not included in the message that Debezium 
receives from the database, unless they are part of the table’s [replica 
identity](https://debezium.io/documentation/reference/0.10/connectors/postgresql.html#replica-identity).
 Consequently, such unchanged TOAST column values will not be contained in 
Debezium data chan [...]
+
+
+
+During merging, we check for TOAST columns in the incoming records and, if 
present, update their values using the values from the current record on disk.
+
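A minimal sketch of this merge step, assuming records are plain dictionaries and that the connector marks unavailable TOAST values with Debezium's default placeholder string `__debezium_unavailable_value` (the placeholder is configurable on the connector, so the constant below is an assumption about the deployment):

```python
# Debezium's default marker for an unchanged TOAST value (configurable on the connector).
TOAST_PLACEHOLDER = "__debezium_unavailable_value"


def fill_toast_columns(incoming, stored):
    """Replace placeholder values in `incoming` with the values stored on disk."""
    merged = dict(incoming)
    for column, value in incoming.items():
        if value == TOAST_PLACEHOLDER and stored is not None and column in stored:
            merged[column] = stored[column]
    return merged
```

If no stored record exists, the placeholder is left untouched, since there is no earlier value to recover.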
+
+
+
+### Deltastreamer configuration
+
+To run the deltastreamer, we need to configure the following:
+
+1.  The `source ordering field` should be set to `_source_lsn`.
+2.  Configure the schema registry server that is used by the Debezium 
connector.
+3.  The record key(s) should be the primary key(s) of the database table and can be 
obtained from the schema registry, since Debezium uses the primary key(s) as 
the key for the Kafka topic.
+4.  Configure the Deltastreamer to use the DebeziumSource and 
DebeziumAvroPayload classes for the source and payload classes respectively.
+
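The four settings above might be assembled into Deltastreamer arguments roughly as sketched below. Everything here is illustrative: the registry URL, topic and key column are made up, the class names are the short names used in this RFC (package names are not given here, and `DebeziumAvroSource` is taken from the Implementation section), and the property keys should be verified against the Hudi version in use.

```python
# Placeholder values: adjust URL, topic, key column and class names to the deployment.
hoodie_conf = {
    # 2. Schema registry used by the Debezium connector (made-up URL).
    "hoodie.deltastreamer.schemaprovider.registry.url":
        "http://schema-registry:8081/subjects/pg.public.orders-value/versions/latest",
    "hoodie.deltastreamer.source.kafka.topic": "pg.public.orders",
    # 3. Record key = primary key of the source table (made-up column).
    "hoodie.datasource.write.recordkey.field": "id",
}

args = [
    # 4. Source and payload classes (short names as used in this RFC).
    "--source-class", "DebeziumAvroSource",
    "--payload-class", "DebeziumAvroPayload",
    # 1. Source ordering field.
    "--source-ordering-field", "_source_lsn",
]
for key, value in hoodie_conf.items():
    args += ["--hoodie-conf", f"{key}={value}"]
```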
+### Current Limitations
+
+With the current constraints within Hudi, we discuss a few limitations of the 
current implementation for CDC. Consider a case where we have the following 
change log events for a single row/record in the following order from Kafka: 
Insert (LSN=1), Delete (LSN=3), Update (LSN=2). If all these events are 
ingested in the same batch, then dedup will only pick the second event, since 
it has the highest LSN. However, if the second and third events are ingested in 
different batches, then the se [...]
+
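The single-batch case from the example above can be reproduced with a small sketch. It models dedup as "keep the event with the highest LSN per record key"; the dictionary layout and the key column `id` are assumptions made for illustration, not Hudi's internal representation.

```python
def dedup_batch(events):
    """Keep, per record key, only the event with the highest LSN."""
    latest = {}
    for event in events:
        key = event["id"]
        if key not in latest or event["lsn"] > latest[key]["lsn"]:
            latest[key] = event
    return list(latest.values())


# Arrival order from Kafka: Insert (LSN=1), Delete (LSN=3), Update (LSN=2).
batch = [
    {"id": 1, "op": "c", "lsn": 1},
    {"id": 1, "op": "d", "lsn": 3},
    {"id": 1, "op": "u", "lsn": 2},
]
survivors = dedup_batch(batch)  # only the delete at LSN=3 remains
```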
+
+## Rollout/Adoption Plan
+
+This is a new feature specific to the Debezium CDC use case and should not impact 
existing jobs or tables.
+
+## Test Plan
+
+We plan to test the Debezium source by setting up an AWS RDS instance of 
PostgreSQL, a Debezium connector using the Strimzi operator on k8s, and an AWS MSK 
Kafka cluster. We will test for correctness by performing SQL-based DML 
operations, such as inserts, updates and deletes, on multiple records/rows in 
the Postgres DB, and query the Hudi table to validate that the operations took 
effect on the records in the Hudi table.
\ No newline at end of file
