Hi everyone,

In a streaming job, we are using the JdbcSink.exactlyOnceSink() as described at
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink

The query we're using is an UPSERT query. I cannot post the exact query, but
in essence it looks something like this:

INSERT INTO edges(edge_id, count) VALUES(?, ?) ON CONFLICT (edge_id) DO
UPDATE SET count = edges.count + EXCLUDED.count;

Of course, the table has a Primary Key on `edge_id`.

Batching is enabled with 250 UPSERTs per batch. Additionally, we set
withMaxRetries() to 0 on the JdbcExecutionOptions. 
On the JdbcExactlyOnceOptions, we set withTransactionPerConnection() to
true, as per the documentation.
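For reference, the sink is wired up roughly like the sketch below. The stream
variable (edgeCounts), the element accessors (getEdgeId()/getCount()), and the
JDBC URL are placeholders; the options match what I described above.

import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.postgresql.xa.PGXADataSource;

edgeCounts.addSink(
    JdbcSink.exactlyOnceSink(
        "INSERT INTO edges(edge_id, count) VALUES(?, ?) "
            + "ON CONFLICT (edge_id) DO UPDATE SET count = edges.count + EXCLUDED.count",
        (ps, edge) -> {
            // bind the key and the increment for this element
            ps.setString(1, edge.getEdgeId());
            ps.setLong(2, edge.getCount());
        },
        JdbcExecutionOptions.builder()
            .withBatchSize(250)  // 250 UPSERTs per batch
            .withMaxRetries(0)   // retries disabled, as required for the XA sink
            .build(),
        JdbcExactlyOnceOptions.builder()
            .withTransactionPerConnection(true)  // per the documentation
            .build(),
        () -> {
            // XADataSource supplier; URL and credentials are placeholders
            PGXADataSource ds = new PGXADataSource();
            ds.setUrl("jdbc:postgresql://db-host:5432/mydb");
            return ds;
        }));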

We're using unaligned checkpointing. Once a checkpoint is triggered, the
Sink prepares a transaction in the database; I can see it in pg_locks.
However, that prepared transaction is never committed. Instead, the Sink
function is already in invoke() for the next stream elements. Since these
new elements conflict on keys from the previous (still uncommitted)
transaction, the new transaction (on a new connection, due to Postgres XA)
cannot acquire a ShareLock: in pg_stat_activity, the process is permanently
stuck with wait_event_type=Lock, wait_event=transactionid.

Why is that first transaction not committed before new statements are
executed? 

Am I wrong to assume that the Sink works with UPSERTs, or is this a
potential bug in the XA sink function?


Thanks in advance.

Best,
Pascal.


PS: We're using Flink 1.16.2. The JDBC connector dependency version is
3.1.1-1.16.
