Hi all,

I am trying to sort the groups of a DataSet using a KeySelector, as follows:

in
  .groupBy("productId", "timePeriodId", "geographyId")
  .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {

    @Override
    public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
      return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
    }
  }, Order.ASCENDING)
  .reduceGroup(/* do something */)

And I am getting the following exception:

org.apache.flink.api.common.InvalidProgramException: KeySelector group-sorting keys can only be used with KeySelector grouping keys.
    at org.apache.flink.api.java.operators.UnsortedGrouping.sortGroup(UnsortedGrouping.java:318)
    at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl.transform(ThresholdAcvCalcServiceImpl.java:91)
    at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:91)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Then I tried using KeySelectors for both the 'groupBy' and the 'sortGroup' transformations:

in
  .groupBy(new KeySelector<ThresholdAcvFact, Tuple3<Long, Long, Long>>() {

    @Override
    public Tuple3<Long, Long, Long> getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
      return new Tuple3<>(thresholdAcvFact.getProductId(), thresholdAcvFact.getTimePeriodId(), thresholdAcvFact.getGeographyId());
    }
  })
  .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {

    @Override
    public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
      return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
    }
  }, Order.ASCENDING)
  .reduceGroup(/* do something */)

The job execution still failed with the following exception:

org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:429)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:116)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:203)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:221)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
    at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:281)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:897)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:374)
    ... 50 more
Caused by: java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:463)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279)
    ... 52 more
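One plausible reading of the second trace: the class that cannot be serialized is ThresholdAcvCalcServiceImpl itself, not either key selector. Assuming both selectors are anonymous classes created inside an instance method of ThresholdAcvCalcServiceImpl, each one keeps a hidden reference to the enclosing service object, and the ObjectOutputStream frames above show the user code being written with plain Java serialization, so the service gets pulled in along with the selector. The self-contained sketch below reproduces that effect without Flink. KeyExtractor and Service are hypothetical stand-ins for Flink's KeySelector (which extends java.io.Serializable) and for ThresholdAcvCalcServiceImpl; they are not Flink APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Hypothetical stand-in for Flink's KeySelector<IN, KEY>, which also extends Serializable.
    interface KeyExtractor<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    // Hypothetical stand-in for ThresholdAcvCalcServiceImpl; deliberately NOT Serializable.
    static class Service {

        // Anonymous class created in an instance method: it carries a hidden field
        // (this$0) pointing at the enclosing Service, even though getKey never uses it.
        KeyExtractor<String, Integer> anonymousSelector() {
            return new KeyExtractor<String, Integer>() {
                @Override
                public Integer getKey(String value) {
                    return value.length();
                }
            };
        }

        // Static nested class: holds no reference to any Service instance.
        static class LengthSelector implements KeyExtractor<String, Integer> {
            @Override
            public Integer getKey(String value) {
                return value.length();
            }
        }
    }

    // Writes o with plain Java serialization, the mechanism visible in the trace
    // (InstantiationUtil.serializeObject -> ObjectOutputStream.writeObject).
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            System.out.println("NotSerializableException: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Service service = new Service();
        System.out.println(serializes(service.anonymousSelector())); // false: drags in Service
        System.out.println(serializes(new Service.LengthSelector())); // true
    }
}
```

If that is what is happening here, declaring the two selectors as static nested (or top-level) classes, while still using KeySelectors for both groupBy and sortGroup as the first exception demands, should keep the service instance out of what Flink has to serialize.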

Does anyone have any idea how I can get past these issues?

Thanks in advance