So it looks like newer versions have this.

On the 1.8 branch you can get similar functionality if you enable
checkpointing. There are a few things to look at that can be confusing if
you are using 1.8.
JDBCOutputFormat: Only flushes when the batch size is reached; works with Row.
JDBCSinkFunction: Uses JDBCOutputFormat but will also flush on a time
interval driven by the checkpoint settings.
JDBCAppendTableSink: Uses JDBCSinkFunction and works with the Table API.
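
Roughly, the 1.8 setup looks like this (a minimal sketch; the driver, URL,
query and column types are placeholders for illustration, and the builder
methods are the ones from the flink-jdbc module, so double-check them
against your exact version):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// JDBCSinkFunction flushes its pending batch on every checkpoint, so enabling
// checkpointing effectively adds a time-based flush on top of the batch size.
env.enableCheckpointing(10000); // checkpoint (and therefore flush) roughly every 10s

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    .setDrivername("org.postgresql.Driver")            // placeholder driver/URL
    .setDBUrl("jdbc:postgresql://db-host:5432/mydb")
    .setQuery("INSERT INTO my_table (id, name) VALUES (?, ?)")
    .setParameterTypes(Types.INT, Types.STRING)
    .setBatchSize(5000)  // rows buffered per subtask before executeBatch()
    .build();

Register that sink with the table environment (registerTableSink) and INSERT
INTO it; rows then go out either when the batch fills up or at the next
checkpoint.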

On Thu, 17 Oct 2019 at 23:57, Rong Rong <walter...@gmail.com> wrote:

> Splendid. Thanks for following up and moving the discussion forward :-)
>
> --
> Rong
>
> On Thu, Oct 17, 2019 at 11:38 AM John Smith <java.dev....@gmail.com>
> wrote:
>
>> I recorded two:
>> Time interval: https://issues.apache.org/jira/browse/FLINK-14442
>> Checkpointing: https://issues.apache.org/jira/browse/FLINK-14443
>>
>>
>> On Thu, 17 Oct 2019 at 14:00, Rong Rong <walter...@gmail.com> wrote:
>>
>>> Yes, I think having a time-interval execution (for the AppendableSink)
>>> would be a good idea.
>>> Could you please open a Jira issue [1] for further discussion?
>>>
>>> --
>>> Rong
>>>
>>> [1] https://issues.apache.org/jira/projects/FLINK/issues
>>>
>>> On Thu, Oct 17, 2019 at 9:48 AM John Smith <java.dev....@gmail.com>
>>> wrote:
>>>
>>>> Yes, correct. I set the batch interval to 1 and it works fine. Anyway, I
>>>> think the JDBC sink could use some improvements, like combining the
>>>> batchInterval with a time-interval execution: if the batch doesn't fill up,
>>>> execute whatever is left when the time interval elapses.
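>>>>
>>>> Rough sketch of the kind of thing I mean (my own wrapper around
>>>> JDBCOutputFormat, not an existing Flink API; the class name, the timer
>>>> approach and the error handling are just for illustration):
>>>>
>>>> import java.util.concurrent.Executors;
>>>> import java.util.concurrent.ScheduledExecutorService;
>>>> import java.util.concurrent.TimeUnit;
>>>> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>>> import org.apache.flink.types.Row;
>>>>
>>>> public class TimedJdbcSink extends RichSinkFunction<Row> {
>>>>     private final JDBCOutputFormat outputFormat;  // configured with a large batch size
>>>>     private final long flushIntervalMs;
>>>>     private transient ScheduledExecutorService scheduler;
>>>>
>>>>     public TimedJdbcSink(JDBCOutputFormat outputFormat, long flushIntervalMs) {
>>>>         this.outputFormat = outputFormat;
>>>>         this.flushIntervalMs = flushIntervalMs;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) throws Exception {
>>>>         outputFormat.setRuntimeContext(getRuntimeContext());
>>>>         outputFormat.open(getRuntimeContext().getIndexOfThisSubtask(),
>>>>                 getRuntimeContext().getNumberOfParallelSubtasks());
>>>>         scheduler = Executors.newSingleThreadScheduledExecutor();
>>>>         // periodically push out whatever is buffered, even if the batch never fills up
>>>>         scheduler.scheduleAtFixedRate(this::flushQuietly,
>>>>                 flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void invoke(Row value, Context context) throws Exception {
>>>>         synchronized (outputFormat) {
>>>>             outputFormat.writeRecord(value);
>>>>         }
>>>>     }
>>>>
>>>>     private void flushQuietly() {
>>>>         synchronized (outputFormat) {
>>>>             try {
>>>>                 outputFormat.flush();
>>>>             } catch (Exception e) {
>>>>                 // a real sink would log this and probably fail the job
>>>>             }
>>>>         }
>>>>     }
>>>>
>>>>     @Override
>>>>     public void close() throws Exception {
>>>>         if (scheduler != null) {
>>>>             scheduler.shutdown();
>>>>         }
>>>>         synchronized (outputFormat) {
>>>>             outputFormat.close();
>>>>         }
>>>>     }
>>>> }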
>>>>
>>>> On Thu, 17 Oct 2019 at 12:22, Rong Rong <walter...@gmail.com> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> You are right. IMO the batch interval setting is there to improve JDBC
>>>>> execution performance.
>>>>> The reason your INSERT INTO statement against a `non_existing_table`
>>>>> doesn't throw an exception is that the JDBCAppendTableSink does not check
>>>>> table existence beforehand. That said, it should fail at the first batch
>>>>> execution.
>>>>>
>>>>> Also, I think the `batchInterval` setting is local to each task, which
>>>>> means the default of 5000 applies per partition.
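>>>>> For example (rough numbers just to illustrate): with the default
>>>>> batchInterval of 5000 and a sink parallelism of 4, each of the 4 sink
>>>>> subtasks buffers up to 5000 rows before it executes its JDBC batch, so up
>>>>> to ~20000 rows can sit unflushed across the job at any point.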
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>> On Wed, Oct 16, 2019 at 7:21 AM John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Ok, I think I found it: it's the batch interval setting. From what I see,
>>>>>> if we want a "realtime" stream to the database we have to set it to 1,
>>>>>> otherwise the sink will wait until the batch interval count is reached.
>>>>>>
>>>>>> The batch interval mechanism doesn't seem correct, though. If the default
>>>>>> size is 5000 and you need to insert 5001 records, you will never get that
>>>>>> last record?
>>>>>>
>>>>>> On Tue, 15 Oct 2019 at 15:54, John Smith <java.dev....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, using 1.8.0
>>>>>>>
>>>>>>> I have the following job: https://pastebin.com/ibZUE8Qx
>>>>>>>
>>>>>>> So the job does the following steps...
>>>>>>> 1- Consume from Kafka and return JsonObject
>>>>>>> 2- Map JsonObject to MyPojo
>>>>>>> 3- Convert the stream to a table
>>>>>>> 4- Insert the table into the JDBC sink table
>>>>>>> 5- Print the table.
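>>>>>>>
>>>>>>> Roughly along these lines (just a sketch, not the actual pastebin code;
>>>>>>> the topic, field names and MyPojo.fromJson helper are made up):
>>>>>>>
>>>>>>> import java.util.Properties;
>>>>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>>>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>>>>>>
>>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>>>>>>>
>>>>>>> Properties props = new Properties();
>>>>>>> props.setProperty("bootstrap.servers", "kafka:9092");
>>>>>>> props.setProperty("group.id", "my-group");
>>>>>>>
>>>>>>> // 1/2 - consume from Kafka and map each JSON message to a POJO
>>>>>>> DataStream<MyPojo> pojos = env
>>>>>>>         .addSource(new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props))
>>>>>>>         .map(new MapFunction<String, MyPojo>() {
>>>>>>>             @Override
>>>>>>>             public MyPojo map(String json) {
>>>>>>>                 return MyPojo.fromJson(json); // hypothetical parse helper
>>>>>>>             }
>>>>>>>         });
>>>>>>>
>>>>>>> // 3 - convert the stream to a table
>>>>>>> tableEnv.registerDataStream("events", pojos, "id, name");
>>>>>>>
>>>>>>> // 4 - insert into the JDBC sink table (a JDBCAppendTableSink registered as "jdbc_sink_table")
>>>>>>> tableEnv.sqlUpdate("INSERT INTO jdbc_sink_table SELECT id, name FROM events");
>>>>>>>
>>>>>>> // 5 - print
>>>>>>> pojos.print();
>>>>>>>
>>>>>>> env.execute("kafka-to-jdbc");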
>>>>>>>
>>>>>>> - The job seems to work with no errors and I can see the row printed to
>>>>>>> the console, but I see nothing in my database.
>>>>>>> - If I put an invalid host for the database and restart the job, I get a
>>>>>>> connection SQLException. So at least we know that works.
>>>>>>> - If I make a typo in the INSERT INTO statement, like INSERTS INTO
>>>>>>> non_existing_table, no exceptions are thrown, the print happens, and the
>>>>>>> stream continues to work.
>>>>>>> - If I drop the table from the database, same thing: no exceptions are
>>>>>>> thrown, the print happens, and the stream continues to work.
>>>>>>>
>>>>>>> So am I missing something?
>>>>>>>
>>>>>>
