Hi guys. The example below works for me, but there is a case where we only need the update query. How can I do it?
import apache_beam as beam
from apache_beam import coders
from pyparsing import unicode
import typing
from datetime import datetime
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam.io import jdbc

'''
CREATE TABLE IF NOT EXISTS test (
    id integer PRIMARY KEY,
    name VARCHAR(10),
    load_date_time TIMESTAMP
);
'''

# Row type written to the `test` table. `load_date_time` is carried as a
# string in this example. The list-of-pairs form is used because the
# keyword-argument form of typing.NamedTuple() is deprecated.
ExampleRow = typing.NamedTuple(
    'ExampleRow',
    [('id', int), ('name', str), ('load_date_time', str)],
)


class CustomCoder(beam.coders.Coder):
    """Serialize an ExampleRow as UTF-8 text of the form ``id:name:load_date_time``.

    ``:`` is the field separator, so ``name`` must not contain one; the
    timestamp is the last field and may contain ``:`` freely.
    """

    # Format applied when ``load_date_time`` is given as a datetime object.
    _TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

    def encode(self, value):
        """Encode the given row into a byte string.

        Args:
            value: an ExampleRow. ``load_date_time`` may be a string (as
                ExampleRow declares) or a datetime, which is formatted with
                ``_TIMESTAMP_FORMAT``.

        Returns:
            The UTF-8 encoded ``id:name:load_date_time`` bytes.
        """
        load_date_time = value.load_date_time
        # The field is declared as str; only call strftime() when a datetime
        # was actually supplied, otherwise the call raises AttributeError.
        if isinstance(load_date_time, datetime):
            load_date_time = load_date_time.strftime(self._TIMESTAMP_FORMAT)
        return ':'.join(
            (str(value.id), value.name, load_date_time)).encode('utf-8')

    def decode(self, s):
        """Decode a byte string produced by ``encode`` back into an ExampleRow.

        Args:
            s: UTF-8 bytes of the form ``id:name:load_date_time``.

        Returns:
            An ExampleRow whose ``load_date_time`` is the timestamp string.

        Raises:
            ValueError: if fewer than three fields are present or the id is
                not an integer.
        """
        # Split at most twice so the ':' characters inside HH:MM:SS stay in
        # the timestamp field.
        row_id, name, load_date_time = s.decode('utf-8').split(':', 2)
        return ExampleRow(int(row_id), name, load_date_time)

    # Whether the coder is deterministic or not. If it is not deterministic it
    # might serialize the PCollection's elements differently on different
    # machines. For operations like group-by that is a problem, because group-by
    # compares elements across different machines, and if the machines use
    # different coders the result may be incorrect. If the coder is
    # deterministic we guarantee that the same coder is used on all machines.
    #
    # def is_deterministic(self):
    #     # type: () -> bool
    #     return True
    #
    # def to_type_hint(self):
    #     return unicode
    # '''
    # This method estimates the encoded size of the input value. It is not mandatory, It is related to performance only.
# '''
# def estimate_size(self, value):
#     return len(self.encode(value))

# Register RowCoder as the coder for ExampleRow elements.
beam.coders.registry.register_coder(ExampleRow, coders.RowCoder)

date_format = '%Y-%m-%d %H:%M:%S'

with beam.Pipeline() as p:
    # Both sample rows use id 1, so (assuming both reach the table) one of
    # them takes the ON CONFLICT branch of the statement below.
    example_rows = p | beam.Create(
        [
            ExampleRow(1, 'wrwerewr', '2023-04-05 12:34:55'),
            ExampleRow(1, 'weerret', '2023-04-05 12:34:54'),
        ]
    ).with_output_types(ExampleRow)

    # Upsert: insert the row, or on an id conflict overwrite name and
    # load_date_time only when the incoming load_date_time is newer than
    # the stored one (see the WHERE clause).
    _ = example_rows | 'Write to jdbc' >> WriteToJdbc(
        driver_class_name='org.postgresql.Driver',
        jdbc_url='jdbc:postgresql://localhost:5432/postgres',
        username='postgres',
        password='postgres',
        table_name='test',
        connection_properties="stringtype=unspecified",
        statement='INSERT INTO test \
                VALUES(?,?,?) \
                ON CONFLICT (id)\
                DO UPDATE SET name = EXCLUDED.name, load_date_time = EXCLUDED.load_date_time\
                WHERE EXCLUDED.load_date_time::timestamp > test.load_date_time::timestamp'
    )

# --- Quoted earlier message from this thread (not part of the script) ---
# On Wed, 17 Jan 2024 at 6:46, Juan Romero (<jsrf...@gmail.com>) wrote:
# > Hi guys. Im looking for how configure a custom query (statement parameter
# > in the connector) only for update a register. Can you help me with that?