RocksDB state on HDFS seems not being cleaned up

2019-11-05 Thread shuwen zhou
Hi Community,
I have a job running on Flink 1.9.0 on YARN, with RocksDB state on HDFS and
incremental checkpoints enabled. I have some MapState in the code with the
following config:

val ttlConfig = StateTtlConfig
  .newBuilder(Time.minutes(30))
  .updateTtlOnCreateAndWrite()
  .cleanupInBackground()
  .cleanupFullSnapshot()
  .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
  .build()

After running for around 2 days, I observed that the checkpoint folder shows:

44.4 M   /flink-chk743e4568a70b626837b/chk-40
65.9 M   /flink-chk743e4568a70b626837b/chk-41
91.7 M   /flink-chk743e4568a70b626837b/chk-42
96.1 M   /flink-chk743e4568a70b626837b/chk-43
48.1 M   /flink-chk743e4568a70b626837b/chk-44
71.6 M   /flink-chk743e4568a70b626837b/chk-45
50.9 M   /flink-chk743e4568a70b626837b/chk-46
90.2 M   /flink-chk743e4568a70b626837b/chk-37
49.3 M   /flink-chk743e4568a70b626837b/chk-38
96.9 M   /flink-chk743e4568a70b626837b/chk-39
797.9 G  /flink-chk743e4568a70b626837b/shared
The ./shared folder keeps growing and does not appear to be cleaned up.
However, when I disabled incremental cleanup, the expired full snapshots were
removed automatically.
Is there any way to remove outdated state on HDFS to stop it from growing?
Thanks.



-- 
Best Wishes,
Shuwen Zhou


[jira] [Created] (FLINK-14611) Move allVerticesInSameSlotSharingGroupByDefault from ExecutionConfig to StreamGraph

2019-11-05 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-14611:
---

 Summary: Move allVerticesInSameSlotSharingGroupByDefault from 
ExecutionConfig to StreamGraph
 Key: FLINK-14611
 URL: https://issues.apache.org/jira/browse/FLINK-14611
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Zhu Zhu
 Fix For: 1.10.0


allVerticesInSameSlotSharingGroupByDefault is currently for internal use only.
It's better not to add it to ExecutionConfig, which is meant for user configuration.
For more details, see the discussions
[here|https://issues.apache.org/jira/browse/FLINK-14059?focusedCommentId=16967392&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16967392]
and [here|https://github.com/apache/flink/pull/10007#discussion_r342481078].

I'd propose moving it to StreamGraph.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Anybody can help to review the PR about re-work the e2e framework in Java ?

2019-11-05 Thread OpenInx
Hi,

I'm working on the e2e framework rework, i.e. rewriting the e2e framework
in Java so that we can do more things, such as running it on both standalone
and distributed Flink clusters, testing against a standalone or distributed
Kafka environment, running it under Maven, etc.

I talked with Till and Chesnay on Slack; both of them are quite busy and have
no bandwidth right now. Can anyone else help to review this PR? Any help
would be appreciated. :-)

The Pull Request is: https://github.com/apache/flink/pull/10042

Thanks.


[jira] [Created] (FLINK-14612) Degenerate the current ConcurrentHashMap type of intermediateResults to a normal HashMap type.

2019-11-05 Thread vinoyang (Jira)
vinoyang created FLINK-14612:


 Summary: Degenerate the current ConcurrentHashMap type of 
intermediateResults to a normal HashMap type.
 Key: FLINK-14612
 URL: https://issues.apache.org/jira/browse/FLINK-14612
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: vinoyang


After FLINK-11417, the ExecutionGraph runs in a single-threaded mode and will no
longer be plagued by concurrency issues. So we can change the current
ConcurrentHashMap type of intermediateResults to a plain HashMap.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[DISCUSS] Flink Avro Cloudera Registry (FLINK-14577)

2019-11-05 Thread Őrhidi Mátyás
Dear Flink Community!

We have noticed a recent request for Hortonworks schema registry support (
FLINK-14577 ). We have
an implementation for it already, and we would be happy to contribute it to
Apache Flink.

You can find the documentation below[1]. Let us know your thoughts!

Best Regards,
Matyas

[1] Flink Avro Cloudera Registry User Guide
---

Add the following dependency to use the schema registry integration:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro-cloudera-registry</artifactId>
  <version>${flink.version}</version>
</dependency>



The schema registry can be plugged directly into the FlinkKafkaConsumer and
FlinkKafkaProducer using the appropriate schema:
- org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
- org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema


Supported types
--
- Avro Specific Record types
- Avro Generic Records
- Basic Java Data types: byte[], Byte, Integer, Short, Double, Float, Long,
String, Boolean

SchemaRegistrySerializationSchema
--
The serialization schema can be constructed using the included builder
object SchemaRegistrySerializationSchema.builder(..).

Required settings:
- Topic configuration when creating the builder. Can be static or dynamic
(extracted from the data)
- RegistryAddress parameter on the builder to establish the connection

Optional settings:
- Arbitrary SchemaRegistry client configuration using the setConfig method
- Key configuration for the produced Kafka messages
 - By specifying a KeySelector function that extracts the key from each
record
 - Using a Tuple2 stream for (key, value) pairs directly
- Security configuration

Example:
KafkaSerializationSchema<ItemTransaction> schema =
    SchemaRegistrySerializationSchema
        .builder(topic)
        .setRegistryAddress(registryAddress)
        .setKey(ItemTransaction::getItemId)
        .build();
FlinkKafkaProducer<ItemTransaction> sink = new FlinkKafkaProducer<>(
    "dummy", schema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

SchemaRegistryDeserializationSchema
-
The deserialization schema can be constructed using the included builder
object SchemaRegistryDeserializationSchema.builder(..).
When reading messages (and keys) we always have to specify the expected
Class or record Schema of the input records so that Flink can do any
necessary conversion between the data on Kafka and what is expected.

Required settings:
- Class or Schema of the input messages depending on the data type
- RegistryAddress parameter on the builder to establish the connection

Optional settings:
- Arbitrary SchemaRegistry client configuration using the setConfig method
- Key configuration for the consumed Kafka messages
 - Should only be specified when we want to read the keys as well into a
(key, value) stream
- Security configuration

Example:
KafkaDeserializationSchema<ItemTransaction> schema =
    SchemaRegistryDeserializationSchema
        .builder(ItemTransaction.class)
        .setRegistryAddress(registryAddress)
        .build();
// The consumer group is configured via kafkaProps ("group.id").
FlinkKafkaConsumer<ItemTransaction> source = new FlinkKafkaConsumer<>(
    inputTopic, schema, kafkaProps);


[jira] [Created] (FLINK-14613) Add validation check when applying UDF to temporal table key in Temporal Table Join condition

2019-11-05 Thread hailong wang (Jira)
hailong wang created FLINK-14613:


 Summary: Add validation check when applying UDF to temporal table
key in Temporal Table Join condition
 Key: FLINK-14613
 URL: https://issues.apache.org/jira/browse/FLINK-14613
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.9.1
Reporter: hailong wang
 Fix For: 1.10.0


In a temporal table join, we don't support applying a UDF to the temporal table
join key, because we can't analyze the lookup keys when the call is an expression.
When users write such a query, the program runs normally but the result will be
wrong. So we should add a validation check to prevent it.

The SQL is as follows:
{code:java}
INSERT INTO A
SELECT B.amount, B.currency, C.amount, C.product 
FROM B join C FOR SYSTEM_TIME AS OF B.proctime 
on B.amount = C.amount and C.product = '1'
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14614) Add annotation location checkstyle rule

2019-11-05 Thread lamber-ken (Jira)
lamber-ken created FLINK-14614:
--

 Summary: Add annotation location checkstyle rule
 Key: FLINK-14614
 URL: https://issues.apache.org/jira/browse/FLINK-14614
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.9.1
Reporter: lamber-ken
 Fix For: 1.9.2


Check the location of annotations on language elements.
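
For illustration, a minimal Java example of the kind of placement such a rule typically enforces; the class and method names below are made up for this sketch:

{code:java}
// Hypothetical illustration only; the exact behavior (e.g. whether a single
// parameterless annotation may share a line) depends on the checkstyle configuration.
public class AnnotationLocationExample {

    // Typically flagged: the annotation shares a line with the declaration.
    @Deprecated public void oldStyle() { }

    // Typically accepted: the annotation sits on its own line above the declaration.
    @Deprecated
    public void newStyle() { }
}
{code}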



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14615) Add Flink Web UI capabilities for savepoint

2019-11-05 Thread Mario Georgiev (Jira)
 Mario Georgiev created FLINK-14615:
---

 Summary: Add Flink Web UI capabilities for savepoint
 Key: FLINK-14615
 URL: https://issues.apache.org/jira/browse/FLINK-14615
 Project: Flink
  Issue Type: Improvement
Reporter:  Mario Georgiev


Having the ability to do the following would greatly simplify Kubernetes (or any
other) cluster deployments that use a single cluster for multiple jobs rather than
one job per cluster.

My proposal is the following (see the REST sketch after this list):

- Have a way to trigger a savepoint for a job from the UI, and a way to see the
savepoint's progress somewhere in the UI.

- Have a way to deploy a new or existing job from the UI using an existing
savepoint, choosing whether or not to allow non-restored state
(--allowNonRestoredState).

- Have a way to trigger savepoints at a scheduled interval for a job or a group of
jobs (this could very well be a second iteration, since it would require
persistence for the job schedules).
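
For reference, triggering and monitoring savepoints is already possible through the REST API that the Web UI talks to, so a UI feature could build on it. A minimal sketch of the underlying calls is shown below; the host, port, job id, trigger id and target directory are placeholders, and the JSON field names should be double-checked against the REST API docs:

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerSavepointSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://jobmanager:8081";                    // placeholder address
        String jobId = "00000000000000000000000000000000";         // placeholder job id

        // Trigger a savepoint: POST /jobs/:jobid/savepoints
        HttpRequest trigger = HttpRequest.newBuilder()
                .uri(URI.create(base + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"target-directory\": \"hdfs:///flink/savepoints\", \"cancel-job\": false}"))
                .build();
        // The response body contains a "request-id" (the trigger id).
        System.out.println(client.send(trigger, HttpResponse.BodyHandlers.ofString()).body());

        // Check progress: GET /jobs/:jobid/savepoints/:triggerid
        String triggerId = "replace-with-request-id-from-response"; // placeholder
        HttpRequest status = HttpRequest.newBuilder()
                .uri(URI.create(base + "/jobs/" + jobId + "/savepoints/" + triggerId))
                .GET()
                .build();
        System.out.println(client.send(status, HttpResponse.BodyHandlers.ofString()).body());
    }
}
{code}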



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Flink Avro Cloudera Registry (FLINK-14577)

2019-11-05 Thread Dawid Wysakowicz
Hi Matyas,

I think this would be a valuable addition. You may reuse some of the
already available abstractions for writing avro deserialization schema
based on a schema registry (have a look at RegistryDeserializationSchema
and SchemaCoderProvider). There is also an opened PR for adding a
similar serialization schema[1].

The only concern is that I am not 100% sure what the consensus is on
which connectors we want to adopt into the main repository and which
we would prefer to be hosted separately and included on the ecosystem
webpage[2] (which I hope will be published soon).

Whichever option is preferred, I can help review the code.

Best,

Dawid

[1] https://github.com/apache/flink/pull/8371

[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html

On 05/11/2019 12:40, Őrhidi Mátyás wrote:
> Dear Flink Community!
>
> We have noticed a recent request for Hortonworks schema registry support (
> FLINK-14577 ). We have
> an implementation for it already, and we would be happy to contribute it to
> Apache Flink.
>
> You can find the documentation below[1]. Let us know your thoughts!
>
> Best Regards,
> Matyas
>
> [1] Flink Avro Cloudera Registry User Guide
> ---
>
> Add the following dependency to use the schema registry integration:
> 
> org.apache.flink
> flink-avro-cloudera-registry
> ${flink.version}
> 
>
>
> The schema registry can be plugged directly into the FlinkKafkaConsumer and
> FlinkKafkaProducer using the appropriate schema:
> -
> org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
> -
> org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema
>
>
> Supported types
> --
> - Avro Specific Record types
> - Avro Generic Records
> - Basic Java Data types: byte[], Byte, Integer, Short, Double, Float, Long,
> String, Boolean
>
> SchemaRegistrySerializationSchema
> --
> The serialization schema can be constructed using the included builder
> object SchemaRegistrySerializationSchema.builder(..).
>
> Required settings:
> - Topic configuration when creating the builder. Can be static or dynamic
> (extracted from the data)
> - RegistryAddress parameter on the builder to establish the connection
>
> Optional settings:
> - Arbitrary SchemaRegistry client configuration using the setConfig method
> - Key configuration for the produced Kafka messages
>  - By specifying a KeySelector function that extracts the key from each
> record
>  - Using a Tuple2 stream for (key, value) pairs directly
> - Security configuration
>
> Example:
> KafkaSerializationSchema schema =
> SchemaRegistrySerializationSchema
> .builder(topic)
> .setRegistryAddress(registryAddress)
> .setKey(ItemTransaction::getItemId)
> .build();
> FlinkKafkaProducer sink = new
> FlinkKafkaProducer<>("dummy", schema, kafkaProps,
> FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
>
> SchemaRegistryDeserializationSchema
> -
> The deserialization schema can be constructed using the included builder
> object SchemaRegistryDeserializationSchema.builder(..).
> When reading messages (and keys) we always have to specify the expected
> Class or record Schema of the input records so that Flink can do any
> necessary conversion between the data on Kafka and what is expected.
>
> Required settings:
> - Class or Schema of the input messages depending on the data type
> - RegistryAddress parameter on the builder to establish the connection
>
> Optional settings:
> - Arbitrary SchemaRegistry client configuration using the setConfig method
> - Key configuration for the consumed Kafka messages
>  - Should only be specified when we want to read the keys as well into a
> (key, value) stream
> - Security configuration
>
> Example:
> KafkaDeserializationSchema schema =
> SchemaRegistryDeserializationSchema
>.builder(ItemTransaction.class)
>.setRegistryAddress(registryAddress)
>.build();
> FlinkKafkaConsumer source = new
> FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps, groupdId);
>





Re: RocksDB state on HDFS seems not being cleaned up

2019-11-05 Thread bupt_ljy
Hi Shuwen,


The “shared” folder means that the state files in it are shared among multiple
checkpoints, which happens when you enable incremental checkpointing[1].
Therefore, it’s reasonable that its size keeps growing if you set
“state.checkpoints.num-retained” to a big value.


[1] https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html


Best,
Jiayi Liao


 Original Message 
Sender: shuwen zhou
Recipient: dev
Date: Tuesday, Nov 5, 2019 17:59
Subject: RocksDB state on HDFS seems not being cleanned up


Hi Community, I have a job running on Flink1.9.0 on YARN with rocksDB on HDFS 
with incremental checkpoint enabled. I have some MapState in code with 
following config: val ttlConfig = StateTtlConfig .newBuilder(Time.minutes(30) 
.updateTtlOnCreateAndWrite() .cleanupInBackground() .cleanupFullSnapshot() 
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) 
After running for around 2 days, I observed checkpoint folder is showing 44.4 M 
/flink-chk743e4568a70b626837b/chk-40 65.9 M 
/flink-chk743e4568a70b626837b/chk-41 91.7 M 
/flink-chk743e4568a70b626837b/chk-42 96.1 M 
/flink-chk743e4568a70b626837b/chk-43 48.1 M 
/flink-chk743e4568a70b626837b/chk-44 71.6 M 
/flink-chk743e4568a70b626837b/chk-45 50.9 M 
/flink-chk743e4568a70b626837b/chk-46 90.2 M 
/flink-chk743e4568a70b626837b/chk-37 49.3 M 
/flink-chk743e4568a70b626837b/chk-38 96.9 M 
/flink-chk743e4568a70b626837b/chk-39 797.9 G 
/flink-chk743e4568a70b626837b/shared The ./shared folder size seems continuing 
increasing and seems the folder is not being clean up. However while I disabled 
incremental cleanup, the expired full snapshot will be removed automatically. 
Is there any way to remove outdated state on HDFS to stop it from increasing? 
Thanks. -- Best Wishes, Shuwen Zhou

Re: [DISCUSS] Flink Avro Cloudera Registry (FLINK-14577)

2019-11-05 Thread Gyula Fóra
Thanks Matyas for starting the discussion!
I think this would be a very valuable addition to Flink as many companies
are already using the Hortonworks/Cloudera registry and it would enable
them to connect to Flink easily.

@Dawid:
Regarding the implementation, this is a much more lightweight connector than
what we have now for the Confluent registry and the PR you linked. It wraps
the Cloudera registry directly, providing a very thin wrapper plus some
enhanced functionality for handling Kafka message keys.

As for the question of main repo vs. outside, I would prefer this to be
included in the main repo, similar to the Confluent registry connector.
Unless we decide to move all of these connectors out, I would like to take a
consistent approach.

Cheers,
Gyula


On Tue, Nov 5, 2019 at 1:44 PM Dawid Wysakowicz 
wrote:

> Hi Matyas,
>
> I think this would be a valuable addition. You may reuse some of the
> already available abstractions for writing avro deserialization schema
> based on a schema registry (have a look at RegistryDeserializationSchema
> and SchemaCoderProvider). There is also an opened PR for adding a
> similar serialization schema[1].
>
> The only concern is that I am not 100% sure what is the consensus on
> which connectors do we want to adapt into the main repository and which
> would we prefer to be hosted separately and included in the ecosystem
> webpage[2] (that I hope will be published soon).
>
> Whatever option will be preferred I could help review the code.
>
> Best,
>
> Dawid
>
> [1] https://github.com/apache/flink/pull/8371
>
> [2]
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html
>
> On 05/11/2019 12:40, Őrhidi Mátyás wrote:
> > Dear Flink Community!
> >
> > We have noticed a recent request for Hortonworks schema registry support
> (
> > FLINK-14577 ). We
> have
> > an implementation for it already, and we would be happy to contribute it
> to
> > Apache Flink.
> >
> > You can find the documentation below[1]. Let us know your thoughts!
> >
> > Best Regards,
> > Matyas
> >
> > [1] Flink Avro Cloudera Registry User Guide
> > ---
> >
> > Add the following dependency to use the schema registry integration:
> > 
> > org.apache.flink
> > flink-avro-cloudera-registry
> > ${flink.version}
> > 
> >
> >
> > The schema registry can be plugged directly into the FlinkKafkaConsumer
> and
> > FlinkKafkaProducer using the appropriate schema:
> > -
> >
> org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
> > -
> >
> org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema
> >
> >
> > Supported types
> > --
> > - Avro Specific Record types
> > - Avro Generic Records
> > - Basic Java Data types: byte[], Byte, Integer, Short, Double, Float,
> Long,
> > String, Boolean
> >
> > SchemaRegistrySerializationSchema
> > --
> > The serialization schema can be constructed using the included builder
> > object SchemaRegistrySerializationSchema.builder(..).
> >
> > Required settings:
> > - Topic configuration when creating the builder. Can be static or dynamic
> > (extracted from the data)
> > - RegistryAddress parameter on the builder to establish the connection
> >
> > Optional settings:
> > - Arbitrary SchemaRegistry client configuration using the setConfig
> method
> > - Key configuration for the produced Kafka messages
> >  - By specifying a KeySelector function that extracts the key from each
> > record
> >  - Using a Tuple2 stream for (key, value) pairs directly
> > - Security configuration
> >
> > Example:
> > KafkaSerializationSchema schema =
> > SchemaRegistrySerializationSchema
> > .builder(topic)
> > .setRegistryAddress(registryAddress)
> > .setKey(ItemTransaction::getItemId)
> > .build();
> > FlinkKafkaProducer sink = new
> > FlinkKafkaProducer<>("dummy", schema, kafkaProps,
> > FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> >
> > SchemaRegistryDeserializationSchema
> > -
> > The deserialization schema can be constructed using the included builder
> > object SchemaRegistryDeserializationSchema.builder(..).
> > When reading messages (and keys) we always have to specify the expected
> > Class or record Schema of the input records so that Flink can do any
> > necessary conversion between the data on Kafka and what is expected.
> >
> > Required settings:
> > - Class or Schema of the input messages depending on the data type
> > - RegistryAddress parameter on the builder to establish the connection
> >
> > Optional settings:
> > - Arbitrary SchemaRegistry client configuration using the setConfig
> method
> > - Key configuration for the consumed Kafka messages
> >  - Should only be specified when we want to read the keys as well i

Re: [DISCUSS] FLIP 69 - Flink SQL DDL Enhancement

2019-11-05 Thread Terry Wang
Hi Bowen~

We don’t intend to support create/drop catalog syntax in this FLIP; we may
support it later if there is a strong desire for it.
And I’m going to kick off a vote for this FLIP, so feel free to review it again.

Best,
Terry Wang



> On Sep 26, 2019, at 00:44, Xuefu Z wrote:
> 
> Actually catalogs are more of system settings than of user objects that a
> user might create or drop constantly. Thus, it's probably sufficient to set
> up catalog information in the config file, at least for now.
> 
> Thanks,
> Xuefu
> 
> On Tue, Sep 24, 2019 at 7:10 PM Terry Wang  > wrote:
> 
>> Thanks Bowen for your insightful comments, I’ll think twice and do
>> corresponding improvement.
>> After finished, I’ll update in this mailing thread again.
>> Best,
>> Terry Wang
>> 
>> 
>> 
>>> On Sep 25, 2019, at 08:28, Bowen Li wrote:
>>> 
>>> BTW, will there be a "CREATE/DROP CATALOG" DDL?
>>> 
>>> Though it's not SQL standard, I can see it'll be useful and handy for
>> our end users in many cases.
>>> 
>>> On Mon, Sep 23, 2019 at 12:28 PM Bowen Li >>  > bowenl...@gmail.com >> wrote:
>>> Hi Terry,
>>> 
>>> Thanks for driving the effort! I left some comments in the doc.
>>> 
>>> AFAIU, the biggest motivation is to support DDLs in sql parser so that
>> both Table API and SQL CLI can share the stack, despite that SQL CLI has
>> already supported some commands itself. However, I don't see details on how
>> SQL CLI would migrate and depend on sql parser, and how Table API and SQL
>> CLI would actually share SQL parser. I'm not sure yet how much work that
>> will take, just want to double check that you didn't include them because
>> they are very trivial according to your estimate?
>>> 
>>> 
>>> On Mon, Sep 16, 2019 at 1:46 AM Terry Wang >>  > zjuwa...@gmail.com >> wrote:
>>> Hi everyone,
>>> 
>>> In flink 1.9, we have introduced some awesome features such as complete
>> catalog support[1] and sql ddl support[2]. These features have been a
>> critical integration for Flink to be able to manage data and metadata like
>> a classic RDBMS and make developers more easy to construct their
>> real-time/off-line warehouse or sth similar base on flink.
>>> 
>>> But there is still a lack of support on how Flink SQL DDL to manage
>> metadata and data like classic RDBMS such as `alter table rename` and so on.
>>> 
>>> So I’d like to kick off a discussion on enhancing Flink Sql Ddls:
>>> 
>> https://docs.google.com/document/d/1mhZmx1h2ecfL0x8OzYD1n-nVRn4yE7pwk4jGed4k7kc/edit?usp=sharing
>>> 
>>> In short, it:
>>>- Add Catalog DDL enhancement support:  show catalogs / describe
>> catalog / use catalog
>>>- Add Database DDL enhancement support:  show databses / create
>> database / drop database/ alter database
>>>- Add Table DDL enhancement support:show tables/ describe
>> table / alter table
>>>- Add Function DDL enhancement support: show functions/ create
>> function /drop function
>>> 
>>> Looking forward to your opinions.
>>> 
>>> Best,
>>> Terry Wang
>>> 
>>> 
>>> 
>>> [1]: https://issues.apache.org/jira/browse/FLINK-11275
>>> [2]: https://issues.apache.org/jira/browse/FLINK-10232
>

[VOTE] FLIP-69: Flink SQL DDL Enhancement

2019-11-05 Thread Terry Wang
Hi all,

I would like to start the vote for FLIP-69[1], which has been discussed and has
reached consensus in the discussion thread[2].

The vote will be open for at least 72 hours. I'll try to close it by 2019-11-08 
14:30 UTC, unless there is an objection or not enough votes.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+69+-+Flink+SQL+DDL+Enhancement
[2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-69-Flink-SQL-DDL-Enhancement-td33090.html

Best,
Terry Wang





Re: RocksDB state on HDFS seems not being cleaned up

2019-11-05 Thread shuwen zhou
Hi Jiayi,
I understand that the shared folder stores state for multiple
checkpoints. But shouldn't the shared folder only retain data across the last
“state.checkpoints.num-retained” checkpoints and remove outdated
checkpoints?
In my case I suspect that the outdated checkpoints' state wasn't cleaned up,
which makes the shared folder keep growing even after the TTL has passed.


On Tue, 5 Nov 2019 at 21:13, bupt_ljy  wrote:

> Hi Shuwen,
>
>
> The “shared” means that the state files are shared among multiple
> checkpoints, which happens when you enable incremental checkpointing[1].
> Therefore, it’s reasonable that the size keeps growing if you set
> “state.checkpoint.num-retained” to be a big value.
>
>
> [1]
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
>
>
> Best,
> Jiayi Liao
>
>
>  Original Message
> Sender: shuwen zhou
> Recipient: dev
> Date: Tuesday, Nov 5, 2019 17:59
> Subject: RocksDB state on HDFS seems not being cleanned up
>
>
> Hi Community, I have a job running on Flink1.9.0 on YARN with rocksDB on
> HDFS with incremental checkpoint enabled. I have some MapState in code with
> following config: val ttlConfig = StateTtlConfig
> .newBuilder(Time.minutes(30) .updateTtlOnCreateAndWrite()
> .cleanupInBackground() .cleanupFullSnapshot()
> .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
> After running for around 2 days, I observed checkpoint folder is showing
> 44.4 M /flink-chk743e4568a70b626837b/chk-40 65.9 M
> /flink-chk743e4568a70b626837b/chk-41 91.7 M
> /flink-chk743e4568a70b626837b/chk-42 96.1 M
> /flink-chk743e4568a70b626837b/chk-43 48.1 M
> /flink-chk743e4568a70b626837b/chk-44 71.6 M
> /flink-chk743e4568a70b626837b/chk-45 50.9 M
> /flink-chk743e4568a70b626837b/chk-46 90.2 M
> /flink-chk743e4568a70b626837b/chk-37 49.3 M
> /flink-chk743e4568a70b626837b/chk-38 96.9 M
> /flink-chk743e4568a70b626837b/chk-39 797.9 G
> /flink-chk743e4568a70b626837b/shared The ./shared folder size seems
> continuing increasing and seems the folder is not being clean up. However
> while I disabled incremental cleanup, the expired full snapshot will be
> removed automatically. Is there any way to remove outdated state on HDFS to
> stop it from increasing? Thanks. -- Best Wishes, Shuwen Zhou



-- 
Best Wishes,
Shuwen Zhou


Re: RocksDB state on HDFS seems not being cleaned up

2019-11-05 Thread Till Rohrmann
Hi Shuwen,

I think the problem is that you configured state ttl to clean up on full
snapshots which aren't executed when using RocksDB with incremental
snapshots. Instead you need to activate `cleanupInRocksdbCompactFilter`:

val ttlConfig = StateTtlConfig
  .newBuilder(Time.minutes(30))
  .updateTtlOnCreateAndWrite()
  .cleanupInBackground()
  .cleanupInRocksdbCompactFilter()
  .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
  .build()

Cheers,
Till

On Tue, Nov 5, 2019 at 4:04 PM shuwen zhou  wrote:

> Hi Jiayi,
> I understand that being shared folder means to store state of multiple
> checkpoints. I think that shared folder should only retain data across
> number “state.checkpoint.num-retained” checkpoints and remove outdated
> checkpoint, isn't it?
> In my case I doubt that outdated checkpoint's states wasn't cleaned up,
> which makes shared folder keep increasing even after TTL was passed.
>
>
> On Tue, 5 Nov 2019 at 21:13, bupt_ljy  wrote:
>
> > Hi Shuwen,
> >
> >
> > The “shared” means that the state files are shared among multiple
> > checkpoints, which happens when you enable incremental checkpointing[1].
> > Therefore, it’s reasonable that the size keeps growing if you set
> > “state.checkpoint.num-retained” to be a big value.
> >
> >
> > [1]
> >
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
> >
> >
> > Best,
> > Jiayi Liao
> >
> >
> >  Original Message
> > Sender: shuwen zhou
> > Recipient: dev
> > Date: Tuesday, Nov 5, 2019 17:59
> > Subject: RocksDB state on HDFS seems not being cleanned up
> >
> >
> > Hi Community, I have a job running on Flink1.9.0 on YARN with rocksDB on
> > HDFS with incremental checkpoint enabled. I have some MapState in code
> with
> > following config: val ttlConfig = StateTtlConfig
> > .newBuilder(Time.minutes(30) .updateTtlOnCreateAndWrite()
> > .cleanupInBackground() .cleanupFullSnapshot()
> >
> .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
> > After running for around 2 days, I observed checkpoint folder is showing
> > 44.4 M /flink-chk743e4568a70b626837b/chk-40 65.9 M
> > /flink-chk743e4568a70b626837b/chk-41 91.7 M
> > /flink-chk743e4568a70b626837b/chk-42 96.1 M
> > /flink-chk743e4568a70b626837b/chk-43 48.1 M
> > /flink-chk743e4568a70b626837b/chk-44 71.6 M
> > /flink-chk743e4568a70b626837b/chk-45 50.9 M
> > /flink-chk743e4568a70b626837b/chk-46 90.2 M
> > /flink-chk743e4568a70b626837b/chk-37 49.3 M
> > /flink-chk743e4568a70b626837b/chk-38 96.9 M
> > /flink-chk743e4568a70b626837b/chk-39 797.9 G
> > /flink-chk743e4568a70b626837b/shared The ./shared folder size seems
> > continuing increasing and seems the folder is not being clean up. However
> > while I disabled incremental cleanup, the expired full snapshot will be
> > removed automatically. Is there any way to remove outdated state on HDFS
> to
> > stop it from increasing? Thanks. -- Best Wishes, Shuwen Zhou
>
>
>
> --
> Best Wishes,
> Shuwen Zhou
>


Re: [VOTE] Accept Stateful Functions into Apache Flink

2019-11-05 Thread Stephan Ewen
Thanks, all, I love it when a quorum comes together!

Also, really cool to see all the votes from outside the PMC, thank you for
voicing your interest.

Result:
  - 19/25 PMC members voted, that is 76% of the PMC.
  - 20 non-PMC members voted.
  - 19 x +1 (binding)
  - 20 x +1 (non-binding)
  - 0 x -1
  - 0 x +/-0

Binding +1 votes:
  - Robert Metzger
  - Till Rohrmann
  - Timo Walther
  - Thomas Weise
  - Aljoscha Krettek
  - Kostas Kloudas
  - Stephan Ewen
  - Fabian Hueske
  - Kurt Young
  - Tzu-Li (Gordon) Tai
  - Becket Qin
  - Maximilian Michels
  - Ufuk Celebi
  - Vasiliki Kalavri
  - Chesnay Schepler
  - Jincheng Sun
  - Gyula Fóra
  - Greg Hogan
  - Márton Balassi

Non-binding +1 votes:
  - Zili Chen
  - Dian Fu
  - Hequn Cheng
  - Jark Wu
  - Biao Liu
  - Zhu Zhu
  - Vino Yang
  - Zhijiang
  - Haibo Sun
  - Yu Li
  - Jingsong Li
  - Terry Wang
  - Congxian Qiu
  - Igal Shilman
  - Andrey Zagrebin
  - Seth Wiesman
  - Yun Gao
  - Vijay Bhaskar
  - Konstantin Knauf
  - Paris Carbone

Best,
Stephan



On Mon, Nov 4, 2019 at 1:29 AM Paris Carbone  wrote:

> +1 from me as well!
> Looking forward to seeing stateful functions evolving within Flink.
>
> > On 2 Nov 2019, at 18:00, Márton Balassi 
> wrote:
> >
> > +1 (binding)
> >
> > Thank you for proposing this contribution!
> >
> > On Fri, Nov 1, 2019 at 2:46 PM Konstantin Knauf <
> konstan...@ververica.com>
> > wrote:
> >
> >> +1 (non-binding)
> >>
> >> Stateful Functions, already in its current initial release, simplifies
> the
> >> development of event-driven application on Flink quite significantly.
> >>
> >> On Thu, Oct 31, 2019 at 9:24 AM Vijay Bhaskar  >
> >> wrote:
> >>
> >>> +1 from me
> >>>
> >>> Regards
> >>> Bhaskar
> >>>
> >>> On Thu, Oct 31, 2019 at 11:42 AM Gyula Fóra 
> >> wrote:
> >>>
>  +1 from me, this is a great addition to Flink!
> 
>  Gyula
> 
>  On Thu, Oct 31, 2019, 03:52 Yun Gao 
> >>> wrote:
> 
> >+1 (non-binding)
> >Very thanks for bringing this to the community!
> >
> >
> > --
> > From:jincheng sun 
> > Send Time:2019 Oct. 31 (Thu.) 10:22
> > To:dev 
> > Cc:Vasiliki Kalavri 
> > Subject:Re: [VOTE] Accept Stateful Functions into Apache Flink
> >
> > big +1 (binding)
> >
> > Andrey Zagrebin wrote on Wed, Oct 30, 2019 at 23:45:
> >
> >> sorry, my +1 was non-binding, confused that it was not a committer
> >>> vote
> > but
> >> PMC.
> >>
> >> On Wed, Oct 30, 2019 at 4:43 PM Chesnay Schepler <
> >> ches...@apache.org
> 
> >> wrote:
> >>
> >>> +1 (binding)
> >>>
> >>> On 30/10/2019 15:25, Vasiliki Kalavri wrote:
>  +1 (binding) from me. I hope this is not too late :)
> 
>  Thank you for this great contribution!
> 
>  On Wed, 30 Oct 2019 at 14:45, Stephan Ewen 
>  wrote:
> 
> > Thank you all for voting.
> >
> > The voting period has passed, but only 13 PMC members have
> >> voted
>  so
> >> far,
> > that is less than 2/3rd of the PMCs (17 members).
> >
> > I will take a few days to ping other members to vote, after
> >> that
>  we
> >> will
> > gradually lower the threshold as per the process to account
> >> for
> >> inactive
> > members.
> >
> > Best,
> > Stephan
> >
> >
> >
> >
> > On Tue, Oct 29, 2019 at 6:20 PM Seth Wiesman <
> >>> sjwies...@gmail.com
> >
> >>> wrote:
> >
> >> +1 (non-binding)
> >>
> >> Seth
> >>
> >>> On Oct 23, 2019, at 9:31 PM, Jingsong Li <
>  jingsongl...@gmail.com>
> > wrote:
> >>> +1 (non-binding)
> >>>
> >>> Best,
> >>> Jingsong Lee
> >>>
>  On Wed, Oct 23, 2019 at 9:02 PM Yu Li 
>  wrote:
> 
>  +1 (non-binding)
> 
>  Best Regards,
>  Yu
> 
> 
> > On Wed, 23 Oct 2019 at 16:56, Haibo Sun <
> >> sunhaib...@163.com
> 
> >> wrote:
> >
> > +1 (non-binding)Best,
> > Haibo
> >
> >
> > At 2019-10-23 09:07:41, "Becket Qin" <
> >> becket@gmail.com>
> >> wrote:
> >> +1 (binding)
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Tue, Oct 22, 2019 at 11:44 PM Tzu-Li (Gordon) Tai <
>  tzuli...@apache.org
> >> wrote:
> >>
> >>> +1 (binding)
> >>>
> >>> Gordon
> >>>
> >>> On Tue, Oct 22, 2019, 10:58 PM Zhijiang <
> > wangzhijiang...@aliyun.com
> >>> .invalid>
> >>> wrote:
> >>>
>  +1 (non-bi

Re: RocksDB state on HDFS seems not being cleaned up

2019-11-05 Thread Yun Tang
@Till Rohrmann, I think just setting `cleanupInBackground()` should be enough for
RocksDB to clean up via the compaction filter since Flink 1.9.0 [1].

@Shuwen, I have several questions about your setup:
1. Is `flink-chk743e4568a70b626837b` the real checkpoint folder? I don't
think a job id would look like this.
2. Why do you have 10 checkpoints left under the checkpoint folder? Did you
configure the number of retained checkpoints as 10?
3. What do you mean by "when I disabled incremental cleanup, the expired full
snapshots were removed automatically"? I cannot see that you have
configured the state TTL with `cleanupIncrementally()`; moreover, what is
the actual meaning of "removed automatically"?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#cleanup-in-background

Best
Yun Tang

On 11/5/19, 11:24 PM, "Till Rohrmann"  wrote:

Hi Shuwen,

I think the problem is that you configured state ttl to clean up on full
snapshots which aren't executed when using RocksDB with incremental
snapshots. Instead you need to activate `cleanupInRocksdbCompactFilter`:

val ttlConfig = StateTtlConfig
  .newBuilder(Time.minutes(30)
  .updateTtlOnCreateAndWrite()
  .cleanupInBackground()
  .cleanupInRocksdbCompactFilter()


.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)

Cheers,
Till

On Tue, Nov 5, 2019 at 4:04 PM shuwen zhou  wrote:

> Hi Jiayi,
> I understand that being shared folder means to store state of multiple
> checkpoints. I think that shared folder should only retain data across
> number “state.checkpoint.num-retained” checkpoints and remove outdated
> checkpoint, isn't it?
> In my case I doubt that outdated checkpoint's states wasn't cleaned up,
> which makes shared folder keep increasing even after TTL was passed.
>
>
> On Tue, 5 Nov 2019 at 21:13, bupt_ljy  wrote:
>
> > Hi Shuwen,
> >
> >
> > The “shared” means that the state files are shared among multiple
> > checkpoints, which happens when you enable incremental checkpointing[1].
> > Therefore, it’s reasonable that the size keeps growing if you set
> > “state.checkpoint.num-retained” to be a big value.
> >
> >
> > [1]
> >
> 
https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
> >
> >
> > Best,
> > Jiayi Liao
> >
> >
> >  Original Message
> > Sender: shuwen zhou
> > Recipient: dev
> > Date: Tuesday, Nov 5, 2019 17:59
> > Subject: RocksDB state on HDFS seems not being cleanned up
> >
> >
> > Hi Community, I have a job running on Flink1.9.0 on YARN with rocksDB on
> > HDFS with incremental checkpoint enabled. I have some MapState in code
> with
> > following config: val ttlConfig = StateTtlConfig
> > .newBuilder(Time.minutes(30) .updateTtlOnCreateAndWrite()
> > .cleanupInBackground() .cleanupFullSnapshot()
> >
> 
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
> > After running for around 2 days, I observed checkpoint folder is showing
> > 44.4 M /flink-chk743e4568a70b626837b/chk-40 65.9 M
> > /flink-chk743e4568a70b626837b/chk-41 91.7 M
> > /flink-chk743e4568a70b626837b/chk-42 96.1 M
> > /flink-chk743e4568a70b626837b/chk-43 48.1 M
> > /flink-chk743e4568a70b626837b/chk-44 71.6 M
> > /flink-chk743e4568a70b626837b/chk-45 50.9 M
> > /flink-chk743e4568a70b626837b/chk-46 90.2 M
> > /flink-chk743e4568a70b626837b/chk-37 49.3 M
> > /flink-chk743e4568a70b626837b/chk-38 96.9 M
> > /flink-chk743e4568a70b626837b/chk-39 797.9 G
> > /flink-chk743e4568a70b626837b/shared The ./shared folder size seems
> > continuing increasing and seems the folder is not being clean up. 
However
> > while I disabled incremental cleanup, the expired full snapshot will be
> > removed automatically. Is there any way to remove outdated state on HDFS
> to
> > stop it from increasing? Thanks. -- Best Wishes, Shuwen Zhou
>
>
>
> --
> Best Wishes,
> Shuwen Zhou
>




Re: [VOTE] FLIP-69: Flink SQL DDL Enhancement

2019-11-05 Thread Xuefu Z
+1 to the long missing feature in Flink SQL.

On Tue, Nov 5, 2019 at 6:32 AM Terry Wang  wrote:

> Hi all,
>
> I would like to start the vote for FLIP-69[1] which is discussed and
> reached consensus in the discussion thread[2].
>
> The vote will be open for at least 72 hours. I'll try to close it by
> 2019-11-08 14:30 UTC, unless there is an objection or not enough votes.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+69+-+Flink+SQL+DDL+Enhancement
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+69+-+Flink+SQL+DDL+Enhancement
> >
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-69-Flink-SQL-DDL-Enhancement-td33090.html
> <
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-69-Flink-SQL-DDL-Enhancement-td33090.html
> >
> Best,
> Terry Wang
>
>
>
>

-- 
Xuefu Zhang

"In Honey We Trust!"


[jira] [Created] (FLINK-14616) Clarify the ordering guarantees in the "The Broadcast State Pattern"

2019-11-05 Thread Filip Niksic (Jira)
Filip Niksic created FLINK-14616:


 Summary: Clarify the ordering guarantees in the "The Broadcast 
State Pattern"
 Key: FLINK-14616
 URL: https://issues.apache.org/jira/browse/FLINK-14616
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.9.1
Reporter: Filip Niksic


When talking about the order of events in [The Broadcast State 
Pattern|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/broadcast_state.html#important-considerations],
 the current documentation states that the downstream tasks must not assume the 
broadcast events to be ordered. However, this seems to be imprecise. According 
to the response I got from [~fhueske] to a 
[question|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Ordered-events-in-broadcast-state-tp30879.html]
 I sent to the Flink user mailing list:
{quote}The order of broadcasted inputs is not guaranteed when the operator that 
broadcasts its output has a parallelism > 1 because the tasks that receive the 
broadcasted input consume the records in "random" order from their input 
channels.
{quote}
In particular, when the parallelism of the broadcasting operator is 1, the 
order _is_ guaranteed.

[~fhueske] continues with his suggestions on how to ensure the correct ordering 
of the broadcast events:
{quote}So there are two approaches:
1) make the operator that broadcasts its output run as an operator with 
parallelism 1 (or add a MapOperator with parallelism 1 that just forwards its 
input). This will cause all broadcasted records to go through the same network 
channel and their order is guaranteed on each receiver.
2) use timestamps of broadcasted records for ordering and watermarks to reason 
about completeness.

If the broadcasted data is (comparatively) small in volume (which is usually 
given because otherwise broadcasting would be expensive), I'd go with the first 
option.
The second approach is more difficult to implement.
{quote}
It would be great if the ordering guarantees could be clarified to avoid 
confusion. This could be achieved by simply expanding the paragraph that talks 
about the order of events in the "important considerations" section. More 
ambitiously, the suggestions given by [~fhueske] could be turned into examples.
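
As a starting point for such an example, here is a minimal sketch of the first suggestion, i.e. forcing the to-be-broadcast stream through a parallelism-1 forwarding operator before broadcasting it; the Rule type and the names used here are assumptions made for this sketch:

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;

public class OrderedBroadcastSketch {

    /** Assumed broadcast element type, for illustration only. */
    public static class Rule {
        public String name;
    }

    public static BroadcastStream<Rule> orderedBroadcast(DataStream<Rule> rules) {
        MapStateDescriptor<String, Rule> descriptor = new MapStateDescriptor<>(
                "rules", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Rule.class));

        return rules
                // Identity map with parallelism 1: all broadcast records flow through a
                // single task, and hence a single channel per receiver, preserving order.
                .map(new MapFunction<Rule, Rule>() {
                    @Override
                    public Rule map(Rule value) {
                        return value;
                    }
                })
                .setParallelism(1)
                .broadcast(descriptor);
    }
}
{code}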



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14617) Dataset Parquet ClassCastException for SpecificRecord

2019-11-05 Thread Jira
Dominik Wosiński created FLINK-14617:


 Summary: Dataset Parquet ClassCastException for SpecificRecord
 Key: FLINK-14617
 URL: https://issues.apache.org/jira/browse/FLINK-14617
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.8.0
Reporter: Dominik Wosiński


The following code runs smoothly when the _executionEnvironment_ is an instance of
_StreamExecutionEnvironment_:
{code:java}
val job = Job.getInstance()
AvroReadSupport.setAvroDataSupplier(job.getConfiguration,
  classOf[AvroDataSupplierWithTimestampConversion])
val avroParquetInputFormat = new AvroParquetInputFormat[GpsPointDTO]()
val hadoopInputFormat = new HadoopInputFormat[Void, GpsPointDTO](
  avroParquetInputFormat, classOf[Void], classOf[GpsPointDTO], job)
FileInputFormat.addInputPaths(job, filePaths.head)
executionEnvironment.createInput(hadoopInputFormat).map(_._2).print()
{code}
But when an _ExecutionEnvironment_ is used instead of a
_StreamExecutionEnvironment_, the code throws:
{code:java}
Caused by: java.lang.ClassCastException: class 
org.apache.avro.generic.GenericData$Record cannot be cast to class 
com.company.GpsPointDTO (org.apache.avro.generic.GenericData$Record and 
com.company.GpsPointDTO are in unnamed module of loader 'app'){code}
I don't think this is the expected behavior. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-69: Flink SQL DDL Enhancement

2019-11-05 Thread Peter Huang
+1 for the enhancement.

On Tue, Nov 5, 2019 at 11:04 AM Xuefu Z  wrote:

> +1 to the long missing feature in Flink SQL.
>
> On Tue, Nov 5, 2019 at 6:32 AM Terry Wang  wrote:
>
> > Hi all,
> >
> > I would like to start the vote for FLIP-69[1] which is discussed and
> > reached consensus in the discussion thread[2].
> >
> > The vote will be open for at least 72 hours. I'll try to close it by
> > 2019-11-08 14:30 UTC, unless there is an objection or not enough votes.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+69+-+Flink+SQL+DDL+Enhancement
> > <
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+69+-+Flink+SQL+DDL+Enhancement
> > >
> > [2]
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-69-Flink-SQL-DDL-Enhancement-td33090.html
> > <
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-69-Flink-SQL-DDL-Enhancement-td33090.html
> > >
> > Best,
> > Terry Wang
> >
> >
> >
> >
>
> --
> Xuefu Zhang
>
> "In Honey We Trust!"
>


[jira] [Created] (FLINK-14618) Give more detailed debug information on akka framesize exception

2019-11-05 Thread Jacob Sevart (Jira)
Jacob Sevart created FLINK-14618:


 Summary: Give more detailed debug information on akka framesize 
exception
 Key: FLINK-14618
 URL: https://issues.apache.org/jira/browse/FLINK-14618
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Network
Affects Versions: 1.6.3
Reporter: Jacob Sevart


I'm hitting the akka framesize limit in production with some regularity, often 
when the job has been running for a long time and we try to deploy or restart. 
I suspect it's checkpoint related because clearing the checkpoint enables the 
job to start up. 

The [guidance|https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html] says:
{quote}If Flink fails because messages exceed this limit, then you should
increase it.
{quote}
The [error message|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java#L270]
is not very helpful towards that end. How large does it need to be? How do I
know whether increasing the size will fix it, or whether the message is unreasonably
large due to a bug?

I'd like to modify the exception message to report the actual message size.
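
For illustration, a hypothetical sketch of the kind of message this could produce; the helper, exception type, method name and numbers below are assumptions, not the actual AkkaInvocationHandler code:

{code:java}
import java.io.IOException;

public class FrameSizeErrorSketch {

    // Hypothetical helper producing the kind of message this ticket asks for.
    static IOException oversizedRpcError(String method, long payloadSize, long maxFrameSize) {
        return new IOException(String.format(
                "The RPC invocation '%s' is %d bytes, which exceeds the maximum akka framesize "
                        + "of %d bytes. Increase akka.framesize or reduce the message size.",
                method, payloadSize, maxFrameSize));
    }

    public static void main(String[] args) {
        // Example output with assumed sizes.
        System.out.println(
                oversizedRpcError("triggerCheckpoint", 12_345_678L, 10_485_760L).getMessage());
    }
}
{code}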



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-69: Flink SQL DDL Enhancement

2019-11-05 Thread Bowen Li
Hi Terry,

I went over the FLIP in detail again. The FLIP mostly LGTM. A couple issues:

- since we don't plan to support catalog DDL, can you remove it from the
FLIP?
- I found there are some discrepancies in proposed database and table DDLs.
  For db ddl, the create db syntax proposes specifying k-v properties
following "WITH". However, alter db ddl comes with a keyword "DBPROPERTIES":

CREATE  DATABASE [ IF NOT EXISTS ] [ catalogName.] dataBaseName [ COMMENT
database_comment ]
 [*WITH *( name=value [, name=value]*)]


ALTER  DATABASE  [ catalogName.] dataBaseName SET *DBPROPERTIES* (
name=value [, name=value]*)


IIUIC, are you borrowing syntax from Hive? Note that Hive's db create
ddl comes with "DBPROPERTIES" though - "CREATE (DATABASE|SCHEMA) [IF NOT
EXISTS] database_name ...  [*WITH DBPROPERTIES* (k=v, ...)];" [1]

   The same applies to table ddl. The proposed alter table ddl comes with
"SET *PROPERTIES* (...)", however, Flink's existing table create ddl since
1.9 [2] doesn't have "PROPERTIES" keyword. As opposed to Hive's syntax,
both create and alter table ddl comes with "TBLPROPERTIES" [1].

   I feel it's better to be consistent among our DDLs. One option is to
just remove the "PROPERTIES" and "DBPROPERTIES" keywords in proposed syntax.

[1] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-ddl

On Tue, Nov 5, 2019 at 12:54 PM Peter Huang 
wrote:

> +1 for the enhancement.
>
> On Tue, Nov 5, 2019 at 11:04 AM Xuefu Z  wrote:
>
> > +1 to the long missing feature in Flink SQL.
> >
> > On Tue, Nov 5, 2019 at 6:32 AM Terry Wang  wrote:
> >
> > > Hi all,
> > >
> > > I would like to start the vote for FLIP-69[1] which is discussed and
> > > reached consensus in the discussion thread[2].
> > >
> > > The vote will be open for at least 72 hours. I'll try to close it by
> > > 2019-11-08 14:30 UTC, unless there is an objection or not enough votes.
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+69+-+Flink+SQL+DDL+Enhancement
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+69+-+Flink+SQL+DDL+Enhancement
> > > >
> > > [2]
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-69-Flink-SQL-DDL-Enhancement-td33090.html
> > > <
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-69-Flink-SQL-DDL-Enhancement-td33090.html
> > > >
> > > Best,
> > > Terry Wang
> > >
> > >
> > >
> > >
> >
> > --
> > Xuefu Zhang
> >
> > "In Honey We Trust!"
> >
>


Flink savepoint (checkpoint) load API or debug

2019-11-05 Thread qq
Hi all,
   I want to load checkpoint or savepoint metadata in a dev environment so that I
can debug the saved checkpoint metadata. I know Flink provides an API,
Savepoint.load(env, path), but I can’t find it and can’t use it. Does anyone
know about this? Could you help me? Thanks very much.
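
In case it helps, Savepoint.load lives in the State Processor API (the flink-state-processor-api module that ships with Flink 1.9). Below is a minimal sketch of loading a savepoint or retained checkpoint for inspection; the path is a placeholder, and note that the 1.9 signature takes a state backend as a third argument:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class InspectSavepointSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Point this at a savepoint or retained-checkpoint directory (the one with _metadata).
        String path = "hdfs:///flink/checkpoints/placeholder-job-id/chk-42";

        ExistingSavepoint savepoint = Savepoint.load(env, path, new MemoryStateBackend());

        // From here, operator or keyed state can be read for debugging, e.g. via
        // savepoint.readListState(...) or savepoint.readKeyedState(...), and then
        // printed or written out as a DataSet.
    }
}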



[jira] [Created] (FLINK-14619) Failed to fetch BLOB

2019-11-05 Thread liang yu (Jira)
liang yu created FLINK-14619:


 Summary: Failed to fetch BLOB
 Key: FLINK-14619
 URL: https://issues.apache.org/jira/browse/FLINK-14619
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.9.1
Reporter: liang yu


java.io.IOException: Failed to fetch BLOB 
e78e9574da4f5e4bdbc8de9678ebfb36/p-650534cd619de1069630141f1dcc9876d6ce2ce0-ee11ae52caa20ff81909708a783fd596
 from /: and store it under 
/hadoop/yarn/local/usercache/hdfs/appcache/application_1570784539965_0165/blobStore-79420f3a-6a83-40d4-8058-f01686a1ced8/incoming/temp-0072
at 
org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:169)
at 
org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
at 
org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:202)
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:915)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:595)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not connect to BlobServer at address 
ambari11.fydata/172.16.20.11:10898
at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:100)
at 
org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:143)
... 7 more
Caused by: java.net.SocketException: Too many open files (打开的文件过多)
at java.net.Socket.createImpl(Socket.java:460)
at java.net.Socket.connect(Socket.java:587)
at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:95)
... 8 more
 
I set the server's ulimit (-a) parameter to 65, but this error still
occurs. The server's open file descriptor count shown in monitoring at the time
of the error is 5. Can anyone tell me why?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-69: Flink SQL DDL Enhancement

2019-11-05 Thread Terry Wang
Hi Bowen:
Thanks for your feedback.
Your points convinced me; I have just removed the section about the catalog create
statement and also removed `DBPROPERTIES`/`PROPERTIES` from the alter DDLs.
Open to more comments or votes :)!

Best,
Terry Wang



> On Nov 6, 2019, at 07:22, Bowen Li wrote:
> 
> Hi Terry,
> 
> I went over the FLIP in detail again. The FLIP mostly LGTM. A couple issues:
> 
> - since we on't plan to support catalog ddl, can you remove them from the
> FLIP?
> - I found there are some discrepancies in proposed database and table DDLs.
>  For db ddl, the create db syntax proposes specifying k-v properties
> following "WITH". However, alter db ddl comes with a keyword "DBPROPERTIES":
> 
> CREATE  DATABASE [ IF NOT EXISTS ] [ catalogName.] dataBaseName [ COMMENT
> database_comment ]
> [*WITH *( name=value [, name=value]*)]
> 
> 
> ALTER  DATABASE  [ catalogName.] dataBaseName SET *DBPROPERTIES* (
> name=value [, name=value]*)
> 
> 
>IIUIC, are you borrowing syntax from Hive? Note that Hive's db create
> ddl comes with "DBPROPERTIES" though - "CREATE (DATABASE|SCHEMA) [IF NOT
> EXISTS] database_name ...  [*WITH DBPROPERTIES* (k=v, ...)];" [1]
> 
>   The same applies to table ddl. The proposed alter table ddl comes with
> "SET *PROPERTIES* (...)", however, Flink's existing table create ddl since
> 1.9 [2] doesn't have "PROPERTIES" keyword. As opposed to Hive's syntax,
> both create and alter table ddl comes with "TBLPROPERTIES" [1].
> 
>   I feel it's better to be consistent among our DDLs. One option is to
> just remove the "PROPERTIES" and "DBPROPERTIES" keywords in proposed syntax.
> 
> [1] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-ddl
> 
> On Tue, Nov 5, 2019 at 12:54 PM Peter Huang 
> wrote:
> 
>> +1 for the enhancement.
>> 
>> On Tue, Nov 5, 2019 at 11:04 AM Xuefu Z  wrote:
>> 
>>> +1 to the long missing feature in Flink SQL.
>>> 
>>> On Tue, Nov 5, 2019 at 6:32 AM Terry Wang  wrote:
>>> 
 Hi all,
 
 I would like to start the vote for FLIP-69[1] which is discussed and
 reached consensus in the discussion thread[2].
 
 The vote will be open for at least 72 hours. I'll try to close it by
 2019-11-08 14:30 UTC, unless there is an objection or not enough votes.
 
 [1]
 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP+69+-+Flink+SQL+DDL+Enhancement
 <
 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP+69+-+Flink+SQL+DDL+Enhancement
> 
 [2]
 
>>> 
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-69-Flink-SQL-DDL-Enhancement-td33090.html
 <
 
>>> 
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-69-Flink-SQL-DDL-Enhancement-td33090.html
> 
 Best,
 Terry Wang
 
 
 
 
>>> 
>>> --
>>> Xuefu Zhang
>>> 
>>> "In Honey We Trust!"
>>> 
>> 



[jira] [Created] (FLINK-14620) Rewrite the elasticsearch related end-to-end tests by using the newly introduce e2e java framework

2019-11-05 Thread Zheng Hu (Jira)
Zheng Hu created FLINK-14620:


 Summary: Rewrite the elasticsearch related end-to-end tests by 
using the newly introduce e2e java framework
 Key: FLINK-14620
 URL: https://issues.apache.org/jira/browse/FLINK-14620
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / ElasticSearch
Reporter: Zheng Hu


In FLINK-11463, we introduced a new Java e2e framework (which can run in
Maven). We will rewrite the Elasticsearch e2e tests with the new framework. Also, it
seems we have no SQL e2e test for Elasticsearch, so maybe we can add
that part as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-83: Flink End-to-end Performance Testing Framework

2019-11-05 Thread Yang Wang
Thanks Yu for starting this discussion.

I'm in favor of adding an e2e performance testing framework. Currently the
e2e tests are mainly focused
on functionality and are written in shell scripts. We need a better e2e framework for
both performance and functionality tests.


Best,
Yang

Biao Liu wrote on Tue, Nov 5, 2019 at 10:16 AM:

> Thanks Yu for bringing this topic.
>
> +1 for this proposal. Glad to have an e2e performance testing.
>
> It seems this proposal is separated into several stages. Is there a more
> detailed plan?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Mon, 4 Nov 2019 at 19:54, Congxian Qiu  wrote:
>
> > +1 for this idea.
> >
> > Currently, we have the micro benchmark for flink, which can help us find
> > the regressions. And I think the e2e jobs performance testing can also
> help
> > us to cover more scenarios.
> >
> > Best,
> > Congxian
> >
> >
> > > Jingsong Li wrote on Mon, Nov 4, 2019 at 5:37 PM:
> >
> > > +1 for the idea. Thanks Yu for driving this.
> > > Just curious about that can we collect the metrics about Job scheduling
> > and
> > > task launch. the speed of this part is also important.
> > > We can add tests for watch it too.
> > >
> > > Look forward to more batch test support.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Mon, Nov 4, 2019 at 10:00 AM OpenInx  wrote:
> > >
> > > > > The test cases are written in java and scripts in python. We propose
> > > > > a separate directory/module in parallel with flink-end-to-end-tests,
> > > > > with the name of flink-end-to-end-perf-tests.
> > > >
> > > > Glad to see that the newly introduced e2e tests will be written in Java,
> > > > because I'm reworking the existing e2e test suites from Bash scripts
> > > > into Java test cases so that we can support more external systems, such
> > > > as running the testing job on YARN + Flink, Docker + Flink,
> > > > standalone + Flink, a distributed Kafka cluster, etc.
> > > > BTW, I think the perf e2e test suites will also need to be designed to
> > > > support running in both standalone and distributed environments, which
> > > > will be helpful for developing & evaluating the perf.
> > > > Thanks.
> > > >
> > > > On Mon, Nov 4, 2019 at 9:31 AM, aihua li  wrote:
> > > >
> > > > > In stage 1, checkpointing isn't disabled, and heap is used as the
> > > > > state backend.
> > > > > I think there should be some special scenarios to test checkpointing
> > > > > and state backends, which will be discussed and added in release 1.11.
> > > > >
> > > > > > On Nov 2, 2019, at 12:13 AM, Yun Tang  wrote:
> > > > > >
> > > > > > By the way, do you think it's worth adding a checkpoint mode which
> > > > > > just disables checkpointing when running end-to-end jobs? And when
> > > > > > will stage 2 and stage 3 be discussed in more detail?
> > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>


Re: RocksDB state on HDFS seems not being cleanned up

2019-11-05 Thread shuwen zhou
Hi Yun and Till,
Thank you for your response.
For @Yun
1. No, I just renamed the checkpoint directory since its name
contains company data. Sorry for the confusion.
2. Yes, I set

state.checkpoints.num-retained: 10
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED

In flink.conf

I was expecting that the shared folder would no longer contain outdated state;
since my TTL is set to 30 mins, I shouldn't have seen data older than
1 day. However, I could still see that outdated data in the shared folder.

For example, current time is 2019-11-06 03:58:00 UTC, I could see
following file on HDFS

65.1 M 2019-11-04 17:58
/flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/03dea380-758b-4d52-b335-5e6318ba6c40
2.1 K 2019-11-04 17:28
/flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/1205f112-f5ba-4516-ae32-1424afda08ac
65.1 M 2019-11-04 17:58
/flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/2298e34d-8cdc-4f8a-aac0-76cf4b9ac0f5
65.1 M 2019-11-04 17:58
/flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/25e58576-f86f-4ac9-83b8-08ce0be036c4
65.1 M 2019-11-05 17:42
/flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/27031a93-3ae5-4247-a751-62552c29f325

3. I actually mean that only the latest 10 checkpoints containing full
state will be retained on HDFS, in my case around 20G for each
checkpoint. In that way I could control how much data is stored on
HDFS, rather than having an ever-increasing shared folder.

But it takes a lot of time to store the full state on HDFS. Thus I would
still like to use incremental checkpoints.
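
(For reference, a minimal Java sketch of how incremental RocksDB checkpointing is
typically enabled in the job code; the checkpoint path and interval below are
placeholders, not my actual settings.)

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The second constructor argument enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoint", true));
        // Placeholder interval: checkpoint every 10 minutes.
        env.enableCheckpointing(10 * 60 * 1000);
        // ... add sources/operators and call env.execute(...) as usual.
    }
}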



For @Till

I would have a try on cleanupInRocksdbCompactFilter to see if it
works. Thank you.


On Wed, 6 Nov 2019 at 01:50, Yun Tang  wrote:

> @Till Rohrmann , I think just setting `cleanupInBackground()` should be enough
> for RocksDB to clean up in the compaction filter after Flink 1.9.0 [1]
>
> @Shuwen , I have several questions about your setup:
> 1. Is `flink-chk743e4568a70b626837b` the real folder for checkpoints? I
> don't think a job-id would look like this.
> 2. Why do you have 10 checkpoints left under the checkpoint folder? Did you
> configure the number of retained checkpoints as 10?
> 3. What do you mean by "while I disabled incremental cleanup, the expired
> full snapshot will be removed automatically"? I cannot see that you have
> configured the state TTL config with `cleanupIncrementally()`; moreover, what
> is the actual meaning of "removed automatically"?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#cleanup-in-background
>
> Best
> Yun Tang
>
> On 11/5/19, 11:24 PM, "Till Rohrmann"  wrote:
>
> Hi Shuwen,
>
> I think the problem is that you configured state ttl to clean up on
> full
> snapshots which aren't executed when using RocksDB with incremental
> snapshots. Instead you need to activate
> `cleanupInRocksdbCompactFilter`:
>
> val ttlConfig = StateTtlConfig
>   .newBuilder(Time.minutes(30))
>   .updateTtlOnCreateAndWrite()
>   .cleanupInBackground()
>   .cleanupInRocksdbCompactFilter()
>   .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
>   .build()
>
> Cheers,
> Till
>
> On Tue, Nov 5, 2019 at 4:04 PM shuwen zhou  wrote:
>
> > Hi Jiayi,
> > I understand that the shared folder is meant to store state shared by
> > multiple checkpoints. I think the shared folder should only retain data
> > across the last "state.checkpoints.num-retained" checkpoints and remove
> > outdated checkpoints, shouldn't it?
> > In my case I suspect that the outdated checkpoints' state wasn't cleaned
> > up, which makes the shared folder keep increasing even after the TTL has passed.
> >
> >
> > On Tue, 5 Nov 2019 at 21:13, bupt_ljy  wrote:
> >
> > > Hi Shuwen,
> > >
> > >
> > > “Shared” means that the state files are shared among multiple
> > > checkpoints, which happens when you enable incremental checkpointing [1].
> > > Therefore, it’s reasonable that the size keeps growing if you set
> > > “state.checkpoints.num-retained” to be a big value.
> > >
> > >
> > > [1]
> > >
> >
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
> > >
> > >
> > > Best,
> > > Jiayi Liao
> > >
> > >
> > >  Original Message
> > > Sender: shuwen zhou
> > > Recipient: dev
> > > Date: Tuesday, Nov 5, 2019 17:59
> > > Subject: RocksDB state on HDFS seems not being cleanned up
> > >
> > >
> > > Hi Community, I have a job running on Flink1.9.0 on YARN with
> rocksDB on
> > > HDFS with incremental checkpoint enabled. I have some MapState in
> code
> > with
> > > following config: val ttlConfig = StateTtlConfig
> > > .newBuilder(Time.minutes(30) .updateTtlOnCreateAndWrite()
> > > .cleanupInBackground() .cleanupFullSnapshot()
> > >
> >
> .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
> > > A

[jira] [Created] (FLINK-14621) Do not generate watermark assigner operator if no time attribute operations on rowtime

2019-11-05 Thread Jark Wu (Jira)
Jark Wu created FLINK-14621:
---

 Summary: Do not generate watermark assigner operator if no time 
attribute operations on rowtime
 Key: FLINK-14621
 URL: https://issues.apache.org/jira/browse/FLINK-14621
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jark Wu


Currently, if a watermark is specified in DDL, a watermark assigner operator is 
always generated. However, it is only needed when there are time attribute 
operations (e.g. windows) on the rowtime. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14622) Cooperate WatermarkSpec in TableSourceScan with minibatch configuration

2019-11-05 Thread Jark Wu (Jira)
Jark Wu created FLINK-14622:
---

 Summary: Cooperate WatermarkSpec in TableSourceScan with minibatch 
configuration
 Key: FLINK-14622
 URL: https://issues.apache.org/jira/browse/FLINK-14622
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jark Wu


Currently, a watermark specified in DDL can't work with the minibatch configuration. 
We should support it in {{MiniBatchIntervalInferRule}}.
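
For context, a minimal sketch of the minibatch configuration referred to above (assuming the blink planner's documented mini-batch options; this is only an illustration, not part of the fix itself):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchConfigExample {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.exec.mini-batch.enabled", "true");       // enable mini-batch optimization
        conf.setString("table.exec.mini-batch.allow-latency", "5 s");  // buffer records for up to 5 seconds
        conf.setString("table.exec.mini-batch.size", "5000");          // or until 5000 records are buffered
    }
}
{code}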



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14623) Add computed column information into TableSchema

2019-11-05 Thread Danny Chen (Jira)
Danny Chen created FLINK-14623:
--

 Summary: Add computed column information into TableSchema
 Key: FLINK-14623
 URL: https://issues.apache.org/jira/browse/FLINK-14623
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.9.1
Reporter: Danny Chen
 Fix For: 1.10.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14624) Support computed column as rowtime attribute

2019-11-05 Thread Jark Wu (Jira)
Jark Wu created FLINK-14624:
---

 Summary: Support computed column as rowtime attribute
 Key: FLINK-14624
 URL: https://issues.apache.org/jira/browse/FLINK-14624
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jark Wu


After 
[FLIP-70|https://cwiki.apache.org/confluence/display/FLINK/FLIP-70%3A+Flink+SQL+Computed+Column+Design]
 computed columns are supported, we should support using a computed column as the rowtime 
attribute. This is a very useful feature when the rowtime attribute is not among the 
existing columns. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14625) Eliminate cross join in multi join to reduce cost

2019-11-05 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-14625:
--

 Summary: Eliminate cross join in multi join to reduce cost 
 Key: FLINK-14625
 URL: https://issues.apache.org/jira/browse/FLINK-14625
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.9.1
Reporter: Leonard Xu


A cross join always has a higher cost than other joins. If we can eliminate cross 
joins in multi-join scenarios as much as possible, we can obtain better performance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14626) User jar packaged with hadoop dependencies may cause class conflict with hadoop jars on yarn

2019-11-05 Thread Victor Wong (Jira)
Victor Wong created FLINK-14626:
---

 Summary: User jar packaged with hadoop dependencies may cause 
class conflict with hadoop jars on yarn
 Key: FLINK-14626
 URL: https://issues.apache.org/jira/browse/FLINK-14626
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.9.1
Reporter: Victor Wong


Currently, the yarn application classpath is placed behind the Flink classpath 
(including user jars), which will cause conflicts if the user jar accidentally 
includes hadoop dependencies.
{code:java}
// org.apache.flink.yarn.Utils#setupYarnClassPath
public static void setupYarnClassPath(Configuration conf, Map<String, String> appMasterEnv) {
   addToEnvironment(
      appMasterEnv,
      Environment.CLASSPATH.name(),
      appMasterEnv.get(ENV_FLINK_CLASSPATH));
   String[] applicationClassPathEntries = conf.getStrings(
      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
   for (String c : applicationClassPathEntries) {
      addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
   }
}
{code}
Maybe we should place the user jars behind the yarn application classpath when 
`org.apache.flink.yarn.configuration.YarnConfigOptions.UserJarInclusion` is set 
to LAST, resulting in a classpath like "flink-xxx.jar:hadoop-xxx.jar:user-xxx.jar" (see the sketch below).
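
For illustration, a rough sketch of the proposed ordering. This is not the current Flink implementation; the extra {{flinkSystemClassPath}} / {{userJarClassPath}} parameters are hypothetical helpers and not part of the existing {{setupYarnClassPath}} signature.

{code:java}
// Hypothetical sketch: Flink system jars first, then the YARN application
// classpath, then the user jars, when UserJarInclusion is LAST.
public static void setupYarnClassPath(
      Configuration conf,
      Map<String, String> appMasterEnv,
      String flinkSystemClassPath,   // assumed: classpath of flink-xxx.jar entries
      String userJarClassPath) {     // assumed: classpath of user-xxx.jar entries
   // flink-xxx.jar
   addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), flinkSystemClassPath);
   // hadoop-xxx.jar
   String[] applicationClassPathEntries = conf.getStrings(
      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
   for (String c : applicationClassPathEntries) {
      addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
   }
   // user-xxx.jar
   addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), userJarClassPath);
}
{code}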

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Flink savepoint (checkpoint) load API or debug

2019-11-05 Thread Jark Wu
Hi,

Savepoint.load(env, path) is in state processor API library, you should add
the following dependency in your project.


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-state-processor-api_2.11</artifactId>
  <version>1.9.1</version>
</dependency>



You can see the documentation for more detailed instructions [1].

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
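
As a rough sketch (the savepoint path, operator uid, state name, and state backend
below are placeholders, not from your job), loading a savepoint and reading an
operator's list state looks roughly like this:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class InspectSavepoint {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Load the savepoint/checkpoint metadata; pass the state backend that
        // matches the one which wrote it (MemoryStateBackend is a placeholder).
        ExistingSavepoint savepoint = Savepoint.load(
            env, "hdfs:///flink/savepoints/savepoint-xxxx", new MemoryStateBackend());
        // Read a ListState<String> registered under the given operator uid and state name.
        savepoint.readListState("my-operator-uid", "my-state-name", Types.STRING).print();
    }
}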

On Wed, 6 Nov 2019 at 09:21, qq <471237...@qq.com> wrote:

> Hi all,
> I want to load checkpoint or savepoint metadata in a dev environment; in this case,
> I want to debug saved checkpoint metadata. I know Flink provides an API,
> Savepoint.load(env, path), but I can't find it and can't use it.
> Does anyone know about this? Could you help me? Thanks very much.
>
>


Re: Flink savepoint (checkpoint) load API or debug

2019-11-05 Thread Jark Wu
Btw, user questions should be asked in user@f.a.o or user-zh@f.a.o. The dev
ML is mainly used to discuss development.

Best,
Jark

On Wed, 6 Nov 2019 at 15:36, Jark Wu  wrote:

> Hi,
>
> Savepoint.load(env, path) is in state processor API library, you should
> add the following dependency in your project.
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-state-processor-api_2.11</artifactId>
>   <version>1.9.1</version>
> </dependency>
>
>
> You can see the documentation for more detailed instructions [1].
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
>
> On Wed, 6 Nov 2019 at 09:21, qq <471237...@qq.com> wrote:
>
>> Hi all,
>> I want to load checkpoint or savepoint metadata in a dev environment; in this
>> case, I want to debug saved checkpoint metadata. I know Flink provides an
>> API, Savepoint.load(env, path), but I can't find it and can't use
>> it. Does anyone know about this? Could you help me? Thanks very much.
>>
>>


Re: [DISCUSS] FLIP-84: Improve & Refactor API of Table Module

2019-11-05 Thread Jark Wu
Hi Terry,

I would suggest to change the title a bit.
For example, "Improve & Refactor TableEnvironment APIs".
Or more specifically, "Improve & Refactor TableEnvironment
execute/sqlQuery/sqlUpdate.. APIs"

Currently, the title is a little broad (there are so many APIs in the table
module).
Making the title more specific can attract more people who care about it.

Best,
Jark



On Tue, 5 Nov 2019 at 14:51, Kurt Young  wrote:

> cc @Fabian here, thought you might be interesting to review this.
>
> Best,
> Kurt
>
>
> On Thu, Oct 31, 2019 at 1:39 PM Kurt Young  wrote:
>
> > Thanks Terry for bringing this up. TableEnv's interface is really
> critical
> > not only
> > to users, but also for components built upon it like SQL CLI. Your
> > proposal
> > solved some pain points we currently have, so +1 to the proposal.
> >
> > I left some comments in the document.
> >
> > Best,
> > Kurt
> >
> >
> > On Thu, Oct 31, 2019 at 10:38 AM Terry Wang  wrote:
> >
> >> Hi everyone,
> >>
> >> TableEnvironment provides two interfaces, `Table sqlQuery(String sql)` and
> >> `void sqlUpdate(String sql)`, to create a table (actually a view here)
> >> or describe an update action from a SQL string.
> >> But as more use cases come up, there are some fatal shortcomings in the
> >> current API design, such as `sqlUpdate()` not supporting a return value
> >> and buggy support for buffering SQL exceptions, and so on.
> >>
> >> So I’d like to kick off a discussion on improving and refactoring the API
> >> of the table module:
> >>
> >> google doc:
> >>
> https://docs.google.com/document/d/19-mdYJjKirh5aXCwq1fDajSaI09BJMMT95wy_YhtuZk/edit?usp=sharing
> >> <
> >>
> https://docs.google.com/document/d/19-mdYJjKirh5aXCwq1fDajSaI09BJMMT95wy_YhtuZk/edit?usp=sharing
> >> >
> >> Flip link:
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> >> <
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> >> >
> >>
> >> In short, it:
> >> - Discusses the problem of buffering SQL execution
> >> - Discusses the current `sqlQuery/sqlUpdate` and proposes two new APIs
> >> - Introduces a new `executeBatch` method to support batch SQL
> >> execution
> >> - Discusses how the SQL CLI should deal with multiple statements
> >>
> >> Looking forward to all your guys comments.
> >>
> >> Best,
> >> Terry Wang
> >>
> >>
> >>
> >>
>


Re: RocksDB state on HDFS seems not being cleanned up

2019-11-05 Thread Yun Tang
Hi Shuwen

You just have 10 “chk-” folders as expected, and when subsuming 
checkpoints, the “chk-” folder is removed after we successfully remove its 
shared state [1]. That is to say, I think you might not have too many orphan 
state files left. To verify this, you could use the state processor API [2] to load 
your checkpoints and compare them against all the files under the “shared” folder to see whether 
there are too many orphan files. If there are, we might think of the 
custom compaction filter feature of FRocksDB.

Secondly, your estimate of “20GB each checkpoint” might not be accurate when 
RocksDB incremental checkpointing is enabled; what the UI shows is only the 
incremental size [3]. I suggest you count the size of the files referenced in your 
checkpoint metadata to know the accurate size of each checkpoint.

Last but not least, RocksDB’s compaction filter feature deletes expired 
data only during compaction [4], so I’m afraid you might need to look at 
your RocksDB LOG files to see the frequency of compaction on the task managers. 
And I think the increasing size might be related to the interval of your 
checkpoints; what is the interval at which you execute checkpoints?


[1] 
https://github.com/apache/flink/blob/2ea14169a1997434d45d6f1da6dfe9acd6bd8da3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L264
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
[3] https://issues.apache.org/jira/browse/FLINK-13390
[4] 
https://github.com/facebook/rocksdb/blob/834feaff05a4bf7ae49c736305d5eb180aed4011/include/rocksdb/compaction_filter.h#L61

Best
Yun Tang

From: shuwen zhou 
Date: Wednesday, November 6, 2019 at 12:02 PM
To: dev , Yun Tang , Till Rohrmann 

Subject: Re: RocksDB state on HDFS seems not being cleanned up

Hi Yun and Till,
Thank you for your response.
For @Yun
1. No, I just renamed the checkpoint directory since its name 
contains company data. Sorry for the confusion.
2. Yes, I set

state.checkpoints.num-retained: 10
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED

In flink.conf

I was expecting that the shared folder would no longer contain outdated state; since my 
TTL is set to 30 mins, I shouldn't have seen data older than 1 day. However, I 
could still see that outdated data in the shared folder.

For example, current time is 2019-11-06 03:58:00 UTC, I could see following 
file on HDFS
65.1 M 2019-11-04 17:58 
/flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/03dea380-758b-4d52-b335-5e6318ba6c40
2.1 K 2019-11-04 17:28 
/flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/1205f112-f5ba-4516-ae32-1424afda08ac
65.1 M 2019-11-04 17:58 
/flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/2298e34d-8cdc-4f8a-aac0-76cf4b9ac0f5
65.1 M 2019-11-04 17:58 
/flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/25e58576-f86f-4ac9-83b8-08ce0be036c4
65.1 M 2019-11-05 17:42 
/flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/27031a93-3ae5-4247-a751-62552c29f325

3. I actually mean that only the latest 10 checkpoints containing full state will be 
retained on HDFS, in my case around 20G for each checkpoint. In that way I 
could control how much data is stored on HDFS, rather than having an 
ever-increasing shared folder.

But it takes a lot of time to store the full state on HDFS. Thus I would still like 
to use incremental checkpoints.





For @Till

I would have a try on cleanupInRocksdbCompactFilter to see if it works. Thank 
you.

On Wed, 6 Nov 2019 at 01:50, Yun Tang <myas...@live.com> wrote:
@Till Rohrmann , I think just setting `cleanupInBackground()` should be enough for 
RocksDB to clean up in the compaction filter after Flink 1.9.0 [1]

@Shuwen , I have several questions about your setup:
1. Is `flink-chk743e4568a70b626837b` the real folder for checkpoints? I don't 
think a job-id would look like this.
2. Why do you have 10 checkpoints left under the checkpoint folder? Did you configure 
the number of retained checkpoints as 10?
3. What do you mean by "while I disabled incremental cleanup, the expired full 
snapshot will be removed automatically"? I cannot see that you have 
configured the state TTL config with `cleanupIncrementally()`; moreover, what is 
the actual meaning of "removed automatically"?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#cleanup-in-background

Best
Yun Tang

On 11/5/19, 11:24 PM, "Till Rohrmann" <trohrm...@apache.org> wrote:

Hi Shuwen,

I think the problem is that you configured state ttl to clean up on full
snapshots which aren't executed when using Rocks