[jira] [Created] (FLINK-27316) Prevent users from changing bucket number

2022-04-20 Thread Jane Chan (Jira)
Jane Chan created FLINK-27316:
-

 Summary: Prevent users from changing bucket number
 Key: FLINK-27316
 URL: https://issues.apache.org/jira/browse/FLINK-27316
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.1.0
Reporter: Jane Chan
 Fix For: table-store-0.1.0


Before we support this feature, we should throw a meaningful exception to 
prevent the data corruption caused by
{code:sql}
 ALTER TABLE ... SET ('bucket' = '...');
 ALTER TABLE ... RESET ('bucket');{code}
 
h3. How to reproduce
{code:sql}

-- Suppose we defined a managed table like
CREATE TABLE IF NOT EXISTS managed_table (
  f0 INT,
  f1 STRING) WITH (
'path' = '...',
'bucket' = '3');

-- then write some data
INSERT INTO managed_table
VALUES (1, 'Sense and Sensibility'),
(2, 'Pride and Prejudice'), 
(3, 'Emma'), 
(4, 'Mansfield Park'), 
(5, 'Northanger Abbey'),
(6, 'The Mad Woman in the Attic'),
(7, 'Little Woman');

-- change bucket number
ALTER TABLE managed_table SET ('bucket' = '5');

-- write some data again
INSERT INTO managed_table 
VALUES (1, 'Sense and Sensibility'), 
(2, 'Pride and Prejudice'), 
(3, 'Emma'), 
(4, 'Mansfield Park'), 
(5, 'Northanger Abbey'), 
(6, 'The Mad Woman in the Attic'), 
(7, 'Little Woman'), 
(8, 'Jane Eyre');

-- change bucket number again
ALTER TABLE managed_table SET ('bucket' = '1');

-- then write some records with '-D' as the changelog mode, e.g.
-- changelogRow("-D", 7, "Little Woman"), changelogRow("-D", 2, "Pride and Prejudice"),
-- changelogRow("-D", 3, "Emma"), changelogRow("-D", 4, "Mansfield Park"),
-- changelogRow("-D", 5, "Northanger Abbey"), changelogRow("-D", 6, "The Mad Woman in the Attic"),
-- changelogRow("-D", 8, "Jane Eyre"), changelogRow("-D", 1, "Sense and Sensibility"),
-- changelogRow("-D", 1, "Sense and Sensibility")
CREATE TABLE helper_source (
  f0 INT, 
  f1 STRING) WITH (
  'connector' = 'values', 
  'data-id' = '${register-id}', 
  'bounded' = 'false', 
  'changelog-mode' = 'I,UA,UB,D'
); 

INSERT INTO managed_table SELECT * FROM helper_source;

-- then read the snapshot

SELECT * FROM managed_table;
{code}
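 
A meaningful exception could be raised in the schema-change validation path. A 
minimal sketch of the intended guard (class and method names are illustrative, 
not the actual table store API):
{code:java}
import java.util.Map;
import java.util.Objects;

/** Illustrative guard for ALTER TABLE ... SET/RESET touching the 'bucket' option. */
final class BucketChangeGuard {

    static void validateBucketUnchanged(
            Map<String, String> oldOptions, Map<String, String> newOptions) {
        String oldBucket = oldOptions.get("bucket");
        String newBucket = newOptions.get("bucket");
        if (!Objects.equals(oldBucket, newBucket)) {
            throw new UnsupportedOperationException(
                    "Changing the bucket number of a managed table is not supported yet; "
                            + "existing data files are distributed by the old bucket number "
                            + "and would be read incorrectly after the change.");
        }
    }

    private BucketChangeGuard() {}
}
{code}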





[jira] [Created] (FLINK-27317) Snapshot deployment fails due to .scalafmt.conf not being found

2022-04-20 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27317:


 Summary: Snapshot deployment fails due to .scalafmt.conf not being 
found
 Key: FLINK-27317
 URL: https://issues.apache.org/jira/browse/FLINK-27317
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System
Affects Versions: 1.16.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34852&view=logs&j=eca6b3a6-1600-56cc-916a-c549b3cde3ff&t=e9844b5e-5aa3-546b-6c3e-5395c7c0cac7

Caused by the maven-source-plugin's jar goal forking the build and apparently 
messing up the working directory.





Re: Discuss making KafkaSubscriber Public

2022-04-20 Thread Mason Chen
Hi all,

Just following up on this thread. The subject header may have been
misleading--I was proposing to make KafkaSubscriber @PublicEvolving and
expose a setter to pass a custom implementation. It seems logical since the
KafkaSource is also @PublicEvolving and this lets the user know that the
interface may be subject to change if requirements change in the future.
Anyone have any opinions? Thanks!

Best,
Mason
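
(For reference, a custom subscriber under this proposal might look like the
sketch below. Only getSubscribedTopicPartitions is the existing interface
method; the external-system lookup is a placeholder, and wiring it in via
something like KafkaSource.builder().setKafkaSubscriber(...) is the setter
being proposed here, not an existing API.)

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

/** Hypothetical subscriber that pulls its topic list from an external system. */
public class ExternalSystemSubscriber implements KafkaSubscriber {

    @Override
    public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
        try {
            List<String> topics = fetchTopicsFromExternalSystem();
            Map<String, TopicDescription> described =
                    adminClient.describeTopics(topics).all().get();
            Set<TopicPartition> partitions = new HashSet<>();
            for (TopicDescription description : described.values()) {
                for (TopicPartitionInfo info : description.partitions()) {
                    partitions.add(new TopicPartition(description.name(), info.partition()));
                }
            }
            return partitions;
        } catch (Exception e) {
            throw new RuntimeException("Failed to resolve subscribed partitions", e);
        }
    }

    private List<String> fetchTopicsFromExternalSystem() {
        // Placeholder for the external-system call discussed in this thread.
        return List.of("topic-a", "topic-b");
    }
}
{code}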

On Wed, Apr 13, 2022 at 10:07 AM Mason Chen  wrote:

> Hi Chesnay,
>
> Typically, users want to plug in a KafkaSubscriber that depends on an
> external system [1][2]. We could also provide a higher level interface that
> doesn’t depend on the Kafka Admin Client, but I think it would be more
> flexible to be able to re-use the one created by the enumerator if needed.
> If we don't want to expose the Kafka Admin Client and if users want to
> apply some complex filter, then we can also provide a pluggable interface
> used in a similar implementation to that of the subscriber used for topic
> pattern and allow users to filter topics after the Kafka API response.
>
> [1] https://www.mail-archive.com/user@flink.apache.org/msg44340.html
> [2] https://www.mail-archive.com/dev@flink.apache.org/msg52007.html
>
> Best,
> Mason
>
>
> On Wed, Apr 13, 2022 at 6:32 AM Chesnay Schepler 
> wrote:
>
>> Could you expand a bit on possible alternative implementations that
>> require this interface to become public, opposed to providing more
>> built-in ways to subscribe?
>>
>> On 13/04/2022 11:26, Qingsheng Ren wrote:
>> > Thanks for the proposal Mason! I think exposing `KafkaSubscriber` as
>> public API is helpful for users to implement more complex subscription
>> logics.
>> >
>> > +1 (non-binding)
>> >
>> > Cheers,
>> >
>> > Qingsheng
>> >
>> >> On Apr 12, 2022, at 11:46, Mason Chen  wrote:
>> >>
>> >> Hi Flink Devs,
>> >>
>> >> I was looking to contribute to
>> https://issues.apache.org/jira/browse/FLINK-24660, which is a ticket to
>> track changing the KafkaSubscriber from Internal to PublicEvolving.
>> >>
>> >> In the PR, it seems a few of us have agreement on making the
>> subscriber pluggable in the KafkaSource, but I'd like to raise the question
>> nevertheless. Furthermore, there is also interest in the ticket from various
>> Flink mailing threads and on the Jira ticket itself, so I think
>> the change would be beneficial to the users. There is already some feedback
>> to make the contract of handling removed splits by the KafkaSource and
>> subscriber clearer in the docs.
>> >>
>> >> I have yet to address all the PR feedback, but does anyone have any
>> concerns before I proceed further?
>> >>
>> >> Best,
>> >> Mason
>>
>>
>>


[jira] [Created] (FLINK-27318) KafkaSink: when init transaction, it takes too long to get a producerId with epoch=0

2022-04-20 Thread Zhengqi Zhang (Jira)
Zhengqi Zhang created FLINK-27318:
-

 Summary: KafkaSink: when init transaction, it takes too long to get 
a producerId with epoch=0
 Key: FLINK-27318
 URL: https://issues.apache.org/jira/browse/FLINK-27318
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.14.4
Reporter: Zhengqi Zhang
 Attachments: image-2022-04-20-17-34-48-207.png

As we can see, the new KafkaSink aborts all transactions that were created by 
a subtask in a previous run, and only returns once it gets a producerId that was 
unused before (epoch=0). But this can take a long time, especially if the task 
has been started and cancelled many times before. In my tests, it even took {*}10 
minutes{*}. Is there a better way to solve this problem, or should we {*}do what 
FlinkKafkaProducer did{*}?

!image-2022-04-20-17-34-48-207.png|width=556,height=412!
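
For context, the abort pass behaves conceptually like the sketch below. This is 
only an illustration of the described behavior; 'TransactionalProducer' and 
'createProducer' are hypothetical stand-ins, not the actual KafkaSink source.
{code:java}
// Probe transactional ids upwards from the restored checkpoint id, aborting
// lingering transactions, until the broker answers with epoch == 0, i.e. an
// id that was never used by a previous run.
long checkpointId = restoredCheckpointId;
while (true) {
    String transactionalId = prefix + "-" + subtaskId + "-" + checkpointId;
    try (TransactionalProducer producer = createProducer(transactionalId)) {
        producer.initTransactions(); // fences and aborts any open transaction for this id
        if (producer.epoch() == 0) {
            break; // fresh id: nothing older is left to abort
        }
    }
    checkpointId++;
}
{code}
The many-restarts case is slow precisely because this loop has to walk through 
every transactional id that any earlier run ever used.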





[jira] [Created] (FLINK-27319) Duplicated "-t" option for savepoint format and deployment target

2022-04-20 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-27319:


 Summary: Duplicated "-t" option for savepoint format and 
deployment target
 Key: FLINK-27319
 URL: https://issues.apache.org/jira/browse/FLINK-27319
 Project: Flink
  Issue Type: Bug
  Components: Command Line Client
Affects Versions: 1.15.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.15.0


The savepoint format and deployment target options share the same short 
option, which causes a clash and makes the CLI fail.

I suggest dropping the short "-t" option for the savepoint format.





[jira] [Created] (FLINK-27320) When using KafkaSink with EXACTLY_ONCE semantics, frequent OutOfOrderSequenceException errors

2022-04-20 Thread Zhengqi Zhang (Jira)
Zhengqi Zhang created FLINK-27320:
-

 Summary: When using KafkaSink with EXACTLY_ONCE semantics, frequent 
OutOfOrderSequenceException errors
 Key: FLINK-27320
 URL: https://issues.apache.org/jira/browse/FLINK-27320
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.4
Reporter: Zhengqi Zhang
 Attachments: image-2022-04-20-17-48-37-149.png, 
image-2022-04-20-17-49-15-143.png

This problem does not occur when using EXACTLY_ONCE semantics in 
FlinkKafkaProducer, but occurs frequently when using KafkaSink.

!image-2022-04-20-17-48-37-149.png|width=573,height=220!

!image-2022-04-20-17-49-15-143.png|width=818,height=469!

This is ProducerConfig when using KafkaSink:
{code:java}
    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 300000
    transactional.id = kafka-sink-0-36
    value.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer{code}
This is ProducerConfig when using FlinkKafkaProducer:

{code:java}
    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm

[jira] [Created] (FLINK-27321) [JUnit5 Migration] Module: java-ci-tools

2022-04-20 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27321:


 Summary: [JUnit5 Migration] Module: java-ci-tools
 Key: FLINK-27321
 URL: https://issues.apache.org/jira/browse/FLINK-27321
 Project: Flink
  Issue Type: Sub-task
  Components: Build System / CI
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0








[jira] [Created] (FLINK-27322) Add license headers and spotless checks for them

2022-04-20 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-27322:
---

 Summary: Add license headers and spotless checks for them
 Key: FLINK-27322
 URL: https://issues.apache.org/jira/browse/FLINK-27322
 Project: Flink
  Issue Type: Bug
  Components: Documentation / Training / Exercises
Affects Versions: 1.14.4
Reporter: Nico Kruber
Assignee: Nico Kruber


It looks as if there are a couple of files that are missing their appropriate 
license headers, e.g. 
https://github.com/apache/flink-training/blob/0b1c83b16065484200564402bef2ca10ef19cb30/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/RideAndFare.java

We should fix that by:
# adding the missing license headers
# adding spotless checks to ensure this doesn't happen again

Potential downside: if a user doing the training exercises creates files on 
their own, these would need the license header as well. On the other hand, a 
simple `./gradlew spotlessApply` can fix that easily.





[VOTE] FLIP-214: Support Advanced Function DDL

2022-04-20 Thread 刘大龙
Hi, everyone

I'd like to start a vote on FLIP-214: Support Advanced Function DDL [1], which 
has been discussed in [2].

The vote will be open for at least 72 hours unless there is an objection or not 
enough votes.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-214+Support+Advanced+Function+DDL

[2] https://lists.apache.org/thread/7m5md150qgodgz1wllp5plx15j1nowx8

Best,

Ron

[jira] [Created] (FLINK-27323) [JUnit5 Migration] Module: flink-table-api-java

2022-04-20 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-27323:
---

 Summary: [JUnit5 Migration] Module: flink-table-api-java
 Key: FLINK-27323
 URL: https://issues.apache.org/jira/browse/FLINK-27323
 Project: Flink
  Issue Type: Sub-task
Reporter: Sergey Nuyanzin








[jira] [Created] (FLINK-27324) [JUnit5 Migration] Module: flink-rpc-akka-loader

2022-04-20 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27324:


 Summary: [JUnit5 Migration] Module: flink-rpc-akka-loader
 Key: FLINK-27324
 URL: https://issues.apache.org/jira/browse/FLINK-27324
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0








[jira] [Created] (FLINK-27325) Remove unnecessary forkCount settings

2022-04-20 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27325:


 Summary: Remove unnecessary forkCount settings
 Key: FLINK-27325
 URL: https://issues.apache.org/jira/browse/FLINK-27325
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System, Connectors / ElasticSearch, Connectors / 
HBase, Connectors / Kafka, Connectors / Pulsar, Deployment / Kubernetes
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0


Several modules configure an explicit forkCount of 1. As far as I can tell this 
is simply unnecessary.





Re: Alertmanager Sink Connector

2022-04-20 Thread Konstantin Knauf
cc user@, bcc dev@

Hi Dhruv,

Yes, this should be totally possible.

For 1, I would use a ProcessFunction to buffer the alerts and register
timers per alert for the repeated firing (or to clear it from state if it
is resolved). From this ProcessFunction you send records to an
AlertManagerSink.

For 2., the AsyncSink, Fabian Paul (cc) might be the best person to judge if
it is a good fit. There is a PR [1] adding a blog post about the Async Sink
that might be of help to you in the meantime.

Cheers,

Konstantin

[1] https://github.com/apache/flink-web/pull/517
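
For 1., a minimal sketch of such a ProcessFunction (the Alert POJO and the
re-fire interval are assumptions, not an existing API):

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Re-emits FIRING alerts every interval until a RESOLVED update arrives. */
public class AlertRefireFunction extends KeyedProcessFunction<String, Alert, Alert> {

    private final long refireIntervalMs;
    private transient ValueState<Alert> activeAlert;

    public AlertRefireFunction(long refireIntervalMs) {
        this.refireIntervalMs = refireIntervalMs;
    }

    @Override
    public void open(Configuration parameters) {
        activeAlert = getRuntimeContext().getState(
                new ValueStateDescriptor<>("active-alert", Alert.class));
    }

    @Override
    public void processElement(Alert alert, Context ctx, Collector<Alert> out) throws Exception {
        out.collect(alert);
        if (alert.isResolved()) {
            activeAlert.clear(); // stop re-firing this alert
        } else {
            activeAlert.update(alert);
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + refireIntervalMs);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
        Alert alert = activeAlert.value();
        if (alert != null) { // still FIRING: emit again (with an updated endsAt) and re-arm
            out.collect(alert);
            ctx.timerService().registerProcessingTimeTimer(timestamp + refireIntervalMs);
        }
    }
}
{code}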


On Thu, Apr 14, 2022 at 9:43 AM Dhruv Patel 
wrote:

> Hi,
>We have a use case where we want to send alerts to Prometheus
> Alertmanager (https://prometheus.io/docs/alerting/latest/alertmanager/)
> from Flink. Each of these alerts have a startsAt, endsAt field along with
> alertMetadata. Alertmanager expects clients to send all the FIRING alerts
> every X minutes (X is configurable) with an updated endsAt time (greater
> than X since alertmanager would drop the alert once endsAt time is reached)
> and once the alert is in RESOLVED state stop sending it. The state updates
> of the alerts would come from Kafka. So the pipeline is something like this
> State Updates to Alerts (Kafka) -> Flink (Some Enrichment) -> Alertmanager
> Sink
>  They provide a rest endpoint where we can post these alerts. I have some
> questions to see if its achievable to develop a sink connector for
> alertmanager in flink?
>
> 1. Is it possible to send data to a sink every X minutes from a custom sink
> connector since I have to somehow simulate a behavior of continuously
> sending the same alerts even because state updates are only received from
> Kafka for FIRING -> RESOLVED state and not for FIRING -> FIRING state? I
> was thinking of having a state of active alerts and somehow the sink
> connector would get the state every X minutes, batch it and then send it to
> alertmanager. However, I am not able to find resources to write some
> implementation around it.
>
> 2. In Flink 1.15, there are Async Sinks (
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink)
> but not much documentation around. Also don't know if it would be
> achievable to write the continuous firing logic in alertmanager
>
> Any other ideas are welcomed.
>
> --
> *Thanks,*
> *Dhruv*
>


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


[jira] [Created] (FLINK-27326) Remove LocalExecutor#createWithFactory

2022-04-20 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27326:


 Summary: Remove LocalExecutor#createWithFactory
 Key: FLINK-27326
 URL: https://issues.apache.org/jira/browse/FLINK-27326
 Project: Flink
  Issue Type: Technical Debt
  Components: Client / Job Submission
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0


This method is effectively unused.





[jira] [Created] (FLINK-27327) Add description that changing max parallelism explicitly leads to state incompatibility

2022-04-20 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-27327:


 Summary: Add description that changing max parallelism explicitly 
leads to state incompatibility
 Key: FLINK-27327
 URL: https://issues.apache.org/jira/browse/FLINK-27327
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Configuration
Reporter: Hangxiang Yu
 Fix For: 1.16.0


I think the description "changing maxParallelism/pipeline.max-parallelism 
explicitly will lead to state incompatibility" should be added to parallel.md 
and to the description of pipeline.max-parallelism.

Some users easily misconfigure this value.





[jira] [Created] (FLINK-27328) Could not resolve ResourceManager address

2022-04-20 Thread Viswanath Shanmugam (Jira)
Viswanath Shanmugam created FLINK-27328:
---

 Summary: Could not resolve ResourceManager address
 Key: FLINK-27328
 URL: https://issues.apache.org/jira/browse/FLINK-27328
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: 1.14.4
 Environment: h3. JobManager

{{apiVersion: v1
kind: Service
metadata:
  name: jobmanager-cs
spec:
  type: NodePort
  ports:
  - name: ui
port: 8081
  selector: 
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: jobmanager-hs
spec:
  type: ClusterIP
  ports:
- port: 6123
  name: rpc
- port: 6124
  name: blob-server
- port: 6125
  name: query
  selector: 
    app: flink
    component: jobmanager
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  selector:
matchLabels:
  app: flink
  template:
metadata:
  labels:
app: flink
component: jobmanager
spec:
  restartPolicy: Always
  containers:
- name: jobmanager
  image: flink:1.13.1-scala_2.12
  command: [bash,"-ec",bin/jobmanager.sh start-foreground cluster]
  resources:
limits:
  memory: "2024Mi"
  cpu: "500m"
  env:
  - name: JOB_MANAGER_ID
valueFrom:
  fieldRef:
apiVersion: v1
fieldPath: status.podIP
  - name: POD_IP
valueFrom:
  fieldRef:
apiVersion: v1
fieldPath: status.podIP
  # The following args overwrite the value of jobmanager.rpc.address 
configured in the configuration config map to POD_IP.
  args: ["standalone-job", "--host", "$POD_IP", "--job-classname", 
"org.apache.flink.application.Main"] #, , ]  
optional arguments: ["--job-id", "", "--fromSavepoint", 
"/path/to/savepoint", "--allowNonRestoredState"]
  ports:
- containerPort: 6123
  name: rpc
- containerPort: 6124
  name: blob-server
- containerPort: 6125
  name: query
- containerPort: 8081
  name: webui
  volumeMounts:
- name: flink-config-volume
  mountPath: /opt/flink/conf
- name: job-artifacts-volume
  mountPath: /opt/flink/usrlib
  securityContext:
runAsUser:  
  volumes:
- name: flink-config-volume
  configMap:
name: flink-config
items:
  - key: flink-conf.yaml
path: flink-conf.yaml
  - key: log4j-console.properties
path: log4j-console.properties
- name: job-artifacts-volume
  hostPath:
path: /config/flink}}
h3. Task Manager

{{apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
matchLabels:
  app: flink
  component: taskmanager
  template:
metadata:
  labels:
app: flink
component: taskmanager
spec:
  containers:
  - name: taskmanager
image: flink:1.13.1-scala_2.12
env:
  - name: K8S_POD_IP
valueFrom:
  fieldRef:
fieldPath: status.podIP
command: ["/bin/sh", "-ec", "sleep 1000"]
resources:
  limits:
memory: "800Mi"
cpu: "2000m"
args: 
["taskmanager","start-foreground","-Dtaskmanager.host=$K8S_POD_IP"]
ports:
- containerPort: 6122
  name: rpc
- containerPort: 6125
  name: query-state
volumeMounts:
- name: flink-config-volume
  mountPath: /opt/flink/conf/
- name: job-artifacts-volume
  mountPath: /opt/flink/usrlib
securityContext:
  runAsUser:   
  volumes:
  - name: flink-config-volume
configMap:
  name: flink-config
  items:
  - key: flink-conf.yaml
path: flink-conf.yaml
  - key: log4j-console.properties
path: log4j-console.properties
  - name: job-artifacts-volume
hostPath:
  path: /config/flink}}
h3. ConfigMap

{{apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
app: flink
data:
  flink-conf.yaml: |+
jobmanager.rpc.address: jobmanager-hs
taskmanager.numberOfTaskSlots: 1
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
taskmanager.heap.size: 1024m
jobmanager.heap.size: 1024m
state.backend: filesystem
s3.access-key: k8sdemo
s3.secret-key: k8sdemo123
state.checkpoints.dir: /opt/flink/usrlib/checkpoints
state.savepoints.dir: /opt/flink/usrlib/savepoints
metrics.reporters: prom
metrics.reporter.prom.class: 
org.apache.flink.metrics

[jira] [Created] (FLINK-27329) Add default value of replica of JM pod and remove declaring it in example yamls

2022-04-20 Thread Biao Geng (Jira)
Biao Geng created FLINK-27329:
-

 Summary: Add default value of replica of JM pod and remove 
declaring it in example yamls
 Key: FLINK-27329
 URL: https://issues.apache.org/jira/browse/FLINK-27329
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Biao Geng


Currently, we do not explicitly set the default value of `replica` in 
`JobManagerSpec`. As a result, Java sets the default value to zero. 
Besides, in our examples, we explicitly declare `replica` in `JobManagerSpec` 
to be 1. 
After a deeper look while debugging the exception thrown in FLINK-27310, we find 
it would be better to set the default value of the `replica` field to 1 and 
remove the declaration from the examples, for the following reasons:
1. A normal Session or Application cluster should have at least one JM. The 
current default value, zero, does not follow the common case.
2. One JM works for k8s HA mode as well, and if users really want to launch a 
standby JM for faster recovery, they can declare the `replica` field in the 
yaml file. In the examples, just using the new default value (i.e. 1) should be fine.
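
(The proposed change is essentially a field initializer; a minimal sketch, 
assuming the spec class looks roughly like this:)
{code:java}
/** Illustrative sketch of JobManagerSpec, not the actual operator class. */
public class JobManagerSpec {

    /** Defaults to 1 so a spec without an explicit value launches a single JM. */
    private int replica = 1;

    public int getReplica() {
        return replica;
    }

    public void setReplica(int replica) {
        this.replica = replica;
    }
}
{code}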





[jira] [Created] (FLINK-27330) Why Read File Stream Function Deprecated?

2022-04-20 Thread Karthik (Jira)
Karthik created FLINK-27330:
---

 Summary: Why Read File Stream Function Deprecated?
 Key: FLINK-27330
 URL: https://issues.apache.org/jira/browse/FLINK-27330
 Project: Flink
  Issue Type: Technical Debt
  Components: API / DataStream
Affects Versions: 1.11.6
Reporter: Karthik


 env.readFileStream() was used in previous versions of Flink.

But I think it is deprecated as of 1.11.6. May I know why?
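
(For reference, the deprecation note points users to readFile; a minimal sketch 
of the replacement, with an assumed local path:)
{code:java}
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadFileExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String path = "file:///tmp/input";
        TextInputFormat format = new TextInputFormat(new Path(path));
        // Re-scan the directory every second, similar to what readFileStream did.
        DataStream<String> lines =
                env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L);
        lines.print();
        env.execute("read-file-example");
    }
}
{code}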





Re: [DISCUSS] FLIP-220: Temporal State

2022-04-20 Thread Yun Tang
Hi Nico,

Thanks for your clarification.
For the discussion about generalizing Temporal state to sorted map state. I 
could give some examples of how to use sorted map state in min/max with retract 
functions.

As you know, NavigableMap in java has several APIs like:

Map.Entry<K, V> firstEntry();
Map.Entry<K, V> lastEntry();
NavigableMap<K, V> tailMap(K fromKey, boolean inclusive);

The #firstEntry API could be used in MinWithRetractAggFunction#updateMin, 
#lastEntry could be used in MaxWithRetractAggFunction#updateMax, and #tailMap 
could be used in FirstValueWithRetractAggFunction#retract.
If we can introduce a SortedMap-like state, these functions could benefit from it. 
BTW, I prefer to introduce another state descriptor instead of reusing the 
current map state descriptor.
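
To make that concrete, here is a minimal sketch of a min-with-retract 
accumulator on top of the NavigableMap API; a TreeMap stands in for the 
proposed sorted-map state, which does not exist yet:

{code:java}
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Min-with-retract over a sorted map keyed by value, storing occurrence counts. */
public class MinWithRetract {

    private final NavigableMap<Long, Integer> counts = new TreeMap<>();

    public void accumulate(long value) {
        counts.merge(value, 1, Integer::sum);
    }

    public void retract(long value) {
        // Returning null removes the entry once its count drops to zero.
        counts.computeIfPresent(value, (v, c) -> c == 1 ? null : c - 1);
    }

    /** The minimum is always the first entry; no full scan is needed. */
    public Long currentMin() {
        Map.Entry<Long, Integer> first = counts.firstEntry();
        return first == null ? null : first.getKey();
    }
}
{code}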

For the API of SortedMapOfListsState, I think it is a bit tied to the current 
implementation of the RocksDB state backend.

For the discussion of ChangelogStateBackend, you can think of the changelog 
state-backend as a write-ahead-log service. We need to record the changes to 
any state, so this should be included in the design doc, as we introduce 
another kind of state. In particular, you might need to consider how to store 
key bytes serialized by the new serializer (as we might not be able to write 
the length at the beginning of the serialized bytes while keeping the byte 
order the same as the natural order).

Best
Yun Tang.

From: Nico Kruber 
Sent: Wednesday, April 20, 2022 0:38
To: dev 
Cc: Yun Tang ; David Morávek 
Subject: Re: [DISCUSS] FLIP-220: Temporal State

Hi all,
I have read the discussion points from the last emails and would like to add
my two cents on what I believe are the remaining points to solve:

1. Do we need a TemporalValueState?

I guess, this all boils down to dealing with duplicates / values for the same
timestamp. Either you always have to account for them and thus always have to
store a list anyway, or you only need to join with "the latest" (or the only)
value for a given timestamp and get a nicer API and lower overhead for that
use case.
At the easiest, you can make an assumption that there is only a single value
for each timestamp by contract, e.g. by increasing the timestamp precision and
interpreting them as nanoseconds, or maybe milliseconds are already good
enough. If that contract breaks, however, you will get into undefined
behaviour.
The TemporalRowTimeJoinOperator, for example, currently just assumes that
there is only a single value on the right side of the join (rightState) and I
believe many use cases can make that assumption or otherwise you'd have to
define the expected behaviour for multiple values at the same timestamp, e.g.
"join with the most recent value at the time of the left side and if there are
multiple values, choose X".

I lean towards having a ValueState implementation as well (in addition to
lists).


2. User-facing API (Iterators vs. valueAtOr[Before|After])

I like the iterable-based APIs that David M was proposing, i.e.
- Iterable<Map.Entry<Long, V>> readRange(long minTimestamp, long limitTimestamp);
- void clearRange(long minTimestamp, long limitTimestamp);

However, I find Iterables rather cumbersome to work with if you actually only
need a single value, e.g. the most recent one.
For iterating over a range of values, however, they feel more natural to me
than our proposal.

Actually, if we generalise the key type (see below), we may also need to offer
additional value[Before|After] functions to cover "+1" iterations where we
cannot simply add 1 as we do now.

(a) How about offering both Iterables and value[AtOrBefore|AtOrAfter|Before|
After]?
This would be similar to what NavigableMap [2] is offering but with a more
explicit API than "ceiling", "floor",...

(b) Our API proposal currently also allows iterating backwards, which is not
covered by the readRange proposal - we could, however, just do that if
minTimestamp > limitTimestamp. What do you think?

(c) When implementing the iterators, I actually also see two different modes
which may differ in performance: I call them iteration with eager vs. lazy
value retrieval. Eager retrieval may retrieve all values in a range at once
and make them available in memory, e.g. for smaller data sets similar to what
TemporalRowTimeJoinOperator is doing for the right side of the join. This can
spare a lot of Java<->JNI calls and let RocksDB iterate only once (as long
as things fit into memory). Lazy retrieval would fetch results one-by-one.
-> We could set one as default and allow the user to override that behaviour.


3. Should we generalise the Temporal***State to offer arbitrary key types and
not just Long timestamps?

@Yun Tang: can you describe in more detail where you think this would be
needed for SQL users? I don't quite get how this would be beneficial. The
example you linked doesn't quite show the same behaviour.

Other than this, I could see that you can leverage such a generalisation for
arbitrary joins between, for example, IDs and I

[jira] [Created] (FLINK-27331) Support Avro microsecond precision for TIME

2022-04-20 Thread Marios Trivyzas (Jira)
Marios Trivyzas created FLINK-27331:
---

 Summary: Support Avro microsecond precision for TIME
 Key: FLINK-27331
 URL: https://issues.apache.org/jira/browse/FLINK-27331
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Marios Trivyzas








[jira] [Created] (FLINK-27332) org.apache.flink.streaming.api.functions.sink.filesystem.Bucket can't receive completion notification for savepoint

2022-04-20 Thread Darcy Lin (Jira)
Darcy Lin created FLINK-27332:
-

 Summary: 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket can't receive 
completion notification for savepoint
 Key: FLINK-27332
 URL: https://issues.apache.org/jira/browse/FLINK-27332
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream, Connectors / FileSystem
Affects Versions: 1.15.0
Reporter: Darcy Lin


2022-04-20 17:28:03,525 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
checkpointing for checkpoint with id=225 (max part counter=15).
 
2022-04-20 17:38:06,933 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
checkpointing for checkpoint with id=226 (max part counter=15).
2022-04-20 17:38:08,228 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
received completion notification for checkpoint with id=226.
 

As shown in the log above, checkpoint 225 was triggered by a savepoint and there 
is no log entry about its completion, while checkpoint 226 is a normal checkpoint 
and everything works fine. The impact is that a savepoint cannot move pending 
files to finished.
 





[jira] [Created] (FLINK-27333) Upgrade flink-filesystems hadoop version to version 3.3.2

2022-04-20 Thread Chinmay Sumant (Jira)
Chinmay Sumant created FLINK-27333:
--

 Summary: Upgrade flink-filesystems hadoop version to version 3.3.2
 Key: FLINK-27333
 URL: https://issues.apache.org/jira/browse/FLINK-27333
 Project: Flink
  Issue Type: Improvement
  Components: FileSystems
Affects Versions: 1.14.3
Reporter: Chinmay Sumant


We have a security requirement to client-side encrypt Flink state for certain 
Flink applications that process sensitive data.

Currently, there is no feature that supports this out of the box on the AWS S3 
backend. 

We found that one way to do it is to use flink-s3-fs-hadoop compiled against 
Hadoop 3.3.2 for checkpoints, as Hadoop 3.3.2 provides out-of-the-box AWS 
client-side encryption using AWS KMS keys before writing the data to S3. 

We were able to change the flink-filesystems shaded Hadoop version from the 
existing 3.2.2 to 3.3.2 and compile with minimal code changes. The resulting 
flink-s3-fs-hadoop jar was used in the checkpoint plugin path for our Flink 
jobs and worked well for checkpoints/savepoints up to 250 GB each with 
client-side encryption using AWS KMS.

Filing this Jira to request taking these changes upstream, and also to check if 
there are concerns with changing the Hadoop version that may affect any other 
components, since our observations have been limited to the plugin jar and 
checkpoints using the flink-s3-fs-hadoop filesystem. 





Upgrade flink-filesystems hadoop version to version 3.3.2

2022-04-20 Thread Chinmay Sumant
Hello,

We have a security requirement to client-side encrypt Flink state for certain 
Flink applications that process sensitive data. Currently, there is no feature 
that supports this out of the box on the AWS S3 backend.

One way to do it is to use flink-s3-fs-hadoop compiled against Hadoop 3.3.2 for 
checkpoints, as Hadoop 3.3.2 provides out-of-the-box AWS client-side encryption 
using AWS KMS.

I have filed a JIRA requesting the shaded Hadoop version upgrade in the 
flink-filesystems module: https://issues.apache.org/jira/browse/FLINK-27333

We were able to experiment with an internal build on our Flink jobs and saw no 
issues in checkpoints in Flink 1.14.3 after the Hadoop version change. Wanted 
to check with the community if there are any concerns in moving ahead with this 
version upgrade. If there are no concerns, the JIRA can be assigned to me so I 
can try to take the changes to the main Flink repo. 

Thanks,
Chinmay

Re: Upgrade flink-filesystems hadoop version to version 3.3.2

2022-04-20 Thread Chinmay Sumant
Hello,

With regards to my earlier email and JIRA request 
(https://issues.apache.org/jira/browse/FLINK-27333),

I have opened a PR for the same: https://github.com/apache/flink/pull/19540

Thanks,
Chinmay

> On Apr 20, 2022, at 1:36 PM, Chinmay Sumant  wrote:
> 
> Hello,
> 
> We have a security requirement to client side encrypt flink state for certain 
> flink applications that process sensitive data. Currently, there is no 
> feature that supports this out of the box on AWS S3 backend.
> 
> One way to do it is to use flink-s3-fs-hadoop compiled against hadoop 3.3.2 
> for checkpoints as hadoop 3.3.2 provides out of the box AWS client side 
> encryption using AWS KMS.
> 
> I have filed a JIRA requesting the shaded Hadoop version upgrade in 
> flink-filesystems module: https://issues.apache.org/jira/browse/FLINK-27333 
>  
> 
> We were able to experiment with an internal build on our flink jobs and saw 
> no issues in checkpoints in flink 1.14.3 after the Hadoop version change. 
> Wanted to check with the community if there are any concerns in moving ahead 
> with this version upgrade. If there are no concerns, the JIRA can be assigned 
> to me to try to take the changes to the main flink repo. 
> 
> Thanks,
> Chinmay




Re: [DISCUSS] FLIP-222: Support full query lifecycle statements in SQL client

2022-04-20 Thread Paul Lam
ping @Timo @Jark @Shengkai

Best,
Paul Lam

> 2022年4月18日 17:12,Paul Lam  写道:
> 
> Hi team,
> 
> I’d like to start a discussion about FLIP-222 [1], which adds query lifecycle 
> statements to SQL client.
> 
> Currently, SQL client supports submitting queries (queries in a broad sense, 
> including DQLs and DMLs) but no further lifecycle statements, like canceling
> a query or triggering a savepoint. That makes SQL users have to rely on the
> CLI or REST API to manage their queries. 
> 
> Thus, I propose to introduce the following statements to fill the gap.
> SHOW QUERIES
> STOP QUERY <query_id>
> CANCEL QUERY <query_id>
> TRIGGER SAVEPOINT <query_id>
> DISPOSE SAVEPOINT <savepoint_path>
> These statements would align the SQL client with the CLI, providing full lifecycle
> management for queries/jobs.
> 
> Please see the FLIP page[1] for more details. Thanks a lot!
> (For reference, the previous discussion thread see [2].)
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-222%3A+Support+full+query+lifecycle+statements+in+SQL+client
>  
> 
> [2] https://lists.apache.org/thread/wr47ng0m2hdybjkrwjlk9ftwg403odqb 
> 
> 
> Best,
> Paul Lam
> 



Re:[VOTE] FLIP-214: Support Advanced Function DDL

2022-04-20 Thread Mang Zhang
+1

--

Best regards,
Mang Zhang





At 2022-04-20 18:28:28, "刘大龙"  wrote:
>Hi, everyone
>
>
>
>
>I'd like to start a vote on FLIP-214: Support Advanced Function DDL [1] which 
>has been discussed in [2].
>
>The vote will be open for at least 72 hours unless there is an objection or 
>not enough votes.
>
>
>
>
>[1] 
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-214+Support+Advanced+Function+DDL
>
>[2] https://lists.apache.org/thread/7m5md150qgodgz1wllp5plx15j1nowx8
>
>
>
>
>Best,
>
>Ron


Re: [VOTE] FLIP-214: Support Advanced Function DDL

2022-04-20 Thread Jark Wu
Thanks for driving this work @Ron,

+1 (binding)

Best,
Jark

On Thu, 21 Apr 2022 at 10:42, Mang Zhang  wrote:

> +1
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> Best regards,
> Mang Zhang
>
>
>
>
>
> At 2022-04-20 18:28:28, "刘大龙"  wrote:
> >Hi, everyone
> >
> >
> >
> >
> >I'd like to start a vote on FLIP-214: Support Advanced Function DDL [1]
> which has been discussed in [2].
> >
> >The vote will be open for at least 72 hours unless there is an objection
> or not enough votes.
> >
> >
> >
> >
> >[1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-214+Support+Advanced+Function+DDL
> >
> >[2] https://lists.apache.org/thread/7m5md150qgodgz1wllp5plx15j1nowx8
> >
> >
> >
> >
> >Best,
> >
> >Ron
>


[VOTE] Release 1.15.0, release candidate #4

2022-04-20 Thread Yun Gao

Hi everyone,

Please review and vote on the release candidate #4 for the version 1.15.0, as 
follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release and binary convenience releases to be 
deployed to dist.apache.org [2], which are signed with the key with fingerprint 
CBE82BEFD827B08AFA843977EDBF922A7BC84897 [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "release-1.15.0-rc4" [5],
* website pull request listing the new release and adding announcement blog 
post [6].

The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.

Thanks,
Joe, Till and Yun Gao

[1] 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350442
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.15.0-rc4/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1500/
[5] https://github.com/apache/flink/releases/tag/release-1.15.0-rc4/
[6] https://github.com/apache/flink-web/pull/526




[jira] [Created] (FLINK-27334) Support auto generate the doc for the KubernetesOperatorConfigOptions

2022-04-20 Thread Aitozi (Jira)
Aitozi created FLINK-27334:
--

 Summary: Support auto generate the doc for the 
KubernetesOperatorConfigOptions
 Key: FLINK-27334
 URL: https://issues.apache.org/jira/browse/FLINK-27334
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Aitozi
 Fix For: kubernetes-operator-1.0.0








[jira] [Created] (FLINK-27335) Optimize async compaction in MergeTreeWriter

2022-04-20 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-27335:


 Summary: Optimize async compaction in MergeTreeWriter
 Key: FLINK-27335
 URL: https://issues.apache.org/jira/browse/FLINK-27335
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: table-store-0.2.0


Currently, a full compaction may block the writer, which has an impact on 
LogStore latency.

We need to decouple compaction from writing and make compaction fully 
asynchronous. But too many files will lead to unstable reads: when there are 
too many files, compaction cannot keep up with the writer, so the writer needs 
to be back-pressured.

Stop parameter: num-sorted-run.stop-trigger, default 10
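
A conceptual sketch of that stop trigger (illustrative names only, not the 
actual table store code):
{code:java}
import java.util.concurrent.Semaphore;

/** Blocks the writer once too many sorted runs pile up, until compaction catches up. */
class WriterBackpressure {

    private static final int STOP_TRIGGER = 10; // 'num-sorted-run.stop-trigger' default

    private final Semaphore permits = new Semaphore(STOP_TRIGGER);

    /** Called by the writer before producing a new sorted run. */
    void beforeNewSortedRun() throws InterruptedException {
        permits.acquire(); // blocks once STOP_TRIGGER runs are outstanding
    }

    /** Called by the asynchronous compaction thread after merging runs away. */
    void onRunsCompacted(int mergedRuns) {
        permits.release(mergedRuns);
    }
}
{code}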





[DISCUSS] FLIP-91: Support SQL Client Gateway

2022-04-20 Thread Shengkai Fang
Hi, Flink developers.

I want to start a discussion about FLIP-91: Support Flink SQL
Gateway[1]. The Flink SQL Gateway is a service that allows users to submit and
manage their jobs in an online environment through pluggable endpoints.
The reason why we introduce the Gateway with pluggable endpoints is that
many users have their own preferences. For example, HiveServer2 users
prefer to use the gateway with a HiveServer2-style API, for which numerous
tools exist. However, some Flink-native users may prefer to use the REST API.
Therefore, we propose the SQL Gateway with pluggable endpoints.

In the FLIP, we also propose the REST endpoint, which has similar
APIs to the gateway in ververica/flink-sql-gateway[2]. At the
end, we discuss how to use the SQL Client to submit statements to the
Gateway with the REST API.

I am glad that you can give some feedback about FLIP-91.

Best,
Shengkai

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-91%3A+Support+SQL+Client+Gateway
[2] https://github.com/ververica/flink-sql-gateway


[DISCUSS] FLIP-223: Support HiveServer2 Endpoint

2022-04-20 Thread Shengkai Fang
Hi, Flink developers.

I want to start a discussion about FLIP-223: Support HiveServer2
Endpoint[1]. The endpoint will implement the thrift interface exposed by
HiveServer2, so users' BI, CLI and other tools built on HiveServer2 can
be seamlessly migrated to the Flink SQL Gateway. After the FLIP is
finished, users can have almost the same experience in the Flink SQL
Gateway with the HiveServer2 endpoint as in HiveServer2.


I am glad that you can give some feedback about FLIP-223.

Best,
Shengkai

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-223+Support+HiveServer2+Endpoint