[PR] [FLINK-37332] Support any column as chunk key column (postgres, oracle, db2, sqlserver) #3922 [flink-cdc]
SML0127 opened a new pull request, #3928: URL: https://github.com/apache/flink-cdc/pull/3928 The MySQL CDC connector has supported this feature since Flink CDC v3.2. With this change, the other connectors that support incremental snapshot (Postgres, Oracle, Db2, SQL Server) also allow any column to be used as the chunk key column. Apache Jira: https://issues.apache.org/jira/browse/FLINK-37332 cc. @gtk96 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
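The PR extends chunk-key-column selection to the Postgres, Oracle, Db2 and SQL Server incremental-snapshot sources. Below is a minimal Java sketch of how a user might pick a non-primary-key chunk key, assuming the new connectors reuse the MySQL connector's `scan.incremental.snapshot.chunk-key-column` option name; that option name, plus all hostnames, credentials and table names below, are assumptions for illustration rather than details confirmed by the PR.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hedged sketch: a Postgres CDC source table whose incremental snapshot is
// split on 'customer_id' instead of the primary key. The chunk-key option
// mirrors the MySQL CDC connector and is assumed to carry over to the other
// connectors touched by this PR.
public class ChunkKeyColumnExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  order_id INT,"
                        + "  customer_id INT,"
                        + "  amount DECIMAL(10, 2),"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'postgres-cdc',"
                        + "  'hostname' = 'localhost',"
                        + "  'port' = '5432',"
                        + "  'username' = 'flink',"
                        + "  'password' = 'secret',"
                        + "  'database-name' = 'shop',"
                        + "  'schema-name' = 'public',"
                        + "  'table-name' = 'orders',"
                        + "  'slot.name' = 'flink_slot',"
                        + "  'scan.incremental.snapshot.enabled' = 'true',"
                        + "  'scan.incremental.snapshot.chunk-key-column' = 'customer_id'"
                        + ")");
    }
}
{code}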
Re: [PR] [FLINK-37236] Bump the flink version from 1.20.0 to 1.20.1 [flink-kubernetes-operator]
gyfora commented on PR #943: URL: https://github.com/apache/flink-kubernetes-operator/pull/943#issuecomment-2673514124 No changes to the NOTICE file necessary? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-37161) Cross-team verification for "Adaptive skewed join optimization for batch jobs"
[ https://issues.apache.org/jira/browse/FLINK-37161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929029#comment-17929029 ] xingbe commented on FLINK-37161: The above test cases have been sequentially verified. For test cases 1, 3, 4, and 5, it has been verified that the skewed join can handle join skew issues as expected. For example, the data distribution before optimization is: !https://static.dingtalk.com/media/lQLPJxH9h4tniDfNBIzNCHqw9SDlZWw_GjwHmzXkALzlAQ_2170_1164.png|width=355,height=191! and the data distribution after optimization is: !https://static.dingtalk.com/media/lQLPJxDsrln4yDfNBIbNCLKwBUSGUGCqbGQHmzXkALzlAA_2226_1158.png|width=362,height=188! But for test case 2, when there is an aggregation node with the same hash key downstream of the join node, it was found that the Join operator cannot be converted into an AdaptiveJoin operator. > Cross-team verification for "Adaptive skewed join optimization for batch jobs" > -- > > Key: FLINK-37161 > URL: https://issues.apache.org/jira/browse/FLINK-37161 > Project: Flink > Issue Type: Sub-task >Reporter: Junrui Lee >Assignee: xingbe >Priority: Blocker > Fix For: 2.0.0 > > > In Flink 2.0, we support the capability of adaptive skewed join optimization > for batch jobs, which will allow the Join operator to dynamically split > skewed and splittable partitions based on runtime input statistics, thereby > mitigating the long-tail problem caused by skewed data. > We may need the following tests: > # > Test the case where {{table.optimizer.skewed-join-optimization.strategy}} is > set to {{{}auto{}}}. We need to construct a simple join case with data skewed > on a single key (e.g., making the data of a specified join key N times larger > than other join keys, where N is defined by > {{{}table.optimizer.skewed-join-optimization.skewed-factor{}}}). And ensuring > the data volume for the skewed join key exceeds the skewed-threshold (defined > by {{{}table.optimizer.skewed-join-optimization.skewed-threshold{}}}). > Finally, observe whether the ratio of the maximum data volume to the median > data volume processed by concurrent join tasks is less than the skew factor. > # > Test the case where {{table.optimizer.skewed-join-optimization.strategy}} is > set to {{{}forced{}}}. Construct a skewed join instance similar to Test 1, > but with the following difference: the join case should be connected to a > downstream operator that performs hashing on the same field (e.g., hash > aggregation or group by). It is recommended to set different parallelisms for > the join operator and the downstream operator to prevent the out edge from > being optimized to a forward edge. Finally, observe whether the ratio of the > maximum data volume to the median data volume processed by concurrent join > tasks is less than the skew factor. > # > Test the case where > {{{}table.optimizer.skewed-join-optimization.strategy{}}}as none, and verify > that the join operator will not be optimized into an adaptive join operator > under any circumstances. > # Test the case with customized > {{{}table.optimizer.skewed-join-optimization.skewed-factor{}}}. We need to > construct a skewed join instance similar to Test 1, setting different skewed > factors and observing whether the ratio of the maximum data volume to the > median data volume processed by concurrent join tasks is less than the skew > factor. 
Note that currently, Flink can only reduce the ratio to 2.0, and > please ensure that the skewed-factor is greater than 2.0 during testing. > # Test the case with customized > {{{}table.optimizer.skewed-join-optimization.skewed-threshold{}}}. We need to > construct a skewed join instance similar to Test 1, setting different > skewed-threshold and observing whether the optimization is effective only > when the data volume processed by the skewed join instance is greater than > the skewed threshold. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
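As a concrete starting point for the verification steps above, here is a minimal sketch of how the listed options might be set on a batch TableEnvironment; the option keys are taken from the issue text, while the values and the surrounding job are illustrative only.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hedged sketch for test case 1: enable adaptive skewed join optimization on
// a batch job. The tester still has to register a skewed fact table (one join
// key N times larger than the others) and run the join query itself.
public class SkewedJoinVerification {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.getConfig().set("table.optimizer.skewed-join-optimization.strategy", "auto");
        // Keep the factor above 2.0, the best ratio Flink can currently reach.
        tEnv.getConfig().set("table.optimizer.skewed-join-optimization.skewed-factor", "4.0");
        tEnv.getConfig().set("table.optimizer.skewed-join-optimization.skewed-threshold", "256m");
        // ... create the skewed source tables and execute the join under test here ...
    }
}
{code}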
[jira] [Updated] (FLINK-37322) support eval groovy scripts udf for flink sql
[ https://issues.apache.org/jira/browse/FLINK-37322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tianyuan updated FLINK-37322: - Component/s: Table SQL / API > surport eval groovy scripts udf for flink sql > - > > Key: FLINK-37322 > URL: https://issues.apache.org/jira/browse/FLINK-37322 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: tianyuan >Priority: Major > > I want to add a new UDF to dynamically execute Groovy scripts to achieve any > desired functions. > Just like using Lua scripts in Redis. I have implemented this UDF locally, > which has very efficient execution performance and strong flexibility. Usage > is as follows: > select eval_groovy_script('INT', 'return arg0+arg1', 1, 2); > select eval_groovy_script('INT', 'return p1+p2', MAP['p1', 2, 'p2', 5]); > select eval_groovy_script('MAP', 'return ["TopicName" : > "Lists", "Author" : "Raghav"] '); > This udf requires the following three parameters: > 1. Data return type > 2. Groovy script > 3. Parameters passed to the groovy script. If it is a variable type, use the > variable in the groovy script through arg\{index}. If it is a map type, you > can use the variable through the key in the groovy script. > Apache pinot has implemented similar functions: > https://docs.pinot.apache.org/users/user-guide-query/scalar-functions > I hope a committer will recognize my idea and assign this task to me. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37322) support eval groovy scripts udf for flink sql
[ https://issues.apache.org/jira/browse/FLINK-37322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tianyuan updated FLINK-37322: - Affects Version/s: 2.1.0 > surport eval groovy scripts udf for flink sql > - > > Key: FLINK-37322 > URL: https://issues.apache.org/jira/browse/FLINK-37322 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Affects Versions: 2.1.0 >Reporter: tianyuan >Priority: Major > > I want to add a new UDF to dynamically execute Groovy scripts to achieve any > desired functions. > Just like using Lua scripts in Redis. I have implemented this UDF locally, > which has very efficient execution performance and strong flexibility. Usage > is as follows: > select eval_groovy_script('INT', 'return arg0+arg1', 1, 2); > select eval_groovy_script('INT', 'return p1+p2', MAP['p1', 2, 'p2', 5]); > select eval_groovy_script('MAP', 'return ["TopicName" : > "Lists", "Author" : "Raghav"] '); > This udf requires the following three parameters: > 1. Data return type > 2. Groovy script > 3. Parameters passed to the groovy script. If it is a variable type, use the > variable in the groovy script through arg\{index}. If it is a map type, you > can use the variable through the key in the groovy script. > Apache pinot has implemented similar functions: > https://docs.pinot.apache.org/users/user-guide-query/scalar-functions > I hope a committer will recognize my idea and assign this task to me. -- This message was sent by Atlassian Jira (v8.20.10#820010)
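A simplified, self-contained sketch of the kind of UDF being proposed, reduced to one String argument (exposed to the script as arg0) and a String result so that Flink's type inference needs no hints; the actual proposal additionally handles a return-type descriptor, map arguments and varargs.

{code:java}
import groovy.lang.Binding;
import groovy.lang.GroovyShell;
import org.apache.flink.table.functions.ScalarFunction;

// Hedged sketch of an eval_groovy_script-style scalar function. A production
// version would cache compiled scripts instead of creating a GroovyShell per
// call, and would map the declared return type onto Flink data types.
public class EvalGroovyScript extends ScalarFunction {

    public String eval(String script, String arg0) {
        Binding binding = new Binding();
        binding.setVariable("arg0", arg0);
        Object result = new GroovyShell(binding).evaluate(script);
        return result == null ? null : result.toString();
    }
}
{code}

Registered via `tEnv.createTemporarySystemFunction("eval_groovy_script", EvalGroovyScript.class)`, it could then be called from SQL in the same spirit as the examples quoted above.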
Re: [PR] [FLINK-37320] [Observer] FINISHED finite streaming jobs incorrectly being set to RECONCILING [flink-kubernetes-operator]
gyfora merged PR #944: URL: https://github.com/apache/flink-kubernetes-operator/pull/944 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-3154) Update Kryo version from 2.24.0 to latest Kryo LTS version
[ https://issues.apache.org/jira/browse/FLINK-3154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17929032#comment-17929032 ] Gyula Fora commented on FLINK-3154: --- I feel that there is not a very strong consensus here regarding the approach, I started a discussion to finalize the decision on the dev mailing list: [https://lists.apache.org/thread/odhglx8tmpdt6jnorgcsvxjqjfd169x6] Please chime in and then we can move ahead with this > Update Kryo version from 2.24.0 to latest Kryo LTS version > -- > > Key: FLINK-3154 > URL: https://issues.apache.org/jira/browse/FLINK-3154 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System >Affects Versions: 1.0.0 >Reporter: Maximilian Michels >Priority: Not a Priority > Labels: pull-request-available > > Flink's Kryo version is outdated and could be updated to a newer version, > e.g. kryo-3.0.3. > From ML: we cannot bumping the Kryo version easily - the serialization format > changed (that's why they have a new major version), which would render all > Flink savepoints and checkpoints incompatible. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-37320) [Observer] FINISHED finite streaming jobs incorrectly being set to RECONCILING
[ https://issues.apache.org/jira/browse/FLINK-37320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-37320. -- Fix Version/s: kubernetes-operator-1.11.0 Assignee: Luca Castelli Resolution: Fixed merged to main f874b03dfddb47cd55cc9491c0dd4ebdd4fa3c38 > [Observer] FINISHED finite streaming jobs incorrectly being set to RECONCILING > -- > > Key: FLINK-37320 > URL: https://issues.apache.org/jira/browse/FLINK-37320 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.10.0 > Environment: I've attached the flinkdeployment CR and operator-config > I used to locally replicate. >Reporter: Luca Castelli >Assignee: Luca Castelli >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.11.0 > > Attachments: operator-config.yaml, > operator-log-finite-streaming-job.log, test-finite-streaming-job.yaml > > > Hello, > I believe I've found a bug within the observation logic for finite streaming > jobs. This is a follow-up to: > [https://lists.apache.org/thread/xvsk4fmlqln092cdolvox4dgko0pw81k]. > *For finite streaming jobs:* > # The job finishes successfully and the job status changes to FINISHED > # TTL (kubernetes.operator.jm-deployment.shutdown-ttl) cleanup removes the > JM deployments and clears HA configmap data > # On the next loop, the observer sees MISSING JM and changes the job status > from FINISHED to RECONCILING > The job had reached a terminal state. It shouldn't have been set back to > RECONCILING. > This leads to an operator error later when a recovery attempt is triggered. > The recovery is triggered because the JM is MISSING, the status is > RECONCILING, spec shows RUNNING, and HA enabled. The recovery fails with > validateHaMetadataExists throwing UpgradeFailureException. > At that point the deployment gets stuck in a loop with status RECONCILING and > UpgradeFailureException thrown on each cycle. I've attached operator logs > showing this. > *Proposed solution:* I think the fix would be to wrap > [AbstractFlinkDeploymentObserver.observeJmDeployment|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L155] > in an if-statement that checks the job is not in a terminal state. Happy to > discuss and/or put up the 2 line code change PR. -- This message was sent by Atlassian Jira (v8.20.10#820010)
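A self-contained sketch of the guard described in the proposed solution; the enum and method names are illustrative rather than the operator's actual API.

{code:java}
// Hedged sketch: once a job has reached a terminal state, skip JM-deployment
// observation so a FINISHED finite streaming job whose JM was removed by
// shutdown-ttl cleanup is never flipped back to RECONCILING.
public class TerminalStateGuardSketch {

    enum JobState { RECONCILING, RUNNING, FINISHED, FAILED, CANCELED }

    static boolean isTerminal(JobState state) {
        return state == JobState.FINISHED
                || state == JobState.FAILED
                || state == JobState.CANCELED;
    }

    static void observe(JobState currentState) {
        if (isTerminal(currentState)) {
            // Terminal job: leave the recorded status untouched even if the
            // JM deployment is MISSING after TTL cleanup.
            return;
        }
        // ... the existing observeJmDeployment(...) logic would run here ...
    }
}
{code}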
[PR] [FLINK-37350][table-planner] Fix memory leak caused by skewed join optimization strategy [flink]
noorall opened a new pull request, #26180: URL: https://github.com/apache/flink/pull/26180 ## What is the purpose of the change When the AdaptiveBroadcastJoinOptimizationStrategy is in effect, it leads to the statistics in the AdaptiveSkewedJoinOptimizationStrategy to not be released properly, which causes memory leaks. This PR is intended to fix the issue. ## Brief change log Fix memory leak caused by skewed join optimization strategy. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-37350) Memory leak caused by skewed join optimization strategy
[ https://issues.apache.org/jira/browse/FLINK-37350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-37350: --- Labels: pull-request-available (was: ) > Memory leak caused by skewed join optimization strategy > --- > > Key: FLINK-37350 > URL: https://issues.apache.org/jira/browse/FLINK-37350 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Lei Yang >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0, 2.0.1 > > > When the AdaptiveBroadcastJoinOptimizationStrategy is in effect, it leads to > the statistics in the AdaptiveSkewedJoinOptimizationStrategy to not be > released properly, which causes memory leaks. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [BP-2.0][FLINK-28047][docs] Remove deprecated StreamExecutionEnvironment#readTextFile method in documentation [flink]
reswqa merged PR #26164: URL: https://github.com/apache/flink/pull/26164 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-37356) Recycle use of kafka producer(which commit error) maybe send data without AddPartitionsToTxnRequest
[ https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928691#comment-17928691 ] Arvid Heise commented on FLINK-37356: - Thank you very much for the detailed analysis. However, I'm still not sure I fully understand the issue. In general, everything that we do with reflection is fishy and may have worked at some point in time but prone to fail at a later time. So that makes your analysis very plausible. What I didn't fully understand is: how do we get an entry in {{partitionsInTransaction}} when we have an empty data? As far as I understood FLINK-31363, we actually don't have any partitions and as such the transaction is invisible to the broker. Nevertheless, I do see two issues: * Committing an empty transaction will result in a {{{}InvalidTxnStateException{}}}. I think that the current {{main}} contains already a fix for that by simple not trying to commit and resetting the state only on client side (please double-check if that makes sense at all). * I also think it may be inherently unsafe to reuse a producer that encountered an error for any reason. Let's say the producer the producer was fenced. Can we assume that the transaction manager is in a state from which we can easily resume? In theory, {{initTransaction}} should reset everything but you definitively found cases where it doesn't. So I'd propose to close a producer that encountered a non-retriable error and open a new one instead. Coming back to your original problem. It would be awesome if you could distill the issue to a small repo/test case that fails. From your description it sounds easy to do, although - as I said - I cannot follow 100%. {{main}} now also contains better debug logging that makes it easier to follow reuse because I also print the system hash code. That also helps tremendously in debugging leaks. > Recycle use of kafka producer(which commit error) maybe send data without > AddPartitionsToTxnRequest > --- > > Key: FLINK-37356 > URL: https://issues.apache.org/jira/browse/FLINK-37356 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.4.0 >Reporter: Hongshun Wang >Priority: Major > > In my production environment, READ_COMMITTED consumer can no longer consume > any records. Then I found that the LSO of the partition doesn't change for a > long time. I lookup all the log in Kafka cluster, then find that there is a > transaction lacking AddPartitionsToTxnRequest. > > At first, I think the problem is caused by > https://issues.apache.org/jira/browse/FLINK-31363 because my Kafka cluster > log contains InvalidTxnStateException. However, though the transaction is in > an invalid state, no data is written into Kafka topic partition in this > transaction( because in this case, the transaction is empty). It will not > influence any Kafka topic partition's LSO, thus consumer won't be blocked. > > Then I check the code of Kafka client, it seems no way to produce data > without AddPartitionsToTxnRequest done because the the `Sender` will refuse > to dequeue batches from the accumulator until they have been added to the > transaction. > {code:java} > // org.apache.kafka.clients.producer.KafkaProducer#doSend > private Future doSend(ProducerRecord record, Callback > callback) { > // ..ignore code > // Add the partition to the transaction (if in progress) after it has been > successfully > // appended to the accumulator. 
We cannot do it before because the partition > may be > // unknown or the initially selected partition may be changed when the batch > is closed > // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse > to dequeue > // batches from the accumulator until they have been added to the transaction. > if (transactionManager != null) { > transactionManager.maybeAddPartition(appendCallbacks.topicPartition()); > // ignore code > }{code} > {code:java} > //org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode > > if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // > there is a rare case that a single batch size is larger than the request size > due to // compression; in this case we will still eventually send this batch > in a single request break; } else { if > (shouldStopDrainBatchesForPartition(first, tp)) break; } > {code} > > Then I have a idea that if a TransactionManager which doesn't clear > partitionsInTransaction is reused again, the AddPartitionsToTxnRequest will > be sent again. It maybe happen in Flink Kafka connector: > > 1. The flink kafka connector also reuse and recycle KafkaProducer: > KafkaCommitter will recycle
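The proposal above (close a producer that hit a non-retriable error instead of recycling it) can be sketched against the plain Kafka client API as follows; the Flink connector's internal producer pool and FlinkKafkaInternalProducer differ from this, so treat it only as an illustration of the idea.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.RetriableException;

// Hedged sketch: a producer whose commit failed non-retriably is discarded and
// replaced, so no stale transaction-manager state (e.g. partitionsInTransaction)
// can leak into the next transaction. 'config' is assumed to contain
// transactional.id and serializer settings.
public final class ProducerRecycleSketch {

    static KafkaProducer<byte[], byte[]> commitOrReplace(
            KafkaProducer<byte[], byte[]> producer, Properties config) {
        try {
            producer.commitTransaction();
            return producer; // commit succeeded, safe to keep reusing
        } catch (RetriableException e) {
            return producer; // transient broker-side issue, the caller may retry
        } catch (Exception e) {
            // Non-retriable failure: never put this producer back into a pool.
            producer.close();
            KafkaProducer<byte[], byte[]> fresh = new KafkaProducer<>(config);
            fresh.initTransactions();
            return fresh;
        }
    }
}
{code}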
Re: [PR] [FLINK-37350][table-planner] Fix memory leak caused by skewed join optimization strategy [flink]
JunRuiLee commented on code in PR #26180: URL: https://github.com/apache/flink/pull/26180#discussion_r1963051617 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/VertexParallelismAndInputInfosDeciderUtils.java: ## @@ -658,4 +660,148 @@ public static long calculateDataVolumePerTaskForInput( long globalDataVolumePerTask, long inputsGroupBytes, long totalDataBytes) { return (long) ((double) inputsGroupBytes / totalDataBytes * globalDataVolumePerTask); } + +public static Optional constructOptimizationLog( +BlockingInputInfo inputInfo, JobVertexInputInfo jobVertexInputInfo) { +if (inputInfo.areInterInputsKeysCorrelated() && inputInfo.isIntraInputKeyCorrelated()) { +return Optional.empty(); +} +boolean optimized = false; +List executionVertexInputInfos = +jobVertexInputInfo.getExecutionVertexInputInfos(); +int parallelism = executionVertexInputInfos.size(); +long[] optimizedDataBytes = new long[parallelism]; +long optimizedMin = Long.MAX_VALUE, optimizedMax = 0; +long[] nonOptimizedDataBytes = new long[parallelism]; +long nonOptimizedMin = Long.MAX_VALUE, nonOptimizedMax = 0; +for (int i = 0; i < parallelism; ++i) { +Map consumedSubpartitionGroups = + executionVertexInputInfos.get(i).getConsumedSubpartitionGroups(); +for (Map.Entry entry : consumedSubpartitionGroups.entrySet()) { +IndexRange partitionRange = entry.getKey(); +IndexRange subpartitionRange = entry.getValue(); +optimizedDataBytes[i] += +inputInfo.getNumBytesProduced(partitionRange, subpartitionRange); +} +optimizedMin = Math.min(optimizedMin, optimizedDataBytes[i]); +optimizedMax = Math.max(optimizedMax, optimizedDataBytes[i]); + +Map nonOptimizedConsumedSubpartitionGroup = +computeNumBasedConsumedSubpartitionGroup(parallelism, i, inputInfo); +checkState(nonOptimizedConsumedSubpartitionGroup.size() == 1); +nonOptimizedDataBytes[i] += +inputInfo.getNumBytesProduced( +nonOptimizedConsumedSubpartitionGroup +.entrySet() +.iterator() +.next() +.getKey(), +nonOptimizedConsumedSubpartitionGroup +.entrySet() +.iterator() +.next() +.getValue()); +nonOptimizedMin = Math.min(nonOptimizedMin, nonOptimizedDataBytes[i]); +nonOptimizedMax = Math.max(nonOptimizedMax, nonOptimizedDataBytes[i]); + +if (!optimized +&& !consumedSubpartitionGroups.equals(nonOptimizedConsumedSubpartitionGroup)) { +optimized = true; +} +} +if (optimized) { +long optimizedMed = median(optimizedDataBytes); +long nonOptimizedMed = median(nonOptimizedDataBytes); +String logMessage = +String.format( +"Result id: %s, " ++ "type number: %d, " ++ "input data size: " ++ "[ Before: {min: %s, median: %s, max: %s}, " ++ "After: {min: %s, median: %s, max: %s} ]", +inputInfo.getResultId(), +inputInfo.getInputTypeNumber(), +new MemorySize(nonOptimizedMin).toHumanReadableString(), +new MemorySize(nonOptimizedMed).toHumanReadableString(), +new MemorySize(nonOptimizedMax).toHumanReadableString(), +new MemorySize(optimizedMin).toHumanReadableString(), +new MemorySize(optimizedMed).toHumanReadableString(), +new MemorySize(optimizedMax).toHumanReadableString()); +return Optional.of(logMessage); +} +return Optional.empty(); +} + +private static Map computeNumBasedConsumedSubpartitionGroup( +int parallelism, int currentIndex, BlockingInputInfo inputInfo) { +int sourceParallelism = inputInfo.getNumPartitions(); + +if (inputInfo.isPointwise()) { +return computeNumBasedConsumedSubpartitionGroupForPointwise( +sourceParallelism, parallelism, currentIndex, inputInfo::getNumSubpartitions); +} else { +return computeNumBasedConsumedSubpartitionGroupForAllToAll( +
Re: [PR] [FLINK-37149][doc] Add documents for adaptive batch execution. [flink]
JunRuiLee merged PR #26168: URL: https://github.com/apache/flink/pull/26168 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-37069) Cross-team verification for "Disaggregated State Management"
[ https://issues.apache.org/jira/browse/FLINK-37069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928718#comment-17928718 ] Zakelly Lan commented on FLINK-37069: - Thanks Weijie! > Cross-team verification for "Disaggregated State Management" > > > Key: FLINK-37069 > URL: https://issues.apache.org/jira/browse/FLINK-37069 > Project: Flink > Issue Type: Sub-task >Reporter: Xintong Song >Assignee: Weijie Guo >Priority: Blocker > Fix For: 2.0.0 > > > Instructions: > First of all, please read the related documents briefly (still under review, > will replace with formal links if merged): > * Disaggregated State Management: > [https://github.com/apache/flink/pull/26107/files#diff-bfa19e04bb5c3487c3e9bf514d61c0fa8bb973950fb0ad0e3d4a6898a99b83e3] > * State V2: > [https://github.com/apache/flink/pull/26107/files#diff-5d1147987fecbda329132403c1d92384575be220092995c4be491e12b8c50cc9] > * ForSt State Backend: > [https://github.com/apache/flink/pull/26107/files#diff-b7c52c06f6ed4d5af6f230d11ba23ea051bf4a08c589d98392143f080c468a87] > For the SQL part, verification goes in FLINK-37068, we mainly focus on > Datastream jobs and APIs here. > 1. Make sure you are verifying this on release-2.0 branch, since we have > fixed several bugs since the rc0 package. > 2. Choose one example in `flink-examples-streaming`. Most of the jobs has > been rewritten using new API. Here we take `StateMachineExample` as an > example. > 3. Compile and run `StateMachineExample` in proper environment (I suggest a > standalone session cluster or yarn), make sure you have the following command > line params: > {code:bash} > ./flink run x \ > --backend forst \ > --checkpoint-dir s3://your/cp/dir \ > --incremental-checkpoints true > {code} > Or set via `config.yaml`. > {code:yaml} > state.backend.type: forst > execution.checkpointing.incremental: true > execution.checkpointing.dir: s3://your-bucket/flink-checkpoints > {code} > 4. Check the job is running smoothly, the periodic checkpoints are > successfully taken. > 5. Stop the job and restart from the latest checkpoint. > It would be great if you could write your own job using State V2 API, and > follow the above Step 3~5. It is important to check whether there is any bug > in new State APIs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
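Besides the CLI flags and config.yaml shown above, the same settings can be passed programmatically when writing a custom verification job; here is a sketch using the documented keys, with the checkpoint path and interval as placeholders.

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hedged sketch: enable the ForSt backend and incremental checkpointing via
// the configuration keys quoted in the verification steps. The actual
// Datastream pipeline (e.g. a State V2 job) still has to be added.
public class ForStVerificationJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("state.backend.type", "forst");
        conf.setString("execution.checkpointing.incremental", "true");
        conf.setString("execution.checkpointing.dir", "s3://your-bucket/flink-checkpoints");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(10_000L);
        // ... build the job under test here, then env.execute("forst-verification"); ...
    }
}
{code}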
[jira] [Commented] (FLINK-36549) Using the ignore-parse-errors parameter in Debezium/Canal/Maxwell/Ogg JSON results in unexpected data loss.
[ https://issues.apache.org/jira/browse/FLINK-36549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928717#comment-17928717 ] xiaoyu commented on FLINK-36549: The PRs for versions 1.19 and 1.20 have been created. Here are the links for your reference: * [https://github.com/apache/flink/pull/26172] * https://github.com/apache/flink/pull/26173 When you have some time, I’d really appreciate it if you could help review them. Thank you in advance for your time and support![~libenchao] > Using the ignore-parse-errors parameter in Debezium/Canal/Maxwell/Ogg JSON > results in unexpected data loss. > --- > > Key: FLINK-36549 > URL: https://issues.apache.org/jira/browse/FLINK-36549 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.17.2, 1.18.1, 1.19.1 >Reporter: jiangyu >Assignee: xiaoyu >Priority: Critical > Labels: pull-request-available > > In Debezium/Canal/Maxwell/Ogg JSON, setting {{ignore-parse-errors}} would > cause data loss if an operator chained with a format-related operator > encounters an exception. The reason is that in the deserialization > implementation of Debezium/Canal/Maxwell/Ogg JSON, enabling the > {{ignore-parse-errors}} parameter skips exceptions related to the format's > emitted data. For example, in Canal's JSON code, enabling the > {{ignore-parse-errors}} parameter catches and skips exceptions for {{emitRow}} > {code:java} > @Override > public void deserialize(@Nullable byte[] message, Collector out) > throws IOException { > if (message == null || message.length == 0) { > return; > } > try { > final JsonNode root = jsonDeserializer.deserializeToJsonNode(message); > if (database != null) { > if (!databasePattern > .matcher(root.get(ReadableMetadata.DATABASE.key).asText()) > .matches()) { > return; > } > } > if (table != null) { > if (!tablePattern > .matcher(root.get(ReadableMetadata.TABLE.key).asText()) > .matches()) { > return; > } > } > final GenericRowData row = (GenericRowData) > jsonDeserializer.convertToRowData(root); > String type = row.getString(2).toString(); // "type" field > if (OP_INSERT.equals(type)) { > // "data" field is an array of row, contains inserted rows > ArrayData data = row.getArray(0); > for (int i = 0; i < data.size(); i++) { > GenericRowData insert = (GenericRowData) data.getRow(i, > fieldCount); > insert.setRowKind(RowKind.INSERT); > emitRow(row, insert, out); > } > } else if (OP_UPDATE.equals(type)) { > // "data" field is an array of row, contains new rows > ArrayData data = row.getArray(0); > // "old" field is an array of row, contains old values > ArrayData old = row.getArray(1); > for (int i = 0; i < data.size(); i++) { > // the underlying JSON deserialization schema always produce > GenericRowData. 
> GenericRowData after = (GenericRowData) data.getRow(i, > fieldCount); > GenericRowData before = (GenericRowData) old.getRow(i, > fieldCount); > final JsonNode oldField = root.get(FIELD_OLD); > for (int f = 0; f < fieldCount; f++) { > if (before.isNullAt(f) && > oldField.findValue(fieldNames.get(f)) == null) { > // fields in "old" (before) means the fields are > changed > // fields not in "old" (before) means the fields are > not changed > // so we just copy the not changed fields into before > before.setField(f, after.getField(f)); > } > } > before.setRowKind(RowKind.UPDATE_BEFORE); > after.setRowKind(RowKind.UPDATE_AFTER); > emitRow(row, before, out); > emitRow(row, after, out); > } > } else if (OP_DELETE.equals(type)) { > // "data" field is an array of row, contains deleted rows > ArrayData data = row.getArray(0); > for (int i = 0; i < data.size(); i++) { > GenericRowData insert = (GenericRowData) data.getRow(i, > fieldCount); > insert.setRowKind(RowKind.DELETE); > emitRow(row, insert, out); > } > } else if (OP_CREATE.equals(type)) { > // "data"
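The essence of the report is that ignore-parse-errors also swallows exceptions thrown while emitting rows to chained downstream operators, not only genuine parse failures. Below is a self-contained sketch of that distinction; it is not the actual Canal/Debezium format code, just an illustration of the fix idea.

{code:java}
import java.io.IOException;

import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;

// Hedged sketch: only swallow errors raised while parsing the raw bytes;
// failures in out.collect(...), which runs chained downstream operators,
// must still propagate, otherwise enabling ignore-parse-errors silently
// drops records.
public abstract class ParseVsEmitSketch {

    protected abstract RowData parse(byte[] message) throws IOException;

    public void deserialize(byte[] message, Collector<RowData> out, boolean ignoreParseErrors)
            throws IOException {
        if (message == null || message.length == 0) {
            return;
        }
        final RowData row;
        try {
            row = parse(message);
        } catch (IOException e) {
            if (ignoreParseErrors) {
                return; // genuinely malformed input, skip it
            }
            throw e;
        }
        // Deliberately outside the try/catch above.
        out.collect(row);
    }
}
{code}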
Re: [PR] [FLINK-37327] [Formats (JSON, Avro, Parquet, ORC, SequenceFile)] Debezium Avro Format: Add FormatOption to Optionally Skip emitting UPDATE_BEFORE Rows [flink]
davidradl commented on code in PR #26159: URL: https://github.com/apache/flink/pull/26159#discussion_r1963417984 ## docs/content.zh/docs/connectors/table/formats/debezium.md: ## @@ -320,6 +320,13 @@ Flink 提供了 `debezium-avro-confluent` 和 `debezium-json` 两种 format 来 String Bearer auth token for Schema Registry + +debezium-avro-confluent.upsert-mode +optional +false +Boolean +In upsert mode the format will not produce UPDATE_BEFORE Rows from Debezium op='u' change events when deserializing. Review Comment: nit: Rows ->rows -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37329][table-planner] Skip Source Stats Collection When table.optimizer.source.report-statistics-enabled is False [flink]
davidradl commented on code in PR #26162: URL: https://github.com/apache/flink/pull/26162#discussion_r1963420772 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java: ## @@ -115,25 +115,26 @@ private TableStats recomputeStatistics( FilterPushDownSpec filterPushDownSpec, boolean reportStatEnabled) { TableStats origTableStats = table.getStatistic().getTableStats(); +// bail out early if stats computation is disabled. +if (!reportStatEnabled) { +return origTableStats; +} DynamicTableSource tableSource = table.tableSource(); if (filterPushDownSpec != null && !filterPushDownSpec.isAllPredicatesRetained()) { // filter push down but some predicates are accepted by source and not in reaming // predicates // the catalog do not support get statistics with filters, -// so only call reportStatistics method if reportStatEnabled is true +// so only call reportStatistics Review Comment: nit: the comment now does not read well. "so only call reportStatistics" implies there will be more to the sentence. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36409] Publish some autoscaler metrics during stabilisation period [flink-kubernetes-operator]
mxm commented on PR #945: URL: https://github.com/apache/flink-kubernetes-operator/pull/945#issuecomment-2671259434 I'm ok with not having flag / config option. I do see that adding the changes under a feature flag is of little use. Plus, we have too many config options already. You will likely still have gaps in the metric processing because of the job restarts for which metrics aren't available, but it may help users to better understand what's happening with the cluster during the stabilization period. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Fix post-release japicmp update script for macOS [flink]
flinkbot commented on PR #26182: URL: https://github.com/apache/flink/pull/26182#issuecomment-2671276189 ## CI report: * 90fcdf11fa4a9d9933965f4430d7a6c2d3fb182c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]
davidradl commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1963430852 ## flink-python/docs/reference/pyflink.table/catalog.rst: ## @@ -250,3 +250,31 @@ A CatalogDescriptor is a template for creating a catalog instance. It closely re :toctree: api/ CatalogDescriptor.of + + +ObjectIdentifier + + +Identifies an object in a catalog. It allows to identify objects such as tables, views, Review Comment: nit: `It allows to identify objects ` does not read well. I suggest `Identifies an object in a catalog, including tables, views, functions and types.` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]
davidradl commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1963431910 ## flink-python/docs/reference/pyflink.table/catalog.rst: ## @@ -250,3 +250,31 @@ A CatalogDescriptor is a template for creating a catalog instance. It closely re :toctree: api/ CatalogDescriptor.of + + +ObjectIdentifier + + +Identifies an object in a catalog. It allows to identify objects such as tables, views, +function, or types in a catalog. An identifier must be fully qualified. It is the Review Comment: nit: An identifier -> ObjectIdentifier An identifier must be fully qualified. -> An identifier must be fully qualified, by this we mean . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]
davidradl commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1963432849 ## flink-python/docs/reference/pyflink.table/catalog.rst: ## @@ -250,3 +250,31 @@ A CatalogDescriptor is a template for creating a catalog instance. It closely re :toctree: api/ CatalogDescriptor.of + + +ObjectIdentifier + + +Identifies an object in a catalog. It allows to identify objects such as tables, views, +function, or types in a catalog. An identifier must be fully qualified. It is the +responsibility of the catalog manager to resolve an identifier to an object. Review Comment: nit: identifier -> objectIdentifier -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-37359] Use pinned qemu image [flink-kubernetes-operator]
gyfora opened a new pull request, #946: URL: https://github.com/apache/flink-kubernetes-operator/pull/946 ## Brief change log Use fixed qemu image as new latest seems to be broken ## Verifying this change CI ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: no ## Documentation no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]
davidradl commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1963431910 ## flink-python/docs/reference/pyflink.table/catalog.rst: ## @@ -250,3 +250,31 @@ A CatalogDescriptor is a template for creating a catalog instance. It closely re :toctree: api/ CatalogDescriptor.of + + +ObjectIdentifier + + +Identifies an object in a catalog. It allows to identify objects such as tables, views, +function, or types in a catalog. An identifier must be fully qualified. It is the Review Comment: nit: An identifier -> ObjectIdentifier ( think it is always best to use the same word to describe something ) An identifier must be fully qualified. -> An identifier must be fully qualified, by this we mean . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-2.0][FLINK-37149][doc] Add documents for adaptive batch execution. [flink]
JunRuiLee merged PR #26181: URL: https://github.com/apache/flink/pull/26181 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [BP-2.0][FLINK-37149][doc] Add documents for adaptive batch execution. [flink]
JunRuiLee opened a new pull request, #26181: URL: https://github.com/apache/flink/pull/26181 ## What is the purpose of the change [BP-2.0][FLINK-37149][doc] Add documents for adaptive batch execution. ## Brief change log [BP-2.0][FLINK-37149][doc] Add documents for adaptive batch execution. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-37359) Docker build fails for Kubernetes operator
Gyula Fora created FLINK-37359: -- Summary: Docker build fails for Kubernetes operator Key: FLINK-37359 URL: https://issues.apache.org/jira/browse/FLINK-37359 Project: Flink Issue Type: Bug Components: Kubernetes Operator Reporter: Gyula Fora All docker build started to fail with the following error: {noformat} 20.45 Setting up libc-bin (2.35-0ubuntu3.9) ... 20.59 qemu: uncaught target signal 11 (Segmentation fault) - core dumped 20.95 Segmentation fault (core dumped) 21.00 qemu: uncaught target signal 11 (Segmentation fault) - core dumped 21.37 Segmentation fault (core dumped) Sub-process /usr/bin/dpkg returned an error code (1) -- WARNING: No output specified for bake-platform target(s) with docker-container driver. Build result will only remain in the build cache. To push result image into registry use --push or to load image into docker use --load Dockerfile:68 66 | # Updating Debian 67 | RUN if [ "$SKIP_OS_UPDATE" = "false" ]; then apt-get update; fi 68 | >>> RUN if [ "$SKIP_OS_UPDATE" = "false" ]; then apt-get upgrade -y; fi 69 | 70 | ARG DISABLE_JEMALLOC=false ERROR: failed to solve: process "/bin/sh -c if [ \"$SKIP_OS_UPDATE\" = \"false\" ]; then apt-get upgrade -y; fi" did not complete successfully: exit code: 100{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-37356) Recycle use of kafka producer(which commit error) maybe send data without AddPartitionsToTxnRequest
[ https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928706#comment-17928706 ] Hongshun Wang commented on FLINK-37356: --- [~arvid]. The Flink log is lost, so I have to reason about it logically; I will attach the Flink log the next time I hit this again. I will try to describe it as a small case: 1. KafkaWriter writes some data in transaction 1 to partition A with Producer 1, then hands a KafkaCommittable to KafkaCommitter. At this point, the transactionManager in Producer 1 has saved partition A into partitionsInTransaction after ADD_OFFSETS_TO_TXN. 2. KafkaCommitter tries to commit transaction 1 but fails with an OOM (or some other error). KafkaCommitter then recycles Producer 1 (puts it back into the producerPool of KafkaWriter), but this step does not clear the partitionsInTransaction of Producer 1. The KafkaCommitter and KafkaWriter are not in the same thread, so before KafkaCommitter stops the job, KafkaWriter can take Producer 1 from the producerPool. 3. KafkaWriter writes some data in transaction 2 to partition A with Producer 1 (taken from the producerPool). When it tries to add partition A to the transaction, it turns out that partition A is already in partitionsInTransaction of the transactionManager, so it does not send partition A with ADD_OFFSETS_TO_TXN again but writes directly into the cluster. > Recycle use of kafka producer(which commit error) maybe send data without > AddPartitionsToTxnRequest > --- > > Key: FLINK-37356 > URL: https://issues.apache.org/jira/browse/FLINK-37356 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.4.0 >Reporter: Hongshun Wang >Priority: Major > > In my production environment, READ_COMMITTED consumer can no longer consume > any records. Then I found that the LSO of the partition doesn't change for a > long time. I lookup all the log in Kafka cluster, then find that there is a > transaction lacking AddPartitionsToTxnRequest. > > At first, I think the problem is caused by > https://issues.apache.org/jira/browse/FLINK-31363 because my Kafka cluster > log contains InvalidTxnStateException. However, though the transaction is in > an invalid state, no data is written into Kafka topic partition in this > transaction( because in this case, the transaction is empty). It will not > influence any Kafka topic partition's LSO, thus consumer won't be blocked. > > Then I check the code of Kafka client, it seems no way to produce data > without AddPartitionsToTxnRequest done because the the `Sender` will refuse > to dequeue batches from the accumulator until they have been added to the > transaction. > {code:java} > // org.apache.kafka.clients.producer.KafkaProducer#doSend > private Future doSend(ProducerRecord record, Callback > callback) { > // ..ignore code > // Add the partition to the transaction (if in progress) after it has been > successfully > // appended to the accumulator. We cannot do it before because the partition > may be > // unknown or the initially selected partition may be changed when the batch > is closed > // (as indicated by `abortForNewBatch`). 
Note that the `Sender` will refuse > to dequeue > // batches from the accumulator until they have been added to the transaction. > if (transactionManager != null) { > transactionManager.maybeAddPartition(appendCallbacks.topicPartition()); > // ignore code > }{code} > {code:java} > //org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode > > if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // > there is a rare case that a single batch size is larger than the request size > due to // compression; in this case we will still eventually send this batch > in a single request break; } else { if > (shouldStopDrainBatchesForPartition(first, tp)) break; } > {code} > > Then I have a idea that if a TransactionManager which doesn't clear > partitionsInTransaction is reused again, the AddPartitionsToTxnRequest will > be sent again. It maybe happen in Flink Kafka connector: > > 1. The flink kafka connector also reuse and recycle KafkaProducer: > KafkaCommitter will recycle the producer to > producerPool after the transaction complete or exception, and then > KafkaWriter will reuse it from producerPoo
Re: [PR] [FLINK-36409] Publish some autoscaler metrics during stabilisation period [flink-kubernetes-operator]
mxm commented on PR #945: URL: https://github.com/apache/flink-kubernetes-operator/pull/945#issuecomment-2671149679 > I guess Max is under the assumption that the current logic does not collect metrics during the stabilization period. We do collect samples, and once the stabilization is over we even evaluate them. This PR does not change that logic, so not sure what should be controlled by a flag. The only thing the PR does is that it reports those metrics. Can you clarify? I might missing something obvious from the current logic. You're right, we already return metrics from the stabilization phase, but only to measure the observed true processing rate. In the original model, we only returned metrics once the metric window was full. I think that was more elegant, but the source metrics proved not reliable enough that we had to manually measure the processing capacity instead of always relying on the processing rate and busyness metrics of sources. I might be a bit pedantic here, but I want to see the actual metrics used for evaluation reported as autoscaler metrics. Reporting metrics during stabilization removes that clarity. You can only observe what the assumptions of the autoscaler were, if you observed what is actually used for evaluation. That's why I suggested to put reporting autoscaler metrics during the stabilization period behind a flag. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36409] Publish some autoscaler metrics during stabilisation period [flink-kubernetes-operator]
mxm commented on code in PR #945: URL: https://github.com/apache/flink-kubernetes-operator/pull/945#discussion_r1963310240 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -149,19 +149,31 @@ public CollectedMetricHistory updateMetrics( // Add scaling metrics to history if they were computed successfully metricHistory.put(now, scalingMetrics); -if (isStabilizing) { -LOG.info("Stabilizing until {}", readable(stableTime)); -stateStore.storeCollectedMetrics(ctx, metricHistory); -return new CollectedMetricHistory(topology, Collections.emptySortedMap(), jobRunningTs); -} - var collectedMetrics = new CollectedMetricHistory(topology, metricHistory, jobRunningTs); if (now.isBefore(windowFullTime)) { -LOG.info("Metric window not full until {}", readable(windowFullTime)); +if (isStabilizing) { +LOG.info( +"Stabilizing... until {}. {} samples collected", Review Comment: True, we return the metrics from the stabilization period once the stabilization period is over. They are used to calculate the observed true processing rate. When the window is full, we clear the metrics from the stabilization period and mark the window as fully collected. I still think that returning this logging message ("XX samples collected") to the user during stabilization is confusing. It suggests that these metrics will be evaluated for scaling decisions, which they are not. We are merely measuring the processing capacity of the sources. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/DateTimeUtils.java: ## @@ -26,7 +26,7 @@ public class DateTimeUtils { private static final DateTimeFormatter DEFAULT_FORMATTER = -DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss"); +DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss.SSS"); Review Comment: AFAIK we don't use any milliseconds in the tests, but advance only by full seconds. I think this may have unexpected consequences for the storage and logging layer. If this is a test-only change, then it should go into test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37350][table-planner] Fix memory leak caused by skewed join optimization strategy [flink]
JunRuiLee commented on code in PR #26180: URL: https://github.com/apache/flink/pull/26180#discussion_r1963052390 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/VertexParallelismAndInputInfosDeciderUtils.java: ## @@ -658,4 +660,148 @@ public static long calculateDataVolumePerTaskForInput( long globalDataVolumePerTask, long inputsGroupBytes, long totalDataBytes) { return (long) ((double) inputsGroupBytes / totalDataBytes * globalDataVolumePerTask); } + +public static Optional constructOptimizationLog( +BlockingInputInfo inputInfo, JobVertexInputInfo jobVertexInputInfo) { +if (inputInfo.areInterInputsKeysCorrelated() && inputInfo.isIntraInputKeyCorrelated()) { +return Optional.empty(); +} +boolean optimized = false; +List executionVertexInputInfos = +jobVertexInputInfo.getExecutionVertexInputInfos(); +int parallelism = executionVertexInputInfos.size(); +long[] optimizedDataBytes = new long[parallelism]; +long optimizedMin = Long.MAX_VALUE, optimizedMax = 0; +long[] nonOptimizedDataBytes = new long[parallelism]; +long nonOptimizedMin = Long.MAX_VALUE, nonOptimizedMax = 0; +for (int i = 0; i < parallelism; ++i) { +Map consumedSubpartitionGroups = + executionVertexInputInfos.get(i).getConsumedSubpartitionGroups(); +for (Map.Entry entry : consumedSubpartitionGroups.entrySet()) { +IndexRange partitionRange = entry.getKey(); +IndexRange subpartitionRange = entry.getValue(); +optimizedDataBytes[i] += +inputInfo.getNumBytesProduced(partitionRange, subpartitionRange); +} +optimizedMin = Math.min(optimizedMin, optimizedDataBytes[i]); +optimizedMax = Math.max(optimizedMax, optimizedDataBytes[i]); + +Map nonOptimizedConsumedSubpartitionGroup = +computeNumBasedConsumedSubpartitionGroup(parallelism, i, inputInfo); +checkState(nonOptimizedConsumedSubpartitionGroup.size() == 1); +nonOptimizedDataBytes[i] += +inputInfo.getNumBytesProduced( +nonOptimizedConsumedSubpartitionGroup +.entrySet() +.iterator() +.next() +.getKey(), +nonOptimizedConsumedSubpartitionGroup +.entrySet() +.iterator() +.next() +.getValue()); +nonOptimizedMin = Math.min(nonOptimizedMin, nonOptimizedDataBytes[i]); +nonOptimizedMax = Math.max(nonOptimizedMax, nonOptimizedDataBytes[i]); + +if (!optimized +&& !consumedSubpartitionGroups.equals(nonOptimizedConsumedSubpartitionGroup)) { +optimized = true; +} +} +if (optimized) { +long optimizedMed = median(optimizedDataBytes); +long nonOptimizedMed = median(nonOptimizedDataBytes); +String logMessage = +String.format( +"Result id: %s, " ++ "type number: %d, " ++ "input data size: " ++ "[ Before: {min: %s, median: %s, max: %s}, " ++ "After: {min: %s, median: %s, max: %s} ]", +inputInfo.getResultId(), +inputInfo.getInputTypeNumber(), +new MemorySize(nonOptimizedMin).toHumanReadableString(), +new MemorySize(nonOptimizedMed).toHumanReadableString(), +new MemorySize(nonOptimizedMax).toHumanReadableString(), +new MemorySize(optimizedMin).toHumanReadableString(), +new MemorySize(optimizedMed).toHumanReadableString(), +new MemorySize(optimizedMax).toHumanReadableString()); +return Optional.of(logMessage); +} +return Optional.empty(); +} + +private static Map computeNumBasedConsumedSubpartitionGroup( +int parallelism, int currentIndex, BlockingInputInfo inputInfo) { +int sourceParallelism = inputInfo.getNumPartitions(); + +if (inputInfo.isPointwise()) { +return computeNumBasedConsumedSubpartitionGroupForPointwise( +sourceParallelism, parallelism, currentIndex, inputInfo::getNumSubpartitions); +} else { +return computeNumBasedConsumedSubpartitionGroupForAllToAll( +
Re: [PR] [FLINK-37348][pipeline-paimon] fix paimon ArrayIndexOutOfBoundsException when add column first [flink-cdc]
MOBIN-F commented on code in PR #3925: URL: https://github.com/apache/flink-cdc/pull/3925#discussion_r1961136374 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java: ## @@ -463,12 +463,20 @@ public void testAddColumnWithPosition(String metastore) "col7_after", org.apache.flink.cdc.common.types.DataTypes.STRING()), "col2")); +addedColumns.add( +AddColumnEvent.before( +Column.physicalColumn( +"col4_first_before", + org.apache.flink.cdc.common.types.DataTypes.STRING()), +"col1")); Review Comment: fix~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36659] Bump to 2.0-preview1. [flink-connector-jdbc]
leonardBang commented on code in PR #153: URL: https://github.com/apache/flink-connector-jdbc/pull/153#discussion_r1944348435 ## flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java: ## @@ -106,66 +92,66 @@ void testEnrichedClassCastException() { } } -private void runTest(boolean exploitParallelism) throws Exception { -ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); -JdbcInputFormat.JdbcInputFormatBuilder inputBuilder = -JdbcInputFormat.buildJdbcInputFormat() -.setDrivername(getMetadata().getDriverClass()) -.setDBUrl(getMetadata().getJdbcUrl()) -.setQuery(SELECT_ALL_BOOKS) -.setRowTypeInfo(ROW_TYPE_INFO); - -if (exploitParallelism) { -final int fetchSize = 1; -final long min = TEST_DATA[0].id; -final long max = TEST_DATA[TEST_DATA.length - fetchSize].id; -// use a "splittable" query to exploit parallelism -inputBuilder = -inputBuilder -.setQuery(SELECT_ALL_BOOKS_SPLIT_BY_ID) -.setParametersProvider( -new JdbcNumericBetweenParametersProvider(min, max) -.ofBatchSize(fetchSize)); -} -DataSet source = environment.createInput(inputBuilder.finish()); - -// NOTE: in this case (with Derby driver) setSqlTypes could be skipped, but -// some databases don't null values correctly when no column type was specified -// in PreparedStatement.setObject (see its javadoc for more details) -org.apache.flink.connector.jdbc.JdbcConnectionOptions connectionOptions = -new org.apache.flink.connector.jdbc.JdbcConnectionOptions -.JdbcConnectionOptionsBuilder() -.withUrl(getMetadata().getJdbcUrl()) -.withDriverName(getMetadata().getDriverClass()) -.build(); - -JdbcOutputFormat jdbcOutputFormat = -new JdbcOutputFormat<>( -new SimpleJdbcConnectionProvider(connectionOptions), -JdbcExecutionOptions.defaults(), -() -> -createSimpleRowExecutor( -String.format(INSERT_TEMPLATE, OUTPUT_TABLE), -new int[] { -Types.INTEGER, -Types.VARCHAR, -Types.VARCHAR, -Types.DOUBLE, -Types.INTEGER -})); -source.output(new TestOutputFormat(jdbcOutputFormat)); -environment.execute(); - -try (Connection dbConn = DriverManager.getConnection(getMetadata().getJdbcUrl()); -PreparedStatement statement = dbConn.prepareStatement(SELECT_ALL_NEWBOOKS); -ResultSet resultSet = statement.executeQuery()) { -int count = 0; -while (resultSet.next()) { -count++; -} -assertThat(count).isEqualTo(TEST_DATA.length); -} -} +//private void runTest(boolean exploitParallelism) throws Exception { Review Comment: ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-2.0][FLINK-37149][doc] Add documents for adaptive batch execution. [flink]
flinkbot commented on PR #26181: URL: https://github.com/apache/flink/pull/26181#issuecomment-2670908708 ## CI report: * f5ea5d1375ce527720ee8be6fd8081f5805535a5 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35014] SqlNode to operation conversion for models [flink]
lihaosky commented on code in PR #25834: URL: https://github.com/apache/flink/pull/25834#discussion_r1964599071 ## flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; +import org.apache.calcite.util.NlsString; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * ALTER MODEL [IF EXISTS] [[catalogName.] dataBasesName.]modelName RESET ( 'key1' [, 'key2']...). + */ +public class SqlAlterModelReset extends SqlAlterModel { +private final SqlNodeList optionKeyList; + +public SqlAlterModelReset( +SqlParserPos pos, +SqlIdentifier modelName, +boolean ifModelExists, +SqlNodeList optionKeyList) { +super(pos, modelName, ifModelExists); +this.optionKeyList = requireNonNull(optionKeyList, "optionKeyList should not be null"); +} + +@Override +public List getOperandList() { +return ImmutableNullableList.of(modelName, optionKeyList); Review Comment: Didn't change this. Let me know if you want me to change this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34467] add lineage integration for jdbc connector [flink-connector-jdbc]
HuangZhenQiu commented on code in PR #149: URL: https://github.com/apache/flink-connector-jdbc/pull/149#discussion_r1964608069 ## flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/Db2LocationExtractor.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.db2.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocation; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor; +import org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor; + +import java.net.URISyntaxException; +import java.util.Properties; + +/** + * Implementation of {@link JdbcLocationExtractor} for DB2. + * + * @see https://www.ibm.com/docs/en/db2woc?topic=programmatically-jdbc";>DB2 URL Format + */ +@Internal +public class Db2LocationExtractor implements JdbcLocationExtractor { + +private JdbcLocationExtractor delegate() { +return new OverrideJdbcLocationExtractor("db2"); Review Comment: Make sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35724][oss] Bump OSS SDK to 3.17.4 [flink]
JunRuiLee merged PR #26177: URL: https://github.com/apache/flink/pull/26177 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-37333) Use ForwardForUnspecifiedPartitioner when adaptive broadcast join takes effect
[ https://issues.apache.org/jira/browse/FLINK-37333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928971#comment-17928971 ] Junrui Lee commented on FLINK-37333: release-2.0 83dc6f4013854ccc5089daf7d9bc0472504e9bde master ee0c187b35df9a20ee3249f984274f012a7b5309 > Use ForwardForUnspecifiedPartitioner when adaptive broadcast join takes effect > -- > > Key: FLINK-37333 > URL: https://issues.apache.org/jira/browse/FLINK-37333 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 2.0.0 >Reporter: xingbe >Assignee: xingbe >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > According to the design, we need to change the original forward partitioner > to a rescale partitioner after the adaptive broadcast join takes effect. > Accordingly, we need to use ForwardForUnspecifiedPartitioner to achieve this > purpose. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [BP-2.0][FLINK-35724][oss] Bump OSS SDK to 3.17.4 [flink]
JunRuiLee opened a new pull request, #26189: URL: https://github.com/apache/flink/pull/26189 (cherry picked from commit 225ef6a2a060bff13530d699d20fb8d5f1f8b7e4) ## What is the purpose of the change [BP-2.0][FLINK-35724][oss] Bump OSS SDK to 3.17.4 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35936][cdc-connector][paimon] Fix paimon cdc schema evolution failure when restart job [flink-cdc]
github-actions[bot] closed pull request #3502: [FLINK-35936][cdc-connector][paimon] Fix paimon cdc schema evolution failure when restart job URL: https://github.com/apache/flink-cdc/pull/3502 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35936][cdc-connector][paimon] Fix paimon cdc schema evolution failure when restart job [flink-cdc]
github-actions[bot] commented on PR #3502: URL: https://github.com/apache/flink-cdc/pull/3502#issuecomment-2672990347 This pull request has been closed because it has not had recent activity. You can reopen it if you want to continue your work, and anyone who is interested in it is encouraged to continue working on this pull request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35622][runtime] Filter out noisy 'Coordinator of operator xxx does not exist' exceptions in CollectResultFetcher [flink]
JunRuiLee merged PR #26171: URL: https://github.com/apache/flink/pull/26171 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-35622) Filter out noisy "Coordinator of operator xxxx does not exist" exceptions in batch mode
[ https://issues.apache.org/jira/browse/FLINK-35622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Junrui Lee closed FLINK-35622. -- Fix Version/s: 2.0.1 Resolution: Fixed master c92f208f4a074812b5314f2b4858f244b5ecd9fe > Filter out noisy "Coordinator of operator does not exist" exceptions in > batch mode > --- > > Key: FLINK-35622 > URL: https://issues.apache.org/jira/browse/FLINK-35622 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Coordination >Reporter: Junrui Li >Assignee: Biao Geng >Priority: Major > Labels: pull-request-available > Fix For: 2.0.1 > > > In batch mode, the Flink JobManager logs frequently print "Coordinator of > operator does not exist or the job vertex this operator belongs to is > not initialized." exceptions when using the collect() method. > This exception is caused by the collect sink attempting to fetch data from > the corresponding operator coordinator on the JM based on the operator ID. > However, batch jobs do not initialize all job vertices at the beginning, and > instead, initialize them in a sequential manner. If a job vertex has not been > initialized yet, the corresponding coordinator cannot be found, leading to > the printing of this message. > These exceptions are harmless and do not affect the job execution, but they > can clutter the logs and make it difficult to find relevant information, > especially for large-scale batch jobs. > > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
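As an aside, the description above already names the exact message that makes these failures recognizable. The sketch below only illustrates that kind of message-based classification; it is not the code from PR #26171, and the matched substrings are taken from the issue description rather than from the implementation.

```java
/** Sketch only: classifying the benign "coordinator not ready" failure described above. */
final class BenignCoordinatorFailure {

    /** Returns true if the failure only means the target job vertex has not been initialized yet. */
    static boolean isCoordinatorNotReady(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            String message = cause.getMessage();
            if (message != null
                    && message.contains("does not exist")
                    && message.contains("not initialized")) {
                // Harmless in batch mode: the fetcher can retry quietly instead of logging a stack trace.
                return true;
            }
        }
        return false;
    }
}
```

A fetcher that treats this case as transient can keep polling until the vertex is initialized, which is what keeps the logs of large-scale batch jobs readable.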
[jira] [Assigned] (FLINK-35622) Filter out noisy "Coordinator of operator xxxx does not exist" exceptions in batch mode
[ https://issues.apache.org/jira/browse/FLINK-35622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Junrui Lee reassigned FLINK-35622: -- Assignee: Biao Geng > Filter out noisy "Coordinator of operator does not exist" exceptions in > batch mode > --- > > Key: FLINK-35622 > URL: https://issues.apache.org/jira/browse/FLINK-35622 > Project: Flink > Issue Type: Technical Debt > Components: Runtime / Coordination >Reporter: Junrui Li >Assignee: Biao Geng >Priority: Major > Labels: pull-request-available > > In batch mode, the Flink JobManager logs frequently print "Coordinator of > operator does not exist or the job vertex this operator belongs to is > not initialized." exceptions when using the collect() method. > This exception is caused by the collect sink attempting to fetch data from > the corresponding operator coordinator on the JM based on the operator ID. > However, batch jobs do not initialize all job vertices at the beginning, and > instead, initialize them in a sequential manner. If a job vertex has not been > initialized yet, the corresponding coordinator cannot be found, leading to > the printing of this message. > These exceptions are harmless and do not affect the job execution, but they > can clutter the logs and make it difficult to find relevant information, > especially for large-scale batch jobs. > > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-37366) Allow configurable retry for Kafka topic metadata fetch
[ https://issues.apache.org/jira/browse/FLINK-37366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reassigned FLINK-37366: -- Assignee: Shuyi Chen > Allow configurable retry for Kafka topic metadata fetch > --- > > Key: FLINK-37366 > URL: https://issues.apache.org/jira/browse/FLINK-37366 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > For high availability, we adopted a multi-primary Kafka cluster setup, so the > data of a Kafka topic will be in multiple physical clusters. In case of a > kafka cluster failure, Flink pipeline should continue to run w/o failure. > Currently, Flink pipeline will fail due to SubscriberUtils.getTopicMetadata() > throwing RuntimeException if a kafka cluster fails, causing the pipeline keep > restarting. We propose to add a configurable retry policy in > SubscriberUtils.getTopicMetadata(), so we can configure flink Kafka connector > to tolerate kafka failure for longer period of time w/o restarting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37357) Migrating SqlNode conversion logic from to SqlNodeConverters
[ https://issues.apache.org/jira/browse/FLINK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hirson Zhang updated FLINK-37357: - Summary: Migrating SqlNode conversion logic from to SqlNodeConverters (was: Migrating conversion logic to SqlNodeConverters) > Migrating SqlNode conversion logic from to SqlNodeConverters > - > > Key: FLINK-37357 > URL: https://issues.apache.org/jira/browse/FLINK-37357 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 2.0-preview >Reporter: Hirson Zhang >Priority: Major > > There is a "TODO" at class "SqlNodeToOperationConversion", method > "convertValidatedSqlNode": > "TODO: all the below conversion logic should be migrated to > SqlNodeConverters". > Part of the code: > {code:java} > /** Convert a validated sql node to Operation. */ > private static Optional convertValidatedSqlNode( > FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode > validated) { > beforeConversion(); // delegate conversion to the registered > converters first > SqlNodeConvertContext context = new SqlNodeConvertContext(flinkPlanner, > catalogManager); > Optional operation = > SqlNodeConverters.convertSqlNode(validated, context); > if (operation.isPresent()) { > return operation; > } // TODO: all the below conversion logic should be migrated to > SqlNodeConverters > SqlNodeToOperationConversion converter = > new SqlNodeToOperationConversion(flinkPlanner, catalogManager); > if (validated instanceof SqlDropCatalog) { > return Optional.of(converter.convertDropCatalog((SqlDropCatalog) > validated)); > } else if (validated instanceof SqlLoadModule) { > return Optional.of(converter.convertLoadModule((SqlLoadModule) > validated)); > } else if (validated instanceof SqlShowCurrentCatalog) { > return Optional.of( > converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) > validated)); > } else if (validated instanceof SqlShowModules) { > return Optional.of(converter.convertShowModules((SqlShowModules) > validated)); > .{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37357) Migrating SqlNode conversion logic from SqlNodeToOperationConversion to SqlNodeConverters
[ https://issues.apache.org/jira/browse/FLINK-37357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hirson Zhang updated FLINK-37357: - Summary: Migrating SqlNode conversion logic from SqlNodeToOperationConversion to SqlNodeConverters (was: Migrating SqlNode conversion logic from to SqlNodeConverters) > Migrating SqlNode conversion logic from SqlNodeToOperationConversion to > SqlNodeConverters > -- > > Key: FLINK-37357 > URL: https://issues.apache.org/jira/browse/FLINK-37357 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 2.0-preview >Reporter: Hirson Zhang >Priority: Major > > There is a "TODO" at class "SqlNodeToOperationConversion", method > "convertValidatedSqlNode": > "TODO: all the below conversion logic should be migrated to > SqlNodeConverters". > Part of the code: > {code:java} > /** Convert a validated sql node to Operation. */ > private static Optional convertValidatedSqlNode( > FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode > validated) { > beforeConversion(); // delegate conversion to the registered > converters first > SqlNodeConvertContext context = new SqlNodeConvertContext(flinkPlanner, > catalogManager); > Optional operation = > SqlNodeConverters.convertSqlNode(validated, context); > if (operation.isPresent()) { > return operation; > } // TODO: all the below conversion logic should be migrated to > SqlNodeConverters > SqlNodeToOperationConversion converter = > new SqlNodeToOperationConversion(flinkPlanner, catalogManager); > if (validated instanceof SqlDropCatalog) { > return Optional.of(converter.convertDropCatalog((SqlDropCatalog) > validated)); > } else if (validated instanceof SqlLoadModule) { > return Optional.of(converter.convertLoadModule((SqlLoadModule) > validated)); > } else if (validated instanceof SqlShowCurrentCatalog) { > return Optional.of( > converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) > validated)); > } else if (validated instanceof SqlShowModules) { > return Optional.of(converter.convertShowModules((SqlShowModules) > validated)); > .{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-37366) Allow configurable retry for Kafka topic metadata fetch
Shuyi Chen created FLINK-37366: -- Summary: Allow configurable retry for Kafka topic metadata fetch Key: FLINK-37366 URL: https://issues.apache.org/jira/browse/FLINK-37366 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Reporter: Shuyi Chen For high availability, we adopted a multi-primary Kafka cluster setup, so the data of a Kafka topic will be spread across multiple physical clusters. In case of a Kafka cluster failure, the Flink pipeline should continue to run w/o failure. Currently, the Flink pipeline will fail because SubscriberUtils.getTopicMetadata() throws a RuntimeException if a Kafka cluster fails, causing the pipeline to keep restarting. We propose to add a configurable retry policy in SubscriberUtils.getTopicMetadata(), so the Flink Kafka connector can be configured to tolerate a Kafka cluster failure for a longer period of time w/o restarting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
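A bounded retry of the kind proposed here could look roughly like the following. This is a minimal sketch under stated assumptions: the helper name, the retry/backoff parameters, and the way it would wrap SubscriberUtils.getTopicMetadata() are illustrative, not the connector's actual API.

```java
import java.util.function.Supplier;

/** Minimal sketch of a bounded retry around a topic-metadata fetch (names are illustrative). */
public final class RetryingMetadataFetch {

    public static <T> T fetchWithRetry(Supplier<T> fetch, int maxAttempts, long backoffMs) {
        RuntimeException lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return fetch.get();
            } catch (RuntimeException e) {
                // e.g. the RuntimeException thrown while one Kafka cluster is unavailable
                lastError = e;
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while retrying metadata fetch", ie);
                }
            }
        }
        throw lastError != null ? lastError : new IllegalArgumentException("maxAttempts must be >= 1");
    }
}
```

The attempt count and backoff would presumably be exposed as connector options so that a multi-primary deployment can ride out a single-cluster outage without restarting the pipeline.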
Re: [PR] [BP-2.0][FLINK-35724][oss] Bump OSS SDK to 3.17.4 [flink]
flinkbot commented on PR #26189: URL: https://github.com/apache/flink/pull/26189#issuecomment-2673252457 ## CI report: * 3f24b40dd73221a2ac8790e2ce9fdf93d7e316e9 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]
beryllw commented on code in PR #3856: URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964738198 ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java: ## @@ -470,7 +470,13 @@ private List splitEvenlySizedChunks( } } // add the ending split -splits.add(ChunkRange.of(chunkStart, null)); +// assign ending split first, both the largest and smallest unbounded chunks are completed +// in the first two splits +if (sourceConfig.isAssignEndingChunkFirst()) { +splits.add(0, ChunkRange.of(chunkStart, null)); Review Comment: Considering the use of `System.arraycopy` for performance, as we need to move n-1 elements, I noticed that the `ArrayList` function `add(int index, E element)` is implemented as follows and already uses `System.arraycopy` internally. Is there anything else I need to change? ```java /** * Inserts the specified element at the specified position in this * list. Shifts the element currently at that position (if any) and * any subsequent elements to the right (adds one to their indices). * * @param index index at which the specified element is to be inserted * @param element element to be inserted * @throws IndexOutOfBoundsException {@inheritDoc} */ public void add(int index, E element) { rangeCheckForAdd(index); ensureCapacityInternal(size + 1); // Increments modCount!! System.arraycopy(elementData, index, elementData, index + 1, size - index); elementData[index] = element; size++; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]
beryllw commented on code in PR #3856: URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964738198 ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java: ## @@ -470,7 +470,13 @@ private List splitEvenlySizedChunks( } } // add the ending split -splits.add(ChunkRange.of(chunkStart, null)); +// assign ending split first, both the largest and smallest unbounded chunks are completed +// in the first two splits +if (sourceConfig.isAssignEndingChunkFirst()) { +splits.add(0, ChunkRange.of(chunkStart, null)); Review Comment: Considering the use of `System.arraycopy` for performance, I noticed that the `ArrayList` function `add(int index, E element)` is implemented as follows and already uses `System.arraycopy` internally. Is there anything else I need to change? ```java /** * Inserts the specified element at the specified position in this * list. Shifts the element currently at that position (if any) and * any subsequent elements to the right (adds one to their indices). * * @param index index at which the specified element is to be inserted * @param element element to be inserted * @throws IndexOutOfBoundsException {@inheritDoc} */ public void add(int index, E element) { rangeCheckForAdd(index); ensureCapacityInternal(size + 1); // Increments modCount!! System.arraycopy(elementData, index, elementData, index + 1, size - index); elementData[index] = element; size++; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
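For comparison, the same ordering can be produced without the positional insert by assembling the final list with the unbounded chunk already in front; a generic sketch, not tied to ChunkRange or the splitter itself:

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: build the split list with the unbounded (ending) chunk first instead of inserting at index 0. */
final class SplitOrdering {

    static <T> List<T> withEndingChunkFirst(List<T> boundedChunks, T endingChunk, boolean endingFirst) {
        List<T> splits = new ArrayList<>(boundedChunks.size() + 1);
        if (endingFirst) {
            splits.add(endingChunk);       // unbounded chunk is assigned first
            splits.addAll(boundedChunks);  // one array copy of the n bounded chunks
        } else {
            splits.addAll(boundedChunks);
            splits.add(endingChunk);       // previous behavior: unbounded chunk last
        }
        return splits;
    }
}
```

Either way a single array copy over the bounded chunks is involved, so the `add(0, ...)` variant quoted above is equivalent in cost; the sketch is only meant to make that trade-off concrete.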
Re: [PR] [FLINK-37329][table-planner] Skip Source Stats Collection When table.optimizer.source.report-statistics-enabled is False [flink]
shameersss1 commented on PR #26162: URL: https://github.com/apache/flink/pull/26162#issuecomment-2673427565 @davidradl - I have addressed your comments. Could you please review the same ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37332] Support any column as chunk key column (postgres, orcale, db2, sqlserver) [flink-cdc]
SML0127 commented on PR #3922: URL: https://github.com/apache/flink-cdc/pull/3922#issuecomment-2673458269 > cc @lvyanquan I recently changed my laptop and some of my environment seems to have become a bit messed up. 🥲 I'll make a new PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37332] Support any column as chunk key column (postgres, orcale, db2, sqlserver) [flink-cdc]
SML0127 closed pull request #3922: [FLINK-37332] Support any column as chunk key column (postgres, orcale, db2, sqlserver) URL: https://github.com/apache/flink-cdc/pull/3922 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36409] Publish some autoscaler metrics during stabilisation period [flink-kubernetes-operator]
morhidi merged PR #945: URL: https://github.com/apache/flink-kubernetes-operator/pull/945 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]
beryllw commented on code in PR #3856: URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964751251 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java: ## @@ -313,4 +313,12 @@ public class MySqlDataSourceOptions { .defaultValue(true) .withDescription( "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format."); + +@Experimental +public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST = + ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled") Review Comment: Given that "unboundary" is not a commonly used term and may cause confusion, I have decided to use "unbounded" instead. Therefore, the configuration item will be `scan.incremental.snapshot.unbounded-chunk-first.enabled`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
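With the name settled on in this comment, the renamed option would read roughly as sketched below. This is not the merged code: the default value, the description text, and the use of Flink's ConfigOptions builder (rather than whatever builder class the CDC module actually imports) are assumptions for illustration.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Sketch of the renamed option discussed above; wording and default are illustrative. */
public class UnboundedChunkFirstOption {

    public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST =
            ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
                    .booleanType()
                    .defaultValue(false) // assumed: keep the current assignment order by default
                    .withDescription(
                            "Whether to assign the unbounded chunk first during snapshot reading, "
                                    + "which reduces the risk of a TaskManager OOM on the ending chunk.");
}
```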
Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]
leonardBang commented on code in PR #3856: URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964761727 ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java: ## @@ -470,7 +470,13 @@ private List splitEvenlySizedChunks( } } // add the ending split -splits.add(ChunkRange.of(chunkStart, null)); +// assign ending split first, both the largest and smallest unbounded chunks are completed +// in the first two splits +if (sourceConfig.isAssignEndingChunkFirst()) { +splits.add(0, ChunkRange.of(chunkStart, null)); Review Comment: No, thanks for your explanation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]
leonardBang commented on code in PR #3856: URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964756149 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java: ## @@ -313,4 +313,12 @@ public class MySqlDataSourceOptions { .defaultValue(true) .withDescription( "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format."); + +@Experimental +public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST = + ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled") Review Comment: +1 from my side, please keep going -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]
beryllw commented on code in PR #3856: URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964738198 ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java: ## @@ -470,7 +470,13 @@ private List splitEvenlySizedChunks( } } // add the ending split -splits.add(ChunkRange.of(chunkStart, null)); +// assign ending split first, both the largest and smallest unbounded chunks are completed +// in the first two splits +if (sourceConfig.isAssignEndingChunkFirst()) { +splits.add(0, ChunkRange.of(chunkStart, null)); Review Comment: You mentioned using `System.arraycopy` for performance as we need to move n-1 elements. The `ArrayList` function `add(int index, E element)` is implemented as follows, using `System.arraycopy` internally. Is there anything else I need to change? ```java /** * Inserts the specified element at the specified position in this * list. Shifts the element currently at that position (if any) and * any subsequent elements to the right (adds one to their indices). * * @param index index at which the specified element is to be inserted * @param element element to be inserted * @throws IndexOutOfBoundsException {@inheritDoc} */ public void add(int index, E element) { rangeCheckForAdd(index); ensureCapacityInternal(size + 1); // Increments modCount!! System.arraycopy(elementData, index, elementData, index + 1, size - index); elementData[index] = element; size++; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37313] Fix the problem of reading binlog before the high and low watermarks during the snapshot process [flink-cdc]
gongzexin closed pull request #3920: [FLINK-37313] Fix the problem of reading binlog before the high and low watermarks during the snapshot process URL: https://github.com/apache/flink-cdc/pull/3920 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37313] Fix the problem of reading binlog before the high and low watermarks during the snapshot process [flink-cdc]
gongzexin commented on PR #3920: URL: https://github.com/apache/flink-cdc/pull/3920#issuecomment-2673182080 > Hi, @gongzexin. > > I think that the problem that you met was resolved in #3902 as [#3902 (comment)](https://github.com/apache/flink-cdc/pull/3902#issuecomment-2639121416) describes, can you check for this? Hi, -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35360] support Flink cdc pipeline Yarn application mode [flink-cdc]
joyCurry30 commented on PR #3643: URL: https://github.com/apache/flink-cdc/pull/3643#issuecomment-2673185291 Could you please add a document, like this: https://github.com/apache/flink-cdc/blob/master/docs/content.zh/docs/deployment/yarn.md ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]
leonardBang commented on code in PR #3856: URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1964634199 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java: ## @@ -313,4 +313,12 @@ public class MySqlDataSourceOptions { .defaultValue(true) .withDescription( "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format."); + +@Experimental +public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST = + ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled") Review Comment: I mean ConfigOptions.key("scan.incremental.snapshot.boundary-chunk-first.enabled") with a default value of `true`; currently we assign the first chunk (unboundary chunk) and then assign the boundary chunk, right? But the two ConfigOptions both make sense to me when you use a different default value, so feel free to choose your flavor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37332] Support any column as chunk key column (postgres, orcale, db2, sqlserver) [flink-cdc]
gtk96 commented on PR #3922: URL: https://github.com/apache/flink-cdc/pull/3922#issuecomment-2673187886 cc @lvyanquan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37333][runtime] Use ForwardForUnspecifiedPartitioner when adaptive broadcast join takes effect [flink]
JunRuiLee merged PR #26178: URL: https://github.com/apache/flink/pull/26178 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-37149) Add Adaptive Batch Execution Documentation
[ https://issues.apache.org/jira/browse/FLINK-37149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Junrui Lee closed FLINK-37149. -- Resolution: Done master 1c759337fd9eef2bcf482ae173d0d45355213565 release-2.0 63e1bfbbae1ff43b150fbc0ec85c21996a73a0d1 > Add Adaptive Batch Execution Documentation > -- > > Key: FLINK-37149 > URL: https://issues.apache.org/jira/browse/FLINK-37149 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Junrui Lee >Assignee: Junrui Lee >Priority: Major > Labels: 2.0-related, pull-request-available > > This ticket aims to introduce a new page for adaptive batch execution and > includes the following parts: > # Adaptive parallelism inference > # Adaptive broadcast join > # Adaptive skewed join optimization -- This message was sent by Atlassian Jira (v8.20.10#820010)
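Since the closed ticket only lists the three features the new page covers, here is a rough sketch of how they might be switched on programmatically. The option keys below are assumptions for illustration; the documentation added by this ticket is the authoritative reference for the exact names and values.

```java
import org.apache.flink.configuration.Configuration;

import java.util.Map;

/** Sketch: enabling adaptive batch execution features via configuration (keys are assumed, not verified). */
public class AdaptiveBatchExecutionConfig {

    public static Configuration adaptiveBatchDefaults() {
        return Configuration.fromMap(
                Map.of(
                        "execution.batch.adaptive.auto-parallelism.enabled", "true",
                        "table.optimizer.adaptive-broadcast-join.strategy", "auto",
                        "table.optimizer.skewed-join-optimization.strategy", "auto"));
    }
}
```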
Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]
davidradl commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1963468045 ## flink-python/pyflink/table/table.py: ## @@ -1077,6 +1078,91 @@ def explain(self, *extra_details: ExplainDetail) -> str: j_extra_details = to_j_explain_detail_arr(extra_details) return self._j_table.explain(TEXT, j_extra_details) +def insert_into( +self, table_path_or_descriptor: Union[str, TableDescriptor], overwrite: bool = False +) -> TablePipeline: +""" +When ``target_path_or_descriptor`` is a table path: + +Declares that the pipeline defined by the given :class:`Table` (backed by a +DynamicTableSink) should be written to a table that was registered under the specified +path. + +See the documentation of + :func:`pyflink.table.table_environment.TableEnvironment.use_database` or + :func:`pyflink.table.table_environment.TableEnvironment.use_catalog` for the rules on +the path resolution. + +Example: +:: + +>>> table = table_env.sql_query("SELECTFROM MyTable") +>>> table_pipeline = table.insert_into_table_path("MySinkTable", True) +>>> table_result = table_pipeline.execute().wait() + +When ``target_path_or_descriptor`` is a :class:`~pyflink.table.TableDescriptor` : + +Declares that the pipeline defined by the given :class:`Table` object should be written +to a table (backed by a DynamicTableSink) expressed via the given +:class:`~pyflink.table.TableDescriptor`. + +The descriptor won't be registered in the catalog, but it will be propagated directly +in the operation tree. Note that calling this method multiple times, even with the same +descriptor, results in multiple sink tables instances. + +This method allows to declare a :class:`~pyflink.table.Schema` for the sink descriptor. Review Comment: nit: `This method allows to declare` -> A :class:`~pyflink.table.Schema` can be associated with the sink descriptor -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Fix post-release japicmp update script for macOS [flink]
flinkbot commented on PR #26183: URL: https://github.com/apache/flink/pull/26183#issuecomment-2671340643 ## CI report: * 00795756ff864f9783d6ce8e38c3d99213845493 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-37361) Update japicmp configuration post 1.19.2
[ https://issues.apache.org/jira/browse/FLINK-37361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Fedulov updated FLINK-37361: -- Fix Version/s: 1.19.3 > Update japicmp configuration post 1.19.2 > > > Key: FLINK-37361 > URL: https://issues.apache.org/jira/browse/FLINK-37361 > Project: Flink > Issue Type: Technical Debt >Affects Versions: 1.19.1 >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > Fix For: 1.19.3 > > > Update the japicmp reference version and wipe exclusions / enable API > compatibility checks for {{@PublicEvolving}} APIs on the corresponding > SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see > below). > For a new major release (x.y.0), run the same command also on the master > branch for updating the japicmp reference version and removing out-dated > exclusions in the japicmp configuration. > Make sure that all Maven artifacts are already pushed to Maven Central. > Otherwise, there's a risk that CI fails due to missing reference artifacts. > {code:bash} > tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh > tools $ cd .. > $ git add * > $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37360) Update japicmp configuration post 1.20.1
[ https://issues.apache.org/jira/browse/FLINK-37360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Fedulov updated FLINK-37360: -- Fix Version/s: 1.20.2 > Update japicmp configuration post 1.20.1 > > > Key: FLINK-37360 > URL: https://issues.apache.org/jira/browse/FLINK-37360 > Project: Flink > Issue Type: Technical Debt >Affects Versions: 1.20.1 >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > Fix For: 1.20.2 > > > Update the japicmp reference version and wipe exclusions / enable API > compatibility checks for {{@PublicEvolving}} APIs on the corresponding > SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see > below). > For a new major release (x.y.0), run the same command also on the master > branch for updating the japicmp reference version and removing out-dated > exclusions in the japicmp configuration. > Make sure that all Maven artifacts are already pushed to Maven Central. > Otherwise, there's a risk that CI fails due to missing reference artifacts. > {code:bash} > tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh > tools $ cd .. > $ git add * > $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37361) Update japicmp configuration post 1.19.2
[ https://issues.apache.org/jira/browse/FLINK-37361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Fedulov updated FLINK-37361: -- Affects Version/s: 1.19.1 (was: 1.20.2) > Update japicmp configuration post 1.19.2 > > > Key: FLINK-37361 > URL: https://issues.apache.org/jira/browse/FLINK-37361 > Project: Flink > Issue Type: Technical Debt >Affects Versions: 1.19.1 >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > > Update the japicmp reference version and wipe exclusions / enable API > compatibility checks for {{@PublicEvolving}} APIs on the corresponding > SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see > below). > For a new major release (x.y.0), run the same command also on the master > branch for updating the japicmp reference version and removing out-dated > exclusions in the japicmp configuration. > Make sure that all Maven artifacts are already pushed to Maven Central. > Otherwise, there's a risk that CI fails due to missing reference artifacts. > {code:bash} > tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh > tools $ cd .. > $ git add * > $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-37361) Update japicmp configuration post 1.19.2
Alexander Fedulov created FLINK-37361: - Summary: Update japicmp configuration post 1.19.2 Key: FLINK-37361 URL: https://issues.apache.org/jira/browse/FLINK-37361 Project: Flink Issue Type: Technical Debt Affects Versions: 1.20.2 Reporter: Alexander Fedulov Assignee: Alexander Fedulov Update the japicmp reference version and wipe exclusions / enable API compatibility checks for {{@PublicEvolving}} APIs on the corresponding SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see below). For a new major release (x.y.0), run the same command also on the master branch for updating the japicmp reference version and removing out-dated exclusions in the japicmp configuration. Make sure that all Maven artifacts are already pushed to Maven Central. Otherwise, there's a risk that CI fails due to missing reference artifacts. {code:bash} tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh tools $ cd .. $ git add * $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37360) Update japicmp configuration post 1.20.1
[ https://issues.apache.org/jira/browse/FLINK-37360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Fedulov updated FLINK-37360: -- Affects Version/s: 1.20.1 (was: 1.20.2) > Update japicmp configuration post 1.20.1 > > > Key: FLINK-37360 > URL: https://issues.apache.org/jira/browse/FLINK-37360 > Project: Flink > Issue Type: Technical Debt >Affects Versions: 1.20.1 >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > > Update the japicmp reference version and wipe exclusions / enable API > compatibility checks for {{@PublicEvolving}} APIs on the corresponding > SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see > below). > For a new major release (x.y.0), run the same command also on the master > branch for updating the japicmp reference version and removing out-dated > exclusions in the japicmp configuration. > Make sure that all Maven artifacts are already pushed to Maven Central. > Otherwise, there's a risk that CI fails due to missing reference artifacts. > {code:bash} > tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh > tools $ cd .. > $ git add * > $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37361) Update japicmp configuration post 1.19.2
[ https://issues.apache.org/jira/browse/FLINK-37361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Fedulov updated FLINK-37361: -- Affects Version/s: 1.19.2 (was: 1.19.1) > Update japicmp configuration post 1.19.2 > > > Key: FLINK-37361 > URL: https://issues.apache.org/jira/browse/FLINK-37361 > Project: Flink > Issue Type: Technical Debt >Affects Versions: 1.19.2 >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > Fix For: 1.19.3 > > > Update the japicmp reference version and wipe exclusions / enable API > compatibility checks for {{@PublicEvolving}} APIs on the corresponding > SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see > below). > For a new major release (x.y.0), run the same command also on the master > branch for updating the japicmp reference version and removing out-dated > exclusions in the japicmp configuration. > Make sure that all Maven artifacts are already pushed to Maven Central. > Otherwise, there's a risk that CI fails due to missing reference artifacts. > {code:bash} > tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh > tools $ cd .. > $ git add * > $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-37360) Update japicmp configuration post 1.20.1
Alexander Fedulov created FLINK-37360: - Summary: Update japicmp configuration post 1.20.1 Key: FLINK-37360 URL: https://issues.apache.org/jira/browse/FLINK-37360 Project: Flink Issue Type: Technical Debt Affects Versions: 1.20.2 Reporter: Alexander Fedulov Assignee: Alexander Fedulov Update the japicmp reference version and wipe exclusions / enable API compatibility checks for {{@PublicEvolving}} APIs on the corresponding SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see below). For a new major release (x.y.0), run the same command also on the master branch for updating the japicmp reference version and removing out-dated exclusions in the japicmp configuration. Make sure that all Maven artifacts are already pushed to Maven Central. Otherwise, there's a risk that CI fails due to missing reference artifacts. {code:bash} tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh tools $ cd .. $ git add * $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]
davidradl commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1963481246 ## flink-python/pyflink/table/table_pipeline.py: ## @@ -0,0 +1,78 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from typing import Optional +from pyflink.java_gateway import get_gateway +from pyflink.table import ExplainDetail +from pyflink.table.catalog import ObjectIdentifier +from pyflink.table.table_result import TableResult +from pyflink.util.java_utils import to_j_explain_detail_arr + +__all__ = ["TablePipeline"] + + +class TablePipeline(object): +""" +Describes a complete pipeline from one or more source tables to a sink table. +""" + +def __init__(self, j_table_pipeline, t_env): +self._j_table_pipeline = j_table_pipeline +self._t_env = t_env + +def __str__(self) -> str: +return self._j_table_pipeline.toString() + +def execute(self) -> TableResult: +""" +Executes the table pipeline. + +.. versionadded:: 2.1.0 +""" +self._t_env._before_execute() +return TableResult(self._j_table_pipeline.execute()) + +def explain(self, *extra_details: ExplainDetail) -> str: +""" +Returns the AST and the execution plan of the table pipeline. + +:param extra_details: The extra explain details which the explain result should include, + e.g. estimated cost, changelog mode for streaming +:return: AST and execution plans + +.. versionadded:: 2.1.0 +""" +gateway = get_gateway() +j_extra_details = to_j_explain_detail_arr(extra_details) +return self._j_table_pipeline.explain( +gateway.jvm.org.apache.flink.table.api.ExplainFormat.TEXT, j_extra_details +) + +def get_sink_identifier(self) -> Optional[ObjectIdentifier]: +""" +Returns the sink table's :class:`~pyflink.table.catalog.ObjectIdentifier`, if any. +The result is empty for anonymous sink tables that haven't been registered before. Review Comment: maybe point to details on how to register these types of tables. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-37356) Recycle use of kafka producer(which commit error) maybe send data without AddPartitionsToTxnRequest
[ https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928771#comment-17928771 ] Hongshun Wang edited comment on FLINK-37356 at 2/20/25 12:28 PM: - [~arvid] just do it. > I'd avoid tinkering around with the internal state of the transactionManager It does make sense to me! I'd like to see your redesign of kafka sink. was (Author: JIRAUSER298968): [~arvid] just do it. > I'd avoid tinkering around with the internal state of the transactionManager It does make sense to me! I'd like to see your resign of kafka sink. > Recycle use of kafka producer(which commit error) maybe send data without > AddPartitionsToTxnRequest > --- > > Key: FLINK-37356 > URL: https://issues.apache.org/jira/browse/FLINK-37356 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.4.0 >Reporter: Hongshun Wang >Priority: Major > > In my production environment, READ_COMMITTED consumer can no longer consume > any records. Then I found that the LSO of the partition doesn't change for a > long time. I lookup all the log in Kafka cluster, then find that there is a > transaction lacking AddPartitionsToTxnRequest. > > At first, I think the problem is caused by > https://issues.apache.org/jira/browse/FLINK-31363 because my Kafka cluster > log contains InvalidTxnStateException. However, though the transaction is in > an invalid state, no data is written into Kafka topic partition in this > transaction( because in this case, the transaction is empty). It will not > influence any Kafka topic partition's LSO, thus consumer won't be blocked. > > Then I check the code of Kafka client, it seems no way to produce data > without AddPartitionsToTxnRequest done because the the `Sender` will refuse > to dequeue batches from the accumulator until they have been added to the > transaction. > {code:java} > // org.apache.kafka.clients.producer.KafkaProducer#doSend > private Future doSend(ProducerRecord record, Callback > callback) { > // ..ignore code > // Add the partition to the transaction (if in progress) after it has been > successfully > // appended to the accumulator. We cannot do it before because the partition > may be > // unknown or the initially selected partition may be changed when the batch > is closed > // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse > to dequeue > // batches from the accumulator until they have been added to the transaction. > if (transactionManager != null) { > transactionManager.maybeAddPartition(appendCallbacks.topicPartition()); > // ignore code > }{code} > {code:java} > //org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode > > if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // > there is a rare case that a single batch size is larger than the request size > due to // compression; in this case we will still eventually send this batch > in a single request break; } else { if > (shouldStopDrainBatchesForPartition(first, tp)) break; } > {code} > > Then I have a idea that if a TransactionManager which doesn't clear > partitionsInTransaction is reused again, the AddPartitionsToTxnRequest will > be sent again. It maybe happen in Flink Kafka connector: > > 1. The flink kafka connector also reuse and recycle KafkaProducer: > KafkaCommitter will recycle the producer to > producerPool after the transaction complete or exception, and then > KafkaWriter will reuse it from producerPool. 
> {code:java} > // code placeholder > org.apache.flink.connector.kafka.sink.KafkaCommitter#commit > @Override > public void commit(Collection> requests) > throws IOException, InterruptedException { > for (CommitRequest request : requests) { > > producer.commitTransaction(); > producer.flush(); > recyclable.ifPresent(Recyclable::close); > } catch (RetriableException e) { > request.retryLater(); > } catch (ProducerFencedException e) { > recyclable.ifPresent(Recyclable::close); > request.signalFailedWithKnownReason(e); > } catch (InvalidTxnStateException e) { > recyclable.ifPresent(Recyclable::close); > request.signalFailedWithKnownReason(e); > } catch (UnknownProducerIdException e) { > LOG.error( > recyclable.ifPresent(Recyclable::close); > request.signalFailedWithKnownReason(e); > } catch (Exception e) { > recyclable.ifPresent(Recyclable::close); > request.signalFailedWithUnknownReason(e); > }
Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]
davidradl commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1963476837 ## flink-python/pyflink/table/table_pipeline.py: ## @@ -0,0 +1,78 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from typing import Optional +from pyflink.java_gateway import get_gateway +from pyflink.table import ExplainDetail +from pyflink.table.catalog import ObjectIdentifier +from pyflink.table.table_result import TableResult +from pyflink.util.java_utils import to_j_explain_detail_arr + +__all__ = ["TablePipeline"] + + +class TablePipeline(object): +""" +Describes a complete pipeline from one or more source tables to a sink table. +""" + +def __init__(self, j_table_pipeline, t_env): +self._j_table_pipeline = j_table_pipeline +self._t_env = t_env + +def __str__(self) -> str: +return self._j_table_pipeline.toString() + +def execute(self) -> TableResult: +""" +Executes the table pipeline. + +.. versionadded:: 2.1.0 +""" +self._t_env._before_execute() +return TableResult(self._j_table_pipeline.execute()) + +def explain(self, *extra_details: ExplainDetail) -> str: +""" +Returns the AST and the execution plan of the table pipeline. + +:param extra_details: The extra explain details which the explain result should include, Review Comment: nit: I am not sure what the possible values are for this field. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
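On the question of possible values: a short sketch, assuming `table_pipeline` is a `TablePipeline` returned by the proposed `Table.insert_into()`; the members shown come from the existing `pyflink.table.ExplainDetail` enum.

```python
# Illustrative only: `table_pipeline` is assumed to be a TablePipeline obtained
# from the proposed Table.insert_into(); the ExplainDetail members below are
# part of the existing pyflink.table.ExplainDetail enum.
from pyflink.table import ExplainDetail

# Include the estimated cost and the changelog mode in the explain output.
print(table_pipeline.explain(ExplainDetail.ESTIMATED_COST,
                             ExplainDetail.CHANGELOG_MODE))

# Or request the execution plan in JSON form.
print(table_pipeline.explain(ExplainDetail.JSON_EXECUTION_PLAN))
```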
[jira] [Updated] (FLINK-37361) Update japicmp configuration post 1.19.2
[ https://issues.apache.org/jira/browse/FLINK-37361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-37361: --- Labels: pull-request-available (was: ) > Update japicmp configuration post 1.19.2 > > > Key: FLINK-37361 > URL: https://issues.apache.org/jira/browse/FLINK-37361 > Project: Flink > Issue Type: Technical Debt >Affects Versions: 1.19.2 >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > Labels: pull-request-available > Fix For: 1.19.3 > > > Update the japicmp reference version and wipe exclusions / enable API > compatibility checks for {{@PublicEvolving}} APIs on the corresponding > SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see > below). > For a new major release (x.y.0), run the same command also on the master > branch for updating the japicmp reference version and removing out-dated > exclusions in the japicmp configuration. > Make sure that all Maven artifacts are already pushed to Maven Central. > Otherwise, there's a risk that CI fails due to missing reference artifacts. > {code:bash} > tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh > tools $ cd .. > $ git add * > $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-37361][release] Update japicmp configuration post 1.19.2 [flink]
afedulov opened a new pull request, #26185: URL: https://github.com/apache/flink/pull/26185 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-37356) Recycle use of kafka producer(which commit error) maybe send data without AddPartitionsToTxnRequest
[ https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928771#comment-17928771 ] Hongshun Wang commented on FLINK-37356: --- [~arvid] just do it. I'd like to see your resign of kafka sink. > Recycle use of kafka producer(which commit error) maybe send data without > AddPartitionsToTxnRequest > --- > > Key: FLINK-37356 > URL: https://issues.apache.org/jira/browse/FLINK-37356 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.4.0 >Reporter: Hongshun Wang >Priority: Major > > In my production environment, READ_COMMITTED consumer can no longer consume > any records. Then I found that the LSO of the partition doesn't change for a > long time. I lookup all the log in Kafka cluster, then find that there is a > transaction lacking AddPartitionsToTxnRequest. > > At first, I think the problem is caused by > https://issues.apache.org/jira/browse/FLINK-31363 because my Kafka cluster > log contains InvalidTxnStateException. However, though the transaction is in > an invalid state, no data is written into Kafka topic partition in this > transaction( because in this case, the transaction is empty). It will not > influence any Kafka topic partition's LSO, thus consumer won't be blocked. > > Then I check the code of Kafka client, it seems no way to produce data > without AddPartitionsToTxnRequest done because the the `Sender` will refuse > to dequeue batches from the accumulator until they have been added to the > transaction. > {code:java} > // org.apache.kafka.clients.producer.KafkaProducer#doSend > private Future doSend(ProducerRecord record, Callback > callback) { > // ..ignore code > // Add the partition to the transaction (if in progress) after it has been > successfully > // appended to the accumulator. We cannot do it before because the partition > may be > // unknown or the initially selected partition may be changed when the batch > is closed > // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse > to dequeue > // batches from the accumulator until they have been added to the transaction. > if (transactionManager != null) { > transactionManager.maybeAddPartition(appendCallbacks.topicPartition()); > // ignore code > }{code} > {code:java} > //org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode > > if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // > there is a rare case that a single batch size is larger than the request size > due to // compression; in this case we will still eventually send this batch > in a single request break; } else { if > (shouldStopDrainBatchesForPartition(first, tp)) break; } > {code} > > Then I have a idea that if a TransactionManager which doesn't clear > partitionsInTransaction is reused again, the AddPartitionsToTxnRequest will > be sent again. It maybe happen in Flink Kafka connector: > > 1. The flink kafka connector also reuse and recycle KafkaProducer: > KafkaCommitter will recycle the producer to > producerPool after the transaction complete or exception, and then > KafkaWriter will reuse it from producerPool. 
> {code:java} > // code placeholder > org.apache.flink.connector.kafka.sink.KafkaCommitter#commit > @Override > public void commit(Collection> requests) > throws IOException, InterruptedException { > for (CommitRequest request : requests) { > > producer.commitTransaction(); > producer.flush(); > recyclable.ifPresent(Recyclable::close); > } catch (RetriableException e) { > request.retryLater(); > } catch (ProducerFencedException e) { > recyclable.ifPresent(Recyclable::close); > request.signalFailedWithKnownReason(e); > } catch (InvalidTxnStateException e) { > recyclable.ifPresent(Recyclable::close); > request.signalFailedWithKnownReason(e); > } catch (UnknownProducerIdException e) { > LOG.error( > recyclable.ifPresent(Recyclable::close); > request.signalFailedWithKnownReason(e); > } catch (Exception e) { > recyclable.ifPresent(Recyclable::close); > request.signalFailedWithUnknownReason(e); > } > } > } {code} > 2. If KafkaCommitter meet an exception and doesn't sucess to > commitTransaction, the partitionsInTransaction in > TransactionManager won't be > clear(org.apache.kafka.clients.producer.internals.TransactionManager#completeTransaction). > > 3. If KafkaWriter which reuse same producer and send data to same > partitions in next transact
[jira] [Updated] (FLINK-37360) Update japicmp configuration post 1.20.1
[ https://issues.apache.org/jira/browse/FLINK-37360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-37360: --- Labels: pull-request-available (was: ) > Update japicmp configuration post 1.20.1 > > > Key: FLINK-37360 > URL: https://issues.apache.org/jira/browse/FLINK-37360 > Project: Flink > Issue Type: Technical Debt >Affects Versions: 1.20.1 >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > Labels: pull-request-available > Fix For: 1.20.2 > > > Update the japicmp reference version and wipe exclusions / enable API > compatibility checks for {{@PublicEvolving}} APIs on the corresponding > SNAPSHOT branch with the {{update_japicmp_configuration.sh}} script (see > below). > For a new major release (x.y.0), run the same command also on the master > branch for updating the japicmp reference version and removing out-dated > exclusions in the japicmp configuration. > Make sure that all Maven artifacts are already pushed to Maven Central. > Otherwise, there's a risk that CI fails due to missing reference artifacts. > {code:bash} > tools $ NEW_VERSION=$RELEASE_VERSION releasing/update_japicmp_configuration.sh > tools $ cd .. > $ git add * > $ git commit -m "Update japicmp configuration for $RELEASE_VERSION"{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37359] Pin qemu image to 7.0.0 [flink-kubernetes-operator]
gyfora merged PR #946: URL: https://github.com/apache/flink-kubernetes-operator/pull/946 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-37362) PolardbxCharsetITCase test has timed out after 60 minutes
Leonard Xu created FLINK-37362: -- Summary: PolardbxCharsetITCase test has timed out after 60 minutes Key: FLINK-37362 URL: https://issues.apache.org/jira/browse/FLINK-37362 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.4.0 Reporter: Leonard Xu Fix For: cdc-3.4.0 Failed instance: https://github.com/apache/flink-cdc/actions/runs/13390330565/job/37400625838 {code:java} [INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 160.571 s - in org.apache.flink.cdc.connectors.polardbx.PolardbxSourceITCase [INFO] Running org.apache.flink.cdc.connectors.polardbx.PolardbxCharsetITCase Error: The action 'Compile and test' has timed out after 60 minutes. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-37359) Docker build fails for Kubernetes operator
[ https://issues.apache.org/jira/browse/FLINK-37359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-37359. -- Fix Version/s: kubernetes-operator-1.11.0 Resolution: Fixed merged to main 3a2cfdd916c475ee1ccb63fe42f5cfeb3fe912fb > Docker build fails for Kubernetes operator > -- > > Key: FLINK-37359 > URL: https://issues.apache.org/jira/browse/FLINK-37359 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Gyula Fora >Priority: Blocker > Labels: pull-request-available > Fix For: kubernetes-operator-1.11.0 > > > All docker build started to fail with the following error: > {noformat} > 20.45 Setting up libc-bin (2.35-0ubuntu3.9) ... > 20.59 qemu: uncaught target signal 11 (Segmentation fault) - core dumped > 20.95 Segmentation fault (core dumped) > 21.00 qemu: uncaught target signal 11 (Segmentation fault) - core dumped > 21.37 Segmentation fault (core dumped) > Sub-process /usr/bin/dpkg returned an error code (1) > -- > WARNING: No output specified for bake-platform target(s) with > docker-container driver. Build result will only remain in the build cache. To > push result image into registry use --push or to load image into docker use > --load > Dockerfile:68 > > 66 | # Updating Debian > 67 | RUN if [ "$SKIP_OS_UPDATE" = "false" ]; then apt-get update; fi > 68 | >>> RUN if [ "$SKIP_OS_UPDATE" = "false" ]; then apt-get upgrade -y; fi > 69 | > 70 | ARG DISABLE_JEMALLOC=false > > ERROR: failed to solve: process "/bin/sh -c if [ \"$SKIP_OS_UPDATE\" = > \"false\" ]; then apt-get upgrade -y; fi" did not complete successfully: exit > code: 100{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-37363) Remove confusing and duplicated state.backend.type config from state SQL connector
Gabor Somogyi created FLINK-37363: - Summary: Remove confusing and duplicated state.backend.type config from state SQL connector Key: FLINK-37363 URL: https://issues.apache.org/jira/browse/FLINK-37363 Project: Flink Issue Type: Improvement Components: API / State Processor Reporter: Gabor Somogyi Assignee: Gabor Somogyi -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37152][build] Update Flink version to 1.20 [flink-cdc]
leonardBang commented on PR #3868: URL: https://github.com/apache/flink-cdc/pull/3868#issuecomment-2671490111 Failed case tracked via https://issues.apache.org/jira/browse/FLINK-37362 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]
beryllw commented on code in PR #3856: URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1963566316 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java: ## @@ -313,4 +313,12 @@ public class MySqlDataSourceOptions { .defaultValue(true) .withDescription( "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format."); + +@Experimental +public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST = + ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled") Review Comment: IIUC, should it be `ConfigOptions.key("scan.incremental.snapshot.unboundary-chunk-first.enabled")`? The ending chunk is the unbounded chunk covering the largest keys, and the starting chunk is the unbounded chunk covering the smallest keys. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]
davidradl commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1963448829 ## flink-python/pyflink/table/catalog.py: ## @@ -1391,3 +1391,87 @@ def of(catalog_name: str, configuration: Configuration, comment: str = None): j_catalog_descriptor = gateway.jvm.org.apache.flink.table.catalog.CatalogDescriptor.of( catalog_name, configuration._j_configuration, comment) return CatalogDescriptor(j_catalog_descriptor) + + +class ObjectIdentifier(object): +""" +Identifies an object in a catalog. It allows to identify objects such as tables, views, +function, or types in a catalog. An identifier must be fully qualified. It is the +responsibility of the catalog manager to resolve an identifier to an object. + +While Path :class:`ObjectPath` is used within the same catalog, instances of this class can be +used across catalogs. + +Two objects are considered equal if they share the same object identifier in a stable session +context. +""" + +_UNKNOWN = "" + +def __init__(self, j_object_identifier): +self._j_object_identifier = j_object_identifier + +def __str__(self): +return self._j_object_identifier.toString() + +def __hash__(self): +return self._j_object_identifier.hashCode() + +def __eq__(self, other): +return isinstance(other, self.__class__) and self._j_object_identifier.equals( +other._j_object_identifier +) + +@staticmethod +def of(catalog_name: str, database_name: str, object_name: str) -> "ObjectIdentifier": +assert catalog_name is not None, "Catalog name must not be null." +assert database_name is not None, "Database name must not be null." +assert object_name is not None, "Object name must not be null." + +if catalog_name == ObjectIdentifier._UNKNOWN or database_name == ObjectIdentifier._UNKNOWN: +raise ValueError(f"Catalog or database cannot be named {ObjectIdentifier._UNKNOWN}") +else: +gateway = get_gateway() +j_object_identifier = gateway.jvm.org.apache.flink.table.catalog.ObjectIdentifier.of( +catalog_name, database_name, object_name +) +return ObjectIdentifier(j_object_identifier=j_object_identifier) + +def get_catalog_name(self) -> str: +return self._j_object_identifier.getCatalogName() + +def get_database_name(self) -> str: +return self._j_object_identifier.getDatabaseName() + +def get_object_name(self) -> str: +return self._j_object_identifier.getObjectName() + +def to_object_path(self) -> ObjectPath: +""" +Convert this :class:`ObjectIdentifier` to :class:`ObjectPath`. + +Throws a TableException if the identifier cannot be converted. +""" +j_object_path = self._j_object_identifier.toObjectPath() +return ObjectPath(j_object_path=j_object_path) + +def to_list(self) -> List[str]: +""" +List of the component names of this object identifier. +""" +return self._j_object_identifier.toList() + +def as_serializable_string(self) -> str: +""" +Returns a string that fully serializes this instance. The serialized string can be used for +transmitting or persisting an object identifier. + +Throws a TableException if the identifier cannot be serialized. Review Comment: So this means it might not always be a Table Exception - as it could be mapped to a CatalogException or one of the others in [https://github.com/apache/flink/blob/master/flink-python/pyflink/util/exceptions.py#L126:L127](https://github.com/apache/flink/blob/master/flink-python/pyflink/util/exceptions.py#L126:L127) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
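A brief usage sketch of the `ObjectIdentifier` wrapper quoted above, tied to the exception-mapping point raised in the review. The catalog, database, and table names are invented, and the behaviour follows the Python wrapper proposed in this PR rather than a released API.

```python
# Sketch based on the ObjectIdentifier wrapper proposed in this PR; the names
# "my_catalog", "my_database", and "my_table" are made up for illustration.
from pyflink.table.catalog import ObjectIdentifier

identifier = ObjectIdentifier.of("my_catalog", "my_database", "my_table")
print(identifier.get_catalog_name())   # expected: my_catalog
print(identifier.to_list())            # expected: the three path components

# Per the review note, a failure here may surface as TableException or another
# mapped Java exception (e.g. CatalogException), depending on how
# pyflink.util.exceptions translates the underlying error.
try:
    print(identifier.as_serializable_string())
except Exception as exc:  # broad catch, since the exact mapped type can vary
    print(f"could not serialize identifier: {exc}")
```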
[jira] [Comment Edited] (FLINK-37356) Recycle use of kafka producer(which commit error) maybe send data without AddPartitionsToTxnRequest
[ https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928771#comment-17928771 ] Hongshun Wang edited comment on FLINK-37356 at 2/20/25 12:27 PM: - [~arvid] just do it. > I'd avoid tinkering around with the internal state of the transactionManager It does make sense to me! I'd like to see your resign of kafka sink. was (Author: JIRAUSER298968): [~arvid] just do it. I'd like to see your resign of kafka sink. > Recycle use of kafka producer(which commit error) maybe send data without > AddPartitionsToTxnRequest > --- > > Key: FLINK-37356 > URL: https://issues.apache.org/jira/browse/FLINK-37356 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.4.0 >Reporter: Hongshun Wang >Priority: Major > > In my production environment, READ_COMMITTED consumer can no longer consume > any records. Then I found that the LSO of the partition doesn't change for a > long time. I lookup all the log in Kafka cluster, then find that there is a > transaction lacking AddPartitionsToTxnRequest. > > At first, I think the problem is caused by > https://issues.apache.org/jira/browse/FLINK-31363 because my Kafka cluster > log contains InvalidTxnStateException. However, though the transaction is in > an invalid state, no data is written into Kafka topic partition in this > transaction( because in this case, the transaction is empty). It will not > influence any Kafka topic partition's LSO, thus consumer won't be blocked. > > Then I check the code of Kafka client, it seems no way to produce data > without AddPartitionsToTxnRequest done because the the `Sender` will refuse > to dequeue batches from the accumulator until they have been added to the > transaction. > {code:java} > // org.apache.kafka.clients.producer.KafkaProducer#doSend > private Future doSend(ProducerRecord record, Callback > callback) { > // ..ignore code > // Add the partition to the transaction (if in progress) after it has been > successfully > // appended to the accumulator. We cannot do it before because the partition > may be > // unknown or the initially selected partition may be changed when the batch > is closed > // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse > to dequeue > // batches from the accumulator until they have been added to the transaction. > if (transactionManager != null) { > transactionManager.maybeAddPartition(appendCallbacks.topicPartition()); > // ignore code > }{code} > {code:java} > //org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode > > if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // > there is a rare case that a single batch size is larger than the request size > due to // compression; in this case we will still eventually send this batch > in a single request break; } else { if > (shouldStopDrainBatchesForPartition(first, tp)) break; } > {code} > > Then I have a idea that if a TransactionManager which doesn't clear > partitionsInTransaction is reused again, the AddPartitionsToTxnRequest will > be sent again. It maybe happen in Flink Kafka connector: > > 1. The flink kafka connector also reuse and recycle KafkaProducer: > KafkaCommitter will recycle the producer to > producerPool after the transaction complete or exception, and then > KafkaWriter will reuse it from producerPool. 
> {code:java} > // code placeholder > org.apache.flink.connector.kafka.sink.KafkaCommitter#commit > @Override > public void commit(Collection> requests) > throws IOException, InterruptedException { > for (CommitRequest request : requests) { > > producer.commitTransaction(); > producer.flush(); > recyclable.ifPresent(Recyclable::close); > } catch (RetriableException e) { > request.retryLater(); > } catch (ProducerFencedException e) { > recyclable.ifPresent(Recyclable::close); > request.signalFailedWithKnownReason(e); > } catch (InvalidTxnStateException e) { > recyclable.ifPresent(Recyclable::close); > request.signalFailedWithKnownReason(e); > } catch (UnknownProducerIdException e) { > LOG.error( > recyclable.ifPresent(Recyclable::close); > request.signalFailedWithKnownReason(e); > } catch (Exception e) { > recyclable.ifPresent(Recyclable::close); > request.signalFailedWithUnknownReason(e); > } > } > } {code} > 2. If KafkaCommitter meet an exception and doesn't sucess to > commitTransaction, the
Re: [PR] [FLINK-37120][cdc-connector] Add ending split chunk first to avoid TaskManager oom [flink-cdc]
leonardBang commented on code in PR #3856: URL: https://github.com/apache/flink-cdc/pull/3856#discussion_r1963519196 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java: ## @@ -313,4 +313,12 @@ public class MySqlDataSourceOptions { .defaultValue(true) .withDescription( "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format."); + +@Experimental +public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST = + ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled") Review Comment: How about ```suggestion ConfigOptions.key("scan.incremental.snapshot.boundary-chunk-first.enabled") ```? IIUC, this option is used to control whether the boundary chunks should be assigned first or not? ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java: ## @@ -470,7 +470,13 @@ private List splitEvenlySizedChunks( } } // add the ending split -splits.add(ChunkRange.of(chunkStart, null)); +// assign ending split first, both the largest and smallest unbounded chunks are completed +// in the first two splits +if (sourceConfig.isAssignEndingChunkFirst()) { +splits.add(0, ChunkRange.of(chunkStart, null)); Review Comment: Could you consider `System.arraycopy` for performance? Currently we need to move n-1 elements. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36900] Migrate from conda to uv for managing Python environments [flink-connector-shared-utils]
autophagy commented on code in PR #43: URL: https://github.com/apache/flink-connector-shared-utils/pull/43#discussion_r1963579142 ## python/lint-python.sh: ## @@ -377,73 +375,62 @@ function install_environment() { print_function "STAGE" "installing environment" local sys_os=`uname -s` -#get the index of the SUPPORT_OS array for convinient to intall tool. +#get the index of the SUPPORT_OS array for convenient to install tool. get_os_index $sys_os local os_index=$? -# step-1 install wget -# the file size of the miniconda.sh is too big to use "wget" tool to download instead -# of the "curl" in the weak network environment. +# step-1 install uv if [ $STEP -lt 1 ]; then -print_function "STEP" "installing wget..." -install_wget ${SUPPORT_OS[$os_index]} -STEP=1 -checkpoint_stage $STAGE $STEP -print_function "STEP" "install wget... [SUCCESS]" -fi - -# step-2 install miniconda -if [ $STEP -lt 2 ]; then -print_function "STEP" "installing miniconda..." +print_function "STEP" "installing uv..." create_dir $CURRENT_DIR/download -install_miniconda $os_index -STEP=2 +install_uv +STEP=1 checkpoint_stage $STAGE $STEP -print_function "STEP" "install miniconda... [SUCCESS]" +print_function "STEP" "install uv... [SUCCESS]" fi -# step-3 install python environment which includes +# step-2 install python environment which includes # 3.7 3.8 3.9 3.10 -if [ $STEP -lt 3 ] && [ `need_install_component "py_env"` = true ]; then +if [ $STEP -lt 2 ] && [ `need_install_component "py_env"` = true ]; then print_function "STEP" "installing python environment..." install_py_env -STEP=3 +STEP=2 checkpoint_stage $STAGE $STEP print_function "STEP" "install python environment... [SUCCESS]" fi -# step-4 install tox -if [ $STEP -lt 4 ] && [ `need_install_component "tox"` = true ]; then +# step-3 install tox +if [ $STEP -lt 3 ] && [ `need_install_component "tox"` = true ]; then print_function "STEP" "installing tox..." install_tox -STEP=4 +STEP=3 checkpoint_stage $STAGE $STEP print_function "STEP" "install tox... [SUCCESS]" fi -# step-5 install flake8 -if [ $STEP -lt 5 ] && [ `need_install_component "flake8"` = true ]; then +# step-4 install flake8 +if [ $STEP -lt 4 ] && [ `need_install_component "flake8"` = true ]; then print_function "STEP" "installing flake8..." install_flake8 -STEP=5 +STEP=4 checkpoint_stage $STAGE $STEP print_function "STEP" "install flake8... [SUCCESS]" fi -# step-6 install sphinx -if [ $STEP -lt 6 ] && [ `need_install_component "sphinx"` = true ]; then +# step-5 install sphinx +if [ $STEP -lt 5 ] && [ `need_install_component "sphinx"` = true ]; then print_function "STEP" "installing sphinx..." install_sphinx -STEP=6 +STEP=5 checkpoint_stage $STAGE $STEP print_function "STEP" "install sphinx... [SUCCESS]" fi -# step-7 install mypy -if [[ ${STEP} -lt 7 ]] && [[ `need_install_component "mypy"` = true ]]; then +# step-5 install mypy Review Comment: Oops, yes, good catch 😅 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36594][hive]HiveCatalog should set HiveConf.hiveSiteLocation back [flink]
slankka commented on PR #25568: URL: https://github.com/apache/flink/pull/25568#issuecomment-2671535475 closed in favor of https://github.com/apache/flink-connector-hive/pull/32 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36594][hive]HiveCatalog should set HiveConf.hiveSiteLocation back [flink]
slankka closed pull request #25568: [FLINK-36594][hive]HiveCatalog should set HiveConf.hiveSiteLocation back URL: https://github.com/apache/flink/pull/25568 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]
davidradl commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1963472704 ## flink-python/pyflink/table/table.py: ## @@ -1077,6 +1078,91 @@ def explain(self, *extra_details: ExplainDetail) -> str: j_extra_details = to_j_explain_detail_arr(extra_details) return self._j_table.explain(TEXT, j_extra_details) +def insert_into( +self, table_path_or_descriptor: Union[str, TableDescriptor], overwrite: bool = False +) -> TablePipeline: +""" +When ``target_path_or_descriptor`` is a table path: + +Declares that the pipeline defined by the given :class:`Table` (backed by a +DynamicTableSink) should be written to a table that was registered under the specified +path. + +See the documentation of + :func:`pyflink.table.table_environment.TableEnvironment.use_database` or + :func:`pyflink.table.table_environment.TableEnvironment.use_catalog` for the rules on +the path resolution. + +Example: +:: + +>>> table = table_env.sql_query("SELECTFROM MyTable") +>>> table_pipeline = table.insert_into_table_path("MySinkTable", True) +>>> table_result = table_pipeline.execute().wait() + +When ``target_path_or_descriptor`` is a :class:`~pyflink.table.TableDescriptor` : + +Declares that the pipeline defined by the given :class:`Table` object should be written +to a table (backed by a DynamicTableSink) expressed via the given +:class:`~pyflink.table.TableDescriptor`. + +The descriptor won't be registered in the catalog, but it will be propagated directly +in the operation tree. Note that calling this method multiple times, even with the same +descriptor, results in multiple sink tables instances. + +This method allows to declare a :class:`~pyflink.table.Schema` for the sink descriptor. +The declaration is similar to a ``CREATE TABLE`` DDL in SQL and allows to: Review Comment: maybe change this to. Specifying a schema asserts a structure to the table. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
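To make the descriptor variant above concrete, here is a hedged sketch: the `print` connector, the schema, and the `table` variable (a `Table` produced earlier, e.g. via `table_env.sql_query(...)`) are assumptions, and `insert_into`/`TablePipeline` are the API proposed in this PR.

```python
# Sketch of the descriptor-based insert_into() described in the docstring above.
# The "print" connector and the column names are examples only; `table` is
# assumed to be an existing pyflink Table.
from pyflink.table import DataTypes, Schema, TableDescriptor

sink_descriptor = (
    TableDescriptor.for_connector("print")
    .schema(
        Schema.new_builder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .build()
    )
    .build()
)

# The descriptor is not registered in the catalog; it is propagated directly
# in the operation tree of the resulting TablePipeline.
table_pipeline = table.insert_into(sink_descriptor)
table_pipeline.execute().wait()
```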
Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]
davidradl commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1963465316 ## flink-python/pyflink/table/table.py: ## @@ -1077,6 +1078,91 @@ def explain(self, *extra_details: ExplainDetail) -> str: j_extra_details = to_j_explain_detail_arr(extra_details) return self._j_table.explain(TEXT, j_extra_details) +def insert_into( +self, table_path_or_descriptor: Union[str, TableDescriptor], overwrite: bool = False +) -> TablePipeline: +""" +When ``target_path_or_descriptor`` is a table path: + +Declares that the pipeline defined by the given :class:`Table` (backed by a +DynamicTableSink) should be written to a table that was registered under the specified +path. + +See the documentation of + :func:`pyflink.table.table_environment.TableEnvironment.use_database` or + :func:`pyflink.table.table_environment.TableEnvironment.use_catalog` for the rules on +the path resolution. + +Example: +:: + +>>> table = table_env.sql_query("SELECTFROM MyTable") Review Comment: "SELECTFROM MyTable" does not look like valid SQL. I was expecting something like `SELECT * FROM MyTable; ` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
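For reference, a corrected form of the flagged docstring example, assuming `table_env` exists and `MyTable`/`MySinkTable` are already registered, and using the `insert_into` signature this diff introduces (the docstring snippet refers to `insert_into_table_path`).

```python
# Corrected, illustrative version of the docstring example flagged above.
table = table_env.sql_query("SELECT * FROM MyTable")

# overwrite=True asks the sink to overwrite existing data where the connector
# supports it; insert_into is the method introduced by this PR.
table_pipeline = table.insert_into("MySinkTable", overwrite=True)
table_result = table_pipeline.execute()
table_result.wait()
```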
Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]
davidradl commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1963472704 ## flink-python/pyflink/table/table.py: ## @@ -1077,6 +1078,91 @@ def explain(self, *extra_details: ExplainDetail) -> str: j_extra_details = to_j_explain_detail_arr(extra_details) return self._j_table.explain(TEXT, j_extra_details) +def insert_into( +self, table_path_or_descriptor: Union[str, TableDescriptor], overwrite: bool = False +) -> TablePipeline: +""" +When ``target_path_or_descriptor`` is a table path: + +Declares that the pipeline defined by the given :class:`Table` (backed by a +DynamicTableSink) should be written to a table that was registered under the specified +path. + +See the documentation of + :func:`pyflink.table.table_environment.TableEnvironment.use_database` or + :func:`pyflink.table.table_environment.TableEnvironment.use_catalog` for the rules on +the path resolution. + +Example: +:: + +>>> table = table_env.sql_query("SELECTFROM MyTable") +>>> table_pipeline = table.insert_into_table_path("MySinkTable", True) +>>> table_result = table_pipeline.execute().wait() + +When ``target_path_or_descriptor`` is a :class:`~pyflink.table.TableDescriptor` : + +Declares that the pipeline defined by the given :class:`Table` object should be written +to a table (backed by a DynamicTableSink) expressed via the given +:class:`~pyflink.table.TableDescriptor`. + +The descriptor won't be registered in the catalog, but it will be propagated directly +in the operation tree. Note that calling this method multiple times, even with the same +descriptor, results in multiple sink tables instances. + +This method allows to declare a :class:`~pyflink.table.Schema` for the sink descriptor. +The declaration is similar to a ``CREATE TABLE`` DDL in SQL and allows to: Review Comment: maybe change this to. Specifying a schema asserts a structure to the table and can be used to: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37336][python] Introduce TablePipeline descriptor to Python Table API [flink]
davidradl commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1963443638 ## flink-python/pyflink/table/catalog.py: ## @@ -1391,3 +1391,87 @@ def of(catalog_name: str, configuration: Configuration, comment: str = None): j_catalog_descriptor = gateway.jvm.org.apache.flink.table.catalog.CatalogDescriptor.of( catalog_name, configuration._j_configuration, comment) return CatalogDescriptor(j_catalog_descriptor) + + +class ObjectIdentifier(object): +""" +Identifies an object in a catalog. It allows to identify objects such as tables, views, Review Comment: nit: change to match the other doc -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org