Hi Ken,
Non-POJOs are serialized with Kryo, which might not give you optimal
performance. You can register a custom Kryo serializer in the
ExecutionConfig to speed up serialization.
Alternatively, you can implement `ResultTypeQueryable` and provide
custom type information with a custom serializer.
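As a rough sketch of what such a custom serializer boils down to: it writes the key's fields one by one and reads them back in the same order. The class `ExampleKey`, its two fields, and `ExampleKeyCodec` below are made up for illustration and are not taken from your code. The sketch uses plain `java.io` streams so it runs without Flink on the classpath. In a real job the same logic would sit in the write()/read() methods of a Kryo `Serializer`, registered via `ExecutionConfig#registerTypeWithKryoSerializer`; please check the exact signatures against your Flink version.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a key class; both fields are invented for illustration.
final class ExampleKey {
    final String region;
    final int bucket;

    ExampleKey(String region, int bucket) {
        this.region = region;
        this.bucket = bucket;
    }
}

// Field-by-field encoding. A Kryo Serializer<ExampleKey> would carry the same
// logic in write()/read(), using Kryo's Output/Input instead of Data streams.
final class ExampleKeyCodec {

    // Writes the fields in a fixed order: region first, then bucket.
    static byte[] toBytes(ExampleKey key) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(key.region);
            out.writeInt(key.bucket);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the fields back in exactly the order they were written.
    static ExampleKey fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            String region = in.readUTF();
            int bucket = in.readInt();
            return new ExampleKey(region, bucket);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `ExampleKeyCodec.fromBytes(ExampleKeyCodec.toBytes(key))` should give back a key with the same field values. Whether a hand-written serializer is actually faster than Kryo's default for a given key is something to measure rather than assume.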
I hope this helps. Otherwise, can you share a small example of how you
would like to call partitionCustom()?
Regards,
Timo
On 04.06.21 15:38, Ken Krugler wrote:
Hi all,
I'm using Flink 1.12 and a custom partitioner/partitioning key (batch
mode, with a DataSet) to do a better job of distributing data to tasks.
The classes look like:
public class MyPartitioner implements Partitioner<MyGroupingKey>
{
...
}
public class MyGroupingKey implements Comparable<MyGroupingKey>
{
...
}
This worked fine, but I noticed a warning logged by Flink
about MyGroupingKey not having an empty constructor, and thus not being
treated as a POJO.
I added that empty constructor, and then I got an error
because partitionCustom() only works on a single field key.
So I changed MyGroupingKey to have a single field (a string), with
transient cached values for the pieces of the key that I need while
partitioning. Now I get an odd error:
java.lang.RuntimeException: Error while calling custom partitioner
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to MyGroupingKey
at MyPartitioner.partition(AdsPinotFilePartitioner.java:11)
at org.apache.flink.runtime.operators.shipping.OutputEmitter.customPartition(OutputEmitter.java:235)
... 19 more
So I've got two questions…
• Should I just get rid of the empty constructor, and have Flink treat
it as a non-POJO? This seemed to be working fine.
• Is it a bug in Flink that the extracted field from the key is being
used as the expected type for partitioning?
Thanks!
— Ken
--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch