[jira] [Created] (FLINK-11282) Merge StreamRecordWriter into RecordWriter

2019-01-08 Thread zhijiang (JIRA)
zhijiang created FLINK-11282:


 Summary: Merge StreamRecordWriter into RecordWriter
 Key: FLINK-11282
 URL: https://issues.apache.org/jira/browse/FLINK-11282
 Project: Flink
  Issue Type: Sub-task
  Components: Network
Reporter: zhijiang
Assignee: zhijiang
 Fix For: 1.8.0


{{StreamRecordWriter}}, which extends {{RecordWriter}}, is only used for 
streaming jobs. The only difference in {{StreamRecordWriter}} is that it 
maintains the {{OutputFlusher}} thread, which can be migrated into 
{{RecordWriter}} because the {{flushAlways}} property in {{RecordWriter}} is 
closely related to the {{OutputFlusher}}.

To do so, we can introduce a special {{BroadcastRecordWriter}} which extends 
{{RecordWriter}} to improve the broadcast selector for 
[FLINK-10662|https://issues.apache.org/jira/browse/FLINK-10662]. The 
{{RecordWriter}} hierarchy is then unified for both streaming and batch jobs.
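The proposed migration could be sketched roughly as follows. This is a minimal illustration with made-up names and a simplified constructor, not Flink's actual API: after the merge, `RecordWriter` itself owns the optional `OutputFlusher` thread, and `flushAlways` (timeout == 0) means every `emit()` flushes immediately, so no flusher thread is needed.

```java
// Hypothetical sketch of the merged writer; names and signatures are illustrative.
class RecordWriter {
    private final boolean flushAlways;
    private final OutputFlusher outputFlusher; // null unless a positive timeout is set
    int flushCount = 0; // counter standing in for real flush work, for illustration

    RecordWriter(long flushTimeoutMillis) {
        this.flushAlways = flushTimeoutMillis == 0;
        if (flushTimeoutMillis > 0) {
            this.outputFlusher = new OutputFlusher(flushTimeoutMillis);
            this.outputFlusher.start();
        } else {
            this.outputFlusher = null;
        }
    }

    void emit(Object record) {
        // ... serialize the record into network buffers ...
        if (flushAlways) {
            flushAll();
        }
    }

    synchronized void flushAll() {
        flushCount++; // stand-in for flushing all subpartitions
    }

    void close() {
        if (outputFlusher != null) {
            outputFlusher.interrupt();
        }
    }

    // Periodically flushes the output buffers, as StreamRecordWriter's thread did.
    private final class OutputFlusher extends Thread {
        private final long timeout;

        OutputFlusher(long timeout) {
            super("OutputFlusher");
            this.timeout = timeout;
            setDaemon(true);
        }

        @Override
        public void run() {
            while (!isInterrupted()) {
                try {
                    Thread.sleep(timeout);
                } catch (InterruptedException e) {
                    return;
                }
                flushAll();
            }
        }
    }
}
```

With this shape, a batch job would construct the writer with timeout 0 (flush on every record) and a streaming job with a positive flush timeout, so one class covers both cases.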



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[attention] now there is a test broken in CI environment

2019-01-08 Thread qianjin xu
[attention] now there is a test broken in the CI environment, please do not
submit the test for the time being, otherwise the test will fail! The
error is shown below:
[image: image.png]

Best,
qianjin



Re: [DISCUSS] Support Interactive Programming in Flink Table API

2019-01-08 Thread Piotr Nowojski
Hi Becket,

With `uncache` there are probably two features that we can think about:

a)

Physically dropping the cached table from the storage, freeing up the resources

b)

Hinting the optimizer to not cache the reads for the next query/table

a) Has the issue, as I wrote before, that it seems to be an operation 
inherently “flawed” by having side effects.

I’m not sure how it would be best to express. We could make it work:

1. via a method on a Table as you proposed:

void Table#dropCache()
void Table#uncache()

2. Operation on the environment

env.dropCacheFor(table) // or some other argument that allows user to identify 
the desired cache

3. Extending (from your original design doc) `setTableService` method to return 
some control handle like:

TableServiceControl setTableService(TableFactory tf, 
 TableProperties properties, 
 TempTableCleanUpCallback cleanUpCallback);

(TableServiceControl? TableService? TableServiceHandle? CacheService?)

And having the drop cache method there:

TableServiceControl#dropCache(table)

Out of those options, option 1 might have the disadvantage of not making 
the user aware that this is a global operation with side effects. Like the old 
example of:

public void foo(Table t) {
  // …
  t.dropCache();
}

It might not be immediately obvious that `t.dropCache()` is some kind of global 
operation, with side effects visible outside of the `foo` function.

On the other hand, both option 2 and 3 might have a greater chance of catching 
the user’s attention:

public void foo(Table t, CacheService cacheService) {
  // …
  cacheService.dropCache(t);
}

b) could be achieved quite easily:

Table a = …
val notCached1 = a.doNotCache()
val cachedA = a.cache()
val notCached2 = cachedA.doNotCache() // equivalent of notCached1

`doNotCache()` would behave similarly to `cache()` - return a copy of the table 
with removed “cache” hint and/or added “never cache” hint.
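The copy-with-hint semantics described above could be sketched as follows (illustrative names, not the actual Table API): neither `cache()` nor `doNotCache()` mutates the receiver; each returns a copy with the hint set accordingly, so earlier references keep their old behaviour.

```java
// Hypothetical sketch of immutable cache hints on a table handle.
class TableSketch {
    enum CacheHint { NONE, CACHE, NEVER_CACHE }

    final CacheHint hint;

    TableSketch(CacheHint hint) {
        this.hint = hint;
    }

    // Returns a copy hinted for caching; the receiver is unchanged.
    TableSketch cache() {
        return new TableSketch(CacheHint.CACHE);
    }

    // Returns a copy hinted to never use the cache; the receiver is unchanged.
    TableSketch doNotCache() {
        return new TableSketch(CacheHint.NEVER_CACHE);
    }
}
```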

Piotrek


> On 8 Jan 2019, at 03:17, Becket Qin  wrote:
> 
> Hi Piotr,
> 
> Thanks for the proposal and detailed explanation. I like the idea of
> returning a new hinted Table without modifying the original table. This
> also leave the room for users to benefit from future implicit caching.
> 
> Just to make sure I get the full picture. In your proposal, there will also
> be a 'void Table#uncache()' method to release the cache, right?
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> On Mon, Jan 7, 2019 at 11:50 PM Piotr Nowojski 
> wrote:
> 
>> Hi Becket!
>> 
>> After further thinking I tend to agree that my previous proposal (*Option
>> 2*) indeed might not be ideal if we would introduce automatic caching in the
>> future. However I would like to propose a slightly modified version of it:
>> 
>> *Option 4*
>> 
>> Adding `cache()` method with following signature:
>> 
>> Table Table#cache();
>> 
>> Without side-effects: the `cache()` call does not modify/change the original
>> Table in any way.
>> It would return a copy of the original table, with an added hint for the
>> optimizer to cache the table, so that future accesses to the returned
>> table might be cached or not.
>> 
>> Assuming that we are talking about a setup, where we do not have automatic
>> caching enabled (possible future extension).
>> 
>> Example #1:
>> 
>> ```
>> Table a = …
>> a.foo() // not cached
>> 
>> val cachedA = a.cache();
>> 
>> cachedA.bar() // maybe cached
>> a.foo() // same as before - effectively not cached
>> ```
>> 
>> Both the first and the second `a.foo()` operations would behave in the
>> exactly same way. Again, `a.cache()` call doesn’t affect `a` itself. If `a`
>> was not hinted for caching before `a.cache();`, then both `a.foo()` calls
>> wouldn’t use cache.
>> 
>> Returned `cachedA` would be hinted with “cache” hint, so probably
>> `cachedA.bar()` would go through cache (unless optimiser decides the
>> opposite)
>> 
>> Example #2
>> 
>> ```
>> Table a = …
>> 
>> a.foo() // not cached
>> 
>> val b = a.cache();
>> 
>> a.foo() // same as before - effectively not cached
>> b.foo() // maybe cached
>> 
>> val c = b.cache();
>> 
>> a.foo() // same as before - effectively not cached
>> b.foo() // same as before - effectively maybe cached
>> c.foo() // maybe cached
>> ```
>> 
>> Now, assuming that we have some future “automatic caching optimisation”:
>> 
>> Example #3
>> 
>> ```
>> env.enableAutomaticCaching()
>> Table a = …
>> 
>> a.foo() // might be cached, depending if `a` was selected to automatic
>> caching
>> 
>> val b = a.cache();
>> 
>> a.foo() // same as before - might be cached, if `a` was selected to
>> automatic caching
>> b.foo() // maybe cached
>> ```
>> 
>> 
>> More or less this is the same behaviour as:
>> 
>> Table a = ...
>> val b = a.filter(x > 20)
>> 
>> calling `filter` hasn’t changed or altered `a` in anyway. If `a` was
>> previously filtered:
>> 
>> Table src = …
>> val a = src.filter(x > 20)
>> val b = a.filter(x > 20)
>> 
>> then yes, `a` and `b` will be the same. But the point is that neithe

Re: [DISCUSS] Detection Flink Backpressure

2019-01-08 Thread Piotr Nowojski
Hi,

> I think the direct and accurate way to monitor backpressure is counting how 
> many times "availableMemorySegments.wait(2000)" is triggered during 
> "LocalBufferPool#requestMemorySegment", and reflecting the ratio as 
> backpressure if it does not affect the performance.

This could be an interesting idea, however it might be non-trivial or 
impossible to define a time window for calculating the ratio / tuning the 
moving-average resolution to reliably handle different workloads. I would lean 
towards something like this:

backPressureMonitor.backPressured(this.id);
availableMemorySegments.wait(2000);

Where `backPressureMonitor` is some global/job level entity, that collects the 
"back pressured” events from one side. There can be some periodic thread (from 
metrics?) to query the backPressureMonitor and process the back pressured 
events. For example report that task was back pressured if there was even one 
“back pressured” event in the the reporting interval.

From a performance perspective, this could be dirt cheap. A 
`backPressureMonitor.backPressured(id)` call (backed by, for example, 
`AtomicIntegerArray#incrementAndGet(id)`) would probably be negligible compared 
to the synchronisation and waiting on `availableMemorySegments`. On the other 
hand, the reporting/processing could be done by one thread per job per task 
manager.
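The proposed monitor could be sketched as below (a hypothetical illustration of the idea, assuming an `AtomicIntegerArray` indexed by task id): tasks record a cheap "back pressured" event right before blocking on `availableMemorySegments`, and a periodic reporter thread reads and resets the counters.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical job-level back pressure monitor; names are illustrative.
class BackPressureMonitor {
    private final AtomicIntegerArray events;

    BackPressureMonitor(int numTasks) {
        this.events = new AtomicIntegerArray(numTasks);
    }

    // Called from the hot path: a single atomic increment, negligible next to
    // the synchronisation and the wait on availableMemorySegments.
    void backPressured(int taskId) {
        events.incrementAndGet(taskId);
    }

    // Called from the periodic metrics/reporting thread: a task counts as back
    // pressured if it recorded at least one event in the reporting interval.
    boolean wasBackPressuredAndReset(int taskId) {
        return events.getAndSet(taskId, 0) > 0;
    }
}
```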

Piotrek 

> On 7 Jan 2019, at 09:33, zhijiang  wrote:
> 
> Hi all,
> 
> The current thread stack trace sample can not cover all the cases. And the 
> output queue length might not be very accurate sometimes, because the last 
> BufferConsumer in each subpartition might not be fulfilled and still be 
> writable although the output queue length is already equal to or greater than 
> the buffer pool size (considering event buffers), but it is not backpressure 
> in this case.
> 
> I think the direct and accurate way to monitor backpressure is counting how 
> many times "availableMemorySegments.wait(2000)" is triggered during 
> "LocalBufferPool#requestMemorySegment", and reflecting the ratio as 
> backpressure if it does not affect the performance.
> 
> Best,
> Zhijiang
> 
> 
> --
> From:Piotr Nowojski 
> Send Time:2019年1月4日(星期五) 17:01
> To:dev 
> Subject:Re: [DISCUSS] Detection Flink Backpressure
> 
> Hi,
> 
> In that case I think instead of fixing the current back pressure monitoring 
> mechanism, it would be better to replace it with a new one based on output 
> queues length. But I haven’t thought it through, especially with respect to 
> performance implications, however my gut feeling is that it should be 
> solvable in one way or another.
> 
> Piotrek
> 
>> On 3 Jan 2019, at 20:05, Ken Krugler  wrote:
>> 
>> There’s the related issue of Async I/O not showing up in back pressure 
>> reporting, also due to the same issue of threads not being sampled.
>> 
>> — Ken
>> 
>>> On Jan 3, 2019, at 10:25 AM, Jamie Grier  wrote:
>>> 
>>> One unfortunate problem with the current back-pressure detection mechanism
>>> is that it doesn't work well with all of our sources.  The problem is that
>>> some sources (Kinesis for sure) emit elements from threads Flink knows
>>> nothing about and therefore those stack traces aren't sampled.  The result
>>> is that you never see back-pressure detected in the first chain of a Flink
>>> job containing that source.
>>> 
>>> On Thu, Jan 3, 2019 at 2:12 AM Piotr Nowojski  wrote:
>>> 
 Hi all,
 
 peiliping: I think your idea could be problematic for couple of reasons.
 Probably minor concern is that checkpoint time could be affected not only
 because of the back pressure, but also because how long does it take to
 actually perform the checkpoint. Bigger issues are that this bottleneck
 detection would be limited to only during checkpointing (what if one has
 checkpoints only once every 1 hour? Or none at all?) AND
 performance/bottlenecks may change significantly during checkpointing (for
 example writing state for the first operator to DFS can affect indirectly
 down stream operators).
 
 The idea of detecting back pressure/bottlenecks using output/input buffers
 is much more natural. Because in the end, almost by definition, if the
 output buffers are full, that means that the given task is back pressured.
 
 Both input and output queues length are already exposed via metrics, so
 developers have an access to raw data to manually calculate/detect
 bottlenecks. It would be actually nice to automatically aggregate those
 metrics and provide ready to use metrics: boolean flags whether
 task/stage/job are back pressured or not.
 
 Replacing current back pressure detection mechanism that probes the
 threads and checks which of them are waiting for buffers is another issues.
 Functionally it is equivalent to monitoring whether the output queues are
 full. This might be more hacky, but will give the same results, thus it
 wasn’t high on 

[jira] [Created] (FLINK-11283) Accessing the key when processing connected keyed stream

2019-01-08 Thread Truong Duc Kien (JIRA)
Truong Duc Kien created FLINK-11283:
---

 Summary: Accessing the key when processing connected keyed stream
 Key: FLINK-11283
 URL: https://issues.apache.org/jira/browse/FLINK-11283
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Truong Duc Kien


Currently, we can access the key when using {{KeyedProcessFunction}}.

Similar functionality would be very useful when processing connected keyed 
streams.
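The requested API shape could look roughly like this. This is a hypothetical sketch, not existing Flink classes: mirroring {{KeyedProcessFunction}}'s {{Context#getCurrentKey()}}, a process function over a connected keyed stream would expose the current key in its context for both inputs.

```java
// Hypothetical sketch of a keyed co-process function exposing the current key.
abstract class KeyedCoProcessFunctionSketch<K, IN1, IN2, OUT> {

    // The per-record context; getCurrentKey() returns the key of the record
    // currently being processed, as in KeyedProcessFunction.
    interface Context<K> {
        K getCurrentKey();
    }

    abstract void processElement1(IN1 value, Context<K> ctx);

    abstract void processElement2(IN2 value, Context<K> ctx);
}
```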





Re: [DISCUSS] Support Interactive Programming in Flink Table API

2019-01-08 Thread Becket Qin
Hi Piotr,

You are right. There might be two intuitive meanings when users call
'a.uncache()', namely:
1. release the resource
2. Do not use cache for the next operation.

Case (1) would likely be the dominant use case. So I would suggest we
dedicate uncache() method to case (1), i.e. for resource release, but not
for ignoring cache.

For case 2, i.e. explicitly ignoring cache (which is rare), users may use
something like 'hint("ignoreCache")'. I think this is better as it is a
little weird for users to call `a.uncache()` while they may not even know
if the table is cached at all.

Assuming we let `uncache()` only release the resource, one possibility is
using a ref count to mitigate the side effect. That means a ref count is
incremented on `cache()` and decremented on `uncache()`, so
`uncache()` does not physically release the resource immediately, but just
means the cache could be released.
That being said, I am not sure if this is really a better solution, as it
seems a little counter-intuitive. Maybe calling it releaseCache() would help a
little bit?
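The ref-counting idea could be sketched as follows (hypothetical names, not a real Flink API): `cache()` retains a reference per table, `releaseCache()` drops one, and the physical storage is only freed when the count reaches zero.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of ref-counted cache release; names are illustrative.
class CacheServiceSketch {
    private final Map<String, Integer> refCounts = new HashMap<>();

    // Called by cache(): register one more user of this table's cache.
    void retain(String tableId) {
        refCounts.merge(tableId, 1, Integer::sum);
    }

    // Called by releaseCache(): returns true only when the last reference is
    // dropped, i.e. the physical cache may actually be deleted.
    boolean release(String tableId) {
        Integer count = refCounts.get(tableId);
        if (count == null) {
            return false; // never cached: nothing to release
        }
        if (count == 1) {
            refCounts.remove(tableId);
            return true; // last user gone: free the storage
        }
        refCounts.put(tableId, count - 1);
        return false;
    }
}
```

Under this scheme, the foo()/bar() example works out: foo()'s release only decrements the count, so bar() still sees the cache as long as the caller holds its own reference.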

Thanks,

Jiangjie (Becket) Qin




On Tue, Jan 8, 2019 at 5:36 PM Piotr Nowojski  wrote:

> Hi Becket,
>
> With `uncache` there are probably two features that we can think about:
>
> a)
>
> Physically dropping the cached table from the storage, freeing up the
> resources
>
> b)
>
> Hinting the optimizer to not cache the reads for the next query/table
>
> a) Has the issue as I wrote before, that it seemed to be an operation
> inherently “flawed" with having side effects.
>
> I’m not sure how it would be best to express. We could make it work:
>
> 1. via a method on a Table as you proposed:
>
> void Table#dropCache()
> void Table#uncache()
>
> 2. Operation on the environment
>
> env.dropCacheFor(table) // or some other argument that allows user to
> identify the desired cache
>
> 3. Extending (from your original design doc) `setTableService` method to
> return some control handle like:
>
> TableServiceControl setTableService(TableFactory tf,
>  TableProperties properties,
>  TempTableCleanUpCallback cleanUpCallback);
>
> (TableServiceControl? TableService? TableServiceHandle? CacheService?)
>
> And having the drop cache method there:
>
> TableServiceControl#dropCache(table)
>
> Out of those options, option 1 might have a disadvantage of kind of not
> making the user aware, that this is a global operation with side effects.
> Like the old example of:
>
> public void foo(Table t) {
>   // …
>   t.dropCache();
> }
>
> It might not be immediately obvious that `t.dropCache()` is some kind of
> global operation, with side effects visible outside of the `foo` function.
>
> On the other hand, both option 2 and 3, might have greater chance of
> catching user’s attention:
>
> public void foo(Table t, CacheService cacheService) {
>   // …
>   cacheService.dropCache(t);
> }
>
> b) could be achieved quite easily:
>
> Table a = …
> val notCached1 = a.doNotCache()
> val cachedA = a.cache()
> val notCached2 = cachedA.doNotCache() // equivalent of notCached1
>
> `doNotCache()` would behave similarly to `cache()` - return a copy of the
> table with removed “cache” hint and/or added “never cache” hint.
>
> Piotrek
>
>
> > On 8 Jan 2019, at 03:17, Becket Qin  wrote:
> >
> > Hi Piotr,
> >
> > Thanks for the proposal and detailed explanation. I like the idea of
> > returning a new hinted Table without modifying the original table. This
> > also leave the room for users to benefit from future implicit caching.
> >
> > Just to make sure I get the full picture. In your proposal, there will
> also
> > be a 'void Table#uncache()' method to release the cache, right?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Jan 7, 2019 at 11:50 PM Piotr Nowojski 
> > wrote:
> >
> >> Hi Becket!
> >>
> >> After further thinking I tend to agree that my previous proposal
> (*Option
> >> 2*) indeed might not be if would in the future introduce automatic
> caching.
> >> However I would like to propose a slightly modified version of it:
> >>
> >> *Option 4*
> >>
> >> Adding `cache()` method with following signature:
> >>
> >> Table Table#cache();
> >>
> >> Without side-effects, and `cache()` call do not modify/change original
> >> Table in any way.
> >> It would return a copy of original table, with added hint for the
> >> optimizer to cache the table, so that the future accesses to the
> returned
> >> table might be cached or not.
> >>
> >> Assuming that we are talking about a setup, where we do not have
> automatic
> >> caching enabled (possible future extension).
> >>
> >> Example #1:
> >>
> >> ```
> >> Table a = …
> >> a.foo() // not cached
> >>
> >> val cachedA = a.cache();
> >>
> >> cachedA.bar() // maybe cached
> >> a.foo() // same as before - effectively not cached
> >> ```
> >>
> >> Both the first and the second `a.foo()` operations would behave in the
> >> exactly same way. Again, `a.cache()` call doesn’t affect `a` itself. If
> `a`
> >>

[jira] [Created] (FLINK-11284) RocksDBStateBackendMigrationTest.testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration fails on Travis

2019-01-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-11284:
-

 Summary: 
RocksDBStateBackendMigrationTest.testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration
 fails on Travis
 Key: FLINK-11284
 URL: https://issues.apache.org/jira/browse/FLINK-11284
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.8.0
Reporter: Till Rohrmann
 Fix For: 1.8.0


The 
{{RocksDBStateBackendMigrationTest.testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration}}
 fails on Travis.

{code}
11:35:52.592 [ERROR] Tests run: 50, Failures: 0, Errors: 2, Skipped: 2, Time 
elapsed: 2.138 s <<< FAILURE! - in 
org.apache.flink.contrib.streaming.state.RocksDBStateBackendMigrationTest
11:35:52.594 [ERROR] 
testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration[Incremental
 checkpointing: 
false](org.apache.flink.contrib.streaming.state.RocksDBStateBackendMigrationTest)
  Time elapsed: 0.092 s  <<< ERROR!
java.lang.UnsupportedOperationException: The serializer should have been 
reconfigured as a new instance; shouldn't be used.

11:35:52.596 [ERROR] 
testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration[Incremental
 checkpointing: 
true](org.apache.flink.contrib.streaming.state.RocksDBStateBackendMigrationTest)
  Time elapsed: 0.06 s  <<< ERROR!
java.lang.UnsupportedOperationException: The serializer should have been 
reconfigured as a new instance; shouldn't be used.

11:35:52.930 [INFO] 
11:35:52.930 [INFO] Results:
11:35:52.930 [INFO] 
11:35:52.930 [ERROR] Errors: 
11:35:52.930 [ERROR]   
RocksDBStateBackendMigrationTest>StateBackendMigrationTestBase.testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration:351->StateBackendMigrationTestBase.testKeySerializerUpgrade:401
 » UnsupportedOperation
11:35:52.930 [ERROR]   
RocksDBStateBackendMigrationTest>StateBackendMigrationTestBase.testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration:351->StateBackendMigrationTestBase.testKeySerializerUpgrade:401
 » UnsupportedOperation
11:35:52.930 [INFO] 
11:35:52.930 [ERROR] Tests run: 346, Failures: 0, Errors: 2, Skipped: 7
{code}

https://api.travis-ci.org/v3/job/476770419/log.txt





Re: [DISCUSS] Support Interactive Programming in Flink Table API

2019-01-08 Thread Piotr Nowojski
Hi,

I think that introducing ref counting could be confusing and error prone, since 
Flink-table’s users are not used to closing/releasing resources. I was more 
objecting to placing the `uncache()`/`dropCache()`/`releaseCache()` 
(releaseCache sounds best to me) method in the “Table”. It might not be obvious 
that it will drop the cache for all of the usages of the given table. 
For example:

public void foo(Table t) {
 // …
 t.releaseCache();
}

public void bar(Table t) {
  // ...
}

Table a = …
val cachedA = a.cache()

foo(cachedA)
bar(cachedA)


My problem with the above example is that the `t.releaseCache()` call is not 
doing the best possible job of communicating to the user that it will have side 
effects on other places, like the `bar(cachedA)` call. Something like this might 
be better (not perfect, but just a bit better):

public void foo(Table t, CacheService cacheService) {
 // …
 cacheService.releaseCacheFor(t);
}

Table a = …
val cachedA = a.cache()

foo(cachedA, env.getCacheService())
bar(cachedA)


Also, from another perspective, placing the `releaseCache()` method in Table 
might not be the best separation of concerns - the `releaseCache()` method seems 
significantly different compared to the other existing methods.

Piotrek

> On 8 Jan 2019, at 12:28, Becket Qin  wrote:
> 
> Hi Piotr,
> 
> You are right. There might be two intuitive meanings when users call
> 'a.uncache()', namely:
> 1. release the resource
> 2. Do not use cache for the next operation.
> 
> Case (1) would likely be the dominant use case. So I would suggest we
> dedicate uncache() method to case (1), i.e. for resource release, but not
> for ignoring cache.
> 
> For case 2, i.e. explicitly ignoring cache (which is rare), users may use
> something like 'hint("ignoreCache")'. I think this is better as it is a
> little weird for users to call `a.uncache()` while they may not even know
> if the table is cached at all.
> 
> Assuming we let `uncache()` to only release resource, one possibility is
> using ref count to mitigate the side effect. That means a ref count is
> incremented on `cache()` and decremented on `uncache()`. That means
> `uncache()` does not physically release the resource immediately, but just
> means the cache could be released.
> That being said, I am not sure if this is really a better solution as it
> seems a little counter intuitive. Maybe calling it releaseCache() help a
> little bit?
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> 
> 
> On Tue, Jan 8, 2019 at 5:36 PM Piotr Nowojski  wrote:
> 
>> Hi Becket,
>> 
>> With `uncache` there are probably two features that we can think about:
>> 
>> a)
>> 
>> Physically dropping the cached table from the storage, freeing up the
>> resources
>> 
>> b)
>> 
>> Hinting the optimizer to not cache the reads for the next query/table
>> 
>> a) Has the issue as I wrote before, that it seemed to be an operation
>> inherently “flawed" with having side effects.
>> 
>> I’m not sure how it would be best to express. We could make it work:
>> 
>> 1. via a method on a Table as you proposed:
>> 
>> void Table#dropCache()
>> void Table#uncache()
>> 
>> 2. Operation on the environment
>> 
>> env.dropCacheFor(table) // or some other argument that allows user to
>> identify the desired cache
>> 
>> 3. Extending (from your original design doc) `setTableService` method to
>> return some control handle like:
>> 
>> TableServiceControl setTableService(TableFactory tf,
>> TableProperties properties,
>> TempTableCleanUpCallback cleanUpCallback);
>> 
>> (TableServiceControl? TableService? TableServiceHandle? CacheService?)
>> 
>> And having the drop cache method there:
>> 
>> TableServiceControl#dropCache(table)
>> 
>> Out of those options, option 1 might have a disadvantage of kind of not
>> making the user aware, that this is a global operation with side effects.
>> Like the old example of:
>> 
>> public void foo(Table t) {
>>  // …
>>  t.dropCache();
>> }
>> 
>> It might not be immediately obvious that `t.dropCache()` is some kind of
>> global operation, with side effects visible outside of the `foo` function.
>> 
>> On the other hand, both option 2 and 3, might have greater chance of
>> catching user’s attention:
>> 
>> public void foo(Table t, CacheService cacheService) {
>>  // …
>>  cacheService.dropCache(t);
>> }
>> 
>> b) could be achieved quite easily:
>> 
>> Table a = …
>> val notCached1 = a.doNotCache()
>> val cachedA = a.cache()
>> val notCached2 = cachedA.doNotCache() // equivalent of notCached1
>> 
>> `doNotCache()` would behave similarly to `cache()` - return a copy of the
>> table with removed “cache” hint and/or added “never cache” hint.
>> 
>> Piotrek
>> 
>> 
>>> On 8 Jan 2019, at 03:17, Becket Qin  wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> Thanks for the proposal and detailed explanation. I like the idea of
>>> returning a new hinted Table without modifying the original table. This
>>> also leave the room for users to benefit from future

[jira] [Created] (FLINK-11285) Change the modifier of CsvTableSourceFactoryBase.createTableSource and CsvTableSinkFactoryBase.createTableSink to private[flink]

2019-01-08 Thread Dian Fu (JIRA)
Dian Fu created FLINK-11285:
---

 Summary: Change the modifier of 
CsvTableSourceFactoryBase.createTableSource and 
CsvTableSinkFactoryBase.createTableSink to private[flink]
 Key: FLINK-11285
 URL: https://issues.apache.org/jira/browse/FLINK-11285
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Dian Fu
Assignee: Dian Fu


The modifiers of CsvTableSourceFactoryBase.createTableSource and 
CsvTableSinkFactoryBase.createTableSink are currently protected. Users should 
not see these methods; they should use 
CsvBatchTableSinkFactory.createBatchTableSink, 
CsvAppendTableSinkFactory.createStreamTableSink, 
CsvBatchTableSourceFactory.createBatchTableSource, and 
CsvAppendTableSourceFactory.createStreamTableSource instead.





Re: [DISCUSS] Support Interactive Programming in Flink Table API

2019-01-08 Thread Becket Qin
Hi Piotr,

I don't think it is feasible to ask every third party library to have a
method signature with CacheService as an argument.

And even that signature does not really solve the problem. Imagine function
foo() looks like following:

void foo(Table t) {
  ...
  t.cache(); // create cache for t
  ...
  env.getCacheService().releaseCacheFor(t); // release cache for t
}

From function foo()'s perspective, it created a cache and released it.
However, if someone invokes foo like this:
{
  Table src = ...
  Table t = src.select(...).cache()
  foo(t)
  // t is uncached by foo() already.
}

So the "side effect" still exists.

I think the only safe way to ensure there is no side effect while sharing
the cache is to use ref count.

BTW, the discussion we are having here is exactly the reason that I prefer
option 3. From technical perspective option 3 solves all the concerns.

Thanks,

Jiangjie (Becket) Qin


On Tue, Jan 8, 2019 at 8:41 PM Piotr Nowojski  wrote:

> Hi,
>
> I think that introducing ref counting could be confusing and it will be
> error prone, since Flink-table’s users are not used to closing/releasing
> resources. I was more objecting placing the
> `uncache()`/`dropCache()`/`releaseCache()` (releaseCache sounds best to me)
> as a method in the “Table”. It might be not obvious that it will drop the
> cache for all of the usages of the given table. For example:
>
> public void foo(Table t) {
>  // …
>  t.releaseCache();
> }
>
> public void bar(Table t) {
>   // ...
> }
>
> Table a = …
> val cachedA = a.cache()
>
> foo(cachedA)
> bar(cachedA)
>
>
> My problem with above example is that `t.releaseCache()` call is not doing
> the best possible job in communicating to the user that it will have a side
> effects for other places, like `bar(cachedA)` call. Something like this
> might be a better (not perfect, but just a bit better):
>
> public void foo(Table t, CacheService cacheService) {
>  // …
>  cacheService.releaseCacheFor(t);
> }
>
> Table a = …
> val cachedA = a.cache()
>
> foo(cachedA, env.getCacheService())
> bar(cachedA)
>
>
> Also from another perspective, maybe placing `releaseCache()` method in
> Table might not be the best separation of concerns - `releaseCache()`
> method seams significantly different compared to other existing methods.
>
> Piotrek
>
> > On 8 Jan 2019, at 12:28, Becket Qin  wrote:
> >
> > Hi Piotr,
> >
> > You are right. There might be two intuitive meanings when users call
> > 'a.uncache()', namely:
> > 1. release the resource
> > 2. Do not use cache for the next operation.
> >
> > Case (1) would likely be the dominant use case. So I would suggest we
> > dedicate uncache() method to case (1), i.e. for resource release, but not
> > for ignoring cache.
> >
> > For case 2, i.e. explicitly ignoring cache (which is rare), users may use
> > something like 'hint("ignoreCache")'. I think this is better as it is a
> > little weird for users to call `a.uncache()` while they may not even know
> > if the table is cached at all.
> >
> > Assuming we let `uncache()` to only release resource, one possibility is
> > using ref count to mitigate the side effect. That means a ref count is
> > incremented on `cache()` and decremented on `uncache()`. That means
> > `uncache()` does not physically release the resource immediately, but
> just
> > means the cache could be released.
> > That being said, I am not sure if this is really a better solution as it
> > seems a little counter intuitive. Maybe calling it releaseCache() help a
> > little bit?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> >
> > On Tue, Jan 8, 2019 at 5:36 PM Piotr Nowojski 
> wrote:
> >
> >> Hi Becket,
> >>
> >> With `uncache` there are probably two features that we can think about:
> >>
> >> a)
> >>
> >> Physically dropping the cached table from the storage, freeing up the
> >> resources
> >>
> >> b)
> >>
> >> Hinting the optimizer to not cache the reads for the next query/table
> >>
> >> a) Has the issue as I wrote before, that it seemed to be an operation
> >> inherently “flawed" with having side effects.
> >>
> >> I’m not sure how it would be best to express. We could make it work:
> >>
> >> 1. via a method on a Table as you proposed:
> >>
> >> void Table#dropCache()
> >> void Table#uncache()
> >>
> >> 2. Operation on the environment
> >>
> >> env.dropCacheFor(table) // or some other argument that allows user to
> >> identify the desired cache
> >>
> >> 3. Extending (from your original design doc) `setTableService` method to
> >> return some control handle like:
> >>
> >> TableServiceControl setTableService(TableFactory tf,
> >> TableProperties properties,
> >> TempTableCleanUpCallback cleanUpCallback);
> >>
> >> (TableServiceControl? TableService? TableServiceHandle? CacheService?)
> >>
> >> And having the drop cache method there:
> >>
> >> TableServiceControl#dropCache(table)
> >>
> >> Out of those options, option 1 might have a disadvantage of kind of not
> >> mak

Re: [DISCUSS] Support Interactive Programming in Flink Table API

2019-01-08 Thread Becket Qin
Just to clarify, when I say foo() like below, I assume that foo() must have
a way to release its own cache, so it must have access to table env.

void foo(Table t) {
  ...
  t.cache(); // create cache for t
  ...
  env.getCacheService().releaseCacheFor(t); // release cache for t
}

Thanks,

Jiangjie (Becket) Qin

On Tue, Jan 8, 2019 at 9:04 PM Becket Qin  wrote:

> Hi Piotr,
>
> I don't think it is feasible to ask every third party library to have
> method signature with CacheService as an argument.
>
> And even that signature does not really solve the problem. Imagine
> function foo() looks like following:
>
> void foo(Table t) {
>   ...
>   t.cache(); // create cache for t
>   ...
>   env.getCacheService().releaseCacheFor(t); // release cache for t
> }
>
> From function foo()'s perspective, it created a cache and released it.
> However, if someone invokes foo like this:
> {
>   Table src = ...
>   Table t = src.select(...).cache()
>   foo(t)
>   // t is uncached by foo() already.
> }
>
> So the "side effect" still exists.
>
> I think the only safe way to ensure there is no side effect while sharing
> the cache is to use ref count.
>
> BTW, the discussion we are having here is exactly the reason that I prefer
> option 3. From technical perspective option 3 solves all the concerns.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Tue, Jan 8, 2019 at 8:41 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I think that introducing ref counting could be confusing and error
>> prone, since Flink-table’s users are not used to closing/releasing
>> resources. I was objecting more to placing
>> `uncache()`/`dropCache()`/`releaseCache()` (releaseCache sounds best to me)
>> as a method on the “Table”. It might not be obvious that it will drop the
>> cache for all of the usages of the given table. For example:
>>
>> public void foo(Table t) {
>>  // …
>>  t.releaseCache();
>> }
>>
>> public void bar(Table t) {
>>   // ...
>> }
>>
>> Table a = …
>> val cachedA = a.cache()
>>
>> foo(cachedA)
>> bar(cachedA)
>>
>>
>> My problem with the above example is that the `t.releaseCache()` call is
>> not doing the best possible job of communicating to the user that it will
>> have side effects for other places, like the `bar(cachedA)` call. Something
>> like this might be a bit better (not perfect, but slightly better):
>>
>> public void foo(Table t, CacheService cacheService) {
>>  // …
>>  cacheService.releaseCacheFor(t);
>> }
>>
>> Table a = …
>> val cachedA = a.cache()
>>
>> foo(cachedA, env.getCacheService())
>> bar(cachedA)
>>
>>
>> Also, from another perspective, maybe placing the `releaseCache()` method
>> on Table is not the best separation of concerns - the `releaseCache()`
>> method seems significantly different compared to the other existing methods.
>>
>> Piotrek
>>
>> > On 8 Jan 2019, at 12:28, Becket Qin  wrote:
>> >
>> > Hi Piotr,
>> >
>> > You are right. There might be two intuitive meanings when users call
>> > 'a.uncache()', namely:
>> > 1. release the resource
>> > 2. Do not use cache for the next operation.
>> >
>> > Case (1) would likely be the dominant use case. So I would suggest we
>> > dedicate uncache() method to case (1), i.e. for resource release, but
>> not
>> > for ignoring cache.
>> >
>> > For case 2, i.e. explicitly ignoring cache (which is rare), users may
>> use
>> > something like 'hint("ignoreCache")'. I think this is better as it is a
>> > little weird for users to call `a.uncache()` while they may not even
>> know
>> > if the table is cached at all.
>> >
>> > Assuming we let `uncache()` only release the resource, one possibility is
>> > using ref count to mitigate the side effect. That means a ref count is
>> > incremented on `cache()` and decremented on `uncache()`. That means
>> > `uncache()` does not physically release the resource immediately, but
>> just
>> > means the cache could be released.
>> > That being said, I am not sure if this is really a better solution as it
>> > seems a little counterintuitive. Maybe calling it releaseCache() helps a
>> > little bit?
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> >
>> >
>> > On Tue, Jan 8, 2019 at 5:36 PM Piotr Nowojski 
>> wrote:
>> >
>> >> Hi Becket,
>> >>
>> >> With `uncache` there are probably two features that we can think about:
>> >>
>> >> a)
>> >>
>> >> Physically dropping the cached table from the storage, freeing up the
>> >> resources
>> >>
>> >> b)
>> >>
>> >> Hinting the optimizer to not cache the reads for the next query/table
>> >>
>> >> a) has the issue, as I wrote before, of seeming to be an operation
>> >> inherently “flawed” by having side effects.
>> >>
>> >> I’m not sure how it would be best to express. We could make it work:
>> >>
>> >> 1. via a method on a Table as you proposed:
>> >>
>> >> void Table#dropCache()
>> >> void Table#uncache()
>> >>
>> >> 2. Operation on the environment
>> >>
>> >> env.dropCacheFor(table) // or some other argument that allows user to
>> >> identify the desired cache
>> >>
>> >> 3. Ext

[jira] [Created] (FLINK-11286) Support to send StreamStatus.IDLE for non-source operators

2019-01-08 Thread vinoyang (JIRA)
vinoyang created FLINK-11286:


 Summary: Support to send StreamStatus.IDLE for non-source 
operators 
 Key: FLINK-11286
 URL: https://issues.apache.org/jira/browse/FLINK-11286
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: vinoyang
Assignee: vinoyang


Currently, only stream source tasks can be marked as temporarily idle, but 
this approach is often too limited.

Consider the following scenario: a DAG 
{{source->map->filter->flatmap->keyBy->window}} with a parallelism of 10, where 
the watermark is emitted not by the source operator but by a downstream 
operator such as flatmap. No source subtask is ever idle, yet after the filter 
some pipelines become idle; for example, 3 pipelines will no longer send any 
data downstream. In this case we cannot call the {{markAsTemporarilyIdle}} 
method to mark those pipelines as idle, which affects the downstream window.
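
The window impact comes from watermark combination: a downstream operator
advances its watermark to the minimum over its active input channels, so a
channel that has stopped making progress but is not marked idle pins the
watermark forever. A Flink-free sketch of the combination rule (illustrative
only, not the actual StatusWatermarkValve implementation):

```java
// Sketch of min-watermark combination that skips idle input channels.
public class WatermarkCombiner {
    // watermarks[i] is the current watermark of input channel i; idle[i] marks
    // channels whose upstream pipeline has gone idle (e.g. fully filtered out).
    static long combined(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // With no active inputs, the operator itself should be treated as idle.
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

If the stalled pipelines cannot be marked idle, their stale watermarks stay in
the minimum and the downstream window never fires.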



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11287) RocksDBListState creation should use actual registered state serializer instead of serializer provided by descriptor

2019-01-08 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-11287:
---

 Summary: RocksDBListState creation should use actual registered 
state serializer instead of serializer provided by descriptor
 Key: FLINK-11287
 URL: https://issues.apache.org/jira/browse/FLINK-11287
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.8.0


Currently, when creating a {{RocksDBListState}}, the element serializer wrapped 
by the {{RocksDBListState}} is retrieved from the state descriptor:


{code}
static <E, K, N, SV, S extends State, IS extends S> IS create(
    StateDescriptor<S, SV> stateDesc,
    Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
    RocksDBKeyedStateBackend<K> backend) {

    return (IS) new RocksDBListState<>(
        registerResult.f0,
        registerResult.f1.getNamespaceSerializer(),
        (TypeSerializer<List<E>>) registerResult.f1.getStateSerializer(),
        (List<E>) stateDesc.getDefaultValue(),
        backend);
}
{code}

This is incorrect, since new serializers retrieved from state descriptors have 
not been checked for compatibility with restored state.

Instead, the element serializer should be retrieved from the register result, 
which contains the actual state serializer registered in the state backend for 
state access (and has already been checked for compatibility / reconfigured 
appropriately).





[jira] [Created] (FLINK-11288) Add separate flink-ml for building fat jar

2019-01-08 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-11288:


 Summary: Add separate flink-ml for building fat jar
 Key: FLINK-11288
 URL: https://issues.apache.org/jira/browse/FLINK-11288
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Affects Versions: 1.7.1, 1.8.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.7.2, 1.8.0


Similar to the sql-connectors, the flink-ml fat jar that is included in 
flink-dist should be built in a separate module so that we can add proper 
licensing to it.





[jira] [Created] (FLINK-11289) Rework example module structure to account for licensing

2019-01-08 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-11289:


 Summary: Rework example module structure to account for licensing
 Key: FLINK-11289
 URL: https://issues.apache.org/jira/browse/FLINK-11289
 Project: Flink
  Issue Type: Improvement
  Components: Build System, Examples
Affects Versions: 1.7.1, 1.8.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.7.2, 1.8.0


Some of our examples bundle additional dependencies (like Kafka). The example 
jars (which are deployed to Maven Central and included in flink-dist) should 
have proper licensing. Otherwise we would have to disable deployment of these 
jars and keep the flink-dist NOTICE file in sync with the example dependencies 
manually.

One way to do this would be to add a separate module for each jar, or at the 
very least for those examples that bundle dependencies.





[DISCUSS] Checkpoint Failure process improvement

2019-01-08 Thread vino yang
Hi all,


Currently, the checkpoint failure handling logic is somewhat confusing and 
not focused in one place, which makes some functionality hard to build on top 
of the existing code.

So I provide a design document to improve the Checkpoint failure process
logic.

This design document primarily describes how to improve the checkpoint 
failure handling logic and make it clearer.

Based on this, we introduce a CheckpointFailureManager, which makes the
checkpoint failure processing more flexible.

This mainly comes from the following appeals:


   -

   FLINK-4810[1]: Checkpoint Coordinator should fail ExecutionGraph after
   "n" unsuccessful checkpoints
   -

   FLINK-10074[3]: Allowable number of checkpoint failure
   -

   FLINK-10724[2]: Refactor failure handling in checkpoint coordinator
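
The first two appeals boil down to failing the job after n consecutive
unsuccessful checkpoints. A minimal sketch of that counting rule
(illustrative names only, not the proposed CheckpointFailureManager API):

```java
// Sketch: count consecutive checkpoint failures; a success resets the count,
// and exceeding the tolerable number signals that the job should be failed.
public class CheckpointFailureCounter {
    private final int tolerableFailures; // the "n" from FLINK-4810 / FLINK-10074
    private int continuousFailures;

    public CheckpointFailureCounter(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Called on a failed checkpoint; returns true if the job should be failed. */
    public boolean onCheckpointFailure() {
        return ++continuousFailures > tolerableFailures;
    }

    /** A successful checkpoint resets the continuous-failure count. */
    public void onCheckpointSuccess() {
        continuousFailures = 0;
    }
}
```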


https://docs.google.com/document/d/1ce7RtecuTxcVUJlnU44hzcO2Dwq9g4Oyd8_biy94hJc/edit?usp=sharing

*Thanks to @Andrey Zagrebin for helping me review the documentation and
suggesting a lot of improvements.*

Feedback and comments are very welcome!

Best,
Vino

[1]: https://issues.apache.org/jira/browse/FLINK-4810

[2]: https://issues.apache.org/jira/browse/FLINK-10724
[3]: https://issues.apache.org/jira/browse/FLINK-10074


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2019-01-08 Thread Bowen Li
Thank you, Xuefu and Timo, for putting together the FLIP! I like that both
its scope and implementation plan are clear. Look forward to feedbacks from
the group.

I also added a few more complementary details in the doc.

Thanks,
Bowen


On Mon, Jan 7, 2019 at 8:37 PM Zhang, Xuefu  wrote:

> Thanks, Timo!
>
> I have started putting the content from the Google doc into FLIP-30 [1].
> However, please still keep the discussion along this thread.
>
> Thanks,
> Xuefu
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs
>
>
> --
> From:Timo Walther 
> Sent At:2019 Jan. 7 (Mon.) 05:59
> To:dev 
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>
> Hi everyone,
>
> Xuefu and I had multiple iterations over the catalog design document
> [1]. I believe that it is in a good shape now to be converted into FLIP.
> Maybe we need a bit more explanation at some places but the general
> design would be ready now.
>
> The design document covers the following changes:
> - Unify external catalog interface and Flink's internal catalog in
> TableEnvironment
> - Clearly define a hierarchy of reference objects namely:
> "catalog.database.table"
> - Enable a tight integration with Hive + Hive data connectors as well as
> a broad integration with existing TableFactories and discovery mechanism
> - Make the catalog interfaces more feature complete by adding views and
> functions
>
> If you have any further feedback, it would be great to give it now
> before we convert it into a FLIP.
>
> Thanks,
> Timo
>
> [1]
>
> https://docs.google.com/document/d/1Y9it78yaUvbv4g572ZK_lZnZaAGjqwM_EhjdOv4yJtw/edit#
>
>
>
> Am 07.01.19 um 13:51 schrieb Timo Walther:
> > Hi Eron,
> >
> > thank you very much for the contributions. I merged the first little
> > bug fixes. For the remaining PRs I think we can review and merge them
> > soon. As you said, the code is agnostic to the details of the
> > ExternalCatalog interface and I don't expect bigger merge conflicts in
> > the near future.
> >
> > However, exposing the current external catalog interfaces to SQL
> > Client users would make it even more difficult to change the
> > interfaces in the future. So maybe I would first wait until the
> > general catalog discussion is over and the FLIP has been created. This
> > should happen shortly.
> >
> > We should definitely coordinate the efforts better in the future to
> > avoid duplicate work.
> >
> > Thanks,
> > Timo
> >
> >
> > Am 07.01.19 um 00:24 schrieb Eron Wright:
> >> Thanks Timo for merging a couple of the PRs.   Are you also able to
> >> review the others that I mentioned? Xuefu I would like to incorporate
> >> your feedback too.
> >>
> >> Check out this short demonstration of using a catalog in SQL Client:
> >> https://asciinema.org/a/C8xuAjmZSxCuApgFgZQyeIHuo
> >>
> >> Thanks again!
> >>
> >> On Thu, Jan 3, 2019 at 9:37 AM Eron Wright  >> > wrote:
> >>
> >> Would a couple folks raise their hand to make a review pass thru
> >> the 6 PRs listed above?  It is a lovely stack of PRs that is 'all
> >> green' at the moment.   I would be happy to open follow-on PRs to
> >> rapidly align with other efforts.
> >>
> >> Note that the code is agnostic to the details of the
> >> ExternalCatalog interface; the code would not be obsolete if/when
> >> the catalog interface is enhanced as per the design doc.
> >>
> >>
> >>
> >> On Wed, Jan 2, 2019 at 1:35 PM Eron Wright  >> > wrote:
> >>
> >> I propose that the community review and merge the PRs that I
> >> posted, and then evolve the design thru 1.8 and beyond.  I
> >> think having a basic infrastructure in place now will
> >> accelerate the effort, do you agree?
> >>
> >> Thanks again!
> >>
> >> On Wed, Jan 2, 2019 at 11:20 AM Zhang, Xuefu
> >> mailto:xuef...@alibaba-inc.com>>
> >> wrote:
> >>
> >> Hi Eron,
> >>
> >> Happy New Year!
> >>
> >> Thank you very much for your contribution, especially
> >> during the holidays. While I'm encouraged by your work, I'd
> >> also like to share my thoughts on how to move forward.
> >>
> >> First, please note that the design discussion is still
> >> finalizing, and we expect some moderate changes,
> >> especially around TableFactories. Another pending change
> >> is our decision to shy away from scala, which our work
> >> will be impacted by.
> >>
> >> Secondly, while your work seemed to be about plugging
> >> catalog definitions into the execution environment, which
> >> is less impacted by the TableFactory change, I did notice some
> >> duplication of your work and ours. This is no big deal,
> >> but going forward, we should probably have a be

[jira] [Created] (FLINK-11290) BatchTableEnvironment and StreamTableEnvironment should be transparent to users

2019-01-08 Thread sunjincheng (JIRA)
sunjincheng created FLINK-11290:
---

 Summary: BatchTableEnvironment and StreamTableEnvironment should 
be transparent to users
 Key: FLINK-11290
 URL: https://issues.apache.org/jira/browse/FLINK-11290
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.8.0
Reporter: sunjincheng


Users only care about their own business logic; they should not have to care 
about the different implementations of the stream and batch TableEnvironment.
 From:
{code:java}
ExecutionEnvironment env = ...
BatchTableEnvironment tEnv = 
  TableEnvironment.getTableEnvironment(env);
{code}
To:
{code:java}
ExecutionEnvironment env = …
TableEnvironment tEnv = TableEnvironment.getTableEnvironment(env)
{code}
 

The google doc: 
[https://docs.google.com/document/d/1t-AUGuaChADddyJi6e0WLsTDEnf9ZkupvvBiQ4yTTEI/edit#|https://docs.google.com/document/d/1t-AUGuaChADddyJi6e0WLsTDEnf9ZkupvvBiQ4yTTEI/edit]


 The DISCUSS thread: 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Enhance-convenience-of-TableEnvironment-in-TableAPI-SQL-td25825.html#a25839]

Any feedback is welcome!





[jira] [Created] (FLINK-11291) Reorganize the package of flink-cep module

2019-01-08 Thread Dian Fu (JIRA)
Dian Fu created FLINK-11291:
---

 Summary: Reorganize the package of flink-cep module
 Key: FLINK-11291
 URL: https://issues.apache.org/jira/browse/FLINK-11291
 Project: Flink
  Issue Type: Task
  Components: CEP
Reporter: Dian Fu
Assignee: Dian Fu


Currently, there is no package such as org.apache.flink.cep.api in the 
flink-cep module, and the API classes are spread across several packages, e.g. 
org.apache.flink.cep.nfa.aftermatch, org.apache.flink.cep.pattern, 
org.apache.flink.cep, etc. Personally, I think we should add an api package to 
the flink-cep module and move the APIs there. Any feedback is welcome.


