Re: Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-18 Thread Biao Liu
Hi Mans, That's indeed a problem. We have a plan to fix it. I think it could be included in 1.11. You could follow this issue [1] to check the progress. [1] https://issues.apache.org/jira/browse/FLINK-9543 Thanks, Biao /'bɪ.aʊ/ On Thu, 19 Dec 2019 at 14:51, vino yang wrote: > Hi Mans, > > I

Re: Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-18 Thread vino yang
Hi Mans, IMO, one job manager represents one Flink cluster, and each Flink cluster has its own set of Flink configuration, e.g. metrics reporters. Some metrics reporters support tags, which you can specify to distinguish different Flink clusters.[1] [1]: https://ci.apache.org/projects/flink/flink-doc
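As a rough illustration (a plain-Python model, not Flink's reporter API): with the default job manager scope format `<host>.jobmanager`, two job managers on the same host produce identical metric identifiers. Attaching a per-cluster tag, as tag-capable reporters allow, keeps them apart. The helper functions, the host name and the `cluster` tag values are hypothetical.

```python
from typing import Dict, Tuple


def metric_id(scope_format: str, variables: Dict[str, str], name: str) -> str:
    """Expand a scope format such as '<host>.jobmanager' and append the metric name."""
    scope = scope_format
    for key, value in variables.items():
        scope = scope.replace(f"<{key}>", value)
    return f"{scope}.{name}"


def tagged_id(identifier: str, tags: Dict[str, str]) -> Tuple[str, tuple]:
    """Model a tag-capable reporter: identifier plus a sorted tuple of tags."""
    return identifier, tuple(sorted(tags.items()))


JM_SCOPE = "<host>.jobmanager"  # Flink's default job manager scope format

# Two job managers on the same (hypothetical) host
jm_a = metric_id(JM_SCOPE, {"host": "ip-10-0-0-1"}, "numRunningJobs")
jm_b = metric_id(JM_SCOPE, {"host": "ip-10-0-0-1"}, "numRunningJobs")
print(jm_a == jm_b)  # True: the identifiers collide

# Hypothetical per-cluster tags, one value in each cluster's configuration
a = tagged_id(jm_a, {"cluster": "app-a"})
b = tagged_id(jm_b, {"cluster": "app-b"})
print(a == b)  # False: the tag distinguishes the two clusters
```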

Kafka table descriptor missing startFromTimestamp()

2019-12-18 Thread Steve Whelan
The org.apache.flink.table.descriptors.Kafka class in Flink v1.8 has the following startup modes for consumers: .startFromEarliest(), .startFromLatest(), .startFromSpecificOffsets(...). However, it does not have a method to support starting from a timestamp. The FlinkKafkaCon
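The missing mode would mirror what `FlinkKafkaConsumer#setStartFromTimestamp(long)` offers on the DataStream side: for each partition, start at the earliest offset whose record timestamp is greater than or equal to the given timestamp. A plain-Python model of that lookup, simplified by assuming that timestamps within a partition are non-decreasing; the partition contents are hypothetical:

```python
from bisect import bisect_left
from typing import Dict, List, Optional


def start_offsets_from_timestamp(
    partitions: Dict[int, List[int]], start_ts: int
) -> Dict[int, Optional[int]]:
    """For each partition (record timestamps indexed by offset), return the first
    offset whose timestamp >= start_ts, or None if no such record exists yet.
    Assumes timestamps within a partition are non-decreasing."""
    result: Dict[int, Optional[int]] = {}
    for partition, timestamps in partitions.items():
        idx = bisect_left(timestamps, start_ts)
        result[partition] = idx if idx < len(timestamps) else None
    return result


logs = {0: [100, 200, 300, 400], 1: [150, 250, 350]}
print(start_offsets_from_timestamp(logs, 250))  # {0: 2, 1: 1}
```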

Re: Operators resource requirements on K8s Flink session cluster

2019-12-18 Thread Yang Wang
Hi Michaël, Glad to hear that you are going to run Flink workloads on Kubernetes. AFAIK, there are two ways to deploy. 1. Running a Flink standalone session/per-job cluster on K8s. You need to calculate how many taskmanagers you need and the resources per taskmanager. All the taskmanagers will be started by a K
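For the standalone deployment, the TaskManager count can be sketched as below, assuming Flink's default slot sharing, under which one job occupies as many slots as its highest operator parallelism. The job definitions and the slots-per-TaskManager value are hypothetical inputs.

```python
import math
from typing import Dict, List


def slots_needed(operator_parallelism: Dict[str, int]) -> int:
    """With default slot sharing, one job occupies max(operator parallelism) slots."""
    return max(operator_parallelism.values())


def taskmanagers_needed(jobs: List[Dict[str, int]], slots_per_tm: int) -> int:
    """Total slots across all jobs in the session cluster, rounded up to whole TMs."""
    total = sum(slots_needed(job) for job in jobs)
    return math.ceil(total / slots_per_tm)


# Hypothetical session cluster running two jobs
jobs = [
    {"source": 2, "enrich": 8, "sink": 2},  # needs 8 slots
    {"source": 4, "aggregate": 4},          # needs 4 slots
]
print(taskmanagers_needed(jobs, slots_per_tm=4))  # 3
```

The per-TaskManager CPU and memory requests in the pod spec would then be sized for the heaviest operators that may share a slot.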

Re: Rich Function Thread Safety

2019-12-18 Thread Zhu Zhu
Hi Aaron, It is thread-safe since the state snapshot happens in the same thread as the user function. Thanks, Zhu Zhu On Thu, 19 Dec 2019 at 11:25, Aaron Langford wrote: > Hello Flink Community, > > I'm hoping to verify some understanding: > > If I have a function with managed state, I'm wondering if a
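The reasoning can be modeled in plain Python (this is not Flink's task implementation): if records and checkpoint barriers are handled one at a time by the same loop, a snapshot can only run between two invocations of the user function, never in the middle of one. The event tuples and the state layout are hypothetical.

```python
from collections import deque
from typing import Deque, List, Tuple


def run_task(events: Deque[Tuple[str, int]]) -> List[dict]:
    """Process records and checkpoint barriers sequentially on one thread."""
    state = {"count": 0, "sum": 0}
    snapshots = []
    while events:
        kind, value = events.popleft()
        if kind == "record":
            # User function: a multi-step mutation that must never be observed half-done
            state["count"] += 1
            state["sum"] += value
        elif kind == "barrier":
            # The snapshot runs in the same loop, so it only sees state between records
            snapshots.append(dict(state))
    return snapshots


events = deque([("record", 5), ("record", 7), ("barrier", 1), ("record", 3)])
print(run_task(events))  # [{'count': 2, 'sum': 12}]
```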

Rich Function Thread Safety

2019-12-18 Thread Aaron Langford
Hello Flink Community, I'm hoping to verify some understanding: If I have a function with managed state, I'm wondering if a checkpoint will ever be taken while a function is mutating state. I'll try to illustrate the situation I'm hoping to be safe from: Happy Path: t0 -> processFunction invoked

Re: [Question] How to use different filesystem between checkpointdata and user data sink

2019-12-18 Thread ouywl
Hi Piotr Nowojski, I have moved my filesystem plugin to FLINK_HOME/pulgins in Flink 1.9.1. The JobManager doesn't start up, and it loads the filesystem plugin in my own plugin jar. The log is: “2019-12-19 10:58:32,394 WARN org.apache.flink.configuration.Configurat

Re: [DISCUSS] What parts of the Python API should we focus on next ?

2019-12-18 Thread jincheng sun
Also CC user-zh. Best, Jincheng On Thu, 19 Dec 2019 at 10:20, jincheng sun wrote: > Hi folks, > > As release-1.10 is under feature-freeze(The stateless Python UDF is > already supported), it is time for us to plan the features of PyFlink for > the next release. > > To make sure the features supported in

Re: How to convert retract stream to dynamic table?

2019-12-18 Thread Kurt Young
Hi James, If I understand correctly, you can use `TableEnvironment#sqlQuery` to achieve what you want. You can pass the whole SQL statement in and get a `Table` back from the method. I believe this is the table you want, which is semantically equivalent to the stream you mentioned. For example,
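Conceptually, a retract stream is a sequence of `(flag, row)` pairs where `True` adds a row and `False` retracts a previously added one; the dynamic table it represents is the multiset obtained by applying those changes in order. A plain-Python sketch of that materialization (not a Flink API), assuming every retraction matches an earlier addition; the rows are hypothetical:

```python
from collections import Counter
from typing import Iterable, Tuple


def materialize(changes: Iterable[Tuple[bool, tuple]]) -> Counter:
    """Apply (is_add, row) messages to a multiset of rows.
    Assumes each retraction refers to a row that was added earlier."""
    table: Counter = Counter()
    for is_add, row in changes:
        if is_add:
            table[row] += 1
        else:
            table[row] -= 1
            if table[row] == 0:
                del table[row]
    return table


# A running count per user: each update retracts the old row and adds the new one
stream = [
    (True, ("alice", 1)),
    (True, ("bob", 1)),
    (False, ("alice", 1)),
    (True, ("alice", 2)),
]
print(sorted(materialize(stream)))  # [('alice', 2), ('bob', 1)]
```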

Re: MiniCluster with ProcessingTimeTrigger

2019-12-18 Thread Biao Liu
Hi John, The critical issue with your test case is that it's a finite streaming job; whether it runs on a MiniCluster or a distributed cluster does not matter. When the job finishes, some windows have not been triggered yet, and the current behavior is to drop these windows. It's acceptable from the perspective of
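A simplified model of the behavior described: a processing-time tumbling window only fires once the wall clock passes its end, so any window whose end falls after the moment the finite source finishes is dropped. This is plain Python with hypothetical millisecond arrival times; it roughly approximates, but does not reproduce, Flink's timer service.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def fired_windows(
    arrivals: List[Tuple[int, str]], size_ms: int, job_end_ms: int
) -> Dict[int, List[str]]:
    """Assign elements to processing-time tumbling windows by arrival time and
    return only the windows whose end is reached before the job finishes."""
    windows: Dict[int, List[str]] = defaultdict(list)
    for arrival_ms, element in arrivals:
        start = arrival_ms - arrival_ms % size_ms
        windows[start].append(element)
    return {
        start: elems
        for start, elems in windows.items()
        if start + size_ms <= job_end_ms  # the window's timer fires in time
    }


arrivals = [(100, "a"), (900, "b"), (1200, "c"), (1900, "d")]
print(fired_windows(arrivals, size_ms=1000, job_end_ms=1950))  # {0: ['a', 'b']}
```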

[DISCUSS] What parts of the Python API should we focus on next ?

2019-12-18 Thread jincheng sun
Hi folks, As release-1.10 is under feature freeze (the stateless Python UDF is already supported), it is time for us to plan the features of PyFlink for the next release. To make sure the features supported in PyFlink are the ones most demanded by the community, we'd like to get more people involved

Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread vino yang
Hi Utopia, IMO, your analysis is correct. Best, Vino On Thu, 19 Dec 2019 at 00:44, Utopia wrote: > Hi Vino, > > Maybe it is due to the type of window. What I used is > ProcessingTimeSessionWindows, while keyedState is scoped to *window and > key*. Window changes so that the ValueState is different. > > B

Need guidance on a use case

2019-12-18 Thread Eva Eva
Hi Team, I'm trying Flink for the first time and encountered an issue that I would like to discuss and understand if there is a way to achieve my use case with Flink. *Use case:* I need to perform unbounded stream joins on multiple data streams by listening to different Kafka topics. I have a sce

Re: MiniCluster with ProcessingTimeTrigger

2019-12-18 Thread John Morrow
Thanks Biao! I tried slowing down the input stream by replacing the env.fromCollection() with a custom SourceFunction (below) which drip feeds the data a bit slower. By the way, in my real scenario the datasource for the pipeline will be a RabbitMQ source. I do get better results, but it seem

Re: MapState with List Type for values

2019-12-18 Thread Yun Tang
Hi Aaron, There is a runtime key, which is used by the keyBy action, and a map key in your map state. Generally speaking, the runtime key is not the same as the map key. If you could store your emoji as the map key in your state, no list state is necessary. The basic idea is a bit like join im

Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-18 Thread M Singh
Hi: I am using AWS EMR with a Flink application, and two of the job managers are running on the same host. I am looking at the metrics documentation (Apache Flink 1.9 Documentation: Metrics) and see the following: - metr

Re: MapState with List Type for values

2019-12-18 Thread Aaron Langford
So the suggestion, as I read it, is to have some kind of shared queue for all waiting records. This allows for the use of ListState and cheap appends. Then the map state counts how many of each record are queued. When I finally get a record that allows me to remove elements from the queue, I can iterat
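The scheme as read here can be sketched with plain Python containers standing in for Flink state: a list plays the role of the append-only ListState queue, and a Counter plays the role of the MapState of per-record counts. The class, its methods and the record values are hypothetical.

```python
from collections import Counter
from typing import List


class WaitingRecords:
    """Append-only queue (ListState stand-in) plus per-record counts (MapState stand-in)."""

    def __init__(self) -> None:
        self.queue: List[str] = []
        self.counts: Counter = Counter()

    def add(self, record: str) -> None:
        self.queue.append(record)  # cheap append, existing entries are not rewritten
        self.counts[record] += 1

    def release(self, record: str) -> List[str]:
        """Remove every queued copy of `record` in one pass and return them."""
        if self.counts[record] == 0:  # cheap check before iterating the queue
            return []
        released = [r for r in self.queue if r == record]
        self.queue = [r for r in self.queue if r != record]
        del self.counts[record]
        return released


w = WaitingRecords()
for r in ["smile", "fire", "smile"]:
    w.add(r)
print(w.counts["smile"], w.release("smile"), w.queue)  # 2 ['smile', 'smile'] ['fire']
```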

How to convert retract stream to dynamic table?

2019-12-18 Thread James Baker
Hi! I've been looking at Flink for the last few days and have very much appreciated the concept of Dynamic Tables; it solves a lot of my needs and handles a lot of the complex state tracking that is otherwise painful. I have a question about the composability of the system which the docs don't a

Operators resource requirements on K8s Flink session cluster

2019-12-18 Thread Michaël Melchiore
Hello, I plan to run topologies on a Flink session cluster on Kubernetes. In my topologies, operators will have varying resource requirements in terms of CPU and RAM. How can I make this information available from Flink to Kubernetes so the latter takes it into account to optimize its deployment

Re: MapState with List Type for values

2019-12-18 Thread Yun Tang
Hi Aaron, You cannot set up a map state whose value is a list state, but you can make its value a list. However, I think that would also suffer from serialization/deserialization overhead when appending to the list value. What is the KEY in your map state? If you could use emoji as your KEY, and you could act l
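The serialization concern can be made concrete with a rough model. It assumes a backend that stores each map value as one serialized blob (roughly how the RocksDB backend treats MapState values), with `pickle` standing in for Flink's serializers. Appending to a list-valued map entry rewrites the whole list every time, so the bytes written grow quadratically, while one map entry per element writes each element once. The key and record values are hypothetical.

```python
import pickle
from typing import Dict, List


def bytes_written_list_value(items: List[str]) -> int:
    """Map key -> list value: every append deserializes, extends and re-serializes."""
    store: Dict[str, bytes] = {"key": pickle.dumps([])}
    written = 0
    for item in items:
        current = pickle.loads(store["key"])  # read the whole list
        current.append(item)
        blob = pickle.dumps(current)  # write the whole list back
        store["key"] = blob
        written += len(blob)
    return written


def bytes_written_per_entry(items: List[str]) -> int:
    """One map entry per element: each append serializes only the new element."""
    store: Dict[int, bytes] = {}
    written = 0
    for i, item in enumerate(items):
        blob = pickle.dumps(item)
        store[i] = blob
        written += len(blob)
    return written


items = [f"record-{i}" for i in range(1000)]
print(bytes_written_list_value(items), bytes_written_per_entry(items))
```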

Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread Utopia
Hi Vino, Maybe it is due to the type of window. What I used is ProcessingTimeSessionWindows, while keyed state is scoped to window and key. The window changes, so the ValueState is different. Best regards, Utopia On 18 Dec 2019 22:30 +0800, Utopia wrote: > Hi Vino, > > Thanks for your reply ! > > Th
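This analysis can be illustrated with a plain-Python model of partitioned trigger state (not the Flink API): state is looked up by the pair (key, window), so when a session window is replaced by a new or merged window, the lookup lands in a different namespace and returns nothing. The window bounds and key names are hypothetical.

```python
from typing import Dict, Optional, Tuple

Window = Tuple[int, int]  # (start, end) in ms, hypothetical
state: Dict[Tuple[str, Window], int] = {}


def get_value(key: str, window: Window) -> Optional[int]:
    """Trigger-scoped ValueState: the namespace is the (key, window) pair."""
    return state.get((key, window))


def update(key: str, window: Window, value: int) -> None:
    state[(key, window)] = value


update("sensor_1", (0, 5000), 3)
print(get_value("sensor_1", (0, 5000)))  # 3: same key, same window
print(get_value("sensor_1", (0, 8000)))  # None: the session window has changed
```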

Re: Restore metrics on broadcast state after restart

2019-12-18 Thread Yun Tang
Hi Gaël, You can use initializeState [1] to initialize your metric values from state when restoring from a checkpoint. context.getOperatorStateStore().getBroadcastState() gives access to your restored broadcast state. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/s
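The pattern can be modeled in plain Python: on restore, the operator reads its restored state in an `initializeState`-like hook and seeds the metric from it, so the reported value does not start from zero after a restart. The class names, the `rules` state and the gauge are hypothetical stand-ins; in Flink, the restored state would come from `context.getOperatorStateStore().getBroadcastState(...)`.

```python
from typing import Dict, Optional


class Gauge:
    """Hypothetical metric holder; a real job would register a Flink Gauge."""

    def __init__(self) -> None:
        self.value = 0


class RuleOperator:
    def __init__(self) -> None:
        self.rules: Dict[str, str] = {}  # stands in for broadcast state
        self.rule_count = Gauge()

    def initialize_state(self, restored: Optional[Dict[str, str]]) -> None:
        """Called before processing; `restored` is None on a fresh start."""
        if restored is not None:
            self.rules = dict(restored)
        # Seed the metric from state instead of starting at zero
        self.rule_count.value = len(self.rules)

    def process_broadcast(self, rule_id: str, rule: str) -> None:
        self.rules[rule_id] = rule
        self.rule_count.value = len(self.rules)

    def snapshot_state(self) -> Dict[str, str]:
        return dict(self.rules)


before = RuleOperator()
before.initialize_state(None)
before.process_broadcast("r1", "x > 5")
before.process_broadcast("r2", "y < 3")
checkpoint = before.snapshot_state()

after = RuleOperator()  # simulated restart from the checkpoint
after.initialize_state(checkpoint)
print(after.rule_count.value)  # 2
```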

Re: [DISCUSS] Drop vendor specific repositories from pom.xml

2019-12-18 Thread Robert Metzger
I guess we are talking about this profile [1] in the pom.xml? +1 to remove. I'm not sure if we need to rush this for the 1.10 release. The profile is not doing us any harm at the moment. [1] https://github.com/apache/flink/blob/master/pom.xml#L1035 On Wed, Dec 18, 2019 at 4:51 PM Till Rohrmann

Re: [Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread Piotr Nowojski
Hi, As Yang Wang pointed out, you should use the new plugins mechanism. If it doesn’t work, first make sure that you are shipping/distributing the plugin jars correctly, i.e. with the correct plugins directory structure on the client machine. Next make sure that the cluster has the same correct se

[DISCUSS] Drop vendor specific repositories from pom.xml

2019-12-18 Thread Till Rohrmann
Hi everyone, following the discussion started by Seth [1], I would like to discuss dropping the vendor-specific repositories from Flink's parent pom.xml. As building Flink against a vendor-specific Hadoop version is no longer needed (as it simply needs to be added to the classpath) and documented,

Re: Questions about taskmanager.memory.off-heap and taskmanager.memory.preallocate

2019-12-18 Thread Ethan Li
Thank you Vino for the information. Best, Ethan > On Dec 17, 2019, at 8:29 PM, vino yang wrote: > > Hi Ethan, > > Share two things: > > I have found "taskmanager.memory.preallocate" config option has been removed > in the master codebase. > After researching git history, I found the descrip

Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread Utopia
Hi Vino, Thanks for your reply! All my input records have the same key, so I think there is only one partition. And why can I sometimes get the value stored in the ValueState before the update? > > > > before update value : 3 > > > > after update value: 4 What’s more, how can I store the previous

Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread vino yang
Hi Utopia, The behavior may be correct. First, the default value is null, and that is the correct value. `ValueStateDescriptor` has multiple constructors, and some of them let you specify a default value. However, these constructors are deprecated, and the docs do not recommend them.[1] For the other c
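Instead of the deprecated default-value constructors, a common approach is to treat `null` as "not set yet" at the call site. A plain-Python sketch of that read-default-write pattern, with a dict standing in for per-key ValueState and `None` for Java's `null` (the names are hypothetical):

```python
from typing import Dict, Optional

DEFAULT_COUNT = 0
value_state: Dict[str, Optional[int]] = {}  # stand-in for per-key ValueState


def increment(key: str) -> int:
    """Read, fall back to a default when unset, then write back."""
    current = value_state.get(key)  # None plays the role of null
    count = current if current is not None else DEFAULT_COUNT
    count += 1
    value_state[key] = count
    return count


print([increment("sensor_1") for _ in range(3)])  # [1, 2, 3]
```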

Re: Deploying Operator on concrete Task Manager

2019-12-18 Thread KristoffSC
Hi, thanks for the reply. Just to clarify, I will need *a new Flink Cluster* (Job Manager and Task Manager) running in the secure zone to run the AsyncEnrich job, right? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Deploying Operator on concrete Task Manager

2019-12-18 Thread Zhu Zhu
Hi KristoffSC, Flink does not support specifying the TM for tasks, so I think you need to launch a separate job to do the "AsyncCall + map" in the secured zone. Thanks, Zhu Zhu On Wed, 18 Dec 2019 at 20:04, KristoffSC wrote: > Hi, > I have a question regarding job/operator deployment on Task Managers. > >

Deploying Operator on concrete Task Manager

2019-12-18 Thread KristoffSC
Hi, I have a question regarding job/operator deployment on Task Managers. If I understand correctly, my job will be split into individual tasks, which will be "deployed and executed" on particular task slot/s of a Task Manager (depending on the parallelism level, of course). Let's imagine I have a Job

Re: [Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread Yang Wang
You could try the new plugin mechanism. Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then put your filesystem-related jars in it. Different plugins will be loaded by separate classloaders to avoid conflicts. Best, Yang On Wed, 18 Dec 2019 at 18:46, vino yang wrote: > Hi ouywl,
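A small layout check can catch a common mistake: jars placed directly under `plugins/` instead of in their own subdirectory. This is a hypothetical helper, not part of Flink; it assumes the layout described above, `$FLINK_HOME/plugins/<plugin-name>/<jars>`, and the jar names are made up.

```python
import os
import tempfile
from typing import Dict, List


def plugin_layout(plugins_dir: str) -> Dict[str, List[str]]:
    """Map each plugin subdirectory to its jars; jars lying directly under
    plugins_dir are reported under the key '<misplaced>'."""
    layout: Dict[str, List[str]] = {}
    for entry in sorted(os.listdir(plugins_dir)):
        path = os.path.join(plugins_dir, entry)
        if os.path.isdir(path):
            layout[entry] = sorted(f for f in os.listdir(path) if f.endswith(".jar"))
        elif entry.endswith(".jar"):
            layout.setdefault("<misplaced>", []).append(entry)
    return layout


with tempfile.TemporaryDirectory() as flink_home:
    plugins = os.path.join(flink_home, "plugins")
    os.makedirs(os.path.join(plugins, "myhdfs"))
    open(os.path.join(plugins, "myhdfs", "my-fs-plugin.jar"), "w").close()
    open(os.path.join(plugins, "stray.jar"), "w").close()
    result = plugin_layout(plugins)
    print(result)  # {'myhdfs': ['my-fs-plugin.jar'], '<misplaced>': ['stray.jar']}
```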

Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread Utopia
Hi, I want to get the last value stored in ValueState when processing an element in a Trigger. But as the log shows, sometimes I can get the value and sometimes not. There is only one key in my data (SensorReading). ValueState: class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] { privat

Re: [Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread vino yang
Hi ouywl, *>>Thread.currentThread().getContextClassLoader();* What does this statement mean in your program? In addition, can you share your implementation of the customized file system plugin and the related exception? Best, Vino On Wed, 18 Dec 2019 at 16:59, ouywl wrote: > Hi all, > We have im

Re: How to reprocess certain events in Flink?

2019-12-18 Thread Zhu Zhu
Hi Pooja, My main confusion is: if 2 events have the same transaction_id, how can we tell whether it is a wanted one for value updates or an unwanted duplicate? MapState with a TTL can be used for deduplication, if we can assume that a duplicate event will not arrive too late after the origi
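Deduplication with a time-to-live can be modeled in plain Python as a map from `transaction_id` to the time it was first seen, where entries older than the TTL count as expired, loosely like a MapState configured with state TTL. As noted above, this only works if duplicates arrive within the TTL of the original; the IDs, times and TTL are hypothetical.

```python
from typing import Dict, List, Tuple


def deduplicate(events: List[Tuple[int, str]], ttl_ms: int) -> List[Tuple[int, str]]:
    """Keep the first event per transaction_id; forget ids after ttl_ms."""
    seen: Dict[str, int] = {}  # transaction_id -> time first seen
    kept = []
    for ts, txn_id in events:
        first_seen = seen.get(txn_id)
        if first_seen is not None and ts - first_seen < ttl_ms:
            continue  # duplicate within the TTL: drop it
        seen[txn_id] = ts  # new id, or an expired entry being re-created
        kept.append((ts, txn_id))
    return kept


events = [(0, "t1"), (10, "t2"), (50, "t1"), (2000, "t1")]
print(deduplicate(events, ttl_ms=1000))  # [(0, 't1'), (10, 't2'), (2000, 't1')]
```

The last event shows the trade-off: a duplicate arriving after the TTL is treated as new.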

[Question] How to use different filesystem between checkpoint data and user data sink

2019-12-18 Thread ouywl
Hi all, We have implemented a filesystem plugin to sink data to hdfs1, while the YARN cluster that Flink runs on uses hdfs2. So when the job is running, the JobManager uses the conf of hdfs1 to create the filesystem, and the filesystem plugin conflicts with Flink components. We im