flink eventTime, lateness, maxoutoforderness

2017-12-16 Thread chen
eventTime, lateness, and maxOutOfOrderness are all about time.
Event time is the watermark time on the record.
Is lateness measured in record time or real-world time?
Is maxOutOfOrderness measured in record time or real-world time?

dataStream.keyBy(row -> (String)row.getField(0))
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
 .allowedLateness(Time.seconds(5))
 .fold(initRow(), new MyFoldFunction())

public Watermark getCurrentWatermark() {
return new Watermark(currentTime - 5000);}

Could anyone explain which time eventTime, lateness, and maxOutOfOrderness refer to?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flink eventTime, lateness, maxoutoforderness

2017-12-22 Thread chen
Hi Eron,
Thanks for your help. Actually, I know that maxOutOfOrderness and lateness are
based on event time, but in my test they are not. Below are my code and test data.
 "key1|148325064|",
 "key1|1483250636000|",
 "key1|1483250649000|",
 "key1|1483250642000|",
 "key1|148325065|",
 "key1|1483250641000|",
 "key1|1483250653000|",
 "key1|1483250648000|",
 "key1|1483250645000|",
 "key1|1483250658000|",
 "key1|1483250647000|",
 "key1|1483250643000|",
 "key1|1483250661000|",
 "key1|1483250662000|",
 "key1|1483250667000|",
 "key1|1483250663000|",

dataStream.keyBy(row -> (String) row.getField(0))
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .allowedLateness(Time.seconds(5))
    .fold(initRow(), new FoldFunction<Row, Row>() {
        @Override
        public Row fold(Row ret, Row o) throws Exception {
            // Field 0 counts the records folded into the window.
            ret.setField(0, (int) ret.getField(0) + 1);
            // Field 1 concatenates field 1 of every record seen so far.
            ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
            return ret;
        }
    })

1. Sending data *WITHOUT* Thread.sleep(), the result looks like this:
 1,1483250636000|
 4,148325064|1483250642000|1483250641000|1483250643000|
 4,1483250649000|1483250648000|1483250645000|1483250647000|
 2,148325065|1483250653000|
 1,1483250658000|
 3,1483250661000|1483250662000|1483250663000|
 1,1483250667000|
2. Sending data WITH Thread.sleep(), the result looks like this. Here we can
see the effect of allowedLateness: it triggers the window to compute again,
so the result is emitted again.
  1,1483250636000|
  1,148325064|
  2,148325064|1483250642000|
  1,1483250649000|
  2,1483250649000|1483250648000|
  3,1483250649000|1483250648000|1483250645000|
  2,148325065|1483250653000|
  1,1483250658000|
  2,1483250661000|1483250662000|
  3,1483250661000|1483250662000|1483250663000|
  1,1483250667000|






Re: flink eventTime, lateness, maxoutoforderness

2017-12-22 Thread chen

Code showing the effect of maxOutOfOrderness:
dataStream.keyBy(row -> (String) row.getField(0))
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .fold(initRow(), new FoldFunction<Row, Row>() {
        @Override
        public Row fold(Row ret, Row o) throws Exception {
            // Field 0 counts the records folded into the window.
            ret.setField(0, (int) ret.getField(0) + 1);
            // Field 1 concatenates field 1 of every record seen so far.
            ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
            return ret;
        }
    });

public Watermark getCurrentWatermark() {
    // The watermark trails currentTime by 5 seconds.
    return new Watermark(currentTime - 5000);
}

1. Sending data *WITHOUT* Thread.sleep(), the result looks like this:
1,1483250636000|
4,148325064|1483250642000|1483250641000|1483250643000|
4,1483250649000|1483250648000|1483250645000|1483250647000|
2,148325065|1483250653000|
1,1483250658000|
3,1483250661000|1483250662000|1483250663000|
1,1483250667000|

2. Sending data WITH Thread.sleep(), the result looks like this. Here we can
see the effect of maxOutOfOrderness: it delays the computation, and the
result comes out afterwards.
1,1483250636000|
2,148325064|1483250642000|
3,1483250649000|1483250648000|1483250645000|
2,148325065|1483250653000|
1,1483250658000|
3,1483250661000|1483250662000|1483250663000|
1,1483250667000|

I don't know how to explain eventTime, lateness, and maxOutOfOrderness.
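
For readers following this thread, a minimal sketch may help separate the three notions. It is plain Java, not Flink code, and it assumes 5-second tumbling windows without offset and a watermark equal to the largest timestamp seen so far minus the max out-of-orderness. The class and method names are made up for illustration. All values are event-time milliseconds taken from the records; none of them is wall-clock time. One plausible reason the runs with and without Thread.sleep() differ is that periodic watermarks are emitted on a wall-clock interval, so a fast run may process every record before the watermark has advanced.

```java
/** Plain-Java model of event-time window bookkeeping; illustrative only, not Flink code. */
final class EventTimeWindowSketch {

    /** Start of the tumbling window (no offset) that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Last timestamp that still belongs to the window: end - 1. */
    static long maxTimestamp(long windowStartMs, long sizeMs) {
        return windowStartMs + sizeMs - 1;
    }

    /** Assumed watermark rule: largest timestamp seen minus the max out-of-orderness. */
    static long watermark(long maxSeenTimestampMs, long maxOutOfOrdernessMs) {
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }

    /** The window fires once the watermark reaches the window's last timestamp. */
    static boolean fires(long windowStartMs, long sizeMs, long watermarkMs) {
        return watermarkMs >= maxTimestamp(windowStartMs, sizeMs);
    }

    /** A record is dropped once the watermark has passed its window's last timestamp plus the allowed lateness. */
    static boolean isDropped(long timestampMs, long sizeMs, long allowedLatenessMs, long watermarkMs) {
        long start = windowStart(timestampMs, sizeMs);
        return maxTimestamp(start, sizeMs) + allowedLatenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long lateness = 5_000L;
        long record = 1483250642000L;
        long start = windowStart(record, size);
        long wm = watermark(1483250649000L, 5_000L);
        System.out.println("window start: " + start);
        System.out.println("window fires at watermark " + wm + ": " + fires(start, size, wm));
        System.out.println("record dropped as late: " + isDropped(record, size, lateness, wm));
    }
}
```

In this model, maxOutOfOrderness only delays the watermark (and therefore the first firing), while allowedLateness keeps a window open after it has fired, so a late record can trigger the same window again.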





Re: flink eventTime, lateness, maxoutoforderness

2017-12-22 Thread chen
Thanks Gordon, please see the reply. I ran the code, but the result doesn't
match what the docs explain.






Deep Copy in FLINK, Kryo Copy is used in the different operator

2018-02-11 Thread chen
Our team has its own stream engine. We tested our engine and Flink, and
found that when we aggregate the stream data, the throughput decreases
very fast.

So we captured the stack and found a deep copy in Flink.

Between different operators, there is a call to
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy.






Re: Deep Copy in FLINK, Kryo Copy is used in the different operator

2018-02-21 Thread chen
@Aljoscha Krettek,
Thanks Aljoscha, I will try this approach to test the performance.
The last 7 days were the Chinese Spring Festival; sorry for responding so late.





Re: Deep Copy in FLINK, Kryo Copy is used in the different operator

2018-02-21 Thread chen
@Gábor Gévay,
Thanks Gábor.
I am just using Flink in a production environment, but the performance is not
good, especially in aggregation.
At the beginning I used Java serialization, but it did not work well.
Maybe I did not understand Flink very well then. I will try changing the
serialization method and test again.
The last 7 days were the Chinese Spring Festival; sorry for responding so
late.





Specially introduced Flink to chinese users in CNCC(China National Computer Congress)

2015-10-24 Thread Liang Chen
I gave a special introduction of Flink to Chinese users at CNCC (China National
Computer Congress). Many people were interested in Flink and discussed it with
me. In the future, there may be more and more users from China participating in
the Flink project and applying Flink to their big data systems :)



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Specially-introduced-Flink-to-chinese-users-in-CNCC-China-National-Computer-Congress-tp3254.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Specially introduced Flink to chinese users in CNCC(China National Computer Congress)

2015-10-24 Thread Liang Chen

 





Re: Specially introduced Flink to chinese users in CNCC(China National Computer Congress)

2015-11-18 Thread Liang Chen
Two aspects are attracting them:
1. Flink uses Java, so it is easy for most of them to get started with Flink,
and it is easier to maintain in comparison to Storm (as Clojure is difficult to
maintain, and fewer people know it).
2. Users really want a unified system supporting streaming and batch
processing.






Re: [ANNOUNCE] Introducing Apache Flink Taiwan User Group - Flink.tw

2016-01-04 Thread Liang Chen
Awesome!

Will it also include Simplified Chinese, or only Traditional Chinese?





XGBoost4J: Portable Distributed XGboost in Flink

2016-03-14 Thread Tianqi Chen
Hi Flink Community:
I am sending this email to let you know that we have just released XGBoost4J,
which also runs on Flink. In short, XGBoost is a machine learning package that
is used by more than half of the machine learning challenge-winning solutions
and is already widely used in industry. The distributed version scales to
billions of examples (10x faster than spark.mllib in the experiment) with fewer
resources (see http://arxiv.org/abs/1603.02754).

See our blog post for more details:
http://dmlc.ml/2016/03/14/xgboost4j-portable-distributed-xgboost-in-spark-flink-and-dataflow.html
We would love to have you try it out and help us make it better.

Cheers


flink streaming - window chaining example

2016-03-27 Thread Chen Bekor
hi all!

I'm just starting my way with flink and I have a design question.

I'm trying to aggregate incoming events (source: kafka topic) on a 10min
tumbling window in order to calculate the incoming events rate (total per
minute).

I would like to take this window and perform an additional window (60 min)
in order to calculate percentiles, std deviation and some other statistics
on that time window. finally I would like to trigger some business logic in
case the calculation hits a certain threshold.

my main challenge is - how to chain the two windows together.

any help is appreciated (please send scala example code - I'm not using
java :) for this project)
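
The two-stage idea in the question can be sketched without any Flink dependency. The example below is written in plain Java only to keep it dependency-free (the same structure carries over to Scala), and every name in it is hypothetical. Stage 1 counts events per 10-minute tumbling window; stage 2 regroups those counts into 60-minute windows and computes statistics over them. In Flink, to the best of my knowledge, records emitted by an event-time window carry the window's last timestamp, which is what would let a second window operator be applied directly to the output of the first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of chaining a 10-minute window into a 60-minute window; illustrative only. */
final class ChainedWindowSketch {
    static final long TEN_MIN_MS = 10 * 60 * 1000L;
    static final long HOUR_MS = 60 * 60 * 1000L;

    /** Stage 1: count events per tumbling window, keyed by window start. */
    static TreeMap<Long, Long> countPerWindow(List<Long> eventTimestampsMs, long sizeMs) {
        TreeMap<Long, Long> counts = new TreeMap<>();
        for (long ts : eventTimestampsMs) {
            long start = ts - Math.floorMod(ts, sizeMs);
            counts.merge(start, 1L, Long::sum);
        }
        return counts;
    }

    /** Stage 2: regroup the stage-1 counts into larger windows, keyed by the larger window's start. */
    static TreeMap<Long, List<Long>> regroup(Map<Long, Long> counts, long sizeMs) {
        TreeMap<Long, List<Long>> grouped = new TreeMap<>();
        for (Map.Entry<Long, Long> entry : counts.entrySet()) {
            long start = entry.getKey() - Math.floorMod(entry.getKey(), sizeMs);
            grouped.computeIfAbsent(start, k -> new ArrayList<>()).add(entry.getValue());
        }
        return grouped;
    }

    /** Arithmetic mean; returns 0 for an empty list. */
    static double mean(List<Long> values) {
        if (values.isEmpty()) {
            return 0.0;
        }
        double sum = 0;
        for (long v : values) {
            sum += v;
        }
        return sum / values.size();
    }

    /** Population standard deviation; returns 0 for an empty list. */
    static double stdDev(List<Long> values) {
        if (values.isEmpty()) {
            return 0.0;
        }
        double m = mean(values);
        double squares = 0;
        for (long v : values) {
            squares += (v - m) * (v - m);
        }
        return Math.sqrt(squares / values.size());
    }

    public static void main(String[] args) {
        List<Long> events = List.of(0L, 1_000L, 600_000L, 3_600_000L);
        TreeMap<Long, Long> counts = countPerWindow(events, TEN_MIN_MS);
        regroup(counts, HOUR_MS).forEach((start, perWindow) ->
                System.out.println(start + " mean=" + mean(perWindow) + " stdDev=" + stdDev(perWindow)));
    }
}
```

Note that a 10-minute window with no events produces no count at all in this sketch, so the hourly statistics only cover non-empty windows. Threshold checks would go after the stage-2 statistics.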


printing datastream to the log - instead of stdout

2016-04-14 Thread Chen Bekor
hi,

the .print() method will print a dataset / datastream to the stdout.

how can I print the stream to the standard logger (logback/log4j)?

I'm using flink scala - so scala example code is much appreciated.

p.s - I noticed that there's a PrintFunction that I can implement but it
feels like I'm reinventing the wheel for something that should be very
elementary.

thanks!!

Chen.
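
One way to approach this is a pass-through function that logs each record and returns it unchanged, so it can sit in the pipeline wherever print() would. The sketch below is plain Java and uses java.util.logging only to stay dependency-free; that choice, and the class name LoggingPassThrough, are assumptions for illustration. With logback or log4j the same shape would apply with that library's logger, and in Flink the body would presumably live inside a map function.

```java
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;

/** Logs every element it sees and passes it on unchanged; illustrative only. */
final class LoggingPassThrough<T> implements Function<T, T> {
    private final Logger logger;
    private final String prefix;

    LoggingPassThrough(Logger logger, String prefix) {
        this.logger = logger;
        this.prefix = prefix;
    }

    @Override
    public T apply(T value) {
        // Log at INFO and hand the element back so downstream steps are unaffected.
        logger.log(Level.INFO, prefix + value);
        return value;
    }

    public static void main(String[] args) {
        Logger logger = Logger.getLogger("stream-debug");
        LoggingPassThrough<String> tap = new LoggingPassThrough<>(logger, "record: ");
        // A java.util.stream.Stream stands in for the data stream here.
        Stream.of("a", "b", "c").map(tap).forEach(element -> { });
    }
}
```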


throttled stream

2016-04-16 Thread Chen Bekor
is there a way to consume a kafka stream using flink with  a predefined
rate limit (eg 5 events per second)

we need this because we need to control some 3rd party api rate limitations
so,  even if we have a much larger throughput potential, we must control
the consumption rate in order not to overflow the API channel.
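
A minimal sketch of the throttling idea, in plain Java with no Flink or Kafka dependency: a fixed-rate limiter that hands out one slot every 1/rate seconds and tells the caller how long to wait. The class name FixedRateThrottle and the injectable clock are assumptions made for illustration. An operator could call acquire() before each third-party API request; if the operator runs with parallelism greater than 1, each parallel instance would have its own limiter, so the overall rate would be the per-instance rate times the parallelism.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hands out evenly spaced slots at a fixed rate; illustrative only. */
final class FixedRateThrottle {
    private final long intervalNanos;
    private final LongSupplier nanoClock;
    private long nextFreeNanos;

    FixedRateThrottle(double permitsPerSecond, LongSupplier nanoClock) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("permitsPerSecond must be positive");
        }
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
        this.nanoClock = nanoClock;
        this.nextFreeNanos = nanoClock.getAsLong();
    }

    /** Reserves the next slot and returns how long the caller must wait for it, in nanoseconds. */
    synchronized long reserve() {
        long now = nanoClock.getAsLong();
        long slotStart = Math.max(now, nextFreeNanos);
        nextFreeNanos = slotStart + intervalNanos;
        return slotStart - now;
    }

    /** Blocks the calling thread until its reserved slot begins. */
    void acquire() throws InterruptedException {
        long waitNanos = reserve();
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FixedRateThrottle throttle = new FixedRateThrottle(5.0, System::nanoTime);
        for (int i = 0; i < 10; i++) {
            throttle.acquire();
            System.out.println("event " + i + " released");
        }
    }
}
```

Blocking inside an operator also slows down everything upstream of it, which is the intended effect here: the source is consumed no faster than the limiter allows.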


fan out parallel-able operator sub-task beyond total slots number

2016-04-17 Thread Chen Qin
Hi there,


I am trying to run a large number of subtasks within a task slot using a slot
sharing group. The usage scenario is an operator that makes a network call
with high latency yet has a small memory and CPU footprint (sample code below).

From the doc provided, slotSharingGroup seems to be the place to look. Yet it
seems it was not designed to address the scenario above.
https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#workers-slots-resources

My question is: what is the best way to fan out a large number of subtasks
in parallel within a task?

public void testFanOut() throws Exception {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
    ...
    env.addSource(...).setParallelism(1).disableChaining().shuffle()
        .flatMap(new FlatMapFunction<DummyFlinkRecord, Long>() {
            @Override
            public void flatMap(DummyFlinkRecord dummyFlinkRecord,
                    Collector<Long> collector) throws Exception {
                Thread.sleep(1000); // latency is high, needs to fan out
                collector.collect(1L);
            }
        }).slotSharingGroup("flatmap").setParallelism(100).rebalance()
        .filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return true;
            }
        }).setParallelism(10)
        .addSink(new SinkFunction<Long>() {
            @Override
            public void invoke(Long aLong) throws Exception {
                System.out.println(aLong);
            }
        });
    env.execute("fan out 100 subtasks for 1s delay mapper");
}

Thanks,
Chen Qin
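
An alternative to raising operator parallelism is to fan out inside a single subtask with a thread pool, since the calls are slow but cheap in CPU and memory. The sketch below is plain Java, not Flink code; FanOutSketch and callAll are hypothetical names, and the slow network call is simulated. It submits every call to a bounded pool and collects the results in input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Runs many blocking calls concurrently on a bounded thread pool; illustrative only. */
final class FanOutSketch {

    /** Applies slowCall to every input with at most maxConcurrent calls in flight; results keep input order. */
    static <I, O> List<O> callAll(List<I> inputs, Function<I, O> slowCall, int maxConcurrent) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        try {
            List<Future<O>> pending = new ArrayList<>();
            for (I input : inputs) {
                Callable<O> task = () -> slowCall.apply(input);
                pending.add(pool.submit(task));
            }
            List<O> results = new ArrayList<>();
            for (Future<O> future : pending) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for calls", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a call failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> inputs = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            inputs.add(i);
        }
        long startedAt = System.nanoTime();
        List<Integer> results = callAll(inputs, value -> {
            try {
                Thread.sleep(1000); // simulated high-latency network call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        }, 100);
        long elapsedMs = (System.nanoTime() - startedAt) / 1_000_000;
        System.out.println(results.size() + " calls finished in " + elapsedMs + " ms");
    }
}
```

The trade-off is that threads created inside an operator are invisible to the scheduler, so failure handling and checkpoint consistency for in-flight calls would need care.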


design question

2016-04-23 Thread Chen Bekor
hi all,

I have a stream of incoming object versions (objects change over time) and
a requirement to fetch from a datastore the last known object version in
order to link it with the id of the new version,  so that I end up with a
linked list of object versions.

all object versions contain the same guid, so I was thinking about using
flink streaming in order to assure ordering and avoid concurrency / race
conditions in the linkage process (object version might arrive unordered or
may arrive at spikes)

if I use the object guid as a key for a keyed stream I am concerned I will
end up with millions of windowed streams hence causing OOM.

what do you think should be the right approach? do you think flink is the
right technology for this task?
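
A minimal sketch of the linking step, assuming it only needs the last known version id per guid: keying by guid then means one small state entry per guid rather than one window per guid. The code is plain Java, with a HashMap standing in for keyed state, and the name VersionLinker is made up for illustration. In Flink the map would presumably be replaced by per-key value state inside a keyed function.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Tracks the latest version id per guid and links each new version to its predecessor; illustrative only. */
final class VersionLinker {
    /** Stands in for keyed state: one "last version id" entry per guid. */
    private final Map<String, String> lastVersionByGuid = new HashMap<>();

    /** Records newVersionId as the latest for guid and returns the previous version id, if any. */
    Optional<String> link(String guid, String newVersionId) {
        return Optional.ofNullable(lastVersionByGuid.put(guid, newVersionId));
    }

    public static void main(String[] args) {
        VersionLinker linker = new VersionLinker();
        System.out.println(linker.link("guid-1", "v1"));
        System.out.println(linker.link("guid-1", "v2"));
        System.out.println(linker.link("guid-2", "v1"));
    }
}
```

This sketch links versions in arrival order. If versions can arrive out of order, the stored entry would also need a version number or timestamp so that an older version does not overwrite a newer one.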


Re: design question

2016-04-24 Thread Chen Bekor
cool - can you point me to some docs about how to configure RocksDB? I
searched the online docs and found nothing substantial. Also - if I'm using
an HDFS (S3-backed) cluster, how would that affect RocksDB? can I configure it
to run on optimized SSD etc?

any help is appreciated.


On Sun, Apr 24, 2016 at 7:57 AM, John Sherwood  wrote:

> This sounds like you have some per-key state to keep track of, so the
> 'correct' way to do it would be to keyBy the guid. I believe that if you
> run your environment using the Rocks DB state backend you will not OOM
> regardless of the number of GUIDs that are eventually tracked. Whether
> flink/stream processing is the most effective way to achieve your goal, I
> can't say, but I am fairly confident that this particular aspect is not a
> problem.


s3 checkpointing issue

2016-05-03 Thread Chen Qin
Hi there,

I ran a test job with the file state backend, saving checkpoints on S3 (via s3a).

The job crashes when a checkpoint is triggered. Looking into the S3 directory
and listing objects, I found that the directory is created successfully but all
checkpoint directories are empty.

The host running task manager shows following error.

Received error response: com.amazonaws.services.s3.model.AmazonS3Exception:
Status Code: 404, AWS Service: null, AWS Request ID: CF1845CA84E07549, AWS
Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID:x

Has anyone met this issue before?

flink 1.0.0
scala 2.10
hadoop-aws 2.7.2
aws-java-sdk 1.7.4


Thanks,
Chen

Attached full log that shows on web dashboard when job canceled.
java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:674)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not open output stream for state backend
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:498)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.write(FsStateBackend.java:444)
    at java.io.DataOutputStream.write(DataOutputStream.java:88)
    at java.io.DataOutputStream.write(DataOutputStream.java:88)
    at org.apache.flink.types.StringValue.writeString(StringValue.java:813)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:78)
    at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:27)
    at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
    at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:175)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:509)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:481)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:678)
    ... 8 more
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
    at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
    ... 25 more


Re: s3 checkpointing issue

2016-05-04 Thread Chen Qin
Ufuk & Igor,

Thanks for helping out!  Yup, it fixed my issue.

Chen



On Wed, May 4, 2016 at 12:57 PM, Igor Berman  wrote:

> I think I've had this issue too and fixed it as Ufuk suggested
> in core-site.xml
>
> something like
> <property>
>   <name>fs.s3a.buffer.dir</name>
>   <value>/tmp</value>
> </property>
>
>
> On 4 May 2016 at 11:10, Ufuk Celebi  wrote:
>
>> Hey Chen Qin,
>>
>> this seems to be an issue with the S3 file system. The root cause is:
>>
>>  Caused by: java.lang.NullPointerException at
>>
>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
>> at
>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
>> at
>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
>> at
>> org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
>> at
>> org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907) at
>> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888) at
>> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785) at
>>
>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
>> at
>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
>> at
>> org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
>> ... 25 more
>>
>> From [1] it looks like you have to specify
>>
>> fs.s3a.buffer.dir
>>
>> in the Hadoop configuration (where you set the S3 file system).
>>
>> The expected value is a comma separated list of local directories used
>> to buffer results prior to transmitting them to S3 (for large files).
>>
>> Does this fix the issue? Please report back so that we can include in
>> the "common issues" section of the AWS docs.
>>
>> – Ufuk
>>
>> [1] http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/
>>
>>

s3 statebackend user state size

2016-05-10 Thread Chen Qin
Hi there,

With S3 as the state backend and a large chunk of user state kept on the
heap, I can see the task manager start to fail without showing an OOM exception.
Instead, it shows a generic error message (below) when a checkpoint is triggered.
I assume this has something to do with how state is kept in a buffer and flushed
to S3 when a checkpoint is triggered.

Further, to keep a large key/value space, the wiki points to using RocksDB as the
backend. My understanding is that RocksDB will write to the local file system
instead of syncing to S3. Does Flink support a memory -> RocksDB (local disk) -> S3
checkpoint state split yet? Or would implementing the kvstate interface make Flink
take care of the large state problem?

Chen

java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager eddbcda03a61f61210063a7cd2148b36 @ 10.163.98.18 - 24 slots - URL: akka.tcp://flink@10.163.98.18:6124/user/taskmanager
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
    at org.apache.flink.runtime.jobmanager.JobManager$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
    at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
    at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
    at akka.actor.ActorCell.invoke(ActorCell.scala:486)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: s3 statebackend user state size

2016-05-10 Thread Chen Qin
Hi Ufuk,

Yes, it does help with Rocksdb backend!
After tuning the checkpoint frequency to align with network throughput, the
task manager releases and job cancellations are gone.

Chen


> On May 10, 2016, at 10:33 AM, Ufuk Celebi  wrote:
> 
>> On Tue, May 10, 2016 at 5:07 PM, Chen Qin  wrote:
>> Future, to keep large key/value space, wiki point out using rocksdb as
>> backend. My understanding is using rocksdb will write to local file systems
>> instead of sync to s3. Does flink support memory->rocksdb(local disk)->s3
>> checkpoint state split yet? Or would implement kvstate interface makes flink
>> take care of large state problem?
> 
> Hey Chen,
> 
> when you use RocksDB, you only need to explicitly configure the file
> system checkpoint directory, for which you can use S3:
> 
> new RocksDBStateBackend(new URI("s3://..."))
> 
> The local disk paths are configured via the general Flink temp
> directory configuration (see taskmanager.tmp.dirs in
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html,
> default is /tmp).
> 
> State is written to the local RocksDB instance and the RocksDB files
> are copied to S3 on checkpoints.
> 
> Does this help?
> 
> – Ufuk


rocksdb backend on s3 window operator checkpoint issue

2016-05-16 Thread Chen Qin
Hi there,

I have been testing checkpointing on RocksDB backed by S3. Checkpoints
seem successful except for snapshotting the state of the timeWindow operator
on a keyedStream. Here is the env setting I used:
env.setStateBackend(new RocksDBStateBackend(new URI("s3://backupdir/")))

The checkpoint fails consistently when it gets to window operator
snapshotting. The exception log is attached below.
With env.setStateBackend(new RocksDBStateBackend(new URI(
"file:///tmp/checkpoints"))); or MemoryStateBackend (the default), checkpoints
work with no issue.

Has anyone seen this issue before? Or did I mess up the configuration?

Thanks,
Chen

2016-05-16 17:20:32,132 INFO
 org.apache.flink.runtime.state.filesystem.FsStateBackend  -
Initializing file state backend to URI
s3://xxx/checkpoints/7e6abf126ce3d18f173733b34eda81a9
2016-05-16 17:20:32,423 INFO
 org.apache.flink.streaming.runtime.tasks.StreamTask   - Using
user-defined state backend:
org.apache.flink.contrib.streaming.state.RocksDBStateBackend@2fa68a53
2016-05-16 17:20:32,423 INFO
 org.apache.flink.runtime.state.filesystem.FsStateBackend  -
Initializing file state backend to URI
s3://uber-beats/sjc1/checkpoints/7e6abf126ce3d18f173733b34eda81a9
2016-05-16 17:21:31,423 INFO
 org.apache.flink.contrib.streaming.state.AbstractRocksDBState  - RocksDB
(/directory/flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/dbc64864de-8373-4b41-bd74-a26a8007f066)
backup (synchronous part) took 8 ms.
2016-05-16 17:21:36,125 ERROR
org.apache.flink.streaming.runtime.tasks.StreamTask   - Caught
exception while materializing asynchronous checkpoints.
com.amazonaws.AmazonClientException: Unable to calculate MD5
hash:/directory//flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/local-chk-599
(Is a directory)
at
com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
at
com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
at
com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
at
com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
at
com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
at
com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

...


Tests look like

.setParallelism(1).assignTimestampsAndWatermarks(
        new AssignerWithPunctuatedWatermarks<String>() {
    @Override
    public Watermark checkAndGetNextWatermark(String s, long l) {
        // The watermark trails the wall clock by 60 seconds.
        long ts = System.currentTimeMillis() - 60 * 1000L;
        return new Watermark(ts);
    }

    @Override
    public long extractTimestamp(String s, long l) {
        // Records are stamped with the wall-clock time at extraction.
        return System.currentTimeMillis();
    }
}).flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Long>> collector)
            throws Exception {
        collector.collect(new Tuple2<>(s, 1L));
    }
}).keyBy(0).timeWindow(Time.seconds(60)).apply(
        new RichWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
    @Override
    public void apply(Tuple tuple, TimeWindow timeWindow,
            Iterable<Tuple2<String, Long>> iterable,
            Collector<Tuple2<String, Long>> collector) throws Exception {
        // The {} placeholder assumes an SLF4J-style logger.
        log.info("trigger fire at {}", System.currentTimeMillis());
        collector.collect(new Tuple2<>(timeWindow.toString(), 1L));
    }
}).rebalance().addSink(new FakeSink<>());

JobExecutionResult result = env.execute();


Failures due to inevitable high backpressure

2020-08-26 Thread Hubert Chen
Hello,

My Flink application has entered into a bad state and I was wondering if I
could get some advice on how to resolve the issue.

The sequence of events that led to a bad state:

1. A failure occurs (e.g., TM timeout) within the cluster
2. The application successfully recovers from the last completed checkpoint
3. The application consumes events from Kafka as quickly as it can. This
introduces high backpressure.
4. A checkpoint is triggered
5. Another failure occurs (e.g., TM timeout, checkpoint timeout, Kafka
transaction timeout) and the application loops back to step #2. This
creates a vicious cycle where no progress is made.

I believe the underlying issue is the application experiencing high
backpressure. This can cause the TM to not respond to heartbeats or cause
long checkpoint durations due to delayed processing of the checkpoint.

I'm confused on the best next steps to take. How do I ensure that
heartbeats and checkpoints successfully complete when experiencing
inevitable high backpressure?

Best,
Hubert


Native kubernetes setup failed to start job

2020-10-29 Thread Chen Liangde
I created a flink cluster in kubernetes following this guide:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html

The job manager was running. When a job was submitted to the job manager,
it spawned a task manager pod, but the task manager failed to connect to
the job manager. And in the job manager web ui I can't find the task
manager.

This error is
suspicious: 
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
Adjusted frame length exceeds 10485760: 352518404 - discarded
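
One way to read the number in this exception, offered as a hedged sketch rather than a diagnosis: a length-prefixed frame decoder treats the first four bytes it receives as a frame length, so an implausibly large length often means the peer sent some other protocol. The snippet assumes (this is not confirmed by the logs here) that the reported "adjusted" value is the raw 4-byte length field plus the size of the field itself. The helper name `length_prefix_bytes` is hypothetical.

```python
import struct


def length_prefix_bytes(reported_length, length_field_size=4):
    """Return the bytes a big-endian length field would have held.

    Assumption: the reported length equals the raw length field plus
    the size of the length field itself.
    """
    raw = reported_length - length_field_size
    return struct.pack(">I", raw)


prefix = length_prefix_bytes(352518404)
print(prefix.hex())
```

Under that assumption the leading bytes are 0x15 0x03 0x01, which is how a TLS alert record begins (content type 21, version 3.1). That would suggest one side of the connection on port 6123 speaks TLS while the other does not, for example because of a proxy or sidecar in the pod network. This is only a hypothesis to check against the cluster setup.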

2020-10-29 13:22:51,069 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
Connecting to ResourceManager
akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*().
2020-10-29 13:22:51,176 WARN
akka.remote.transport.netty.NettyTransport   [] -
Remote connection to
[detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with
java.io.IOException: Connection reset by peer
2020-10-29 13:22:51,176 WARN
akka.remote.transport.netty.NettyTransport   [] -
Remote connection to
[detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
Adjusted frame length exceeds 10485760: 352518404 - discarded
2020-10-29 13:22:51,180 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system
[akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123] has
failed, address is now gated for [50] ms. Reason: [Association failed
with [akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123]]
Caused by: [The remote system explicitly disassociated (reason
unknown).]
2020-10-29 13:22:51,183 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
Could not resolve ResourceManager address
akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*,
retrying in 1 ms: Could not connect to rpc endpoint under address
akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*.
2020-10-29 13:23:01,203 WARN
akka.remote.transport.netty.NettyTransport   [] -
Remote connection to
[detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with
java.io.IOException: Connection reset by peer


Re: Native kubernetes setup failed to start job

2020-11-01 Thread Chen Liangde
Please find attached logs.

The Kubernetes cluster is an AWS EKS cluster managed by our infra team.
I created a service account "flink" for it, which has permission to create,
list, and delete pods, along with some other types of resources, in the
"team-anti-cheat" namespace.

The following command was used to create the Flink cluster:
./bin/kubernetes-session.sh \
-Dexecution.attached=true \
-Dkubernetes.cluster-id=detection-engine-dev \
-Dkubernetes.namespace=team-anti-cheat \
-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" \
-Dkubernetes.jobmanager.service-account=flink

Thanks
Liangde Chen


On Mon, 2 Nov 2020 at 08:20, Yang Wang  wrote:

> Could you share the JobManager logs so that we could check whether it
> received the
> registration from TaskManager?
>
> In a non-HA Flink cluster, the TaskManager is using the service to talk to
> JobManager.
> Currently, Flink creates a headless service for JobManager. You could use
> `kubectl get svc`
> to find it. And then start a busybox to check the network connectivity.
>
> And maybe you could share more information about the environment. I could
> not reproduce
> your issue in a typical K8s cluster.
>
> Best,
> Yang
>
> On Fri, Oct 30, 2020 at 11:53 AM, Yun Gao wrote:
>
>> Hi Liangde,
>>
>> I am pulling in Yang Wang, who is the expert on Flink on K8s.
>>
>> Best,
>>  Yun
>>
>> --Original Mail --
>> *Sender:*Chen Liangde 
>> *Send Date:*Fri Oct 30 05:30:40 2020
>> *Recipients:*Flink ML 
>> *Subject:*Native kubernetes setup failed to start job
>>
>>> I created a flink cluster in kubernetes following this guide:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html
>>>
>>> The job manager was running. When a job was submitted to the job
>>> manager, it spawned a task manager pod, but the task manager failed to
>>> connect to the job manager. And in the job manager web ui I can't find the
>>> task manager.
>>>
>>> This error is
>>> suspicious: 
>>> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>>> Adjusted frame length exceeds 10485760: 352518404 - discarded
>>>
>>> 2020-10-29 13:22:51,069 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
>>> Connecting to ResourceManager
>>> akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*().
>>> 2020-10-29 13:22:51,176 WARN
>>> akka.remote.transport.netty.NettyTransport   [] -
>>> Remote connection to
>>> [detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with
>>> java.io.IOException: Connection reset by peer
>>> 2020-10-29 13:22:51,176 WARN
>>> akka.remote.transport.netty.NettyTransport   [] -
>>> Remote connection to
>>> [detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with
>>> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>>> Adjusted frame length exceeds 10485760: 352518404 - discarded
>>> 2020-10-29 13:22:51,180 WARN  akka.remote.ReliableDeliverySupervisor
>>> [] - Association with remote system
>>> [akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123] has
>>> failed, address is now gated for [50] ms. Reason: [Association failed
>>> with [akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123]]
>>> Caused by: [The remote system explicitly disassociated (reason
>>> unknown).]
>>> 2020-10-29 13:22:51,183 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
>>> Could not resolve ResourceManager address
>>> akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*,
>>> retrying in 1 ms: Could not connect to rpc endpoint under address
>>> akka.tcp://fl...@detection-engine-dev.team-anti-cheat:6123/user/rpc/resourcemanager_*.
>>> 2020-10-29 13:23:01,203 WARN
>>> akka.remote.transport.netty.NettyTransport   [] -
>>> Remote connection to
>>> [detection-engine-dev.team-anti-cheat/10.123.155.112:6123] failed with
>>> java.io.IOException: Connection reset by peer
>>>
>>>
NAME                        TYPE           CLUSTER-IP       EXTERNAL-IP                                                               PORT(S)             AGE
detection-engine-dev        ClusterIP      None             <none>                                                                    6123/TCP,6124/TCP   5m28s
detection-engine-dev-rest   LoadBalancer   172.20.210.124   a375eab7dc75f42fc9935d5940107811-1454167660.us-east-1.elb.amazonaws.com   8081:32256/TCP      5m28s

jobmanager.log
Description: Binary data


taskmanager.log
Description: Binary data


Re: Flink[Python] questions

2021-01-14 Thread Shuiqiang Chen
Hi Dc,

Thank you for your feedback.

1. Currently, only built-in types are supported in the Python DataStream API.
As a workaround, you can use a Row type to represent a custom Python class:
the field names stand for the names of the member variables and the
field types stand for the types of the member variables.

2. Could you please provide the full command line you executed and which kind
of cluster you are running (standalone/yarn/k8s)? The various command lines for
submitting a PyFlink job are shown in
https://ci.apache.org/projects/flink/flink-docs-master/deployment/cli.html#submitting-pyflink-jobs
.
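
As a rough illustration of the workaround in point 1, the sketch below flattens a custom class into ordered field names and values using only the standard library. The class `Order` and the helper `row_schema` are hypothetical. The names would then be paired with matching Flink types (for example via `Types.ROW_NAMED`); that mapping is not shown and is left as an assumption.

```python
from dataclasses import astuple, dataclass, fields


@dataclass
class Order:  # hypothetical custom class
    order_id: int
    customer: str
    amount: float


def row_schema(cls):
    """Return field names and Python types in declaration order."""
    return [f.name for f in fields(cls)], [f.type for f in fields(cls)]


names, types = row_schema(Order)
values = astuple(Order(1, "alice", 9.5))
print(names, values)
```

Keeping names and values in declaration order matters because a row is positional: a mismatch between the declared field order and the emitted values would only show up at serialization time.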

The attachment is an example code for a Python DataStream API job, for your
information.

Best,
Shuiqiang

On Thu, Jan 14, 2021 at 1:00 PM, Dc Zhao (BLOOMBERG/ 120 PARK) wrote:

> Hi Flink Community:
> We are using the pyflink to develop a POC for our project. We encountered
> some questions while using the flink.
>
> We are using the flink version 1.2, python3.7, data stream API
>
> 1. Do you have examples of providing a python customized class as a
> `result type`? Based on the documentation research, we found out only
> built-in types are supported in Python. Also, what is the payload size
> limitation inside the flink, do we have a recommendation for that?
>
> 2. Do you have examples of `flink run --python` data stream API codes to
> the cluster? We tried to do that, however the process hangs on a `socket
> read from the java gateway`. Because of the lack of logs, we are
> not sure what is missing while submitting the job.
>
> Regards
> Dc
>
>
> << {CH} {TS} Anything that can possibly go wrong, it does. >>
>
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, ProcessFunction


def datastream_processfunction_example():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.get_config().set_auto_watermark_interval(2000)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    data_stream = env.from_collection([(1, '1603708211000'),
                                       (2, '1603708224000'),
                                       (3, '1603708226000'),
                                       (4, '1603708289000')],
                                      type_info=Types.ROW([Types.INT(), Types.STRING()]))

    class MyTimestampAssigner(TimestampAssigner):

        def extract_timestamp(self, value, record_timestamp) -> int:
            return int(value[1])

    class MyProcessFunction(ProcessFunction):

        def process_element(self, value, ctx):
            current_timestamp = ctx.timestamp()
            current_watermark = ctx.timer_service().current_watermark()
            yield "current timestamp: {}, current watermark: {}, current_value: {}" \
                .format(str(current_timestamp), str(current_watermark), str(value))

        def on_timer(self, timestamp, ctx, out):
            pass

    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())
    data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
        .process(MyProcessFunction(), output_type=Types.STRING()).print()
    env.execute("Python DataStream Example")


if __name__ == '__main__':
    datastream_processfunction_example()


Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread Shuiqiang Chen
Hi meneldor,

The main cause of the error is a bug in
`ctx.timer_service().current_watermark()`. At the beginning of the stream,
when the first record arrives at KeyedProcessFunction.process_element(),
the current watermark is Long.MIN_VALUE on the Java side, while on
the Python side it becomes Long.MAX_VALUE, which is 9223372036854775807.

>>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)

Here, 9223372036854775807 + 1500 is 9223372036854777307, which is
automatically converted to a long integer in Python but causes a Long
value overflow in Java when the registered timer value is deserialized. I
will create an issue to fix the bug.
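
The arithmetic can be checked with a small standalone sketch. It does not use PyFlink; `fits_in_java_long` and `safe_timer_timestamp` are hypothetical helpers that only illustrate guarding a timer timestamp against the signed 64-bit range before handing it to the Java side.

```python
JAVA_LONG_MIN = -(2 ** 63)
JAVA_LONG_MAX = 2 ** 63 - 1


def fits_in_java_long(value):
    """True if value is representable as a signed 64-bit integer."""
    return JAVA_LONG_MIN <= value <= JAVA_LONG_MAX


def safe_timer_timestamp(base, delay_ms):
    """Return base + delay_ms, or None if the sum would not fit in a Java long."""
    ts = base + delay_ms
    return ts if fits_in_java_long(ts) else None


watermark = 9223372036854775807  # the value reported on the Python side
print(watermark + 1500)                          # Python ints do not overflow
print(safe_timer_timestamp(watermark, 1500))     # outside the Java long range
print(safe_timer_timestamp(1603708211000, 1500)) # an ordinary event timestamp
```

Basing the timer on the record timestamp rather than on the watermark avoids the problem entirely, because event timestamps are far from the 64-bit limits.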

Returning to your initial question: in PyFlink you can create Row
data as below:

>>> row_data = Row(id='my id', data='some data', timestamp=)

Also, which release version of Flink is the code snippet you provided
based on? The latest API for KeyedProcessFunction.process_element() and
KeyedProcessFunction.on_timer() does not provide a `collector` to collect
output data but uses `yield`, which is a more Pythonic approach.

Please refer to the following code:

def keyed_process_function_example():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.get_config().set_auto_watermark_interval(2000)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    data_stream = env.from_collection([(1, 'hello', '1603708211000'),
                                       (2, 'hi', '1603708224000'),
                                       (3, 'hello', '1603708226000'),
                                       (4, 'hi', '1603708289000')],
                                      type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))

    class MyTimestampAssigner(TimestampAssigner):

        def extract_timestamp(self, value, record_timestamp) -> int:
            return int(value[2])

    class MyProcessFunction(KeyedProcessFunction):

        def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
            yield Row(id=ctx.get_current_key()[1], data='some_string',
                      timestamp=)
            # current_watermark = ctx.timer_service().current_watermark()
            ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

        def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
            yield Row(id=ctx.get_current_key()[1],
                      data='current on timer timestamp: ' + str(timestamp),
                      timestamp=timestamp)

    output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
                                       [Types.STRING(), Types.STRING(), Types.INT()])
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())
    data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: (x[0], x[1]),
                key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
        .process(MyProcessFunction(), output_type=output_type_info).print()
    env.execute('test keyed process function')


Best,
Shuiqiang





On Thu, Jan 14, 2021 at 10:45 PM, meneldor wrote:

> Hello,
>
> What is the correct way to use Python dicts as ROW type in PyFlink? I'm
> trying this:
>
> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>                                    [Types.STRING(), Types.STRING(), Types.LONG()])
>
> class MyProcessFunction(KeyedProcessFunction):
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
>         result = {"id": ctx.get_current_key()[0], "data": "some_string", "timestamp": }
>         out.collect(result)
>         current_watermark = ctx.timer_service().current_watermark()
>         ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>
>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
>         logging.info(timestamp)
>         out.collect("On timer timestamp: " + str(timestamp))
>
> ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
>     .process(MyProcessFunction(), output_type=output_type_info)
>
>
> I just hardcoded the values in MyProcessFunction to be sure that the input
> data doesn't mess up the fields. So the data is correct, but PyFlink throws an
> exception:
>
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>> at
>> org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>> at
>> org.apache.flink.streaming.api.operators.py

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-17 Thread Shuiqiang Chen
Hi meneldor, Xingbo,

Sorry for the late reply.

Thanks a lot for Xingbo’s clarification.

Judging from the stack trace of the exception, could you check
whether the result data matches the specified return type? BTW, please share
your code if that's OK; it would help with debugging.

Best,
Shuiqiang




On Fri, Jan 15, 2021 at 4:59 PM, meneldor wrote:

> I imported pyflink.common.types.Row and used it as Shuiqiang suggested but
> now Java throws a memory exception:
>
> Caused by: TimerException{java.lang.OutOfMemoryError: Java heap space}
>> ... 11 more
>> Caused by: java.lang.OutOfMemoryError: Java heap space
>> at
>> org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseChars(SegmentsUtil.java:91)
>> at
>> org.apache.flink.table.runtime.util.StringUtf8Utils.decodeUTF8(StringUtf8Utils.java:127)
>> at
>> org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:90)
>> at
>> org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:41)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>> at
>> org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
>> at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
>> at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
>> at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>> at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>> at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator$$Lambda$670/579781231.onProcessingTime(Unknown
>> Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$844/2129217743.run(Unknown
>> Source)
>>
>
> Regards
>
> On Fri, Jan 15, 2021 at 4:00 AM Xingbo Huang  wrote:
>
>> Hi meneldor,
>>
>> I guess Shuiqiang is not using the pyflink 1.12.0 to develop the example.
>> The signature of the `process_element` method has been changed in the new
>> version[1]. In pyflink 1.12.0, you can use `collector`.collect to send out
>> your results.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20647
>>
>> Best,
>> Xingbo
>>
>> On Fri, Jan 15, 2021 at 1:20 AM, meneldor wrote:
>>
>>> Thank you for the answer Shuiqiang!
>>> I'm using the latest apache-flink version:
>>>
>>>> Requirement already up-to-date: apache-flink in
>>>> ./venv/lib/python3.7/site-packages (1.12.0)
>>>
>>> however the method signature is using a collector:
>>>
>>> [image: image.png]
>>> I'm using the *setup-pyflink-virtual-env.sh* shell script from the
>>> docs (which uses pip).
>>>
>>> Regards
>>>
>>> On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen 
>>> wrote:
>>>
>>>> Hi meneldor,
>>>>
>>>> The main cause of the error is a bug in
>>>> `ctx.timer_service().current_watermark()`. At the beginning of the stream,
>>>> when the first record arrives at KeyedProcessFunction.process_element(),
>>>> the current watermark is Long.MIN_VALUE on the Java side, while on
>>>> the Python side it becomes Long.MAX_VALUE, which is 9223372036854775807.
>>>>
>>>> >>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>>>
>>>> Here, 9223372036854775807 + 1500 is 9223372036854777307, which is
>>>> automatically converted to a long integer in Python but causes a Long
>>>> value overflow in Java when the registered timer value is deserialized. I
>>>> will create

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-18 Thread Shuiqiang Chen
Hi meneldor,

Actually, the return type of on_timer() must be the same as that of
process_element(). It seems that the value yielded by process_element() is
missing the `timestamp` field. Also, `output_type_info` has four field
names but five field types. Could you align them?
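
A quick pre-flight check of this kind can be sketched in plain Python. The helper `schema_problems` and the example field names are hypothetical (the code under discussion is not shown in this thread), and the type names are plain strings rather than PyFlink type objects.

```python
def schema_problems(field_names, field_types, record_fields):
    """Return a list of human-readable mismatches, empty if all is aligned."""
    problems = []
    if len(field_names) != len(field_types):
        problems.append(
            f"{len(field_names)} field names but {len(field_types)} field types")
    missing = [name for name in field_names if name not in record_fields]
    if missing:
        problems.append("record is missing fields: " + ", ".join(missing))
    return problems


# Hypothetical example mirroring the description above.
names = ["id", "data", "timestamp", "source"]
types = ["STRING", "STRING", "LONG", "STRING", "STRING"]
yielded = {"id": "a", "data": "some_string", "source": "kafka"}

for problem in schema_problems(names, types, yielded):
    print(problem)
```

Running the same check against the values yielded by both process_element() and on_timer() catches the case where the two methods emit rows of different shapes.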

Best,
Shuiqiang


Flink1.10 Cluster's Received is zero in the web when consume from Kafka0.11

2020-03-25 Thread Jim Chen
Hi, all
  When I use flink-connector-kafka-0.11 to consume from Kafka 0.11, the Received
Records count in the cluster web UI is always 0. However, the log is not empty. Can
anyone help me?

[image: image.png]


Re: Flink1.10 Cluster's Received is zero in the web when consume from Kafka0.11

2020-03-25 Thread Jim Chen
Thanks for the tip!

Maybe calling env.disableOperatorChaining() will make the received count show up
on the dashboard.

On Wed, Mar 25, 2020 at 5:56 PM, Chesnay Schepler wrote:

> This is a known limitation, see
> https://issues.apache.org/jira/browse/FLINK-7286 .
>
> As a crude workaround you may either break the chain after the source /
> before the sink, or query the numRecordsOut metric for the source /
> numRecordsIn metric for the sink via the WebUI metrics tab or REST API.
>
> On 25/03/2020 10:49, Jim Chen wrote:
>
> Hi, all
>   When I use flink-connector-kafka-0.11 to consume from Kafka 0.11, the Received
> Records count in the cluster web UI is always 0. However, the log is not empty. Can
> anyone help me?
>
> [image: image.png]
>
>
>


How to consume kafka from the last offset?

2020-03-25 Thread Jim Chen
Hi, All
  I use flink-connector-kafka-0.11 to consume from Kafka 0.11. In the KafkaConsumer
params, I set group.id and auto.offset.reset. In Flink 1.10, I call
kafkaConsumer.setStartFromGroupOffsets();
  Then, when I restart the application, I find that it does not resume from the last
position. Does anyone know what is wrong? HELP!


Re: How to consume kafka from the last offset?

2020-03-26 Thread Jim Chen
Thanks!

I made a mistake. I forgot to set auto.offset.reset=false. It's my
fault.

On Wed, Mar 25, 2020 at 6:49 PM, Dominik Wosiński wrote:

> Hi Jim,
> Well, *auto.offset.reset* is only used when there is no offset saved for
> this *group.id* in Kafka. So, if you want to read the
> data from the latest record (and by latest I mean the newest here), you
> should assign a *group.id* that was not previously
> used, and then FlinkKafkaConsumer should automatically fetch the last offset
> and start reading from that place.
>
>
> Best Regards,
> Dom.
>
> On Wed, Mar 25, 2020 at 11:19, Jim Chen wrote:
>
>> Hi, All
>>   I use flink-connector-kafka-0.11 to consume from Kafka 0.11. In the
>> KafkaConsumer params, I set group.id and auto.offset.reset. In
>> Flink 1.10, I call kafkaConsumer.setStartFromGroupOffsets();
>>   Then, when I restart the application, I find that it does not resume from the
>> last position. Does anyone know what is wrong? HELP!
>>
>
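
Dominik's point about when auto.offset.reset applies can be sketched as a small decision function. This is a simplified model in plain Python, not Kafka or Flink code; the function name and parameters are hypothetical.

```python
def resolve_start_offset(committed_offset, auto_offset_reset, earliest, latest):
    """Pick the offset a consumer group starts from for one partition.

    committed_offset: offset saved for this group.id, or None if there is none.
    auto_offset_reset: "earliest" or "latest"; consulted only when nothing
    has been committed for the group.
    """
    if committed_offset is not None:
        return committed_offset
    if auto_offset_reset == "earliest":
        return earliest
    if auto_offset_reset == "latest":
        return latest
    raise ValueError("unsupported auto.offset.reset: " + repr(auto_offset_reset))


print(resolve_start_offset(42, "latest", earliest=0, latest=100))    # committed offset wins
print(resolve_start_offset(None, "latest", earliest=0, latest=100))  # falls back to the reset policy
```

The model also shows why resuming from the last position depends on offsets actually being committed for the group: without a committed offset, every restart falls through to the reset policy.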


When i use the Tumbling Windows, find lost some record

2020-03-26 Thread Jim Chen
Hi, All

  When I use tumbling windows, I find that some records are lost. My code is as follows:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.addSource(FlinkKafkaConsumer011..)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.minutes(3)) {
            @Override
            public long extractTimestamp(JSONObject jsonObject) {
                long logTime = jsonObject.getLongValue("logTime");
                return logTime;
            }
        })
    .keyBy(jsonObject -> {
        return jsonObject.getString("userId");
    })
    .timeWindow(Time.seconds(30))
    .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
        public void process(String key, Context context,
                Iterable<JSONObject> iterable, Collector<JSONObject> collector) throws Exception {
            SimpleDateFormat sdf = new SimpleDateFormat("-MM-dd HH:mm:ss");
            String start = sdf.format(new Date(context.window().getStart()));
            String end = sdf.format(new Date(context.window().getEnd()));
            System.out.println(start + "" + end);
            for (JSONObject jsonObject : iterable) {
                collector.collect(jsonObject);
            }
        }
    })
    .print("");

From the printed result, I found that some records are lost in the tumbling window. I
can't figure out why. Can anyone help me?
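
The thread does not identify the cause. One common reason for records missing from event-time windows is lateness: a record is dropped when its window has already closed relative to the watermark. The sketch below is a simplified, standalone model of that rule, not Flink code. It assumes non-negative millisecond timestamps and a watermark that advances after every record, whereas Flink emits watermarks periodically. The function names are hypothetical.

```python
def window_start(ts_ms, size_ms):
    """Start of the tumbling window that contains ts_ms (no offset)."""
    return ts_ms - (ts_ms % size_ms)


def split_late_records(timestamps, size_ms, max_out_of_orderness_ms,
                       allowed_lateness_ms=0):
    """Split records into kept and dropped under a bounded-out-of-orderness watermark."""
    kept, dropped = [], []
    max_seen = None
    for ts in timestamps:
        # Watermark derived from the records seen before this one.
        watermark = None if max_seen is None else max_seen - max_out_of_orderness_ms
        window_max_ts = window_start(ts, size_ms) + size_ms - 1
        if watermark is not None and window_max_ts + allowed_lateness_ms <= watermark:
            dropped.append(ts)
        else:
            kept.append(ts)
        max_seen = ts if max_seen is None else max(max_seen, ts)
    return kept, dropped


# 30-second windows with a 3-minute out-of-orderness bound, as in the code above.
kept, dropped = split_late_records([0, 250_000, 10_000, 100_000], 30_000, 180_000)
print(kept, dropped)
```

In this model, a record that arrives more than three minutes (in event time) behind the newest record seen so far lands in a window that has already fired, so it never reaches the ProcessWindowFunction. Comparing logTime values against the largest logTime seen so far is one way to check whether that is what happens here.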


Re: How to consume kafka from the last offset?

2020-03-26 Thread Jim Chen
Hi,
I am so sorry. It's not auto.offset.reset. The correct setting is
*enable.auto.commit=false*

Best wishes!

On Thu, Mar 26, 2020 at 4:20 PM, Dominik Wosiński wrote:

> Hey,
> Are you completely sure you mean *auto.offset.reset*? False is not a
> valid setting for that AFAIK.
>
> Best,
> Dom.
>
> On Thu, Mar 26, 2020 at 08:38, Jim Chen wrote:
>
>> Thanks!
>>
>> I made a mistake. I forgot to set auto.offset.reset=false. It's my
>> fault.
>>
>> On Wed, Mar 25, 2020 at 6:49 PM, Dominik Wosiński wrote:
>>
>>> Hi Jim,
>>> Well, *auto.offset.reset* is only used when there is no offset saved
>>> for this *group.id* in Kafka. So, if you want to read
>>> the data from the latest record (and by latest I mean the newest here), you
>>> should assign a *group.id* that was not previously
>>> used, and then FlinkKafkaConsumer should automatically fetch the last offset
>>> and start reading from that place.
>>>
>>>
>>> Best Regards,
>>> Dom.
>>>
>>> On Wed, Mar 25, 2020 at 11:19, Jim Chen wrote:
>>>
>>>> Hi, All
>>>>   I use flink-connector-kafka-0.11 to consume from Kafka 0.11. In the
>>>> KafkaConsumer params, I set group.id and auto.offset.reset. In
>>>> Flink 1.10, I call kafkaConsumer.setStartFromGroupOffsets();
>>>>   Then, when I restart the application, I find that it does not resume from the
>>>> last position. Does anyone know what is wrong? HELP!
>>>>
>>>


Re: Debug Slowness in Async Checkpointing

2020-04-25 Thread Chen Q

To echo what Lu mentioned: is there documentation where we can find more information on

 * when barriers are injected at the sources by the checkpoint coordinator
 * when each downstream task observes the first barrier of a checkpoint
 * when all barriers of a checkpoint have arrived at a task
 * when the snapshot starts/completes
 * when the upload to the remote file system starts/completes
 * when the ack is sent to the checkpoint coordinator

For now, the Flink UI only shows that a checkpoint timed out because a task
could not finish in time, which is too limited for further debugging.


Chen


On 4/24/20 10:52 PM, Congxian Qiu wrote:

Hi
If the bottleneck is the upload part, have you tried uploading
files with multiple threads [1]?


[1] https://issues.apache.org/jira/browse/FLINK-11008
Best,
Congxian


On Fri, Apr 24, 2020 at 12:38 PM, Lu Niu <qqib...@gmail.com> wrote:


Hi, Robert

Thanks for replying. Yeah. After I added monitoring on the above
path, it shows the slowness did come from uploading file to s3.
Right now I am still investigating the issue. At the same time, I
am trying PrestoS3FileSystem to check whether that can mitigate
the problem.

Best
Lu

On Thu, Apr 23, 2020 at 8:10 AM Robert Metzger
<rmetz...@apache.org> wrote:

Hi Lu,

were you able to resolve the issue with the slow async
checkpoints?

I've added Yu Li to this thread. He has more experience with
the state backends to decide which monitoring is appropriate
for such situations.

Best,
Robert


On Tue, Apr 21, 2020 at 10:50 PM Lu Niu <qqib...@gmail.com> wrote:

Hi, Robert

Thanks for replying. To improve observability, do you
think we should expose more metrics in checkpointing? For
example, in incremental checkpointing, the time spent on
uploading sst files?

https://github.com/apache/flink/blob/5b71c7f2fe36c760924848295a8090898cb10f15/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L319

Best
Lu


On Fri, Apr 17, 2020 at 11:31 AM Robert Metzger
<rmetz...@apache.org> wrote:

Hi,
did you check the TaskManager logs if there are
retries by the s3a file system during checkpointing?

I'm not aware of any metrics in Flink that could be
helpful in this situation.

Best,
Robert

On Tue, Apr 14, 2020 at 12:02 AM Lu Niu
<qqib...@gmail.com> wrote:

Hi, Flink users

We notice sometimes async checkpointing can be
extremely slow, leading to checkpoint timeout. For
example, For a state size around 2.5MB, it could
take 7~12min in async checkpointing:

Screen Shot 2020-04-09 at 5.04.30 PM.png

Notice all the slowness comes from async
checkpointing, no delay in sync part and barrier
assignment. As we use rocksdb incremental
checkpointing, I notice the slowness might be
caused by uploading the file to s3. However, I am
not completely sure since there are other steps in
async checkpointing. Does flink expose
fine-granular metrics to debug such slowness?

setup: flink 1.9.1, rocksdb incremental state
backend, S3AHadoopFileSystem

Best
Lu



Re: Flink consuming rate increases slowly

2020-05-10 Thread Chen Qin
Hi Eyal,

It’s unclear what the warmup phase does in your use case. Usually we see Flink
start consuming at a high rate and then drop to a rate the downstream can handle.

Thanks
Chen

> On May 10, 2020, at 12:25 AM, Eyal Pe'er  wrote:
> 
> Hi all,
> Lately I've added more resources to my Flink cluster which required a restart 
> of all apps.
> From the cluster side, the only change I made, is to add more task slots.
> On the cluster I have a streaming app that consumes from Kafka and sinks to 
> files.
> I noticed that since the restart, the application's "warmup" has been impacted
> dramatically.
> Before the change, it took a few minutes for the app to start and consume
> normally (from my point of view, normally is a stable rate) - from 0 to 16K
> events per second in 4 minutes.
> Now, after the change, it takes hours until it stabilizes at the normal
> processing rate - from 0 to 12K events per second in 3 hours.
> The data source behavior hasn’t changed (same incoming rate, partitions,
> servers etc.).
> I am aware of the backpressure mechanism in Flink, but it seems like it works
> too slowly here.
> Is there a way to speed or control it? 
>  
> Thanks a lot
> Eyal Peer



Re: Flink operator throttle

2020-05-17 Thread Chen Qin
Hi Ray,

From a somewhat abstract point of view, you can always throttle the source to get
proper control over sink throughput.
One approach might be to override the base KafkaFetcher and use the shaded
Guava rate limiter.

https://github.com/apache/flink/blob/59714b9d6addb1dbf2171cab937a0e3fec52f2b1/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L347
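
To illustrate the idea of throttling at the source, here is a minimal token-bucket sketch in plain Python. It is not Guava's RateLimiter and not Flink code; the class name, parameters, and the injectable clock are assumptions made for the example.

```python
import time


class TokenBucket:
    """Allow up to rate_per_sec records per second, with bursts up to capacity."""

    def __init__(self, rate_per_sec, capacity, clock=time.monotonic):
        self.rate = float(rate_per_sec)
        self.capacity = float(capacity)
        self.tokens = float(capacity)
        self.clock = clock
        self.last = clock()

    def try_acquire(self, n=1):
        """Take n tokens if available; return False without blocking otherwise."""
        now = self.clock()
        # Refill in proportion to elapsed time, capped at the bucket capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False


bucket = TokenBucket(rate_per_sec=2, capacity=2)
print([bucket.try_acquire() for _ in range(3)])
```

A fetch loop would call `try_acquire()` before emitting each record and pause briefly when it returns False. Because everything downstream only sees what the source emits, limiting at the source also bounds the sink rate, at the cost of not distinguishing between sinks.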

Best,

Chen


On Sat, May 16, 2020 at 11:48 PM Benchao Li  wrote:

> Hi,
>
> > If I want to use the rate limiter in other connectors, such as Kafka
> sink, ES sink, I need to do some more work on these connectors.
> Yes, you can do this by changing Kafka/ES sink, actually, this is how we
> did internally.
>
> > I'd like to know if the community has a plan to make a lower-level
> implementation for all connectors, also for table API and SQL?
> In my understanding, there is no on-going work on this. And usually we
> should leverage the back-pressure feature to do this.
> We can hear more from others whether this is a valid need.
>
> On Sun, May 17, 2020 at 2:32 PM, 王雷 wrote:
>
>> Hi Benchao
>>
>> Thanks for your answer!
>>
>> According to your answer, I found `GuavaFlinkConnectorRateLimiter` which
>> is the implementation of the `FlinkConnectorRateLimiter`.
>>
>> If I want to use the rate limiter in other connectors, such as Kafka
>> sink, ES sink, I need to do some more work on these connectors.
>>
>> I'd like to know if the community has a plan to make a lower-level
>> implementation for all connectors, also for table API and SQL?
>>
>> Thanks
>> Ray
>>
>> On Thu, May 14, 2020 at 5:49 PM, Benchao Li wrote:
>>
>>> AFAIK, `FlinkKafkaConsumer010#setRateLimiter` can configure the kafka
>>> source to have a rate limiter.
>>> (I assume you use Kafka)
>>> However it only exists in Kafka 0.10 DataStream Connector, not in other
>>> versions nor table api.
>>>
>>> On Thu, May 14, 2020 at 5:31 PM, 王雷 wrote:
>>>
>>>> hi, All
>>>>
>>>> Does Flink support rate limitation?
>>>> How to limit the rate when the external database connected by the sink
>>>> operator has throughput limitation.
>>>> Instead of passive back pressure after reaching the limit of the
>>>> external database, we want to limit rate actively.
>>>>
>>>> Thanks
>>>> Ray
>>>>
>>>
>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Flink Kafka Connector Source Parallelism

2020-05-27 Thread Chen, Mason
Hi all,

I’m currently trying to understand Flink’s Kafka Connector and how parallelism 
affects it. So, I am running the flink playground click count job and the 
parallelism is set to 2 by default.


However, I don’t see the 2nd subtask of the Kafka Connector sending any 
records: https://imgur.com/cA5ucSg. Do I need to rebalance after reading from 
kafka?

```
clicks = clicks
   .keyBy(ClickEvent::getPage)
   .map(new BackpressureMap())
   .name("Backpressure");
```

`clicks` is the kafka click stream. From my reading in the operator docs, it 
seems counterintuitive to do a `rebalance()` when I am already doing a 
`keyBy()`.

So, my questions:

1. How do I make use of the 2nd subtask?
2. Does the number of partitions have some sort of correspondence with the 
parallelism of the source operator? If so, is there a general statement to be 
made about parallelism across all source operators?

Thanks,
Mason


Re: Flink Kafka Connector Source Parallelism

2020-05-27 Thread Chen, Mason
I think I may have just answered my own question. There’s only one Kafka 
partition, so the maximum useful parallelism is one, and it doesn’t really make 
sense to create another Kafka consumer under the same group id. What threw me 
off is that a 2nd subtask is created for the Kafka source even though it’s not 
actually doing anything. So, it seems a general statement can be made that (# 
Kafka partitions) >= (# parallelism of Flink Kafka source). Well, I guess you 
could have more parallelism than Kafka partitions, but the extra subtasks will 
not do anything.
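
The observation above (one partition, parallelism 2, one idle subtask) can be illustrated with a small sketch. It assumes a plain round-robin mapping of partition ids onto subtask indices; the connector's real assignment logic may use a different starting offset, and the function names here are hypothetical.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, parallelism: int) -> Dict[int, List[int]]:
    """Distribute partition ids over subtask indices round-robin (simplified assumption)."""
    assignment: Dict[int, List[int]] = {subtask: [] for subtask in range(parallelism)}
    for partition in range(num_partitions):
        assignment[partition % parallelism].append(partition)
    return assignment


def idle_subtasks(num_partitions: int, parallelism: int) -> List[int]:
    """Return the subtask indices that end up with no partition to read."""
    assignment = assign_partitions(num_partitions, parallelism)
    return [subtask for subtask, partitions in assignment.items() if not partitions]


if __name__ == "__main__":
    # One partition with parallelism 2: the second subtask has nothing to consume.
    print(assign_partitions(1, 2))
    print(idle_subtasks(1, 2))
    # Four partitions with parallelism 2: both subtasks are busy.
    print(assign_partitions(4, 2))
```

Under this assumption, any parallelism above the partition count only adds idle source subtasks.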

From: "Chen, Mason" 
Date: Wednesday, May 27, 2020 at 11:09 PM
To: "user@flink.apache.org" 
Subject: Flink Kafka Connector Source Parallelism

Hi all,

I’m currently trying to understand Flink’s Kafka Connector and how parallelism 
affects it. So, I am running the flink playground click count job and the 
parallelism is set to 2 by default.



However, I don’t see the 2nd subtask of the Kafka Connector sending any 
records: https://imgur.com/cA5ucSg. Do I need to rebalance after reading from 
kafka?

```
clicks = clicks
   .keyBy(ClickEvent::getPage)
   .map(new BackpressureMap())
   .name("Backpressure");
```

`clicks` is the kafka click stream. From my reading in the operator docs, it 
seems counterintuitive to do a `rebalance()` when I am already doing a 
`keyBy()`.

So, my questions:

1. How do I make use of the 2nd subtask?
2. Does the number of partitions have some sort of correspondence with the 
parallelism of the source operator? If so, is there a general statement to be 
made about parallelism across all source operators?

Thanks,
Mason


Flink SQL tumbling window does not output the result set

2020-05-28 Thread steven chen
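The data arrives every time and gets counted, but why are the results of the INSERT not saved to MySQL? Is it a problem with the SQL, or something else? Any help would be appreciated.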
CREATE TABLE user_behavior (
itemCode VARCHAR,
ts BIGINT COMMENT 'timestamp',
t as TO_TIMESTAMP(FROM_UNIXTIME(ts /1000,'yyyy-MM-dd HH:mm:ss')),
proctime as PROCTIME(),
WATERMARK FOR t as t - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'scan-flink-topic',
'connector.properties.group.id' ='qrcode_pv_five_min',
'connector.startup-mode' = 'latest-offset',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);

CREATE TABLE pv_five_min (
item_code VARCHAR,
dt VARCHAR,
dd VARCHAR,
pv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://127.0.0.1:3306/qrcode',
'connector.table' = 'qrcode_pv_five_min',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = 'root',
'connector.write.flush.max-rows' = '1'
);

INSERT INTO pv_five_min
SELECT
itemCode As item_code,
DATE_FORMAT(TUMBLE_START(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dt,
DATE_FORMAT(TUMBLE_END(t, INTERVAL '5' MINUTE),'yyyy-MM-dd HH:mm') dd,
COUNT(*) AS pv
FROM user_behavior
GROUP BY TUMBLE(t, INTERVAL '5' MINUTE),itemCode;
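
One thing that may be worth checking for a query like this, offered as a possibility rather than a diagnosis: an event-time tumbling window only emits its row once the watermark has passed the end of the window. The sketch below replays event timestamps against a 5 minute window and a 5 second watermark delay, as declared in the DDL. The firing rule (watermark reaches the last millisecond of the window) and all function names are assumptions made for illustration.

```python
from typing import List, Tuple

WINDOW_MS = 5 * 60 * 1000   # TUMBLE(t, INTERVAL '5' MINUTE)
DELAY_MS = 5 * 1000         # WATERMARK FOR t AS t - INTERVAL '5' SECOND


def window_of(ts_ms: int) -> Tuple[int, int]:
    """Return the [start, end) bounds of the tumbling window that contains ts_ms."""
    start = ts_ms - ts_ms % WINDOW_MS
    return start, start + WINDOW_MS


def fired_windows(event_times_ms: List[int]) -> List[Tuple[int, int]]:
    """Replay events in arrival order and list the windows whose result would be emitted."""
    watermark = None
    pending = set()
    fired = []
    for ts in event_times_ms:
        window = window_of(ts)
        if watermark is None or watermark < window[1] - 1:
            pending.add(window)  # events for already closed windows are ignored here
        candidate = ts - DELAY_MS
        watermark = candidate if watermark is None else max(watermark, candidate)
        for open_window in sorted(pending):
            # Assumed firing rule: the watermark reaches the window's last millisecond.
            if watermark >= open_window[1] - 1:
                fired.append(open_window)
                pending.discard(open_window)
    return fired


if __name__ == "__main__":
    # Events only inside the first window: nothing is emitted yet.
    print(fired_windows([0, 60_000, 299_000]))
    # A later event pushes the watermark past the window end: the first window fires.
    print(fired_windows([0, 60_000, 299_000, 305_000]))
```

In this model, a stream that stops producing events (or whose timestamps never advance) leaves the last window pending, so nothing would reach the sink.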




 

HELP ! ! ! When using Flink1.10 to define table with the string type, the query result is null

2020-07-05 Thread Jim Chen
Hi, everyone!

When I use Flink 1.10 to define a table, I want to declare the JSON array
field as the STRING type, but the query result is null when I run the program.
The detailed code is as follows:

package com.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }
 */
public class Problem2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings);
        //bsEnv.registerFunction("explode3", new ExplodeFunction());

        String ddlSource = "CREATE TABLE actionTable3 (\n" +
                "action STRING\n" +
                ") WITH (\n" +
                "'connector.type' = 'kafka',\n" +
                "'connector.version' = '0.11',\n" +
                "'connector.topic' = 'test_action',\n" +
                "'connector.startup-mode' = 'earliest-offset',\n" +
                "'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
                "'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
                "'update-mode' = 'append',\n" +
                "'format.type' = 'json',\n" +
                //"'format.derive-schema' = 'true',\n" +
                "'format.json-schema' = '{\"type\": \"object\", \"properties\": {\"action\": {\"type\": \"string\"} } }'" +
                ")";
        System.out.println(ddlSource);
        bsEnv.sqlUpdate(ddlSource);

        Table table = bsEnv.sqlQuery("select * from actionTable3");
        //Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL TABLE(explode3(`action`)) as T(`word`)");
        table.printSchema();
        bsEnv.toAppendStream(table, Row.class)
                .print(); // the result is null

        bsEnv.execute("ARRAY tableFunction Problem");
    }
}
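
One possible reading of the null result, stated as an assumption and not a confirmed diagnosis: the declared schema says `action` is a string, while the Kafka message carries a JSON array in that field. The sketch below uses only the message shown in the class comment and the standard `json` module to make the mismatch visible, and shows what the field would look like if the producer sent the array as text instead.

```python
import json

# The Kafka message from the comment in the Java class above.
message = (
    '{"action": [{"actionID": "id001", "actionName": "aaa"}, '
    '{"actionID": "id002", "actionName": "bbb"}]}'
)

record = json.loads(message)
action = record["action"]

# The field is a JSON array (a Python list), not a JSON string.
print(type(action).__name__)

# Serialising the array again gives the text a STRING column would be expected to hold.
action_as_text = json.dumps(action)
print(action_as_text)
```

Whether the JSON format in a given Flink version converts such an array into a string on its own is something to verify against that version's documentation.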


Re: Kafka Rate Limit in FlinkConsumer ?

2020-07-06 Thread Chen Qin
My two cents here:

- A Flink job already has back pressure, so in some use cases rate limiting 
can be done by setting the parallelism to a proper number. There is an open 
issue about checkpointing reliability under back pressure; the community seems 
to be working on it.

- Rate limiting can be abused easily and cause a lot of confusion. Think about 
a use case where two streams do a simple interval join. Unless you are able to 
rate limit both with proper values dynamically, you might see timestamp and 
watermark gaps keep increasing, causing checkpointing failures.

So the question might be: instead of looking at the rate limit of one source, 
how do we slow down all sources without ever-increasing timestamp and watermark 
gaps? It sounds complicated already.

With that being said, if you really want to have a rate limit of your own, you 
can try the following code :) It works well for us.
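// Imports assume Flink 1.10 (universal Kafka connector) and Guava's RateLimiter.
import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Properties;

public class SynchronousKafkaConsumer<T> extends FlinkKafkaConsumer<T> {

  protected static final Logger LOG =
      LoggerFactory.getLogger(SynchronousKafkaConsumer.class);

  private final double topicRateLimit;
  private transient RateLimiter subtaskRateLimiter;

  // The original post omitted the constructor; this signature is an assumption.
  public SynchronousKafkaConsumer(
      String topic,
      KafkaDeserializationSchema<T> schema,
      Properties props,
      double topicRateLimit) {
    super(topic, schema, props);
    this.topicRateLimit = topicRateLimit;
  }

  @Override
  public void open(Configuration configuration) throws Exception {
    // Split the topic-wide limit evenly across the parallel source subtasks.
    double subtaskRate =
        topicRateLimit / getRuntimeContext().getNumberOfParallelSubtasks();
    Preconditions.checkArgument(
        subtaskRate > 0.1, "subtask ratelimit should be greater than 0.1 QPS");
    subtaskRateLimiter = RateLimiter.create(subtaskRate);
    super.open(configuration);
  }

  @Override
  protected AbstractFetcher<T, ?> createFetcher(
      SourceContext<T> sourceContext,
      Map<KafkaTopicPartition, Long> partitionsWithOffsets,
      SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
      SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
      StreamingRuntimeContext runtimeContext,
      OffsetCommitMode offsetCommitMode,
      MetricGroup consumerMetricGroup,
      boolean useMetrics) throws Exception {

    return new KafkaFetcher<T>(
        sourceContext,
        partitionsWithOffsets,
        watermarksPeriodic,
        watermarksPunctuated,
        runtimeContext.getProcessingTimeService(),
        runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
        runtimeContext.getUserCodeClassLoader(),
        runtimeContext.getTaskNameWithSubtasks(),
        deserializer,
        properties,
        pollTimeout,
        runtimeContext.getMetricGroup(),
        consumerMetricGroup,
        useMetrics) {

      @Override
      protected void emitRecord(
          T record,
          KafkaTopicPartitionState<TopicPartition> partitionState,
          long offset) throws Exception {
        // Block until this subtask has a permit, then emit as usual.
        subtaskRateLimiter.acquire();
        if (record == null) {
          consumerMetricGroup.counter("invalidRecord").inc();
        }
        super.emitRecord(record, partitionState, offset);
      }

      @Override
      protected void emitRecordWithTimestamp(
          T record,
          KafkaTopicPartitionState<TopicPartition> partitionState,
          long offset,
          long timestamp) throws Exception {
        subtaskRateLimiter.acquire();
        if (record == null) {
          consumerMetricGroup.counter("invalidRecord").inc();
        }
        super.emitRecordWithTimestamp(record, partitionState, offset, timestamp);
      }
    };
  }
}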
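
The core idea of the consumer above, dividing a topic-wide rate by the source parallelism and blocking each subtask before it emits, can be sketched independently of Flink. This is a minimal illustration and not Guava's `RateLimiter`: it spaces permits evenly and takes an injectable clock and sleep function so it can be exercised without waiting. The class and parameter names are hypothetical.

```python
import time


class SubtaskRateLimiter:
    """Blocking limiter that hands out one permit every 1/rate seconds."""

    def __init__(self, topic_rate: float, parallelism: int,
                 clock=time.monotonic, sleep=time.sleep):
        # Same split and sanity check as in open(): the per-subtask share must exceed 0.1 QPS.
        rate = topic_rate / parallelism
        if rate <= 0.1:
            raise ValueError("subtask rate limit should be greater than 0.1 QPS")
        self.rate = rate
        self._interval = 1.0 / rate
        self._clock = clock
        self._sleep = sleep
        self._next_free = clock()

    def acquire(self) -> float:
        """Wait until the next permit is available and return the seconds waited."""
        now = self._clock()
        wait = max(0.0, self._next_free - now)
        if wait > 0:
            self._sleep(wait)
        self._next_free = max(now, self._next_free) + self._interval
        return wait


if __name__ == "__main__":
    # 100 records/s for the whole topic, 4 source subtasks: 25 records/s each.
    limiter = SubtaskRateLimiter(topic_rate=100, parallelism=4)
    for record in ["a", "b", "c"]:
        limiter.acquire()
        print(record)
```

As the message notes, limiting a single source this way does nothing to keep several joined sources aligned in event time.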
Thanks,

Chen
Pinterest Data


> On Jul 6, 2020, at 7:43 AM, David Magalhães  wrote:
> 
> I've noticed that FLINK-11501 was implemented in 
> flink-connector-kafka-0.10 [1], but not in the current version of 
> flink-connector-kafka. Is there any reason for this, and what would be the 
> best solution to implement rate limiting in the current Kafka 
> consumer?
> 
> Thanks,
> David
> 
> [1] 
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
> 
> [2] 
> https://github.com/lyft/flink/blob/release-1.11-lyft/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java


HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast

2020-07-15 Thread Jim Chen
Hi,
  I use Flink 1.10.1 SQL to connect to HBase 1.4.3. When inserting into HBase,
it reports an error from validateSchemaAndApplyImplicitCast, which means that
the Query Schema and Sink Schema are inconsistent.
  The difference is mainly Row(EXPR$0) in the Query Schema, where all fields
are expressions, while the Sink Schema is Row(device_id). I don't know how to
write the SQL so that it is consistent with the HBase sink schema.
  I tried to write SQL like select device_id as rowkey, ROW( device_id as
[cannot write as]  ) as f1

The error message is as follows:
[image: image.png]

sample code like:
HBase sink ddl:
String ddlSource = "CREATE TABLE test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
        "  rowkey STRING,\n" +
        "  f1 ROW< \n" +
        "device_id STRING,\n" +
        "pass_id STRING,\n" +
        "first_date STRING,\n" +
        "first_channel_id STRING,\n" +
        "first_app_version STRING,\n" +
        "first_server_time STRING,\n" +
        "first_server_hour STRING,\n" +
        "first_ip_location STRING,\n" +
        "first_login_time STRING,\n" +
        "sys_can_uninstall STRING,\n" +
        "update_date STRING,\n" +
        "server_time BIGINT,\n" +
        "last_pass_id STRING,\n" +
        "last_channel_id STRING,\n" +
        "last_app_version STRING,\n" +
        "last_date STRING,\n" +
        "os STRING,\n" +
        "attribution_channel_id STRING,\n" +
        "attribution_first_date STRING,\n" +
        "p_product STRING,\n" +
        "p_project STRING,\n" +
        "p_dt STRING\n" +
        ">\n" +
        ") WITH (\n" +
        "  'connector.type' = 'hbase',\n" +
        // Even if the syntax check is bypassed, switching to another HBase version
        // still fails; for example, the production version does not work.
        "  'connector.version' = '1.4.3',\n" +
        "  'connector.table-name' = 'dw_common_mobile_device_user_mapping_new',\n" +
        "  'connector.zookeeper.quorum' = '" + zookeeperServers + "',\n" +
        "  'connector.zookeeper.znode.parent' = '/hbase143',\n" +
        "  'connector.write.buffer-flush.max-size' = '2mb',\n" +
        "  'connector.write.buffer-flush.max-rows' = '1000',\n" +
        "  'connector.write.buffer-flush.interval' = '2s'\n" +
        ")";

insert into sql:

String bodyAndLocalSql = "" +
        //"insert into test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
        "SELECT CAST(rowkey AS STRING) AS rowkey, " +
        " ROW(" +
        " device_id, pass_id, first_date, first_channel_id, " +
        "first_app_version, first_server_time, first_server_hour, first_ip_location, " +
        "first_login_time, sys_can_uninstall, update_date, server_time, " +
        "last_pass_id, last_channel_id, last_app_version, last_date, os, " +
        "attribution_channel_id, attribution_first_date, p_product, p_project, p_dt " +
        ") AS f1" +
        " FROM " +
        "(" +
        " SELECT " +
        " MD5(CONCAT_WS('|', kafka.uid, kafka.p_product, kafka.p_project)) AS rowkey, " +
        " kafka.uid AS device_id " +
        ",kafka.pass_id " +

        // first_date
        ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
        // new user
        " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') " +
        // existing user
        " ELSE hbase.first_date END AS first_date " +

        // first_channel_id
        ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
        // new user
        " THEN kafka.wlb_channel_id" +
        // existing user
        " ELSE hbase.first_channel_id END AS first_channel_id " +

        // first_app_version
        ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
        // new user
        " THEN kafka.app_version " +
        // existing user
        " ELSE hbase.first_app_version END AS first_app_version " +

        // first_server_time
        ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
        // new user
        " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd HH:mm:ss') " +
        // existing user
        " ELSE hbase.first_server_time END AS first_server_time " +

        // first_server_hour
        ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
        // new user
        " THEN FROM_UNIXTIME(kafka.server_time, 'HH') " +
        // existing user
        " ELSE hbase.first_server_hour END AS first_server_hour " +

        // first_ip_location
        ",C

records-lag-max

2020-07-20 Thread Chen, Mason
Hi all,

I am having some trouble with the lag metric from the kafka connector. The 
gauge value is always reported as NaN although I’m producing events first and 
then starting the flink job. Anyone know how to fix this?

```
# TYPE flink_taskmanager_job_task_operator_records_lag_max gauge
flink_taskmanager_job_task_operator_records_lag_max{ … } NaN
```

Thanks,
Mason


Re: records-lag-max

2020-07-20 Thread Chen, Mason
I removed an unnecessary `.keyBy()` and I’m getting the metrics again. Is this 
a potential bug?

From: "Chen, Mason" 
Date: Monday, July 20, 2020 at 12:41 PM
To: "user@flink.apache.org" 
Subject: records-lag-max

Hi all,

I am having some trouble with the lag metric from the kafka connector. The 
gauge value is always reported as NaN although I’m producing events first and 
then starting the flink job. Anyone know how to fix this?

```
# TYPE flink_taskmanager_job_task_operator_records_lag_max gauge
flink_taskmanager_job_task_operator_records_lag_max{ … } NaN
```

Thanks,
Mason


RE: Recommended pattern for implementing a DLQ with Flink+Kafka

2020-07-22 Thread Chen Qin
Could you be more specific about what “failed message” means here?

In general, a side output can do something like this:

def process(ele) {
  try {
    biz
  } catch {
    Sideout(ele + exception context)
  }
}

process(func).sideoutput(tag).addSink(kafkasink)

Thanks,
Chen

From: Eleanore Jin
Sent: Wednesday, July 22, 2020 9:25 AM
To: Tom Fennelly
Cc: user
Subject: Re: Recommended pattern for implementing a DLQ with Flink+Kafka

+1 we have a similar use case for message schema validation.

Eleanore

On Wed, Jul 22, 2020 at 4:12 AM Tom Fennelly <tfenne...@cloudbees.com> wrote:

Hi.

I've been searching blogs etc trying to see if there are established
patterns/mechanisms for reprocessing of failed messages via something like a
DLQ. I've read about using checkpointing and restarting tasks (not what we
want because we want to keep processing forward) and then also how some use
side outputs to filter "bad" data to a DLQ style topic. Kafka has dead letter
topic configs too but it seems that can't really be used from inside Flink
(from what I can see).

We're running Flink+Kafka. I'm new to Flink+Kafka so not sure if there just
isn't a defined pattern for it, or if I'm just not asking the right questions
in my searches. I searched the archives here and don't see anything either,
which obviously makes me think that I'm not thinking about this in the "Flink
way" :-|

Regards,
Tom.
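
The side-output idea from the reply in this thread (run the business logic inside a try block and route failures, together with their error context, to a separate output) can be sketched without Flink. The function and field names below are hypothetical; in a real job the second list would correspond to a side output that is connected to a Kafka sink.

```python
from typing import Callable, Iterable, List, Tuple


def process_with_dlq(
    elements: Iterable[str],
    business_logic: Callable[[str], int],
) -> Tuple[List[int], List[dict]]:
    """Apply business_logic to each element; collect failures with their error context."""
    main_output: List[int] = []
    dead_letters: List[dict] = []
    for element in elements:
        try:
            main_output.append(business_logic(element))
        except Exception as exc:
            # Keep the original element plus enough context to reprocess or inspect it later.
            dead_letters.append({"element": element, "error": repr(exc)})
    return main_output, dead_letters


if __name__ == "__main__":
    ok, dlq = process_with_dlq(["1", "2", "oops", "4"], int)
    print(ok)
    print(dlq)
```

Processing keeps moving forward because a bad element never raises out of the loop; it only changes which output the element lands in.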


Re: Job Cluster on Kubernetes with PyFlink

2020-07-28 Thread Shuiqiang Chen
Hi Wojciech,

Currently, we are not able to deploy a job cluster for PyFlink jobs on
kubernetes, but it will be supported in release-1.12.

Best,
Shuiqiang


Re: Job Cluster on Kubernetes with PyFlink

2020-07-28 Thread Shuiqiang Chen
Hi Wojciech,

After double checking, there should be a way to run PyFlink jobs on
Kubernetes in a job cluster. You can try the following:
1. Make sure the custom image has the corresponding PyFlink installed (it
seems that you have already done this).
2. If you use third-party Python dependencies in the Python UDF, make sure
that those dependencies are also pip installed.
3. Put the flink-python_{your_scala_version}-{your_flink_version}.jar
into the /opt/flink/usrlib directory when building the custom Docker image.
4. Set the value of the option "--job-classname" to
"org.apache.flink.client.python.PythonDriver".
5. Add '-pym {the_entry_module_of_your_pyflink_job}' to [job arguments].
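
Steps 3 to 5 boil down to one jar name and a short argument list. The helper below only assembles those strings, as a way to keep them consistent in a build or deploy script; the function names and the example version numbers are hypothetical, and how the arguments are handed to the container entrypoint is not covered here.

```python
from typing import List

# Driver class named in step 4.
PYTHON_DRIVER = "org.apache.flink.client.python.PythonDriver"


def flink_python_jar(scala_version: str, flink_version: str) -> str:
    """Name of the jar that step 3 places under /opt/flink/usrlib."""
    return f"flink-python_{scala_version}-{flink_version}.jar"


def job_cluster_args(entry_module: str) -> List[str]:
    """Arguments from steps 4 and 5: the driver class plus the Python entry module."""
    return ["--job-classname", PYTHON_DRIVER, "-pym", entry_module]


if __name__ == "__main__":
    # Example values only; use the versions that match your image.
    print(flink_python_jar("2.11", "1.11.1"))
    print(" ".join(job_cluster_args("word_count")))
```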

Best,
Shuiqiang


On Tue, Jul 28, 2020 at 5:55 PM Shuiqiang Chen wrote:

> Hi Wojciech,
>
> Currently, we are not able to deploy a job cluster for PyFlink jobs on
> kubernetes, but it will be supported in release-1.12.
>
> Best,
> Shuiqiang
>
>


Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-02 Thread Shuiqiang Chen
Hi jincheng,

Thanks for the discussion. +1 for the FLIP.

A well-organized documentation will greatly improve the efficiency and
experience for developers.

Best,
Shuiqiang

On Sat, Aug 1, 2020 at 8:42 AM Hequn Cheng wrote:

> Hi Jincheng,
>
> Thanks a lot for raising the discussion. +1 for the FLIP.
>
> I think this will bring big benefits for the PyFlink users. Currently, the
> Python TableAPI document is hidden deeply under the TableAPI&SQL tab which
> makes it quite unreadable. Also, the PyFlink documentation is mixed with
> Java/Scala documentation. It is hard for users to have an overview of all
> the PyFlink documents. As more and more functionalities are added into
> PyFlink, I think it's time for us to refactor the document.
>
> Best,
> Hequn
>
>
> On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira 
> wrote:
>
>> Hi, Jincheng!
>>
>> Thanks for creating this detailed FLIP, it will make a big difference in
>> the experience of Python developers using Flink. I'm interested in
>> contributing to this work, so I'll reach out to you offline!
>>
>> Also, thanks for sharing some information on the adoption of PyFlink, it's
>> great to see that there are already production users.
>>
>> Marta
>>
>> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang  wrote:
>>
>> > Hi Jincheng,
>> >
>> > Thanks a lot for bringing up this discussion and the proposal.
>> >
>> > Big +1 for improving the structure of PyFlink doc.
>> >
>> > It will be very friendly to give PyFlink users a unified entry point
>> > for learning from the PyFlink documents.
>> >
>> > Best,
>> > Xingbo
>> >
>> > On Fri, Jul 31, 2020 at 11:00 AM Dian Fu wrote:
>> >
>> >> Hi Jincheng,
>> >>
>> >> Thanks a lot for bringing up this discussion and the proposal. +1 to
>> >> improve the Python API doc.
>> >>
>> >> I have received a lot of feedback from PyFlink beginners about
>> >> the PyFlink docs, e.g. there is too little material, the Python docs are
>> >> mixed with the Java docs, and it's not easy to find the docs they need.
>> >>
>> >> I think it would greatly improve the user experience if we had one
>> >> place that includes most of what PyFlink users should know.
>> >>
>> >> Regards,
>> >> Dian
>> >>
>> >> On Jul 31, 2020, at 10:14 AM, jincheng sun wrote:
>> >>
>> >> Hi folks,
>> >>
>> >> Since the release of Flink 1.11, the number of PyFlink users has
>> >> continued to grow. As far as I know, many companies have used PyFlink for
>> >> data analysis, and operations and maintenance monitoring workloads have
>> >> been put into production (such as 聚美优品[1] (Jumei), 浙江墨芷[2] (Mozhi),
>> >> etc.). According to the feedback we received, the current documentation is
>> >> not very friendly to PyFlink users. There are two shortcomings:
>> >>
>> >> - Python related content is mixed into the Java/Scala documentation,
>> >> which makes it difficult to read for users who only focus on PyFlink.
>> >> - There is already a "Python Table API" section in the Table API document
>> >> that holds PyFlink documents, but the number of articles is small and the
>> >> content is fragmented. It is difficult for beginners to learn from it.
>> >>
>> >> In addition, FLIP-130 introduced the Python DataStream API. Many
>> >> documents will be added for those new APIs. In order to increase the
>> >> readability and maintainability of the PyFlink documentation, Wei Zhong
>> >> and I have discussed offline and would like to rework it via this FLIP.
>> >>
>> >> We will rework the documentation around the following three objectives:
>> >>
>> >> - Add a separate section for the Python API under the "Application
>> >> Development" section.
>> >> - Restructure the current Python documentation into a brand new structure
>> >> so that the content is complete and friendly to beginners.
>> >> - Improve the documents shared by Python/Java/Scala to make them more
>> >> friendly to Python users without affecting Java/Scala users.
>> >>
>> >> More detail can be found in FLIP-133:
>> >>
>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation
>> >>
>> >> Best,
>> >> Jincheng
>> >>
>> >> [1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg
>> >> [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g
>> >>
>> >>
>> >>
>>
>


Re: Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-03 Thread Shuiqiang Chen
Hi Kevin,

Thank you for your questions. Currently, users are not able to define
custom sources/sinks in Python. This would be a great feature that could unify
end-to-end PyFlink application development in Python, but it is a large topic
that we have no plan to support at present.

As you have noticed, `the Python DataStream API has several connectors
[2] that use Py4J+Java gateways to interoperate with Java source/sinks`.
These connectors extend the Python abstract classes named
`SourceFunction` and `SinkFunction`. These two classes can accept a Java
source/sink instance and hold it to enable the interoperation between
Python and Java. They can also accept a string with the full name of a
Java/Scala defined Source/SinkFunction class and create the corresponding
Java instance. Below are the definitions of these classes:

from typing import Union

from py4j.java_gateway import JavaObject

from pyflink.java_gateway import get_gateway


class JavaFunctionWrapper(object):
    """
    A wrapper class that maintains a Function implemented in Java.
    """

    def __init__(self, j_function: Union[str, JavaObject]):
        # TODO we should move this part to the get_java_function() to perform a lazy load.
        if isinstance(j_function, str):
            j_func_class = get_gateway().jvm.__getattr__(j_function)
            j_function = j_func_class()
        self._j_function = j_function

    def get_java_function(self):
        return self._j_function


class SourceFunction(JavaFunctionWrapper):
    """
    Base class for all stream data source in Flink.
    """

    def __init__(self, source_func: Union[str, JavaObject]):
        """
        Constructor of SourceFunction.

        :param source_func: The java SourceFunction object.
        """
        super(SourceFunction, self).__init__(source_func)


class SinkFunction(JavaFunctionWrapper):
    """
    The base class for SinkFunctions.
    """

    def __init__(self, sink_func: Union[str, JavaObject]):
        """
        Constructor of SinkFunction.

        :param sink_func: The java SinkFunction object or the full name of the
                          SinkFunction class.
        """
        super(SinkFunction, self).__init__(sink_func)

Therefore, you are able to define custom sources/sinks in Scala and apply
them in Python. Here is the recommended approach for implementation:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import SinkFunction


class MyBigTableSink(SinkFunction):
    def __init__(self, class_name: str):
        super(MyBigTableSink, self).__init__(class_name)


def example():
    env = StreamExecutionEnvironment.get_execution_environment()
    # Jar locations are passed as URLs, hence the file:// prefix.
    env.add_jars('file:///the/path/of/your/MyBigTableSink.jar')
    # ... (build the DataStream `ds` here)
    ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
    env.execute("Application with Custom Sink")


if __name__ == '__main__':
    example()

Remember that you must add the jar of the Scala defined SinkFunction by
calling `env.add_jars()` before adding the SinkFunction. Also, your custom
source/sink classes must extend `SourceFunction` and
`SinkFunction`.
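
The wrapper pattern described in this message, a constructor that accepts either a ready-made object or the fully qualified name of a class to instantiate, can be tried out in plain Python. The sketch below is an analogy only: it resolves Python classes with `importlib` instead of going through the Py4J gateway, and the class names are hypothetical.

```python
import importlib
from typing import Any, Union


class FunctionWrapper:
    """Holds a delegate given either as an instance or as a 'module.ClassName' string."""

    def __init__(self, function: Union[str, Any]):
        if isinstance(function, str):
            # Resolve the dotted name to a class and create an instance of it.
            module_name, _, class_name = function.rpartition(".")
            cls = getattr(importlib.import_module(module_name), class_name)
            function = cls()
        self._function = function

    def get_function(self) -> Any:
        return self._function


class MySink(FunctionWrapper):
    """Thin subclass that only passes the class name (or instance) to the wrapper."""

    def __init__(self, class_name: str):
        super().__init__(class_name)


if __name__ == "__main__":
    by_name = MySink("collections.OrderedDict")
    print(type(by_name.get_function()).__name__)

    existing = object()
    print(FunctionWrapper(existing).get_function() is existing)
```

In PyFlink the name lookup goes to the JVM instead, which is why the jar containing the class has to be registered before the sink is constructed.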

Any further questions are welcome!

Best,
Shuiqiang


On Wed, Mar 3, 2021 at 2:50 AM Kevin Lam wrote:

> Hello everyone,
>
> I have some questions about the Python API that hopefully folks in the
> Apache Flink community can help with.
>
> A little background, I’m interested in using the Python Datastream API
> because of stakeholders who don’t have a background in Scala/Java, and
> would prefer Python if possible. Our team is open to maintaining Scala
> constructs on our end, however we are looking to expose Flink for stateful
> streaming via a Python API to end-users.
>
> Questions:
>
> 1/ The docs mention that custom Sources and Sinks cannot be defined in
> Python, but must be written in Java/Scala [1]. What is the recommended
> approach for interoperating between custom sinks/sources written in Scala,
> with the Python API? If nothing is currently supported, is it on the road
> map?
>
> 2/ Also, I’ve noted that the Python DataStream API has several connectors
> [2] that use Py4J+Java gateways to interoperate with Java source/sinks. Is
> there a way for users to build their own connectors? What would this
> process entail?
>
> Ideally, we’d like to be able to define custom sources/sinks in Scala and
> use them in our Python API Flink Applications. For example, defining a
> BigTable sink in Scala for use in the Python API:
>
>
> [3]
>
> Where MyBigTableSink is just somehow importing a Scala defined sink.
>
> More generally, we’re interested in learning more about Scala/Python
> interoperability in Flink, and how we can expose the power of Flink’s Scala
> APIs to Python. Open to any suggestions, strategies, etc.
>
> Looking forward to any thoughts!
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks
>
> [2]
> https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py
>
> [3] Plaintext paste of code in screenshot, in case of attachment issues:
> ```
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import M

Re: PyFlink Connection Refused to Kubernetes Session Cluster

2021-03-04 Thread Shuiqiang Chen
Hi Robert,

It seems the retrieved address of the JobManager is a cluster-internal IP that
can only be accessed inside the cluster. As you said, you might need to
create an ingress to expose the JobManager service so that the client can
access it from outside of the k8s cluster.

Best,
Shuiqiang

On Fri, Mar 5, 2021 at 2:58 AM Robert Cullen wrote:

> Attempting to run the word_count.py example on my kubernetes (session)
> cluster:
>
> ./bin/flink run \
> --target kubernetes-session \
> -Dkubernetes.cluster-id=cmdaa \
> -Dkubernetes.container.image=cmdaa/pyflink:0.0.1 \
> --pyModule word_count \
> --pyFiles /opt/flink-1.12.2/examples/python/table/batch/word_count.py
>
> The following exception occurs:
>
> 2021-03-04 12:51:41,465 INFO  
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
> flink cluster cmdaa successfully, JobManager Web Interface: 
> http://spackler:8081
> Traceback (most recent call last):
>   File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py", line 
> 193, in _run_module_as_main
> "__main__", mod_spec)
>   File "/Users/admin/.pyenv/versions/3.6.10/lib/python3.6/runpy.py", line 85, 
> in _run_code
> exec(code, run_globals)
>   File 
> "/var/folders/zz/zyxvpxvq6csfxvn_n0/T/pyflink/e1f711ac-7dc7-46ed-8c4e-ed8b3880baf7/928f631e-cc11-4eb5-9234-3a1a8fb7052e/word_count.py",
>  line 80, in 
> word_count()
>   File 
> "/var/folders/zz/zyxvpxvq6csfxvn_n0/T/pyflink/e1f711ac-7dc7-46ed-8c4e-ed8b3880baf7/928f631e-cc11-4eb5-9234-3a1a8fb7052e/word_count.py",
>  line 74, in word_count
> t_env.execute("word_count")
>   File 
> "/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
>  line 1276, in execute
>   File 
> "/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", 
> line 1286, in __call__
>   File "/opt/flink-1.12.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", 
> line 147, in deco
>   File "/opt/flink-1.12.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", 
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
> at 
> org.apache.flink.table.api.internal.BatchTableEnvImpl.executePipeline(BatchTableEnvImpl.scala:352)
> at 
> org.apache.flink.table.api.internal.BatchTableEnvImpl.execute(BatchTableEnvImpl.scala:317)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at org.apache.flink.table.api.internal.BatchTableEnvImpl.executePipeline(BatchTableEnvImpl.scala:349)
> ... 12 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
> at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at java.util.concurrent.CompletableFuture.postComplete(Completab

Re: Convert BIGINT to TIMESTAMP in pyflink when using datastream api

2021-03-04 Thread Shuiqiang Chen
Hi Shilpa,

There might be something wrong with how the rowtime field is defined through
the connector descriptor. It is recommended to create tables with SQL DDL and
run queries with the Table API.
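
A minimal sketch of the DDL route, assuming the Kafka SQL connector and the
JSON format jars are available to the job. The field names and topic come from
the quoted question; the computed-column expression, the 10-second watermark
delay, the placeholder broker address and the helper name `kafka_source_ddl`
are assumptions for illustration, not something confirmed in this thread. Note
that FROM_UNIXTIME works on seconds, so sub-second precision is dropped. The
helper only builds the statement, which would then be handed to
`st_env.execute_sql(...)`.

```python
def kafka_source_ddl(topic: str, bootstrap_servers: str, group_id: str,
                     watermark_delay_seconds: int = 10) -> str:
    """Build a CREATE TABLE statement in which `rowtime` is a timestamp
    column computed from a BIGINT epoch-milliseconds field and declared
    as the event-time attribute through a WATERMARK clause."""
    # FROM_UNIXTIME expects seconds, hence the division by 1000.
    return f"""
        CREATE TABLE source (
            customer_id STRING,
            time_in_epoch_milliseconds BIGINT,
            rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(time_in_epoch_milliseconds / 1000)),
            WATERMARK FOR rowtime AS rowtime - INTERVAL '{watermark_delay_seconds}' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{topic}',
            'properties.bootstrap.servers' = '{bootstrap_servers}',
            'properties.group.id' = '{group_id}',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json'
        )
    """


# "localhost:9092" is a placeholder; the original post left the address empty.
ddl = kafka_source_ddl("topic1", "localhost:9092", "topic_consumer")
print(ddl)
# In the job itself (assumption): st_env.execute_sql(ddl)
```

With the table created this way, the sliding window in the question can keep
grouping on "rowtime", since that column is now a time attribute rather than a
plain BIGINT.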

Best,
Shuiqiang

On Thu, Mar 4, 2021 at 9:29 PM Shilpa Shankar wrote:

> Hello,
>
> We are using pyflink's datastream api v1.12.1 to consume from kafka and
> want to use one of the fields to act as the "rowtime" for windowing.
> We realize we need to convert BIGINT to TIMESTAMP before we use it as
> "rowtime".
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o91.select.
> : org.apache.flink.table.api.ValidationException: A group window expects a
> time attribute for grouping in a stream environment.
>
> But we are not sure where and how that needs to be implemented.
> Some help here would be really appreciated.
>
> Thanks,
> Shilpa
>
> import os
> from pyflink.table.expressions import lit, Expression
> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.datastream import CheckpointingMode, ExternalizedCheckpointCleanup
> from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings, CsvTableSink, TableConfig
> from pyflink.table.descriptors import Schema, Rowtime, Json, Kafka
> from pyflink.table.window import Slide
>
> def main():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>
>     env.enable_checkpointing(6, CheckpointingMode.EXACTLY_ONCE)
>     config = env.get_checkpoint_config()
>     config.enable_externalized_checkpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
>
>     st_env = StreamTableEnvironment.create(
>         env,
>         environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>     )
>
>     register_kafka_source(st_env)
>     register_transactions_sink_into_csv(st_env)
>
>     #Filter
>     st_env.from_path("source") \
>         .window(Slide.over(lit(2).minutes).every(lit(1).minutes).on("rowtime").alias("w")) \
>         .group_by("customer_id, w") \
>         .select("""customer_id as customer_id,
>                  count(*) as total_counts,
>                  w.start as start_time,
>                  w.end as end_time
>                  """) \
>         .insert_into("sink_into_csv")
>
> def register_kafka_source(st_env):
>     # Add Source
>     st_env.connect(
>         Kafka() \
>             .version("universal") \
>             .topic("topic1") \
>             .property("group.id", "topic_consumer") \
>             .property("security.protocol", "SASL_PLAINTEXT") \
>             .property("sasl.mechanism", "PLAIN") \
>             .property("bootstrap.servers", "") \
>             .property("sasl.jaas.config", "") \
>             .start_from_earliest()
>     ).with_format(
>         Json()
>             .fail_on_missing_field(False)
>             .schema(
>                 DataTypes.ROW([
>                     DataTypes.FIELD("customer_id", DataTypes.STRING()),
>                     DataTypes.FIELD("time_in_epoch_milliseconds", DataTypes.BIGINT())
>                 ])
>             )
>     ).with_schema(
>         Schema()
>             .field("customer_id", DataTypes.STRING())
>             .field("rowtime", DataTypes.BIGINT())
>             .rowtime(
>                 Rowtime()
>                     .timestamps_from_field("time_in_epoch_milliseconds")
>                     .watermarks_periodic_bounded(10)
>             )
>     ).in_append_mode(
>     ).create_temporary_table(
>         "source"
>     )
>
>
> def register_transactions_sink_into_csv(env):
>     result_file = "/opt/examples/data/output/output_file.csv"
>     if os.path.exists(result_file):
>         os.remove(result_file)
>     env.register_table_sink("sink_into_csv",
>                             CsvTableSink(["customer_id",
>                                           "total_count",
>                                           "start_time",
>                                           "end_time"],
>                                          [DataTypes.STRING(),
>                                           DataTypes.DOUBLE(),
>                                           DataTypes.TIMESTAMP(3),
>                                           DataTypes.TIMESTAMP(3)],
>                                          result_file))
>
> if __name__ == "__main__":
>     main()
>
>


Re: Running Pyflink job on K8s Flink Cluster Deployment?

2021-03-06 Thread Shuiqiang Chen
Hi Kevin,

You can run PyFlink applications on a Kubernetes cluster; both native k8s
mode and resource definition mode are supported since release-1.12.0.
Currently, Python and PyFlink are not enabled in the official Flink Docker
image, so you might need to build a custom image with Python and PyFlink
installed. Please refer to Enabling Python in Docker
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#enabling-python>.

Generally, by setting the value of the args field in
`jobmanager-application.yaml` to args: ["standalone-job", "--python",
"my_python_app.py", , ], the job manager will try to submit a PyFlink job
with the specified Python file once it is started. You can check the pod
status of the jobmanager and taskmanager via
`kubectl get pods [-n namespace]`. The job manager pod will turn to the
Completed state once the job is finished, or to the Error state if something
goes wrong, while the task manager pod will always be in the Running state.
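
As a rough way to check those pod states from a script rather than by eye,
here is a small sketch that reads the JSON form of the same command
(`kubectl get pods -o json`). The helper name `pod_phases` and the pod names in
the sample are hypothetical. As far as I know, the STATUS column that kubectl
prints as Completed corresponds to the pod phase "Succeeded" in the JSON
output, so treat that mapping as an assumption to verify on your cluster.

```python
import json
from typing import Dict


def pod_phases(kubectl_json: str) -> Dict[str, str]:
    """Map pod name -> status.phase from `kubectl get pods -o json` output."""
    pods = json.loads(kubectl_json)["items"]
    return {pod["metadata"]["name"]: pod["status"]["phase"] for pod in pods}


# Hand-written sample standing in for real `kubectl get pods -o json` output.
sample = json.dumps({"items": [
    {"metadata": {"name": "flink-jobmanager-abc"}, "status": {"phase": "Succeeded"}},
    {"metadata": {"name": "flink-taskmanager-xyz"}, "status": {"phase": "Running"}},
]})

phases = pod_phases(sample)
print(phases)
```

In a real check the input would come from something like
`subprocess.run(["kubectl", "get", "pods", "-o", "json"], capture_output=True, text=True).stdout`.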

Finally, you need to tear down the cluster yourself by deleting all created
resources (jobmanager/taskmanager jobs, flink-conf configmap,
jobmanager-service, etc.).

Best,
Shuiqiang



On Sat, Mar 6, 2021 at 5:29 AM Kevin Lam wrote:

> Hello everyone,
>
> I'm looking to run a PyFlink application in a distributed fashion,
> using Kubernetes, and am currently facing issues. I've successfully gotten
> a Scala Flink application to run using the manifests provided at [0].
>
> I attempted to run the application by updating the jobmanager command args
> from
>
>  args: ["standalone-job", "--job-classname", "com.job.ClassName",  arguments>, ]
>
> to
>
> args: ["standalone-job", "--python", "my_python_app.py",  arguments>, ]
>
> But this didn't work. It resulted in the following error:
>
> Caused by: java.lang.LinkageError: loader constraint violation: loader
> org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
> org.apache.commons.cli.Options. A different class with the same name was
> previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
> module of loader 'app'
>
> I was able to get things to 'run' by setting args to:
>
> args: ["python", "my_python_app.py", , ]
>
>
> But I'm not sure if things were running in a distributed fashion or not.
>
> 1/ Is there a good way to check if the task pods were being correctly
> utilized?
>
> 2/ Are there any similar examples to [0] for how to run Pyflink jobs on
> kubernetes?
>
> Open to any suggestions you may have. Note: we'd prefer not to run using
> the native K8S route outlined at [1] because we need to maintain the
> ability to customize certain aspects of the deployment (eg. mounting SSDs
> to some of the pods)
>
> Thanks in advance!
>
> [0]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode
>
>


Re: Running Pyflink job on K8s Flink Cluster Deployment?

2021-03-06 Thread Shuiqiang Chen
rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser:   # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

3. Creating resources:

$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f job-manager.yaml
$ kubectl create -f task-manager.yaml

Best,
Shuiqiang

On Sat, Mar 6, 2021 at 5:10 PM Shuiqiang Chen wrote:

> Hi Kevin,
>
> You can run PyFlink applications on a Kubernetes cluster; both native k8s
> mode and resource definition mode are supported since release-1.12.0.
> Currently, Python and PyFlink are not enabled in the official Flink Docker
> image, so you might need to build a custom image with Python and PyFlink
> installed. Please refer to Enabling Python in Docker
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#enabling-python>.
>
> Generally, by setting the value of the args field in
> `jobmanager-application.yaml` to args: ["standalone-job", "--python",
> "my_python_app.py", , ], the job manager will try to submit a PyFlink job
> with the specified Python file once it is started. You can check the pod
> status of the jobmanager and taskmanager via
> `kubectl get pods [-n namespace]`. The job manager pod will turn to the
> Completed state once the job is finished, or to the Error state if something
> goes wrong, while the task manager pod will always be in the Running state.
>
> Finally, you need to tear down the cluster yourself by deleting all created
> resources (jobmanager/taskmanager jobs, flink-conf configmap,
> jobmanager-service, etc.).
>
> Best,
> Shuiqiang
>
>
>


Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Shuiqiang Chen
Hi Robert,

The Kafka connector has been available in the Python DataStream API since
release-1.12.0. The documentation for it is still lacking; we will fill that
in soon.

The following code shows how to use FlinkKafkaConsumer and FlinkKafkaProducer:
```
# import paths as laid out in PyFlink 1.12; they may differ in other releases
from pyflink.common.serialization import JsonRowDeserializationSchema, SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(
    ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
    [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092',
               'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name,
# serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer)

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

On Sat, Mar 13, 2021 at 12:56 AM Robert Cullen wrote:

> I’ve scoured the web looking for an example of using a Kafka source for a
> DataStream in python. Can someone finish this example?
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> ds = env.from_collection( KAFKA_SOURCE )
> ...
>
> --
> Robert Cullen
> 240-475-4490
>


Re: Python StreamExecutionEnvironment from_collection Kafka example

2021-03-12 Thread Shuiqiang Chen
Hi Robert,

You can refer to
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
for the whole example.

Best,
Shuiqiang

On Sat, Mar 13, 2021 at 4:01 AM Robert Cullen wrote:

> Shuiqiang, Can you include the import statements?  thanks.
>
> --
> Robert Cullen
> 240-475-4490
>


Re: Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-14 Thread Shuiqiang Chen
Hi Kevin,

Sorry for the late reply.

Actually, you can pass arguments to the constructor of the Java object when
instantiating it in Python. Basic data types
(char/boolean/int/long/float/double/string, etc.) can be passed directly,
while complex types (array/list/map/POJO, etc.) must be converted to Java
objects before passing. Please refer to
https://www.py4j.org/py4j_java_collections.html for more information.
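
A minimal sketch of what that can look like for basic types. The class
`com.mycompany.MyBigTableSink` is the hypothetical sink from the earlier
example in this thread, and its (String, int) constructor, the parameter names
and the helper name `new_java_sink` are assumptions made for illustration. The
`jvm` argument is expected to be the Py4J JVM view, which in PyFlink is
obtained with `pyflink.java_gateway.get_gateway().jvm`.

```python
def new_java_sink(jvm, project_id: str, batch_size: int):
    """Instantiate a Java/Scala SinkFunction with constructor arguments.

    Py4J resolves the fully qualified class name through attribute access
    on the JVM view, and calling the class invokes its constructor.
    Python str and int values are passed through as String and int.
    """
    return jvm.com.mycompany.MyBigTableSink(project_id, batch_size)


# Usage inside a PyFlink job (not executed here, requires a running gateway):
#   from pyflink.java_gateway import get_gateway
#   from pyflink.datastream.functions import SinkFunction
#   j_sink = new_java_sink(get_gateway().jvm, "my-project", 500)
#   ds.add_sink(SinkFunction(j_sink))
```

The resulting Java object can be handed to `SinkFunction`, which accepts either
a class name or a JavaObject. For list or map arguments, the Py4J page linked
above describes converters such as `ListConverter` and `MapConverter` that
build the Java collection first.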

Best,
Shuiqiang

On Thu, Mar 11, 2021 at 4:28 AM Kevin Lam wrote:

> A follow-up question--In the example you provided Shuiqiang, there were no
> arguments passed to the constructor of the custom sink/source.
>
> What's the best way to pass arguments to the constructor?
>
> On Fri, Mar 5, 2021 at 4:29 PM Kevin Lam  wrote:
>
>> Thanks Shuiqiang! That's really helpful, we'll give the connectors a try.
>>
>> On Wed, Mar 3, 2021 at 4:02 AM Shuiqiang Chen 
>> wrote:
>>
>>> Hi Kevin,
>>>
>>> Thank you for your questions. Currently, users are not able to define
>>> custom sources/sinks in Python. It would be a great feature that could unify
>>> end-to-end PyFlink application development in Python, but it is a large topic
>>> that we have no plan to support at present.
>>>
>>> As you have noticed, `the Python DataStream API has several
>>> connectors [2] that use Py4J+Java gateways to interoperate with Java
>>> source/sinks`. These connectors are extensions of the Python abstract
>>> classes named `SourceFunction` and `SinkFunction`. These two classes can
>>> accept a Java source/sink instance and maintain it to enable the
>>> interoperation between Python and Java. They can also accept a string with
>>> the full name of a Java/Scala-defined Source/SinkFunction class and create
>>> the corresponding Java instance. Below are the definitions of these classes:
>>>
>>> class JavaFunctionWrapper(object):
>>>     """
>>>     A wrapper class that maintains a Function implemented in Java.
>>>     """
>>>
>>>     def __init__(self, j_function: Union[str, JavaObject]):
>>>         # TODO we should move this part to the get_java_function() to perform a lazy load.
>>>         if isinstance(j_function, str):
>>>             j_func_class = get_gateway().jvm.__getattr__(j_function)
>>>             j_function = j_func_class()
>>>         self._j_function = j_function
>>>
>>>     def get_java_function(self):
>>>         return self._j_function
>>>
>>>
>>>
>>> class SourceFunction(JavaFunctionWrapper):
>>>     """
>>>     Base class for all stream data source in Flink.
>>>     """
>>>
>>>     def __init__(self, source_func: Union[str, JavaObject]):
>>>         """
>>>         Constructor of SourceFunction.
>>>
>>>         :param source_func: The java SourceFunction object.
>>>         """
>>>         super(SourceFunction, self).__init__(source_func)
>>>
>>>
>>> class SinkFunction(JavaFunctionWrapper):
>>>     """
>>>     The base class for SinkFunctions.
>>>     """
>>>
>>>     def __init__(self, sink_func: Union[str, JavaObject]):
>>>         """
>>>         Constructor of SinkFunction.
>>>
>>>         :param sink_func: The java SinkFunction object or the full name of the
>>>         SinkFunction class.
>>>         """
>>>         super(SinkFunction, self).__init__(sink_func)
>>>
>>> Therefore, you are able to define custom sources/sinks in Scala and
>>> apply them in Python. Here is the recommended approach for implementation:
>>>
>>> class MyBigTableSink(SinkFunction):
>>>     def __init__(self, class_name: str):
>>>         super(MyBigTableSink, self).__init__(class_name)
>>>
>>>
>>> def example():
>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>     env.add_jars('/the/path/of/your/MyBigTableSink.jar')
>>>     # ...
>>>     ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
>>>     env.execute("Application with Custom Sink")
>>>
>>>
>>> if __name__ == '__main__':
>>>     example()
>>>
>>> Remember that you must add the jar of the Scala-defined SinkFunction by
>>> calling `env.add_jars()` before adding the SinkFunction. And your custom
>>> sources/sinks function must be the extension of `SourceFunction` and

Re: Working with DataStreams of Java objects in Pyflink?

2021-03-15 Thread Shuiqiang Chen
Hi Kevin,

Currently, the POJO type is not supported in the Python DataStream API because
it is hard to deal with the conversion between Python objects and Java
objects. Maybe you can use a Row type to represent the POJO class, such as
Types.ROW_NAMED(['id', 'created_at', 'updated_at'], [Types.LONG(),
Types.LONG(), Types.LONG()]). We will try to support the POJO type in the
future.

Best,
Shuiqiang

On Mon, Mar 15, 2021 at 10:46 PM Kevin Lam wrote:

> Hi all,
>
> Looking to use PyFlink to work with some Scala-defined objects being
> emitted from a custom source. When trying to manipulate the objects in a
> PyFlink-defined MapFunction, I'm hitting an error like:
>
> Caused by: java.lang.UnsupportedOperationException: The type information:
> Option[<...>$Record(id: Long, created_at: Option[Long], updated_at:
> Option[Long])] is not supported in PyFlink currently.
>
> The scala object is defined something like:
>
> ```
> object <...> {
>   case class Record(
>     id: Long,
>     created_at: Option[Long],
>     updated_at: Option[Long],
>     ...
>   )
> }
> ```
>
> The pyflink code is something like:
>
> ```
> class Mutate(MapFunction):
>   def map(self, value):
>     print(value.id)
>     value.id = 123
>
> ...
>
> records = env.add_source(..)
> records = records.map(Mutate())
> ```
>
> Can you provide any advice on how to work with these kinds of objects in
> Pyflink?
>
> Thanks in advance!
>


Re: Pyflink tutorial output

2021-03-23 Thread Shuiqiang Chen
Hi Robert,

Have you tried exploring the /tmp/output directory in the task manager pods
on your Kubernetes cluster? The StreamingFileSink creates the output
directory on the host of the task manager on which the sink tasks are
executed.

Best,
Shuiqiang

On Wed, Mar 24, 2021 at 2:48 AM Robert Cullen wrote:

> I’m running this script taken from the Flink website: tutorial.py
>
> python tutorial.py
>
> from pyflink.common.serialization import SimpleStringEncoder
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import StreamingFileSink
>
> def tutorial():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     ds = env.from_collection(
>         collection=[(1, 'aaa'), (2, 'bbb')],
>         type_info=Types.ROW([Types.INT(), Types.STRING()]))
>     ds.add_sink(StreamingFileSink
>                 .for_row_format('/tmp/output', SimpleStringEncoder())
>                 .build())
>     env.execute("tutorial_job")
>
> if __name__ == '__main__':
>     tutorial()
>
> It correctly outputs a part file to the /tmp/output directory when I run
> it locally. However when I run this on my kubernetes session cluster there
> is no output. Any ideas?
>
> ./bin/flink run \
> --target kubernetes-session \
> -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
> --pyModule tutorial \
> --pyFiles /opt/flink-1.12.0/examples/tutorial.py \
> --detached
>
> --
> Robert Cullen
> 240-475-4490
>


Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-24 Thread Shuiqiang Chen
Hi Kevin,

Kinesis connector is not supported yet in Python DataStream API. We will
add it in the future.

Best,
Shuiqiang

On Thu, Mar 25, 2021 at 5:03 AM Bohinski, Kevin wrote:

> Is there a kinesis example?
>
>
>
> *From: *"Bohinski, Kevin" 
> *Date: *Wednesday, March 24, 2021 at 4:40 PM
> *To: *"Bohinski, Kevin" 
> *Subject: *Re: PyFlink DataStream Example Kafka/Kinesis?
>
>
>
> Nevermind, found this for anyone else looking:
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
>
>
>
> *From: *"Bohinski, Kevin" 
> *Date: *Wednesday, March 24, 2021 at 4:38 PM
> *To: *user 
> *Subject: *PyFlink DataStream Example Kafka/Kinesis?
>
>
>
> Hi,
>
>
>
> Is there an example kafka/kinesis source or sink for the PyFlink
> DataStream API?
>
>
>
> Best,
>
> kevin
>


Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-24 Thread Shuiqiang Chen
I have just created the jira
https://issues.apache.org/jira/browse/FLINK-21966 and will finish it soon.

Best,
Shuiqiang

On Thu, Mar 25, 2021 at 10:43 AM Xinbin Huang wrote:

> Hi Shuiqiang,
>
> I am interested in the same feature. Do we have a ticket to track this
> right now?
>
> Best
> Bin
>


Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-24 Thread Shuiqiang Chen
Sure, no problem.  You can refer to the implementation of Kafka connector,
they are very much alike.

On Thu, Mar 25, 2021 at 10:55 AM Xinbin Huang wrote:

> Hi Shuiqiang,
>
> Thanks for the quick response on creating the ticket for Kinesis
> Connector. Do you mind giving me the chance to try to implement the
> connector over the weekend?
>
> I am interested in contributing to Flink, and I think this can be a good
> starting point to me
>
> Best
> Bin
>


Re: [EXTERNAL] Re: PyFlink DataStream Example Kafka/Kinesis?

2021-03-25 Thread Shuiqiang Chen
Hi Kevin, Xinbin,


> Hi Shuiqiang,
>
> Thanks for the quick response on creating the ticket for Kinesis
> Connector. Do you mind giving me the chance to try to implement the
> connector over the weekend?
>
> I am interested in contributing to Flink, and I think this can be a good
> starting point to me
>
> Best
> Bin
>

Sorry for the confusion. Xinbin sent me a private message, which is not
displayed in this email thread, saying that he is interested in working on
this issue: https://issues.apache.org/jira/browse/FLINK-21966. Both of you are
welcome to participate in this contribution.

Best,
Shuiqiang

On Thu, Mar 25, 2021 at 12:00 PM Bohinski, Kevin wrote:

> Hi Shuiqiang,
>
>
>
> Thanks for letting me know. Feel free to send any beginner level
> contributions for this effort my way 😊 .
>
>
>
> Best,
>
> kevin
>
>
>
> *From: *Shuiqiang Chen 
> *Date: *Wednesday, March 24, 2021 at 10:31 PM
> *To: *"Bohinski, Kevin" 
> *Cc: *user 
> *Subject: *[EXTERNAL] Re: PyFlink DataStream Example Kafka/Kinesis?
>
>
>
> Hi Kevin,
>
>
>
> Kinesis connector is not supported yet in Python DataStream API. We will
> add it in the future.
>
>
>
> Best,
>
> Shuiqiang
>
>
>
>


Long checkpoint duration for Kafka source operators

2021-05-13 Thread Hubert Chen
Hello,

I have an application that reads from two Kafka sources, joins them, and
produces to a Kafka sink. The application is experiencing long end to end
checkpoint durations for the Kafka source operators. I'm hoping I could get
some direction in how to debug this further.

Here is a UI screenshot of a checkpoint instance:

[image: checkpoint.png]

My goal is to bring the total checkpoint duration to sub-minute.

Here are some observations I made:

   - Each source operator task has an E2E checkpoint duration of 1m 7s
   - Each source operator task has sub 100ms sync, async, aligned buffered,
   and start delay
   - Each join operator task has a start delay of 1m 7s
   - There is no backpressure in any operator

These observations lead me to believe that the source operator is taking a
long time to checkpoint. I find this a bit strange, as the fused operator is
fairly light: it deserializes the event, assigns a watermark, and might
perform two filters. In addition, it's odd that the tasks of both source
operators all have the same E2E checkpoint duration.

Is there some sort of locking that's occurring on the source operators that
can explain these long E2E durations?

Best,
Hubert


Prometheus Reporter Enhancement

2021-05-18 Thread Mason Chen
Hi all,

Would people appreciate enhancements to the prometheus reporter to include 
extra labels via a configuration, as a contribution to Flink? I can see it 
being useful for adding labels that are not job specific, but infra specific.

The change would be nicely integrated with Flink's ConfigOptions and unit
tested.

Best,
Mason


Re: Prometheus Reporter Enhancement

2021-05-19 Thread Mason Chen
Are there any plans to rework some of the metric name formulations 
(getMetricIdentifier or getLogicalScope)? Currently, the label keys and/or 
label values are concatenated in the metric name and the information is 
redundant and makes the metric names longer.

Would it make sense to remove the tag related information (getAllVariables())?

> On May 18, 2021, at 3:45 PM, Chesnay Schepler  wrote:
> 
> There is already a ticket for this. Note that this functionality should be 
> implemented in a generic fashion to be usable for all reporters.
> 
> https://issues.apache.org/jira/browse/FLINK-17495
> 
> On 5/18/2021 8:16 PM, Andrew Otto wrote:
>> Sounds useful!
>> 
> 



Re: Long checkpoint duration for Kafka source operators

2021-05-20 Thread Hubert Chen
For the poor soul that stumbles upon this in the future, just increase your
JM resources.

I thought for sure this must have been the TM experiencing some sort of
backpressure. I tried everything from enabling universal compaction to
unaligned checkpoints to profiling the TM. It wasn't until I enabled AWS
debug logs that I noticed the JM will make a lot of DELETE requests to AWS
after a successful checkpoint. If the checkpoint interval is short and the
JM resources limited, then I believe the checkpoint barrier will be delayed
causing long start delays. The JM is too busy making AWS requests to inject
the barrier. After I increased the JM resources, the long start delays
disappeared.

On Thu, May 13, 2021 at 1:56 PM Hubert Chen  wrote:

> Hello,
>
> I have an application that reads from two Kafka sources, joins them, and
> produces to a Kafka sink. The application is experiencing long end to end
> checkpoint durations for the Kafka source operators. I'm hoping I could get
> some direction in how to debug this further.
>
> Here is a UI screenshot of a checkpoint instance:
>
> [image: checkpoint.png]
>
> My goal is to bring the total checkpoint duration to sub-minute.
>
> Here are some observations I made:
>
> - Each source operator task has an E2E checkpoint duration of 1m 7s
> - Each source operator task has sub-100ms sync, async, aligned
>   buffered, and start delay
> - Each join operator task has a start delay of 1m 7s
> - There is no backpressure in any operator
>
> These observations are leading me to believe that the source operator is
> taking a long time to checkpoint. I find this a bit strange as
> the fused operator is fairly light. It deserializes the event, assigns
> a watermark, and might perform two filters. In addition, it's odd that the
> tasks of both source operators all have the same E2E checkpoint duration.
>
> Is there some sort of locking that's occurring on the source operators
> that can explain these long E2E durations?
>
> Best,
> Hubert
>


Flink Metrics Naming

2021-05-28 Thread Mason Chen
Can anyone give insight as to why Flink allows 2 metrics with the same "name"?

For example,

getRuntimeContext().getMetricGroup().addGroup("group", "group1").counter("myMetricName");

and

getRuntimeContext().getMetricGroup().addGroup("other_group", "other_group1").counter("myMetricName");

are both valid.


It seems that this has led to some not-so-great implementations, such as the 
Prometheus reporter attaching the labels to the metric name, which makes the 
name quite verbose.




Re: Flink Metrics Naming

2021-06-01 Thread Mason Chen
Makes sense. We are primarily concerned with removing the metric labels from 
the names, as the user metric names get too long, i.e., the groups from 
`addGroup` are concatenated in the metric name.

Do you think there would be any issues with removing the group information from 
the metric name and putting it into a label instead? It seems like most 
internal metrics don't use `addGroup` to create group information but rather 
create another subclass of metric group.

Perhaps I should ONLY apply this custom logic to metrics with the "user" 
scope? Other scoped metrics (e.g. operator, task operator, etc.) shouldn't have 
these group names in the metric names, in my experience...

An example just for clarity, 
flink__group1_group2_metricName{group1=…, group2=…, flink tags}

=>

flink__metricName{group_info=group1_group2, group1=…, group2=…, 
flink tags}
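
The renaming above can be sketched in plain Java without any Flink dependency. This is only an illustration of the two naming schemes, not the Prometheus reporter's actual code: the class `MetricNameSketch`, its methods, the `flink_user` prefix and the label values `a` and `b` are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper (not part of Flink) comparing the two naming schemes. */
final class MetricNameSketch {

    // Renders name{k="v",...} in a Prometheus-like style.
    static String render(String name, Map<String, String> labels) {
        String body = labels.entrySet().stream()
                .map(e -> e.getKey() + "=\"" + e.getValue() + "\"")
                .collect(Collectors.joining(","));
        return name + "{" + body + "}";
    }

    // Scheme described in this thread: the group keys are concatenated into the name.
    static String groupsInName(String prefix, Map<String, String> groups, String metric) {
        String name = prefix + "_" + String.join("_", groups.keySet()) + "_" + metric;
        return render(name, groups);
    }

    // Proposed scheme: short name, group path moved into a "group_info" label.
    static String groupsInLabel(String prefix, Map<String, String> groups, String metric) {
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("group_info", String.join("_", groups.keySet()));
        labels.putAll(groups);
        return render(prefix + "_" + metric, labels);
    }

    public static void main(String[] args) {
        Map<String, String> groups = new LinkedHashMap<>();
        groups.put("group1", "a"); // assumed label values, for illustration only
        groups.put("group2", "b");
        System.out.println(groupsInName("flink_user", groups, "metricName"));
        System.out.println(groupsInLabel("flink_user", groups, "metricName"));
    }
}
```

With the second scheme, two metrics registered under different groups share one metric name and are told apart only by their labels, so the label set has to stay unique per name.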

> On Jun 1, 2021, at 9:57 AM, Chesnay Schepler  wrote:
> 
> The uniqueness of metrics and the naming of the Prometheus reporter are 
> somewhat related but also somewhat orthogonal.
> 
> Prometheus works similar to JMX in that the metric name (e.g., 
> taskmanager.job.task.operator.numRecordsIn) is more or less a _class_ of 
> metrics, with tags/labels allowing you to select a specific instance of that 
> metric.
> 
> Restricting metric names to 1 level of the hierarchy would present a few 
> issues:
> a) All metric names that Flink uses effectively become reserved 
> keywords that users must not use, which will lead to headaches when adding 
> more metrics or forwarding metrics from libraries (e.g., kafka), because we 
> could always break existing user-defined metrics.
> b) You'd need a cluster-wide lookup that is aware of all hierarchies to 
> ensure consistency across all processes.
> 
> In the end, there are significantly easier ways to solve the issue of the 
> metric name being too long, i.e., give the user more control over the logical 
> scope (taskmanager.job.task.operator), be it shortening the names (t.j.t.o), 
> limiting the depth (e.g, operator.numRecordsIn), removing it outright (but 
> I'd prefer some context to be present for clarity) or supporting something 
> similar to scope formats.
> I'm reasonably certain there are some tickets already in this direction, we 
> just don't get around to doing them because for the most part the metric 
> system works good enough and there are bigger fish to fry.
> 
> On 6/1/2021 3:39 PM, Till Rohrmann wrote:
>> Hi Mason,
>> 
>> The idea is that a metric is not uniquely identified by its name alone but 
>> instead by its path. The groups in which it is defined specify this path 
>> (similar to directories). That's why it is valid to specify two metrics with 
>> the same name if they reside in different groups.
>> 
>> I think Prometheus does not support such a tree structure and that's why the 
>> path is exposed via labels if I am not mistaken. So long story short, what 
>> you are seeing is a combination of how Flink organizes metrics and what can 
>> be reported to Prometheus. 
>> 
>> I am also pulling in Chesnay who is more familiar with this part of the code.
>> 
>> Cheers,
>> Till
>> 
>> On Fri, May 28, 2021 at 7:33 PM Mason Chen wrote:
>> Can anyone give insight as to why Flink allows 2 metrics with the same 
>> "name"?
>> 
>> For example,
>> 
>> getRuntimeContext().getMetricGroup().addGroup("group", "group1").counter("myMetricName");
>> 
>> and
>> 
>> getRuntimeContext().getMetricGroup().addGroup("other_group", "other_group1").counter("myMetricName");
>> 
>> are both valid.
>> 
>> 
>> It seems that this has led to some not-so-great implementations, such as the 
>> Prometheus reporter attaching the labels to the metric name, which makes the 
>> name quite verbose.
>> 
>> 
> 



Re: Flink Metrics Naming

2021-06-01 Thread Mason Chen
Upon further inspection, it seems like the user scope is not universal (i.e. 
comes through the connectors and not UDFs (like rich map function)), but the 
question still stands if the process makes sense.




Re: Prometheus Reporter Enhancement

2021-06-02 Thread Mason Chen
Hi Chesnay,

I would like to take on https://issues.apache.org/jira/browse/FLINK-17495 
as a contribution to OSS, if that's alright with the team. We can discuss 
implementation details here or in the ticket, but I was thinking about 
modifying the ReporterScopedSettings to enable this generic tag support.

Best,
Mason

> On May 20, 2021, at 4:36 AM, Chesnay Schepler  wrote:
> 
> There is no plan to generally exclude label keys from the metric 
> identifier/logical scope. They ensure that the label set for a given 
> identifier/scope is unique, i.e., you can't have 2 metrics called 
> "numRecordsIn" with different label sets. Changing this would also break all 
> existing setups, so if anything it would have to be an opt-in feature.
> 
> What I envision more is for the user to have more control over the metric 
> identifier/logical scope via the scope formats. They are currently quite 
> limited, controlling only part of the final identifier, while the 
> logical scope isn't controllable at all.
> 
> Generally though, there's a fair bit of internal re-structuring that we'd 
> like to do before extending the metric system further, because we've been 
> tacking on more and more things since it was released in 1.3.0 (!!!) but 
> barely refactored things to properly fit together.
> 
> On 5/20/2021 12:58 AM, Mason Chen wrote:
>> Are there any plans to rework some of the metric name formulations 
>> (getMetricIdentifier or getLogicalScope)? Currently, the label keys and/or 
>> label values are concatenated in the metric name and the information is 
>> redundant and makes the metric names longer.
>> 
>> Would it make sense to remove the tag related information 
>> (getAllVariables())?
>> 
>>> On May 18, 2021, at 3:45 PM, Chesnay Schepler wrote:
>>> 
>>> There is already a ticket for this. Note that this functionality should be 
>>> implemented in a generic fashion to be usable for all reporters.
>>> 
>>> https://issues.apache.org/jira/browse/FLINK-17495
>>> 
>>> On 5/18/2021 8:16 PM, Andrew Otto wrote:
>>>> Sounds useful!
>>>> 
>>>> On Tue, May 18, 2021 at 2:02 PM Mason Chen wrote:
>>>> Hi all,
>>>> 
>>>> Would people appreciate enhancements to the prometheus reporter to include 
>>>> extra labels via a configuration, as a contribution to Flink? I can see it 
>>>> being useful for adding labels that are not job specific, but infra 
>>>> specific.
>>>> 
>>>> The change would be nicely integrated with the Flink’s ConfigOptions and 
>>>> unit tested.
>>>> 
>>>> Best,
>>>> Mason
>>> 
>> 
> 



subscribe

2021-06-03 Thread Boyang Chen



Re: Flink exported metrics scope configuration

2021-06-03 Thread Mason Chen
Hi Kai,

You can use the excluded-variables config for the reporter: 
metrics.reporter.<name>.scope.variables.excludes: (optional) A semicolon (;) 
separated list of variables that should be ignored by tag-based reporters (e.g., 
Prometheus, InfluxDB).

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/#reporter
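
As a minimal sketch of that option in flink-conf.yaml, assuming the reporter is registered under the hypothetical name "prom" and that task_name is the variable to drop:

```yaml
# "prom" is an assumed reporter name; use the name under which your reporter is configured.
# Several variables can be listed, separated by semicolons.
metrics.reporter.prom.scope.variables.excludes: task_name
```

Excluding a variable removes it from the reported labels, so it is worth checking that the remaining labels still distinguish the series you care about.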
 


Best,
Mason

> On Jun 3, 2021, at 9:31 PM, Kai Fu  wrote:
> 
> Hi team,
> 
> We noticed that the Prometheus metrics exporter exports all of the metrics at the 
> most fine-grained level, which is a tremendous load for the Prometheus server, 
> especially when the parallelism is high. The metrics volume scraped from a 
> single host (parallelism 8) is currently around 40MB for us. This is due to 
> the task_name attribute in the metrics generated by the engine being very long. 
> The task_name attribute is auto-generated from the SQL job, and it seems to 
> include all field names.
> 
> We want to reduce the metrics volume by either dropping task_name or reporting 
> at a more coarse-grained level. But I cannot find any related documentation 
> about this. Any advice on that? 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/
>  
> 
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#list-of-all-variables
>  
> 
> 
> -- 
> Best wishes,
> - Kai



unsubscribe

2021-06-21 Thread steven chen
unsubscribe

Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-07 Thread Zili Chen
Congrats Hequn!

Best,
tison.


On Wed, Aug 7, 2019 at 5:14 PM Jeff Zhang wrote:

> Congrats Hequn!
>
> On Wed, Aug 7, 2019 at 5:08 PM Paul Lam wrote:
>
>> Congrats Hequn! Well deserved!
>>
>> Best,
>> Paul Lam
>>
>> On Aug 7, 2019, at 16:28, jincheng sun wrote:
>>
>> Hi everyone,
>>
>> I'm very happy to announce that Hequn accepted the offer of the Flink PMC
>> to become a committer of the Flink project.
>>
>> Hequn has been contributing to Flink for many years, mainly working on
>> SQL/Table API features. He's also frequently helping out on the user
>> mailing lists and helping check/vote the release.
>>
>> Congratulations Hequn!
>>
>> Best, Jincheng
>> (on behalf of the Flink PMC)
>>
>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: some slots are not be available,when job is not running

2019-08-09 Thread Zili Chen
Hi,

Could you attach the exception stack trace or the relevant logs?

Best,
tison.


On Fri, Aug 9, 2019 at 4:55 PM pengcheng...@bonc.com.cn wrote:

> Hi,
>
> Why are some slots unavailable?
>
> My cluster mode is standalone, and the high-availability mode is ZooKeeper.
> task.cancellation.timeout: 0
> Some slots are not available when no job is running.
>
>
>
> --
> pengcheng...@bonc.com.cn
>


Re: Why available task slots are not leveraged for pipeline?

2019-08-12 Thread Zili Chen
Hi Cam,

If you set the parallelism to 60, you would make use of all 60 slots you
have, and in your case each slot executes one chain containing the 13
operators. It is not the case that one slot executes 60 or more sub-tasks.
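
The slot arithmetic discussed in this thread can be sketched in plain Java. This assumes default slot sharing, where all operators are in the same slot sharing group; the class `SlotSharingSketch` and its methods are hypothetical names for illustration, not Flink APIs.

```java
import java.util.Arrays;

/** Hypothetical helper (not part of Flink) for the slot arithmetic in this thread. */
final class SlotSharingSketch {

    // With default slot sharing, one slot can hold one sub-task of every task,
    // so a job needs as many slots as its highest task parallelism.
    static int slotsNeeded(int[] taskParallelisms) {
        return Arrays.stream(taskParallelisms).max().orElse(0);
    }

    // Total number of sub-tasks across all tasks (the "13 x 5" count).
    static int totalSubtasks(int[] taskParallelisms) {
        return Arrays.stream(taskParallelisms).sum();
    }

    public static void main(String[] args) {
        int[] tasks = new int[13];
        Arrays.fill(tasks, 5);
        System.out.println(slotsNeeded(tasks));   // 13 tasks at parallelism 5
        System.out.println(totalSubtasks(tasks));
        Arrays.fill(tasks, 60);
        System.out.println(slotsNeeded(tasks));   // 13 tasks at parallelism 60
    }
}
```

With parallelism 5 the job occupies 5 slots even though it has 65 sub-tasks; with parallelism 60 it occupies all 60 slots, each holding one sub-task per task.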

Best,
tison.


On Mon, Aug 12, 2019 at 7:55 PM Cam Mach wrote:

> Hi Zhu and Abhishek,
>
> Thanks for your response and pointers. That's correct: the parallelism
> determines the number of slots used for a pipeline. The parallelism is
> also used to generate the number of sub-tasks for each operator. In my
> case, I have a parallelism of 60, so it generates 60 sub-tasks for each
> operator, and it would be too much for one slot to execute at least 60
> sub-tasks. I am wondering if there is a way to set the number of
> generated sub-tasks to something different from the parallelism?
>
> Cam Mach
> Software Engineer
> E-mail: cammac...@gmail.com
> Tel: 206 972 2768
>
>
>
> On Sun, Aug 11, 2019 at 10:37 PM Zhu Zhu  wrote:
>
>> Hi Cam,
>> This case is expected due to slot sharing.
>> A slot can be shared by one instance of each of the different tasks, so the
>> number of slots used equals the maximum parallelism among your tasks.
>> You can specify the sharing group with slotSharingGroup(String
>> slotSharingGroup) on operators.
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Mon, Aug 12, 2019 at 1:23 PM Abhishek Jain wrote:
>>
>>> What you're seeing is likely operator chaining. This is the default
>>> behaviour of grouping sub-tasks to avoid transfer overhead (from one slot to
>>> another). You can disable chaining if you need to. Please refer to the
>>> documentation on tasks and operator chains.
>>>
>>> - Abhishek
>>>
>>> On Mon, 12 Aug 2019 at 09:56, Cam Mach  wrote:
>>>
>>>> Hello Flink expert,
>>>>
>>>> I have a cluster with 10 Task Managers, configured with 6 task slots
>>>> each, and a pipeline that has 13 tasks/operators with parallelism of 5. But
>>>> when running the pipeline I observe that only 5 slots are being used; the
>>>> other 55 slots are available/free. It should use all of my slots, right,
>>>> since I have 13 (tasks) x 5 = 65 sub-tasks? What configuration did
>>>> I miss in order to leverage all of the available slots for my pipelines?
>>>>
>>>> Thanks,
>>>> Cam
>>>>
>>>>
>>>


Re: How can I pass multiple java options in standalone mode ?

2019-08-12 Thread Zili Chen
Hi Vishwas,

Replacing ',' with ' ' (a space) should work.
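
As a sketch, the entry from the question would then look like this, with the two options taken unchanged from the original and separated by a space:

```yaml
# Options are separated by a space instead of a comma.
env.java.opts: "-Djava.security.auth.login.config={{ flink_installed_dir }}/kafka-jaas.conf -Djava.security.krb5.conf={{ flink_installed_dir }}/krb5.conf"
```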

Best,
tison.


On Tue, Aug 13, 2019 at 6:50 AM Vishwas Siravara wrote:

> Hi guys,
> I have this entry in flink-conf.yaml file for jvm options.
> env.java.opts: "-Djava.security.auth.login.config={{ flink_installed_dir
> }}/kafka-jaas.conf,-Djava.security.krb5.conf={{ flink_installed_dir
> }}/krb5.conf"
>
> Is this supposed to be a , separated list ? I get a parse exception when
> the cluster starts.
>
> Thanks,
> Vishwas
>


Re: JDBC sink for streams API

2019-08-12 Thread Zili Chen
Hi Eduardo,

JDBCSinkFunction is a package-private class which you can make use of
through JDBCAppendTableSink. A typical statement could be

JDBCAppendTableSink.builder()
   . ...
   .build()
   .consumeDataStream(upstream);

JDBCUpsertTableSink and JDBCTableSourceSinkFactory are also worth a
look.

The successor of OutputFormatSinkFunction is StreamingFileSink, but it
does not seem to fit your requirements.

Best,
tison.


On Tue, Aug 13, 2019 at 7:59 AM Eduardo Winpenny Tejedor wrote:

> Hi all,
>
> Could someone point me to the current advised way of adding a JDBC sink?
>
> Online I've seen that one can use DataStream.writeUsingOutputFormat();
> however, I see that OutputFormatSinkFunction is deprecated.
>
> I can also see a JDBCSinkFunction (or JDBCUpsertSinkFunction) but that is
> "package private" so I can't use it myself. JDBCAppendTableSink makes use
> of it but that doesn't work with the Streams API... I tried simply copying
> its code into a class of my own; alas, the flush method in JDBCOutputFormat
> is also "package private"...
>
> JDBC does not receive a lot of attention in Fabian's and Vasili's Stream
> Processing with Apache Flink or in the online documentation - is there a
> reason for this? Is it bad practice?
>
>
> Regards,
> Eduardo
>


Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer

2019-08-14 Thread Zili Chen
Congratulations Andrey!

Best,
tison.


On Wed, Aug 14, 2019 at 9:26 PM Till Rohrmann wrote:

> Hi everyone,
>
> I'm very happy to announce that Andrey Zagrebin accepted the offer of the
> Flink PMC to become a committer of the Flink project.
>
> Andrey has been an active community member for more than 15 months. He
> has helped shaping numerous features such as State TTL, FRocksDB release,
> Shuffle service abstraction, FLIP-1, result partition management and
> various fixes/improvements. He's also frequently helping out on the
> user@f.a.o mailing lists.
>
> Congratulations Andrey!
>
> Best, Till
> (on behalf of the Flink PMC)
>


Weekly Community Update 2019/33, Personal Chinese Version

2019-08-18 Thread Zili Chen
Hi community,

Inspired by the weekly community updates thread, and considering the growth
of the Chinese community and the somewhat different concerns of its
members, I'd like to start a personally maintained Chinese version of the
Weekly Community Update.

So far I have posted the recent weeks' updates on this page[1], where Chinese
users as well as potential ones can easily reach them.

Any feedback is welcome, and I am looking for a collaborator who is
familiar with Table API/SQL topics to enrich the content.

Best,
tison.

[1] https://zhuanlan.zhihu.com/p/78753149


Re: Recovery from job manager crash using check points

2019-08-19 Thread Zili Chen
Hi Min,

I guess you use standalone high-availability, so when a TM fails, the
JM can recover the job from an in-memory checkpoint store.

However, when the JM fails, since you don't persist state on an HA backend
such as ZooKeeper, even if the JM is relaunched by the YARN RM or superseded
by a standby, the new one knows nothing about the previous jobs.

In short, you need to set up ZooKeeper as you yourself mentioned.
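
A minimal flink-conf.yaml sketch of such a setup follows. The option keys are Flink's ZooKeeper HA options; the quorum hosts and the storage path are hypothetical placeholders that must be replaced with your own.

```yaml
high-availability: zookeeper
# Hypothetical ZooKeeper hosts, for illustration only.
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
# Hypothetical durable location where JobManager metadata is persisted.
high-availability.storageDir: hdfs:///flink/ha/
```

The storage directory should be on durable storage reachable from every JobManager, since ZooKeeper itself only keeps pointers to the persisted metadata.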

Best,
tison.


On Mon, Aug 19, 2019 at 11:49 PM Biao Liu wrote:

> Hi Min,
>
> > Do I need to set up zookeepers to keep the states when a job manager
> crashes?
>
> I guess you need to set up the HA [1] properly. Besides that, I would
> suggest you should also check the state backend.
>
> 1.
> https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html
> 2.
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/state_backends.html
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Mon, 19 Aug 2019 at 23:28,  wrote:
>
>> Hi,
>>
>>
>>
>> I can use check points to recover Flink states when a task manger crashes.
>>
>>
>>
>> I can not use check points to recover Flink states when a job manger
>> crashes.
>>
>>
>>
>> Do I need to set up zookeepers to keep the states when a job manager
>> crashes?
>>
>>
>>
>> Regards
>>
>>
>>
>> Min
>>
>>
>>
>


Re: Weekly Community Update 2019/33, Personal Chinese Version

2019-08-19 Thread Zili Chen
Hi Paul & Jark,

Thanks for your feedback!

I also thought of putting the content in the email but hesitated over
where it should be sent (user-zh only, IMO), what kind of thread
it should be ([ANNOUNCE] or just a normal thread), and how
to format it to fit the email form.

It is reasonable to keep this thread in sync on both the user-zh list
and the web page so that we both follow the Apache Way and make it
convenient to reach for (potential) Chinese users.

I'm glad to put the content in the email but decided to collect some
feedback on the idea first :-) If there are no other suggestions I am going
to start a separate normal thread, i.e., without the [ANNOUNCE] header, on
the user-zh list later today or tomorrow.

Best,
tison.


On Tue, Aug 20, 2019 at 11:28 AM Jark Wu wrote:

> Hi Zili,
>
> +1 for the Chinese Weekly Community Update.
> I think this will definitely attract more Chinese users.
> Btw, could you also put the content of the Chinese Weekly Updates in the
> email? I think this would be more aligned with the Apache Way,
> so that we can help respond to users who have interest in or questions on
> some items.
>
> Thanks,
> Jark
>
>
> On Mon, 19 Aug 2019 at 13:27, Paul Lam  wrote:
>
>> Hi Tison,
>>
>> Big +1 for the Chinese Weekly Community Update. The content is
>> well-organized, and I believe it would be very helpful for Chinese users to
>> get an overview of what’s going on in the community.
>>
>> Best,
>> Paul Lam
>>
>> > On Aug 19, 2019, at 12:27, Zili Chen wrote:
>> >
>> > Hi community,
>> >
>> > Inspired by weekly community updates thread, regards the growth
>> > of Chinese community and kind of different concerns for community
>> > members I'd like to start a personally maintained Chinese version of
>> > Weekly Community Update.
>> >
>> > Right now I posted these weeks' updates on this page[1] where Chinese
>> > users as well as potential ones could easily reach.
>> >
>> > Any feedbacks are welcome and I am looking for a collaborator who is
>> > familiar with TableAPI/SQL topics to enrich the content.
>> >
>> > Best,
>> > tison.
>> >
>> > [1] https://zhuanlan.zhihu.com/p/78753149 <
>> https://zhuanlan.zhihu.com/p/78753149>
>>
>


[SURVEY] How do you use high-availability services in Flink?

2019-08-21 Thread Zili Chen
Hi guys,

We want to have an accurate idea of how users actually use
high-availability services in Flink, especially how you customize
high-availability services via HighAvailabilityServicesFactory.

Basically there are the standalone impl., the ZooKeeper impl., the embedded
impl. used in MiniCluster, a YARN impl. that is not yet implemented, and a
gate to customized implementations.

Generally I think the standalone impl. and the ZooKeeper impl. are the most
widely used implementations. The embedded impl. is used without
awareness when users run a MiniCluster.

Besides that, it is helpful to know how you customize
high-availability services using the HighAvailabilityServicesFactory
interface for the ongoing FLINK-10333[1], which would evolve
high-availability services in Flink, as well as whether any
user is interested in the not yet implemented YARN impl.

Any use case would be helpful. I really appreciate your time and your
insight.

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-10333


Re: [SURVEY] How do you use high-availability services in Flink?

2019-08-21 Thread Zili Chen
In addition, FLINK-13750[1] is also likely to introduce breaking changes
to high-availability services. So those of you who might be affected by
the change are highly encouraged to share your cases :-)

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-13750




Re: Recovery from job manager crash using check points

2019-08-21 Thread Zili Chen
Hi Min,

For your question, the answer is no.

In the standalone case Flink uses an in-memory checkpoint store, and it
is able to restore from the savepoint given on the command line and
recover state from it.

Besides, stopping with a savepoint and resuming the job from that savepoint
is the standard path to migrate jobs.

Best,
tison.


On Wed, Aug 21, 2019 at 9:46 PM, Min wrote:

> Thanks for the helpful reply.
>
>
>
> One more question, does this zookeeper or HA requirement apply for a
> savepoint?
>
>
>
> Can I bounce a single jobmanager cluster and rerun my flink job from its
> previous states with a save point directory? e.g.
>
> ./bin/flink run -s savepointDirectory myJob.jar
>
>
>
> Regards,
>
>
>
> Min
>
>
>


Re: TaskManager not connecting to ResourceManager in HA mode

2019-08-21 Thread Zili Chen
Hi Aleksandar,

Based on your log:

taskmanager_1   | 2019-08-22 00:05:03,713 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting
to ResourceManager
akka.tcp://flink@jobmanager:6123/user/jobmanager()
.
taskmanager_1   | 2019-08-22 00:05:04,137 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
resolve ResourceManager address
akka.tcp://flink@jobmanager:6123/user/jobmanager, retrying in 1 ms:
Could not connect to rpc endpoint under address
akka.tcp://flink@jobmanager:6123/user/jobmanager..

it looks like your leader retrieval service for the resource manager returns
a jobmanager address. Please check the implementation carefully, or share it on
the mailing list so that others can help investigate.

Best,
tison.


On Thu, Aug 22, 2019 at 10:11 AM Zhu Zhu wrote:

> Hi Aleksandar,
>
> The resource manager address is retrieved from the HA services.
> Would you check whether your customized HA services is returning the
> right  LeaderRetrievalService and whether the LeaderRetrievalService is
> really retrieving the right leader's address?
> Or is it possible that the stored resource manager address in HA is
> replaced by jobmanager address in any case?
>
> Thanks,
> Zhu Zhu
>
> On Thu, Aug 22, 2019 at 8:16 AM Aleksandar Mastilovic wrote:
>
>> Hi all,
>>
>> I’m experimenting with using my own implementation of HA services instead
>> of ZooKeeper that would persist JobManager information on a Kubernetes
>> volume instead of in ZooKeeper.
>>
>> I’ve set the high-availability option in flink-conf.yaml to the FQN of my
>> factory class, and started the docker ensemble as I usually do (i.e. with
>> no special “cluster” arguments or scripts.)
>>
>> What’s happening now is that TaskManager is unable to connect to
>> ResourceManager, because it seems it’s trying to use the /user/jobmanager
>> path instead of /user/resourcemanager.
>>
>> Here’s what I found in the logs:
>>
>>
>> jobmanager_1| 2019-08-22 00:05:00,963 INFO  akka.remote.Remoting
>>  - Remoting started; listening on
>> addresses :[akka.tcp://flink@jobmanager:6123]
>> jobmanager_1| 2019-08-22 00:05:00,975 INFO
>>  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor
>> system started at akka.tcp://flink@jobmanager:6123
>>
>> jobmanager_1| 2019-08-22 00:05:02,380 INFO
>>  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
>> RPC endpoint for
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at
>> akka://flink/user/resourcemanager .
>>
>> jobmanager_1| 2019-08-22 00:05:03,138 INFO
>>  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
>> RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
>> at akka://flink/user/dispatcher .
>>
>> jobmanager_1| 2019-08-22 00:05:03,211 INFO
>>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> ResourceManager akka.tcp://flink@jobmanager:6123/user/resourcemanager
>> was granted leadership with fencing token 
>>
>> jobmanager_1| 2019-08-22 00:05:03,292 INFO
>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Dispatcher
>> akka.tcp://flink@jobmanager:6123/user/dispatcher was granted leadership
>> with fencing token ----
>>
>> taskmanager_1   | 2019-08-22 00:05:03,713 INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting
>> to ResourceManager
>> akka.tcp://flink@jobmanager:6123/user/jobmanager()
>> .
>> taskmanager_1   | 2019-08-22 00:05:04,137 INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
>> resolve ResourceManager address
>> akka.tcp://flink@jobmanager:6123/user/jobmanager, retrying in 1 ms:
>> Could not connect to rpc endpoint under address
>> akka.tcp://flink@jobmanager:6123/user/jobmanager..
>>
>> Is this a known bug? I’d appreciate any help I can get.
>>
>> Thanks,
>> Aleksandar Mastilovic
>>
>


Re: TaskManager not connecting to ResourceManager in HA mode

2019-08-21 Thread Zili Chen
Besides, would you like to participate in our survey thread [1] on the
user list about "How do you use high-availability services in Flink?"

It would help Flink improve its high-availability services.

Best,
tison.

[1]
https://lists.apache.org/x/thread.html/c0cc07197e6ba30b45d7709cc9e17d8497e5e3f33de504d58dfcafad@%3Cuser.flink.apache.org%3E




Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-22 Thread Zili Chen
Congratulations!

Thanks to Gordon and Kurt for being the release managers.

Thanks to all the contributors who have made this release possible.

Best,
tison.


Jark Wu wrote on Thu, Aug 22, 2019 at 8:11 PM:

> Congratulations!
>
> Thanks Gordon and Kurt for being the release manager and thanks a lot to
> all the contributors.
>
>
> Cheers,
> Jark
>
> On Thu, 22 Aug 2019 at 20:06, Oytun Tez  wrote:
>
>> Congratulations team; thanks for the update, Gordon.
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>>
>> On Thu, Aug 22, 2019 at 8:03 AM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.9.0, which is the latest major release.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this new major release:
>>> https://flink.apache.org/news/2019/08/22/release-1.9.0.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Cheers,
>>> Gordon
>>>
>>


Re: TaskManager not connecting to ResourceManager in HA mode

2019-08-22 Thread Zili Chen
Nice to hear :-)

Best,
tison.


Aleksandar Mastilovic wrote on Fri, Aug 23, 2019 at 2:22 AM:

> Thanks for all the help, people - you made me go through my code once
> again and discover that I switched argument positions for job manager and
> resource manager addresses :-)
>
> The docker ensemble now starts fine, I’m working on ironing out the bugs
> now.
>
> I’ll participate in the survey too!
>
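
The root cause reported above, two addresses passed in swapped positions, is
easy to hit when both parameters are plain strings. A minimal sketch of one
way to let the compiler catch it, assuming nothing about the actual code in
question: JobManagerAddress, ResourceManagerAddress and LeaderAddresses are
hypothetical types invented for this example, not Flink classes.

```java
// Hypothetical wrapper types for illustration; they are not Flink classes.
final class JobManagerAddress {
    final String value;

    JobManagerAddress(String value) {
        this.value = value;
    }
}

final class ResourceManagerAddress {
    final String value;

    ResourceManagerAddress(String value) {
        this.value = value;
    }
}

final class LeaderAddresses {
    private final ResourceManagerAddress resourceManager;
    private final JobManagerAddress jobManager;

    // Passing the arguments in the wrong order does not compile,
    // because the two parameters have different types.
    LeaderAddresses(ResourceManagerAddress resourceManager, JobManagerAddress jobManager) {
        this.resourceManager = resourceManager;
        this.jobManager = jobManager;
    }

    String resourceManager() {
        return resourceManager.value;
    }

    String jobManager() {
        return jobManager.value;
    }

    public static void main(String[] args) {
        LeaderAddresses addresses = new LeaderAddresses(
                new ResourceManagerAddress("akka.tcp://flink@jobmanager:6123/user/resourcemanager"),
                new JobManagerAddress("akka.tcp://flink@jobmanager:6123/user/jobmanager"));
        System.out.println(addresses.resourceManager());
    }
}
```

With two String parameters, swapping the arguments compiles silently; with
distinct wrapper types, the same mistake becomes a compile error.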

Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-23 Thread Zili Chen
Hi Gavin,

I also found a problem in the shell script: if the directory contains
whitespace, the final command to run is incorrect. Could you check the
final command that is executed?

FYI, here is the ticket[1].

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-13827
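
For readers unfamiliar with this class of problem: the details of the ticket
are not quoted here, so the following is only a sketch of the general failure
mode, written in Java rather than shell. CommandTokens and both of its
methods are hypothetical names. It shows how a command assembled as one
string and split on whitespace breaks when a path contains a space, while a
list of separate arguments (the form java.lang.ProcessBuilder accepts) keeps
the path intact.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only.
final class CommandTokens {

    private CommandTokens() {}

    // Naive approach: one command string, split on whitespace. This is
    // roughly what happens to an unquoted variable expansion in a shell.
    static List<String> naiveSplit(String commandLine) {
        return Arrays.asList(commandLine.trim().split("\\s+"));
    }

    // Safer approach: every argument stays its own list element, so a
    // classpath containing spaces is still a single argument.
    static List<String> asArguments(String javaBin, String classpath, String mainClass) {
        return Arrays.asList(javaBin, "-cp", classpath, mainClass);
    }

    public static void main(String[] args) {
        String classpath = "/opt/my flink/lib/flink-dist.jar"; // made-up path with a space
        String mainClass = "org.apache.flink.api.scala.FlinkShell";
        System.out.println(naiveSplit("java -cp " + classpath + " " + mainClass));
        System.out.println(asArguments("java", classpath, mainClass));
    }
}
```

The second list can be handed to new ProcessBuilder(list) unchanged; in a
shell script the equivalent precaution is quoting the variable expansions.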


Gavin Lee wrote on Fri, Aug 23, 2019 at 3:36 PM:

> Why does bin/start-scala-shell.sh local return the following error?
>
> bin/start-scala-shell.sh local
>
> Error: Could not find or load main class
> org.apache.flink.api.scala.FlinkShell
> For flink 1.8.1 and previous ones, no such issues.
>
> On Fri, Aug 23, 2019 at 2:05 PM qi luo  wrote:
>
>> Congratulations and thanks for the hard work!
>>
>> Qi
>>
>
> --
> Gavin
>


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-23 Thread Zili Chen
I downloaded the 1.9.0 dist from here [1] and didn't see the issue. Where
did you download it from? Could you try downloading the dist from [1] and
see whether the problem persists?

Best,
tison.

[1]
http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz


Gavin Lee wrote on Fri, Aug 23, 2019 at 4:34 PM:

> Thanks for your reply @Zili.
> I'm afraid it's not the same issue.
> I found that FlinkShell.class is not included in the flink dist jar file
> in version 1.9.0.
> I can't find this class file in any jar, either in the opt or the lib
> directory under the root folder of the flink distribution.
>
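
A check like the one described above can be scripted. The sketch below uses
only the JDK's java.util.jar.JarFile; JarClassCheck and containsClass are
hypothetical names, and the jar path is whatever the caller passes on the
command line (the location of the dist jar inside a distribution is not
assumed here).

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper for illustration only.
final class JarClassCheck {

    private JarClassCheck() {}

    // Returns true if the jar at jarPath has an entry for the given
    // fully qualified class name, e.g. "org.apache.flink.api.scala.FlinkShell".
    static boolean containsClass(String jarPath, String className) throws IOException {
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java JarClassCheck <path-to-jar> <fully.qualified.ClassName>
        if (args.length == 2) {
            System.out.println(containsClass(args[0], args[1]));
        }
    }
}
```

Running it against each jar under lib and opt gives a direct yes/no answer
for org.apache.flink.api.scala.FlinkShell.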
>
> --
> Gavin
>


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-23 Thread Zili Chen
Hi Till,

Did we mention this in the release notes (or maybe in the release notes of
the earlier version where we made the exclusion)?

Best,
tison.


Till Rohrmann wrote on Fri, Aug 23, 2019 at 10:28 PM:

> Hi Gavin,
>
> if I'm not mistaken, then the community excluded the Scala FlinkShell
> since a couple of versions for Scala 2.12. The problem seems to be that
> some of the tests failed. See here [1] for more information.
>
> [1] https://issues.apache.org/jira/browse/FLINK-10911
>
> Cheers,
> Till
>
> On Fri, Aug 23, 2019 at 11:44 AM Gavin Lee  wrote:
>
>> I used package on apache official site, with mirror [1], the difference is
>> I used scala 2.12 version.
>> I also tried to build from source for both scala 2.11 and 2.12, when build
>> 2.12 the FlinkShell.class is in flink-dist jar file but after running mvn
>> clean package -Dscala-2.12, this class was removed in flink-dist_2.12-1.9
>> jar file.
>> Seems broken here for scala 2.12, right?
>>
>> [1]
>>
>> http://mirror.bit.edu.cn/apache/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.12.tgz
>>
>> --
>> Gavin
>>
>


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-26 Thread Zili Chen
Hi Oytun,

I think the intent is to publish flink-queryable-state-client-java
without a Scala suffix, since it is Scala-free. An artifact without the
Scala suffix has been published [2].

See also [1].

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-12602
[2]
https://mvnrepository.com/artifact/org.apache.flink/flink-queryable-state-client-java/1.9.0



Till Rohrmann wrote on Mon, Aug 26, 2019 at 3:50 PM:

> The missing support for the Scala shell with Scala 2.12 was documented in
> the 1.7 release notes [1].
>
> @Oytun, the docker image should be updated in a bit. Sorry for the
> inconveniences. Thanks for the pointer that
> flink-queryable-state-client-java_2.11 hasn't been published. We'll upload
> this in a bit.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/release-notes/flink-1.7.html#scala-shell-does-not-work-with-scala-212
>
> Cheers,
> Till
>
> On Sat, Aug 24, 2019 at 12:14 PM chaojianok  wrote:
>
>> Congratulations and thanks!
>


Re: [SURVEY] How do you use high-availability services in Flink?

2019-08-28 Thread Zili Chen
Thanks for your email, Aleksandar! Sorry for the late reply.

May I ask a question: do you configure high-availability.storageDir in your
case?
That is, do you persist and retrieve job graphs and checkpoints entirely in
MapDB, or, as the ZooKeeper implementation does, persist them in an external
filesystem and store just a handle in MapDB?

Best,
tison.
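
To make the second option in the question concrete: the payload goes to a
file, and the small key-value store keeps only a pointer to that file. This
is a minimal sketch under stated assumptions, not how Flink or MapDB does
it. HandleStore is a hypothetical class, the in-memory map merely stands in
for MapDB or ZooKeeper, and keys are assumed to be valid file name
fragments.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class for illustration; the map stands in for MapDB/ZooKeeper.
final class HandleStore {
    private final Path storageDir;
    private final Map<String, String> handles = new ConcurrentHashMap<>();

    HandleStore(Path storageDir) {
        this.storageDir = storageDir;
    }

    // Writes the (possibly large) state to a file and records only its path.
    void put(String key, byte[] state) throws IOException {
        Files.createDirectories(storageDir);
        Path file = storageDir.resolve(key + "-" + UUID.randomUUID());
        Files.write(file, state);
        handles.put(key, file.toString());
    }

    // Follows the stored handle back to the file; null if the key is unknown.
    byte[] get(String key) throws IOException {
        String handle = handles.get(key);
        return handle == null ? null : Files.readAllBytes(Paths.get(handle));
    }

    // The small value that the key-value store actually holds.
    String handleFor(String key) {
        return handles.get(key);
    }

    public static void main(String[] args) throws IOException {
        HandleStore store = new HandleStore(Files.createTempDirectory("ha-demo"));
        store.put("job-1", "serialized job graph".getBytes(StandardCharsets.UTF_8));
        System.out.println("handle: " + store.handleFor("job-1"));
    }
}
```

The design point is that the coordination store only ever sees short
strings, while the bulk data lives on the filesystem.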


Aleksandar Mastilovic wrote on Sat, Aug 24, 2019 at 7:04 AM:

> Hi all,
>
> Since I’m currently working on an implementation of
> HighAvailabilityServicesFactory I thought it would be good to report here
> about my experience so far.
>
> Our use case is cloud based, where we package Flink and our supplementary
> code into a docker image, then run those images through Kubernetes+Helm
> orchestration.
>
> We don’t use Hadoop nor HDFS but rather Google Cloud Storage, and we don’t
> run ZooKeepers. Our Flink setup consists of one JobManager and multiple
> TaskManagers on-demand.
>
> Due to the nature of cloud computing there’s a possibility our JobManager
> instance might go down, only to be automatically recreated through
> Kubernetes. Since we don’t run ZooKeeper, we needed a way to run a variant
> of a High Availability cluster where we would keep JobManager information
> on our attached persistent k8s volume instead of in ZooKeeper.
> We found this (
> https://stackoverflow.com/questions/52104759/apache-flink-on-kubernetes-resume-job-if-jobmanager-crashes/52112538)
> post on StackOverflow and decided to give it a try.
>
> So far we have a setup that seems to be working on our local deployment,
> we haven’t yet tried it in the actual cloud.
>
> As far as implementation goes, here’s what we did:
>
> We used MapDB (mapdb.org) as our storage format, to persist lists of
> objects onto disk. We partially relied on StandaloneHaServices for our
> HaServices implementation. Otherwise we looked at the ZooKeeperHaServices
> and related classes for inspiration and guidance.
>
> Here’s a list of new classes:
>
> FileSystemCheckpointIDCounter implements CheckpointIDCounter
> FileSystemCheckpointRecoveryFactory implements CheckpointRecoveryFactory
> FileSystemCompletedCheckpointStore implements CompletedCheckpointStore
> FileSystemHaServices extends StandaloneHaServices
> FileSystemHaServicesFactory implements HighAvailabilityServicesFactory
> FileSystemSubmittedJobGraphStore implements SubmittedJobGraphStore
>
> Testing so far proved that bringing down a JobManager and bringing it back
> up does indeed restore all the running jobs. Job creation/destruction also
> works.
>
> Hope this helps!
>
> Thanks,
> Aleksandar Mastilovic
>
> On Aug 21, 2019, at 12:32 AM, Zili Chen  wrote:
>
> Hi guys,
>
> We want to have an accurate idea of how users actually use
> high-availability services in Flink, especially how you customize
> high-availability services by HighAvailabilityServicesFactory.
>
> Basically there are standalone impl., zookeeper impl., embedded impl.
> used in MiniCluster, YARN impl. not yet implemented, and a gate to
> customized implementations.
>
> Generally I think standalone impl. and zookeeper impl. are the most
> widely used implementations. The embedded impl. is used without
> awareness when users run a MiniCluster.
>
> Besides that, it is helpful to know how you customize
> high-availability services using the HighAvailabilityServicesFactory
> interface, for the ongoing FLINK-10333 [1], which would evolve
> high-availability services in Flink, as well as whether any
> user is interested in the not-yet-implemented YARN impl.
>
> Any use case would be helpful. I really appreciate your time and your
> insight.
>
> Best,
> tison.
>
> [1] https://issues.apache.org/jira/browse/FLINK-10333
>
>
>


Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC

2019-09-06 Thread Zili Chen
Congrats Klou!

Best,
tison.


Till Rohrmann wrote on Fri, Sep 6, 2019 at 9:23 PM:

> Congrats Klou!
>
> Cheers,
> Till
>
> On Fri, Sep 6, 2019 at 3:00 PM Dian Fu  wrote:
>
>> Congratulations Kostas!
>>
>> Regards,
>> Dian
>>
>> > On Sep 6, 2019, at 8:58 PM, Wesley Peng wrote:
>> >
>> > On 2019/9/6 8:55 PM, Fabian Hueske wrote:
>> >> I'm very happy to announce that Kostas Kloudas is joining the Flink PMC.
>> >> Kostas has been contributing to Flink for many years and puts lots of
>> >> effort into helping our users and growing the Flink community.
>> >> Please join me in congratulating Kostas!
>> >
>> > congratulation Kostas!
>> >
>> > regards.
>>
>>


Re: Post-processing batch JobExecutionResult

2019-09-06 Thread Zili Chen
Hi spoganshev,

If you deploy in per-job mode, OptimizerPlanEnvironment is used, and thus,
as you pointed out, there is _no_ way to post-process the
JobExecutionResult. We, the community, regard this as a shortcoming and are
working on an enhancement that lets you get a JobClient as the return value
of #execute in all deployment and execution modes. Take a look at [1] and
[2] for a preview, and feel free to describe your requirements so that a
following version can satisfy them.

Besides, if you deploy in session mode, which might be more natural in
batch cases, ContextEnvironment is currently used; it executes normally and
returns the JobExecutionResult that you can make use of.

To sum up, you can try session mode deployment to see whether it satisfies
your post-processing requirements.

Best,
tison.
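
The difference between the two cases can be illustrated without any Flink
code. The sketch below is a stand-in simulation, assuming only what this
thread says: one kind of environment aborts inside execute() by throwing,
the other returns a result. DemoEnvironment, PlanOnlyAbort and
PostProcessingDemo are hypothetical names, not Flink classes.

```java
// Hypothetical stand-ins for illustration; none of these are Flink classes.
final class PlanOnlyAbort extends RuntimeException {
    PlanOnlyAbort() {
        super("plan captured, job not executed");
    }
}

interface DemoEnvironment {
    String execute();
}

final class PostProcessingDemo {

    private PostProcessingDemo() {}

    // Mimics an environment whose execute() never returns normally.
    static final DemoEnvironment PLAN_ONLY = () -> {
        throw new PlanOnlyAbort();
    };

    // Mimics an environment that runs the job and returns its result.
    static final DemoEnvironment RUNNING = () -> "job-result";

    // Returns true only if the code after execute() was reached.
    static boolean postProcessingRan(DemoEnvironment env) {
        try {
            String result = env.execute();
            // Post-processing of the result would go here.
            return result != null;
        } catch (PlanOnlyAbort abort) {
            // Control jumps here straight from execute(), skipping the lines above.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("plan-only: " + postProcessingRan(PLAN_ONLY));
        System.out.println("running:   " + postProcessingRan(RUNNING));
    }
}
```

In the plan-only case the statement after execute() is never reached, which
matches the behaviour described in the original question; in the running
case the result is available for analysis.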


Zhu Zhu wrote on Sat, Sep 7, 2019 at 12:07 AM:

> Hi spoganshev,
>
> The *OptimizerPlanEnvironment* is for creating optimized plan only, as
> described in the javadoc
> "An {@link ExecutionEnvironment} that never executes a job but only
> creates the optimized plan."
> Its execute() is invoked with some internal handling so that it only
> generates the optimized plan and does not actually submit a job.
> Some other execution environment will execute the job instead.
>
> Not sure how you created your ExecutionEnvironment?
> Usually for DataSet jobs, it should be created in the way as below.
> "final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();"
>
> Thanks,
> Zhu Zhu
>
> spoganshev wrote on Fri, Sep 6, 2019 at 11:39 PM:
>
>> Because OptimizerPlanEnvironment.execute() throws an exception on its
>> last line, there is no way to post-process a batch job execution result,
>> like:
>>
>> JobExecutionResult r = env.execute(); // execute batch job
>> analyzeResult(r); // this will never get executed due to plan optimization
>>
>>
>> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L54
>>
>> Is there any way to allow such post-processing in batch jobs?
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Post-processing batch JobExecutionResult

2019-09-06 Thread Zili Chen
Besides, if you submit the job through the Jar Run REST API,
OptimizerPlanEnvironment is used as well. So again, there is _no_
post-processing support at the moment.



