[jira] [Created] (FLINK-27808) Allow "kubernetes" as setting for HA_MODE
Chesnay Schepler created FLINK-27808: Summary: Allow "kubernetes" as setting for HA_MODE Key: FLINK-27808 URL: https://issues.apache.org/jira/browse/FLINK-27808 Project: Flink Issue Type: Improvement Components: Deployment / Kubernetes, Runtime / Configuration Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.16.0 Make it easier to enable kubernetes HA by allowing "kubernetes" as a setting for HA_MODE. -- This message was sent by Atlassian Jira (v8.20.7#820007)
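As a rough illustration of the intended simplification (a sketch only; the "kubernetes" shorthand is what the ticket proposes, the rest of the snippet is just context):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;

public class KubernetesHaShorthandExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Today users have to spell out the full factory class:
        conf.set(HighAvailabilityOptions.HA_MODE,
            "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory");
        // With FLINK-27808 the plain shorthand would be accepted as well:
        conf.set(HighAvailabilityOptions.HA_MODE, "kubernetes");
    }
}
```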
[jira] [Created] (FLINK-27809) Clarify that cluster-id is mandatory for Kubernetes HA in standalone mode
Chesnay Schepler created FLINK-27809: Summary: Clarify that cluster-id is mandatory for Kubernetes HA in standalone mode Key: FLINK-27809 URL: https://issues.apache.org/jira/browse/FLINK-27809 Project: Flink Issue Type: Improvement Components: Deployment / Kubernetes, Runtime / Configuration Reporter: Chesnay Schepler Fix For: 1.16.0 The description for KubernetesConfigOptions#CLUSTER_ID states that the client generates this automatically if it isn't set. This is technically correct, because the client is not involved in the deployment for standalone clusters, but to users this sounds like it is optional in general, while it must be set (to an ideally unique value) in standalone mode. -- This message was sent by Atlassian Jira (v8.20.7#820007)
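For context, a minimal configuration sketch of the standalone case described above (the cluster-id value here is made up; the options come from HighAvailabilityOptions and KubernetesConfigOptions):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

public class StandaloneKubernetesHaConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(HighAvailabilityOptions.HA_MODE,
            "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory");
        // In standalone mode no client generates the id for us, so it must be
        // set explicitly and should be unique per cluster:
        conf.set(KubernetesConfigOptions.CLUSTER_ID, "my-standalone-ha-cluster");
    }
}
```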
Re: [DISCUSS] FLIP-217 Support watermark alignment of source splits
I had an offline discussion with Piotr and here is the summary. Please correct me if I miss something, Piotr. There are two things we would like to seek more opinions on from the community, so we can make progress on this FLIP.

1. The general pattern for adding obligatory features to existing interfaces.

Interfaces exposed to developers for implementation are intended to be either *optional* or *obligatory*. While it is quite clear how to convey that intention when creating an interface, there is less common agreement on how to do so when adding new features to an existing interface. In general, Flink uses decorative interfaces when adding optional features to existing interfaces. Both Piotr and I agree that this looks good. The differing opinions are mainly about how to add obligatory features to existing interfaces, probably due to different understandings of "obligatory". We discussed four options:

*Option 1:*
- Just add a new method to the existing interface.
- For backwards compatibility, the method would have a default implementation throwing an "UnsupportedOperationException".
- In the next major version, remove the default implementation.
- For developers, any method with a default implementation throwing an "UnsupportedOperationException" should be taken as obligatory.

*Option 2:*
- Always make the feature optional by adding a decorative interface, just like ordinary optional features.
- Inform developers via documentation that the feature is obligatory, although it looks optional in the code.
- In case the developers did not implement the decorative interface, throw an exception.
- In the next major version, move the methods from the decorative interface to the base interface and deprecate the decorative interface.

*Option 3:*
- Always bump the major version when a new obligatory feature is added, even if we may have to do it frequently.

*Option 4:*
- Add a V2, V3... of the interface affected by the new obligatory feature.
- In the next major versions, deprecate the old versions of the interface.

Both Piotr and I agreed that options 3 and 4 have big side effects and should be avoided. We have different preferences between option 1 and option 2. Personally I prefer option 1, for these reasons:

a) It is simple and intuitive. Java 8 introduced default implementations in interfaces exactly for interface evolution, and this is a common pattern in many projects.
b) It is prominent to developers that the feature is expected to be implemented, because the default implementation explicitly throws an exception.
c) Low maintenance overhead - the Flink framework can always assume the method exists, so no special handling logic is needed.
d) It communicates a clear semantic boundary between optional and obligatory features in Flink to developers:
- Optional: jobs still run without exception if these methods are not implemented, e.g. all the SupportsXXXPushDown interfaces.
- Obligatory: jobs may fail if these methods are not implemented properly, e.g. SourceReader#pauseOrResumeSplits().

This is a common pattern in Java, e.g. Iterator.remove() by default throws an "UnsupportedOperationException", informing the implementation that things may go wrong if this method is not implemented.

As for option 2, although the API itself sounds clean, it misleads people into thinking of an obligatory feature as optional - from the code the feature looks optional, but the documentation says it is obligatory. We should probably avoid such code-doc inconsistency, as people will be confused.
And I would actually be bewildered that sometimes not implementing an "optional" feature is fine, but sometimes it causes jobs to fail.

In response to the argument that a method with a default implementation is always optional: if that were true, it would actually mean that all interfaces should be immutable once they are created. If we want to add a method to an existing interface, for backwards compatibility we will have to provide a default implementation. And the fact that it has a default implementation indicates the method is optional. If that method is optional, it should reside in a separate decorative interface, otherwise it clogs the existing interface. Therefore, people should never add a method to an existing interface. I find this conclusion a bit extreme.

Piotr prefers option 2; his opinions are:
a) Obligatory methods are the methods that fail the code compilation if not implemented.
b) All obligatory methods should reside in the base interface, without a default implementation, and all optional methods should be in decorative interfaces. This is a clean API.
c) Due to b), there isn't a viable solution for adding an obligatory method to an existing interface in a backwards compatible way. Unless we are OK with breaking backwards compatibility, all the
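For readers following the two options above, here is a minimal, hypothetical sketch of both patterns; the interface and method names are illustrative only and not the actual FLIP-217 API:

```
// Option 1: add the method directly, with a default implementation that throws.
// Overriding it is expected; the exception makes the obligation visible at runtime.
interface SourceReaderExample {
    void pollNext();

    default void pauseOrResumeSplits(java.util.Collection<String> splitIds, boolean pause) {
        throw new UnsupportedOperationException(
                "Obligatory for watermark alignment of splits; please implement it.");
    }
}

// Option 2: keep the base interface untouched and add a decorative interface.
// The framework checks for it at runtime and fails the job if it is missing.
interface SupportsSplitPausing {
    void pauseOrResumeSplits(java.util.Collection<String> splitIds, boolean pause);
}
```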
[jira] [Created] (FLINK-27810) Elasticsearch e2e jars bundle way more than they should
Chesnay Schepler created FLINK-27810: Summary: Elasticsearch e2e jars bundle way more than they should Key: FLINK-27810 URL: https://issues.apache.org/jira/browse/FLINK-27810 Project: Flink Issue Type: Technical Debt Components: Build System, Connectors / ElasticSearch, Tests Affects Versions: 1.16.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.16.0 The jars bundle flink-end-to-end-tests-common-elasticsearch and all of its transitive dependencies, like junit or flink-rpc-core. All of these are unnecessary for the test to work and really shouldn't be bundled. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27811) Remove netty dependency in flink-test-utils
Chesnay Schepler created FLINK-27811: Summary: Remove netty dependency in flink-test-utils Key: FLINK-27811 URL: https://issues.apache.org/jira/browse/FLINK-27811 Project: Flink Issue Type: Technical Debt Components: Build System, Tests Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.16.0 For some reason we bundle a relocated version of netty in flink-test-utils. AFAICT this should be unnecessary because nothing makes use of the relocated version. -- This message was sent by Atlassian Jira (v8.20.7#820007)
Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric
Hi all,

I think the problems now are as follows:
1. The AllCache and PartialCache interfaces are not uniform: one needs to provide a LookupProvider, the other needs to provide a CacheBuilder.
2. The AllCache definition is not flexible. For example, PartialCache can use any custom storage while AllCache cannot; AllCache could also store to memory or disk, so it needs a flexible strategy too.
3. AllCache cannot customize its ReloadStrategy; currently there is only ScheduledReloadStrategy.

To solve the above problems, the following are my ideas.

## Top level cache interfaces:

```
public interface CacheLookupProvider extends LookupTableSource.LookupRuntimeProvider {

    CacheBuilder createCacheBuilder();
}

public interface CacheBuilder {
    Cache create();
}

public interface Cache {

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection getIfPresent(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
}
```

## Partial cache

```
public interface PartialCacheLookupFunction extends CacheLookupProvider {

    @Override
    PartialCacheBuilder createCacheBuilder();

    /** Creates an {@link LookupFunction} instance. */
    LookupFunction createLookupFunction();
}

public interface PartialCacheBuilder extends CacheBuilder {

    PartialCache create();
}

public interface PartialCache extends Cache {

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     * @param key - key row with which the specified value is to be associated
     * @param value – value rows to be associated with the specified key
     */
    Collection put(RowData key, Collection value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);
}
```

## All cache

```
public interface AllCacheLookupProvider extends CacheLookupProvider {

    void registerReloadStrategy(ScheduledExecutorService executorService, Reloader reloader);

    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    @Override
    AllCacheBuilder createCacheBuilder();
}

public interface AllCacheBuilder extends CacheBuilder {

    AllCache create();
}

public interface AllCache extends Cache {

    void putAll(Iterator> allEntries);

    void clearAll();
}

public interface Reloader {

    void reload();
}
```

Best,
Jingsong

On Fri, May 27, 2022 at 11:10 AM Jingsong Li wrote: > Thanks Qingsheng and all for your discussion. > > Very sorry to jump in so late. > > Maybe I missed something? > My first impression when I saw the cache interface was, why don't we > provide an interface similar to guava cache [1], on top of guava cache, > caffeine also makes extensions for asynchronous calls.[2] > There is also the bulk load in caffeine too. > > I am also more confused why first from LookupCacheFactory.Builder and then > to Factory to create Cache. > > [1] https://github.com/google/guava > [2] https://github.com/ben-manes/caffeine/wiki/Population > > Best, > Jingsong > > On Thu, May 26, 2022 at 11:17 PM Jark Wu wrote: > >> After looking at the new introduced ReloadTime and Becket's comment, >> I agree with Becket we should have a pluggable reloading strategy. >> We can provide some common implementations, e.g., periodic reloading, and >> daily reloading. >> But there definitely be some connector- or business-specific reloading >> strategies, e.g. 
>> notify by a zookeeper watcher, reload once a new Hive partition is >> complete. >> >> Best, >> Jark >> >> On Thu, 26 May 2022 at 11:52, Becket Qin wrote: >> >> > Hi Qingsheng, >> > >> > Thanks for updating the FLIP. A few comments / questions below: >> > >> > 1. Is there a reason that we have both "XXXFactory" and "XXXProvider". >> > What is the difference between them? If they are the same, can we just >> use >> > XXXFactory everywhere? >> > >> > 2. Regarding the FullCachingLookupProvider, should the reloading policy >> > also be pluggable? Periodical reloading could be sometimes be tricky in >> > practice. For example, if user uses 24 hours as the cache refresh >> interval >> > and some nightly batch job delayed, the cache update may still see the >> > stale data. >> > >> > 3. In DefaultLookupCacheFactory, it looks like InitialCapacity should be >> > removed. >> > >> > 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a >> little >> > confusing to me. If Optional getCacheFactory() >> returns >> > a non-empty factory, doesn't that already indicates the framework to >> cache >> > the missing keys? Also, why is this method returning an >> Optional >> > instead of boolean? >> > >> > Thanks, >> > >> > Jiangjie (Becket) Qin >> > >> > >> > >> > On
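To make the split between the cache and the lookup concrete, here is a rough usage sketch of how a runtime lookup could consult a partial cache before querying the external system. It follows the draft PartialCache interface quoted above; the ExternalLookup helper and the class itself are hypothetical:

```
import java.util.Collection;
import org.apache.flink.table.data.RowData;

public class CachingLookupSketch {

    private final PartialCache cache;      // created via PartialCacheBuilder#create()
    private final ExternalLookup external; // hypothetical connector-specific accessor

    public CachingLookupSketch(PartialCache cache, ExternalLookup external) {
        this.cache = cache;
        this.external = external;
    }

    public Collection<RowData> lookup(RowData key) {
        Collection<RowData> cached = cache.getIfPresent(key);
        if (cached != null) {
            return cached; // cache hit, no call to the external system
        }
        Collection<RowData> rows = external.query(key); // cache miss
        cache.put(key, rows); // missing keys may be cached as empty collections
        return rows;
    }

    /** Hypothetical external system accessor, for illustration only. */
    public interface ExternalLookup {
        Collection<RowData> query(RowData key);
    }
}
```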
[jira] [Created] (FLINK-27812) Support Dynamic change of watched namespaces
Matyas Orhidi created FLINK-27812: - Summary: Support Dynamic change of watched namespaces Key: FLINK-27812 URL: https://issues.apache.org/jira/browse/FLINK-27812 Project: Flink Issue Type: Improvement Reporter: Matyas Orhidi -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27813) java.lang.IllegalStateException: after migration from statefun 3.1.1 to 3.2.0
Oleksandr created FLINK-27813: - Summary: java.lang.IllegalStateException: afte migration from statefun 3.1.1 to 3.2.0 Key: FLINK-27813 URL: https://issues.apache.org/jira/browse/FLINK-27813 Project: Flink Issue Type: Bug Components: API / State Processor Affects Versions: statefun-3.2.0 Reporter: Oleksandr Issue was met after migration to 3.2.0 {code:java} ts.execution-paymentManualValidationRequestEgress-egress, Sink: ua.aval.payments.execution-norkomExecutionRequested-egress, Sink: ua.aval.payments.execution-shareStatusToManualValidation-egress) (1/1)#15863 (98a1f6bef9a435ef9d0292fefeb64022) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Unable to parse Netty transport spec.\n\tat org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.parseTransportSpec(NettyRequestReplyClientFactory.java:56)\n\tat org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplyClientFactory.createTransportClient(NettyRequestReplyClientFactory.java:39)\n\tat org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider.functionOfType(HttpFunctionProvider.java:49)\n\tat org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:70)\n\tat org.apache.flink.statefun.flink.core.functions.PredefinedFunctionLoader.load(PredefinedFunctionLoader.java:47)\n\tat org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:71)\n\tat org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)\n\tat org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:48)\n\tat org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:153)\n\tat org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:148)\n\tat org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)\n\tat org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)\n\tat org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)\n\tat org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)\n\tat org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)\n\tat org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)\n\tat org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)\n\tat org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)\n\tat org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)\n\tat org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)\n\tat org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\tat 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)\n\tat org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)\n\tat org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)\n\tat org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)\n\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException: Time interval unit label 'm' does not match any of the recognized units: DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | nanoseconds) (through reference chain: org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplySpec[\"timeouts\"]->org.apache.flink.statefun.flink.core.nettyclient.NettyRequestReplySpec$Timeouts[\"call\"])\n\tat org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonM
[jira] [Created] (FLINK-27814) Add an abstraction layer for connectors to read and write row data instead of key-values
Caizhi Weng created FLINK-27814: --- Summary: Add an abstraction layer for connectors to read and write row data instead of key-values Key: FLINK-27814 URL: https://issues.apache.org/jira/browse/FLINK-27814 Project: Flink Issue Type: Improvement Components: Table Store Affects Versions: table-store-0.2.0 Reporter: Caizhi Weng Currently {{FileStore}} exposes an interface for reading and writing {{KeyValue}}. However connectors may have different ways to change a {{RowData}} to {{KeyValue}} under different {{WriteMode}}. This results in lots of {{if...else...}} branches and duplicated code. We need to add an abstraction layer for connectors to read and write row data instead of key-values. -- This message was sent by Atlassian Jira (v8.20.7#820007)
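Purely as an illustration of what such a layer could look like (the interface and method names below are invented; the ticket does not prescribe an API):

```
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.CloseableIterator;

/** Hypothetical row-oriented facade so connectors stop translating RowData to KeyValue themselves. */
interface RowDataStoreWrite {
    /** Accepts a changelog row; the store derives the internal KeyValue form per WriteMode. */
    void write(RowData row) throws Exception;
}

interface RowDataStoreRead {
    /** Returns rows reconstructed from the underlying key-values. */
    CloseableIterator<RowData> createReader() throws Exception;
}
```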
[jira] [Created] (FLINK-27815) Improve the join reorder strategy for batch sql job
godfrey he created FLINK-27815: -- Summary: Improve the join reorder strategy for batch sql job Key: FLINK-27815 URL: https://issues.apache.org/jira/browse/FLINK-27815 Project: Flink Issue Type: New Feature Components: Table SQL / Planner Reporter: godfrey he Join is a heavy operation at execution time, and the join order in a query can have a significant impact on the query's performance. Currently, the planner has one join reorder strategy, provided by Apache Calcite, but it strongly depends on statistics. It would be better to provide different join reorder strategies for different situations, such as: 1. provide a join reorder strategy without statistics, e.g. eliminate cross joins 2. improve the current join reorder strategy with statistics 3. provide hints to allow users to choose a join order strategy 4. ... -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27816) file source reader has some requirements
jinshuangxian created FLINK-27816: - Summary: file source reader has some requirements Key: FLINK-27816 URL: https://issues.apache.org/jira/browse/FLINK-27816 Project: Flink Issue Type: Improvement Affects Versions: 1.15.0 Reporter: jinshuangxian I use the Flink SQL file-system connector to consume data, write it to object storage, and then use the file-system connector again to process the data in the object storage. The whole process works fine. I have two new requirements: 1. Can I specify a timestamp to consume only files within a given time range? 2. It would be helpful if the data written to the object storage could be ordered within a partition (for example, partitioned by device id), so that the file source reader can consume files in order, similar to how Kafka is consumed. Can some enhancements be made to the file source reader? -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27817) TaskManager metaspace OOM for session cluster
godfrey he created FLINK-27817: -- Summary: TaskManager metaspace OOM for session cluster Key: FLINK-27817 URL: https://issues.apache.org/jira/browse/FLINK-27817 Project: Flink Issue Type: New Feature Components: Runtime / Task Reporter: godfrey he From user ML: https://www.mail-archive.com/user-zh@flink.apache.org/msg15224.html For SQL jobs, most operators are code generated with *unique class names*; this causes continued TM metaspace growth until OOM in a session cluster. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27818) Model enums as references in OpenAPI spec
Chesnay Schepler created FLINK-27818: Summary: Model enums as references in OpenAPI spec Key: FLINK-27818 URL: https://issues.apache.org/jira/browse/FLINK-27818 Project: Flink Issue Type: Improvement Components: Documentation, Runtime / REST Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27819) Generate better operationIds for OpenAPI spec
Chesnay Schepler created FLINK-27819: Summary: Generate better operationIds for OpenAPI spec Key: FLINK-27819 URL: https://issues.apache.org/jira/browse/FLINK-27819 Project: Flink Issue Type: Improvement Components: Documentation, Runtime / REST Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.16.0 There is an easy way to generate operation ids that are significantly better than the defaults. -- This message was sent by Atlassian Jira (v8.20.7#820007)
Re: [DISCUSS] FLIP-234: Support Retryable Lookup Join To Solve Delayed Updates Issue In External Systems
Hi Lincoln, Delayed Dim Join is a frequently requested feature, it's exciting to see this feature is on the road. The FLIP looks good to me in general. I only left some minor comments. 1) support retry for sync lookup I'm also fine with the idea proposed by Jingsong. But this doesn't conflict with the FLIP and can be future work. It would be great if we can determine the APIs first. 1) "allow-unordered" => "unordered" I would prefer the "unordered" output mode rather than "allow-unordered". Because this fully aligns with the DataStream behaviors and avoids confusion on the differences. I understand the purpose that adding a "allow" prefix here, but I think the semantic is fine to just use "unordered" here. We didn't see any users confused about OutputMode#UNORDERED. Best, Jark On Fri, 27 May 2022 at 12:58, Jingsong Li wrote: > Thanks Lincoln for your proposal. > > Take a look at `strategy: fixed-delay delay: duration, e.g., 10s > max-attempts: integer, e.g., 3`. > > Are these options only for async? It looks like normal lookups work too? > > One thing is: most of the lookup functions seem to be synchronous now? > There are not so many asynchronous ones? > > Best, > Jingsong > > On Tue, May 24, 2022 at 11:48 AM Lincoln Lee > wrote: > > > Hi all, > > > > Considering the new common table option 'lookup.max-retries' proposed in > > FLIP-221[1] which is commonly used for exception handling in connector > > implementation, we should clearly distinguish ASYNC_LOOKUP_RETRY from it > to > > avoid confusing users. > > > > To do so, the name ASYNC_LOOKUP_RETRY can change to > > ASYNC_LOOKUP_MISS_RETRY, and as the name implies, restrict it to support > > retries only for lookup misses and no longer include exceptions (for sql > > connectors, let the connector implementer decide how to handle exceptions > > since there are various kinds of retryable exceptions and can not retry > > ones). > > > > The FLIP[2] has been updated. > > > > [1] > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric > > [2] > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems > > > > > > Best, > > Lincoln Lee > > > > > > Lincoln Lee 于2022年5月19日周四 18:24写道: > > > > > Dear Flink developers, > > > > > > I would like to open a discussion on FLIP 234 [1] to support retryable > > > lookup join to solve delayed updates issue, as a pre-work for this > > > solution, we proposed FLIP-232[2] which adds a generic retry support > for > > > Async I/O. > > > We prefer to offer this retry capability via query hints, similar to > new > > > join hints proposed in FLINK-27625[3] & FLIP-204[4]. > > > > > > This feature is backwards compatible and transparently to connectors. > For > > > existing connectors which implements AsyncTableFunction, can easily > > enable > > > async retry via the new join hint. > > > > > > [1] > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems > > > [2] > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963 > > > [3] https://lists.apache.org/thread/jm9kg33wk9z2bvo2b0g5bp3n5kfj6qv8 > > > [4] > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204:+Introduce+Hash+Lookup+Join > > > > > > Best, > > > Lincoln Lee > > > > > >
[jira] [Created] (FLINK-27820) Handle Upgrade/Deployment errors gracefully
Gyula Fora created FLINK-27820: -- Summary: Handle Upgrade/Deployment errors gracefully Key: FLINK-27820 URL: https://issues.apache.org/jira/browse/FLINK-27820 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.0.0 Reporter: Gyula Fora Assignee: Gyula Fora Fix For: kubernetes-operator-1.1.0 The operator currently cannot gracefully handle cases where there is a failure during job submission (or directly after it, before the status is updated). This applies to initial cluster submissions when a Flink CR is created, but more importantly to upgrades. This is slightly related to https://issues.apache.org/jira/browse/FLINK-27804, where the mid-upgrade observe step was disabled to work around some issues; that logic should also be improved to only skip observing last-state info for already finished jobs (that were observed before). During upgrades, the observer should be able to recognize when the job/cluster was actually submitted already, even if the status update subsequently failed, and move the status into a healthy DEPLOYED state. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27821) Cannot delete flinkdeployment when the pod and deployment deleted manually
Hector Miuler Malpica Gallegos created FLINK-27821: -- Summary: Cannot delete flinkdeployment when the pod and deployment deleted manually Key: FLINK-27821 URL: https://issues.apache.org/jira/browse/FLINK-27821 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.1.0 Reporter: Hector Miuler Malpica Gallegos My operator was installed with following command: ``` git clone g...@github.com:apache/flink-kubernetes-operator.git git checkout 207b17b cd flink-kubernetes-operator helm --debug upgrade -i \ flink-kubernetes-operator helm/flink-kubernetes-operator \ --set image.repository=ghcr.io/apache/flink-kubernetes-operator \ --set image.tag=207b17b ``` Then I create a flinkDeployment and flinkSessionJob, then I delete the deployment of the flinkDeployment, and finally I wanted to delete the flinkdeployment kubectl logs -f pod/flink-kubernetes-operator-5cf66cbbcb-bpl9p ``` 2022-05-27 13:40:22,027 o.a.f.k.o.c.FlinkDeploymentController [INFO ][flink-system/migration] Deleting FlinkDeployment 2022-05-27 13:40:34,047 o.a.f.s.n.i.n.c.AbstractChannel [WARN ] Force-closing a channel whose registration task was not accepted by an event loop: [id: 0xb2062900] java.util.concurrent.RejectedExecutionException: event executor terminated at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483) at org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87) at org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81) at org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86) at org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323) at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139) at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123) at org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:467) at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:390) at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:304) at org.apache.flink.client.program.rest.RestClusterClient.lambda$null$32(RestClusterClient.java:863) at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.postFire(Unknown Source) at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source) 2022-05-27 13:40:34,047 o.a.f.s.n.i.n.u.c.D.rejectedExecution [ERROR] Failed to submit a listener notification task. Event loop shut down? java.util.concurrent.RejectedExecutionException: event executor terminated at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825) at org.apache.flink.shaded.netty4.i
Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric
Thanks all for the valuable discussion. The new feature looks very interesting. According to the FLIP description: "*Currently we have JDBC, Hive and HBase connector implemented lookup table source. All existing implementations will be migrated to the current design and the migration will be transparent to end users*." I was only wondering if we should pay attention to HBase and similar DBs. Since, commonly, the lookup data will be huge while using HBase, partial caching will be used in this case, if I am not mistaken, which might have an impact on the block cache used by HBase, e.g. LruBlockCache. Another question is that, since HBase provides a sophisticated cache solution, does it make sense to have a no-cache solution as one of the default solutions so that customers will have no effort for the migration if they want to stick with Hbase cache? Best regards, Jing On Fri, May 27, 2022 at 11:19 AM Jingsong Li wrote: > Hi all, > > I think the problem now is below: > 1. AllCache and PartialCache interface on the non-uniform, one needs to > provide LookupProvider, the other needs to provide CacheBuilder. > 2. AllCache definition is not flexible, for example, PartialCache can use > any custom storage, while the AllCache can not, AllCache can also be > considered to store memory or disk, also need a flexible strategy. > 3. AllCache can not customize ReloadStrategy, currently only > ScheduledReloadStrategy. > > In order to solve the above problems, the following are my ideas. > > ## Top level cache interfaces: > > ``` > > public interface CacheLookupProvider extends > LookupTableSource.LookupRuntimeProvider { > > CacheBuilder createCacheBuilder(); > } > > > public interface CacheBuilder { > Cache create(); > } > > > public interface Cache { > > /** > * Returns the value associated with key in this cache, or null if > there is no cached value for > * key. > */ > @Nullable > Collection getIfPresent(RowData key); > > /** Returns the number of key-value mappings in the cache. */ > long size(); > } > > ``` > > ## Partial cache > > ``` > > public interface PartialCacheLookupFunction extends CacheLookupProvider { > > @Override > PartialCacheBuilder createCacheBuilder(); > > /** Creates an {@link LookupFunction} instance. */ > LookupFunction createLookupFunction(); > } > > > public interface PartialCacheBuilder extends CacheBuilder { > > PartialCache create(); > } > > > public interface PartialCache extends Cache { > > /** > * Associates the specified value rows with the specified key row > in the cache. If the cache > * previously contained value associated with the key, the old > value is replaced by the > * specified value. > * > * @return the previous value rows associated with key, or null if > there was no mapping for key. > * @param key - key row with which the specified value is to be > associated > * @param value – value rows to be associated with the specified key > */ > Collection put(RowData key, Collection value); > > /** Discards any cached value for the specified key. 
*/ > void invalidate(RowData key); > } > > ``` > > ## All cache > ``` > > public interface AllCacheLookupProvider extends CacheLookupProvider { > > void registerReloadStrategy(ScheduledExecutorService > executorService, Reloader reloader); > > ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(); > > @Override > AllCacheBuilder createCacheBuilder(); > } > > > public interface AllCacheBuilder extends CacheBuilder { > > AllCache create(); > } > > > public interface AllCache extends Cache { > > void putAll(Iterator> allEntries); > > void clearAll(); > } > > > public interface Reloader { > > void reload(); > } > > ``` > > Best, > Jingsong > > On Fri, May 27, 2022 at 11:10 AM Jingsong Li > wrote: > > > Thanks Qingsheng and all for your discussion. > > > > Very sorry to jump in so late. > > > > Maybe I missed something? > > My first impression when I saw the cache interface was, why don't we > > provide an interface similar to guava cache [1], on top of guava cache, > > caffeine also makes extensions for asynchronous calls.[2] > > There is also the bulk load in caffeine too. > > > > I am also more confused why first from LookupCacheFactory.Builder and > then > > to Factory to create Cache. > > > > [1] https://github.com/google/guava > > [2] https://github.com/ben-manes/caffeine/wiki/Population > > > > Best, > > Jingsong > > > > On Thu, May 26, 2022 at 11:17 PM Jark Wu wrote: > > > >> After looking at the new introduced ReloadTime and Becket's comment, > >> I agree with Becket we should have a pluggable reloading strategy. > >> We can provide some common implementations, e.g., periodic reloading, > and > >> daily reloading. > >> But there definitely be some connector- or business-specific reloading > >> strategies, e.g. > >> notify by a zookeeper watcher, reload once a new Hive partition is >
Re: [DISCUSS] FLIP-231: Introduce SupportStatisticReport to support reporting statistics from source connectors
Hi, everyone. Thanks for all the inputs. If there is no more feedback, I think we can start the vote next monday. Best, Godfrey Martijn Visser 于2022年5月25日周三 19:46写道: > > Hi Godfrey, > > Thanks for creating the FLIP. I have no comments. > > Best regards, > > Martijn > > > On Tue, 17 May 2022 at 04:52, Jingsong Li wrote: > > > Hi Godfrey, > > > > Thanks for your reply. > > > > Sounds good to me. > > > > > I think we should also introduce a config option > > > > We can add this option to the FLIP. I prefer a option for > > FileSystemConnector, maybe a enum. > > > > Best, > > Jingsong > > > > On Tue, May 17, 2022 at 10:31 AM godfrey he wrote: > > > > > Hi Jingsong, > > > > > > Thanks for the feedback. > > > > > > > > > >One concern I have is that we read the footer for each file, and this > > may > > > >be a bit costly in some cases. Is it possible for us to have some > > > > hierarchical way > > > yes, if there are thousands of orc/parquet files, it may take a long > > time. > > > So we can introduce a config option to let the user choose the > > > granularity of the statistics. > > > But the SIZE will not be introduced, because the planner does not use > > > the file size statistics now. > > > We can introduce once file size statistics is introduce in the future. > > > I think we should also introduce a config option to enable/disable > > > SupportStatisticReport, > > > because it's a heavy operation for some connectors in some cases. > > > > > > > is the filter pushdown already happening at > > > > this time? > > > That's a good point. Currently, the filter push down is after partition > > > pruning > > > to prevent the filter push down rule from consuming the partition > > > predicates. > > > The statistics will be set to unknown if filter is pushed down now. > > > To combine them all, we can create an optimization program after filter > > > push > > > down program to collect the statistics. This could avoid collecting > > > statistics multiple times. > > > > > > > > > Best, > > > Godfrey > > > > > > Jingsong Li 于2022年5月13日周五 22:44写道: > > > > > > > > Thank Godfrey for driving. > > > > > > > > Looks very good~ This will undoubtedly greatly enhance the various > > batch > > > > mode connectors. > > > > > > > > I left some comments: > > > > > > > > ## FileBasedStatisticsReportableDecodingFormat > > > > > > > > One concern I have is that we read the footer for each file, and this > > may > > > > be a bit costly in some cases. Is it possible for us to have some > > > > hierarchical way, e.g. > > > > - No statistics are collected for files by default. > > > > - SIZE: Generate statistics based on file Size, get the size of the > > file > > > > only with access to the master of the FileSystem. > > > > - DETAILED: Get the complete statistics by format, possibly by > > accessing > > > > the footer of the file. > > > > > > > > ## When use the statistics reported by connector > > > > > > > > > When partitions are pruned by PushPartitionIntoTableSourceScanRule, > > the > > > > statistics should also be updated. > > > > > > > > I understand that we definitely need to use reporter after the > > partition > > > > prune, but another question: is the filter pushdown already happening > > at > > > > this time? > > > > Can we make sure that in the following three cases, both the filter > > > > pushdown and the partition prune happen before the stats reporting. 
> > > > - only partition prune happens > > > > - only filter pushdown happens > > > > - both filter pushdown and partition prune happen > > > > > > > > Best, > > > > Jingsong > > > > > > > > On Fri, May 13, 2022 at 6:57 PM godfrey he > > wrote: > > > > > > > > > Hi all, > > > > > > > > > > I would like to open a discussion on FLIP-231: Introduce > > > > > SupportStatisticReport > > > > > to support reporting statistics from source connectors. > > > > > > > > > > Statistics are one of the most important inputs to the optimizer. > > > > > Accurate and complete statistics allows the optimizer to be more > > > powerful. > > > > > Currently, the statistics of Flink SQL come from Catalog only, > > > > > while many Connectors have the ability to provide statistics, e.g. > > > > > FileSystem. > > > > > In production, we find many tables in Catalog do not have any > > > statistics. > > > > > As a result, the optimizer can't generate better execution plans, > > > > > especially for Batch jobs. > > > > > > > > > > There are two approaches to enhance statistics for the planner, > > > > > one is to introduce the "ANALYZE TABLE" syntax which will write > > > > > the analyzed result to the catalog, another is to introduce a new > > > > > connector interface > > > > > which allows the connector itself to report statistics directly to > > the > > > > > planner. > > > > > The second one is a supplement to the catalog statistics. > > > > > > > > > > Here, we will discuss the second approach. Compared to the first one, > > > > > the second one is to get statistics in real time, no
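To summarize the proposal in code form, a rough sketch of the reporting interface and the hierarchical collection levels discussed above (names are assumptions, not the final FLIP-231 API):

```
import org.apache.flink.table.plan.stats.TableStats;

/** Sketch of a source ability interface for reporting statistics to the planner. */
interface SupportsStatisticReport {
    /** Returns the table statistics, or TableStats.UNKNOWN if they cannot be provided. */
    TableStats reportStatistics();
}

/** Possible statistics granularity for file-based formats, as discussed above. */
enum FileStatisticsType {
    NONE,     // do not collect statistics from files
    SIZE,     // only cheap file-size based estimates
    DETAILED  // read file footers (ORC/Parquet) for full statistics
}
```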
RE: [VOTE] Apache Flink Kubernetes Operator Release 1.0.0, release candidate #2
Hi Yang,

Oh, I’ve been using podman on Red Hat for testing:

podman version
Client: Podman Engine
Version: 4.0.2
API Version: 4.0.2

If I use Docker (version 20.10.13 for me right now) then it builds fine with that COPY git line. Nice!

To use podman, I need to either comment out the COPY git line, or mkdir .git first.

Thanks, Jim
Re: [VOTE] Apache Flink Kubernetes Operator Release 1.0.0, release candidate #2
Hi Devs! We have been performing extensive manual testing on the release. We have found some very infrequent savepoint upgrade issues with Flink 1.15 deployments that we could not reliably reproduce so far. I have added a fix that hopefully eliminates the root cause ( https://issues.apache.org/jira/browse/FLINK-27804) Since adding this we haven't hit the same problem again, so our confidence in the fix is growing :) I think it would make sense to create a new RC including this fix tomorrow, or whenever you have time Yang. We do not need to rush this release, I would prefer to take an extra 1-2 days to eliminate these corner cases as much as possible. Cheers, Gyula On Fri, May 27, 2022 at 11:09 AM Jim Busche wrote: > Hi Yang, > > > Oh, I’ve been using podman on Red Hat for testing: > > podman version > > Client: Podman Engine > > Version: 4.0.2 > > API Version: 4.0.2 > > > If I use Docker (version 20.10.13 for me right now) then it builds fine > with that COPY git line. Nice! > > > > To use podman, I need to either comment out the COPY git line, or mkdir > .git first. > > > > Thanks, Jim > > > > > >
[jira] [Created] (FLINK-27822) Translate the doc of checkpoint/savepoint guarantees
fanrui created FLINK-27822: -- Summary: Translate the doc of checkpoint/savepoint guarantees Key: FLINK-27822 URL: https://issues.apache.org/jira/browse/FLINK-27822 Project: Flink Issue Type: Improvement Components: chinese-translation, Documentation Affects Versions: 1.15.0, 1.16.0 Reporter: fanrui Fix For: 1.16.0, 1.15.1 Translate the change of FLINK-26134 -- This message was sent by Atlassian Jira (v8.20.7#820007)
Re: [VOTE] FLIP-224: Blocklist Mechanism
Hi Chesnay, Would you share your thoughts in the discussion thread if there are still concerns? Thanks, Zhu Chesnay Schepler 于2022年5月27日周五 14:54写道: > > -1 to put a lid on things for now, because I'm not quite done yet with > the discussion. > > On 27/05/2022 05:25, Yangze Guo wrote: > > +1 (binding) > > > > Best, > > Yangze Guo > > > > On Thu, May 26, 2022 at 3:54 PM Yun Gao > > wrote: > >> Thanks Lijie and Zhu for driving the FLIP! > >> > >> The blocked list functionality helps reduce the complexity in maintenance > >> and the currently design looks good to me, thus +1 from my side (binding). > >> > >> > >> Best, > >> Yun > >> > >> > >> > >> > >> -- > >> From:Xintong Song > >> Send Time:2022 May 26 (Thu.) 12:51 > >> To:dev > >> Subject:Re: [VOTE] FLIP-224: Blocklist Mechanism > >> > >> Thanks for driving this effort, Lijie. > >> > >> I think a nice addition would be to make this feature accessible directly > >> from webui. However, there's no reason to block this FLIP on it. > >> > >> So +1 (binding) from my side. > >> > >> Best, > >> > >> Xintong > >> > >> > >> > >> On Fri, May 20, 2022 at 12:57 PM Lijie Wang > >> wrote: > >> > >>> Hi everyone, > >>> > >>> Thanks for the feedback for FLIP-224: Blocklist Mechanism [1] on the > >>> discussion thread [2] > >>> > >>> I'd like to start a vote for it. The vote will last for at least 72 hours > >>> unless there is an objection or insufficient votes. > >>> > >>> [1] > >>> > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blocklist+Mechanism > >>> [2] https://lists.apache.org/thread/fngkk52kjbc6b6v9nn0lkfq6hhsbgb1h > >>> > >>> Best, > >>> Lijie > >>> >
Re: [DISCUSS] FLIP-224: Blacklist Mechanism
Regarding the concern of the SlotManager, my two cents: 1. it is necessary for the SlotManager to host blocked slots, in 2 cases: a. In standalone mode, a taskmanager may be temporarily added to the blocklist. We do not want the TM to get disconnected and shut down. So we need to keep its connection to RM and keep hosting its slots. b. When we want to avoid allocating slots to a slow nodes but do not want to kill current running tasks on the nodes (MARK_BLOCKED mode). There is possible a way to keep the connection of a blocked task manager while hide its slots from SlotManager, but I feel it may be even much more complicated. 2. It will not complicate the SlotManager too much. The SlotManager will be offered a BlockedTaskManagerChecker when created, and just need to use it to filter out blocked slots on slot request. Therefore I think the complication is acceptable. Thanks, Zhu Lijie Wang 于2022年5月25日周三 15:26写道: > > Hi everyone, > > I've updated the FLIP according to Chesnay's feedback, changes as follows: > 1. Change the GET result to a map. > 2. Only left *endTimestamp* in ADD operation, and change the rest (from > POST) to PUT > 3. Introduce a new slot pool implementation(BlocklistSlotPool) to > encapsulate blocklist related functions. > 4. Remove *mainThread* from BlocklistTracker, instead provide a > *removeTimeoutItems* method to be called by outside components。 > > Best, > Lijie > > Lijie Wang 于2022年5月23日周一 22:51写道: > > > Hi Chesnay, > > > > Thanks for feedback. > > > > 1. Regarding the TM/Node id. Do you mean special characters may appear in > > the rest URL? Actually, I don't think so. The task manager id in REST API > > should be the *ResourceID* of taskmanager in Flink, there should be no > > special characters, and some existing REST APIs are already using it, e.g. > > GET: http://{jm_rest_address:port}/taskmanagers/. The node > > id should be an IP of a machine or node name in Yarn/Kubernetes, I think it > > should also have no special characters. > > 2. Regarding the GET query responses. I agree with you, it makes sense to > > change the GET result to a map. > > > > 3. Regarding the endTimestamp. I also agree with you, endTimestamp can > > cover everything, and the endTimestamp is a unix timestamp, there should be > > no timezone issues. But I think PUT and DELETE are enough, no PATCH. The > > add rest api is add or update, PUT can cover this semantics. > > > > 4. Regarding the slot pool/manager. I don't think the current slotpool > > and slotmanager are able to support the MARK_BLOCKED(slots that are > > already allocated will not be affected) action. The reasons are as > > follows: > > > > a) for slot pool, with the MARK_BLOCKED action, when a slot state changes > > from reserved(task assigned) to free(no task assigned), it is necessary to > > check whether the slot should be released immediately(it should be released > > immediately if the task manager is blocked, otherwise it may be allocated > > to other tasks). I think it cannot be supported without being aware of > > the blocklist information. Compared to the solution in FLIP, a more > > appropriate/prefered way may be: Introduce a new slot pool > > implementation for blocklist(may be named BlocklistSlotPool, it > > extends/wrapps the original slot pool), and implement the parts that need > > to be aware of the blocklist in this newly introduced slot pool, and the > > original slot pool basically does not need to change. 
> > > > b) for slot manager, with the MARK_BLOCKED action, there may be free but > > blocked slots in slot manager (the corresponding TMs cannot be > > released/unregistered because there are still running tasks on them). > > Therefore, we need to filter out the blocked slots when trying to fulfill > > the slot requirements. Therefore it also needs to know the blocklist > > information. > > A better way may be to abstract a resource allocation strategy, and make > > the blocklist as a special implementation, then pass the resource > > allocation strategy in when constructing the slot manager. Unfortunately, > > the data structures in the two existing slot manager > > implementations(*DeclarativeSlotManager* and *FineGrainedSlotManager*) are > > quite different, it is not easy to abstract a common resource allocation > > strategy, so we prefer to keep the current way(i.e. pass the blocklist > > information directly into slot manager). > > > > > > 5. Regarding the BlocklistTracker. I also agree with you, the > > BlocklistTracker > > does not need to be aware of the executor, and the timeout actions can be > > done outside. > > > > Chesnay Schepler 于2022年5月20日周五 17:34写道: > > > >> I have a number of concerns: > >> > >> Is the id used for deleting an item the same sent in the initial request > >> (and not one returned by Flink)? > >> I'm very concerned that the tm/node id can contain special characters. > >> > >> The GET query should return a map, not a list of items. This makes it > >> easier to work with. >
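To illustrate the decorator idea from point 4a above (a BlocklistSlotPool wrapping the existing slot pool), a very rough sketch with invented names:

```
import java.util.function.Predicate;
import org.apache.flink.runtime.clusterframework.types.ResourceID;

/** Hypothetical helper a blocklist-aware slot pool could use when slots are freed. */
public class BlocklistSlotPoolSketch {

    private final Predicate<ResourceID> isBlockedTaskManager; // backed by the blocklist tracker

    public BlocklistSlotPoolSketch(Predicate<ResourceID> isBlockedTaskManager) {
        this.isBlockedTaskManager = isBlockedTaskManager;
    }

    /** Called when a slot transitions from reserved (task assigned) back to free. */
    public boolean shouldReleaseImmediately(ResourceID taskManagerId) {
        // Under MARK_BLOCKED, running tasks keep their slots, but a freed slot on a
        // blocked task manager must not be handed out to new requests again.
        return isBlockedTaskManager.test(taskManagerId);
    }
}
```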