Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

2018-06-08 Thread Shuyi Chen
Thanks a lot for the comments, Till and Fabian.

The RemoteEnvironment does provide a way to specify jar files at
construction, but we want the jar files to be specified dynamically in the
user code, e.g. in a DDL statement, and the jar files might be in a remote
DFS. As we discussed, I think there are 2 approaches:

1) add a new interface env.registerJarFile(jarFiles...), which ships the JAR
files using JobGraph.addJar(). In this case, all jars will be loaded by
default at runtime. This approach is the same as how the SQL Client ships
UDF jars today.
2) add a new interface env.registerJarFile(name, jarFiles...). It would behave
similarly to env.registerCachedFile(), registering a set of JAR files under a
key name, and we could add a new interface in RuntimeContext, as Fabian
suggests, i.e., RuntimeContext.getClassloaderWithJar(). Users would then be
able to load functions from a remote jar dynamically using the returned
ClassLoader.
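
For illustration, a rough sketch of how the two proposed interfaces could look
from user code (method names and signatures are only proposals from this
thread, not existing Flink API):

  // Approach 1): registered jars are shipped with the job and loaded by default.
  env.registerJarFile("hdfs:///libs/my-udf-1.0.1-SNAPSHOT.jar")

  // Approach 2): jars are registered under a key and only loaded on demand.
  env.registerJarFile("myUdfJar", "hdfs:///libs/my-udf-1.0.1-SNAPSHOT.jar")

  // Inside a rich function, request a ClassLoader that includes the registered
  // jar(s) and load classes from it dynamically.
  val cl = getRuntimeContext.getClassloaderWithJar("myUdfJar")
  val udf = cl.loadClass("com.example.udf.HelloWorld").newInstance()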

Comparing the 2 approaches:

   - Approach 1) will be simpler for users to use.
   - Approach 2) will allow us to use different versions of a class in the
   same code and might help avoid some dependency conflicts. Also, in 2) we
   can load jars on demand, while in 1) all jars are loaded by default.

I think we can support both interfaces. For the SQL DDL implementation, both
will work; approach 2) is more complicated but comes with the nice benefits
stated above. Either way, the implementation choice should be transparent to
the end user. Also, I am wondering whether, outside of the SQL DDL, this new
functionality/interface would be helpful in other scenarios? That would help
make the interface better and more generic. Thanks a lot.

Shuyi

On Tue, Jun 5, 2018 at 1:47 AM Fabian Hueske  wrote:

> We could also offer a feature that users can request classloaders with
> additional jars.
> This could work as follows:
>
> 1) Users register jar files in the ExecutionEnvironment (similar to cached
> files) with a name, e.g., env.registerJarFile("~/myJar.jar", "myName");
> 2) In a function, the user can request a user classloader with the
> additional classes, e.g., RuntimeContext.getClassloaderWithJar("myName");
> This could also support to load multiple jar files in the same classloader.
>
> IMO, the interesting part of Shuyi's proposal is to be able to dynamically
> load code from remote locations without fetching it to the client first.
>
> Best, Fabian
>
>
> 2018-05-29 12:42 GMT+02:00 Till Rohrmann :
>
> > I see Shuyi's point that it would be nice to allow adding jar files which
> > should be part of the user code classloader programmatically. Actually,
> we
> > expose this functionality in the `RemoteEnvironment` where you can
> specify
> > additional jars which shall be shipped to the cluster in the
> constructor. I
> > assume that is exactly the functionality you are looking for. In that
> > sense, it might be an API inconsistency that we allow it for some cases
> and
> > for others not.
> >
> > But I could also see that the whole functionality of dynamically loading
> > jars at runtime could also perfectly live in the `UdfSqlOperator`. This,
> of
> > course, would entail that one has to take care of clean up of the
> > downloaded resources. But it should be possible to first download the
> > resources and create a custom URLClassLoader at startup and then use this
> > class loader when calling into the UDF.
> >
> > Cheers,
> > Till
> >
> > On Wed, May 16, 2018 at 9:28 PM, Shuyi Chen  wrote:
> >
> > > Hi Aljoscha, Fabian, Rong, Ted and Timo,
> > >
> > > Thanks a lot for the feedback. Let me clarify the usage scenario in a
> > > bit more detail. The context is that we want to add support for a SQL
> > > DDL in Flink SQL that loads UDFs from external JARs located either in
> > > the local filesystem, in HDFS, or at an HTTP endpoint. The local FS
> > > option is mainly for debugging, letting users submit the job jar
> > > locally, while the latter 2 are for production use. Below is an example
> > > user application with the *CREATE FUNCTION* DDL (note: grammar and
> > > interface are not finalized yet).
> > >
> > > -
> > >
> > > val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > val tEnv = TableEnvironment.getTableEnvironment(env)
> > >
> > > // setup the DataStream
> > > // ...
> > >
> > > // register the DataStream under the name "OrderA"
> > > tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
> > >
> > > tEnv.sqlDDL(
> > >   "create function helloFunc as 'com.example.udf.HelloWorld' using jars ('hdfs:///users/david/libraries/my-udf-1.0.1-SNAPSHOT.jar')")
> > >
> > > val result = tEnv.sqlQuery(
> > >   "SELECT user, helloFunc(product), amount FROM OrderA WHERE amount > 2")
> > >
> > > result.toAppendStream[Order].print()
> > > env.execute()
> > > -
>

[jira] [Created] (FLINK-9555) Support table api in scala shell

2018-06-08 Thread Jeff Zhang (JIRA)
Jeff Zhang created FLINK-9555:
-

 Summary: Support table api in scala shell
 Key: FLINK-9555
 URL: https://issues.apache.org/jira/browse/FLINK-9555
 Project: Flink
  Issue Type: New Feature
  Components: Scala Shell
Affects Versions: 1.5.0
Reporter: Jeff Zhang


It would be nice to have the Table API available in the Scala shell so that
users can experience the Table API in an interactive way.
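
For illustration, a hypothetical shell session could look like this (assuming
the shell would pre-bind a table environment, here called {{btenv}} by analogy
to the existing {{benv}}/{{senv}} bindings):

{code}
scala> val orders = btenv.fromDataSet(benv.fromElements((1, "beer", 3)), 'user, 'product, 'amount)
scala> orders.filter('amount > 2).toDataSet[(Int, String, Int)].print()
{code}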





[jira] [Created] (FLINK-9556) Scala TypeAnalyzer does not consider ResultTypeQueryable

2018-06-08 Thread Timo Walther (JIRA)
Timo Walther created FLINK-9556:
---

 Summary: Scala TypeAnalyzer does not consider ResultTypeQueryable
 Key: FLINK-9556
 URL: https://issues.apache.org/jira/browse/FLINK-9556
 Project: Flink
  Issue Type: Bug
  Components: Scala API
Reporter: Timo Walther
Assignee: Timo Walther


Scala's TypeAnalyzer does not consider the ResultTypeQueryable interface. This 
means that type information that has been specified manually is ignored in many 
cases.

For example:
{code}
val myConsumer: FlinkKafkaConsumer010[Row] =
  new FlinkKafkaConsumer010(topics, new MyDeserializationSchema(jsonSchema), properties)
val inputStream: DataStream[Row] = env.addSource(myConsumer)
{code}

The row type is always generic even though type information has been specified
in the DeserializationSchema.
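
For reference, "specified manually" means a schema declaring its produced type
via {{getProducedType}} ({{DeserializationSchema}} extends
{{ResultTypeQueryable}}). A minimal sketch (field types made up for
illustration):

{code}
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

class MyDeserializationSchema extends DeserializationSchema[Row] {

  override def deserialize(message: Array[Byte]): Row = {
    val row = new Row(1)
    row.setField(0, new String(message, "UTF-8"))
    row
  }

  override def isEndOfStream(nextElement: Row): Boolean = false

  // Manually specified type information that should be picked up by the
  // Scala API instead of falling back to a generic type.
  override def getProducedType: TypeInformation[Row] =
    new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO)
}
{code}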





[jira] [Created] (FLINK-9557) FlinkTypeFactory should support BigInteger type

2018-06-08 Thread JIRA
Dominik Wosiński created FLINK-9557:
---

 Summary: FlinkTypeFactory should support BigInteger type
 Key: FLINK-9557
 URL: https://issues.apache.org/jira/browse/FLINK-9557
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.5.0
Reporter: Dominik Wosiński


Currently, the `FlinkTypeFactory` method `typeInfoToSqlTypeName` does not
support BigInteger. Since BigInteger is the default type returned by
`JsonSchemaConverter` for all fields of type `number`, this can create issues.





[jira] [Created] (FLINK-9558) Memory leaks during usage with disabled checkpointing

2018-06-08 Thread Rinat Sharipov (JIRA)
Rinat Sharipov created FLINK-9558:
-

 Summary: Memory leaks during usage with disabled checkpointing
 Key: FLINK-9558
 URL: https://issues.apache.org/jira/browse/FLINK-9558
 Project: Flink
  Issue Type: Bug
  Components: filesystem-connector
Affects Versions: 1.3.0
Reporter: Rinat Sharipov


Hi mates, we have some Flink jobs that write data from Kafka into HDFS using
the BucketingSink.
For some reason, those jobs run without checkpointing. For now, it is not a
big problem for us if some files remain open in case the job is restarted.
 
Periodically, those jobs fail with an *OutOfMemory* error, and it seems I have
found a strange thing in the implementation of BucketingSink.
 
During the sink lifecycle, we have a state object, implemented as a map, where
the key is a bucket path and the value is a state that contains information
about open files and the list of pending files.
After inspecting a heap dump, I found that this state stores information about
~1,000 buckets and their state, all of which weighs ~120 MB.
 
I've looked through the code and found that buckets are removed from the state
in the *notifyCheckpointComplete* method:
{code:java}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt =
      state.bucketStates.entrySet().iterator();
  while (bucketStatesIt.hasNext()) {
    BucketState<T> bucketState = bucketStatesIt.next().getValue();
    if (!bucketState.isWriterOpen &&
        bucketState.pendingFiles.isEmpty() &&
        bucketState.pendingFilesPerCheckpoint.isEmpty()) {

      // We've dealt with all the pending files and the writer for this bucket
      // is not currently open. Therefore this bucket is currently inactive
      // and we can remove it from our state.
      bucketStatesIt.remove();
    }
  }
}
{code}
So this looks like an issue when you use this sink in a checkpointless
environment, because data is always added to the state but never removed.
 
Of course, we could enable checkpointing and use one of the available
backends, but to me this seems like unexpected behaviour: I have the option to
run the job without checkpointing, but if I actually do so, I eventually get a
failure in the sink component.
 





Re: [DISCUSS] Flink 1.6 features

2018-06-08 Thread Stephan Ewen
Hi all!

Thanks for the discussion and good input. Many suggestions fit well with
the proposal above.

Please bear in mind that with a time-based release model, we would release
whatever is mature by end of July.
The good thing is we could schedule the next release not too far after
that, so that the features that did not quite make it will not be delayed
too long.
In some sense, you could read this as a "*what to do first*" list, rather
than a "*this goes in, other things stay out*" list.

Some thoughts on some of the suggestions

*Kubernetes integration:* An opaque integration with Kubernetes should be
supported through the "as a library" mode. For a deeper integration, I know
that some committers have experimented with some PoC code. I would let Till
add some thoughts; he has worked the most on the deployment parts recently.

*Per partition watermarks with idleness:* Good point, could one implement
that on the current interface, with a periodic watermark extractor?
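
One purely illustrative sketch of such an extractor (event type and idleness
heuristic are made up here; the per-partition handling inside the Kafka
consumer is what FLINK-5479 tracks):

  import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
  import org.apache.flink.streaming.api.watermark.Watermark

  case class MyEvent(timestamp: Long, payload: String) // made-up event type

  class IdleAwareAssigner(idleTimeout: Long)
      extends AssignerWithPeriodicWatermarks[MyEvent] {

    private var maxTimestamp = Long.MinValue
    private var lastSeen = System.currentTimeMillis()

    override def extractTimestamp(event: MyEvent, previousElementTimestamp: Long): Long = {
      maxTimestamp = math.max(maxTimestamp, event.timestamp)
      lastSeen = System.currentTimeMillis()
      event.timestamp
    }

    override def getCurrentWatermark(): Watermark =
      if (System.currentTimeMillis() - lastSeen > idleTimeout)
        new Watermark(System.currentTimeMillis()) // keep advancing despite idleness
      else
        new Watermark(maxTimestamp)
  }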

*Atomic cancel-with-savepoint:* Agreed, this is important. Making this work
with all sources needs a bit more work. We should have this in the roadmap.

*Elastic Bloomfilters:* This seems like an interesting new feature - the
above suggested feature set was more about addressing some longer standing
issues/requests. However, nothing should prevent contributors from working on
that.

Best,
Stephan


On Wed, Jun 6, 2018 at 6:23 AM, Yan Zhou [FDS Science] 
wrote:

> +1 on https://issues.apache.org/jira/browse/FLINK-5479
> Reported in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.
> nabble.com/Kafka-topic-partition-skewness-causes-watermark-
> not-being-emitted-td11008.html It's normally not a common case to have
> Kafka partitions not producing any data, but it'll probably be good to
> handle this as well. I ...
>
> --
> *From:* Rico Bergmann 
> *Sent:* Tuesday, June 5, 2018 9:12:00 PM
> *To:* Hao Sun
> *Cc:* dev@flink.apache.org; user
> *Subject:* Re: [DISCUSS] Flink 1.6 features
>
> +1 on K8s integration
>
>
>
> Am 06.06.2018 um 00:01 schrieb Hao Sun :
>
> adding my vote to K8S Job mode, maybe it is this?
> > Smoothen the integration in Container environment, like "Flink as a
> Library", and easier integration with Kubernetes services and other proxies.
>
>
>
> On Mon, Jun 4, 2018 at 11:01 PM Ben Yan 
> wrote:
>
> Hi Stephan,
>
> Will  [ https://issues.apache.org/jira/browse/FLINK-5479 ]
> (Per-partition watermarks in FlinkKafkaConsumer should consider idle
> partitions) be included in 1.6? As we are seeing more users with this
> issue on the mailing lists.
>
> Thanks.
> Ben
>
> 2018-06-05 5:29 GMT+08:00 Che Lui Shum :
>
> Hi Stephan,
>
> Will FLINK-7129 (Support dynamically changing CEP patterns) be included in
> 1.6? There were discussions about possibly including it in 1.6:
> http://mail-archives.apache.org/mod_mbox/flink-user/201803.
> mbox/%3cCAMq=OU7gru2O9JtoWXn1Lc1F7NKcxAyN6A3e58kxctb4b508RQ@
> mail.gmail.com%3e
>
> Thanks,
> Shirley Shum
>
>
> From: Stephan Ewen 
> To: dev@flink.apache.org, user 
> Date: 06/04/2018 02:21 AM
> Subject: [DISCUSS] Flink 1.6 features
> --
>
>
>
> Hi Flink Community!
>
> The release of Apache Flink 1.5 has happened (yay!) - so it is a good time
> to start talking about what to do for release 1.6.
>
> *== Suggested release timeline ==*
>
> I would propose to release around *end of July* (that is 8-9 weeks from
> now).
>
> The rationale behind that: There was a lot of effort in release testing
> automation (end-to-end tests, scripted stress tests) as part of release
> 1.5. You may have noticed the big set of new modules under
> "flink-end-to-end-tests" in the Flink repository. It delayed the 1.5
> release a bit, and needs to continue as part of the coming release cycle,
> but should help make releasing more lightweight from now on.
>
> (Side note: There are also some nightly stress tests that we created and
> run at data Artisans, and where we are looking whether and in which way it
> would make sense to contribute them to Flink.)
>
> *== Features and focus areas ==*
>
> We had a lot of big and heavy features in Flink 1.5, with FLIP-6, the new
> network stack, recovery, SQL joins and client, ... Following something like
> a "tick-tock-model", I would suggest to focus the next release more on
> integrations, tooling, and reducing user friction.
>
> Of course, this does not mean that no other pull request gets reviewed, and
> no other topic will be examined - it is simply meant as a help to
> understand where to expect more activity during the next release cycl

Re: [DISCUSS] Flink 1.6 features

2018-06-08 Thread Elias Levy
Since wishes are free:

- Standalone cluster job isolation:
https://issues.apache.org/jira/browse/FLINK-8886
- Proper sliding window joins (not overlapping hopping window joins):
https://issues.apache.org/jira/browse/FLINK-6243
- Sharing state across operators:
https://issues.apache.org/jira/browse/FLINK-6239
- Synchronizing streams: https://issues.apache.org/jira/browse/FLINK-4558

Seconded:
- Atomic cancel-with-savepoint:
https://issues.apache.org/jira/browse/FLINK-7634
- Support dynamically changing CEP patterns :
https://issues.apache.org/jira/browse/FLINK-7129



[Proposal] Code Reviewer List

2018-06-08 Thread Yaz Sh
Hi Flink Community,

I have been working with Flink for the last couple of months and am still
learning it. Recently, I started contributing some minor changes to Flink and
I am very excited to start working on bigger tasks.
The questions I always have in mind when I open a PR are "Who should I tag in
my PR for code review?" and "How long does it take to get a first response
from the reviewer?"

Usually, if there is a Jira ticket for the task, I will tag the reporter as
well as people who commented on it. I also look at other PRs on similar topics
and try to find out who reviewed those PRs.

Here I would like to share a proposal to create a "Code Reviewer List" as part
of the Flink contribution guidelines. This list would provide the names of
code reviewers, their availability, their topics of interest, and the expected
turnaround time for an initial code review. It would give new contributors
like myself a good starting point for finding a code reviewer based on the
topic they are contributing to.

I would appreciate it if the Flink community shared its thoughts on this
proposal!

Cheers,
Yazdan

[jira] [Created] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR

2018-06-08 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-9559:
--

 Summary: The type of a union of CHAR columns of different lengths 
should be VARCHAR
 Key: FLINK-9559
 URL: https://issues.apache.org/jira/browse/FLINK-9559
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Hequn Cheng
Assignee: Hequn Cheng


Currently, if a case-when expression has branches that return string literals
of different lengths, redundant white spaces are appended to the shorter
literal. For example, for the SQL {{case 1 when 1 then 'a' when 2 then 'bcd'
end}}, the return value will be 'a ' blank-padded to CHAR(3) instead of 'a'.

Although this follows the behavior of the strict SQL standard mode (SQL:2003),
we should return the pragmatic type in real scenarios, without blank padding.

Happily, this problem has been fixed by
[CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321]: we can
upgrade Calcite to the next release (1.17.0) and override {{RelDataTypeSystem}}
in Flink to configure the return type, i.e., make
{{shouldConvertRaggedUnionTypesToVarying()}} return true.
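
A minimal sketch of that override (assuming the Calcite 1.17 upgrade; the
Flink-side object name below is made up):

{code}
import org.apache.calcite.rel.type.RelDataTypeSystemImpl

object FlinkTypeSystem extends RelDataTypeSystemImpl {
  // Let a union/case-when over CHAR columns of different lengths be typed as
  // VARCHAR instead of a blank-padded CHAR of the maximum length.
  override def shouldConvertRaggedUnionTypesToVarying(): Boolean = true
}
{code}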







[jira] [Created] (FLINK-9560) RateLimiting for FileSystem

2018-06-08 Thread Etienne CARRIERE (JIRA)
Etienne CARRIERE created FLINK-9560:
---

 Summary: RateLimiting for FileSystem
 Key: FLINK-9560
 URL: https://issues.apache.org/jira/browse/FLINK-9560
 Project: Flink
  Issue Type: Improvement
  Components: FileSystem
Affects Versions: 1.5.0
Reporter: Etienne CARRIERE


*Pain*: On our system, we see that during checkpoints, all the bandwidth is
taken to send the checkpoint to object storage (S3 in our case).

*Proposal*: After the introduction of some limits on FileSystems (mostly the
number of connections, via tickets FLINK-8125/FLINK-8198/FLINK-9468), I
propose to add rate limiting "per FileSystem".

Proposed implementation: Modify LimitedConnectionsFileSystem to add a rate
limiter on both the input and output streams.
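
For illustration only, a rate-limited stream wrapper along those lines could
look like the sketch below (it uses Guava's RateLimiter, which is exactly the
dependency question raised next; class and parameter names are made up):

{code}
import com.google.common.util.concurrent.RateLimiter
import java.io.OutputStream

// Caps write throughput to roughly bytesPerSecond.
class RateLimitedOutputStream(out: OutputStream, bytesPerSecond: Double)
  extends OutputStream {

  private val limiter = RateLimiter.create(bytesPerSecond)

  override def write(b: Int): Unit = {
    limiter.acquire(1)
    out.write(b)
  }

  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    if (len > 0) {
      limiter.acquire(len)
    }
    out.write(b, off, len)
  }

  override def flush(): Unit = out.flush()

  override def close(): Unit = out.close()
}
{code}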

Current issue: I would like to use the Guava RateLimiter, which is a good rate
limiter, but it is Guava and therefore not included in Flink (dependency clash
with Hadoop), if I am right. What would be the right strategy in this case?

 


