Hi. Can someone help me with this?

On Wed, Apr 19, 2023 at 15:08 Juan Romero (<jsrf...@gmail.com>) wrote:

> Hi community.
>
> This time I have a question about how to read a stream from Kafka and
> write batches of data with the JDBC connector. The idea is to overwrite a
> specific row when the row we want to insert has the same id and a greater
> load_date_time. The conceptual pipeline looks like this and it works (keep
> in mind that the real source will be a stream from Kafka):
>
> import typing
>
> import apache_beam as beam
> from apache_beam import coders
> from apache_beam.io.jdbc import WriteToJdbc
>
> ExampleRow = typing.NamedTuple('ExampleRow', id=int, name=str,
>                                load_date_time=str)
> # Register a RowCoder so the row schema can cross to the Java
> # expansion service that backs WriteToJdbc.
> coders.registry.register_coder(ExampleRow, coders.RowCoder)
>
>
> with beam.Pipeline() as p:
>   _ = (
>       p
>       | beam.Create(
>           [
>               ExampleRow(1, 'zzzz', '2023-04-05 12:34:56'),
>               ExampleRow(1, 'yyyz', '2023-04-05 12:34:55'),
>           ]).with_output_types(ExampleRow)
>       | 'Write to jdbc' >> WriteToJdbc(
>           driver_class_name='org.postgresql.Driver',
>           jdbc_url='jdbc:postgresql://localhost:5432/postgres',
>           username='postgres',
>           password='postgres',
>           table_name='test',
>           connection_properties='stringtype=unspecified',
>           statement='''
>               INSERT INTO test
>               VALUES (?, ?, ?)
>               ON CONFLICT (id)
>               DO UPDATE SET name = EXCLUDED.name,
>                             load_date_time = EXCLUDED.load_date_time
>               WHERE EXCLUDED.load_date_time::timestamp
>                     > test.load_date_time::timestamp''',
>       ))
>
> My question is: if I want to write a stream that comes from Kafka, how
> can I keep the JDBC connector from inserting the records with one
> statement each and instead insert the data in time-based batches?
> Perhaps JDBC internally has some kind of "intelligence" to do this, but
> I would like to know what you think about it.
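>
> To make the question concrete: as far as I can tell, the Java JdbcIO
> that backs WriteToJdbc already groups statements into JDBC batches per
> bundle (its withBatchSize option defaults to 1000), but the Python
> wrapper does not seem to expose a batch-size or flush-interval knob.
> The kind of workaround I imagine, bypassing WriteToJdbc and talking to
> Postgres directly with psycopg2, is sketched below; it is untested, and
> the bucket count, batch size, and buffering time are made-up values:
>
> import psycopg2
>
> UPSERT_SQL = '''
>     INSERT INTO test VALUES (%s, %s, %s)
>     ON CONFLICT (id)
>     DO UPDATE SET name = EXCLUDED.name,
>                   load_date_time = EXCLUDED.load_date_time
>     WHERE EXCLUDED.load_date_time::timestamp
>           > test.load_date_time::timestamp'''
>
>
> class BatchUpsert(beam.DoFn):
>   def setup(self):
>     # One connection per worker; hypothetical local credentials.
>     self.conn = psycopg2.connect(
>         host='localhost', port=5432, dbname='postgres',
>         user='postgres', password='postgres')
>
>   def process(self, keyed_batch):
>     # Receives (bucket_key, [ExampleRow, ...]) and upserts the whole
>     # batch in one round trip.
>     _, rows = keyed_batch
>     with self.conn.cursor() as cur:
>       cur.executemany(UPSERT_SQL, [tuple(r) for r in rows])
>     self.conn.commit()
>
>   def teardown(self):
>     self.conn.close()
>
>
> # 'rows' is the PCollection[ExampleRow] parsed from Kafka above.
> _ = (
>     rows
>     | 'Key into buckets' >> beam.Map(lambda r: (r.id % 4, r))
>     | 'Batch' >> beam.GroupIntoBatches(
>         batch_size=500, max_buffering_duration_secs=10)
>     | 'Upsert batch' >> beam.ParDo(BatchUpsert()))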
>
> Thank you!
>
