[ https://issues.apache.org/jira/browse/FLINK-14740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lining updated FLINK-14740:
---------------------------
    Description: 
*In current design:*
The DataSet job uses the VertexID as the OperatorID in the OperatorMetricGroup (see TaskMetricGroup.getOrAddOperator(String name)).
If two operators in the same vertex have the same name, they overwrite each other in the TaskMetricGroup.

*Proposal:*
We could add an OperatorID to each operator of the DataSet job.
{code:java}
// Assign one OperatorID per optimizer node. nodeId2OperatorId (optimizer node ID -> OperatorID)
// reuses the ID if the node has already been assigned one.
for (TaskInChain tic : this.chainedTasksInSequence) {
    TaskConfig t = new TaskConfig(tic.getContainingVertex().getConfiguration());
    Integer nodeId = tic.getPlanNode().getOptimizerNode().getId();
    OperatorID operatorID = this.nodeId2OperatorId.get(nodeId);
    if (operatorID == null) {
        operatorID = new OperatorID();
        this.nodeId2OperatorId.put(nodeId, operatorID);
    }
    // The proposal passes the chained task's OperatorID as an extra argument.
    t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName(), operatorID.toString());
}
{code}
Each operator could then get its ID from TaskInfo.
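The collision and the proposed per-node ID assignment can be illustrated with a minimal, self-contained Java sketch. It does not use Flink: TaskGroup, getOrAddByName, getOrAddById and operatorIdFor are hypothetical stand-ins for TaskMetricGroup, getOrAddOperator and the proposed nodeId2OperatorId map, and random UUID strings stand in for OperatorID. Combining the ID with the name in the lookup key is an assumption of this sketch.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the proposal. These classes only mimic
// TaskMetricGroup / OperatorID and are NOT Flink APIs.
public class OperatorIdSketch {

    // Stand-in for a per-task metric group that hands out operator groups.
    static final class TaskGroup {
        private final Map<String, List<String>> groups = new HashMap<>();

        // Current behaviour (assumed): every chained operator shares the vertex ID,
        // so the key effectively reduces to the name and same-named operators share one group.
        List<String> getOrAddByName(String name) {
            return groups.computeIfAbsent(name, k -> new ArrayList<>());
        }

        // Proposed behaviour: the key also contains a per-operator ID.
        List<String> getOrAddById(String id, String name) {
            return groups.computeIfAbsent(id + "/" + name, k -> new ArrayList<>());
        }

        int size() {
            return groups.size();
        }
    }

    // Hypothetical counterpart of nodeId2OperatorId: one stable ID per optimizer node.
    private final Map<Integer, String> nodeIdToOperatorId = new HashMap<>();

    String operatorIdFor(int nodeId) {
        // Same effect as the get / null-check / put sequence in the proposal.
        return nodeIdToOperatorId.computeIfAbsent(nodeId, k -> UUID.randomUUID().toString());
    }

    public static void main(String[] args) {
        // Two chained operators in one vertex, both named "Map" (optimizer nodes 1 and 2).
        TaskGroup byName = new TaskGroup();
        byName.getOrAddByName("Map").add("numRecordsIn of node 1");
        byName.getOrAddByName("Map").add("numRecordsIn of node 2");
        System.out.println("groups keyed by name: " + byName.size());

        OperatorIdSketch ids = new OperatorIdSketch();
        TaskGroup byId = new TaskGroup();
        byId.getOrAddById(ids.operatorIdFor(1), "Map").add("numRecordsIn of node 1");
        byId.getOrAddById(ids.operatorIdFor(2), "Map").add("numRecordsIn of node 2");
        System.out.println("groups keyed by ID: " + byId.size());
    }
}
{code}

Running main prints "groups keyed by name: 1" followed by "groups keyed by ID: 2": with name-only keys both "Map" operators end up in one group, while per-node IDs keep them apart. Keeping the IDs in a map keyed by optimizer node ID means repeated lookups for the same node return the same ID.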
> Create OperatorID for OperatorMetricGroup which in batch job
> -------------------------------------------------------------
>
>                 Key: FLINK-14740
>                 URL: https://issues.apache.org/jira/browse/FLINK-14740
>             Project: Flink
>          Issue Type: Wish
>          Components: Runtime / Metrics
>            Reporter: lining
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)