I am *so close*, it seems. ;) I followed Luke's advice and am now reading the proto com.google.bigtable.v2.Row, then using a transform to convert it to bytes so that I can send it across to Python. (I assume that's what I should be doing with the proto?) Once on the Python side, when running on Dataflow, I'm running into the dreaded NameError, even though save_main_session is True.

Either

    from google.cloud.bigtable_v2.types import Row
    ...
    class ParsePB(beam.DoFn):
        def process(self, pb_bytes):
            row = Row()
            row.ParseFromString(pb_bytes)

or

    from google.cloud.bigtable_v2.proto import data_pb2 as data_v2_pb2
    ...
    class ParsePB(beam.DoFn):
        def process(self, pb_bytes):
            row = data_v2_pb2.Row()
            row.ParseFromString(pb_bytes)

works in the DirectRunner (if I skip the Java connection and fake input data), but not on Dataflow.

It works if I put the import in the process() function, although then running the code is super slow. (I'm not sure why, but running an import on every entry definitely sounds like it could cause that!)

(I still have issues with the DirectRunner, as per my previous email.)

Is there a good way to get around this?

Thanks!
-Lina

On Thu, Jan 5, 2023 at 4:49 PM Lina Mårtensson <lina@camus.energy> wrote:

> Great, thanks! That was a huge improvement.
>
> On Thu, Jan 5, 2023 at 12:52 PM Luke Cwik <lc...@google.com> wrote:
>
>> By default Beam Java only uploads artifacts that have changed but it
>> looks like this is not the case for Beam Python and you need to explicitly
>> opt in with the --enable_artifact_caching flag[1].
>>
>> It looks like this feature was added 1 year ago[2], should we make this
>> on by default?
>>
>> 1: https://github.com/apache/beam/blob/3070160203c6734da0eb04b440e08b43f9fd33f3/sdks/python/apache_beam/options/pipeline_options.py#L794
>> 2: https://github.com/apache/beam/pull/16229
>>
>> On Thu, Jan 5, 2023 at 11:43 AM Lina Mårtensson <lina@camus.energy> wrote:
>>
>>> Thanks! I have now successfully written a beautiful string of protobuf
>>> bytes into a file via Python. 🎉
>>>
>>> Two issues though:
>>> 1. Robert said the Python direct runner would just work with this - but
>>> it's not working.
>>> After about half an hour of these messages repeated over
>>> and over again I interrupted the job:
>>>
>>> E0105 07:25:48.170601677 58210 fork_posix.cc:76] Other threads are currently calling into gRPC, skipping fork() handlers
>>>
>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'2023/01/05 06:57:10 Failed to obtain provisioning information: failed to dial server at localhost:41087\n\tcaused by:\ncontext deadline exceeded\n'
>>>
>>> 2. I (unsurprisingly) get back to the issue I had when I tested out the
>>> Spanner x-lang transform on Dataflow - the overhead for starting a job is
>>> unbearably slow, the time mainly spent in transferring the expansion
>>> service jar (115 MB) + my jar (105 MB) with my new code and its
>>> dependencies:
>>>
>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar...
>>>
>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar in 321 seconds.
>>>
>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar...
>>>
>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar in 295 seconds.
>>>
>>> I have a total of 13 minutes until any workers have started on Dataflow,
>>> then another 4.5 minutes once the job actually does anything (which
>>> eventually is to read a whopping 3 cells from Bigtable ;).
>>>
>>> How could this be improved?
>>> For one, it seems to me like the upload of
>>> sdks:java:io:google-cloud-platform:expansion-service:shadowJar from my
>>> computer shouldn't be necessary - shouldn't Dataflow have that
>>> already/could it be fetched by Dataflow rather than having to upload it
>>> over slow internet?
>>> And what about my own jar - it's not bound to change very often, so
>>> would it be possible to upload somewhere and then fetch it from there?
>>>
>>> Thanks!
>>> -Lina
>>>
>>> On Tue, Jan 3, 2023 at 1:23 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> I would suggest using BigtableIO which also returns a
>>>> protobuf com.google.bigtable.v2.Row. This should allow you to replicate
>>>> what SpannerIO is doing.
>>>>
>>>> Alternatively you could provide a way to convert the HBase result into
>>>> a Beam row by specifying a converter and a schema for it and then you could
>>>> use the already well known Beam Schema type:
>>>>
>>>> https://github.com/apache/beam/blob/0b8f0b4db7a0de4977e30bcfeb50b5c14c7c1572/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1068
>>>>
>>>> Otherwise you'll have to register the HBase result coder with a well
>>>> known name so that the runner API coder URN is something that you know and
>>>> then on the Python side you would need a coder for that URN as well, to allow
>>>> you to understand the bytes being sent across from the Java portion of the
>>>> pipeline.
>>>>
>>>> On Fri, Dec 30, 2022 at 12:59 AM Lina Mårtensson <lina@camus.energy> wrote:
>>>>
>>>>> And next issue...
>>>>> I'm getting KeyError: 'beam:coders:javasdk:0.1' which I learned
>>>>> <https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips>
>>>>> is because the transform is trying to return something that there isn't a
>>>>> standard Beam coder for
>>>>> <https://github.com/apache/beam/blob/05428866cdbf1ea8e4c1789dd40327673fd39451/model/pipeline/src/main/proto/beam_runner_api.proto#L784>.
>>>>> Makes sense, but... how do I fix this? The documentation talks
>>>>> about how to do this for the input, but not for the output.
>>>>>
>>>>> Comparing to Spanner, it looks like Spanner returns a protobuf, which
>>>>> I'm guessing somehow gets converted to bytes... But CloudBigtableIO
>>>>> <https://github.com/googleapis/java-bigtable-hbase/blob/main/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java>
>>>>> returns org.apache.hadoop.hbase.client.Result.
>>>>>
>>>>> My buildExternal method looks as follows:
>>>>>
>>>>> @Override
>>>>> public PTransform<PBegin, PCollection<Result>> buildExternal(
>>>>>     BigtableReadBuilder.Configuration configuration) {
>>>>>   return Read.from(CloudBigtableIO.read(
>>>>>       new CloudBigtableScanConfiguration.Builder()
>>>>>           .withProjectId(configuration.projectId)
>>>>>           .withInstanceId(configuration.instanceId)
>>>>>           .withTableId(configuration.tableId)
>>>>>           .build()
>>>>>   ));
>>>>>
>>>>> I also got a warning, which I *believe* is unrelated (but also an
>>>>> issue):
>>>>>
>>>>> INFO:apache_beam.utils.subprocess_server:b"WARNING: Configuration class 'energy.camus.beam.BigtableRegistrar$BigtableReadBuilder$Configuration' has no schema registered. Attempting to construct with setter approach."
>>>>>
>>>>> INFO:apache_beam.utils.subprocess_server:b'Dec 30, 2022 7:46:14 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig'
>>>>>
>>>>> What is this schema and what should it look like?
>>>>>
>>>>> Thanks!
>>>>> -Lina
>>>>>
>>>>> On Fri, Dec 30, 2022 at 12:28 AM Lina Mårtensson <lina@camus.energy> wrote:
>>>>>
>>>>>> Thanks! This was really helpful. It took a while to figure out the
>>>>>> details - a section in the docs on what's required of these jars for
>>>>>> non-Java users would be a great addition.
>>>>>>
>>>>>> But once I did, the Bazel config was actually quite straightforward
>>>>>> and makes sense.
>>>>>> I pasted the first section from here
>>>>>> <https://github.com/bazelbuild/rules_jvm_external/blob/master/README.md#usage>
>>>>>> into my WORKSPACE file and changed the artifacts to the ones I needed.
>>>>>> (How to find the right ones remains confusing.)
>>>>>>
>>>>>> After that I updated my BUILD rules and Blaze had easy and
>>>>>> straightforward configs for it, all I needed was this:
>>>>>>
>>>>>> # From https://github.com/google/bazel-common/blob/master/third_party/java/auto/BUILD .
>>>>>> # The auto service is what registers our Registrar class, and it needs to be a plugin which
>>>>>> # makes it run at compile-time.
>>>>>> java_plugin(
>>>>>>     name = "auto_service_processor",
>>>>>>     processor_class = "com.google.auto.service.processor.AutoServiceProcessor",
>>>>>>     deps = [
>>>>>>         "@maven//:com_google_auto_service_auto_service",
>>>>>>         "@maven//:com_google_auto_service_auto_service_annotations",
>>>>>>         "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
>>>>>>     ],
>>>>>> )
>>>>>>
>>>>>> java_binary(
>>>>>>     name = "java_hbase",
>>>>>>     main_class = "energy.camus.beam.BigtableRegistrar",
>>>>>>     plugins = [":auto_service_processor"],
>>>>>>     srcs = ["src/main/java/energy/camus/beam/BigtableRegistrar.java"],
>>>>>>     deps = [
>>>>>>         "@maven//:com_google_auto_service_auto_service",
>>>>>>         "@maven//:com_google_auto_service_auto_service_annotations",
>>>>>>         "@maven//:com_google_cloud_bigtable_bigtable_hbase_beam",
>>>>>>         "@maven//:org_apache_beam_beam_sdks_java_core",
>>>>>>         "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
>>>>>>         "@maven//:org_apache_hbase_hbase_shaded_client",
>>>>>>     ],
>>>>>> )
>>>>>>
>>>>>> On Thu, Dec 29, 2022 at 2:43 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> AutoService relies on Java's compiler annotation processor.
>>>>>>> https://github.com/google/auto/tree/main/service#getting-started
>>>>>>> shows that you need to configure Java's compiler to use the annotation
>>>>>>> processors within AutoService.
>>>>>>>
>>>>>>> I saw this public gist that seemed to enable using the AutoService
>>>>>>> annotation processor with Bazel
>>>>>>> https://gist.github.com/jart/5333824b94cd706499a7bfa1e086ee00
>>>>>>>
>>>>>>> On Thu, Dec 29, 2022 at 2:27 PM Lina Mårtensson via dev <dev@beam.apache.org> wrote:
>>>>>>>
>>>>>>>> That's good news about the direct runner, thanks!
>>>>>>>> >>>>>>>> On Thu, Dec 29, 2022 at 2:02 PM Robert Bradshaw < >>>>>>>> rober...@google.com> wrote: >>>>>>>> >>>>>>>>> On Thu, Jul 28, 2022 at 5:37 PM Chamikara Jayalath via dev >>>>>>>>> <dev@beam.apache.org> wrote: >>>>>>>>> > >>>>>>>>> > On Thu, Jul 28, 2022 at 4:51 PM Lina Mårtensson >>>>>>>>> <lina@camus.energy> wrote: >>>>>>>>> >> >>>>>>>>> >> Thanks for the detailed answers! >>>>>>>>> >> >>>>>>>>> >> I totally get the points about development & maintenance cost, >>>>>>>>> and, >>>>>>>>> >> from a user perspective, about getting the performance right. >>>>>>>>> >> >>>>>>>>> >> I decided to try out the Spanner connector to get a sense of >>>>>>>>> how well >>>>>>>>> >> the x-language approach works in our world, since that's an >>>>>>>>> existing >>>>>>>>> >> x-language connector. >>>>>>>>> >> Overall, it works and with minimal intervention as you say - it >>>>>>>>> is >>>>>>>>> >> very slow, though. >>>>>>>>> >> I'm a little confused about "portable runners" - if I >>>>>>>>> understand this >>>>>>>>> >> correctly, this means we couldn't run with the DirectRunner >>>>>>>>> anymore if >>>>>>>>> >> using an x-language connector? (At least it didn't work when I >>>>>>>>> tried >>>>>>>>> >> it.) >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > You'll have to use the portable DirectRunner - >>>>>>>>> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/portability >>>>>>>>> > >>>>>>>>> > Job service for this can be started using following command: >>>>>>>>> > python apache_beam/runners/portability/local_job_service_main.py >>>>>>>>> -p <port> >>>>>>>>> >>>>>>>>> Note that the Python direct runner is already a portable runner, so >>>>>>>>> you shouldn't have to do anything special (like start up a separate >>>>>>>>> job service and pass extra options) to run locally. Just use the >>>>>>>>> cross-language transforms as you would any normal Python transform. 
>>>>>>>>>
>>>>>>>>> The goal is to make this as smooth and transparent as possible; please
>>>>>>>>> keep coming back to us if you find rough edges.
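
On the NameError and slow-import question at the top of this thread: one pattern that may help is to do the import in the DoFn's setup() method, which Beam calls once per DoFn instance on the worker, rather than at module level or inside process(). The following is a minimal sketch, not a confirmed fix. The constructor arguments module_name and message_name are hypothetical names chosen for illustration, the fallback to object as the base class exists only so the sketch runs without Beam installed, and nothing in the thread establishes that the per-element import is what made process() slow.

```python
import importlib

try:
    import apache_beam as beam
    _DoFnBase = beam.DoFn
except ImportError:
    # Fallback so this sketch can be exercised without Beam installed.
    _DoFnBase = object


class ParsePB(_DoFnBase):
    """Parses serialized protobuf bytes, importing the message class in setup()."""

    def __init__(self, module_name, message_name):
        super().__init__()
        # Hypothetical parameters: only strings are stored, so pickling this
        # DoFn does not depend on names defined in the main module.
        self._module_name = module_name
        self._message_name = message_name
        self._message_cls = None

    def setup(self):
        # Runs once per DoFn instance, before any element is processed.
        module = importlib.import_module(self._module_name)
        self._message_cls = getattr(module, self._message_name)

    def process(self, pb_bytes):
        # The message class is expected to be a protobuf-generated class
        # that provides ParseFromString().
        row = self._message_cls()
        row.ParseFromString(pb_bytes)
        yield row
```

With the import path from the second snippet in the message at the top of the thread, this would be constructed as ParsePB("google.cloud.bigtable_v2.proto.data_pb2", "Row"); whether that module path exists depends on the installed google-cloud-bigtable version.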