Thanks Lukasz,

Please tell me, how can I set a coder on the PCollection created after the 
"MapToKV" apply?
I mean, all I know is that it will be a PCollection<KV<K,V>>, and I don't know 
what will be the actual runtime types of K and V.
So, what coder should I set? Can you please give a code example of how to do 
that?

Really appreciate your help,
Eran

From: Lukasz Cwik [mailto:[email protected]]
Sent: Monday, December 03, 2018 7:10 PM
To: [email protected]
Subject: Re: Generic Type PTransform

Apache Beam attempts to propagate coders by looking at any typing 
information available, but because Java erases most generic type information, 
there are many scenarios where coders can't be propagated forward from a 
previous transform.

Take the following two examples (note that there are many subtle variations 
that can give different results):
List<String> erasedType = new ArrayList<String>();  // type is lost
List<String> keptType = new ArrayList<String>() {};  // type is kept because an anonymous inner class is declared
In the first the type is erased and in the second the type information is 
available. I would suggest
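
The difference between the two declarations can be observed with plain reflection. The following is only a small sketch using the standard library, not Beam code; the class name ErasureDemo and the helper firstTypeArgument are made up for illustration.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

  /**
   * Returns the first type argument recorded on the generic superclass of the
   * object's runtime class. Assumes that superclass is parameterized, which
   * holds for ArrayList and for anonymous subclasses of ArrayList.
   */
  public static Type firstTypeArgument(Object o) {
    Type superType = o.getClass().getGenericSuperclass();
    return ((ParameterizedType) superType).getActualTypeArguments()[0];
  }

  public static void main(String[] args) {
    // Plain instance: the runtime class is ArrayList itself, so reflection
    // only sees the unresolved type variable declared by ArrayList.
    List<String> erasedType = new ArrayList<String>();

    // Anonymous subclass: the compiler records "extends ArrayList<String>"
    // in the subclass, so String can be recovered at runtime.
    List<String> keptType = new ArrayList<String>() {};

    System.out.println(firstTypeArgument(erasedType)); // a type variable, not String
    System.out.println(firstTypeArgument(keptType));   // java.lang.String
  }
}
```

This is the same trick that anonymous subclasses of SimpleFunction rely on, and it stops working as soon as the type arguments are themselves type variables such as K and V.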

In your case we can't infer what K and what V are because after the code 
compiles the types have been erased hence the error message. To immediately fix 
the problem, you'll want to set the coder on the PCollection created after you 
apply the "MapToKV" transform (you might need to do it on the 
"MapToSimpleImmutableEntry" transform as well).

If you want to get into the details, take a look at the CoderRegistry[1] as it 
contains the type inference / propagation code.
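
For the immediate fix mentioned above, here is a minimal sketch of setting the coders explicitly inside the transform. It assumes the caller knows the concrete K and V when it constructs the transform and passes their coders in; the three constructor parameters (keyCoder, valueCoder, outputCoder) are assumptions and not part of the original code, and CustomType below is only a stand-in for your own class so that the sketch is complete.

```java
import java.io.Serializable;
import java.util.AbstractMap.SimpleImmutableEntry;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical stand-in for the poster's CustomType: only `field` and a
// one-argument constructor are assumed, matching how the original code uses it.
class CustomType<T> implements Serializable {
  final T field;

  CustomType(T field) {
    this.field = field;
  }
}

public class PlatformGroupByKey<K, V> extends PTransform<
    PCollection<CustomType<SimpleImmutableEntry<K, V>>>,
    PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>>> {

  // Assumption: whoever builds the transform supplies coders for the real types.
  private final Coder<K> keyCoder;
  private final Coder<V> valueCoder;
  private final Coder<CustomType<SimpleImmutableEntry<K, Iterable<V>>>> outputCoder;

  public PlatformGroupByKey(
      Coder<K> keyCoder,
      Coder<V> valueCoder,
      Coder<CustomType<SimpleImmutableEntry<K, Iterable<V>>>> outputCoder) {
    this.keyCoder = keyCoder;
    this.valueCoder = valueCoder;
    this.outputCoder = outputCoder;
  }

  @Override
  public PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>> expand(
      PCollection<CustomType<SimpleImmutableEntry<K, V>>> input) {
    return input
        .apply("MapToKV", MapElements.via(
            new SimpleFunction<CustomType<SimpleImmutableEntry<K, V>>, KV<K, V>>() {
              @Override
              public KV<K, V> apply(CustomType<SimpleImmutableEntry<K, V>> e) {
                return KV.of(e.field.getKey(), e.field.getValue());
              }
            }))
        // K and V are erased at this point, so state the coder explicitly
        // before the next transform asks for it.
        .setCoder(KvCoder.of(keyCoder, valueCoder))
        .apply("GroupByKey", GroupByKey.<K, V>create())
        .apply("MapToSimpleImmutableEntry", MapElements.via(
            new SimpleFunction<KV<K, Iterable<V>>,
                CustomType<SimpleImmutableEntry<K, Iterable<V>>>>() {
              @Override
              public CustomType<SimpleImmutableEntry<K, Iterable<V>>> apply(
                  KV<K, Iterable<V>> kv) {
                return new CustomType<>(
                    new SimpleImmutableEntry<>(kv.getKey(), kv.getValue()));
              }
            }))
        // Same problem on the way out: the element type still mentions K and V.
        .setCoder(outputCoder);
  }
}
```

With String keys and Integer values, for example, the first two arguments could be StringUtf8Coder.of() and VarIntCoder.of(). How you build the output coder depends on how CustomType should be encoded, so treat that part as open.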

Finally, this is not an uncommon problem that users face, and it seems as 
though the error message that is given doesn't make sense, so feel free to 
propose changes to the error messages to help others such as yourself.

1: 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java

On Sun, Dec 2, 2018 at 10:54 PM Matt Casters <[email protected]> wrote:
There are probably smarter people than me on this list, but since I have 
recently been through a similar thought exercise...

For the generic use in Kettle I have a PCollection<KettleRow> going through the 
pipeline.
KettleRow is just an Object[] wrapper for which I can implement a Coder.

The "group by" that I implemented does the following:
It splits PCollection<KettleRow> into PCollection<KV<KettleRow, KettleRow>>.
Then it applies the standard GroupByKey.create(), giving us 
PCollection<KV<KettleRow, Iterable<KettleRow>>>.
This means that we can simply aggregate all the elements in Iterable<KettleRow> 
to aggregate a group.

Well, at least that works for me. The code is open so look at it over here:
https://github.com/mattcasters/kettle-beam-core/blob/master/src/main/java/org/kettle/beam/core/transform/GroupByTransform.java

Like you, I had trouble with the Coder for my KettleRows, so I hacked this up 
to make it work:
https://github.com/mattcasters/kettle-beam-core/blob/master/src/main/java/org/kettle/beam/core/coder/KettleRowCoder.java

It's set on the pipeline:
pipeline.getCoderRegistry().registerCoderForClass(KettleRow.class, new KettleRowCoder());

Good luck!
Matt

On Sun, Dec 2, 2018 at 20:57, Eran Twili <[email protected]> wrote:
Hi,

We are considering using Beam in our software.
We wish to create a service that operates Beam on behalf of a user, and 
obviously the user code doesn't have Beam API visibility.
For that we need to generify some of the Beam API.
So the user supplies functions, and we embed them in a generic PTransform and 
run them in a Beam pipeline.
We have some difficulty understanding how we can give the user the option to 
perform a GroupByKey operation.
The problem is that GroupByKey takes KV, and our PCollections hold only user 
datatypes, which should not be Beam datatypes.
So we thought about having this PTransform:
public class PlatformGroupByKey<K, V> extends
        PTransform<PCollection<CustomType<SimpleImmutableEntry<K, V>>>,
                   PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>>> {
    @Override
    public PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>> expand(
            PCollection<CustomType<SimpleImmutableEntry<K, V>>> input) {

        return input
                .apply("MapToKV",
                        MapElements.via(
                            new SimpleFunction<CustomType<SimpleImmutableEntry<K, V>>, KV<K, V>>() {
                                @Override
                                public KV<K, V> apply(CustomType<SimpleImmutableEntry<K, V>> kv) {
                                    return KV.of(kv.field.getKey(), kv.field.getValue());
                                }
                            }))
                .apply("GroupByKey",
                        GroupByKey.create())
                .apply("MapToSimpleImmutableEntry",
                        MapElements.via(
                            new SimpleFunction<KV<K, Iterable<V>>,
                                    CustomType<SimpleImmutableEntry<K, Iterable<V>>>>() {
                                @Override
                                public CustomType<SimpleImmutableEntry<K, Iterable<V>>> apply(
                                        KV<K, Iterable<V>> kv) {
                                    return new CustomType<>(
                                            new SimpleImmutableEntry<>(kv.getKey(), kv.getValue()));
                                }
                            }));
    }
}
In which we get a PCollection of our key-value type (Java's 
SimpleImmutableEntry),
convert it to KV,
perform the GroupByKey,
and convert it back to SimpleImmutableEntry.

But we get this error at runtime:

java.lang.IllegalStateException: Unable to return a default Coder for GroupByKey/MapToKV/Map/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, V>: Unable to provide a Coder for K.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
  Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
        at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
        at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
        at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:115)
        at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
        at org.apache.beam.examples.platform.PlatformGroupByKey.expand(PlatformGroupByKey.java:27)

We don't understand why the generic type K is still unresolved at runtime.
At runtime it will be known from the concrete type of the input PCollection 
that is passed to the expand method.
What are we doing wrong? Is there a way to achieve what we want using Beam?
Appreciate any help.

Regards,
Eran


Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.
