[jira] [Created] (FLINK-35179) add postgres pipeline data sink connector
melin created FLINK-35179:
------------------------------

Summary: add postgres pipeline data sink connector
Key: FLINK-35179
URL: https://issues.apache.org/jira/browse/FLINK-35179
Project: Flink
Issue Type: New Feature
Components: Flink CDC
Reporter: melin
[jira] [Created] (FLINK-35180) Instant in row doesn't convert to correct type in python process mode
Wei Yuan created FLINK-35180:
------------------------------

Summary: Instant in row doesn't convert to correct type in python process mode
Key: FLINK-35180
URL: https://issues.apache.org/jira/browse/FLINK-35180
Project: Flink
Issue Type: Bug
Components: API / Python
Reporter: Wei Yuan

Use:

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types, WatermarkStrategy, Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import StreamTableEnvironment, Schema
from pyflink.datastream.functions import ProcessFunction, MapFunction

# init task env
config = Configuration()
# config.set_string("python.execution-mode", "thread")
config.set_string("python.execution-mode", "process")
config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
config.set_string("python.executable", "/root/miniconda3/bin/python3")
env = StreamExecutionEnvironment.get_execution_environment(config)
table_env = StreamTableEnvironment.create(env)

# create a source table with a NOW() column and register it as a view
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", "content")
table_env.create_temporary_view("test_table", table)
result_table = table_env.sql_query("select *, NOW() as dt from test_table")
result_ds = table_env.to_data_stream(result_table)

def test_func(row):
    print(row)
    return row

result_ds.map(test_func)
env.execute()
```

Output in process mode:

```
Row(id=1, content='Hi', dt=Instant<1713609386, 27100>)
Row(id=2, content='Hello', dt=Instant<1713609386, 58000>)
```

Output in thread mode:

```
Row(id=1, content='Hi', dt=)
Traceback (most recent call last):
  File "/home/disk1/yuanwei/bug.py", line 31, in <module>
    env.execute()
  File "/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 773, in execute
    return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
	at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
	at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
	at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
	at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
	at scala.concurrent.impl.Cal
```
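A possible workaround sketch (an editor's assumption, not part of the report, and untested): if the faulty conversion is specific to the Instant produced for TIMESTAMP_LTZ values, casting NOW() to a plain TIMESTAMP(3) on the SQL side should hand the Python map function a datetime.datetime instead. This reuses table_env and test_func from the reproduction above:

```
# Assumed workaround, untested: cast NOW() (TIMESTAMP_LTZ) to TIMESTAMP(3)
# so the row reaching Python carries a datetime.datetime, not an Instant.
result_table = table_env.sql_query(
    "SELECT id, content, CAST(NOW() AS TIMESTAMP(3)) AS dt FROM test_table"
)
result_ds = table_env.to_data_stream(result_table)
result_ds.map(test_func)
env.execute()
```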
[jira] [Created] (FLINK-35181) CLONE - Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Elasticsearch connector
Zhongqiang Gong created FLINK-35181:
------------------------------------

Summary: CLONE - Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Elasticsearch connector
Key: FLINK-35181
URL: https://issues.apache.org/jira/browse/FLINK-35181
Project: Flink
Issue Type: Technical Debt
Components: Connectors / RabbitMQ
Reporter: Zhongqiang Gong
Assignee: Danny Cranmer
Fix For: rabbitmq-3.1.0

Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink RabbitMQ connector
[jira] [Created] (FLINK-35182) Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Pulsar connector
Zhongqiang Gong created FLINK-35182:
------------------------------------

Summary: Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Pulsar connector
Key: FLINK-35182
URL: https://issues.apache.org/jira/browse/FLINK-35182
Project: Flink
Issue Type: Technical Debt
Components: Connectors / Pulsar
Reporter: Zhongqiang Gong
Re: [DISCUSS] FLIP-446: Kubernetes Operator State Snapshot CRD
Hi,

Thanks for your comments, Gyula, I really appreciate it! I have updated the following things in the FLIP; please comment on these changes if you have any suggestions or concerns:

- Added a "path" field to FlinkStateSnapshotReference.
- Added two examples at the bottom.
- Added an error handling section and the associated new fields ("backoffLimit" and "failures") to the interfaces.
- Renamed the field "completed" to "alreadyExists".

Regarding the separate resources, I don't think that either of the two solutions would bring much (dis)advantage to the table, so I am still neutral and waiting for others to chime in with their thoughts and feedback!

Regards,
Mate

Gyula Fóra wrote (on Fri, 19 Apr 2024, 21:43):

> Hey!
>
> Regarding the question of initialSavepointPath and the new flinkStateSnapshotReference object, I think we could simply keep an extra field as part of the flinkStateSnapshotReference object called "path".
>
> Then the fields could be: namespace, name, path.
>
> If path is defined we would use that (to support the simple way also), otherwise use the resource. I would still deprecate the initialSavepointPath field in the jobSpec.
>
> Regarding Savepoint/Checkpoint vs FlinkStateSnapshot, what we need:
> 1. An easy way to list all state snapshots (to select the latest).
> 2. An easy way to reference a savepoint/checkpoint from a jobSpec.
> 3. Differentiate state snapshot types (in some cases users may prefer to use a checkpoint/savepoint for certain upgrades) -> we should add a label or something for easy selection.
> 4. Be able to delete savepoints (and maybe checkpoints).
>
> I am personally slightly more in favor of having a single resource, as that ticks all the boxes, while having two separate resources would make both listing and referencing harder: we would have to introduce a state type in the reference (name + namespace would not be enough to uniquely identify a state snapshot).
>
> I wonder if I am missing any good argument against the single FlinkStateSnapshot here.
>
> Cheers,
> Gyula
>
> On Fri, Apr 19, 2024 at 9:09 PM Mate Czagany wrote:
>
>> Hi Robert and Thomas,
>>
>> Thank you for sharing your thoughts, I will try to address your questions and suggestions:
>>
>> 1. I would really love to hear others' input as well about separating the snapshot CRD into two different CRDs for savepoints and checkpoints. I think the main upside is that we would not need the mandatory savepoint or checkpoint field inside the spec. The two CRs could share the same status fields, and their specs would differ. I personally like both solutions, and would love to hear others' thoughts as well.
>>
>> 2. I agree with you that "completed" is not very clear, but I would suggest the name "alreadyExists". WDYT?
>>
>> 3. I think having a retry loop inside the operator does not add too much complexity to the FLIP. On failure, we check if we have reached the maximum number of retries. If we have, the state will be set to "FAILED"; otherwise it will be set to "TRIGGER_PENDING", causing the operator to retry the task. The "error" field will always be populated with the latest error. Kubernetes Jobs already have a similar field called "backoffLimit"; maybe we could use the same name, with the same logic applied, WDYT?
>> About error events: I think we should keep the "error" field and clear it upon a successful snapshot. I will add to the FLIP that an event will be generated for each unsuccessful snapshot.
>>
>> 4. I really like the idea of having something like Pod Conditions, but I think it wouldn't add much value here, because the only two stages important to the user are "Triggered" and "Completed", and those timestamps will already be included in the status field. It would make more sense to implement this if there were more lifecycle stages.
>>
>> 5. There will be a new field in JobSpec called "flinkStateSnapshotReference" to reference a FlinkStateSnapshot to restore from.
>>
>> > How do you see potential effects on API server performance wrt. number of objects vs mutations? Is the proposal more or less neutral in that regard?
>>
>> While I am not an expert in Kubernetes internals, my understanding is that for the api-server, editing an existing resource and creating a new one do not differ performance-wise, because the whole resource will always be written to etcd anyway.
>> Retrieving the savepoints from etcd will differ for some use-cases, though: e.g. retrieving all snapshots for a specific FlinkDeployment would require the api-server to first retrieve every snapshot in the namespace from etcd, then filter them for that specific FlinkDeployment. I think this is a worst-case scenario, and it will be up to the user to optimize their queries via e.g. watch queries [1] or resourceVersions [2].
>>
>> > Does that mean one would have to create a FlinkStateSnapshot CR when
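[Editor's note] To make the fields debated in this thread concrete, here is a hypothetical YAML sketch of how they could fit together. The field names (flinkStateSnapshotReference with namespace/name/path, backoffLimit) are taken from the thread, but the apiVersion, kind, and overall schema are assumptions, not the final FLIP-446 design:

```
# Hypothetical illustration only; schema details are assumptions.
apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: example-savepoint
  namespace: flink
spec:
  backoffLimit: 3   # per the thread: max retries before the state becomes FAILED
  savepoint: {}     # marks this snapshot as a savepoint (vs. a checkpoint)
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-job
  namespace: flink
spec:
  job:
    # Would replace the deprecated initialSavepointPath. If "path" is set,
    # it is used directly; otherwise the referenced resource is resolved.
    flinkStateSnapshotReference:
      namespace: flink
      name: example-savepoint
      # path: s3://bucket/savepoints/savepoint-1234
```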
[jira] [Created] (FLINK-35183) Expose FlinkMinorVersion as metric for applications in operator
Márton Balassi created FLINK-35183:
-----------------------------------

Summary: Expose FlinkMinorVersion as metric for applications in operator
Key: FLINK-35183
URL: https://issues.apache.org/jira/browse/FLINK-35183
Project: Flink
Issue Type: New Feature
Components: Kubernetes Operator
Reporter: Márton Balassi
Assignee: Márton Balassi
Fix For: kubernetes-operator-1.9.0

This is a convenience feature on top of the existing Flink version grouping. It comes in handy when implementing platform overview dashboards that aggregate metrics from multiple operators.