Hi. Can someone help me with this?

On Wed, Apr 19, 2023 at 15:08, Juan Romero (<jsrf...@gmail.com>) wrote:
> Hi community.
>
> I have a question about how to read a stream from Kafka and write the
> data in batches with the JDBC connector. The idea is to overwrite a
> row when the incoming row has the same id and a greater
> load_date_time. The conceptual pipeline looks like this, and it works
> (keep in mind that the real source will be a stream from Kafka):
>
> import typing
>
> import apache_beam as beam
> from apache_beam.io.jdbc import WriteToJdbc
>
> ExampleRow = typing.NamedTuple(
>     'ExampleRow', [('id', int), ('name', str), ('load_date_time', str)])
>
> with beam.Pipeline() as p:
>   _ = (
>       p
>       | beam.Create([
>           ExampleRow(1, 'zzzz', '2023-04-05 12:34:56'),
>           ExampleRow(1, 'yyyz', '2023-04-05 12:34:55'),
>       ]).with_output_types(ExampleRow)
>       | 'Write to jdbc' >> WriteToJdbc(
>           driver_class_name='org.postgresql.Driver',
>           jdbc_url='jdbc:postgresql://localhost:5432/postgres',
>           username='postgres',
>           password='postgres',
>           table_name='test',
>           connection_properties='stringtype=unspecified',
>           # Upsert: keep whichever row has the greater load_date_time.
>           statement='INSERT INTO test VALUES(?,?,?) '
>                     'ON CONFLICT (id) DO UPDATE SET '
>                     'name = EXCLUDED.name, '
>                     'load_date_time = EXCLUDED.load_date_time '
>                     'WHERE EXCLUDED.load_date_time::timestamp '
>                     '> test.load_date_time::timestamp'))
>
> My question is: if I want to write a stream that comes from Kafka, how
> can I keep the JDBC connector from inserting the records one by one,
> and instead insert the data in time-based batches? Probably JDBC
> internally has some kind of "intelligence" to do this, but I want to
> know what you think about it.
>
> Thank you!
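
One way to express the time-based batching described above is to window the unbounded Kafka stream before the JDBC write. The sketch below is only an illustration, not a definitive answer: the topic name 'events', the bootstrap server, and the CSV value format "id,name,load_date_time" are all assumptions I made up for the example, and how many statements end up in each JDBC batch still depends on how the runner bundles elements.

import typing

import apache_beam as beam
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

ExampleRow = typing.NamedTuple(
    'ExampleRow', [('id', int), ('name', str), ('load_date_time', str)])

def parse_record(kv):
  # ReadFromKafka emits (key, value) tuples of bytes; here I assume the
  # value is a UTF-8 CSV line "id,name,load_date_time" (hypothetical format).
  _, value = kv
  id_, name, load_date_time = value.decode('utf-8').split(',')
  return ExampleRow(int(id_), name, load_date_time)

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
  _ = (
      p
      | 'Read from kafka' >> ReadFromKafka(
          consumer_config={'bootstrap.servers': 'localhost:9092'},
          topics=['events'])
      | 'Parse' >> beam.Map(parse_record).with_output_types(ExampleRow)
      # Group the unbounded stream into 60-second fixed windows so the
      # sink sees time-based groups instead of a continuous trickle.
      | 'Window' >> beam.WindowInto(FixedWindows(60))
      | 'Write to jdbc' >> WriteToJdbc(
          driver_class_name='org.postgresql.Driver',
          jdbc_url='jdbc:postgresql://localhost:5432/postgres',
          username='postgres',
          password='postgres',
          table_name='test',
          connection_properties='stringtype=unspecified',
          statement='INSERT INTO test VALUES(?,?,?) '
                    'ON CONFLICT (id) DO UPDATE SET '
                    'name = EXCLUDED.name, '
                    'load_date_time = EXCLUDED.load_date_time '
                    'WHERE EXCLUDED.load_date_time::timestamp '
                    '> test.load_date_time::timestamp'))

For what it's worth, WriteToJdbc is a cross-language wrapper around the Java JdbcIO, which as far as I know already groups prepared statements into JDBC batches within each bundle rather than issuing one round trip per element, so the windowing above is more about controlling when data lands than a strict requirement for batching.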