Hi,

If you have a closer look at the exception, you'll see that the issue is caused by com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl not being serializable. It seems that you have a reference to this class somewhere.

Flink requires that all function classes (like KeySelector) are serializable.

Best,
Fabian
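For illustration, one common way to meet this requirement is to define the KeySelector as a separate top-level (or static nested) class, so that it does not capture a reference to the enclosing, non-serializable service. This is only a minimal sketch: the class name PriceKeySelector is invented here, while ThresholdAcvFact and the key logic come from the code quoted further down in this thread.

import java.util.Optional;
import org.apache.flink.api.java.functions.KeySelector;

// Standalone selector: it holds no reference to ThresholdAcvCalcServiceImpl,
// so only the selector itself has to be serializable (KeySelector already
// extends Serializable).
public class PriceKeySelector implements KeySelector<ThresholdAcvFact, Double> {

    @Override
    public Double getKey(ThresholdAcvFact fact) {
        // Same logic as the anonymous class below: sort on basePrice and
        // fall back to promoPrice when basePrice is null.
        return Optional.ofNullable(fact.getBasePrice()).orElse(fact.getPromoPrice());
    }
}

It would then be passed as sortGroup(new PriceKeySelector(), Order.ASCENDING) in place of the anonymous inner class.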
On Mon, 8 Apr 2019 at 09:13, Papadopoulos, Konstantinos <konstantinos.papadopou...@iriworldwide.com> wrote:

Hi Fabian,

Thanks for your support. I updated my POJO to implement the Serializable interface, with no success.
I got the same NotSerializableException.

Best,
Konstantinos

From: Fabian Hueske <fhue...@gmail.com>
Sent: Saturday, 6 April 2019 2:26 AM
To: Papadopoulos, Konstantinos <konstantinos.papadopou...@iriworldwide.com>
Cc: Chesnay Schepler <ches...@apache.org>; user <user@flink.apache.org>
Subject: Re: InvalidProgramException when trying to sort a group within a dataset

Hi,

Your POJO should implement the Serializable interface.
Otherwise it's not considered to be serializable.

Best, Fabian

On Wed, 3 Apr 2019, 07:22, Papadopoulos, Konstantinos <konstantinos.papadopou...@iriworldwide.com> wrote:

Hi Chesnay,

Thanks for your support. The ThresholdAcvFact class is a simple POJO with the following definition:

public class ThresholdAcvFact {

    private Long timePeriodId;
    private Long geographyId;
    private Long productId;
    private Long customerId;
    private Double basePrice;
    private Double promoPrice;
    private Double basePriceAcv;
    private Double promoPriceAcv;
    private Long count;

    public Long getTimePeriodId() {
        return timePeriodId;
    }

    public void setTimePeriodId(Long timePeriodId) {
        this.timePeriodId = timePeriodId;
    }

    public Long getGeographyId() {
        return geographyId;
    }

    public void setGeographyId(Long geographyId) {
        this.geographyId = geographyId;
    }

    public Long getProductId() {
        return productId;
    }

    public void setProductId(Long productId) {
        this.productId = productId;
    }

    public Long getCustomerId() {
        return customerId;
    }

    public void setCustomerId(Long customerId) {
        this.customerId = customerId;
    }

    public Double getBasePrice() {
        return basePrice;
    }

    public void setBasePrice(Double basePrice) {
        this.basePrice = basePrice;
    }

    public Double getPromoPrice() {
        return promoPrice;
    }

    public void setPromoPrice(Double promoPrice) {
        this.promoPrice = promoPrice;
    }

    public Double getBasePriceAcv() {
        return basePriceAcv;
    }

    public void setBasePriceAcv(Double basePriceAcv) {
        this.basePriceAcv = basePriceAcv;
    }

    public Double getPromoPriceAcv() {
        return promoPriceAcv;
    }

    public void setPromoPriceAcv(Double promoPriceAcv) {
        this.promoPriceAcv = promoPriceAcv;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "ThresholdAcvFact{" +
                "timePeriodId=" + timePeriodId +
                ", geographyId=" + geographyId +
                ", productId=" + productId +
                ", customerId=" + customerId +
                ", basePrice=" + basePrice +
                ", promoPrice=" + promoPrice +
                ", basePriceAcv=" + basePriceAcv +
                ", promoPriceAcv=" + promoPriceAcv +
                ", count=" + count +
                '}';
    }
}
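As a side note, the change Fabian suggests upthread is a one-line edit to this class declaration. A minimal sketch (the serialVersionUID is added here by convention and is not part of the original):

import java.io.Serializable;

public class ThresholdAcvFact implements Serializable {

    private static final long serialVersionUID = 1L;

    // fields, getters, setters and toString() exactly as above
}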
The implementation of the function where we faced the reported issue is the following:

public DataSet<ThresholdAcvFact> transform(ThresholdAcvCalcSources thresholdAcvCalcSources, Long customerId) {

    final DataSet<ThresholdAcvFact> basePriceFacts = getPriceFacts(
            thresholdAcvCalcSources.getBasePriceDataSet(),
            thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
            new ThresholdAcvBasePriceFactMapper(customerId));

    final DataSet<ThresholdAcvFact> promoPriceFacts = getPriceFacts(
            thresholdAcvCalcSources.getPromoPriceDataSet(),
            thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
            new ThresholdAcvPromoPriceFactMapper(customerId));

    return basePriceFacts
            .fullOuterJoin(promoPriceFacts)
            .where(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "basePrice")
            .equalTo(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "promoPrice")
            .with(new ThresholdAcvFactBasePromoPriceJoiner())
            .groupBy(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID)
            .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
                @Override
                public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
                    return Optional.ofNullable(thresholdAcvFact.getBasePrice())
                            .orElse(thresholdAcvFact.getPromoPrice());
                }
            }, Order.ASCENDING)
            .reduceGroup(new ThresholdAcvFactCountGroupReducer());
}

Regards,
Konstantinos

From: Chesnay Schepler <ches...@apache.org>
Sent: Wednesday, 3 April 2019 12:59 PM
To: Papadopoulos, Konstantinos <konstantinos.papadopou...@iriworldwide.com>; user@flink.apache.org
Subject: Re: InvalidProgramException when trying to sort a group within a dataset

Your user-defined functions are referencing the class "com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl", which isn't serializable.

My guess is that "ThresholdAcvFact" is a non-static inner class; however, I would need to see the entire class to give an accurate analysis.

On 02/04/2019 12:31, Papadopoulos, Konstantinos wrote:

Hi all,

I am trying to sort a group within a dataset using a KeySelector as follows:

in
    .groupBy("productId", "timePeriodId", "geographyId")
    .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
        @Override
        public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
            return Optional.ofNullable(thresholdAcvFact.getBasePrice())
                    .orElse(thresholdAcvFact.getPromoPrice());
        }
    }, Order.ASCENDING)
    .reduceGroup(/* do something */)

And I am getting the following exception:

org.apache.flink.api.common.InvalidProgramException: KeySelector group-sorting keys can only be used with KeySelector grouping keys.
    at org.apache.flink.api.java.operators.UnsortedGrouping.sortGroup(UnsortedGrouping.java:318)
    at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl.transform(ThresholdAcvCalcServiceImpl.java:91)
    at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:91)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Then, I tried to use a KeySelector for both the 'groupBy' and 'sortGroup' transformations as follows:

in
    .groupBy(new KeySelector<ThresholdAcvFact, Tuple3<Long, Long, Long>>() {
        @Override
        public Tuple3<Long, Long, Long> getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
            return new Tuple3<>(thresholdAcvFact.getProductId(),
                    thresholdAcvFact.getTimePeriodId(), thresholdAcvFact.getGeographyId());
        }
    })
    .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
        @Override
        public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
            return Optional.ofNullable(thresholdAcvFact.getBasePrice())
                    .orElse(thresholdAcvFact.getPromoPrice());
        }
    }, Order.ASCENDING)
    .reduceGroup(/* do something */)

The job execution still failed with the following exception:

org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl

    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:429)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:116)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:203)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:221)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
    at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:281)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:897)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:374)
    ... 50 more
Caused by: java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:463)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279)
    ... 52 more

Does anyone have any idea how I can get past these issues?

Thanks in advance
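For completeness, here is a sketch that combines the two points raised in this thread: both groupBy and sortGroup use KeySelectors (as the first exception demands), and the selectors are static nested classes so that no reference to the non-serializable ThresholdAcvCalcServiceImpl ends up in their closures. ThresholdAcvFact, its getters, and ThresholdAcvFactCountGroupReducer are the classes from the code above; the wrapper class and method names here are invented for the example.

import java.util.Optional;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;

public class ThresholdAcvFactGrouping {

    // Grouping key: (productId, timePeriodId, geographyId).
    public static class GroupKey
            implements KeySelector<ThresholdAcvFact, Tuple3<Long, Long, Long>> {
        @Override
        public Tuple3<Long, Long, Long> getKey(ThresholdAcvFact fact) {
            return new Tuple3<>(fact.getProductId(), fact.getTimePeriodId(), fact.getGeographyId());
        }
    }

    // Sort key: basePrice, falling back to promoPrice when basePrice is null.
    public static class PriceKey implements KeySelector<ThresholdAcvFact, Double> {
        @Override
        public Double getKey(ThresholdAcvFact fact) {
            return Optional.ofNullable(fact.getBasePrice()).orElse(fact.getPromoPrice());
        }
    }

    public static DataSet<ThresholdAcvFact> sortWithinGroups(DataSet<ThresholdAcvFact> in) {
        return in
                .groupBy(new GroupKey())                     // KeySelector grouping ...
                .sortGroup(new PriceKey(), Order.ASCENDING)  // ... allows KeySelector group-sorting
                .reduceGroup(new ThresholdAcvFactCountGroupReducer());
    }
}

Since neither selector is an anonymous inner class of the service, serializing the job graph no longer needs to serialize ThresholdAcvCalcServiceImpl itself.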