[jira] [Created] (FLINK-32001) SupportsRowLevelUpdate does not support returning only a part of the columns.

2023-05-04 Thread Ming Li (Jira)
Ming Li created FLINK-32001:
---

 Summary: SupportsRowLevelUpdate does not support returning only a 
part of the columns.
 Key: FLINK-32001
 URL: https://issues.apache.org/jira/browse/FLINK-32001
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.17.0
Reporter: Ming Li


[FLIP-282|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235838061] introduces the new Delete and Update API in Flink SQL. The documentation states that in the {{partial-update}} case only the primary key columns and the updated columns need to be returned.

In fact, however, the topology of the job is {{source -> cal -> constraintEnforcer -> sink}}, and the constraint check performed in the {{constraintEnforcer}} operator is done by field index, not by column name. If only some of the columns are returned, the constraint check is applied to the wrong fields and can easily throw an {{ArrayIndexOutOfBoundsException}}.
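
A minimal illustration of the failure mode (hypothetical values, not the actual ConstraintEnforcer code): an index-based check against a position in the full schema either hits the wrong field of a partial row or runs past its end.
{code:java}
// Hypothetical illustration, not Flink code.
public class PartialUpdateIndexCheck {
    public static void main(String[] args) {
        // Full table schema: 5 columns, column index 4 declared NOT NULL.
        int notNullFieldIndex = 4;

        // A partial-update row carrying only the primary key and one updated column.
        Object[] partialRow = new Object[] {1L, "new-value"};

        // Checking by position in the full schema runs past the end of the partial row.
        Object value = partialRow[notNullFieldIndex]; // throws ArrayIndexOutOfBoundsException
        System.out.println(value);
    }
}
{code}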



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32655) RecreateOnResetOperatorCoordinator did not forward notifyCheckpointAborted to the real OperatorCoordinator

2023-07-24 Thread Ming Li (Jira)
Ming Li created FLINK-32655:
---

 Summary: RecreateOnResetOperatorCoordinator did not forward 
notifyCheckpointAborted to the real OperatorCoordinator
 Key: FLINK-32655
 URL: https://issues.apache.org/jira/browse/FLINK-32655
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.17.1
Reporter: Ming Li


{{[RecreateOnResetOperatorCoordinator|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java#L115]}} does not override {{notifyCheckpointAborted}}, so the {{SplitEnumerator}} inside the {{SourceCoordinator}} never receives the checkpoint-abort notification.
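
A minimal sketch of the forwarding that appears to be missing (illustrative names only, not the real RecreateOnResetOperatorCoordinator internals):
{code:java}
// Sketch only: the wrapper has to pass the abort notification on to the wrapped coordinator.
interface CheckpointAwareCoordinator {
    void notifyCheckpointAborted(long checkpointId);
}

class ForwardingCoordinator implements CheckpointAwareCoordinator {
    private final CheckpointAwareCoordinator internalCoordinator;

    ForwardingCoordinator(CheckpointAwareCoordinator internalCoordinator) {
        this.internalCoordinator = internalCoordinator;
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        // Without this forwarding, the wrapped coordinator (and the SplitEnumerator behind
        // a SourceCoordinator) never learns that the checkpoint was aborted.
        internalCoordinator.notifyCheckpointAborted(checkpointId);
    }
}
{code}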

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32698) Add getCheckpointOptions interface in ManagedSnapshotContext

2023-07-26 Thread Ming Li (Jira)
Ming Li created FLINK-32698:
---

 Summary: Add getCheckpointOptions interface in 
ManagedSnapshotContext
 Key: FLINK-32698
 URL: https://issues.apache.org/jira/browse/FLINK-32698
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Ming Li


Currently, {{ManagedSnapshotContext}} only exposes the {{checkpointId}} and {{checkpointTimestamp}}. We would like it to also expose the {{CheckpointOptions}}, so that operators can apply different logic in {{snapshotState}}.

One example is adapting the behavior to the type of checkpoint: in {{Paimon}}, for instance, we want data written during a regular {{checkpoint}} to expire automatically, while data written during a {{savepoint}} should be persisted.
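
One possible shape of the proposed addition (a sketch of the suggestion only, not an agreed API; {{CheckpointOptions}} refers to the existing runtime class):
{code:java}
// Sketch of the proposed extension; the new method name and return type are suggestions.
public interface ManagedSnapshotContext {
    long getCheckpointId();

    long getCheckpointTimestamp();

    // Proposed: expose the options (e.g. checkpoint vs. savepoint) of the ongoing snapshot.
    CheckpointOptions getCheckpointOptions();
}
{code}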



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33798) Automatically clean up rocksdb logs when the task failover.

2023-12-11 Thread Ming Li (Jira)
Ming Li created FLINK-33798:
---

 Summary: Automatically clean up rocksdb logs when the task 
failover.
 Key: FLINK-33798
 URL: https://issues.apache.org/jira/browse/FLINK-33798
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Ming Li


Since FLINK-24785 relocated the RocksDB log, multiple RocksDB log files are created under the Flink log directory. They are not cleaned up on task failover, which leaves a large number of stale RocksDB log files in the Flink log directory.
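
A minimal sketch of the kind of cleanup that could run when a state backend instance is disposed, assuming the relocated log files can be identified by a name pattern in the Flink log directory (the directory argument and the "*_LOG*" glob below are assumptions for illustration):
{code:java}
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class RocksDbLogCleanup {
    // Deletes leftover RocksDB log files belonging to a disposed state backend instance.
    public static void cleanUp(String flinkLogDir, String instancePrefix) throws IOException {
        Path dir = Paths.get(flinkLogDir);
        try (DirectoryStream<Path> stream =
                Files.newDirectoryStream(dir, instancePrefix + "*_LOG*")) {
            for (Path logFile : stream) {
                Files.deleteIfExists(logFile);
            }
        }
    }
}
{code}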



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-14481) Modify the Flink valid socket port check to 0 to 65535.

2019-10-21 Thread ming li (Jira)
ming li created FLINK-14481:
---

 Summary: Modify the Flink valid socket port check to 0 to 65535.
 Key: FLINK-14481
 URL: https://issues.apache.org/jira/browse/FLINK-14481
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: ming li


In Flink, I found that the socket port check is {{port >= 0 && port <= 65536}}:
{code:java}
checkArgument(serverPort >= 0 && serverPort <= 65536, "Invalid port number.");{code}
But when actually binding a port, the valid range is 0 to 65535 (a port number of zero lets the system pick an ephemeral port during the bind operation). Although binding port 65536 will fail anyway because it is out of range, Flink has already accepted it in its own validity check, which is confusing. Should we change Flink's port check to 0 to 65535?
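
The proposed change would simply tighten the upper bound (sketch):
{code:java}
// Sketch of the proposed check: 0 is allowed (ephemeral port), 65535 is the maximum.
checkArgument(serverPort >= 0 && serverPort <= 65535, "Invalid port number.");
{code}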



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15467) Should wait for the end of the source thread during the Task cancellation

2020-01-03 Thread ming li (Jira)
ming li created FLINK-15467:
---

 Summary: Should wait for the end of the source thread during the 
Task cancellation
 Key: FLINK-15467
 URL: https://issues.apache.org/jira/browse/FLINK-15467
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.9.1, 1.9.0
Reporter: ming li


In the new mailbox model, {{SourceStreamTask}} starts a separate source thread to run the user code, while the task's execution thread blocks on {{mailbox.takeMail()}}. When the task is canceled, the {{TaskCanceler}} thread cancels the task and interrupts the execution thread. The execution thread of {{SourceStreamTask}} therefore throws an {{InterruptedException}}, cancels the task again, and rethrows the exception.
{code:java}
@Override
protected void performDefaultAction(ActionContext context) throws Exception {
    // Against the usual contract of this method, this implementation is not step-wise but
    // blocking instead for compatibility reasons with the current source interface
    // (source functions run as a loop, not in steps).
    sourceThread.start();

    // We run an alternative mailbox loop that does not involve default actions
    // and synchronizes around actions.
    try {
        runAlternativeMailboxLoop();
    } catch (Exception mailboxEx) {
        // We cancel the source function if some runtime exception escaped the mailbox.
        if (!isCanceled()) {
            cancelTask();
        }
        throw mailboxEx;
    }

    sourceThread.join();
    if (!isFinished) {
        sourceThread.checkThrowSourceExecutionException();
    }

    context.allActionsCompleted();
}
{code}
When all tasks of the TaskExecutor are canceled, the blob files are cleaned up. But the actual source thread has not finished at that point, which causes a ClassNotFoundException when it tries to load a new class. In this case, the source thread may not be able to clean up and release its resources properly (closing child threads, cleaning up local files, etc.). Therefore, I think we should mark the task as canceled or finished only after the source thread has completed.
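
A minimal sketch of the idea, with a hypothetical cancelTask() shape (not the actual SourceStreamTask code; cancelSourceFunction() is a made-up helper): interrupt the legacy source thread but wait for it before the task is reported as canceled, so its cleanup runs while the user classloader and blob files are still available.
{code:java}
// Hypothetical sketch: wait for the source thread before declaring the task canceled.
protected void cancelTask() throws Exception {
    try {
        cancelSourceFunction();          // ask the user source function to stop
    } finally {
        sourceThread.interrupt();        // unblock it if it is stuck
        sourceThread.join();             // wait until user code has released its resources
    }
    // only now should the task be reported as CANCELED / FINISHED
}
{code}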



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20554) The Checkpointed Data Size of the Latest Completed Checkpoint is incorrectly displayed on the Overview page of the UI

2020-12-09 Thread ming li (Jira)
ming li created FLINK-20554:
---

 Summary: The Checkpointed Data Size of the Latest Completed 
Checkpoint is incorrectly displayed on the Overview page of the UI
 Key: FLINK-20554
 URL: https://issues.apache.org/jira/browse/FLINK-20554
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.11.0
Reporter: ming li
 Attachments: image-2020-12-10-11-57-56-888.png

The {{Checkpointed Data Size}} of the {{Latest Completed Checkpoint}} always 
shows '-' in the {{Overview}} of the UI.

!image-2020-12-10-11-57-56-888.png|width=862,height=104!

I think the code (https://github.com/apache/flink/blob/master/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/job-checkpoints.component.html#L52) should use ??state_size?? instead of ??checkpointed_data_size??, which should fix the problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-30251) Move the IO with DFS during abort checkpoint to an asynchronous thread.

2022-11-30 Thread ming li (Jira)
ming li created FLINK-30251:
---

 Summary: Move the IO with DFS during abort checkpoint to an 
asynchronous thread.
 Key: FLINK-30251
 URL: https://issues.apache.org/jira/browse/FLINK-30251
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.15.2, 1.16.0
Reporter: ming li
 Attachments: image-2022-11-30-19-10-51-226.png

Currently, when a {{checkpoint}} fails, we process the abort message in the Task's {{mailbox}}: we close the output stream and delete the file on DFS.

 

However, when the {{checkpoint}} failure is caused by a DFS system failure (for example, a NameNode failure in HDFS), this operation may take a long time or even hang, leaving the Task unable to continue processing data.

 

So I think we can move the file deletion into an asynchronous thread, just as checkpoint data is already uploaded asynchronously.
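
A minimal sketch of the idea, assuming a dedicated executor is available (class and method names are illustrative, not actual Flink classes):
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncAbortCleanup {
    // A small pool dedicated to checkpoint-abort IO so the mailbox thread is never blocked.
    private final ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();

    // Called from the mailbox on checkpoint abort: hand the DFS IO to the async thread.
    public void abortCheckpointAsync(AutoCloseable outputStream, Runnable deleteFileOnDfs) {
        cleanupExecutor.execute(
                () -> {
                    try {
                        outputStream.close();   // may block on a broken DFS
                    } catch (Exception ignored) {
                        // best effort: leftover files can be cleaned up later
                    }
                    deleteFileOnDfs.run();      // may also block, but off the mailbox thread
                });
    }
}
{code}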

 

!image-2022-11-30-19-10-51-226.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30985) [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.

2023-02-09 Thread ming li (Jira)
ming li created FLINK-30985:
---

 Summary: [Flink][table-store] Change the Splits allocation 
algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
 Key: FLINK-30985
 URL: https://issues.apache.org/jira/browse/FLINK-30985
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: ming li


Currently, {{assignSplits}} in the {{TableStore}} {{ContinuousFileSplitEnumerator}} works by traversing a {{HashMap}}, and since the set of buckets is fixed, the traversal order is also fixed.
{code:java}
private void assignSplits() {
    bucketSplits.forEach(
            (bucket, splits) -> {
                if (splits.size() > 0) {
                    // To ensure the order of consumption, the data of the same bucket is given
                    // to a task to be consumed.
                    int task = bucket % context.currentParallelism();
                    if (readersAwaitingSplit.remove(task)) {
                        // if the reader that requested another split has failed in the
                        // meantime, remove it from the list of waiting readers
                        if (!context.registeredReaders().containsKey(task)) {
                            return;
                        }
                        context.assignSplit(splits.poll(), task);
                    }
                }
            });
}{code}
Assume that a {{task}} consumes multiple {{buckets}} and each {{bucket}} has enough splits: the first {{bucket}} will always be assigned to the task, while the other buckets may not be consumed for a long time. This leads to uneven consumption and makes it hard to advance the {{watermark}}. So I think we should change the split allocation to a fair algorithm.
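
A minimal sketch of one fair alternative (illustrative only, not Table Store code): iterate the buckets in round-robin order starting after the bucket served last, so no single bucket can starve the others.
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class FairBucketAssigner {
    private final Map<Integer, Queue<String>> bucketSplits = new HashMap<>();
    private int nextBucketOffset = 0; // remembers where the previous round stopped

    // Picks the next non-empty bucket in round-robin order instead of always
    // starting from the first bucket of the map.
    public String pollNextSplit() {
        List<Integer> buckets = new ArrayList<>(bucketSplits.keySet());
        if (buckets.isEmpty()) {
            return null;
        }
        for (int i = 0; i < buckets.size(); i++) {
            int bucket = buckets.get((nextBucketOffset + i) % buckets.size());
            Queue<String> splits = bucketSplits.get(bucket);
            if (!splits.isEmpty()) {
                nextBucketOffset = (nextBucketOffset + i + 1) % buckets.size();
                return splits.poll();
            }
        }
        return null; // no splits pending
    }
}
{code}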



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31008) [Flink][Table Store] The Split allocation of the same bucket in ContinuousFileSplitEnumerator may be out of order

2023-02-09 Thread ming li (Jira)
ming li created FLINK-31008:
---

 Summary: [Flink][Table Store] The Split allocation of the same 
bucket in ContinuousFileSplitEnumerator may be out of order
 Key: FLINK-31008
 URL: https://issues.apache.org/jira/browse/FLINK-31008
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Reporter: ming li


There are two places in {{ContinuousFileSplitEnumerator}} that add {{FileStoreSourceSplit}} to {{bucketSplits}}: {{addSplitsBack}} and {{processDiscoveredSplits}}. {{processDiscoveredSplits}} continuously checks for new splits and appends them to the queue, so at that point the splits are in order.
{code:java}
private void addSplits(Collection<FileStoreSourceSplit> splits) {
    splits.forEach(this::addSplit);
}

private void addSplit(FileStoreSourceSplit split) {
    bucketSplits
            .computeIfAbsent(((DataSplit) split.split()).bucket(), i -> new LinkedList<>())
            .add(split);
}{code}
However, on task failover, the splits that were previously assigned are added back. These returned splits are also appended to the end of the queue, which breaks the allocation order of the splits.

 

I think these returned splits should be added to the head of the queue to 
ensure the order of allocation.
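
A minimal sketch of the proposed handling (illustrative, using a Deque; the actual queue type in the enumerator may differ): newly discovered splits go to the tail, splits returned after a failover go back to the head.
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderedSplitQueue<S> {
    private final Map<Integer, Deque<S>> bucketSplits = new HashMap<>();

    // Newly discovered splits keep their discovery order at the tail of the queue.
    public void addDiscoveredSplit(int bucket, S split) {
        bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>()).addLast(split);
    }

    // Splits handed back after a failover are re-queued at the head so they are
    // re-assigned before any later split of the same bucket.
    public void addSplitsBack(int bucket, List<S> returnedSplits) {
        Deque<S> queue = bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>());
        // push in reverse so the earliest returned split ends up at the very front
        for (int i = returnedSplits.size() - 1; i >= 0; i--) {
            queue.addFirst(returnedSplits.get(i));
        }
    }
}
{code}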



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31294) CommitterOperator forgot to close Committer when closing.

2023-03-01 Thread ming li (Jira)
ming li created FLINK-31294:
---

 Summary: CommitterOperator forgot to close Committer when closing.
 Key: FLINK-31294
 URL: https://issues.apache.org/jira/browse/FLINK-31294
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Reporter: ming li


{{CommitterOperator}} does not close the {{Committer}} when it closes, which 
may lead to resource leaks.
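
A minimal sketch of the fix (illustrative only; the real operator may have additional resources to release in close()):
{code:java}
// Sketch only: close the wrapped Committer together with the operator.
@Override
public void close() throws Exception {
    try {
        if (committer != null) {
            committer.close();
        }
    } finally {
        super.close();
    }
}
{code}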



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31364) [Flink] add metrics for TableStore

2023-03-07 Thread Ming Li (Jira)
Ming Li created FLINK-31364:
---

 Summary: [Flink] add metrics for TableStore
 Key: FLINK-31364
 URL: https://issues.apache.org/jira/browse/FLINK-31364
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: Ming Li


Currently, relevant metrics are missing in {{{}Table Store{}}}, such as split 
consumption speed, commit information statistics, etc. We can add metrics for 
real-time monitoring of the {{{}Table Store{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31462) [Flink] Supports full calculation from the specified snapshot

2023-03-15 Thread Ming Li (Jira)
Ming Li created FLINK-31462:
---

 Summary: [Flink] Supports full calculation from the specified 
snapshot
 Key: FLINK-31462
 URL: https://issues.apache.org/jira/browse/FLINK-31462
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: Ming Li


Currently, the table store provides a startup mode that consumes incrementally from a specified snapshot. We could additionally provide a startup mode that first reads the specified snapshot in full and then continues with incremental consumption.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31465) [Flink] Fix shortcode errors in docs

2023-03-15 Thread Ming Li (Jira)
Ming Li created FLINK-31465:
---

 Summary: [Flink] Fix shortcode errors in docs
 Key: FLINK-31465
 URL: https://issues.apache.org/jira/browse/FLINK-31465
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: Ming Li


When running docs with hugo, I get the following exception:
{code:java}
hugo v0.111.3+extended darwin/amd64 BuildDate=unknown
Error: Error building site: "/xxx/flink-table-store/docs/content/docs/how-to/writing-tables.md:303:1": failed to extract shortcode: shortcode "tabs" must be closed or self-closed{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31474) [Flink] Add failure information for out-of-order checkpoints

2023-03-15 Thread Ming Li (Jira)
Ming Li created FLINK-31474:
---

 Summary: [Flink] Add failure information for out-of-order 
checkpoints
 Key: FLINK-31474
 URL: https://issues.apache.org/jira/browse/FLINK-31474
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Ming Li


At present, when a checkpoint barrier arrives out of order, only a log message is printed on the {{Task}} side; on the {{JM}} side the checkpoint can only fail by timing out, so the real cause cannot be determined.

Therefore, I think we should add failure information on the JM side for out-of-order checkpoints.
{code:java}
if (lastCheckpointId >= metadata.getCheckpointId()) {
    LOG.info(
            "Out of order checkpoint barrier (aborted previously?): {} >= {}",
            lastCheckpointId,
            metadata.getCheckpointId());
    channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
    checkAndClearAbortedStatus(metadata.getCheckpointId());
    return;
} {code}
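
A hypothetical sketch of what the Task side could do in addition to logging: report the abort with an explicit cause so the JM can fail the pending checkpoint with a real reason instead of a timeout. declineCheckpointToCoordinator(...) is a made-up helper standing in for whatever reporting path is chosen.
{code:java}
// Hypothetical sketch, not a concrete API proposal.
if (lastCheckpointId >= metadata.getCheckpointId()) {
    CancellationException cause =
            new CancellationException(
                    "Out of order checkpoint barrier: " + metadata.getCheckpointId());
    channelStateWriter.abort(metadata.getCheckpointId(), cause, true);
    checkAndClearAbortedStatus(metadata.getCheckpointId());
    // report the cause back to the checkpoint coordinator (hypothetical helper)
    declineCheckpointToCoordinator(metadata.getCheckpointId(), cause);
    return;
}
{code}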



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2022-05-18 Thread ming li (Jira)
ming li created FLINK-27681:
---

 Summary: Improve the availability of Flink when the RocksDB file 
is corrupted.
 Key: FLINK-27681
 URL: https://issues.apache.org/jira/browse/FLINK-27681
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: ming li


We have encountered several cases where the RocksDB checksum does not match or block verification fails when a job is restored. The usual cause is a problem on the machine running the task, which leads to corrupted files being uploaded to HDFS; by the time we notice the problem, a long time has already passed (a dozen minutes to half an hour). I'm not sure whether anyone else has hit a similar problem.

Since such a file stays referenced by incremental checkpoints for a long time, once the maximum number of retained checkpoints is exceeded we have no choice but to keep using it until it is no longer referenced. When the job then fails, it cannot be recovered.

Therefore we are considering:
1. Could RocksDB periodically verify that all files are correct, so the problem is detected in time?
2. Could Flink automatically roll back to an earlier checkpoint when the checkpoint data is corrupted? Even with manual intervention, all one can do is try to recover from an existing checkpoint or discard the entire state.
3. Could we cap the number of checkpoints that reference a single file, based on the maximum number of retained checkpoints? Once the reference count exceeds (max retained checkpoints - 1), the Task side would be required to upload a new copy of the file for further references. I'm not sure whether this guarantees that the newly uploaded file is correct.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28010) Use deleteRange to optimize the clear operation of RocksDBMapState.

2022-06-11 Thread ming li (Jira)
ming li created FLINK-28010:
---

 Summary: Use deleteRange to optimize the clear operation of 
RocksDBMapState.
 Key: FLINK-28010
 URL: https://issues.apache.org/jira/browse/FLINK-28010
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: ming li


[FLINK-21321|https://issues.apache.org/jira/browse/FLINK-21321] introduced {{deleteRange}} for fast clipping of incremental checkpoints, so could the {{clear}} method in {{RocksDBMapState}} also be implemented with {{deleteRange}}?
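
A minimal sketch of the idea with the plain RocksDB Java API (not the actual RocksDBMapState code, which additionally has to build the key-group and key/namespace prefix bounds): delete all entries of one map state with a single range tombstone instead of iterating and deleting key by key.
{code:java}
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class MapStateClearSketch {
    // Deletes [prefixStart, prefixEnd) in one call; prefixStart/prefixEnd would be the
    // serialized prefix of the current key + namespace and its exclusive upper bound.
    public static void clearByRange(
            RocksDB db, ColumnFamilyHandle handle, byte[] prefixStart, byte[] prefixEnd)
            throws RocksDBException {
        db.deleteRange(handle, prefixStart, prefixEnd);
    }
}
{code}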



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-21990) Double check Task status when perform checkpoint.

2021-03-26 Thread ming li (Jira)
ming li created FLINK-21990:
---

 Summary: Double check Task status when perform checkpoint.
 Key: FLINK-21990
 URL: https://issues.apache.org/jira/browse/FLINK-21990
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.11.0
Reporter: ming li


We need to double-check the Task status when performing a checkpoint. Otherwise, after a Task has failed, the checkpoint may still complete successfully.



For example, in the log below an exception is thrown at 17:10:24.069, the lock is acquired and the checkpoint starts at 17:10:24.070, and the checkpoint completes at 17:10:24.373.
{code:java}
17:10:24.069 [Legacy Source Thread - Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  org.apache.flink.test.checkpointing.RegionCheckpointITCase  - throw expected exception
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  org.apache.flink.runtime.state.AbstractSnapshotStrategy  - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0,5,Flink Task Threads] took 0 ms.
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  org.apache.flink.test.checkpointing.RegionCheckpointITCase  - sleep 300 ms
17:10:24.372 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO  org.apache.flink.runtime.state.AbstractSnapshotStrategy  - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0,5,Flink Task Threads] took 0 ms.
17:10:24.373 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 2 for job 4a08c4a50d00dfd56f86eb6ccb83b89c (0 bytes in 1137 ms).
{code}
 

 

From the code point of view, we only check the state of the task at the beginning; once the lock is obtained, we directly start the checkpoint.
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws Exception {

    if (isRunning) {
        actionExecutor.runThrowing(
                () -> { // do checkpoint
                });
        return true;
    } else {
        ...
    }
}{code}
However, while the lock is being acquired, the task state may well change. By comparison, the Flink 1.9 code checks the task status after acquiring the lock.

 
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics,
        boolean advanceToEndOfTime) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(),
            checkpointOptions.getCheckpointType(),
            getName());

    final long checkpointId = checkpointMetaData.getCheckpointId();

    synchronized (lock) {
        if (isRunning) {
            // do checkpoint
        } else {
            ...
        }
    }
}{code}
 

Therefore, I think we need to double-check the task status after acquiring the lock, to avoid the situation where the task has already failed but the checkpoint can still succeed.
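
A minimal sketch of the proposed double-check (illustrative; the actual fix would live inside the current StreamTask checkpoint path):
{code:java}
// Sketch only: re-check the running flag inside the serialized action, not just before it.
if (isRunning) {
    actionExecutor.runThrowing(
            () -> {
                if (!isRunning) {
                    // the task failed while we were waiting for the lock / executor
                    return;
                }
                // do checkpoint
            });
    return true;
}
{code}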

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24086) Do not re-register SharedStateRegistry to reduce the recovery time of the job

2021-08-31 Thread ming li (Jira)
ming li created FLINK-24086:
---

 Summary: Do not re-register SharedStateRegistry to reduce the 
recovery time of the job
 Key: FLINK-24086
 URL: https://issues.apache.org/jira/browse/FLINK-24086
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: ming li


At present, we only recover the {{CompletedCheckpointStore}} when the 
{{JobManager}} starts, so it seems that we do not need to re-register the 
{{SharedStateRegistry}} when the task restarts.


The background of this issue is that in our production environment we discard part of the data and state so that only the failed task is restarted, but we found that registering the {{SharedStateRegistry}} can take several seconds (thousands of tasks and dozens of TB of state). When a large number of tasks fail at the same time, this may add up to several minutes (number of tasks * several seconds).

 

Therefore, if the {{SharedStateRegistry}} can be reused, the time for task 
recovery can be reduced.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24815) Reduce the cpu cost of calculating stateSize during state allocation

2021-11-07 Thread ming li (Jira)
ming li created FLINK-24815:
---

 Summary: Reduce the cpu cost of calculating stateSize during state 
allocation
 Key: FLINK-24815
 URL: https://issues.apache.org/jira/browse/FLINK-24815
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: ming li


On task failover, we reassign the state to each subtask and create a new {{OperatorSubtaskState}} object. At that point the {{stateSize}} field of the {{OperatorSubtaskState}} is recalculated; with incremental {{Checkpoint}}s this requires traversing all shared state handles and summing up their sizes.

Taking a job with a parallelism of 2000 and 100 shared state handles per task as an example, this means 2000 * 100 = 200,000 traversals, which can saturate the CPU of the JM scheduling thread.

I think we could either provide an {{OperatorSubtaskState}} constructor that accepts a precomputed {{stateSize}}, or compute {{stateSize}} lazily.
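
A minimal sketch of the lazy-computation variant (illustrative, not the real OperatorSubtaskState): compute the size once on first access instead of in the constructor that runs on the JM scheduling thread for every subtask.
{code:java}
import java.util.List;

public class LazySizedSubtaskState {
    private final List<Long> sharedStateSizes; // sizes of the referenced shared state handles
    private volatile long stateSize = -1;      // -1 means "not computed yet"

    public LazySizedSubtaskState(List<Long> sharedStateSizes) {
        this.sharedStateSizes = sharedStateSizes;
    }

    // The expensive traversal runs only when someone actually asks for the size.
    public long getStateSize() {
        long size = stateSize;
        if (size < 0) {
            size = sharedStateSizes.stream().mapToLong(Long::longValue).sum();
            stateSize = size;
        }
        return size;
    }
}
{code}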



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-28390) Allows RocksDB to configure FIFO Compaction to reduce CPU overhead.

2022-07-05 Thread ming li (Jira)
ming li created FLINK-28390:
---

 Summary: Allows RocksDB to configure FIFO Compaction to reduce CPU 
overhead.
 Key: FLINK-28390
 URL: https://issues.apache.org/jira/browse/FLINK-28390
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: ming li


We know that the FIFO compaction strategy may silently drop data and can therefore lose data from the business point of view. But in some scenarios, FIFO compaction is a very effective way to reduce CPU usage.

 

Flink TaskManagers are usually fairly small processes, for example with 4 CPUs and 16 GB of memory. When the state is small, the CPU overhead of RocksDB is low, but as the state grows, RocksDB may spend much of its time compacting, which consumes a large amount of CPU and slows down the actual computation.

 

We usually configure a TTL for the state, so when using FIFO compaction we can set its retention slightly longer than the TTL, and the behavior observed by the upper layer stays the same as before.
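
As a rough illustration, FIFO compaction could be enabled through the column-family options (a sketch against the RocksJava API from memory; the 10 GB cap is an arbitrary example and should be sized so that data is retained a bit longer than the state TTL configured in Flink):
{code:java}
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;

public class FifoCompactionOptions {
    // Sketch: switch a column family to FIFO compaction and cap the total SST size.
    public static ColumnFamilyOptions applyFifo(ColumnFamilyOptions options) {
        CompactionOptionsFIFO fifo =
                new CompactionOptionsFIFO().setMaxTableFilesSize(10L * 1024 * 1024 * 1024);
        return options
                .setCompactionStyle(CompactionStyle.FIFO)
                .setCompactionOptionsFIFO(fifo);
    }
}
{code}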

 

Although the FIFO Compaction strategy may bring space amplification, the disk 
is cheaper than the CPU after all, so the overall cost is reduced.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-18451) Flink HA on yarn may appear TaskManager double running when HA is restored

2020-06-29 Thread ming li (Jira)
ming li created FLINK-18451:
---

 Summary: Flink HA on yarn may appear TaskManager double running 
when HA is restored
 Key: FLINK-18451
 URL: https://issues.apache.org/jira/browse/FLINK-18451
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.9.0
Reporter: ming li


We found that when a NodeManager is lost, a new JobManager is started by Yarn's ResourceManager and registers itself as leader in Zookeeper. The original TaskManagers discover the new JobManager through Zookeeper and close the connection to the old one; at that point all of their tasks fail. The new JobManager then performs job recovery directly from the latest checkpoint.

However, during recovery, a TaskManager whose Zookeeper connection is degraded may not register with the new JobManager in time. Until the following timeouts fire:
1. the connection with Zookeeper
2. the heartbeat with the JobManager/ResourceManager
its Tasks keep running (assuming a Task can run independently inside the TaskManager). If HA recovery is fast enough, some Tasks will therefore be running twice at the same time.

Should we persist a record of the cluster resources allocated at runtime and use it to verify that all Tasks have stopped when HA recovery takes place?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18808) Task-level numRecordsOut metric may be underestimated

2020-08-03 Thread ming li (Jira)
ming li created FLINK-18808:
---

 Summary: Task-level numRecordsOut metric may be underestimated
 Key: FLINK-18808
 URL: https://issues.apache.org/jira/browse/FLINK-18808
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Affects Versions: 1.11.1
 Environment: !image-2020-08-04-11-26-21-490.png!
Reporter: ming li
 Attachments: image-2020-08-04-11-28-13-800.png, 
image-2020-08-04-11-32-20-678.png

At present, we only register the task-level numRecordsOut metric by reusing the output record counter of the operator at the end of the OperatorChain.
{code:java}
if (config.isChainEnd()) {
   operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
}
{code}
If data is only emitted by the last operator of the OperatorChain, this statistic is correct. But consider the following scenario:

!image-2020-08-04-11-28-13-800.png|width=507,height=174!

In this JobGraph, data is emitted not only by the last operator but also by an intermediate operator of the OperatorChain (the map operator simply returns its input). Below is one of our test topologies; the reported value is only about half of the total data actually received downstream.

!image-2020-08-04-11-32-20-678.png|width=648,height=251!

I think the data emitted by intermediate operators should also be counted in the Task's numRecordsOut. But currently we do not reuse the output record counters of intermediate operators, so the task-level numRecordsOut metric is underestimated (this has no effect on the actual operation of the job, but it may affect monitoring).

A simple idea of mine is to change the condition under which the operator output record counter is reused:
{code:java}
if (!config.getNonChainedOutputs(getUserCodeClassloader()).isEmpty()) {
   operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
}{code}
In addition, I have another question: if a record is broadcast to all downstream channels, should numRecordsOut increase by one or by the number of downstream channels? It seems that currently we only add one when computing the numRecordsOut metric.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19006) project transformation does not support conversion to Tuple25 type

2020-08-20 Thread ming li (Jira)
ming li created FLINK-19006:
---

 Summary: project transformation does not support conversion to 
Tuple25 type
 Key: FLINK-19006
 URL: https://issues.apache.org/jira/browse/FLINK-19006
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.11.1
Reporter: ming li


The {{DataStream#project}} method checks that the length of {{fieldIndexes}} is between 1 and {{Tuple.MAX_ARITY - 1}} and then calls the matching {{projectTupleXX}} method based on that length. This limits the maximum {{Tuple}} arity to 24.
{code:java}
protected StreamProjection(DataStream dataStream, int[] fieldIndexes) {
    if (!dataStream.getType().isTupleType()) {
        throw new RuntimeException("Only Tuple DataStreams can be projected");
    }
    if (fieldIndexes.length == 0) {
        throw new IllegalArgumentException("project() needs to select at least one (1) field.");
    } else if (fieldIndexes.length > Tuple.MAX_ARITY - 1) {
        throw new IllegalArgumentException(
                "project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
    }

    int maxFieldIndex = (dataStream.getType()).getArity();
    for (int i = 0; i < fieldIndexes.length; i++) {
        Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex);
    }

    this.dataStream = dataStream;
    this.fieldIndexes = fieldIndexes;
}{code}
This problem also appears in {{ProjectOperator}}.
{code:java}
public Projection(DataSet ds, int[] fieldIndexes) {

    if (!(ds.getType() instanceof TupleTypeInfo)) {
        throw new UnsupportedOperationException("project() can only be applied to DataSets of Tuples.");
    }

    if (fieldIndexes.length == 0) {
        throw new IllegalArgumentException("project() needs to select at least one (1) field.");
    } else if (fieldIndexes.length > Tuple.MAX_ARITY - 1) {
        throw new IllegalArgumentException(
                "project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
    }

    int maxFieldIndex = ds.getType().getArity();
    for (int fieldIndexe : fieldIndexes) {
        Preconditions.checkElementIndex(fieldIndexe, maxFieldIndex);
    }

    this.ds = ds;
    this.fieldIndexes = fieldIndexes;
}{code}
I think the length we limit should be 1 to {{Tuple.MAX_ARITY}} instead of 1 to 
{{Tuple.MAX_ARITY-1}}.
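
The proposed relaxation is a one-line change in both places (sketch):
{code:java}
// Sketch: allow selecting up to Tuple.MAX_ARITY (25) fields instead of MAX_ARITY - 1.
} else if (fieldIndexes.length > Tuple.MAX_ARITY) {
    throw new IllegalArgumentException(
            "project() may select only up to (" + Tuple.MAX_ARITY + ") fields.");
}
{code}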



--
This message was sent by Atlassian Jira
(v8.3.4#803005)