Re: Handling "Global" Updating State

2021-05-16 Thread Rion Williams
Hey folks,

After digging into this a bit, it does seem like Broadcast State would fit the
bill for this scenario, keeping the downstream operators up to date as
messages arrive in my Kafka topic.
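
For reference, here is a minimal sketch of the wiring I have in mind (the
Event/ConfigEvent types, the key accessors, and all names here are
placeholders, not working code from my job):

// Descriptor for the broadcast ("lookup") state shared with downstream tasks.
MapStateDescriptor<String, String> lookupDescriptor =
    new MapStateDescriptor<>("lookup-state", Types.STRING, Types.STRING);

// Broadcast the lookup topic's stream to every parallel instance.
BroadcastStream<ConfigEvent> lookupBroadcast = lookupStream.broadcast(lookupDescriptor);

DataStream<Event> filtered = eventStream
    .connect(lookupBroadcast)
    .process(new BroadcastProcessFunction<Event, ConfigEvent, Event>() {
        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<Event> out)
                throws Exception {
            // The event side only gets a read-only view of the broadcast state.
            String mapping = ctx.getBroadcastState(lookupDescriptor).get(event.getKey());
            if (mapping != null) {
                out.collect(event); // keep only events with a known mapping
            }
        }

        @Override
        public void processBroadcastElement(ConfigEvent config, Context ctx, Collector<Event> out)
                throws Exception {
            // The broadcast side may write: each new lookup record updates the state.
            ctx.getBroadcastState(lookupDescriptor).put(config.getKey(), config.getValue());
        }
    });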

My question is - is there a pattern for pre-populating the state initially? In 
my case, I need to have loaded all of my “lookup” topic into state before 
processing any records in the other stream.

My thought initially is to do something like this, if it’s possible:

- Create a KafkaConsumer on startup to read the lookup topic in its entirety
into some collection like a HashMap (prior to executing the Flink pipeline,
so the lookup data is guaranteed to be fully loaded first)
- Use this to initialize the state of my broadcast stream (if possible)
- From then on, the stream would broadcast any new records coming in, so I
“should” stay up to date.
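
Concretely, the first step might look roughly like this (a sketch only; the
group id, topic handling, and String key/value types are assumptions):

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LookupPreloader {

    // Reads the lookup topic from the beginning up to its current end offsets
    // and returns the latest value seen for each key.
    static Map<String, String> preload(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "lookup-preloader"); // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        Map<String, String> lookup = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            Set<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                .map(p -> new TopicPartition(topic, p.partition()))
                .collect(Collectors.toSet());
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            // Snapshot the end offsets and poll until every partition catches up.
            Map<TopicPartition, Long> endOffsets =
                new HashMap<>(consumer.endOffsets(partitions));
            while (!endOffsets.isEmpty()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    lookup.put(record.key(), record.value()); // later records win
                }
                endOffsets.entrySet()
                    .removeIf(e -> consumer.position(e.getKey()) >= e.getValue());
            }
        }
        return lookup;
    }
}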

Is this an oversimplification or is there an obviously better / well known 
approach to handling this?

Thanks,

Rion

> On May 14, 2021, at 9:51 AM, Rion Williams  wrote:
> 
> 
> Hi all,
> 
> I've encountered a challenge within a Flink job that I'm currently working 
> on. The gist of it is that I have a job that listens to a series of events 
> from a Kafka topic and eventually sinks those down into Postgres via the 
> JDBCSink.
> 
> A requirement recently came up for the need to filter these events based on 
> some configurations that are currently being stored within another Kafka 
> topic. I'm wondering what the best approach might be to handle this type of 
> problem.
> 
> My initial naive approach was:
> - When Flink starts up, use a regular Kafka Consumer and read all of the
> configuration data from that topic in its entirety.
> - Store the messages from that topic in some type of thread-safe collection
> statically accessible by the operators downstream.
> - Expose the thread-safe collection within the operators to actually perform
> the filtering.
>
> This doesn't seem right though. I was reading about BroadcastState which 
> seems like it might fit the bill (e.g. keep those mappings in Broadcast state 
> so that all of the downstream operations would have access to them, which I'd 
> imagine would handle keeping things up to date). 
> 
> Does Flink have a good pattern / construct to handle this? Basically, I have 
> a series of mappings that I want to keep relatively up to date in a Kafka 
> topic, and I'm consuming from another Kafka topic that will need those 
> mappings to filter against.
> 
> I'd be happy to share some of the approaches I currently have or elaborate a 
> bit more if that isn't super clear.
> 
> Thanks much,
> 
> Rion
> 


Re: reactive mode and back pressure

2021-05-16 Thread Xintong Song
Hi Alexey,

I don't think the new reactive mode makes any changes to the
checkpoint/savepoint mechanism, at least not at the moment.

However, you might want to take a look at the unaligned checkpoint [1]. The
unaligned checkpoint is designed to be tolerant with back pressure. AFAIK,
this can work with both the default and the new reactive modes.
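
If you want to experiment with it, a minimal sketch for enabling it
programmatically (the interval is just an example):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // example: checkpoint every 60 seconds
env.getCheckpointConfig().enableUnalignedCheckpoints();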

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints



On Fri, May 14, 2021 at 11:29 PM Alexey Trenikhun  wrote:

> Hello,
>
> Can the new reactive mode operate under back pressure? Old manual rescaling
> via taking a savepoint didn't work for a system under back pressure, since it
> was practically impossible to take a savepoint, so I'm wondering whether
> reactive mode is expected to be better in this regard?
>
> Thanks,
> Alexey
>


Re: The heartbeat of JobManager timed out

2021-05-16 Thread Smile
Hi Alexey,

We also have the same problem running on Yarn using Flink 1.9.0.
JM log shows this:

INFO  org.apache.flink.yarn.YarnResourceManager - The
heartbeat of JobManager with id 41e3ef1f248d24ddefdccd1887947106 timed out.

We are also looking for a way to troubleshoot this problem.

Best regards.
Smile


Alexey Trenikhun wrote
> Hello,
> 
> I periodically see in the JM log (Flink 1.12.2):
> 
> {"ts":"2021-05-15T21:10:36.325Z","message":"The heartbeat of JobManager
> with id be8225ebae1d6422b7f268c801044b05 timed
> out.","logger_name":"org.apache.flink.runtime.resourcemanager.StandaloneResourceManager","thread_name":"flink-akka.actor.default-dispatcher-5","level":"INFO","level_value":2}
> 
> How can I diagnose/troubleshoot this problem? Why would the JobManager,
> which is co-located with the resource manager, time out? I assume a network
> issue is unlikely?
> 
> Thanks,
> Alexey







Re: The heartbeat of JobManager timed out

2021-05-16 Thread Smile
JM log shows this:

INFO  org.apache.flink.yarn.YarnResourceManager - The
heartbeat of JobManager with id 41e3ef1f248d24ddefdccd1887947106 timed out.






Re: The heartbeat of JobManager timed out

2021-05-16 Thread Xintong Song
Hi Alexey & Smile,

JM & RM are located in the same process, so a network issue is unlikely. Such
timeouts are usually caused by one of the two endpoints not responding in
time.

Some common causes:
- The process is under severe GC pressure. You can check the GC logs to
verify this.
- Insufficient CPU resources. You may check the CPU workload of the physical
machine (standalone) or pod/container (K8s/Yarn).
- A busy RPC main thread. Even if there are sufficient CPU resources
(multiple cores), processing capacity can be limited by the single RPC main
thread. This is usually observed for large-scale jobs (in terms of the
number of vertices and parallelism). In that case, we would have to increase
the heartbeat timeout.
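
For that last case, the relevant options would look along these lines
(example values; the defaults are heartbeat.interval 10000 ms and
heartbeat.timeout 50000 ms):

# flink-conf.yaml
heartbeat.interval: 10000
heartbeat.timeout: 120000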

Thank you~

Xintong Song



On Mon, May 17, 2021 at 11:12 AM Smile  wrote:

> JM log shows this:
>
> INFO  org.apache.flink.yarn.YarnResourceManager - The
> heartbeat of JobManager with id 41e3ef1f248d24ddefdccd1887947106 timed out.
>
>
>
>
>


Re: Convert DataStream to Table with the same columns in Row

2021-05-16 Thread John Smith
Thanks for your help Timo and Fabian,
Got it working with Timo’s suggestion.
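
For anyone finding this thread later: the suggestion itself isn't quoted in
the archive, but a common fix for this symptom is to attach explicit Row type
information so the planner can see the individual fields. A sketch, with
field names and types taken from the original question:

DataStream<Row> rowStream = env
    .fromElements(Row.of(1620968140951L, "foo", "bar"))
    .returns(Types.ROW_NAMED(
        new String[] {"c1", "c2", "c3"},
        Types.LONG, Types.STRING, Types.STRING));

// With a named RowTypeInfo attached, fromDataStream expands c1/c2/c3 into columns.
Table t = tableEnv.fromDataStream(rowStream);
t.execute().print();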

On Fri, May 14, 2021 at 6:14 AM Fabian Paul 
wrote:

> Hi John,
>
> Can you maybe share more code about how you build the DataStrean?
> It would also be good to know against which Flink version you are testing.
> I just
> tried the following code against the current master and:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> DataStream<Row> rowStream =
>     env.fromElements(Row.of(1, "test1"), Row.of(2, "test2"));
> Table t = tableEnv.fromDataStream(rowStream);
> t.execute().print();
>
> seems to work fine:
>
> +----+----+-------+
> | op | f0 |    f1 |
> +----+----+-------+
> | +I |  1 | test1 |
> | +I |  2 | test2 |
> +----+----+-------+
>
> Best,
> Fabian
>
>
> On 14. May 2021, at 09:01, John Smith  wrote:
>
> Hi,
>
> Sorry if this is a duplicate question but I couldn't find any answer to
> my question.
> I am trying to convert a DataStream into a Table where the columns in
> the Row objects in the DataStream will become columns of the Table.
> Here is how I tried to do it:
>
> // Creating a DataStream of Row type. Let's assume the Row type has 3
> // columns: (c1 BIGINT, c2 STRING, c3 STRING)
> DataStream<Row> rowStream = ...
>
> // Convert it to a Table
> Table t = tableEnv.fromDataStream(rowStream);
>
> // Print the table
> t.execute().print();
>
> However, when I print the table it has one column of type Row instead of
> three columns (c1, c2, c3).
>
> What I see in the print result is:
>
> +----+----------------------------+
> | op |                         f0 |
> +----+----------------------------+
> | +I |   +I{c1=1620968140951, ... |
>
> What I would like to see is:
>
> +----+---------------+-------+-------+
> | op |            c1 |    c2 |    c3 |
> +----+---------------+-------+-------+
> | +I | 1620968140951 | 'foo' | 'bar' |
> +----+---------------+-------+-------+
>
> How can I convert the DataStream to a Table that has the same columns as
> the Row in the DataStream?
> Would really appreciate it if anyone can share a code snippet for the
> above example.
>
> Thanks,
> JS.
>
>
>


Re: reactive mode and back pressure

2021-05-16 Thread Arvid Heise
Hi Alexey,

Flink supports rescaling from a normal checkpoint if you are not changing
your application too much. So if normal checkpointing works, you can use it
for rescaling by enabling Retained Checkpoints and supplying the checkpoint
path on resume in the same place where you supplied the savepoint path
before. This is the easiest option to try out since it works on pretty much
any Flink version.
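
A minimal sketch of that setup (the interval and paths are examples):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);
// Keep the latest completed checkpoint when the job is cancelled.
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Later, resume at a different parallelism from the retained checkpoint, e.g.:
//   flink run -p 8 -s hdfs:///checkpoints/<job-id>/chk-42 my-job.jar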

Reactive Mode indeed just restores from the latest completed checkpoint.
[1] Since it's baked into the system, you don't need to retain checkpoints
and supply the path manually. It rather uses Flink's fault tolerance
mechanism to rescale on the fly. This feature is newly added in Flink 1.13.
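
For completeness, Reactive Mode is switched on via the scheduler mode and, in
1.13, only works with a standalone application cluster; roughly:

# flink-conf.yaml (or pass -Dscheduler-mode=reactive when starting the
# standalone application cluster, e.g. via bin/standalone-job.sh)
scheduler-mode: reactive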

The last option is to try out unaligned checkpoints. Since Flink 1.13.0,
you can also rescale from unaligned checkpoints. Note that unaligned
checkpoints work best if you can use a new source (FileSource or
KafkaSource atm).

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/#reactive-mode



On Mon, May 17, 2021 at 4:30 AM Xintong Song  wrote:

> Hi Alexey,
>
> I don't think the new reactive mode makes any changes to the
> checkpoint/savepoint mechanism, at least not at the moment.
>
> However, you might want to take a look at the unaligned checkpoint [1].
> The unaligned checkpoint is designed to be tolerant with back pressure.
> AFAIK, this can work with both the default and the new reactive modes.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints
>
>
>
> On Fri, May 14, 2021 at 11:29 PM Alexey Trenikhun  wrote:
>
>> Hello,
>>
>> Can the new reactive mode operate under back pressure? Old manual
>> rescaling via taking a savepoint didn't work for a system under back pressure,
>> since it was practically impossible to take a savepoint, so I'm wondering
>> whether reactive mode is expected to be better in this regard?
>>
>> Thanks,
>> Alexey
>>
>


Re: Re: Handling "Global" Updating State

2021-05-16 Thread Yun Gao
Hi Rion,

I think FLIP-150 [1] should be able to solve this scenario.

Since FLIP-150 is still under discussion, a temporary method that comes to
mind for now might be:
1. Write a first job that reads the Kafka topic and updates the broadcast
state of some operator. The job would keep the source alive after all the
data has been emitted (e.g., sleep forever), and once all the data has been
processed, stop the job with a savepoint.
2. Use the savepoint to start the original job. The operator that requires
the broadcast state could set the same uid and the same state name as the
corresponding operator in the first job, so it can acquire the state content
on startup.
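
A rough sketch of the pieces that would have to match between the two jobs
(all names here are examples, and MyBroadcastFunction is hypothetical):

// Declared identically (same state name) in the bootstrap job and the real job:
MapStateDescriptor<String, String> lookupDescriptor =
    new MapStateDescriptor<>("lookup-state", Types.STRING, Types.STRING);

events
    .connect(lookupStream.broadcast(lookupDescriptor))
    .process(new MyBroadcastFunction(lookupDescriptor))
    .uid("lookup-broadcast"); // same uid in both jobs so the savepoint state maps over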

Best,
Yun

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source


Re: Flink SQL on Yarn For Help

2021-05-16 Thread Timo Walther

You can check whether there is a configuration option listed here:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/

If there is, you can add it to conf/flink-conf.yaml.
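
For this particular report, the options that would have to line up look like
this (values taken from the log below; a sketch, not verified against the
1.12 SQL client):

# conf/flink-conf.yaml
high-availability.zookeeper.path.root: /flink-1.11.1
high-availability.cluster-id: application_1620719181606_1106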

Maybe others have other pointers.

Otherwise you will need to use Table API instead of SQL Client.

Regards,
Timo



On 17.05.21 08:38, Yunhui Han wrote:

Hi Timo,

Thanks for your response. I submitted to a session cluster, and this is the
full log from the /log directory. I found a problem: the *zk namespace* of
the session cluster is different from the one the sql-client specified.

The zk namespace of the cluster is:
[screenshot: zk namespace of the session cluster]
And the zk namespace the sql client specified is:
[screenshot: zk namespace specified by the sql client]

However, I cannot find a way to configure the zk namespace from the sql client.

Regards,
Yunhui

2021-05-17 03:45:32,210 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property: jobmanager.rpc.address, localhost
2021-05-17 03:45:32,213 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property: jobmanager.rpc.port, 6123
2021-05-17 03:45:32,213 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property: jobmanager.memory.process.size, 4096m
2021-05-17 03:45:32,213 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property: taskmanager.memory.process.size, 4096m
2021-05-17 03:45:32,213 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property: taskmanager.numberOfTaskSlots, 1
2021-05-17 03:45:32,213 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property: parallelism.default, 1
2021-05-17 03:45:32,222 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property: state.backend, filesystem
2021-05-17 03:45:32,222 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property: state.checkpoints.dir,
hdfs:///flink-1.11.0/checkpoints
2021-05-17 03:45:32,222 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property: state.savepoints.dir,
hdfs:///flink-1.11.0/savepoints
2021-05-17 03:45:32,223 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property:
jobmanager.execution.failover-strategy, region
2021-05-17 03:45:32,223 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property: high-availability, zookeeper
2021-05-17 03:45:32,223 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property: high-availability.zookeeper.quorum,
***:2181,***:2181,***:2181
2021-05-17 03:45:32,223 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property: high-availability.storageDir,
hdfs:///flink-1.11.0/recovery
2021-05-17 03:45:32,223 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property:
high-availability.zookeeper.path.root, /flink-1.11.1
2021-05-17 03:45:32,224 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property:
high-availability.zookeeper.path.namespace, /infra-streaming-2
2021-05-17 03:45:32,224 INFO
  org.apache.flink.configuration.GlobalConfiguration           [] -
Loading configuration property: yarn.application-attempts, 10
2021-05-17 03:45:32,310 INFO
  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] -
Found Yarn properties file under /tmp/.yarn-properties-sdev.
2021-05-17 03:45:32,524 INFO
  org.apache.flink.table.client.gateway.local.LocalExecutor    [] -
Using default environment file:
file:/home/sdev/flink-1.12.0/conf/sql-client-defaults.yaml
2021-05-17 03:45:32,771 INFO
  org.apache.flink.table.client.config.entries.ExecutionEntry  [] -
Property 'execution.restart-strategy.type' not specified. Using
default value: fallback
2021-05-17 03:45:33,619 INFO
  org.apache.flink.table.client.gateway.local.ExecutionContext [] -
Executor config: {execution.savepoint.ignore-unclaimed-state=false,
execution.attached=true, yarn.application.id
=application_1620719181606_1106,
execution.shutdown-on-attached-exit=false,

pipeline.jars=[file:/home/sdev/flink-1.12.0/opt/flink-sql-client_2.11-1.12.0.jar],
high-availability.cluster-id=application_1620719181606_1106,
pipeline.classpaths=[], execution.target=yarn-session,
$internal.deployment.config-dir=/home/sdev/flink-1.12.0/conf}
2021-05-17 03:45:33,783 INFO
  org.apac

Re: reactive mode and back pressure

2021-05-16 Thread Alexey Trenikhun
Hi Xintong,
Does reactive mode need a checkpoint for re-scheduling?

Thanks,
Alexey

