[SURVEY] What Change Data Capture tools are you using?

2020-03-11 Thread Jark Wu
Hi all,

The Flink community is going to integrate some popular Change Data Capture
(CDC) tools. We would like to support reading and processing some common
binlog/changelog data in Flink SQL in the next release. We hope this survey
can help identify the most common use cases and prioritize our roadmap. I will
also share a report with a summary when the survey is closed.

The survey is very simple and is available here:
https://forms.gle/GZ5bpUpPg7tWYRRo8

Feedback under this thread sharing more context about your use cases is welcome.

Best,
Jark


Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-11 Thread Jark Wu
Hi Lei,

Yes. If you are creating a Kafka table, then the kafka connector jar and
some format jars are required.

That's weird. If the DDL fails, the YAML way should fail with the same
exception, unless some of the connector property values differ.

Could you share the detailed exception stack?

Best,
Jark
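
(A minimal sketch of the DDL route for reference; the table, topic, and host names here
are made up, and the WITH properties follow the Flink 1.10 connector documentation. As
Lei found later in this thread, resolving such a table needs
flink-sql-connector-kafka_2.12-1.10.0.jar and flink-json-1.10.0.jar in the lib/ directory.)

EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.sqlUpdate(
        "CREATE TABLE my_kafka_source (" +
        "  out_order_code STRING," +
        "  status INT" +
        ") WITH (" +
        "  'connector.type' = 'kafka'," +
        "  'connector.version' = 'universal'," +
        "  'connector.topic' = 'out_order'," +
        "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
        "  'format.type' = 'json'," +
        "  'format.derive-schema' = 'true'" +
        ")");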

On Wed, 11 Mar 2020 at 14:51, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Hi Jark,
>
> I have tried to use the CREATE TABLE DDL.
> First I ran ./bin/sql-client.sh embedded, then created a table from a kafka topic
> and it told me the table had been created.
> But when I query with select * from tableName, there's an error:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
> a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Perhaps I need to add some jar to the lib directory.
> But if I write the table configuration in the sql-client-defaults.yaml
> file, I can select the result correctly.
>
> Thanks,
> Lei
>
> --
>
>
> *From:* Jark Wu 
> *Date:* 2020-03-11 11:13
> *To:* wangl...@geekplus.com.cn
> *CC:* Arvid Heise ; user 
> *Subject:* Re: Re: Does flink-1.10 sql-client support kafka sink?
> Hi Lei,
>
> CREATE TABLE DDL [1][2] is the recommended way to register a table since
> 1.9. And the yaml way might be deprecated in the future.
> By using DDL, a registered table can both be used as source and sink.
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>
> On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>> Thanks, it works now.
>>
>> It seems it is because I added
>>    schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)"
>>
>> under the format label.
>>
>> *From:* Arvid Heise 
>> *Date:* 2020-03-10 20:51
>> *To:* wangl...@geekplus.com.cn
>> *CC:* user 
>> *Subject:* Re: Does flink-1.10 sql-client support kafka sink?
>> Hi Lei,
>>
>> yes Kafka as a sink is supported albeit only for appends (no
>> deletions/updates yet) [1].
>>
>> An example is a bit hidden in the documentation [2]:
>>
>> tables:
>>   - name: MyTableSink
>> type: sink-table
>> update-mode: append
>> connector:
>>   property-version: 1
>>   type: kafka
>>   version: "0.11"
>>   topic: OutputTopic
>>   properties:
>> zookeeper.connect: localhost:2181
>> bootstrap.servers: localhost:9092
>> group.id: testGroup
>> format:
>>   property-version: 1
>>   type: json
>>   derive-schema: true
>> schema:
>>   - name: rideId
>> data-type: BIGINT
>>   - name: lon
>> data-type: FLOAT
>>   - name: lat
>> data-type: FLOAT
>>   - name: rideTime
>> data-type: TIMESTAMP(3)
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries
>>
>> On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>>>
>>> I have successfully configured a source table using the following
>>> configuration:
>>>
>>> - name: out_order
>>> type: source
>>> update-mode: append
>>> schema:
>>> - name: out_order_code
>>>   type: STRING
>>> - name: input_date
>>>   type: BIGINT
>>> - name: owner_code
>>>   type: STRING
>>> connector:
>>>   property-version: 1
>>>   type: kafka
>>>   version: universal
>>>   topic: out_order
>>>   startup-mode: latest-offset
>>>   properties:
>>>   - key: zookeeper.connect
>>> value: 172.19.78.32:2181
>>>   - key: bootstrap.servers
>>> value: 172.19.78.32:9092
>>>   - key: group.id
>>>   property-version: 1
>>>   type: json
>>>   schema: "ROW(out_order_code STRING,owner_code STRING,input_date
>>> BIGINT)"
>>>
>>> How can i configure a sink table? I haven't found any useful docs for
>>> this.
>>>
>>> Thanks,
>>> Lei
>>>
>>


Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-11 Thread wangl...@geekplus.com.cn

I am using flink-1.10, but I had added flink-json-1.9.1.jar and
flink-sql-connector-kafka_2.11-1.9.1.jar to the lib directory.
After changing to flink-json-1.10.0.jar and
flink-sql-connector-kafka_2.12-1.10.0.jar, it works.

But I have no idea why the yaml way works when I use flink-json-1.9.1.jar and
flink-sql-connector-kafka_2.11-1.9.1.jar in a flink-1.10 environment.

Thanks,
Lei



wangl...@geekplus.com.cn

 
From: wangl...@geekplus.com.cn
Date: 2020-03-11 14:51
To: Jark Wu
CC: Arvid Heise; user
Subject: Re: Re: Does flink-1.10 sql-client support kafka sink?
Hi Jark, 

I have tried to use the CREATE TABLE DDL.
First I ran ./bin/sql-client.sh embedded, then created a table from a kafka topic and
it told me the table had been created.
But when I query with select * from tableName, there's an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Perhaps I need to add some jar to the lib directory.
But if I write the table configuration in the sql-client-defaults.yaml file, I
can select the result correctly.

Thanks,
Lei
 



 
From: Jark Wu
Date: 2020-03-11 11:13
To: wangl...@geekplus.com.cn
CC: Arvid Heise; user
Subject: Re: Re: Does flink-1.10 sql-client support kafka sink?
Hi Lei,

CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9. 
And the yaml way might be deprecated in the future. 
By using DDL, a registered table can both be used as source and sink. 

Best,
Jark

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
[2]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn 
 wrote:
Thanks, it works now.

It seems it is because I added
   schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)"

under the format label.

From: Arvid Heise
Date: 2020-03-10 20:51
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Does flink-1.10 sql-client support kafka sink?
Hi Lei,

yes Kafka as a sink is supported albeit only for appends (no deletions/updates 
yet) [1].

An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
type: sink-table
update-mode: append
connector:
  property-version: 1
  type: kafka
  version: "0.11"
  topic: OutputTopic
  properties:
zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
  property-version: 1
  type: json
  derive-schema: true
schema:
  - name: rideId
data-type: BIGINT
  - name: lon
data-type: FLOAT
  - name: lat
data-type: FLOAT
  - name: rideTime
data-type: TIMESTAMP(3)
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries
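
(A minimal sketch, not from the thread: once a sink table like MyTableSink above is
registered, an append-only query writes into it with a plain INSERT INTO. "Rides" is a
hypothetical source table with matching columns; in the sql-client the same statement
can be typed directly at the prompt.)

EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// assumes both tables are registered in this environment (via the YAML catalog or DDL)
tEnv.sqlUpdate("INSERT INTO MyTableSink SELECT rideId, lon, lat, rideTime FROM Rides");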

On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn 
 wrote:

I have successfully configured a source table using the following configuration:

- name: out_order
type: source
update-mode: append
schema:
- name: out_order_code
  type: STRING
- name: input_date
  type: BIGINT
- name: owner_code
  type: STRING
connector:
  property-version: 1
  type: kafka
  version: universal
  topic: out_order
  startup-mode: latest-offset
  properties:
  - key: zookeeper.connect
value: 172.19.78.32:2181
  - key: bootstrap.servers
value: 172.19.78.32:9092
  - key: group.id
  property-version: 1
  type: json
  schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can i configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei


Re: Use flink to calculate sum of the inventory under certain conditions

2020-03-11 Thread Jiawei Wu
Hi Kurt,

What you said is the first reason.
The second reason is that this query needs to scan the whole table. I think we
can do better :-)

Best,
Jiawei

On Wed, Mar 11, 2020 at 10:52 AM Kurt Young  wrote:

> Hi Jiawei,
>
> Sorry I still didn't fully get your question. What's wrong with your
> proposed SQL?
>
> > select vendorId, sum(inventory units)
> > from dynamodb
> > where today's time - inbound time > 15
> > group by vendorId
>
> My guess is that such a query would only trigger calculations on new events.
> So for a very old inventory item, say one inbounded 17 days ago, if there are
> no new events coming for that item, then the calculation would not be
> triggered and you can't sum it, right?
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 10:06 AM Jiawei Wu 
> wrote:
>
>> Hi Robert,
>>
>> Your answer really helps.
>>
>> About the problem, we have 2 choices. The first one is using Flink as
>> described in this email thread. The second one is using AWS Lambda
>> triggered by the CDC stream to compute the latest 15 days of records, which is a
>> workaround solution and looks less elegant than Flink to me.
>>
>> Currently we decided to choose AWS Lambda because we are familiar with
>> it, and most importantly, it leads to nearly no operational burden. But we
>> are actively looking for a comparison between Lambda and Flink and want
>> to know in which situations we would prefer Flink over Lambda. Several teams in
>> our company are already in a hot debate about the comparison, and the
>> biggest concern is the non-functional requirements of Flink, such as fault
>> tolerance, recovery, etc.
>>
>> I also searched the internet but found there are nearly no comparisons
>> between Lambda and Flink except for their market share :-( I'm wondering
>> what you think of this? Any comments from the flink community are
>> appreciated.
>>
>> Thanks,
>> J
>>
>>
>> On Mon, Mar 9, 2020 at 8:22 PM Robert Metzger 
>> wrote:
>>
>>> Hey Jiawei,
>>>
>>> I'm sorry that you haven't received an answer yet.
>>>
>>> So you basically have a stream of dynamodb table updates (let's call it
>>> the CDC stream), and you would like to maintain the inventory of the last 15
>>> days for each vendor.
>>> Whenever there's an update in the inventory data (a new event arrives in
>>> the CDC stream), you want to produce a new event with the inventory count.
>>>
>>> If I'm not mistaken, you will need to keep all the inventory in Flink's
>>> state to have an accurate count and to drop old records when they are
>>> expired.
>>> There are two options for maintaining the state:
>>> - in memory (using the FsStateBackend)
>>> - on disk (using the embedded RocksDBStatebackend)
>>>
>>> I would recommend starting with the RocksDBStateBackend. It will work as
>>> long as your state fits on all your machines hard disks (we'll probably not
>>> have an issue there :) )
>>> If you run into performance issues, you can consider switching to a
>>> memory based backend (by then, you should have some knowledge about your
>>> state size)
>>>
>>> For tracking the events, I would recommend you to look into Flink's
>>> windowing API:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>>  / https://flink.apache.org/news/2015/12/04/Introducing-windows.html
>>> Or alternatively doing an implementation with ProcessFunction:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html
>>> I personally would give it a try with ProcessFunction first.
>>>
>>> For reading the data from DynamoDB, there's an undocumented feature for
>>> it in Flink. This is an example for reading from a DynamoDB stream:
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java
>>> Here's also some info: https://issues.apache.org/jira/browse/FLINK-4582
>>>
>>> For writing to DynamoDB there is currently no official sink in Flink. It
>>> should be fairly straightforward to implement a Sink using the SinkFunction
>>> interface of Flink.
>>>
>>> I hope this answers your question.
>>>
>>> Best,
>>> Robert
>>>
>>>
>>>
>>>
>>> On Tue, Mar 3, 2020 at 3:25 AM Jiawei Wu 
>>> wrote:
>>>
 Hi flink users,

 We have a problem and think flink may be a good solution for that. But
 I'm new to flink and hope can get some insights from flink community :)

 Here is the problem. Suppose we have a DynamoDB table which store the
 inventory data, the schema is like:

 * vendorId (primary key)
 * inventory name
 * inventory units
 * inbound time
 ...

 This DDB table keeps changing, since we have inventory coming and
 removal. *Every change will trigger a DynamoDB stream. *
 We need to calculate *all the inventory units that > 15 days for a
 specific vendor* like this:
 > select vendorId, sum(inventory units)
 > from dynamodb
 > where to

Re: Use flink to calculate sum of the inventory under certain conditions

2020-03-11 Thread Arvid Heise
>
> About the problem, we have 2 choices. The first one is using Flink as
> described in this email thread. The second one is using AWS Lambda
> triggered by the CDC stream to compute the latest 15 days of records, which is a
> workaround solution and looks less elegant than Flink to me.
>
>
> Currently we decided to choose AWS Lambda because we are familiar with it,
> and most importantly, it leads to nearly no operational burden. But we are
> actively looking for a comparison between Lambda and Flink and want to
> know in which situations we would prefer Flink over Lambda. Several teams in our
> company are already in a hot debate about the comparison, and the biggest
> concern is the non-functional requirements of Flink, such as fault
> tolerance, recovery, etc.
>
> I also searched the internet but found there are nearly no comparisons
> between Lambda and Flink except for their market share :-( I'm wondering
> what you think of this? Any comments from the flink community are
> appreciated.
>

You pretty much described the biggest difference already. Doing any more
complex operation with Lambda will turn into a mess quickly.

Lambdas currently shine for two use cases because of the ease of operation
and unlimited scalability:
- Simple transformations: input -> transform -> output
- Simple database updates (together with Dynamo): input -> lookup by key
(db), update by key (db) -> output

As soon as you exceed point queries (time windows, joins) or have state,
Lambdas actually get harder to manage imho. You need a zoo of supporting
technologies or sacrifice lots of performance.

In Flink, you have a higher barrier to entry, but as soon as your streaming
application grows, it pays off quickly. Data is relocated with processing,
such that you don't need to program access patterns yourself.

So I'd decide it on a case by case basis for each application. If it's one
of the two above mentioned use cases, just go lambda. You will not gain
much with Flink, especially if you already have the experience.
If you know your application will grow out of these use cases or is more
complex to begin with, consider Flink.

There is also one relatively new technology based on Flink called stateful
functions [1]. It tries to combine the advanced state processing of Flink
with the benefits of Lambdas (albeit scalability is not unlimited). You
might want to check that out, as it may solve your use cases.

[1] https://statefun.io/

On Wed, Mar 11, 2020 at 3:06 AM Jiawei Wu  wrote:

> Hi Robert,
>
> Your answer really helps.
>
> About the problem, we have 2 choices. The first one is using Flink as
> described in this email thread. The second one is using AWS Lambda
> triggered by CDC stream and compute the latest 15 days record, which is a
> walk-around solution and looks not as elegant as Flink to me.
>
> Currently we decided to choose AWS Lambda because we are familiar with it,
> and the most important, it lead to nearly no operational burden. But we are
> actively looking for the comparison between Lambda and Flink and want to
> know in which situation we prefer Flink over Lambda. Several teams in our
> company are already in a hot debate about the comparison, and the biggest
> concern is the non-function requirements about Flink, such as fault
> tolerance, recovery, etc.
>
> I also searched the internet but found there are nearly no comparisons
> between Lambda and Flink except for their market share :-( I'm wondering
> what do you think of this? Or any comments from flink community is
> appreciated.
>
> Thanks,
> J
>
>
> On Mon, Mar 9, 2020 at 8:22 PM Robert Metzger  wrote:
>
>> Hey Jiawei,
>>
>> I'm sorry that you haven't received an answer yet.
>>
>> So you basically have a stream of dynamodb table updates (let's call id
>> CDC stream), and you would like to maintain the inventory of the last 15
>> days for each vendor.
>> Whenever there's an update in the inventory data (a new event arrives in
>> the CDC stream), you want to produce a new event with the inventory count.
>>
>> If I'm not mistaken, you will need to keep all the inventory in Flink's
>> state to have an accurate count and to drop old records when they are
>> expired.
>> There are two options for maintaining the state:
>> - in memory (using the FsStateBackend)
>> - on disk (using the embedded RocksDBStatebackend)
>>
>> I would recommend starting with the RocksDBStateBackend. It will work as
>> long as your state fits on all your machines hard disks (we'll probably not
>> have an issue there :) )
>> If you run into performance issues, you can consider switching to a
>> memory based backend (by then, you should have some knowledge about your
>> state size)
>>
>> For tracking the events, I would recommend you to look into Flink's
>> windowing API:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>  / https://flink.apache.org/news/2015/12/04/Introducing-windows.html
>> Or alternatively doing an implementation with ProcessFunctio

Re: Setting app Flink logger

2020-03-11 Thread Chesnay Schepler

@Eyal:

The image you are using is for 1.9.2, but the logging configuration you 
fetched was from master.


In 1.9.2 we use Log4j1, but on master we switched to Log4j2 instead, 
which uses a different configuration syntax. Log4j1 pretty much ignores 
the entire file, causing the error.


Please use the configuration file from the release-1.9 branch.

On 11/03/2020 03:50, Yang Wang wrote:

Since you are using log4j2, the java dynamic property should not be
"log4j.configuration". Please use "log4j.configurationFile" instead.

Maybe it is not your problem, there is something wrong with the docker
image. The log4j2 properties in "flink-console.sh" are not configured
correctly.


Best,
Yang

miki haiat <miko5...@gmail.com> wrote on Tue, Mar 10, 2020 at 11:50 PM:


Which image are you using ?

On Tue, Mar 10, 2020, 16:27 Eyal Pe'er <eyal.p...@startapp.com> wrote:

Hi Rafi,

The file exists (and is the file from the official image ☺,
please see below).

The user is root and it has permission. I am running in HA
mode using docker.

cat /opt/flink/conf/log4j-console.properties

#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name = org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

Best regards

Eyal Peer / Data Platform Developer

*From:* Rafi Aroch <rafi.ar...@gmail.com>
*Sent:* Tuesday, March 10, 2020 3:55 PM
*To:* Eyal Pe'er <eyal.p...@startapp.com>
*Cc:* user <user@flink.apache.org>; StartApp R&D Data Platform <startapprnd...@startapp.com>
*Subject:* Re: Setting app Flink logger

Hi Eyal,

Sounds trivial, but can you verify that the file actually
exists in /opt/flink/conf/log4j-console.properties? Also,
verify that the user running the process has read permissions
to that file.

You said you use Flink in YARN mode, but the example above
you run inside a docker image, so this is a bit confusing.
Notice that the official docker images run as the "flink" user and
group ids.

If you wish to try to use Logback instead, you can place your
logback.xml file as part of your project resources folder to
include it in the classpath. That should automatically get
detected on startup.

Hope this helps,

Rafi


Re: datadog metrics

2020-03-11 Thread Chesnay Schepler
Please open a JIRA; we may have to split the datadog report into several 
chunks.


On 09/03/2020 07:47, Fanbin Bu wrote:

quote from the following link:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Query-named-operator-exceeds-80-characters-td24807.html#a24818

"This is a safeguard in the metric system to prevent extremely long names
(as these could cause the reporting to fail); so long as the prefix is
unique you can safely ignore this warning."

I do see from the log that my sql operator name is too long and that it gets
truncated.

But I still failed to report to datadog.

Thanks
Fanbin

On Sun, Mar 8, 2020 at 11:36 PM Fanbin Bu wrote:


Hi,

Has anybody seen this error before and what is the suggested way
to solve it?

2020-03-07 02:54:34,100 WARN
 org.apache.flink.metrics.datadog.DatadogHttpClient      - Failed
to send request to Datadog (response was
Response{protocol=http/1.1, code=413, message=Request Entity Too
Large, url=https://app.datadoghq.com/api/v1/series?api_key=

thanks,
Fanbin





Re: scaling issue Running Flink on Kubernetes

2020-03-11 Thread Flavio Pompermaier
Have you tried to use existing operators such as
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator or
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator?

On Wed, Mar 11, 2020 at 4:46 AM Xintong Song  wrote:

> Hi Eleanore,
>
> That doesn't sound like a scaling issue. It's probably data skew, where
> the data volume on some of the keys is significantly higher than on others.
> I'm not familiar with this area though, and have copied Jark for you, who
> is one of the community experts in this area.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Mar 11, 2020 at 10:37 AM Eleanore Jin 
> wrote:
>
>> Hi Xintong,
>>
>> Thanks for the prompt reply! To answer your question:
>>
>>- Which Flink version are you using?
>>
>>v1.8.2
>>
>>- Is this skew observed only after a scaling-up? What happens if the
>>parallelism is initially set to the scaled-up value?
>>
>>I also tried this; it seems the skew also happens even if I do
>> not change the parallelism, so it may not be caused by scale-up/down.
>>
>>- Keeping the job running a while after the scale-up, does the skew
>>ease?
>>
>>The skew happens in such a way that some partitions'
>> lag goes down to 0, but other partitions are still at a level of 10_000, and I am
>> seeing that the back pressure is ok.
>>
>> Thanks a lot!
>> Eleanore
>>
>>
>> On Tue, Mar 10, 2020 at 7:03 PM Xintong Song 
>> wrote:
>>
>>> Hi Eleanore,
>>>
>>> I have a few more questions regarding your issue.
>>>
>>>- Which Flink version are you using?
>>>- Is this skew observed only after a scaling-up? What happens if the
>>>parallelism is initially set to the scaled-up value?
>>>- Keeping the job running a while after the scale-up, does the skew
>>>ease?
>>>
>>> I suspect the performance difference might be an outcome of some warming
>>> up issues. E.g., the existing TMs might have some file already localized,
>>> or some memory buffers already promoted to the JVM tenured area, while the
>>> new TMs have not.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin 
>>> wrote:
>>>
 Hi Experts,
 I have my flink application running on Kubernetes, initially with 1 Job
 Manager, and 2 Task Managers.

 Then we have the custom operator that watches for the CRD, when the CRD
 replicas changed, it will patch the Flink Job Manager deployment
 parallelism and max parallelism according to the replicas from CRD
 (parallelism can be configured via env variables for our application).
 which causes the job manager restart. hence a new Flink job. But the
 consumer group does not change, so it will continue from the offset
 where it left.

 In addition, operator will also update Task Manager's deployment
 replicas, and will adjust the pod number.

 In case of scale up, the existing task manager pods do not get killed,
 but new task manager pods will be created.

 And we observed a skew in the partition offset consumed. e.g. some
 partitions have huge lags and other partitions have small lags. (observed
 from burrow)

 This is also validated by the metrics from the Flink UI, showing that the
 throughput differs across slots.

 Any clue why this is the case?

 Thanks a lot!
 Eleanore

>>>


Re: scaling issue Running Flink on Kubernetes

2020-03-11 Thread Flavio Pompermaier
Sorry I wanted to mention https://github.com/lyft/flinkk8soperator (I don't
know which one of the 2 is better)

On Wed, Mar 11, 2020 at 10:19 AM Flavio Pompermaier 
wrote:

> Have you tried to use existing operators such as
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator or
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator?
>
> On Wed, Mar 11, 2020 at 4:46 AM Xintong Song 
> wrote:
>
>> Hi Eleanore,
>>
>> That does't sound like a scaling issue. It's probably a data skew, that
>> the data volume on some of the keys are significantly higher than others.
>> I'm not familiar with this area though, and have copied Jark for you, who
>> is one of the community experts in this area.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Mar 11, 2020 at 10:37 AM Eleanore Jin 
>> wrote:
>>
>>> _Hi Xintong,
>>>
>>> Thanks for the prompt reply! To answer your question:
>>>
>>>- Which Flink version are you using?
>>>
>>>v1.8.2
>>>
>>>- Is this skew observed only after a scaling-up? What happens if the
>>>parallelism is initially set to the scaled-up value?
>>>
>>>I also tried this, it seems skew also happens even I do
>>> not change the parallelism, so it may not caused by scale-up/down
>>>
>>>- Keeping the job running a while after the scale-up, does the skew
>>>ease?
>>>
>>>So the skew happens in such a way that: some partitions
>>> lags down to 0, but other partitions are still at level of 10_000, and I am
>>> seeing the back pressure is ok.
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>>
>>> On Tue, Mar 10, 2020 at 7:03 PM Xintong Song 
>>> wrote:
>>>
 Hi Eleanore,

 I have a few more questions regarding your issue.

- Which Flink version are you using?
- Is this skew observed only after a scaling-up? What happens if
the parallelism is initially set to the scaled-up value?
- Keeping the job running a while after the scale-up, does the skew
ease?

 I suspect the performance difference might be an outcome of some
 warming up issues. E.g., the existing TMs might have some file already
 localized, or some memory buffers already promoted to the JVM tenured area,
 while the new TMs have not.

 Thank you~

 Xintong Song



 On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin 
 wrote:

> Hi Experts,
> I have my flink application running on Kubernetes, initially with 1
> Job Manager, and 2 Task Managers.
>
> Then we have the custom operator that watches for the CRD, when the
> CRD replicas changed, it will patch the Flink Job Manager deployment
> parallelism and max parallelism according to the replicas from CRD
> (parallelism can be configured via env variables for our application).
> which causes the job manager restart. hence a new Flink job. But the
> consumer group does not change, so it will continue from the offset
> where it left.
>
> In addition, operator will also update Task Manager's deployment
> replicas, and will adjust the pod number.
>
> In case of scale up, the existing task manager pods do not get killed,
> but new task manager pods will be created.
>
> And we observed a skew in the partition offset consumed. e.g. some
> partitions have huge lags and other partitions have small lags. (observed
> from burrow)
>
> This is also validated by the metrics from Flink UI, showing the
> throughput differs for slotss
>
> Any clue why this is the case?
>
> Thanks a lot!
> Eleanore
>

>


Re: Use flink to calculate sum of the inventory under certain conditions

2020-03-11 Thread Kurt Young
> The second reason is that this query needs to scan the whole table. I think we
can do better :-)

Not necessarily. You said all the changes will trigger a DDB stream, so you
can use Flink to consume that stream incrementally.

For the first problem, I think you can use the DataStream API and register a
timer for every inventory item that gets inbounded. If the item is updated
before the timeout, you can delete the timer; otherwise the timer fires after
the timeout, and you can compute the total count and emit it whenever an
inventory item times out.

Best,
Kurt
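
(A minimal sketch of this timer-based approach, not from the thread. Assumptions: the
DDB/CDC stream has already been turned into a DataStream keyed by vendorId, processing
time is used for simplicity, and InventoryEvent is a hypothetical record type.)

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AgedInventoryFunction
        extends KeyedProcessFunction<String, AgedInventoryFunction.InventoryEvent, Tuple2<String, Long>> {

    // Hypothetical shape of one record from the CDC stream.
    public static class InventoryEvent {
        public String vendorId;
        public String inventoryName;
        public long units;
        public long inboundTime;   // epoch millis
        public boolean removed;    // true for removal events
    }

    private static final long FIFTEEN_DAYS = 15L * 24 * 60 * 60 * 1000;

    // Per vendor (the key): inventory name -> (units, inbound time).
    private transient MapState<String, Tuple2<Long, Long>> items;

    @Override
    public void open(Configuration parameters) {
        items = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "items", Types.STRING, Types.TUPLE(Types.LONG, Types.LONG)));
    }

    @Override
    public void processElement(InventoryEvent e, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        if (e.removed) {
            // A removal deletes the item; its timer may still fire,
            // but the item no longer contributes to the sum.
            items.remove(e.inventoryName);
        } else {
            items.put(e.inventoryName, Tuple2.of(e.units, e.inboundTime));
            // Fire when this item becomes 15 days old.
            ctx.timerService().registerProcessingTimeTimer(e.inboundTime + FIFTEEN_DAYS);
        }
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        long sum = 0;
        for (Tuple2<Long, Long> item : items.values()) {
            if (item.f1 + FIFTEEN_DAYS <= ts) {   // only items older than 15 days
                sum += item.f0;
            }
        }
        out.collect(Tuple2.of(ctx.getCurrentKey(), sum));
    }
}

It would be applied with something like
stream.keyBy(e -> e.vendorId).process(new AgedInventoryFunction()).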


On Wed, Mar 11, 2020 at 4:53 PM Arvid Heise  wrote:

> About the problem, we have 2 choices. The first one is using Flink as
>> described in this email thread. The second one is using AWS Lambda
>> triggered by CDC stream and compute the latest 15 days record, which is a
>> walk-around solution and looks not as elegant as Flink to me.
>>
>>
> Currently we decided to choose AWS Lambda because we are familiar with it,
>> and the most important, it lead to nearly no operational burden. But we are
>> actively looking for the comparison between Lambda and Flink and want to
>> know in which situation we prefer Flink over Lambda. Several teams in our
>> company are already in a hot debate about the comparison, and the biggest
>> concern is the non-function requirements about Flink, such as fault
>> tolerance, recovery, etc.
>>
>> I also searched the internet but found there are nearly no comparisons
>> between Lambda and Flink except for their market share :-( I'm wondering
>> what do you think of this? Or any comments from flink community is
>> appreciated.
>>
>
> You pretty much described the biggest difference already. Doing any more
> complex operation with Lambda will turn into a mess quickly.
>
> Lambdas currently shine for two use cases because of the ease of operation
> and unlimited scalability:
> - Simple transformations: input -> transform -> output
> - Simple database updates (together with Dynamo): input -> lookup by key
> (db), update by key (db) -> output
>
> As soon as you exceed point queries (time windows, joins) or have state,
> Lambdas actually get harder to manage imho. You need a zoo of supporting
> technologies or sacrifice lots of performance.
>
> In Flink, you have a higher barrier to entry, but as soon as your
> streaming application grows, it pays off quickly. Data is relocated with
> processing, such that you don't need to program access patterns yourself.
>
> So I'd decide it on a case by case basis for each application. If it's one
> of the two above mentioned use cases, just go lambda. You will not gain
> much with Flink, especially if you already have the experience.
> If you know your application will grow out of these use cases or is more
> complex to begin with, consider Flink.
>
> There is also one relatively new technology based on Flink called stateful
> functions [1]. It tries to combine the advanced state processing of Flink
> with the benefits of Lambdas (albeit scalability is not unlimited). You
> might want to check that out, as it may solve your use cases.
>
> [1] https://statefun.io/
>
> On Wed, Mar 11, 2020 at 3:06 AM Jiawei Wu 
> wrote:
>
>> Hi Robert,
>>
>> Your answer really helps.
>>
>> About the problem, we have 2 choices. The first one is using Flink as
>> described in this email thread. The second one is using AWS Lambda
>> triggered by CDC stream and compute the latest 15 days record, which is a
>> walk-around solution and looks not as elegant as Flink to me.
>>
>> Currently we decided to choose AWS Lambda because we are familiar with
>> it, and the most important, it lead to nearly no operational burden. But we
>> are actively looking for the comparison between Lambda and Flink and want
>> to know in which situation we prefer Flink over Lambda. Several teams in
>> our company are already in a hot debate about the comparison, and the
>> biggest concern is the non-function requirements about Flink, such as fault
>> tolerance, recovery, etc.
>>
>> I also searched the internet but found there are nearly no comparisons
>> between Lambda and Flink except for their market share :-( I'm wondering
>> what do you think of this? Or any comments from flink community is
>> appreciated.
>>
>> Thanks,
>> J
>>
>>
>> On Mon, Mar 9, 2020 at 8:22 PM Robert Metzger 
>> wrote:
>>
>>> Hey Jiawei,
>>>
>>> I'm sorry that you haven't received an answer yet.
>>>
>>> So you basically have a stream of dynamodb table updates (let's call id
>>> CDC stream), and you would like to maintain the inventory of the last 15
>>> days for each vendor.
>>> Whenever there's an update in the inventory data (a new event arrives in
>>> the CDC stream), you want to produce a new event with the inventory count.
>>>
>>> If I'm not mistaken, you will need to keep all the inventory in Flink's
>>> state to have an accurate count and to drop old records when they are
>>> expired.
>>> There are two options for maintaining the state:
>>> - in memory (using the 

How should a timestamp type be written in JSON so that Flink SQL can recognize it?

2020-03-11 Thread wangl...@geekplus.com.cn

I created a kafka table using the sql-client:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I sent this message to the kafka topic:
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
input_date is always NULL on the sql-client side.
I also changed the input_date I send to 1583828700240, "2020-03-11 13:00:00" and
"2020-03-11 13:00:00.000", but none of them work.
What should this TIMESTAMP(3) look like in the JSON message?

Thanks,
Wang Lei



wangl...@geekplus.com.cn


Re: How should a timestamp type be written in JSON so that Flink SQL can recognize it?

2020-03-11 Thread wangl...@geekplus.com.cn

Sorry, I sent the email written in Chinese to user@.
Let me translate it to English.

I  create a table using sql-client from kafka topic:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I send the message to the topic:
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
But the input_date is not recognized on the sql-client side and is null, even if I
tried 1583828700240, "2020-03-11 13:00:00" and "2020-03-11 13:00:00.000".

What should the timestamp(3) look like in the json message?

Thanks,
Lei
 




wangl...@geekplus.com.cn
 
From: wangl...@geekplus.com.cn
Date: 2020-03-11 17:41
To: user
Subject: How should a timestamp type be written in JSON so that Flink SQL can recognize it?

I created a kafka table using the sql-client:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I sent this message to the kafka topic:
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
input_date is always NULL on the sql-client side.
I also changed the input_date I send to 1583828700240, "2020-03-11 13:00:00" and
"2020-03-11 13:00:00.000", but none of them work.
What should this TIMESTAMP(3) look like in the JSON message?

Thanks,
Wang Lei



wangl...@geekplus.com.cn


Re: How should a timestamp type be written in JSON so that Flink SQL can recognize it?

2020-03-11 Thread Jark Wu
Hi Lei,

The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong
field name in the DDL.
It should be "input_date", not "intput_date".

Best,
Jark

On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> Sorry i sent the Chinese written email to user@
> Let me translate it to English.
>
> I  create a table using sql-client from kafka topic:
>
> CREATE TABLE order_status (
>   out_order_code VARCHAR,
>   intput_date TIMESTAMP(3),
>   owner_code VARCHAR,
>   status INT
>   ) WITH (
>   'connector.type' = 'kafka',
>
> .
>   'format.type' = 'json',
>
>   'format.derive-schema' = 'true'
>
> )
>
> Then I send message to the topic:
> {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
> But the input_date is not recognized on the sql-client and is null, even i
>  tried   1583828700240  "2020-03-11 13:00:00"  "2020-03-11 13:00:00.000"
>
> How should the timestamp(3) look like in the json message?
>
> Thanks,
> Lei
>
>
> --
>
> wangl...@geekplus.com.cn
>
>
> *From:* wangl...@geekplus.com.cn
> *Date:* 2020-03-11 17:41
> *To:* user 
> *Subject:* How should a timestamp type be written in JSON so that Flink SQL can recognize it?
>
> I created a kafka table using the sql-client:
>
> CREATE TABLE order_status (
>   out_order_code VARCHAR,
>   intput_date TIMESTAMP(3),
>   owner_code VARCHAR,
>   status INT
>   ) WITH (
>   'connector.type' = 'kafka',
>
> .
>   'format.type' = 'json',
>
>   'format.derive-schema' = 'true'
>
> )
>
> Then I sent this message to the kafka topic:
> {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
> input_date is always NULL on the sql-client side.
> I also changed the input_date I send to 1583828700240, "2020-03-11 13:00:00" and
> "2020-03-11 13:00:00.000", but none of them work.
> What should this TIMESTAMP(3) look like in the JSON message?
>
> Thanks,
> Wang Lei
>
> --
> wangl...@geekplus.com.cn
>
>


Re: Re: How should a timestamp type be written in JSON so that Flink SQL can recognize it?

2020-03-11 Thread wangl...@geekplus.com.cn

Thanks Jark,

No words can express my embarrassment (囧).


wangl...@geekplus.com.cn 

Sender: Jark Wu
Send Time: 2020-03-11 18:32
Receiver: wangl...@geekplus.com.cn
cc: user; user-zh
Subject: Re: How should a timestamp type be written in JSON so that Flink SQL can recognize it?
Hi Lei, 

The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong 
field name in the DDL. 
It should be "input_date", not "intput_date".

Best,
Jark

On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn 
 wrote:

Sorry i sent the Chinese written email to user@ 
Let me translate it to English.

I  create a table using sql-client from kafka topic:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I send message to the topic: 
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
But the input_date is not recognized on the sql-client and is null, even i  
tried   1583828700240  "2020-03-11 13:00:00"  "2020-03-11 13:00:00.000" 

How should the timestamp(3) look like in the json message?

Thanks,
Lei
 




wangl...@geekplus.com.cn
 
From: wangl...@geekplus.com.cn
Date: 2020-03-11 17:41
To: user
Subject: How should a timestamp type be written in JSON so that Flink SQL can recognize it?

I created a kafka table using the sql-client:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I sent this message to the kafka topic:
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
input_date is always NULL on the sql-client side.
I also changed the input_date I send to 1583828700240, "2020-03-11 13:00:00" and
"2020-03-11 13:00:00.000", but none of them work.
What should this TIMESTAMP(3) look like in the JSON message?

Thanks,
Wang Lei



wangl...@geekplus.com.cn


RE: Setting app Flink logger

2020-03-11 Thread Eyal Pe'er
Thank you all.
I will change the configuration file and deploy next week.

Best regards
Eyal Peer
From: Chesnay Schepler 
Sent: Wednesday, March 11, 2020 11:10 AM
To: Yang Wang ; miki haiat 
Cc: Eyal Pe'er ; Rafi Aroch ; 
user ; StartApp R&D Data Platform 

Subject: Re: Setting app Flink logger

@Eyal:

The image you are using is for 1.9.2, but the logging configuration you fetched 
was from master.

In 1.9.2 we use Log4j1, but on master we switched to Log4j2 instead, which uses 
a different configuration syntax. Log4j1 pretty much ignores the entire file, 
causing the error.

Please use the configuration file from the release-1.9 branch.

On 11/03/2020 03:50, Yang Wang wrote:
Since you are using log4j2, the java dynamic property should not be
"log4j.configuration". Please use "log4j.configurationFile" instead.

Maybe it is not your problem, there is something wrong with the docker
image. The log4j2 properties in "flink-console.sh" are not configured
correctly.


Best,
Yang

miki haiat <miko5...@gmail.com> wrote on Tue, Mar 10, 2020 at 11:50 PM:
Which image are you using ?

On Tue, Mar 10, 2020, 16:27 Eyal Pe'er <eyal.p...@startapp.com> wrote:
Hi Rafi,
The file exists (and is the file from the official image☺, please see below).
The user is root and it has permission. I am running in HA mode using docker.

cat /opt/flink/conf/log4j-console.properties


#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.


# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name = org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = 
org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

Best regards
Eyal Peer / Data Platform Developer

From: Rafi Aroch <rafi.ar...@gmail.com>
Sent: Tuesday, March 10, 2020 3:55 PM
To: Eyal Pe'er <eyal.p...@startapp.com>
Cc: user <user@flink.apache.org>; StartApp R&D Data Platform <startapprnd...@startapp.com>
Subject: Re: Setting app Flink logger

Hi Eyal,

Sounds trivial, but can you verify that the file actually exists in 
/opt/flink/conf/log4j-console.properties? Also, verify that the user running 
the process has read permissions to that file.
You said you use Flink in YARN mode, but the example above you run inside a 
docker image, so this is a bit confusing. Notice that the official docker images 
run as the "flink" user and group ids.

If you wish to try to use Logback instead, you can place your logback.xml file 
as part of your project resources folder to include it in the classpath. That 
should automatically get detected on startup.

Hope this helps,
Rafi


On Tue, Mar 10, 2020 at 1:42 PM Eyal Pe'er <eyal.p...@startapp.com> wrote:
Hi,
I am running Flink in YARN mode using the official image with a few additional 
files.
I’ve noticed that my logger failed to initialize:

root:~# docker logs flink-task-manager
Starting taskexecutor as a console application on host ***.
log4j:WARN No appenders could be found for logger 
(org.apache.flink.runtime.taskexecutor.TaskManagerRunner).
log4j:WARN P

Re: scaling issue Running Flink on Kubernetes

2020-03-11 Thread Eleanore Jin
Hi Flavio,

We have implemented our own flink operator; the operator starts a flink
job cluster (the application jar is already packaged together with flink in
the docker image). I believe Google's flink operator starts a session
cluster, and users can submit the flink job via REST. I have not looked into
the lyft one before.

Eleanore


On Wed, Mar 11, 2020 at 2:21 AM Flavio Pompermaier 
wrote:

> Sorry I wanted to mention https://github.com/lyft/flinkk8soperator (I
> don't know which one of the 2 is better)
>
> On Wed, Mar 11, 2020 at 10:19 AM Flavio Pompermaier 
> wrote:
>
>> Have you tried to use existing operators such as
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator or
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator?
>>
>> On Wed, Mar 11, 2020 at 4:46 AM Xintong Song 
>> wrote:
>>
>>> Hi Eleanore,
>>>
>>> That does't sound like a scaling issue. It's probably a data skew, that
>>> the data volume on some of the keys are significantly higher than others.
>>> I'm not familiar with this area though, and have copied Jark for you, who
>>> is one of the community experts in this area.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Wed, Mar 11, 2020 at 10:37 AM Eleanore Jin 
>>> wrote:
>>>
 _Hi Xintong,

 Thanks for the prompt reply! To answer your question:

- Which Flink version are you using?

v1.8.2

- Is this skew observed only after a scaling-up? What happens if
the parallelism is initially set to the scaled-up value?

I also tried this, it seems skew also happens even I do
 not change the parallelism, so it may not caused by scale-up/down

- Keeping the job running a while after the scale-up, does the skew
ease?

So the skew happens in such a way that: some partitions
 lags down to 0, but other partitions are still at level of 10_000, and I am
 seeing the back pressure is ok.

 Thanks a lot!
 Eleanore


 On Tue, Mar 10, 2020 at 7:03 PM Xintong Song 
 wrote:

> Hi Eleanore,
>
> I have a few more questions regarding your issue.
>
>- Which Flink version are you using?
>- Is this skew observed only after a scaling-up? What happens if
>the parallelism is initially set to the scaled-up value?
>- Keeping the job running a while after the scale-up, does the
>skew ease?
>
> I suspect the performance difference might be an outcome of some
> warming up issues. E.g., the existing TMs might have some file already
> localized, or some memory buffers already promoted to the JVM tenured 
> area,
> while the new TMs have not.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin 
> wrote:
>
>> Hi Experts,
>> I have my flink application running on Kubernetes, initially with 1
>> Job Manager, and 2 Task Managers.
>>
>> Then we have the custom operator that watches for the CRD, when the
>> CRD replicas changed, it will patch the Flink Job Manager deployment
>> parallelism and max parallelism according to the replicas from CRD
>> (parallelism can be configured via env variables for our application).
>> which causes the job manager restart. hence a new Flink job. But the
>> consumer group does not change, so it will continue from the offset
>> where it left.
>>
>> In addition, operator will also update Task Manager's deployment
>> replicas, and will adjust the pod number.
>>
>> In case of scale up, the existing task manager pods do not get
>> killed, but new task manager pods will be created.
>>
>> And we observed a skew in the partition offset consumed. e.g. some
>> partitions have huge lags and other partitions have small lags. (observed
>> from burrow)
>>
>> This is also validated by the metrics from Flink UI, showing the
>> throughput differs for slotss
>>
>> Any clue why this is the case?
>>
>> Thanks a lot!
>> Eleanore
>>
>
>>
>


Re: Failure detection and Heartbeats

2020-03-11 Thread Gary Yao
Hi Morgan,

> I am interested in knowing more about the failure detection mechanism
used by Flink, unfortunately information is a little thin on the ground and
I was hoping someone could shed a little light on the topic.
It is probably best to look into the implementation (see my answers below).

> Having the heartbeat interval shorter than the heartbeat timeout would
mean that multiple requests can be underway at the same time.
Yes, in fact the heartbeat interval must be shorter than the timeout or
else an exception is thrown [1]

> - In the worst case the JobManager would detect the failure in the
longest time, i.e. 60 seconds +- (node fails just after sending the last
heartbeat response)
If a heartbeat response is received, the 50s timeout is reset [2]. If we do
not receive a single heartbeat response for 50s, we will assume a failure
[3]. Therefore, I do not think that there is a worst case or best case here.

Lastly I wanted to mention that since FLIP-6 [4], the responsibilities of
the JobManager have been split. We now have a ResourceManager and one
JobManager for every job (note that in the code the class is called
JobMaster). These components send heartbeats to each other and also to
the TaskManagers.

Best,
Gary

[1]
https://github.com/apache/flink/blob/bf1195232a49cce1897c1fa86c5af9ee005212c6/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatServices.java#L43
[2]
https://github.com/apache/flink/blob/1b628d4a7d92f9c79c31f3fe90911940e0676b22/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatMonitorImpl.java#L117-L128
[3]
https://github.com/apache/flink/blob/1b628d4a7d92f9c79c31f3fe90911940e0676b22/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatMonitorImpl.java#L106-L111
[4]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
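
(A minimal sketch, not from the thread: the same options can be set on a Configuration
object; the keys match the flink-conf.yaml entries discussed above, and the values below
are the documented defaults in milliseconds. HeartbeatServices rejects an interval that
is not smaller than the timeout, see [1].)

Configuration conf = new Configuration();
conf.setLong("heartbeat.interval", 10_000L); // request a heartbeat every 10s
conf.setLong("heartbeat.timeout", 50_000L);  // assume failure after 50s without any response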

On Tue, Mar 10, 2020 at 2:54 PM Morgan Geldenhuys <
morgan.geldenh...@tu-berlin.de> wrote:

> Hi community,
>
> I am interested in knowing more about the failure detection mechanism used
> by Flink, unfortunately information is a little thin on the ground and I
> was hoping someone could shed a little light on the topic.
>
> Looking at the documentation (
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html),
> there are these two configuration options:
>
> heartbeat.interval (default 10000, Long): Time interval for requesting heartbeat from
> the sender side.
> heartbeat.timeout (default 50000, Long): Timeout for requesting and receiving
> heartbeat for both sender and receiver sides.
>
> This would indicate Flink uses a heartbeat mechanism to ascertain the
> liveness of TaskManagers. From this the following assumptions are made:
>
> The JobManager is responsible for broadcasting heartbeat requests to all
> TaskManagers and awaiting responses.
> If a response is not forthcoming from any particular node within the
> heartbeat timeout period, e.g. 50 seconds by default, then that node is
> timed out and assumed to have failed.
> The heartbeat interval indicated how often the heartbeat request broadcast
> is scheduled.
> Having the heartbeat interval shorter than the heartbeat timeout would
> mean that multiple requests can be underway at the same time.
> Therefore, the TaskManager would need to fail to respond to 4 requests
> (assuming normal response times are lower than 10 seconds) before being
> timed out after 50 seconds.
>
> So therefore if a failure were to occur (considering the default settings):
> - In the best case the JobManager would detect the failure in the shortest
> time, i.e. 50 seconds +- (node fails just before receiving the next
> heartbeat request)
> - In the worst case the JobManager would detect the failure in the longest
> time, i.e. 60 seconds +- (node fails just after sending the last heartbeat
> response)
>
> Is this correct?
>
> For JobManagers in HA mode, this is left to ZooKeeper timeouts which then
> initiates a round of elections and the new leader picks up from the
> previous checkpoint.
>
> Thank you in advance.
>
> Regards,
> M.
>
>
>
>
>
>
>
>


how to specify yarnqueue when starting a new job programmatically?

2020-03-11 Thread Vitaliy Semochkin
Hi,

How can I specify a yarn queue when I start a new job programmatically?

Regards,
Vitaliy


datadog http reporter metrics

2020-03-11 Thread Yitzchak Lieberman
Hi.

Has anyone encountered a problem with sending metrics with the datadog http
reporter?
My setup is flink version 1.8.2 deployed on k8s with 1 job manager and 10
task managers.
On every version deploy I see metrics on my dashboard, but after a few minutes
they stop being sent from all task managers while the job manager still sends
(with no errors/warnings in the logs).
Is it possible to be blocked by datadog due to the cluster size? my staging
cluster with 3 servers sends without any problem.

Thanks in advance,
Yitzchak.


Re: datadog metrics

2020-03-11 Thread Steve Whelan
Hi Fabian,

We ran into the same issue. We modified the reporter to emit the metrics in
chunks and it worked fine after. Would be interested in seeing a ticket on
this as well.

- Steve
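
(A generic sketch of the chunking idea, not the actual DatadogHttpReporter code: split
the collected series into fixed-size batches and send each batch as its own HTTP request
so no single payload hits the 413 limit. The batch size and the send() stand-in are made
up.)

import java.util.ArrayList;
import java.util.List;

public class ChunkedSendSketch {

    // Stand-in for posting one batch of serialized series entries to the metrics backend.
    static void send(List<String> chunk) {
        System.out.println("POST " + chunk.size() + " series entries");
    }

    public static void main(String[] args) {
        List<String> series = new ArrayList<>();
        for (int i = 0; i < 2000; i++) {
            series.add("metric-" + i);
        }
        int chunkSize = 500; // hypothetical batch size
        for (int i = 0; i < series.size(); i += chunkSize) {
            send(series.subList(i, Math.min(i + chunkSize, series.size())));
        }
    }
}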

On Wed, Mar 11, 2020 at 5:13 AM Chesnay Schepler  wrote:

> Please open a JIRA; we may have to split the datadog report into several
> chunks.
>
> On 09/03/2020 07:47, Fanbin Bu wrote:
>
> quote from the following link:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Query-named-operator-exceeds-80-characters-td24807.html#a24818
>
> "This is a safeguard in the metric system to prevent extremely long names
> (as these could cause the reporting to fail); so long as the prefix is
> unique you can safely ignore this warning."
>
> I do see from log that my sql operator name is too long and says it's
> truncated.
> But i still failed to report to datadog.
>
> Thanks
> Fanbin
>
> On Sun, Mar 8, 2020 at 11:36 PM Fanbin Bu  wrote:
>
>> Hi,
>>
>> Has anybody seen this error before and what is the suggested way to solve
>> it?
>>
>> 2020-03-07 02:54:34,100 WARN
>>  org.apache.flink.metrics.datadog.DatadogHttpClient- Failed to
>> send request to Datadog (response was Response{protocol=http/1.1, code=413,
>> message=Request Entity Too Large, url=
>> https://app.datadoghq.com/api/v1/series?api_key=
>>
>> thanks,
>> Fanbin
>>
>
>


Re: how to specify yarnqueue when starting a new job programmatically?

2020-03-11 Thread Xintong Song
Hi Vitaliy,

You can specify a yarn queue by either setting the configuration option
'yarn.application.queue' [1], or using the command line option '-qu' (or
'--queue') [2].

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#yarn-application-queue
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#start-a-session
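
When submitting programmatically, the same option can be set on the Flink Configuration used for the deployment (a minimal sketch; "my-queue" is a placeholder queue name):

import org.apache.flink.configuration.Configuration;

Configuration flinkConf = new Configuration();
// Ask YARN to place the application into the given queue.
flinkConf.setString("yarn.application.queue", "my-queue");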

On Thu, Mar 12, 2020 at 3:56 AM Vitaliy Semochkin 
wrote:

> Hi,
>
> How can I specify a yarn queue when I start a new job programmatically?
>
> Regards,
> Vitaliy
>


How to set stateBackEnd in flink sql program?

2020-03-11 Thread wangl...@geekplus.com.cn

EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.sqlUpdate(...);

Is there a way I can set the state backend like in a normal streaming program, as
follows:

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(args[0], true));



wangl...@geekplus.com.cn


Re: How to set stateBackEnd in flink sql program?

2020-03-11 Thread Jingsong Li
Hi wanglei,

If you are using Flink 1.10, you can set "state.backend" to "rocksdb" via
"TableConfig.getConfiguration()".
And you can find related config options here[1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html

Jingsong Lee

On Thu, Mar 12, 2020 at 11:15 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> TableEnvironment tEnv = TableEnvironment.create(settings);
>
> tEnv.sqlUpdate()...
>
>
> Is there a way I can set the state backend like in a normal streaming program, as
> follows:
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend(new RocksDBStateBackend(args[0], true));
>
>
> --
> wangl...@geekplus.com.cn
>


-- 
Best, Jingsong Lee


Re: Re: How to set stateBackEnd in flink sql program?

2020-03-11 Thread wangl...@geekplus.com.cn
Hi Jingsong, 

So I can write the code as follows?

EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.getConfig().getConfiguration().setString("state.backend", "rocksdb");
tEnv.sqlUpdate(...);
Thanks,
Lei


wangl...@geekplus.com.cn
 
From: Jingsong Li
Date: 2020-03-12 11:32
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: How to set stateBackEnd in flink sql program?
Hi wanglei,

If you are using Flink 1.10, you can set "state.backend=rocksdb" to 
"TableConfig.getConfiguration".
And you can find related config options here[1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html

Jingsong Lee

On Thu, Mar 12, 2020 at 11:15 AM wangl...@geekplus.com.cn 
 wrote:

EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.sqlUpdate()...
Is there a way I can set the state backend like in a normal streaming program, as
follows:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(args[0], true));



wangl...@geekplus.com.cn


-- 
Best, Jingsong Lee


Re: Re: How to set stateBackEnd in flink sql program?

2020-03-11 Thread Jingsong Li
Yes.

You should take a look at [1]. There are other config options you may need as well.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html

Best,
Jingsong Lee
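
A minimal sketch of options that are typically set together with the RocksDB backend; the checkpoint directory below is a placeholder, not a value from this thread:

import org.apache.flink.configuration.Configuration;

Configuration conf = tEnv.getConfig().getConfiguration();
conf.setString("state.backend", "rocksdb");
conf.setString("state.checkpoints.dir", "hdfs:///flink/checkpoints"); // placeholder path
conf.setString("state.backend.incremental", "true");                  // optional, RocksDB only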

On Thu, Mar 12, 2020 at 12:26 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Hi Jingsong,
>
> So I can write the code as follows?
>
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> TableEnvironment tEnv = TableEnvironment.create(settings);
> tEnv.getConfig().getConfiguration().setString("state.backend","rocksdb");
>
> tEnv.sqlUpdate(...)
>
>
> Thanks,
> Lei
>
> --
> wangl...@geekplus.com.cn
>
>
> *From:* Jingsong Li 
> *Date:* 2020-03-12 11:32
> *To:* wangl...@geekplus.com.cn
> *CC:* user 
> *Subject:* Re: How to set stateBackEnd in flink sql program?
> Hi wanglei,
>
> If you are using Flink 1.10, you can set "state.backend=rocksdb" to
> "TableConfig.getConfiguration".
> And you can find related config options here[1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html
>
> Jingsong Lee
>
> On Thu, Mar 12, 2020 at 11:15 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>>
>> EnvironmentSettings settings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> TableEnvironment tEnv = TableEnvironment.create(settings);
>>
>> tEnv.sqlUpdate()...
>>
>>
>> Is there a way I can set the state backend like in a normal streaming program, as
>> follows:
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStateBackend(new RocksDBStateBackend(args[0], true));
>>
>>
>> --
>> wangl...@geekplus.com.cn
>>
>
>
> --
> Best, Jingsong Lee
>
>

-- 
Best, Jingsong Lee


Flink 1.10 container memory configuration with Mesos.

2020-03-11 Thread Alexander Kasyanenko
Hi folks,

I have a question related to the new memory configuration introduced in Flink
1.10. Has anyone encountered a similar problem?
I'm trying to make use of the *taskmanager.memory.process.size* configuration
key in combination with a Mesos session cluster, but I get an error like this:

2020-03-11 11:44:09,771 [main] ERROR
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner - Error
while starting the TaskManager
org.apache.flink.configuration.IllegalConfigurationException: Failed
to create TaskExecutorResourceSpec
at 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:72)
at 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:356)
at 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:152)
at 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:308)
at 
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner.lambda$main$0(MesosTaskExecutorRunner.java:106)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner.main(MesosTaskExecutorRunner.java:105)
Caused by: org.apache.flink.configuration.IllegalConfigurationException:
The required configuration option Key:
'taskmanager.memory.task.heap.size' , default: null (fallback keys:
[]) is not set
at 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkConfigOptionIsSet(TaskExecutorResourceUtils.java:90)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.lambda$checkTaskExecutorResourceConfigSet$0(TaskExecutorResourceUtils.java:84)
at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4390)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkTaskExecutorResourceConfigSet(TaskExecutorResourceUtils.java:84)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:70)
... 9 more

But when the task manager is launched, it correctly parses the process memory key:

2020-03-11 11:43:55,376 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -

2020-03-11 11:43:55,377 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -
Starting MesosTaskExecutorRunner (Version: 1.10.0, Rev:aa4eb8f,
Date:07.02.2020 @ 19:18:19 CET)
2020-03-11 11:43:55,377 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -  OS
current user: root
2020-03-11 11:43:57,347 [main] WARN
org.apache.hadoop.util.NativeCodeLoader   - Unable
to load native-hadoop library for your platform... using builtin-java
classes where applicable
2020-03-11 11:43:57,535 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -  JVM:
OpenJDK 64-Bit Server VM - AdoptOpenJDK - 11/11.0.2+9
2020-03-11 11:43:57,535 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -
Maximum heap size: 746 MiBytes
2020-03-11 11:43:57,535 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -
JAVA_HOME: (not set)
2020-03-11 11:43:57,539 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -
Hadoop version: 2.6.5
2020-03-11 11:43:57,539 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -  JVM
Options:
2020-03-11 11:43:57,539 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -
-Xmx781818251
2020-03-11 11:43:57,539 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -
-Xms781818251
2020-03-11 11:43:57,540 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -
-XX:MaxDirectMemorySize=317424929
2020-03-11 11:43:57,540 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -
-XX:MaxMetaspaceSize=100663296
2020-03-11 11:43:57,540 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -
-Dlog.file=/var/log/flink-session-cluster/taskmanager.log
2020-03-11 11:43:57,540 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -
-Dlog4j.configuration=file:/opt/flink/conf/log4j.properties
2020-03-11 11:43:57,540 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -
-Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
2020-03-11 11:43:57,540 [main] INFO
org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -
Program Arguments: (none)
2020-03-11 11:43:57,540 [main] INFO
org.apache.fli

Re: Flink 1.10 container memory configuration with Mesos.

2020-03-11 Thread Yangze Guo
Hi, Alexander

I could not reproduce it in my local environment. Normally, the Mesos RM
will calculate all the memory config and add it to the launch command.
Unfortunately, all the logging I could find for this command is at the
DEBUG level. Would you mind changing the log level to DEBUG, or sharing
anything about the taskmanager launch command you can find in the
current log?


Best,
Yangze Guo
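
A minimal sketch of how the level could be raised, assuming the default log4j.properties layout shipped with Flink 1.10 (raise the root logger, or target the Mesos packages specifically):

log4j.rootLogger=DEBUG, file
log4j.logger.org.apache.flink.mesos=DEBUG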

On Thu, Mar 12, 2020 at 1:38 PM Alexander Kasyanenko
 wrote:
>
> Hi folks,
>
> I have a question related to the new memory configuration introduced in Flink
> 1.10. Has anyone encountered a similar problem?
> I'm trying to make use of the taskmanager.memory.process.size configuration key
> in combination with a Mesos session cluster, but I get an error like this:
>
> 2020-03-11 11:44:09,771 [main] ERROR 
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner - Error while 
> starting the TaskManager
> org.apache.flink.configuration.IllegalConfigurationException: Failed to 
> create TaskExecutorResourceSpec
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:72)
> at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:356)
> at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:152)
> at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:308)
> at 
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner.lambda$main$0(MesosTaskExecutorRunner.java:106)
> at java.base/java.security.AccessController.doPrivileged(Native Method)
> at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at 
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner.main(MesosTaskExecutorRunner.java:105)
> Caused by: org.apache.flink.configuration.IllegalConfigurationException: The 
> required configuration option Key: 'taskmanager.memory.task.heap.size' , 
> default: null (fallback keys: []) is not set
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkConfigOptionIsSet(TaskExecutorResourceUtils.java:90)
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.lambda$checkTaskExecutorResourceConfigSet$0(TaskExecutorResourceUtils.java:84)
> at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4390)
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkTaskExecutorResourceConfigSet(TaskExecutorResourceUtils.java:84)
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:70)
> ... 9 more
>
> But when task manager is launched, it correctly parses process memory key:
>
> 2020-03-11 11:43:55,376 [main] INFO  
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner - 
> 
> 2020-03-11 11:43:55,377 [main] INFO  
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -  Starting 
> MesosTaskExecutorRunner (Version: 1.10.0, Rev:aa4eb8f, Date:07.02.2020 @ 
> 19:18:19 CET)
> 2020-03-11 11:43:55,377 [main] INFO  
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -  OS current 
> user: root
> 2020-03-11 11:43:57,347 [main] WARN  org.apache.hadoop.util.NativeCodeLoader  
>  - Unable to load native-hadoop library for your 
> platform... using builtin-java classes where applicable
> 2020-03-11 11:43:57,535 [main] INFO  
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -  JVM: OpenJDK 
> 64-Bit Server VM - AdoptOpenJDK - 11/11.0.2+9
> 2020-03-11 11:43:57,535 [main] INFO  
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -  Maximum heap 
> size: 746 MiBytes
> 2020-03-11 11:43:57,535 [main] INFO  
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -  JAVA_HOME: 
> (not set)
> 2020-03-11 11:43:57,539 [main] INFO  
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -  Hadoop 
> version: 2.6.5
> 2020-03-11 11:43:57,539 [main] INFO  
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -  JVM Options:
> 2020-03-11 11:43:57,539 [main] INFO  
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner - 
> -Xmx781818251
> 2020-03-11 11:43:57,539 [main] INFO  
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner - 
> -Xms781818251
> 2020-03-11 11:43:57,540 [main] INFO  
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner - 
> -XX:MaxDirectMemorySize=317424929
> 2020-03-11 11:43:57,540 [main] INFO  
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner - 
> -XX:MaxMetaspaceSize=100663296
> 2020-03-11 11:43:57,540 [main] INFO  
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunn

Re: Flink 1.10 container memory configuration with Mesos.

2020-03-11 Thread Xintong Song
Hi Alex,

Could you check and post your TM launch command? I suspect that
there might be some unrecognized arguments that prevent the rest of the
arguments from being parsed.

The TM memory configuration process works as follows:

   1. The resource manager will parse the configurations, checking which
   options are configured and which are not, and calculate the size of each
   memory component. (This is where ‘taskmanager.memory.process.size’ is used.)
   2. After deriving the memory component sizes, the resource manager will
   generate the launch command for the task managers, with dynamic configurations
   ("-D <key>=<value>") overwriting the memory component sizes. Therefore, even if
   you have not configured 'taskmanager.memory.task.heap.size', it is expected
   that by the time the TM is launched this config option will be available.
   3. When a task manager is started, it will not do the calculations
   again, and will directly read the memory component sizes calculated by
   resource manager from the dynamic configurations. That means it is not
   reading ‘taskmanager.memory.process.size’ and deriving memory component
   sizes from it again.

One thing that might have caused your problem is that, when
MesosTaskExecutorRunner
parses the command line arguments (that's where the dynamic configurations
are passed in), if it meets an unrecognized token it will stop parsing the
rest of the arguments. That could be the reason that
'taskmanager.memory.task.heap.size'
is missing. You can take a look at the launch command and see if there's
anything unexpected before the memory dynamic configurations.
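
For reference, when the launch command is generated as expected, its tail usually carries dynamic options of roughly this shape (the keys are Flink 1.10 option names; the sizes below are placeholders, not values computed for this setup):

-D taskmanager.memory.framework.heap.size=128mb
-D taskmanager.memory.task.heap.size=384mb
-D taskmanager.memory.managed.size=512mb
-D taskmanager.memory.network.max=128mb
...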

Thank you~

Xintong Song



On Thu, Mar 12, 2020 at 2:26 PM Yangze Guo  wrote:

> Hi, Alexander
>
> I could not reproduce it in my local environment. Normally, Mesos RM
> will calculate all the mem config and add it to the launch command.
> Unfortunately, all the logging I could find for this command is at the
> DEBUG level. Would you mind changing the log level to DEBUG, or sharing
> anything about the taskmanager launch command you can find in the
> current log?
>
>
> Best,
> Yangze Guo
>
> On Thu, Mar 12, 2020 at 1:38 PM Alexander Kasyanenko
>  wrote:
> >
> > Hi folks,
> >
> > I have a question related to the new memory configuration introduced in
> Flink 1.10. Has anyone encountered a similar problem?
> > I'm trying to make use of the taskmanager.memory.process.size configuration
> key in combination with a Mesos session cluster, but I get an error like this:
> >
> > 2020-03-11 11:44:09,771 [main] ERROR
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner - Error while
> starting the TaskManager
> > org.apache.flink.configuration.IllegalConfigurationException: Failed to
> create TaskExecutorResourceSpec
> > at
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:72)
> > at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:356)
> > at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:152)
> > at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:308)
> > at
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner.lambda$main$0(MesosTaskExecutorRunner.java:106)
> > at java.base/java.security.AccessController.doPrivileged(Native Method)
> > at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
> > at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> > at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner.main(MesosTaskExecutorRunner.java:105)
> > Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> The required configuration option Key: 'taskmanager.memory.task.heap.size'
> , default: null (fallback keys: []) is not set
> > at
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkConfigOptionIsSet(TaskExecutorResourceUtils.java:90)
> > at
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.lambda$checkTaskExecutorResourceConfigSet$0(TaskExecutorResourceUtils.java:84)
> > at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4390)
> > at
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkTaskExecutorResourceConfigSet(TaskExecutorResourceUtils.java:84)
> > at
> org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:70)
> > ... 9 more
> >
> > But when task manager is launched, it correctly parses process memory
> key:
> >
> > 2020-03-11 11:43:55,376 [main] INFO
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -
> 
> > 2020-03-11 11:43:55,377 [main] INFO
> org.apache.flink.mesos.entrypoint.MesosTaskExecutorRunner -  Starting
> MesosTaskExecut