[jira] [Created] (FLINK-35179) add postgres pipeline data sink connector

2024-04-20 Thread melin (Jira)
melin created FLINK-35179:
-

 Summary: add postgres pipeline data sink connector
 Key: FLINK-35179
 URL: https://issues.apache.org/jira/browse/FLINK-35179
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: melin








[jira] [Created] (FLINK-35180) Instant in row doesn't convert to correct type in python process mode

2024-04-20 Thread Wei Yuan (Jira)
Wei Yuan created FLINK-35180:


 Summary: Instant in row doesn't convert to correct type in python 
process mode
 Key: FLINK-35180
 URL: https://issues.apache.org/jira/browse/FLINK-35180
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Wei Yuan


Use the following code to reproduce the issue:

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types, WatermarkStrategy, Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import StreamTableEnvironment, Schema
from pyflink.datastream.functions import ProcessFunction, MapFunction

# init task env
config = Configuration()
# config.set_string("python.execution-mode", "thread")
config.set_string("python.execution-mode", "process")
config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
config.set_string("python.executable", "/root/miniconda3/bin/python3")

env = StreamExecutionEnvironment.get_execution_environment(config)
table_env = StreamTableEnvironment.create(env)

# create a source table from in-memory elements and register it as a view
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", "content")
table_env.create_temporary_view("test_table", table)

result_table = table_env.sql_query("select *, NOW() as dt from test_table")
result_ds = table_env.to_data_stream(result_table)

def test_func(row):
    print(row)
    return row

result_ds.map(test_func)

env.execute()

```

output in process mode:

```
Row(id=1, content='Hi', dt=Instant<1713609386, 27100>)
Row(id=2, content='Hello', dt=Instant<1713609386, 58000>)
```

output in thread mode:

```
Row(id=1, content='Hi', dt=)
Traceback (most recent call last):
  File "/home/disk1/yuanwei/bug.py", line 31, in <module>
    env.execute()
  File "/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 773, in execute
    return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
        at scala.concurrent.impl.Cal
```
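
Until this is resolved, one way to sidestep the conversion entirely (an
untested workaround sketch, not part of the original report) is to cast the
timestamp on the SQL side, so that test_func receives a plain string in both
execution modes:

```
# untested workaround sketch: CAST the TIMESTAMP_LTZ column to STRING in SQL
# so that no Instant object ever crosses into the Python map function
result_table = table_env.sql_query(
    "SELECT *, CAST(NOW() AS STRING) AS dt FROM test_table"
)
```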

[jira] [Created] (FLINK-35181) CLONE - Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Elasticsearch connector

2024-04-20 Thread Zhongqiang Gong (Jira)
Zhongqiang Gong created FLINK-35181:
---

 Summary: CLONE - Bump org.apache.commons:commons-compress from 
1.24.0 to 1.26.1 for Flink Elasticsearch connector
 Key: FLINK-35181
 URL: https://issues.apache.org/jira/browse/FLINK-35181
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / RabbitMQ
Reporter: Zhongqiang Gong
Assignee: Danny Cranmer
 Fix For: rabbitmq-3.1.0


Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink 
RabbitMQ connector

 





[jira] [Created] (FLINK-35182) Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Pulsar connector

2024-04-20 Thread Zhongqiang Gong (Jira)
Zhongqiang Gong created FLINK-35182:
---

 Summary: Bump org.apache.commons:commons-compress from 1.24.0 to 
1.26.1 for Flink Pulsar connector
 Key: FLINK-35182
 URL: https://issues.apache.org/jira/browse/FLINK-35182
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Pulsar
Reporter: Zhongqiang Gong








Re: [DISCUSS] FLIP-446: Kubernetes Operator State Snapshot CRD

2024-04-20 Thread Mate Czagany
Hi,

Thanks for your comments, Gyula, I really appreciate it!

I have updated the following things in the FLIP (a rough sketch of the
resulting resources follows the list); please comment on these changes if
you have any suggestions or concerns:
- Added a path field to FlinkStateSnapshotReference.
- Added two examples at the bottom.
- Added an error handling section and its associated new fields
("backoffLimit" and "failures") to the interfaces.
- Renamed the field "completed" to "alreadyExists".
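
To make this concrete, here is a minimal sketch of how these pieces could fit
together. It is illustrative only: the field names come from this discussion,
while the API version, the exact field placement, and all values are
placeholder assumptions rather than the final CRD:

```
# FlinkStateSnapshot with the new error-handling fields (placement assumed)
apiVersion: flink.apache.org/v1beta1   # group/version assumed from existing CRDs
kind: FlinkStateSnapshot
metadata:
  name: example-savepoint
spec:
  backoffLimit: 3          # max retries before the snapshot is set to FAILED
  savepoint:
    alreadyExists: false   # renamed from "completed"
status:
  failures: 1              # failed attempts so far
  error: "..."             # latest error, cleared on success
---
# Referencing a snapshot from a job spec; if "path" is set, it would take
# precedence over the namespace/name resource lookup
flinkStateSnapshotReference:
  namespace: flink-jobs                              # hypothetical values
  name: example-savepoint
  path: s3://my-bucket/savepoints/savepoint-12ab34   # optional inline path
```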

Regarding the separate resources, I don't think either of the two solutions
brings a decisive (dis)advantage to the table, so I am still neutral and
waiting for others to chime in with their thoughts and feedback!

Regards,
Mate


Gyula Fóra  wrote (on Fri, 19 Apr 2024 at 21:43):

> Hey!
>
> Regarding the question of initialSavepointPath and
> flinkStateSnapshotReference new object, I think we could simply keep an
> extra field as part of the flinkStateSnapshotReference object called path.
>
> Then the fields could be:
> namespace, name, path
>
> If path is defined we would use that (to support the simple way as well),
> otherwise use the resource. I would still deprecate the
> initialSavepointPath field in the jobSpec.
>
> Regarding the Savepoint/Checkpoint vs FlinkStateSnapshot.
> What we need:
>  1. Easy way to list all state snapshots (to select latest)
>  2. Easy way to reference a savepoint/checkpoint from a jobspec
>  3. Differentiate state snapshot types (in some cases users may prefer to
> use checkpoint/savepoint for certain upgrades) -> we should add a label or
> something for easy selection
>  4. Be able to delete savepoints (and checkpoints maybe)
>
> I am personally slightly more in favor of having a single resource as that
> ticks all the boxes, while having 2 separate resources will make both
> listing and referencing harder. We would have to introduce state type in
> the reference (name + namespace would not be enough to uniquely identify a
> state snapshot)
>
> I wonder if I am missing any good argument against the single
> FlinkStateSnapshot here.
>
> Cheers,
> Gyula
>
>
> On Fri, Apr 19, 2024 at 9:09 PM Mate Czagany  wrote:
>
>> Hi Robert and Thomas,
>>
>> Thank you for sharing your thoughts, I will try to address your questions
>> and suggestions:
>>
>> 1. I would really love to hear others' input as well about splitting the
>> snapshot CRD into two different CRDs, one for savepoints and one for
>> checkpoints. I think the main upside is that we would not need the
>> mandatory savepoint or checkpoint field inside the spec. The two CRs could
>> share the same status fields, and their specs would be different.
>> I personally like both solutions, and would love to hear others' thoughts
>> as well.
>>
>> 2. I agree with you that "completed" is not very clear, but I would
>> suggest the name "alreadyExists". WDYT?
>>
>> 3. I think having a retry loop inside the operator does not add too much
>> complexity to the FLIP. On failure, we check if we have reached the max
>> retries. If we did, the state will be set to "FAILED", else it will be set
>> to "TRIGGER_PENDING", causing the operator to retry the task. The "error"
>> field will always be populated with the latest error. The Kubernetes Job
>> resource already has a similar field called "backoffLimit"; maybe we could
>> use the same name, with the same logic applied, WDYT?
>> About error events, I think we should keep the "error" field, and upon
>> successful snapshot, we clear it. I will add to the FLIP that there will be
>> an event generated for each unsuccessful snapshot.
>>
>> 4. I really like the idea of having something like Pod Conditions, but I
>> think it wouldn't add too much value here, because the only 2 stages
>> important to the user are "Triggered" and "Completed", and those timestamps
>> will already be included in the status field. I think it would make more
>> sense to implement this if there were more lifecycle stages.
>>
>> 5. There will be a new field in JobSpec called
>> "flinkStateSnapshotReference" to reference a FlinkStateSnapshot to restore
>> from.
>>
>> > How do you see potential effects on API server performance wrt. number of
>> > objects vs mutations? Is the proposal more or less neutral in that regard?
>>
>> While I am not an expert in Kubernetes internals, my understanding is
>> that for the api-server, editing an existing resource or creating a new one
>> is not different performance-wise, because the whole resource will always
>> be written to etcd anyways.
>> Retrieving the savepoints from etcd will be different though for some
>> use-cases, e.g. retrieving all snapshots for a specific FlinkDeployment
>> would require the api-server to first retrieve every snapshot in the
>> namespace from etcd, then filter them for that specific FlinkDeployment. I
>> think this is a worst-case scenario, and it will be up to the user to
>> optimize their queries via e.g. watch queries [1] or resourceVersions [2].
>>
>> > Does that mean one would have to create a FlinkStateSnapshot CR when
>>

[jira] [Created] (FLINK-35183) Expose FlinkMinorVersion as metric for applications in operator

2024-04-20 Thread Jira
Márton Balassi created FLINK-35183:
--

 Summary: Expose FlinkMinorVersion as metric for applications in 
operator
 Key: FLINK-35183
 URL: https://issues.apache.org/jira/browse/FLINK-35183
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Márton Balassi
Assignee: Márton Balassi
 Fix For: kubernetes-operator-1.9.0


This is a convenience feature on top of the existing Flink version grouping. It 
comes in handy when implementing platform overview dashboards that aggregate 
metrics from multiple operators.


