Hi guys. For example, the code below works for me, but there is a case where we
only need the update query. How can I do that?

import apache_beam as beam
from apache_beam import coders
from pyparsing import unicode
import typing
from datetime import datetime
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam.io import jdbc


# Target table used by this example:
#
# CREATE TABLE IF NOT EXISTS test (
#    id integer PRIMARY KEY,
#    name VARCHAR(10),
#    load_date_time TIMESTAMP
# );


class ExampleRow(typing.NamedTuple):
    """One row of the ``test`` table, in column order.

    Declared with the class syntax because the keyword-argument call form
    ``typing.NamedTuple('ExampleRow', id=int, ...)`` is deprecated.
    """

    # Primary key of the row.
    id: int
    # Value for the ``name`` column.
    name: str
    # Timestamp carried as text; the pipeline below passes values formatted
    # like '2023-04-05 12:34:55'.
    load_date_time: str

class CustomCoder(beam.coders.Coder):
    """Coder that serializes a row as UTF-8 ``id:name:timestamp`` text.

    The timestamp part is rendered and parsed with ``_TIMESTAMP_FORMAT``.
    """

    # Timestamp layout shared by encode() and decode().
    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

    def encode(self, value):
        """Encode the given object into a byte string.

        Args:
            value: A row exposing ``id``, ``name`` and ``load_date_time``.
                ``load_date_time`` must provide ``strftime`` (for example a
                ``datetime``).

        Returns:
            The UTF-8 bytes of ``"<id>:<name>:<timestamp>"``.
        """
        stamp = value.load_date_time.strftime(self._TIMESTAMP_FORMAT)
        return ('%s:%s:%s' % (str(value.id), value.name, stamp)).encode('utf-8')

    def decode(self, s):
        """Decode the given byte string into the corresponding object.

        Args:
            s: Bytes previously produced by ``encode``.

        Returns:
            An ``ExampleRow`` whose ``load_date_time`` is a ``datetime``.

        Raises:
            ValueError: If ``s`` does not have the ``id:name:timestamp``
                layout or the timestamp does not match the expected format.
        """
        text = s.decode('utf-8')
        # The timestamp itself contains two ':' characters (HH:MM:SS), so a
        # plain split(':') cannot recover the fields. Peel the timestamp off
        # from the right, then split the id off from the left; whatever is
        # left in the middle is the name, even if it contains ':'.
        head, date_and_hour, minutes, seconds = text.rsplit(':', 3)
        id_text, name = head.split(':', 1)
        stamp = ':'.join((date_and_hour, minutes, seconds))
        return ExampleRow(
            int(id_text),
            name,
            datetime.strptime(stamp, self._TIMESTAMP_FORMAT),
        )

    # Whether the coder is deterministic or not. If it is not deterministic it
    # might serialize the PCollection's elements differently on different
    # machines. For operations like group by this is a problem, because with
    # groupBy we compare different elements across different machines, and if
    # the machines have different coders it may result in incorrect behavior.
    # If the coder is deterministic we guarantee that we will use the same
    # coder on all the machines.
    #
    # Optional overrides, left disabled:
    #
    # def is_deterministic(self):  # type: () -> bool
    #     return True
    #
    # def to_type_hint(self):
    #     return unicode
    #
    # This method estimates the encoded size of the input value. It is not
    # mandatory; it is related to performance only.
    #
    # def estimate_size(self, value):
    #     return len(self.encode(value))

# ExampleRow elements are encoded with Beam's RowCoder (the CustomCoder class
# defined in this file is not registered).
beam.coders.registry.register_coder(ExampleRow, coders.RowCoder)

date_format = '%Y-%m-%d %H:%M:%S'

# Write two rows to the ``test`` table through the JDBC connector using a
# custom upsert statement instead of the connector's default insert.
with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create(
            [
                # Both rows share id 1, so one of them takes the
                # ON CONFLICT branch of the statement below.
                ExampleRow(1, 'wrwerewr', '2023-04-05 12:34:55'),
                ExampleRow(1, 'weerret', '2023-04-05 12:34:54'),
            ]).with_output_types(ExampleRow)
        | 'Write to jdbc' >> WriteToJdbc(
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:postgresql://localhost:5432/postgres',
            username='postgres',
            password='postgres',
            table_name='test',
            # NOTE(review): presumably required so the text load_date_time
            # value can be bound to the TIMESTAMP column -- confirm against
            # the PostgreSQL JDBC driver documentation.
            connection_properties="stringtype=unspecified",
            # Built from adjacent literals inside parentheses so the SQL
            # survives line wrapping; the three '?' placeholders are bound to
            # id, name and load_date_time in that order. An existing row is
            # overwritten only when the incoming load_date_time is newer.
            statement=(
                'INSERT INTO test '
                'VALUES(?,?,?) '
                'ON CONFLICT (id) '
                'DO UPDATE SET name = EXCLUDED.name, '
                'load_date_time = EXCLUDED.load_date_time '
                'WHERE EXCLUDED.load_date_time::timestamp > '
                'test.load_date_time::timestamp'
            ),
        ))


El mié, 17 ene 2024 a las 6:46, Juan Romero (<jsrf...@gmail.com>) escribió:

> Hi guys. I'm looking for how to configure a custom query (the statement
> parameter in the connector) that only updates a record. Can you help me with that?

Reply via email to