Hi again Maciek (and all),

I just recently returned to start investigating this approach; however, I can't seem to get the underlying invocation to work as I would normally expect. I'll try to share a bit more about what I currently have, as perhaps I'm just missing something minor that someone may be able to spot.
To reiterate - what I'm attempting to do is take a stream of events, extract specific types of entities from those events into multiple side-outputs, and pass each side-output to a sink that writes those entities via JDBC using logic specific to that entity. What I'm aiming to achieve is the ability to capture a single record that may be problematic and throw it onto a dead-letter queue (Kafka) rather than letting it become a poison pill. I understand this would mean limiting batch sizes to a single record, however I'm assuming the connections themselves could possibly be pooled to avoid opening a new connection per call. If that isn't the case, is there a way to handle it (or would I need to implement my own sink)?

```
val users = Tags.users
parsedChangelogs
    .getSideOutput(users)
    .addSink(PostgresSink.fromEntityType(users.typeInfo, parameters))
    .uid("sync-${users.id}-to-postgres")
    .name("sync-${users.id}-to-postgres")

val addresses = Tags.addresses
parsedChangelogs
    .getSideOutput(addresses)
    .addSink(PostgresSink.fromEntityType(addresses.typeInfo, parameters))
    .uid("sync-${addresses.id}-to-postgres")
    .name("sync-${addresses.id}-to-postgres")
```

And the dynamic sink (which associates a given entity with the necessary calls to make against the database) looks a bit like this:

```
fun <T : Any> fromEntityType(typeInfo: TypeInformation<T>, parameters: ParameterTool): SinkFunction<T> {
    val metadata = getQueryMetadataFromType(typeInfo)

    return JdbcSink.sink(
        metadata.query,
        metadata.statement,
        getJdbcExecutionOptions(parameters),
        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withDriverName("org.postgresql.Driver")
            .withUrl(buildConnectionString(parameters))
            .build(),
    )
}
```

I've tried several approaches; a naive wrapper that I attempted looked something like this:

```
class DlqWrapper<T>(private val sink: SinkFunction<T>, val parameters: ParameterTool) : SinkFunction<T> {
    private val logger = LoggerFactory.getLogger(DlqWrapper::class.java)
    private val dlqSink: SinkFunction<String> = ...

    override fun invoke(value: T, context: SinkFunction.Context) {
        try {
            sink.invoke(value, context)
        } catch (ex: Exception) {
            logger.error("Encountered sink exception. Sending message to dead letter queue. Value: $value. Exception: ${ex.message}")
            val payload = Gson().toJsonTree(value).asJsonObject
            payload.addProperty("exception", ex.message)
            dlqSink.invoke("$payload", context)
        }
    }
}
```

After doing this, it doesn't look like the JDBC calls to insert the records are actually being attempted when invoke is called. I'm not entirely sure whether that's related to how the JdbcSink is wrapped (via the GenericJdbcSinkFunction, etc.). I had seen several posts involving the use of an InvocationHandler/Proxy, but I'm not sure that should be necessary for handling this type of functionality.

Any ideas/thoughts/examples would be greatly appreciated.
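In case it helps show where my head is at: my current reading of Maciek's earlier suggestion is that the wrapper itself has to be a RichSinkFunction that forwards every lifecycle call to the wrapped sink. The function returned by JdbcSink relies on setRuntimeContext/open to establish its connection and on snapshotState to flush batches, so if those calls never reach it, invoke has nothing to write with, which might explain what I'm seeing. A rough, untested sketch of that idea (Flink 1.13-era imports assumed; `DlqWrappingSink` and `dlq` are placeholder names of mine, not anything from the connector):

```
import com.google.gson.Gson
import org.apache.flink.api.common.functions.RichFunction
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.CheckpointListener
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Sketch only: wraps an inner sink (e.g. the one built by JdbcSink.sink) and
// forwards every lifecycle hook so the inner sink actually gets opened,
// checkpointed, and closed. Untested.
class DlqWrappingSink<T>(
    private val inner: SinkFunction<T>,
    private val dlq: SinkFunction<String>,
) : RichSinkFunction<T>(), CheckpointedFunction, CheckpointListener {

    // The JDBC sink function is a RichSinkFunction, so the runtime context
    // and open/close calls must reach it or no connection is ever created.
    override fun setRuntimeContext(t: RuntimeContext) {
        super.setRuntimeContext(t)
        (inner as? RichFunction)?.setRuntimeContext(t)
        (dlq as? RichFunction)?.setRuntimeContext(t)
    }

    override fun open(parameters: Configuration) {
        (inner as? RichFunction)?.open(parameters)
        (dlq as? RichFunction)?.open(parameters)
    }

    override fun close() {
        (inner as? RichFunction)?.close()
        (dlq as? RichFunction)?.close()
    }

    // I believe the JDBC sink function also implements CheckpointedFunction
    // (flushing pending batches in snapshotState), so forward those calls too.
    override fun initializeState(context: FunctionInitializationContext) {
        (inner as? CheckpointedFunction)?.initializeState(context)
        (dlq as? CheckpointedFunction)?.initializeState(context)
    }

    override fun snapshotState(context: FunctionSnapshotContext) {
        (inner as? CheckpointedFunction)?.snapshotState(context)
        (dlq as? CheckpointedFunction)?.snapshotState(context)
    }

    override fun notifyCheckpointComplete(checkpointId: Long) {
        (inner as? CheckpointListener)?.notifyCheckpointComplete(checkpointId)
        (dlq as? CheckpointListener)?.notifyCheckpointComplete(checkpointId)
    }

    override fun invoke(value: T, context: SinkFunction.Context) {
        try {
            inner.invoke(value, context)
        } catch (ex: Exception) {
            // With batch size 1 the JDBC write should happen synchronously
            // here, so a bad record surfaces as an exception we can divert.
            val payload = Gson().toJsonTree(value).asJsonObject
            payload.addProperty("exception", ex.message)
            dlq.invoke(payload.toString(), context)
        }
    }
}
```

If that reading is right, then pairing this with a batch size of 1 (e.g. `JdbcExecutionOptions.builder().withBatchSize(1).withBatchIntervalMs(0).build()`) should make each write happen synchronously inside invoke, so a failing record lands in the try/catch rather than at some later flush or checkpoint, albeit with the performance caveat Maciek mentioned.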
Thanks,

Rion

On 2021/07/14 15:47:18, Maciej Bryński <mac...@brynski.pl> wrote:
> This is the idea.
> Of course you need to wrap more functions like: open, close,
> notifyCheckpointComplete, snapshotState, initializeState and
> setRuntimeContext.
>
> The problem is that if you want to catch problematic record you need
> to set batch size to 1, which gives very bad performance.
>
> Regards,
> Maciek
>
> śr., 14 lip 2021 o 17:31 Rion Williams <rionmons...@gmail.com> napisał(a):
> >
> > Hi Maciej,
> >
> > Thanks for the quick response. I wasn't aware of the idea of using a
> > SinkWrapper, but I'm not quite certain that it would suit this specific use
> > case (as a SinkFunction / RichSinkFunction doesn't appear to support
> > side-outputs). Essentially, what I'd hope to accomplish would be to pick up
> > when a bad record could not be written to the sink and then offload that
> > via a side-output somewhere else.
> >
> > Something like this, which is a very, very naive idea:
> >
> > class PostgresSinkWrapper<T>(private val sink: SinkFunction<T>) : RichSinkFunction<T>() {
> >     private val logger = LoggerFactory.getLogger(PostgresSinkWrapper::class.java)
> >
> >     override fun invoke(value: T, context: SinkFunction.Context) {
> >         try {
> >             sink.invoke(value, context)
> >         }
> >         catch (exception: Exception) {
> >             logger.error("Encountered a bad record, offloading to dead-letter-queue")
> >             // Offload bad record to DLQ
> >         }
> >     }
> > }
> >
> > But I think that's basically the gist of it. I'm just not sure how I could
> > go about doing this aside from perhaps writing a custom process function
> > that wraps another sink function (or just completely rewriting my own
> > JdbcSink?)
> >
> > Thanks,
> >
> > Rion
> >
> > On Wed, Jul 14, 2021 at 9:56 AM Maciej Bryński <mac...@brynski.pl> wrote:
> >>
> >> Hi Rion,
> >> We have implemented such a solution with Sink Wrapper.
> >>
> >>
> >> Regards,
> >> Maciek
> >>
> >> śr., 14 lip 2021 o 16:21 Rion Williams <rionmons...@gmail.com> napisał(a):
> >> >
> >> > Hi all,
> >> >
> >> > Recently I've been encountering an issue where some external
> >> > dependency or process causes writes within my JDBCSink to fail (e.g.
> >> > something is being inserted with an explicit constraint that never made
> >> > its way there). I'm trying to see if there's a pattern or
> >> > recommendation for handling this, similar to a dead-letter queue.
> >> >
> >> > Basically - if I experience a given number of failures (> max retry
> >> > attempts) when writing to my JDBC destination, I'd like to take the
> >> > record that was attempted and throw it into a Kafka topic or some other
> >> > destination so that it can be evaluated at a later time.
> >> >
> >> > Are there any well-defined patterns or recommended approaches around
> >> > this?
> >> >
> >> > Thanks,
> >> >
> >> > Rion
> >>
> >>
> >>
> >> --
> >> Maciek Bryński
>
> --
> Maciek Bryński
>