Re: Tracing and Flink

2020-08-17 Thread bvarga
Hi Aaron,

I've recently been looking at this topic and working on a prototype. The
approach I am trying is "backward tracing", or data provenance tracing,
where we try to explain what inputs and steps have affected the production
of an output record.

Arvid has summarized the most important aspects, my approach to UIDs is as
he described. I would like to add a few thoughts.

- With this backward tracing approach, it is very difficult to do sampling,
as aggregations / multi-input operators can only be traced if all inputs are
also traced. So this is more useful if you need to be able to explain the
origins of all output records.

- As Arvid mentioned, the size of the trace records can become large and
negatively impact the performance of the pipeline. I'd suggest an approach
where each operator directly outputs its traces to some storage. Each trace
record has a UID. If each trace record contains a list/array of its inputs,
and you use an appropriate storage, you can do recursive lookups based on
the trace UIDs to find the complete trace graph for an output record. You may
even want a separate Flink job that pre-processes and pre-aggregates traces
that belong together (although the lateness / ordering might be difficult to
handle).

- If you choose this direct-reporting approach, you still need to pass
along the trace UID in the main pipeline, so that the next operator's
produced trace can list it in its inputs.

- If you leave the production of the trace records explicit (i.e. you have
to construct and collect the trace record manually in each operator), you
can flexibly choose which inputs to include (e.g. for a large aggregation,
you may only want to list some of the aggregated elements as inputs). You
can then also add any additional metadata to help explain a certain step
(see the sketch after this list).

- I've looked into adapting this to OpenTracing, but it didn't seem
well-suited for this task. The span-based approach has a parent-child
relationship that doesn't fit the dataflow model too well. In Flink, with
the backward-tracing approach, the "root span" would logically be the output
record, and its children would need to be constructed earlier. I couldn't
find a way to nicely fit this view into the structure of OpenTracing
records.
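
To make the explicit trace-record idea above concrete, here is a minimal
sketch of what such a record and an operator emitting it might look like.
All names (TraceRecord, TracedEvent, the "traces" side output) are made up
for illustration, and the sink that writes the side output to storage is
omitted:

```
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// One trace record is emitted per produced output record.
class TraceRecord implements Serializable {
    public String traceId;        // UID of this trace record
    public List<String> inputIds; // trace UIDs of the inputs that produced the output
    public String operatorName;   // which step produced the output
}

// A record in the main pipeline that carries its trace UID along.
class TracedEvent implements Serializable {
    public String traceId;
    // ... payload fields
}

class TracedEnrichment extends ProcessFunction<TracedEvent, TracedEvent> {
    static final OutputTag<TraceRecord> TRACES = new OutputTag<TraceRecord>("traces") {};

    @Override
    public void processElement(TracedEvent in, Context ctx, Collector<TracedEvent> out) {
        TracedEvent result = enrich(in);

        TraceRecord trace = new TraceRecord();
        trace.traceId = UUID.randomUUID().toString();
        trace.inputIds = Collections.singletonList(in.traceId); // link back to the input's trace
        trace.operatorName = "enrichment";

        result.traceId = trace.traceId; // pass the UID along the main pipeline
        ctx.output(TRACES, trace);      // report the trace out-of-band, e.g. to a storage sink
        out.collect(result);
    }

    private TracedEvent enrich(TracedEvent in) { /* business logic */ return in; }
}
```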

Let me know your thoughts, I'd be happy to discuss this further.

Regards,

Balazs Varga  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


[DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Dawid Wysakowicz
Hi devs and users,

I wanted to ask you what you think about removing some of the
deprecated APIs around the DataStream API.

The APIs I have in mind are:

  * RuntimeContext#getAllAccumulators (deprecated in 0.10)
  * DataStream#fold and all related classes and methods such as
FoldFunction, FoldingState, FoldingStateDescriptor ... (deprecated
in 1.3/1.4)
  * StreamExecutionEnvironment#setStateBackend(AbstractStateBackend)
(deprecated in 1.5)
  * DataStream#split (deprecated in 1.8)
  * Methods in (Connected)DataStream that specify keys as either indices
or field names such as DataStream#keyBy, DataStream#partitionCustom,
ConnectedStream#keyBy,  (deprecated in 1.11)

I think the first three should be straightforward. They have been
deprecated for a long time. The getAllAccumulators method is not used very
often in my opinion. The same applies to DataStream#fold, which additionally
is not very performant. Lastly, setStateBackend has an alternative that
accepts classes from the AbstractStateBackend hierarchy, so existing code
will still compile. Moreover, if we remove
#setStateBackend(AbstractStateBackend) we will get rid of the warnings users
see right now when setting a state backend, as the correct method
cannot be selected without an explicit cast.
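
For illustration, the overload ambiguity looks roughly like this (a sketch,
assuming an FsStateBackend and a placeholder checkpoint path):

```
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Resolves to the deprecated setStateBackend(AbstractStateBackend) overload and
// produces a deprecation warning, because FsStateBackend extends AbstractStateBackend:
env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));

// Only an explicit cast makes the compiler pick the non-deprecated
// setStateBackend(StateBackend) overload:
env.setStateBackend((StateBackend) new FsStateBackend("file:///tmp/checkpoints"));
```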

As for the DataStream#split I know there were some objections against
removing the #split method in the past. I still believe the output tags
can replace the split method already.
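
For reference, a pipeline that uses split()/select() can usually be migrated
to side outputs roughly like this (a sketch over a plain DataStream<Integer>
called input):

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

final OutputTag<Integer> evenTag = new OutputTag<Integer>("even") {};

SingleOutputStreamOperator<Integer> mainStream = input
    .process(new ProcessFunction<Integer, Integer>() {
        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) {
            if (value % 2 == 0) {
                ctx.output(evenTag, value); // routed to the "even" side output
            } else {
                out.collect(value);         // stays on the main output
            }
        }
    });

DataStream<Integer> evenStream = mainStream.getSideOutput(evenTag);
```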

The only problem with the last set of methods I propose to remove is that
they were deprecated only in the last release, and those methods were only
partially deprecated. Moreover, some of the methods were not deprecated
in ConnectedStreams. Nevertheless I'd still be inclined to remove the
methods in this release.

Let me know what you think about it.

Best,

Dawid





Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Kostas Kloudas
Thanks a lot for starting this Dawid,

Big +1 for the proposed clean-up, and I would also add the deprecated
methods of the StreamExecutionEnvironment like:

enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
enableCheckpointing()
isForceCheckpointing()

readFile(FileInputFormat inputFormat,String
filePath,FileProcessingMode watchType,long interval, FilePathFilter
filter)
readFileStream(...)

socketTextStream(String hostname, int port, char delimiter, long maxRetry)
socketTextStream(String hostname, int port, char delimiter)

There are more, like the (get)/setNumberOfExecutionRetries() that were
deprecated long ago, but I have not investigated to see if they are
actually easy to remove.

Cheers,
Kostas

On Mon, Aug 17, 2020 at 10:53 AM Dawid Wysakowicz
 wrote:
>
> Hi devs and users,
>
> I wanted to ask you what do you think about removing some of the deprecated 
> APIs around the DataStream API.
>
> The APIs I have in mind are:
>
> RuntimeContext#getAllAccumulators (deprecated in 0.10)
> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)
> StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated 
> in 1.5)
> DataStream#split (deprecated in 1.8)
> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)
>
> I think the first three should be straightforward. They are long deprecated. 
> The getAccumulators method is not used very often in my opinion. The same 
> applies to the DataStream#fold which additionally is not very performant. 
> Lastly the setStateBackend has an alternative with a class from the 
> AbstractStateBackend hierarchy, therefore it will be still code compatible. 
> Moreover if we remove the #setStateBackend(AbstractStateBackend) we will get 
> rid off warnings users have right now when setting a statebackend as the 
> correct method cannot be used without an explicit casting.
>
> As for the DataStream#split I know there were some objections against 
> removing the #split method in the past. I still believe the output tags can 
> replace the split method already.
>
> The only problem in the last set of methods I propose to remove is that they 
> were deprecated only in the last release and those method were only partially 
> deprecated. Moreover some of the methods were not deprecated in 
> ConnectedStreams. Nevertheless I'd still be inclined to remove the methods in 
> this release.
>
> Let me know what do you think about it.
>
> Best,
>
> Dawid


Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread David Anderson
I assume that along with DataStream#fold you would also
remove WindowedStream#fold.

I'm in favor of going ahead with all of these.

David

On Mon, Aug 17, 2020 at 10:53 AM Dawid Wysakowicz 
wrote:

> Hi devs and users,
>
> I wanted to ask you what do you think about removing some of the
> deprecated APIs around the DataStream API.
>
> The APIs I have in mind are:
>
>- RuntimeContext#getAllAccumulators (deprecated in 0.10)
>- DataStream#fold and all related classes and methods such as
>FoldFunction, FoldingState, FoldingStateDescriptor ... (deprecated in
>1.3/1.4)
>- StreamExecutionEnvironment#setStateBackend(AbstractStateBackend)
>(deprecated in 1.5)
>- DataStream#split (deprecated in 1.8)
>- Methods in (Connected)DataStream that specify keys as either indices
>or field names such as DataStream#keyBy, DataStream#partitionCustom,
>ConnectedStream#keyBy,  (deprecated in 1.11)
>
> I think the first three should be straightforward. They are long
> deprecated. The getAccumulators method is not used very often in my
> opinion. The same applies to the DataStream#fold which additionally is not
> very performant. Lastly the setStateBackend has an alternative with a class
> from the AbstractStateBackend hierarchy, therefore it will be still code
> compatible. Moreover if we remove the
> #setStateBackend(AbstractStateBackend) we will get rid off warnings users
> have right now when setting a statebackend as the correct method cannot be
> used without an explicit casting.
>
> As for the DataStream#split I know there were some objections against
> removing the #split method in the past. I still believe the output tags can
> replace the split method already.
>
> The only problem in the last set of methods I propose to remove is that
> they were deprecated only in the last release and those method were only
> partially deprecated. Moreover some of the methods were not deprecated in
> ConnectedStreams. Nevertheless I'd still be inclined to remove the methods
> in this release.
>
> Let me know what do you think about it.
>
> Best,
>
> Dawid
>


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-17 Thread David Anderson
Kostas,

I'm pleased to see some concrete details in this FLIP.

I wonder if the current proposal goes far enough in the direction of
recognizing the need some users may have for "batch" and "bounded
streaming" to be treated differently. If I've understood it correctly, the
section on scheduling allows me to choose STREAMING scheduling even if I
have bounded sources. I like that approach, because it recognizes that even
though I have bounded inputs, I don't necessarily want batch processing
semantics. I think it makes sense to extend this idea to processing time
support as well.

My thinking is that sometimes in development and testing it's reasonable to
run exactly the same job as in production, except with different sources
and sinks. While it might be a reasonable default, I'm not convinced that
switching a processing time streaming job to read from a bounded source
should always cause it to fail.

David

On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas  wrote:

> Hi all,
>
> As described in FLIP-131 [1], we are aiming at deprecating the DataSet
> API in favour of the DataStream API and the Table API. After this work
> is done, the user will be able to write a program using the DataStream
> API and this will execute efficiently on both bounded and unbounded
> data. But before we reach this point, it is worth discussing and
> agreeing on the semantics of some operations as we transition from the
> streaming world to the batch one.
>
> This thread and the associated FLIP [2] aim at discussing these issues
> as these topics are pretty important to users and can lead to
> unpleasant surprises if we do not pay attention.
>
> Let's have a healthy discussion here and I will be updating the FLIP
> accordingly.
>
> Cheers,
> Kostas
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522
>


RE: JobManager refusing connections when running many jobs in parallel?

2020-08-17 Thread Hailu, Andreas
Interesting – what is the JobManager's job submission bounded by? Does it only allow 
a certain number of submissions per second, or is there a limit on the number of threads 
it uses to accept them?

// ah

From: Robert Metzger 
Sent: Tuesday, August 11, 2020 4:46 AM
To: Hailu, Andreas [Engineering] 
Cc: user@flink.apache.org; Shah, Siddharth [Engineering] 

Subject: Re: JobManager refusing connections when running many jobs in parallel?

Thanks for checking.

Your analysis sounds correct. The JM is busy processing job submissions, 
resulting in other submissions not being accepted.

Increasing rest.connection-timeout should resolve your problem.
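
For reference, that option is a client-side setting read from flink-conf.yaml; a 
sketch with an illustrative value (the default is much lower, on the order of seconds):

```
# flink-conf.yaml on the submitting client
rest.connection-timeout: 60000   # time in ms the REST client may spend establishing a connection
```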


On Fri, Aug 7, 2020 at 1:59 AM Hailu, Andreas <andreas.ha...@gs.com> wrote:
Thanks for pointing this out. We had a look - the nodes in our cluster have a 
cap of 65K open files and we aren’t breaching 50% per metrics, so I don’t 
believe this is the problem.

The connection refused error makes us think it’s some process using a thread 
pool for the JobManager hitting capacity on a port somewhere. Does this sound 
correct? Is there a config for us to increase the pool size?

From: Robert Metzger <rmetz...@apache.org>
Sent: Wednesday, July 29, 2020 1:52:53 AM
To: Hailu, Andreas [Engineering]
Cc: user@flink.apache.org; Shah, Siddharth 
[Engineering]
Subject: Re: JobManager refusing connections when running many jobs in parallel?

Hi Andreas,

Thanks for reaching out .. this should not happen ...
Maybe your operating system has configured low limits for the number of 
concurrent connections / sockets. Maybe this thread is helpful: 
https://stackoverflow.com/questions/923990/why-do-i-get-connection-refused-after-1024-connections
 (there might better SO threads, I didn't put much effort into searching :) )

On Mon, Jul 27, 2020 at 6:31 PM Hailu, Andreas <andreas.ha...@gs.com> wrote:
Hi team,

We’ve observed that when we submit a decent number of jobs in parallel from a 
single Job Master, we encounter job failures with Connection Refused exceptions. 
We’ve seen this behavior start at 30 jobs running in parallel. It’s seemingly 
transient, however, as upon several retries the job succeeds. The surface-level 
error varies, but digging deeper into the stack traces it looks to stem from the 
Job Manager no longer accepting connections.

I’ve included a couple of examples below from failed jobs’ driver logs, with 
different errors stemming from a connection refused error:

First example: 15 Task Managers/2 cores/4096 Job Manager memory/12288 Task 
Manager memory - 30 jobs submitted in parallel, each with parallelism of 1
Job Manager is running @ d43723-563.dc.gs.com
Using job manager web tracking url: Job Manager Web Interface (http://d43723-563.dc.gs.com:41268)
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: 1dfef6303cf0e888231d4c57b4b4e0e6)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
...
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Number of retries has been exhausted.
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:273)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:341)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
at 
org

Re: coordination of sinks

2020-08-17 Thread Fabian Hueske
Hi Marco,

You cannot really synchronize data that is being emitted via different
streams (without bringing them together in an operator).

I see two options:

1) emit the event to create the partition and the data to be written into
the partition to the same stream. Flink guarantees that records do not
overtake records in the same partition. However, you need to ensure that
all records remain in the same partition, for example by partitioning on
the same key.
2) emit the records to two different streams but have a CoProcessFunction
that processes the create partition and data events. The processing
function would just buffer the data events (in state) until it observes the
create partition event for which it creates the partitions (in a
synchronous fashion). Once the partition is created, it forwards all
buffered data and the remaining data.
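
A rough sketch of option 2, using the keyed variant of the function so that the
buffer can live in keyed state (CreatePartition and Data are hypothetical event
types, keyed by partition id):

```
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class PartitionGate extends KeyedCoProcessFunction<String, CreatePartition, Data, Data> {

    private transient ValueState<Boolean> partitionReady;
    private transient ListState<Data> buffer;

    @Override
    public void open(Configuration parameters) {
        partitionReady = getRuntimeContext().getState(
            new ValueStateDescriptor<>("partitionReady", Boolean.class));
        buffer = getRuntimeContext().getListState(
            new ListStateDescriptor<>("buffer", Data.class));
    }

    @Override
    public void processElement1(CreatePartition evt, Context ctx, Collector<Data> out) throws Exception {
        createPartitionInDatabase(evt);      // synchronous DDL call against PostgreSQL
        partitionReady.update(true);
        for (Data buffered : buffer.get()) { // flush everything held back so far
            out.collect(buffered);
        }
        buffer.clear();
    }

    @Override
    public void processElement2(Data data, Context ctx, Collector<Data> out) throws Exception {
        if (Boolean.TRUE.equals(partitionReady.value())) {
            out.collect(data);               // partition exists, forward directly
        } else {
            buffer.add(data);                // hold back until the partition is created
        }
    }

    private void createPartitionInDatabase(CreatePartition evt) { /* JDBC DDL */ }
}
```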

Hope this helps,
Fabian

Am Sa., 15. Aug. 2020 um 07:45 Uhr schrieb Marco Villalobos <
mvillalo...@kineteque.com>:

> Given a source that goes into a tumbling window with a process function
> that yields two side outputs, in addition to the main data stream, is it
> possible to coordinate the order of completion
> of sink 1, sink 2, and sink 3 as data leaves the tumbling window?
>
> source -> tumbling window -> process function -> side output tag 1 -> sink 1
>                                                \-> side output tag 2 -> sink 2
>                                                \-> main stream -> sink 3
>
>
> sink 1 will create partitions in PostgreSQL for me.
> sink 2 will insert data into the partitioned table
> sink 3 can happen in any order
> but all of them need to finish before the next window fires.
>
> Any advice will help.
>


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-17 Thread Kostas Kloudas
Hi Kurt and David,

Thanks a lot for the insightful feedback!

@Kurt: For the topic of checkpointing with Batch Scheduling, I totally
agree with you that it requires a lot more work and careful thinking
on the semantics. This FLIP was written under the assumption that if
the user wants to have checkpoints on bounded input, he/she will have
to go with STREAMING as the scheduling mode. Checkpointing for BATCH
can be handled as a separate topic in the future.

In the case of MIXED workloads and for this FLIP, the scheduling mode
should be set to STREAMING. That is why the AUTOMATIC option sets
scheduling to BATCH only if all the sources are bounded. I am not sure
what the plans are there at the scheduling level, as one could imagine
in the future that in mixed workloads, we schedule first all the
bounded subgraphs in BATCH mode and allow only one UNBOUNDED
subgraph per application, which is going to be scheduled after all
bounded ones have finished. Essentially the bounded subgraphs will be
used to bootstrap the unbounded one. But I am not aware of any plans
in that direction.


@David: The processing time timer handling is a topic that has also
been discussed in the community in the past, and I do not remember any
final conclusion unfortunately.

In the current context and for bounded input, we chose to favor
reproducibility of the result, as this is expected in batch processing
where the whole input is available in advance. This is why this
proposal suggests not allowing processing time timers. But I
understand your argument that the user may want to be able to run the
same pipeline on batch and streaming; this is why we added the two
options under future work, namely (from the FLIP):

```
Future Work: In the future we may consider adding as options the capability of:
* firing all the registered processing time timers at the end of a job
(at close()) or,
* ignoring all the registered processing time timers at the end of a job.
```

Conceptually, we are essentially saying that batch execution is assumed
to be instantaneous and to refer to a single "point" in time, and any
processing-time timers for the future may fire at the end of execution
or be ignored (but not throw an exception). I could also see ignoring
the timers in batch as the default, if this makes more sense.
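
To make this concrete, this is the kind of user code whose behaviour we have to
define for bounded input (a sketch; Event and Alert are placeholder types). If the
input ends before the wall-clock deadline, the timer can either fire at the end of
the job, be dropped, or fail the job:

```
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class IdleAlert extends KeyedProcessFunction<String, Event, Alert> {

    @Override
    public void processElement(Event e, Context ctx, Collector<Alert> out) {
        // wall-clock timer: 10 minutes after the last event seen for this key
        ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + 10 * 60 * 1000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        out.collect(new Alert(ctx.getCurrentKey(), timestamp));
    }
}
```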

By the way, do you have any usecases in mind that will help us better
shape our processing time timer handling?

Kostas

On Mon, Aug 17, 2020 at 2:52 PM David Anderson  wrote:
>
> Kostas,
>
> I'm pleased to see some concrete details in this FLIP.
>
> I wonder if the current proposal goes far enough in the direction of 
> recognizing the need some users may have for "batch" and "bounded streaming" 
> to be treated differently. If I've understood it correctly, the section on 
> scheduling allows me to choose STREAMING scheduling even if I have bounded 
> sources. I like that approach, because it recognizes that even though I have 
> bounded inputs, I don't necessarily want batch processing semantics. I think 
> it makes sense to extend this idea to processing time support as well.
>
> My thinking is that sometimes in development and testing it's reasonable to 
> run exactly the same job as in production, except with different sources and 
> sinks. While it might be a reasonable default, I'm not convinced that 
> switching a processing time streaming job to read from a bounded source 
> should always cause it to fail.
>
> David
>
> On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas  wrote:
>>
>> Hi all,
>>
>> As described in FLIP-131 [1], we are aiming at deprecating the DataSet
>> API in favour of the DataStream API and the Table API. After this work
>> is done, the user will be able to write a program using the DataStream
>> API and this will execute efficiently on both bounded and unbounded
>> data. But before we reach this point, it is worth discussing and
>> agreeing on the semantics of some operations as we transition from the
>> streaming world to the batch one.
>>
>> This thread and the associated FLIP [2] aim at discussing these issues
>> as these topics are pretty important to users and can lead to
>> unpleasant surprises if we do not pay attention.
>>
>> Let's have a healthy discussion here and I will be updating the FLIP
>> accordingly.
>>
>> Cheers,
>> Kostas
>>
>> [1] 
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
>> [2] 
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522


Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Dawid Wysakowicz
@David Yes, my idea was to remove any use of fold method and all related
classes including WindowedStream#fold

@Klou Good idea to also remove the deprecated enableCheckpointing() &
StreamExecutionEnvironment#readFile and the like. I did another pass over
some of the classes and thought we could also drop:

  * ExecutionConfig#set/getCodeAnalysisMode
  * ExecutionConfig#disable/enableSysoutLogging
  * ExecutionConfig#set/isFailTaskOnCheckpointError
  * ExecutionConfig#isLatencyTrackingEnabled

As for `forceCheckpointing`, I am not fully convinced about doing it. As
far as I know, iterations still do not participate in checkpointing
correctly. Therefore it might still make sense to force it. In other
words, there is no real alternative to that method. Unless we only remove
the methods from StreamExecutionEnvironment and redirect to the setter
in CheckpointConfig. WDYT?
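
If we went the redirect route, usage would look roughly like this (a sketch;
note that the setter in CheckpointConfig is itself marked deprecated today):

```
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// instead of the to-be-removed env.enableCheckpointing(interval, mode, true):
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setForceCheckpointing(true); // needed for iterative jobs today
```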

An updated list of methods I suggest to remove:

  * ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)
  * ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)
  * ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)
  * ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)
  * ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)
  * 
StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
(deprecated in 1.2)
  * RuntimeContext#getAllAccumulators (deprecated in 0.10)
  * DataStream#fold and all related classes and methods such as
FoldFunction, FoldingState, FoldingStateDescriptor ... (deprecated
in 1.3/1.4)
  * StreamExecutionEnvironment#setStateBackend(AbstractStateBackend)
(deprecated in 1.5)
  * DataStream#split (deprecated in 1.8)
  * Methods in (Connected)DataStream that specify keys as either indices
or field names such as DataStream#keyBy, DataStream#partitionCustom,
ConnectedStream#keyBy,  (deprecated in 1.11)

Bear in mind that the majority of the ExecutionConfig options listed above
take no effect. They were left there purely to preserve binary
compatibility. Personally I don't see any benefit in keeping a method and
silently dropping the underlying feature. The only configuration that is
still respected is the number of execution retries.

I also wanted to make it explicit that most of the changes above would
result in a binary incompatibility and require additional exclusions in
the japicmp. Those are:

  * ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)
  * ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)
  * ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)
  * ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)
  * ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)
  * DataStream#fold and all related classes and methods such as
FoldFunction, FoldingState, FoldingStateDescriptor ... (deprecated
in 1.3/1.4)
  * DataStream#split (deprecated in 1.8)
  * Methods in (Connected)DataStream that specify keys as either indices
or field names such as DataStream#keyBy, DataStream#partitionCustom,
ConnectedStream#keyBy,  (deprecated in 1.11)
  * 
StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
(deprecated in 1.2)

Looking forward to more opinions on the issue.

Best,

Dawid


On 17/08/2020 12:49, Kostas Kloudas wrote:
> Thanks a lot for starting this Dawid,
>
> Big +1 for the proposed clean-up, and I would also add the deprecated
> methods of the StreamExecutionEnvironment like:
>
> enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
> enableCheckpointing()
> isForceCheckpointing()
>
> readFile(FileInputFormat inputFormat,String
> filePath,FileProcessingMode watchType,long interval, FilePathFilter
> filter)
> readFileStream(...)
>
> socketTextStream(String hostname, int port, char delimiter, long maxRetry)
> socketTextStream(String hostname, int port, char delimiter)
>
> There are more, like the (get)/setNumberOfExecutionRetries() that were
> deprecated long ago, but I have not investigated to see if they are
> actually easy to remove.
>
> Cheers,
> Kostas
>
> On Mon, Aug 17, 2020 at 10:53 AM Dawid Wysakowicz
>  wrote:
>> Hi devs and users,
>>
>> I wanted to ask you what do you think about removing some of the deprecated 
>> APIs around the DataStream API.
>>
>> The APIs I have in mind are:
>>
>> RuntimeContext#getAllAccumulators (deprecated in 0.10)
>> DataStream#fold and all related classes and methods such as FoldFunction, 
>> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)
>> StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated 
>> in 1.5)
>> DataStream#split (deprecated in 1.8)
>> Methods in (Connected)DataStream that specify keys as either indices or 
>> field names such as DataStream#keyBy, DataStream#partitionCustom, 
>> ConnectedStream#keyBy,  (deprecated in 1.11)
>>
>> I think the 

Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Kostas Kloudas
+1 for removing them.

From a quick look, most of them (not all) have been deprecated a long time ago.

Cheers,
Kostas

On Mon, Aug 17, 2020 at 9:37 PM Dawid Wysakowicz  wrote:
>
> @David Yes, my idea was to remove any use of fold method and all related 
> classes including WindowedStream#fold
>
> @Klou Good idea to also remove the deprecated enableCheckpointing() & 
> StreamExecutionEnvironment#readFile and alike. I did another pass over some 
> of the classes and thought we could also drop:
>
> ExecutionConfig#set/getCodeAnalysisMode
> ExecutionConfig#disable/enableSysoutLogging
> ExecutionConfig#set/isFailTaskOnCheckpointError
> ExecutionConfig#isLatencyTrackingEnabled
>
> As for the `forceCheckpointing` I am not fully convinced to doing it. As far 
> as I know iterations still do not participate in checkpointing correctly. 
> Therefore it still might make sense to force it. In other words there is no 
> real alternative to that method. Unless we only remove the methods from 
> StreamExecutionEnvironment and redirect to the setter in CheckpointConfig. 
> WDYT?
>
> An updated list of methods I suggest to remove:
>
> ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)
> ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)
> ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)
> ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)
> ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)
> StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
>  (deprecated in 1.2)
> RuntimeContext#getAllAccumulators (deprecated in 0.10)
> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)
> StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated 
> in 1.5)
> DataStream#split (deprecated in 1.8)
> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)
>
> Bear in mind that majority of the options listed above in ExecutionConfig 
> take no effect. They were left there purely to satisfy the binary 
> compatibility. Personally I don't see any benefit of leaving a method and 
> silently dropping the underlying feature. The only configuration that is 
> respected is setting the number of execution retries.
>
> I also wanted to make it explicit that most of the changes above would result 
> in a binary incompatibility and require additional exclusions in the japicmp. 
> Those are:
>
> ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)
> ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)
> ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)
> ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)
> ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)
> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)
> DataStream#split (deprecated in 1.8)
> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)
> StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
>  (deprecated in 1.2)
>
> Looking forward to more opinions on the issue.
>
> Best,
>
> Dawid
>
>
> On 17/08/2020 12:49, Kostas Kloudas wrote:
>
> Thanks a lot for starting this Dawid,
>
> Big +1 for the proposed clean-up, and I would also add the deprecated
> methods of the StreamExecutionEnvironment like:
>
> enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
> enableCheckpointing()
> isForceCheckpointing()
>
> readFile(FileInputFormat inputFormat,String
> filePath,FileProcessingMode watchType,long interval, FilePathFilter
> filter)
> readFileStream(...)
>
> socketTextStream(String hostname, int port, char delimiter, long maxRetry)
> socketTextStream(String hostname, int port, char delimiter)
>
> There are more, like the (get)/setNumberOfExecutionRetries() that were
> deprecated long ago, but I have not investigated to see if they are
> actually easy to remove.
>
> Cheers,
> Kostas
>
> On Mon, Aug 17, 2020 at 10:53 AM Dawid Wysakowicz
>  wrote:
>
> Hi devs and users,
>
> I wanted to ask you what do you think about removing some of the deprecated 
> APIs around the DataStream API.
>
> The APIs I have in mind are:
>
> RuntimeContext#getAllAccumulators (deprecated in 0.10)
> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)
> StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated 
> in 1.5)
> DataStream#split (deprecated in 1.8)
> Methods in (Connected)DataStream th

Re: Reply: How to get the evaluation result of a time-based window aggregation in time after a new event falling into the window?

2020-08-17 Thread Theo Diefenthal
Hi Chengcheng Zhang, 

I think your request is related to this feature request from two years ago here
[1], and me asking about its status one year ago [2].
You might want to upvote this so we can hope that it gets some more attention
in the future.

Today, it is possible to write your own DataStream API job where you customize the
triggers to your wishes (a CountTrigger of 1), but with Flink SQL, you sadly lose
most of that flexibility.

Thanks @forideal for mentioning that there is something in the configuration.
I wasn't aware of that.
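
For anyone who wants to try those options: they are typically set on the TableConfig,
roughly as below (a sketch; tableEnv is an existing TableEnvironment, and the keys are
the undocumented ones quoted further down, so treat them as experimental):

```
import org.apache.flink.table.api.TableConfig;

TableConfig tableConfig = tableEnv.getConfig();
tableConfig.getConfiguration().setString("table.exec.emit.early-fire.enabled", "true");
tableConfig.getConfiguration().setString("table.exec.emit.early-fire.delay", "60 s");
```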

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Do-Not-Support-Custom-Trigger-td20932.html
[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Are-there-any-news-on-custom-trigger-support-for-SQL-Table-API-td29600.html

Best regards 
Theo 


Von: "forideal"  
An: "Chengcheng Zhang" <274522...@qq.com> 
CC: "user"  
Gesendet: Sonntag, 16. August 2020 09:14:23 
Betreff: Re:回复:How to get the evaluation result of a time-based window 
aggregation in time after a new event falling into the window? 

Hi Chengcheng Zhang, 

You are welcome. 
I also got this material from the community to answer your question.
There is also a hidden option here; it seems the community has not documented it
publicly yet:
table.exec.emit.early-fire.enabled = true
table.exec.emit.early-fire.delay = 60 s
[1] http://apache-flink.147419.n8.nabble.com/FLINKSQL1-10-UV-td4003.html

Best, forideal 

At 2020-08-16 13:21:25, "Chengcheng Zhang" <274522...@qq.com> wrote: 


Hi, forideal
Thank you so much, it helps a lot.
The approach you mentioned earlier happened to be the same path we took two
days ago, and it worked well as expected.
To be honest, after some effort-taking searches on the Internet, I am somewhat
convinced that this may be the best solution at the moment. However, the
time-based window aggregation is a great feature in Flink, as we all know.
Wouldn't it be perfect if we could use time-based windows and still get the latest
result at the same time?
Best, Chengcheng Zhang


-- Original Message --
From: "forideal" ;
Sent: Sunday, August 16, 2020, 12:24 PM
To: "Chengcheng Zhang" <274522...@qq.com>;
Cc: "user";
Subject: Re: How to get the evaluation result of a time-based window aggregation in
time after a new event falling into the window?

Hi Chengcheng Zhang,
Is this your scenario? For example, every day is divided into hourly buckets; let's take
today as an example: 2020081600, 2020081601, ..., 2020081623.
For example, if we count pv, we can count like this:
INSERT INTO cumulative_pv
SELECT time_str, count(1)
FROM pv_per_hour
GROUP BY time_str;
In this SQL, time_str is an hour bucket in 2020081600, 2020081601, ..., 2020081623.

[1] http://apache-flink.147419.n8.nabble.com/flink-sql-5-5-td2011.html
[2] http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/

Hope this helps. 

Best, forideal 

At 2020-08-16 12:05:04, "Chengcheng Zhang" <274522...@qq.com> wrote: 

Hi,
I'm a new user of Flink and have been puzzled a lot by the time-based window
aggregation result.
For our business, hourly and daily reports have to be created, ideally in a real-time
style. So I used an event-time based window aggregation to consume the Kafka data
stream, but found that only after the current hour or day has passed can the newest
result be seen on the console or upserted to MySQL.
How can I get the latest window result immediately after a stream record falls into
it? Is there a specific configuration option for this, hopefully?
Please help and rescue me.
Best regards.




How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread 范超
Hi, Dev and Users
I’ve 3 machines, each one with 8 cores and 16GB of memory.
Following is my Resource Manager screenshot; the cluster has 36GB in total.
I specify the parallelism as 3 or even up to 12, but the task managers always run on
two nodes, not all three machines; the third node does not start a task manager.
I tried setting the -p, -tm and -jm parameters, but the result is always the same; the
only difference is more containers on the two machines, but still not all three machines
start a task manager.
My question is how to set the CLI parameters so that all three of my machines start
task managers.

Thanks a lot
[Resource Manager screenshot omitted]


Chao fan


Re: Performance Flink streaming kafka consumer sink to s3

2020-08-17 Thread Vijayendra Yadav
Hi, do you think there could be any issue with Flink's performance with 400 KB
up to 1 MB payload record sizes? My Spark Streaming job seems to be doing
better. Are there any recommended configurations, or ways of increasing
parallelism, to improve Flink streaming using the Flink Kafka connector?

Regards,
Vijay
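
In case it helps others following along, raising the parallelism of the downstream
operators above the number of Kafka partitions typically looks like the sketch below.
The names (env, kafkaProps, topic, bucket path) and the parallelism values are
placeholders:

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

DataStream<String> events = env
    .addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), kafkaProps))
    .setParallelism(8);                 // e.g. one source subtask per Kafka partition

events
    .rebalance()                        // round-robin records across all downstream subtasks
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String value) {
            return value.toUpperCase(); // stand-in for the real, expensive transformation
        }
    })
    .setParallelism(32)                 // more subtasks than Kafka partitions for the heavy work
    .addSink(StreamingFileSink
        .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>())
        .build())
    .setParallelism(32);
```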


On Fri, Aug 14, 2020 at 2:04 PM Vijayendra Yadav 
wrote:

> Hi Robert,
>
> Thanks for information. payloads so far are 400KB (each record).
> To achieve high parallelism at the downstream operator do I rebalance the
> kafka stream ? Could you give me an example please.
>
> Regards,
> Vijay
>
>
> On Fri, Aug 14, 2020 at 12:50 PM Robert Metzger 
> wrote:
>
>> Hi,
>>
>> Also, can we increase parallel processing, beyond the number of
>>> kafka partitions that we have, without causing any overhead ?
>>
>>
>> Yes, the Kafka sources produce a tiny bit of overhead, but the potential
>> benefit of having downstream operators at a high parallelism might be much
>> bigger.
>>
>> How large is a large payload in your case?
>>
>> Best practices:
>> Try to understand what's causing the performance slowdown: Kafka or S3 ?
>> You can do a test where you read from kafka, and write it into a
>> discarding sink.
>> Likewise, use a datagenerator source, and write into S3.
>>
>> Do the math on your job: What's the theoretical limits of your job:
>> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>>
>> Hope this helps,
>> Robert
>>
>>
>> On Thu, Aug 13, 2020 at 11:25 PM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Team,
>>>
>>> I am trying to increase throughput of my flink stream job streaming from
>>> kafka source and sink to s3. Currently it is running fine for small events
>>> records. But records with large payloads are running extremely slow like at
>>> rate 2 TPS.
>>>
>>> Could you provide some best practices to tune?
>>> Also, can we increase parallel processing, beyond the number of
>>> kafka partitions that we have, without causing any overhead ?
>>>
>>> Regards,
>>> Vijay
>>>
>>


Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread Yangze Guo
Hi,

Do you start the NodeManager on all three machines? If so, could
you check that all the NMs correctly connect to the ResourceManager?

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
>
> Hi, Dev and Users
> I’ve 3 machines each one is 8 cores and 16GB memory.
> Following it’s my Resource Manager screenshot the cluster have 36GB total.
> I specify the paralism to 3 or even up to 12,  But the task manager is always 
> running on two nodes not all three machine, the third node does not start the 
> task manager.
> I tried set the –p –tm –jm parameters, but it always the same, only different 
> is more container on the two maching but not all three machine start the task 
> manager.
> My question is how to set the cli parameter to start all of my three machine 
> (all task manager start on 3 machines)
>
> Thanks a lot
> [cid:image001.png@01D67546.62291B70]
>
>
> Chao fan
>


RE: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread 范超
Thanks Yangze

The NodeManager is started on all 3 machines.

I just don't know why the three machines don't each run a Flink TaskManager, and
how to achieve this.

-----Original Message-----
From: Yangze Guo [mailto:karma...@gmail.com]
Sent: Tuesday, August 18, 2020 10:10
To: 范超 
Cc: user (user@flink.apache.org) 
Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job 
Mode

Hi,

Do you start the NodeManager in all the three machines? If so, could you check 
all the NMs correctly connect to the ResourceManager?

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
>
> Hi, Dev and Users
> I’ve 3 machines each one is 8 cores and 16GB memory.
> Following it’s my Resource Manager screenshot the cluster have 36GB total.
> I specify the paralism to 3 or even up to 12,  But the task manager is always 
> running on two nodes not all three machine, the third node does not start the 
> task manager.
> I tried set the –p –tm –jm parameters, but it always the same, only different 
> is more container on the two maching but not all three machine start the task 
> manager.
> My question is how to set the cli parameter to start all of my three 
> machine (all task manager start on 3 machines)
>
> Thanks a lot
> [cid:image001.png@01D67546.62291B70]
>
>
> Chao fan
>


Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread Yangze Guo
Hi,

I think that is only related to the YARN scheduling strategy. AFAIK,
Flink cannot control it. You could check the RM log to figure out
why it did not schedule the containers across all three machines. BTW,
if you have a specific requirement to use all three
machines, how about deploying a standalone cluster instead?

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 10:24 AM 范超  wrote:
>
> Thanks Yangze
>
> All 3 machines NodeManager is started.
>
> I just don't know why not three machines each running a Flink TaskManager and 
> how to achieve this
>
> -----Original Message-----
> From: Yangze Guo [mailto:karma...@gmail.com]
> Sent: Tuesday, August 18, 2020 10:10
> To: 范超 
> Cc: user (user@flink.apache.org) 
> Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using 
> Per-Job Mode
>
> Hi,
>
> Do you start the NodeManager in all the three machines? If so, could you 
> check all the NMs correctly connect to the ResourceManager?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
> >
> > Hi, Dev and Users
> > I’ve 3 machines each one is 8 cores and 16GB memory.
> > Following it’s my Resource Manager screenshot the cluster have 36GB total.
> > I specify the paralism to 3 or even up to 12,  But the task manager is 
> > always running on two nodes not all three machine, the third node does not 
> > start the task manager.
> > I tried set the –p –tm –jm parameters, but it always the same, only 
> > different is more container on the two maching but not all three machine 
> > start the task manager.
> > My question is how to set the cli parameter to start all of my three
> > machine (all task manager start on 3 machines)
> >
> > Thanks a lot
> > [cid:image001.png@01D67546.62291B70]
> >
> >
> > Chao fan
> >


Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread Yangze Guo
Hi,

Flink can control how many TMs to start, but where to start the TMs
depends on YARN.

Do you meet any problems when deploying on YARN or running the Flink job?
Why do you need to start the TMs on all three machines?

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 11:25 AM 范超  wrote:
>
> Thanks Yangze
> The reason why I don’t deploying a standalone cluster, it's because there 
> kafka, kudu, hadoop, zookeeper on these machines, maybe currently using the 
> yarn to manage resources is the best choice for me.
> If Flink can not control how many tm to start , could anyone providing me 
> some best practice for deploying on yarn please? I read the [1] and still 
> don't very clear
>
> [1] 
> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>
> -----Original Message-----
> From: Yangze Guo [mailto:karma...@gmail.com]
> Sent: Tuesday, August 18, 2020 10:50
> To: 范超 
> Cc: user (user@flink.apache.org) 
> Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using 
> Per-Job Mode
>
> Hi,
>
> I think that is only related to the Yarn scheduling strategy. AFAIK, Flink 
> could not control it. You could check the RM log to figure out why it did not 
> schedule the containers to all the three machines. BTW, if you have specific 
> requirements to start with all the three machines, how about deploying a 
> standalone cluster instead?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 18, 2020 at 10:24 AM 范超  wrote:
> >
> > Thanks Yangze
> >
> > All 3 machines NodeManager is started.
> >
> > I just don't know why not three machines each running a Flink
> > TaskManager and how to achieve this
> >
> > -----Original Message-----
> > From: Yangze Guo [mailto:karma...@gmail.com]
> > Sent: Tuesday, August 18, 2020 10:10
> > To: 范超 
> > Cc: user (user@flink.apache.org) 
> > Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> > using Per-Job Mode
> >
> > Hi,
> >
> > Do you start the NodeManager in all the three machines? If so, could you 
> > check all the NMs correctly connect to the ResourceManager?
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
> > >
> > > Hi, Dev and Users
> > > I’ve 3 machines each one is 8 cores and 16GB memory.
> > > Following it’s my Resource Manager screenshot the cluster have 36GB total.
> > > I specify the paralism to 3 or even up to 12,  But the task manager is 
> > > always running on two nodes not all three machine, the third node does 
> > > not start the task manager.
> > > I tried set the –p –tm –jm parameters, but it always the same, only 
> > > different is more container on the two maching but not all three machine 
> > > start the task manager.
> > > My question is how to set the cli parameter to start all of my three
> > > machine (all task manager start on 3 machines)
> > >
> > > Thanks a lot
> > > [cid:image001.png@01D67546.62291B70]
> > >
> > >
> > > Chao fan
> > >


RE: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread 范超
Thanks Yangze
The reason why I don't deploy a standalone cluster is that Kafka, Kudu, Hadoop and
ZooKeeper already run on these machines, so currently using YARN to manage resources
is probably the best choice for me.
If Flink cannot control how many TMs to start, could anyone provide me with some
best practices for deploying on YARN, please? I read [1] and it's still not very clear
to me.

[1] 
https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines

-----Original Message-----
From: Yangze Guo [mailto:karma...@gmail.com]
Sent: Tuesday, August 18, 2020 10:50
To: 范超 
Cc: user (user@flink.apache.org) 
Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job 
Mode

Hi,

I think that is only related to the Yarn scheduling strategy. AFAIK, Flink 
could not control it. You could check the RM log to figure out why it did not 
schedule the containers to all the three machines. BTW, if you have specific 
requirements to start with all the three machines, how about deploying a 
standalone cluster instead?

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 10:24 AM 范超  wrote:
>
> Thanks Yangze
>
> All 3 machines NodeManager is started.
>
> I just don't know why not three machines each running a Flink 
> TaskManager and how to achieve this
>
> -----Original Message-----
> From: Yangze Guo [mailto:karma...@gmail.com]
> Sent: Tuesday, August 18, 2020 10:10
> To: 范超 
> Cc: user (user@flink.apache.org) 
> Subject: Re: How to specify the number of TaskManagers in Yarn Cluster 
> using Per-Job Mode
>
> Hi,
>
> Do you start the NodeManager in all the three machines? If so, could you 
> check all the NMs correctly connect to the ResourceManager?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
> >
> > Hi, Dev and Users
> > I’ve 3 machines each one is 8 cores and 16GB memory.
> > Following it’s my Resource Manager screenshot the cluster have 36GB total.
> > I specify the paralism to 3 or even up to 12,  But the task manager is 
> > always running on two nodes not all three machine, the third node does not 
> > start the task manager.
> > I tried set the –p –tm –jm parameters, but it always the same, only 
> > different is more container on the two maching but not all three machine 
> > start the task manager.
> > My question is how to set the cli parameter to start all of my three 
> > machine (all task manager start on 3 machines)
> >
> > Thanks a lot
> > [cid:image001.png@01D67546.62291B70]
> >
> >
> > Chao fan
> >


Re: Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Yun Gao
+1 for removing the methods that have been deprecated for a while and have
alternatives.

One specific thing: if we remove DataStream#split, do we consider enabling side
outputs in more operators in the future? Currently they are only available in
ProcessFunctions, but not in other commonly used UDFs like Source or
AsyncFunction [1].

One temporary solution that occurs to me is to add a ProcessFunction after the
operators that want to use side outputs. But I think that solution is not very
obvious to come up with, and if it really works we might add it to the side-output
documentation.
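
A small sketch of that workaround, assuming a hypothetical Result type coming out of
an AsyncFunction (asyncResultStream) with an isFailed() flag:

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

final OutputTag<Result> failedTag = new OutputTag<Result>("failed") {};

SingleOutputStreamOperator<Result> routed = asyncResultStream
    .process(new ProcessFunction<Result, Result>() {
        @Override
        public void processElement(Result r, Context ctx, Collector<Result> out) {
            if (r.isFailed()) {
                ctx.output(failedTag, r); // side outputs are available here, not in the AsyncFunction
            } else {
                out.collect(r);
            }
        }
    });

DataStream<Result> failed = routed.getSideOutput(failedTag);
```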

[1] https://issues.apache.org/jira/browse/FLINK-7954

Best,
 Yun


 --Original Mail --
Sender:Kostas Kloudas 
Send Date:Tue Aug 18 03:52:44 2020
Recipients:Dawid Wysakowicz 
CC:dev , user 
Subject:Re: [DISCUSS] Removing deprecated methods from DataStream API
+1 for removing them.



From a quick look, most of them (not all) have been deprecated a long time ago.



Cheers,

Kostas



On Mon, Aug 17, 2020 at 9:37 PM Dawid Wysakowicz  wrote:

>

> @David Yes, my idea was to remove any use of fold method and all related 
> classes including WindowedStream#fold

>

> @Klou Good idea to also remove the deprecated enableCheckpointing() & 
> StreamExecutionEnvironment#readFile and alike. I did another pass over some 
> of the classes and thought we could also drop:

>

> ExecutionConfig#set/getCodeAnalysisMode

> ExecutionConfig#disable/enableSysoutLogging

> ExecutionConfig#set/isFailTaskOnCheckpointError

> ExecutionConfig#isLatencyTrackingEnabled

>

> As for the `forceCheckpointing` I am not fully convinced to doing it. As far 
> as I know iterations still do not participate in checkpointing correctly. 
> Therefore it still might make sense to force it. In other words there is no 
> real alternative to that method. Unless we only remove the methods from 
> StreamExecutionEnvironment and redirect to the setter in CheckpointConfig. 
> WDYT?

>

> An updated list of methods I suggest to remove:

>

> ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)

> ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)

> ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)

> ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)

> ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)

> StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
>  (deprecated in 1.2)

> RuntimeContext#getAllAccumulators (deprecated in 0.10)

> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)

> StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated 
> in 1.5)

> DataStream#split (deprecated in 1.8)

> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)

>

> Bear in mind that majority of the options listed above in ExecutionConfig 
> take no effect. They were left there purely to satisfy the binary 
> compatibility. Personally I don't see any benefit of leaving a method and 
> silently dropping the underlying feature. The only configuration that is 
> respected is setting the number of execution retries.

>

> I also wanted to make it explicit that most of the changes above would result 
> in a binary incompatibility and require additional exclusions in the japicmp. 
> Those are:

>

> ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)

> ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)

> ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)

> ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)

> ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)

> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)

> DataStream#split (deprecated in 1.8)

> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)

> StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
>  (deprecated in 1.2)

>

> Looking forward to more opinions on the issue.

>

> Best,

>

> Dawid

>

>

> On 17/08/2020 12:49, Kostas Kloudas wrote:

>

> Thanks a lot for starting this Dawid,

>

> Big +1 for the proposed clean-up, and I would also add the deprecated

> methods of the StreamExecutionEnvironment like:

>

> enableCheckpointing(long interval, CheckpointingMode mode, boolean force)

> enableCheckpointing()

> isForceCheckpointing()

>

> readFile(FileInputFormat inputFormat,String

> filePath,FileProcessingMode watchType,long interval, FilePathFilter

> fi

Flink checkpoint recovery time

2020-08-17 Thread Zhinan Cheng
Hi all,

I am working on measuring the failure recovery time of Flink, and I want to
decompose the recovery time into different parts: the time to detect
the failure, the time to restart the job, and the time to
restore the checkpoint.

I found that I can measure the downtime during a failure, the time to
restart the job, and some metrics for checkpointing, as below.

[image: measure.png]
Unfortunately, I cannot find any information about the failure detection time
and the checkpoint recovery time. Is there any way Flink provides this?
Otherwise, how can I solve it?

Thanks a lot for your help.

Regards,


Re: Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-17 Thread Yun Gao
Hi, 

Many thanks for bringing up this discussion!

One more question: does the BATCH / STREAMING mode also decide
the shuffle types and operators? I'm asking because even in blocking
mode, a job could benefit from keeping some edges pipelined if the
resources are known to be sufficient. Do we also consider exposing more
fine-grained control over the shuffle types?

Best,
 Yun 



 --Original Mail --
Sender:Kostas Kloudas 
Send Date:Tue Aug 18 02:24:21 2020
Recipients:David Anderson 
CC:dev , user 
Subject:Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input
Hi Kurt and David,

Thanks a lot for the insightful feedback!

@Kurt: For the topic of checkpointing with Batch Scheduling, I totally
agree with you that it requires a lot more work and careful thinking
on the semantics. This FLIP was written under the assumption that if
the user wants to have checkpoints on bounded input, he/she will have
to go with STREAMING as the scheduling mode. Checkpointing for BATCH
can be handled as a separate topic in the future.

In the case of MIXED workloads and for this FLIP, the scheduling mode
should be set to STREAMING. That is why the AUTOMATIC option sets
scheduling to BATCH only if all the sources are bounded. I am not sure
what are the plans there at the scheduling level, as one could imagine
in the future that in mixed workloads, we schedule first all the
bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
subgraph per application, which is going to be scheduled after all
Bounded ones have finished. Essentially the bounded subgraphs will be
used to bootstrap the unbounded one. But, I am not aware of any plans
towards that direction.


@David: The processing time timer handling is a topic that has also
been discussed in the community in the past, and I do not remember any
final conclusion unfortunately.

In the current context and for bounded input, we chose to favor
reproducibility of the result, as this is expected in batch processing
where the whole input is available in advance. This is why this
proposal suggests not allowing processing-time timers. But I
understand your argument that the user may want to be able to run the
same pipeline on batch and streaming, which is why we added the two
options under future work, namely (from the FLIP):

```
Future Work: In the future we may consider adding as options the capability of:
* firing all the registered processing time timers at the end of a job
(at close()) or,
* ignoring all the registered processing time timers at the end of a job.
```

Conceptually, we are essentially saying that batch execution is assumed
to be instantaneous and to refer to a single "point" in time, and that any
processing-time timers registered for the future may fire at the end of
execution or be ignored (but not throw an exception). I could also see
ignoring the timers in batch as the default, if that makes more sense.
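
To make this concrete, here is a small sketch (not part of the FLIP) of the kind of user
function this choice affects: a KeyedProcessFunction that registers processing-time
timers. On a bounded source the input can be fully consumed long before the wall-clock
deadline, so the runtime has to either fire such timers at the end of the job or drop them.

```
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class LatePingFunction extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Ask to be called back 10 seconds of wall-clock time from now.
        long deadline = ctx.timerService().currentProcessingTime() + 10_000L;
        ctx.timerService().registerProcessingTimeTimer(deadline);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // On bounded input this callback may never run (timers ignored), or run once
        // at the end of the job (timers fired at close()), depending on which of the
        // options above is chosen.
        out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
    }
}
```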

By the way, do you have any use cases in mind that would help us better
shape our processing-time timer handling?

Kostas

On Mon, Aug 17, 2020 at 2:52 PM David Anderson  wrote:
>
> Kostas,
>
> I'm pleased to see some concrete details in this FLIP.
>
> I wonder if the current proposal goes far enough in the direction of 
> recognizing the need some users may have for "batch" and "bounded streaming" 
> to be treated differently. If I've understood it correctly, the section on 
> scheduling allows me to choose STREAMING scheduling even if I have bounded 
> sources. I like that approach, because it recognizes that even though I have 
> bounded inputs, I don't necessarily want batch processing semantics. I think 
> it makes sense to extend this idea to processing time support as well.
>
> My thinking is that sometimes in development and testing it's reasonable to 
> run exactly the same job as in production, except with different sources and 
> sinks. While it might be a reasonable default, I'm not convinced that 
> switching a processing time streaming job to read from a bounded source 
> should always cause it to fail.
>
> David
>
> On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas  wrote:
>>
>> Hi all,
>>
>> As described in FLIP-131 [1], we are aiming at deprecating the DataSet
>> API in favour of the DataStream API and the Table API. After this work
>> is done, the user will be able to write a program using the DataStream
>> API and this will execute efficiently on both bounded and unbounded
>> data. But before we reach this point, it is worth discussing and
>> agreeing on the semantics of some operations as we transition from the
>> streaming world to the batch one.
>>
>> This thread and the associated FLIP [2] aim at discussing these issues
>> as these topics are pretty important to users and can lead to
>> unpleasant surprises if we do not pay attention.
>>
>> Let's have a healthy discussion here and I will be updating the FLIP
>> accordingly.
>>
>> Cheers,
>> Kostas
>>
>> [1] 
>> https://cwik

Reply: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread 范超
Thanks Yangze

1. Do you meet any problem when deploying on Yarn or running Flink job?
My job works well

2. Why do you need to start the TMs on all the three machines?
From the cluster perspective, I wonder whether the processing load can be balanced
across the 3 machines.

3. Flink can control how many TM to start, but where to start the TMs depends 
on Yarn.
Yes, where the TMs are started depends on Yarn.
Could you please tell me which parameter controls how many TMs to start? The -yn
parameter was removed in 1.10, while the 1.9 doc sample [1] below still lists it.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/cli.html

Run example program using a per-job YARN cluster with 2 TaskManagers:

./bin/flink run -m yarn-cluster -yn 2 \
   ./examples/batch/WordCount.jar \
   --input hdfs:///user/hamlet.txt --output 
hdfs:///user/wordcount_out
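
For comparison, a rough sketch (not from the docs) of the same submission on 1.10+,
where the number of TMs is derived from the parallelism and the slots per TM rather
than from -yn; the memory sizes are placeholders:

./bin/flink run -m yarn-cluster -p 8 -ys 4 -yjm 1024m -ytm 4096m \
   ./examples/batch/WordCount.jar \
   --input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out

With -p 8 and -ys 4 (i.e. taskmanager.numberOfTaskSlots = 4), Yarn would be asked for
roughly 8 / 4 = 2 TaskManager containers, but which NodeManagers they land on is still
decided by Yarn.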

-----Original Message-----
From: Yangze Guo [mailto:karma...@gmail.com]
Sent: Tuesday, August 18, 2020 11:31
To: 范超
Cc: user (user@flink.apache.org)
Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job
Mode

Hi,

Flink can control how many TM to start, but where to start the TMs depends on 
Yarn.

Do you meet any problem when deploying on Yarn or running Flink job?
Why do you need to start the TMs on all the three machines?

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 11:25 AM 范超  wrote:
>
> Thanks Yangze
> The reason why I am not deploying a standalone cluster is that Kafka, Kudu,
> Hadoop, and ZooKeeper already run on these machines, so currently using
> Yarn to manage resources is probably the best choice for me.
> If Flink cannot control how many TMs to start, could anyone provide
> me with some best practices for deploying on Yarn, please? I read [1] and
> it is still not very clear to me.
>
> [1] 
> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-g
> eneral-guidelines
>
> -----Original Message-----
> From: Yangze Guo [mailto:karma...@gmail.com]
> Sent: Tuesday, August 18, 2020 10:50
> To: 范超
> Cc: user (user@flink.apache.org)
> Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> using Per-Job Mode
>
> Hi,
>
> I think that is only related to the Yarn scheduling strategy. AFAIK, Flink 
> could not control it. You could check the RM log to figure out why it did not 
> schedule the containers to all the three machines. BTW, if you have specific 
> requirements to start with all the three machines, how about deploying a 
> standalone cluster instead?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 18, 2020 at 10:24 AM 范超  wrote:
> >
> > Thanks Yangze
> >
> > The NodeManager is started on all 3 machines.
> >
> > I just don't understand why a Flink TaskManager is not running on each of
> > the three machines, and how to achieve this.
> >
> > -----Original Message-----
> > From: Yangze Guo [mailto:karma...@gmail.com]
> > Sent: Tuesday, August 18, 2020 10:10
> > To: 范超
> > Cc: user (user@flink.apache.org)
> > Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> > using Per-Job Mode
> >
> > Hi,
> >
> > Do you start the NodeManager in all the three machines? If so, could you 
> > check all the NMs correctly connect to the ResourceManager?
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
> > >
> > > Hi, Dev and Users
> > > I have 3 machines, each with 8 cores and 16 GB of memory.
> > > The following is my Resource Manager screenshot; the cluster has 36 GB in total.
> > > I set the parallelism to 3 or even up to 12, but the TaskManagers are
> > > always running on two nodes, not all three machines; the third node does
> > > not start a TaskManager.
> > > I tried setting the -p, -tm, and -jm parameters, but the result is always the
> > > same; the only difference is more containers on the two machines, but still
> > > not all three machines start a TaskManager.
> > > My question is how to set the CLI parameters so that all three of my
> > > machines start a TaskManager (TaskManagers running on all 3 machines).
> > >
> > > Thanks a lot
> > > [cid:image001.png@01D67546.62291B70]
> > >
> > >
> > > Chao fan
> > >


Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread Yangze Guo
The number of TMs mainly depends on the parallelism and the job graph.
Flink now allows you to set the maximum number of slots
(slotmanager.number-of-slots-max [1]). There is also a plan to support
setting the minimum number of slots [2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#slotmanager-number-of-slots-max
[2] https://issues.apache.org/jira/browse/FLINK-15959
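
For example (a sketch, not authoritative), a total-slot cap in flink-conf.yaml
effectively caps how many TMs the active resource manager will request from Yarn:

   taskmanager.numberOfTaskSlots: 4
   slotmanager.number-of-slots-max: 12   # at most 12 / 4 = 3 TaskManagers

Whether those TMs end up spread across all three NodeManagers is still up to
Yarn's scheduling.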

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 12:21 PM 范超  wrote:
>
> Thanks Yangze
>
> 1. Do you meet any problem when deploying on Yarn or running Flink job?
> My job works well
>
> 2. Why do you need to start the TMs on all the three machines?
> From the cluster perspective, I wonder whether the processing load can be balanced
> across the 3 machines.
>
> 3. Flink can control how many TM to start, but where to start the TMs depends 
> on Yarn.
> Yes, where the TMs are started depends on Yarn.
> Could you please tell me which parameter controls how many TMs to start? The -yn
> parameter was removed in 1.10, while the 1.9 doc sample [1] below still lists it.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/cli.html
>
> Run example program using a per-job YARN cluster with 2 TaskManagers:
>
> ./bin/flink run -m yarn-cluster -yn 2 \
>./examples/batch/WordCount.jar \
>--input hdfs:///user/hamlet.txt --output 
> hdfs:///user/wordcount_out
>
> -----Original Message-----
> From: Yangze Guo [mailto:karma...@gmail.com]
> Sent: Tuesday, August 18, 2020 11:31
> To: 范超
> Cc: user (user@flink.apache.org)
> Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using
> Per-Job Mode
>
> Hi,
>
> Flink can control how many TM to start, but where to start the TMs depends on 
> Yarn.
>
> Do you meet any problem when deploying on Yarn or running Flink job?
> Why do you need to start the TMs on all the three machines?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 18, 2020 at 11:25 AM 范超  wrote:
> >
> > Thanks Yangze
> > The reason why I am not deploying a standalone cluster is that Kafka, Kudu,
> > Hadoop, and ZooKeeper already run on these machines, so currently using
> > Yarn to manage resources is probably the best choice for me.
> > If Flink cannot control how many TMs to start, could anyone provide
> > me with some best practices for deploying on Yarn, please? I read [1] and
> > it is still not very clear to me.
> >
> > [1]
> > https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-g
> > eneral-guidelines
> >
> > -----Original Message-----
> > From: Yangze Guo [mailto:karma...@gmail.com]
> > Sent: Tuesday, August 18, 2020 10:50
> > To: 范超
> > Cc: user (user@flink.apache.org)
> > Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> > using Per-Job Mode
> >
> > Hi,
> >
> > I think that is only related to the Yarn scheduling strategy. AFAIK, Flink 
> > could not control it. You could check the RM log to figure out why it did 
> > not schedule the containers to all the three machines. BTW, if you have 
> > specific requirements to start with all the three machines, how about 
> > deploying a standalone cluster instead?
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Aug 18, 2020 at 10:24 AM 范超  wrote:
> > >
> > > Thanks Yangze
> > >
> > > The NodeManager is started on all 3 machines.
> > >
> > > I just don't understand why a Flink TaskManager is not running on each of
> > > the three machines, and how to achieve this.
> > >
> > > -----Original Message-----
> > > From: Yangze Guo [mailto:karma...@gmail.com]
> > > Sent: Tuesday, August 18, 2020 10:10
> > > To: 范超
> > > Cc: user (user@flink.apache.org)
> > > Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> > > using Per-Job Mode
> > >
> > > Hi,
> > >
> > > Do you start the NodeManager in all the three machines? If so, could you 
> > > check all the NMs correctly connect to the ResourceManager?
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
> > > >
> > > > Hi, Dev and Users
> > > > I have 3 machines, each with 8 cores and 16 GB of memory.
> > > > The following is my Resource Manager screenshot; the cluster has 36 GB
> > > > in total.
> > > > I set the parallelism to 3 or even up to 12, but the TaskManagers are
> > > > always running on two nodes, not all three machines; the third node does
> > > > not start a TaskManager.
> > > > I tried setting the -p, -tm, and -jm parameters, but the result is always the
> > > > same; the only difference is more containers on the two machines, but
> > > > still not all three machines start a TaskManager.
> > > > My question is how to set the CLI parameters so that all three of my
> > > > machines start a TaskManager (TaskManagers running on all 3 machines).
> > > >
> > > > Thanks a lot
> > > > [cid:image001.png@01D67546.62291B70]
> > > >
> > > >
> > > > Chao fan
> > > >


Reply: How to specify the number of TaskManagers in Yarn Cluster using Per-Job Mode

2020-08-17 Thread 范超
Thanks Yangze for providing these links, I'll try them!

-----Original Message-----
From: Yangze Guo [mailto:karma...@gmail.com]
Sent: Tuesday, August 18, 2020 12:57
To: 范超
Cc: user (user@flink.apache.org)
Subject: Re: How to specify the number of TaskManagers in Yarn Cluster using Per-Job
Mode

The number of TMs mainly depends on the parallelism and the job graph.
Flink now allows you to set the maximum number of slots
(slotmanager.number-of-slots-max [1]). There is also a plan to support setting
the minimum number of slots [2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#slotmanager-number-of-slots-max
[2] https://issues.apache.org/jira/browse/FLINK-15959

Best,
Yangze Guo

On Tue, Aug 18, 2020 at 12:21 PM 范超  wrote:
>
> Thanks Yangze
>
> 1. Do you meet any problem when deploying on Yarn or running Flink job?
> My job works well
>
> 2. Why do you need to start the TMs on all the three machines?
> From the cluster perspective, I wonder whether the processing load can be balanced
> across the 3 machines.
>
> 3. Flink can control how many TM to start, but where to start the TMs depends 
> on Yarn.
> Yes, where the TMs are started depends on Yarn.
> Could you please tell me which parameter controls how many TMs to start? The -yn
> parameter was removed in 1.10, while the 1.9 doc sample [1] below still lists it.
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/cli.ht
> ml
>
> Run example program using a per-job YARN cluster with 2 TaskManagers:
>
> ./bin/flink run -m yarn-cluster -yn 2 \
>./examples/batch/WordCount.jar \
>--input hdfs:///user/hamlet.txt --output 
> hdfs:///user/wordcount_out
>
> -----Original Message-----
> From: Yangze Guo [mailto:karma...@gmail.com]
> Sent: Tuesday, August 18, 2020 11:31
> To: 范超
> Cc: user (user@flink.apache.org)
> Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> using Per-Job Mode
>
> Hi,
>
> Flink can control how many TM to start, but where to start the TMs depends on 
> Yarn.
>
> Do you meet any problem when deploying on Yarn or running Flink job?
> Why do you need to start the TMs on all the three machines?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 18, 2020 at 11:25 AM 范超  wrote:
> >
> > Thanks Yangze
> > The reason why I am not deploying a standalone cluster is that Kafka, Kudu,
> > Hadoop, and ZooKeeper already run on these machines, so currently using
> > Yarn to manage resources is probably the best choice for me.
> > If Flink cannot control how many TMs to start, could anyone provide
> > me with some best practices for deploying on Yarn, please? I read [1] and
> > it is still not very clear to me.
> >
> > [1]
> > https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster
> > -g
> > eneral-guidelines
> >
> > -----Original Message-----
> > From: Yangze Guo [mailto:karma...@gmail.com]
> > Sent: Tuesday, August 18, 2020 10:50
> > To: 范超
> > Cc: user (user@flink.apache.org)
> > Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> > using Per-Job Mode
> >
> > Hi,
> >
> > I think that is only related to the Yarn scheduling strategy. AFAIK, Flink 
> > could not control it. You could check the RM log to figure out why it did 
> > not schedule the containers to all the three machines. BTW, if you have 
> > specific requirements to start with all the three machines, how about 
> > deploying a standalone cluster instead?
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Aug 18, 2020 at 10:24 AM 范超  wrote:
> > >
> > > Thanks Yangze
> > >
> > > The NodeManager is started on all 3 machines.
> > >
> > > I just don't understand why a Flink TaskManager is not running on each of
> > > the three machines, and how to achieve this.
> > >
> > > -----Original Message-----
> > > From: Yangze Guo [mailto:karma...@gmail.com]
> > > Sent: Tuesday, August 18, 2020 10:10
> > > To: 范超
> > > Cc: user (user@flink.apache.org)
> > > Subject: Re: How to specify the number of TaskManagers in Yarn Cluster
> > > using Per-Job Mode
> > >
> > > Hi,
> > >
> > > Do you start the NodeManager in all the three machines? If so, could you 
> > > check all the NMs correctly connect to the ResourceManager?
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Aug 18, 2020 at 10:01 AM 范超  wrote:
> > > >
> > > > Hi, Dev and Users
> > > > I have 3 machines, each with 8 cores and 16 GB of memory.
> > > > The following is my Resource Manager screenshot; the cluster has 36 GB
> > > > in total.
> > > > I set the parallelism to 3 or even up to 12, but the TaskManagers are
> > > > always running on two nodes, not all three machines; the third node does
> > > > not start a TaskManager.
> > > > I tried setting the -p, -tm, and -jm parameters, but the result is always the
> > > > same; the only difference is more containers on the two machines, but
> > > > still not all three machines start a TaskManager.
> > > > My question is how to set the CLI parameters so that all three of my
> > > > machines start a TaskManager (TaskManagers running on all 3 machines).
> > > >
> > > > Thanks a lot
> > > > [cid:image001.png@01D67546.62291B70]
> > > >
> > > >
> > > > Chao fan
> > > >