I was not able to get the local runner to work yet, but at least I have a better idea of what the error seems to be...

INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting control server on port 46401
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting data server on port 44029
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting state server on port 37569
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting logging server on port 37833
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.DockerSdkWorkerHandler object at 0xffff765b1100> for environment external_1beam:env:docker:v1 (beam:env:docker:v1, b'\n\x1dapache/beam_java11_sdk:2.39.0')
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Attempting to pull image apache/beam_java11_sdk:2.39.0
2.39.0: Pulling from apache/beam_java11_sdk
Digest: sha256:69a2f3bbc7713b6f8ef3f0d268648fde11e8f162190302bf98195037d17a3546
Status: Image is up to date for apache/beam_java11_sdk:2.39.0
docker.io/apache/beam_java11_sdk:2.39.0
E0119 07:20:27.722846632 316 fork_posix.cc:76] Other threads are currently calling into gRPC, skipping fork() handlers
WARNING: The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Waiting for docker to start up. Current status is running
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Docker container is running. container_id = b'e50a1c26bbfc7cce27395b99cbe3448c3f7cfb9acdefeff3a531e1e6db6d9ffc', worker_id = worker_0
E0119 07:20:30.379257675 891 fork_posix.cc:76] Other threads are currently calling into gRPC, skipping fork() handlers
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0xffff6fcd2a60> for environment ref_Environment_default_environment_2 (beam:env:embedded_python:v1, b'')

Note: The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested

And if I look at the container that was started for the Beam Java SDK, it says:

2023/01/18 18:56:35 Failed to obtain provisioning information: failed to dial server at localhost:37255
	caused by:
context deadline exceeded

We develop on our Mac laptops (in my case with the M1 chip, which has all sorts of fun side effects!), but in a Docker container that emulates Linux. So naturally, that would mean that the Beam Java SDK for linux/amd64 would be requested... but then we're out of the dev container, and back to arm64. (I think. I'm very new to Docker!)

Not sure how to fix it though, if this is indeed the problem.

Thanks!
-Lina

On Thu, Jan 12, 2023 at 5:22 PM Robert Bradshaw <rober...@google.com> wrote:

> Were you ever able to get the local runner to work? If not, some more context on the errors would be useful.
>
> On Tue, Jan 10, 2023 at 10:00 AM Lina Mårtensson <lina@camus.energy> wrote:
> >
> > Thanks! Moving my DoFn into a new module worked, and that solved the slowness as well.
> > I tried importing it in setup() as well, but that didn't work.
> >
> > On Fri, Jan 6, 2023 at 2:25 PM Luke Cwik <lc...@google.com> wrote:
> >>
> >> The proto (java) -> bytes -> proto (python) sounds good.
> >>
> >> Have you tried moving your DoFn outside of your main module into a new module as per [1]. Other suggestions are to do the import in the function. Can you do the import once in the setup()[2] function? Have you considered using the cloud profiler[3] to see what is actually slow?
> >>
> >> 1: https://stackoverflow.com/questions/69436706/nameerror-name-beam-is-not-defined-in-lambda
> >> 2: https://github.com/apache/beam/blob/f9d5de34ae1dad251f5580073c0245a206224a69/sdks/python/apache_beam/transforms/core.py#L670
> >> 3: https://cloud.google.com/dataflow/docs/guides/profiling-a-pipeline#python
> >>
> >>
> >> On Fri, Jan 6, 2023 at 11:19 AM Lina Mårtensson <lina@camus.energy> wrote:
> >>>
> >>> I am *so close* it seems. ;)
> >>>
> >>> I followed Luke's advice and am reading the proto com.google.bigtable.v2.Row, then use a transform to convert that to bytes in order to be able to send it across to Python. (I assume that's what I should be doing with the proto?)
> >>> Once on the Python side, when running on Dataflow, I'm running into the dreaded NameError.
> >>> save_main_session is True.
> >>>
> >>> Either
> >>> from google.cloud.bigtable_v2.types import Row
> >>> ...
> >>> class ParsePB(beam.DoFn):
> >>>     def process(self, pb_bytes):
> >>>         row = Row()
> >>>         row.ParseFromString(pb_bytes)
> >>>
> >>> or
> >>>
> >>> from google.cloud.bigtable_v2.proto import data_pb2 as data_v2_pb2
> >>> ...
> >>> class ParsePB(beam.DoFn):
> >>>     def process(self, pb_bytes):
> >>>         row = Row()
> >>>         row.ParseFromString(pb_bytes)
> >>>
> >>> works in the DirectRunner (if I skip the Java connection and fake input data), but not on Dataflow.
> >>> It works if I put the import in the process() function, although then running the code is super slow. (I'm not sure why, but running an import on every entry definitely sounds like it could cause that!)
> >>>
> >>> (I still have issues with the DirectRunner, as per my previous email.)
> >>>
> >>> Is there a good way to get around this?
> >>>
> >>> Thanks!
> >>> -Lina
> >>>
> >>> On Thu, Jan 5, 2023 at 4:49 PM Lina Mårtensson <lina@camus.energy> wrote:
> >>>>
> >>>> Great, thanks! That was a huge improvement.
> >>>>
> >>>>
> >>>> On Thu, Jan 5, 2023 at 12:52 PM Luke Cwik <lc...@google.com> wrote:
> >>>>>
> >>>>> By default Beam Java only uploads artifacts that have changed but it looks like this is not the case for Beam Python and you need to explicitly opt in with the --enable_artifact_caching flag[1].
> >>>>>
> >>>>> It looks like this feature was added 1 year ago[2], should we make this on by default?
> >>>>>
> >>>>> 1: https://github.com/apache/beam/blob/3070160203c6734da0eb04b440e08b43f9fd33f3/sdks/python/apache_beam/options/pipeline_options.py#L794
> >>>>> 2: https://github.com/apache/beam/pull/16229
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Thu, Jan 5, 2023 at 11:43 AM Lina Mårtensson <lina@camus.energy> wrote:
> >>>>>>
> >>>>>> Thanks! I have now successfully written a beautiful string of protobuf bytes into a file via Python. 🎉
> >>>>>>
> >>>>>> Two issues though:
> >>>>>> 1. Robert said the Python direct runner would just work with this - but it's not working. After about half an hour of these messages repeated over and over again I interrupted the job:
> >>>>>>
> >>>>>> E0105 07:25:48.170601677 58210 fork_posix.cc:76] Other threads are currently calling into gRPC, skipping fork() handlers
> >>>>>>
> >>>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'2023/01/05 06:57:10 Failed to obtain provisioning information: failed to dial server at localhost:41087\n\tcaused by:\ncontext deadline exceeded\n'
> >>>>>>
> >>>>>> 2. I (unsurprisingly) get back to the issue I had when I tested out the Spanner x-lang transform on Dataflow - the overhead for starting a job is unbearably slow, the time mainly spent in transferring the expansion service jar (115 MB) + my jar (105 MB) with my new code and its dependencies:
> >>>>>>
> >>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar...
> >>>>>>
> >>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar in 321 seconds.
> >>>>>>
> >>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar...
> >>>>>>
> >>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar in 295 seconds.
> >>>>>>
> >>>>>> I have a total of 13 minutes until any workers have started on Dataflow, then another 4.5 minutes once the job actually does anything (which eventually is to read a whopping 3 cells from Bigtable ;).
> >>>>>>
> >>>>>> How could this be improved?
> >>>>>> For one, it seems to me like the upload of sdks:java:io:google-cloud-platform:expansion-service:shadowJar from my computer shouldn't be necessary - shouldn't Dataflow have that already/could it be fetched by Dataflow rather than having to upload it over slow internet?
> >>>>>> And what about my own jar - it's not bound to change very often, so would it be possible to upload somewhere and then fetch it from there?
> >>>>>>
> >>>>>> Thanks!
> >>>>>> -Lina
> >>>>>>
> >>>>>> On Tue, Jan 3, 2023 at 1:23 PM Luke Cwik <lc...@google.com> wrote:
> >>>>>>>
> >>>>>>> I would suggest using BigtableIO which also returns a protobuf com.google.bigtable.v2.Row. This should allow you to replicate what SpannerIO is doing.
> >>>>>>>
> >>>>>>> Alternatively you could provide a way to convert the HBase result into a Beam row by specifying a converter and a schema for it and then you could use the already well known Beam Schema type:
> >>>>>>> https://github.com/apache/beam/blob/0b8f0b4db7a0de4977e30bcfeb50b5c14c7c1572/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1068
> >>>>>>>
> >>>>>>> Otherwise you'll have to register the HBase result coder with a well known name so that the runner API coder URN is something that you know and then on the Python side you would need a coder for that URN as well, to allow you to understand the bytes being sent across from the Java portion of the pipeline.
> >>>>>>>
> >>>>>>> On Fri, Dec 30, 2022 at 12:59 AM Lina Mårtensson <lina@camus.energy> wrote:
> >>>>>>>>
> >>>>>>>> And next issue... I'm getting KeyError: 'beam:coders:javasdk:0.1' which I learned is because the transform is trying to return something that there isn't a standard Beam coder for.
> >>>>>>>> Makes sense, but... how do I fix this? The documentation talks about how to do this for the input, but not for the output.
> >>>>>>>>
> >>>>>>>> Comparing to Spanner, it looks like Spanner returns a protobuf, which I'm guessing somehow gets converted to bytes... But CloudBigtableIO returns org.apache.hadoop.hbase.client.Result.
> >>>>>>>>
> >>>>>>>> My buildExternal method looks like follows:
> >>>>>>>>
> >>>>>>>> @Override
> >>>>>>>> public PTransform<PBegin, PCollection<Result>> buildExternal(
> >>>>>>>>     BigtableReadBuilder.Configuration configuration) {
> >>>>>>>>   return Read.from(CloudBigtableIO.read(
> >>>>>>>>       new CloudBigtableScanConfiguration.Builder()
> >>>>>>>>           .withProjectId(configuration.projectId)
> >>>>>>>>           .withInstanceId(configuration.instanceId)
> >>>>>>>>           .withTableId(configuration.tableId)
> >>>>>>>>           .build()
> >>>>>>>>   ));
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> I also got a warning, which I *believe* is unrelated (but also an issue):
> >>>>>>>>
> >>>>>>>> INFO:apache_beam.utils.subprocess_server:b"WARNING: Configuration class 'energy.camus.beam.BigtableRegistrar$BigtableReadBuilder$Configuration' has no schema registered. Attempting to construct with setter approach."
> >>>>>>>>
> >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'Dec 30, 2022 7:46:14 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig'
> >>>>>>>>
> >>>>>>>> What is this schema and what should it look like?
> >>>>>>>>
> >>>>>>>> Thanks!
> >>>>>>>> -Lina
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, Dec 30, 2022 at 12:28 AM Lina Mårtensson <lina@camus.energy> wrote:
> >>>>>>>>>
> >>>>>>>>> Thanks! This was really helpful. It took a while to figure out the details - a section in the docs on what's required of these jars for non-Java users would be a great addition.
> >>>>>>>>>
> >>>>>>>>> But once I did, the Bazel config was actually quite straightforward and makes sense.
> >>>>>>>>> I pasted the first section from here into my WORKSPACE file and changed the artifacts to the ones I needed. (How to find the right ones remains confusing.)
> >>>>>>>>>
> >>>>>>>>> After that I updated my BUILD rules and Blaze had easy and straightforward configs for it, all I needed was this:
> >>>>>>>>>
> >>>>>>>>> # From https://github.com/google/bazel-common/blob/master/third_party/java/auto/BUILD .
> >>>>>>>>> # The auto service is what registers our Registrar class, and it needs to be a plugin which
> >>>>>>>>> # makes it run at compile-time.
> >>>>>>>>> java_plugin(
> >>>>>>>>>     name = "auto_service_processor",
> >>>>>>>>>     processor_class = "com.google.auto.service.processor.AutoServiceProcessor",
> >>>>>>>>>     deps = [
> >>>>>>>>>         "@maven//:com_google_auto_service_auto_service",
> >>>>>>>>>         "@maven//:com_google_auto_service_auto_service_annotations",
> >>>>>>>>>         "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
> >>>>>>>>>     ],
> >>>>>>>>> )
> >>>>>>>>>
> >>>>>>>>> java_binary(
> >>>>>>>>>     name = "java_hbase",
> >>>>>>>>>     main_class = "energy.camus.beam.BigtableRegistrar",
> >>>>>>>>>     plugins = [":auto_service_processor"],
> >>>>>>>>>     srcs = ["src/main/java/energy/camus/beam/BigtableRegistrar.java"],
> >>>>>>>>>     deps = [
> >>>>>>>>>         "@maven//:com_google_auto_service_auto_service",
> >>>>>>>>>         "@maven//:com_google_auto_service_auto_service_annotations",
> >>>>>>>>>         "@maven//:com_google_cloud_bigtable_bigtable_hbase_beam",
> >>>>>>>>>         "@maven//:org_apache_beam_beam_sdks_java_core",
> >>>>>>>>>         "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
> >>>>>>>>>         "@maven//:org_apache_hbase_hbase_shaded_client",
> >>>>>>>>>     ],
> >>>>>>>>> )
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Dec 29, 2022 at 2:43 PM Luke Cwik <lc...@google.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> AutoService relies on Java's compiler annotation processor. https://github.com/google/auto/tree/main/service#getting-started shows that you need to configure Java's compiler to use the annotation processors within AutoService.
> >>>>>>>>>>
> >>>>>>>>>> I saw this public gist that seemed to enable using the AutoService annotation processor with Bazel https://gist.github.com/jart/5333824b94cd706499a7bfa1e086ee00
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Dec 29, 2022 at 2:27 PM Lina Mårtensson via dev <dev@beam.apache.org> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> That's good news about the direct runner, thanks!
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Dec 29, 2022 at 2:02 PM Robert Bradshaw <rober...@google.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Jul 28, 2022 at 5:37 PM Chamikara Jayalath via dev <dev@beam.apache.org> wrote:
> >>>>>>>>>>>> >
> >>>>>>>>>>>> > On Thu, Jul 28, 2022 at 4:51 PM Lina Mårtensson <lina@camus.energy> wrote:
> >>>>>>>>>>>> >>
> >>>>>>>>>>>> >> Thanks for the detailed answers!
> >>>>>>>>>>>> >>
> >>>>>>>>>>>> >> I totally get the points about development & maintenance cost, and, from a user perspective, about getting the performance right.
> >>>>>>>>>>>> >>
> >>>>>>>>>>>> >> I decided to try out the Spanner connector to get a sense of how well the x-language approach works in our world, since that's an existing x-language connector.
> >>>>>>>>>>>> >> Overall, it works and with minimal intervention as you say - it is very slow, though.
> >>>>>>>>>>>> >> I'm a little confused about "portable runners" - if I understand this correctly, this means we couldn't run with the DirectRunner anymore if using an x-language connector? (At least it didn't work when I tried it.)
> >>>>>>>>>>>> >
> >>>>>>>>>>>> >
> >>>>>>>>>>>> > You'll have to use the portable DirectRunner - https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/portability
> >>>>>>>>>>>> >
> >>>>>>>>>>>> > Job service for this can be started using following command:
> >>>>>>>>>>>> > python apache_beam/runners/portability/local_job_service_main.py -p <port>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Note that the Python direct runner is already a portable runner, so you shouldn't have to do anything special (like start up a separate job service and pass extra options) to run locally. Just use the cross-language transforms as you would any normal Python transform.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The goal is to make this as smooth and transparent as possible; please keep coming back to us if you find rough edges.
>
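
On the local-runner rough edge specifically: the platform mismatch is easy to miss in a long runner log. Below is a rough sketch of a check that pulls the two platforms out of Docker's warning line. The function name is made up for illustration (it is not part of Beam or Docker), and the regex assumes the warning is worded exactly as in the log at the top of this message; other Docker versions may phrase it differently.

```python
import re

# Pattern for the Docker warning as it appears in the log in this thread.
# Assumption: the wording is stable; adjust the regex if your Docker differs.
_MISMATCH = re.compile(
    r"requested image's platform \((?P<image>[^)]+)\) does not match "
    r"the detected host platform \((?P<host>[^)]+)\)"
)


def find_platform_mismatch(log_text):
    """Return (image_platform, host_platform) if the warning is present, else None."""
    match = _MISMATCH.search(log_text)
    if match is None:
        return None
    return match.group("image"), match.group("host")


line = (
    "WARNING: The requested image's platform (linux/amd64) does not match "
    "the detected host platform (linux/arm64/v8) and no specific platform was requested"
)
print(find_platform_mismatch(line))  # ('linux/amd64', 'linux/arm64/v8')
```

This only detects the mismatch; it does not tell us whether the mismatch is actually what causes the "Failed to obtain provisioning information" timeout.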