Re: Running apache_beam python sdk without c/c++ libs

2020-06-10 Thread Luke Cwik
Most runners are written in Java, while others are cloud offerings that
wouldn't work for your use case, which limits you to the direct runner (not
meant for production or high-performance applications). The Beam Python SDK
uses Cython for performance reasons, but I don't believe it strictly requires
it, since many unit tests run both with and without Cython enabled.
Integrations between Beam and third-party libraries may require it, though, so
it likely depends on what you plan to do.
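
If you want to check what a particular environment actually ended up with, a
quick sanity check like the sketch below works; it relies on the SDK falling
back from its compiled extension modules, which is an internal detail of how
the fallback happens today, so treat it as an assumption rather than a
supported API.

# Minimal sketch: detect whether the compiled (Cython) coder extensions are
# installed, or whether apache_beam fell back to its pure-Python versions.
try:
    # Present only when the SDK was built/installed with the Cython extensions.
    from apache_beam.coders import stream
    print("Cython extensions available:", stream.__file__)
except ImportError:
    # The SDK still works, just on the slower pure-Python code paths.
    from apache_beam.coders import slow_stream
    print("Running without Cython, using:", slow_stream.__file__)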

On Wed, Jun 10, 2020 at 8:17 AM Noah Goodrich  wrote:

> I am looking at using the Beam Python SDK in AWS Glue but it doesn't
> support non-native python libraries (anything that is c/c++ based).
>
> Is the Beam Python SDK / runners able to be used without any c/c++ library
> dependencies?
>


Re: KafkaIO Read Latency

2020-06-10 Thread Alexey Romanenko
Hi Talat,

Could you elaborate on what you mean by “opening and closing bundle”?

Sometimes, starting a KafkaReader can take time since it has to seek the start 
offset for each assigned partition, but that happens only once at pipeline 
start-up and mostly depends on network conditions.

> On 9 Jun 2020, at 23:05, Talat Uyarer  wrote:
> 
> Hi,
> I added some metrics on a step right after KafkaIO. When I compare the read 
> time difference between producer and KafkaIO it is 800ms for P99. However 
> somehow that step's opening and closing bundle difference is 18 seconds for 
> p99. The step itself does not do any specific thing. Do you have any idea why 
> bundle latency is very high ? Where should I check or tune on KafkaIO ?
> 
> Additional information I read from one topic. That topic has 15 partitions. 
> Producer write in a round robin fashion. 
> 
> Thanks



Re: KafkaIO Read Latency

2020-06-10 Thread Talat Uyarer
Hi,

I record the time when StartBundle is called, do the same for FinishBundle,
and then report the difference between the two as bundle latency. I put this
metric on a step (KafkaMessageExtractor) which comes right after the KafkaIO
step. I don't know if it is related, but my pipeline has a windowing function
and GroupIntoBatches. The window duration is 10 seconds and the batch size is
400. My current traffic is 8k messages per second. I tried changing the window
duration to 5 seconds and to 20 seconds, but it did not make much difference.
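
For illustration, the measurement itself is nothing fancier than the sketch
below (written with the Python SDK's metrics API purely to show the idea; my
actual step is Java, and BundleLatencyDoFn is a made-up name, not my real
KafkaMessageExtractor):

# Record the wall-clock gap between start_bundle and finish_bundle as a
# Distribution metric on a pass-through DoFn.
import time

import apache_beam as beam
from apache_beam.metrics import Metrics

class BundleLatencyDoFn(beam.DoFn):
    def __init__(self):
        self.bundle_latency_ms = Metrics.distribution(
            self.__class__, 'bundle_latency_ms')

    def start_bundle(self):
        self._bundle_start = time.time()

    def process(self, element):
        yield element  # pass the element through unchanged

    def finish_bundle(self):
        self.bundle_latency_ms.update(
            int((time.time() - self._bundle_start) * 1000))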

KafkaIO -> KafkaMessageExtractor -> Windowing Function -> Sink

.apply(Window.>into(
        FixedWindows.of(Duration.standardSeconds(windowDurationSeconds)))
    .triggering(Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast((int) batchSize),
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(windowDurationSeconds)))))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())
.apply(GroupIntoBatches.ofSize(batchSize))


Thanks


On Wed, Jun 10, 2020 at 8:37 AM Alexey Romanenko 
wrote:

> Hi Talat,
>
> Could you elaborate what do you mean by “*opening and closing bundle*”?
>
> Sometimes, starting a KafkaReader can take time since it will seek for a
> start offset for each assigned partition but it happens only once at
> pipeline start-up and mostly depends on network conditions.
>
> On 9 Jun 2020, at 23:05, Talat Uyarer 
> wrote:
>
> Hi,
> I added some metrics on a step right after KafkaIO. When I compare the
> read time difference between producer and KafkaIO it is 800ms for P99.
> However somehow that step's opening and closing bundle difference is 18
> seconds for p99. The step itself does not do any specific thing. Do you
> have any idea why bundle latency is very high ? Where should I check or
> tune on KafkaIO ?
>
> Additional information I read from one topic. That topic has 15
> partitions. Producer write in a round robin fashion.
>
> Thanks
>
>
>


Re: Running apache_beam python sdk without c/c++ libs

2020-06-10 Thread Luke Cwik
I'm not sure. It depends on whether the Spark -> Beam Python integration
will interfere with the magic built into AWS Glue.
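
For context, outside of Glue the usual way to point Beam Python at Spark is
through the portable runner, roughly as sketched below. This assumes you can
start the Beam Spark job server yourself (e.g. from the
apache/beam_spark_job_server Docker image); whether any of that is possible
inside Glue is exactly the open question.

# Sketch only: submit a Beam Python pipeline to Spark via the portable runner.
# The job_endpoint is a placeholder for a separately started Spark job server.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=LOOPBACK',
])
with beam.Pipeline(options=options) as p:
    p | beam.Create(['a', 'b', 'c']) | beam.Map(print)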

On Wed, Jun 10, 2020 at 8:57 AM Noah Goodrich  wrote:

> I was hoping to use the Spark runner since Glue is just Spark with some
> magic on top. And in our specific use case, we'd be looking at working with
> S3, Kinesis, and MySQL RDS.
>
> Sounds like this is a non-starter?
>
> On Wed, Jun 10, 2020 at 9:33 AM Luke Cwik  wrote:
>
>> Most runners are written in Java while others are cloud offerings which
>> wouldn't work for your use case which limits you to use the direct runner
>> (not meant for production/high performance applications). Beam Python SDK
>> uses cython for performance reasons but I don't believe it strictly
>> requires it as many unit tests run with and without cython enabled.
>> Integrations between Beam and third party libraries may require it though
>> so it likely depends on what you plan to do.
>>
>> On Wed, Jun 10, 2020 at 8:17 AM Noah Goodrich 
>> wrote:
>>
>>> I am looking at using the Beam Python SDK in AWS Glue but it doesn't
>>> support non-native python libraries (anything that is c/c++ based).
>>>
>>> Is the Beam Python SDK / runners able to be used without any c/c++
>>> library dependencies?
>>>
>>


Re: How to safely update jobs in-flight using Apache Beam on AWS EMR?

2020-06-10 Thread Dan Hill
Hi!  I found great docs about updating Apache Beam pipelines in-flight on
Dataflow (which makes sense), but I was not able to find anything equivalent
for AWS EMR.

https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline

https://medium.com/google-cloud/restarting-cloud-dataflow-in-flight-9c688c49adfd


[ANNOUNCE] Beam 2.22.0 Released

2020-06-10 Thread Brian Hulette
The Apache Beam team is pleased to announce the release of version 2.22.0.

Apache Beam is an open source unified programming model to define and
execute data processing pipelines, including ETL, batch and stream
(continuous) processing. See https://beam.apache.org

You can download the release here:

https://beam.apache.org/get-started/downloads/

This release includes bug fixes, features, and improvements detailed on
the Beam blog: https://beam.apache.org/blog/beam-2.22.0/

Thanks to everyone who contributed to this release, and we hope you enjoy
using Beam 2.22.0.
-- Brian Hulette, on behalf of The Apache Beam team


Re: How to safely update jobs in-flight using Apache Beam on AWS EMR?

2020-06-10 Thread Luke Cwik
The runner needs to support it, and I'm not aware of an EMR runner for
Apache Beam, let alone one that supports pipeline update. Have you tried
reaching out to AWS?

On Wed, Jun 10, 2020 at 11:14 AM Dan Hill  wrote:

> Hi!  I found great docs about Apache Beam on Dataflow (which makes
> sense).  I was not able to find this about AWS EMR.
>
> https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline
>
>
> https://medium.com/google-cloud/restarting-cloud-dataflow-in-flight-9c688c49adfd
>


Re: BEAM-2217 NotImplementedError - DataflowRunner parsing Protos from PubSub (Python SDK)

2020-06-10 Thread Brian Hulette
+Udi Meiri - do you have any suggestions to resolve this?

It looks like there are a couple of things going wrong:
- ProtoCoder doesn't have a definition for to_type_hint
- DataflowRunner calls from_runner_api, which I believe is a workaround we can
eventually remove (+Robert Bradshaw), and which in turn tries to get type
hints for every coder in the pipeline

Brian

On Tue, Jun 9, 2020 at 1:50 AM Lien Michiels 
wrote:

> Hi Brian,
>
> Thanks so much for your quick response!
>
> I've tried with both Apache Beam 2.20.0 and 2.21.0, both result in the
> exact same error. Here is the full stacktrace:
>
> (metadata-persistor) ➜  metadata-persistor git:(feature/DEV-1249) ✗
> metadata_persistor --project X --environments X --window_size 1
> --input_subscription
> projects/XXX/subscriptions/mds-internal-item-metadata-subscription
> --runner DataflowRunner --temp_location gs:///item-metadata/temp
> --staging_location gs://XXX/item-metadata/staging
> /Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1431:
> BeamDeprecationWarning: options is deprecated since First stable release.
> References to .options will not be supported
>   experiments = p.options.view_as(DebugOptions).experiments or []
> WARNING:root:Make sure that locally built Python SDK docker image has
> Python 3.7 interpreter.
> Traceback (most recent call last):
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/bin/metadata_persistor",
> line 8, in 
> sys.exit(run())
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/metadata_persistor/item_metadata_mds_persistor.py",
> line 246, in run
> batch_size=500,
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 524, in __exit__
> self.run().wait_until_finish()
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 497, in run
> self._options).run(False)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 510, in run
> return self.runner.run_pipeline(self, self._options)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py",
> line 484, in run_pipeline
> allow_proto_holders=True)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 858, in from_runner_api
> p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1238, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1244, in from_runner_api
> id in proto.outputs.items()
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1244, in 
> id in proto.outputs.items()
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pvalue.py",
> line 214, in from_runner_api
> element_type=context.element_type_from_coder_id(proto.coder_id),
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 227, in element_type_from_coder_id
> self.coders[coder_id].to_type_hint())
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/coders/coders.py",
> line 221, in to_type_hint
> raise NotImplementedError('BEAM-2717')
> NotImplementedError: BEAM-2717
>
> When I was debugging and commenting out the different steps, I noticed the
> location in my code that supposedly throws the error changes. Here it
> complains about the WriteToBigQuery step (batch_size=500) but if I comment
> out that step it just moves on to the one above. It appears it's
> consistently thrown on the last run step (don't know if that's helpful,
> just thought I'd mention it).
>
> After adding beam.typehints.d

Re: How to safely update jobs in-flight using Apache Beam on AWS EMR?

2020-06-10 Thread Dan Hill
No.  I just sent AWS Support a message.

On Wed, Jun 10, 2020 at 1:00 PM Luke Cwik  wrote:

> The runner needs to support it and I'm not aware of an EMR runner for
> Apache Beam let alone one that supports pipeline update. Have you tried
> reaching out to AWS?
>
> On Wed, Jun 10, 2020 at 11:14 AM Dan Hill  wrote:
>
>> Hi!  I found great docs about Apache Beam on Dataflow (which makes
>> sense).  I was not able to find this about AWS EMR.
>>
>> https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline
>>
>>
>> https://medium.com/google-cloud/restarting-cloud-dataflow-in-flight-9c688c49adfd
>>
>


Re: How to safely update jobs in-flight using Apache Beam on AWS EMR?

2020-06-10 Thread Austin Bennett
Hi Dan,

AWS EMR generally runs Flink and/or Spark, both of which are supported Beam
runners.  For EMR, you might want to check which versions of Beam and Flink
are compatible and can run there, and the status of Beam pipelines using
either of those runners.
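
Once you know which versions line up, pointing the Beam Python SDK at the
cluster's Flink JobManager looks roughly like the sketch below; the master
address is a placeholder for your EMR setup, and the exact flags depend on
the Beam/Flink versions you end up on.

# Rough sketch of pipeline options for running on an existing Flink cluster.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=<emr-master-host>:8081',  # placeholder JobManager address
    '--environment_type=LOOPBACK',
])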

On running Beam in AWS, had you seen:
https://www.youtube.com/watch?v=eCgZRJqdt_I



Cheers,
Austin

On Wed, Jun 10, 2020 at 2:02 PM Dan Hill  wrote:

> No.  I just sent AWS Support a message.
>
> On Wed, Jun 10, 2020 at 1:00 PM Luke Cwik  wrote:
>
>> The runner needs to support it and I'm not aware of an EMR runner for
>> Apache Beam let alone one that supports pipeline update. Have you tried
>> reaching out to AWS?
>>
>> On Wed, Jun 10, 2020 at 11:14 AM Dan Hill  wrote:
>>
>>> Hi!  I found great docs about Apache Beam on Dataflow (which makes
>>> sense).  I was not able to find this about AWS EMR.
>>>
>>> https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline
>>>
>>>
>>> https://medium.com/google-cloud/restarting-cloud-dataflow-in-flight-9c688c49adfd
>>>
>>


Re: BEAM-2217 NotImplementedError - DataflowRunner parsing Protos from PubSub (Python SDK)

2020-06-10 Thread Udi Meiri
I don't believe you need to register a coder unless you create your own
coder class.

Since the coder has a proto_message_type field, it could return that in
to_type_hint, but I'll defer to Robert since I'm unfamiliar with that area.
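
Something along the lines of the sketch below is what I have in mind (an
untested illustration of the idea, not an actual patch):

# Idea sketch: the coder already stores its message class, so to_type_hint
# could simply hand it back as the element type hint.
from apache_beam.coders.coders import ProtoCoder

class ProtoCoderWithTypeHint(ProtoCoder):
    def to_type_hint(self):
        return self.proto_message_type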

As a temporary workaround, please try overriding the output type like so:
  Map(parse_message_blobs).with_output_types(typing.Any)

That should prevent Beam from attempting to use the ProtoCoder.
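
In context it would look something like this (parse_message_blobs and the
subscription path are placeholders for your own code):

# Workaround sketch: declare typing.Any as the output type so Beam does not
# try to infer a ProtoCoder (and hit its missing to_type_hint) for this step.
import typing

import apache_beam as beam

def parse_message_blobs(blob):
    ...  # deserialize the protobuf payload here

with beam.Pipeline() as p:
    (p
     | beam.io.ReadFromPubSub(subscription='projects/PROJECT/subscriptions/SUB')
     | beam.Map(parse_message_blobs).with_output_types(typing.Any))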

On Wed, Jun 10, 2020 at 1:47 PM Brian Hulette  wrote:

> +Udi Meiri  - do you have any suggestions to resolve
> this?
>
> It looks like there are a couple things going wrong:
> - ProtoCoder doesn't have a definition for to_type_hint
> - DataflowRunner calls from_runner_api, which I believe is a workaround we
> can eventually remove (+Robert Bradshaw ), which in
> turn tries to get type hints for every coder in the pipeline
>
> Brian
>
> On Tue, Jun 9, 2020 at 1:50 AM Lien Michiels 
> wrote:
>
>> Hi Brian,
>>
>> Thanks so much for your quick response!
>>
>> I've tried with both Apache Beam 2.20.0 and 2.21.0, both result in the
>> exact same error. Here is the full stacktrace:
>>
>> (metadata-persistor) ➜  metadata-persistor git:(feature/DEV-1249) ✗
>> metadata_persistor --project X --environments X --window_size 1
>> --input_subscription
>> projects/XXX/subscriptions/mds-internal-item-metadata-subscription
>> --runner DataflowRunner --temp_location gs:///item-metadata/temp
>> --staging_location gs://XXX/item-metadata/staging
>> /Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1431:
>> BeamDeprecationWarning: options is deprecated since First stable release.
>> References to .options will not be supported
>>   experiments = p.options.view_as(DebugOptions).experiments or []
>> WARNING:root:Make sure that locally built Python SDK docker image has
>> Python 3.7 interpreter.
>> Traceback (most recent call last):
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/bin/metadata_persistor",
>> line 8, in 
>> sys.exit(run())
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/metadata_persistor/item_metadata_mds_persistor.py",
>> line 246, in run
>> batch_size=500,
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 524, in __exit__
>> self.run().wait_until_finish()
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 497, in run
>> self._options).run(False)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 510, in run
>> return self.runner.run_pipeline(self, self._options)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py",
>> line 484, in run_pipeline
>> allow_proto_holders=True)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 858, in from_runner_api
>> p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 103, in get_by_id
>> self._id_to_proto[id], self._pipeline_context)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 1238, in from_runner_api
>> part = context.transforms.get_by_id(transform_id)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 103, in get_by_id
>> self._id_to_proto[id], self._pipeline_context)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 1244, in from_runner_api
>> id in proto.outputs.items()
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 1244, in 
>> id in proto.outputs.items()
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 103, in get_by_id
>> self._id_to_proto[id], self._pipeline_context)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pvalue.py",
>> line 214, in from_runner_api
>> element_type=context.element_type_from_coder_id(proto.coder_id),
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 227, in element_type_from_coder_id
>> self.coders[coder_id].to_type_hint())
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/py

Re: How to safely update jobs in-flight using Apache Beam on AWS EMR?

2020-06-10 Thread Dan Hill
Sweet.  I have not seen that video.  Cool.  I'm curious about how well
AWS's managed services (like the Kinesis Data Analytics managed Flink
runner) handle the updates.  I'd guess it is best effort from the saved
state (if enabled).  If this is all delegated by Beam to Flink, then this
is more of a question for AWS.



On Wed, Jun 10, 2020 at 2:18 PM Austin Bennett 
wrote:

> Hi Dan,
>
> AWS EMR generally runs Flink and/or Spark as supported Beam Runners.  For
> EMR, you might want to check compatibility for versions of Beam/Flink can
> run, and the status of beam pipelines using either of those runners.
>
> On running Beam in AWS, had you seen:
> https://www.youtube.com/watch?v=eCgZRJqdt_I
>
>
>
> Cheers,
> Austin
>
> On Wed, Jun 10, 2020 at 2:02 PM Dan Hill  wrote:
>
>> No.  I just sent AWS Support a message.
>>
>> On Wed, Jun 10, 2020 at 1:00 PM Luke Cwik  wrote:
>>
>>> The runner needs to support it and I'm not aware of an EMR runner for
>>> Apache Beam let alone one that supports pipeline update. Have you tried
>>> reaching out to AWS?
>>>
>>> On Wed, Jun 10, 2020 at 11:14 AM Dan Hill  wrote:
>>>
 Hi!  I found great docs about Apache Beam on Dataflow (which makes
 sense).  I was not able to find this about AWS EMR.

 https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline


 https://medium.com/google-cloud/restarting-cloud-dataflow-in-flight-9c688c49adfd

>>>