Renaming the metrics

2020-06-22 Thread Ori Popowski
I have two Flink clusters sending metrics via Prometheus, and they share all the metric names (e.g. flink_taskmanager_job_task_operator_currentOutputWatermark). I want to change the flink_ prefix to something else to distinguish between the clusters (maybe the job name). How can I do it? Thanks.

[ANNOUNCE] Weekly Community Update 2020/25

2020-06-22 Thread Konstantin Knauf
Dear community, happy to share this week's community update: release testing for Flink 1.11.0 is slowly converging, and the first feature discussions for the upcoming release cycle are coming up. Flink Development == * [releases] The community has published another non-voting release

Flink(1.8.3) UI exception is overwritten, and the cause of the failure is not displayed

2020-06-22 Thread Andrew
version: 1.8.3; graph: source -> map -> sink. Scenario: a source subtask failure causes the graph to restart, but the exception displayed on the Flink UI is not the actual cause of the task failure. JM log: 2020-06-22 14:29:01.087 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Jo

Re: Re: Can the PyFlink Table API decide, based on a field's value, which sink table to insert into?

2020-06-22 Thread jack
Hello jincheng, I have verified the split-processing approach you suggested, and it solves my problem. Thank you very much for the explanation. Best, Jack. On 2020-06-22 14:28:04, "jincheng sun" wrote: Hello jack: The Table API doesn't need if/else; logic like the following works: val t1 = table.filter('x > 2).groupBy(..) val t2 = table.filter('x <= 2).groupBy(..) t1.insert_into("sink1") t2.insert_into("sink2")
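The filter-split pattern from the reply above can be illustrated with plain Python (no Flink dependency; the record shape, threshold, and the two "sink" lists are hypothetical stand-ins for tables and sinks):

```python
# Sketch of the filter-split pattern: instead of if/else inside one
# pipeline, apply two complementary filters and route each filtered
# stream to its own sink. All names here are illustrative.

def split_by_threshold(records, threshold=2):
    """Route each record to one of two 'sinks' based on a field value."""
    sink1 = [r for r in records if r["x"] > threshold]   # like table.filter('x > 2)
    sink2 = [r for r in records if r["x"] <= threshold]  # like table.filter('x <= 2)
    return sink1, sink2

if __name__ == "__main__":
    data = [{"x": 1}, {"x": 3}, {"x": 2}, {"x": 5}]
    s1, s2 = split_by_threshold(data)
    print(s1)  # records with x > 2
    print(s2)  # records with x <= 2
```

The two filters are complementary, so every record lands in exactly one sink, which is the same guarantee the if/else version would give.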

what to consider when testing a data stream application using the TPC-H benchmark data?

2020-06-22 Thread Felipe Gutierrez
Hi all, I would like to create some data stream queries tests using the TPC-H benchmark. I saw that there are some examples of TPC Q3[1] and Q10[2], however, they are using DataSet. If I consider creating these queries but using DataStream what are the caveats that I have to ensure when implementi

Problems with type erasure

2020-06-22 Thread Vincenzo Pronestì
Hi there, I need to execute the following code: 72: KeyedStream, String> keyedDelays = delays 73: .flatMap(new Query1FlatMap()) 74: .keyBy(item -> item.f0); but I keep getting this error message: The program finished with the following exception: The return type

Re: Problems with type erasure

2020-06-22 Thread Yun Gao
Hi Vincenzo: Could you also attach the codes before line 72, namely how `delays` is defined ? Since the exception says the return type of "Custom Source" could not be defined, and I think it should refer to `delays`, and the exception is thrown when an operator is called on `delays` and Fli

Re: Problems with type erasure

2020-06-22 Thread Arvid Heise
Hi Vincenzo, the preferred way to get the type information for tuples is to use org.apache.flink.api.common.typeinfo.Types. For Tuple2<Tuple2<String, String>, Integer>, you'd use Types.TUPLE(Types.TUPLE(Types.STRING, Types.STRING), Types.INT). Nested tuples are not an issue in general. On Mon, Jun 22, 2020 at 2:18

Re: what to consider when testing a data stream application using the TPC-H benchmark data?

2020-06-22 Thread Arvid Heise
Hi Felipe, The examples are pretty old (6 years), hence they still use DataSet. You should be fine mostly replacing the sources with file sources (no need to write your own source, unless you want generators) and using global windows for joining. However, why not use SQL for TPC-H? We have an
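For reference, TPC-H Q3 is straightforward to express in Flink SQL once the `customer`, `orders`, and `lineitem` tables are registered (a sketch; table registration and connector setup are omitted):

```sql
-- TPC-H Q3 (shipping priority) as a batch-style Flink SQL query.
SELECT
  l_orderkey,
  SUM(l_extendedprice * (1 - l_discount)) AS revenue,
  o_orderdate,
  o_shippriority
FROM customer, orders, lineitem
WHERE c_mktsegment = 'BUILDING'
  AND c_custkey = o_custkey
  AND l_orderkey = o_orderkey
  AND o_orderdate < DATE '1995-03-15'
  AND l_shipdate  > DATE '1995-03-15'
GROUP BY l_orderkey, o_orderdate, o_shippriority
ORDER BY revenue DESC, o_orderdate
LIMIT 10;
```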

Re: Flink(1.8.3) UI exception is overwritten, and the cause of the failure is not displayed

2020-06-22 Thread Arvid Heise
Hi Andrew, this looks like your Flink cluster has a flaky connection to the Kafka cluster or your Kafka cluster was down. Since the operator failed on the sync part of the snapshot, it resorted to failure to avoid having inconsistent operator state. If you configured restarts, it just restart fro

Re: Renaming the metrics

2020-06-22 Thread Arvid Heise
Hi Ori, I see that the PrometheusPushGatewayReporter [1] has an option for a job name, maybe you can use that. I'm also including Chesnay who probably has more ideas. [1] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricspro

Re: Unaligned Checkpoint and Exactly Once

2020-06-22 Thread Arvid Heise
Hi Lu, Thank you for your interest in unaligned checkpoints! I just published some PRs that will warn you if you set both unaligned checkpoints and AT_LEAST_ONCE. It's indeed not possible or even meaningful to use them at the same time. AT_LEAST_ONCE has no alignment phase, so it's faster than bo

Re: [DISCUSS] Upgrade HBase connector to 2.2.x

2020-06-22 Thread Arvid Heise
If we support both HBase 1 and 2, maybe it's a good time to pull them out to Bahir and list them in flink-packages to avoid adding even more modules to Flink core? On Mon, Jun 22, 2020 at 4:05 AM OpenInx wrote: > Hi > > According to my observation in the hbase community, there are still lots > o

Re: Why side-outputs are only supported by Process functions?

2020-06-22 Thread Arvid Heise
Hi Ivneet, Q1) you can read about the deprecation of split in FLINK-11084 [1]. In general side-outputs subsume the functionality and allow some advanced cases (like emitting the same record into two outputs). Q2) It's simply a matter of API design. The basic idea is to keep most interfaces as sle

Re: State backend considerations

2020-06-22 Thread Arvid Heise
Hi Nick, Both questions are hard to answer given that it depends on your hardware, access patterns (read/update), record size/structure, parallelism, and probably a ton of other parameters. The usual approach is to simply evaluate it in your setting. Since it's a matter of configuration, you can

Re: [DISCUSS] Upgrade HBase connector to 2.2.x

2020-06-22 Thread Gyula Fóra
If we were to go the Bahir route, I don't see the point in migrating the 1.4.x version there since that's already available in Flink. To me that is almost the same as dropping explicit support for 1.4 and telling users to use older connector versions if they wish to keep using it. If we want to ke

Re: what to consider when testing a data stream application using the TPC-H benchmark data?

2020-06-22 Thread Felipe Gutierrez
Hi Arvid, thanks for the references. I didn't find those tests before. I will definitely consider them to test my application. The thing is that I am testing a pre-aggregation stream operator that I have implemented. Particularly I need a high workload to create backpressure on the shuffle phase,

Re: Renaming the metrics

2020-06-22 Thread Chesnay Schepler
There's currently no way to change this. A related enhancement was proposed in FLINK-17495 that would at least allow you to attach a custom label, but the initial implementation wasn't general enough. On 22/06/2020 15:08, Arvid Heise wrote: Hi Ori, I see that the PrometheusPushGatewayReport

Re: what is the "Flink" recommended way of assigning a backfill to an average on an event time keyed windowed stream?

2020-06-22 Thread Arvid Heise
Hi Marco, That's a lot of code to digest, so I'm sorry if I got something wrong. From your example, it looks like you want to use the average within a tumbling window. If no record for a particular key has been emitted in that time, you want to repeat the last value. I'd use a dummy record to
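The carry-forward idea described above can be sketched outside Flink (pure Python; window handling and key management are simplified, and all names are illustrative of the pattern, not of the Flink API):

```python
# Sketch of backfilling gaps in a keyed, windowed average: for each
# window, keys that produced no records repeat their last known average.

def windowed_averages_with_backfill(windows, keys):
    """windows: list of {key: [values]} dicts, one per tumbling window."""
    last_avg = {}   # last emitted average per key
    results = []
    for window in windows:
        out = {}
        for key in keys:
            values = window.get(key, [])
            if values:
                last_avg[key] = sum(values) / len(values)
            # keys with no data in this window repeat their previous
            # average (the backfill); keys never seen emit nothing yet
            if key in last_avg:
                out[key] = last_avg[key]
        results.append(out)
    return results

if __name__ == "__main__":
    windows = [{"a": [2, 4], "b": [10]}, {"b": [20]}, {}]
    print(windowed_averages_with_backfill(windows, ["a", "b"]))
```

In Flink this per-key "last value" would live in keyed state and the dummy/backfill record would be emitted by the window or process function when a window fires empty for a key.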

Re: Problems with type erasure

2020-06-22 Thread Vincenzo Pronestì
Hi Yun, after reading your message I checked the source and managed to fix the problem, so thank you Yun. In case someone has the same problem: the source is a Kafka consumer, and as such it needs a class that implements DeserializationSchema. One of the required methods is getProducedType. In

Re: Submitted Flink Jobs EMR are failing (Could not start rest endpoint on any port in port range 8081)

2020-06-22 Thread Arvid Heise
Hi Sateesh, the solution still applies; not all entries are listed in the conf template. From what you have written, it's most likely that the first jobs have not finished (hence the port is taken). Make sure you don't use detached mode when submitting. You can see the status of the jobs

Re: Interact with different S3 buckets from a shared Flink cluster

2020-06-22 Thread Arvid Heise
Hi Ricardo, one option is to use s3p for checkpointing (Presto) and s3a for custom applications and attach different configurations. In general, I'd recommend using a cluster per application to avoid exactly such issues. I'd use K8s and put the respective IAM roles on each application pod (e.g.

Re: Unable to run flink job in dataproc cluster with jobmanager provided

2020-06-22 Thread Chesnay Schepler
Is your user jar packaging and relocating Flink classes? If so, then your job actually operates against the classes provided by the cluster, which, well, just wouldn't work. On 18/06/2020 09:34, Sourabh Mehta wrote: Hi, the application is using 1.10.0 but the cluster is set up on 1.9.0. Yes I do have

Re: what to consider when testing a data stream application using the TPC-H benchmark data?

2020-06-22 Thread Arvid Heise
If you are interested in measuring performance, you should also take a look at our benchmark repo [1] and particular the Throughput job [2]. [1] https://github.com/dataArtisans/performance [2] https://github.com/dataArtisans/performance/blob/master/flink-jobs/src/main/java/com/github/projectflink/

Re: Renaming the metrics

2020-06-22 Thread Ori Popowski
Thanks for answering. Unrelated to Flink, but if anyone knows a way to rename the metrics inside Prometheus, I'd appreciate it if you could share. About the push gateway - I think I'll stick with the pull option because it looks like a better fit for the use case. On Mon, Jun 22, 2020 at 4:47 PM Chesna
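On the Prometheus side, one common way to distinguish two clusters that share all metric names is not to rename them but to attach a per-cluster label at scrape time (a sketch; the job names, hosts, and port 9249 are assumptions about your setup):

```yaml
# prometheus.yml (sketch): tag every series from each Flink cluster
# with a 'cluster' label instead of changing the flink_ prefix.
scrape_configs:
  - job_name: flink_cluster_a          # hypothetical job name
    static_configs:
      - targets: ['cluster-a-host:9249']
        labels:
          cluster: cluster_a           # added to every scraped series
  - job_name: flink_cluster_b
    static_configs:
      - targets: ['cluster-b-host:9249']
        labels:
          cluster: cluster_b
```

Queries can then filter with `flink_taskmanager_job_task_operator_currentOutputWatermark{cluster="cluster_a"}` while the metric names stay identical across clusters.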

Re: [EXTERNAL] Re: Renaming the metrics

2020-06-22 Thread Slotterback, Chris
Hi Ori, Another more temporary brute-force option, while not officially flink, could be building a modified version of the metrics plugin into flink where you manually manipulate the prefixes yourself. It’s actually pretty easy to build the jar, and to test it you drop the jar into the plugin p

Rocksdb state directory path in EMR

2020-06-22 Thread Sudan S
Hi, I have enabled RocksDB (state store) with S3 (external checkpoints) on EMR. I am using RocksDB as the state store with ValueState, and checkpoints are stored in S3. I am able to see the checkpoints in S3, and functionality with respect to the state store is working fine. But I am trying to dissect rocksdb

Re: Trouble with large state

2020-06-22 Thread Jeff Henrikson
Bhaskar, I think I am unstuck. The performance numbers I sent after throttling were due to a one character error in business logic. I think I now have something good enough to work with for now. I will repost if I encounter further unexpected issues. Adding application-level throttling en

[no subject]

2020-06-22 Thread 王宇
Hi all, an error occurred when I ran Flink in MiniCluster, flink-version: 1.11, scala-version: 2.12.0. Error:(33, 41) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)] val solutionInput = env.fromElements((1, "1

Re: Submitted Flink Jobs EMR are failing (Could not start rest endpoint on any port in port range 8081)

2020-06-22 Thread Yang Wang
Hi Sateesh, if "rest.port" or "rest.bind-port" is configured explicitly, it will be used to start the REST server. So you need to remove them from flink-conf.yaml or configure them to "0" or a port range (50100-50200). By default, "flink run" will always start a dedicated Flink cluster for ea
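A minimal `flink-conf.yaml` sketch of the setting described above (the range is the one mentioned in the reply; pick any range that is free on your EMR hosts):

```yaml
# flink-conf.yaml (sketch): let each per-job cluster bind a free REST
# port from a range instead of all competing for the default 8081.
rest.bind-port: 50100-50200   # or 0 to let the OS pick any free port
```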

Re: TypeInformation not found

2020-06-22 Thread Yun Gao
Hi yu, Have you added "import org.apache.flink.api.scala._"? It seems it should be OK if the import has been added to the program: import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object Test { def main(args: Array[String]): Unit = {

Re: Flink Stream job to parquet sink

2020-06-22 Thread aj
I am stuck on this. Please give some suggestions. On Tue, Jun 9, 2020, 21:40 aj wrote: > please help with this. Any suggestions. > > On Sat, Jun 6, 2020 at 12:20 PM aj wrote: > >> Hello All, >> >> I am receiving a set of events in Avro format on different topics. I want >> to consume these and

Re: Trouble with large state

2020-06-22 Thread Vijay Bhaskar
Jeff, glad to know that you are able to progress well and the issue got resolved. Regards, Bhaskar On Tue, Jun 23, 2020 at 12:24 AM Jeff Henrikson wrote: > Bhaskar, > > I think I am unstuck. The performance numbers I sent after throttling > were due to a one character error in business logic. I thin

two phase aggregation

2020-06-22 Thread Fanbin Bu
Hi, Does over window aggregation support two-phase mode? https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html#table-optimizer-agg-phase-strategy SELECT user_id, event_time, listagg(event_type, '*') over w as names FROM table WINDOW w AS ( PARTITION BY user_id ORD

Re: two phase aggregation

2020-06-22 Thread Jark Wu
Hi Fanbin, Currently, over window aggregation doesn't support two-phase optimization. Best, Jark On Tue, 23 Jun 2020 at 12:14, Fanbin Bu wrote: > Hi, > > Does over window aggregation support two-phase mode? > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html#table-

Re: two phase aggregation

2020-06-22 Thread Fanbin Bu
Jark, thanks for the reply. Do you know whether it's on the roadmap or what's the plan? On Mon, Jun 22, 2020 at 9:36 PM Jark Wu wrote: > Hi Fanbin, > > Currently, over window aggregation doesn't support two-phase optimization. > > Best, > Jark > > On Tue, 23 Jun 2020 at 12:14, Fanbin Bu wrote:

Re: two phase aggregation

2020-06-22 Thread Jark Wu
AFAIK, this is not on the roadmap. The problem is that it doesn't bring much improvement for over-window aggregates. If we supported two-phase for over-window aggregates, the local over operator wouldn't reduce any data: it has to emit the same number of records it received, and can't reduce pressure of
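The point above can be illustrated with a small counting sketch (pure Python, illustrative names only): a local combine step for a grouped SUM shrinks the data to one row per distinct key, while a "local" operator for an over-window aggregate must emit one row per input row, so nothing is reduced.

```python
# Why two-phase helps grouped aggregates but not over-window aggregates:
# count how many records each hypothetical 'local' operator forwards.

def local_grouped_sum(rows):
    """Local pre-aggregation for SELECT key, SUM(v) ... GROUP BY key."""
    acc = {}
    for key, v in rows:
        acc[key] = acc.get(key, 0) + v
    return list(acc.items())        # one row per distinct key (reduced)

def local_over_window(rows):
    """'Local' operator for an over-window aggregate: every input row
    needs its own output row, so the record count is unchanged."""
    out = []
    running = {}
    for key, v in rows:
        running[key] = running.get(key, 0) + v
        out.append((key, v, running[key]))   # one output row per input
    return out

if __name__ == "__main__":
    rows = [("u1", 1), ("u1", 2), ("u2", 5), ("u1", 3)]
    print(len(local_grouped_sum(rows)))   # fewer rows forwarded
    print(len(local_over_window(rows)))   # as many rows as came in
```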

Re: Rocksdb state directory path in EMR

2020-06-22 Thread Dawid Wysakowicz
Hi, If I understand you correctly, you want to check the local RocksDB files, right? They are stored locally on each TaskManager in a temporary directory. This can be configured via "state.backend.rocksdb.localdir"[1]. If not specified it will use the globally defined temporary directory set via
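A `flink-conf.yaml` sketch of the setting mentioned above (the directory path is a hypothetical local-disk location; point it at fast instance storage on EMR):

```yaml
# flink-conf.yaml (sketch): put RocksDB's local working files on a
# fast local disk instead of the default temporary directory.
state.backend: rocksdb
state.backend.rocksdb.localdir: /mnt/flink/rocksdb   # hypothetical path
```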