Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-29 Thread Stephan Ewen
@Becket One thing that may be non-obvious is that the Configuration class
also defines serialization / persistence logic at the moment. So it needs
to know the set of types it supports. That stands in the way of an
arbitrary generic map type.

@Timo I agree though that it seems a bit inconsistent to have one
collection orthogonal to the type (List) and another one bound to specific
types (Map).
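
For illustration, a minimal sketch of the constraint described above, using hypothetical names rather than Flink's actual Configuration internals: a class that persists values as strings must know how to render and re-parse every type it accepts, so a fully generic map cannot round-trip.

{code:java}
// Sketch only; hypothetical class, not Flink's Configuration.
import java.util.Map;

class SimpleConfigSerializer {
    static String serialize(Object value) {
        // Only types from a fixed, known set have an agreed string encoding.
        if (value instanceof Integer || value instanceof Long
                || value instanceof Boolean || value instanceof String) {
            return value.toString();
        }
        if (value instanceof Map) {
            // An arbitrary Map<K, V> cannot be persisted: the serializer
            // has no way to encode/decode unknown key and value types.
            throw new IllegalArgumentException(
                    "Cannot persist arbitrary map type: " + value.getClass());
        }
        throw new IllegalArgumentException(
                "Unsupported type: " + value.getClass());
    }
}
{code}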

On Thu, Aug 29, 2019 at 8:20 AM Becket Qin  wrote:

> Hi Timo,
>
> Thanks for the proposal. Sorry for the late comments, but I have a few
> questions / comments.
>
> 1. Is a new isList field necessary in the ConfigOption?
> Would it be enough to just check the atomicClass to see if it is a List or
> not?
> Also, in the ConfigOption class case, do we always assume both key
> and value types are String? Can we just apply the same to the
> ConfigOption?
> BTW, I did a quick search in the codebase but did not find any usage of
> ConfigOption.
>
> 2. The same config name, but with two ConfigOptions with different semantics
> in different components, seems super confusing. For example, when users set
> both configs, they may have no idea one is overriding the other. There
> might be two cases:
>  - If it is just the same config used by different components to act
> accordingly, it might be better to just have one config, but describe
> clearly how that config will be used.
>  - If it is actually two configurations that can be set differently, I
> think the config names should just be different.
>
> 3. Regarding the ConfigurableFactory, does the toConfiguration() method
> pretty much mean getConfiguration()? The toConfiguration() method sounds
> like converting an object to a configuration, which only works if the
> object does not contain any state / value. I am also wondering if there is
> a real use case for this method, because supposedly the configurations could
> just be passed around to the caller of this method.
>
> Also, can you put the proposal into the FLIP wiki instead of the Google
> doc before voting? The FLIP wiki allows tracking the modification history and
> has a more established structure to ensure nothing is missed.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Aug 27, 2019 at 11:34 PM Timo Walther  wrote:
>
> > Hi everyone,
> >
> > I updated the FLIP proposal one more time as mentioned in the voting
> > thread. If there are no objections, I will start a new voting thread
> > tomorrow at 9am Berlin time.
> >
> > Thanks,
> > Timo
> >
> >
> > On 22.08.19 14:19, Timo Walther wrote:
> > > Hi everyone,
> > >
> > > thanks for all the feedback we have received online and offline. It
> > > showed that many people support the idea of evolving the Flink
> > > configuration functionality. I'm almost sure that this FLIP will not
> > > solve all issues but at least will improve the current status.
> > >
> > > We've updated the document and replaced the Correlation part with the
> > > concept of a ConfigOptionGroup that can provide all available options
> > > of a group plus custom group validators for eager validation. For now,
> > > this eager group validation will only be used at certain locations in
> > > the Flink code, but it prepares for possibly validating the entire global
> > > configuration before submitting a job in the future.
> > >
> > > Please take another look if you find time. I hope we can proceed with
> > > the voting process if there are no objections.
> > >
> > > Regards,
> > > Timo
> > >
> > > On 19.08.19 at 12:54, Timo Walther wrote:
> > >> Hi Stephan,
> > >>
> > >> thanks for your suggestions. Let me give you some background about
> > >> the decisions made in this FLIP:
> > >>
> > >> 1. Goal: The FLIP is labelled "evolve", not "rework", because we did
> > >> not want to change the entire configuration infrastructure, both for
> > >> backwards-compatibility reasons and because of the amount of work that
> > >> would be required to update all options. If our goal were to rework the
> > >> configuration options entirely, I might suggest switching to JSON
> > >> format with a JSON schema and JSON validator. However, setting
> > >> properties in a CLI or web interface becomes trickier the more
> > >> nested structures are allowed.
> > >>
> > >> 2. Class-based Options: The current ConfigOption class is centered
> > >> around Java classes where T is determined by the default value. The
> > >> FLIP just makes this more explicit by offering an explicit
> > >> `intType()` method etc. The current design of validators centered
> > >> around Java classes makes it possible to have typical domain
> > >> validators backed by generics as you suggested. If we introduce types
> > >> such as "quantity with measure and unit", we still need to get a class
> > >> out of this option at the end, so why change a proven concept?
> > >>
> > >> 3. List Options: The `isList` flag prevents arbitrary nesting. As
> > >> Dawid mentioned, we kept human readability in mind. Every atomic
> > >> option like "key=12" can be represented by a 

[jira] [Created] (FLINK-13888) Translate "How To Contribute" page into Chinese

2019-08-29 Thread Lord i Will (Jira)
Lord i Will created FLINK-13888:
---

 Summary: Translate "How To Contribute" page into Chinese
 Key: FLINK-13888
 URL: https://issues.apache.org/jira/browse/FLINK-13888
 Project: Flink
  Issue Type: Task
  Components: chinese-translation, Project Website
Reporter: Lord i Will


Translate page [https://flink.apache.org/contributing/how-to-contribute.html] 
into Chinese. The page is located in 
[https://github.com/apache/flink-web/blob/asf-site/contributing/how-to-contribute.zh.md].



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13889) SQL client fetch result OOM

2019-08-29 Thread richt richt (Jira)
richt richt created FLINK-13889:
---

 Summary: SQL client fetch result OOM
 Key: FLINK-13889
 URL: https://issues.apache.org/jira/browse/FLINK-13889
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.8.1
Reporter: richt richt


I submit SQL via sql-client to Flink on YARN.

In the Flink dashboard, it says the job is finished,

but the client gets this error:
{code:java}
// code placeholder
[ERROR] Could not execute SQL statement. Reason:[ERROR] Could not execute SQL 
statement. Reason:org.apache.flink.runtime.rest.util.RestClientException: 
[Internal server error., ]
{code}
 

And in the YARN web UI I got the message below:
{code:java}
// code placeholder
 ERROR org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
Unhandled exception. ERROR 
org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
Unhandled exception.java.lang.OutOfMemoryError: Java heap space at 
java.util.Arrays.copyOf(Arrays.java:3332) at 
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
 at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:596) at 
java.lang.StringBuffer.append(StringBuffer.java:367) at 
java.io.StringWriter.write(StringWriter.java:94) at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator._flushBuffer(WriterBasedJsonGenerator.java:1946)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator._writeBinary(WriterBasedJsonGenerator.java:1485)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.writeBinary(WriterBasedJsonGenerator.java:602)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator.writeBinary(JsonGenerator.java:1163)
 at 
org.apache.flink.runtime.rest.messages.json.SerializedValueSerializer.serialize(SerializedValueSerializer.java:54)
 at 
org.apache.flink.runtime.rest.messages.json.JobResultSerializer.serialize(JobResultSerializer.java:92)
 at 
org.apache.flink.runtime.rest.messages.json.JobResultSerializer.serialize(JobResultSerializer.java:41)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:719)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3905)
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3201)
 at 
org.apache.flink.runtime.rest.handler.util.HandlerUtils.sendResponse(HandlerUtils.java:81)
 at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.lambda$respondToRequest$0(AbstractRestHandler.java:78)
 at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler$$Lambda$119/1537887475.accept(Unknown
 Source) at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:821)
 at akka.dispatch.OnComplete.internal(Future.scala:264) at 
akka.dispatch.OnComplete.internal(Future.scala:261) at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
{code}
I tried some parameters below:

./bin/yarn-session.sh -s 4 --container 64 -tm 8024m -m 12000m -yjm12100m -m5096

the sql client parameter max-table-result-rows

and they do not seem to work.

How can I trace the error with Flink on YARN, and how can I find the container logs for
Flink?

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13890) HiveCatalogUseBlinkITCase failed to get metastore connection

2019-08-29 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-13890:
--

 Summary: HiveCatalogUseBlinkITCase failed to get metastore 
connection
 Key: FLINK-13890
 URL: https://issues.apache.org/jira/browse/FLINK-13890
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive, Tests
Affects Versions: 1.10.0
Reporter: Piotr Nowojski


Failed on master with

{code:java}
Could not connect to meta store using any of the URIs provided. Most recent 
failure: org.apache.thrift.transport.TTransportException: 
java.net.ConnectException: Connection refused (Connection refused)
{code}

https://api.travis-ci.org/v3/job/578208116/log.txt

CC [~lzljs3620320] [~ykt836]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-08-29 Thread Stephan Ewen
When computing the values in the JVM process after it started, how would
you deal with values like Max Direct Memory, Metaspace size, native memory
reservation (reduced heap size), etc.? All the values that are parameters to
the JVM process and that need to be supplied at process startup?

On Wed, Aug 28, 2019 at 4:46 PM Till Rohrmann  wrote:

> Thanks for the clarification. I have some more comments:
>
> - I would actually split the logic for computing the process memory
> requirements and for storing the values into two things. E.g. one could name
> the former TaskExecutorProcessUtility and the latter
> TaskExecutorProcessMemory. But we can discuss this on the PR since it's
> just a naming detail.
>
> - Generally, I'm not opposed to making configuration values overridable by
> ENV variables. I think this is a very good idea and makes the
> configurability of Flink processes easier. However, I think that adding
> this functionality should not be part of this FLIP because it would simply
> widen the scope unnecessarily.
>
> The reasons why I believe it is unnecessary are the following: For Yarn we
> already write a flink-conf.yaml which could be populated with the
> memory settings. For the other processes it should not make a difference
> whether the loaded Configuration is populated with the memory settings from
> ENV variables or by using TaskExecutorProcessUtility to compute the missing
> values from the loaded configuration. If the latter would not be possible
> (wrong or missing configuration values), then we should not have been able
> to actually start the process in the first place.
>
> - Concerning the memory reservation: I agree with you that we need the
> memory reservation functionality to make streaming jobs work with "managed"
> memory. However, w/o this functionality the whole FLIP would already bring
> a good amount of improvements to our users when running batch jobs.
> Moreover, by keeping the scope smaller we can complete the FLIP faster.
> Hence, I would propose to address the memory reservation functionality as a
> follow up FLIP (which Yu is working on if I'm not mistaken).
>
> Cheers,
> Till
>
> On Wed, Aug 28, 2019 at 11:43 AM Yang Wang  wrote:
>
> > Just add my 2 cents.
> >
> > Using environment variables to override the configuration for different
> > taskmanagers is better.
> > We do not need to generate a dedicated flink-conf.yaml for all
> taskmanagers.
> > A common flink-conf.yaml and different environment variables are enough.
> > By reducing the distributed cached files, it could make launching a
> > taskmanager faster.
> >
> > Stephan gives a good suggestion that we could move the logic into
> > "GlobalConfiguration.loadConfig()" method.
> > Maybe the client could also benefit from this. Different users do not
> have
> > to export FLINK_CONF_DIR to update a few config options.
> >
> >
> > Best,
> > Yang
> >
> > On Wed, Aug 28, 2019 at 1:21 AM, Stephan Ewen wrote:
> >
> > > One note on the Environment Variables and Configuration discussion.
> > >
> > > My understanding is that passed ENV variables are added to the
> > > configuration in the "GlobalConfiguration.loadConfig()" method (or
> > > similar).
> > > For all the code inside Flink, it looks like the data was in the config
> > to
> > > start with, just that the scripts that compute the variables can pass
> the
> > > values to the process without actually needing to write a file.
> > >
> > > For example the "GlobalConfiguration.loadConfig()" method would take
> any
> > > ENV variable prefixed with "flink" and add it as a config key.
> > > "flink_taskmanager_memory_size=2g" would become
> "taskmanager.memory.size:
> > > 2g".
> > >
> > >
> > > On Tue, Aug 27, 2019 at 4:05 PM Xintong Song 
> > > wrote:
> > >
> > > > Thanks for the comments, Till.
> > > >
> > > > I've also seen your comments on the wiki page, but let's keep the
> > > > discussion here.
> > > >
> > > > - Regarding 'TaskExecutorSpecifics', what do you think about naming it
> > > > 'TaskExecutorResourceSpecifics'?
> > > > - Regarding passing memory configurations into task executors, I'm in
> > > favor
> > > > of doing it via environment variables rather than configurations, for
> the
> > > > following two reasons.
> > > >   - It is easier to keep the memory options, once calculated, from being
> > > > changed with environment variables rather than configurations.
> > > >   - I'm not sure whether we should write the configuration in startup
> > > > scripts. Writing changes into the configuration files when running
> the
> > > > startup scripts does not sound right to me. Or we could make a copy
> of
> > > > configuration files per Flink cluster, make the task executor
> > load
> > > > from the copy, and clean up the copy after the cluster is shut down,
> > which
> > > > is complicated. (I think this is also what Stephan means in his
> comment
> > > > on
> > > > the wiki page?)
> > > > - Regarding reserving memory, I think this change should be included
> in
> > > > this FLIP. I thin

Re: [DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

2019-08-29 Thread Aljoscha Krettek
Overall, this is a very nice development that should also simplify the code 
base once we deprecate the expression parser!

Regarding method names, I agree with Seth that values/literals should use 
something like “lit()”. I also think that for column references we could use 
“col()” to make it clear that it is a column reference. What do you think?

Aljoscha

> On 28. Aug 2019, at 15:59, Seth Wiesman  wrote:
> 
> I would prefer ‘lit()’ over ‘val()’ since val is a keyword in Scala,
> assuming the intention is to make the DSL ergonomic for Scala developers.
> 
> Seth 
> 
>> On Aug 28, 2019, at 7:58 AM, Timo Walther  wrote:
>> 
>> Hi David,
>> 
>> thanks for your feedback. I was also skeptical about 1-char method names, so I 
>> restored the `val()` method for now. If you read literature such as 
>> Wikipedia [1]: "literal is a notation for representing a fixed value in 
>> source code. Almost all programming languages have notations for atomic 
>> values". So they are also talking about "values".
>> 
>> Alternatively we could use `lit(12)` or `l(12)`, but I'm not convinced that 
>> this is better.
>> 
>> Regards,
>> Timo
>> 
>> [1] https://en.wikipedia.org/wiki/Literal_(computer_programming)
>> 
>>> On 27.08.19 22:10, David Anderson wrote:
>>> Timo,
>>> 
>>> While it's not exactly pretty, I don't mind the $("field") construct.
>>> It's not particularly surprising. The v() method troubles me more; it
>>> looks mysterious. I think we would do better to have something more
>>> explicit. val() isn't much better -- val("foo") could be interpreted
>>> to mean the value of the "foo" column, or a literal string.
>>> 
>>> David
>>> 
 On Tue, Aug 27, 2019 at 5:45 PM Timo Walther  wrote:
 Hi David,
 
 thanks for your feedback. With the current design, the DSL would be free
 of any ambiguity but it is definitely more verbose esp. around defining
 values.
 
 I would be happy about further suggestions that make the DSL more
 readable. I'm also not sure if we should go for `$()` and `v()` instead of the
 more readable `ref()` and `val()`. This could maybe make it look less
 "alien"; what do you think?
 
 Some people mentioned overloading certain methods to accept values
 or column names, e.g. `$("field").isEqual("str")`, but then string values
 could be confused with column names.
 
 Thanks,
 Timo
 
> On 27.08.19 17:34, David Anderson wrote:
> In general I'm in favor of anything that is going to make the Table
> API easier to learn and more predictable in its behavior. This
> proposal kind of falls in the middle. As someone who has spent hours
> in the crevices between the various flavors of the current
> implementations, I certainly view keeping the various APIs and DSLs
> more in sync, and making them less buggy, as highly desirable.
> 
> On the other hand, some of the details in the proposal do make the
> resulting user code less pretty and less approachable than the current
> Java DSL. In a training context it will be easy to teach, but I wonder
> if we can find a way to make it look less alien at first glance.
> 
> David
> 
>> On Wed, Aug 21, 2019 at 1:33 PM Timo Walther  wrote:
>> Hi everyone,
>> 
>> some of you might remember the discussion I started end of March [1]
>> about introducing a new Java DSL for Table API that is not embedded in a
>> string.
>> 
>> In particular, it solves the following issues:
>> 
>> - No possibility of deprecating functions
>> 
>> - Missing documentation for users
>> 
>> - Missing auto-completion for users
>> 
>> - Need to port the ExpressionParser from Scala to Java
>> 
>> - Scala symbols are deprecated! A Java DSL can also serve as the basis for
>> the Scala one.
>> 
>> Due to a shift of priorities, we could not work on it in Flink 1.9, but the
>> feedback at that time was positive and we should aim for 1.10 to
>> simplify the API with this change.
>> 
>> We propose the following FLIP-55:
>> 
>> https://docs.google.com/document/d/1CfaaD3j8APJDKwzIT4YsX7QD2huKTB4xlA3vnMUFJmA/edit?usp=sharing
>> 
>> 
>> Thanks for any feedback,
>> 
>> Timo
>> 
>> [1]
>> https://lists.apache.org/thread.html/e6f31d7fa53890b91be0991c2da64556a91ef0fc9ab3ffa889dacc23@%3Cdev.flink.apache.org%3E
>> 
>> 



[jira] [Created] (FLINK-13891) Increment flink-shaded version

2019-08-29 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-13891:


 Summary: Increment flink-shaded version
 Key: FLINK-13891
 URL: https://issues.apache.org/jira/browse/FLINK-13891
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.10.0






--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13892) HistoryServerTest failed on Travis

2019-08-29 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-13892:


 Summary: HistoryServerTest failed on Travis
 Key: FLINK-13892
 URL: https://issues.apache.org/jira/browse/FLINK-13892
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / REST
Affects Versions: 1.10.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler


{code}
16:56:29.548 [ERROR] testHistoryServerIntegration[Flink version less than 1.4: 
false](org.apache.flink.runtime.webmonitor.history.HistoryServerTest)  Time 
elapsed: 0.69 s  <<< ERROR!
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.InvalidDefinitionException:
 
Cannot construct instance of 
`org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails`, problem: 
`java.lang.NullPointerException`
 at [Source: (String)"{"errors":["File not found."]}"; line: 1, column: 30]
at 
org.apache.flink.runtime.webmonitor.history.HistoryServerTest.testHistoryServerIntegration(HistoryServerTest.java:142)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.runtime.webmonitor.history.HistoryServerTest.testHistoryServerIntegration(HistoryServerTest.java:142)
{code}

https://api.travis-ci.org/v3/job/577860508/log.txt



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13893) S3 tests are failing due to missing jaxb dependency

2019-08-29 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-13893:


 Summary: S3 tests are failing due to missing jaxb dependency
 Key: FLINK-13893
 URL: https://issues.apache.org/jira/browse/FLINK-13893
 Project: Flink
  Issue Type: Sub-task
  Components: FileSystems
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.10.0


{code}
16:20:12.975 [ERROR] Tests run: 3, Failures: 0, Errors: 2, Skipped: 0, Time 
elapsed: 5.691 s <<< FAILURE! - in 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterExceptionITCase
16:20:12.992 [ERROR] 
testResumeAfterCommit(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterExceptionITCase)
  Time elapsed: 5.067 s  <<< ERROR!
java.lang.Exception: Unexpected exception, expected but 
was
at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterExceptionITCase.testResumeAfterCommit(HadoopS3RecoverableWriterExceptionITCase.java:165)
Caused by: java.lang.ClassNotFoundException: javax.xml.bind.JAXBException
at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterExceptionITCase.testResumeAfterCommit(HadoopS3RecoverableWriterExceptionITCase.java:165)

16:20:12.992 [ERROR] 
testResumeWithWrongOffset(org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterExceptionITCase)
  Time elapsed: 0.194 s  <<< ERROR!
java.lang.Exception: Unexpected exception, expected but 
was
at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterExceptionITCase.testResumeWithWrongOffset(HadoopS3RecoverableWriterExceptionITCase.java:185)
Caused by: java.lang.ClassNotFoundException: javax.xml.bind.JAXBException
at 
org.apache.flink.fs.s3hadoop.HadoopS3RecoverableWriterExceptionITCase.testResumeWithWrongOffset(HadoopS3RecoverableWriterExceptionITCase.java:185)
{code}

https://api.travis-ci.org/v3/job/577860512/log.txt



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-29 Thread Timo Walther

Hi Becket,

let me try to clarify some of your questions:

1. For every option, we also needed to think about how to represent it 
in a human-readable format. We do not want to allow arbitrary nesting 
because that would make it easy to bypass the flattened hierarchy of 
config options (`session.memory.min`). The current design allows 
representing every option type as a list. E.g.:


`myIntOption: 12` can be `myIntListOption: 12;12`
`myObjectOption: field=12,other=true` can be `myObjectListOption: 
field=12,other=true; field=12,other=true`
`myPropertyOption: key=str0,other=str1` can be `myPropertyListOption: 
key=str0,other=str1;key=str0,other=str1`


We need the atomic class for serialization/deserialization both in 
binary and string format.
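
A minimal sketch of this representation (hypothetical helper, not the FLIP's proposed API): the list form is the atomic option's string form repeated with ';' as separator, and the atomic class supplies the element parser.

{code:java}
// Sketch only; illustrates the ';'-separated list form described above.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class ListOptionParser {
    // e.g. parseList("12;12", Integer::valueOf) -> [12, 12]
    static <T> List<T> parseList(String raw, Function<String, T> atomicParser) {
        List<T> values = new ArrayList<>();
        for (String element : raw.split(";")) {
            values.add(atomicParser.apply(element.trim()));
        }
        return values;
    }
}
{code}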


ConfigOption is not present in the code base yet, but this FLIP is 
a preparation for making ExecutionConfig configurable. If you look into 
this class, or also into existing table connectors/formats, you will see 
that each proposed option type has its requirements.


2. Regarding extending the description of ConfigOptions, the semantics of 
one option should be a superset of the other option's. E.g. in Table API 
we might use general ExecutionConfig properties. But we would like to a) 
make external options more prominent in the Table API config docs to 
point people to properties they should pay attention to, and b) note side 
effects. The core semantics of a property should not change.


3. The factory will not receive the entire configuration but works in a 
separate key space. For `myObjectOption` above, it would receive a 
configuration that consists of `field: 12` and `other: true`.
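
A sketch of that key-space scoping (simplified, hypothetical code): the factory only sees the entries under its option's prefix, with the prefix stripped.

{code:java}
// Sketch only; hypothetical scoping of a flat key/value configuration.
import java.util.HashMap;
import java.util.Map;

class ConfigScoping {
    // scope({"myObjectOption.field" -> "12", "myObjectOption.other" -> "true"},
    //       "myObjectOption") returns {"field" -> "12", "other" -> "true"}
    static Map<String, String> scope(Map<String, String> full, String prefix) {
        Map<String, String> scoped = new HashMap<>();
        String p = prefix + ".";
        for (Map.Entry<String, String> e : full.entrySet()) {
            if (e.getKey().startsWith(p)) {
                scoped.put(e.getKey().substring(p.length()), e.getValue());
            }
        }
        return scoped;
    }
}
{code}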


I agree. I will convert the document into a Wiki page today.

Thanks,
Timo

On 29.08.19 09:00, Stephan Ewen wrote:

@Becket One thing that may be non-obvious is that the Configuration class
also defines serialization / persistence logic at the moment. So it needs
to know the set of types it supports. That stands in the way of an
arbitrary generic map type.

@Timo I agree though that it seems a bit inconsistent to have one
collection orthogonal to the type (List) and another one bound to specific
types (Map).

On Thu, Aug 29, 2019 at 8:20 AM Becket Qin  wrote:


Hi Timo,

Thanks for the proposal. Sorry for the late comments, but I have a few
questions / comments.

1. Is a new isList field necessary in the ConfigOption?
Would it be enough to just check the atomicClass to see if it is a List or
not?
Also, in the ConfigOption class case, do we always assume both key
and value types are String? Can we just apply the same to the
ConfigOption?
BTW, I did a quick search in the codebase but did not find any usage of
ConfigOption.

2. The same config name, but with two ConfigOptions with different semantics
in different components, seems super confusing. For example, when users set
both configs, they may have no idea one is overriding the other. There
might be two cases:
  - If it is just the same config used by different components to act
accordingly, it might be better to just have one config, but describe
clearly how that config will be used.
  - If it is actually two configurations that can be set differently, I
think the config names should just be different.

3. Regarding the ConfigurableFactory, does the toConfiguration() method
pretty much mean getConfiguration()? The toConfiguration() method sounds
like converting an object to a configuration, which only works if the
object does not contain any state / value. I am also wondering if there is
a real use case for this method, because supposedly the configurations could
just be passed around to the caller of this method.

Also, can you put the proposal into the FLIP wiki instead of the Google
doc before voting? The FLIP wiki allows tracking the modification history and
has a more established structure to ensure nothing is missed.

Thanks,

Jiangjie (Becket) Qin

On Tue, Aug 27, 2019 at 11:34 PM Timo Walther  wrote:


Hi everyone,

I updated the FLIP proposal one more time as mentioned in the voting
thread. If there are no objections, I will start a new voting thread
tomorrow at 9am Berlin time.

Thanks,
Timo


On 22.08.19 14:19, Timo Walther wrote:

Hi everyone,

thanks for all the feedback we have received online and offline. It
showed that many people support the idea of evolving the Flink
configuration functionality. I'm almost sure that this FLIP will not
solve all issues but at least will improve the current status.

We've updated the document and replaced the Correlation part with the
concept of a ConfigOptionGroup that can provide all available options
of a group plus custom group validators for eager validation. For now,
this eager group validation will only be used at certain locations in
the Flink code, but it prepares for possibly validating the entire global
configuration before submitting a job in the future.

Please take another look if you find time. I hope we can proceed with
the voting process if there are no objections.

Regar

Re: [DISCUSS] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-08-29 Thread Till Rohrmann
My understanding was that before starting the Flink process we call a
utility which calculates these values. I assume that this utility will do
the calculation based on a set of configured values (process memory, flink
memory, network memory etc.). Assuming that these values don't differ from
the values with which the JVM is started, it should be possible to
recompute them in the Flink process in order to set the values.
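
A sketch of the determinism assumption (made-up option split and class name, deliberately simplified): the same pure calculation can run once in the startup script to produce JVM flags and again inside the process to fill in the remaining values.

{code:java}
// Sketch only; not the FLIP's actual memory model or class names.
class TaskExecutorMemoryCalc {
    final long heapBytes;
    final long directBytes;
    final long metaspaceBytes;

    TaskExecutorMemoryCalc(long processBytes, long networkBytes,
                           long managedBytes, long metaspaceBytes) {
        this.metaspaceBytes = metaspaceBytes;
        this.directBytes = networkBytes; // simplification: direct = network
        this.heapBytes =
                processBytes - networkBytes - managedBytes - metaspaceBytes;
    }

    // The startup-time JVM parameters Stephan refers to.
    String[] jvmArgs() {
        return new String[] {
                "-Xmx" + heapBytes,
                "-XX:MaxDirectMemorySize=" + directBytes,
                "-XX:MaxMetaspaceSize=" + metaspaceBytes,
        };
    }
}
{code}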



On Thu, Aug 29, 2019 at 11:29 AM Stephan Ewen  wrote:

> When computing the values in the JVM process after it started, how would
> you deal with values like Max Direct Memory, Metaspace size, native memory
> reservation (reduced heap size), etc.? All the values that are parameters to
> the JVM process and that need to be supplied at process startup?
>
> On Wed, Aug 28, 2019 at 4:46 PM Till Rohrmann 
> wrote:
>
> > Thanks for the clarification. I have some more comments:
> >
> > - I would actually split the logic for computing the process memory
> > requirements and for storing the values into two things. E.g. one could name
> > the former TaskExecutorProcessUtility and the latter
> > TaskExecutorProcessMemory. But we can discuss this on the PR since it's
> > just a naming detail.
> >
> > - Generally, I'm not opposed to making configuration values overridable
> by
> > ENV variables. I think this is a very good idea and makes the
> > configurability of Flink processes easier. However, I think that adding
> > this functionality should not be part of this FLIP because it would
> simply
> > widen the scope unnecessarily.
> >
> > The reasons why I believe it is unnecessary are the following: For Yarn
> we
> > already write a flink-conf.yaml which could be populated with the
> > memory settings. For the other processes it should not make a difference
> > whether the loaded Configuration is populated with the memory settings
> from
> > ENV variables or by using TaskExecutorProcessUtility to compute the
> missing
> > values from the loaded configuration. If the latter would not be possible
> > (wrong or missing configuration values), then we should not have been
> able
> > to actually start the process in the first place.
> >
> > - Concerning the memory reservation: I agree with you that we need the
> > memory reservation functionality to make streaming jobs work with
> "managed"
> > memory. However, w/o this functionality the whole FLIP would already
> bring
> > a good amount of improvements to our users when running batch jobs.
> > Moreover, by keeping the scope smaller we can complete the FLIP faster.
> > Hence, I would propose to address the memory reservation functionality
> as a
> > follow up FLIP (which Yu is working on if I'm not mistaken).
> >
> > Cheers,
> > Till
> >
> > On Wed, Aug 28, 2019 at 11:43 AM Yang Wang 
> wrote:
> >
> > > Just add my 2 cents.
> > >
> > > Using environment variables to override the configuration for different
> > > taskmanagers is better.
> > > We do not need to generate a dedicated flink-conf.yaml for all
> > taskmanagers.
> > > A common flink-conf.yaml and different environment variables are enough.
> > > By reducing the distributed cached files, it could make launching a
> > > taskmanager faster.
> > >
> > > Stephan gives a good suggestion that we could move the logic into
> > > "GlobalConfiguration.loadConfig()" method.
> > > Maybe the client could also benefit from this. Different users do not
> > have
> > > to export FLINK_CONF_DIR to update a few config options.
> > >
> > >
> > > Best,
> > > Yang
> > >
> > > On Wed, Aug 28, 2019 at 1:21 AM, Stephan Ewen wrote:
> > >
> > > > One note on the Environment Variables and Configuration discussion.
> > > >
> > > > My understanding is that passed ENV variables are added to the
> > > > configuration in the "GlobalConfiguration.loadConfig()" method (or
> > > > similar).
> > > > For all the code inside Flink, it looks like the data was in the
> config
> > > to
> > > > start with, just that the scripts that compute the variables can pass
> > the
> > > > values to the process without actually needing to write a file.
> > > >
> > > > For example the "GlobalConfiguration.loadConfig()" method would take
> > any
> > > > ENV variable prefixed with "flink" and add it as a config key.
> > > > "flink_taskmanager_memory_size=2g" would become
> > "taskmanager.memory.size:
> > > > 2g".
> > > >
> > > >
> > > > On Tue, Aug 27, 2019 at 4:05 PM Xintong Song 
> > > > wrote:
> > > >
> > > > > Thanks for the comments, Till.
> > > > >
> > > > > I've also seen your comments on the wiki page, but let's keep the
> > > > > discussion here.
> > > > >
> > > > > - Regarding 'TaskExecutorSpecifics', what do you think about naming
> it
> > > > > 'TaskExecutorResourceSpecifics'?
> > > > > - Regarding passing memory configurations into task executors, I'm
> in
> > > > favor
> > > > > of doing it via environment variables rather than configurations, for
> > the
> > > > > following two reasons.
> > > > >   - It is easier to keep the memory options, once calculated, from

Re: [DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

2019-08-29 Thread Timo Walther
I'm fine with `lit()`. Regarding `col()`, I initially suggested `ref()` 
but I think Fabian and Dawid liked single-char methods for the most 
commonly used expressions.


Btw, what is your opinion on the names of commonly used methods such as 
`isEqual`, `isGreaterOrEqual`? Are we fine with the current naming?
In theory we could make them shorter, like `equals()`, `greaterOrEqual()`, 
or even shorter: `eq`, `gt`, `gte`?


Thanks,
Timo
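
For reference, the naming variants under discussion on the same expression (a sketch of the proposed FLIP-55 style, not a final API; `$`, `lit`, and the comparison methods are the draft names, and `table` is assumed to be a Table in scope):

{code:java}
// Current proposal (descriptive comparison names):
table.where($("amount").isGreaterOrEqual(lit(10)));

// Shorter variant raised above:
table.where($("amount").gte(lit(10)));
{code}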


On 29.08.19 11:51, Aljoscha Krettek wrote:

Overall, this is a very nice development that should also simplify the code 
base once we deprecate the expression parser!

Regarding method names, I agree with Seth that values/literals should use 
something like “lit()”. I also think that for column references we could use 
“col()” to make it clear that it is a column reference. What do you think?

Aljoscha


On 28. Aug 2019, at 15:59, Seth Wiesman  wrote:

I would prefer ‘lit()’ over ‘val()’ since val is a keyword in Scala, assuming 
the intention is to make the DSL ergonomic for Scala developers.

Seth


On Aug 28, 2019, at 7:58 AM, Timo Walther  wrote:

Hi David,

thanks for your feedback. I was also skeptical about 1-char method names, so I restored the `val()` 
method for now. If you read literature such as Wikipedia [1]: "literal is a notation for 
representing a fixed value in source code. Almost all programming languages have notations for 
atomic values". So they are also talking about "values".

Alternatively we could use `lit(12)` or `l(12)`, but I'm not convinced that this 
is better.

Regards,
Timo

[1] https://en.wikipedia.org/wiki/Literal_(computer_programming)


On 27.08.19 22:10, David Anderson wrote:
Timo,

While it's not exactly pretty, I don't mind the $("field") construct.
It's not particularly surprising. The v() method troubles me more; it
looks mysterious. I think we would do better to have something more
explicit. val() isn't much better -- val("foo") could be interpreted
to mean the value of the "foo" column, or a literal string.

David


On Tue, Aug 27, 2019 at 5:45 PM Timo Walther  wrote:
Hi David,

thanks for your feedback. With the current design, the DSL would be free
of any ambiguity but it is definitely more verbose esp. around defining
values.

I would be happy about further suggestions that make the DSL more
readable. I'm also not sure if we should go for `$()` and `v()` instead of the
more readable `ref()` and `val()`. This could maybe make it look less
"alien"; what do you think?

Some people mentioned overloading certain methods to accept values
or column names, e.g. `$("field").isEqual("str")`, but then string values
could be confused with column names.

Thanks,
Timo


On 27.08.19 17:34, David Anderson wrote:
In general I'm in favor of anything that is going to make the Table
API easier to learn and more predictable in its behavior. This
proposal kind of falls in the middle. As someone who has spent hours
in the crevices between the various flavors of the current
implementations, I certainly view keeping the various APIs and DSLs
more in sync, and making them less buggy, as highly desirable.

On the other hand, some of the details in the proposal do make the
resulting user code less pretty and less approachable than the current
Java DSL. In a training context it will be easy to teach, but I wonder
if we can find a way to make it look less alien at first glance.

David


On Wed, Aug 21, 2019 at 1:33 PM Timo Walther  wrote:
Hi everyone,

some of you might remember the discussion I started end of March [1]
about introducing a new Java DSL for Table API that is not embedded in a
string.

In particular, it solves the following issues:

- No possibility of deprecating functions

- Missing documentation for users

- Missing auto-completion for users

- Need to port the ExpressionParser from Scala to Java

- Scala symbols are deprecated! A Java DSL can also serve as the basis for
the Scala one.

Due to a shift of priorities, we could not work on it in Flink 1.9, but the
feedback at that time was positive and we should aim for 1.10 to
simplify the API with this change.

We propose the following FLIP-55:

https://docs.google.com/document/d/1CfaaD3j8APJDKwzIT4YsX7QD2huKTB4xlA3vnMUFJmA/edit?usp=sharing


Thanks for any feedback,

Timo

[1]
https://lists.apache.org/thread.html/e6f31d7fa53890b91be0991c2da64556a91ef0fc9ab3ffa889dacc23@%3Cdev.flink.apache.org%3E





[jira] [Created] (FLINK-13894) Web UI: add log URL for subtask of vertex

2019-08-29 Thread lining (Jira)
lining created FLINK-13894:
--

 Summary: Web UI: add log URL for subtask of vertex
 Key: FLINK-13894
 URL: https://issues.apache.org/jira/browse/FLINK-13894
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Reporter: lining






--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13895) Client does not exit when bin/yarn-session.sh fails

2019-08-29 Thread Yu Wang (Jira)
Yu Wang created FLINK-13895:
---

 Summary: Client does not exit when bin/yarn-session.sh fails
 Key: FLINK-13895
 URL: https://issues.apache.org/jira/browse/FLINK-13895
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.9.0
Reporter: Yu Wang


2019-08-29 09:42:00,589 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying 
cluster, current state ACCEPTED
2019-08-29 09:42:04,718 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Error while running the Flink Yarn session.
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy 
Yarn session cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:385)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:616)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$3(FlinkYarnSessionCli.java:844)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:844)
Caused by: 
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1565802461003_0608 failed 1 
times due to AM Container for appattempt_1565802461003_0608_01 exited with  
exitCode: 1
For more detailed output, check application tracking 
page:https://hadoop-btnn9001.eniot.io:8090/cluster/app/application_1565802461003_0608Then,
 click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e35_1565802461003_0608_01_01
Exit code: 1
Stack trace: ExitCodeException exitCode=1: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
at org.apache.hadoop.util.Shell.run(Shell.java:456)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
at 
org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:387)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Shell output: main : command provided 1
main : run as user is flinktest
main : requested yarn user is flinktest


Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further 
investigate the issue:
yarn logs -applicationId application_1565802461003_0608
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1024)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:507)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:378)
... 7 more
2019-08-29 09:42:04,723 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cancelling 
deployment from Deployment Failure Hook
2019-08-29 09:42:04,723 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Killing YARN 
application
2019-08-29 09:42:04,729 INFO  org.apache.hadoop.io.retry.RetryInvocationHandler 
- Exception while invoking forceKillApplication of class 
ApplicationClientProtocolPBClientImpl over rm1. Trying to fail over immediately.
java.io.IOException: The client is stopped
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1508)
at org.apache.hadoop.ipc.Client.call(Client.java:1452)
at org.apache.hadoop.ipc.Client.call(Client.java:1413)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy7.forceKillApplication(Unknown Source)
at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.forceKillApplication(ApplicationClientProtocolPBClientImpl.java:176)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  

Re: [DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

2019-08-29 Thread Fabian Hueske
Hi,

IMO, we should define what we would like to optimize for:
1) easy-to-get-started experience or
2) productivity and ease-of-use

While 1) is certainly important, I think we should put more emphasis on
goal 2).
That's why I favor names that are as short as possible for commonly used methods
like column references and literals/values.
These are used *many* times in *every* query.
Every user who uses the API for more than 30 mins will know what $() or v()
(or whatever method names we come up with) are used for, and everybody who
doesn't know can have a look at the JavaDocs or regular documentation.
Shorter method names are not only about increasing the speed of writing a
query, but also about reducing clutter that needs to be parsed to understand an
expression / query.

I'm OK with descriptive names for other expressions like call(),
isEqualTo() (although these could be the commonly used eq(), gte(), etc.),
and so on, but column references (and literals) should be as lightweight as
possible, IMO.
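
To illustrate the clutter argument, the same projection written with the short draft helpers versus hypothetical longer names (a sketch; `$`/`lit` are the FLIP-55 draft names, `col`/`literal` are assumed alternatives, and `table` is assumed to be a Table in scope):

{code:java}
table.select($("user"), $("amount").plus(lit(1)));
table.select(col("user"), col("amount").plus(literal(1)));
{code}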

Cheers,
Fabian

On Thu, Aug 29, 2019 at 12:15 PM, Timo Walther wrote:

> I'm fine with `lit()`. Regarding `col()`, I initially suggested `ref()`
> but I think Fabian and Dawid liked single-char methods for the most
> commonly used expressions.
>
> Btw, what is your opinion on the names of commonly used methods such as
> `isEqual`, `isGreaterOrEqual`? Are we fine with the current naming?
> In theory we could make them shorter, like `equals()`, `greaterOrEqual()`,
> or even shorter: `eq`, `gt`, `gte`?
>
> Thanks,
> Timo
>
>
> On 29.08.19 11:51, Aljoscha Krettek wrote:
> > Overall, this is a very nice development that should also simplify the
> code base once we deprecate the expression parser!
> >
> > Regarding method names, I agree with Seth that values/literals should
> use something like “lit()”. I also think that for column references we
> could use “col()” to make it clear that it is a column reference. What do
> you think?
> >
> > Aljoscha
> >
> >> On 28. Aug 2019, at 15:59, Seth Wiesman  wrote:
> >>
> >> I would prefer ‘lit()’ over ‘val()’ since val is a keyword in Scala,
> assuming the intention is to make the DSL ergonomic for Scala developers.
> >>
> >> Seth
> >>
> >>> On Aug 28, 2019, at 7:58 AM, Timo Walther  wrote:
> >>>
> >>> Hi David,
> >>>
> >>> thanks for your feedback. I was also skeptical about 1-char method
> names, so I restored the `val()` method for now. If you read literature such
> as Wikipedia [1]: "literal is a notation for representing a fixed value in
> source code. Almost all programming languages have notations for atomic
> values". So they are also talking about "values".
> >>>
> >>> Alternatively we could use `lit(12)` or `l(12)`, but I'm not convinced
> that this is better.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>> [1] https://en.wikipedia.org/wiki/Literal_(computer_programming)
> >>>
>  On 27.08.19 22:10, David Anderson wrote:
>  Timo,
> 
>  While it's not exactly pretty, I don't mind the $("field") construct.
>  It's not particularly surprising. The v() method troubles me more; it
>  looks mysterious. I think we would do better to have something more
>  explicit. val() isn't much better -- val("foo") could be interpreted
>  to mean the value of the "foo" column, or a literal string.
> 
>  David
> 
> > On Tue, Aug 27, 2019 at 5:45 PM Timo Walther 
> wrote:
> > Hi David,
> >
> > thanks for your feedback. With the current design, the DSL would be
> free
> > of any ambiguity but it is definitely more verbose esp. around
> defining
> > values.
> >
> > I would be happy about further suggestions that make the DSL more
> > readable. I'm also not sure if we should go for `$()` and `v()` instead of
> the more
> > readable `ref()` and `val()`. This could maybe make it look less
> > "alien"; what do you think?
> >
> > Some people mentioned overloading certain methods to accept
> values
> > or column names, e.g. `$("field").isEqual("str")`, but then string
> values
> > could be confused with column names.
> >
> > Thanks,
> > Timo
> >
> >> On 27.08.19 17:34, David Anderson wrote:
> >> In general I'm in favor of anything that is going to make the Table
> >> API easier to learn and more predictable in its behavior. This
> >> proposal kind of falls in the middle. As someone who has spent hours
> >> in the crevices between the various flavors of the current
> >> implementations, I certainly view keeping the various APIs and DSLs
> >> more in sync, and making them less buggy, as highly desirable.
> >>
> >> On the other hand, some of the details in the proposal do make the
> >> resulting user code less pretty and less approachable than the
> current
> >> Java DSL. In a training context it will be easy to teach, but I
> wonder
> >> if we can find a way to make it look less alien at first glance.
> >>
> >> David
> >>
> >>> On Wed, Aug 21, 2019 at

[jira] [Created] (FLINK-13896) Scala 2.11 maven compile should target Java 1.8

2019-08-29 Thread Terry Wang (Jira)
Terry Wang created FLINK-13896:
--

 Summary: Scala 2.11 maven compile should target Java 1.8
 Key: FLINK-13896
 URL: https://issues.apache.org/jira/browse/FLINK-13896
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.9.0
 Environment: When setting up a TableEnvironment in Scala as follows:

 
{code:java}
// we can reproduce this problem by putting the following code in 
// org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImplTest

@Test
def testCreateEnvironment(): Unit = {
 val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
 val tEnv = TableEnvironment.create(settings);
}
{code}

Then mvn test would fail with an error message like:

error: Static methods in interface require -target:JVM-1.8

We can fix this bug by adding the `-target:jvm-1.8` compiler argument to the 
scala-maven-plugin configuration, as sketched below.
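
A sketch of the intended scala-maven-plugin snippet (the enclosing POM structure is assumed):

{code:xml}
<plugin>
  <groupId>net.alchim31.maven</groupId>
  <artifactId>scala-maven-plugin</artifactId>
  <configuration>
    <args>
      <arg>-target:jvm-1.8</arg>
    </args>
  </configuration>
</plugin>
{code}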

 

 

 
Reporter: Terry Wang






--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-08-29 Thread Stephan Ewen
I see. Under the assumption of strict determinism that should work.

The original proposal had this point "don't compute inside the TM, compute
outside and supply a full config", because that sounded more intuitive.

On Thu, Aug 29, 2019 at 12:15 PM Till Rohrmann  wrote:

> My understanding was that before starting the Flink process we call a
> utility which calculates these values. I assume that this utility will do
> the calculation based on a set of configured values (process memory, flink
> memory, network memory etc.). Assuming that these values don't differ from
> the values with which the JVM is started, it should be possible to
> recompute them in the Flink process in order to set the values.
>
>
>
> On Thu, Aug 29, 2019 at 11:29 AM Stephan Ewen  wrote:
>
> > When computing the values in the JVM process after it started, how would
> > you deal with values like Max Direct Memory, Metaspace size. native
> memory
> > reservation (reduce heap size), etc? All the values that are parameters
> to
> > the JVM process and that need to be supplied at process startup?
> >
> > On Wed, Aug 28, 2019 at 4:46 PM Till Rohrmann 
> > wrote:
> >
> > > Thanks for the clarification. I have some more comments:
> > >
> > > - I would actually split the logic for computing the process memory
> > > requirements and for storing the values into two things. E.g. one could
> name
> > > the former TaskExecutorProcessUtility and the latter
> > > TaskExecutorProcessMemory. But we can discuss this on the PR since it's
> > > just a naming detail.
> > >
> > > - Generally, I'm not opposed to making configuration values overridable
> > by
> > > ENV variables. I think this is a very good idea and makes the
> > > configurability of Flink processes easier. However, I think that adding
> > > this functionality should not be part of this FLIP because it would
> > simply
> > > widen the scope unnecessarily.
> > >
> > > The reasons why I believe it is unnecessary are the following: For Yarn
> > we
> > > already create write a flink-conf.yaml which could be populated with
> the
> > > memory settings. For the other processes it should not make a
> difference
> > > whether the loaded Configuration is populated with the memory settings
> > from
> > > ENV variables or by using TaskExecutorProcessUtility to compute the
> > missing
> > > values from the loaded configuration. If the latter would not be
> possible
> > > (wrong or missing configuration values), then we should not have been
> > able
> > > to actually start the process in the first place.
> > >
> > > - Concerning the memory reservation: I agree with you that we need the
> > > memory reservation functionality to make streaming jobs work with
> > "managed"
> > > > memory. However, w/o this functionality the whole FLIP would already
> > bring
> > > a good amount of improvements to our users when running batch jobs.
> > > Moreover, by keeping the scope smaller we can complete the FLIP faster.
> > > Hence, I would propose to address the memory reservation functionality
> > as a
> > > follow up FLIP (which Yu is working on if I'm not mistaken).
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Aug 28, 2019 at 11:43 AM Yang Wang 
> > wrote:
> > >
> > > > Just add my 2 cents.
> > > >
> > > > Using environment variables to override the configuration for
> different
> > > > taskmanagers is better.
> > > > > We do not need to generate a dedicated flink-conf.yaml for all
> > > > taskmanagers.
> > > > > A common flink-conf.yaml and different environment variables are
> enough.
> > > > By reducing the distributed cached files, it could make launching a
> > > > taskmanager faster.
> > > >
> > > > Stephan gives a good suggestion that we could move the logic into
> > > > "GlobalConfiguration.loadConfig()" method.
> > > > Maybe the client could also benefit from this. Different users do not
> > > have
> > > > to export FLINK_CONF_DIR to update a few config options.
> > > >
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > > On Wed, Aug 28, 2019 at 1:21 AM, Stephan Ewen wrote:
> > > >
> > > > > One note on the Environment Variables and Configuration discussion.
> > > > >
> > > > > My understanding is that passed ENV variables are added to the
> > > > > configuration in the "GlobalConfiguration.loadConfig()" method (or
> > > > > similar).
> > > > > For all the code inside Flink, it looks like the data was in the
> > config
> > > > to
> > > > > start with, just that the scripts that compute the variables can
> pass
> > > the
> > > > > values to the process without actually needing to write a file.
> > > > >
> > > > > For example the "GlobalConfiguration.loadConfig()" method would
> take
> > > any
> > > > > ENV variable prefixed with "flink" and add it as a config key.
> > > > > "flink_taskmanager_memory_size=2g" would become
> > > "taskmanager.memory.size:
> > > > > 2g".
> > > > >
> > > > >
> > > > > On Tue, Aug 27, 2019 at 4:05 PM Xintong Song <
> tonysong...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thanks for the comments, Till.
> 

Re: [DISCUSS] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-08-29 Thread Till Rohrmann
I think our goal should be that the configuration is fully specified when
the process is started. By considering the internal calculation step as
rather validating existing values and calculating missing ones, these two
proposals shouldn't even conflict (given determinism).

Since we don't want to change an existing flink-conf.yaml, specifying the
full configuration would require passing in the options differently.

One way could be the ENV variables approach. The reason why I'm trying to
exclude this feature from the FLIP is that I believe it needs a bit more
discussion. Just some questions which come to my mind: What would be the
exact format (FLINK_KEY_NAME)? Would we support a dot separator which is
supported by some systems (FLINK.KEY.NAME)? If we accept the dot separator
what would be the order of precedence if there are two ENV variables
defined (FLINK_KEY_NAME and FLINK.KEY.NAME)? What is the precedence of env
variable vs. dynamic configuration value specified via -D?

Another approach could be to pass in the dynamic configuration values via
`-Dkey=value` to the Flink process. For that we don't have to change
anything because the functionality already exists.
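
As a sketch of the first approach (the format is deliberately hypothetical, since the exact convention is one of the open questions above):

{code:java}
// Sketch only; maps FLINK_-prefixed ENV variables onto config keys, e.g.
// FLINK_TASKMANAGER_MEMORY_SIZE -> taskmanager.memory.size. Note that this
// simple scheme cannot express keys that themselves contain underscores,
// which is exactly the kind of ambiguity mentioned above.
import java.util.HashMap;
import java.util.Map;

class EnvOverrides {
    static Map<String, String> fromEnv(Map<String, String> env) {
        Map<String, String> overrides = new HashMap<>();
        for (Map.Entry<String, String> e : env.entrySet()) {
            if (e.getKey().startsWith("FLINK_")) {
                String key = e.getKey().substring("FLINK_".length())
                        .toLowerCase().replace('_', '.');
                overrides.put(key, e.getValue());
            }
        }
        return overrides;
    }
}
{code}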

Cheers,
Till

On Thu, Aug 29, 2019 at 12:50 PM Stephan Ewen  wrote:

> I see. Under the assumption of strict determinism that should work.
>
> The original proposal had this point "don't compute inside the TM, compute
> outside and supply a full config", because that sounded more intuitive.
>
> On Thu, Aug 29, 2019 at 12:15 PM Till Rohrmann 
> wrote:
>
> > My understanding was that before starting the Flink process we call a
> > utility which calculates these values. I assume that this utility will do
> > the calculation based on a set of configured values (process memory,
> flink
> > memory, network memory etc.). Assuming that these values don't differ
> from
> > the values with which the JVM is started, it should be possible to
> > recompute them in the Flink process in order to set the values.
> >
> >
> >
> > On Thu, Aug 29, 2019 at 11:29 AM Stephan Ewen  wrote:
> >
> > > When computing the values in the JVM process after it started, how
> would
> > > you deal with values like Max Direct Memory, Metaspace size, native
> > memory
> > > reservation (reduce heap size), etc.? All the values that are parameters
> > to
> > > the JVM process and that need to be supplied at process startup?
> > >
> > > On Wed, Aug 28, 2019 at 4:46 PM Till Rohrmann 
> > > wrote:
> > >
> > > > Thanks for the clarification. I have some more comments:
> > > >
> > > > - I would actually split the logic of computing the process memory
> > > > requirements and of storing the values into two things. E.g. one could
> > name
> > > > the former TaskExecutorProcessUtility and  the latter
> > > > TaskExecutorProcessMemory. But we can discuss this on the PR since
> it's
> > > > just a naming detail.
> > > >
> > > > - Generally, I'm not opposed to making configuration values
> overridable
> > > by
> > > > ENV variables. I think this is a very good idea and makes the
> > > > configurability of Flink processes easier. However, I think that
> adding
> > > > this functionality should not be part of this FLIP because it would
> > > simply
> > > > widen the scope unnecessarily.
> > > >
> > > > The reasons why I believe it is unnecessary are the following: For
> Yarn
> > > we
> > > > already write a flink-conf.yaml which could be populated with
> > the
> > > > memory settings. For the other processes it should not make a
> > difference
> > > > whether the loaded Configuration is populated with the memory
> settings
> > > from
> > > > ENV variables or by using TaskExecutorProcessUtility to compute the
> > > missing
> > > > values from the loaded configuration. If the latter would not be
> > possible
> > > > (wrong or missing configuration values), then we should not have been
> > > able
> > > > to actually start the process in the first place.
> > > >
> > > > - Concerning the memory reservation: I agree with you that we need
> the
> > > > memory reservation functionality to make streaming jobs work with
> > > "managed"
> > > > memory. However, w/o this functionality the whole FLIP would already
> > > bring
> > > > a good amount of improvements to our users when running batch jobs.
> > > > Moreover, by keeping the scope smaller we can complete the FLIP
> faster.
> > > > Hence, I would propose to address the memory reservation
> functionality
> > > as a
> > > > follow up FLIP (which Yu is working on if I'm not mistaken).
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Wed, Aug 28, 2019 at 11:43 AM Yang Wang 
> > > wrote:
> > > >
> > > > > Just add my 2 cents.
> > > > >
> > > > > Using environment variables to override the configuration for
> > different
> > > > > taskmanagers is better.
> > > > > We do not need to generate dedicated flink-conf.yaml for all
> > > > taskmanagers.
> > > > > A common flink-conf.yaml and different environment variables are
> > enough.
> > > > > By reduci

Re: [DISCUSS] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-08-29 Thread Till Rohrmann
What I forgot to add is that we could tackle specifying the configuration
fully in an incremental way and that the full specification should be the
desired end state.

On Thu, Aug 29, 2019 at 1:33 PM Till Rohrmann  wrote:

> I think our goal should be that the configuration is fully specified when
> the process is started. If we consider the internal calculation step as
> rather validating existing values and calculating missing ones, these two
> proposals shouldn't even conflict (given determinism).
>
> Since we don't want to change an existing flink-conf.yaml, specifying the
> full configuration would require passing in the options differently.
>
> One way could be the ENV variables approach. The reason why I'm trying to
> exclude this feature from the FLIP is that I believe it needs a bit more
> discussion. Just some questions which come to my mind: What would be the
> exact format (FLINK_KEY_NAME)? Would we support a dot separator which is
> supported by some systems (FLINK.KEY.NAME)? If we accept the dot
> separator what would be the order of precedence if there are two ENV
> variables defined (FLINK_KEY_NAME and FLINK.KEY.NAME)? What is the
> precedence of env variable vs. dynamic configuration value specified via -D?
>
> Another approach could be to pass in the dynamic configuration values via
> `-Dkey=value` to the Flink process. For that we don't have to change
> anything because the functionality already exists.
>
> Cheers,
> Till
>
> On Thu, Aug 29, 2019 at 12:50 PM Stephan Ewen  wrote:
>
>> I see. Under the assumption of strict determinism that should work.
>>
>> The original proposal had this point "don't compute inside the TM, compute
>> outside and supply a full config", because that sounded more intuitive.
>>
>> On Thu, Aug 29, 2019 at 12:15 PM Till Rohrmann 
>> wrote:
>>
>> > My understanding was that before starting the Flink process we call a
>> > utility which calculates these values. I assume that this utility will
>> do
>> > the calculation based on a set of configured values (process memory,
>> flink
>> > memory, network memory etc.). Assuming that these values don't differ
>> from
>> > the values with which the JVM is started, it should be possible to
>> > recompute them in the Flink process in order to set the values.
>> >
>> >
>> >
>> > On Thu, Aug 29, 2019 at 11:29 AM Stephan Ewen  wrote:
>> >
>> > > When computing the values in the JVM process after it started, how
>> would
>> > > you deal with values like Max Direct Memory, Metaspace size, native
>> > memory
>> > > reservation (reduce heap size), etc.? All the values that are
>> parameters
>> > to
>> > > the JVM process and that need to be supplied at process startup?
>> > >
>> > > On Wed, Aug 28, 2019 at 4:46 PM Till Rohrmann 
>> > > wrote:
>> > >
>> > > > Thanks for the clarification. I have some more comments:
>> > > >
>> > > > - I would actually split the logic of computing the process memory
>> > > > requirements and of storing the values into two things. E.g. one could
>> > name
>> > > > the former TaskExecutorProcessUtility and  the latter
>> > > > TaskExecutorProcessMemory. But we can discuss this on the PR since
>> it's
>> > > > just a naming detail.
>> > > >
>> > > > - Generally, I'm not opposed to making configuration values
>> overridable
>> > > by
>> > > > ENV variables. I think this is a very good idea and makes the
>> > > > configurability of Flink processes easier. However, I think that
>> adding
>> > > > this functionality should not be part of this FLIP because it would
>> > > simply
>> > > > widen the scope unnecessarily.
>> > > >
>> > > > The reasons why I believe it is unnecessary are the following: For
>> Yarn
>> > > we
>> > > > already write a flink-conf.yaml which could be populated with
>> > the
>> > > > memory settings. For the other processes it should not make a
>> > difference
>> > > > whether the loaded Configuration is populated with the memory
>> settings
>> > > from
>> > > > ENV variables or by using TaskExecutorProcessUtility to compute the
>> > > missing
>> > > > values from the loaded configuration. If the latter would not be
>> > possible
>> > > > (wrong or missing configuration values), then we should not have
>> been
>> > > able
>> > > > to actually start the process in the first place.
>> > > >
>> > > > - Concerning the memory reservation: I agree with you that we need
>> the
>> > > > memory reservation functionality to make streaming jobs work with
>> > > "managed"
>> > > > memory. However, w/o this functionality the whole FLIP would already
>> > > bring
>> > > > a good amount of improvements to our users when running batch jobs.
>> > > > Moreover, by keeping the scope smaller we can complete the FLIP
>> faster.
>> > > > Hence, I would propose to address the memory reservation
>> functionality
>> > > as a
>> > > > follow up FLIP (which Yu is working on if I'm not mistaken).
>> > > >
>> > > > Cheers,
>> > > > Till
>> > > >
>> > > > On Wed, Aug 28, 2019 at 11:43 AM Yang Wang 
>> > > wrote:

[jira] [Created] (FLINK-13897) OSS FS NOTICE file is placed in wrong directory

2019-08-29 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-13897:


 Summary: OSS FS NOTICE file is placed in wrong directory
 Key: FLINK-13897
 URL: https://issues.apache.org/jira/browse/FLINK-13897
 Project: Flink
  Issue Type: Bug
  Components: Build System, FileSystems
Affects Versions: 1.8.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.8.2, 1.10.0, 1.9.1


The NOTICE file for the OSS filesystem is directly in the resources directory, 
and not in META-INF where it belongs. As a result the contained dependencies 
are not properly listed in NOTICE-binary.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [CODE-STYLE] Builder pattern

2019-08-29 Thread Gyula Fóra
Hi all,

Thank you all for the valuable feedback, let me try to summarize the rough
agreement:

If there is a builder for class A, then A should:
 - Have only a private ctor -> forces usage of the builder
 - Be immutable (no setters)
 - Have a public static .builder(required params, if not too many) method
that creates a new builder instance.

The builder:
 - Should follow a mutable pattern (return this after setting property) to
simplify code
 - Argument setter methods should follow some reasonable convention (setX,
withX, x, ...), whatever makes the most sense in the context
 - There should be a single .build() method that constructs the instance of
the target class

These guidelines are mostly applicable to internal builders. User API builders
should also follow them in most cases, but we should allow deviations when
that makes the code more readable or the API more user-friendly.
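
To make this concrete, a minimal sketch of a class following the above
(class and method names are just examples, not from any existing code):

public final class Connection {

    private final String host;
    private final int timeoutMillis;

    // private ctor -> forces usage of the builder
    private Connection(String host, int timeoutMillis) {
        this.host = host;
        this.timeoutMillis = timeoutMillis;
    }

    // public static entry point, taking the required parameter
    public static Builder builder(String host) {
        return new Builder(host);
    }

    public static final class Builder {

        private final String host;
        private int timeoutMillis = 1000;

        private Builder(String host) {
            this.host = host;
        }

        // mutable pattern: set the property and return this
        public Builder withTimeoutMillis(int timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
            return this;
        }

        // single build() method constructing the immutable instance
        public Connection build() {
            return new Connection(host, timeoutMillis);
        }
    }
}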

If you have more comments, don't hesitate to add them here. If everyone
agrees, I will open a PR in the next few days to add this to the code-style
guide!

Cheers,
Gyula

On Wed, Aug 28, 2019 at 1:37 PM Piotr Nowojski  wrote:

> > For Piotr's comment 4. I. I agree with Klou that this sounds rather like
> a
> > problem of the builder's usage than a builder problem. I think such a
> > scenario can easily be solved by introducing a transfer object.
>
> It could be solved, but that would mean we have some kind of
> builder/factory/descriptor that creates a factory/builder that creates a
> final object. I think in this specific example I would prefer to add
> missing runtime parameters in the build/create() method instead of asking
> people implementing an operator to wrap it into two layers of indirection.
>
> However let’s not clutter this discussion, I might be missing something.
> Feel free to open a Jira ticket / start a new design discussion about this
> pattern (it’s currently partially implemented in the form of
> `StreamOperatorFactory` [1]).
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-11974 <
> https://issues.apache.org/jira/browse/FLINK-11974>
>
> > On 27 Aug 2019, at 14:43, Till Rohrmann  wrote:
> >
> > Hi all,
> >
> > I would be in favour of the following convention
> >
> > 1. a) static method for builder creation
> > 2. No strict rule because especially for boolean flags it might make
> sense
> > to have something like `enableX()` or `withY()` where one doesn't
> specify a
> > concrete value.
> > 3. Mutable builders but if there is a good reason to follow the immutable
> > approach then I wouldn't forbid it.
> > 4. Private constructor if there is a builder and no backwards
> compatibility
> > constraint
> > 5. No setX methods on the instance to create
> >
> > For Piotr's comment 4. I. I agree with Klou that this sounds rather like
> a
> > problem of the builder's usage than a builder problem. I think such a
> > scenario can easily be solved by introducing a transfer object.
> >
> > Cheers,
> > Till
> >
> > On Tue, Aug 27, 2019 at 1:46 PM Timo Walther  wrote:
> >
> >> Hi all,
> >>
> >> great to put this code style discussion on the mailing list because I
> >> also have found this style inconsistent in the past.
> >>
> >> Regarding Gyula's suggestions:
> >> 1. a static method `builder()`; I think IDEs also highlight methods
> >> with this name
> >> 2. I would vote for a more declarative `propertyA(...).propertyB(...)`
> >> approach instead of setters because most methods should be setters.
> >> However, if implementers want to add methods such as `addField(..)`,
> >> `useProcessingTime()` this sounds also fine to me.
> >> 3. mutable
> >> Regarding Dawid's suggestions:
> >> 4. regarding required and optional parameters, I would allow both
> options
> >> 5. always end with `build()`
> >>
> >> Thanks,
> >> Timo
> >>
> >> On 27.08.19 10:04, Kostas Kloudas wrote:
> >>> Hi all,
> >>>
> >>> I agree with Arvid, although for point 2 I would be less strict.
> >>>
> >>> @Piotr, for the side note you mentioned, and from the description you
> >>> mention in the mail for example I,
> >>> it seems that the need to pass parameters in the build() is not an
> >>> inherent need of the build pattern but it
> >>> can be mitigated by just creating sth like a StreamOperatorConfig (and
> >>> not the operator builder itself) on the
> >>> client, serialize it, and then at the TM, use the actual
> >>> StreamOperator builder with that config to create the
> >>> operator. There you can have all the needed parameters and also
> >>> perform the validation that
> >>> Dawid mentioned.
> >>>
> >>> Again, this is just from the summary you provided, not from looking at
> >>> the code, so I may be missing something.
> >>>
> >>> On the validation, I think that it should happen at the build(), as
> >>> this is the only place where
> >>> we know all the parameters.
> >>>
> >>> Cheers,
> >>> Kostas
> >>>
> >>> On Tue, Aug 27, 2019 at 9:47 AM Arvid Heise 
> >> wrote:
>  Hi all,
> 
>  I'd like to differentiate between API level builder usage and
> "i

[jira] [Created] (FLINK-13898) Migrate restart strategy config constants to RestartStrategyOptions

2019-08-29 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-13898:
-

 Summary: Migrate restart strategy config constants to 
RestartStrategyOptions
 Key: FLINK-13898
 URL: https://issues.apache.org/jira/browse/FLINK-13898
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration, Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.10.0


In order to improve the discoverability of restart strategy related config
constants I propose to migrate them to {{ConfigOptions}}. For that to happen
we should create a {{RestartStrategyOptions}} class which contains
{{ConfigOptions}} for all restart strategy related config constants.
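
For illustration, such an option could look like the following (names and defaults are not final):

{code:java}
// Sketch only; concrete names/defaults to be decided in the PR.
public class RestartStrategyOptions {

    public static final ConfigOption<String> RESTART_STRATEGY =
        ConfigOptions.key("restart-strategy")
            .defaultValue("none")
            .withDescription("The restart strategy to use in case of job failures.");

    public static final ConfigOption<Integer> FIXED_DELAY_ATTEMPTS =
        ConfigOptions.key("restart-strategy.fixed-delay.attempts")
            .defaultValue(1)
            .withDescription("Number of restart attempts for the fixed-delay strategy.");
}
{code}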



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13899) Add SQL DDL for Elasticsearch 5.X version

2019-08-29 Thread limbo (Jira)
limbo created FLINK-13899:
-

 Summary: Add SQL DDL for Elasticsearch 5.X version
 Key: FLINK-13899
 URL: https://issues.apache.org/jira/browse/FLINK-13899
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch
Affects Versions: 1.9.0
Reporter: limbo
 Fix For: 1.9.1


Hi, I need an Elasticsearch 5.X version DDL to connect to our old-version
Elasticsearch.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-50: Spill-able Heap Keyed State Backend

2019-08-29 Thread Zili Chen
Hi Yu,

Notice that the wiki page is still marked with the "*Under Discussion*" state.

I think you can update it accordingly.

Best,
tison.


On Tue, Aug 20, 2019 at 10:28 PM, Yu Li wrote:

> Sorry for the lag, but since we reached a consensus days ago, I started a
> vote thread, which will have a result by EOD; thus I'm closing this
> discussion thread. Thanks all for the participation and
> comments/suggestions!
>
> Best Regards,
> Yu
>
>
> On Fri, 16 Aug 2019 at 09:09, Till Rohrmann  wrote:
>
> > +1 for this FLIP and the feature. I think this feature will be super
> > helpful for many Flink users.
> >
> > Once the SpillableHeapKeyedStateBackend has proven to be superior to the
> > HeapKeyedStateBackend we should think about removing the latter
> completely
> > to reduce maintenance burden.
> >
> > Cheers,
> > Till
> >
> > On Fri, Aug 16, 2019 at 4:06 AM Congxian Qiu 
> > wrote:
> >
> > > Big +1 for this feature.
> > >
> > > This FLIP can help improve at least the following two scenarios:
> > > - Temporary data peaks when using the Heap StateBackend
> > > - The Heap State Backend has better performance than RocksDBStateBackend,
> > > especially on SATA disks. Some people have told me that they increased
> > > the parallelism of operators (and used the HeapStateBackend) rather than
> > > using the RocksDBStateBackend, to get better performance. But increasing
> > > parallelism brings other problems; after this FLIP, we can run a Flink
> > > job with the same parallelism as with the RocksDBStateBackend and get
> > > better performance as well.
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > On Fri, Aug 16, 2019 at 12:14 AM, Yu Li wrote:
> > >
> > > > Thanks all for the reviews and comments!
> > > >
> > > > bq. From the implementation plan, it looks like this exists purely
> in a
> > > new
> > > > module and does not require any changes in other parts of Flink's
> code.
> > > Can
> > > > you confirm that?
> > > > Confirmed, thanks!
> > > >
> > > > Best Regards,
> > > > Yu
> > > >
> > > >
> > > > On Thu, 15 Aug 2019 at 18:04, Tzu-Li (Gordon) Tai <
> tzuli...@apache.org
> > >
> > > > wrote:
> > > >
> > > > > +1 to start a VOTE for this FLIP.
> > > > >
> > > > > Given the properties of this new state backend and that it will
> exist
> > > as
> > > > a
> > > > > new module without touching the original heap backend, I don't see
> a
> > > harm
> > > > > in including this.
> > > > > Regarding design of the feature, I've already mentioned my comments
> > in
> > > > the
> > > > > original discussion thread.
> > > > >
> > > > > Cheers,
> > > > > Gordon
> > > > >
> > > > > On Thu, Aug 15, 2019 at 5:53 PM Yun Tang  wrote:
> > > > >
> > > > > > Big +1 for this feature.
> > > > > >
> > > > > > Our customers, including me, have met the dilemma where we have to
> > > > > > use windows to aggregate events in applications like real-time
> > > > > > monitoring. The larger the timer and window state, the poorer the
> > > > > > performance of RocksDB. However, switching to the FsStateBackend
> > > > > > would always make me fear OOM errors.
> > > > > >
> > > > > > Looking forward to more powerful enrichment of the state backends,
> > > > > > helping Flink achieve better performance together.
> > > > > >
> > > > > > Best
> > > > > > Yun Tang
> > > > > > 
> > > > > > From: Stephan Ewen 
> > > > > > Sent: Thursday, August 15, 2019 23:07
> > > > > > To: dev 
> > > > > > Subject: Re: [DISCUSS] FLIP-50: Spill-able Heap Keyed State
> Backend
> > > > > >
> > > > > > +1 for this feature. I think this will be appreciated by users,
> as
> > a
> > > > way
> > > > > to
> > > > > > use the HeapStateBackend with a safety-net against OOM errors.
> > > > > > And having had major production exposure is great.
> > > > > >
> > > > > > From the implementation plan, it looks like this exists purely
> in a
> > > new
> > > > > > module and does not require any changes in other parts of Flink's
> > > code.
> > > > > Can
> > > > > > you confirm that?
> > > > > >
> > > > > > Other that that, I have no further questions and we could proceed
> > to
> > > > vote
> > > > > > on this FLIP, from my side.
> > > > > >
> > > > > > Best,
> > > > > > Stephan
> > > > > >
> > > > > >
> > > > > > On Tue, Aug 13, 2019 at 10:00 PM Yu Li  wrote:
> > > > > >
> > > > > > > Sorry for forgetting to give the link of the FLIP, here it is:
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-50%3A+Spill-able+Heap+Keyed+State+Backend
> > > > > > >
> > > > > > > Thanks!
> > > > > > >
> > > > > > > Best Regards,
> > > > > > > Yu
> > > > > > >
> > > > > > >
> > > > > > > On Tue, 13 Aug 2019 at 18:06, Yu Li  wrote:
> > > > > > >
> > > > > > > > Hi All,
> > > > > > > >
> > > > > > > > We held a discussion about this feature before [1] but are now
> > > > > > > > opening another thread because, after a second thought, introducing
> > > > > > > > a new backend
> 

[jira] [Created] (FLINK-13900) Add a built-in "console" sink to print results in console

2019-08-29 Thread Jark Wu (Jira)
Jark Wu created FLINK-13900:
---

 Summary: Add a built-in "console" sink to print results in console
 Key: FLINK-13900
 URL: https://issues.apache.org/jira/browse/FLINK-13900
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API, Table SQL / Planner
Reporter: Jark Wu


It is a useful feature for debugging and quickstarts. With the built-in console
sink, we can directly output an arbitrary query to the console, or to
{{taskmanager.out}} in a cluster setup.

Currently, we have to implement a dedicated PrintTableSink and configure every
field name and type before outputting the query, which is very verbose.

{code:sql}
INSERT INTO console
SELECT * FROM kafka_logs
{code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [VOTE] FLIP-54: Evolve ConfigOption and Configuration

2019-08-29 Thread Timo Walther

I converted the mentioned Google doc into a wiki page:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration

The core semantics have not changed.

Happy voting,
Timo

On 29.08.19 04:30, Zili Chen wrote:

The design looks good to me.

+1 go ahead!

Best,
tison.


On Wed, Aug 28, 2019 at 6:08 PM, Jark Wu wrote:


Hi Timo,

The new changes looks good to me.

+1 to the FLIP.


Cheers,
Jark

On Wed, 28 Aug 2019 at 16:02, Timo Walther  wrote:


Hi everyone,

after some last minute changes yesterday, I would like to start a new
vote on FLIP-54. The discussion seems to have reached an agreement. Of
course this doesn't mean that we can't propose further improvements on
ConfigOption's and Flink configuration in general in the future. It is
just one step towards having a better unified configuration for the
project.

Please vote for the following design document:




https://docs.google.com/document/d/1IQ7nwXqmhCy900t2vQLEL3N2HIdMg-JO8vTzo1BtyKU/edit#

The discussion can be found at:




https://lists.apache.org/thread.html/a56c6b52e5f828d4a737602b031e36b5dd6eaa97557306696a8063a9@%3Cdev.flink.apache.org%3E

This voting will be open for at least 72 hours. I'll try to close it on
2019-09-02 8:00 UTC, unless there is an objection or not enough votes.

I will convert it to a Wiki page afterwards.

Thanks,

Timo






Re: [DISCUSS] Best practice to run flink on kubernetes

2019-08-29 Thread Thomas Weise
Till had already summed it up, but I want to emphasize that Flink as a
project only needs to provide #1 (reactive mode) and #3 (active mode, which
necessarily is tied to the cluster manager of choice). The latter would be
needed for Flink jobs to be elastic (in the future), although we may want
to discuss how such capability can be made easier with #1 as well.

For users #1 alone is of little value, since they need to solve their
deployment problem. So it will be good to list options such as the Lyft
Flink k8s operator on the ecosystem page and possibly point to that from
the Flink documentation as well.

I also want to point out that #3, while it looks easy to start with, has an
important limitation when it comes to managing long-running streaming
applications. Such an application will essentially be a sequence of jobs that
come and go across stateful upgrades or rollbacks. Any solution that is
designed to manage a single Flink job instance can't address that need.
That is why the k8s operator was created. It specifically understands the
concept of an application.

Thomas


On Wed, Aug 28, 2019 at 7:56 PM Zhenghua Gao  wrote:

> Thanks Yang for bringing this up. I think option 1 is very useful for early
> adopters.
> People who do not know much about k8s can easily set it up on minikube to
> have a taste.
>
> Between option 2 and option 3, I prefer option 3 because I am familiar with
> yarn and don't have much of a concept of k8s.
> And there is some doubt about starting a session cluster in option 3:
>
> > ./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm
> flink-session-example
> > -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT
>
> Does the -n option mean the number of TaskManagers?
> Do we pre-run taskmanager pods, or request and launch taskmanager
> pods dynamically?
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Fri, Aug 9, 2019 at 9:12 PM Yang Wang  wrote:
>
> > Hi all,
> >
> > Currently, cloud native architectures have been introduced to many
> > companies in production. They use kubernetes to run deep learning, web
> > servers, etc. If we could deploy the per-job/session flink cluster on
> > kubernetes to make it mix-run with other workloads, the cluster resource
> > utilization would be better. It would also be easier for many kubernetes
> > users to have a taste of flink.
> >
> > By now we have three options to run flink jobs on k8s.
> >
> > [1]. Create jm/tm/service yaml and apply, then you will get a flink
> > standalone cluster on k8s. Use flink run to submit jobs to the existing
> > flink cluster. Some companies may have their own deploy system to manage the
> > flink cluster.
> >
> > [2]. Use flink-k8s-operator to manage multiple flink clusters, including
> > session and perjob. It could manage the complete deployment lifecycle of
> > the application. I think this option is really easy to use for the k8s
> > users. They are familiar with k8s-operator, kubectl and other tools of
> k8s.
> > They could debug and run the flink cluster just like other k8s
> > applications.
> >
> > [3]. Native integration with k8s: use flink run or
> > kubernetes-session.sh to start a flink cluster. It is very similar to
> > submitting a flink cluster to Yarn. KubernetesClusterDescriptor talks to
> > the k8s api server to start a flink master deployment of size 1.
> > KubernetesResourceManager dynamically allocates resources from k8s to
> > start task managers on demand. This option is very easy for flink users
> > to get started with. In the simplest case, we just need to update
> > '-m yarn-cluster' to '-m kubernetes-cluster'.
> >
> > We have made an internal implementation of option [3] and use it in
> > production. After it is fully tested, we hope to contribute it to the
> > community. Now we want to get some feedback about the three options.
> > Any comments are welcome.
> >
> >
> > > What do we need to prepare when starting a flink cluster on k8s using
> > > native integration?
> >
> > Download the flink release binary and create the ~/.kube/config file
> > corresponding to the k8s cluster. That is all you need.
> >
> >
> > > Flink Session cluster
> >
> > * start a session cluster
> >
> > ./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm
> flink-session-example
> > -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT
> >
> > * You will get an address for submitting jobs; specify it through the
> > '-ksa' option
> >
> > ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-session-example
> > -ksa {x.x.x.x:12345} examples/streaming/WindowJoin.jar
> >
> >
> > > Flink Job Cluster
> >
> > * running with official flink image
> >
> > ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-perjob-example-1
> > -ki flink:latest examples/streaming/WindowJoin.jar
> >
> > * running with user image
> >
> > ./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-perjob-example-1
> > -ki flink-user:latest examples/streaming/WindowJoin.jar
> >
> >
> >
> > [1].
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1

Re: Flink operators for Kubernetes

2019-08-29 Thread Thomas Weise
In case anyone comes across this thread in the archives,
the FlinkK8sOperator is now available here:

https://github.com/lyft/flinkk8soperator

The community is invited to check it out, provide feedback (use github
issues) or even better, join and contribute to it.

Thomas


On Fri, Nov 2, 2018 at 2:08 AM Till Rohrmann  wrote:

> Thanks for sharing your work with the community.
>
> Cheers,
> Till
>
> On Fri, Nov 2, 2018 at 1:16 AM Anand Swaminathan
>  wrote:
>
> > Hello,
> >
> > I have documented our work of running Flink applications on Kubernetes
> > using FlinkK8sOperator - here
> > <
> >
> https://docs.google.com/document/d/1_AITTq0fay-NwUIqNfqN0pum-erOmS_4kYZnlMxw6_o/edit#heading=h.ge413lh374xj
> > >
> >
> > We have finished the implementation of the FlinkK8sOperator, and are
> > currently testing it with a few applications. Please let us know your
> > feedback.
> >
> > Thanks,
> > Anand
> > (Flyte @Lyft)
> >
> >
> > On Thu, Oct 4, 2018 at 1:19 AM Till Rohrmann 
> wrote:
> >
> > > Great to hear that you intend to open source your K8s operators. I
> would
> > be
> > > keen to see what and how you do things with the operator. If there are
> > > things to change on the Flink side in order to improve the integration,
> > > then let's discuss them.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Oct 3, 2018 at 2:52 AM Jin Sun  wrote:
> > >
> > > > Sounds interesting.
> > > >
> > > >
> > > >
> > > > Currently Flink can run in standalone mode in a kubernetes cluster
> > > > (see details here). There is also a JIRA, FLINK-9953: Active kubernetes
> > > > integration, which targets running Flink on Kubernetes natively.
> > > >
> > > >
> > > >
> > > > Jin
> > > >
> > > >
> > > >
> > > > On 10/2/18, 1:30 PM, "Anand Swaminathan"
> >  > > >
> > > > wrote:
> > > >
> > > > Hello All,
> > > >
> > > > This is Anand from Lyft. Just wanted to send out a note that we at
> > > > Lyft are working on building Operators (https://coreos.com/operators/)
> > > > to support deploying and managing Flink Applications in Kubernetes.
> > > > The operator is responsible for creating Flink Clusters, starting jobs
> > > > and seamlessly transitioning a job from one cluster to another during
> > > > deployment.
> > > >
> > > > We hope to open source it at some point once deployed and tested
> > > > within Lyft. Let us know if any of you are looking for something like
> > > > this. We will be happy to collaborate.
> > > >
> > > > Thanks,
> > > > Anand
> > > > (Flyte @Lyft)
> > > >
> > >
> >
>


[DISCUSS] FLIP-59: Enable execution configuration from Configuration object

2019-08-29 Thread Dawid Wysakowicz
Hi,

I wanted to propose a new, additional way of configuring execution
parameters that can currently be set only on objects such as
ExecutionConfig, CheckpointConfig and StreamExecutionEnvironment. This
poses problems such as:

  * no easy way to configure those from a file
  * there is no easy way to pass a configuration from layers built on
top of StreamExecutionEnvironment. (e.g. when we want to configure
those options from TableEnvironment)
  * they are not automatically documented

Note that there are a few concepts from FLIP-54[1] that this FLIP is
based on.
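
For illustration, the goal would be to enable something along these lines
(the method and option names here are only examples, not the proposed API):

// Sketch only; illustrative names.
Configuration config = GlobalConfiguration.loadConfiguration();
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
// apply execution options (ExecutionConfig, CheckpointConfig, ...) read
// from the configuration, instead of calling individual setters
env.configure(config);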

I would be really grateful to know whether you think this would be a
valuable addition, and to hear any other feedback.

Best,

Dawid

Wiki page:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-59%3A+Enable+execution+configuration+from+Configuration+object

Google doc:
https://docs.google.com/document/d/1l8jW2NjhwHH1mVPbLvFolnL2vNvf4buUMDZWMfN_hFM/edit?usp=sharing


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration




signature.asc
Description: OpenPGP digital signature


[PROPOSAL] Force rebase on master before merge

2019-08-29 Thread Zili Chen
Hi devs,

GitHub provides a mechanism that can require branches to be
up to date before they are merged[1] (point 6). I can see several advantages
to enabling it, and thus propose that our project turn on this switch. Below
are my considerations. Looking forward to your insights.

1. Avoid CI failures in PRs that were already fixed by another commit. We
currently merge a pull request even if CI fails, as long as the failures are
known flaky tests. Turning on the switch does not resolve the flakiness
itself, but it helps to find any other potentially valid failures.

2. Avoid CI failures in master after a pull request is merged. Actually, CI
tests exactly the branch that the pull request is bound to. Even if it is
green, it is still possible that a systematic failure is introduced by a
conflict with another new commit that was merged into master but not into
this branch.

On the downside, it might require contributors to rebase their pull requests
several times before getting merged. But that should not inflict too much
work.

Best,
tison.

[1] https://help.github.com/en/articles/enabling-required-status-checks


Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-29 Thread Becket Qin
Hi Timo and Stephan,

Thanks for the detail explanation.

1. I agree that each config should be in a human readable format. My
concern is that the current List of Configurables seems to go a little too far
from what the configuration is supposed to do. They are essentially
creating some Configurable objects instead of defining the config used to
create those Configurables. This mixes the ConfigOption and its usage. API wise
it would be good to keep the configs and their usages (such as how to
create objects using the ConfigOption) apart from each other.
I am wondering if we can just make List also only take strings. For example,
is the following definition of map and list sufficient?

A MapConfigOption is ConfigOption<Map<String, String>>. It can be defined
as:
map_config_name: k1=v1, k2=v2, k3=v3, ...

A ListConfigOption is ConfigOption<List<String>>. It can be defined as:
list_config_name: v1, v2, v3, ...

A ListOfMapConfigOption is ConfigOption<List<Map<String, String>>>. It can
be defined as:
list_of_map_config_name: k1=v1, k2=v2; k3=v3, k4=v4;

All the key and values in the configuration are String. This also
guarantees that the configuration is always serializable.
If we want to do one more step, we can allow the ConfigOption to set all
the primitive types and parse that for the users. So something like
List<Integer>, List<Boolean>, etc. seems fine.

The configuration class could also have util methods to create a list of
Configurables, such as:
List<T> getConfigurables(ConfigOption option, Class<T> clazz).
But the configuration class will not take arbitrary Configurable as the
value of its config.
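
As an illustration of how such a pure-string map format could be parsed (a
sketch only, not a concrete API proposal):

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: one possible parser for the "k1=v1, k2=v2" map format above.
public final class MapOptionParser {

    public static Map<String, String> parseMap(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String pair : value.split(",")) {
            String[] kv = pair.trim().split("=", 2);
            result.put(kv[0].trim(), kv[1].trim());
        }
        return result;
    }
}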

2. I might have misunderstood this. But my concern on the description
extension is in the following example.

public static final ConfigOption<Integer> MAX_PARALLELISM =
    CoreOptions.MAX_PARALLELISM.withExtendedDescription(
        "Note that this property means that a table program has a side-effect
XYZ.");

In this case, we will have two MAX_PARALLELISM configs now. One is
CoreOptions.MAX_PARALLELISM. The other one is defined here. I suppose users
will see both configurations. One with an extended description and one
without. Let's say there is a third component which also users
MAX_PARALLELISM, will there be yet another MAX_PARALLELISM ConfigOption? If
so, what would that ConfigOption's description look like?

Ideally, we would want to have just one CoreOptions.MAX_PARALLELISM and the
description should clearly state all the usage of this ConfigOption.

3. I see, in that case, how about we name it something like
extractConfiguration()? I am just trying to see if we can make it clear
this is not something like fromBytes() and toBytes().

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 6:09 PM Timo Walther  wrote:

> Hi Becket,
>
> let me try to clarify some of your questions:
>
> 1. For every option, we also needed to think about how to represent it
> in a human readable format. We do not want to allow arbitrary nesting
> because that would easily allow to bypass the flattened hierarchy of
> config options (`session.memory.min`). The current design allows to
> represent every option type as a list. E.g.:
>
> `myIntOption: 12` can be `myIntListOption: 12;12`
> `myObjectOption: field=12,other=true` can be `myObjectListOption:
> field=12,other=true; field=12,other=true`
> `myPropertyOption: key=str0,other=str1` can be `myPropertyListOption:
> key=str0,other=str1;key=str0,other=str1`
>
> We need the atomic class for serialization/deserialization both in
> binary and string format.
>
> ConfigOption is not present in the code base yet, but this FLIP is
> a preparation of making ExecutionConfig configurable. If you look into
> this class or also in existing table connectors/formats, you will see
> that each proposed option type has its requirements.
>
> 2. Regarding extending the description of ConfigOptions, the semantic of
> one option should be a super set of the other option. E.g. in Table API
> we might use general ExecutionConfig properties. But we would like to a)
> make external options more prominent in the Table API config docs, to
> link people to properties they should pay attention to, and b) note side
> effects. The core semantic of a property should not change.
>
> 3. The factory will not receive the entire configuration but works in a
> separate key space. For `myObjectOption` above, it would receive a
> configuration that consists of `field: 12` and `other: true`.
>
> I agree. I will convert the document into a Wiki page today.
>
> Thanks,
> Timo
>
> On 29.08.19 09:00, Stephan Ewen wrote:
> > @Becket One thing that may be non-obvious is that the Configuration class
> > also defines serialization / persistence logic at the moment. So it needs
> > to know the set of types it supports. That stands in the way of an
> > arbitrary generic map type.
> >
> > @Timo I agree though that it seems a bit inconsistent to have one
> > collection orthogonal to the type (List) and another one bound to
> specific
> > types (Map).
> >
> > On Thu, Aug 29, 2019 at 8:20 AM Becket Qin  wrote:
> >
> >> Hi Timo,
> >>
> >> Thanks for the proposal. Sorry for the late comments,

Re: [DISCUSS] FLIP-59: Enable execution configuration from Configuration object

2019-08-29 Thread Gyula Fóra
Hi!

Huuuge +1 from me, this has been an operational pain for years.
This would also introduce a nice and simple way to extend it in the future
if we need.

Ship it!

Gyula

On Thu, Aug 29, 2019 at 5:05 PM Dawid Wysakowicz 
wrote:

> Hi,
>
> I wanted to propose a new, additional way of configuring execution
> parameters that can currently be set only on objects such as
> ExecutionConfig, CheckpointConfig and StreamExecutionEnvironment. This
> poses problems such as:
>
>- no easy way to configure those from a file
>- there is no easy way to pass a configuration from layers built on
>top of StreamExecutionEnvironment. (e.g. when we want to configure those
>options from TableEnvironment)
>- they are not automatically documented
>
> Note that there are a few concepts from FLIP-54[1] that this FLIP is based
> on.
>
> I would be really grateful to know whether you think this would be a
> valuable addition, and to hear any other feedback.
>
> Best,
>
> Dawid
>
> Wiki page:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-59%3A+Enable+execution+configuration+from+Configuration+object
>
> Google doc:
> https://docs.google.com/document/d/1l8jW2NjhwHH1mVPbLvFolnL2vNvf4buUMDZWMfN_hFM/edit?usp=sharing
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>
>
>


Re: [DISCUSS] FLIP-59: Enable execution configuration from Configuration object

2019-08-29 Thread Gyula Fóra
What we could also add, to make this a bit more generic and extensible, is to
create some interfaces for reconfiguring the StreamExecutionEnvironment,
ExecutionConfig etc. and let users specify a class that implements the
reconfiguration logic based on the flink configuration.

This could be executed after the default behaviour that you outlined in the
FLIP.
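
Roughly something like this (just a sketch, the names are examples):

// Sketch of the extension point idea; not a concrete API proposal.
public interface EnvironmentConfigurer {
    void configure(StreamExecutionEnvironment env, Configuration config);
}

The implementing class could then be named in the flink configuration and
invoked after the default options have been applied.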

What do you think?

Gyula

On Thu, Aug 29, 2019 at 7:21 PM Gyula Fóra  wrote:

> Hi!
>
> Huuuge +1 from me, this has been an operational pain for years.
> This would also introduce a nice and simple way to extend it in the future
> if we need.
>
> Ship it!
>
> Gyula
>
> On Thu, Aug 29, 2019 at 5:05 PM Dawid Wysakowicz 
> wrote:
>
>> Hi,
>>
>> I wanted to propose a new, additional way of configuring execution
>> parameters that can currently be set only on objects such as
>> ExecutionConfig, CheckpointConfig and StreamExecutionEnvironment. This
>> poses problems such as:
>>
>>- no easy way to configure those from a file
>>- there is no easy way to pass a configuration from layers built on
>>top of StreamExecutionEnvironment. (e.g. when we want to configure those
>>options from TableEnvironment)
>>- they are not automatically documented
>>
>> Note that there are a few concepts from FLIP-54[1] that this FLIP is
>> based on.
>>
>> I would be really grateful to know whether you think this would be a
>> valuable addition, and to hear any other feedback.
>>
>> Best,
>>
>> Dawid
>>
>> Wiki page:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-59%3A+Enable+execution+configuration+from+Configuration+object
>>
>> Google doc:
>> https://docs.google.com/document/d/1l8jW2NjhwHH1mVPbLvFolnL2vNvf4buUMDZWMfN_hFM/edit?usp=sharing
>>
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>>
>>
>>


How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-29 Thread Elkhan Dadashov
Dear Flink developers,

I am having difficulty getting a Flink job started.

The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
containers.

The default HDFS replication is 3.

*The Yarn queue is empty, and 800 containers are allocated
almost immediately by the Yarn RM.*

It takes a very long time until all 800 nodes (node managers) have
downloaded the uberjar from HDFS to their local machines.

*Q1:*

a) Do all those 800 nodes download in batches of 3 at a time? (batch
size = HDFS replication factor)

b) Or can Flink TMs replicate from each other? Or do already-started
TMs replicate to not-yet-started nodes?

Most probably the answer is (a), but I want to confirm.

*Q2:*

What is the recommended way of handling a 400MB+ uberjar with 800+
containers?

Any specific params to tune?

Thanks.

Because downloading the uberjar takes a really long time, around 15
minutes after the job was kicked off I face this exception:

org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request
to start container.
This token is expired. current time is 1567116179193 found 1567116001610
Note: System times on machines may be out of sync. Check system time
and time zones.
at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
Source)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
at 
org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
at 
org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-08-29 Thread Bowen Li
Thanks everyone for the feedback.

I have updated the document accordingly. Here's a summary of the changes:

- clarify the concept of temporary functions, to facilitate deciding
function resolution order
- provide two options to support Hive built-in functions, with the 2nd one
being preferred
- add detailed prototype code for FunctionCatalog#lookupFunction(name)
- move the section of "rename existing FunctionCatalog APIs in favor of
temporary functions" out of the scope of the FLIP
- add another reasonable limitation for function resolution, to not
consider resolving overloaded functions - those with the same name but
different params. (It's still valid to have a single function with
overloaded eval() methods)
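
To make the resolution order for ambiguous references concrete, a simplified
sketch (the names here are illustrative; the authoritative order and details
are in the doc):

// Simplified sketch of resolving an ambiguous (not fully qualified)
// function reference; illustrative only.
FunctionDefinition lookupFunction(String name) {
    FunctionDefinition temp = temporaryFunctions.get(name);
    if (temp != null) {
        return temp; // temporary functions take precedence
    }
    Optional<FunctionDefinition> builtin = builtinFunctions.lookup(name);
    if (builtin.isPresent()) {
        return builtin.get(); // then built-in functions
    }
    return currentCatalog.getFunction(name); // finally catalog functions
}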

Please take another look.

Thanks,
Bowen

On Tue, Aug 27, 2019 at 11:49 AM Bowen Li  wrote:

> Hi folks,
>
> I'd like to kick off a discussion on reworking Flink's FunctionCatalog.
> It's critically helpful to improve function usability in SQL.
>
>
> https://docs.google.com/document/d/1w3HZGj9kry4RsKVCduWp82HkW6hhgi2unnvOAUS72t8/edit?usp=sharing
>
> In short, it:
> - adds support for precise function reference with fully/partially
> qualified name
> - redefines function resolution order for ambiguous function reference
> - adds support for Hive's rich built-in functions (support for Hive user
> defined functions was already added in 1.9.0)
> - clarifies the concept of temporary functions
>
> Would love to hear your thoughts.
>
> Bowen
>


Re: [DISCUSS] Enhance Support for Multicast Communication Pattern

2019-08-29 Thread Yun Gao
Hi all,

Many thanks Jark for the new scenarios. Based on these new scenarios, I
think they, together with iteration, represent a class of scenarios that
requires broadcasting events.

I also totally agree with Piotr that all the scenarios we have discussed should
be clearly motivated. From what we learned from the discussion, we now think
that broadcasting events is most suitable for iteration and also some other
scenarios. Therefore, we will rewrite a motivation design doc for broadcasting
events first and initiate a separate discussion for that. The current
discussion would then continue for scenarios that require actual multicasting.
Many thanks for all the valuable points raised; I think the comparison of the
different methods and scenarios is now much clearer. :)

Best,
Yun


--
From: Jark Wu
Send Time: 2019 Aug. 27 (Tue.) 16:27
To: dev
Cc: Yun Gao
Subject: Re: [DISCUSS] Enhance Support for Multicast Communication Pattern

Hi all,

Thanks Yun for bringing up this topic. I missed this discussion because of the
"multicast" title.
After reading the design, if I understand correctly, it is proposing a custom
event mechanism, i.e. broadcasting custom events.
It is a topic orthogonal to multicasting, so I would suggest starting a
new thread to discuss it.

Regarding broadcasting custom events:

I would +1 for the motivation, because we also encountered similar requirements
when improving Table API & SQL before. 

For example, the mini-batch mechanism in the blink planner will emit a special
mini-batch event to the data stream to indicate the start of a new
mini-batch.
The downstream aggregation operator will buffer the data records until it
receives the mini-batch event, and then process the buffer at once. This will
reduce a lot of state accesses.
However, we don't have a proper custom event mechanism currently, so we
leverage the watermark as the mini-batch event (which is a little hacky in my
opinion).

Another case is joining a huge dimension table which is stored/produced in hive 
daily. We can scan the hive table and shuffle to the JOIN operators by the join 
key to join with the main stream.
Note that the dimension table changes every day, and we want to join the latest
version of the hive table. So we need to re-scan and re-shuffle the hive
table once a new daily partition is produced.
However, we need some special events to distinguish the boundaries of the
different versions of the dimension table. The events will be used to notify
downstream operators (mainly the JOIN operator)
that "ok, I will receive a new version of the dimension table" and "ok, I
received all the data of this version."

From my understanding, in order to support this feature, we might need to:
 1) expose collectEvent(CustomEvent) or broadcastEvent(CustomEvent) API to 
users. 
 2) support registering the serialization and deserialization of the custom
event
 3) expose processEvent(int channel, CustomEvent) API to StreamOperator
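
A rough sketch of what 1) and 3) could look like (illustrative only; the
names and signatures are examples, not a concrete proposal):

// Sketch only.
public interface CustomEvent extends java.io.Serializable {}

public interface EventEmitter {
    // 1) broadcast a custom event to all downstream channels
    void broadcastEvent(CustomEvent event) throws java.io.IOException;
}

public interface EventAwareOperator {
    // 3) react to a custom event received on the given input channel
    void processEvent(int channel, CustomEvent event) throws Exception;
}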


Regards,
Jark


On Tue, 27 Aug 2019 at 15:18, Piotr Nowojski  wrote:
Hi,

 Before starting work on the design doc, I would suggest finding someone to
shepherd this project. Otherwise this effort might drown among other parallel
things. I could take care of that from the runtime perspective; however, most of
the changes are about the API, which is outside of my area of
expertise.

 Regarding the multicast, before we start working on that, I would also prefer
to see a motivation design doc on how that feature would be used, for example
for cross or theta joins in the Table API, since very similar questions would
apply to that as well.

 Piotrek

 > On 27 Aug 2019, at 08:10, SHI Xiaogang  wrote:
 > 
 > Hi Yun Gao,
 > 
 > Thanks a lot for your clarification.
 > 
 > Now that the notification of broadcast events requires alignment whose
 > implementation, in my opinion, will affect the correctness of synchronous
 > iterations, I prefer to postpone the discussion until you have completed
 > the design of the new iteration library, or at least the progress tracking
 > part. Otherwise, the discussion for broadcasting events may become an empty
 > talk if it does not fit in with the final design.
 > 
 > What do you think?
 > 
 > Regards,
 > Xiaogang
 > 
 > On Tue, Aug 27, 2019 at 11:33 AM, Yun Gao wrote:
 > 
 >> Hi Xiaogang,
 >> 
 >>  Very thanks for also considering the iteration case! :) These points
 >> are really important for iteration. As a whole, we are implementing a new
 >> iteration library on top of Stream API. As a library, most of its
 >> implementation does not need to touch Runtime layer, but it really has some
 >> new requirements on the API, like the one for being able to broadcast the
 >> progressive events. To be more detail, these events indeed carry the
 >> sender's index and the downstream operators need to do alignment the events
 >> from all the ups

Re: [PROPOSAL] Force rebase on master before merge

2019-08-29 Thread Kurt Young
Hi Zili,

Thanks for the proposal, I had similar confusion in the past with your
point #2.
Forcing a rebase to master before merging can solve some problems, but it
also introduces a new problem. Given that the CI testing time is quite long
(a couple of hours) now, it's highly possible that before your test, which
was triggered by rebasing, finishes, master will have received some more new
commits. This situation will get worse if more people are doing this. One
possible solution is to let the committer decide what to do before he/she
merges it. If it's a trivial issue, just merging it once Travis passes is
fine. But if it's a rather big one, and some related code just got merged
into master, I will choose to rebase to master and push it to my own repo to
trigger my personal CI test on it, because this can guarantee the testing
time.

To summarize: I think this should be decided by the committer who is
merging the PR, not forced.

Best,
Kurt


On Thu, Aug 29, 2019 at 11:07 PM Zili Chen  wrote:

> Hi devs,
>
> GitHub provides a mechanism which is able to require branches to be
> up to date before merged[1](point 6). I can see several advantages
> enabling it. Thus propose our project to turn on this switch. Below are
> my concerns. Looking forward to your insights.
>
> 1. Avoid CI failures in pr which fixed by another commit. We now merge a
> pull request even if CI fails but the failures knowns as flaky tests.
> We doesn't resolve this by turn on the switch but it helps to find any
> other potential valid failures.
>
> 2. Avoid CI failures in master after pull request merged. Actually, CI
> tests the branch that pull request bind exactly. Even if it gave green
> it is still possible a systematic failure introduced because conflicts
> with another new commit merged in master but not merged in this branch.
>
> For the downside, it might require contributors rebase his pull requests
> some times before getting merged. But it should not inflict too much
> works.
>
> Best,
> tison.
>
> [1] https://help.github.com/en/articles/enabling-required-status-checks
>


Re: [VOTE] FLIP-58: Flink Python User-Defined Function for Table API

2019-08-29 Thread Wei Zhong
Hi Dian,

+1 non-binding
Thanks for driving this!

Best, Wei

> On Aug 29, 2019, at 09:25, Hequn Cheng wrote:
> 
> Hi Dian,
> 
> +1
> Thanks a lot for driving this.
> 
> Best, Hequn
> 
> On Wed, Aug 28, 2019 at 2:01 PM jincheng sun 
> wrote:
> 
>> Hi Dian,
>> 
>> +1, Thanks for your great job!
>> 
>> Best,
>> Jincheng
>> 
>> On Wed, Aug 28, 2019 at 11:04 AM, Dian Fu wrote:
>> 
>>> Hi all,
>>> 
>>> I'd like to start a voting thread for FLIP-58 [1] since that we have
>>> reached an agreement on the design in the discussion thread [2],
>>> 
>>> This vote will be open for at least 72 hours. Unless there is an
>>> objection, I will try to close it by Sept 2, 2019 00:00 UTC if we have
>>> received sufficient votes.
>>> 
>>> PS: This doesn't mean that we cannot further improve the design. We can
>>> still discuss the implementation details case by case in the JIRA as long
>>> as it doesn't affect the overall design.
>>> 
>>> [1]
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Function+for+Table+API
>>> <
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58:+Flink+Python+User-Defined+Function+for+Table+API
 
>>> [2]
>>> 
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-User-Defined-Function-for-Table-API-td31673.html
>>> <
>>> 
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-User-Defined-Function-for-Table-API-td31673.html
 
>>> 
>>> Thanks,
>>> Dian
>> 



Re: [VOTE] FLIP-58: Flink Python User-Defined Function for Table API

2019-08-29 Thread Xingbo Huang
Hi Dian,

+1,
Thanks a lot for driving this.

Best, 
Xingbo
> On Aug 30, 2019, at 9:39 AM, Wei Zhong wrote:
> 
> Hi Dian,
> 
> +1 non-binding
> Thanks for driving this!
> 
> Best, Wei
> 
>> On Aug 29, 2019, at 09:25, Hequn Cheng wrote:
>> 
>> Hi Dian,
>> 
>> +1
>> Thanks a lot for driving this.
>> 
>> Best, Hequn
>> 
>> On Wed, Aug 28, 2019 at 2:01 PM jincheng sun 
>> wrote:
>> 
>>> Hi Dian,
>>> 
>>> +1, Thanks for your great job!
>>> 
>>> Best,
>>> Jincheng
>>> 
>>> On Wed, Aug 28, 2019 at 11:04 AM, Dian Fu wrote:
>>> 
 Hi all,
 
 I'd like to start a voting thread for FLIP-58 [1] since that we have
 reached an agreement on the design in the discussion thread [2],
 
 This vote will be open for at least 72 hours. Unless there is an
 objection, I will try to close it by Sept 2, 2019 00:00 UTC if we have
 received sufficient votes.
 
 PS: This doesn't mean that we cannot further improve the design. We can
 still discuss the implementation details case by case in the JIRA as long
 as it doesn't affect the overall design.
 
 [1]
 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Function+for+Table+API
 <
 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58:+Flink+Python+User-Defined+Function+for+Table+API
> 
 [2]
 
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-User-Defined-Function-for-Table-API-td31673.html
 <
 
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-User-Defined-Function-for-Table-API-td31673.html
> 
 
 Thanks,
 Dian
>>> 
> 



Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-29 Thread SHI Xiaogang
Hi Dadashov,

We faced similar problems in our production clusters.

Now both launching and stopping of containers are performed in the main
thread of YarnResourceManager. As containers are launched and stopped one
after another, it usually takes a long time to bootstrap large jobs. Things
get worse when some node managers get lost: Yarn will retry many times to
communicate with them, leading to heartbeat timeouts of TaskManagers.

The following are some efforts we made to help Flink deal with large jobs.

1. We provision some common jars on all cluster nodes and ask our users not
to include these jars in their uberjar. When containers bootstrap, these
jars are added to the classpath via JVM options. That way, we can
efficiently reduce the size of uberjars.

2. We deploy some asynchronous threads to launch and stop containers in
YarnResourceManager. The bootstrap time can be greatly reduced when
launching a large number of containers. We'd like to contribute this to the
community very soon.

3. We deploy a timeout timer for each launching container. If a task
manager does not register in time after its container has been launched, a
new container will be allocated and launched. That leads to a certain
waste of resources, but reduces the effects caused by slow or problematic
nodes.
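
For illustration, here is a minimal sketch of ideas 2 and 3 combined. All
class and method names below are made up; this is not the actual
YarnResourceManager code. Container launches run on a dedicated pool instead
of the RM main thread, and each launch arms a timer that requests a
replacement container if the TaskManager does not register in time.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class ContainerLaunchHelper {

    private final ExecutorService launchPool = Executors.newFixedThreadPool(16);
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final long registrationTimeoutMs;

    ContainerLaunchHelper(long registrationTimeoutMs) {
        this.registrationTimeoutMs = registrationTimeoutMs;
    }

    // startContainer wraps the blocking NMClient#startContainer call;
    // isRegistered reports whether the TaskManager has registered;
    // requestReplacement allocates and launches a new container.
    void launch(Runnable startContainer,
                BooleanSupplier isRegistered,
                Runnable requestReplacement) {
        launchPool.execute(startContainer);
        timer.schedule(() -> {
            if (!isRegistered.getAsBoolean()) {
                requestReplacement.run();
            }
        }, registrationTimeoutMs, TimeUnit.MILLISECONDS);
    }
}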

Now the community is considering the refactoring of ResourceManager. I
think that will be the time to improve its efficiency.

Regards,
Xiaogang

Elkhan Dadashov wrote on Fri, 2019-08-30 at 7:10 AM:

> Dear Flink developers,
>
> I am having difficulty getting a Flink job started.
>
> The job's uberjar/fat jar is around 400MB, and I need to launch 800+
> containers.
>
> The default HDFS replication is 3.
>
> *The Yarn queue is empty, and 800 containers are allocated
> almost immediately by the Yarn RM.*
>
> It takes a very long time until all 800 nodes (node managers) download the
> uberjar from HDFS to their local machines.
>
> *Q1:*
>
> a) Do all those 800 nodes download in batches of 3 at a time? (batch
> size = HDFS replication factor)
>
> b) Or can Flink TMs replicate from each other? Or do already-started
> TMs replicate to not-yet-started nodes?
>
> Most probably the answer is (a), but I want to confirm.
>
> *Q2:*
>
> What is the recommended way of handling a 400MB+ uberjar with 800+
> containers?
>
> Any specific params to tune?
>
> Thanks.
>
> Because downloading the uberjar takes a really long time, we face this
> exception around 15 minutes after the job is kicked off:
>
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
> start container.
> This token is expired. current time is 1567116179193 found 1567116001610
> Note: System times on machines may be out of sync. Check system time and time 
> zones.
>   at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
> Source)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>   at 
> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>   at 
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>


[jira] [Created] (FLINK-13901) Documentation links check errors in release-1.9

2019-08-29 Thread Jark Wu (Jira)
Jark Wu created FLINK-13901:
---

 Summary:  Documentation links check errors in release-1.9
 Key: FLINK-13901
 URL: https://issues.apache.org/jira/browse/FLINK-13901
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.9.0
Reporter: Jark Wu
 Fix For: 1.9.1


[2019-08-29 16:04:44] ERROR `/zh/dev/table/config.html' not found.
[2019-08-29 16:04:47] ERROR `/zh/dev/table/catalog.html' not found.
http://localhost:4000/zh/dev/table/config.html:
Remote file does not exist -- broken link!!!



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [VOTE] FLIP-58: Flink Python User-Defined Function for Table API

2019-08-29 Thread Jark Wu
+1

Thanks for the great work!



[jira] [Created] (FLINK-13902) Can not use index to convert FieldReferenceExpression to RexNode

2019-08-29 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-13902:


 Summary: Can not use index to convert FieldReferenceExpression to 
RexNode
 Key: FLINK-13902
 URL: https://issues.apache.org/jira/browse/FLINK-13902
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Jingsong Lee


Currently, we cannot use inputCount + inputIndex + fieldIndex to construct a 
Calcite RexInputRef.

See QueryOperationConverter.SingleRelVisitor.visit(AggregateQueryOperation). 
Calcite will shuffle the output order of groupings (see RelBuilder.aggregate, 
which uses an ImmutableBitSet to store groupings), so the output field order 
will be changed too. As a result, the output field order of 
AggregateOperationFactory differs from Calcite's output order.
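
To make the reordering concrete, here is a tiny standalone example (my own
illustration, assuming only calcite-core on the classpath) of why storing
groupings in an ImmutableBitSet loses the requested order -- a bit set always
iterates in ascending bit order:

import org.apache.calcite.util.ImmutableBitSet;

public class GroupingOrderDemo {
    public static void main(String[] args) {
        // Grouping keys requested in the order (2, 0) ...
        ImmutableBitSet groupSet = ImmutableBitSet.of(2, 0);
        // ... but iteration yields 0, then 2: the requested order is lost.
        for (int field : groupSet) {
            System.out.println(field);
        }
    }
}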



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13903) Support Hive version 2.3.6

2019-08-29 Thread Xuefu Zhang (Jira)
Xuefu Zhang created FLINK-13903:
---

 Summary: Support Hive version 2.3.6
 Key: FLINK-13903
 URL: https://issues.apache.org/jira/browse/FLINK-13903
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Reporter: Xuefu Zhang
Assignee: Xuefu Zhang
 Fix For: 1.10.0


This is to support all 1.2 versions (1.2.0, 1.2.1, 1.2.2) and all 2.3 versions (2.3.0 through 2.3.6).



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [PROPOSAL] Force rebase on master before merge

2019-08-29 Thread Zili Chen
Hi Kurt,

Thanks for your reply!

I see two concerns about the downside in your email. Correct
me if I misunderstand.

1. Rebase times. Typically commits are independent of one another, so a
rebase just fast-forwards changes and contributors rarely have to resolve
conflicts themselves. Reviews don't get blocked by this forced rebase if
there is already a green Travis report -- it just requires the contributor
to rebase and test again, which generally doesn't involve changes (unless
conflicts must be resolved). A contributor rebases the pull request when
there is spare time, when a reviewer requests it, or before it gets merged.
This should not involve too much work.

2. Testing time. That is a separate topic discussed in this thread [1].
I don't think we will live with a long testing time forever, so triggering
multiple test runs won't be a problem then.

To sum up: for trivial cases the work is trivial and the rebase prevents
accidental failures; for complicated cases a rebase and full tests are
already required.

Best,
tison.

[1]
https://lists.apache.org/x/thread.html/b90aa518fcabce94f8e1de4132f46120fae613db6e95a2705f1bd1ea@%3Cdev.flink.apache.org%3E


Kurt Young wrote on Fri, 2019-08-30 at 9:15 AM:

> Hi Zili,
>
> Thanks for the proposal; I had similar confusion in the past with your
> point #2.
> Forcing a rebase to master before merging can solve some problems, but it
> also introduces a new one. Given that the CI testing time is quite long
> (a couple of hours) now, it's highly possible that before the test
> triggered by your rebase finishes, master will get some more new commits.
> This situation will get worse if more people are doing this. One possible
> solution is to let the committer decide what to do before he/she merges.
> If it's a trivial issue, just merging it once Travis passes is fine. But
> if it's a rather big one, and some related code just got merged into
> master, I will choose to rebase onto master and push it to my own repo to
> trigger my personal CI test on it, because this guarantees the testing
> time.
>
> To summarize: I think this should be decided by the committer who is
> merging the PR, but not be forced.
>
> Best,
> Kurt
>
>
> On Thu, Aug 29, 2019 at 11:07 PM Zili Chen  wrote:
>
> > Hi devs,
> >
> > GitHub provides a mechanism to require branches to be up to date
> > before they are merged [1] (point 6). I can see several advantages in
> > enabling it, and thus propose that our project turn on this switch.
> > Below are my considerations. Looking forward to your insights.
> >
> > 1. Avoid CI failures in a PR that are fixed by another commit. We now
> > merge a pull request even if CI fails, as long as the failures are known
> > flaky tests. Turning on the switch doesn't fully resolve this, but it
> > helps to find any other potentially valid failures.
> >
> > 2. Avoid CI failures in master after a pull request is merged. Actually,
> > CI tests exactly the branch the pull request is bound to. Even if it was
> > green, it is still possible that a systematic failure is introduced by
> > conflicts with another new commit merged in master but not merged in this
> > branch.
> >
> > For the downside, it might require contributors to rebase their pull
> > requests a few times before getting merged. But it should not involve too
> > much work.
> >
> > Best,
> > tison.
> >
> > [1] https://help.github.com/en/articles/enabling-required-status-checks
> >
>


Re: [VOTE] FLIP-58: Flink Python User-Defined Function for Table API

2019-08-29 Thread Terry Wang
+1. That would be very helpful.
Best,
Terry Wang






Re: [VOTE] FLIP-58: Flink Python User-Defined Function for Table API

2019-08-29 Thread Yu Li
+1 (non-binding)

Thanks for driving this!

Best Regards,
Yu


>


Using Avro SpecficRecord serialization instead of slower ReflectDatumWriter/GenericDatumWriter

2019-08-29 Thread Roshan Naik
I am noticing that Flink takes very long inside collect(..) due to Avro 
serialization that relies on ReflectDatumWriter and GenericDatumWriter. The 
object being serialized here is an Avro object that implements 
SpecificRecordBase. It is somewhat large (~50 KB) and complex.

I am looking for a way to use SpecificDatumWriter for the serialization 
instead of the generic/reflection-based writers to speed it up, but I don't 
see a way to influence that.
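
For reference, this is roughly what serializing a specific record looks like
with plain Avro's SpecificDatumWriter, outside of Flink. Whether Flink's
internal serializer path can be switched to it is exactly the open question
here; the helper below is my own sketch, not a Flink API:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;

public final class SpecificAvroBytes {

    // Serializes an Avro specific record to a byte array using the
    // generated-code writer instead of reflection.
    public static <T extends SpecificRecordBase> byte[] toBytes(T record)
            throws IOException {
        SpecificDatumWriter<T> writer = new SpecificDatumWriter<>(record.getSchema());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}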


Re: [VOTE] FLIP-58: Flink Python User-Defined Function for Table API

2019-08-29 Thread Jeff Zhang
+1, really looking forward to this feature in Flink 1.10.




-- 
Best Regards

Jeff Zhang


[jira] [Created] (FLINK-13904) Avoid competition between different rounds of checkpoint triggering

2019-08-29 Thread Biao Liu (Jira)
Biao Liu created FLINK-13904:


 Summary: Avoid competition between different rounds of checkpoint 
triggering
 Key: FLINK-13904
 URL: https://issues.apache.org/jira/browse/FLINK-13904
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.10.0


As a part of the {{CheckpointCoordinator}} refactoring, I'd like to simplify the 
concurrent triggering logic.
The different rounds of checkpoint triggering would be processed sequentially. 
The final target is getting rid of the timer thread and {{triggerLock}}.

Note that we can't avoid all competition in triggering for now. There is still 
a competition between normal checkpoint triggering and savepoint triggering. We 
could avoid this competition by executing triggering in the main thread, but 
that cannot be achieved until all blocking operations are handled well in IO 
threads.
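
As a rough sketch of the direction (the names below are made up for
illustration and are not the actual {{CheckpointCoordinator}} code),
sequential processing could look like a small request queue that starts at
most one round at a time, which removes the need for a trigger lock:

import java.util.ArrayDeque;
import java.util.Queue;

final class SequentialCheckpointTrigger {

    private final Queue<Runnable> pendingRounds = new ArrayDeque<>();
    private boolean triggering;

    // Queues one round of triggering; each round is expected to be
    // asynchronous and to call onRoundComplete() when it finishes.
    synchronized void requestTrigger(Runnable round) {
        pendingRounds.add(round);
        maybeStartNext();
    }

    synchronized void onRoundComplete() {
        triggering = false;
        maybeStartNext();
    }

    private void maybeStartNext() {
        if (!triggering && !pendingRounds.isEmpty()) {
            triggering = true;
            pendingRounds.poll().run(); // kicks off the next round
        }
    }
}

A real implementation would also have to handle failures and savepoint
requests, which is why not all competition can be avoided yet.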



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] Best practice to run flink on kubernetes

2019-08-29 Thread Yang Wang
Hi Zhenghua,

You are right. For a per-job cluster, the taskmanagers will be allocated
dynamically by KubernetesResourceManager. For a session cluster, we hope
taskmanagers could be pre-allocated, even though that does not work now.
Please navigate to the doc [1] for more details.

Hi Thomas,

We have no doubt that Flink only needs to support #1 and #3. For #1, we need
external deployment management tools to make it production-ready. I also
think a Kubernetes operator is a good choice. It makes managing multiple
Flink jobs and long-running streaming applications easier.

Also, some companies have their own Flink job management platform. Platform
users submit Flink jobs through a web UI, update the Flink configuration,
and restart the job.

For #3, we just want to make it possible to start a Flink job cluster or
session cluster through the CLI. Users who used to run Flink workloads on
YARN can migrate to a Kubernetes cluster very conveniently. Compared to #1,
the dynamic resource allocation is an important advantage. Maybe it could
also be introduced to #1 in some way in the future.

[1]
https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit?usp=sharing

Thomas Weise wrote on Thu, 2019-08-29 at 10:24 PM:

> Till has already summed it up, but I want to emphasize that Flink as a
> project only needs to provide #1 (reactive mode) and #3 (active mode, which
> is necessarily tied to the cluster manager of choice). The latter would be
> needed for Flink jobs to be elastic (in the future), although we may want
> to discuss how such a capability can be made easier with #1 as well.
>
> For users #1 alone is of little value, since they need to solve their
> deployment problem. So it will be good to list options such as the Lyft
> Flink k8s operator on the ecosystem page and possibly point to that from
> the Flink documentation as well.
>
> I also want to point out that #3, while it looks easy to start with, has an
> important limitation when it comes to managing long-running streaming
> applications. Such an application will essentially be a sequence of jobs
> that come and go across stateful upgrades or rollbacks. Any solution that
> is designed to manage a single Flink job instance can't address that need.
> That is why the k8s operator was created. It specifically understands the
> concept of an application.
>
> Thomas
>
>
> On Wed, Aug 28, 2019 at 7:56 PM Zhenghua Gao  wrote:
>
> > Thanks Yang for bringing this up. I think option1 is very useful for
> > early adopters.
> > People who do not know much about k8s can easily set it up on minikube
> > to have a taste.
> >
> > For option2 and option3, I prefer option3 because I am familiar with
> > YARN and don't know much about k8s.
> > And I have some doubt about starting a session cluster in option3:
> >
> > > ./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm
> > flink-session-example
> > > -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT
> >
> > Does the -n option mean the number of TaskManagers?
> > Do we pre-run taskmanager pods, or request and launch taskmanager
> > pods dynamically?
> >
> > *Best Regards,*
> > *Zhenghua Gao*
> >
> >
> > On Fri, Aug 9, 2019 at 9:12 PM Yang Wang  wrote:
> >
> > > Hi all,
> > >
> > > Currently cloud-native architectures have been introduced to many
> > > companies in production. They use kubernetes to run deep learning, web
> > > servers, etc. If we could deploy per-job/session flink clusters on
> > > kubernetes so that they run mixed with other workloads, the cluster
> > > resource utilization would be better. It would also be easier for many
> > > kubernetes users to have a taste of flink.
> > >
> > > For now we have three options to run flink jobs on k8s.
> > >
> > > [1]. Create jm/tm/service yaml and apply, then you will get a flink
> > > standalone cluster on k8s. Use flink run to submit a job to the
> > > existing flink cluster. Some companies may have their own deploy
> > > system to manage the flink cluster.
> > >
> > > [2]. Use flink-k8s-operator to manage multiple flink clusters,
> > > including session and per-job. It could manage the complete deployment
> > > lifecycle of the application. I think this option is really easy to
> > > use for k8s users. They are familiar with the k8s operator, kubectl
> > > and other k8s tools. They could debug and run the flink cluster just
> > > like other k8s applications.
> > >
> > > [3]. Native integration with k8s: use flink run or
> > > kubernetes-session.sh to start a flink cluster. It is very similar to
> > > submitting a flink cluster to Yarn. KubernetesClusterDescriptor talks
> > > to the k8s api server to start a flink master deployment of 1.
> > > KubernetesResourceManager dynamically allocates resources from k8s to
> > > start task managers on demand. This option is very easy for flink
> > > users to get started with. In the simplest case, we just need to
> > > update the '-m
> > yarn-cluster'
> > > to -m '-m kubernetes

[jira] [Created] (FLINK-13905) Separate checkpoint triggering into stages

2019-08-29 Thread Biao Liu (Jira)
Biao Liu created FLINK-13905:


 Summary: Separate checkpoint triggering into stages
 Key: FLINK-13905
 URL: https://issues.apache.org/jira/browse/FLINK-13905
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.10.0


Currently {{CheckpointCoordinator#triggerCheckpoint}} includes some heavy IO 
operations. We plan to separate the triggering into different stages. The IO 
operations are executed in IO threads, while the other, in-memory operations 
are not.

This is a preparation for making all in-memory operations of 
{{CheckpointCoordinator}} single-threaded (in the main thread).
Note that we cannot move the in-memory operations of triggering into the main 
thread directly yet, because some operations still contend on a heavy 
(coordinator-wide) lock.
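
A minimal sketch of the staged idea (illustrative names only, assuming a
dedicated IO pool and a single-threaded "main thread" executor; this is not
the actual coordinator code):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

final class StagedTrigger {

    private final Executor ioExecutor = Executors.newFixedThreadPool(4);
    private final Executor mainThreadExecutor = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> triggerCheckpoint() {
        return CompletableFuture
                // Stage 1: heavy IO, e.g. initializing the checkpoint
                // storage location, runs on the IO pool.
                .supplyAsync(this::initializeStorageLocation, ioExecutor)
                // Stage 2: in-memory bookkeeping runs on the main-thread
                // executor.
                .thenAcceptAsync(this::registerPendingCheckpoint, mainThreadExecutor);
    }

    private String initializeStorageLocation() {
        return "hdfs:///checkpoints/chk-1"; // placeholder for real IO
    }

    private void registerPendingCheckpoint(String location) {
        // placeholder for in-memory state updates
    }
}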



--
This message was sent by Atlassian Jira
(v8.3.2#803003)