Hi,
Here is the code which triggers the error (part of the sink):
@Override
public void invoke(KafkaLog value) throws Exception {
    // ... rest of the sink logic omitted; 'arg' is a field extracted from the incoming value
    if (arg instanceof String && "error".equals((String) arg)) {
        throw new IOException("search for error");
    }
}
Hi,
I have this architecture: Kafka topic -> Flink Kafka stream -> Flink custom
sink that saves the data in a PostgreSQL database.
To test how the system behaves when an error occurs, I've done the following
test: activate checkpoints on my DataStream and put on the Kafka topic one
item with a special value ("error") that makes the sink throw the exception
shown in the snippet above.
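A minimal sketch of what activating checkpoints could look like for such a job. The 10 s interval, the fixed-delay restart strategy, and the CheckpointSetup class name are assumptions for illustration, not the original job code:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 s (assumed interval). When the sink throws the
        // IOException above, the job restarts and the Kafka consumer rewinds to the
        // offsets of the last completed checkpoint, so no records are lost; the
        // "error" record is replayed and fails again until it is filtered or handled.
        env.enableCheckpointing(10_000);

        // Assumed restart strategy: retry 3 times, waiting 5 s between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5_000));

        // ... attach the Kafka source, the filtering steps and the custom PostgreSQL
        // sink here, then call env.execute("kafka -> flink -> postgres").
    }
}

With checkpointing enabled, the Flink Kafka consumer stores its offsets in each checkpoint, which is what makes this failure test meaningful: the job resumes from the last completed checkpoint instead of silently skipping the failed record.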
Hi,
Thank you very much for your hint! It works pretty well.
Hello,
I'm new to Flink and I need some advice regarding the best approach to do
the following:
- read some items from a Kafka topic
- on the Flink stream side, after some simple filtering steps, group these
items into batches by Flink processing time
- insert the items into a PostgreSQL database using batch inserts (see the
sketch below)
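A rough sketch of one way this pipeline could be wired up, using a processing-time window to form the batches and a JDBC sink that writes each batch with one statement. The topic name, window size, connection settings, table layout and the PostgresBatchSink class are assumptions for illustration, not code from this thread (the PostgreSQL JDBC driver must be on the classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class KafkaToPostgresBatchJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // so Kafka offsets are replayed after a failure

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker
        props.setProperty("group.id", "postgres-batch-writer");   // assumed group id

        DataStream<String> items = env.addSource(
            new FlinkKafkaConsumer09<String>("items", new SimpleStringSchema(), props));

        items
            // simple filtering step
            .filter(line -> line != null && !line.isEmpty())
            // collect everything that arrives within 5 s of processing time into one batch;
            // note that an *All* window runs with parallelism 1
            .timeWindowAll(Time.seconds(5))
            .apply(new AllWindowFunction<String, List<String>, TimeWindow>() {
                @Override
                public void apply(TimeWindow window, Iterable<String> values,
                                  Collector<List<String>> out) {
                    List<String> batch = new ArrayList<>();
                    for (String v : values) {
                        batch.add(v);
                    }
                    if (!batch.isEmpty()) {
                        out.collect(batch);
                    }
                }
            })
            .addSink(new PostgresBatchSink());

        env.execute("Kafka to PostgreSQL batch inserts");
    }

    // Hypothetical sink: one JDBC batch per window (table, columns and credentials are assumptions).
    public static class PostgresBatchSink extends RichSinkFunction<List<String>> {

        private transient Connection connection;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/items", "flink", "secret");
        }

        @Override
        public void invoke(List<String> batch) throws Exception {
            try (PreparedStatement ps =
                     connection.prepareStatement("INSERT INTO items(payload) VALUES (?)")) {
                for (String item : batch) {
                    ps.setString(1, item);
                    ps.addBatch();
                }
                ps.executeBatch(); // one round trip per window instead of one per record
            }
        }

        @Override
        public void close() throws Exception {
            if (connection != null) {
                connection.close();
            }
        }
    }
}

The design point is that the window operator turns the stream of single items into a stream of List<String> batches, so the sink can use PreparedStatement addBatch/executeBatch and pay one round trip to PostgreSQL per window rather than one per record.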