GitHub user aryandwivedi007 edited a discussion: Inconsistent update behavior 
with R2DBC and PostgreSQL: returns Successfully updated campaign 
0197bf4a-1ab7-72b3-92ef-ebe128bdb15e rows affected 0: MDC: {}

### Summary

We're encountering an issue where a reactive flow using R2DBC + PostgreSQL 
intermittently fails to find a record that was just inserted. Specifically, 
update is success but returns update query affects 0 rows, even though the 
insert has already occurred.
This issue occurs on 3 node kubernetes env locally the count is updating just 
fine.


**_And this does not occur every times some times it updates count for coming 
event just right but sometime it cause issue_**

This happens in a low-load environment (only 1 message), with no replicas, and 
appears to be a visibility or subscription timing issue in the reactive chain.

I am beginner to Apache Pekko and R2DBC so please guide me what can be the 
issue here.

---

### Environment

- Database: PostgreSQL (AWS RDS)
- Connection: R2DBC (r2dbc-postgresql driver)
- Framework: Apache Pekko (reactive projections and flow)
- Persistence: pekko-persistence-r2dbc
- Pool Config: max-size = 100, acquire-timeout = 5s, connect-timeout = 3s
- Load: Single message tested (not concurrency related)

---

### Observed Behavior

1. Insert is triggered via projection (confirmed by logs).
2. After ~500–900ms delay, provider response is received and update flow is 
triggered.
3. `findById(campaignId)` returns `Optional.empty`.
4. `UPDATE campaigns SET ... WHERE id = $2` affects 0 rows.

Relevant log:
```text
Executing update: 
PostgresqlStatement{bindings=[Binding{parameters=[Parameter{format=FORMAT_BINARY,
 type=701, value=MonoSupplier}, Parameter{format=FORMAT_TEXT, type=1043, 
value=MonoSupplier}]}], 
context=ConnectionContext{client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137,
 codecs=io.r2dbc.postgresql.codec.DefaultCodecs@714d4d6, 
connection=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137,
 codecs=io.r2dbc.postgresql.codec.DefaultCodecs@714d4d6}, 
configuration=PostgresqlConnectionConfiguration{applicationName='r2dbc-postgresql',
 autodetectExtensions='true', compatibilityMode=false, connectTimeout=PT5S, 
errorResponseLogLevel=DEBUG, database='unwaveringmedia_platform_prod', 
extensions=[], 
fetchSize=io.r2dbc.postgresql.PostgresqlConnectionConfiguration$Builder$$Lambda/0x00007f0cdea42348@234a08ea,
 forceBinary='true', 
host='unwavering-media-marketing-platform-db.cl248qc4ol90.us-east-1.rds.amazonaws.com',
 lockWaitTimeout='null, loopResources='null'
 , noticeLogLevel='DEBUG', options='{}', password='**********************', 
port=5432, preferAttachedBuffers=true, socket=null, statementTimeout=null, 
tcpKeepAlive=false, tcpNoDelay=true, username='boban'}, 
portalNameSupplier=io.r2dbc.postgresql.DefaultPortalNameSupplier@28e388a9, 
statementCache=LimitedStatementCache{cache={io.r2dbc.postgresql.BoundedStatementCache$CacheKey@9418b150=S_0,
 io.r2dbc.postgresql.BoundedStatementCache$CacheKey@589270a3=S_1, 
io.r2dbc.postgresql.BoundedStatementCache$CacheKey@f6d8b11a=S_2, 
io.r2dbc.postgresql.BoundedStatementCache$CacheKey@de50be46=S_3}, counter=4, 
client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137, limit=5000}}, 
sql='UPDATE public.campaigns SET message_sent_count = message_sent_count + 1, 
total_cost = total_cost + $1 WHERE id = $2;', generatedColumns=null}
2025-06-30 05:23:18.924 [INFO ] 
[com.unwaveringmedia.platform.message.projection.updater.campaign.CampaignTableStatisticsUpdaterProjectionHandler]
 [] [ForkJoinPool.commonPool-worker-5] - Campaign exists with id Optional.empty 
MDC: {}
2025-06-30 05:23:18.925 [INFO ] 
[com.unwaveringmedia.platform.message.projection.updater.campaign.CampaignTableStatisticsUpdaterProjectionHandler]
 [] [ForkJoinPool.commonPool-worker-5] - Successfully updated campaign 
0197bf4a-1ab7-72b3-92ef-ebe128bdb15e rows affected 0:  MDC: {}

**```
public class CampaignTableStatisticsUpdaterProjectionHandler extends 
R2dbcHandler<EventEnvelope<OutboundCampaignMessageEntity.Event>> {
    @Override
    public CompletionStage<Done> process(R2dbcSession session, 
EventEnvelope<OutboundCampaignMessageEntity.Event> eventEventEnvelope) throws 
Exception {
        
if(eventEventEnvelope.event().getTimestamp().isBefore(ServiceUtils.BEFORE_INCIDENT)){
            //skip events before incident
            return CompletableFuture.completedFuture(Done.done());
        }
        log.debug("Processing event: {} , sequence: {}", 
eventEventEnvelope.event(), eventEventEnvelope.sequenceNr());
        if(eventEventEnvelope.event() instanceof 
OutboundCampaignMessageEntity.MessageCreated evt) {
            var statement = session
                    .createStatement("UPDATE public.campaigns SET 
message_created_count = message_created_count + 1 WHERE id = $1;")
                    .bind(0, evt.getCampaignId());
            return session.updateOne(statement).thenApply(rowsUpdated-> 
Done.done());
        } else if (eventEventEnvelope.event() instanceof 
OutboundCampaignMessageEntity.MessageSent evt) {
            var statement = session
                    .createStatement("UPDATE public.campaigns SET 
message_sent_count = message_sent_count + 1, " +
                            "total_cost = total_cost + $1 WHERE id = $2;")
                    .bind(0, evt.getMessageCost())
                    .bind(1, evt.getCampaignId());

            return session.updateOne(statement)
                    .thenApply(rowsUpdated -> {
                        if (rowsUpdated == 0) {
                            System.err.println("WARNING: 0 rows updated for 
campaign " + evt.getCampaignId());
                        }
                        return Done.done();
                    });
        } else if(eventEventEnvelope.event() instanceof 
OutboundCampaignMessageEntity.MessageSendingFailed evt) {
            var statement = session
                    .createStatement("UPDATE public.campaigns SET 
message_sending_failed_count = message_sending_failed_count + 1 WHERE id = $1;")
                    .bind(0, evt.getCampaignId());
            return session.updateOne(statement).thenApply(rowsUpdated-> 
Done.done());
        } else if(eventEventEnvelope.event() instanceof 
OutboundCampaignMessageEntity.ClickReceived evt) {
            if(evt.getDeviceInfo() != null){
                if(evt.getDeviceInfo().isMobile() && evt.isNewContact()){
                    var statement = session
                            .createStatement("UPDATE public.campaigns SET 
click_received_count = click_received_count + 1, mobile_click_received_count = 
mobile_click_received_count + 1 WHERE id = $1;")
                            .bind(0, evt.getCampaignId());
                    return session.updateOne(statement).thenApply(rowsUpdated-> 
Done.done());
                } else if(evt.getDeviceInfo().isTablet() && evt.isNewContact()){
                    var statement = session
                            .createStatement("UPDATE public.campaigns SET 
click_received_count = click_received_count + 1, tablet_click_received_count = 
tablet_click_received_count + 1 WHERE id = $1;")
                            .bind(0, evt.getCampaignId());
                    return session.updateOne(statement).thenApply(rowsUpdated-> 
Done.done());
                } else if(evt.getDeviceInfo().isPc() && evt.isNewContact()) {
                    var statement = session
                            .createStatement("UPDATE public.campaigns SET 
click_received_count = click_received_count + 1, pc_click_received_count = 
pc_click_received_count + 1 WHERE id = $1;")
                            .bind(0, evt.getCampaignId());
                    return session.updateOne(statement).thenApply(rowsUpdated-> 
Done.done());
                }
            } 
        return CompletableFuture.completedFuture(Done.done());
    }
}
```**





GitHub link: https://github.com/apache/pekko-persistence-r2dbc/discussions/225

----
This is an automatically sent email for notifications@pekko.apache.org.
To unsubscribe, please send an email to: 
notifications-unsubscr...@pekko.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org
For additional commands, e-mail: notifications-h...@pekko.apache.org

Reply via email to