Hi again Maciek (and all),

I recently returned to this approach, but I can't get the underlying
invocation to work as I would expect. I'll share a bit more of what I
currently have; perhaps I'm missing something minor that someone can spot.

To reiterate: I have a stream of events flowing through the job. Specific
types of entities are extracted from these events into multiple side-outputs,
and each side-output is passed to a sink that writes it via JDBC using logic
specific to that entity. What I am aiming for is to capture a single
problematic record and send it to a dead-letter queue (Kafka) so that it
doesn't act as a poison pill. I understand this would mean limiting the batch
size to a single record; however, I'm assuming the connections themselves
could be pooled to avoid opening a new connection per call. If that isn't the
case, is there a way to handle it (or would I need to implement my own sink)?

```
val users = Tags.users
parsedChangelogs
    .getSideOutput(users)
    .addSink(PostgresSink.fromEntityType(users.typeInfo, parameters))
    .uid("sync-${users.id}-to-postgres")
    .name("sync-${users.id}-to-postgres")

val addresses = Tags.addresses
parsedChangelogs
    .getSideOutput(addresses)
    .addSink(PostgresSink.fromEntityType(addresses.typeInfo, parameters))
    .uid("sync-${addresses.id}-to-postgres")
    .name("sync-${addresses.id}-to-postgres")
```

And the dynamic sink (that would associate a given entity to the necessary 
calls made to the database) looks a bit like this:

```
fun <T : Any> fromEntityType(
    typeInfo: TypeInformation<T>,
    parameters: ParameterTool
): SinkFunction<T> {
    // Look up the SQL query and statement builder for this entity type
    val metadata = getQueryMetadataFromType(typeInfo)

    return JdbcSink.sink(
        metadata.query,
        metadata.statement,
        getJdbcExecutionOptions(parameters),
        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withDriverName("org.postgresql.Driver")
            .withUrl(buildConnectionString(parameters))
            .build(),
    )
}
```
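
For context on the batching question above, here is a minimal sketch of what
the `getJdbcExecutionOptions` helper referenced in the factory might look like
when every record is flushed on its own. The parameter key `jdbc.max-retries`
and its default are hypothetical; `JdbcExecutionOptions.builder()` with
`withBatchSize` and `withMaxRetries` comes from the Flink JDBC connector.

```kotlin
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.connector.jdbc.JdbcExecutionOptions

// Hypothetical helper: the key name and the default value are assumptions.
fun getJdbcExecutionOptions(parameters: ParameterTool): JdbcExecutionOptions =
    JdbcExecutionOptions.builder()
        // Flush after every record so that a failure maps to exactly one record
        .withBatchSize(1)
        // Number of retries the connector attempts before the exception surfaces
        .withMaxRetries(parameters.getInt("jdbc.max-retries", 3))
        .build()
```

This trades throughput for the ability to isolate a single failing record.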

I've tried several approaches; a naive wrapper that I attempted looked
something like this:

```
class DlqWrapper<T>(
    private val sink: SinkFunction<T>,
    val parameters: ParameterTool
) : SinkFunction<T> {
    private val logger = LoggerFactory.getLogger(DlqWrapper::class.java)
    private val dlqSink: SinkFunction<String> = ...

    override fun invoke(value: T, context: SinkFunction.Context) {
        try {
            sink.invoke(value, context)
        } catch (ex: Exception) {
            logger.error(
                "Encountered sink exception. Sending message to dead letter queue. " +
                    "Value: $value. Exception: ${ex.message}"
            )
            // Attach the failure reason to the serialized record before offloading it
            val payload = Gson().toJsonTree(value).asJsonObject
            payload.addProperty("exception", ex.message)

            dlqSink.invoke("$payload", context)
        }
    }
}
```

After doing this, it doesn't look like the invoke calls actually attempt to
perform the JDBC calls that insert the records into the database. I'm not
entirely sure whether this is related to how the JdbcSink is wrapped
internally (via the GenericJdbcSinkFunction, etc.).
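
One possible explanation, in line with Maciek's earlier note about also
wrapping open, close, snapshotState, and the other lifecycle methods: a wrapper
that only implements `invoke` never opens the wrapped sink or flushes it on
checkpoints. Below is a minimal sketch of a forwarding wrapper, assuming
Flink 1.13-era APIs (`FunctionUtils`, `CheckpointedFunction`,
`CheckpointListener`) and a JDBC batch size of 1 so that a failure surfaces
inside `invoke`. The class name `ForwardingSinkWrapper` and the `handleFailure`
hook are hypothetical, and how the record reaches the dead-letter queue is left
to the subclass.

```kotlin
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.api.common.state.CheckpointListener
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical wrapper: forwards lifecycle and checkpoint calls to the wrapped sink.
abstract class ForwardingSinkWrapper<T>(
    private val delegate: SinkFunction<T>
) : RichSinkFunction<T>(), CheckpointedFunction, CheckpointListener {

    // Hypothetical hook: subclasses decide how a failed record reaches the DLQ.
    protected abstract fun handleFailure(value: T, cause: Exception)

    override fun setRuntimeContext(context: RuntimeContext) {
        super.setRuntimeContext(context)
        // No-op unless the delegate is a RichFunction
        FunctionUtils.setFunctionRuntimeContext(delegate, context)
    }

    override fun open(parameters: Configuration) {
        super.open(parameters)
        // Gives a rich delegate the chance to set up its resources
        FunctionUtils.openFunction(delegate, parameters)
    }

    override fun invoke(value: T, context: SinkFunction.Context) {
        try {
            delegate.invoke(value, context)
        } catch (ex: Exception) {
            handleFailure(value, ex)
        }
    }

    override fun initializeState(context: FunctionInitializationContext) {
        (delegate as? CheckpointedFunction)?.initializeState(context)
    }

    override fun snapshotState(context: FunctionSnapshotContext) {
        (delegate as? CheckpointedFunction)?.snapshotState(context)
    }

    override fun notifyCheckpointComplete(checkpointId: Long) {
        (delegate as? CheckpointListener)?.notifyCheckpointComplete(checkpointId)
    }

    override fun close() {
        FunctionUtils.closeFunction(delegate)
        super.close()
    }
}
```

With batching enabled, a failure would instead surface from a flush that
covers the whole batch and could no longer be attributed to one record, which
is why this sketch assumes a batch size of 1.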

I have seen several posts that involve an InvocationHandler/Proxy, but I'm
not sure that should be necessary for this type of functionality. Any ideas,
thoughts, or examples would be greatly appreciated.

Thanks,

Rion

On 2021/07/14 15:47:18, Maciej Bryński <mac...@brynski.pl> wrote: 
> This is the idea.
> Of course you need to wrap more functions like: open, close,
> notifyCheckpointComplete, snapshotState, initializeState and
> setRuntimeContext.
> 
> The problem is that if you want to catch a problematic record, you need
> to set the batch size to 1, which gives very bad performance.
> 
> Regards,
> Maciek
> 
> On Wed, 14 Jul 2021 at 17:31, Rion Williams <rionmons...@gmail.com> wrote:
> >
> > Hi Maciej,
> >
> > Thanks for the quick response. I wasn't aware of the idea of using a 
> > SinkWrapper, but I'm not quite certain that it would suit this specific use 
> > case (as a SinkFunction / RichSinkFunction doesn't appear to support 
> > side-outputs). Essentially, what I'd hope to accomplish would be to pick up 
> > when a bad record could not be written to the sink and then offload that 
> > via a side-output somewhere else.
> >
> > Something like this, which is a very, very naive idea:
> >
> > class PostgresSinkWrapper<T>(private val sink: SinkFunction<T>) : RichSinkFunction<T>() {
> >     private val logger = LoggerFactory.getLogger(PostgresSinkWrapper::class.java)
> >
> >     override fun invoke(value: T, context: SinkFunction.Context) {
> >         try {
> >             sink.invoke(value, context)
> >         } catch (exception: Exception) {
> >             logger.error("Encountered a bad record, offloading to dead-letter-queue")
> >             // Offload bad record to DLQ
> >         }
> >     }
> > }
> >
> > But I think that's basically the gist of it. I'm just not sure how I could 
> > go about doing this aside from perhaps writing a custom process function 
> > that wraps another sink function (or just completely rewriting my own 
> > JdbcSink?)
> >
> > Thanks,
> >
> > Rion
> >
> >
> >
> >
> >
> > On Wed, Jul 14, 2021 at 9:56 AM Maciej Bryński <mac...@brynski.pl> wrote:
> >>
> >> Hi Rion,
> >> We have implemented such a solution with Sink Wrapper.
> >>
> >>
> >> Regards,
> >> Maciek
> >>
> >> On Wed, 14 Jul 2021 at 16:21, Rion Williams <rionmons...@gmail.com> wrote:
> >> >
> >> > Hi all,
> >> >
> >> > Recently I've been encountering an issue where an external
> >> > dependency or process causes writes within my JDBCSink to fail (e.g.
> >> > something is being inserted with an explicit constraint that never made
> >> > its way there). I'm trying to see if there's a pattern or
> >> > recommendation for handling this, similar to a dead-letter queue.
> >> >
> >> > Basically - if I experience a given number of failures (> max retry 
> >> > attempts) when writing to my JDBC destination, I'd like to take the 
> >> > record that was attempted and throw it into a Kafka topic or some other 
> >> > destination so that it can be evaluated at a later time.
> >> >
> >> > Are there any well defined patterns or recommended approaches around 
> >> > this?
> >> >
> >> > Thanks,
> >> >
> >> > Rion
> >>
> >>
> >>
> >> --
> >> Maciek Bryński
> 
> 
> 
> -- 
> Maciek Bryński
> 
