Hi Stephan,

###############################

about PK:

> (6) Different consistency guarantees with PK and without PK
> (7) Eventual Consistency Mode vs. Faster Checkpoints

Let's clarify one thing first:
For sink with PK, Flink SQL cannot generate complete changes (without
UPDATE_BEFORE).
- For databases like mysql, MySQL will generate the UPDATE_BEFORE,
that is, MySQL will query the old value corresponding to each data PK
before generating binlog. Mysql upsert sink does not rely on
checkpoint, during failing, Mysql will read binlog and recover the
database, then get eventual consistency.
- For Upsert-Kakfa, regardless of the consistency guarantee, the
resulting log may not contain a complete/correct UPDATE_BEFORE, so
there needs to be a normalized node downstream to remove the
duplication.

So for us, either option 1 or option 2, we don't have the ability to
query per PK at the moment, so we choose 2, which needs downstream de
duplicated records.
If we enable log transactions for cases with PK, the only advantage is
" transactional consistency vs eventual consistency". I don't know if
the price is reasonable. We are moving towards the fast checkpoint,
which is correct, but it will not arrive so soon. For storage, it also
requires a lot of work to adapt to this mode (This is not very easy,
and efficiency is often considered). This means that the current
default mode is "slow". We need to explain to users under what
scenario they can add option to speed up the pipeline.

> (5) Different formats for cases with PK and without PK

Jing Zhang also mentioned that we may need to expose the log format.
So I am +1 to unify log format with PK and without PK.
We can introduce an option to let users choose debezium-json or
debezium-avro-confluent in future. They are all public formats.

###############################

about Concurrent Write:

The story is that the streaming pipeline is running all the time, and
it is difficult to control and modify it.
However, in many cases, we need to recalculate the historical data in
the data warehouse. For example, there are some data problems in the
partition 10 days ago. I need to recalculate the whole pipeline. In
this case, we need a batch job pipeline instead of correcting the root
table of the streaming pipeline. Because the streaming job can not
recalculate the data 10 days ago.
- The streaming job state may have expired.
- Computing is not as efficient as batch jobs.

Conceptually, all changes will generate logs, but in this case, we
need to turn off log generation, so this is "Change Tracking".
So yes. it just skip writing to the change log.

Meanwhile, the streaming job is still running, so two jobs are
committing snapshots at the same time.

The lock is just for multiple snapshot commits. For example, the
insert statements and compact will produce new snapshots.

> What exactly is the optimistic locking behavior

Optimism is reflected in the deletion of files. If the file to be
deleted is found missing when committing, it will fail instead of
locking at the beginning.

> what is the lock scoped to? a single query (via jobID)?

Lock scope is database and table name.

> what happens if a system that holds (like an entry in the Hive Metastore 
> Database) a lock crashes and is manually restarted as a new job (from the 
> latest checkpoint)? does that need manual admin intervention to release the 
> lock (manually update the HMS Database)?

If the client crashes, the lock it holds will also expire with the
heartbeat timeout.

Best,
Jingsong

Reply via email to