So it looks like newer versions have this. On the 1.8 branch you can get similar functionality if you enable checkpointing. There are a few things that could be confusing if you're on 1.8:
- JDBCOutputFormat: only flushes when the batch size interval is reached, and works with Row.
- JDBCSinkFunction: uses JDBCOutputFormat, but also flushes on a time interval driven by the checkpoint settings.
- JDBCAppendTableSink: uses JDBCSinkFunction and works with the Table API.
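To make that concrete, here is a rough sketch of how those pieces behave on 1.8. The driver, URL, query, and column names below are placeholders, not from the job in this thread, and `env` is assumed to be the job's StreamExecutionEnvironment:

    import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // JDBCOutputFormat only executes the batched statement once `batchInterval`
    // rows have been buffered (default 5000). Placeholder connection settings.
    JDBCOutputFormat outputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("org.mariadb.jdbc.Driver")
        .setDBUrl("jdbc:mariadb://localhost:3306/mydb")
        .setQuery("INSERT INTO my_table (id, name) VALUES (?, ?)")
        .setBatchInterval(5000)   // rows sit in the buffer until this count is hit
        .finish();

    // JDBCSinkFunction wraps the output format and additionally flushes whatever
    // is buffered each time a checkpoint is taken, so enabling checkpointing is
    // what gives you the "time interval" flush on 1.8.
    env.enableCheckpointing(10_000);   // checkpoint (and therefore flush) every 10s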
On Thu, 17 Oct 2019 at 23:57, Rong Rong <walter...@gmail.com> wrote:

> Splendid. Thanks for following up and moving the discussion forward :-)
>
> --
> Rong
>
> On Thu, Oct 17, 2019 at 11:38 AM John Smith <java.dev....@gmail.com> wrote:
>
>> I recorded two:
>> Time interval: https://issues.apache.org/jira/browse/FLINK-14442
>> Checkpointing: https://issues.apache.org/jira/browse/FLINK-14443
>>
>> On Thu, 17 Oct 2019 at 14:00, Rong Rong <walter...@gmail.com> wrote:
>>
>>> Yes, I think having a time interval execution (for the AppendableSink)
>>> would be a good idea.
>>> Can you please open a Jira issue [1] for further discussion.
>>>
>>> --
>>> Rong
>>>
>>> [1] https://issues.apache.org/jira/projects/FLINK/issues
>>>
>>> On Thu, Oct 17, 2019 at 9:48 AM John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Yes, correct. I set it to batch interval = 1 and it works fine. Anyway,
>>>> I think the JDBC sink could use some improvements, like batch interval +
>>>> time interval execution: if the batch doesn't fill up, execute whatever
>>>> is left at that time interval.
>>>>
>>>> On Thu, 17 Oct 2019 at 12:22, Rong Rong <walter...@gmail.com> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> You are right. IMO the batch interval setting is there to improve JDBC
>>>>> execution performance.
>>>>> The reason your INSERT INTO statement with a `non_existing_table` doesn't
>>>>> throw an exception is that the JDBCAppendableSink does not check table
>>>>> existence beforehand. That being said, it should fail at the first batch
>>>>> execution.
>>>>>
>>>>> Also, I think the `batchInterval` setting is local to the task; this
>>>>> means the default 5000 batchInterval is per-partition.
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>> On Wed, Oct 16, 2019 at 7:21 AM John Smith <java.dev....@gmail.com> wrote:
>>>>>
>>>>>> Ok, I think I found it: it's the batch interval setting. From what I
>>>>>> see, if we want a "realtime" stream to the database we have to set it
>>>>>> to 1, otherwise the sink will wait until the batch interval count is
>>>>>> reached.
>>>>>>
>>>>>> The batch interval mechanism doesn't seem correct though? If the default
>>>>>> size is 5000 and you only need to insert 5001, will you never get that
>>>>>> 1 remaining record?
>>>>>>
>>>>>> On Tue, 15 Oct 2019 at 15:54, John Smith <java.dev....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, using 1.8.0.
>>>>>>>
>>>>>>> I have the following job: https://pastebin.com/ibZUE8Qx
>>>>>>>
>>>>>>> The job does the following steps:
>>>>>>> 1- Consume from Kafka and return JsonObject
>>>>>>> 2- Map JsonObject to MyPojo
>>>>>>> 3- Convert the stream to a table
>>>>>>> 4- Insert the table into the JDBC sink table
>>>>>>> 5- Print the table
>>>>>>>
>>>>>>> - The job seems to work with no errors and I can see the rows printed
>>>>>>> to the console, but I see nothing in my database.
>>>>>>> - If I put an invalid host for the database and restart the job, I get
>>>>>>> a connection SQLException. So at least we know that works.
>>>>>>> - If I make a typo in the INSERT INTO statement, like INSERTS INTO
>>>>>>> non_existing_table, no exceptions are thrown, the print happens, and
>>>>>>> the stream continues to work.
>>>>>>> - If I drop the table from the database, same thing: no exceptions
>>>>>>> thrown, the print happens, the stream continues to work.
>>>>>>>
>>>>>>> So am I missing something?
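For anyone finding this thread later, a minimal sketch of the "batch interval = 1" workaround discussed above, using the 1.8 Table API. Driver, URL, query, table, and field names are placeholders, and `tEnv` / `myTable` stand in for the job's own StreamTableEnvironment and the table converted from the stream (steps 3-4 above):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;

    // Batch size 1 means every row is flushed to the database immediately,
    // at the cost of one JDBC round trip per record.
    JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
        .setDrivername("org.mariadb.jdbc.Driver")
        .setDBUrl("jdbc:mariadb://localhost:3306/mydb")
        .setQuery("INSERT INTO my_table (id, name) VALUES (?, ?)")
        .setParameterTypes(Types.INT, Types.STRING)
        .setBatchSize(1)
        .build();

    // Register the sink and write the converted table to it.
    tEnv.registerTableSink(
        "jdbcSinkTable",
        new String[]{"id", "name"},
        new TypeInformation<?>[]{Types.INT, Types.STRING},
        sink);

    myTable.insertInto("jdbcSinkTable");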