Hi,
I am trying to write a pipeline that reads data from Oracle using the Python SDK.
I am using ReadFromJdbc, and the read fails for data types such as VARCHAR and
DATETIME.
I think this might be an issue with cross-language transforms (the Python SDK
calling the Java expansion service).
Is there a solution for this?
It fails with the error below:
ValueError: Failed to decode schema due to an issue with Field proto: name:
"F_DATE" type { nullable: true logical_type { urn:
"beam:logical_type:javasdk:v1" payload:
"\202SNAPPY\000\000\000\000\001\000\000\000\001\000\000\002_\303\007\360@\254\355\000\005sr\000*org.apache.beam.sdk.io.jdbc.LogicalTypes$2M\352\236\036h\3034/\002\000\000xr\000?N9\000
schemas.l\t9Dtypes.PassThroughL\t\030\001Q\270\210\324\331\211\313P\033\263\002\000\004L\000\010argumentt\000\022Ljava/lang/Object;L\000\014a\r
\001:\034t\000.Lorg/\t\266\000/\001\266\020/sdk/\r}\004/S\005\205\024$Field\0010\020;L\000\tf\021\rDq\000~\000\003L\000\nidentifier6s\000<String;xpt\000\000sr\0006n\346\000$AutoValue_\ts\000_\025sh9\304m\364S\243\227P\002\000\010L\000\025collectionEle\001\346\001\226\r\211\000\013-+\001\023\010t\0000\216\331\000=E$;L\000\nmapKey\001@\rS\014\014map\005\227\035\024,\010metadatat\000\017)aXutil/Map;L\000\010nullablet\000\023\t\035%~\030Boolean!?\010row\t\343\010t\000$\212\243\000\001T(typeNamet\000-\2122\000\000$\001\254\001/\020;xr\000,nu\001\t\2109\3360\013PLl[\357\3103\002\000\000xp\001\001\014sr\000\036AC\000.\001\342\004.C5|Ds$EmptyMapY6\024\205Z\334\347\320\0053\014sr\000\021\005/\001\364\000.\r\3648\315
r\200\325\234\372\356\002\000\001Z\000\005v!\344\034xp\000p~r\000+\212\234\000\021\314\000\000\r\001\000\022e1\000\016\031f\014Enum\r\034\005\035(pt\000\006STRINGs!\304\000\007\001\307\001\t\000\020\001\005\010\022p~\001\007H\023t\000\010DATETIMEt\000\004DATE"
representation { logical_type { urn: "beam:logical_type:datetime:v1"
representation { atomic_type: INT64 } } } argument_type { atomic_type:
STRING } argument { atomic_value { string: "" } } } } id: 1
encoding_position: 1
=== Source Location Trace: ===
dist_proc/dax/workflow/worker/fnapi_sdk_harness.cc:177

I also tried the solution suggested in this Stack Overflow answer:
https://stackoverflow.com/a/71265662/18224790

I would like to know whether there is a solution for this, or whether this is a
bug in ReadFromJdbc in the Python SDK.
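
For context, the workaround I have seen suggested (and what the Stack Overflow
answer above boils down to, as far as I can tell) is to read with a custom query
instead of table_name, casting the problematic columns to plain strings with
Oracle's TO_CHAR so the JDBC driver reports simple string types instead of
vendor-specific ones. Whether this avoids the javasdk:v1 logical type may depend
on the Beam version. A small helper to build such a query (the helper name and
columns are made up for illustration):

```python
def build_cast_query(table, plain_cols, date_cols,
                     date_fmt="YYYY-MM-DD HH24:MI:SS"):
    """Build a SELECT for ReadFromJdbc(query=...) that casts DATE/TIMESTAMP
    columns to strings via TO_CHAR, so only plain string types need to cross
    the cross-language boundary."""
    select_list = list(plain_cols)
    select_list += [
        "TO_CHAR({col}, '{fmt}') AS {col}".format(col=col, fmt=date_fmt)
        for col in date_cols
    ]
    return "SELECT {} FROM {}".format(", ".join(select_list), table)
```

The result would then be passed as ReadFromJdbc(query=build_cast_query(...), ...)
rather than table_name=...; recent SDK versions accept a query argument.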

I am also attaching the pipeline code that I am using.

Regards
Abhinav Jha
import argparse

import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions

class JobOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            "--project_id",
            type=str,
            default=None,
            help="Project ID of the GCP project",
        )


class ConvertToDict(beam.DoFn):
    # Convert each NamedTuple row emitted by ReadFromJdbc into a plain dict.
    def process(self, element):
        yield element._asdict()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args()
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=False, save_main_session=True
    )
    options = pipeline_options.view_as(JobOptions)
    pipeline = beam.Pipeline(options=pipeline_options)
    data = (pipeline
                | "Read from rdbms" >> ReadFromJdbc(
                    driver_class_name='oracle.jdbc.driver.OracleDriver',
                    jdbc_url='jdbc:url//',
                    username='XXXX',
                    password='XXXXX',
                    table_name='database.table_name',
                    classpath=['/home/abhinav_jha/python_df/ojdbc8.jar']
                )
                )

    data | "Convert To Dict" >> beam.ParDo(ConvertToDict())
    pipeline.run()
