Re: Task manager isn’t initiating with defined values in Flink 1.11 version as part of EMR 6.1.0

2021-01-04 Thread Till Rohrmann
Hi Deep,

Flink has dropped support for specifying the number of TMs via -n since the
introduction of FLIP-6. Since then, Flink automatically starts TMs depending
on the required resources. Hence, there is no need to specify the -n
parameter anymore. Instead, you should specify the parallelism with
which you would like to run your job via the -p option.

Since Flink 1.11.0 there is the option slotmanager.number-of-slots.max to
set an upper limit on the number of slots a cluster is allowed to allocate [1].
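
For example, the rough 1.11 equivalent of the old commands could look like this
(just a sketch; the memory sizes, slot counts, and names are placeholders):

sudo flink-yarn-session -Djobmanager.memory.process.size=<JM memory> \
    -Dtaskmanager.memory.process.size=<TM memory> \
    -Dtaskmanager.numberOfTaskSlots=<slots per TM> -d

flink run -m yarn-cluster -yid <application id> -p <parallelism> -c <class name> <jar path>

With, say, 4 slots per TM and -p 24, Flink would bring up 6 TaskManagers on
demand; adding -Dslotmanager.number-of-slots.max=<max slots> to the session
command caps the total.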

[1] https://issues.apache.org/jira/browse/FLINK-16605

Cheers,
Till

On Mon, Jan 4, 2021 at 8:33 AM DEEP NARAYAN Singh 
wrote:

> Hi Guys,
>
> I’m struggling to start task managers with Flink 1.11.0 in AWS EMR,
> although it works with older versions. Let me put the full context here.
>
> *When using Flink 1.9.1 and EMR 5.29.0*
>
> To create a long running session, we used the below command.
>
> *sudo flink-yarn-session -n <number of TMs> -s <slots per TM> -jm <JM memory>
> -tm <TM memory> -d*
>
> and followed it with the below command to run the final job.
>
> *flink run -m yarn-cluster -yid <application id> -yn <number of TMs> -ys
> <slots per TM> -yjm <JM memory> -ytm <TM memory> -c <class name> <jar path>*
>
> and if “n” is 6, then 6 task managers are created to start the job; so
> whatever “n” is configured to, that is the number of TMs the job is
> started with.
>
> But now, when we scaled up to the new configuration (*i.e. Flink 1.11.0 and
> EMR 6.1.0*), we are unable to achieve the desired number of TMs.
>
> Please find the session command for the new configuration below,
>
> *sudo flink-yarn-session -Djobmanager.memory.process.size=<JM memory>
> -Dtaskmanager.memory.process.size=<TM memory> -n <number of TMs> -s <number of slots per core> -d*
>
> And the final Job command
>
> *flink run -m yarn-cluster -yid <application id> -c <class name> <jar path>*
>
> I have tried a lot of combinations, but nothing has worked out so far. I
> request your help in this regard as we plan to have this configuration in
> *PRODUCTION* soon.
>
> Thanks in advance.
>
>
> Regards,
>
> -Deep
>


Re: Tumbling Time Window

2021-01-04 Thread David Anderson
For straightforward tumbling windows, the regular DSL windowing performs
noticeably better than a custom process function because it takes advantage
of an internal API to avoid some serialization overhead.

There's a simple example of a ProcessWindowFunction in [1], and an example
of using a KeyedProcessFunction to do windowing in [2].
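
For reference, here is a minimal sketch of a keyed 2-second tumbling window with
the DSL (the Tuple3 layout (userId, value, eventTimestampMillis) is only an
assumption for illustration, not your actual schema):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoSecondTumblingWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (userId, value, eventTimestampMillis) -- stand-in for the real per-user events
        env.fromElements(
                Tuple3.of("user-1", 1L, 1000L),
                Tuple3.of("user-1", 2L, 1500L),
                Tuple3.of("user-2", 5L, 1700L))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple3<String, Long, Long>>forMonotonousTimestamps()
                    .withTimestampAssigner((event, ts) -> event.f2))
            .keyBy(event -> event.f0)                                // one window per user
            .window(TumblingEventTimeWindows.of(Time.seconds(2)))    // 2-second tumbling window
            .sum(1)                                                  // aggregate the value field
            .print();

        env.execute("2s tumbling window per key");
    }
}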

Best,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/streaming_analytics.html#window-functions
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example


On Mon, Jan 4, 2021 at 1:05 AM Navneeth Krishnan 
wrote:

> Hello All,
>
> First of all Happy New Year!! Thanks for the excellent community support.
>
> I have a job which requires a 2-second tumbling time window per key. For
> each user we wait for 2 seconds to collect enough data and then proceed to
> further processing. My question is: should I use the regular DSL windowing
> or write a custom process function which does the windowing? I have heard
> that the DSL window has more overhead than the custom window function.
>
> What do you guys suggest, and can someone provide an example of a custom
> window function per key? Also, given that the window time is very short (2 secs),
> would there be more overhead in firing so many timers for each key?
>
> Thanks!
>
> Regards,
> Navneeth
>


How to register TableSinks

2021-01-04 Thread Patrick.Eifler
Hi and Happy New Year,

I’m currently trying to remove deprecations to prepare for the upgrade to Flink
1.12; we are currently running on 1.11.

Specifically I need to update our code that registers table sinks into the 
StreamTableEnvironment. I’m maintaining jobs that use DataStreams with multiple 
sinks. Now I want to use the StatementSet to benefit from its DAG for multiple 
sinks.

So far I added the code to add the sinks into the StatementSet:

statementSet.addInsert(sinkName,.table)

and to execute the StatementSet:

statementSet.execute()

For this to work I need to register the sinks. I used to do that with the (now 
deprecated) function on the StreamTableEnvironment:

tableEnv.registerTableSink(
sinkName,
fieldNames,
fieldTypes,
tableSink
)

My question is: how do I register sinks so that they can be discovered by the
statement set? What is the proper replacement for the function registerTableSink?

executeSql(ddl), as suggested, does not apply to this use case. I did not find
anything in the documentation either:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#translate-and-execute-a-query

When running the job I’m getting an error that the sink could not be found in
the catalog. That means I have to add the sink to the catalog, but how?

Which function should be used for registering a table sink into the table
environment's catalog?

Thanks!

Kind Regards,

Patrick
--
Patrick Eifler

Senior Software Engineer (BI)

Cloud Gaming Engineering & Infrastructure
Sony Interactive Entertainment LLC

Wilhelmstraße 118, 10963 Berlin

Germany

E: patrick.eif...@sony.com


Re: Flink sink never executes

2021-01-04 Thread Kostas Kloudas
Hi Ben,

Sorry for the late reply but I guess that your question was answered
in StackOverflow, right?
Did that answer solve your problem?

Cheers,
Kostas

On Mon, Dec 21, 2020 at 9:09 AM Ben Beasley  wrote:
>
> First off I want to thank the folks in this email list for their help thus 
> far.
>
>
>
> I’m facing another strange issue where if I add a window to my stream, the 
> sink no longer executes. However the sink executes without the windowing. I 
> described my problem on stackoverflow so that the code is easy to read.
>
>
>
> I wonder if anyone can help me once more, I believe the solution could be 
> simple for someone familiar with the code. I believe I’ve followed the 
> tutorials and articles on the flink website correctly.


Re: How to register TableSinks

2021-01-04 Thread Dawid Wysakowicz
Hi Patrick.

Happy New Year to you too ;)

The method you are referring to was deprecated along with the whole TableSink
interface, in favour of a much improved and feature-rich new Source &
Sink API. You can find extensive documentation on the new API here [1].

Therefore, if you use the old TableSink interface, you must stick with the
deprecated method.
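
As a rough illustration of that factory-based path (not your actual sink: the
'print' connector and the column layout below are placeholders), a sink table
registered via DDL becomes visible to the StatementSet by name:

import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StatementSetSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the sink as a catalog table; any factory-based connector works here.
        tableEnv.executeSql(
            "CREATE TEMPORARY TABLE my_sink (id BIGINT, name STRING) " +
            "WITH ('connector' = 'print')");

        // Stand-in for the Table derived from your DataStreams,
        // e.g. via tableEnv.fromDataStream(...).
        Table table = tableEnv.fromValues(
            DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.BIGINT()),
                DataTypes.FIELD("name", DataTypes.STRING())),
            row(1L, "a"),
            row(2L, "b"));

        StatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsert("my_sink", table); // the sink is resolved from the catalog by name
        statementSet.execute();
    }
}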

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html

On 04/01/2021 14:53, patrick.eif...@sony.com wrote:
>
> Hi and Happy New Year,
>
>  
>
> I’m currently trying to remove deprecations to prepare for the upgrade
> to Flink 1.12. currently running on 1.11.
>
>  
>
> Specifically I need to update our code that registers table sinks into
> the StreamTableEnvironment. I’m maintaining jobs that use DataStreams
> with multiple sinks. Now I want to use the StatementSet to benefit
> from its DAG for multiple sinks.
>
>  
>
> So far I added the code to add the sinks into the StatementSet:
>
>  
>
> *statementSet.addInsert(sinkName,.table)*
>
>  
>
> and to execute the StatementSet:
>
>  
>
> *statementSet.execute()*
>
>  
>
> For this to work I need to register the sinks. I used to do that with
> the (now deprecated) function on the StreamTableEnvironment:
>
>  
>
> *tableEnv.registerTableSink(*
>
> *    sinkName,*
>
> *    fieldNames,*
>
> *    fieldTypes,*
>
> *    tableSink*
>
> *)*
>
>  
>
> My Question is how to register sinks to be discovered by the statement
> set? What is the proper replacement for the function *registerTableSink*?
>
>  
>
> *executeSql(ddl)*as suggested, does not apply to this use case. Did
> not find anything in the documentation either:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#translate-and-execute-a-query
>
>  
>
> When running the job I’m getting the error, that the sink could not be
> found in the catalog. Which means I have to add the sink into the
> catalog, but how?
>
>  
>
> Which function should be used for registering a table sink into the
> table environments catalog?
>
> Thanks!
>
>  
>
> Kind Regards,
>
>  
>
> Patrick
>
> -- 
>
> Patrick Eifler
>
>  
>
> Senior Software Engineer (BI)
>
> Cloud Gaming Engineering & Infrastructure 
> Sony Interactive Entertainment LLC 
>
> Wilhelmstraße 118, 10963 Berlin
>
>
> Germany
>
> E: patrick.eif...@sony.com
>




Re: Is chk-$id/_metadata created regardless of enabling externalized checkpoints?

2021-01-04 Thread Yun Gao
Hi Dongwon,

   Happy new year! One metadata file is stored on HDFS even if
externalized checkpoints are not enabled. If externalized checkpoints are not
enabled, Flink deletes all the checkpoints on exit; if they are enabled, the
checkpoints are kept in the cancel or failure cases, according to the retention
settings. Thus, for the second issue, I think the answer is yes.
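
For reference, the retention behaviour is controlled by a single flink-conf.yaml
entry (a sketch; this is the key Dongwon references as [1], with the value names
from the 1.11/1.12 configuration docs):

execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

The other value is DELETE_ON_CANCELLATION, which still retains checkpoints on
failure but deletes them when the job is cancelled.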

Best,
 Yun


 --Original Mail --
Sender:Dongwon Kim 
Send Date:Mon Jan 4 19:16:39 2021
Recipients:user 
Subject:Is chk-$id/_metadata created regardless of enabling externalized 
checkpoints?

Hi,

First of all, happy new year!
It can be a very basic question but I have something to clarify in my head.

my flink-conf.yaml is as follows (note that I didn't specify the value of
"execution.checkpointing.externalized-checkpoint-retention" [1]):
#...
execution.checkpointing.interval: 20min
execution.checkpointing.min-pause: 1min

state.backend: rocksdb
state.backend.incremental: true

state.checkpoints.dir: hdfs:///flink-jobs/ckpts
state.checkpoints.num-retained: 10

state.savepoints.dir: hdfs:///flink-jobs/svpts
#...

And in the checkpoint configuration shown in the Web UI, "Persist Checkpoints
Externally" is "Disabled" in the final row.


According to [2],
externalized checkpoints: You can configure periodic checkpoints to be 
persisted externally. Externalized checkpoints write their meta data out to 
persistent storage and are not automatically cleaned up when the job fails. 
This way, you will have a checkpoint around to resume from if your job fails. 
There are more details in the deployment notes on externalized checkpoints.
So I thought the metadata of a checkpoint lives only in the JobManager's memory
and is not stored on HDFS unless
"execution.checkpointing.externalized-checkpoint-retention" is set.

However, even without setting the value, every checkpoint already contains its 
own metadata:
[user@devflink conf]$ hdfs dfs -ls 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/*
Found 1 items
-rw-r--r--  3 user hdfs  163281 2021-01-04 14:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-945/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  163281 2021-01-04 14:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-946/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  163157 2021-01-04 15:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-947/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  156684 2021-01-04 15:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-948/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  147280 2021-01-04 15:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-949/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  147280 2021-01-04 16:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-950/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  162937 2021-01-04 16:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-951/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  175089 2021-01-04 16:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-952/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  173289 2021-01-04 17:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-953/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  153951 2021-01-04 17:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-954/_metadata
Found 21 items
-rw-r--r--  3 user hdfs 78748 2021-01-04 14:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/05d76f4e-3d9c-420c-8b87-077fc9880d9a
-rw-r--r--  3 user hdfs 23905 2021-01-04 15:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/0b9d9323-9f10-4fc2-8fcc-a9326448b07c
-rw-r--r--  3 user hdfs 81082 2021-01-04 16:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/0f6779d0-3a2e-4a94-be9b-d9d6710a7ea0
-rw-r--r--  3 user hdfs 23905 2021-01-04 16:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/107b3b74-634a-462c-bf40-1d4886117aa9
-rw-r--r--  3 user hdfs 78748 2021-01-04 14:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/18a538c6-d40e-48c0-a965-d65be407a124
-rw-r--r--  3 user hdfs 83550 2021-01-04 16:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/24ed9c4a-0b8e-45d4-95b8-64547cb9c541
-rw-r--r--  3 user hdfs 23905 2021-01-04 17:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/35ee9665-7c1f-4407-beb5-fbb312d84907
-rw-r--r--  3 user hdfs 47997 2021-01-04 11:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/36363172-c401-4d60-a970-cfb2b3cbf058
-rw-r--r--  3 user hdfs 81082 2021-01-04 15:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/43aecc8c-145f-43ba-81a8-b0ce2c3498f4
-rw-r--r--  3 user hdfs 79898 2021-01-04 15:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/5743f278-fc50-4c4a-b14e-89bfdb2139fa
-rw-r--r--  3 user hdfs 23905 2021-01-04 16:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/67e16688-c48c-409b-acac-e7091a84d548
-rw-r--r--  3 user hdfs 23905 2021-01

Re: How to register TableSinks

2021-01-04 Thread Patrick.Eifler
Hey, Thanks Dawid,

One more question: Is the StatementSet API supposed to work with the old sink
interface?
I get the following error when I’m using it with the deprecated 
registerTableSink method:

The main method caused an error: requirement failed: operations should not be 
empty

Thanks!

Patrick
--
Patrick Eifler

Senior Software Engineer (BI)

Cloud Gaming Engineering & Infrastructure
Sony Interactive Entertainment LLC

Wilhelmstraße 118, 10963 Berlin

Germany

E: patrick.eif...@sony.com

From: Dawid Wysakowicz 
Date: Monday, 4. January 2021 at 15:50
To: , 
Subject: Re: How to register TableSinks


Hi Patrick.

Happy New Year to you too ;)

The method you are referring to was deprecated along with the whole TableSink
interface, in favour of a much improved and feature-rich new Source & Sink API.
You can find extensive documentation on the new API here [1].

Therefore, if you use the old TableSink interface, you must stick with the
deprecated method.

Best,

Dawid

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
On 04/01/2021 14:53, patrick.eif...@sony.com 
wrote:
Hi and Happy New Year,

I’m currently trying to remove deprecations to prepare for the upgrade to Flink 
1.12. currently running on 1.11.

Specifically I need to update our code that registers table sinks into the 
StreamTableEnvironment. I’m maintaining jobs that use DataStreams with multiple 
sinks. Now I want to use the StatementSet to benefit from its DAG for multiple 
sinks.

So far I added the code to add the sinks into the StatementSet:

statementSet.addInsert(sinkName,.table)

and to execute the StatementSet:

statementSet.execute()

For this to work I need to register the sinks. I used to do that with the (now 
deprecated) function on the StreamTableEnvironment:

tableEnv.registerTableSink(
sinkName,
fieldNames,
fieldTypes,
tableSink
)

My Question is how to register sinks to be discovered by the statement set? 
What is the proper replacement for the function registerTableSink?

executeSql(ddl) as suggested, does not apply to this use case. Did not find 
anything in the documentation either: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#translate-and-execute-a-query

When running the job I’m getting the error, that the sink could not be found in 
the catalog. Which means I have to add the sink into the catalog, but how?

Which function should be used for registering a table sink into the table 
environments catalog?


Thanks!

Kind Regards,

Patrick
--
Patrick Eifler

Senior Software Engineer (BI)

Cloud Gaming Engineering & Infrastructure
Sony Interactive Entertainment LLC

Wilhelmstraße 118, 10963 Berlin

Germany

E: patrick.eif...@sony.com


Re: Visualizing Flink Statefun's "Logical Co-location, Physical Separation" Properties

2021-01-04 Thread Igal Shilman
Hi Le,
Let me try to answer your questions, one by one:


> I'm trying to understand the internal mechanism used by Flink Statefun to
> dispatch functions to Flink cluster. In particular, I was trying to find a
> good example demonstrating Statefun's "Logical Co-location, Physical
> Separation" properties (as pointed out by [1]).
>

I'm not sure that I understand what dispatching functions to the Flink cluster
means here, but I will try to give you a general description of how things
work with StateFun; please follow up
with any clarifications :-)

In general, StateFun is a very specific Flink streaming job, and as such it
will be running on a Flink cluster. Now, a remote function is a function
that runs in a different process
that executes (for now) an HTTP server and runs the StateFun SDK. These
processes can be located on the same machine as Flink's TaskManagers
and communicate via a unix domain socket, or they can be on a different
machine, or they can even be deployed behind a load balancer and
autoscaled up and down on demand.
Now, as long as StateFun knows how to translate a function logical address
to an HTTP endpoint that serves it, StateFun can dispatch function calls to
these remote function processes.
By logical co-location, physical separation: a Flink worker that executes
the StateFun job is responsible for the state and messaging of a specific
key (address), but the function itself can be running in a different
physical process.
A good example of this kind of deployment can be found in Gordon's talk [1],
which demonstrates deploying remote functions on AWS Lambda.


My understanding based on the doc was that there are three modes to deploy
> statefun to Flink cluster ranging from remote functions to embedded
> functions (decreasing storage locality and increasing efficiency).
> Therefore, in the scenario of remote functions, functions are deployed with
> its state and message independent from flink processes. And function should
> be able to be executed in any Flink process as if it is a stateless
> application.
>

Remote functions are indeed "effectively stateless" and their state is
provided as part of each invocation request. But the state is managed in a
fault-tolerant way by Flink.


> I have tried out a couple of examples from the statefun but judging by
> allocation result the subtask of the job seems to bind statically with each
> task slot in the Flink cluster (I'm assuming the example such as DataStream
> uses embedded function instead?).
>

You are correct, the StateFun job has a fixed topology, independent of the
number of functions or function types. Therefore you can have many
different function types and many billions of function instances.
A single FunctionDispatcher operator will transparently handle the
multiplexing of different function types and instances behind the scenes.

I hope that clarifies a bit.

Igal.


[1] https://www.youtube.com/watch?v=tuSylBadNSo


On Tue, Dec 29, 2020 at 10:58 PM Le Xu  wrote:

> Hello!
>
> I'm trying to understand the internal mechanism used by Flink Statefun to
> dispatch functions to Flink cluster. In particular, I was trying to find a
> good example demonstrating Statefun's "Logical Co-location, Physical
> Separation" properties (as pointed out by [1]).
>
> My understanding based on the doc was that there are three modes to deploy
> statefun to Flink cluster ranging from remote functions to embedded
> functions (decreasing storage locality and increasing efficiency).
> Therefore, in the scenario of remote functions, functions are deployed with
> its state and message independent from flink processes. And function should
> be able to be executed in any Flink process as if it is a stateless
> application. I have tried out a couple of examples from the statefun but
> judging by allocation result the subtask of the job seems to bind
> statically with each task slot in the Flink cluster (I'm assuming the
> example such as DataStream uses embedded function instead?).
>
> I also came across this tutorial [2] demonstrating the usage of remote
> function. The README indicates [3] that "Since the functions are
> stateless, and running in their own container, we can redeploy and rescale
> it independently of the rest of the infrastructure." which seems to
> indicate that the function performs scaling manually by the user that could
> occupy arbitrary resources (e.g., task slots) from the Flink cluster on
> demand. But I wasn't sure how to explicitly specify the amount of
> parallelism for each function dynamically.
> Is there a good example to visualize statefun "physical separation"
> behavior by forcing the same function to be invoked at different task slots
> / machines (either on-demand or automatically)?
>
> Any help will be appreciated!
>
> Thanks!
>
> Le
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/concepts/distributed_architecture.html#remote-functions
> [2] https://github.com/ververica/fli

Re: Flink Stateful Function: The program's entry point class not found in the jar file

2021-01-04 Thread Igal Shilman
Hi Le,

Looking at your pom.xml, you are pointing to the wrong main method here:
https://github.com/flint-stone/flink-statefun/blob/lx-base/statefun-examples/statefun-flink-datastream-example/pom.xml#L161

You need to change it to your Example class, this should work.


On Tue, Dec 29, 2020 at 5:06 AM Le Xu  wrote:

> Hi Igal:
>
> Thanks for pointing that out. I was able to add the dependency in and
> submit the job. For statefun-greeter example [1] I was able to submit the
> job to the cluster . But when I try out the statefun-data-stream example
> [2] I got the complaints saying that "There are no ingress defined" (I'm
> adding the full trace to the end of the email). (I'm also adding my pom to
> [3] and source file to [4].) From the example it appears the job uses the
> Datastream as ingress [5] so ideally the job should be able to receive
> events as trigger periodically (the job works fine with local environment).
> I also came across this issue [6] but I don't think it helps solving my
> problem. By any chance you'd know what's going on?
>
>
> [1]
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
> [2]
> https://github.com/flint-stone/flink-statefun/blob/3aba506242300d69e95ef339afeac8e561dc7a2d/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java#L82
> [3]
> https://github.com/flint-stone/flink-statefun/blob/lx-base/statefun-examples/statefun-flink-datastream-example/pom.xml
> [4]
> https://github.com/flint-stone/flink-statefun/blob/lx-base/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
> [5]
> https://github.com/flint-stone/flink-statefun/blob/3aba506242300d69e95ef339afeac8e561dc7a2d/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java#L82
> [6]
> https://stackoverflow.com/questions/61578082/flink-statefun-co-located-functions-communication
>
> Error Trace:
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: There are no ingress defined.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.lang.IllegalStateException: There are no ingress defined.
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:76)
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:52)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 8 more
>
>
> Thanks for the help!
>
> Le
>
>
> On Mon, Dec 28, 2020 at 6:24 AM Igal Shilman  wrote:
>
>> Hi Le,
>> Indeed you have added the dependency correctly. But the resulting
>> artifact doesn't contain the dependencies. You need to create a jar with
>> dependencies ( via [1] or [2])
>> Take a look at [3] for a usage example of the maven shade plugin.
>>
>> I hope this helps,
>> Igal.
>>
>> [1] https://maven.apache.org/plugins/maven-assembly-plugin/usage.html
>> [2] https://maven.apache.org/plugins/maven-shade-plugin/
>> [3]
>> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-distribution/pom.xml#L126,L199
>>
>> On Sat, Dec 26, 2020 at 11:52 PM Le Xu  wrote:
>>
>>> Thanks Igal! I might be missing something here. I did place
>>> statefun-flink-distribution as part of my dependency in the pom (see
>>> line 46 at [1]).  Is there a correct way to include the jar? I'm having the
>>> same problem across many examples I'm running.
>>>
>>> [1]
>>> https://gist.github.com/flint-stone/059f00832d8b99af433a446771f4f740#file-pom-xml-L64
>>>
>>> Thanks!
>>>
>>> Le
>>>
>>> On Sat, Dec 26, 2020 at 2:23 PM Ig

Fwd: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

2021-01-04 Thread Dongwon Kim
Any advice would be appreciated :)

Thanks,

Dongwon

-- Forwarded message -
From: Dongwon Kim 
Date: Mon, Dec 14, 2020 at 11:27 PM
Subject: How to gracefully avoid "Generic types have been disabled in the
ExecutionConfig and type java.util.List is treated as a generic type"?
To: user 


Hi,

The following program compiles and runs w/o exceptions:

> public class Test {
>
>   public static class A {
> private int n;
>
> public A() { }
> public int getN() {  return n;  }
> public void setN(int n) {  this.n = n;  }
>   }
>
>   public static class B {
> private List<A> lst;
>
> public B() { }
> public List<A> getLst() {  return lst;  }
> public void setLst(List<A> lst) {  this.lst = lst;  }
>   }
>
>   public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment();
>
> env.fromElements(new B())
>   .print();
>
> env.execute();
>   }
> }
>

When I add the following line,

> env.getConfig().disableGenericTypes();

then the program shows me an exception:

> Exception in thread "main" java.lang.UnsupportedOperationException:
> Generic types have been disabled in the ExecutionConfig and type
> java.util.List is treated as a generic type.
> at
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
> at Test.main(Test.java:29)


To avoid this exception, I found that I have to declare a type factory like:

>   public static class BTypeFactory extends TypeInfoFactory<B> {
> @Override
> public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
>   return Types.POJO(
> B.class,
> ImmutableMap.<String, TypeInformation<?>>builder()
>   .put("lst", Types.LIST(Types.POJO(A.class)))
> .build()
>   );
> }
>   }

and give it to class B as follows:

>   @TypeInfo(BTypeFactory.class)
>   public static class B {


Is there no other way but to declare BTypeFactory in such cases?
I don't like the way I have to type a field name twice, once for the member
variable and once for the Map entry in the TypeInfoFactory.

Thanks in advance,

Dongwon


Batch with Flink Streaming API version 1.12.0

2021-01-04 Thread Robert Cullen
I have a Kafka source that I would like to run a batch job on.  Since
version 1.12.0 soft-deprecates the DataSet API in favor of the
DataStream API, can someone show me an example of this? (Using the DataStream API)

thanks
-- 
Robert Cullen
240-475-4490


Re: Flink sink never executes

2021-01-04 Thread Ben Beasley
Yes, it did. Thanks for checking, Kostas. Also, thanks again for helping me 
with the other issue. What a great community Flink has.

From: Kostas Kloudas 
Date: Monday, January 4, 2021 at 6:21 AM
To: Ben Beasley 
Cc: user@flink.apache.org 
Subject: Re: Flink sink never executes
Hi Ben,

Sorry for the late reply but I guess that your question was answered
in StackOverflow, right?
Did that answer solve your problem?

Cheers,
Kostas

On Mon, Dec 21, 2020 at 9:09 AM Ben Beasley  wrote:
>
> First off I want to thank the folks in this email list for their help thus 
> far.
>
>
>
> I’m facing another strange issue where if I add a window to my stream, the 
> sink no longer executes. However the sink executes without the windowing. I 
> described my problem on stackoverflow so that the code is easy to read.
>
>
>
> I wonder if anyone can help me once more, I believe the solution could be 
> simple for someone familiar with the code. I believe I’ve followed the 
> tutorials and articles on the flink website correctly.


Kafka SQL Connector Behavior (v1.11)

2021-01-04 Thread Aeden Jameson
Based on these docs,
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html,
the default partitioning behavior is not quite clear to me.
If no value for sink.partitioner is given, is the default behavior
just that of the native Kafka library (murmur2 when a key is present,
round robin without a key)?

Thank you,
Aeden


Comparing Flink vs Materialize

2021-01-04 Thread Dan Hill
Has anyone compared Flink with Materialize?  A friend recommended that I switch
to Materialize.

In one of their blog posts, it says that Flink splits operators across CPUs
(instead of splitting partitions across CPUs).  Is this true?  Is it
configurable?

https://materialize.com/blog-rocksdb/


Replace (xx,'#','') has error

2021-01-04 Thread abc15606
What can I do?

Sent from my iPhone


Re: pause and resume flink stream job based on certain condition

2021-01-04 Thread Eleanore Jin
Hi Robert,

sorry for the late reply. I just did a quick test, and this seems to be working:
1. while the thread is blocked, checkpoints could expire, but once the thread is
no longer blocked, checkpointing continues
2. this guarantees the message ordering (a sketch of the approach is below)
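
For anyone finding this thread later, here is a rough sketch of the blocking-map
idea (isExternalSystemUp() and validate() are just placeholders for the real
client calls, not actual APIs):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class BlockingValidationMap extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) {
        // initialize the client for the external system here
    }

    @Override
    public String map(String message) throws Exception {
        // While the external system is down, block and retry. Blocking here
        // back-pressures the whole pipeline, so the Kafka source stops reading;
        // as noted above, checkpoints may expire while the thread is blocked.
        while (!isExternalSystemUp()) {
            Thread.sleep(1_000L);
        }
        return validate(message);
    }

    private boolean isExternalSystemUp() {
        return true;    // placeholder: replace with a real health check
    }

    private String validate(String message) {
        return message; // placeholder: replace with the real validation call
    }
}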

Thanks a lot!
Eleanore

On Tue, Dec 15, 2020 at 10:42 PM Robert Metzger  wrote:

> What you can also do is rely on Flink's backpressure mechanism: If the map
> operator that validates the messages detects that the external system is
> down, it blocks until the system is up again.
> This effectively causes the whole streaming job to pause: the Kafka source
> won't read new messages.
>
> On Tue, Dec 15, 2020 at 3:07 AM Eleanore Jin 
> wrote:
>
>> Hi Guowei and Arvid,
>>
>> Thanks for the suggestion. I wonder if it makes sense and possible that
>> the operator will produce a side output message telling the source to
>> 'pause', and the same side output as the side input to the source, based on
>> which, the source would pause and resume?
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Sun, Nov 29, 2020 at 11:33 PM Arvid Heise  wrote:
>>
>>> Hi Eleanore,
>>>
>>> if the external system is down, you could simply fail the job after a
>>> given timeout (for example, using asyncIO). Then the job would restart
>>> using the restarting policies.
>>>
>>> If your state is rather small (and thus recovery time okay), you would
>>> pretty much get your desired behavior. The job would stop to make progress
>>> until eventually the external system is responding again.
>>>
>>> On Mon, Nov 30, 2020 at 7:39 AM Guowei Ma  wrote:
>>>
 Hi, Eleanore

 1. AFAIK I think only the job could "pause" itself.  For example the
 "query" external system could pause when the external system is down.
 2. Maybe you could try the "iterate" and send the failed message back
 to retry if you use the DataStream api.

 Best,
 Guowei


 On Mon, Nov 30, 2020 at 1:01 PM Eleanore Jin 
 wrote:

> Hi experts,
>
> Here is my use case, it's a flink stateless streaming job for message
> validation.
> 1. read from a kafka topic
> 2. perform validation of message, which requires query external system
>2a. the metadata from the external system will be cached in
> memory for 15minutes
>2b. there is another stream that will send updates to update
> the cache if metadata changed within 15 minutes
> 3. if message is valid, publish to valid topic
> 4. if message is invalid, publish to error topic
> 5. if the external system is down, the message is marked as invalid
> with different error code, and published to the same error topic.
>
> Ask:
> For those messages that failed due to external system failures, it
> requires manual replay of those messages.
>
> Is there a way to pause the job if there is an external system
> failure, and resume once the external system is online?
>
> Or are there any other suggestions to allow auto retry such error?
>
> Thanks a lot!
> Eleanore
>

>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> 
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward  - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>


Re: Replace (xx,'#','') has error

2021-01-04 Thread Arvid Heise
Hi,

without seeing the error and an example, it's hard to help.

Are you sure that xx is a string? You may need to convert it beforehand
with

CAST(xx AS VARCHAR)
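
For example, something like this (a sketch, assuming a table t with a non-string
column xx):

SELECT REPLACE(CAST(xx AS VARCHAR), '#', '') FROM t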


On Tue, Jan 5, 2021 at 3:12 AM  wrote:

> What can I do?
>
> Sent from my iPhone
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: numRecordsOutPerSecond metric and side outputs

2021-01-04 Thread Arvid Heise
Hi Alexey,

side outputs should be counted in numRecordsOutPerSecond. However, there is
a bug where this does not happen for side outputs in the middle of an operator
chain [1].

[1] https://issues.apache.org/jira/browse/FLINK-18808

On Tue, Dec 22, 2020 at 1:14 AM Alexey Trenikhun  wrote:

> Hello,
> Does numRecordsOutPerSecond metric takes into account number of records
> send to side output or it provides rate only for main output?
>
> Thanks,
> Alexey
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng