Evolving Flink SQL statement set and restoring from savepoints

2024-05-07 Thread Keith Lee
Hello,

I'm running into issues restoring from savepoint after changing SQL
statement.

[ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalStateException: Failed to rollback to
> checkpoint/savepoint
> file:/tmp/flink-savepoints/savepoint-52c344-3bedb8204ff0. Cannot map
> checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to
> the new program, because the operator is not available in the new program.
> If you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.


EXECUTE STATEMENT SET was used, where an additional INSERT is added to the
initial set with the first INSERT unmodified. I appreciate that under the
hood, Flink does the planning and assigns random uids to operators.
Under this scenario, I'd expect the restore to be fine as the first
statement remains unchanged. Are there any ways around this? Also, is it
possible to force a uid using configuration or a SQL hint?

Initial SQL statement set:

EXECUTE STATEMENT SET BEGIN
>
> INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
> interaction_type, interaction_target, interaction_tags, event_time) SELECT
> user_id, user_session, interaction_type, interaction_target,
> interaction_tags, event_time FROM UserBehaviourKafkaSource WHERE
> interaction_result Like '%ERROR%';
>
> END


Updated SQL statement set:

EXECUTE STATEMENT SET BEGIN
>
> INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
> interaction_type, interaction_target, interaction_tags, event_time) SELECT
> user_id, user_session, interaction_type, interaction_target,
> interaction_tags, event_time FROM UserBehaviourKafkaSource WHERE
> interaction_result Like '%ERROR%';
>
>   INSERT INTO CampaignAggregationsJDBC
>   SELECT
> CONCAT_WS('/', interaction_tags, interaction_result,
> DATE_FORMAT(window_start, '-MM-DD HH:mm:ss.SSS'),
> DATE_FORMAT(window_end, '-MM-DD HH:mm:ss.SSS')) AS id,
> interaction_tags as campaign,
> interaction_result,
> COUNT(*) AS interaction_count,
> window_start,
> window_end
>   FROM TABLE(
> TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time),
> INTERVAL '10' SECONDS))
>   GROUP BY window_start, window_end, interaction_tags, interaction_result;
>
> END;


This was done on Flink 1.18 SQL Client.

Much appreciated
Keith


RE: [EXTERNAL] Re: Coordinator of operator ... does not exist or the job vertex this operator belongs to is not initialized.

2024-05-07 Thread Eduard Skhisov via user
Hi Biao,
I figured that the error happens only when there is a JOIN in the select.  But 
I will put together a simple example.

Thank you,
Ed Skhisov
Architect | www.intradiem.com
303.588.2518
Mailing Address: 2500 Dallas Hwy Ste 202, Dept #37049 Marietta, GA 30064



From: Biao Geng 
Sent: Monday, May 6, 2024 8:20 PM
To: Eduard Skhisov 
Cc: user@flink.apache.org
Subject: [EXTERNAL] Re: Coordinator of operator ... does not exist or the job 
vertex this operator belongs to is not initialized.

Hi Ed,
Would you mind giving a minimal example to reproduce your case?
I tried a pretty simple case like this in a mini cluster:
```
tEnv.createTemporaryView("test", env.fromData(1, 2, 3));
Table table = tEnv.sqlQuery("SELECT * FROM test");
table.execute().collect().next();
```
But I failed to reproduce the exception you attached :(

Best,
Biao Geng


Eduard Skhisov via user <user@flink.apache.org> wrote on Wed, May 1, 2024 at 05:09:
Hello,
I am trying to use CloseableIterator, but next() operation reliably generates 
the following error:

java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkException: 
Coordinator of operator 4596fb32cad14208ec80c1cae8623e11 does not exist or the 
job vertex this operator belongs to is not initialized.
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
 ~[na:na]
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
 ~[na:na]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:171)
 ~[flink-streaming-java-1.18.0.jar:1.18.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:129)
 ~[flink-streaming-java-1.18.0.jar:1.18.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 ~[flink-streaming-java-1.18.0.jar:1.18.0]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.next(CollectResultIterator.java:88)
 ~[flink-streaming-java-1.18.0.jar:1.18.0]
at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.next(CollectDynamicSink.java:229)
 ~[flink-table-planner_2.12-1.18.0.jar:1.18.0]
at 
com.intradiem.service.flink.job.UserSnapshotJob.createSnapshot(UserSnapshotJob.java:108)
 ~[classes/:na]
at 
com.intradiem.service.quartz.TriggerUserSnapshot.execute(TriggerUserSnapshot.java:68)
 ~[classes/:na]
at org.quartz.core.JobRunShell.run(JobRunShell.java:202) 
~[quartz-2.3.2.jar:na]
at 
org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) 
~[quartz-2.3.2.jar:na]
Caused by: org.apache.flink.util.FlinkException: Coordinator of operator 
4596fb32cad14208ec80c1cae8623e11 does not exist or the job vertex this operator 
belongs to is not initialized.
at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(DefaultOperatorCoordinatorHandler.java:135)
 ~[flink-runtime-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.deliverCoordinationRequestToCoordinator(SchedulerBase.java:1070)
 ~[flink-runtime-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.jobmaster.JobMaster.sendRequestToCoordinator(JobMaster.java:616)
 ~[flink-runtime-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.jobmaster.JobMaster.deliverCoordinationRequestToCoordinator(JobMaster.java:937)
 ~[flink-runtime-1.18.0.jar:1.18.0]
at 
jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source) ~[na:na]
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) 
~[na:na]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
 ~[na:na]
at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[flink-rpc-core-1.18.0.jar:1.18.0]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
 ~[na:na]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
 ~[na:na]
at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
 ~[na:na]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
 ~[na:na]
at 
org.apache.pekko.japi.pf.UnitCaseStatement.

Re: Flink Kubernetes Operator Application mode multiple jobs

2024-05-07 Thread Biao Geng
Hi Raihan,

I believe the Flink k8s operator has an implicit assumption that each
application should contain only one job so that it can manage the lifecycle
of the job reasonably. Your usage of deploying multiple jobs in a single
Python file (i.e. in a single Flink application) may not be preferred in
this case.

More details on why you only see the first job: currently, when the Flink k8s
operator deploys a job, it will try to set the job id of the application
(see the operator's reconciler code for more details). As a result, when you
call "executeAsync()" multiple times, in the Flink client's execute()
implementation, because PIPELINE_FIXED_JOB_ID is set in the config, all
"executeAsync()" calls will share the same job id. In this case, only the
first "executeAsync()" is actually executed and the following ones are
no-ops. I think that's why you see only the first job being submitted. In the
JM log, you may see messages like
`Job 4eadb4b025efa5dc2a6ef2199af6ca2b was recovered successfully.`, which
implies the following executeAsync() calls do not take effect.
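
As a rough illustration of that behaviour (not the operator's code, just a
hedged sketch with made-up pipeline names): each executeAsync() call from one
main() normally builds and submits an independent graph, and a fixed job id
collapses the second submission into a no-op.

```
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoJobsFromOneMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // First pipeline: builds a graph and submits it as its own job.
        env.fromElements(1, 2, 3).print();
        JobClient first = env.executeAsync("job-one");

        // Second pipeline: a fresh graph, normally a second job. With a fixed
        // job id injected into the config, it collides with the first
        // submission and effectively becomes a no-op.
        env.fromElements(4, 5, 6).print();
        JobClient second = env.executeAsync("job-two");

        System.out.println(first.getJobID() + " / " + second.getJobID());
    }
}
```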

Best,
Biao Geng



Raihan Sunny wrote on Tue, May 7, 2024 at 13:48:

> Hi everyone,
>
> I need some help with deploying multiple jobs from a single main function
> in Application mode using Flink Kubernetes Operator. As per the
> documentation [1] it should be possible to use multiple "executeAsync()" to
> deploy multiple jobs from the same file. This indeed is the case when
> running the job locally using the CLI with something like `/bin/flink run
> -pym main -pyfs /project/` (I'm using PyFlink btw) and I can see multiple
> jobs running in the UI. However, when I try to deploy the same job using
> Flink Kubernetes Operator, it seems that only the first job gets submitted.
> The second job is never submitted although the code leading up to
> "executeAsync()" does get executed.
>
> This is a minimal representation of the deployment manifest that I tried
> to run:
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: flink-job
> spec:
>   image: flinkimage
>   imagePullPolicy: IfNotPresent
>   flinkVersion: v1_17
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "1"
>     state.savepoints.dir: hdfs://...
>     state.checkpoints.dir: hdfs://...
>   serviceAccount: flink
>   jobManager:
>     resource:
>       memory: "1024m"
>       cpu: 0.5
>   taskManager:
>     resource:
>       memory: "1024m"
>       cpu: 0.5
>   job:
>     jarURI: local:///opt/flink/opt/flink-python_2.12-1.17.0.jar
>     entryClass: "org.apache.flink.client.python.PythonDriver"
>     args: ["python", "-pym", "main", "-pyfs", "/project/"]
>     parallelism: 1
>     upgradeMode: savepoint
>     state: running
>
> Any help would be greatly appreciated. I'm using Flink v1.17 and Flink
> Kubernetes Operator v1.7.0.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/overview/#application-mode
> 
>
>
> Thanks,
> Sunny
>


Re: Evolving Flink SQL statement set and restoring from savepoints

2024-05-07 Thread Talat Uyarer via user
Hi Keith,

When you add a new insert statement to your EXECUTE STATEMENT SET, you change
your job graph into two independent graphs. Unfortunately, Flink doesn't
currently provide a way to directly force specific UIDs for operators
through configuration or SQL hints. This is primarily due to how Flink's
internal planner optimizes execution plans.
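
As a side note (a hedged sketch, not a fix for the uid problem itself): the
flag named in the error can also be set programmatically via the standard
savepoint options, keeping in mind that ignoring unclaimed state drops the
unmatched operator state rather than remapping it.

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RestoreWithSkippedState {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Point the next submission at the savepoint and allow state that no
        // longer maps to any operator to be skipped (equivalent to
        // --allowNonRestoredState on the CLI).
        tEnv.getConfig().set("execution.savepoint.path",
                "file:/tmp/flink-savepoints/savepoint-52c344-3bedb8204ff0");
        tEnv.getConfig().set("execution.savepoint.ignore-unclaimed-state", "true");

        // tEnv.executeSql("EXECUTE STATEMENT SET BEGIN ... END;");
    }
}
```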

Talat


On Tue, May 7, 2024 at 8:42 AM Keith Lee 
wrote:

> Hello,
>
> I'm running into issues restoring from savepoint after changing SQL
> statement.
>
> [ERROR] Could not execute SQL statement. Reason:
>> java.lang.IllegalStateException: Failed to rollback to
>> checkpoint/savepoint
>> file:/tmp/flink-savepoints/savepoint-52c344-3bedb8204ff0. Cannot map
>> checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to
>> the new program, because the operator is not available in the new program.
>> If you want to allow to skip this, you can set the --allowNonRestoredState
>> option on the CLI.
>
>
> EXECUTE STATEMENT SET was used, where an additional INSERT is added to the
> initial set with the first INSERT unmodified. I appreciate that under the
> hood, Flink does the planning and assigning of random uid for operator.
> Under this scenario, I'd expect the restore to be fine as the first
> statement remain unchanged, are there any ways around this. Also is it
> possible to force uid using configuration or SQL hint?
>
> Initial SQL statement set:
>
> EXECUTE STATEMENT SET BEGIN
>>
>> INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
>> interaction_type, interaction_target, interaction_tags, event_time) SELECT
>> user_id, user_session, interaction_type, interaction_target,
>> interaction_tags, event_time FROM UserBehaviourKafkaSource WHERE
>> interaction_result Like '%ERROR%';
>>
>> END
>
>
> Updated SQL statement set:
>
> EXECUTE STATEMENT SET BEGIN
>>
>> INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
>> interaction_type, interaction_target, interaction_tags, event_time) SELECT
>> user_id, user_session, interaction_type, interaction_target,
>> interaction_tags, event_time FROM UserBehaviourKafkaSource WHERE
>> interaction_result Like '%ERROR%';
>>
>>   INSERT INTO CampaignAggregationsJDBC
>>   SELECT
>> CONCAT_WS('/', interaction_tags, interaction_result,
>> DATE_FORMAT(window_start, '-MM-DD HH:mm:ss.SSS'),
>> DATE_FORMAT(window_end, '-MM-DD HH:mm:ss.SSS')) AS id,
>> interaction_tags as campaign,
>> interaction_result,
>> COUNT(*) AS interaction_count,
>> window_start,
>> window_end
>>   FROM TABLE(
>> TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time),
>> INTERVAL '10' SECONDS))
>>   GROUP BY window_start, window_end, interaction_tags,
>> interaction_result;
>>
>> END;
>
>
> This was done on Flink 1.18 SQL Client.
>
> Much appreciated
> Keith
>


Re: Exception in Flink 1.18 (Time should be non negative)

2024-05-07 Thread Talat Uyarer via user
Hi Lasse,

If there's a significant difference in the system time between Flink
TaskManagers, it can lead to negative time calculations when comparing
timestamps from different sources.

On Mon, May 6, 2024 at 5:40 AM Lasse Nedergaard <
lassenedergaardfl...@gmail.com> wrote:

> Hi.
>
> In Flink jobs running 1.18 I see the error below sometimes. I Can see the
> same problem has been reported and fixed I 1.14
> Anyone have an idea how to debug and/or work around this problem?
>
> java.lang.IllegalArgumentException: Time should be non negative
>
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:80)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:71)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.triggerDebloating(SingleInputGate.java:450)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.triggerDebloating(InputGateWithMetrics.java:90)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:843)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$5(StreamTask.java:834)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
> ~[flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
> [flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
> [flink-dist-1.18.0.jar:1.18.0]
>
> at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> [flink-dist-1.18.0.jar:1.18.0]
>
> at java.lang.Thread.run(Unknown Source) [?:?]
>
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>


Re: Issue in PrefetchCount

2024-05-07 Thread Talat Uyarer via user
Hi Ajay,

When you run with a parallelism of 3 you will have 3 independent clients. If you
want to keep the prefetch count at 3 you need to set setRequestedChannelMax to 1
and setParallelism to 3, so all 3 clients can have one connection each.
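
A hedged sketch of that suggestion, mirroring the snippet quoted below (host,
credentials and queue name are placeholders):

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

public class PrefetchWithParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setPrefetchCount(3)        // up to 3 unacked deliveries per consumer
                .setHost("rabbitmq-host")   // placeholder values
                .setPort(5672)
                .setUserName("user")
                .setPassword("pass")
                .setVirtualHost("/")
                .build();

        // One channel per connection, as suggested above.
        connectionConfig.getConnectionFactory().setRequestedChannelMax(1);

        // Three parallel source subtasks, i.e. three independent consumers,
        // each with its own prefetch budget of 3.
        DataStream<String> stream = env
                .addSource(new RMQSource<>(connectionConfig, "sms-queue", new SimpleStringSchema()))
                .setParallelism(3);

        stream.print();
        env.execute("rmq-prefetch-sketch");
    }
}
```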

Talat

On Tue, May 7, 2024 at 5:52 AM ajay pandey  wrote:

> Hi Flink Team,
>
>
> I am currently reading streaming data from RabbitMQ and using the
> RMQConnectionConfig for establishing the connection. Here's how I'm setting
> up the connection:
> and we use flink version 1.16.2 and RabbitMQ version 3.10.7
>
>  RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
> .setPrefetchCount(smsInput.prefetchCount)
> .setHost(smsInput.HostServer)
> .setPort(smsInput.HostPort)
> .setUserName(smsInput.HostUserName)
> .setPassword(smsInput.HostPassword)
> .setVirtualHost("/")
> .build();
>
> ConnectionFactory rabbitMQConnectionFactory =
> connectionConfig.getConnectionFactory();
> rabbitMQConnectionFactory.setRequestedChannelMax(smsInput.prefetchCount);
> // Set prefetchcount
>
> DataStream<String> stream = executionEnvironment.addSource(new
> RMQSource<String>(connectionConfig,
>  smsInput.QueueName,
>  new SimpleStringSchema()))
>  .setParallelism(1);
>
>
> Additionally, I have configured the prefetch count to read 3 data at the
> same time from RabbitMQ. Here's how I have enabled the checkpointing
> interval.
>
>
> executionEnvironment.enableCheckpointing(smsInput.checkpointIntervalMS,CheckpointingMode.EXACTLY_ONCE,true);
>
> The prefetch count seems to be working fine, but when I run the job with a
> parallelism of 3, the prefetchCount is not working as expected.
>
> We establish a connection to RabbitMQ with a fixed setParallelism of 1.
> However, my other operators retrieve data from RabbitMQ and execute the job
> with a parallelism of 3, as shown in the following command.
>
> bin/flink run -p 3 ../apps/Flink_1.16.2_prefetch.jar
> ../config/app-config.properties -yD
> env.java.home=/usr/lib/jvm/java-11-openjdk-11.0.19.0.7-1.el7_9.x86_64
>
> So kindly provide a solution for configuring the prefetch count with
> parallelism.
>
>
>
> Thanks,
> Ajay Pandey
>


Is there open requests to support Redis Datastream connector

2024-05-07 Thread Zhou, Tony
Hi team,

I need a Redis sink connector for my Flink app, but the best I can find is from
Bahir, which is deprecated. I am wondering if someone in the community is working
on supporting a Redis DataStream connector; if not, does anyone have an idea of
roughly how much effort it would take to implement it myself? I have a simple use
case to JSON-put data into my Redis cluster.

Thanks,
Tony
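
(For a rough sense of the effort involved in the simple JSON-put use case
above, here is a hedged, minimal sketch of a hand-rolled DataStream sink using
the Jedis client; the class name, host and key scheme are illustrative only,
not an existing connector API.)

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

/** Illustrative hand-rolled sink that JSON-puts each record into Redis. */
public class RedisJsonSink extends RichSinkFunction<String> {

    private final String host;
    private final int port;
    private transient Jedis jedis;

    public RedisJsonSink(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis(host, port); // one client per parallel sink subtask
    }

    @Override
    public void invoke(String json, Context context) {
        // Assumes a key can be derived from the payload; a simple hash is used here.
        jedis.set("record:" + json.hashCode(), json);
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}
```

Wired in with something like `stream.addSink(new RedisJsonSink("redis-host", 6379))`;
what such a sketch leaves out, and where most of a real connector's effort goes,
is checkpoint integration and delivery guarantees.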


Re:Re: Evolving Flink SQL statement set and restoring from savepoints

2024-05-07 Thread Xuyang
Hi, if the processing logic is modified, then the representation of the 
topology would change. Consequently, the UIDs that are determined by the 
topological order might change as well, which could potentially cause state 
recovery to fail. For further details, you can refer to [1].

Currently, the Table API does not have the capability to customize UIDs. You 
might consider creating a feature request on JIRA [2], and then initiate a 
discussion on the dev mailing list.
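
(For contrast, a hedged sketch of the DataStream API side, where operator uids
can be pinned explicitly so that adding new branches later does not break
savepoint mapping; the pipeline and uid names are illustrative only.)

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidPinningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("ok", "ERROR:timeout", "ok")
                .filter(line -> line.contains("ERROR"))
                .uid("error-filter")   // explicit uid keeps state mappable as the graph evolves
                .name("error-filter")
                .print();

        env.execute("uid-pinning-sketch");
    }
}
```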




[1] 
https://github.com/apache/flink/blob/92eef24d4cc531d6474252ef909fc6d431285dd9/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java#L243C38-L243C62

[2] https://issues.apache.org/jira/projects/FLINK/issues/




--

Best!
Xuyang




On 2024-05-08 06:13:29, "Talat Uyarer via user" wrote:

Hi Keith,


When you add a new insert statement to your EXECUTE STATEMENT you change your 
job graph with independent two graphs.Unfortunately, Flink doesn't currently 
provide a way to directly force specific UIDs for operators through 
configuration or SQL hints. This is primarily due to how Flink's internal 
planner optimizes execution plans.


Talat




On Tue, May 7, 2024 at 8:42 AM Keith Lee  wrote:

Hello,


I'm running into issues restoring from savepoint after changing SQL statement.

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint 
file:/tmp/flink-savepoints/savepoint-52c344-3bedb8204ff0. Cannot map 
checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the 
new program, because the operator is not available in the new program. If you 
want to allow to skip this, you can set the --allowNonRestoredState option on 
the CLI.

EXECUTE STATEMENT SET was used, where an additional INSERT is added to the 
initial set with the first INSERT unmodified. I appreciate that under the hood, 
Flink does the planning and assigning of random uid for operator. Under this 
scenario, I'd expect the restore to be fine as the first statement remain 
unchanged, are there any ways around this. Also is it possible to force uid 
using configuration or SQL hint?


Initial SQL statement set:


EXECUTE STATEMENT SET BEGIN

INSERT INTO UserErrorExperienceS3Sink (user_id, user_session, interaction_type, 
interaction_target, interaction_tags, event_time) SELECT user_id, user_session, 
interaction_type, interaction_target, interaction_tags, event_time FROM 
UserBehaviourKafkaSource WHERE interaction_result Like '%ERROR%';

END


Updated SQL statement set:

EXECUTE STATEMENT SET BEGIN

INSERT INTO UserErrorExperienceS3Sink (user_id, user_session, interaction_type, 
interaction_target, interaction_tags, event_time) SELECT user_id, user_session, 
interaction_type, interaction_target, interaction_tags, event_time FROM 
UserBehaviourKafkaSource WHERE interaction_result Like '%ERROR%';

  INSERT INTO CampaignAggregationsJDBC
  SELECT
CONCAT_WS('/', interaction_tags, interaction_result, 
DATE_FORMAT(window_start, '-MM-DD HH:mm:ss.SSS'), DATE_FORMAT(window_end, 
'-MM-DD HH:mm:ss.SSS')) AS id,
interaction_tags as campaign,
interaction_result,
COUNT(*) AS interaction_count,
window_start,
window_end
  FROM TABLE(
TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time), INTERVAL 
'10' SECONDS))
  GROUP BY window_start, window_end, interaction_tags, interaction_result;

END;


This was done on Flink 1.18 SQL Client.



Much appreciated

Keith


Re:Is there open requests to support Redis Datastream connector

2024-05-07 Thread Xuyang
Hi, Tony.




There is some related work in the Flink community to support Redis source / sink
connectors. You can see more here [1][2][3][4]. However, these JIRAs are still
WIP.




The good news is that the PR in jira[1] is already available. You can directly 
download the PR and package it into a connector for use.







[1] https://issues.apache.org/jira/browse/FLINK-15571

[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-254%3A+Redis+Streams+Connector

[3] https://issues.apache.org/jira/browse/FLINK-33873

[4] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-404%3A+Create+a+Redis+HyperLogLog+Connector+for+Flink







--

Best!
Xuyang




At 2024-05-08 07:28:08, "Zhou, Tony"  wrote:

Hi team,

 

I need a Redis sink connector for my Flink app but the best I can find is from 
Bahir, which is deprecated. I am wondering if someone in the community is 
working on supporting Redis Datastream connector, if not, does anyone have idea 
how much effort roughly it can take to implement it by myself? I have a simple 
use case to JSON put data into my Redis cluster.

 

Thanks,

Tony

RE: Flink Kubernetes Operator Application mode multiple jobs

2024-05-07 Thread Guozhen Yang
Hi Raihan,

We have encountered the same issue though we are using Flink Kubernetes
Operator 1.6.

Biao Geng's explanation is correct. We also have a detailed briefing in
this jira ticket, but it seems that submitting multiple jobs is not allowed
by design.

So we made a patch to solve this issue in our custom Flink distribution.
Solving this is simple.

All we need is to make a one-line change in the ApplicationReconciler class,
like below.

Change from:

    setJobIdIfNecessary(spec, relatedResource, deployConfig);

to:

    if (HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)) {
        setJobIdIfNecessary(spec, relatedResource, deployConfig);
    }

After this modification, everything works perfectly fine as we
expected in our environment.


On 2024/05/07 05:48:02 Raihan Sunny wrote:
> Hi everyone,
>
> I need some help with deploying multiple jobs from a single main function
> in Application mode using Flink Kubernetes Operator. As per the
> documentation [1] it should be possible to use multiple "executeAsync()" to
> deploy multiple jobs from the same file. This indeed is the case when
> running the job locally using the CLI with something like `/bin/flink run
> -pym main -pyfs /project/` (I'm using PyFlink btw) and I can see multiple
> jobs running in the UI. However, when I try to deploy the same job using
> Flink Kubernetes Operator, it seems that only the first job gets submitted.
> The second job is never submitted although the code leading up to
> "executeAsync()" does get executed.
>
> This is a minimal representation of the deployment manifest that I tried
> to run:
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: flink-job
> spec:
>   image: flinkimage
>   imagePullPolicy: IfNotPresent
>   flinkVersion: v1_17
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "1"
>     state.savepoints.dir: hdfs://...
>     state.checkpoints.dir: hdfs://...
>   serviceAccount: flink
>   jobManager:
>     resource:
>       memory: "1024m"
>       cpu: 0.5
>   taskManager:
>     resource:
>       memory: "1024m"
>       cpu: 0.5
>   job:
>     jarURI: local:///opt/flink/opt/flink-python_2.12-1.17.0.jar
>     entryClass: "org.apache.flink.client.python.PythonDriver"
>     args: ["python", "-pym", "main", "-pyfs", "/project/"]
>     parallelism: 1
>     upgradeMode: savepoint
>     state: running
>
> Any help would be greatly appreciated. I'm using Flink v1.17 and Flink
> Kubernetes Operator v1.7.0.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/overview/#application-mode
>
>
> Thanks,
> Sunny
>