Unbalanced processing of KeyedStream

2019-01-02 Thread Jozef Vilcek
Hello,

I am facing a problem where a KeyedStream is poorly parallelised across workers
when the number of keys is close to the parallelism.

Some workers process zero keys, some more than one. This is because of
`KeyGroupRangeAssignment.assignKeyToParallelOperator()` in
`KeyGroupStreamPartitioner` as I found out in this post:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-keyBy-to-deterministically-hash-each-record-to-a-processor-task-slot-td16483.html

I would like to find out what my options are here.
* is there a reason why a custom partitioner cannot be used on a keyed stream?
* could there be API support for creating correct KeyedStream-compatible
keys? It would also make `DataStreamUtils.reinterpretAsKeyedStream()` more
usable for certain scenarios.
* any other options I have?

Many thanks in advance.

Best,
Jozef
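
For illustration, a minimal sketch (the class name and parallelism value are hypothetical) that makes the described skew visible by printing the operator index that Flink's default key-group assignment picks for each key, assuming the default max parallelism derived from the operator parallelism:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeySkewCheck {

    public static void main(String[] args) {
        int parallelism = 4;
        int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);

        for (String key : new String[] {"a", "b", "c", "d"}) {
            // Same assignment the KeyGroupStreamPartitioner uses internally.
            int operator =
                KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
            // With only a handful of keys, several of them can land on the same
            // operator index while other operators receive no keys at all.
            System.out.println(key + " -> operator " + operator);
        }
    }
}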


Re: subscribe to flink dev maillist

2019-01-02 Thread Chesnay Schepler
To subscribe to the dev mailing list, please send a mail to 
dev-subscr...@flink.apache.org


On 02.01.2019 06:14, Zhang Shaoquan wrote:

subscribe to flink dev maillist





Re: Apply for flink contributor permission

2019-01-02 Thread Chesnay Schepler

I've given you contributor permissions.

On 02.01.2019 05:37, Haibo Sun wrote:

Hi guys,
Could anyone kindly give me contributor permission? My JIRA username is
sunhaibotb.

Thanks,
Haibo





[jira] [Created] (FLINK-11248) Support Row/CRow state schema evolution

2019-01-02 Thread boshu Zheng (JIRA)
boshu Zheng created FLINK-11248:
---

 Summary: Support Row/CRow state schema evolution
 Key: FLINK-11248
 URL: https://issues.apache.org/jira/browse/FLINK-11248
 Project: Flink
  Issue Type: Sub-task
  Components: Type Serialization System
Reporter: boshu Zheng
Assignee: boshu Zheng








[jira] [Created] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-02 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-11249:
--

 Summary: FlinkKafkaProducer011 can not be migrated to 
FlinkKafkaProducer
 Key: FLINK-11249
 URL: https://issues.apache.org/jira/browse/FLINK-11249
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.7.1, 1.7.0
Reporter: Piotr Nowojski
 Fix For: 1.8.0


As reported by a user on the mailing list in "How to migrate Kafka Producer?" (on 
18th December 2018), {{FlinkKafkaProducer011}} cannot be migrated to 
{{FlinkKafkaProducer}}, and the same problem can occur with future Kafka 
producer versions/refactorings.

The issue is that the {{ListState}} field 
{{FlinkKafkaProducer#nextTransactionalIdHintState}} is serialized using 
Java serialization, which causes problems/collisions between 
{{FlinkKafkaProducer011.NextTransactionalIdHint}} and
{{FlinkKafkaProducer.NextTransactionalIdHint}}.

To fix this we probably need to release new versions of those classes that 
rewrite/upgrade this state field to a new one that doesn't rely on Java 
serialization. After that, we could drop support for the old field, which in 
turn will allow users to upgrade from the 0.11 connector to the universal one.
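
A hedged sketch of that general direction: registering the transactional-id-hint state with an explicit serializer instead of falling back to Java serialization. The descriptor name and the {{NextTransactionalIdHintSerializer}} class below are illustrative stand-ins, not the actual connector code.

{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;

// Sketch only: NextTransactionalIdHint stands in for the connector's class and
// NextTransactionalIdHintSerializer for a hand-written TypeSerializer for it.
class TransactionalIdHintStateSketch {

    private ListState<NextTransactionalIdHint> nextTransactionalIdHintState;

    void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<NextTransactionalIdHint> descriptor =
            new ListStateDescriptor<>(
                "next-transactional-id-hint-v2",          // new state name, illustrative
                new NextTransactionalIdHintSerializer()); // explicit TypeSerializer, no Java serialization

        nextTransactionalIdHintState =
            context.getOperatorStateStore().getUnionListState(descriptor);
    }
}
{code}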

One bright side is that technically speaking our {{FlinkKafkaProducer011}} has 
the same compatibility matrix as the universal one (it's also forward & 
backward compatible with the same Kafka versions), so for the time being users 
can stick to {{FlinkKafkaProducer011}}.

FYI [~tzulitai] [~yanghua]





[jira] [Created] (FLINK-11250) fix thread leak when StreamTask switches from DEPLOYING to CANCELING

2019-01-02 Thread lamber-ken (JIRA)
lamber-ken created FLINK-11250:
--

 Summary: fix thread leak when StreamTask switches from DEPLOYING 
to CANCELING
 Key: FLINK-11250
 URL: https://issues.apache.org/jira/browse/FLINK-11250
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime, Streaming
Affects Versions: 1.7.1, 1.6.3, 1.5.6
Reporter: lamber-ken
Assignee: lamber-ken
 Fix For: 1.7.2


Beginning with Flink 1.5.x, the streamRecordWriters are created in StreamTask's 
constructor, which starts the OutputFlusher daemon thread. So when a task 
switches from the DEPLOYING to the CANCELING state, the daemon thread is leaked.

 

*reproducible example*
{code:java}
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);

    env
        .addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                for (int i = 0; i < 1; i++) {
                    Thread.sleep(100);
                    ctx.collect("data " + i);
                }
            }

            @Override
            public void cancel() {
            }
        })
        .addSink(new RichSinkFunction<String>() {

            @Override
            public void open(Configuration parameters) throws Exception {
                // fails immediately, so the task switches to CANCELING right after DEPLOYING
                System.out.println(1 / 0);
            }

            @Override
            public void invoke(String value, Context context) throws Exception {
            }

        }).setParallelism(2);

    env.execute();
}{code}
*some useful log*
{code:java}
2019-01-02 03:03:47.525 [thread==> jobmanager-future-thread-2] 
executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) 
(74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from CREATED to SCHEDULED.
2019-01-02 03:03:47.526 [thread==> flink-akka.actor.default-dispatcher-5] 
slotpool.SlotPool#allocateSlot:326 Received slot request 
[SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] for task: Attempt #1 (Source: 
Custom Source (1/1)) @ (unassigned) - [SCHEDULED]
2019-01-02 03:03:47.527 [thread==> flink-akka.actor.default-dispatcher-5] 
slotpool.SlotSharingManager#createRootSlot:151 Create multi task slot 
[SlotRequestId{494e47eb8318e2c0a1db91dda6b8}] in slot 
[SlotRequestId{6d7f0173c1d48e5559f6a14b080ee817}].
2019-01-02 03:03:47.527 [thread==> flink-akka.actor.default-dispatcher-5] 
slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create 
single task slot [SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] in multi 
task slot [SlotRequestId{494e47eb8318e2c0a1db91dda6b8}] for group 
bc764cd8ddf7a0cff126f51c16239658.
2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] 
slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create 
single task slot [SlotRequestId{8a877431375df8aeadb2fd845cae15fc}] in multi 
task slot [SlotRequestId{494e47eb8318e2c0a1db91dda6b8}] for group 
0a448493b4782967b150582570326227.
2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] 
slotpool.SlotSharingManager#createRootSlot:151 Create multi task slot 
[SlotRequestId{56a36d3902ee1a7d0e2e84f50039c1ca}] in slot 
[SlotRequestId{dbf5c9fa39f1e5a0b34a4a8c10699ee5}].
2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] 
slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create 
single task slot [SlotRequestId{5929c12b52dccee682f86afbe1cff5cf}] in multi 
task slot [SlotRequestId{56a36d3902ee1a7d0e2e84f50039c1ca}] for group 
0a448493b4782967b150582570326227.
2019-01-02 03:03:47.529 [thread==> flink-akka.actor.default-dispatcher-5] 
executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) 
(74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from SCHEDULED to DEPLOYING.
2019-01-02 03:03:47.529 [thread==> flink-akka.actor.default-dispatcher-5] 
executiongraph.Execution#deploy:576 Deploying Source: Custom Source (1/1) 
(attempt #1) to localhost
2019-01-02 03:03:47.530 [thread==> flink-akka.actor.default-dispatcher-2] 
state.TaskExecutorLocalStateStoresManager#localStateStoreForSubtask:162 
Registered new local state store with configuration 
LocalRecoveryConfig{localRecoveryMode=false, 
localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories=[/tmp/localState/aid_AllocationID{7b5faad9073d7fac6759e40981197b8d}],
 jobID=06e76f6e31728025b22fdda9fadd6f01, 
jobVertexID=bc764cd8ddf7a0cff126f51c16239658, subtaskIndex=0}} for 
06e76f6e31728025b22fdda9fadd6f01 - bc764cd8ddf7a0cff126f51c16239658 - 0 under 
allocation id AllocationID{7b5faad9073d7fac6759e40981197b8d}.
2019-01-02 03:03:47.530 [thread==> flink-akka.actor.default-dispatcher-2] 
partition.ResultPartition#:172 Source: Custom Source (1/1) 
(74

[jira] [Created] (FLINK-11251) Incompatible metric name on prometheus reporter

2019-01-02 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-11251:
---

 Summary: Incompatible metric name on prometheus reporter
 Key: FLINK-11251
 URL: https://issues.apache.org/jira/browse/FLINK-11251
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.7.0, 1.6.3, 1.5.6
Reporter: Wei-Che Wei
Assignee: Wei-Che Wei


{code}
# HELP 
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_4_currentOffsets
 currentOffsets (scope: 
taskmanager_job_task_operator_KafkaConsumer_topic_partition_4)
# TYPE 
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_4_currentOffsets
 gauge
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_4_currentOffsets{task_attempt_id="5137e35cf7319787f6cd627621fd2ea7",host="localhost",task_attempt_num="0",tm_id="e72a527652f5af1358bdbc0f5bf6f49d",partition="4",topic="rt_lookback_state",job_id="546cf6f0d1f0b818afd9697c612f715c",task_id="d7b1ad914351f9ee527267f51160",operator_id="d7b1ad914351f9ee527267f51160",operator_name="Source:_kafka_lookback_state_source",task_name="Source:_kafka_lookback_state_source",job_name="FlinkRuleMatchPipeline",subtask_index="7",}
 1.456090927E9
# HELP 
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_24_committedOffsets
 committedOffsets (scope: 
taskmanager_job_task_operator_KafkaConsumer_topic_partition_24)
# TYPE 
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_24_committedOffsets
 gauge
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_24_committedOffsets{task_attempt_id="9b666af68ec4734b25937b8b94cc5c84",host="localhost",task_attempt_num="0",tm_id="e72a527652f5af1358bdbc0f5bf6f49d",partition="24",topic="rt_event",job_id="546cf6f0d1f0b818afd9697c612f715c",task_id="61252f73469d3ffba207c548d29a0267",operator_id="61252f73469d3ffba207c548d29a0267",operator_name="Source:_kafka_source",task_name="Source:_kafka_sourcesamplingparse_and_filter",job_name="FlinkRuleMatchPipeline",subtask_index="27",}
 3.001186523E9
{code}

This is a snippet from my Flink Prometheus reporter. It shows that the Kafka 
current-offsets and committed-offsets metric names changed after I migrated my 
Flink job from 1.6.0 to 1.6.3.

The original metric names should not contain the {{partition index}}, 
i.e. the metric names should be 
{{flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_currentOffsets}}
 and 
{{flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_committedOffsets}}.

After digging into the source code, I found that the incompatibility started 
with this [PR|https://github.com/apache/flink/pull/7095], because it added a 
new overload {{getLogicalScope(CharacterFilter, char, int)}} that is not 
overridden in the {{GenericValueMetricGroup}} class.
When the tail metric group of a metric is a {{GenericValueMetricGroup}} and 
this new {{getLogicalScope}} is called, i.e. via 
{{FrontMetricGroup#getLogicalScope}}, the value group's name is no longer 
ignored, although it was in the previously released versions.
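
A sketch of the kind of override that would restore the old behaviour, i.e. making {{GenericValueMetricGroup}} skip its own (value) group name in the new overload by delegating to its parent, mirroring the existing two-argument overload. The signature follows the description above and may not match the actual class exactly.

{code:java}
// Inside GenericValueMetricGroup (sketch):
@Override
public String getLogicalScope(CharacterFilter filter, char delimiter, int reporterIndex) {
    // Skip this group's own name (the metric value) and reuse the parent's logical scope,
    // so the exported name stays ..._topic_partition_currentOffsets.
    return parent.getLogicalScope(filter, delimiter, reporterIndex);
}
{code}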






Re: [DISCUSSION] Complete restart after successive failures

2019-01-02 Thread Piotr Nowojski
Hi Gyula,

Personally I do not see a problem with providing such an option of a “clean 
restart” after N failures, especially if we set the default value for N to 
+infinity. However, people working more closely with Flink’s scheduling system 
might have more to say about this.

Piotrek

> On 29 Dec 2018, at 13:36, Gyula Fóra  wrote:
> 
> Hi all!
> 
> In the past years while running Flink in production we have seen a huge
> number of scenarios where Flink jobs can go into unrecoverable failure
> loops and only a complete manual restart helps.
> 
> This is in most cases due to memory leaks in the user program, leaking
> threads, etc., and it leads to a failure loop because the job is
> restarted within the same JVM (TaskManager). After the restart the leak
> gets worse and worse, eventually crashing the TMs one after the other and
> never recovering.
> 
> These issues are extremely hard to debug (might only cause problems after a
> few failures) and can cause long lasting instabilities.
> 
> I suggest we enable an option that would trigger a complete restart every
> so many failures. This would release all containers (TM and JM) and restart
> everything.
> 
> The only argument against this I see is that it might further hide the
> root cause of the problem on the job/user side. While this is true, a stuck
> production job with crashing TMs is probably the worse of the two.
> 
> What do you think?
> 
> Gyula



[jira] [Created] (FLINK-11252) Download page contains irrelevant "Scala 2.11" column

2019-01-02 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-11252:


 Summary: Download page contains irrelevant "Scala 2.11" column
 Key: FLINK-11252
 URL: https://issues.apache.org/jira/browse/FLINK-11252
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Attachments: downloads.png

The download page has a "Scala 2.11" column, that was used in the past to 
provide distinct download links for different scala versions.
We currently however list releases separately for each scala version.
We should either remove the column title or refactor the download page to also 
have a "Scala 2.12" column.





Re: Unbalanced processing of KeyedStream

2019-01-02 Thread Ken Krugler
FWIW, if you want exactly one record per operator, then this code 
<https://github.com/ScaleUnlimited/flink-crawler/blob/ba06aa87226b4c44e30aba6df68984d53cc15519/src/main/java/com/scaleunlimited/flinkcrawler/utils/FlinkUtils.java#L153>
 should generate key values that will be partitioned properly.

— Ken

> On Jan 2, 2019, at 12:16 AM, Jozef Vilcek  wrote:
> 
> Hello,
> 
> I am facing a problem where a KeyedStream is poorly parallelised across workers
> when the number of keys is close to the parallelism.
> 
> Some workers process zero keys, some more than one. This is because of
> `KeyGroupRangeAssignment.assignKeyToParallelOperator()` in
> `KeyGroupStreamPartitioner` as I found out in this post:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-keyBy-to-deterministically-hash-each-record-to-a-processor-task-slot-td16483.html
> 
> I would like to find out what my options are here.
> * is there a reason why a custom partitioner cannot be used on a keyed stream?
> * could there be API support for creating correct KeyedStream-compatible
> keys? It would also make `DataStreamUtils.reinterpretAsKeyedStream()` more
> usable for certain scenarios.
> * any other options I have?
> 
> Many thanks in advance.
> 
> Best,
> Jozef

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2019-01-02 Thread Zhang, Xuefu
Hi Eron,

Happy New Year!

Thank you very much for your contribution, especially during the holidays. While 
I'm encouraged by your work, I'd also like to share my thoughts on how to move 
forward.

First, please note that the design discussion is still being finalized, and we 
expect some moderate changes, especially around TableFactories. Another pending 
change is our decision to shy away from Scala, which will impact our work.

Secondly, while your work seems to be about plugging catalog definitions into the 
execution environment, which is less impacted by the TableFactory change, I did 
notice some duplication between your work and ours. This is no big deal, but going 
forward we should probably communicate better about work assignments so as to 
avoid any possible duplication of work. On the other hand, I think some of your 
work is interesting and valuable for inclusion once we finalize the overall 
design.

Thus, please continue your research and experiment and let us know when you 
start working on anything so we can better coordinate.

Thanks again for your interest and contributions.

Thanks,
Xuefu




--
From:Eron Wright 
Sent At:2019 Jan. 1 (Tue.) 18:39
To:dev ; Xuefu 
Cc:Xiaowei Jiang ; twalthr ; piotr 
; Fabian Hueske ; suez1224 
; Bowen Li 
Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi folks, there are clearly some incremental steps to be taken to introduce 
catalog support to SQL Client, complementary to what is proposed in the 
Flink-Hive Metastore design doc.  I was quietly working on this over the 
holidays.   I posted some new sub-tasks, PRs, and sample code to FLINK-10744. 

What inspired me to get involved is that the catalog interface seems like a 
great way to encapsulate a 'library' of Flink tables and functions.  For 
example, the NYC Taxi dataset (TaxiRides, TaxiFares, various UDFs) may be 
nicely encapsulated as a catalog (TaxiData).   Such a library should be fully 
consumable in SQL Client.

I implemented the above.  Some highlights:
1. A fully-worked example of using the Taxi dataset in SQL Client via an 
environment file.
- an ASCII video showing the SQL Client in action:
https://asciinema.org/a/C8xuAjmZSxCuApgFgZQyeIHuo

- the corresponding environment file (will be even more concise once 
'FLINK-10696 Catalog UDFs' is merged):
https://github.com/EronWright/flink-training-exercises/blob/3be008d64be975ced0f1a7e3901a8c5353f72a7e/src/main/dist/conf/sql-client-defaults.yaml

- the typed API for standalone table applications:
https://github.com/EronWright/flink-training-exercises/blob/3be008d64be975ced0f1a7e3901a8c5353f72a7e/src/main/java/com/dataartisans/flinktraining/examples/table_java/examples/ViaCatalog.java#L50

2. Implementation of the core catalog descriptor and factory.  I realize that 
some renames may later occur as per the design doc, and would be happy to do 
that as a follow-up.
https://github.com/apache/flink/pull/7390

3. Implementation of a connect-style API on TableEnvironment to use catalog 
descriptor.
https://github.com/apache/flink/pull/7392

4. Integration into SQL-Client's environment file:
https://github.com/apache/flink/pull/7393

I realize that the overall Hive integration is still evolving, but I believe 
that these PRs are a good stepping stone. Here's the list (in bottom-up order):
- https://github.com/apache/flink/pull/7386
- https://github.com/apache/flink/pull/7388
- https://github.com/apache/flink/pull/7389
- https://github.com/apache/flink/pull/7390
- https://github.com/apache/flink/pull/7392
- https://github.com/apache/flink/pull/7393

Thanks and enjoy 2019!
Eron W


On Sun, Nov 18, 2018 at 3:04 PM Zhang, Xuefu  wrote:
Hi Xiaowei,

 Thanks for bringing up the question. In the current design, the properties for 
meta objects are meant to cover anything that's specific to a particular 
catalog and agnostic to Flink. Anything that is common (such as the schema for 
tables, the query text for views, and the UDF classname) is abstracted into 
members of the respective classes. However, this is still under discussion, and 
Timo and I will go over it and provide an update.

 Please note that UDF is a little more involved than what the current design 
doc shows. I'm still refining this part.

 Thanks,
 Xuefu


 --
 Sender:Xiaowei Jiang 
 Sent at:2018 Nov 18 (Sun) 15:17
 Recipient:dev 
 Cc:Xuefu ; twalthr ; piotr 
; Fabian Hueske ; suez1224 

 Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

 Thanks Xuefu for the detailed design doc! One question on the properties 
associated with the catalog objects: are we going to leave them completely free 
form, or are we going to set some standard for them? I think that the answer may 
depend on whether we want to explore catalog-specific optimization opportunities. 
In any case, I think that it might be helpful to standardize as much as possible 
into strongly 

Re: Unbalanced processing of KeyedStream

2019-01-02 Thread Jozef Vilcek
Thanks Ken. Yes, a similar approach is suggested in the post I shared in my
question. But to me it feels a bit hack-ish.
I would like to know if this is the only solution with Flink, or am I missing
something?
Could there be more API-level support for such a use case from Flink? Is there a
reason why there is none? Or is there?

On Wed, Jan 2, 2019 at 5:29 PM Ken Krugler 
wrote:

> FWIW, if you want exactly one record per operator, then this code <
> https://github.com/ScaleUnlimited/flink-crawler/blob/ba06aa87226b4c44e30aba6df68984d53cc15519/src/main/java/com/scaleunlimited/flinkcrawler/utils/FlinkUtils.java#L153>
> should generate key values that will be partitioned properly.
>
> — Ken
>
> > On Jan 2, 2019, at 12:16 AM, Jozef Vilcek  wrote:
> >
> > Hello,
> >
> > I am facing a problem where a KeyedStream is poorly parallelised across workers
> > when the number of keys is close to the parallelism.
> >
> > Some workers process zero keys, some more than one. This is because of
> > `KeyGroupRangeAssignment.assignKeyToParallelOperator()` in
> > `KeyGroupStreamPartitioner` as I found out in this post:
> >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-keyBy-to-deterministically-hash-each-record-to-a-processor-task-slot-td16483.html
> >
> > I would like to find out what my options are here.
> > * is there a reason why a custom partitioner cannot be used on a keyed
> > stream?
> > * could there be API support for creating correct KeyedStream-compatible
> > keys? It would also make `DataStreamUtils.reinterpretAsKeyedStream()` more
> > usable for certain scenarios.
> > * any other options I have?
> >
> > Many thanks in advance.
> >
> > Best,
> > Jozef
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>


Re: Unbalanced processing of KeyedStream

2019-01-02 Thread Ken Krugler
Hi Jozef,

Processing just a few keys (# of keys ≅ # of operators) in Flink isn’t common, 
from what I’ve seen.

Another possible option is to broadcast all records, and then in each operator 
decide what records to process, based on the operator index and the key value.

Something like this in your operator's open() method:

public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    this.operatorIndex = getRuntimeContext().getIndexOfThisSubtask();
    this.numOperators = getRuntimeContext().getNumberOfParallelSubtasks();
}

And in your operator’s processing method...

int hash = calcPositiveHashCode(key);
if ((hash % this.numOperators) == this.operatorIndex) { … }
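
A self-contained sketch of this broadcast-and-filter pattern (the class name, the String key type, and the extractKey() helper are illustrative, not from this thread):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class PerSubtaskFilter extends RichFlatMapFunction<String, String> {

    private int operatorIndex;
    private int numOperators;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        operatorIndex = getRuntimeContext().getIndexOfThisSubtask();
        numOperators = getRuntimeContext().getNumberOfParallelSubtasks();
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        // Every subtask sees every record (stream.broadcast()), and keeps only
        // the records whose positive key hash maps to its own subtask index.
        int hash = extractKey(value).hashCode() & Integer.MAX_VALUE;
        if (hash % numOperators == operatorIndex) {
            out.collect(value);
        }
    }

    // Illustrative key extraction; replace with your own key logic.
    private String extractKey(String value) {
        return value;
    }
}

It would be applied as something like stream.broadcast().flatMap(new PerSubtaskFilter()).setParallelism(n).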

— Ken

> On Jan 2, 2019, at 11:32 AM, Jozef Vilcek  wrote:
> 
> Thanks Ken. Yes, a similar approach is suggested in the post I shared in my
> question. But to me it feels a bit hack-ish.
> I would like to know if this is the only solution with Flink, or am I missing
> something?
> Could there be more API-level support for such a use case from Flink? Is there a
> reason why there is none? Or is there?
> 
> On Wed, Jan 2, 2019 at 5:29 PM Ken Krugler 
> wrote:
> 
>> FWIW, if you want exactly one record per operator, then this code <
>> https://github.com/ScaleUnlimited/flink-crawler/blob/ba06aa87226b4c44e30aba6df68984d53cc15519/src/main/java/com/scaleunlimited/flinkcrawler/utils/FlinkUtils.java#L153>
>> should generate key values that will be partitioned properly.
>> 
>> — Ken
>> 
>>> On Jan 2, 2019, at 12:16 AM, Jozef Vilcek  wrote:
>>> 
>>> Hello,
>>> 
>>> I am facing a problem where a KeyedStream is poorly parallelised across workers
>>> when the number of keys is close to the parallelism.
>>> 
>>> Some workers process zero keys, some more than one. This is because of
>>> `KeyGroupRangeAssignment.assignKeyToParallelOperator()` in
>>> `KeyGroupStreamPartitioner` as I found out in this post:
>>> 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-keyBy-to-deterministically-hash-each-record-to-a-processor-task-slot-td16483.html
>>> 
>>> I would like to find out what my options are here.
>>> * is there a reason why a custom partitioner cannot be used on a keyed
>>> stream?
>>> * could there be API support for creating correct KeyedStream-compatible
>>> keys? It would also make `DataStreamUtils.reinterpretAsKeyedStream()` more
>>> usable for certain scenarios.
>>> * any other options I have?
>>> 
>>> Many thanks in advance.
>>> 
>>> Best,
>>> Jozef
>> 
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>> 
>> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2019-01-02 Thread Eron Wright
I propose that the community review and merge the PRs that I posted, and
then evolve the design through 1.8 and beyond.   I think having a basic
infrastructure in place now will accelerate the effort; do you agree?

Thanks again!

On Wed, Jan 2, 2019 at 11:20 AM Zhang, Xuefu 
wrote:

> Hi Eron,
>
> Happy New Year!
>
> Thank you very much for your contribution, especially during the holidays.
> While I'm encouraged by your work, I'd also like to share my thoughts on how
> to move forward.
>
> First, please note that the design discussion is still being finalized, and we
> expect some moderate changes, especially around TableFactories. Another
> pending change is our decision to shy away from Scala, which will impact
> our work.
>
> Secondly, while your work seems to be about plugging catalog definitions into
> the execution environment, which is less impacted by the TableFactory change, I
> did notice some duplication between your work and ours. This is no big deal, but
> going forward we should probably communicate better about work assignments
> so as to avoid any possible duplication of work. On the other hand, I think
> some of your work is interesting and valuable for inclusion once we finalize
> the overall design.
>
> Thus, please continue your research and experiment and let us know when
> you start working on anything so we can better coordinate.
>
> Thanks again for your interest and contributions.
>
> Thanks,
> Xuefu
>
>
>
> --
> From:Eron Wright 
> Sent At:2019 Jan. 1 (Tue.) 18:39
> To:dev ; Xuefu 
> Cc:Xiaowei Jiang ; twalthr ;
> piotr ; Fabian Hueske ;
> suez1224 ; Bowen Li 
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>
> Hi folks, there are clearly some incremental steps to be taken to introduce
> catalog support to SQL Client, complementary to what is proposed in the
> Flink-Hive Metastore design doc.  I was quietly working on this over the
> holidays.   I posted some new sub-tasks, PRs, and sample code
> to FLINK-10744.
>
> What inspired me to get involved is that the catalog interface seems like
> a great way to encapsulate a 'library' of Flink tables and functions.  For
> example, the NYC Taxi dataset (TaxiRides, TaxiFares, various UDFs) may be
> nicely encapsulated as a catalog (TaxiData).   Such a library should be
> fully consumable in SQL Client.
>
> I implemented the above.  Some highlights:
>
> 1. A fully-worked example of using the Taxi dataset in SQL Client via an
> environment file.
> - an ASCII video showing the SQL Client in action:
> https://asciinema.org/a/C8xuAjmZSxCuApgFgZQyeIHuo
>
> - the corresponding environment file (will be even more concise once
> 'FLINK-10696 Catalog UDFs' is merged):
> https://github.com/EronWright/flink-training-exercises/blob/3be008d64be975ced0f1a7e3901a8c5353f72a7e/src/main/dist/conf/sql-client-defaults.yaml
>
> - the typed API for standalone table applications:
> https://github.com/EronWright/flink-training-exercises/blob/3be008d64be975ced0f1a7e3901a8c5353f72a7e/src/main/java/com/dataartisans/flinktraining/examples/table_java/examples/ViaCatalog.java#L50
>
> 2. Implementation of the core catalog descriptor and factory.  I realize
> that some renames may later occur as per the design doc, and would be happy
> to do that as a follow-up.
> https://github.com/apache/flink/pull/7390
>
> 3. Implementation of a connect-style API on TableEnvironment to use
> catalog descriptor.
> https://github.com/apache/flink/pull/7392
>
> 4. Integration into SQL-Client's environment file:
> https://github.com/apache/flink/pull/7393
>
> I realize that the overall Hive integration is still evolving, but I
> believe that these PRs are a good stepping stone. Here's the list (in
> bottom-up order):
> - https://github.com/apache/flink/pull/7386
> - https://github.com/apache/flink/pull/7388
> - https://github.com/apache/flink/pull/7389
> - https://github.com/apache/flink/pull/7390
> - https://github.com/apache/flink/pull/7392
> - https://github.com/apache/flink/pull/7393
>
> Thanks and enjoy 2019!
> Eron W
>
>
> On Sun, Nov 18, 2018 at 3:04 PM Zhang, Xuefu 
> wrote:
> Hi Xiaowei,
>
> Thanks for bringing up the question. In the current design, the properties
> for meta objects are meant to cover anything that's specific to a
> particular catalog and agnostic to Flink. Anything that is common (such as
> the schema for tables, the query text for views, and the UDF classname) is
> abstracted into members of the respective classes. However, this is still
> under discussion, and Timo and I will go over it and provide an update.
>
> Please note that UDF is a little m

Apply for flink contributor permission

2019-01-02 Thread peibin wang
Hi guys,

Could anyone kindly give me the contributor permission?
My JIRA username is wangpeibin.

 

Thanks,
Peibin

 

 



[jira] [Created] (FLINK-11253) Incorrect way to stop yarn session described in yarn_setup document

2019-01-02 Thread Tao Yang (JIRA)
Tao Yang created FLINK-11253:


 Summary: Incorrect way to stop yarn session described in 
yarn_setup document
 Key: FLINK-11253
 URL: https://issues.apache.org/jira/browse/FLINK-11253
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Tao Yang


There are two ways to stop a YARN session described in the yarn_setup document:
{noformat}
Stop the YARN session by stopping the unix process (using CTRL+C) or by 
entering ‘stop’ into the client.
{noformat}
But in fact, the YARN session application can still be running after stopping the unix 
process (using CTRL+C).
We can either update the yarn_setup document to remove this incorrect way, or 
add a ShutdownHook in FlinkYarnSessionCli that stops the YARN session, to make it correct.
Looking forward to feedback; I would like to work on this ticket. Thanks.
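
A generic sketch of the ShutdownHook idea (this is not the actual FlinkYarnSessionCli code; the stopYarnSession callback is a stand-in for whatever teardown the CLI would need to run on CTRL+C):

{code:java}
public final class SessionShutdownHookSketch {

    /**
     * Registered once at CLI start-up; the hook runs when the process
     * receives CTRL+C (SIGINT) and the JVM begins an orderly shutdown.
     */
    public static void install(Runnable stopYarnSession) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                stopYarnSession.run(); // stand-in for stopping the YARN session application
            } catch (RuntimeException e) {
                // best-effort cleanup; nothing more we can do during JVM shutdown
            }
        }, "yarn-session-shutdown-hook"));
    }
}
{code}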





[jira] [Created] (FLINK-11254) Unify serialization format of savepoint for switching state backends

2019-01-02 Thread Congxian Qiu (JIRA)
Congxian Qiu created FLINK-11254:


 Summary: Unify serialization format of savepoint for switching 
state backends
 Key: FLINK-11254
 URL: https://issues.apache.org/jira/browse/FLINK-11254
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.7.1
Reporter: Congxian Qiu
Assignee: Congxian Qiu


Currently, the savepoint serialization formats of 
HeapKeyedStateBackend and RocksDBStateBackend are different, so we cannot 
switch state backends when restoring from a savepoint. We should unify the 
savepoint serialization formats to support switching state backends.





[jira] [Created] (FLINK-11255) RemoteStreamEnvironment

2019-01-02 Thread Benjamin Lee (JIRA)
Benjamin Lee created FLINK-11255:


 Summary: RemoteStreamEnvironment
 Key: FLINK-11255
 URL: https://issues.apache.org/jira/browse/FLINK-11255
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.7.0
Reporter: Benjamin Lee


h1. Overview

Currently RemoteStreamEnvironment is not capable of submitting a job in 
detached mode.
h1. Proposed Changes
 * Modify the signature of 

{code:java}
StreamExecutionEnvironment#createRemoteEnvironment(...) : 
StreamExecutionEnvironment{code}
to

{code:java}
StreamExecutionEnvironment#createRemoteEnvironment(...) : 
RemoteStreamEnvironment{code}

 * Add a public overloaded _execute_ method to _RemoteStreamEnvironment_ that 
allows detached execution.

There was a related Jira FLINK-6224 opened a while ago, but I don't think it 
makes sense to change the abstract class _StreamExecutionEnvironment_ since 
detached submission only applies to _RemoteStreamEnvironment_?
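
A hedged sketch of how the proposed change might look from the user's side; the boolean detached parameter is one possible shape of the overloaded _execute_ being proposed, not an existing API, and it assumes the changed return type of _createRemoteEnvironment_:

{code:java}
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DetachedSubmissionSketch {

    public static void main(String[] args) throws Exception {
        // Proposed: createRemoteEnvironment(...) declared to return RemoteStreamEnvironment.
        RemoteStreamEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 8081, "path/to/job.jar");

        env.fromElements(1, 2, 3).print();

        // Proposed overload: submit the job and return immediately instead of blocking.
        env.execute("detached-example", /* detached = */ true);
    }
}
{code}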

 





[DISCUSS] Detection Flink Backpressure

2019-01-02 Thread 裴立平
Recently I have wanted to optimize the way we find the positions where
backpressure occurs.

I read some blogs about Flink backpressure and have a rough idea of it.

The method Flink currently adopts is thread-stack sampling; it is heavy-weight
and not continuous.

The positions where backpressure occurs are very important to
developers.

They should be treated as monitoring metrics.

Is there any other approach we can take to detect Flink backpressure?


[jira] [Created] (FLINK-11256) Referencing StreamNode objects directly in StreamEdge causes the sizes of JobGraph and TDD to become unnecessarily large

2019-01-02 Thread Haibo Suen (JIRA)
Haibo Suen created FLINK-11256:
--

 Summary: Referencing StreamNode objects directly in StreamEdge 
causes the sizes of JobGraph and TDD to become unnecessarily large
 Key: FLINK-11256
 URL: https://issues.apache.org/jira/browse/FLINK-11256
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.7.1, 1.7.0
Reporter: Haibo Suen
Assignee: Haibo Suen


When a job graph is generated from a StreamGraph, the StreamEdge(s) of the stream 
graph are serialized into StreamConfig and stored in the job graph. After that, 
the serialized bytes are included in the TDD and distributed to the TMs. Because 
StreamEdge directly references StreamNode objects, including sourceVertex and 
targetVertex, these objects are also written transitively when serializing a 
StreamEdge, but they are not needed at runtime. For a large topology, this 
causes the JobGraph/TDD to become much larger than actually needed, and makes 
RPC timeouts during transmission more likely.

To avoid this, StreamEdge should store only the IDs of the StreamNodes.
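
A sketch of that direction (the class and field names below are illustrative, not the actual Flink classes): the edge keeps only integer node IDs, so serializing the edge no longer drags the full StreamNode objects along.

{code:java}
import java.io.Serializable;

public class StreamEdgeSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    // Only the IDs become part of the serialized form.
    private final int sourceId;
    private final int targetId;

    public StreamEdgeSketch(int sourceId, int targetId) {
        this.sourceId = sourceId;
        this.targetId = targetId;
    }

    public int getSourceId() {
        return sourceId;
    }

    public int getTargetId() {
        return targetId;
    }

    // The full nodes would be resolved on demand from the (non-serialized) stream
    // graph, e.g. streamGraph.getStreamNode(getSourceId()), instead of being fields here.
}
{code}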





Re: [DISCUSS] Detection Flink Backpressure

2019-01-02 Thread Yun Gao
Hello liping,

   Thank you for proposing to optimize backpressure detection! From our 
previous experience, we think the InputBufferPoolUsageGauge and 
OutputBufferPoolUsageGauge may be useful for detecting backpressure: for a chain 
of tasks A ---> B ---> C, if we find that the OutputBufferPoolUsage of task A 
and the InputBufferPoolUsage of task B are 100%, but the OutputBufferPoolUsage of 
task B is less than 100%, then it should be task B that causes the 
backpressure.
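
   As a small sketch of that decision rule (assuming the in/out buffer pool usages have already been collected per task, e.g. from the buffers.inPoolUsage / buffers.outPoolUsage gauges, as fractions in [0, 1]):

public final class BackpressureHeuristic {

    /**
     * A task is a likely backpressure source if its input pool is exhausted
     * while its own output pool still has room: it receives data faster than
     * it can process, but is not itself blocked by a downstream task.
     */
    public static boolean isLikelyBottleneck(double inPoolUsage, double outPoolUsage) {
        return inPoolUsage >= 1.0 && outPoolUsage < 1.0;
    }

    public static void main(String[] args) {
        // Example from above: A ---> B ---> C where A's output and B's input
        // are full but B's output is not, so B is the suspect.
        System.out.println(isLikelyBottleneck(1.0, 0.4)); // true  (task B)
        System.out.println(isLikelyBottleneck(0.2, 1.0)); // false (task A: blocked downstream)
    }
}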

  However, we currently think that the InputBufferPoolUsage and 
OutputBufferPoolUsage require some modification to be more accurate:
 
 1. When there are multiple inputs or outputs, the InputBufferPoolUsage 
and OutputBufferPoolUsage should show the maximum usage instead of the average 
usage [1].
 2. Currently the sender side reports the backlog right before 
filling the output buffer. Together with the pre-allocation logic on the 
receiver side, the InputBufferPoolUsage may be 100% even if the data has not 
been received yet [2].

 We may need to address these problems before adopting the 
InputBufferPoolUsage  and OutputBufferPoolUsage as the backpressure indicator.

 Besides, another similar thought is that we may also add new 
InputBufferUsage and OutputBufferUsage metrics to show (number of queued 
buffers / number of all buffers) instead.  


Best,
   Yun Gao


[1] https://issues.apache.org/jira/browse/FLINK-10981
[2] https://issues.apache.org/jira/browse/FLINK-11082


--
From:裴立平 
Send Time:2019 Jan. 3 (Thu.) 13:39
To:dev 
Subject:[DISCUSS] Detection Flink Backpressure

Recently I have wanted to optimize the way we find the positions where
backpressure occurs.

I read some blogs about Flink backpressure and have a rough idea of it.

The method Flink currently adopts is thread-stack sampling; it is heavy-weight
and not continuous.

The positions where backpressure occurs are very important to
developers.

They should be treated as monitoring metrics.

Is there any other approach we can take to detect Flink backpressure?



[jira] [Created] (FLINK-11257) FlinkKafkaConsumer should support assigning partitions

2019-01-02 Thread shengjk1 (JIRA)
shengjk1 created FLINK-11257:


 Summary: FlinkKafkaConsumer should support assigning partitions 
 Key: FLINK-11257
 URL: https://issues.apache.org/jira/browse/FLINK-11257
 Project: Flink
  Issue Type: New Feature
  Components: Kafka Connector
Affects Versions: 1.7.1
Reporter: shengjk1


I see that Flink 1.7 also has a universal Kafka connector. If the Kafka connector 
supported assigning partitions, it would be more complete. For example, a Kafka 
topic may have 3 partitions while I only need 1 partition, but today I have to read 
all partitions and then filter. This not only wastes resources but is also relatively 
inefficient, so I suggest that FlinkKafkaConsumer should support assigning partitions.


