Re: Running apache_beam python sdk without c/c++ libs
Most runners are written in Java, while others are cloud offerings that wouldn't work for your use case, which limits you to the direct runner (not meant for production/high-performance applications). The Beam Python SDK uses Cython for performance reasons, but I don't believe it strictly requires it, as many unit tests run both with and without Cython enabled. Integrations between Beam and third-party libraries may require it, though, so it likely depends on what you plan to do.

On Wed, Jun 10, 2020 at 8:17 AM Noah Goodrich wrote:
> I am looking at using the Beam Python SDK in AWS Glue but it doesn't
> support non-native python libraries (anything that is c/c++ based).
>
> Is the Beam Python SDK / runners able to be used without any c/c++ library
> dependencies?
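A minimal sketch, assuming a plain pip-installed apache_beam in the target environment, of one heuristic for checking whether the optional Cython-compiled fast paths are active: the coder_impl module loads from a compiled extension when the Cython modules were built, and from a plain .py file otherwise.

    # Heuristic check (assumption, not an official API): inspect which file
    # apache_beam.coders.coder_impl was loaded from. A .py suffix means the
    # pure-Python fallbacks are in use; a compiled extension suggests the
    # Cython-built fast implementations are active.
    from apache_beam.coders import coder_impl

    if coder_impl.__file__.endswith(".py"):
        print("Pure-Python coder implementations (no compiled Cython modules).")
    else:
        print("Cython-compiled coder implementations are in use.")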
Re: KafkaIO Read Latency
Hi Talat,

Could you elaborate on what you mean by "opening and closing bundle"?

Sometimes, starting a KafkaReader can take time since it has to seek to a start offset for each assigned partition, but that happens only once at pipeline start-up and mostly depends on network conditions.

> On 9 Jun 2020, at 23:05, Talat Uyarer wrote:
>
> Hi,
> I added some metrics on a step right after KafkaIO. When I compare the read
> time difference between producer and KafkaIO it is 800ms for P99. However
> somehow that step's opening and closing bundle difference is 18 seconds for
> p99. The step itself does not do any specific thing. Do you have any idea why
> bundle latency is very high? Where should I check or tune on KafkaIO?
>
> Additional information: I read from one topic. That topic has 15 partitions.
> The producer writes in a round-robin fashion.
>
> Thanks
Re: KafkaIO Read Latency
Hi,

I record the time when StartBundle is called, do the same for FinishBundle, and report the difference between the two as bundle latency. I put this metric on a step (KafkaMessageExtractor) which is right after the KafkaIO step.

I don't know if this is related, but my pipeline has a windowing function and GroupIntoBatches. The window duration is 10 seconds and the batch size is 400. My current traffic is 8kps. I changed the window duration to 5 seconds and to 20 seconds, but it does not change much.

KafkaIO -> KafkaMessageExtractor -> Windowing Function -> Sink

    .apply(Window.<...>into(
            FixedWindows.of(Duration.standardSeconds(windowDurationSeconds)))
        .triggering(Repeatedly.forever(AfterFirst.of(
            AfterPane.elementCountAtLeast((int) batchSize),
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(windowDurationSeconds)))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(GroupIntoBatches.ofSize(batchSize))

Thanks

On Wed, Jun 10, 2020 at 8:37 AM Alexey Romanenko wrote:
> Hi Talat,
>
> Could you elaborate on what you mean by "opening and closing bundle"?
>
> Sometimes, starting a KafkaReader can take time since it has to seek to a
> start offset for each assigned partition, but that happens only once at
> pipeline start-up and mostly depends on network conditions.
>
> On 9 Jun 2020, at 23:05, Talat Uyarer wrote:
>
> Hi,
> I added some metrics on a step right after KafkaIO. When I compare the
> read time difference between producer and KafkaIO it is 800ms for P99.
> However somehow that step's opening and closing bundle difference is 18
> seconds for p99. The step itself does not do any specific thing. Do you
> have any idea why bundle latency is very high? Where should I check or
> tune on KafkaIO?
>
> Additional information: I read from one topic. That topic has 15
> partitions. The producer writes in a round-robin fashion.
>
> Thanks
Re: Running apache_beam python sdk without c/c++ libs
I'm not sure. It depends on whether the Spark -> Beam Python integration will interfere with the magic built into AWS Glue. On Wed, Jun 10, 2020 at 8:57 AM Noah Goodrich wrote: > I was hoping to use the Spark runner since Glue is just Spark with some > magic on top. And in our specific use case, we'd be looking at working with > S3, Kinesis, and MySQL RDS. > > Sounds like this is a non-starter? > > On Wed, Jun 10, 2020 at 9:33 AM Luke Cwik wrote: > >> Most runners are written in Java while others are cloud offerings which >> wouldn't work for your use case which limits you to use the direct runner >> (not meant for production/high performance applications). Beam Python SDK >> uses cython for performance reasons but I don't believe it strictly >> requires it as many unit tests run with and without cython enabled. >> Integrations between Beam and third party libraries may require it though >> so it likely depends on what you plan to do. >> >> On Wed, Jun 10, 2020 at 8:17 AM Noah Goodrich >> wrote: >> >>> I am looking at using the Beam Python SDK in AWS Glue but it doesn't >>> support non-native python libraries (anything that is c/c++ based). >>> >>> Is the Beam Python SDK / runners able to be used without any c/c++ >>> library dependencies? >>> >>
Re: How to safely update jobs in-flight using Apache Beam on AWS EMR?
Hi! I found great docs about this for Apache Beam on Dataflow (which makes sense), but I was not able to find anything similar for AWS EMR.

https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline

https://medium.com/google-cloud/restarting-cloud-dataflow-in-flight-9c688c49adfd
[ANNOUNCE] Beam 2.22.0 Released
The Apache Beam team is pleased to announce the release of version 2.22.0.

Apache Beam is an open source, unified programming model to define and execute data processing pipelines, including ETL, batch, and stream (continuous) processing. See https://beam.apache.org

You can download the release here: https://beam.apache.org/get-started/downloads/

This release includes bug fixes, features, and improvements detailed on the Beam blog: https://beam.apache.org/blog/beam-2.22.0/

Thanks to everyone who contributed to this release, and we hope you enjoy using Beam 2.22.0.

-- Brian Hulette, on behalf of The Apache Beam team
Re: How to safely update jobs in-flight using Apache Beam on AWS EMR?
The runner needs to support it and I'm not aware of an EMR runner for Apache Beam let alone one that supports pipeline update. Have you tried reaching out to AWS? On Wed, Jun 10, 2020 at 11:14 AM Dan Hill wrote: > Hi! I found great docs about Apache Beam on Dataflow (which makes > sense). I was not able to find this about AWS EMR. > > https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline > > > https://medium.com/google-cloud/restarting-cloud-dataflow-in-flight-9c688c49adfd >
Re: BEAM-2217 NotImplementedError - DataflowRunner parsing Protos from PubSub (Python SDK)
+Udi Meiri - do you have any suggestions to resolve this? It looks like there are a couple things going wrong: - ProtoCoder doesn't have a definition for to_type_hint - DataflowRunner calls from_runner_api, which I believe is a workaround we can eventually remove (+Robert Bradshaw ), which in turn tries to get type hints for every coder in the pipeline Brian On Tue, Jun 9, 2020 at 1:50 AM Lien Michiels wrote: > Hi Brian, > > Thanks so much for your quick response! > > I've tried with both Apache Beam 2.20.0 and 2.21.0, both result in the > exact same error. Here is the full stacktrace: > > (metadata-persistor) ➜ metadata-persistor git:(feature/DEV-1249) ✗ > metadata_persistor --project X --environments X --window_size 1 > --input_subscription > projects/XXX/subscriptions/mds-internal-item-metadata-subscription > --runner DataflowRunner --temp_location gs:///item-metadata/temp > --staging_location gs://XXX/item-metadata/staging > /Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1431: > BeamDeprecationWarning: options is deprecated since First stable release. > References to .options will not be supported > experiments = p.options.view_as(DebugOptions).experiments or [] > WARNING:root:Make sure that locally built Python SDK docker image has > Python 3.7 interpreter. > Traceback (most recent call last): > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/bin/metadata_persistor", > line 8, in > sys.exit(run()) > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/metadata_persistor/item_metadata_mds_persistor.py", > line 246, in run > batch_size=500, > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", > line 524, in __exit__ > self.run().wait_until_finish() > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", > line 497, in run > self._options).run(False) > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", > line 510, in run > return self.runner.run_pipeline(self, self._options) > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", > line 484, in run_pipeline > allow_proto_holders=True) > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", > line 858, in from_runner_api > p.transforms_stack = [context.transforms.get_by_id(root_transform_id)] > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", > line 103, in get_by_id > self._id_to_proto[id], self._pipeline_context) > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", > line 1238, in from_runner_api > part = context.transforms.get_by_id(transform_id) > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", > line 103, in get_by_id > self._id_to_proto[id], self._pipeline_context) > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", > line 1244, in from_runner_api > id in proto.outputs.items() > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", > line 1244, in > id in 
proto.outputs.items() > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", > line 103, in get_by_id > self._id_to_proto[id], self._pipeline_context) > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pvalue.py", > line 214, in from_runner_api > element_type=context.element_type_from_coder_id(proto.coder_id), > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", > line 227, in element_type_from_coder_id > self.coders[coder_id].to_type_hint()) > File > "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/coders/coders.py", > line 221, in to_type_hint > raise NotImplementedError('BEAM-2717') > NotImplementedError: BEAM-2717 > > When I was debugging and commenting out the different steps, I noticed the > location in my code that supposedly throws the error changes. Here it > complains about the WriteToBigQuery step (batch_size=500) but if I comment > out that step it just moves on to the one above. It appears it's > consistently thrown on the last run step (don't know if that's helpful, > just thought I'd mention it). > > After adding beam.typehints.d
Re: How to safely update jobs in-flight using Apache Beam on AWS EMR?
No. I just sent AWS Support a message. On Wed, Jun 10, 2020 at 1:00 PM Luke Cwik wrote: > The runner needs to support it and I'm not aware of an EMR runner for > Apache Beam let alone one that supports pipeline update. Have you tried > reaching out to AWS? > > On Wed, Jun 10, 2020 at 11:14 AM Dan Hill wrote: > >> Hi! I found great docs about Apache Beam on Dataflow (which makes >> sense). I was not able to find this about AWS EMR. >> >> https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline >> >> >> https://medium.com/google-cloud/restarting-cloud-dataflow-in-flight-9c688c49adfd >> >
Re: How to safely update jobs in-flight using Apache Beam on AWS EMR?
Hi Dan,

AWS EMR generally runs Flink and/or Spark, both of which are supported Beam runners. For EMR, you might want to check which versions of Beam and Flink can run together, and the status of Beam pipelines on either of those runners.

On running Beam in AWS, had you seen: https://www.youtube.com/watch?v=eCgZRJqdt_I

Cheers,
Austin

On Wed, Jun 10, 2020 at 2:02 PM Dan Hill wrote:
> No. I just sent AWS Support a message.
>
> On Wed, Jun 10, 2020 at 1:00 PM Luke Cwik wrote:
>
>> The runner needs to support it and I'm not aware of an EMR runner for
>> Apache Beam let alone one that supports pipeline update. Have you tried
>> reaching out to AWS?
>>
>> On Wed, Jun 10, 2020 at 11:14 AM Dan Hill wrote:
>>
>>> Hi! I found great docs about Apache Beam on Dataflow (which makes
>>> sense). I was not able to find this about AWS EMR.
>>>
>>> https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline
>>>
>>> https://medium.com/google-cloud/restarting-cloud-dataflow-in-flight-9c688c49adfd
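As a minimal sketch of that setup (the Flink master address, environment type, and trivial transforms are placeholders, not a verified EMR configuration), a Beam Python pipeline can be pointed at a Flink cluster such as one launched by EMR through the portable FlinkRunner options:

    # Hypothetical example: submit a tiny Beam Python pipeline to a Flink
    # cluster. The --flink_master value would be the Flink REST endpoint of
    # the EMR master node; LOOPBACK just keeps this sketch self-contained.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=localhost:8081",
        "--environment_type=LOOPBACK",
    ])

    with beam.Pipeline(options=options) as p:
        (p
         | beam.Create(["hello", "beam", "on", "flink"])
         | beam.Map(print))

How in-flight updates behave for such a pipeline (e.g. via Flink savepoints) would then largely be a Flink/EMR question rather than a Beam one.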
Re: BEAM-2217 NotImplementedError - DataflowRunner parsing Protos from PubSub (Python SDK)
I don't believe you need to register a coder unless you create your own coder class. Since the coder has a proto_message_type field, it could return that in to_type_hint, but I'll defer to Robert since I'm unfamiliar with that area. As a temporary workaround, please try overriding the output type like so: Map(parse_message_blobs).with_output_types(typing.Any) That should prevent Beam from attempting to use the ProtoCoder. On Wed, Jun 10, 2020 at 1:47 PM Brian Hulette wrote: > +Udi Meiri - do you have any suggestions to resolve > this? > > It looks like there are a couple things going wrong: > - ProtoCoder doesn't have a definition for to_type_hint > - DataflowRunner calls from_runner_api, which I believe is a workaround we > can eventually remove (+Robert Bradshaw ), which in > turn tries to get type hints for every coder in the pipeline > > Brian > > On Tue, Jun 9, 2020 at 1:50 AM Lien Michiels > wrote: > >> Hi Brian, >> >> Thanks so much for your quick response! >> >> I've tried with both Apache Beam 2.20.0 and 2.21.0, both result in the >> exact same error. Here is the full stacktrace: >> >> (metadata-persistor) ➜ metadata-persistor git:(feature/DEV-1249) ✗ >> metadata_persistor --project X --environments X --window_size 1 >> --input_subscription >> projects/XXX/subscriptions/mds-internal-item-metadata-subscription >> --runner DataflowRunner --temp_location gs:///item-metadata/temp >> --staging_location gs://XXX/item-metadata/staging >> /Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1431: >> BeamDeprecationWarning: options is deprecated since First stable release. >> References to .options will not be supported >> experiments = p.options.view_as(DebugOptions).experiments or [] >> WARNING:root:Make sure that locally built Python SDK docker image has >> Python 3.7 interpreter. 
>> Traceback (most recent call last): >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/bin/metadata_persistor", >> line 8, in >> sys.exit(run()) >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/metadata_persistor/item_metadata_mds_persistor.py", >> line 246, in run >> batch_size=500, >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", >> line 524, in __exit__ >> self.run().wait_until_finish() >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", >> line 497, in run >> self._options).run(False) >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", >> line 510, in run >> return self.runner.run_pipeline(self, self._options) >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", >> line 484, in run_pipeline >> allow_proto_holders=True) >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", >> line 858, in from_runner_api >> p.transforms_stack = [context.transforms.get_by_id(root_transform_id)] >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", >> line 103, in get_by_id >> self._id_to_proto[id], self._pipeline_context) >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", >> line 1238, in from_runner_api >> part = context.transforms.get_by_id(transform_id) >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", >> line 103, in get_by_id >> self._id_to_proto[id], self._pipeline_context) >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", >> line 1244, in from_runner_api >> id in proto.outputs.items() >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", >> line 1244, in >> id in proto.outputs.items() >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", >> line 103, in get_by_id >> self._id_to_proto[id], self._pipeline_context) >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pvalue.py", >> line 214, in from_runner_api >> element_type=context.element_type_from_coder_id(proto.coder_id), >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", >> line 227, in element_type_from_coder_id >> self.coders[coder_id].to_type_hint()) >> File >> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/py
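Expanding Udi's suggested workaround into a minimal sketch (parse_message_blobs and the pipeline shape are stand-ins from the thread, not the original code): overriding the step's output type hint with typing.Any should keep Beam from selecting ProtoCoder, whose to_type_hint raises the BEAM-2717 NotImplementedError seen above.

    # Hypothetical sketch of the workaround: force typing.Any as the output
    # type so Beam does not infer the proto message type (and ProtoCoder).
    import typing
    import apache_beam as beam

    def parse_message_blobs(blob):
        # Stand-in: in the real pipeline this is where the Pub/Sub payload
        # would be parsed into a proto message.
        return blob

    with beam.Pipeline() as p:
        (p
         | beam.Create([b"raw-bytes"])  # stands in for ReadFromPubSub
         | beam.Map(parse_message_blobs).with_output_types(typing.Any)
         | beam.Map(print))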
Re: How to safely update jobs in-flight using Apache Beam on AWS EMR?
Sweet. I have not seen that video. Cool. I'm curious about how well AWS's managed services (like the Kinesis Data Analytics managed Flink runner) handle the updates. I'd guess it is best effort from the saved state (if enabled). If this is all delegated by Beam to Flink, then this is more of a question for AWS. On Wed, Jun 10, 2020 at 2:18 PM Austin Bennett wrote: > Hi Dan, > > AWS EMR generally runs Flink and/or Spark as supported Beam Runners. For > EMR, you might want to check compatibility for versions of Beam/Flink can > run, and the status of beam pipelines using either of those runners. > > On running Beam in AWS, had you seen: > https://www.youtube.com/watch?v=eCgZRJqdt_I > > > > Cheers, > Austin > > On Wed, Jun 10, 2020 at 2:02 PM Dan Hill wrote: > >> No. I just sent AWS Support a message. >> >> On Wed, Jun 10, 2020 at 1:00 PM Luke Cwik wrote: >> >>> The runner needs to support it and I'm not aware of an EMR runner for >>> Apache Beam let alone one that supports pipeline update. Have you tried >>> reaching out to AWS? >>> >>> On Wed, Jun 10, 2020 at 11:14 AM Dan Hill wrote: >>> Hi! I found great docs about Apache Beam on Dataflow (which makes sense). I was not able to find this about AWS EMR. https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline https://medium.com/google-cloud/restarting-cloud-dataflow-in-flight-9c688c49adfd >>>