Hi,
We are considering using Beam in our software.
We want to build a service that runs Beam on the user's behalf, so the
user's code has no visibility of the Beam API.
To do that, we need to wrap parts of the Beam API generically.
The user supplies functions, and we embed them in a generic PTransform and run
them in a Beam pipeline.
We are having difficulty understanding how to offer the user a GroupByKey
operation.
The problem is that GroupByKey takes KV, while our PCollections hold only user
data types, which should not be Beam data types.
So we thought about having this PTransform:
public class PlatformGroupByKey<K, V> extends
    PTransform<PCollection<CustomType<SimpleImmutableEntry<K, V>>>,
               PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>>> {

  @Override
  public PCollection<CustomType<SimpleImmutableEntry<K, Iterable<V>>>> expand(
      PCollection<CustomType<SimpleImmutableEntry<K, V>>> input) {
    return input
        .apply("MapToKV",
            MapElements.via(
                new SimpleFunction<CustomType<SimpleImmutableEntry<K, V>>, KV<K, V>>() {
                  @Override
                  public KV<K, V> apply(CustomType<SimpleImmutableEntry<K, V>> kv) {
                    return KV.of(kv.field.getKey(), kv.field.getValue());
                  }
                }))
        .apply("GroupByKey", GroupByKey.create())
        .apply("MapToSimpleImmutableEntry",
            MapElements.via(
                new SimpleFunction<KV<K, Iterable<V>>,
                    CustomType<SimpleImmutableEntry<K, Iterable<V>>>>() {
                  @Override
                  public CustomType<SimpleImmutableEntry<K, Iterable<V>>> apply(
                      KV<K, Iterable<V>> kv) {
                    return new CustomType<>(
                        new SimpleImmutableEntry<>(kv.getKey(), kv.getValue()));
                  }
                }));
  }
}
Here we take a PCollection of our key-value type (Java's
SimpleImmutableEntry), convert it to KV, perform the GroupByKey, and convert
the result back to SimpleImmutableEntry.
But we get this error at runtime:
java.lang.IllegalStateException: Unable to return a default Coder for GroupByKey/MapToKV/Map/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
  No Coder has been manually specified; you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, V>: Unable to provide a Coder for K.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
  Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
    at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
    at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
    at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:115)
    at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
    at org.apache.beam.examples.platform.PlatformGroupByKey.expand(PlatformGroupByKey.java:27)
We don't understand why the generic type K matters at runtime.
At runtime it should be known from the concrete type parameter of the
PCollection that is passed to the expand method.
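The closest we could get to reproducing this outside Beam is the plain-Java
sketch below. It is only an illustration under assumptions: Fn is a
hypothetical stand-in for SimpleFunction, and outputTypeOf is a hypothetical
helper that reads the generic superclass by reflection, which we assume is
roughly how the output type is discovered for the coder lookup. It compares an
anonymous subclass created inside generic code with one created using concrete
types:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ErasureSketch {

    // Hypothetical stand-in for Beam's SimpleFunction (not a Beam class).
    abstract static class Fn<I, O> {
        abstract O apply(I in);
    }

    // Hypothetical helper: reads the second type argument (the output type)
    // that the anonymous subclass recorded for its generic superclass.
    static Type outputTypeOf(Fn<?, ?> fn) {
        ParameterizedType superType =
            (ParameterizedType) fn.getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[1];
    }

    // Anonymous class created inside generic code: the compiler can only
    // record the type variable K, whatever the caller later binds it to.
    static <K> Fn<K, K> genericIdentity() {
        return new Fn<K, K>() {
            @Override
            K apply(K in) {
                return in;
            }
        };
    }

    // Anonymous class created with concrete types: String is recorded.
    static Fn<String, String> concreteIdentity() {
        return new Fn<String, String>() {
            @Override
            String apply(String in) {
                return in;
            }
        };
    }

    public static void main(String[] args) {
        // The caller binds K to String here, but the anonymous class was
        // compiled once and cannot see that binding.
        Fn<String, String> generic = ErasureSketch.<String>genericIdentity();

        Type fromGeneric = outputTypeOf(generic);
        Type fromConcrete = outputTypeOf(concreteIdentity());

        System.out.println("generic output type:  " + fromGeneric
            + " (type variable: " + (fromGeneric instanceof TypeVariable) + ")");
        System.out.println("concrete output type: " + fromConcrete
            + " (type variable: " + (fromConcrete instanceof TypeVariable) + ")");
    }
}
```

If the same thing happens inside our expand method, the anonymous
SimpleFunction would only ever describe its output as KV<K, V>, which would be
consistent with the "Unable to provide a Coder for K" part of the error, but we
are not sure this is the actual cause.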
What are we doing wrong? Is there a way to achieve what we want using Beam?
Appreciate any help.
Regards,
Eran