[jira] [Created] (FLINK-13097) Buffer depletion in SimpleCollectingOutputView throws non-obvious EOFException

2019-07-04 Thread JIRA
Cyrille Chépélov created FLINK-13097:


 Summary: Buffer depletion in SimpleCollectingOutputView throws 
non-obvious EOFException 
 Key: FLINK-13097
 URL: https://issues.apache.org/jira/browse/FLINK-13097
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.8.0, 1.7.2
 Environment: SCIO + BEAM + Flink under scala 2.12, java 8

Reporter: Cyrille Chépélov


When SimpleCollectingOutputView is used, records are collected into memory segments obtained from a pre-allocated MemorySegmentSource.

When that source is depleted, the SimpleCollectingOutputView#nextSegment method throws an EOFException without a message.

This can be non-obvious to diagnose as a newcomer, as
* the Java SDK documentation strongly suggests EOFException relates to an inability to read further (whereas in this context, the exception signals an inability to _write_ further)
* awareness that pre-allocated, fixed-size buffers are in play may not
necessarily be expected of a newcomer to Flink.
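
A sketch of the kind of message that would make this easier to diagnose (illustrative, not the actual Flink source; the signature follows AbstractPagedOutputView#nextSegment, which SimpleCollectingOutputView overrides, and the real method's bookkeeping is omitted):
{code:java}
@Override
protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws EOFException {
    final MemorySegment next = this.memorySource.nextSegment();
    if (next == null) {
        // Spell out that this is a write-side depletion of pre-allocated memory,
        // not a failed read, so the EOFException type doesn't mislead newcomers.
        throw new EOFException(
                "Can't collect further: the pre-allocated MemorySegmentSource is depleted "
                        + "(no memory segments left to write to).");
    }
    return next;
}
{code}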




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13536) Improve nullability handling in Types

2019-08-01 Thread JIRA
François Lacombe created FLINK-13536:


 Summary: Improve nullability handling in Types
 Key: FLINK-13536
 URL: https://issues.apache.org/jira/browse/FLINK-13536
 Project: Flink
  Issue Type: Improvement
  Components: API / Type Serialization System, Formats (JSON, Avro, 
Parquet, ORC, SequenceFile)
Affects Versions: 1.8.0
Reporter: François Lacombe


Currently, Avro-to-Flink type matching doesn't propagate the nullability definition.

In Avro :
{code:java}
"type":["null","string"]{code}
allows the Java declaration String myField = null;

while
{code:java}
"type":"string"{code}
doesn't.

It may be good to have a corresponding property in Flink types too, so as to check
for nullability in JsonRowDeserializationSchema for instance (a null or absent
field in the parsed JSON should only be allowed for nullable fields).
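
A hypothetical sketch of such a check (the method and parameter names are made up for illustration and are not the actual JsonRowDeserializationSchema API):
{code:java}
private Object convertField(String fieldName, TypeInformation<?> fieldType,
        boolean isNullable, JsonNode node) {
    if (node == null || node.isNull()) {
        if (isNullable) {
            return null;
        }
        // Null or absent JSON values are only acceptable for nullable fields.
        throw new IllegalStateException(
                "Field '" + fieldName + "' is null or absent, but its type is not nullable.");
    }
    return convert(fieldType, node); // hypothetical delegate to the per-type conversion
}
{code}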

 

Thanks in advance



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13538) Give field names in deserializers thrown exceptions

2019-08-01 Thread JIRA
François Lacombe created FLINK-13538:


 Summary: Give field names in deserializers thrown exceptions
 Key: FLINK-13538
 URL: https://issues.apache.org/jira/browse/FLINK-13538
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.8.0
Reporter: François Lacombe


Deserializers like JsonRowDeserializationSchema parse JSON strings according to a
TypeInformation object.

Type mistakes can occur, and they usually raise an IOException caused by an
IllegalStateException. Here I try to parse "field":"blabla", described with
Type.INT:

 
{code:java}
java.io.IOException: Failed to deserialize JSON object.
    at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:97)
    at 
com.dcbrain.etl.inputformat.JsonInputFormat.nextRecord(JsonInputFormat.java:96)
    at 
com.dcbrain.etl.inputformat.JsonInputFormat.nextRecord(JsonInputFormat.java:1)
    at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Unsupported type information 
'Integer' for node: "blabla"
    at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.convert(JsonRowDeserializationSchema.java:191)
    at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertRow(JsonRowDeserializationSchema.java:212)
    at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:95)
    ... 5 common frames omitted{code}
 

Neither the message nor the exception object contains a reference to the field
causing the error, so finding where the error really is requires time spent
inspecting complex input data.

Could the messages, or even the exception objects, thrown by
serializers/deserializers be improved to indicate which field is responsible for the error, please?

JsonRowDeserializationSchema isn't the only class affected by such issues.

 

This would allow producing more useful logs for users and administrators.
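
A hypothetical sketch of the kind of wrapping that would surface the field name (illustrative names; not the actual Flink code):
{code:java}
// Wrap the per-field conversion so the failing field's name travels with the exception.
private Object convertField(String fieldName, TypeInformation<?> fieldType, JsonNode node) {
    try {
        return convert(fieldType, node); // hypothetical delegate to the existing conversion
    } catch (IllegalStateException e) {
        throw new IllegalStateException(
                "Failed to deserialize field '" + fieldName + "': " + e.getMessage(), e);
    }
}
{code}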

 

All the best



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13586) Method ClosureCleaner.clean broke backward compatibility between 1.8.0 and 1.8.1

2019-08-05 Thread JIRA
Gaël Renoux created FLINK-13586:
---

 Summary: Method ClosureCleaner.clean broke backward compatibility 
between 1.8.0 and 1.8.1
 Key: FLINK-13586
 URL: https://issues.apache.org/jira/browse/FLINK-13586
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.8.1
Reporter: Gaël Renoux


Method clean in org.apache.flink.api.java.ClosureCleaner received a new
parameter in Flink 1.8.1. This class is marked as internal, but it is used in the
Kafka connectors (in
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase).

The Kafka connectors library is not provided by the server, and must be set up 
as a dependency with compile scope (see 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#usage,
 or the Maven project template). Any project using those connectors and built 
with 1.8.0 cannot be deployed on a 1.8.1 Flink server, because it would target 
the old method.

=> This method needs a fallback overload with the original two arguments (setting a
default value of RECURSIVE for the level argument).
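
A sketch of such a fallback (assuming the new 1.8.1 signature takes an ExecutionConfig.ClosureCleanerLevel argument; to be checked against the actual code):
{code:java}
// Restore the pre-1.8.1 two-argument entry point by delegating to the new method,
// so jars compiled against 1.8.0 keep linking on a 1.8.1 server.
public static void clean(Object func, boolean checkSerializable) {
    clean(func, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, checkSerializable);
}
{code}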



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13721) BroadcastState should support StateTTL

2019-08-14 Thread JIRA
Kerem Ulutaş created FLINK-13721:


 Summary: BroadcastState should support StateTTL
 Key: FLINK-13721
 URL: https://issues.apache.org/jira/browse/FLINK-13721
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream, Runtime / Queryable State
Affects Versions: 1.8.1
 Environment: MacOS 10.14.6 running IntelliJ Idea Ultimate 2019.2, 
Flink version 1.8.1
Reporter: Kerem Ulutaş
 Attachments: DebugBroadcastStateTTL.java, IntHolder.java, 
StringHolder.java, flink_broadcast_state_ttl_debug.log

Hi everyone,

Sorry if I'm doing anything wrong, this is my first issue in Apache Jira.

I have a use case which requires 2 data streams to be cross joined. To be 
exact, one stream is location updates from clients and the other stream is 
event data with location information. I'm trying to get events that happen 
within a certain radius of client location(s).

Since the streams cannot be related to each other by a common partitioning key, I
have to broadcast the client updates to all tasks and evaluate the radius check
for each event.

The requirement here is: if we don't receive any location updates from a client
for a certain amount of time, we should consider the client "gone" (similar to
the rationale stated in the FLINK-3089 description:
https://issues.apache.org/jira/browse/FLINK-3089).

I have attached the sample application classes I used to debug BroadcastState 
and StateTTL together.

The output (see flink_broadcast_state_ttl_debug.log) shows that client "c0" got 
its first event at 17:08:07.67 (expected to be evicted sometime after 
17:08:37.xxx) but doesn't get evicted.

For the operator part (which is the result of
BroadcastConnectedStream.process) - since the context in the
onTimer method doesn't let the user change the contents of the broadcast state,
the only way to deal with stale client data is as follows (see the sketch after this list):
 * In the processElement method, calculate results only for client data newer
than the TTL
 * In the processBroadcastElement method, remove client data older than a certain
amount of time from the broadcast state
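
A minimal sketch of that workaround (field, type and descriptor names are illustrative):
{code:java}
@Override
public void processBroadcastElement(ClientUpdate update, Context ctx, Collector<Match> out)
        throws Exception {
    BroadcastState<String, ClientUpdate> state = ctx.getBroadcastState(CLIENT_STATE_DESCRIPTOR);
    state.put(update.getClientId(), update);

    // Manual "TTL": evict clients whose last update is older than the time-to-live.
    long cutoff = System.currentTimeMillis() - TTL_MILLIS;
    List<String> stale = new ArrayList<>();
    for (Map.Entry<String, ClientUpdate> entry : state.entries()) {
        if (entry.getValue().getTimestamp() < cutoff) {
            stale.add(entry.getKey());
        }
    }
    for (String clientId : stale) {
        state.remove(clientId);
    }
}
{code}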

If the broadcast side of the connected streams doesn't receive data for longer than the
desired time-to-live, BroadcastState will hold stale data, and the
processElement method has to filter out those stale clients each time. This is
the approach I am using in production right now.

I am not aware of any decision or limitation that makes it impossible to
implement StateTTL for BroadcastState; I would be pleased if someone could explain
whether there is one.

Thanks and regards.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13836) Improve support of java.util.UUID for JDBCTypeUtil

2019-08-23 Thread Jira
François Lacombe created FLINK-13836:


 Summary: Improve support of java.util.UUID for JDBCTypeUtil
 Key: FLINK-13836
 URL: https://issues.apache.org/jira/browse/FLINK-13836
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Affects Versions: 1.8.0
Reporter: François Lacombe


Currently, JDBCTypeUtil, used by JDBCAppendTableSinkBuilder, doesn't support UUID
types (java.util.UUID in Java).

Could this be handled, so as to allow writing UUIDs directly to
PostgreSQL, please?
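
For reference, plain JDBC can already write a java.util.UUID to PostgreSQL via setObject, so the missing piece is only the type mapping (a minimal sketch; how JDBCTypeUtil should map it is my assumption):
{code:java}
// PostgreSQL's JDBC driver accepts java.util.UUID passed as an object parameter.
PreparedStatement stmt = connection.prepareStatement("INSERT INTO t (id) VALUES (?)");
stmt.setObject(1, java.util.UUID.randomUUID(), java.sql.Types.OTHER);
stmt.executeUpdate();
{code}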



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13925) ClassLoader in BlobLibraryCacheManager is not using context class loader

2019-08-30 Thread Jira
Jan Lukavský created FLINK-13925:


 Summary: ClassLoader in BlobLibraryCacheManager is not using 
context class loader
 Key: FLINK-13925
 URL: https://issues.apache.org/jira/browse/FLINK-13925
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.9.0, 1.8.1
Reporter: Jan Lukavský
 Fix For: 1.8.2, 1.9.1


Use the thread's current context classloader as the parent class loader of Flink
user-code class loaders.
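
A minimal sketch of the proposed wiring (illustrative; `libraryUrls` stands for the blob jar URLs, and the actual change in BlobLibraryCacheManager may differ):
{code:java}
// Prefer the thread's context class loader as the parent; fall back to the
// system class loader when none is set.
ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
ClassLoader parent = contextLoader != null ? contextLoader : ClassLoader.getSystemClassLoader();
ClassLoader userCodeClassLoader = new java.net.URLClassLoader(libraryUrls, parent);
{code}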



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13945) Vendor-repos Maven profile doesn't exist in flink-shaded

2019-09-03 Thread Jira
Elise Ramé created FLINK-13945:
--

 Summary: Vendor-repos Maven profile doesn't exist in flink-shaded
 Key: FLINK-13945
 URL: https://issues.apache.org/jira/browse/FLINK-13945
 Project: Flink
  Issue Type: Bug
  Components: BuildSystem / Shaded
Affects Versions: shaded-8.0, shaded-7.0, shaded-9.0
Reporter: Elise Ramé


According to the
[documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.9/flinkDev/building.html#custom--vendor-specific-versions], to build Flink against a vendor-specific Hadoop version it is necessary to
build flink-shaded against this version first:
{code:bash}
mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=
{code}
The vendor-repos profile has to be activated to include the Hadoop vendors' repositories.
But Maven cannot find the expected Hadoop dependencies and returns an error,
because the vendor-repos profile isn't defined in flink-shaded.

Example using flink-shaded 8.0 and the HDP 2.6.5 Hadoop version:
{code:bash}
mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.7.3.2.6.5.0-292
{code}
{code:bash}
[INFO] ---< org.apache.flink:flink-shaded-hadoop-2 >---
[INFO] Building flink-shaded-hadoop-2 2.7.3.2.6.5.0-292-8.0 [10/11]
[INFO] [ jar ]-
[WARNING] The POM for org.apache.hadoop:hadoop-common:jar:2.7.3.2.6.5.0-292 is 
missing, no dependency information available
[WARNING] The POM for org.apache.hadoop:hadoop-hdfs:jar:2.7.3.2.6.5.0-292 is 
missing, no dependency information available
[WARNING] The POM for 
org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.7.3.2.6.5.0-292 is 
missing, no dependency information available
[WARNING] The POM for 
org.apache.hadoop:hadoop-yarn-client:jar:2.7.3.2.6.5.0-292 is missing, no 
dependency information available
[WARNING] The POM for 
org.apache.hadoop:hadoop-yarn-common:jar:2.7.3.2.6.5.0-292 is missing, no 
dependency information available
[INFO] 
[INFO] Reactor Summary:
[INFO]
[INFO] flink-shaded 8.0 ... SUCCESS [  2.122 s]
[INFO] flink-shaded-force-shading 8.0 . SUCCESS [  0.607 s]
[INFO] flink-shaded-asm-7 7.1-8.0 . SUCCESS [  0.667 s]
[INFO] flink-shaded-guava-18 18.0-8.0 . SUCCESS [  1.452 s]
[INFO] flink-shaded-netty-4 4.1.39.Final-8.0 .. SUCCESS [  4.597 s]
[INFO] flink-shaded-netty-tcnative-dynamic 2.0.25.Final-8.0 SUCCESS [  0.620 s]
[INFO] flink-shaded-jackson-parent 2.9.8-8.0 .. SUCCESS [  0.018 s]
[INFO] flink-shaded-jackson-2 2.9.8-8.0 ... SUCCESS [  0.914 s]
[INFO] flink-shaded-jackson-module-jsonSchema-2 2.9.8-8.0 . SUCCESS [  0.627 s]
[INFO] flink-shaded-hadoop-2 2.7.3.2.6.5.0-292-8.0  FAILURE [  0.047 s]
[INFO] flink-shaded-hadoop-2-uber 2.7.3.2.6.5.0-292-8.0 ... SKIPPED
[INFO] 
[INFO] BUILD FAILURE
[INFO] 
[INFO] Total time:  11.947 s
[INFO] Finished at: 2019-09-03T16:52:59+02:00
[INFO] 
[WARNING] The requested profile "vendor-repos" could not be activated because 
it does not exist.
[ERROR] Failed to execute goal on project flink-shaded-hadoop-2: Could not 
resolve dependencies for project 
org.apache.flink:flink-shaded-hadoop-2:jar:2.7.3.2.6.5.0-292-8.0: The following 
artifacts could not be resolved: 
org.apache.hadoop:hadoop-common:jar:2.7.3.2.6.5.0-292, 
org.apache.hadoop:hadoop-hdfs:jar:2.7.3.2.6.5.0-292, 
org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.7.3.2.6.5.0-292, 
org.apache.hadoop:hadoop-yarn-client:jar:2.7.3.2.6.5.0-292, 
org.apache.hadoop:hadoop-yarn-common:jar:2.7.3.2.6.5.0-292: Failure to find 
org.apache.hadoop:hadoop-common:jar:2.7.3.2.6.5.0-292 in 
https://repo.maven.apache.org/maven2 was cached in the local repository, 
resolution will not be reattempted until the update interval of central has 
elapsed or updates are forced -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please 
read the following articles:
[ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn  -rf :flink-shaded-hadoop-2
{code}
The vendor-repos profile exists in Flink's pom.xml file:
[https://github.com/apache/flink/blob/3079d11913f153ec40c75afb5356fd3be1a1e550/pom.xml#L1037]



--
This message was sent by Atlassian Jira

[jira] [Created] (FLINK-31935) The new resource requirements REST API is only available for session clusters

2023-04-25 Thread Jira
David Morávek created FLINK-31935:
-

 Summary: The new resource requirements REST API is only available 
for session clusters
 Key: FLINK-31935
 URL: https://issues.apache.org/jira/browse/FLINK-31935
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Affects Versions: 1.18.0
Reporter: David Morávek


We need to register both `JobResourceRequirementsHandler` and
`JobResourceRequirementsUpdateHandler` for application / per-job clusters as
well.
 
These handlers have been introduced as part of FLINK-31316.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31954) Prevent Mockito for the new code with Checkstyle

2023-04-27 Thread Jira
David Morávek created FLINK-31954:
-

 Summary: Prevent Mockito for the new code with Checkstyle
 Key: FLINK-31954
 URL: https://issues.apache.org/jira/browse/FLINK-31954
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: David Morávek


Based on [https://lists.apache.org/thread/xl456044hmxk87mwq02p4m22yp3b04sc] 
discussion.

 

We'll set up a Checkstyle rule that disallows Mockito usage and create a 
one-off suppression list for the existing violations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31955) Prevent JUnit 4 usage for the new code with Checkstyle

2023-04-27 Thread Jira
David Morávek created FLINK-31955:
-

 Summary: Prevent JUnit 4 usage for the new code with Checkstyle
 Key: FLINK-31955
 URL: https://issues.apache.org/jira/browse/FLINK-31955
 Project: Flink
  Issue Type: Improvement
  Components: Build System
 Environment: Based on 
[https://lists.apache.org/thread/xl456044hmxk87mwq02p4m22yp3b04sc] discussion.

 

We'll set up a Checkstyle rule that disallows JUnit 4 usage and create a 
one-off suppression list for the existing violations.
Reporter: David Morávek






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31996) Chaining operators with different max parallelism prevents rescaling

2023-05-04 Thread Jira
David Morávek created FLINK-31996:
-

 Summary: Chaining operators with different max parallelism 
prevents rescaling
 Key: FLINK-31996
 URL: https://issues.apache.org/jira/browse/FLINK-31996
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: David Morávek


We might chain operators with different max parallelism together if they are
initially set to the same parallelism.

When we decide to rescale the JobGraph vertices (using the AdaptiveScheduler),
we're capped by the lowest maxParallelism in the operator chain. This is
especially visible with things like CollectSink, TwoPhaseCommitSink, CDC, and a
GlobalCommitter with maxParallelism set to 1.

 

An obvious solution would be to prevent the chaining of operators with
different maxParallelism, but we need to double-check that this doesn't introduce a
breaking change.
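
A sketch of the kind of guard this would add to the chaining decision (illustrative; the real check lives in the JobGraph generation logic and involves more conditions):
{code:java}
// Only put two operators into the same chain when both the parallelism and the
// max parallelism agree, so later rescaling isn't capped by the chain.
private static boolean maxParallelismAllowsChaining(StreamNode upstream, StreamNode downstream) {
    return upstream.getParallelism() == downstream.getParallelism()
            && upstream.getMaxParallelism() == downstream.getMaxParallelism();
}
{code}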



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32000) Expose vertex max parallelism in the WebUI

2023-05-04 Thread Jira
David Morávek created FLINK-32000:
-

 Summary: Expose vertex max parallelism in the WebUI
 Key: FLINK-32000
 URL: https://issues.apache.org/jira/browse/FLINK-32000
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Reporter: David Morávek
 Attachments: Screenshot 2023-05-04 at 14.15.34.png

It would be great to expose the max parallelism in the vertex detail drawer for
debugging purposes. !Screenshot 2023-05-04 at 14.15.34.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32006) AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry times out on Azure

2023-05-04 Thread Jira
David Morávek created FLINK-32006:
-

 Summary: 
AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry 
times out on Azure
 Key: FLINK-32006
 URL: https://issues.apache.org/jira/browse/FLINK-32006
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.18.0
Reporter: David Morávek


{code:java}
May 04 13:52:18 [ERROR] 
org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry
  Time elapsed: 100.009 s  <<< ERROR!
May 04 13:52:18 org.junit.runners.model.TestTimedOutException: test timed out 
after 100 seconds
May 04 13:52:18 at java.lang.Thread.sleep(Native Method)
May 04 13:52:18 at 
org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncWaitOperatorTest.java:1313)
May 04 13:52:18 at 
org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.testProcessingTimeWithTimeoutFunctionOrderedWithRetry(AsyncWaitOperatorTest.java:1277)
May 04 13:52:18 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
May 04 13:52:18 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
May 04 13:52:18 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
May 04 13:52:18 at java.lang.reflect.Method.invoke(Method.java:498)
May 04 13:52:18 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
May 04 13:52:18 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
May 04 13:52:18 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
May 04 13:52:18 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
May 04 13:52:18 at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
May 04 13:52:18 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
May 04 13:52:18 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
May 04 13:52:18 at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
May 04 13:52:18 at java.lang.Thread.run(Thread.java:748)
 {code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48671&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=9288



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32010) KubernetesLeaderRetrievalDriver always waits for lease update to resolve leadership

2023-05-05 Thread Jira
David Morávek created FLINK-32010:
-

 Summary: KubernetesLeaderRetrievalDriver always waits for lease 
update to resolve leadership
 Key: FLINK-32010
 URL: https://issues.apache.org/jira/browse/FLINK-32010
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.16.1, 1.17.0, 1.18.0
Reporter: David Morávek


The k8s-based leader retrieval is based on ConfigMap watching. The config map 
lifecycle (from the consumer point of view) is handled as a series of events 
with the following types:
 * ADDED -> the first time the consumer has seen the CM
 * UPDATED -> any further changes to the CM
 * DELETED -> ... you get the idea

The implementation assumes that the ElectionDriver (the one that creates the CM)
and the ElectionRetriever are started simultaneously, and therefore ignores the ADDED
events, because the CM is always created empty and only updated with the
leadership information later on.

This assumption is incorrect in the following cases (I might be missing some,
but that's not important; the goal is to illustrate the problem):
 * a TM joining the cluster later, once the leaders are established, to discover the RM
/ JM
 * the RM trying to discover the JM when the
MultipleComponentLeaderElectionDriver is used

This, for example, leads to higher job submission latencies, as submissions can be
unnecessarily held back for up to the lease retry period [1].

[1] Configured by _high-availability.kubernetes.leader-election.retry-period_
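
A minimal sketch of one possible direction for a fix (illustrative; the handler and helper names are made up, not the actual Flink watcher code): treat ADDED events that already carry leader information like UPDATED events.
{code:java}
switch (eventType) {
    case ADDED:    // the CM may already contain leader information at this point
    case UPDATED:
        if (hasLeaderInformation(configMap)) {
            notifyLeaderFound(configMap);
        }
        break;
    case DELETED:
        notifyLeaderLost();
        break;
}
{code}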



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32160) CompactOperator cannot continue from checkpoint because of java.util.NoSuchElementException

2023-05-23 Thread Jira
Michał Fijołek created FLINK-32160:
--

 Summary: CompactOperator cannot continue from checkpoint because 
of java.util.NoSuchElementException
 Key: FLINK-32160
 URL: https://issues.apache.org/jira/browse/FLINK-32160
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.17.0, 1.16.0
 Environment: Flink 1.17 on k8s (flink-kubernetes-operator v.1.4.0), s3
Reporter: Michał Fijołek


Hello :) We have a Flink job (v1.17) on k8s (using the official
flink-k8s-operator) that reads data from Kafka and writes it to s3 with
flink-sql, using compaction. The job sometimes fails and continues from a checkpoint
just fine, but once every couple of days we experience a crash loop. The job cannot
continue from the latest checkpoint and fails with this exception:
{noformat}
java.util.NoSuchElementException at 
java.base/java.util.ArrayList$Itr.next(Unknown Source)
 at 
org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:114)
 at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
 at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
 at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
 at java.base/java.lang.Thread.run(Unknown Source){noformat}
Here’s the relevant code: 
[https://github.com/apache/flink/blob/release-1.17/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java#L114]

It looks like `CompactOperator` calls `next()` on the iterator without
checking `hasNext()` first - why's that? Is it a bug? Why does
`context.getOperatorStateStore().getListState(metaDescriptor)` return an empty
iterator? Is the latest checkpoint broken in such a case?
We have an identical job, but without compaction, and it has been working smoothly for a
couple of weeks now.
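
For reference, a defensive variant of the restore in question might look like this (a sketch with simplified types; not the actual CompactOperator code):
{code:java}
// Guard against an empty restored list state instead of calling next() unconditionally.
Iterator<String> it =
        context.getOperatorStateStore().getListState(metaDescriptor).get().iterator();
String restored = it.hasNext() ? it.next() : null; // avoids NoSuchElementException
{code}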

The whole job is just a `select` from Kafka and an `insert` to s3.
{noformat}
CREATE EXTERNAL TABLE IF NOT EXISTS hive.`foo`.`bar` (  `foo_bar1` STRING,
  `foo_bar2` STRING,
  `foo_bar3` STRING,
  `foo_bar4` STRING
  )
  PARTITIONED BY (`foo_bar1` STRING, `foo_bar2` STRING, `foo_bar3` STRING)
  STORED AS parquet
  LOCATION 's3a://my/bucket/'
  TBLPROPERTIES (
'auto-compaction' = 'true',
'compaction.file-size' = '128MB',
'sink.parallelism' = '8',
'format' = 'parquet',
'parquet.compression' = 'SNAPPY',
'sink.rolling-policy.rollover-interval' = '1 h',
'sink.partition-commit.policy.kind' = 'metastore'
  ){noformat}
Checkpoint configuration:
{noformat}
Checkpointing Mode Exactly Once
Checkpoint Storage FileSystemCheckpointStorage
State Backend HashMapStateBackend
Interval 20m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 0ms
Maximum Concurrent Checkpoints 1
Unaligned Checkpoints Disabled
Persist Checkpoints Externally Enabled (retain on cancellation)
Tolerable Failed Checkpoints 0
Checkpoints With Finished Tasks Enabled
State Changelog Disabled{noformat}

Is there something wrong with the given config, or is this some unhandled edge case?

Currently our workaround is to restart the job without using the checkpoint - it
uses the state from Kafka, which in this case is fine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32228) Bump testcontainers

2023-05-31 Thread Jira
João Boto created FLINK-32228:
-

 Summary: Bump testcontainers
 Key: FLINK-32228
 URL: https://issues.apache.org/jira/browse/FLINK-32228
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: João Boto


Bump testcontainers version



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32318) [flink-operator] missing s3 plugin in folder plugins

2023-06-12 Thread Jira
Luís Costa created FLINK-32318:
--

 Summary: [flink-operator] missing s3 plugin in folder plugins
 Key: FLINK-32318
 URL: https://issues.apache.org/jira/browse/FLINK-32318
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Luís Costa


Greetings,

I'm trying to configure [Flink's Kubernetes HA 
services|https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/ha/kubernetes_ha/]
 for flink operator jobs, but got an error regarding s3 plugin: _"Could not 
find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto"_


{code:java}
2023-06-12 10:05:16,981 INFO  akka.remote.Remoting  
   [] - Starting remoting
2023-06-12 10:05:17,194 INFO  akka.remote.Remoting  
   [] - Remoting started; listening on addresses 
:[akka.tcp://flink@10.4.125.209:6123]
2023-06-12 10:05:17,377 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Actor system 
started at akka.tcp://flink@10.4.125.209:6123
2023-06-12 10:05:18,175 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
KubernetesApplicationClusterEntrypoint down with application status FAILED. 
Diagnostics org.apache.flink.util.FlinkException: Could not create the ha 
services from the instantiated HighAvailabilityServicesFactory 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:299)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:285)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:145)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:439)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:382)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
at 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
Caused by: java.io.IOException: Could not create FileSystem for highly 
available storage path 
(s3://td-infra-stg-us-east-1-s3-flinkoperator/flink-data/ha/flink-basic-example-xpto)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102)
at 
org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86)
at 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:41)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:296)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto. Please ensure that each plugin resides within its own 
subfolder within the plugins directory. See 
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/
 for more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems. For 
a full list of supported file systems, please see 
https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:99)
... 13 more {code}

Looking into the container, I can see that the s3 plugins are in the folder /opt/flink/
instead of s3/plugins as mentioned

[jira] [Created] (FLINK-32376) [FLIP-287] Extend Sink#InitContext to expose TypeSerializer, ObjectReuse and JobID

2023-06-19 Thread Jira
João Boto created FLINK-32376:
-

 Summary: [FLIP-287] Extend Sink#InitContext to expose 
TypeSerializer, ObjectReuse and JobID
 Key: FLINK-32376
 URL: https://issues.apache.org/jira/browse/FLINK-32376
 Project: Flink
  Issue Type: Improvement
Reporter: João Boto






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32540) The issue of not distributing the last batch of data

2023-07-04 Thread Jira
原来你是小幸运001 created FLINK-32540:
--

 Summary: The issue of not distributing the last batch of data
 Key: FLINK-32540
 URL: https://issues.apache.org/jira/browse/FLINK-32540
 Project: Flink
  Issue Type: Bug
 Environment: The above code was executed in IntelliJ IDEA, Flink 
version 1.16, which also has this issue in 1.14. Other versions have not 
attempted it

 
Reporter: 原来你是小幸运001


I copied the source code of the flat map and wanted to implement my own flat
map. One part of the logic is to emit one last record at the end of the Flink
job, so I called collector.collect in the close method, but the record was not
emitted and the downstream operator cannot receive it.
{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

/**
 * @author LaiYongBIn
 * @date 2023/7/5 10:09
 * @Description Do SomeThing
 */
public class Test {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source emits a single record and then finishes.
        DataStream<String> stream0 =
                env.addSource(new SourceFunction<String>() {
                    @Override
                    public void run(SourceContext<String> sourceContext) throws Exception {
                        sourceContext.collect("TEST");
                        System.out.println("cancel");
                    }

                    @Override
                    public void cancel() {
                    }
                })
                .setParallelism(1);

        MyFlatMapFun flatMapFunc = new MyFlatMapFun();
        TypeInformation<String> outType = TypeExtractor.getFlatMapReturnTypes(
                env.clean(flatMapFunc), stream0.getType(), Utils.getCallLocationName(), true);

        DataStream<String> flatMap = stream0
                .transform("Flat Map", outType, new MyStreamOperator(env.clean(flatMapFunc)))
                .setParallelism(1);

        flatMap.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                System.out.println("Obtain upstream data is:" + s);
            }
        });

        env.execute();
    }
}

class MyStreamOperator
        extends AbstractUdfStreamOperator<String, FlatMapFunction<String, String>>
        implements OneInputStreamOperator<String, String> {

    private transient TimestampedCollector<String> collector;

    public MyStreamOperator(FlatMapFunction<String, String> userFunction) {
        super(userFunction);
    }

    @Override
    public void open() throws Exception {
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void close() throws Exception {
        // Distribute data during close
        collector.collect("close message");
    }

    @Override
    public void processElement(StreamRecord<String> streamRecord) throws Exception {
        // do nothing
    }
}

class MyFlatMapFun implements FlatMapFunction<String, String> {

    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        // do nothing
    }
}
{code}
Then I found out there was a finish method, and I tried to call 'collector.collect'
in the finish method, and the data was successfully distributed.
{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runt

[jira] [Created] (FLINK-32567) when Flink write azure data lake storage,error occur

2023-07-09 Thread Jira
宇宙先生 created FLINK-32567:


 Summary: when Flink write azure data lake storage,error occur
 Key: FLINK-32567
 URL: https://issues.apache.org/jira/browse/FLINK-32567
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.17.1
Reporter: 宇宙先生
 Attachments: image-2023-07-10-14-26-22-019.png, 
image-2023-07-10-14-28-11-792.png

When I strictly followed the official website to perform these two operations,
I still got the same authentication error, and I want to know how I should
enable authentication.

!image-2023-07-10-14-26-22-019.png!

 

 

error:

 

!image-2023-07-10-14-28-11-792.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32627) Add support for dynamic time window function

2023-07-18 Thread Jira
张一帆 created FLINK-32627:
---

 Summary: Add support for dynamic time window function
 Key: FLINK-32627
 URL: https://issues.apache.org/jira/browse/FLINK-32627
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.18.0
Reporter: 张一帆
 Fix For: 1.18.0


When using windows for calculations, whenever the logic is modified or
adjusted, the entire program needs to be stopped, the code modified, and the
program repackaged and resubmitted to the cluster. Dynamic modification of the
logic and external dynamic injection are impossible. The window
information could instead be obtained from the data itself to trigger a
redistribution of windows, achieving the effect of dynamic windows.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32629) Add support for dynamic CEP

2023-07-19 Thread Jira
张一帆 created FLINK-32629:
---

 Summary: Add support for dynamic CEP 
 Key: FLINK-32629
 URL: https://issues.apache.org/jira/browse/FLINK-32629
 Project: Flink
  Issue Type: New Feature
  Components: Library / CEP
Affects Versions: 1.18.0
Reporter: 张一帆
 Fix For: 1.18.0


When using CEP as a complex event processing engine, whenever the logic is
modified or a threshold is adjusted, the entire
program needs to be stopped, the code modified, and the program
repackaged and resubmitted to the cluster. Dynamic logic modification and
external dynamic injection cannot be realized this way. We have now
implemented dynamic injection of CEP logic: based on message-driven logic modification, specific messages can be
injected manually at the source end to achieve fine-grained
control over when injected logic takes effect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32650) Added the ability to split flink-protobuf codegen code

2023-07-23 Thread Jira
李精卫 created FLINK-32650:
---

 Summary: Added the ability to split flink-protobuf codegen code
 Key: FLINK-32650
 URL: https://issues.apache.org/jira/browse/FLINK-32650
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.17.0
Reporter: 李精卫
 Fix For: 1.17.0


Flink serializes and deserializes protobuf-format data by calling the decode or
encode method in GeneratedProtoToRow_XXX.java, generated by codegen, to parse
byte[] data into protobuf Java objects. The size of the generated decode/encode
method body is strongly related to the number of fields defined in the protobuf message.
When the number of fields exceeds a certain threshold and the compiled method
body exceeds 8 KB, the decode/encode method will not be optimized by the JIT,
seriously affecting serialization and deserialization performance. If the
compiled method body exceeds 64 KB, the task will outright fail to
start.
So I propose a codegen splitter for protobuf parsing that splits the encode/decode
method to solve this problem.
The specific idea is as follows. Currently, the parsing of every
field defined in the protobuf message is placed in the body of a single
decode/encode method. Since the fields share no parameters, multiple fields can be
grouped and their parsing written into a split method body: whenever the number of
statements in the current method body exceeds the threshold, a split method is
generated, those fields are parsed in the split method, and the split
method is called from the decode/encode method. Proceeding this way, the final
decode/encode method is generated as a sequence of calls to the split methods.
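
A hypothetical illustration of the generated shape (not the actual flink-protobuf output; the field ranges, constant and helper names are made up):
{code:java}
// Instead of one huge method body that blows past the JIT's 8 KB bytecode limit,
// the generated decode delegates to per-range helper methods.
public static RowData decode(MyMessage message) {
    GenericRowData row = new GenericRowData(FIELD_COUNT);
    decodeSplit0(message, row); // parses fields 0..499
    decodeSplit1(message, row); // parses fields 500..999
    return row;
}
{code}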



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32812) HBaseRowDataLookupFunction HTable instantiation of thread safety problems

2023-08-08 Thread Jira
王江洲 created FLINK-32812:
---

 Summary: HBaseRowDataLookupFunction HTable instantiation of thread 
safety problems
 Key: FLINK-32812
 URL: https://issues.apache.org/jira/browse/FLINK-32812
 Project: Flink
  Issue Type: Bug
  Components: Connectors / HBase
Affects Versions: 1.17.1
 Environment: Flink 1.17.1

Hbase 2.4.11

{code:java}
@Override
public void open(FunctionContext context) {
    LOG.info("start open ...");
    Configuration config = prepareRuntimeConfiguration();
    try {
        hConnection = ConnectionFactory.createConnection(config);
        table = (HTable) hConnection.getTable(TableName.valueOf(hTableName));
    } catch (TableNotFoundException tnfe) {
        LOG.error("Table '{}' not found ", hTableName, tnfe);
        throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
    } catch (IOException ioe) {
        LOG.error("Exception while creating connection to HBase.", ioe);
        throw new RuntimeException("Cannot create connection to HBase.", ioe);
    }
    this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
    LOG.info("end open.");
}
{code}
Reporter: 王江洲
 Fix For: 1.17.1


HBaseRowDataLookupFunction's HTable instantiation has a thread-safety problem. In
an actual development environment, while the program keeps running there is a high
probability that the close() method is never executed, so the HTable cannot be
shut down; all subsequent uses share the same HTable, which is a multithreading
safety hazard, and data errors occur.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32961) A new FileSystemFactory that support two high available hdfs

2023-08-25 Thread Jira
王茂军 created FLINK-32961:
---

 Summary: A new FileSystemFactory that support two high available 
hdfs
 Key: FLINK-32961
 URL: https://issues.apache.org/jira/browse/FLINK-32961
 Project: Flink
  Issue Type: Improvement
  Components: FileSystems
Reporter: 王茂军
 Fix For: 1.18.0


I run a realtime ETL program with Flink on YARN. The ETL program sinks user logs to
the master HDFS and writes checkpoints to another, micro HDFS. Both the master HDFS and
the micro HDFS are highly available.

By default, the ETL program cannot resolve the dfs.nameservices of the
micro HDFS.

I plan to write a custom org.apache.flink.core.fs.FileSystemFactory that
supports two or more HA HDFS clusters, so that I can sink user logs to the master HDFS, and
save checkpoint data to the micro HDFS.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33163) Support Java 21 (LTS)

2023-09-26 Thread Jira
Maciej Bryński created FLINK-33163:
--

 Summary: Support Java 21 (LTS)
 Key: FLINK-33163
 URL: https://issues.apache.org/jira/browse/FLINK-33163
 Project: Flink
  Issue Type: Bug
Reporter: Maciej Bryński


Based on https://issues.apache.org/jira/browse/FLINK-15736 we should have 
similar ticket for Java 21 LTS.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33231) Memory leak in KafkaSourceReader if no data in consumed topic

2023-10-10 Thread Jira
Lauri Suurväli created FLINK-33231:
--

 Summary: Memory leak in KafkaSourceReader if no data in consumed 
topic
 Key: FLINK-33231
 URL: https://issues.apache.org/jira/browse/FLINK-33231
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.17.1
Reporter: Lauri Suurväli
 Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png

*Problem description*

Our Flink streaming job TaskManager heap gets full when the job has nothing to 
consume and process.

It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
When there are no messages in the source topic the TaskManager heap usage 
starts increasing until the job exits after receiving a SIGTERM signal. We are 
running the job on AWS EMR with YARN.

The problems with the TaskManager heap usage do not occur when there is data to 
process. It's also worth noting that sending a single message to the source 
topic of a streaming job that has been sitting idle and suffers from the memory 
leak will cause the heap to be cleared. However it does not resolve the problem 
since the heap usage will start increasing immediately after processing the 
message.

!Screenshot 2023-10-10 at 12.49.37.png!

TaskManager heap used percentage is calculated by 

 
{code:java}
flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
 

 

 I was able to take heap dumps of the TaskManager processes during periods of high heap
usage. Heap dump analysis detected 912,355 instances of
empty java.util.HashMap collections retaining >= 43,793,040 bytes.

!Screenshot 2023-10-09 at 14.13.43.png!

The retained heap seemed to be located at:

 
{code:java}
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
 

!Screenshot 2023-10-09 at 13.02.34.png!

 

*Possible hints:*

An empty HashMap is added to the offsetsToCommit map during the snapshotState method
if one does not already exist for the given checkpoint. [KafkaSourceReader
line
107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]

 
{code:java}
Map<TopicPartition, OffsetAndMetadata> offsetsMap =
        offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
{code}
 

If the startingOffset for the given split is >= 0 then a new entry would be 
added to the map from the previous step. [KafkaSourceReader line 
113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
{code:java}
if (split.getStartingOffset() >= 0) {
offsetsMap.put(
split.getTopicPartition(),
new OffsetAndMetadata(split.getStartingOffset()));
}{code}
If the starting offset is smaller than 0 then this would leave the offsetMap 
created in step 1 empty. We can see from the logs that the startingOffset is -3 
when the splits are added to the reader.

 
{code:java}
Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
[Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
-9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
 

 

The offsetsToCommit map is cleaned of entries once they have been committed
to Kafka, which happens in the callback function passed to the
KafkaSourceFetcherManager.commitOffsets method in the
KafkaSourceReader.notifyCheckpointComplete method.

However, if the committedPartitions is empty for the given checkpoint, then the
KafkaSourceFetcherManager.commitOffsets method returns early.
[KafkaSourceFetcherManager line 
78|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78]
{code:java}
if (offsetsToCommit.isEmpty()) {
return;
} {code}
We can observe from the logs that indeed an empty map is encountered at this 
step:
{code:java}
Committing offsets {}{code}
*Conclusion*

It seems that an empty map gets added to the offsetsToCommit map for each
checkpoint. Since the startingOffset in our case is -3, the empty map never gets
filled. During the offset commit phase, the offsets for these checkpoints are

[jira] [Created] (FLINK-33357) add Apache Software License 2

2023-10-24 Thread Jira
蔡灿材 created FLINK-33357:
---

 Summary: add Apache Software License 2
 Key: FLINK-33357
 URL: https://issues.apache.org/jira/browse/FLINK-33357
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.0
Reporter: 蔡灿材
 Fix For: kubernetes-operator-1.5.0
 Attachments: 2023-10-25 12-08-58屏幕截图.png

Currently flinkdeployments.flink.apache.org-v1.yml and
flinksessionjobs.flink.apache.org-v1.yml don't carry the Apache Software License 2 header.

Add the Apache Software License 2 header to both files.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33378) Bump flink version on flink-connectors-jdbc

2023-10-27 Thread Jira
João Boto created FLINK-33378:
-

 Summary: Bump flink version on flink-connectors-jdbc
 Key: FLINK-33378
 URL: https://issues.apache.org/jira/browse/FLINK-33378
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: João Boto


With the release of Flink 1.18, bump the Flink version in the connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33393) flink document description error

2023-10-30 Thread Jira
蔡灿材 created FLINK-33393:
---

 Summary: flink document description error
 Key: FLINK-33393
 URL: https://issues.apache.org/jira/browse/FLINK-33393
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.17.1
Reporter: 蔡灿材
 Fix For: 1.17.1
 Attachments: 捕获.PNG

The Flink documentation contains a description error: the description in the functions section is wrong (see the attached screenshot).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33528) Externalize Python connector code

2023-11-12 Thread Jira
Márton Balassi created FLINK-33528:
--

 Summary: Externalize Python connector code
 Key: FLINK-33528
 URL: https://issues.apache.org/jira/browse/FLINK-33528
 Project: Flink
  Issue Type: Technical Debt
  Components: API / Python, Connectors / Common
Affects Versions: 1.18.0
Reporter: Márton Balassi
 Fix For: 1.19.0


During the connector externalization effort, end-to-end tests for the Python
connectors were left in the main repository under:

[https://github.com/apache/flink/tree/master/flink-python/pyflink/datastream/connectors]

These include both the Python connector implementations and their tests. Currently they
depend on a previously released version of the underlying connectors; otherwise
they would introduce a circular dependency, given that they live in the flink
repo at the moment.

This setup prevents us from propagating any breaking change to the PublicEvolving
and Internal APIs used by the connectors, as such changes break the Python
e2e tests. We ran into this while implementing FLINK-25857.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33556) Test infrastructure for externalized python code

2023-11-15 Thread Jira
Márton Balassi created FLINK-33556:
--

 Summary: Test infrastructure for externalized python code
 Key: FLINK-33556
 URL: https://issues.apache.org/jira/browse/FLINK-33556
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python, Connectors / Common
Affects Versions: 1.18.0
Reporter: Márton Balassi
Assignee: Peter Vary
 Fix For: 1.19.0


We need to establish the reusable parts of the python infrastructure as part of 
the shared connector utils such that it can be easily reused. Ideally we would 
create a github workflow similar to 
https://github.com/apache/flink-connector-shared-utils/blob/ci_utils/.github/workflows/ci.yml.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33557) Externalize Cassandra Python connector code

2023-11-15 Thread Jira
Márton Balassi created FLINK-33557:
--

 Summary: Externalize Cassandra Python connector code
 Key: FLINK-33557
 URL: https://issues.apache.org/jira/browse/FLINK-33557
 Project: Flink
  Issue Type: Sub-task
Reporter: Márton Balassi


See description of parent ticket for context.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33558) Externalize Elasticsearch Python connector code

2023-11-15 Thread Jira
Márton Balassi created FLINK-33558:
--

 Summary: Externalize Elasticsearch Python connector code
 Key: FLINK-33558
 URL: https://issues.apache.org/jira/browse/FLINK-33558
 Project: Flink
  Issue Type: Sub-task
Reporter: Márton Balassi


See description of parent ticket for context.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33559) Externalize Kafka Python connector code

2023-11-15 Thread Jira
Márton Balassi created FLINK-33559:
--

 Summary: Externalize Kafka Python connector code
 Key: FLINK-33559
 URL: https://issues.apache.org/jira/browse/FLINK-33559
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.18.0
Reporter: Márton Balassi
Assignee: Peter Vary
 Fix For: 1.19.0


See description of parent ticket for context.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33560) Externalize Kinesis Python connector code

2023-11-15 Thread Jira
Márton Balassi created FLINK-33560:
--

 Summary: Externalize Kinesis Python connector code
 Key: FLINK-33560
 URL: https://issues.apache.org/jira/browse/FLINK-33560
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python, Connectors / Kinesis
Affects Versions: 1.18.0
Reporter: Márton Balassi
 Fix For: 1.19.0


See description of parent ticket for context.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33561) Externalize Pulsar Python connector code

2023-11-15 Thread Jira
Márton Balassi created FLINK-33561:
--

 Summary: Externalize Pulsar Python connector code
 Key: FLINK-33561
 URL: https://issues.apache.org/jira/browse/FLINK-33561
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python, Connectors / Pulsar
Affects Versions: 1.18.0
Reporter: Márton Balassi
 Fix For: 1.19.0


See description of parent ticket for context.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33562) Externalize RabbitMQ Python connector code

2023-11-15 Thread Jira
Márton Balassi created FLINK-33562:
--

 Summary: Externalize RabbitMQ Python connector code
 Key: FLINK-33562
 URL: https://issues.apache.org/jira/browse/FLINK-33562
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python, Connectors/ RabbitMQ
Affects Versions: 1.18.0
Reporter: Márton Balassi
 Fix For: 1.19.0


See description of parent ticket for context.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33722) MATCH_RECOGNIZE in batch mode ignores events order

2023-12-01 Thread Jira
Grzegorz Kołakowski created FLINK-33722:
---

 Summary: MATCH_RECOGNIZE in batch mode ignores events order
 Key: FLINK-33722
 URL: https://issues.apache.org/jira/browse/FLINK-33722
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.17.1
Reporter: Grzegorz Kołakowski


MATCH_RECOGNIZE in batch mode seems to ignore ORDER BY clause. Let's consider 
the following example:
{code:sql}
FROM events
MATCH_RECOGNIZE (
PARTITION BY user_id
ORDER BY ts ASC
MEASURES
FIRST(A.ts) as _start,
LAST(A.ts) as _middle,
LAST(B.ts) as _finish
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS
DEFINE
A AS active is false,
B AS active is true
) AS T {code}
where _events_ is a PostgreSQL table containing ~1 records.
{code:java}
CREATE TABLE events (
  id INT,
  user_id INT,
  ts TIMESTAMP(3),
  active BOOLEAN,
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://postgres:5432/test',
'username' = 'test',
'password' = 'test',
'table-name' = 'events'
); {code}
It can happen that _finish is smaller than _start or _middle, which
is wrong.
{noformat}
 user_id  _start                   _middle                  _finish
 1        2023-11-23 14:34:42.346  2023-11-23 14:34:48.370  2023-11-23 14:34:44.264{noformat}
 

Repository where I reproduced the problem: 
https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging

 

According to [~dwysakowicz]:  In BATCH the CepOperator is always created to 
process records in processing time:
https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54
A comparator is passed along to the operator covering the sorting on ts field: 
https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173
but this is only a secondary sort; it is applied only within records of the 
same timestamp.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33729) Events are getting lost when an exception occurs within a processing function

2023-12-03 Thread Jira
Rafał Trójczak created FLINK-33729:
--

 Summary: Events are getting lost when an exception occurs within a 
processing function
 Key: FLINK-33729
 URL: https://issues.apache.org/jira/browse/FLINK-33729
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.15.3
Reporter: Rafał Trójczak


We have a Flink job using a Pulsar source that reads from an input topic, and a 
Pulsar sink that writes to an output topic. Both the Flink and Pulsar 
connector versions are 1.15.3. The Pulsar version I use is 2.10.3.

Here is a simple project that is intended to reproduce this problem: 
[https://github.com/trojczak/flink-pulsar-connector-problem/]

All of my tests were done on my local Kubernetes cluster using the Flink 
Kubernetes Operator, with Pulsar running in my local Docker. But the same 
problem occurred on a "normal" cluster.

Expected behavior: When an exception is thrown within the code (or a 
TaskManager pod is restarted for any other reason, e.g. an OOM error), 
processing should resume from the last event sent to the output topic.

Actual behavior: The events before the failure are sent correctly to the output 
topic, then some of the events from the input topic are missing, then from some 
point the events are processed normally until the next exception is thrown, and 
so on. Finally, of the 100 events that should be sent from the input topic to 
the output topic, only 40 are sent.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33955) UnsupportedFileSystemException when trying to save data to Azure's abfss File System

2023-12-28 Thread Jira
Alek Łańduch created FLINK-33955:


 Summary: UnsupportedFileSystemException when trying to save data 
to Azure's abfss File System
 Key: FLINK-33955
 URL: https://issues.apache.org/jira/browse/FLINK-33955
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.17.1, 1.18.0
 Environment: Flink 1.17.1 & Flink 1.18.0 with Java 11, ADLS Gen.2 with 
hierarchical namespace enabled
Reporter: Alek Łańduch
 Attachments: error.log, pom.xml, success.log

When using Azure's File System connector for reading and writing files to Azure 
Data Lake Storage Gen2, the Flink job fails when writing files with the 
following error:

 
{noformat}
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: 
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme 
"file"{noformat}
 

Full logs from the Job Manager, along with the stack trace, are attached as the 
[^error.log] file.

The connection itself seems to be good, as the job successfully creates the 
desired structure inside ADLS (and the `.part` file), but the file itself is empty.

The job is simple, as its only purpose is to save events `a`, `b` and `c` into 
a file on ADLS. The whole code is presented below:
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamJob {
  public static void main(String[] args) throws Exception {
    final FileSink<String> sink = FileSink
        .forRowFormat(
            new Path("abfss://t...@stads2dev01.dfs.core.windows.net/output"),
            new SimpleStringEncoder<String>("UTF-8"))
        .build();
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements("a", "b", "c").sinkTo(sink);
    env.execute("Test");
  }
}
{code}
Code is run locally using Flink 1.18.0 (the same behavior was present in 
version 1.17.1). The only change made to `flink-conf.yaml` was to add the 
keys for accessing Azure:

 
{code:java}
fs.azure.account.auth.type.stads2dev01.dfs.core.windows.net: SharedKey
fs.azure.account.key.stads2dev01.dfs.core.windows.net: **{code}
 

The [^pom.xml] file was created by using the [Getting 
Started|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#getting-started]
 documentation; the only thing I added was the `flink-azure-fs-hadoop` connector. 
The whole [^pom.xml] file is attached. The connector JAR was also copied from 
the `opt` directory to `plugins/azure-fs-hadoop` in the cluster files, according 
to the documentation.

The interesting fact is that the deprecated `writeAsText` method (instead of 
FileSink) not only works and creates the desired file on ADLS, but *subsequent 
jobs that use FileSink, which previously failed, now work and create files 
successfully* (until the cluster restarts). The logs from the job with the 
deprecated method are also attached here as the [^success.log] file.

I suspect it is somehow connected to how the Azure file system is initialized, 
and that the new FileSink path initializes it incorrectly.
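A minimal sketch of the workaround described above (same placeholder path as the failing job; the `output-legacy` suffix is illustrative):
{code:java}
// Running the deprecated writeAsText path once appears to initialize the
// Azure file system so that subsequent FileSink jobs succeed.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("a", "b", "c")
    .writeAsText("abfss://t...@stads2dev01.dfs.core.windows.net/output-legacy");
env.execute("Workaround");
{code}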



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33976) AdaptiveScheduler cooldown period is taken from a wrong configuration

2024-01-03 Thread Jira
David Morávek created FLINK-33976:
-

 Summary: AdaptiveScheduler cooldown period is taken from a wrong 
configuration
 Key: FLINK-33976
 URL: https://issues.apache.org/jira/browse/FLINK-33976
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration, Runtime / Coordination
Reporter: David Morávek


The new JobManager options introduced in FLINK-21883, 
`scaling-interval.{min,max}` of the AdaptiveScheduler, are resolved from the 
per-job configuration instead of the JobManager's configuration, which is not 
correct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-16260) Add Java 11 based version of the docker images

2020-02-24 Thread Jira
Ismaël Mejía created FLINK-16260:


 Summary: Add Java 11 based version of the docker images
 Key: FLINK-16260
 URL: https://issues.apache.org/jira/browse/FLINK-16260
 Project: Flink
  Issue Type: New Feature
  Components: Release System / Docker
Reporter: Ismaël Mejía


Since 1.10.0 supports Java 11, we can add a version of the docker image based 
on Java 11.

This feature was [requested in our old issue 
tracker|https://github.com/docker-flink/docker-flink/issues/97] and moved here.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16262) Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory

2020-02-24 Thread Jira
Jürgen Kreileder created FLINK-16262:


 Summary: Class loader problem with 
FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory
 Key: FLINK-16262
 URL: https://issues.apache.org/jira/browse/FLINK-16262
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.10.0
 Environment: openjdk:11-jre with a slightly modified Flink 1.10.0 
build (nothing changed regarding Kafka and/or class loading).
Reporter: Jürgen Kreileder


We're using Docker images modeled after 
[https://github.com/apache/flink/blob/master/flink-container/docker/Dockerfile] 
(using Java 11)

When I try to switch a Kafka producer from AT_LEAST_ONCE to EXACTLY_ONCE, the 
taskmanager startup fails with:
{code:java}
org.apache.kafka.common.config.ConfigException: Invalid value 
org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer 
could not be found.
 at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:718)
 at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
 at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
 at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
 at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
 at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:396)
 at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
 at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
 at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76)
 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
 at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
 at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown Source)
 at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
 at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
 at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
 at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
 at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
Source)
 at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
 at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
 at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown 
Source){code}
This looks like a class loading issue: if I copy our JAR to FLINK_LIB_DIR instead 
of FLINK_USR_LIB_DIR, everything works fine.

(AT_LEAST_ONCE producers work fine with the JAR in FLINK_USR_LIB_DIR)

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16277) StreamTableEnvironment.toAppendStream fails with Decimal types

2020-02-25 Thread Jira
Benoît Paris created FLINK-16277:


 Summary: StreamTableEnvironment.toAppendStream fails with Decimal 
types
 Key: FLINK-16277
 URL: https://issues.apache.org/jira/browse/FLINK-16277
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Legacy Planner, Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Benoît Paris
 Attachments: flink-test-schema-update.zip

The following fails when there is a Decimal type in the underlying TableSource:

 
{code:java}
DataStream<Row> appendStream = tEnv.toAppendStream(
  asTable,
  asTable.getSchema().toRowType()
);{code}
Yielding the following error:
{noformat}
ValidationException: Type ROW<`y` DECIMAL(38, 18)> of table field 'payload'
does not match with the physical type ROW<`y` LEGACY('DECIMAL', 'DECIMAL')> of
the 'payload' field of the TableSource return type{noformat}

Remarks:

* toAppendStream is not ready for the new type system and does not accept the 
new DataTypes.
* The LegacyTypeInformationType transition type hinders things. Replacing it 
with the new DataTypes.DECIMAL type makes things work.

Workaround: reprocess the output of TypeConversions.fromLegacyInfoToDataType to 
replace LegacyTypeInformationType types of DECIMAL type root with the new 
types, as sketched below.
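A minimal sketch of that workaround, assuming a single flat DECIMAL field (a nested ROW field like 'payload' above would need the replacement applied recursively; precision and scale here just mirror the error message):
{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

public class LegacyDecimalFix {
    // Swap the LEGACY('DECIMAL', ...) placeholder for a concrete DECIMAL type.
    static DataType fixLegacyDecimal(DataType type) {
        if (type.getLogicalType() instanceof LegacyTypeInformationType
                && type.getLogicalType().getTypeRoot() == LogicalTypeRoot.DECIMAL) {
            return DataTypes.DECIMAL(38, 18); // precision/scale from the error above
        }
        return type;
    }
}
{code}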

 

Included is reproduction and workaround (activated by line 127) code, with java 
+ pom + stacktrace files.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16447) Non serializable field on CompressWriterFactory

2020-03-05 Thread Jira
João Boto created FLINK-16447:
-

 Summary: Non serializable field on CompressWriterFactory
 Key: FLINK-16447
 URL: https://issues.apache.org/jira/browse/FLINK-16447
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.10.0
Reporter: João Boto


CompressWriterFactory has a CompressionCodec field that is not serializable.

This makes the StreamingFileSink fail with a non-serializable field error.

Extending the codec and implementing Serializable solves the problem, but it is odd.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16544) Flink FileSystem for web.uploadDir

2020-03-11 Thread Jira
Angel Barragán created FLINK-16544:
--

 Summary: Flink FileSystem for web.uploadDir
 Key: FLINK-16544
 URL: https://issues.apache.org/jira/browse/FLINK-16544
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Affects Versions: 1.10.0
Reporter: Angel Barragán


Currently the configuration property "web.upload.dir" only supports paths on 
the local filesystem. When we deploy Flink in another cluster environment like 
YARN, it is more useful to be able to configure this directory to be on HDFS, 
so sizing and maintenance tasks are easier than trying to find out on which 
node YARN has launched the JobManager and managing the upload directory there.

In my concrete case, I ran into this disadvantage when creating an AWS EMR 
cluster with Flink, where the default configuration places this directory under 
/tmp on the local filesystem of the CORE node where the JobManager is deployed 
by YARN. We found that the EMR cluster is also configured to fully empty /tmp 
on a monthly basis, removing the upload directory for Flink and in that case 
making Flink fail when you try to submit a new job. We had to recreate the 
directory manually.

The first solution I tried was to change the above configuration property to 
use HDFS, like we did with the configuration property "state.checkpoints.dir", 
but we found it doesn't work in a YARN environment. So I checked the Flink code 
to see how this configuration is used and found that it always resolves to the 
local file system.

I think this would be an improvement to the manageability of Flink when running 
in a cluster environment where we can use shared storage like HDFS or S3.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16580) flink-connector-kafka deserializer

2020-03-12 Thread Jira
李开青 created FLINK-16580:
---

 Summary: flink-connector-kafka deserializer
 Key: FLINK-16580
 URL: https://issues.apache.org/jira/browse/FLINK-16580
 Project: Flink
  Issue Type: Wish
  Components: Connectors / Kafka
Affects Versions: 1.10.0
Reporter: 李开青


FlinkKafkaConsumer.setDeserializer(Properties props)

Why is ByteArrayDeserializer.class mandatory?

I found that the Flink SQL configuration "connector.properties.key.deserializer" 
has no effect.

 

{code:java}
private static void setDeserializer(Properties props) {
 final String deSerName = ByteArrayDeserializer.class.getName();

 Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
 Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);

 if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
  LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
 }
 if (valDeSer != null && !valDeSer.equals(deSerName)) {
  LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
 }

 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
}
{code}
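For what it's worth, in the Flink Kafka connector record deserialization happens on the Flink side via a DeserializationSchema, which is presumably why the Kafka-level deserializers are pinned to ByteArrayDeserializer. A minimal sketch of plugging in custom deserialization on the Flink side (topic name and properties are illustrative):
{code:java}
// The DeserializationSchema, not a Kafka property, controls how records
// are decoded; the consumer always receives raw byte arrays from Kafka.
FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
{code}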



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16596) Support Enum-Values as part of a Key

2020-03-14 Thread Jira
Felix Wollschläger created FLINK-16596:
--

 Summary: Support Enum-Values as part of a Key
 Key: FLINK-16596
 URL: https://issues.apache.org/jira/browse/FLINK-16596
 Project: Flink
  Issue Type: Improvement
Reporter: Felix Wollschläger


See: FLINK-11774

h2. Description:
The hashCode implementation of Enum values is guaranteed to be stable inside a 
JVM, but not across multiple JVMs. This leads to failures when restoring from 
a checkpoint/savepoint containing keyed state on keys with an enum as part of 
the key.

For users of Flink there is a workaround to solve this problem:
Don't rely on the hashCode implementation of enum values in the 
hashCode implementation of the actual key. Use ordinal() or 
name().hashCode() instead of the enum hashCode, as in the sketch below.
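A minimal sketch of such a key (class and field names are illustrative):
{code:java}
public class MyKey {
    enum Color { RED, GREEN }

    private final long id;
    private final Color color;

    MyKey(long id, Color color) { this.id = id; this.color = color; }

    @Override
    public int hashCode() {
        // use ordinal() (or name().hashCode()) instead of the
        // JVM-dependent Enum.hashCode()
        return 31 * Long.hashCode(id) + color.ordinal();
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MyKey)) {
            return false;
        }
        MyKey other = (MyKey) o;
        return id == other.id && color == other.color;
    }
}
{code}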

h2. Goals of this improvement:
Implement a way to handle enum values, either by handling them at an 
internal level or by implementing an abstract base KeySelector class that users 
of Flink can choose if their key contains an enum value.

Code to reproduce a failure and first thoughts can be found in FLINK-11774.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16651) flink stream SQL INNER JOIN produces out-of-order results

2020-03-18 Thread Jira
王太阳 created FLINK-16651:
---

 Summary: flink stream SQL INNER JOIN produces out-of-order results
 Key: FLINK-16651
 URL: https://issues.apache.org/jira/browse/FLINK-16651
 Project: Flink
  Issue Type: Bug
Reporter: 王太阳


Flink 1.9.1: when reading two Kafka topics (each with a single partition), registering them as two tables, and performing an INNER JOIN, the output is out of order.

topic: test
{code:java}
a,b,c
a,b,c1
a,b,c2{code}
 

topic: test2
 
{code:java}
a,b2,c2
a,b3,c3
a,b4,c4
a,b5,c5
a,b6,c6{code}
 
FLINK SQL: 
{code:java}
select *  from test t inner join test2 t2 on t.a=t2.a
{code}
First run output: 
{code:java}
1> (true,a,b,c,2020-03-18T09:40:11.858,a,b2,c2,2020-03-18T09:40:11.858)
1> (true,a,b,c2,2020-03-18T09:40:11.862,a,b2,c2,2020-03-18T09:40:11.862)
1> (true,a,b,c1,2020-03-18T09:40:11.862,a,b2,c2,2020-03-18T09:40:11.862)
1> (true,a,b,c,2020-03-18T09:40:11.862,a,b3,c3,2020-03-18T09:40:11.862)
1> (true,a,b,c2,2020-03-18T09:40:11.862,a,b3,c3,2020-03-18T09:40:11.862)
1> (true,a,b,c1,2020-03-18T09:40:11.862,a,b3,c3,2020-03-18T09:40:11.862)
1> (true,a,b,c,2020-03-18T09:40:11.863,a,b4,c4,2020-03-18T09:40:11.863)
1> (true,a,b,c2,2020-03-18T09:40:11.863,a,b4,c4,2020-03-18T09:40:11.863)
1> (true,a,b,c1,2020-03-18T09:40:11.863,a,b4,c4,2020-03-18T09:40:11.863)
1> (true,a,b,c,2020-03-18T09:40:11.863,a,b5,c5,2020-03-18T09:40:11.863)
1> (true,a,b,c2,2020-03-18T09:40:11.863,a,b5,c5,2020-03-18T09:40:11.863)
1> (true,a,b,c1,2020-03-18T09:40:11.863,a,b5,c5,2020-03-18T09:40:11.863)
1> (true,a,b,c,2020-03-18T09:40:11.864,a,b6,c6,2020-03-18T09:40:11.864)
1> (true,a,b,c2,2020-03-18T09:40:11.864,a,b6,c6,2020-03-18T09:40:11.864)
1> (true,a,b,c1,2020-03-18T09:40:11.864,a,b6,c6,2020-03-18T09:40:11.864){code}
 
Second run output:
{code:java}
1> (true,a,b,c,2020-03-18T09:42:36.168,a,b2,c2,2020-03-18T09:42:36.169)
1> (true,a,b,c,2020-03-18T09:42:36.171,a,b4,c4,2020-03-18T09:42:36.171)
1> (true,a,b,c,2020-03-18T09:42:36.171,a,b3,c3,2020-03-18T09:42:36.171)
1> (true,a,b,c1,2020-03-18T09:42:36.171,a,b2,c2,2020-03-18T09:42:36.171)
1> (true,a,b,c1,2020-03-18T09:42:36.171,a,b4,c4,2020-03-18T09:42:36.171)
1> (true,a,b,c1,2020-03-18T09:42:36.171,a,b3,c3,2020-03-18T09:42:36.171)
1> (true,a,b,c2,2020-03-18T09:42:36.172,a,b2,c2,2020-03-18T09:42:36.172)
1> (true,a,b,c2,2020-03-18T09:42:36.172,a,b4,c4,2020-03-18T09:42:36.172)
1> (true,a,b,c2,2020-03-18T09:42:36.172,a,b3,c3,2020-03-18T09:42:36.172)
1> (true,a,b,c,2020-03-18T09:42:36.188,a,b5,c5,2020-03-18T09:42:36.188)
1> (true,a,b,c2,2020-03-18T09:42:36.188,a,b5,c5,2020-03-18T09:42:36.188)
1> (true,a,b,c1,2020-03-18T09:42:36.188,a,b5,c5,2020-03-18T09:42:36.188)
1> (true,a,b,c,2020-03-18T09:42:36.188,a,b6,c6,2020-03-18T09:42:36.188)
1> (true,a,b,c2,2020-03-18T09:42:36.188,a,b6,c6,2020-03-18T09:42:36.188)
1> (true,a,b,c1,2020-03-18T09:42:36.188,a,b6,c6,2020-03-18T09:42:36.188)
{code}


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16846) Add docker images with python

2020-03-28 Thread Jira
Ismaël Mejía created FLINK-16846:


 Summary: Add docker images with python
 Key: FLINK-16846
 URL: https://issues.apache.org/jira/browse/FLINK-16846
 Project: Flink
  Issue Type: Improvement
  Components: Release System / Docker
Reporter: Ismaël Mejía


We currently do not include Python in the docker images. This issue is to 
include it or to create derived Python-specific images.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16907) did not react to cancelling signal for 30 seconds, but is stuck in method:

2020-04-01 Thread Jira
辛羿彤 created FLINK-16907:
---

 Summary: did not react to cancelling signal for 30 seconds, but is 
stuck in method:
 Key: FLINK-16907
 URL: https://issues.apache.org/jira/browse/FLINK-16907
 Project: Flink
  Issue Type: Task
  Components: API / DataStream
Affects Versions: 1.9.1
 Environment: Hadoop version: 3.1.1.3.1.0.0-78
flink-1.9.1
hdp-3.1.0.0-78-kafka
Reporter: 辛羿彤


When ingesting Kafka data, the job gets stuck in the map method and becomes unresponsive.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17259) Have scala 2.12 support

2020-04-20 Thread Jira
João Boto created FLINK-17259:
-

 Summary: Have scala 2.12 support
 Key: FLINK-17259
 URL: https://issues.apache.org/jira/browse/FLINK-17259
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Affects Versions: 2.0.0
Reporter: João Boto


In statefun-flink, scala.binary.version is defined as 2.11.

This forces the use of Scala 2.11.

Should the default be 2.12? Or should there be an option to choose the Scala version?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17324) Move the image we use to generate the flink-docker image into flink-docker

2020-04-22 Thread Jira
Ismaël Mejía created FLINK-17324:


 Summary: Move the image we use to generate the flink-docker image 
into flink-docker
 Key: FLINK-17324
 URL: https://issues.apache.org/jira/browse/FLINK-17324
 Project: Flink
  Issue Type: Improvement
  Components: Release System / Docker
Reporter: Ismaël Mejía


Before the docker official image was repatriated into Apache Flink we used a 
docker image that contained the scripts to generate the release.

{{docker run --rm \}}
{{  --volume ~/projects/docker-flink:/build \}}
{{  plucas/docker-flink-build \}}
{{  /build/generate-stackbrew-library.sh > ~/projects/official-images/library/flink}}

Note that this docker image tool, 'plucas/docker-flink-build', is not part of 
upstream Flink, so we need to move it into some sort of tools section in 
the flink-docker repo or document an alternative to it.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14173) ANSI-style JOIN with Temporal Table Function fails

2019-09-23 Thread Jira
Benoît Paris created FLINK-14173:


 Summary: ANSI-style JOIN with Temporal Table Function fails
 Key: FLINK-14173
 URL: https://issues.apache.org/jira/browse/FLINK-14173
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Legacy Planner, Table SQL / Planner
Affects Versions: 1.9.0
 Environment: Java 1.8, Scala 2.11, Flink 1.9 (pom.xml file attached)
Reporter: Benoît Paris
 Attachments: flink-test-temporal-tables-1.9.zip

The planner fails to generate a plan for ANSI-style joins with Temporal Table 
Functions. The Blink planner throws a "Missing conversion is 
LogicalTableFunctionScan[convention: NONE -> LOGICAL]" message (and some very 
fancy graphviz stuff). The old planner fails with "This exception indicates that 
the query uses an unsupported SQL feature."

This fails:
{code:java}
 SELECT 
   o_amount * r_amount AS amount 
 FROM Orders 
 JOIN LATERAL TABLE (Rates(o_proctime)) 
   ON r_currency = o_currency {code}
This works:
{code:java}
 SELECT 
   o_amount * r_amount AS amount 
 FROM Orders 
, LATERAL TABLE (Rates(o_proctime)) 
 WHERE r_currency = o_currency{code}
Reproduction with the attached Java and pom.xml files. Also included: stack 
traces for both Blink and the old planner.

I think this is a regression. I remember using ANSI-style joins with a temporal 
table function successfully in 1.8.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14200) Temporal Table Function Joins do not work on Tables (only TableSources) on the query side

2019-09-24 Thread Jira
Benoît Paris created FLINK-14200:


 Summary: Temporal Table Function Joins do not work on Tables (only 
TableSources) on the query side
 Key: FLINK-14200
 URL: https://issues.apache.org/jira/browse/FLINK-14200
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.0
 Environment: Java 8, Scala 2.11, Flink 1.9
Reporter: Benoît Paris
 Attachments: temporal-table-function-query-side-as-not-table-source.zip

This only affects the Blink planner. The legacy planner works fine.

With Orders as a TableSource, and Orders2 as a Table with the same content:
{code:java}
tEnv.registerTableSource("Orders", new FooSource(new String[] {"o_currency", 
"o_amount", "o_proctime"}));
Table orders2 = tEnv.sqlQuery("SELECT * FROM Orders");
tEnv.registerTable("Orders2", orders2);{code}
This works (TableSource on the query side):
{code:java}
SELECT 
 o_amount * r_amount AS amount 
FROM Orders  
 , LATERAL TABLE (Rates(o_proctime)) 
WHERE r_currency = o_currency{code}
While this does not (Table on the query side):
{code:java}
SELECT 
 o_amount * r_amount AS amount 
FROM Orders2 
 , LATERAL TABLE (Rates(o_proctime)) 
WHERE r_currency = o_currency{code}
This throws an NPE in FlinkRelBuilder, called from 
LogicalCorrelateToJoinFromTemporalTableFunctionRule. Attached is Java code for 
reproduction, along with the full log and stack trace, and a pom.xml.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14297) Temporal Table Function Build Side does not accept a constant key

2019-09-30 Thread Jira
Benoît Paris created FLINK-14297:


 Summary: Temporal Table Function Build Side does not accept a 
constant key
 Key: FLINK-14297
 URL: https://issues.apache.org/jira/browse/FLINK-14297
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Legacy Planner, Table SQL / Planner
Affects Versions: 1.9.0
 Environment: Java 1.8, Scala 2.11, Flink 1.9 (pom.xml file attached)
Reporter: Benoît Paris
 Attachments: flink-test-temporal-constant-key-build-side.zip

When defining a table that will be used as the build side on a Temporal Table 
Function, a constant key will not be accepted:

In:
{code:java}
Table ratesHistory = tEnv.sqlQuery(sql);
TemporalTableFunction rates = 
ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
{code}
 

This crashes:

 
{code:java}
SELECT 
 'Eur1' AS r_currency,
 r_amount, 
 r_proctime 
FROM RatesHistory{code}
 

This makes a type verification in Calcite fail (RelOptUtil.verifyTypeEquivalence) 
when trying to join the lateral table function. It seems like this is a corner 
case in nullability; the error is: 

 
{code:java}
(Blink) 
Apply rule [LogicalCorrelateToJoinFromTemporalTableFunctionRule] [...]
(old planner) 
Apply rule [LogicalCorrelateToTemporalTableJoinRule] [...]
Exception in thread "main" java.lang.AssertionError: Cannot add expression of 
different type to set:
set type is RecordType(
  [...] VARCHAR(65536) CHARACTER SET "UTF-16LE"  r_currency, 
[...]) NOT NULL
expression type is RecordType(
  [...] CHAR(4) CHARACTER SET "UTF-16LE" NOT NULL r_currency, 
[...]) NOT NULL{code}
 

(formatting and commenting mine)

No problem in VARCHAR vs CHAR, as using the following works:

 
{code:java}
SELECT 
 COALESCE('Eur1', r_currency) AS r_currency, 
 r_amount, 
 r_proctime 
FROM RatesHistory{code}
 

The problem comes from NULL vs NOT NULL.

Attached is Java reproduction code, pom.xml, and both blink and old planner 
logs and stacktraces.

My speculation on this is that an earlier transformation infers and normalizes 
the key type (or maybe gets it from the query side?), but the decorrelation and 
the special temporal table function case happen later.

Reordering the rules could help? Maybe way too heavy-handed.

Or do the 
[rexBuilder.makeInputRef|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala#L145]
 in a type-compatible way.

This seems to be related to another issue:

https://issues.apache.org/jira/browse/FLINK-14173

where careful support of the nullability of the build-side key in a LEFT 
JOIN will take part in the output.

This might seem like a useless use case, but a constant key is the only way in 
SQL to access a Temporal Table Function for a global value (like querying the 
global current number of users).

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14617) Dataset Parquet ClassCastException for SpecificRecord

2019-11-05 Thread Jira
Dominik Wosiński created FLINK-14617:


 Summary: Dataset Parquet ClassCastException for SpecificRecord
 Key: FLINK-14617
 URL: https://issues.apache.org/jira/browse/FLINK-14617
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.8.0
 Environment: 
Reporter: Dominik Wosiński


The following code runs smoothly when the _executionEnvironment_ is an instance of 
_StreamExecutionEnvironment_:
{code:java}
val job = Job.getInstance()
AvroReadSupport.setAvroDataSupplier(job.getConfiguration, 
classOf[AvroDataSupplierWithTimestampConversion])
val avroParquetInputFormat = new AvroParquetInputFormat[GpsPointDTO]()
val hadoopInputFormat = new HadoopInputFormat[Void, 
GpsPointDTO](avroParquetInputFormat, classOf[Void], classOf[GpsPointDTO], job)
FileInputFormat.addInputPaths(job, filePaths.head)
executionEnvironment.createInput(hadoopInputFormat).map(_._2).print(){code}
But when an _ExecutionEnvironment_ is used instead of a 
_StreamExecutionEnvironment_, the code throws: 
{code:java}
Caused by: java.lang.ClassCastException: class 
org.apache.avro.generic.GenericData$Record cannot be cast to class 
com.company.GpsPointDTO (org.apache.avro.generic.GenericData$Record and 
com.company.GpsPointDTO are in unnamed module of loader 'app'){code}
I don't think this is the expected behavior. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14954) Consider providing an official OpenAPI specification of REST Monitoring API

2019-11-26 Thread Jira
Michaël Melchiore created FLINK-14954:
-

 Summary: Consider providing an official OpenAPI specification of 
REST Monitoring API
 Key: FLINK-14954
 URL: https://issues.apache.org/jira/browse/FLINK-14954
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Reporter: Michaël Melchiore


Hello,

Flink provides a very helpful REST Monitoring API.

OpenAPI is a convenient standard for generating clients in a variety of languages 
for REST APIs documented according to its specification. In this case, clients 
would be helpful to automate the management of Flink clusters.

Currently, there is no "official" OpenAPI specification of the Flink REST 
Monitoring API. [Some|https://github.com/nextbreakpoint/flink-client] have 
been written by users, but their consistency across Flink releases is uncertain.

I think it would be beneficial to have an OpenAPI specification provided and 
maintained by the Flink project.

 

Kind regards,

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15016) Remove unused dependency

2019-12-02 Thread Jira
César Soto Valero created FLINK-15016:
-

 Summary: Remove unused dependency
 Key: FLINK-15016
 URL: https://issues.apache.org/jira/browse/FLINK-15016
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: César Soto Valero


Dependency *commons-io:commons-io* is declared in module *flink-core*. However, 
this dependency is not used and, therefore, should be removed to make the pom 
clearer.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15357) schema created by JsonRowSchemaConverter are not suitable for TableEnv.sqlQuery table schema

2019-12-21 Thread Jira
巫旭阳 created FLINK-15357:
---

 Summary: schema created by JsonRowSchemaConverter are not suitable 
for TableEnv.sqlQuery table schema 
 Key: FLINK-15357
 URL: https://issues.apache.org/jira/browse/FLINK-15357
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common, Table SQL / API
Affects Versions: 1.9.1
 Environment: You can reproduce the bug with the following code:
{code:java}
String sql = "SELECT count(*) as cnt, age, TUMBLE_START(rowtime, INTERVAL '10' SECOND) as tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), age";
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tenv = StreamTableEnvironment.create(senv);
DataStream<User> source = senv.addSource(new SourceFunction<User>() {
    @Override
    public void run(SourceContext<User> sourceContext) throws Exception {
        int i = 1000;
        String[] names = {"Hanmeimei", "Lilei"};
        while (i > 1) {
            sourceContext.collect(new User(names[i % 2], i, new Timestamp(System.currentTimeMillis())));
            Thread.sleep(10);
            i--;
        }
    }

    @Override
    public void cancel() {
    }
});
tenv.registerDataStream("abc", source, "name, age, timestamp, rowtime.rowtime");
Table table = tenv.sqlQuery(sql);
List<Host> hosts = Arrays.asList(new Host("10.20.128.210", 19201, "http"));
TypeInformation<Row> typeInformation = JsonRowSchemaConverter.convert("{" +
    " type:'object'," +
    " properties:{" +
    "   cnt: { type: 'number' }," +
    "   tumTime: { type:'string', format:'date-time' }" +
    " }" +
    "}");
RowTypeInfo typeInfo = (RowTypeInfo) typeInformation;
TypeInformation<?>[] typeInformations = typeInfo.getFieldTypes();
String[] fieldNames = typeInfo.getFieldNames();
TableSchema.Builder builder = TableSchema.builder();
for (int i = 0; i < typeInformations.length; i++) {
    builder.field(fieldNames[i], typeInformations[i]);
}
Elasticsearch6UpsertTableSink establesink = new Elasticsearch6UpsertTableSink(
    true,
    builder.build(),
    hosts,
    "aggregation",
    "data",
    "$",
    "n/a",
    new JsonRowSerializationSchema.Builder(typeInformation).build(),
    XContentType.JSON,
    new IgnoringFailureHandler(),
    new HashMap<>()
);
tenv.registerTableSink("aggregationTableSink", establesink);
table.insertInto("aggregationTableSink");
{code}
{code:java}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class User {
    private String name;
    private Integer age;
    private Timestamp timestamp;
}
{code}
Reporter: 巫旭阳
 Fix For: 1.9.2, 1.10.0


JsonRowSchemaConverter.convert(jsonString) creates a schema whose TypeInfo only 
supports the BigDecimal DataType for the JSON 'number' type, but the Table 
created by TableEnvironmentImpl.sqlQuery(sqlString) may contain other number 
DataTypes such as Long or Integer.
When the program runs, it will throw an exception like the one below:
{noformat}
Field types of query result and registered TableSink [XXX] do not match.
Query result schema: [cnt: Long, tumTime: Timestamp]
TableSink schema: [cnt: BigDecimal, tumTime: Timestamp]{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15438) Counter metrics are incorrectly reported as total counts to DataDog

2019-12-30 Thread Jira
Jörn Kottmann created FLINK-15438:
-

 Summary: Counter metrics are incorrectly reported as total counts 
to DataDog
 Key: FLINK-15438
 URL: https://issues.apache.org/jira/browse/FLINK-15438
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.9.1
Reporter: Jörn Kottmann


The Flink semantics of a counter do not match the counters in DataDog.

In Flink, a counter counts the total of increment and decrement calls.
In DataDog, a counter is a rate over the reporting interval. 

The Flink implementation of the DataDog reporter seems to send the total Flink 
counter value each time the metrics are reported. Correct would be to send the 
delta of the counter since the last report, as in the sketch below.
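A minimal sketch of that delta computation (class and method names are illustrative, not the reporter's actual API):
{code:java}
import java.util.HashMap;
import java.util.Map;

public class DeltaTracker {
    private final Map<String, Long> lastReported = new HashMap<>();

    // Returns the increase since the previous report for the given counter
    // and remembers the current value for the next report.
    public long deltaFor(String counterId, long currentCount) {
        long previous = lastReported.getOrDefault(counterId, 0L);
        lastReported.put(counterId, currentCount);
        return currentCount - previous;
    }
}
{code}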

There are some features in DataDog which are easier to use if this could be 
fixed, e.g. alerts based on counters.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15584) Give nested data type of ROWs in ValidationException

2020-01-14 Thread Jira
Benoît Paris created FLINK-15584:


 Summary: Give nested data type of ROWs in ValidationException
 Key: FLINK-15584
 URL: https://issues.apache.org/jira/browse/FLINK-15584
 Project: Flink
  Issue Type: Improvement
Reporter: Benoît Paris


In
{code:java}
INSERT INTO baz_sink
SELECT
  a,
  ROW(b, c)
FROM foo_source{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15634) disableAutoGeneratedUIDs fails with coGroup and join

2020-01-17 Thread Jira
Jürgen Kreileder created FLINK-15634:


 Summary: disableAutoGeneratedUIDs fails with coGroup and join
 Key: FLINK-15634
 URL: https://issues.apache.org/jira/browse/FLINK-15634
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.10.0
Reporter: Jürgen Kreileder


coGroup/join seems to generate two Map operators for which you can't set the 
UID. 

Here's a test case:
{code:java}
@Test
public void testDisablingAutoUidsWorksWithCoGroup() throws Exception {
   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
   env.getConfig().disableAutoGeneratedUIDs();

   env
  .addSource(new 
NoOpSourceFunction()).setUidHash("")
  .coGroup(env.addSource(new 
NoOpSourceFunction()).setUidHash(""))
  .where(o -> o).equalTo(o -> o)
  .window(TumblingEventTimeWindows.of(Time.days(1)))
  .with(new CoGroupFunction() {
 @Override
 public void coGroup(Iterable first, Iterable second, 
Collector out) throws Exception {
 }
  }).setUidHash("")
  .addSink(new 
DiscardingSink<>()).setUidHash("");

   env.execute();
}
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15726) Fixing error message in StreamExecTableSourceScan

2020-01-21 Thread Jira
Benoît Paris created FLINK-15726:


 Summary: Fixing error message in StreamExecTableSourceScan
 Key: FLINK-15726
 URL: https://issues.apache.org/jira/browse/FLINK-15726
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.10.0, 1.11.0
Reporter: Benoît Paris


The error message in translateToPlanInternal does not provide good information.

[A hotfix|https://github.com/apache/flink/commit/02b676e9169b9879d406e79c8cbe4fcf6b33afa1#diff-ed386bd5b2f8bc873a24413ff1d82562]
 in the legacy planner corrected it but was not applied to the Blink planner.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15775) SourceFunctions are instantiated twice when pulled on from 2 Sinks

2020-01-27 Thread Jira
Benoît Paris created FLINK-15775:


 Summary: SourceFunctions are instantiated twice when pulled on 
from 2 Sinks
 Key: FLINK-15775
 URL: https://issues.apache.org/jira/browse/FLINK-15775
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.1, 1.10.0
Reporter: Benoît Paris
 Attachments: flink-test-duplicated-sources.zip

When pulled on by two sinks, the SourceFunctions of a TableSource get 
instantiated twice (and subsequently opened once per parallel instance, which 
is expected behavior):

The following will instantiate the FooTableSource's SourceFunction once (OK 
behavior, but not the processing we want):

 
{code:java}
tEnv.registerTableSource("foo_table", new FooTableSource());
Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
tEnv.registerTableSink("syso_sink_0", new SysoSink());
out0.insertInto("syso_sink_0");
{code}
 

This will instantiate the FooTableSource's SourceFunction twice (Not OK, as 
we're missing half the inputs in each SysoSink):

 
{code:java}
tEnv.registerTableSource("foo_table", new FooTableSource());
Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
Table out1 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 1");
tEnv.registerTableSink("syso_sink_0", new SysoSink());
tEnv.registerTableSink("syso_sink_1", new SysoSink());
out0.insertInto("syso_sink_0");
out1.insertInto("syso_sink_1"); 
{code}
 

This might not be a problem for Kafka's SourceFunctions, as we can always 
reread from a log; but it is a data loss problem when the source data can't be 
reproduced.

Actually, this might be me not understanding the API. Is there a way to make 
the runtime read from the same opened SourceFunctions?

Attached is Java code that logs the faulty opening of the SourceFunctions, 
pom.xml, and logical execution plans for the duplicated case, and the 
workaround.

 

Workaround: convert to an append stream. Somehow this makes the 
planner think it has to put a materialization barrier after the source and read 
from that:

 
{code:java}
tEnv.registerTableSource("foo_table_source", new FooTableSource());
Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
Table appendingSourceTable = tEnv.fromDataStream(
 tEnv.toAppendStream(sourceTable, Types.ROW(new String[]{"field_1"}, new 
TypeInformation[]{Types.LONG()}))
);
tEnv.registerTable("foo_table", appendingSourceTable);{code}
 

 

Best Regards,

Ben



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15808) Upgrade Parquet to version 1.11.0

2020-01-29 Thread Jira
Ismaël Mejía created FLINK-15808:


 Summary: Upgrade Parquet to version 1.11.0
 Key: FLINK-15808
 URL: https://issues.apache.org/jira/browse/FLINK-15808
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Ismaël Mejía






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15841) TimeWindow.intersects returns true for consecutive windows

2020-02-01 Thread Jira
Jörn Kottmann created FLINK-15841:
-

 Summary: TimeWindow.intersects returns true for consecutive windows
 Key: FLINK-15841
 URL: https://issues.apache.org/jira/browse/FLINK-15841
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Reporter: Jörn Kottmann


The TimeWindow JavaDoc explains that the start index is inclusive and the end 
index is exclusive; therefore two windows T0 to T1 and T1 to T2 are next to 
each other without overlapping.

To fix this, the intersects comparison should be changed to: {{this.start < 
other.end && this.end > other.start}}

Also, a test should be added to verify the method works correctly; a sketch of 
the corrected check follows.
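A minimal standalone sketch of the corrected check (window bounds passed in for illustration):
{code:java}
public final class TimeWindowCheck {
    // start inclusive, end exclusive: strict inequalities keep
    // consecutive windows from "intersecting"
    static boolean intersects(long start, long end, long otherStart, long otherEnd) {
        return start < otherEnd && end > otherStart;
    }

    public static void main(String[] args) {
        System.out.println(intersects(0, 10, 10, 20)); // false: consecutive windows
        System.out.println(intersects(0, 10, 9, 20));  // true: overlapping windows
    }
}
{code}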



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15844) Removal of JobWithJars.buildUserCodeClassLoader method without Configuration breaks backwards compatibility

2020-02-02 Thread Jira
Ismaël Mejía created FLINK-15844:


 Summary: Removal of JobWithJars.buildUserCodeClassLoader method 
without Configuration breaks backwards compatibility
 Key: FLINK-15844
 URL: https://issues.apache.org/jira/browse/FLINK-15844
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.9.2
Reporter: Ismaël Mejía


The removal of the `JobWithJars.buildUserCodeClassLoader` method variant 
without a Configuration argument is not backwards compatible with previous 
Flink 1.9.x versions.

I was trying to upgrade Apache Beam to the just-released version and it broke, 
so I ran a dependency analysis and found this:
https://output.jsbin.com/zudemis/3#Source_Removed



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15867) LAST_VALUE aggregate function does not support time-related types

2020-02-03 Thread Jira
Benoît Paris created FLINK-15867:


 Summary: LAST_VALUE aggregate function does not support 
time-related types
 Key: FLINK-15867
 URL: https://issues.apache.org/jira/browse/FLINK-15867
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.2, 1.10.0
Reporter: Benoît Paris
 Attachments: flink-test-lastvalue-timestamp.zip

The following fails:
{code:java}
LAST_VALUE(TIMESTAMP '2020-02-03 16:17:20')
LAST_VALUE(DATE '2020-02-03')
LAST_VALUE(TIME '16:17:20')
LAST_VALUE(NOW()){code}
But this works:

 
{code:java}
LAST_VALUE(UNIX_TIMESTAMP()) 
{code}
Leading me to say it might be more a type/format issue, rather than an actual 
time processing issue.

Attached is java + pom + full stacktrace, for reproduction. Stacktrace part is 
below.

 

The ByteLastValueAggFunction etc. types seem trivial to implement, but in 
createLastValueAggFunction only basic types seem to be dealt with. Is there 
a reason the more complicated LogicalTypeRoots might not be implemented? (old vs 
new types?)

 

 

{noformat}
Caused by: org.apache.flink.table.api.TableException: LAST_VALUE aggregate function does not support type: ''TIMESTAMP_WITHOUT_TIME_ZONE''.
Please re-check the data type.
 at org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createLastValueAggFunction(AggFunctionFactory.scala:617)
 at org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:113)
 at org.apache.flink.table.planner.plan.utils.AggregateUtil$$anonfun$9.apply(AggregateUtil.scala:285)
 at org.apache.flink.table.planner.plan.utils.AggregateUtil$$anonfun$9.apply(AggregateUtil.scala:279)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:279)
 at org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:228)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.<init>(StreamExecGroupAggregate.scala:72)
 at org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecGroupAggregateRule.convert(StreamExecGroupAggregateRule.scala:68)
 at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:139)
 at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:208)
 at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
 at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:328)
 at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
{noformat}


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-9557) FlinkTypeFactory should support BigInteger type

2018-06-08 Thread JIRA
Dominik Wosiński created FLINK-9557:
---

 Summary: FlinkTypeFactory should support BigInteger type
 Key: FLINK-9557
 URL: https://issues.apache.org/jira/browse/FLINK-9557
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.5.0
Reporter: Dominik Wosiński


Currently, the `FlinkTypeFactory` method `typeInfoToSqlTypeName` does not support 
BigInteger. Since this is the default type returned by `JsonSchemaConverter` for 
all fields with type `number`, this can create issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9570) SQL Client merging environments uses AbstractMap

2018-06-12 Thread JIRA
Dominik Wosiński created FLINK-9570:
---

 Summary: SQL Client merging environments uses AbstractMap
 Key: FLINK-9570
 URL: https://issues.apache.org/jira/browse/FLINK-9570
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.5.0
Reporter: Dominik Wosiński


Currently the _Environment.merge()_ function looks like this: 

 
{code:java}
final Environment mergedEnv = new Environment();

// merge tables
final Map tables = new HashMap<>(env1.getTables());
mergedEnv.getTables().putAll(env2.getTables());
mergedEnv.tables = tables;

{code}
and the no-arg constructor for _Environment_ defaults tables to 
_Collections.emptyMap()._
This basically results in calling _putAll_ on an _EmptyMap_, which falls back to 
_AbstractMap_ and always throws _UnsupportedOperationException._
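A minimal sketch of a possible fix (value type elided, as in the snippet above): merge into the fresh HashMap before assigning it, so _putAll_ never runs on the immutable default map.
{code:java}
final Environment mergedEnv = new Environment();

// merge tables into the mutable copy, not into the immutable default map
final Map tables = new HashMap<>(env1.getTables());
tables.putAll(env2.getTables());
mergedEnv.tables = tables;
{code}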

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9575) Potential race condition when removing JobGraph in HA

2018-06-13 Thread JIRA
Dominik Wosiński created FLINK-9575:
---

 Summary: Potential race condition when removing JobGraph in HA
 Key: FLINK-9575
 URL: https://issues.apache.org/jira/browse/FLINK-9575
 Project: Flink
  Issue Type: Bug
Reporter: Dominik Wosiński


When we are removing the _JobGraph_ from the _JobManager_, for example after 
invoking _cancel()_, the following code is executed: 
{noformat}
val futureOption = currentJobs.get(jobID) match {
  case Some((eg, _)) =>
    val result = if (removeJobFromStateBackend) {
      val futureOption = Some(future {
        try {
          // ...otherwise, we can have lingering resources when there is a concurrent shutdown
          // and the ZooKeeper client is closed. Not removing the job immediately allow the
          // shutdown to release all resources.
          submittedJobGraphs.removeJobGraph(jobID)
        } catch {
          case t: Throwable =>
            log.warn(s"Could not remove submitted job graph $jobID.", t)
        }
      }(context.dispatcher))

      try {
        archive ! decorateMessage(
          ArchiveExecutionGraph(jobID, ArchivedExecutionGraph.createFrom(eg)))
      } catch {
        case t: Throwable =>
          log.warn(s"Could not archive the execution graph $eg.", t)
      }

      futureOption
    } else {
      None
    }

    currentJobs.remove(jobID)
    result

  case None => None
}

// remove all job-related BLOBs from local and HA store
libraryCacheManager.unregisterJob(jobID)
blobServer.cleanupJob(jobID, removeJobFromStateBackend)
jobManagerMetricGroup.removeJob(jobID)

futureOption
{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9615) Add

2018-06-19 Thread JIRA
Dominik Wosiński created FLINK-9615:
---

 Summary: Add
 Key: FLINK-9615
 URL: https://issues.apache.org/jira/browse/FLINK-9615
 Project: Flink
  Issue Type: Improvement
Reporter: Dominik Wosiński


AFAIK, there is currently no possibility to use Kafka or other connectors as a 
sink in the SQL Client. Such a feature would be good for prototyping or quick 
stream manipulation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9627) Extending

2018-06-20 Thread JIRA
Dominik Wosiński created FLINK-9627:
---

 Summary: Extending 
 Key: FLINK-9627
 URL: https://issues.apache.org/jira/browse/FLINK-9627
 Project: Flink
  Issue Type: Bug
Reporter: Dominik Wosiński






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9772) Documentation of Hadoop API outdated

2018-07-05 Thread JIRA
Lorenz Bühmann created FLINK-9772:
-

 Summary: Documentation of Hadoop API outdated
 Key: FLINK-9772
 URL: https://issues.apache.org/jira/browse/FLINK-9772
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.5.0
Reporter: Lorenz Bühmann


It looks like the documentation of the [Hadoop 
Compatibility|https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/hadoop_compatibility.html]
 is somewhat outdated. At least the text and examples in the section [Using Hadoop 
InputFormats|https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/hadoop_compatibility.html#using-hadoop-inputformats]
 mention the methods

{{env.readHadoopFile}} and {{env.createHadoopInput}}

which no longer exist since 1.4.0.
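For reference, a minimal sketch of the 1.4+ replacement via the flink-hadoop-compatibility module (format, types and path are illustrative):
{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

public class HadoopCompatExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // replaces the removed env.readHadoopFile(...)
        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
            HadoopInputs.readHadoopFile(
                new TextInputFormat(), LongWritable.class, Text.class,
                "hdfs:///path/to/input"));
        input.print();
    }
}
{code}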

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9778) Remove SlotRequest timeout

2018-07-08 Thread JIRA
陈梓立 created FLINK-9778:
--

 Summary: Remove SlotRequest timeout
 Key: FLINK-9778
 URL: https://issues.apache.org/jira/browse/FLINK-9778
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, ResourceManager, TaskManager
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.5.1


Currently, when the SlotPool (JobMaster) requests slots from the ResourceManager, 
it applies a timeout: if the RM does not respond within 5 minutes, the JM fails 
the request and re-requests it. This does little good and causes Flink to 
request resources less precisely.

I would propose removing this timeout mechanism, that is, a SlotRequest no 
longer times out. Our current failure-tolerance mechanism would handle 
SlotRequest exceptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9779) Remove SlotRequest timeout

2018-07-09 Thread JIRA
陈梓立 created FLINK-9779:
--

 Summary: Remove SlotRequest timeout
 Key: FLINK-9779
 URL: https://issues.apache.org/jira/browse/FLINK-9779
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, ResourceManager, TaskManager
Reporter: 陈梓立


As discussed in FLINK-8643 and FLINK-8653, we use an external timeout to 
replace the internal timeout of a slot request. The follow-up question: why not 
remove this timeout mechanism entirely? In our industrial case, this timeout 
mechanism causes unneeded failures and makes resource allocation inaccurate.

I would propose getting rid of the slot request timeout. Instead, we handle TM 
failures in the RM, where pending requests are properly cancelled, and if a TM 
cannot offer a slot to the JM, we introduce a blacklist mechanism to nudge the 
RM to reallocate the pending request.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9813) Build xTableSource from Avro schemas

2018-07-11 Thread JIRA
François Lacombe created FLINK-9813:
---

 Summary: Build xTableSource from Avro schemas
 Key: FLINK-9813
 URL: https://issues.apache.org/jira/browse/FLINK-9813
 Project: Flink
  Issue Type: Wish
  Components: Table API & SQL
Affects Versions: 1.5.0
Reporter: François Lacombe


As Avro provides an efficient data schema formalism, it would be great to be 
able to build Flink TableSources from such files.

More info about Avro schemas: 
[https://avro.apache.org/docs/1.8.1/spec.html#schemas]

For instance, with CsvTableSource:
{code:java}
Schema.Parser schemaParser = new Schema.Parser();
Schema tableSchema = schemaParser.parse("avro.json");
Builder bld = CsvTableSource.builder().schema(tableSchema);
{code}

 

This would give me a fully available CsvTableSource with the columns defined in 
avro.json.

It may be possible to do so for every TableSource, since the Avro format is 
really common and versatile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9814) CsvTableSource lack of column warning

2018-07-11 Thread JIRA
François Lacombe created FLINK-9814:
---

 Summary: CsvTableSource lack of column warning
 Key: FLINK-9814
 URL: https://issues.apache.org/jira/browse/FLINK-9814
 Project: Flink
  Issue Type: Wish
  Components: Table API & SQL
Affects Versions: 1.5.0
Reporter: François Lacombe


The CsvTableSource class is built by defining the expected columns to be found 
in the corresponding CSV file.

It would be great to throw an Exception when the CSV file doesn't have the same 
structure as defined in the source.

This can easily be checked with the file header, if it exists; see the sketch 
below.

Is this possible?
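A minimal sketch of such a header check (a hypothetical helper, not existing CsvTableSource API):
{code:java}
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;

public class CsvHeaderCheck {
    // Compares the first line of the CSV file against the declared columns.
    static void checkHeader(String csvPath, String[] expectedColumns) throws IOException {
        try (BufferedReader reader = new BufferedReader(new FileReader(csvPath))) {
            String header = reader.readLine();
            if (header == null || !Arrays.equals(header.split(","), expectedColumns)) {
                throw new IllegalArgumentException(
                    "CSV header does not match the declared columns: " + header);
            }
        }
    }
}
{code}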



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9859) Distinguish TM akka config with JM config

2018-07-16 Thread JIRA
陈梓立 created FLINK-9859:
--

 Summary: Distinguish TM akka config with JM config
 Key: FLINK-9859
 URL: https://issues.apache.org/jira/browse/FLINK-9859
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 1.5.1
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.5.2


... increase the number of Akka threads on the JM to improve its performance; 
decrease the number of Akka threads on the TM to save resources.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9869) Send PartitionInfo in batch to improve performance

2018-07-16 Thread JIRA
陈梓立 created FLINK-9869:
--

 Summary: Send PartitionInfo in batch to improve performance
 Key: FLINK-9869
 URL: https://issues.apache.org/jira/browse/FLINK-9869
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 1.5.1
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.5.2


... currently we send partition info as soon as one arrives. We could 
`cachePartitionInfo` and then `sendPartitionInfoAsync`, which will improve 
performance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9875) Add concurrent creation of execution job vertex

2018-07-17 Thread JIRA
陈梓立 created FLINK-9875:
--

 Summary: Add concurrent creation of execution job vertex
 Key: FLINK-9875
 URL: https://issues.apache.org/jira/browse/FLINK-9875
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 1.5.1
Reporter: 陈梓立
Assignee: 陈梓立


In some cases, such as input format vertices, the creation of an execution job
vertex is time consuming. This PR adds concurrent creation of execution job
vertices to accelerate it.
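
A minimal sketch of the concurrency pattern, with the actual ExecutionJobVertex
construction abstracted behind a function (the real constructor takes more
arguments than shown here):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ParallelVertexCreation {

    // Creates one result per job vertex, running the (expensive) creation in parallel.
    public static <V, R> List<R> createInParallel(
            List<V> jobVertices, Function<V, R> create, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (V vertex : jobVertices) {
                futures.add(pool.submit(() -> create.apply(vertex)));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                results.add(future.get()); // propagates any creation failure
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
{code}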



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9894) Potential Data Race

2018-07-19 Thread JIRA
陈梓立 created FLINK-9894:
--

 Summary: Potential Data Race
 Key: FLINK-9894
 URL: https://issues.apache.org/jira/browse/FLINK-9894
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 1.5.1
Reporter: 陈梓立
Assignee: 陈梓立


Potential data race in CoLocationGroup#ensureConstraints.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10038) Parallel the creation of InputSplit if necessary

2018-08-02 Thread JIRA
陈梓立 created FLINK-10038:
---

 Summary: Parallel the creation of InputSplit if necessary
 Key: FLINK-10038
 URL: https://issues.apache.org/jira/browse/FLINK-10038
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.5.0
Reporter: 陈梓立


As a continuation of the discussion in the PR about parallelizing the creation of
ExecutionJobVertex [here|https://github.com/apache/flink/pull/6353].

[~StephanEwen] suggested that we could also parallelize the creation of InputSplits,
from which we would gain performance improvements.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10056) Add testRequestNextInputSplit

2018-08-04 Thread JIRA
陈梓立 created FLINK-10056:
---

 Summary: Add testRequestNextInputSplit
 Key: FLINK-10056
 URL: https://issues.apache.org/jira/browse/FLINK-10056
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, Tests
Affects Versions: 1.5.0
Reporter: 陈梓立
Assignee: 陈梓立


Add testRequestNextInputSplit to make sure JobMaster#requestNextInputSplit
works as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10099) Rework YarnResourceManagerTest

2018-08-07 Thread JIRA
陈梓立 created FLINK-10099:
---

 Summary: Rework YarnResourceManagerTest
 Key: FLINK-10099
 URL: https://issues.apache.org/jira/browse/FLINK-10099
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.6.0
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.6.0


To avoid OOMs, and to gather the mock stuff in one place so it can be replaced once
we are able to.

Also restructure the test.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10256) Rework JobManagerFailsITCase and JobManagerTest into JobMasterITCase and JobMasterHAITCase

2018-08-29 Thread JIRA
陈梓立 created FLINK-10256:
---

 Summary: Rework JobManagerFailsITCase and JobManagerTest into 
JobMasterITCase and JobMasterHAITCase
 Key: FLINK-10256
 URL: https://issues.apache.org/jira/browse/FLINK-10256
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.7.0
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.7.0


I am planning to rework JobManagerFailsITCase and JobManagerTest into 
JobMasterITCase and JobMasterHAITCase. That is, reorganize the legacy tests, 
make them neat and cover cases explicitly. The PR would follow before this 
weekend.

While reworking, I'd like to add the JM failover test cases listed below, for the
future implementation of JM failover with the RECONCILING state. By "JM failover"
I mean a real-world failover (like power loss or process exit), without calling
Flink's internal postStop logic or anything like it.

1. Streaming task with jm failover.
2. Streaming task with jm failover concurrent to task fail.
3. Batch task with jm failover.
4. Batch task with jm failover concurrent to task fail.
5. Batch task with jm failover when some vertex has already been FINISHED.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10275) StreamTask support object reuse

2018-08-31 Thread JIRA
陈梓立 created FLINK-10275:
---

 Summary: StreamTask support object reuse
 Key: FLINK-10275
 URL: https://issues.apache.org/jira/browse/FLINK-10275
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.7.0
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.7.0


Make StreamTask support efficient object reuse. The purpose behind this is to reduce
pressure on the garbage collector.

All objects are reused, without backup copies. The operators and UDFs must be
careful not to keep any objects as state and not to modify the objects.
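
A minimal sketch of the pitfall, with a hypothetical MyPojo type: when object reuse
is enabled, the runtime may hand the same instance to the function repeatedly, so
buffering a reference without copying is unsafe.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class ReuseAwareCollector {

    private final List<MyPojo> buffered = new ArrayList<>();

    public void process(MyPojo reusedRecord) {
        // Wrong under object reuse: every buffered entry may end up pointing
        // at the same, later-mutated instance.
        // buffered.add(reusedRecord);

        // Safe: defensively copy before storing.
        buffered.add(new MyPojo(reusedRecord));
    }

    // Hypothetical record type with a copy constructor.
    static class MyPojo {
        long id;
        String name;

        MyPojo(MyPojo other) {
            this.id = other.id;
            this.name = other.name;
        }
    }
}
{code}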



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10302) Mark legacy(non-flip 6) code as Deprecated

2018-09-07 Thread JIRA
陈梓立 created FLINK-10302:
---

 Summary: Mark legacy(non-flip 6) code as Deprecated
 Key: FLINK-10302
 URL: https://issues.apache.org/jira/browse/FLINK-10302
 Project: Flink
  Issue Type: Improvement
Reporter: 陈梓立


There have been several times when I dashed into some classes/methods, found them
weird from the FLIP-6 perspective, and finally figured out that they are legacy code.

Currently we mix legacy code with FLIP-6 code in the same places (i.e. the same
packages); a new contributor might casually get lost in such code and end up
working in vain.

With [FLINK-4319] closed we announced that FLIP-6 is production ready, and
[~trohrm...@apache.org]'s comment on this
[commit|https://github.com/apache/flink/commit/ddd6a99a95b56c52ea5b5153b7270b578f5479bc#commitcomment-30330739]
shows that it is planned to remove the legacy code.

I'd prefer marking all legacy classes as Deprecated for now, so that contributors
can recognize them quickly and not ruin their work in vain.

What do you think? cc [~Zentol]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10304) Remove deprecated AbstractYarnClusterDescriptor field

2018-09-07 Thread JIRA
陈梓立 created FLINK-10304:
---

 Summary: Remove deprecated AbstractYarnClusterDescriptor field
 Key: FLINK-10304
 URL: https://issues.apache.org/jira/browse/FLINK-10304
 Project: Flink
  Issue Type: Improvement
  Components: Client, YARN
Affects Versions: 1.7.0
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.7.0


Based on [~trohrm...@apache.org]'s
[commit|https://github.com/apache/flink/commit/6356128865bff7463bf03185d18b129ed3633bc2],
 {{AbstractYarnClusterDescriptor}} should not care whether it is in DETACHED
mode.

After digging, I found the main usages of it are:

1. {{FlinkYarnSessionCli#run}}: this can be resolved by checking locally whether
{{allOptions}} has {{DETACHED_OPTION}} (see the sketch below).

2. When AbstractYarnClusterDescriptor starts an AM, it sets
{{appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));}}.
At this point it seems that YarnClusterDescriptor should know whether or not it
is in detached mode.

If usage 2 is irrelevant now, we can get rid of the deprecated method in the FLIP-6
codebase.
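
For usage 1, a sketch of the local check; it assumes the commons-cli
{{CommandLine}} ({{allOptions}}) that {{FlinkYarnSessionCli#run}} already has in
scope and the existing {{DETACHED_OPTION}}:

{code:java}
// Inside FlinkYarnSessionCli#run: decide detached mode locally instead of
// reading it back from the cluster descriptor.
boolean detached = allOptions.hasOption(DETACHED_OPTION.getOpt());
{code}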



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10315) Let JDBCAppendTableSink be built with java.sql.Connection

2018-09-10 Thread JIRA
François Lacombe created FLINK-10315:


 Summary: Let JDBCAppendTableSink be built with java.sql.Connection
 Key: FLINK-10315
 URL: https://issues.apache.org/jira/browse/FLINK-10315
 Project: Flink
  Issue Type: Improvement
  Components: Java API
 Environment: I'm currently using Flink 1.6.0 Java.
Reporter: François Lacombe


Currently, JDBCAppendTableSink is built with methods like setDBUrl,
setUsername, setPassword... and so on.
We can't use an existing Java SQL connection to build it.

It may be great to add a setConnection() method to the builder class, so as to
prevent sensitive data like usernames or passwords from transiting through large
stacks, from config connectors (often in main()) down to the JDBC sinks.

Being able to provide a single object is far lighter than 4 or 5 strings.
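
A minimal sketch of the wished-for API; {{setConnection}} is the hypothetical
addition, the rest follows the existing builder style:

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;

public class JdbcSinkWish {
    public static void main(String[] args) throws Exception {
        // The connection is configured once, close to where the credentials live.
        Connection connection = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/mydb", "user", "secret");

        // Hypothetical builder call: pass the ready connection instead of 4-5 strings.
        // JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
        //         .setConnection(connection)
        //         .setQuery("INSERT INTO my_table VALUES (?, ?)")
        //         .build();
    }
}
{code}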

 

Thanks in advance



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10319) Avoid requestPartitionState from JM but always try retrigger

2018-09-11 Thread JIRA
陈梓立 created FLINK-10319:
---

 Summary: Avoid requestPartitionState from JM but always try 
retrigger
 Key: FLINK-10319
 URL: https://issues.apache.org/jira/browse/FLINK-10319
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.7.0
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.7.0


Do not requestPartitionState from the JM on partition request failure, as this may
generate too many RPC requests and block the JM.

We gain little benefit from checking what state the producer is in, while on the
other hand we may crash the JM with too many RPC requests. A Task can always
retriggerPartitionRequest from its InputGate; the retrigger will fail if the
producer is gone and succeed if the producer is alive. Either way, there is no need
to ask the JM for help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10320) Introduce JobMaster schedule micro-benchmark

2018-09-11 Thread JIRA
陈梓立 created FLINK-10320:
---

 Summary: Introduce JobMaster schedule micro-benchmark
 Key: FLINK-10320
 URL: https://issues.apache.org/jira/browse/FLINK-10320
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: 陈梓立
Assignee: 陈梓立


Based on the {{org.apache.flink.streaming.runtime.io.benchmark}} stuff and the repo
[flink-benchmark|https://github.com/dataArtisans/flink-benchmarks], I propose to
introduce another micro-benchmark which focuses on {{JobMaster}} scheduling
performance.

h3. Target
Benchmark how long it takes from {{JobMaster}} startup (receiving the {{JobGraph}}
and initializing) until all tasks are RUNNING. Technically we use a bounded stream
and the TM finishes tasks as soon as they arrive, so the real interval we measure
is until all tasks are FINISHED.

h3. Case
1. JobGraph that cover EAGER + PIPELINED edges
2. JobGraph that cover LAZY_FROM_SOURCES + PIPELINED edges
3. JobGraph that cover LAZY_FROM_SOURCES + BLOCKING edges
PS: maybe also benchmark the case where the source reads from {{InputSplit}}s?

h3. Implement
Based on the flink-benchmark repo, we finally run the benchmark using jmh, so the
whole test suite is separated into two repos. The testing environment could be
located in the main repo, maybe under
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/benchmark.
To measure the performance of {{JobMaster}} scheduling, we need to simulate an
environment that:
1. has a real {{JobMaster}}
2. has a mock/testing {{ResourceManager}} that has infinite resources and reacts
immediately.
3. has one (or many?) mock/testing {{TaskExecutor}}s that deploy and finish tasks
immediately.
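
A minimal JMH skeleton for case 1, assuming a hypothetical harness (here
{{ScheduleTestEnvironment}}) that wires a real {{JobMaster}} to the mocked
{{ResourceManager}}/{{TaskExecutor}} and blocks until all tasks are FINISHED; all
names are placeholders:

{code:java}
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class JobMasterSchedulingBenchmark {

    // Hypothetical harness: real JobMaster, mock RM with infinite resources,
    // mock TMs that finish tasks immediately.
    interface ScheduleTestEnvironment {
        Object eagerPipelinedJobGraph();
        void submitAndWaitUntilAllTasksFinished(Object jobGraph) throws Exception;
    }

    private ScheduleTestEnvironment env;

    @Setup
    public void setUp() {
        env = createEnvironment();
    }

    @Benchmark
    public void scheduleEagerPipelined() throws Exception {
        env.submitAndWaitUntilAllTasksFinished(env.eagerPipelinedJobGraph());
    }

    // Placeholder: the concrete harness would live next to the JobMaster tests.
    private static ScheduleTestEnvironment createEnvironment() {
        throw new UnsupportedOperationException("wire up the JobMaster test harness here");
    }
}
{code}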

[~trohrm...@apache.org] [~GJL] [~pnowojski] could you please review this 
proposal to help clarify the goal and concrete details? Thanks in advance.

Any suggestions are welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10349) Unify stopActor* utils

2018-09-15 Thread JIRA
陈梓立 created FLINK-10349:
---

 Summary: Unify stopActor* utils
 Key: FLINK-10349
 URL: https://issues.apache.org/jira/browse/FLINK-10349
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.7.0
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.7.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10378) Hide/Comment out contribute guide from PULL_REQUEST_TEMPLATE

2018-09-20 Thread JIRA
陈梓立 created FLINK-10378:
---

 Summary: Hide/Comment out contribute guide from 
PULL_REQUEST_TEMPLATE
 Key: FLINK-10378
 URL: https://issues.apache.org/jira/browse/FLINK-10378
 Project: Flink
  Issue Type: Improvement
  Components: GitHub
Reporter: 陈梓立
Assignee: 陈梓立


Explicitly comment out the contribution guide in PULL_REQUEST_TEMPLATE by wrapping
it in an HTML comment (<!-- ... -->).

This hints to contributors that such text is informational only and will not appear
in the final content; as a side effect, it also saves contributors the work of
deleting that text every time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10385) Implement a waitUntilCondition utils

2018-09-20 Thread JIRA
陈梓立 created FLINK-10385:
---

 Summary: Implement a waitUntilCondition utils
 Key: FLINK-10385
 URL: https://issues.apache.org/jira/browse/FLINK-10385
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.7.0
Reporter: 陈梓立
 Fix For: 1.7.0


Recently, while refining some tests, I noticed that it is a common requirement to
wait until a (stable) condition occurs.

To achieve this, we have {{ExecutionGraphTestUtils#waitUntilJobStatus}} and many
more. Most of them can be abstracted roughly as:
{code:java}
public static void waitUntilCondition(
    SupplierWithException<Boolean, Exception> conditionSupplier,
    Deadline deadline) throws Exception {
  while (deadline.hasTimeLeft()) {
    if (conditionSupplier.get()) { return; }
    Thread.sleep(Math.min(deadline.timeLeft().toMillis(), 500));
  }
  throw new IllegalStateException("Condition was not fulfilled before the deadline.");
}
{code}
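
For example, a test could then wait until an ExecutionGraph reaches RUNNING roughly
like this (a sketch, assuming Flink's {{Deadline}} utility and java.time.Duration):

{code:java}
waitUntilCondition(
    () -> eg.getState() == JobStatus.RUNNING,
    Deadline.fromNow(Duration.ofSeconds(30)));
{code}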
 

I propose implementing such a method to avoid many scattered util methods that
achieve the same purpose.
 Looking forward to your advice. If there is existing code or a project that
already implements this, I would be glad to adopt it.

cc [~Zentol]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10386) Remove legacy class TaskExecutionStateListener

2018-09-20 Thread JIRA
陈梓立 created FLINK-10386:
---

 Summary: Remove legacy class TaskExecutionStateListener
 Key: FLINK-10386
 URL: https://issues.apache.org/jira/browse/FLINK-10386
 Project: Flink
  Issue Type: Improvement
  Components: TaskManager
Affects Versions: 1.7.0
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.7.0


After a discussion
[here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
 with [~trohrm...@apache.org], I started to analyze the usage of
{{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.

In conclusion, the {{TaskExecutionStateListener}} strategy has been abandoned and
no component relies on it. Instead, {{TaskManagerActions}} was introduced to take
over the communication between {{Task}} and {{TaskManager}}. No one except the
{{TaskManager}} should communicate directly with a {{Task}}, so the legacy class
{{TaskExecutionStateListener}} can be safely removed.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10424) Inconsistency between JsonSchemaConverter and FlinkTypeFactory

2018-09-25 Thread JIRA
Dominik Wosiński created FLINK-10424:


 Summary: Inconsistency between JsonSchemaConverter and 
FlinkTypeFactory
 Key: FLINK-10424
 URL: https://issues.apache.org/jira/browse/FLINK-10424
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.6.0
Reporter: Dominik Wosiński
Assignee: Dominik Wosiński


There is still an inconsistency between _JsonSchemaConverter_ and
_FlinkTypeFactory_ in the case of a JSON schema with an _integer_ type field.
_JsonSchemaConverter_ will return BigInteger Type Information for _integer_, but
_FlinkTypeFactory_ currently does not support BigInteger Type Info, and thus an
exception will be thrown.

There are two possible ways of solving this issue:
  - allow _BigInteger_ Type Info in _FlinkTypeFactory_
  - change _JsonSchemaConverter_ so it returns Integer Type Info instead


IMHO, the changes should be made in _FlinkTypeFactory_.
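
For illustration, a schema field like the following triggers the mismatch, since
the _integer_ type is converted to BigInteger Type Information, which
_FlinkTypeFactory_ then rejects:

{code:java}
{
  "type": "object",
  "properties": {
    "id": { "type": "integer" }
  }
}
{code}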



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

