Were you ever able to get the local runner to work? If not, some more
context on the errors would be useful.

On Tue, Jan 10, 2023 at 10:00 AM Lina Mårtensson <lina@camus.energy> wrote:
>
> Thanks! Moving my DoFn into a new module worked, and that solved the slowness 
> as well.
> I tried importing it in setup() as well, but that didn't work.
>
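One possible reason the setup() import did not help (an assumption, since the thread does not show that code): an import statement inside setup() binds the name only in setup()'s local scope, so process() still cannot see it. Storing the imported class on self avoids that. Below is a minimal sketch in plain Python. The two classes are hypothetical stand-ins rather than real beam.DoFn subclasses, and fractions.Fraction stands in for the protobuf Row class so the sketch runs without Beam installed.

```python
import importlib


class LocalImportFn:
    """Stand-in for a DoFn whose setup() imports into its own local scope."""

    def setup(self):
        # This binds Fraction only inside setup(); the name is gone afterwards.
        from fractions import Fraction  # noqa: F401

    def process(self, element):
        # Fraction is neither a local nor a module-level name here,
        # so this lookup raises NameError.
        return Fraction(element)  # noqa: F821


class StoredImportFn:
    """Stand-in for a DoFn that keeps the imported class on the instance."""

    def setup(self):
        # Import once per instance and remember the class for process().
        module = importlib.import_module("fractions")
        self._fraction_cls = module.Fraction

    def process(self, element):
        return self._fraction_cls(element)


if __name__ == "__main__":
    broken = LocalImportFn()
    broken.setup()
    try:
        broken.process("1/2")
    except NameError as err:
        print("local import in setup():", err)

    working = StoredImportFn()
    working.setup()
    print("stored on self:", working.process("1/2"))
```

The same pattern would apply to the Row import in a real DoFn. Whether it also avoids the NameError on Dataflow for this pipeline is untested here.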
> On Fri, Jan 6, 2023 at 2:25 PM Luke Cwik <lc...@google.com> wrote:
>>
>> The proto (java) -> bytes -> proto (python) sounds good.
>>
>> Have you tried moving your DoFn outside of your main module into a new
>> module as per [1]? Another suggestion is to do the import inside the
>> function. Can you do the import once in the setup()[2] function? Have you
>> considered using the cloud profiler[3] to see what is actually slow?
>>
>> 1: 
>> https://stackoverflow.com/questions/69436706/nameerror-name-beam-is-not-defined-in-lambda
>> 2: 
>> https://github.com/apache/beam/blob/f9d5de34ae1dad251f5580073c0245a206224a69/sdks/python/apache_beam/transforms/core.py#L670
>> 3: https://cloud.google.com/dataflow/docs/guides/profiling-a-pipeline#python
>>
>>
>> On Fri, Jan 6, 2023 at 11:19 AM Lina Mårtensson <lina@camus.energy> wrote:
>>>
>>> I am *so close* it seems. ;)
>>>
>>> I followed Luke's advice and am reading the proto 
>>> com.google.bigtable.v2.Row, then use a transform to convert that to bytes 
>>> in order to be able to send it across to Python. (I assume that's what I 
>>> should be doing with the proto?)
>>> Once on the Python side, when running on Dataflow, I'm running into the 
>>> dreaded NameError.
>>> save_main_session is True.
>>>
>>> Either
>>> from google.cloud.bigtable_v2.types import Row
>>> ...
>>> class ParsePB(beam.DoFn):
>>>     def process(self, pb_bytes):
>>>         row = Row()
>>>         row.ParseFromString(pb_bytes)
>>>
>>> or
>>>
>>> from google.cloud.bigtable_v2.proto import data_pb2 as data_v2_pb2
>>> ...
>>> class ParsePB(beam.DoFn):
>>>     def process(self, pb_bytes):
>>>         row = data_v2_pb2.Row()
>>>         row.ParseFromString(pb_bytes)
>>>
>>> works in the DirectRunner (if I skip the Java connection and fake input 
>>> data), but not on Dataflow.
>>> It works if I put the import in the process() function, although then 
>>> running the code is super slow. (I'm not sure why, but running an import on 
>>> every entry definitely sounds like it could cause that!)
>>>
>>> (I still have issues with the DirectRunner, as per my previous email.)
>>>
>>> Is there a good way to get around this?
>>>
>>> Thanks!
>>> -Lina
>>>
>>> On Thu, Jan 5, 2023 at 4:49 PM Lina Mårtensson <lina@camus.energy> wrote:
>>>>
>>>> Great, thanks! That was a huge improvement.
>>>>
>>>>
>>>> On Thu, Jan 5, 2023 at 12:52 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>> By default Beam Java only uploads artifacts that have changed but it 
>>>>> looks like this is not the case for Beam Python and you need to 
>>>>> explicitly opt in with the --enable_artifact_caching flag[1].
>>>>>
>>>>> It looks like this feature was added 1 year ago[2]. Should we turn it
>>>>> on by default?
>>>>>
>>>>> 1: 
>>>>> https://github.com/apache/beam/blob/3070160203c6734da0eb04b440e08b43f9fd33f3/sdks/python/apache_beam/options/pipeline_options.py#L794
>>>>> 2: https://github.com/apache/beam/pull/16229
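A minimal sketch of opting in to the flag mentioned above when assembling pipeline options in Python. The helper name dataflow_args and the project, region, and bucket values are hypothetical placeholders. Only --enable_artifact_caching comes from this thread; the other flags are the usual Dataflow options.

```python
def dataflow_args(project, region, temp_location, cache_artifacts=True):
    """Builds command-line style options for a Dataflow job submission."""
    args = [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        f"--temp_location={temp_location}",
    ]
    if cache_artifacts:
        # Opt in to reusing previously staged artifacts instead of
        # re-uploading unchanged jars on every submission.
        args.append("--enable_artifact_caching")
    return args


if __name__ == "__main__":
    # Placeholder values; substitute your own project, region, and bucket.
    for arg in dataflow_args("my-project", "us-central1", "gs://my-bucket/tmp"):
        print(arg)
```

With Beam installed, the resulting list can be passed to apache_beam.options.pipeline_options.PipelineOptions, or the same flag can simply be added to the command line used to launch the pipeline.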
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jan 5, 2023 at 11:43 AM Lina Mårtensson <lina@camus.energy> wrote:
>>>>>>
>>>>>> Thanks! I have now successfully written a beautiful string of protobuf 
>>>>>> bytes into a file via Python. 🎉
>>>>>>
>>>>>> Two issues though:
>>>>>> 1. Robert said the Python direct runner would just work with this - but 
>>>>>> it's not working. After about half an hour of these messages repeated 
>>>>>> over and over again I interrupted the job:
>>>>>>
>>>>>> E0105 07:25:48.170601677   58210 fork_posix.cc:76]           Other threads are currently calling into gRPC, skipping fork() handlers
>>>>>>
>>>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'2023/01/05 06:57:10 Failed to obtain provisioning information: failed to dial server at localhost:41087\n\tcaused by:\ncontext deadline exceeded\n'
>>>>>>
>>>>>> 2. I (unsurprisingly) get back to the issue I had when I tested out the
>>>>>> Spanner x-lang transform on Dataflow - starting a job is unbearably
>>>>>> slow, with most of the time spent transferring the expansion service
>>>>>> jar (115 MB) + my jar (105 MB) with my new code and its dependencies:
>>>>>>
>>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar...
>>>>>>
>>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar in 321 seconds.
>>>>>>
>>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar...
>>>>>>
>>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar in 295 seconds.
>>>>>>
>>>>>> I have a total of 13 minutes until any workers have started on Dataflow, 
>>>>>> then another 4.5 minutes once the job actually does anything (which 
>>>>>> eventually is to read a whopping 3 cells from Bigtable ;).
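To put the upload numbers quoted above in perspective, here is a small back-of-the-envelope calculation. It pairs the jar sizes from the message with the durations from the upload logs and assumes the 115 MB jar is the 321-second upload and the 105 MB jar is the 295-second one, matching the order in which they are listed.

```python
# Jar sizes come from the message text; durations come from the
# "Completed GCS upload ... in N seconds" log lines.
uploads = {
    "expansion-service jar": (115, 321),     # (size in MB, seconds)
    "java_bigtable_deploy jar": (105, 295),  # (size in MB, seconds)
}

total_seconds = sum(seconds for _, seconds in uploads.values())
total_mb = sum(mb for mb, _ in uploads.values())

for name, (mb, seconds) in uploads.items():
    print(f"{name}: {mb / seconds:.2f} MB/s")

print(f"total: {total_seconds} s ({total_seconds / 60:.1f} min) for {total_mb} MB")
```

The two uploads add up to 616 seconds, a little over 10 of the 13 minutes before any worker starts, at roughly 0.36 MB/s each. That suggests the startup delay is dominated by upload bandwidth rather than by Dataflow itself.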
>>>>>>
>>>>>> How could this be improved?
>>>>>> For one, it seems to me like the upload of 
>>>>>> sdks:java:io:google-cloud-platform:expansion-service:shadowJar from my 
>>>>>> computer shouldn't be necessary - shouldn't Dataflow have that 
>>>>>> already/could it be fetched by Dataflow rather than having to upload it 
>>>>>> over slow internet?
>>>>>> And what about my own jar - it's not bound to change very often, so 
>>>>>> would it be possible to upload somewhere and then fetch it from there?
>>>>>>
>>>>>> Thanks!
>>>>>> -Lina
>>>>>>
>>>>>> On Tue, Jan 3, 2023 at 1:23 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>> I would suggest using BigtableIO which also returns a protobuf 
>>>>>>> com.google.bigtable.v2.Row. This should allow you to replicate what 
>>>>>>> SpannerIO is doing.
>>>>>>>
>>>>>>> Alternatively you could provide a way to convert the HBase result into 
>>>>>>> a Beam row by specifying a converter and a schema for it and then you 
>>>>>>> could use the already well known Beam Schema type:
>>>>>>> https://github.com/apache/beam/blob/0b8f0b4db7a0de4977e30bcfeb50b5c14c7c1572/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1068
>>>>>>>
>>>>>>> Otherwise you'll have to register the HBase result coder with a
>>>>>>> well-known name so that the runner API coder URN is something that you
>>>>>>> know, and then on the Python side you would need a coder for that URN as
>>>>>>> well, to allow you to understand the bytes being sent across from the
>>>>>>> Java portion of the pipeline.
>>>>>>>
>>>>>>> On Fri, Dec 30, 2022 at 12:59 AM Lina Mårtensson <lina@camus.energy> 
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> And next issue... I'm getting KeyError: 'beam:coders:javasdk:0.1' 
>>>>>>>> which I learned is because the transform is trying to return something 
>>>>>>>> that there isn't a standard Beam coder for.
>>>>>>>> Makes sense, but... how do I fix this? The documentation talks about 
>>>>>>>> how to do this for the input, but not for the output.
>>>>>>>>
>>>>>>>> Comparing to Spanner, it looks like Spanner returns a protobuf, which 
>>>>>>>> I'm guessing somehow gets converted to bytes... But CloudBigtableIO 
>>>>>>>> returns org.apache.hadoop.hbase.client.Result.
>>>>>>>>
>>>>>>>> My buildExternal method looks as follows:
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public PTransform<PBegin, PCollection<Result>> buildExternal(
>>>>>>>>                 BigtableReadBuilder.Configuration configuration) {
>>>>>>>>             return Read.from(CloudBigtableIO.read(
>>>>>>>>                 new CloudBigtableScanConfiguration.Builder()
>>>>>>>>                     .withProjectId(configuration.projectId)
>>>>>>>>                     .withInstanceId(configuration.instanceId)
>>>>>>>>                     .withTableId(configuration.tableId)
>>>>>>>>                     .build()
>>>>>>>>             ));
>>>>>>>>         }
>>>>>>>>
>>>>>>>>
>>>>>>>> I also got a warning, which I *believe* is unrelated (but also an 
>>>>>>>> issue):
>>>>>>>>
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b"WARNING: Configuration class 'energy.camus.beam.BigtableRegistrar$BigtableReadBuilder$Configuration' has no schema registered. Attempting to construct with setter approach."
>>>>>>>>
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'Dec 30, 2022 7:46:14 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig'
>>>>>>>>
>>>>>>>> What is this schema and what should it look like?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> -Lina
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Dec 30, 2022 at 12:28 AM Lina Mårtensson <lina@camus.energy> 
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Thanks! This was really helpful. It took a while to figure out the 
>>>>>>>>> details - a section in the docs on what's required of these jars for 
>>>>>>>>> non-Java users would be a great addition.
>>>>>>>>>
>>>>>>>>> But once I did, the Bazel config was actually quite straightforward 
>>>>>>>>> and makes sense.
>>>>>>>>> I pasted the first section from here into my WORKSPACE file and 
>>>>>>>>> changed the artifacts to the ones I needed. (How to find the right 
>>>>>>>>> ones remains confusing.)
>>>>>>>>>
>>>>>>>>> After that I updated my BUILD rules, and Bazel had easy and
>>>>>>>>> straightforward configs for it. All I needed was this:
>>>>>>>>>
>>>>>>>>> # From https://github.com/google/bazel-common/blob/master/third_party/java/auto/BUILD
>>>>>>>>> # The auto service is what registers our Registrar class, and it needs to be a plugin which
>>>>>>>>> # makes it run at compile-time.
>>>>>>>>> java_plugin(
>>>>>>>>>     name = "auto_service_processor",
>>>>>>>>>     processor_class = "com.google.auto.service.processor.AutoServiceProcessor",
>>>>>>>>>     deps = [
>>>>>>>>>         "@maven//:com_google_auto_service_auto_service",
>>>>>>>>>         "@maven//:com_google_auto_service_auto_service_annotations",
>>>>>>>>>         "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
>>>>>>>>>     ],
>>>>>>>>> )
>>>>>>>>>
>>>>>>>>> java_binary(
>>>>>>>>>     name = "java_hbase",
>>>>>>>>>     main_class = "energy.camus.beam.BigtableRegistrar",
>>>>>>>>>     plugins = [":auto_service_processor"],
>>>>>>>>>     srcs = ["src/main/java/energy/camus/beam/BigtableRegistrar.java"],
>>>>>>>>>     deps = [
>>>>>>>>>         "@maven//:com_google_auto_service_auto_service",
>>>>>>>>>         "@maven//:com_google_auto_service_auto_service_annotations",
>>>>>>>>>         "@maven//:com_google_cloud_bigtable_bigtable_hbase_beam",
>>>>>>>>>         "@maven//:org_apache_beam_beam_sdks_java_core",
>>>>>>>>>         "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
>>>>>>>>>         "@maven//:org_apache_hbase_hbase_shaded_client",
>>>>>>>>>     ],
>>>>>>>>> )
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Dec 29, 2022 at 2:43 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>> AutoService relies on Java's compiler annotation processor. 
>>>>>>>>>> https://github.com/google/auto/tree/main/service#getting-started 
>>>>>>>>>> shows that you need to configure Java's compiler to use the 
>>>>>>>>>> annotation processors within AutoService.
>>>>>>>>>>
>>>>>>>>>> I saw this public gist that seemed to enable using the AutoService 
>>>>>>>>>> annotation processor with Bazel 
>>>>>>>>>> https://gist.github.com/jart/5333824b94cd706499a7bfa1e086ee00
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Dec 29, 2022 at 2:27 PM Lina Mårtensson via dev 
>>>>>>>>>> <dev@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> That's good news about the direct runner, thanks!
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Dec 29, 2022 at 2:02 PM Robert Bradshaw 
>>>>>>>>>>> <rober...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 28, 2022 at 5:37 PM Chamikara Jayalath via dev
>>>>>>>>>>>> <dev@beam.apache.org> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Thu, Jul 28, 2022 at 4:51 PM Lina Mårtensson 
>>>>>>>>>>>> > <lina@camus.energy> wrote:
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> Thanks for the detailed answers!
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> I totally get the points about development & maintenance cost, 
>>>>>>>>>>>> >> and,
>>>>>>>>>>>> >> from a user perspective, about getting the performance right.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> I decided to try out the Spanner connector to get a sense of 
>>>>>>>>>>>> >> how well
>>>>>>>>>>>> >> the x-language approach works in our world, since that's an 
>>>>>>>>>>>> >> existing
>>>>>>>>>>>> >> x-language connector.
>>>>>>>>>>>> >> Overall, it works and with minimal intervention as you say - it 
>>>>>>>>>>>> >> is
>>>>>>>>>>>> >> very slow, though.
>>>>>>>>>>>> >> I'm a little confused about "portable runners" - if I 
>>>>>>>>>>>> >> understand this
>>>>>>>>>>>> >> correctly, this means we couldn't run with the DirectRunner 
>>>>>>>>>>>> >> anymore if
>>>>>>>>>>>> >> using an x-language connector? (At least it didn't work when I 
>>>>>>>>>>>> >> tried
>>>>>>>>>>>> >> it.)
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > You'll have to use the portable DirectRunner - 
>>>>>>>>>>>> > https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/portability
>>>>>>>>>>>> >
>>>>>>>>>>>> > Job service for this can be started using the following command:
>>>>>>>>>>>> > python apache_beam/runners/portability/local_job_service_main.py 
>>>>>>>>>>>> > -p <port>
>>>>>>>>>>>>
>>>>>>>>>>>> Note that the Python direct runner is already a portable runner, so
>>>>>>>>>>>> you shouldn't have to do anything special (like start up a separate
>>>>>>>>>>>> job service and pass extra options) to run locally. Just use the
>>>>>>>>>>>> cross-language transforms as you would any normal Python transform.
>>>>>>>>>>>>
>>>>>>>>>>>> The goal is to make this as smooth and transparent as possible; 
>>>>>>>>>>>> please
>>>>>>>>>>>> keep coming back to us if you find rough edges.
