Were you ever able to get the local runner to work? If not, some more context on the errors would be useful.
On Tue, Jan 10, 2023 at 10:00 AM Lina Mårtensson <lina@camus.energy> wrote:
>
> Thanks! Moving my DoFn into a new module worked, and that solved the slowness as well.
> I tried importing it in setup() as well, but that didn't work.
>
> On Fri, Jan 6, 2023 at 2:25 PM Luke Cwik <lc...@google.com> wrote:
>>
>> The proto (java) -> bytes -> proto (python) sounds good.
>>
>> Have you tried moving your DoFn outside of your main module into a new module as per [1]? Another suggestion is to do the import in the function. Can you do the import once in the setup()[2] function? Have you considered using the cloud profiler[3] to see what is actually slow?
>>
>> 1: https://stackoverflow.com/questions/69436706/nameerror-name-beam-is-not-defined-in-lambda
>> 2: https://github.com/apache/beam/blob/f9d5de34ae1dad251f5580073c0245a206224a69/sdks/python/apache_beam/transforms/core.py#L670
>> 3: https://cloud.google.com/dataflow/docs/guides/profiling-a-pipeline#python
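
For reference, a rough sketch of what Luke's suggestions could look like in practice - the DoFn living in its own module, with the protobuf import done once per worker in setup(). The module name and import path are illustrative and depend on the installed google-cloud-bigtable version:

    # parse_pb.py - keeping the DoFn in its own module means Dataflow workers
    # can unpickle it without relying on --save_main_session.
    import apache_beam as beam

    class ParsePB(beam.DoFn):
        def setup(self):
            # Import once per worker rather than once per element; the exact
            # import path depends on the installed google-cloud-bigtable version.
            from google.cloud.bigtable_v2.proto import data_pb2
            self._row_cls = data_pb2.Row

        def process(self, pb_bytes):
            row = self._row_cls()
            row.ParseFromString(pb_bytes)
            yield row
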
>>
>> On Fri, Jan 6, 2023 at 11:19 AM Lina Mårtensson <lina@camus.energy> wrote:
>>>
>>> I am *so close* it seems. ;)
>>>
>>> I followed Luke's advice and am reading the proto com.google.bigtable.v2.Row, then using a transform to convert that to bytes in order to be able to send it across to Python. (I assume that's what I should be doing with the proto?)
>>> Once on the Python side, when running on Dataflow, I'm running into the dreaded NameError.
>>> save_main_session is True.
>>>
>>> Either
>>>
>>>   from google.cloud.bigtable_v2.types import Row
>>>   ...
>>>   class ParsePB(beam.DoFn):
>>>     def process(self, pb_bytes):
>>>       row = Row()
>>>       row.ParseFromString(pb_bytes)
>>>
>>> or
>>>
>>>   from google.cloud.bigtable_v2.proto import data_pb2 as data_v2_pb2
>>>   ...
>>>   class ParsePB(beam.DoFn):
>>>     def process(self, pb_bytes):
>>>       row = data_v2_pb2.Row()
>>>       row.ParseFromString(pb_bytes)
>>>
>>> works in the DirectRunner (if I skip the Java connection and fake input data), but not on Dataflow.
>>> It works if I put the import in the process() function, although then running the code is super slow. (I'm not sure why, but running an import on every entry definitely sounds like it could cause that!)
>>>
>>> (I still have issues with the DirectRunner, as per my previous email.)
>>>
>>> Is there a good way to get around this?
>>>
>>> Thanks!
>>> -Lina
>>>
>>> On Thu, Jan 5, 2023 at 4:49 PM Lina Mårtensson <lina@camus.energy> wrote:
>>>>
>>>> Great, thanks! That was a huge improvement.
>>>>
>>>> On Thu, Jan 5, 2023 at 12:52 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>> By default Beam Java only uploads artifacts that have changed, but it looks like this is not the case for Beam Python, and you need to explicitly opt in with the --enable_artifact_caching flag[1].
>>>>>
>>>>> It looks like this feature was added 1 year ago[2]; should we make this on by default?
>>>>>
>>>>> 1: https://github.com/apache/beam/blob/3070160203c6734da0eb04b440e08b43f9fd33f3/sdks/python/apache_beam/options/pipeline_options.py#L794
>>>>> 2: https://github.com/apache/beam/pull/16229
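
For reference, the flag Luke points to is an ordinary pipeline option, so opting in is a one-line change; a minimal sketch (the project, region, and bucket values are placeholders):

    from apache_beam.options.pipeline_options import PipelineOptions

    # --enable_artifact_caching opts in to reusing artifacts that have already
    # been staged; everything else below is a placeholder value.
    options = PipelineOptions([
        "--runner=DataflowRunner",
        "--project=my-project",
        "--region=us-central1",
        "--temp_location=gs://my-bucket/tmp",
        "--enable_artifact_caching",
    ])

The same flag can equally be passed on the command line of the pipeline's main program.
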
>>>>>
>>>>> On Thu, Jan 5, 2023 at 11:43 AM Lina Mårtensson <lina@camus.energy> wrote:
>>>>>>
>>>>>> Thanks! I have now successfully written a beautiful string of protobuf bytes into a file via Python. 🎉
>>>>>>
>>>>>> Two issues though:
>>>>>>
>>>>>> 1. Robert said the Python direct runner would just work with this - but it's not working. After about half an hour of these messages repeated over and over again I interrupted the job:
>>>>>>
>>>>>> E0105 07:25:48.170601677 58210 fork_posix.cc:76] Other threads are currently calling into gRPC, skipping fork() handlers
>>>>>>
>>>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'2023/01/05 06:57:10 Failed to obtain provisioning information: failed to dial server at localhost:41087\n\tcaused by:\ncontext deadline exceeded\n'
>>>>>>
>>>>>> 2. I (unsurprisingly) get back to the issue I had when I tested out the Spanner x-lang transform on Dataflow - the overhead for starting a job is unbearably slow, with the time mainly spent in transferring the expansion service jar (115 MB) + my jar (105 MB) with my new code and its dependencies:
>>>>>>
>>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar...
>>>>>>
>>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar in 321 seconds.
>>>>>>
>>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar...
>>>>>>
>>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar in 295 seconds.
>>>>>>
>>>>>> I have a total of 13 minutes until any workers have started on Dataflow, then another 4.5 minutes before the job actually does anything (which eventually is to read a whopping 3 cells from Bigtable ;).
>>>>>>
>>>>>> How could this be improved?
>>>>>> For one, it seems to me like the upload of sdks:java:io:google-cloud-platform:expansion-service:shadowJar from my computer shouldn't be necessary - shouldn't Dataflow have that already, or could it be fetched by Dataflow rather than having to upload it over slow internet?
>>>>>> And what about my own jar - it's not bound to change very often, so would it be possible to upload it somewhere and then fetch it from there?
>>>>>>
>>>>>> Thanks!
>>>>>> -Lina
>>>>>>
>>>>>> On Tue, Jan 3, 2023 at 1:23 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>> I would suggest using BigtableIO which also returns a protobuf com.google.bigtable.v2.Row. This should allow you to replicate what SpannerIO is doing.
>>>>>>>
>>>>>>> Alternatively you could provide a way to convert the HBase result into a Beam row by specifying a converter and a schema for it, and then you could use the already well-known Beam Schema type:
>>>>>>> https://github.com/apache/beam/blob/0b8f0b4db7a0de4977e30bcfeb50b5c14c7c1572/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1068
>>>>>>>
>>>>>>> Otherwise you'll have to register the HBase result coder with a well-known name so that the runner API coder URN is something that you know, and then on the Python side you would need a coder for that URN as well to allow you to understand the bytes being sent across from the Java portion of the pipeline.
>>>>>>>
>>>>>>> On Fri, Dec 30, 2022 at 12:59 AM Lina Mårtensson <lina@camus.energy> wrote:
>>>>>>>>
>>>>>>>> And next issue... I'm getting KeyError: 'beam:coders:javasdk:0.1', which I learned is because the transform is trying to return something that there isn't a standard Beam coder for.
>>>>>>>> Makes sense, but... how do I fix this? The documentation talks about how to do this for the input, but not for the output.
>>>>>>>>
>>>>>>>> Comparing to Spanner, it looks like Spanner returns a protobuf, which I'm guessing somehow gets converted to bytes... But CloudBigtableIO returns org.apache.hadoop.hbase.client.Result.
>>>>>>>>
>>>>>>>> My buildExternal method looks as follows:
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public PTransform<PBegin, PCollection<Result>> buildExternal(
>>>>>>>>       BigtableReadBuilder.Configuration configuration) {
>>>>>>>>     return Read.from(CloudBigtableIO.read(
>>>>>>>>         new CloudBigtableScanConfiguration.Builder()
>>>>>>>>             .withProjectId(configuration.projectId)
>>>>>>>>             .withInstanceId(configuration.instanceId)
>>>>>>>>             .withTableId(configuration.tableId)
>>>>>>>>             .build()));
>>>>>>>>   }
>>>>>>>>
>>>>>>>> I also got a warning, which I *believe* is unrelated (but also an issue):
>>>>>>>>
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b"WARNING: Configuration class 'energy.camus.beam.BigtableRegistrar$BigtableReadBuilder$Configuration' has no schema registered. Attempting to construct with setter approach."
>>>>>>>>
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'Dec 30, 2022 7:46:14 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig'
>>>>>>>>
>>>>>>>> What is this schema and what should it look like?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> -Lina
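
As a side note, the setter-based fallback in that warning matters on the Python side mainly because the payload field names have to line up with the setters on the Configuration class. A rough sketch of what the Python-side invocation might look like - the URN, jar path, and field values here are hypothetical and must match whatever the registrar and Configuration actually declare:

    from apache_beam.transforms.external import (
        ExternalTransform, ImplicitSchemaPayloadBuilder, JavaJarExpansionService)

    # URN, jar path, and configuration values are illustrative placeholders.
    bigtable_read = ExternalTransform(
        "beam:external:camus:bigtable_read:v1",
        ImplicitSchemaPayloadBuilder({
            "projectId": "my-project",
            "instanceId": "my-instance",
            "tableId": "my-table",
        }),
        JavaJarExpansionService("path/to/java_hbase_deploy.jar"),
    )

It would then be applied in a pipeline like any other transform, e.g. p | "ReadBigtable" >> bigtable_read.
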
>>>>>>>>
>>>>>>>> On Fri, Dec 30, 2022 at 12:28 AM Lina Mårtensson <lina@camus.energy> wrote:
>>>>>>>>>
>>>>>>>>> Thanks! This was really helpful. It took a while to figure out the details - a section in the docs on what's required of these jars for non-Java users would be a great addition.
>>>>>>>>>
>>>>>>>>> But once I did, the Bazel config was actually quite straightforward and makes sense.
>>>>>>>>> I pasted the first section from here into my WORKSPACE file and changed the artifacts to the ones I needed. (How to find the right ones remains confusing.)
>>>>>>>>>
>>>>>>>>> After that I updated my BUILD rules, and Bazel had easy and straightforward configs for it; all I needed was this:
>>>>>>>>>
>>>>>>>>>   # From https://github.com/google/bazel-common/blob/master/third_party/java/auto/BUILD.
>>>>>>>>>   # The auto service is what registers our Registrar class, and it needs to be
>>>>>>>>>   # a plugin, which makes it run at compile-time.
>>>>>>>>>   java_plugin(
>>>>>>>>>       name = "auto_service_processor",
>>>>>>>>>       processor_class = "com.google.auto.service.processor.AutoServiceProcessor",
>>>>>>>>>       deps = [
>>>>>>>>>           "@maven//:com_google_auto_service_auto_service",
>>>>>>>>>           "@maven//:com_google_auto_service_auto_service_annotations",
>>>>>>>>>           "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
>>>>>>>>>       ],
>>>>>>>>>   )
>>>>>>>>>
>>>>>>>>>   java_binary(
>>>>>>>>>       name = "java_hbase",
>>>>>>>>>       main_class = "energy.camus.beam.BigtableRegistrar",
>>>>>>>>>       plugins = [":auto_service_processor"],
>>>>>>>>>       srcs = ["src/main/java/energy/camus/beam/BigtableRegistrar.java"],
>>>>>>>>>       deps = [
>>>>>>>>>           "@maven//:com_google_auto_service_auto_service",
>>>>>>>>>           "@maven//:com_google_auto_service_auto_service_annotations",
>>>>>>>>>           "@maven//:com_google_cloud_bigtable_bigtable_hbase_beam",
>>>>>>>>>           "@maven//:org_apache_beam_beam_sdks_java_core",
>>>>>>>>>           "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
>>>>>>>>>           "@maven//:org_apache_hbase_hbase_shaded_client",
>>>>>>>>>       ],
>>>>>>>>>   )
>>>>>>>>>
>>>>>>>>> On Thu, Dec 29, 2022 at 2:43 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>> AutoService relies on Java's compiler annotation processor. https://github.com/google/auto/tree/main/service#getting-started shows that you need to configure Java's compiler to use the annotation processors within AutoService.
>>>>>>>>>>
>>>>>>>>>> I saw this public gist that seemed to enable using the AutoService annotation processor with Bazel: https://gist.github.com/jart/5333824b94cd706499a7bfa1e086ee00
>>>>>>>>>>
>>>>>>>>>> On Thu, Dec 29, 2022 at 2:27 PM Lina Mårtensson via dev <dev@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> That's good news about the direct runner, thanks!
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Dec 29, 2022 at 2:02 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 28, 2022 at 5:37 PM Chamikara Jayalath via dev <dev@beam.apache.org> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Thu, Jul 28, 2022 at 4:51 PM Lina Mårtensson <lina@camus.energy> wrote:
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> Thanks for the detailed answers!
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> I totally get the points about development & maintenance cost, and, from a user perspective, about getting the performance right.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> I decided to try out the Spanner connector to get a sense of how well the x-language approach works in our world, since that's an existing x-language connector.
>>>>>>>>>>>> >> Overall, it works with minimal intervention as you say - it is very slow, though.
>>>>>>>>>>>> >> I'm a little confused about "portable runners" - if I understand this correctly, this means we couldn't run with the DirectRunner anymore if using an x-language connector? (At least it didn't work when I tried it.)
>>>>>>>>>>>> >> Overall, it works and with minimal intervention as you say - it >>>>>>>>>>>> >> is >>>>>>>>>>>> >> very slow, though. >>>>>>>>>>>> >> I'm a little confused about "portable runners" - if I >>>>>>>>>>>> >> understand this >>>>>>>>>>>> >> correctly, this means we couldn't run with the DirectRunner >>>>>>>>>>>> >> anymore if >>>>>>>>>>>> >> using an x-language connector? (At least it didn't work when I >>>>>>>>>>>> >> tried >>>>>>>>>>>> >> it.) >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > You'll have to use the portable DirectRunner - >>>>>>>>>>>> > https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/portability >>>>>>>>>>>> > >>>>>>>>>>>> > Job service for this can be started using following command: >>>>>>>>>>>> > python apache_beam/runners/portability/local_job_service_main.py >>>>>>>>>>>> > -p <port> >>>>>>>>>>>> >>>>>>>>>>>> Note that the Python direct runner is already a portable runner, so >>>>>>>>>>>> you shouldn't have to do anything special (like start up a separate >>>>>>>>>>>> job service and pass extra options) to run locally. Just use the >>>>>>>>>>>> cross-language transforms as you would any normal Python transform. >>>>>>>>>>>> >>>>>>>>>>>> The goal is to make this as smooth and transparent as possible; >>>>>>>>>>>> please >>>>>>>>>>>> keep coming back to us if you find rough edges.