[jira] [Created] (FLINK-13097) Buffer depletion in SimpleCollectingOutputView throws non-obvious EOFException
Cyrille Chépélov created FLINK-13097: Summary: Buffer depletion in SimpleCollectingOutputView throws non-obvious EOFException Key: FLINK-13097 URL: https://issues.apache.org/jira/browse/FLINK-13097 Project: Flink Issue Type: Improvement Affects Versions: 1.8.0, 1.7.2 Environment: SCIO + BEAM + Flink under scala 2.12, java 8 Reporter: Cyrille Chépélov When SimpleCollectingOutputView is used, records are collected into a pre-allocated MemorySegmentSource. In case of depletion, the SimpleCollectingOutputView#nextSegment method throws EOFException without a message. This can be non-obvious for a newcomer to diagnose, as * the Java SDK documentation strongly suggests EOFException is related to an inability to read further (whereas in this context, the exception signals an inability to _write_ further) * awareness that pre-allocated, fixed-size buffers are in play may not necessarily be expected of a newcomer to Flink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
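A message along the following lines would make the failure mode self-explanatory. This is only a sketch of the wording, not the actual SimpleCollectingOutputView code; the class and the segment pool below are stand-ins.
{code:java}
import java.io.EOFException;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: name the write path and the fixed-size segment pool in the
// exception message so the depletion case is obvious at first sight.
public class NextSegmentSketch {
    private final List<byte[]> segmentSource = new ArrayList<>();

    byte[] nextSegment() throws EOFException {
        if (segmentSource.isEmpty()) {
            throw new EOFException(
                    "Can't collect further records: the pre-allocated, fixed-size memory segment "
                            + "pool is exhausted (this is a write-side failure, not a read failure).");
        }
        return segmentSource.remove(segmentSource.size() - 1);
    }
}
{code}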
[jira] [Created] (FLINK-13536) Improve nullability handling in Types
François Lacombe created FLINK-13536: Summary: Improve nullability handling in Types Key: FLINK-13536 URL: https://issues.apache.org/jira/browse/FLINK-13536 Project: Flink Issue Type: Improvement Components: API / Type Serialization System, Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.8.0 Reporter: François Lacombe Currently, Avro to Flink type matching doesn't propagate the nullability definition. In Avro: {code:java} "type":["null","string"]{code} allows Java String myField=null; while {code:java} "type":"string"{code} doesn't. It may be good to find a corresponding property in Flink types too, so as to check for nullability in JsonRowDeserializationSchema for instance (a null or absent field in parsed JSON should only be possible for nullable fields). Thanks in advance -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (FLINK-13538) Give field names in deserializers thrown exceptions
François Lacombe created FLINK-13538: Summary: Give field names in deserializers thrown exceptions Key: FLINK-13538 URL: https://issues.apache.org/jira/browse/FLINK-13538 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.8.0 Reporter: François Lacombe Deserializers like JsonRowDeserializationSchema parse JSON strings according to a TypeInformation object. Type mistakes can occur and usually raise an IOException caused by an IllegalStateException. Here I try to parse "field":"blabla" described with Type.INT {code:java} java.io.IOException: Failed to deserialize JSON object. at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:97) at com.dcbrain.etl.inputformat.JsonInputFormat.nextRecord(JsonInputFormat.java:96) at com.dcbrain.etl.inputformat.JsonInputFormat.nextRecord(JsonInputFormat.java:1) at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalStateException: Unsupported type information 'Integer' for node: "blabla" at org.apache.flink.formats.json.JsonRowDeserializationSchema.convert(JsonRowDeserializationSchema.java:191) at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertRow(JsonRowDeserializationSchema.java:212) at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:95) ... 5 common frames omitted{code} Neither the message nor the exception objects contain a reference to the field causing this error, which requires time to inspect complex input data to find where the error really is. Could it be possible to improve the messages or even the exception objects thrown by serializers/deserializers so that they report which field is responsible for the error, please? JsonRowDeserializationSchema isn't the only one affected by such issues. This would allow producing more useful logs to be read by users or administrators. All the best -- This message was sent by Atlassian JIRA (v7.6.14#76016)
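A hedged sketch of the requested improvement (plain Java, not the actual JsonRowDeserializationSchema code): when the conversion of a single field fails, rethrow with the field name attached so the offending field can be found without inspecting the whole record.
{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

// Illustrative only: wrap per-field conversion so the field name travels with the exception.
public class FieldAwareConversionSketch {
    static Object convertField(String fieldName, Object node, Function<Object, Object> converter) {
        try {
            return converter.apply(node);
        } catch (RuntimeException e) {
            throw new IllegalStateException(
                    "Failed to deserialize field '" + fieldName + "' for node: " + node, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = Collections.singletonMap("field", "blabla");
        try {
            // Simulates "field" being declared as INT while the JSON value is a string.
            convertField("field", row.get("field"), v -> Integer.parseInt((String) v));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Failed to deserialize field 'field' for node: blabla
        }
    }
}
{code}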
[jira] [Created] (FLINK-13586) Method ClosureCleaner.clean broke backward compatibility between 1.8.0 and 1.8.1
Gaël Renoux created FLINK-13586: --- Summary: Method ClosureCleaner.clean broke backward compatibility between 1.8.0 and 1.8.1 Key: FLINK-13586 URL: https://issues.apache.org/jira/browse/FLINK-13586 Project: Flink Issue Type: Bug Affects Versions: 1.8.1 Reporter: Gaël Renoux Method clean in org.apache.flink.api.java.ClosureCleaner received a new parameter in Flink 1.8.1. This class is noted as internal, but is used in the Kafka connectors (in org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase). The Kafka connectors library is not provided by the server, and must be set up as a dependency with compile scope (see https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#usage, or the Maven project template). Any project using those connectors and built with 1.8.0 cannot be deployed on a 1.8.1 Flink server, because it would target the old method. => This method needs a fallback with the original two arguments (setting a default value of RECURSIVE for the level argument). -- This message was sent by Atlassian JIRA (v7.6.14#76016)
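A sketch of what the requested fallback amounts to, written here as a user-side compatibility shim and assuming the 1.8.1 three-argument signature clean(Object, ExecutionConfig.ClosureCleanerLevel, boolean); the proper fix would be an equivalent two-argument overload inside ClosureCleaner itself.
{code:java}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;

// Sketch only: mirrors the 1.8.0 two-argument behaviour by defaulting the level to RECURSIVE.
public final class ClosureCleanerCompat {
    private ClosureCleanerCompat() {}

    public static void clean(Object func, boolean checkSerializable) {
        ClosureCleaner.clean(func, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, checkSerializable);
    }
}
{code}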
[jira] [Created] (FLINK-13721) BroadcastState should support StateTTL
Kerem Ulutaş created FLINK-13721: Summary: BroadcastState should support StateTTL Key: FLINK-13721 URL: https://issues.apache.org/jira/browse/FLINK-13721 Project: Flink Issue Type: Improvement Components: API / DataStream, Runtime / Queryable State Affects Versions: 1.8.1 Environment: MacOS 10.14.6 running IntelliJ Idea Ultimate 2019.2, Flink version 1.8.1 Reporter: Kerem Ulutaş Attachments: DebugBroadcastStateTTL.java, IntHolder.java, StringHolder.java, flink_broadcast_state_ttl_debug.log Hi everyone, Sorry if I'm doing anything wrong, this is my first issue in Apache Jira. I have a use case which requires 2 data streams to be cross joined. To be exact, one stream is location updates from clients and the other stream is event data with location information. I'm trying to get events that happen within a certain radius of client location(s). Since the streams can not be related to each other by using a common key for partitioning, I have to broadcast client updates to all tasks and evaluate the radius check for each event. The requirement here is, if we don't receive any location updates from a client for a certain amount of time, we should consider the client "gone" (similar to the rationale stated in the FLINK-3089 description: https://issues.apache.org/jira/browse/FLINK-3089) I have attached the sample application classes I used to debug BroadcastState and StateTTL together. The output (see flink_broadcast_state_ttl_debug.log) shows that client "c0" got its first event at 17:08:07.67 (expected to be evicted sometime after 17:08:37.xxx) but doesn't get evicted. For the operator part (which is the result of BroadcastConnectedStream.process) - since the context in the onTimer method doesn't let the user change the contents of the broadcast state, the only way to deal with stale client data is as follows: * In the processElement method, calculate results only for client data that is newer than the TTL * In the processBroadcastElement method, remove client data older than a certain amount of time from the broadcast state If the broadcast side of the connected streams doesn't get data for longer than the desired time-to-live, the BroadcastState will hold stale data and the processElement method has to filter that client data each time. This is the approach I am using in production right now. I am not aware of any decision or limitation that makes it impossible to implement StateTTL for BroadcastState; I would be pleased if someone could explain whether there are any. Thanks and regards. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
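For reference, a compressed sketch of the manual-expiry workaround described above (processing-time based; the element types, the 30 second TTL and all names are made up for illustration, and this is no substitute for proper StateTTL support on BroadcastState).
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: keyed events (String) are matched against broadcast client updates (client id
// -> last-seen timestamp); stale clients are ignored on the keyed side and removed on the
// broadcast side, which only runs when a broadcast element actually arrives.
public class ManualTtlBroadcastFunction
        extends KeyedBroadcastProcessFunction<String, String, String, String> {

    private static final long TTL_MILLIS = 30_000L; // illustrative

    private final MapStateDescriptor<String, Long> clientsDescriptor =
            new MapStateDescriptor<>("clients", Types.STRING, Types.LONG);

    @Override
    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        long now = System.currentTimeMillis();
        for (Map.Entry<String, Long> client :
                ctx.getBroadcastState(clientsDescriptor).immutableEntries()) {
            if (now - client.getValue() <= TTL_MILLIS) {
                out.collect(event + " matched against client " + client.getKey());
            }
        }
    }

    @Override
    public void processBroadcastElement(String clientId, Context ctx, Collector<String> out) throws Exception {
        long now = System.currentTimeMillis();
        BroadcastState<String, Long> clients = ctx.getBroadcastState(clientsDescriptor);
        clients.put(clientId, now);
        // Manual expiry: collect stale keys first, then remove them.
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Long> client : clients.entries()) {
            if (now - client.getValue() > TTL_MILLIS) {
                stale.add(client.getKey());
            }
        }
        for (String key : stale) {
            clients.remove(key);
        }
    }
}
{code}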
[jira] [Created] (FLINK-13836) Improve support of java.util.UUID for JDBCTypeUtil
François Lacombe created FLINK-13836: Summary: Improve support of java.util.UUID for JDBCTypeUtil Key: FLINK-13836 URL: https://issues.apache.org/jira/browse/FLINK-13836 Project: Flink Issue Type: Improvement Components: Connectors / JDBC Affects Versions: 1.8.0 Reporter: François Lacombe Currently, JDBCTypeUtil used by JDBCAppendTableSinkBuilder doesn't support UUID types backed by java.util.UUID in Java. Could it be possible to handle that, so as to allow writing UUIDs directly to PostgreSQL, please? -- This message was sent by Atlassian Jira (v8.3.2#803003)
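For context, plain JDBC can already write java.util.UUID values to a PostgreSQL uuid column via setObject; a sketch of what the sink would ultimately need to do underneath (connection URL, credentials and table name are placeholders).
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.UUID;

// Sketch only: the PostgreSQL JDBC driver accepts java.util.UUID through setObject,
// which is the behaviour JDBCTypeUtil would need to map to.
public class UuidInsertSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn =
                        DriverManager.getConnection("jdbc:postgresql://localhost:5432/test", "test", "test");
                PreparedStatement ps = conn.prepareStatement("INSERT INTO my_table (id) VALUES (?)")) {
            ps.setObject(1, UUID.randomUUID()); // maps to the PostgreSQL uuid type
            ps.executeUpdate();
        }
    }
}
{code}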
[jira] [Created] (FLINK-13925) ClassLoader in BlobLibraryCacheManager is not using context class loader
Jan Lukavský created FLINK-13925: Summary: ClassLoader in BlobLibraryCacheManager is not using context class loader Key: FLINK-13925 URL: https://issues.apache.org/jira/browse/FLINK-13925 Project: Flink Issue Type: Bug Affects Versions: 1.9.0, 1.8.1 Reporter: Jan Lukavský Fix For: 1.8.2, 1.9.1 Use thread's current context classloader as parent class loader of flink user code class loaders. -- This message was sent by Atlassian Jira (v8.3.2#803003)
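A minimal illustration of the requested behaviour (generic Java, not the actual BlobLibraryCacheManager code): the user-code class loader takes the calling thread's context class loader as its parent instead of the class loader that loaded Flink itself.
{code:java}
import java.net.URL;
import java.net.URLClassLoader;

// Sketch only: building the user-code class loader on top of the thread's context class
// loader lets classes visible to the embedding application be resolved through the parent.
public class ContextParentClassLoaderSketch {
    static URLClassLoader createUserCodeClassLoader(URL[] userJars) {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        return new URLClassLoader(userJars, parent);
    }

    public static void main(String[] args) {
        System.out.println(createUserCodeClassLoader(new URL[0]).getParent());
    }
}
{code}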
[jira] [Created] (FLINK-13945) Vendor-repos Maven profile doesn't exist in flink-shaded
Elise Ramé created FLINK-13945: -- Summary: Vendor-repos Maven profile doesn't exist in flink-shaded Key: FLINK-13945 URL: https://issues.apache.org/jira/browse/FLINK-13945 Project: Flink Issue Type: Bug Components: BuildSystem / Shaded Affects Versions: shaded-8.0, shaded-7.0, shaded-9.0 Reporter: Elise Ramé According to [documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.9/flinkDev/building.html#custom--vendor-specific-versions], to build Flink against a vendor specific Hadoop version it is necessary to build flink-shaded against this version first : {code:bash} mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version= {code} vendor-repos profile has to be activated to include Hadoop vendors repositories. But Maven cannot find expected Hadoop dependencies and returns an error because vendor-repos profile isn't defined in flink-shaded. Example using flink-shaded 8.0 and HDP 2.6.5 Hadoop version : {code:bash} mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.7.3.2.6.5.0-292 {code} {code:bash} [INFO] ---< org.apache.flink:flink-shaded-hadoop-2 >--- [INFO] Building flink-shaded-hadoop-2 2.7.3.2.6.5.0-292-8.0 [10/11] [INFO] [ jar ]- [WARNING] The POM for org.apache.hadoop:hadoop-common:jar:2.7.3.2.6.5.0-292 is missing, no dependency information available [WARNING] The POM for org.apache.hadoop:hadoop-hdfs:jar:2.7.3.2.6.5.0-292 is missing, no dependency information available [WARNING] The POM for org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.7.3.2.6.5.0-292 is missing, no dependency information available [WARNING] The POM for org.apache.hadoop:hadoop-yarn-client:jar:2.7.3.2.6.5.0-292 is missing, no dependency information available [WARNING] The POM for org.apache.hadoop:hadoop-yarn-common:jar:2.7.3.2.6.5.0-292 is missing, no dependency information available [INFO] [INFO] Reactor Summary: [INFO] [INFO] flink-shaded 8.0 ... SUCCESS [ 2.122 s] [INFO] flink-shaded-force-shading 8.0 . SUCCESS [ 0.607 s] [INFO] flink-shaded-asm-7 7.1-8.0 . SUCCESS [ 0.667 s] [INFO] flink-shaded-guava-18 18.0-8.0 . SUCCESS [ 1.452 s] [INFO] flink-shaded-netty-4 4.1.39.Final-8.0 .. SUCCESS [ 4.597 s] [INFO] flink-shaded-netty-tcnative-dynamic 2.0.25.Final-8.0 SUCCESS [ 0.620 s] [INFO] flink-shaded-jackson-parent 2.9.8-8.0 .. SUCCESS [ 0.018 s] [INFO] flink-shaded-jackson-2 2.9.8-8.0 ... SUCCESS [ 0.914 s] [INFO] flink-shaded-jackson-module-jsonSchema-2 2.9.8-8.0 . SUCCESS [ 0.627 s] [INFO] flink-shaded-hadoop-2 2.7.3.2.6.5.0-292-8.0 FAILURE [ 0.047 s] [INFO] flink-shaded-hadoop-2-uber 2.7.3.2.6.5.0-292-8.0 ... SKIPPED [INFO] [INFO] BUILD FAILURE [INFO] [INFO] Total time: 11.947 s [INFO] Finished at: 2019-09-03T16:52:59+02:00 [INFO] [WARNING] The requested profile "vendor-repos" could not be activated because it does not exist. 
[ERROR] Failed to execute goal on project flink-shaded-hadoop-2: Could not resolve dependencies for project org.apache.flink:flink-shaded-hadoop-2:jar:2.7.3.2.6.5.0-292-8.0: The following artifacts could not be resolved: org.apache.hadoop:hadoop-common:jar:2.7.3.2.6.5.0-292, org.apache.hadoop:hadoop-hdfs:jar:2.7.3.2.6.5.0-292, org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.7.3.2.6.5.0-292, org.apache.hadoop:hadoop-yarn-client:jar:2.7.3.2.6.5.0-292, org.apache.hadoop:hadoop-yarn-common:jar:2.7.3.2.6.5.0-292: Failure to find org.apache.hadoop:hadoop-common:jar:2.7.3.2.6.5.0-292 in https://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced -> [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException [ERROR] [ERROR] After correcting the problems, you can resume the build with the command [ERROR] mvn -rf :flink-shaded-hadoop-2 {code} vendor-repos profile exists in Flink pom.xml file : [https://github.com/apache/flink/blob/3079d11913f153ec40c75afb5356fd3be1a1e550/pom.xml#L1037] -- This messa
[jira] [Created] (FLINK-31935) The new resource requirements REST API is only available for session clusters
David Morávek created FLINK-31935: - Summary: The new resource requirements REST API is only available for session clusters Key: FLINK-31935 URL: https://issues.apache.org/jira/browse/FLINK-31935 Project: Flink Issue Type: Bug Components: Runtime / REST Affects Versions: 1.18.0 Reporter: David Morávek We need to register both `JobResourceRequirementsHandler` and `JobResourceRequirementsUpdateHandler` for application / per-job clusters as well. These handlers have been introduced as part of FLINK-31316. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31954) Prevent Mockito for the new code with Checkstyle
David Morávek created FLINK-31954: - Summary: Prevent Mockito for the new code with Checkstyle Key: FLINK-31954 URL: https://issues.apache.org/jira/browse/FLINK-31954 Project: Flink Issue Type: Improvement Components: Build System Reporter: David Morávek Based on [https://lists.apache.org/thread/xl456044hmxk87mwq02p4m22yp3b04sc] discussion. We'll set up a Checkstyle rule that disallows Mockito usage and create a one-off suppression list for the existing violations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31955) Prevent JUnit 4 usage for the new code with Checkstyle
David Morávek created FLINK-31955: - Summary: Prevent JUnit 4 usage for the new code with Checkstyle Key: FLINK-31955 URL: https://issues.apache.org/jira/browse/FLINK-31955 Project: Flink Issue Type: Improvement Components: Build System Environment: Based on [https://lists.apache.org/thread/xl456044hmxk87mwq02p4m22yp3b04sc] discussion. We'll set up a Checkstyle rule that disallows JUnit 4 usage and create a one-off suppression list for the existing violations. Reporter: David Morávek -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31996) Chaining operators with different max parallelism prevents rescaling
David Morávek created FLINK-31996: - Summary: Chaining operators with different max parallelism prevents rescaling Key: FLINK-31996 URL: https://issues.apache.org/jira/browse/FLINK-31996 Project: Flink Issue Type: Bug Components: Runtime / Coordination Reporter: David Morávek We might chain operators with different max parallelism together if they are set to have the same parallelism initially. When we decide to rescale the JobGraph vertices (using AdaptiveScheduler), we're capped by the lowest maxParallelism of the operator chain. This is especially visible with things like CollectSink, TwoPhaseCommitSink, CDC, and a GlobalCommitter with maxParallelism set to 1. An obvious solution would be to prevent the chaining of operators with different maxParallelism, but we need to double-check this doesn't introduce a breaking change. -- This message was sent by Atlassian Jira (v8.20.10#820010)
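Until that is resolved, one user-side mitigation consistent with the description above is to break the chain manually at the operator with the low max parallelism, so it becomes its own job vertex and no longer caps the rest of the pipeline. A sketch (functions, names and parallelism values are illustrative):
{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: keep the maxParallelism=1 step out of the scalable chain so the AdaptiveScheduler
// can still rescale the rest of the pipeline.
public class ChainBreakSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
                .map(i -> i * 2)
                .returns(Types.INT)
                .name("scalable-map")                 // freely rescalable vertex
                .map(i -> i)
                .returns(Types.INT)
                .setMaxParallelism(1)
                .disableChaining()                    // stand-in for a global-committer-like step
                .name("capped-step")
                .print();

        env.execute("chain-break-sketch");
    }
}
{code}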
[jira] [Created] (FLINK-32000) Expose vertex max parallelism in the WebUI
David Morávek created FLINK-32000: - Summary: Expose vertex max parallelism in the WebUI Key: FLINK-32000 URL: https://issues.apache.org/jira/browse/FLINK-32000 Project: Flink Issue Type: Improvement Components: Runtime / Web Frontend Reporter: David Morávek Attachments: Screenshot 2023-05-04 at 14.15.34.png It would be great to expose max parallelism in the vertex detail drawer for debugging purposes. !Screenshot 2023-05-04 at 14.15.34.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32006) AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry times out on Azure
David Morávek created FLINK-32006: - Summary: AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry times out on Azure Key: FLINK-32006 URL: https://issues.apache.org/jira/browse/FLINK-32006 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.18.0 Reporter: David Morávek {code:java} May 04 13:52:18 [ERROR] org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry Time elapsed: 100.009 s <<< ERROR! May 04 13:52:18 org.junit.runners.model.TestTimedOutException: test timed out after 100 seconds May 04 13:52:18 at java.lang.Thread.sleep(Native Method) May 04 13:52:18 at org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncWaitOperatorTest.java:1313) May 04 13:52:18 at org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry(AsyncWaitOperatorTest.java:1277) May 04 13:52:18 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) May 04 13:52:18 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) May 04 13:52:18 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) May 04 13:52:18 at java.lang.reflect.Method.invoke(Method.java:498) May 04 13:52:18 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) May 04 13:52:18 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) May 04 13:52:18 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) May 04 13:52:18 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) May 04 13:52:18 at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) May 04 13:52:18 at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) May 04 13:52:18 at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) May 04 13:52:18 at java.util.concurrent.FutureTask.run(FutureTask.java:266) May 04 13:52:18 at java.lang.Thread.run(Thread.java:748) {code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48671&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=9288 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32010) KubernetesLeaderRetrievalDriver always waits for lease update to resolve leadership
David Morávek created FLINK-32010: - Summary: KubernetesLeaderRetrievalDriver always waits for lease update to resolve leadership Key: FLINK-32010 URL: https://issues.apache.org/jira/browse/FLINK-32010 Project: Flink Issue Type: Bug Affects Versions: 1.16.1, 1.17.0, 1.18.0 Reporter: David Morávek The k8s-based leader retrieval is based on ConfigMap watching. The config map lifecycle (from the consumer point of view) is handled as a series of events with the following types: * ADDED -> the first time the consumer has seen the CM * UPDATED -> any further changes to the CM * DELETED -> ... you get the idea The implementation assumes that the ElectionDriver (the one that creates the CM) and the ElectionRetriever are started simultaneously, and therefore ignores the ADDED events, because the CM is always created as empty and is updated with the leadership information later on. This assumption is incorrect in the following cases (I might be missing some, but that's not important, the goal is to illustrate the problem): * a TM joining the cluster after the leaders are established and needing to discover the RM / JM * the RM trying to discover the JM when MultipleComponentLeaderElectionDriver is used This, for example, leads to higher job submission latency, since submission can be unnecessarily held back for up to the lease retry period [1]. [1] Configured by _high-availability.kubernetes.leader-election.retry-period_ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32160) CompactOperator cannot continue from checkpoint because of java.util.NoSuchElementException
Michał Fijołek created FLINK-32160: -- Summary: CompactOperator cannot continue from checkpoint because of java.util.NoSuchElementException Key: FLINK-32160 URL: https://issues.apache.org/jira/browse/FLINK-32160 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Affects Versions: 1.17.0, 1.16.0 Environment: Flink 1.17 on k8s (flink-kubernetes-operator v.1.4.0), s3 Reporter: Michał Fijołek Hello :) We have a flink job (v 1.17) on k8s (using the official flink-k8s-operator) that reads data from kafka and writes it to s3 using flink-sql with compaction. The job sometimes fails and continues from a checkpoint just fine, but once every couple of days we experience a crash loop. The job cannot continue from the latest checkpoint and fails with the following exception: {noformat} java.util.NoSuchElementException at java.base/java.util.ArrayList$Itr.next(Unknown Source) at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:114) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Unknown Source){noformat} Here’s the relevant code: [https://github.com/apache/flink/blob/release-1.17/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java#L114] It looks like `CompactOperator` is calling `next()` on the iterator without checking `hasNext()` first - why is that? Is it a bug? Why does `context.getOperatorStateStore().getListState(metaDescriptor)` return an empty iterator? Is the latest checkpoint broken in such a case? We have an identical job, but without compaction, and it has been working smoothly for a couple of weeks now. The whole job is just `select` from kafka and `insert` to s3.
{noformat} CREATE EXTERNAL TABLE IF NOT EXISTS hive.`foo`.`bar` ( `foo_bar1` STRING, `foo_bar2` STRING, `foo_bar3` STRING, `foo_bar4` STRING ) PARTITIONED BY (`foo_bar1` STRING, `foo_bar2` STRING, `foo_bar3` STRING) STORED AS parquet LOCATION 's3a://my/bucket/' TBLPROPERTIES ( 'auto-compaction' = 'true', 'compaction.file-size' = '128MB', 'sink.parallelism' = '8', 'format' = 'parquet', 'parquet.compression' = 'SNAPPY', 'sink.rolling-policy.rollover-interval' = '1 h', 'sink.partition-commit.policy.kind' = 'metastore' ){noformat} Checkpoint configuration: {noformat} Checkpointing Mode Exactly Once Checkpoint Storage FileSystemCheckpointStorage State Backend HashMapStateBackend Interval 20m 0s Timeout 10m 0s Minimum Pause Between Checkpoints 0ms Maximum Concurrent Checkpoints 1 Unaligned Checkpoints Disabled Persist Checkpoints Externally Enabled (retain on cancellation) Tolerable Failed Checkpoints 0 Checkpoints With Finished Tasks Enabled State Changelog Disabled{noformat} Is there something wrong with given config or is this some unhandled edge case? Currently our workaround is to restart a job, without using checkpoint - it uses a state from kafka which in this case is fine -- This message was sent by Atlassian Jira (v8.20.10#820010)
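For illustration, the defensive pattern the question above is pointing at (generic Java, not the actual CompactOperator code): guard the restored-state iterator instead of calling next() unconditionally.
{code:java}
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Toy illustration: an unconditional next() on an empty restored list state throws
// NoSuchElementException, while a guarded read falls back to a default.
public class GuardedRestoreSketch {
    static String restore(List<String> restoredListState) {
        Iterator<String> it = restoredListState.iterator();
        return it.hasNext() ? it.next() : "<no restored element>";
    }

    public static void main(String[] args) {
        System.out.println(restore(Collections.emptyList())); // guarded: no exception
        try {
            Collections.<String>emptyList().iterator().next(); // what the stack trace shows
        } catch (NoSuchElementException e) {
            System.out.println("unguarded next() failed: " + e);
        }
    }
}
{code}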
[jira] [Created] (FLINK-32228) Bump testcontainers
João Boto created FLINK-32228: - Summary: Bump testcontainers Key: FLINK-32228 URL: https://issues.apache.org/jira/browse/FLINK-32228 Project: Flink Issue Type: Improvement Components: Connectors / JDBC Reporter: João Boto Bump testcontainers version -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32318) [flink-operator] missing s3 plugin in folder plugins
Luís Costa created FLINK-32318: -- Summary: [flink-operator] missing s3 plugin in folder plugins Key: FLINK-32318 URL: https://issues.apache.org/jira/browse/FLINK-32318 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.5.0 Reporter: Luís Costa Greetings, I'm trying to configure [Flink's Kubernetes HA services|https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/ha/kubernetes_ha/] for flink operator jobs, but got an error regarding s3 plugin: _"Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto"_ {code:java} 2023-06-12 10:05:16,981 INFO akka.remote.Remoting [] - Starting remoting 2023-06-12 10:05:17,194 INFO akka.remote.Remoting [] - Remoting started; listening on addresses :[akka.tcp://flink@10.4.125.209:6123] 2023-06-12 10:05:17,377 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Actor system started at akka.tcp://flink@10.4.125.209:6123 2023-06-12 10:05:18,175 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting KubernetesApplicationClusterEntrypoint down with application status FAILED. Diagnostics org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory. at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:299) at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:285) at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:145) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:439) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:382) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729) at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86) Caused by: java.io.IOException: Could not create FileSystem for highly available storage path (s3://td-infra-stg-us-east-1-s3-flinkoperator/flink-data/ha/flink-basic-example-xpto) at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102) at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86) at org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:41) at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:296) ... 10 more Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. 
The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/. at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515) at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409) at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:99) ... 13 more {code} Looking into the container, can see that s3 plugins are in folder /opt/flink/ instead of s3/plugins as mentioned
[jira] [Created] (FLINK-32376) [FLIP-287] Extend Sink#InitContext to expose TypeSerializer, ObjectReuse and JobID
João Boto created FLINK-32376: - Summary: [FLIP-287] Extend Sink#InitContext to expose TypeSerializer, ObjectReuse and JobID Key: FLINK-32376 URL: https://issues.apache.org/jira/browse/FLINK-32376 Project: Flink Issue Type: Improvement Reporter: João Boto -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32540) The issue of not distributing the last batch of data
原来你是小幸运001 created FLINK-32540: -- Summary: The issue of not distributing the last batch of data Key: FLINK-32540 URL: https://issues.apache.org/jira/browse/FLINK-32540 Project: Flink Issue Type: Bug Environment: The code below was executed in IntelliJ IDEA with Flink 1.16; the issue also exists in 1.14. Other versions have not been tried. Reporter: 原来你是小幸运001 I copied the source code of the flat map operator and wanted to implement my own flat map. Part of the logic is to emit one last piece of data at the end of the Flink job, so I called collector.collect in the close method, but the data was not emitted and the downstream operator cannot receive it.
{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

/**
 * @author LaiYongBIn
 * @date 2023/7/5 10:09
 * @Description Do SomeThing
 */
public class Test {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream0 = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                sourceContext.collect("TEST");
                System.out.println("cancel");
            }

            @Override
            public void cancel() {
            }
        }).setParallelism(1);

        MyFlatMapFun flatMapFunc = new MyFlatMapFun();
        TypeInformation<String> outType = TypeExtractor.getFlatMapReturnTypes(
                env.clean(flatMapFunc), stream0.getType(), Utils.getCallLocationName(), true);
        DataStream<String> flatMap = stream0
                .transform("Flat Map", outType, new MyStreamOperator(env.clean(flatMapFunc)))
                .setParallelism(1);
        flatMap.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                System.out.println("Obtain upstream data is:" + s);
            }
        });
        env.execute();
    }
}

class MyStreamOperator extends AbstractUdfStreamOperator<String, FlatMapFunction<String, String>>
        implements OneInputStreamOperator<String, String> {
    private transient TimestampedCollector<String> collector;

    public MyStreamOperator(FlatMapFunction<String, String> userFunction) {
        super(userFunction);
    }

    @Override
    public void open() throws Exception {
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void close() throws Exception {
        // Distribute data during close
        collector.collect("close message");
    }

    @Override
    public void processElement(StreamRecord<String> streamRecord) throws Exception {
        // do nothing
    }
}

class MyFlatMapFun implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        // do nothing
    }
}
{code} Then I found out there was a finish method, and I tried to execute 'collector.
collect' in the finish method, and the data was successfully distributed。 {code:java} import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.runt
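The second listing is cut off above; presumably it differs from the first only in where the final record is emitted. A sketch of that override, reusing the names from the first listing (this reflects the reporter's observation that emitting in finish() rather than close() works):
{code:java}
@Override
public void finish() throws Exception {
    // Emitting here, before the operator is torn down, reaches the downstream operator,
    // whereas the same collect() in close() did not.
    collector.collect("finish message");
}
{code}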
[jira] [Created] (FLINK-32567) when Flink write azure data lake storage,error occur
宇宙先生 created FLINK-32567: Summary: when Flink write azure data lake storage,error occur Key: FLINK-32567 URL: https://issues.apache.org/jira/browse/FLINK-32567 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.17.1 Reporter: 宇宙先生 Attachments: image-2023-07-10-14-26-22-019.png, image-2023-07-10-14-28-11-792.png When I strictly followed the official documentation to perform these two operations, I still got an authentication error, and I would like to know how I should enable the authentication. !image-2023-07-10-14-26-22-019.png! error: !image-2023-07-10-14-28-11-792.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32627) Add support for dynamic time window function
张一帆 created FLINK-32627: --- Summary: Add support for dynamic time window function Key: FLINK-32627 URL: https://issues.apache.org/jira/browse/FLINK-32627 Project: Flink Issue Type: Improvement Components: API / DataStream Affects Versions: 1.18.0 Reporter: 张一帆 Fix For: 1.18.0 When using windows for calculations, every time the logic is modified or adjusted the entire program needs to be stopped, the code modified, and the program repackaged and resubmitted to the cluster. Dynamic modification of the logic and external dynamic injection are impossible today. The proposal is to obtain the window information from the data itself and trigger a redistribution of windows, achieving the effect of dynamic windows. -- This message was sent by Atlassian Jira (v8.20.10#820010)
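For comparison, the closest existing facility derives one window parameter from the data itself: session windows with a dynamic gap. A sketch (the Event type and its fields are made up for illustration); the ticket asks for the same kind of data-driven control over other window types.
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;

// Sketch only: the session gap is taken from each element, i.e. the window parameter is
// carried inside the data rather than fixed in code.
public class DynamicGapSketch {
    public static class Event {
        public String key = "client-1";
        public long gapMillis = 10_000L; // window parameter carried inside the data
        public long timestamp = 1L;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(new Event())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner((event, ts) -> event.timestamp))
                .keyBy(e -> e.key)
                .window(EventTimeSessionWindows.withDynamicGap(
                        (SessionWindowTimeGapExtractor<Event>) element -> element.gapMillis))
                .reduce((a, b) -> b)
                .print();
        env.execute("dynamic-gap-sketch");
    }
}
{code}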
[jira] [Created] (FLINK-32629) Add support for dynamic CEP
张一帆 created FLINK-32629: --- Summary: Add support for dynamic CEP Key: FLINK-32629 URL: https://issues.apache.org/jira/browse/FLINK-32629 Project: Flink Issue Type: New Feature Components: Library / CEP Affects Versions: 1.18.0 Reporter: 张一帆 Fix For: 1.18.0 When using CEP as a complex event processing engine, every time the logic is modified or a threshold is adjusted the entire program needs to be stopped, the code modified, and the program repackaged and resubmitted to the cluster. Dynamic logic modification and external dynamic injection cannot be realized today. We have implemented dynamic injection of CEP logic based on message-driven logic modification: specific messages can be manually injected at the source end to achieve fine-grained control over when new logic is picked up. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32650) Added the ability to split flink-protobuf codegen code
李精卫 created FLINK-32650: --- Summary: Added the ability to split flink-protobuf codegen code Key: FLINK-32650 URL: https://issues.apache.org/jira/browse/FLINK-32650 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.17.0 Reporter: 李精卫 Fix For: 1.17.0 Flink serializes and deserializes protobuf-format data by calling the decode or encode method in the GeneratedProtoToRow_XXX.java class generated by codegen to parse byte[] data into protobuf Java objects. The size of the generated decode/encode method body is strongly related to the number of fields defined in the protobuf message. When the number of fields exceeds a certain threshold and the compiled method body exceeds 8 KB, the decode/encode method is no longer optimized by the JIT, which seriously affects serialization and deserialization performance. If the compiled method body exceeds 64 KB, the task even fails to start. So I propose a codegen splitter for protobuf parsing that splits the encode/decode method to solve this problem. The idea is as follows: in the current decode/encode method, the code for every field defined in the protobuf message is placed in a single method body. Since the fields share no parameters, groups of fields can be parsed in separate split methods. If the amount of generated code in the current method body exceeds the threshold, a split method is generated, those fields are parsed in the split method, and the split method is called from the decode/encode method. Repeating this, the decode/encode method is finally generated together with its split methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
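To make the proposal concrete, a hand-written sketch of the shape the generated code could take (method, class and field names are invented; the real generator would emit something equivalent): the monolithic decode body is split into helper methods that each stay well under the JIT and 64 KB limits.
{code:java}
// Illustrative only: what split codegen output could look like. RowDataStub stands in for
// the RowData / protobuf message types produced by the real generator.
public class GeneratedProtoToRowSplitSketch {
    public static class RowDataStub {
        public final Object[] fields = new Object[4];
    }

    public RowDataStub decode(byte[] message) {
        RowDataStub row = new RowDataStub();
        // Each split method handles a disjoint slice of fields; they share no local state,
        // which is what makes the split safe.
        decodeSplit0(message, row);
        decodeSplit1(message, row);
        return row;
    }

    private void decodeSplit0(byte[] message, RowDataStub row) {
        row.fields[0] = message.length;     // stand-in for parsing field 0
        row.fields[1] = message.length > 0; // stand-in for parsing field 1
    }

    private void decodeSplit1(byte[] message, RowDataStub row) {
        row.fields[2] = "field2";           // stand-in for parsing field 2
        row.fields[3] = "field3";           // stand-in for parsing field 3
    }
}
{code}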
[jira] [Created] (FLINK-32812) HBaseRowDataLookupFunction HTable instantiation of thread safety problems
王江洲 created FLINK-32812: --- Summary: HBaseRowDataLookupFunction HTable instantiation of thread safety problems Key: FLINK-32812 URL: https://issues.apache.org/jira/browse/FLINK-32812 Project: Flink Issue Type: Bug Components: Connectors / HBase Affects Versions: 1.17.1 Environment: Flink 1.17.1 Hbase 2.4.11 @Override public void open(FunctionContext context) { LOG.info("start open ..."); Configuration config = prepareRuntimeConfiguration(); try { hConnection = ConnectionFactory.createConnection(config); table = (HTable) hConnection.getTable(TableName.valueOf(hTableName)); } catch (TableNotFoundException tnfe) { LOG.error("Table '{}' not found ", hTableName, tnfe); throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe); } catch (IOException ioe) { LOG.error("Exception while creating connection to HBase.", ioe); throw new RuntimeException("Cannot create connection to HBase.", ioe); } this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral); LOG.info("end open."); } Reporter: 王江洲 Fix For: 1.17.1 The HTable instantiated in HBaseRowDataLookupFunction has thread-safety problems. In the actual development environment, once the program has been running, the close() method very likely is never executed, so the HTable cannot be shut down. All subsequent uses then share the same HTable instance, which is a multithreading safety hazard and leads to data errors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
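For reference, the HBase client documents Connection as heavyweight and thread-safe to share, while Table instances are lightweight and not thread-safe. A sketch of the pattern that avoids sharing one HTable across threads (table name and row-key handling are illustrative; this is not the actual connector code):
{code:java}
import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch only: obtain (and close) a Table around each use on top of a shared Connection.
public class PerLookupTableSketch {
    static Result lookup(Connection sharedConnection, String rowKey) throws IOException {
        try (Table table = sharedConnection.getTable(TableName.valueOf("my_table"))) {
            return table.get(new Get(Bytes.toBytes(rowKey)));
        }
    }
}
{code}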
[jira] [Created] (FLINK-32961) A new FileSystemFactory that support two high available hdfs
王茂军 created FLINK-32961: --- Summary: A new FileSystemFactory that support two high available hdfs Key: FLINK-32961 URL: https://issues.apache.org/jira/browse/FLINK-32961 Project: Flink Issue Type: Improvement Components: FileSystems Reporter: 王茂军 Fix For: 1.18.0 I run a realtime ETL program with Flink on YARN. The ETL program sinks user logs to a master HDFS and writes checkpoints to another, smaller HDFS. Both the master HDFS and the smaller HDFS are highly available. By default, the ETL program cannot resolve the dfs.nameservices of the second HDFS. I plan to write a custom org.apache.flink.core.fs.FileSystemFactory that supports two or more HA HDFS clusters, so that I can sink user logs to the master HDFS and save checkpoint data to the second one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33163) Support Java 21 (LTS)
Maciej Bryński created FLINK-33163: -- Summary: Support Java 21 (LTS) Key: FLINK-33163 URL: https://issues.apache.org/jira/browse/FLINK-33163 Project: Flink Issue Type: Bug Reporter: Maciej Bryński Based on https://issues.apache.org/jira/browse/FLINK-15736 we should have similar ticket for Java 21 LTS. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic
Lauri Suurväli created FLINK-33231: -- Summary: Memory leak in KafkaSourceReader if no data in consumed topic Key: FLINK-33231 URL: https://issues.apache.org/jira/browse/FLINK-33231 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.17.1 Reporter: Lauri Suurväli Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png *Problem description* Our Flink streaming job TaskManager heap gets full when the job has nothing to consume and process. It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. When there are no messages in the source topic the TaskManager heap usage starts increasing until the job exits after receiving a SIGTERM signal. We are running the job on AWS EMR with YARN. The problems with the TaskManager heap usage do not occur when there is data to process. It's also worth noting that sending a single message to the source topic of a streaming job that has been sitting idle and suffers from the memory leak will cause the heap to be cleared. However it does not resolve the problem since the heap usage will start increasing immediately after processing the message. !Screenshot 2023-10-10 at 12.49.37.png! TaskManager heap used percentage is calculated by {code:java} flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / flink.taskmanager.Status.JVM.Memory.Heap.Max{code} I was able to take heap dumps of the TaskManager processes during a high heap usage percentage. Heap dump analysis detected 912,355 instances of java.util.HashMap empty collections retaining >= 43,793,040 bytes. !Screenshot 2023-10-09 at 14.13.43.png! The retained heap seemed to be located at: {code:java} org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code} !Screenshot 2023-10-09 at 13.02.34.png! *Possible hints:* An empty HashMap is added during the snapshotState method to offsetsToCommit map if it does not already exist for the given checkpoint. [KafkaSourceReader line 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107] {code:java} Map offsetsMap = offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); {code} If the startingOffset for the given split is >= 0 then a new entry would be added to the map from the previous step. [KafkaSourceReader line 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113] {code:java} if (split.getStartingOffset() >= 0) { offsetsMap.put( split.getTopicPartition(), new OffsetAndMetadata(split.getStartingOffset())); }{code} If the starting offset is smaller than 0 then this would leave the offsetMap created in step 1 empty. We can see from the logs that the startingOffset is -3 when the splits are added to the reader. 
{code:java} Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, StoppingOffset: -9223372036854775808], [Partition: source-events-44, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: source-events-36, StartingOffset: 1, StoppingOffset: -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: source-events-28, StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code} The offsetsToCommit map is cleaned from entries once they have been committed to Kafka which happens during the callback function that is passed to the KafkaSourceFetcherManager.commitOffsets method in KafkaSourceReader.notifyCheckpointComplete method. However if the committedPartitions is empty for the given checkpoint, then the KafkaSourceFetcherManager.commitOffsets method returns. [KafkaSourceFetcherManager line 78|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78] {code:java} if (offsetsToCommit.isEmpty()) { return; } {code} We can observe from the logs that indeed an empty map is encountered at this step: {code:java} Committing offsets {}{code} *Conclusion* It seems that an empty map gets added per each checkpoint to offsetsToCommit map. Since the startingOffset in our case is -3 then the empty map never gets filled. During the offset commit phase the offsets for these checkpoints are
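The mechanism described above can be reproduced with plain collections; a toy sketch (no Flink or Kafka classes) of why the bookkeeping grows when there is never anything to commit:
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy reproduction of the bookkeeping pattern described above: an entry is created for every
// checkpoint, but entries are only removed when a commit actually happens, so an idle reader
// (nothing to commit) accumulates empty maps.
public class OffsetsToCommitLeakSketch {
    public static void main(String[] args) {
        Map<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();
        for (long checkpointId = 1; checkpointId <= 1_000; checkpointId++) {
            // snapshotState(): always registers the checkpoint ...
            Map<String, Long> offsets = offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
            // ... but with startingOffset < 0 nothing is ever put into 'offsets'.

            // notifyCheckpointComplete(): returns early when there is nothing to commit,
            // so the (empty) entry is never cleaned up.
            if (offsets.isEmpty()) {
                continue;
            }
            offsetsToCommit.remove(checkpointId);
        }
        System.out.println("Retained empty maps: " + offsetsToCommit.size()); // 1000
    }
}
{code}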
[jira] [Created] (FLINK-33357) add Apache Software License 2
蔡灿材 created FLINK-33357: --- Summary: add Apache Software License 2 Key: FLINK-33357 URL: https://issues.apache.org/jira/browse/FLINK-33357 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.6.0 Reporter: 蔡灿材 Fix For: kubernetes-operator-1.5.0 Attachments: 2023-10-25 12-08-58屏幕截图.png Currently, flinkdeployments.flink.apache.org-v1.yml and flinksessionjobs.flink.apache.org-v1.yml do not include the Apache Software License 2 header. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33378) Bump flink version on flink-connectors-jdbc
João Boto created FLINK-33378: - Summary: Bump flink version on flink-connectors-jdbc Key: FLINK-33378 URL: https://issues.apache.org/jira/browse/FLINK-33378 Project: Flink Issue Type: Improvement Components: Connectors / JDBC Reporter: João Boto With the release of Flink 1.18, bump flink version on connector -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33393) flink document description error
蔡灿材 created FLINK-33393: --- Summary: flink document description error Key: FLINK-33393 URL: https://issues.apache.org/jira/browse/FLINK-33393 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.17.1 Reporter: 蔡灿材 Fix For: 1.17.1 Attachments: 捕获.PNG The Flink documentation contains a description error; the description in the functions section is wrong (see the attached screenshot). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33528) Externalize Python connector code
Márton Balassi created FLINK-33528: -- Summary: Externalize Python connector code Key: FLINK-33528 URL: https://issues.apache.org/jira/browse/FLINK-33528 Project: Flink Issue Type: Technical Debt Components: API / Python, Connectors / Common Affects Versions: 1.18.0 Reporter: Márton Balassi Fix For: 1.19.0 During the connector externalization effort end to end tests for the python connectors were left in the main repository under: [https://github.com/apache/flink/tree/master/flink-python/pyflink/datastream/connectors] These include both python connector implementation and tests. Currently they depend on a previously released version of the underlying connectors, otherwise they would introduce a circular dependency given that they are in the flink repo at the moment. This setup prevents us from propagating any breaking change to PublicEvolving and Internal APIs used by the connectors as they lead to breaking the python e2e tests. We run into this while implementing FLINK-25857. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33556) Test infrastructure for externalized python code
Márton Balassi created FLINK-33556: -- Summary: Test infrastructure for externalized python code Key: FLINK-33556 URL: https://issues.apache.org/jira/browse/FLINK-33556 Project: Flink Issue Type: Sub-task Components: API / Python, Connectors / Common Affects Versions: 1.18.0 Reporter: Márton Balassi Assignee: Peter Vary Fix For: 1.19.0 We need to establish the reusable parts of the python infrastructure as part of the shared connector utils such that it can be easily reused. Ideally we would create a github workflow similar to https://github.com/apache/flink-connector-shared-utils/blob/ci_utils/.github/workflows/ci.yml. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33557) Externalize Cassandra Python connector code
Márton Balassi created FLINK-33557: -- Summary: Externalize Cassandra Python connector code Key: FLINK-33557 URL: https://issues.apache.org/jira/browse/FLINK-33557 Project: Flink Issue Type: Sub-task Reporter: Márton Balassi See description of parent ticket for context. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33558) Externalize Elasticsearch Python connector code
Márton Balassi created FLINK-33558: -- Summary: Externalize Elasticsearch Python connector code Key: FLINK-33558 URL: https://issues.apache.org/jira/browse/FLINK-33558 Project: Flink Issue Type: Sub-task Reporter: Márton Balassi See description of parent ticket for context. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33559) Externalize Kafka Python connector code
Márton Balassi created FLINK-33559: -- Summary: Externalize Kafka Python connector code Key: FLINK-33559 URL: https://issues.apache.org/jira/browse/FLINK-33559 Project: Flink Issue Type: Sub-task Affects Versions: 1.18.0 Reporter: Márton Balassi Assignee: Peter Vary Fix For: 1.19.0 See description of parent ticket for context. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33560) Externalize Kinesis Python connector code
Márton Balassi created FLINK-33560: -- Summary: Externalize Kinesis Python connector code Key: FLINK-33560 URL: https://issues.apache.org/jira/browse/FLINK-33560 Project: Flink Issue Type: Sub-task Components: API / Python, Connectors / Kinesis Affects Versions: 1.18.0 Reporter: Márton Balassi Fix For: 1.19.0 See description of parent ticket for context. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33561) Externalize Pulsar Python connector code
Márton Balassi created FLINK-33561: -- Summary: Externalize Pulsar Python connector code Key: FLINK-33561 URL: https://issues.apache.org/jira/browse/FLINK-33561 Project: Flink Issue Type: Sub-task Components: API / Python, Connectors / Pulsar Affects Versions: 1.18.0 Reporter: Márton Balassi Fix For: 1.19.0 See description of parent ticket for context. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33562) Externalize RabbitMQ Python connector code
Márton Balassi created FLINK-33562: -- Summary: Externalize RabbitMQ Python connector code Key: FLINK-33562 URL: https://issues.apache.org/jira/browse/FLINK-33562 Project: Flink Issue Type: Sub-task Components: API / Python, Connectors / RabbitMQ Affects Versions: 1.18.0 Reporter: Márton Balassi Fix For: 1.19.0 See description of parent ticket for context. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33722) MATCH_RECOGNIZE in batch mode ignores events order
Grzegorz Kołakowski created FLINK-33722: --- Summary: MATCH_RECOGNIZE in batch mode ignores events order Key: FLINK-33722 URL: https://issues.apache.org/jira/browse/FLINK-33722 Project: Flink Issue Type: Bug Components: Library / CEP Affects Versions: 1.17.1 Reporter: Grzegorz Kołakowski MATCH_RECOGNIZE in batch mode seems to ignore ORDER BY clause. Let's consider the following example: {code:sql} FROM events MATCH_RECOGNIZE ( PARTITION BY user_id ORDER BY ts ASC MEASURES FIRST(A.ts) as _start, LAST(A.ts) as _middle, LAST(B.ts) as _finish ONE ROW PER MATCH AFTER MATCH SKIP PAST LAST ROW PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS DEFINE A AS active is false, B AS active is true ) AS T {code} where _events_ is a Postgresql table containing ~1 records. {code:java} CREATE TABLE events ( id INT, user_id INT, ts TIMESTAMP(3), active BOOLEAN, WATERMARK FOR ts AS ts - INTERVAL '5' SECOND, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://postgres:5432/test', 'username' = 'test', 'password' = 'test', 'table-name' = 'events' ); {code} It can happen that __finish_ is smaller than __start_ or {_}_middle{_}, which is wrong. {noformat} user_id _start _middle _finish 1 2023-11-23 14:34:42.346 2023-11-23 14:34:48.370 2023-11-23 14:34:44.264{noformat} Repository where I reproduced the problem: https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging According to [~dwysakowicz]: In BATCH the CepOperator is always created to process records in processing time: https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54 A comparator is passed along to the operator covering the sorting on ts field: https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173 but this is only secondary sorting. It is applied only within records of the same timestamp. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33729) Events are getting lost when an exception occurs within a processing function
Rafał Trójczak created FLINK-33729: -- Summary: Events are getting lost when an exception occurs within a processing function Key: FLINK-33729 URL: https://issues.apache.org/jira/browse/FLINK-33729 Project: Flink Issue Type: Bug Components: Connectors / Pulsar Affects Versions: 1.15.3 Reporter: Rafał Trójczak We have a Flink job using a Pulsar source that reads from an input topic, and a Pulsar sink that is writing to an output topic. Both Flink and Pulsar connector are of version 1.15.3. The Pulsar version that I use is 2.10.3. Here is a simple project that is intended to reproduce this problem: [https://github.com/trojczak/flink-pulsar-connector-problem/] All of my tests were done on my local Kubernetes cluster using the Flink Kubernetes Operator and Pulsar is running on my local Docker. But the same problem occurred on a "normal" cluster. Expected behavior: When an exception is thrown within the code (or a TaskManager pod is restarted for any other reason, e.g. OOM exception), the processing should be picked up from the last event sent to the output topic. Actual behavior: The events before the failure are sent correctly to the output topic, next some of the events from the input topic are missing, then from some point the events are being processed normally until the next exception is thrown, and so on. Finally, from 100 events that should be sent from the input topic to the output topic, only 40 are sent. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33955) UnsupportedFileSystemException when trying to save data to Azure's abfss File System
Alek Łańduch created FLINK-33955: Summary: UnsupportedFileSystemException when trying to save data to Azure's abfss File System Key: FLINK-33955 URL: https://issues.apache.org/jira/browse/FLINK-33955 Project: Flink Issue Type: Bug Affects Versions: 1.17.1, 1.18.0 Environment: Flink 1.17.1 & Flink 1.18.0 with Java 11, ADLS Gen.2 with hierarchical namespace enabled Reporter: Alek Łańduch Attachments: error.log, pom.xml, success.log When using Azure's File System connector for reading and writing files to Azure Data Lake Storage 2 Flink job fails at writing files with given error: {noformat} Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"{noformat} Full logs from Job Manager along with stack trace is attached to as [^error.log] file. The connection itself seems to be good, as the job successfully creates desired structure inside ADLS (and the the `.part` file), but the file itself is empty. The job is simple, as its only purpose is to save events `a`, `b` and `c` into a file on ADLS. The whole code is presented below: {code:java} import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class DataStreamJob { public static void main(String[] args) throws Exception { final FileSink sink = FileSink .forRowFormat( new Path("abfss://t...@stads2dev01.dfs.core.windows.net/output"), new SimpleStringEncoder("UTF-8")) .build(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.fromElements("a", "b", "c").sinkTo(sink); env.execute("Test"); } } {code} Code is run locally using Flink 1.18.0 (the same behavior was present in version 1.17.1). The only change that was made to `flink-conf.yaml` was to add key for accessing Azure: {code:java} fs.azure.account.auth.type.stads2dev01.dfs.core.windows.net: SharedKey fs.azure.account.key.stads2dev01.dfs.core.windows.net: **{code} The [^pom.xml] file was created by using [Getting Started|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#getting-started] documentation - the only thing I added was `flink-azure-fs-hadoop` connector. The whole [^pom.xml] file is attached. The connector JAR was also copied from `opt` directory to `plugins/azure-fs-hadoop` in cluster files according to the documentation. The interesting fact is that the deprecated method `writeAsText` (instead of FileSink) not only works and creates desired file on ADLS, but *the subsequent jobs that use FileSInk that previously failed now works and creates file successfully* (until cluster's restart). The logs from job with deprecated method are also attached here as [^success.log] file. I suspect that it is somehow connected to how Azure File System is initialized, where the new FileSink method would create it incorrectly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33976) AdaptiveScheduler cooldown period is taken from a wrong configuration
David Morávek created FLINK-33976: - Summary: AdaptiveScheduler cooldown period is taken from a wrong configuration Key: FLINK-33976 URL: https://issues.apache.org/jira/browse/FLINK-33976 Project: Flink Issue Type: Bug Components: Runtime / Configuration, Runtime / Coordination Reporter: David Morávek The new JobManager options introduced in FLINK-21883: `scaling-interval.\{min,max}` of AdaptiveScheduler are resolved from the per-Job configuration instead of JobManager's configuration, which is not correct. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-16260) Add Java 11 based version of the docker images
Ismaël Mejía created FLINK-16260: Summary: Add Java 11 based version of the docker images Key: FLINK-16260 URL: https://issues.apache.org/jira/browse/FLINK-16260 Project: Flink Issue Type: New Feature Components: Release System / Docker Reporter: Ismaël Mejía Since 1.10.0 supports Java 11, we can add a version of the docker image based on Java 11 Feature [requested in our old issue tracker|https://github.com/docker-flink/docker-flink/issues/97] and moved here -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16262) Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory
Jürgen Kreileder created FLINK-16262: Summary: Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory Key: FLINK-16262 URL: https://issues.apache.org/jira/browse/FLINK-16262 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.10.0 Environment: openjdk:11-jre with a slightly modified Flink 1.10.0 build (nothing changed regarding Kafka and/or class loading). Reporter: Jürgen Kreileder We're using Docker images modeled after [https://github.com/apache/flink/blob/master/flink-container/docker/Dockerfile] (using Java 11) When I try to switch a Kafka producer from AT_LEAST_ONCE to EXACTLY_ONCE, the taskmanager startup fails with: {code:java} org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found. at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:718) at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471) at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464) at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62) at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75) at org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:396) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:326) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298) at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:76) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107) at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source) at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown Source) at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source) at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source) at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source) at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source){code} This looks like class loading issue: If I copy our JAR to FLINK_LIB_DIR instead of FLINK_USR_LIB_DIR, everything works find. (AT_LEAST_ONCE producers works fine with the JAR in FLINK_USR_LIB_DIR) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16277) StreamTableEnvironment.toAppendStream fails with Decimal types
Benoît Paris created FLINK-16277: Summary: StreamTableEnvironment.toAppendStream fails with Decimal types Key: FLINK-16277 URL: https://issues.apache.org/jira/browse/FLINK-16277 Project: Flink Issue Type: Bug Components: Table SQL / Legacy Planner, Table SQL / Planner Affects Versions: 1.10.0 Reporter: Benoît Paris Attachments: flink-test-schema-update.zip The following fails when there is a Decimal type in the underlying TableSource: {code:java} DataStream appendStream = tEnv.toAppendStream( asTable, asTable.getSchema().toRowType() );{code} Yielding the following error: ValidationException: Type ROW<`y` DECIMAL(38, 18)> of table field 'payload' does not match with the physical type ROW<`y` LEGACY('DECIMAL', 'DECIMAL')> of the 'payload' field of the TableSource return type Remarks: * toAppendStream is not ready for the new type system, does not accept the new DataTypes * The LegacyTypeInformationType transition type hinders things. Replacing it with the new DataTypes.DECIMAL type makes things work. Workaround: reprocess TypeConversions.fromLegacyInfoToDataType's output to replace LegacyTypeInformationType types when they are of DECIMAL typeroot with the new types. Included is reproduction and workaround (activated by line 127) code, with java + pom + stacktrace files. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16447) Non serializable field on CompressWriterFactory
João Boto created FLINK-16447: - Summary: Non serializable field on CompressWriterFactory Key: FLINK-16447 URL: https://issues.apache.org/jira/browse/FLINK-16447 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.10.0 Reporter: João Boto CompressWriterFactory has a CompressionCodec field that is not serializable. This makes StreamingFileSink fail with a non-serializable-field error. Extending the codec and implementing Serializable solves the problem, but it is odd. -- This message was sent by Atlassian Jira (v8.3.4#803005)
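A minimal sketch of the workaround the reporter describes, assuming a Hadoop GzipCodec is the codec in use (the class name is illustrative only, and whether serialization succeeds still depends on the fields of the chosen codec):

{code:java}
import java.io.Serializable;

import org.apache.hadoop.io.compress.GzipCodec;

/**
 * Illustrative workaround: a codec subclass that also implements Serializable,
 * so a CompressWriterFactory holding it can be serialized and shipped with the
 * StreamingFileSink to the task managers.
 */
public class SerializableGzipCodec extends GzipCodec implements Serializable {
    private static final long serialVersionUID = 1L;
}
{code}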
[jira] [Created] (FLINK-16544) Flink FileSystem for web.uploadDir
Angel Barragán created FLINK-16544: -- Summary: Flink FileSystem for web.uploadDir Key: FLINK-16544 URL: https://issues.apache.org/jira/browse/FLINK-16544 Project: Flink Issue Type: Improvement Components: API / Core Affects Versions: 1.10.0 Reporter: Angel Barragán Currently the configuration property "web.upload.dir" only supports paths on the local filesystem. When we deploy Flink in a cluster environment like YARN, it is more useful to be able to configure those directories to be on HDFS, so sizing and maintenance tasks are easier than trying to find out on which node YARN has launched the JobManager and managing the upload directory there. In my concrete case, I ran into this disadvantage when creating an AWS EMR cluster with Flink, where the default configuration creates this directory under /tmp on the local filesystem of the CORE node where the JobManager is deployed by YARN. We found that the EMR cluster is also configured to fully empty /tmp on a monthly basis, removing the upload directory for Flink, which makes Flink fail when you try to submit a new job. We had to recreate the directory manually. The first solution I tried was to change the above configuration property to use HDFS, like we did with the configuration property "state.checkpoints.dir", and we found it doesn't work in a YARN environment. So I checked the Flink code to see how this configuration is used and found that it only uses the local file system. I think this change would be an improvement to Flink's manageability when running in a cluster environment where we can use shared storage like HDFS or S3. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16580) flink-connector-kafka deserializer
李开青 created FLINK-16580: --- Summary: flink-connector-kafka deserializer Key: FLINK-16580 URL: https://issues.apache.org/jira/browse/FLINK-16580 Project: Flink Issue Type: Wish Components: Connectors / Kafka Affects Versions: 1.10.0 Reporter: 李开青 In FlinkKafkaConsumer.setDeserializer(Properties props), why is ByteArrayDeserializer.class mandatory? I found that the Flink SQL configuration "connector.properties.key.deserializer" has no effect. The relevant method is: private static void setDeserializer(Properties props) { final String deSerName = ByteArrayDeserializer.class.getName(); Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); if (keyDeSer != null && !keyDeSer.equals(deSerName)) { LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); } if (valDeSer != null && !valDeSer.equals(deSerName)) { LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); } props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName); } -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16596) Support Enum-Values as part of a Key
Felix Wollschläger created FLINK-16596: -- Summary: Support Enum-Values as part of a Key Key: FLINK-16596 URL: https://issues.apache.org/jira/browse/FLINK-16596 Project: Flink Issue Type: Improvement Reporter: Felix Wollschläger See: FLINK-11774 h2. Description: The hashCode implementation of Enum-Values is guaranteed to be stable inside a JVM, but not across multiple JVMs. This leads to failures when restoring from a checkpoint/savepoint containing Keyed-State on keys with Enums as a part of the key. For users of Flink there is a workaround to solve this problem: don't rely on the hashCode implementation of Enum-Values in the hashCode implementation of the actual key. Use ordinal() or name().hashCode() instead of the Enum hashCode. h2. Goals of this improvement: Implement a way to handle Enum-Values, by either handling Enum-Values at an internal level or implementing an abstract base KeySelector class that users of Flink can choose if their key contains an Enum-Value. Code to reproduce a failure and first thoughts can be found in FLINK-11774. -- This message was sent by Atlassian Jira (v8.3.4#803005)
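A minimal sketch of the workaround described above (the key class and its fields are invented for illustration): derive the hash from the enum's name() so it stays stable across JVMs and savepoint restores.

{code:java}
import java.io.Serializable;
import java.util.Objects;

// Illustrative key class applying the workaround: hash the enum by name()
// rather than relying on Enum#hashCode(), which differs between JVMs.
public class EventKey implements Serializable {
    public enum Category { CLICK, VIEW }

    private final String userId;
    private final Category category;

    public EventKey(String userId, Category category) {
        this.userId = userId;
        this.category = category;
    }

    @Override
    public int hashCode() {
        return Objects.hash(userId, category == null ? null : category.name());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof EventKey)) return false;
        EventKey other = (EventKey) o;
        return Objects.equals(userId, other.userId) && category == other.category;
    }
}
{code}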
[jira] [Created] (FLINK-16651) flink stream SQL INNER JOIN produces out-of-order results
王太阳 created FLINK-16651: --- Summary: flink stream SQL INNER JOIN produces out-of-order results Key: FLINK-16651 URL: https://issues.apache.org/jira/browse/FLINK-16651 Project: Flink Issue Type: Bug Reporter: 王太阳 Flink 1.9.1: when reading two Kafka topics (each with a single partition), registering them as two tables and performing an INNER JOIN, the emitted results are out of order. topic: test {code:java} a,b,c a,b,c1 a,b,c2{code} topic: test2 {code:java} a,b2,c2 a,b3,c3 a,b4,c4 a,b5,c5 a,b6,c6{code} FLINK SQL: {code:java} select * from test t inner join test2 t2 on t.a=t2.a {code} First run output: {code:java} 1> (true,a,b,c,2020-03-18T09:40:11.858,a,b2,c2,2020-03-18T09:40:11.858) 1> (true,a,b,c2,2020-03-18T09:40:11.862,a,b2,c2,2020-03-18T09:40:11.862) 1> (true,a,b,c1,2020-03-18T09:40:11.862,a,b2,c2,2020-03-18T09:40:11.862) 1> (true,a,b,c,2020-03-18T09:40:11.862,a,b3,c3,2020-03-18T09:40:11.862) 1> (true,a,b,c2,2020-03-18T09:40:11.862,a,b3,c3,2020-03-18T09:40:11.862) 1> (true,a,b,c1,2020-03-18T09:40:11.862,a,b3,c3,2020-03-18T09:40:11.862) 1> (true,a,b,c,2020-03-18T09:40:11.863,a,b4,c4,2020-03-18T09:40:11.863) 1> (true,a,b,c2,2020-03-18T09:40:11.863,a,b4,c4,2020-03-18T09:40:11.863) 1> (true,a,b,c1,2020-03-18T09:40:11.863,a,b4,c4,2020-03-18T09:40:11.863) 1> (true,a,b,c,2020-03-18T09:40:11.863,a,b5,c5,2020-03-18T09:40:11.863) 1> (true,a,b,c2,2020-03-18T09:40:11.863,a,b5,c5,2020-03-18T09:40:11.863) 1> (true,a,b,c1,2020-03-18T09:40:11.863,a,b5,c5,2020-03-18T09:40:11.863) 1> (true,a,b,c,2020-03-18T09:40:11.864,a,b6,c6,2020-03-18T09:40:11.864) 1> (true,a,b,c2,2020-03-18T09:40:11.864,a,b6,c6,2020-03-18T09:40:11.864) 1> (true,a,b,c1,2020-03-18T09:40:11.864,a,b6,c6,2020-03-18T09:40:11.864){code} Second run output: {code:java} 1> (true,a,b,c,2020-03-18T09:42:36.168,a,b2,c2,2020-03-18T09:42:36.169) 1> (true,a,b,c,2020-03-18T09:42:36.171,a,b4,c4,2020-03-18T09:42:36.171) 1> (true,a,b,c,2020-03-18T09:42:36.171,a,b3,c3,2020-03-18T09:42:36.171) 1> (true,a,b,c1,2020-03-18T09:42:36.171,a,b2,c2,2020-03-18T09:42:36.171) 1> (true,a,b,c1,2020-03-18T09:42:36.171,a,b4,c4,2020-03-18T09:42:36.171) 1> (true,a,b,c1,2020-03-18T09:42:36.171,a,b3,c3,2020-03-18T09:42:36.171) 1> (true,a,b,c2,2020-03-18T09:42:36.172,a,b2,c2,2020-03-18T09:42:36.172) 1> (true,a,b,c2,2020-03-18T09:42:36.172,a,b4,c4,2020-03-18T09:42:36.172) 1> (true,a,b,c2,2020-03-18T09:42:36.172,a,b3,c3,2020-03-18T09:42:36.172) 1> (true,a,b,c,2020-03-18T09:42:36.188,a,b5,c5,2020-03-18T09:42:36.188) 1> (true,a,b,c2,2020-03-18T09:42:36.188,a,b5,c5,2020-03-18T09:42:36.188) 1> (true,a,b,c1,2020-03-18T09:42:36.188,a,b5,c5,2020-03-18T09:42:36.188) 1> (true,a,b,c,2020-03-18T09:42:36.188,a,b6,c6,2020-03-18T09:42:36.188) 1> (true,a,b,c2,2020-03-18T09:42:36.188,a,b6,c6,2020-03-18T09:42:36.188) 1> (true,a,b,c1,2020-03-18T09:42:36.188,a,b6,c6,2020-03-18T09:42:36.188) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16846) Add docker images with python
Ismaël Mejía created FLINK-16846: Summary: Add docker images with python Key: FLINK-16846 URL: https://issues.apache.org/jira/browse/FLINK-16846 Project: Flink Issue Type: Improvement Components: Release System / Docker Reporter: Ismaël Mejía We do not include python currently in the docker images. This issue is to include it or create derived python specific images. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16907) did not react to cancelling signal for 30 seconds, but is stuck in method:
辛羿彤 created FLINK-16907: --- Summary: did not react to cancelling signal for 30 seconds, but is stuck in method: Key: FLINK-16907 URL: https://issues.apache.org/jira/browse/FLINK-16907 Project: Flink Issue Type: Task Components: API / DataStream Affects Versions: 1.9.1 Environment: Hadoop version: 3.1.1.3.1.0.0-78 flink-1.9.1 hdp-3.1.0.0-78-kafka Reporter: 辛羿彤 When consuming Kafka data, the job gets stuck in the map method and becomes unresponsive. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17259) Have scala 2.12 support
João Boto created FLINK-17259: - Summary: Have scala 2.12 support Key: FLINK-17259 URL: https://issues.apache.org/jira/browse/FLINK-17259 Project: Flink Issue Type: Improvement Components: Stateful Functions Affects Versions: 2.0.0 Reporter: João Boto In statefun-flink, scala.binary.version is defined as 2.11, which forces the use of Scala 2.11. Should the default be Scala 2.12, or should there be an option to choose the Scala version? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17324) Move the image we use to generate the flink-docker image into flink-docker
Ismaël Mejía created FLINK-17324: Summary: Move the image we use to generate the flink-docker image into flink-docker Key: FLINK-17324 URL: https://issues.apache.org/jira/browse/FLINK-17324 Project: Flink Issue Type: Improvement Components: Release System / Docker Reporter: Ismaël Mejía Before the docker official image was repatriated into Apache Flink we used a docker image that contained the scripts to generate the release. {{docker run --rm \}} {{ --volume ~/projects/docker-flink:/build \}} {{ plucas/docker-flink-build \}} {{ /build/generate-stackbrew-library.sh > ~/projects/official-images /library/flink}} Notice that this docker image tool 'plucas/docker-flink-build' is not part of upstream Flink so we need to move it there into some sort of tools section in the flink-docker repo or document an alternative to it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14173) ANSI-style JOIN with Temporal Table Function fails
Benoît Paris created FLINK-14173: Summary: ANSI-style JOIN with Temporal Table Function fails Key: FLINK-14173 URL: https://issues.apache.org/jira/browse/FLINK-14173 Project: Flink Issue Type: Bug Components: Table SQL / Legacy Planner, Table SQL / Planner Affects Versions: 1.9.0 Environment: Java 1.8, Scala 2.11, Flink 1.9 (pom.xml file attached) Reporter: Benoît Paris Attachments: flink-test-temporal-tables-1.9.zip The planner fails to generate a plan for ANSI-style joins with Temporal Table Functions. The Blink planner throws a "Missing conversion is LogicalTableFunctionScan[convention: NONE -> LOGICAL]" message (and some very fancy graphviz stuff). The old planner fails with "This exception indicates that the query uses an unsupported SQL feature." This fails: {code:java} SELECT o_amount * r_amount AS amount FROM Orders JOIN LATERAL TABLE (Rates(o_proctime)) ON r_currency = o_currency {code} This works: {code:java} SELECT o_amount * r_amount AS amount FROM Orders , LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency{code} Reproduction with the attached Java and pom.xml files. Also included: stack traces for both Blink and the old planner. I think this is a regression. I remember using ANSI-style joins with a temporal table function successfully in 1.8. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14200) Temporal Table Function Joins do not work on Tables (only TableSources) on the query side
Benoît Paris created FLINK-14200: Summary: Temporal Table Function Joins do not work on Tables (only TableSources) on the query side Key: FLINK-14200 URL: https://issues.apache.org/jira/browse/FLINK-14200 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.0 Environment: Java 8, Scala 2.11, Flink 1.9 Reporter: Benoît Paris Attachments: temporal-table-function-query-side-as-not-table-source.zip This only affects the Blink planner. The legacy planner works fine. With Orders as a TableSource, and Orders2 as a Table with the same content: {code:java} tEnv.registerTableSource("Orders", new FooSource(new String[] {"o_currency", "o_amount", "o_proctime"})); Table orders2 = tEnv.sqlQuery("SELECT * FROM Orders"); tEnv.registerTable("Orders2", orders2);{code} This works (TableSource on the query side): {code:java} SELECT o_amount * r_amount AS amount FROM Orders , LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency{code} While this does not (Table on the query side): {code:java} SELECT o_amount * r_amount AS amount FROM Orders2 , LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency{code} Throwing an NPE in FlinkRelBuilder, called from LogicalCorrelateToJoinFromTemporalTableFunctionRule. Attached is Java code for reproduction, along with the full log and stacktrace, and a pom.xml. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14297) Temporal Table Function Build Side does not accept a constant key
Benoît Paris created FLINK-14297: Summary: Temporal Table Function Build Side does not accept a constant key Key: FLINK-14297 URL: https://issues.apache.org/jira/browse/FLINK-14297 Project: Flink Issue Type: Bug Components: Table SQL / Legacy Planner, Table SQL / Planner Affects Versions: 1.9.0 Environment: Java 1.8, Scala 2.11, Flink 1.9 (pom.xml file attached) Reporter: Benoît Paris Attachments: flink-test-temporal-constant-key-build-side.zip When defining a table that will be used as the build side on a Temporal Table Function, a constant key will not be accepted: In: {code:java} Table ratesHistory = tEnv.sqlQuery(sql); TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); {code} This crashes: {code:java} SELECT 'Eur1' AS r_currency, r_amount, r_proctime FROM RatesHistory{code} Making a type verification in Calcite fail: RelOptUtil.verifyTypeEquivalence, when trying to join the Lateral Table Function. It seems like this is corner case in nullability, the error is: {code:java} (Blink) Apply rule [LogicalCorrelateToJoinFromTemporalTableFunctionRule] [...] (old planner) Apply rule [LogicalCorrelateToTemporalTableJoinRule] [...] Exception in thread "main" java.lang.AssertionError: Cannot add expression of different type to set: set type is RecordType( [...] VARCHAR(65536) CHARACTER SET "UTF-16LE" r_currency, [...]) NOT NULL expression type is RecordType( [...] CHAR(4)CHARACTER SET "UTF-16LE" NOT NULL r_currency, [...]) NOT NULL{code} (formatting and commenting mine) No problem in VARCHAR vs CHAR, as using the following works: {code:java} SELECT COALESCE('Eur1', r_currency) AS r_currency, r_amount, r_proctime FROM RatesHistory{code} The problem is coming from NULL vs NOT NULL Attached is Java reproduction code, pom.xml, and both blink and old planner logs and stacktraces. My speculations on this is that an earlier transformation infers and normalizes the key type (or maybe gets it from the query side?), but the decorrelation and special temporal table function case happens later. Reordering the rules could help? Maybe way too heavy handed. Or do the [rexBuilder.makeInputRef|[https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala#L145]] in a type-compatible way. This seems to be related to another issue: https://issues.apache.org/jira/browse/FLINK-14173 Where careful support of the the nullability of the build side key in a LEFT JOIN will take part in the output. This might seem like a useless use case, but a constant key is the only way to access in SQL a Temporal Table Function for a global value (like querying the global current number of users) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14617) Dataset Parquet ClassCastException for SpecificRecord
Dominik Wosiński created FLINK-14617: Summary: Dataset Parquet ClassCastException for SpecificRecord Key: FLINK-14617 URL: https://issues.apache.org/jira/browse/FLINK-14617 Project: Flink Issue Type: Bug Affects Versions: 1.8.0 Reporter: Dominik Wosiński The following code runs smoothly when the _executionEnvironment_ is an instance of _StreamExecutionEnvironment_: {code:java} val job = Job.getInstance() AvroReadSupport.setAvroDataSupplier(job.getConfiguration, classOf[AvroDataSupplierWithTimestampConversion]) val avroParquetInputFormat = new AvroParquetInputFormat[GpsPointDTO]() val hadoopInputFormat = new HadoopInputFormat[Void, GpsPointDTO](avroParquetInputFormat, classOf[Void], classOf[GpsPointDTO], job) FileInputFormat.addInputPaths(job, filePaths.head) executionEnvironment.createInput(hadoopInputFormat).map(_._2).print(){code} But when _ExecutionEnvironment_ is used instead of _StreamExecutionEnvironment_, the code throws the following: {code:java} Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class com.company.GpsPointDTO (org.apache.avro.generic.GenericData$Record and com.company.GpsPointDTO are in unnamed module of loader 'app'){code} I don't think this is the expected behavior. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14954) Consider providing an official OpenAPI specification of REST Monitoring API
Michaël Melchiore created FLINK-14954: - Summary: Consider providing an official OpenAPI specification of REST Monitoring API Key: FLINK-14954 URL: https://issues.apache.org/jira/browse/FLINK-14954 Project: Flink Issue Type: Improvement Components: Runtime / REST Reporter: Michaël Melchiore Hello, Flink provides a very helpful REST Monitoring API. OpenAPI is a convenient standard for generating clients in a variety of languages for REST APIs documented according to its specification. In this case, clients would help automate the management of Flink clusters. Currently, there is no "official" OpenAPI specification of the Flink REST Monitoring API. [Some|https://github.com/nextbreakpoint/flink-client] have been written by users, but their consistency across Flink releases is uncertain. I think it would be beneficial to have an OpenAPI specification provided and maintained by the Flink project. Kind regards, -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15016) Remove unused dependency
César Soto Valero created FLINK-15016: - Summary: Remove unused dependency Key: FLINK-15016 URL: https://issues.apache.org/jira/browse/FLINK-15016 Project: Flink Issue Type: Improvement Components: Build System Reporter: César Soto Valero Dependency *commons-io:commons-io* is declared in module *flink-core*. However, this dependency is not used and, therefore, should be removed to make the pom clearer. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15357) schema created by JsonRowSchemaConverter are not suitable for TableEnv.sqlQuery table schema
巫旭阳 created FLINK-15357: --- Summary: schema created by JsonRowSchemaConverter are not suitable for TableEnv.sqlQuery table schema Key: FLINK-15357 URL: https://issues.apache.org/jira/browse/FLINK-15357 Project: Flink Issue Type: Bug Components: Connectors / Common, Table SQL / API Affects Versions: 1.9.1 Environment: You can reappear the bug by the following code String sql = "SELECT count(*) as cnt, age, TUMBLE_START(rowtime, INTERVAL '10' SECOND) as tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), age"; StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tenv = StreamTableEnvironment.create(senv); DataStream source = senv.addSource(new SourceFunction() { @Override public void run(SourceContext sourceContext) throws Exception { int i = 1000; String[] names = \{"Hanmeimei", "Lilei"}; while (i > 1) { sourceContext.collect(new User(names[i%2], i, new Timestamp(System.currentTimeMillis(; Thread.sleep(10); i--; } } @Override public void cancel() { } }); tenv.registerDataStream("abc", source, "name, age, timestamp, rowtime.rowtime"); Table table = tenv.sqlQuery(sql); List hosts = Arrays.asList(new Host("10.20.128.210", 19201, "http")); TypeInformation typeInformation = JsonRowSchemaConverter.convert("{" + " type:'object'," + " properties:{" + " cnt: {" + " type: 'number'" + " }," + " tumTime:{" + " type:'string'," + " format:'date-time'" + " }" + " }" + "}"); RowTypeInfo typeInfo = (RowTypeInfo) typeInformation; TypeInformation[] typeInformations = typeInfo.getFieldTypes(); String[] fieldNames = typeInfo.getFieldNames(); TableSchema.Builder builder = TableSchema.builder(); for (int i = 0; i < typeInformations.length; i ++) { builder.field(fieldNames[i], typeInformations[i]); } Elasticsearch6UpsertTableSink establesink = new Elasticsearch6UpsertTableSink( true, builder.build(), hosts, "aggregation", "data", "$", "n/a", new JsonRowSerializationSchema.Builder(typeInformation).build(), XContentType.JSON, new IgnoringFailureHandler(), new HashMap<>() ); tenv.registerTableSink("aggregationTableSink", establesink); table.insertInto("aggregationTableSink"); } @Data @AllArgsConstructor @NoArgsConstructor public static class User { private String name; private Integer age; private Timestamp timestamp; } Reporter: 巫旭阳 Fix For: 1.9.2, 1.10.0 Use JsonRowSchemaConverter.convert(jsonString) create schema TypeInfo area only sport bigdecimal DataType of number , but the Table created by usingTableEnvironmentImpl.sqlQuer(sqlString) may has a lot of number DataTypes such as Long, Integer。 when program run it will throw an exception like below: {color:#FF}Field types of query result and registered TableSink [XXX] do not match.{color} {color:#FF}Query result schema: [cnt: Long, tumTime: Timestamp]{color} {color:#FF}TableSink schema: [cnt: BigDecimal, tumTime: Timestamp]{color} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15438) Counter metrics are incorrectly reported as total counts to DataDog
Jörn Kottmann created FLINK-15438: - Summary: Counter metrics are incorrectly reported as total counts to DataDog Key: FLINK-15438 URL: https://issues.apache.org/jira/browse/FLINK-15438 Project: Flink Issue Type: Bug Components: Runtime / Metrics Affects Versions: 1.9.1 Reporter: Jörn Kottmann The Flink semantics of a counter do not match the counter semantics in DataDog. In Flink, a counter counts the total of increment and decrement calls. In DataDog, a counter is a rate over the reporting interval. The Flink implementation of the DataDog reporter seems to send the Flink counter value each time the metrics are reported. The correct behavior would be to send the delta of the counter since the last report. There are some features in DataDog which would be easier to use if this could be fixed, e.g. alerts based on counters. -- This message was sent by Atlassian Jira (v8.3.4#803005)
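As an illustration of the proposed fix, a minimal sketch of the delta bookkeeping a reporter could do; the class and method names are made up and this is not the actual DataDog reporter code.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Remember the last reported value per counter and report only the delta,
// which matches DataDog's rate-over-interval counter semantics.
public class CounterDeltaTracker {
    private final Map<String, Long> lastReported = new HashMap<>();

    public long deltaFor(String counterId, long currentCount) {
        long previous = lastReported.getOrDefault(counterId, 0L);
        lastReported.put(counterId, currentCount);
        return currentCount - previous;
    }
}
{code}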
[jira] [Created] (FLINK-15584) Give nested data type of ROWs in ValidationException
Benoît Paris created FLINK-15584: Summary: Give nested data type of ROWs in ValidationException Key: FLINK-15584 URL: https://issues.apache.org/jira/browse/FLINK-15584 Project: Flink Issue Type: Improvement Reporter: Benoît Paris In: {code:java} INSERT INTO baz_sink SELECT a, ROW(b, c) FROM foo_source{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15634) disableAutoGeneratedUIDs fails with coGroup and join
Jürgen Kreileder created FLINK-15634: Summary: disableAutoGeneratedUIDs fails with coGroup and join Key: FLINK-15634 URL: https://issues.apache.org/jira/browse/FLINK-15634 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.10.0 Reporter: Jürgen Kreileder coGroup/join seems to generate two Map operators for which you can't set the UID. Here's a test case: {code:java} @Test public void testDisablingAutoUidsWorksWithCoGroup() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().disableAutoGeneratedUIDs(); env .addSource(new NoOpSourceFunction()).setUidHash("") .coGroup(env.addSource(new NoOpSourceFunction()).setUidHash("")) .where(o -> o).equalTo(o -> o) .window(TumblingEventTimeWindows.of(Time.days(1))) .with(new CoGroupFunction() { @Override public void coGroup(Iterable first, Iterable second, Collector out) throws Exception { } }).setUidHash("") .addSink(new DiscardingSink<>()).setUidHash(""); env.execute(); } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15726) Fixing error message in StreamExecTableSourceScan
Benoît Paris created FLINK-15726: Summary: Fixing error message in StreamExecTableSourceScan Key: FLINK-15726 URL: https://issues.apache.org/jira/browse/FLINK-15726 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.9.1, 1.10.0, 1.11.0 Reporter: Benoît Paris The error message in translateToPlanInternal does not provide good information. [A hotfix|https://github.com/apache/flink/commit/02b676e9169b9879d406e79c8cbe4fcf6b33afa1#diff-ed386bd5b2f8bc873a24413ff1d82562] in the legacy planner corrected it but was not applied to the Blink planner. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15775) SourceFunctions are instantiated twice when pulled on from 2 Sinks
Benoît Paris created FLINK-15775: Summary: SourceFunctions are instanciated twice when pulled on from 2 Sinks Key: FLINK-15775 URL: https://issues.apache.org/jira/browse/FLINK-15775 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.1, 1.10.0 Reporter: Benoît Paris Attachments: flink-test-duplicated-sources.zip When pulled on by two sinks, the SourceFunctions of a TableSource will get instantiated twice; (and subsequently opened by the parallelism number, which is expected behavior): The following will instantiate the FooTableSource's SourceFunction once (OK behavior, but not the processing we want): {code:java} tEnv.registerTableSource("foo_table", new FooTableSource()); Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0"); tEnv.registerTableSink("syso_sink_0", new SysoSink()); out0.insertInto("syso_sink_0"); {code} This will instantiate the FooTableSource's SourceFunction twice (Not OK, as we're missing half the inputs in each SysoSink): {code:java} tEnv.registerTableSource("foo_table", new FooTableSource()); Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0"); Table out1 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 1"); tEnv.registerTableSink("syso_sink_0", new SysoSink()); tEnv.registerTableSink("syso_sink_1", new SysoSink()); out0.insertInto("syso_sink_0"); out1.insertInto("syso_sink_1"); {code} This might not be a problem for Kafka's SourceFunctions, as we can always reread from a log; but it is a data loss problem when the source data can't be reproduced. Actually, this might be me not understanding the API. Is there a way to make the runtime read from the same opened SourceFunctions? Attached is Java code that logs the faulty opening of the SourceFunctions, pom.xml, and logical execution plans for the duplicated case, and the workaround. Workaround: make a conversion to an appendStream. Somehow this makes the planner think it has to put a materialization barrier after the Source and read from that: {code:java} tEnv.registerTableSource("foo_table_source", new FooTableSource()); Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source"); Table appendingSourceTable = tEnv.fromDataStream( tEnv.toAppendStream(sourceTable, Types.ROW(new String[]{"field_1"}, new TypeInformation[]{Types.LONG()})) ); tEnv.registerTable("foo_table", appendingSourceTable);{code} Best Regards, Ben -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15808) Upgrade Parquet to version 1.11.0
Ismaël Mejía created FLINK-15808: Summary: Upgrade Parquet to version 1.11.0 Key: FLINK-15808 URL: https://issues.apache.org/jira/browse/FLINK-15808 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: Ismaël Mejía -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15841) TimeWindow.intersects returns true for consecutive windows
Jörn Kottmann created FLINK-15841: - Summary: TimeWindow.intersects returns true for consecutive windows Key: FLINK-15841 URL: https://issues.apache.org/jira/browse/FLINK-15841 Project: Flink Issue Type: Bug Components: API / DataStream Reporter: Jörn Kottmann The TimeWindow JavaDoc explains that the start index is inclusive and the end index is exclusive, therefore two windows T0 to T1 and T1 to T2 are next to each other without overlapping. To fix this, the intersects comparison should be changed to: {{this.start < other.end && this.end > other.start}} A test should also be added to verify that the method works correctly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
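For illustration, a standalone sketch of the corrected predicate from the description above (this is not the actual org.apache.flink.streaming.api.windowing.windows.TimeWindow class; the formula is taken straight from the ticket):

{code:java}
// [start, end) windows that merely touch at a boundary no longer "intersect".
public final class WindowIntersection {

    public static boolean intersects(long start, long end, long otherStart, long otherEnd) {
        return start < otherEnd && end > otherStart;
    }

    public static void main(String[] args) {
        // Consecutive windows [0, 10) and [10, 20) should not intersect.
        System.out.println(intersects(0, 10, 10, 20)); // false
        // Overlapping windows [0, 10) and [5, 15) should intersect.
        System.out.println(intersects(0, 10, 5, 15));  // true
    }
}
{code}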
[jira] [Created] (FLINK-15844) Removal of JobWithJars.buildUserCodeClassLoader method without Configuration breaks backwards compatibility
Ismaël Mejía created FLINK-15844: Summary: Removal of JobWithJars.buildUserCodeClassLoader method without Configuration breaks backwards compatibility Key: FLINK-15844 URL: https://issues.apache.org/jira/browse/FLINK-15844 Project: Flink Issue Type: Bug Components: Client / Job Submission Affects Versions: 1.9.2 Reporter: Ismaël Mejía The removal of the `JobWithJars.buildUserCodeClassLoader` method is not backwards compatible with previous versions of Flink 1.9.x. I was trying to upgrade Apache Beam to the just-released version and it broke, so I ran a dependency analysis and found this: https://output.jsbin.com/zudemis/3#Source_Removed -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15867) LAST_VALUE aggregate function does not support time-related types
Benoît Paris created FLINK-15867: Summary: LAST_VALUE aggregate function does not support time-related types Key: FLINK-15867 URL: https://issues.apache.org/jira/browse/FLINK-15867 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.2, 1.10.0 Reporter: Benoît Paris Attachments: flink-test-lastvalue-timestamp.zip The following fails: {code:java} LAST_VALUE(TIMESTAMP '2020-02-03 16:17:20') LAST_VALUE(DATE '2020-02-03') LAST_VALUE(TIME '16:17:20') LAST_VALUE(NOW()){code} But this works: {code:java} LAST_VALUE(UNIX_TIMESTAMP()) {code} Leading me to say it might be more a type/format issue, rather than an actual time processing issue. Attached is java + pom + full stacktrace, for reproduction. Stacktrace part is below. The ByteLastValueAggFunction, etc types seem trivial to implement, but the in the createLastValueAggFunction only basic types seem to be dealt with. Is there a reason more complicated LogicalTypeRoots might not be implemented ? (old vs new types?) Caused by: org.apache.flink.table.api.TableException: LAST_VALUE aggregate function does not support type: ''TIMESTAMP_WITHOUT_TIME_ZONE''.Caused by: org.apache.flink.table.api.TableException: LAST_VALUE aggregate function does not support type: ''TIMESTAMP_WITHOUT_TIME_ZONE''.Please re-check the data type. at org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createLastValueAggFunction(AggFunctionFactory.scala:617) at org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:113) at org.apache.flink.table.planner.plan.utils.AggregateUtil$$anonfun$9.apply(AggregateUtil.scala:285) at org.apache.flink.table.planner.plan.utils.AggregateUtil$$anonfun$9.apply(AggregateUtil.scala:279) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:279) at org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:228) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.(StreamExecGroupAggregate.scala:72) at org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecGroupAggregateRule.convert(StreamExecGroupAggregateRule.scala:68) at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:139) at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:328) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-9557) FlinkTypeFactory should support BigInteger type
Dominik Wosiński created FLINK-9557: --- Summary: FlinkTypeFactory should support BigInteger type Key: FLINK-9557 URL: https://issues.apache.org/jira/browse/FLINK-9557 Project: Flink Issue Type: Bug Affects Versions: 1.5.0 Reporter: Dominik Wosiński Currently, the `FlinkTypeFactory` method `typeInfoToSqlTypeName` does not support BigInteger. Since this is the default type returned by `JsonSchemaConverter` for all fields with type `number`, this can create issues. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9570) SQL Client merging environments uses AbstractMap
Dominik Wosiński created FLINK-9570: --- Summary: SQL Client merging environments uses AbstractMap Key: FLINK-9570 URL: https://issues.apache.org/jira/browse/FLINK-9570 Project: Flink Issue Type: Bug Affects Versions: 1.5.0 Reporter: Dominik Wosiński Currently the _Environment.merge()_ function looks like this: {code:java} final Environment mergedEnv = new Environment(); // merge tables final Map tables = new HashMap<>(env1.getTables()); mergedEnv.getTables().putAll(env2.getTables()); mergedEnv.tables = tables; {code} The no-arg constructor for _Environment_ defaults tables to _Collections.emptyMap()_. This basically results in calling _putAll_ on _EmptyMap_, which falls back to _AbstractMap_ and always throws _UnsupportedOperationException_. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
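A minimal sketch of what a corrected merge could look like, keeping the names from the snippet above; this mirrors the description and is not necessarily the fix that was eventually applied.

{code:java}
// Hypothetical corrected version of the snippet in the description: add env2's
// tables to the freshly copied HashMap instead of calling putAll on the
// immutable default map returned by mergedEnv.getTables().
final Environment mergedEnv = new Environment();
final Map tables = new HashMap<>(env1.getTables());
tables.putAll(env2.getTables());
mergedEnv.tables = tables;
{code}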
[jira] [Created] (FLINK-9575) Potential race condition when removing JobGraph in HA
Dominik Wosiński created FLINK-9575: --- Summary: Potential race condition when removing JobGraph in HA Key: FLINK-9575 URL: https://issues.apache.org/jira/browse/FLINK-9575 Project: Flink Issue Type: Bug Reporter: Dominik Wosiński When we are removing the _JobGraph_ from _JobManager_ for example after invoking _cancel()_, the following code is executed : {noformat} val futureOption = currentJobs.get(jobID) match { case Some((eg, _)) => val result = if (removeJobFromStateBackend) { val futureOption = Some(future { try { // ...otherwise, we can have lingering resources when there is a concurrent shutdown // and the ZooKeeper client is closed. Not removing the job immediately allow the // shutdown to release all resources. submittedJobGraphs.removeJobGraph(jobID) } catch { case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t) } }(context.dispatcher)) try { archive ! decorateMessage( ArchiveExecutionGraph( jobID, ArchivedExecutionGraph.createFrom(eg))) } catch { case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t) } futureOption } else { None } currentJobs.remove(jobID) result case None => None } // remove all job-related BLOBs from local and HA store libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID, removeJobFromStateBackend) jobManagerMetricGroup.removeJob(jobID) futureOption } {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9615) Add
Dominik Wosiński created FLINK-9615: --- Summary: Add Key: FLINK-9615 URL: https://issues.apache.org/jira/browse/FLINK-9615 Project: Flink Issue Type: Improvement Reporter: Dominik Wosiński AFAIK, there is currently no possibility to use Kafka or other connectors as a sink in SQL Client. Such a feature would be good for prototyping or quick stream manipulation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9627) Extending
Dominik Wosiński created FLINK-9627: --- Summary: Extending Key: FLINK-9627 URL: https://issues.apache.org/jira/browse/FLINK-9627 Project: Flink Issue Type: Bug Reporter: Dominik Wosiński -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9772) Documentation of Hadoop API outdated
Lorenz Bühmann created FLINK-9772: - Summary: Documentation of Hadoop API outdated Key: FLINK-9772 URL: https://issues.apache.org/jira/browse/FLINK-9772 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.5.0 Reporter: Lorenz Bühmann It looks like the documentation of the [Hadoop Compatibility|https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/hadoop_compatibility.html] is somewhat outdated? At least the text and examples in section [Using Hadoop InputFormats|https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/hadoop_compatibility.html#using-hadoop-inputformats] mention methods {{env.readHadoopFile}} and {{env.createHadoopInput}} which do not exist anymore since 1.4.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9778) Remove SlotRequest timeout
陈梓立 created FLINK-9778: -- Summary: Remove SlotRequest timeout Key: FLINK-9778 URL: https://issues.apache.org/jira/browse/FLINK-9778 Project: Flink Issue Type: Improvement Components: JobManager, ResourceManager, TaskManager Reporter: 陈梓立 Assignee: 陈梓立 Fix For: 1.5.1 Now, when SlotPool (JobMaster) calls requestSlotsFromResourceManager, it checks a timeout: if the RM does not respond within 5 minutes, the JM fails the request and re-requests it. This does little good and causes Flink to request resources less accurately. I would propose removing this timeout mechanism, that is, a SlotRequest would no longer time out. Our current failure-tolerance mechanism would handle SlotRequest exceptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9779) Remove SlotRequest timeout
陈梓立 created FLINK-9779: -- Summary: Remove SlotRequest timeout Key: FLINK-9779 URL: https://issues.apache.org/jira/browse/FLINK-9779 Project: Flink Issue Type: Improvement Components: JobManager, ResourceManager, TaskManager Reporter: 陈梓立 As discussed in FLINK-8643 and FLINK-8653, we use an external timeout to replace the internal timeout of the slot request. This raises the question: why not remove this timeout mechanism entirely? In our production case, this timeout mechanism causes unneeded failures and makes resource allocation inaccurate. I would propose getting rid of the slot request timeout. Instead, we handle TM failures in the RM, which properly cancels pending requests; and if a TM cannot offer a slot to the JM, we introduce a blacklist mechanism to nudge the RM to re-allocate for the pending request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9813) Build xTableSource from Avro schemas
François Lacombe created FLINK-9813: --- Summary: Build xTableSource from Avro schemas Key: FLINK-9813 URL: https://issues.apache.org/jira/browse/FLINK-9813 Project: Flink Issue Type: Wish Components: Table API & SQL Affects Versions: 1.5.0 Reporter: François Lacombe As Avro provides an efficient data schema formalism, it would be great to be able to build Flink TableSources from such files. More info about Avro schemas: [https://avro.apache.org/docs/1.8.1/spec.html#schemas] For instance, with CsvTableSource: Parser schemaParser = new Schema.Parser(); Schema tableSchema = schemaParser.parse("avro.json"); Builder bld = CsvTableSource.builder().schema(tableSchema); This would give me a fully defined CsvTableSource with the columns declared in avro.json. It may be possible to do so for every TableSource, since the Avro format is really common and versatile. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
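To make the wish concrete, a rough sketch of how an Avro schema could be walked against today's CsvTableSource builder; the helper class is invented, only a handful of primitive Avro types are mapped, and the proposed schema(...) builder method itself does not exist.

{code:java}
import java.io.File;

import org.apache.avro.Schema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.sources.CsvTableSource;

// Illustrative helper: read an Avro schema file and register each of its
// fields on the CsvTableSource builder one by one.
public class AvroSchemaToCsvSource {

    public static CsvTableSource fromAvroSchema(String avroSchemaFile, String csvPath) throws Exception {
        Schema schema = new Schema.Parser().parse(new File(avroSchemaFile));
        CsvTableSource.Builder builder = CsvTableSource.builder().path(csvPath);
        for (Schema.Field field : schema.getFields()) {
            builder = builder.field(field.name(), toFlinkType(field.schema().getType()));
        }
        return builder.build();
    }

    private static TypeInformation<?> toFlinkType(Schema.Type type) {
        switch (type) {
            case STRING:  return Types.STRING;
            case INT:     return Types.INT;
            case LONG:    return Types.LONG;
            case DOUBLE:  return Types.DOUBLE;
            case BOOLEAN: return Types.BOOLEAN;
            default:
                throw new IllegalArgumentException("Unmapped Avro type: " + type);
        }
    }
}
{code}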
[jira] [Created] (FLINK-9814) CsvTableSource lack of column warning
François Lacombe created FLINK-9814: --- Summary: CsvTableSource lack of column warning Key: FLINK-9814 URL: https://issues.apache.org/jira/browse/FLINK-9814 Project: Flink Issue Type: Wish Components: Table API & SQL Affects Versions: 1.5.0 Reporter: François Lacombe The CsvTableSource class is built by defining the expected columns to be found in the corresponding csv file. It would be great to throw an Exception when the csv file doesn't have the same structure as defined in the source. This can easily be checked against the file header if it exists. Is this possible? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
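An illustrative pre-flight check of the kind the reporter asks for; the helper is invented and not part of CsvTableSource.

{code:java}
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Arrays;
import java.util.List;

// Compare the CSV header row with the column names the source was built with
// and fail fast if they differ.
public class CsvHeaderCheck {

    public static void verifyHeader(String csvPath, List<String> expectedColumns, String delimiter) throws Exception {
        try (BufferedReader reader = new BufferedReader(new FileReader(csvPath))) {
            String headerLine = reader.readLine();
            if (headerLine == null) {
                throw new IllegalStateException("CSV file " + csvPath + " is empty, no header to check");
            }
            List<String> actual = Arrays.asList(headerLine.split(delimiter));
            if (!actual.equals(expectedColumns)) {
                throw new IllegalStateException(
                        "CSV header " + actual + " does not match expected columns " + expectedColumns);
            }
        }
    }
}
{code}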
[jira] [Created] (FLINK-9859) Distinguish TM akka config with JM config
陈梓立 created FLINK-9859: -- Summary: Distinguish TM akka config with JM config Key: FLINK-9859 URL: https://issues.apache.org/jira/browse/FLINK-9859 Project: Flink Issue Type: Improvement Components: Local Runtime Affects Versions: 1.5.1 Reporter: 陈梓立 Assignee: 陈梓立 Fix For: 1.5.2 ... increase the number of akka threads on the JM to improve its performance; decrease the number of akka threads on the TM to save resources. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9869) Send PartitionInfo in batch to improve performance
陈梓立 created FLINK-9869: -- Summary: Send PartitionInfo in batch to improve performance Key: FLINK-9869 URL: https://issues.apache.org/jira/browse/FLINK-9869 Project: Flink Issue Type: Improvement Components: Local Runtime Affects Versions: 1.5.1 Reporter: 陈梓立 Assignee: 陈梓立 Fix For: 1.5.2 ... currently we send partition info as soon as it arrives. We could `cachePartitionInfo` and then `sendPartitionInfoAsync`, which will improve performance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9875) Add concurrent creation of execution job vertex
陈梓立 created FLINK-9875: -- Summary: Add concurrent creation of execution job vertex Key: FLINK-9875 URL: https://issues.apache.org/jira/browse/FLINK-9875 Project: Flink Issue Type: Improvement Components: Local Runtime Affects Versions: 1.5.1 Reporter: 陈梓立 Assignee: 陈梓立 In some cases, such as input format vertices, the creation of execution job vertices is time consuming. This PR adds concurrent creation of execution job vertices to accelerate it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9894) Potential Data Race
陈梓立 created FLINK-9894: -- Summary: Potential Data Race Key: FLINK-9894 URL: https://issues.apache.org/jira/browse/FLINK-9894 Project: Flink Issue Type: Improvement Components: Local Runtime Affects Versions: 1.5.1 Reporter: 陈梓立 Assignee: 陈梓立 CoLocationGroup#ensureConstraints -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10038) Parallel the creation of InputSplit if necessary
陈梓立 created FLINK-10038: --- Summary: Parallel the creation of InputSplit if necessary Key: FLINK-10038 URL: https://issues.apache.org/jira/browse/FLINK-10038 Project: Flink Issue Type: Improvement Affects Versions: 1.5.0 Reporter: 陈梓立 As a continuation of the discussion in the PR about parallelizing the creation of ExecutionJobVertex [here|https://github.com/apache/flink/pull/6353], [~StephanEwen] suggested that we could parallelize the creation of InputSplits, from which we would gain performance improvements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
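A minimal sketch of the idea, assuming a plain thread pool is acceptable; the helper class is invented, and how this would be wired into ExecutionJobVertex creation is exactly what the ticket leaves open.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.core.io.InputSplit;

// Create the splits of several input formats concurrently instead of sequentially.
public class ParallelSplitCreation {

    public static List<InputSplit[]> createSplits(
            List<InputFormat<?, ?>> formats, int minNumSplits, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<InputSplit[]>> futures = new ArrayList<>();
            for (InputFormat<?, ?> format : formats) {
                futures.add(pool.submit((Callable<InputSplit[]>) () -> format.createInputSplits(minNumSplits)));
            }
            List<InputSplit[]> result = new ArrayList<>();
            for (Future<InputSplit[]> future : futures) {
                result.add(future.get());
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }
}
{code}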
[jira] [Created] (FLINK-10056) Add testRequestNextInputSplit
陈梓立 created FLINK-10056: --- Summary: Add testRequestNextInputSplit Key: FLINK-10056 URL: https://issues.apache.org/jira/browse/FLINK-10056 Project: Flink Issue Type: Improvement Components: JobManager, Tests Affects Versions: 1.5.0 Reporter: 陈梓立 Assignee: 陈梓立 Add testRequestNextInputSplit to make sure JobMaster#requestNextInputSplit works as expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10099) Rework YarnResourceManagerTest
陈梓立 created FLINK-10099: --- Summary: Rework YarnResourceManagerTest Key: FLINK-10099 URL: https://issues.apache.org/jira/browse/FLINK-10099 Project: Flink Issue Type: Improvement Components: Tests Affects Versions: 1.6.0 Reporter: 陈梓立 Assignee: 陈梓立 Fix For: 1.6.0 To avoid OOMs, gather the mock stuff so that it can be replaced once we are able to, and restructure the test. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10256) Rework JobManagerFailsITCase and JobManagerTest into JobMasterITCase and JobMasterHAITCase
陈梓立 created FLINK-10256: --- Summary: Rework JobManagerFailsITCase and JobManagerTest into JobMasterITCase and JobMasterHAITCase Key: FLINK-10256 URL: https://issues.apache.org/jira/browse/FLINK-10256 Project: Flink Issue Type: Improvement Components: Tests Affects Versions: 1.7.0 Reporter: 陈梓立 Assignee: 陈梓立 Fix For: 1.7.0 I am planning to rework JobManagerFailsITCase and JobManagerTest into JobMasterITCase and JobMasterHAITCase. That is, reorganize the legacy tests, make them neat, and cover cases explicitly. The PR will follow before this weekend. While reworking, I'd like to add the JM failover test cases listed below, for the further implementation of JM failover with the RECONCILING state. By "JM failover" I mean a real-world failover (like power loss or process exit), without calling Flink's internal postStop logic or anything like it. 1. Streaming task with JM failover. 2. Streaming task with JM failover concurrent to a task failure. 3. Batch task with JM failover. 4. Batch task with JM failover concurrent to a task failure. 5. Batch task with JM failover when some vertex has already FINISHED. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10275) StreamTask support object reuse
陈梓立 created FLINK-10275: --- Summary: StreamTask support object reuse Key: FLINK-10275 URL: https://issues.apache.org/jira/browse/FLINK-10275 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 1.7.0 Reporter: 陈梓立 Assignee: 陈梓立 Fix For: 1.7.0 StreamTask support efficient object reuse. The purpose behind this is to reduce pressure on the garbage collector. All objects are reused, without backup copies. The operators and UDFs must be careful to not keep any objects as state or not to modify the objects. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10302) Mark legacy(non-flip 6) code as Deprecated
陈梓立 created FLINK-10302: --- Summary: Mark legacy(non-flip 6) code as Deprecated Key: FLINK-10302 URL: https://issues.apache.org/jira/browse/FLINK-10302 Project: Flink Issue Type: Improvement Reporter: 陈梓立 Several times I have run into classes/methods that looked odd from the FLIP-6 codebase and only later figured out that they are legacy code. Currently we mix legacy code with FLIP-6 code in the same places (i.e. the same packages), so a new contributor might accidentally get lost in such code and waste their work. With [FLINK-4319] closed we announced that FLIP-6 is production ready, and [~trohrm...@apache.org]'s comment on this [commit|https://github.com/apache/flink/commit/ddd6a99a95b56c52ea5b5153b7270b578f5479bc#commitcomment-30330739] shows that it is planned to remove the legacy code. I'd prefer to mark all legacy classes as Deprecated for now, so that our contributors can recognize them quickly and do not waste their work. What do you think? cc [~Zentol] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10304) Remove deprecated AbstractYarnClusterDescriptor field
陈梓立 created FLINK-10304: --- Summary: Remove deprecated AbstractYarnClusterDescriptor field Key: FLINK-10304 URL: https://issues.apache.org/jira/browse/FLINK-10304 Project: Flink Issue Type: Improvement Components: Client, YARN Affects Versions: 1.7.0 Reporter: 陈梓立 Assignee: 陈梓立 Fix For: 1.7.0 Following [~trohrm...@apache.org]'s [commit|https://github.com/apache/flink/commit/6356128865bff7463bf03185d18b129ed3633bc2], {{AbstractYarnClusterDescriptor}} should not care whether it is in DETACHED mode. After digging, I found its main usages are: 1. {{FlinkYarnSessionCli#run}}, which can be resolved by checking locally whether {{allOptions}} contains {{DETACHED_OPTION}}. 2. When AbstractYarnClusterDescriptor starts an AM, it sets {{appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));}}; at this point it seems that YarnClusterDescriptor does need to know whether it is in detached mode. If usage 2 is no longer relevant, we can get rid of the deprecated method in the FLIP-6 codebase. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
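A minimal sketch of the local check mentioned in usage 1 of FLINK-10304 above, assuming a commons-cli style {{DETACHED_OPTION}}; the option definition and class below are simplified placeholders, not a drop-in patch for FlinkYarnSessionCli:
{code:java}
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

public class DetachedOptionCheck {

    // Stand-in for the CLI's DETACHED_OPTION constant (hypothetical definition).
    private static final Option DETACHED_OPTION =
        new Option("d", "detached", false, "Run the job in detached mode.");

    public static void main(String[] args) throws Exception {
        Options allOptions = new Options();
        allOptions.addOption(DETACHED_OPTION);

        CommandLine cmd = new DefaultParser().parse(allOptions, args);

        // Decide detached mode locally in the CLI instead of asking the cluster descriptor.
        boolean detached = cmd.hasOption(DETACHED_OPTION.getOpt());
        System.out.println("detached = " + detached);
    }
}
{code}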
[jira] [Created] (FLINK-10315) Let JDBCAppendTableSink be built with java.sql.Connection
François Lacombe created FLINK-10315: Summary: Let JDBCAppendTableSink be built with java.sql.Connection Key: FLINK-10315 URL: https://issues.apache.org/jira/browse/FLINK-10315 Project: Flink Issue Type: Improvement Components: Java API Environment: I'm currently using Flink 1.6.0 with Java. Reporter: François Lacombe Currently, JDBCAppendTableSink is built with methods like setDBUrl, setUsername, setPassword and so on. We can't use an existing Java SQL connection to build it. It would be great to add a setConnection() method to the builder class so that sensitive data like the username or password does not have to travel through large call stacks from the configuration code (often in main()) to the JDBC sinks. Being able to provide a single object is far lighter than passing 4 or 5 strings. Thanks in advance -- This message was sent by Atlassian JIRA (v7.6.3#76005)
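To make the FLINK-10315 request above concrete, here is a sketch contrasting today's builder with the proposed one; the setters in the first block exist on the 1.6 builder, while {{setConnection}} is the hypothetical addition (shown commented out). One design caveat worth noting: sinks are serialized and shipped to the cluster, and a live {{java.sql.Connection}} is not serializable, so the builder would probably have to accept a connection factory or supplier rather than the connection itself.
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;

public class JdbcSinkBuilderSketch {

    public static void main(String[] args) throws Exception {
        // Today: credentials are passed as individual strings through the builder.
        JDBCAppendTableSink current = JDBCAppendTableSink.builder()
            .setDrivername("org.postgresql.Driver")
            .setDBUrl("jdbc:postgresql://localhost:5432/demo")
            .setUsername("flink")
            .setPassword("secret")
            .setQuery("INSERT INTO events (id, name) VALUES (?, ?)")
            .setParameterTypes(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
            .build();

        // Proposal (hypothetical API): hand over an already configured connection instead.
        Connection connection =
            DriverManager.getConnection("jdbc:postgresql://localhost:5432/demo", "flink", "secret");
        // JDBCAppendTableSink proposed = JDBCAppendTableSink.builder()
        //     .setConnection(connection)   // <-- proposed method, does not exist yet
        //     .setQuery("INSERT INTO events (id, name) VALUES (?, ?)")
        //     .setParameterTypes(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
        //     .build();
        connection.close();
    }
}
{code}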
[jira] [Created] (FLINK-10319) Avoid requestPartitionState from JM but always try retrigger
陈梓立 created FLINK-10319: --- Summary: Avoid requestPartitionState from JM but always try retrigger Key: FLINK-10319 URL: https://issues.apache.org/jira/browse/FLINK-10319 Project: Flink Issue Type: Improvement Components: Distributed Coordination Affects Versions: 1.7.0 Reporter: 陈梓立 Assignee: 陈梓立 Fix For: 1.7.0 Do not requestPartitionState from the JM on a failed partition request, as this may generate too many RPC requests and block the JM. We gain little benefit from checking which state the producer is in, while on the other hand the flood of RPC requests can crash the JM. A task can always retriggerPartitionRequest from its InputGate; the request will fail if the producer is gone and succeed if the producer is alive. Either way, there is no need to ask the JM for help. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10320) Introduce JobMaster schedule micro-benchmark
陈梓立 created FLINK-10320: --- Summary: Introduce JobMaster schedule micro-benchmark Key: FLINK-10320 URL: https://issues.apache.org/jira/browse/FLINK-10320 Project: Flink Issue Type: Improvement Components: Tests Reporter: 陈梓立 Assignee: 陈梓立 Based on the {{org.apache.flink.streaming.runtime.io.benchmark}} stuff and the [flink-benchmarks|https://github.com/dataArtisans/flink-benchmarks] repo, I propose to introduce another micro-benchmark which focuses on {{JobMaster}} scheduling performance. h3. Target Benchmark how long it takes from {{JobMaster}} startup (receiving the {{JobGraph}} and initializing) until all tasks are RUNNING. Technically we use a bounded stream and the TM finishes tasks as soon as they arrive, so the interval we actually measure ends when all tasks are FINISHED. h3. Cases 1. A JobGraph that covers EAGER + PIPELINED edges. 2. A JobGraph that covers LAZY_FROM_SOURCES + PIPELINED edges. 3. A JobGraph that covers LAZY_FROM_SOURCES + BLOCKING edges. PS: maybe also benchmark the case where the source reads from an {{InputSplit}}? h3. Implementation Based on the flink-benchmarks repo, we eventually run benchmarks with JMH, so the whole test suite is split across two repos. The testing environment could live in the main repo, maybe under flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/benchmark. To measure the performance of {{JobMaster}} scheduling, we need to simulate an environment that: 1. has a real {{JobMaster}}; 2. has a mock/testing {{ResourceManager}} that has infinite resources and reacts immediately; 3. has one (or many?) mock/testing {{TaskExecutor}}(s) that deploy and finish tasks immediately. [~trohrm...@apache.org] [~GJL] [~pnowojski] could you please review this proposal to help clarify the goal and the concrete details? Thanks in advance. Any suggestions are welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
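A rough JMH skeleton for the benchmark proposed in FLINK-10320 above; the JMH annotations are real, while {{ScheduleBenchmarkEnvironment}} and its methods are hypothetical stand-ins for the JobMaster/ResourceManager/TaskExecutor test harness described in the issue:
{code:java}
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class JobMasterSchedulingBenchmark {

    /** Hypothetical harness: a real JobMaster plus mock ResourceManager/TaskExecutors. */
    static final class ScheduleBenchmarkEnvironment {
        void start() { /* spin up the JobMaster, the mock RM and the mock TEs */ }
        void submitJobAndWaitUntilAllTasksFinished() { /* the interval being measured */ }
        void shutdown() { /* tear everything down */ }
    }

    private ScheduleBenchmarkEnvironment env;

    @Setup(Level.Iteration)
    public void setUp() {
        env = new ScheduleBenchmarkEnvironment();
        env.start();
    }

    @TearDown(Level.Iteration)
    public void tearDown() {
        env.shutdown();
    }

    @Benchmark
    public void scheduleEagerPipelined() {
        // Case 1: EAGER scheduling + PIPELINED edges; cases 2 and 3 would look the same
        // with a different JobGraph handed to the environment.
        env.submitJobAndWaitUntilAllTasksFinished();
    }
}
{code}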
[jira] [Created] (FLINK-10349) Unify stopActor* utils
陈梓立 created FLINK-10349: --- Summary: Unify stopActor* utils Key: FLINK-10349 URL: https://issues.apache.org/jira/browse/FLINK-10349 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.7.0 Reporter: 陈梓立 Assignee: 陈梓立 Fix For: 1.7.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10378) Hide/Comment out contribute guide from PULL_REQUEST_TEMPLATE
陈梓立 created FLINK-10378: --- Summary: Hide/Comment out contribute guide from PULL_REQUEST_TEMPLATE Key: FLINK-10378 URL: https://issues.apache.org/jira/browse/FLINK-10378 Project: Flink Issue Type: Improvement Components: GitHub Reporter: 陈梓立 Assignee: 陈梓立 Explicitly comment out the contribution guide in PULL_REQUEST_TEMPLATE by wrapping it in HTML comments. This hints to contributors that the text is informational only and will not appear in the final pull request description; as a side effect it also saves contributors the work of deleting that text every time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10385) Implement a waitUntilCondition utils
陈梓立 created FLINK-10385: --- Summary: Implement a waitUntilCondition utils Key: FLINK-10385 URL: https://issues.apache.org/jira/browse/FLINK-10385 Project: Flink Issue Type: Improvement Components: Tests Affects Versions: 1.7.0 Reporter: 陈梓立 Fix For: 1.7.0 Recently, while refining some tests, I noticed that it is a common requirement to wait until a (stable) condition occurs. To achieve this we have {{ExecutionGraphTestUtils#waitUntilJobStatus}} and many others. Most of them can be abstracted as
{code:java}
public static void waitUntilCondition(
        SupplierWithException<Boolean, Exception> conditionSupplier,
        Deadline deadline) throws Exception {
    while (deadline.hasTimeLeft()) {
        if (conditionSupplier.get()) {
            return;
        }
        Thread.sleep(Math.min(deadline.timeLeft().toMillis(), 500L));
    }
    throw new IllegalStateException("Condition was not fulfilled before the deadline.");
}
{code}
I propose to implement such a method to avoid many scattered utility methods that all achieve the same purpose. Looking forward to your advice. If there is prior code or a project that already implements this, I am glad to use it. cc [~Zentol] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
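A self-contained usage sketch of the utility proposed in FLINK-10385 above, assuming Flink's {{org.apache.flink.api.common.time.Deadline}} (with {{fromNow}}/{{hasTimeLeft}}/{{timeLeft}}); the {{SupplierWithException}} interface is inlined here just to keep the example compilable on its own:
{code:java}
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.flink.api.common.time.Deadline;

public class WaitUntilConditionExample {

    public static void main(String[] args) throws Exception {
        AtomicBoolean ready = new AtomicBoolean(false);

        // Something asynchronous flips the flag a little later.
        new Thread(() -> {
            try {
                Thread.sleep(200L);
            } catch (InterruptedException ignored) {
            }
            ready.set(true);
        }).start();

        // Wait up to 5 seconds for the condition to become true.
        waitUntilCondition(ready::get, Deadline.fromNow(Duration.ofSeconds(5)));
        System.out.println("condition fulfilled");
    }

    /** Inlined stand-in for Flink's SupplierWithException. */
    interface SupplierWithException<T, E extends Throwable> {
        T get() throws E;
    }

    /** The utility as sketched in the issue above. */
    static void waitUntilCondition(
            SupplierWithException<Boolean, Exception> conditionSupplier,
            Deadline deadline) throws Exception {
        while (deadline.hasTimeLeft()) {
            if (conditionSupplier.get()) {
                return;
            }
            Thread.sleep(Math.min(deadline.timeLeft().toMillis(), 500L));
        }
        throw new IllegalStateException("Condition was not fulfilled before the deadline.");
    }
}
{code}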
[jira] [Created] (FLINK-10386) Remove legacy class TaskExecutionStateListener
陈梓立 created FLINK-10386: --- Summary: Remove legacy class TaskExecutionStateListener Key: FLINK-10386 URL: https://issues.apache.org/jira/browse/FLINK-10386 Project: Flink Issue Type: Improvement Components: TaskManager Affects Versions: 1.7.0 Reporter: 陈梓立 Assignee: 陈梓立 Fix For: 1.7.0 After a discussion [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257] with [~trohrm...@apache.org], I started to analyze the usage of {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}. In conclusion, the {{TaskExecutionStateListener}} approach has been abandoned and no component relies on it any more. Instead, {{TaskManagerActions}} was introduced to take over the communication between {{Task}} and {{TaskManager}}; no one except the {{TaskManager}} should communicate with a {{Task}} directly. So the legacy class {{TaskExecutionStateListener}} can be safely removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10424) Inconsistency between JsonSchemaConverter and FlinkTypeFactory
Dominik Wosiński created FLINK-10424: Summary: Inconsistency between JsonSchemaConverter and FlinkTypeFactory Key: FLINK-10424 URL: https://issues.apache.org/jira/browse/FLINK-10424 Project: Flink Issue Type: Bug Affects Versions: 1.6.0 Reporter: Dominik Wosiński Assignee: Dominik Wosiński There is still an inconsistency between _JsonSchemaConverter_ and _FlinkTypeFactory_ when a JSON schema contains a field of type _integer_. _JsonSchemaConverter_ returns BigInteger Type Information for _integer_, but _FlinkTypeFactory_ currently does not support BigInteger Type Info, so an exception is thrown. Two ways of solving this issue are possible: - allow _BigInteger_ Type Info in _FlinkTypeFactory_ - change _JsonSchemaConverter_ so that it returns Integer Type Info instead. IMHO, the change should be made in _FlinkTypeFactory_. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
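To make the FLINK-10424 mismatch above concrete, a small sketch using only type-information constants that exist in flink-core (the converter itself is not invoked here; the 'integer' to BigInteger mapping is taken from the issue description):
{code:java}
import java.math.BigInteger;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class JsonIntegerTypeSketch {

    public static void main(String[] args) {
        // Per the issue, a JSON schema field declared as {"type": "integer"} is converted
        // to BigInteger type information by JsonSchemaConverter ...
        TypeInformation<BigInteger> fromJsonSchema = BasicTypeInfo.BIG_INT_TYPE_INFO;

        // ... while FlinkTypeFactory only handles, e.g., the plain Integer type information,
        // so the two sides disagree and an exception is thrown when the row type reaches
        // the Table API.
        TypeInformation<Integer> handledByTypeFactory = BasicTypeInfo.INT_TYPE_INFO;

        System.out.println(fromJsonSchema + " vs " + handledByTypeFactory);
    }
}
{code}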