[ https://issues.apache.org/jira/browse/FLINK-11389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Till Rohrmann updated FLINK-11389: ---------------------------------- Fix Version/s: 1.8.0 1.6.4 > Incorrectly use job information when call getSerializedTaskInformation in > class TaskDeploymentDescriptor > -------------------------------------------------------------------------------------------------------- > > Key: FLINK-11389 > URL: https://issues.apache.org/jira/browse/FLINK-11389 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination > Affects Versions: 1.6.3, 1.7.1, 1.8.0 > Reporter: yuqi > Assignee: yuqi > Priority: Major > Labels: pull-request-available > Fix For: 1.6.4, 1.7.2, 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > See TaskDeploymentDescriptor > {code:java} > @Nullable > public SerializedValue<TaskInformation> getSerializedTaskInformation() { > if (serializedJobInformation instanceof NonOffloaded) { > NonOffloaded<TaskInformation> jobInformation = > (NonOffloaded<TaskInformation>) > serializedTaskInformation; > return jobInformation.serializedValue; > } else { > throw new IllegalStateException( > "Trying to work with offloaded serialized job > information."); > } > } > {code} > the condition serializedJobInformation instanceof NonOffloaded is not > correctly, > as serializedJobInformation and serializedTaskInformation are passed from > ExecutionVertex#createDeploymentDescriptor > {code:java} > if (jobInformationOrBlobKey.isLeft()) { > serializedJobInformation = new > TaskDeploymentDescriptor.NonOffloaded<>(jobInformationOrBlobKey.left()); > } else { > serializedJobInformation = new > TaskDeploymentDescriptor.Offloaded<>(jobInformationOrBlobKey.right()); > } > final Either<SerializedValue<TaskInformation>, > PermanentBlobKey> taskInformationOrBlobKey; > try { > taskInformationOrBlobKey = > jobVertex.getTaskInformationOrBlobKey(); > } catch (IOException e) { > throw new ExecutionGraphException( > "Could not create a serialized > JobVertexInformation for " + > jobVertex.getJobVertexId(), e); > } > final TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> > serializedTaskInformation; > if (taskInformationOrBlobKey.isLeft()) { > serializedTaskInformation = new > TaskDeploymentDescriptor.NonOffloaded<>(taskInformationOrBlobKey.left()); > } else { > serializedTaskInformation = new > TaskDeploymentDescriptor.Offloaded<>(taskInformationOrBlobKey.right()); > } > {code} > serializedJobInformation and serializedTaskInformation are not necessarily > shared the class NonOffloaded or Offloaded -- This message was sent by Atlassian JIRA (v7.6.3#76005)