GitHub user aryandwivedi007 edited a discussion: Inconsistent update behavior with R2DBC and PostgreSQL: returns Successfully updated campaign 0197bf4a-1ab7-72b3-92ef-ebe128bdb15e rows affected 0: MDC: {}
### Summary

We're encountering an issue where a reactive flow using R2DBC + PostgreSQL intermittently fails to find a record that was just inserted. Specifically, the update completes without error but reports 0 rows affected, even though the insert has already occurred. The issue occurs in a 3-node Kubernetes environment; locally the count updates correctly.

**_It does not happen every time: sometimes the count is updated correctly for an incoming event, and sometimes it is not._**

This happens in a low-load environment (only 1 message), with no replicas, and appears to be a visibility or subscription timing issue in the reactive chain. I am a beginner with Apache Pekko and R2DBC, so please guide me on what the issue could be.

---

### Environment

- Database: PostgreSQL (AWS RDS)
- Connection: R2DBC (r2dbc-postgresql driver)
- Framework: Apache Pekko (reactive projections and flow)
- Persistence: pekko-persistence-r2dbc
- Pool Config: max-size = 100, acquire-timeout = 5s, connect-timeout = 3s
- Load: Single message tested (not concurrency related)

---

### Observed Behavior

1. Insert is triggered via projection (confirmed by logs).
2. After a delay of ~500–900 ms, the provider response is received and the update flow is triggered.
3. `findById(campaignId)` returns `Optional.empty`.
4. `UPDATE campaigns SET ... WHERE id = $2` affects 0 rows.
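Since step 4 completes without an error, the zero-row case is easy to miss. Below is a minimal sketch of one way to surface it, assuming the update call yields a `CompletionStage<Long>` row count. `ZeroRowGuard`, `requireRowUpdated` and `NoRowUpdatedException` are hypothetical names used only for illustration; they are not part of Pekko or R2DBC. The idea is to fail the stage when no row matched, so the handler does not report success for an update that changed nothing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

public class ZeroRowGuard {

    /** Hypothetical exception type signalling that an UPDATE matched no row. */
    public static class NoRowUpdatedException extends RuntimeException {
        public NoRowUpdatedException(String campaignId) {
            super("0 rows updated for campaign " + campaignId);
        }
    }

    /**
     * Passes the row count through unchanged when at least one row was updated,
     * and fails the stage with NoRowUpdatedException when the count is 0.
     */
    public static CompletionStage<Long> requireRowUpdated(CompletionStage<Long> rowsUpdated,
                                                          String campaignId) {
        return rowsUpdated.thenApply(rows -> {
            if (rows == 0L) {
                throw new NoRowUpdatedException(campaignId);
            }
            return rows;
        });
    }

    public static void main(String[] args) {
        // Stand-ins for the row counts an update call would produce (assumption).
        CompletionStage<Long> oneRow = CompletableFuture.completedFuture(1L);
        CompletionStage<Long> noRow = CompletableFuture.completedFuture(0L);

        System.out.println(requireRowUpdated(oneRow, "campaign-a").toCompletableFuture().join());

        try {
            requireRowUpdated(noRow, "campaign-b").toCompletableFuture().join();
        } catch (CompletionException e) {
            // join() wraps the failure; the original exception is the cause.
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

If a handler returned such a failed stage, the projection's configured error handling would decide what happens next (for example, whether the event is retried). Whether a retry is appropriate depends on why the row is missing, which this sketch does not address.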
Relevant log:

```text
Executing update: PostgresqlStatement{bindings=[Binding{parameters=[Parameter{format=FORMAT_BINARY, type=701, value=MonoSupplier}, Parameter{format=FORMAT_TEXT, type=1043, value=MonoSupplier}]}], context=ConnectionContext{client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@714d4d6, connection=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@714d4d6}, configuration=PostgresqlConnectionConfiguration{applicationName='r2dbc-postgresql', autodetectExtensions='true', compatibilityMode=false, connectTimeout=PT5S, errorResponseLogLevel=DEBUG, database='unwaveringmedia_platform_prod', extensions=[], fetchSize=io.r2dbc.postgresql.PostgresqlConnectionConfiguration$Builder$$Lambda/0x00007f0cdea42348@234a08ea, forceBinary='true', host='unwavering-media-marketing-platform-db.cl248qc4ol90.us-east-1.rds.amazonaws.com', lockWaitTimeout='null, loopResources='null' , noticeLogLevel='DEBUG', options='{}', password='**********************', port=5432, preferAttachedBuffers=true, socket=null, statementTimeout=null, tcpKeepAlive=false, tcpNoDelay=true, username='boban'}, portalNameSupplier=io.r2dbc.postgresql.DefaultPortalNameSupplier@28e388a9, statementCache=LimitedStatementCache{cache={io.r2dbc.postgresql.BoundedStatementCache$CacheKey@9418b150=S_0, io.r2dbc.postgresql.BoundedStatementCache$CacheKey@589270a3=S_1, io.r2dbc.postgresql.BoundedStatementCache$CacheKey@f6d8b11a=S_2, io.r2dbc.postgresql.BoundedStatementCache$CacheKey@de50be46=S_3}, counter=4, client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137, limit=5000}}, sql='UPDATE public.campaigns SET message_sent_count = message_sent_count + 1, total_cost = total_cost + $1 WHERE id = $2;', generatedColumns=null}
2025-06-30 05:23:18.924 [INFO ] [com.unwaveringmedia.platform.message.projection.updater.campaign.CampaignTableStatisticsUpdaterProjectionHandler] [] [ForkJoinPool.commonPool-worker-5] - Campaign exists with id Optional.empty MDC: {}
2025-06-30 05:23:18.925 [INFO ] [com.unwaveringmedia.platform.message.projection.updater.campaign.CampaignTableStatisticsUpdaterProjectionHandler] [] [ForkJoinPool.commonPool-worker-5] - Successfully updated campaign 0197bf4a-1ab7-72b3-92ef-ebe128bdb15e rows affected 0: MDC: {}
```

```java
public class CampaignTableStatisticsUpdaterProjectionHandler extends R2dbcHandler<EventEnvelope<OutboundCampaignMessageEntity.Event>> {

    @Override
    public CompletionStage<Done> process(R2dbcSession session, EventEnvelope<OutboundCampaignMessageEntity.Event> eventEventEnvelope) throws Exception {
        if (eventEventEnvelope.event().getTimestamp().isBefore(ServiceUtils.BEFORE_INCIDENT)) {
            // skip events before incident
            return CompletableFuture.completedFuture(Done.done());
        }
        log.debug("Processing event: {} , sequence: {}", eventEventEnvelope.event(), eventEventEnvelope.sequenceNr());

        if (eventEventEnvelope.event() instanceof OutboundCampaignMessageEntity.MessageCreated evt) {
            var statement = session
                    .createStatement("UPDATE public.campaigns SET message_created_count = message_created_count + 1 WHERE id = $1;")
                    .bind(0, evt.getCampaignId());
            return session.updateOne(statement).thenApply(rowsUpdated -> Done.done());
        } else if (eventEventEnvelope.event() instanceof OutboundCampaignMessageEntity.MessageSent evt) {
            var statement = session
                    .createStatement("UPDATE public.campaigns SET message_sent_count = message_sent_count + 1, " +
                            "total_cost = total_cost + $1 WHERE id = $2;")
                    .bind(0, evt.getMessageCost())
                    .bind(1, evt.getCampaignId());
            return session.updateOne(statement)
                    .thenApply(rowsUpdated -> {
                        if (rowsUpdated == 0) {
                            System.err.println("WARNING: 0 rows updated for campaign " + evt.getCampaignId());
                        }
                        return Done.done();
                    });
        } else if (eventEventEnvelope.event() instanceof OutboundCampaignMessageEntity.MessageSendingFailed evt) {
            var statement = session
                    .createStatement("UPDATE public.campaigns SET message_sending_failed_count = message_sending_failed_count + 1 WHERE id = $1;")
                    .bind(0, evt.getCampaignId());
            return session.updateOne(statement).thenApply(rowsUpdated -> Done.done());
        } else if (eventEventEnvelope.event() instanceof OutboundCampaignMessageEntity.ClickReceived evt) {
            if (evt.getDeviceInfo() != null) {
                if (evt.getDeviceInfo().isMobile() && evt.isNewContact()) {
                    var statement = session
                            .createStatement("UPDATE public.campaigns SET click_received_count = click_received_count + 1, mobile_click_received_count = mobile_click_received_count + 1 WHERE id = $1;")
                            .bind(0, evt.getCampaignId());
                    return session.updateOne(statement).thenApply(rowsUpdated -> Done.done());
                } else if (evt.getDeviceInfo().isTablet() && evt.isNewContact()) {
                    var statement = session
                            .createStatement("UPDATE public.campaigns SET click_received_count = click_received_count + 1, tablet_click_received_count = tablet_click_received_count + 1 WHERE id = $1;")
                            .bind(0, evt.getCampaignId());
                    return session.updateOne(statement).thenApply(rowsUpdated -> Done.done());
                } else if (evt.getDeviceInfo().isPc() && evt.isNewContact()) {
                    var statement = session
                            .createStatement("UPDATE public.campaigns SET click_received_count = click_received_count + 1, pc_click_received_count = pc_click_received_count + 1 WHERE id = $1;")
                            .bind(0, evt.getCampaignId());
                    return session.updateOne(statement).thenApply(rowsUpdated -> Done.done());
                }
            }
        }
        return CompletableFuture.completedFuture(Done.done());
    }
}
```

GitHub link: https://github.com/apache/pekko-persistence-r2dbc/discussions/225