[jira] [Created] (FLINK-32001) SupportsRowLevelUpdate does not support returning only a part of the columns.
Ming Li created FLINK-32001:
---
Summary: SupportsRowLevelUpdate does not support returning only a part of the columns.
Key: FLINK-32001
URL: https://issues.apache.org/jira/browse/FLINK-32001
Project: Flink
Issue Type: Improvement
Components: Table SQL / Runtime
Affects Versions: 1.17.0
Reporter: Ming Li

[FLIP-282|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235838061] introduces the new Delete and Update API in Flink SQL. The documentation says that in the {{partial-update}} case we only need to return the primary key columns and the updated columns. In fact, however, the topology of the job is {{source -> cal -> constraintEnforcer -> sink}}, and the constraint check performed in the {{constraintEnforcer}} operator is done by field index, not by column name. If only some of the columns are returned, the constraint check is applied to the wrong positions and easily throws an {{ArrayIndexOutOfBoundsException}}.
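To illustrate the index mismatch, here is a minimal, self-contained sketch (schema, column names, and indices are invented for the example and are not taken from the actual planner code): an index-based NOT NULL check planned against the full schema fails on a partial row.
{code:java}
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class PartialRowConstraintCheckDemo {

    public static void main(String[] args) {
        // Hypothetical full schema: (id INT NOT NULL, name STRING, age INT NOT NULL).
        // The NOT NULL column "age" sits at index 2 of the full schema.
        // A partial-update sink only returns the primary key and the updated column,
        // so the produced row has arity 2 instead of 3.
        RowData partialRow = GenericRowData.of(1, StringData.fromString("bob"));

        // An index-based constraint check planned against the full schema still probes
        // index 2 and fails on the shorter row.
        int notNullFieldIndex = 2;
        if (partialRow.isNullAt(notNullFieldIndex)) { // throws ArrayIndexOutOfBoundsException
            throw new IllegalStateException("Column 'age' must not be null.");
        }
    }
}
{code}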
[jira] [Created] (FLINK-32655) RecreateOnResetOperatorCoordinator did not forward notifyCheckpointAborted to the real OperatorCoordinator
Ming Li created FLINK-32655:
---
Summary: RecreateOnResetOperatorCoordinator did not forward notifyCheckpointAborted to the real OperatorCoordinator
Key: FLINK-32655
URL: https://issues.apache.org/jira/browse/FLINK-32655
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.17.1
Reporter: Ming Li

[RecreateOnResetOperatorCoordinator|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L115] does not override {{notifyCheckpointAborted}}, so the {{SplitEnumerator}} inside the {{SourceCoordinator}} never receives the checkpoint-abort notification.
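A small self-contained analogue of the problem (the interface and class names are invented for illustration and only mimic the wrapper/delegate relationship): a delegating wrapper that does not override a method with a no-op default silently swallows those calls instead of forwarding them.
{code:java}
public class MissingForwardingDemo {

    /** Stand-in for OperatorCoordinator: one forwarded method, one method with a no-op default. */
    interface Coordinator {
        void notifyCheckpointComplete(long checkpointId);

        default void notifyCheckpointAborted(long checkpointId) {
            // no-op by default, like the inherited default in the real interface hierarchy
        }
    }

    static class RealCoordinator implements Coordinator {
        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            System.out.println("real coordinator: completed " + checkpointId);
        }

        @Override
        public void notifyCheckpointAborted(long checkpointId) {
            System.out.println("real coordinator: aborted " + checkpointId);
        }
    }

    /** Mimics the wrapper: forwards completion but "forgets" to override the abort callback. */
    static class RecreateOnResetWrapper implements Coordinator {
        private final Coordinator real = new RealCoordinator();

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            real.notifyCheckpointComplete(checkpointId);
        }
        // No notifyCheckpointAborted override -> the default no-op wins and nothing is forwarded.
    }

    public static void main(String[] args) {
        Coordinator wrapper = new RecreateOnResetWrapper();
        wrapper.notifyCheckpointComplete(1L); // forwarded and printed
        wrapper.notifyCheckpointAborted(1L);  // silently dropped: the real coordinator never hears it
    }
}
{code}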
[jira] [Created] (FLINK-32698) Add getCheckpointOptions interface in ManagedSnapshotContext
Ming Li created FLINK-32698:
---
Summary: Add getCheckpointOptions interface in ManagedSnapshotContext
Key: FLINK-32698
URL: https://issues.apache.org/jira/browse/FLINK-32698
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Reporter: Ming Li

Currently only the {{checkpointId}} and {{checkpointTimestamp}} are exposed through {{ManagedSnapshotContext}}. We would like it to also expose the {{CheckpointOptions}}, so that operators can apply different logic in {{snapshotState}} depending on the type of checkpoint. For example, in {{Paimon}} we would like data written for a {{checkpoint}} to expire automatically, while data written for a {{savepoint}} should be persisted.
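A hedged sketch of what this could look like (the {{getCheckpointOptions}} method and the extended interface are the proposal itself, not an existing API; the {{isSavepoint()}} call assumes the {{CheckpointOptions#getCheckpointType()}} accessor of recent Flink versions):
{code:java}
import org.apache.flink.runtime.checkpoint.CheckpointOptions;

/** Proposed extension of ManagedSnapshotContext (sketch only; names are part of the proposal). */
public interface ManagedSnapshotContextWithOptions {

    long getCheckpointId();

    long getCheckpointTimestamp();

    /** Proposed: options of the checkpoint/savepoint that triggered this snapshot. */
    CheckpointOptions getCheckpointOptions();

    /** Example of the intended use inside snapshotState. */
    static boolean shouldPersistForever(ManagedSnapshotContextWithOptions context) {
        // e.g. Paimon could mark savepoint data as non-expiring and checkpoint data as expiring
        return context.getCheckpointOptions().getCheckpointType().isSavepoint();
    }
}
{code}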
[jira] [Created] (FLINK-33798) Automatically clean up rocksdb logs when the task failover.
Ming Li created FLINK-33798:
---
Summary: Automatically clean up rocksdb logs when the task failover.
Key: FLINK-33798
URL: https://issues.apache.org/jira/browse/FLINK-33798
Project: Flink
Issue Type: Improvement
Components: Runtime / State Backends
Reporter: Ming Li

Since FLINK-24785 relocated the RocksDB log into the Flink log directory, multiple RocksDB log files are created there, but they are not cleaned up on task failover. Over time this leaves a large number of stale RocksDB log files in the Flink log directory.
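A hedged sketch of one possible cleanup pass (the "_LOG" file-name suffix of the relocated files and the age threshold are assumptions made for illustration, not taken from the Flink code): remove relocated RocksDB log files that have not been written to for a while, triggered on task failover or periodically.
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.stream.Stream;

public class StaleRocksDbLogCleaner {

    static void cleanStaleRocksDbLogs(Path flinkLogDir, Duration maxAge) throws IOException {
        Instant cutoff = Instant.now().minus(maxAge);
        try (Stream<Path> files = Files.list(flinkLogDir)) {
            files.filter(p -> p.getFileName().toString().endsWith("_LOG")) // assumed naming pattern
                    .filter(p -> isOlderThan(p, cutoff))
                    .forEach(p -> p.toFile().delete()); // best effort, ignore failures
        }
    }

    private static boolean isOlderThan(Path file, Instant cutoff) {
        try {
            return Files.getLastModifiedTime(file).toInstant().isBefore(cutoff);
        } catch (IOException e) {
            return false; // if we cannot read the timestamp, keep the file
        }
    }
}
{code}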
[jira] [Created] (FLINK-14481) Modify the Flink valid socket port check to 0 to 65535.
ming li created FLINK-14481:
---
Summary: Modify the Flink valid socket port check to 0 to 65535.
Key: FLINK-14481
URL: https://issues.apache.org/jira/browse/FLINK-14481
Project: Flink
Issue Type: Improvement
Components: Runtime / Network
Reporter: ming li

I found that Flink's socket port check is {{port >= 0 && port <= 65536}}:
{code:java}
checkArgument(serverPort >= 0 && serverPort <= 65536, "Invalid port number.");{code}
However, when binding a port, the valid range is 0 to 65535 (a port number of zero lets the system pick an ephemeral port during the bind operation). Port 65536 will fail anyway with an out-of-range error at bind time, but Flink has already declared this check as a valid range check, which is confusing. Should we change Flink's port check to 0 to 65535?
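A hedged sketch of the proposed correction (a standalone helper written for illustration; it only changes the upper bound from 65536 to 65535):
{code:java}
import static org.apache.flink.util.Preconditions.checkArgument;

public final class PortValidation {

    /** 0 asks the OS for an ephemeral port; 65535 is the largest valid TCP/UDP port. */
    static void validateServerPort(int serverPort) {
        checkArgument(serverPort >= 0 && serverPort <= 65535, "Invalid port number.");
    }

    public static void main(String[] args) {
        validateServerPort(0);      // OK: ephemeral port
        validateServerPort(65535);  // OK: highest valid port
        validateServerPort(65536);  // throws IllegalArgumentException
    }
}
{code}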
[jira] [Created] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation
ming li created FLINK-15467:
---
Summary: Should wait for the end of the source thread during the Task cancellation
Key: FLINK-15467
URL: https://issues.apache.org/jira/browse/FLINK-15467
Project: Flink
Issue Type: Bug
Components: Runtime / Task
Affects Versions: 1.9.1, 1.9.0
Reporter: ming li

In the new mailbox model, SourceStreamTask starts a source thread to run the user code, while the execution thread blocks on mailbox.takeMail(). When a task is cancelled, the TaskCanceler thread cancels the task and interrupts the execution thread. The execution thread of SourceStreamTask therefore throws an InterruptedException, cancels the task again, and rethrows the exception.
{code:java}
@Override
protected void performDefaultAction(ActionContext context) throws Exception {
    // Against the usual contract of this method, this implementation is not step-wise but blocking instead for
    // compatibility reasons with the current source interface (source functions run as a loop, not in steps).
    sourceThread.start();

    // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions.
    try {
        runAlternativeMailboxLoop();
    } catch (Exception mailboxEx) {
        // We cancel the source function if some runtime exception escaped the mailbox.
        if (!isCanceled()) {
            cancelTask();
        }
        throw mailboxEx;
    }

    sourceThread.join();
    if (!isFinished) {
        sourceThread.checkThrowSourceExecutionException();
    }

    context.allActionsCompleted();
}
{code}
When all tasks of the TaskExecutor have been cancelled, the blob files are cleaned up. But the actual source thread has not finished at this point, which causes a ClassNotFoundException when it tries to load a new class. In this case the source thread may not be able to clean up and release its resources properly (such as closing child threads, cleaning up local files, etc.). Therefore, I think we should only mark the task as canceled or finished after the source thread has finished executing.
[jira] [Created] (FLINK-20554) The Checkpointed Data Size of the Latest Completed Checkpoint is incorrectly displayed on the Overview page of the UI
ming li created FLINK-20554:
---
Summary: The Checkpointed Data Size of the Latest Completed Checkpoint is incorrectly displayed on the Overview page of the UI
Key: FLINK-20554
URL: https://issues.apache.org/jira/browse/FLINK-20554
Project: Flink
Issue Type: Bug
Components: Runtime / Web Frontend
Affects Versions: 1.11.0
Reporter: ming li
Attachments: image-2020-12-10-11-57-56-888.png

The {{Checkpointed Data Size}} of the {{Latest Completed Checkpoint}} always shows '-' in the {{Overview}} page of the UI.
!image-2020-12-10-11-57-56-888.png|width=862,height=104!
I think the code (https://github.com/apache/flink/blob/master/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html#L52) should use {{state_size}} instead of {{checkpointed_data_size}}, which should fix this problem.
[jira] [Created] (FLINK-30251) Move the IO with DFS during abort checkpoint to an asynchronous thread.
ming li created FLINK-30251:
---
Summary: Move the IO with DFS during abort checkpoint to an asynchronous thread.
Key: FLINK-30251
URL: https://issues.apache.org/jira/browse/FLINK-30251
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Affects Versions: 1.15.2, 1.16.0
Reporter: ming li
Attachments: image-2022-11-30-19-10-51-226.png

Currently, when a {{checkpoint}} fails, we handle the abort message in the Task's {{mailbox}}: we close the output stream and delete the file on the DFS. However, when the {{checkpoint}} failure is caused by a DFS failure (for example, a NameNode failure in HDFS), this operation may take a long time or hang, so the Task cannot continue processing data during that time. I think we can move the file-deletion work to an asynchronous thread, just as checkpoint data is uploaded asynchronously.
!image-2022-11-30-19-10-51-226.png!
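A hedged, self-contained sketch of the idea (class and method names are invented; this is not the actual Flink code path): the mailbox thread only enqueues the potentially slow DFS calls on an I/O executor instead of blocking on them.
{code:java}
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncAbortCleanupSketch {

    private final ExecutorService asyncIoExecutor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "checkpoint-abort-io"));

    /** Called from the mailbox thread: never block on the DFS here, only hand the work off. */
    public void abortCheckpoint(Closeable checkpointOutputStream, Runnable deleteFileOnDfs) {
        asyncIoExecutor.execute(
                () -> {
                    try {
                        checkpointOutputStream.close(); // may hang if the DFS is unhealthy
                    } catch (IOException ignored) {
                        // best effort; leftover files can be removed by later cleanup
                    }
                    deleteFileOnDfs.run();
                });
    }
}
{code}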
[jira] [Created] (FLINK-30985) [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
ming li created FLINK-30985:
---
Summary: [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
Key: FLINK-30985
URL: https://issues.apache.org/jira/browse/FLINK-30985
Project: Flink
Issue Type: Improvement
Components: Table Store
Reporter: ming li

Currently, {{assignSplits}} of {{ContinuousFileSplitEnumerator}} in {{TableStore}} traverses the {{HashMap}}, and since the number of buckets is fixed, the traversal order is also fixed.
{code:java}
private void assignSplits() {
    bucketSplits.forEach(
            (bucket, splits) -> {
                if (splits.size() > 0) {
                    // To ensure the order of consumption, the data of the same bucket is given
                    // to a task to be consumed.
                    int task = bucket % context.currentParallelism();
                    if (readersAwaitingSplit.remove(task)) {
                        // if the reader that requested another split has failed in the
                        // meantime, remove
                        // it from the list of waiting readers
                        if (!context.registeredReaders().containsKey(task)) {
                            return;
                        }
                        context.assignSplit(splits.poll(), task);
                    }
                }
            });
}{code}
Assume that a {{task}} consumes multiple {{buckets}} and that each {{bucket}} has enough splits. Then the first {{bucket}} is always the one assigned to the task, and the other buckets may not be consumed for a long time, resulting in uneven consumption and difficulty in advancing the {{watermark}}. So I think we should change the split allocation algorithm to a fair one.
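A hedged, simplified sketch of a fairer assignment (splits reduced to Strings and buckets to ints; this is not the actual ContinuousFileSplitEnumerator code): remember where the previous round stopped and resume from the next bucket, so no single bucket can starve the others while the bucket-to-task affinity is kept.
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class RoundRobinBucketAssignmentSketch {

    private final Map<Integer, Queue<String>> bucketSplits = new LinkedHashMap<>();
    private int nextBucketOffset = 0;

    void addSplit(int bucket, String split) {
        bucketSplits.computeIfAbsent(bucket, b -> new LinkedList<>()).add(split);
    }

    /** Returns the next split for the given task, rotating over this task's buckets. */
    String pollNextSplitForTask(int task, int parallelism) {
        List<Integer> buckets = new ArrayList<>(bucketSplits.keySet());
        for (int i = 0; i < buckets.size(); i++) {
            int bucket = buckets.get((nextBucketOffset + i) % buckets.size());
            if (bucket % parallelism != task) {
                continue; // keep bucket -> task affinity so per-bucket order is preserved
            }
            Queue<String> splits = bucketSplits.get(bucket);
            if (!splits.isEmpty()) {
                nextBucketOffset = (nextBucketOffset + i + 1) % buckets.size();
                return splits.poll();
            }
        }
        return null; // no split available for this task right now
    }
}
{code}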
[jira] [Created] (FLINK-31008) [Flink][Table Store] The Split allocation of the same bucket in ContinuousFileSplitEnumerator may be out of order
ming li created FLINK-31008:
---
Summary: [Flink][Table Store] The Split allocation of the same bucket in ContinuousFileSplitEnumerator may be out of order
Key: FLINK-31008
URL: https://issues.apache.org/jira/browse/FLINK-31008
Project: Flink
Issue Type: Bug
Components: Table Store
Reporter: ming li

There are two places in {{ContinuousFileSplitEnumerator}} that add a {{FileStoreSourceSplit}} to {{bucketSplits}}: {{addSplitsBack}} and {{processDiscoveredSplits}}. {{processDiscoveredSplits}} continuously checks for new splits and appends them to the queue, so at that point the splits are in order.
{code:java}
private void addSplits(Collection<FileStoreSourceSplit> splits) {
    splits.forEach(this::addSplit);
}

private void addSplit(FileStoreSourceSplit split) {
    bucketSplits
            .computeIfAbsent(((DataSplit) split.split()).bucket(), i -> new LinkedList<>())
            .add(split);
}{code}
However, when a task fails over, the splits that were previously assigned to it are returned, and these returned splits are also appended to the end of the queue, which makes the allocation of splits out of order. I think the returned splits should be added to the head of the queue to preserve the allocation order.
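A hedged, simplified sketch of the proposed ordering (splits reduced to Strings, buckets to ints; not the real classes): newly discovered splits go to the tail, while splits returned after a failover go back to the head in their original order.
{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

public class SplitRequeueOrderSketch {

    private final Map<Integer, LinkedList<String>> bucketSplits = new HashMap<>();

    /** Newly discovered splits are appended, keeping discovery order. */
    void processDiscoveredSplit(int bucket, String split) {
        bucketSplits.computeIfAbsent(bucket, b -> new LinkedList<>()).addLast(split);
    }

    /** Splits returned on failover are pushed back to the head, keeping their original order. */
    void addSplitsBack(int bucket, Collection<String> returnedSplitsInOriginalOrder) {
        LinkedList<String> queue = bucketSplits.computeIfAbsent(bucket, b -> new LinkedList<>());
        LinkedList<String> returned = new LinkedList<>(returnedSplitsInOriginalOrder);
        // addFirst reverses iteration order, so walk the returned splits from the back.
        while (!returned.isEmpty()) {
            queue.addFirst(returned.removeLast());
        }
    }
}
{code}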
[jira] [Created] (FLINK-31294) CommitterOperator forgot to close Committer when closing.
ming li created FLINK-31294:
---
Summary: CommitterOperator forgot to close Committer when closing.
Key: FLINK-31294
URL: https://issues.apache.org/jira/browse/FLINK-31294
Project: Flink
Issue Type: Bug
Components: Table Store
Reporter: ming li

{{CommitterOperator}} does not close the {{Committer}} when it closes, which may lead to resource leaks.
[jira] [Created] (FLINK-31364) [Flink] add metrics for TableStore
Ming Li created FLINK-31364:
---
Summary: [Flink] add metrics for TableStore
Key: FLINK-31364
URL: https://issues.apache.org/jira/browse/FLINK-31364
Project: Flink
Issue Type: Improvement
Components: Table Store
Reporter: Ming Li

Currently, relevant metrics are missing in {{Table Store}}, such as split consumption speed and commit statistics. We can add metrics for real-time monitoring of the {{Table Store}}.
[jira] [Created] (FLINK-31462) [Flink] Supports full calculation from the specified snapshot
Ming Li created FLINK-31462:
---
Summary: [Flink] Supports full calculation from the specified snapshot
Key: FLINK-31462
URL: https://issues.apache.org/jira/browse/FLINK-31462
Project: Flink
Issue Type: Improvement
Components: Table Store
Reporter: Ming Li

Currently, the table store provides a startup mode that consumes incrementally from a specified snapshot. We could additionally provide a startup mode that first reads the full data of a specified snapshot and then continues with incremental consumption.
[jira] [Created] (FLINK-31465) [Flink] Fix shortcode errors in docs
Ming Li created FLINK-31465:
---
Summary: [Flink] Fix shortcode errors in docs
Key: FLINK-31465
URL: https://issues.apache.org/jira/browse/FLINK-31465
Project: Flink
Issue Type: Improvement
Components: Table Store
Reporter: Ming Li

When building the docs with Hugo, I get the following error:
{code:java}
hugo v0.111.3+extended darwin/amd64 BuildDate=unknown
Error: Error building site: "/xxx/flink-table-store/docs/content/docs/how-to/writing-tables.md:303:1": failed to extract shortcode: shortcode "tabs" must be closed or self-closed{code}
[jira] [Created] (FLINK-31474) [Flink] Add failure information for out-of-order checkpoints
Ming Li created FLINK-31474:
---
Summary: [Flink] Add failure information for out-of-order checkpoints
Key: FLINK-31474
URL: https://issues.apache.org/jira/browse/FLINK-31474
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Reporter: Ming Li

At present, when a checkpoint barrier arrives out of order, only a log message is printed on the {{Task}} side, while on the {{JM}} side the checkpoint can only fail by timing out, so the real reason cannot be determined. Therefore, I think we should add failure information on the JM side for out-of-order checkpoints.
{code:java}
if (lastCheckpointId >= metadata.getCheckpointId()) {
    LOG.info(
            "Out of order checkpoint barrier (aborted previously?): {} >= {}",
            lastCheckpointId,
            metadata.getCheckpointId());
    channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
    checkAndClearAbortedStatus(metadata.getCheckpointId());
    return;
}
{code}
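A hedged sketch of the idea (the {{declineCheckpoint}} call, the exception type, and the chosen failure reason are assumptions about the surrounding API made for illustration, not a verified patch): report the dropped barrier to the JobManager instead of letting the checkpoint fail by timeout.
{code:java}
// Inside the branch above, after logging the out-of-order barrier (sketch, names assumed):
env.declineCheckpoint(
        metadata.getCheckpointId(),
        new CheckpointException(
                "Out of order checkpoint barrier: " + metadata.getCheckpointId(),
                CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
{code}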
[jira] [Created] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.
ming li created FLINK-27681:
---
Summary: Improve the availability of Flink when the RocksDB file is corrupted.
Key: FLINK-27681
URL: https://issues.apache.org/jira/browse/FLINK-27681
Project: Flink
Issue Type: Improvement
Components: Runtime / State Backends
Reporter: ming li

We have several times encountered RocksDB checksum mismatches or block verification failures when a job is restored. The usual cause is a problem on the machine running the task, which makes the files uploaded to HDFS incorrect, but by the time we noticed the problem a long time had already passed (a dozen minutes to half an hour). I'm not sure if anyone else has had a similar problem.

Since such a file is referenced by incremental checkpoints for a long time, once the maximum number of retained checkpoints is exceeded we can only keep using this file until it is no longer referenced, and when the job fails it cannot be recovered. Therefore we are considering:
1. Can RocksDB periodically check whether all files are correct and detect the problem in time?
2. Can Flink automatically roll back to a previous checkpoint when there is a problem with the checkpoint data? Even with manual intervention, one can only try to recover from an existing checkpoint or discard the entire state.
3. Can we limit the maximum number of references to a file based on the maximum number of retained checkpoints? When the number of references exceeds (maximum number of retained checkpoints - 1), the Task side would be required to upload a new file for the new reference. We are not sure whether this would guarantee that the newly uploaded file is correct.
[jira] [Created] (FLINK-28010) Use deleteRange to optimize the clear operation of RocksDBMapState.
ming li created FLINK-28010:
---
Summary: Use deleteRange to optimize the clear operation of RocksDBMapState.
Key: FLINK-28010
URL: https://issues.apache.org/jira/browse/FLINK-28010
Project: Flink
Issue Type: Improvement
Components: Runtime / State Backends
Reporter: ming li

[FLINK-21321|https://issues.apache.org/jira/browse/FLINK-21321] introduced {{deleteRange}} for fast clipping when rescaling incremental checkpoints, so could the {{clear}} method of {{RocksDBMapState}} also be replaced with {{deleteRange}}?
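A hedged sketch of the idea (a standalone helper; this is not the actual RocksDBMapState code, which builds its key prefix from key group, key, and namespace): clear all entries sharing the state's key prefix with a single {{deleteRange}} instead of iterating and deleting entries one by one.
{code:java}
import java.util.Arrays;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class DeleteRangeClearSketch {

    /** Deletes every key that starts with keyPrefix using a single range tombstone. */
    static void clearByPrefix(RocksDB db, ColumnFamilyHandle handle, byte[] keyPrefix)
            throws RocksDBException {
        // End key = the smallest key that no longer shares the prefix: increment the last
        // non-0xFF byte and truncate the rest. (An all-0xFF prefix needs special handling,
        // omitted here for brevity.)
        byte[] endKey = Arrays.copyOf(keyPrefix, keyPrefix.length);
        for (int i = endKey.length - 1; i >= 0; i--) {
            if ((endKey[i] & 0xFF) != 0xFF) {
                endKey[i]++;
                endKey = Arrays.copyOf(endKey, i + 1);
                break;
            }
        }
        db.deleteRange(handle, keyPrefix, endKey);
    }
}
{code}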
[jira] [Created] (FLINK-21990) Double check Task status when perform checkpoint.
ming li created FLINK-21990:
---
Summary: Double check Task status when perform checkpoint.
Key: FLINK-21990
URL: https://issues.apache.org/jira/browse/FLINK-21990
Project: Flink
Issue Type: Bug
Affects Versions: 1.11.0
Reporter: ming li

We need to double-check the Task status when performing a checkpoint. Otherwise, after a Task has failed, the checkpoint may still complete successfully. For example, I throw an exception at 17:10:24.069, the checkpoint acquires the lock at 17:10:24.070 and starts, and it completes at 17:10:24.373.
{code:java}
17:10:24.069 [Legacy Source Thread - Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO org.apache.flink.test.checkpointing.RegionCheckpointITCase - throw expected exception
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0,5,Flink Task Threads] took 0 ms.
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO org.apache.flink.test.checkpointing.RegionCheckpointITCase - sleep 300 ms
17:10:24.372 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0,5,Flink Task Threads] took 0 ms.
17:10:24.373 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 4a08c4a50d00dfd56f86eb6ccb83b89c (0 bytes in 1137 ms).
{code}
From the code point of view, we only check the state of the task at the beginning, and once the lock is obtained we directly start the checkpoint.
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws Exception {

    if (isRunning) {
        actionExecutor.runThrowing(
                () -> {
                    //do checkpoint
                });
        return true;
    } else {
        ...
    }
}{code}
However, while the lock is being acquired, the task state may well change. Compared with this, the Flink 1.9 code checks the task status after acquiring the lock.
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics,
        boolean advanceToEndOfTime) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    final long checkpointId = checkpointMetaData.getCheckpointId();

    synchronized (lock) {
        if (isRunning) {
            //do checkpoint
        } else {
            ...
        }
    }{code}
Therefore, I think we need to double-check the task status to avoid the situation where the task fails while the lock is being acquired but the checkpoint can still succeed.
[jira] [Created] (FLINK-24086) Do not re-register SharedStateRegistry to reduce the recovery time of the job
ming li created FLINK-24086:
---
Summary: Do not re-register SharedStateRegistry to reduce the recovery time of the job
Key: FLINK-24086
URL: https://issues.apache.org/jira/browse/FLINK-24086
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Reporter: ming li

At present, we only recover the {{CompletedCheckpointStore}} when the {{JobManager}} starts, so it seems we do not need to re-register the {{SharedStateRegistry}} when a task restarts.

The motivation is that in our production environment we restart only the failed task (discarding part of the data and state), but found that registering the {{SharedStateRegistry}} may take several seconds (thousands of tasks and dozens of TB of state). When a large number of tasks fail at the same time, this may take several minutes (number of tasks * several seconds). Therefore, if the {{SharedStateRegistry}} could be reused, the task recovery time could be reduced.
[jira] [Created] (FLINK-24815) Reduce the cpu cost of calculating stateSize during state allocation
ming li created FLINK-24815:
---
Summary: Reduce the cpu cost of calculating stateSize during state allocation
Key: FLINK-24815
URL: https://issues.apache.org/jira/browse/FLINK-24815
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Reporter: ming li

When a task fails over, we reassign the state for each subtask and create a new {{OperatorSubtaskState}} object, and the {{stateSize}} field of the {{OperatorSubtaskState}} is recalculated. With incremental {{Checkpoint}}, computing this field requires traversing all shared states and accumulating their sizes. For a job with a parallelism of 2000 and 100 shared state handles per subtask, this means 2000 * 100 = 200,000 traversal steps, and the CPU of the JM scheduling thread becomes saturated. I think we could provide a constructor of {{OperatorSubtaskState}} that takes the {{stateSize}}, or compute the {{stateSize}} lazily.
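A hedged, simplified sketch of the lazy variant (a standalone class with the handles reduced to plain sizes; not the actual OperatorSubtaskState): the traversal cost is paid only when the size is actually requested, not on every reassignment during failover.
{code:java}
import java.util.List;

public class LazyStateSizeSketch {

    private final List<Long> sharedStateHandleSizes; // stand-in for the shared state handles
    private long cachedStateSize = -1;

    LazyStateSizeSketch(List<Long> sharedStateHandleSizes) {
        this.sharedStateHandleSizes = sharedStateHandleSizes;
    }

    long getStateSize() {
        if (cachedStateSize < 0) {
            // Only traverse the handles when the size is first needed, then cache the result.
            cachedStateSize = sharedStateHandleSizes.stream().mapToLong(Long::longValue).sum();
        }
        return cachedStateSize;
    }
}
{code}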
[jira] [Created] (FLINK-28390) Allows RocksDB to configure FIFO Compaction to reduce CPU overhead.
ming li created FLINK-28390:
---
Summary: Allows RocksDB to configure FIFO Compaction to reduce CPU overhead.
Key: FLINK-28390
URL: https://issues.apache.org/jira/browse/FLINK-28390
Project: Flink
Issue Type: Improvement
Components: Runtime / State Backends
Reporter: ming li

We know that the FIFO compaction strategy silently drops the oldest data and may therefore lose data from the application's point of view. But in some scenarios, FIFO compaction can be a very effective way to reduce CPU usage.

Flink TaskManagers are usually fairly small processes, e.g. 4 CPUs and 16 GB of memory. When the state is small, the CPU overhead of RocksDB is not high, but as the state grows, RocksDB may be compacting almost constantly, which consumes a large amount of CPU and affects the actual computation.

We usually configure a TTL for the state, so when using FIFO compaction we can set its retention slightly longer than the TTL, and the behaviour observed by the upper layer stays the same as before. Although FIFO compaction may bring space amplification, disk is cheaper than CPU, so the overall cost goes down.
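For reference, a hedged sketch of how FIFO compaction could be opted into via a custom {{RocksDBOptionsFactory}} (the size cap is an example value to tune per job; the TTL-aligned retention discussed above would come on top of this and is not shown):
{code:java}
import java.util.Collection;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.DBOptions;

public class FifoCompactionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        CompactionOptionsFIFO fifo =
                new CompactionOptionsFIFO()
                        .setMaxTableFilesSize(8L * 1024 * 1024 * 1024) // example cap, tune per job
                        .setAllowCompaction(false); // pure FIFO: only drop the oldest files
        handlesToClose.add(fifo);
        return currentOptions
                .setCompactionStyle(CompactionStyle.FIFO)
                .setCompactionOptionsFIFO(fifo);
    }
}
{code}
Such a factory could then be registered through the {{state.backend.rocksdb.options-factory}} option.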
[jira] [Created] (FLINK-18451) Flink HA on yarn may appear TaskManager double running when HA is restored
ming li created FLINK-18451:
---
Summary: Flink HA on yarn may appear TaskManager double running when HA is restored
Key: FLINK-18451
URL: https://issues.apache.org/jira/browse/FLINK-18451
Project: Flink
Issue Type: Bug
Components: Deployment / YARN
Affects Versions: 1.9.0
Reporter: ming li

We found that when the NodeManager is lost, a new JobManager is started by YARN's ResourceManager and registers itself as the leader in ZooKeeper. The existing TaskManagers discover the new JobManager through ZooKeeper and close the connection to the old JobManager, at which point all tasks on those TaskManagers fail. The new JobManager then performs job recovery directly and restores from the latest checkpoint.

However, if during recovery a TaskManager has connection problems with ZooKeeper, it does not register with the new JobManager in time. Until the following timeouts fire:
1. the ZooKeeper connection timeout,
2. the heartbeat timeout with the JobManager/ResourceManager,
its tasks keep running (assuming the tasks can run independently inside the TaskManager). If HA recovery is fast enough, some tasks will therefore be running twice at the same time.

Do we need to persist a record of the cluster resources allocated at runtime and use it to verify that all old tasks have stopped when HA is restored?
[jira] [Created] (FLINK-18808) Task-level numRecordsOut metric may be underestimated
ming li created FLINK-18808:
---
Summary: Task-level numRecordsOut metric may be underestimated
Key: FLINK-18808
URL: https://issues.apache.org/jira/browse/FLINK-18808
Project: Flink
Issue Type: Improvement
Components: Runtime / Metrics
Affects Versions: 1.11.1
Environment: !image-2020-08-04-11-26-21-490.png!
Reporter: ming li
Attachments: image-2020-08-04-11-28-13-800.png, image-2020-08-04-11-32-20-678.png

At present, we only register the task-level numRecordsOut metric by reusing the output record counter of the operator at the end of the OperatorChain.
{code:java}
if (config.isChainEnd()) {
    operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
}
{code}
If data is only sent out through the last operator of the OperatorChain, this statistic is correct. But consider the following scenario:
!image-2020-08-04-11-28-13-800.png|width=507,height=174!
In this JobGraph, data is sent out not only from the last operator but also from an intermediate operator of the OperatorChain (the map operator just returns the original value directly). Below is one of our test topologies; the reported value is only half of the total number of records received downstream.
!image-2020-08-04-11-32-20-678.png|width=648,height=251!
I think the data sent out by intermediate operators should also be counted in the Task's numRecordsOut. But currently we do not reuse the output record counters of intermediate operators, so the task-level numRecordsOut metric is underestimated (this has no effect on the actual execution of the job, but it may affect our monitoring).

A simple idea is to change the condition for reusing the operator output record counter:
{code:java}
if (!config.getNonChainedOutputs(getUserCodeClassloader()).isEmpty()) {
    operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
}{code}
In addition, I have another question: if a record is broadcast to all downstream tasks, should the numRecordsOut counter increase by one or by the number of downstream tasks? It seems that currently we add one when calculating the numRecordsOut metric.
[jira] [Created] (FLINK-19006) project transformation does not support conversion to Tuple25 type
ming li created FLINK-19006:
---
Summary: project transformation does not support conversion to Tuple25 type
Key: FLINK-19006
URL: https://issues.apache.org/jira/browse/FLINK-19006
Project: Flink
Issue Type: Improvement
Components: API / DataStream
Affects Versions: 1.11.1
Reporter: ming li

The {{DataStream#project}} path checks whether the length of {{fieldIndexes}} is between 1 and {{Tuple.MAX_ARITY - 1}} and then calls the matching {{projectTupleXX}} method based on that length. This limits the maximum arity of the projected {{Tuple}} to 24.
{code:java}
protected StreamProjection(DataStream dataStream, int[] fieldIndexes) {
    if (!dataStream.getType().isTupleType()) {
        throw new RuntimeException("Only Tuple DataStreams can be projected");
    }
    if (fieldIndexes.length == 0) {
        throw new IllegalArgumentException("project() needs to select at least one (1) field.");
    } else if (fieldIndexes.length > Tuple.MAX_ARITY - 1) {
        throw new IllegalArgumentException(
                "project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
    }

    int maxFieldIndex = (dataStream.getType()).getArity();
    for (int i = 0; i < fieldIndexes.length; i++) {
        Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex);
    }

    this.dataStream = dataStream;
    this.fieldIndexes = fieldIndexes;
}{code}
The same problem appears in {{ProjectOperator}}.
{code:java}
public Projection(DataSet ds, int[] fieldIndexes) {
    if (!(ds.getType() instanceof TupleTypeInfo)) {
        throw new UnsupportedOperationException("project() can only be applied to DataSets of Tuples.");
    }

    if (fieldIndexes.length == 0) {
        throw new IllegalArgumentException("project() needs to select at least one (1) field.");
    } else if (fieldIndexes.length > Tuple.MAX_ARITY - 1) {
        throw new IllegalArgumentException(
                "project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
    }

    int maxFieldIndex = ds.getType().getArity();
    for (int fieldIndexe : fieldIndexes) {
        Preconditions.checkElementIndex(fieldIndexe, maxFieldIndex);
    }

    this.ds = ds;
    this.fieldIndexes = fieldIndexes;
}{code}
I think the allowed length should be 1 to {{Tuple.MAX_ARITY}} instead of 1 to {{Tuple.MAX_ARITY - 1}}: selecting N fields produces a TupleN, and Tuple25 (arity equal to {{Tuple.MAX_ARITY}}) exists, so selecting 25 fields should be supported.