Re: Flink kafka - Message Prioritization

2020-11-03 Thread Robert Metzger
Hi Vignesh, I'm adding Aljoscha to the thread; he might have an idea of how to solve this with the existing Flink APIs (the closest idea I had was the N-ary stream operator, but I guess that doesn't support backpressuring individual upstream operators; side inputs would be needed for that?) The on
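To make the prioritization idea concrete, here is a minimal, framework-free sketch of an operator that always drains a high-priority input before a low-priority one. It is an illustration under stated assumptions only: the class `PriorityMerger` and its methods are hypothetical and are not a Flink API, and this in-memory model does not address the hard part mentioned in the message, namely backpressuring the individual upstream inputs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, framework-free model of prioritized consumption:
// the high-priority queue is always drained before the low-priority one.
class PriorityMerger<T> {
    private final Queue<T> high = new ArrayDeque<>();
    private final Queue<T> low = new ArrayDeque<>();

    void offerHigh(T msg) { high.add(msg); }

    void offerLow(T msg) { low.add(msg); }

    // Returns the next message, preferring the high-priority input;
    // returns null when both inputs are empty.
    T next() {
        T msg = high.poll();
        return msg != null ? msg : low.poll();
    }

    // Drains both inputs in priority order.
    List<T> drainAll() {
        List<T> out = new ArrayList<>();
        for (T msg = next(); msg != null; msg = next()) {
            out.add(msg);
        }
        return out;
    }
}
```

For example, offering `l1`, `h1`, `l2`, `h2` (interleaving low and high) yields `h1, h2, l1, l2`. In a real job, the low-priority input would still need to be throttled at the source, otherwise its queue simply grows without bound.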

Re: JM upload files to blob server is slow

2020-11-03 Thread Arvid Heise
A jar upload shouldn't take minutes. There are two possibilities that likely co-occurred: - your jar is much bigger than needed. Did you make sure that you don't put Flink into the fat jar? That's counterproductive on many levels. Please check the jar size. - your connection to the JM is bad. Where i
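One way to follow the "check the jar size" advice is to list the jar's entries and look for Flink runtime classes that should not be bundled. The sketch below uses only `java.util.jar` from the JDK; the class `FatJarCheck` is hypothetical, and looking for the `org/apache/flink/runtime/` prefix is a rough heuristic assumed here. Connectors such as the Kafka connector also live under `org/apache/flink/` and are legitimately bundled, so they are deliberately not counted.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

// Hypothetical helper: counts fat-jar entries that belong to the Flink
// runtime, which the cluster already provides and should not be shipped.
class FatJarCheck {
    static long countRuntimeEntries(List<String> entryNames) {
        return entryNames.stream()
                .filter(name -> name.startsWith("org/apache/flink/runtime/"))
                .count();
    }

    // Reads all entry names from the jar at the given path.
    static List<String> entryNames(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return Collections.list(jar.entries()).stream()
                    .map(JarEntry::getName)
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> names = entryNames(args[0]);
        System.out.println("entries: " + names.size()
                + ", Flink runtime entries: " + countRuntimeEntries(names));
    }
}
```

A non-zero runtime count usually means the core Flink dependencies were not marked as `provided` in the build, which inflates the jar that has to be uploaded to the blob server.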

Re: Manage multiple jobs in Flink

2020-11-03 Thread Robert Metzger
Hi Alexandru, 1. You can either create a Flink cluster per job (preferred), or use one big cluster to run all your jobs. This depends a bit on the resource manager you are using, and the workloads you are planning to process. If you are using Kubernetes, it makes sense to deploy each job separatel

JM upload files to blob server is slow

2020-11-03 Thread forideal
Hello, my friend: This line of code runs very slowly. What could be causing it? Code: CompletableFuture jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServe

Re: Question about processing a 3-level List data type in parquet

2020-11-03 Thread Jingsong Li
Hi Naehee, sorry for the late reply. I think you are right; there are bugs here. We didn't think about nested structures carefully enough before. We are now mainly focusing on the new BulkFormat implementation, and nested structures are something we need to consider when implementing the new ParquetBulkFormat. Best, Jingsong On Tue, Nov

Re: Filter By Value in List

2020-11-03 Thread Rex Fenley
None of the following appear to work either. Flink 1.11.2, Scala 2.12. table.filter("apple".in(List("apple"))) [info] org.apache.flink.table.api.ValidationException: IN operator on incompatible types: String and ObjectArrayTypeInfo. table.filter("apple".in(java.util.Arrays.asList("apple"))) [in

Re: Filter By Value in List

2020-11-03 Thread Rex Fenley
Using a custom serializer to make sure I'm using a List does not help. [info] org.apache.flink.table.api.ValidationException: IN operator on incompatible types: String and List. On Tue, Nov 3, 2020 at 12:44 PM Rex Fenley wrote: > For clarification, I'm using Pojo and operating on a column of

Re: Filter By Value in List

2020-11-03 Thread Rex Fenley
For clarification, I'm using a POJO and operating on a column of this type: public java.util.List fruits. Adding the following annotation does not help: @DataTypeHint("ARRAY") On Mon, Nov 2, 2020 at 7:02 AM Aljoscha Krettek wrote: > I believe this is happening because the type system does not recogn

Re: I have some interesting result with my test code

2020-11-03 Thread Robert Metzger
Hi Kevin, thanks a lot for posting this problem. I'm adding Jark to the thread, he or another committer working on Flink SQL can maybe provide some insights. On Tue, Nov 3, 2020 at 4:58 PM Kevin Kwon wrote: > Looks like the event time that I've specified in the consumer is not being > respected.

Re: Is a flink-1.11.2 compiled job run on flink-1.11.0 cluster?

2020-11-03 Thread Arvid Heise
Hi Marco, to add to Robert's answer: patch releases such as 1.11.2 are first and foremost bugfix releases (following semantic versioning, the last digit indicates a bugfix release). So there are no API changes between 1.11.0 and 1.11.2. However, sometimes bugfixes require some changes of underlyi
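The semantic-versioning reading above can be expressed as a tiny check: two versions belong to the same release line when they differ at most in the last (bugfix) component. This is a hypothetical helper, not part of Flink, and it only encodes the reasoning in the message; it is not an official compatibility guarantee, since bugfixes can still change underlying behavior.

```java
// Hypothetical helper: true if two "major.minor.patch" versions differ
// at most in the last (bugfix) component, e.g. 1.11.0 vs 1.11.2.
class BugfixVersions {
    // Parses "major.minor.patch" into three integers.
    static int[] parse(String version) {
        String[] parts = version.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected major.minor.patch: " + version);
        }
        return new int[] {
            Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Integer.parseInt(parts[2])
        };
    }

    // Same major and minor version means only the bugfix digit differs.
    static boolean sameMinorLine(String a, String b) {
        int[] va = parse(a);
        int[] vb = parse(b);
        return va[0] == vb[0] && va[1] == vb[1];
    }
}
```

With this reading, a job built against 1.11.2 and a cluster running 1.11.0 are on the same line, while 1.11.2 and 1.12.0 are not.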

Re: Good tutorial troubleshoot and reading logs

2020-11-03 Thread Robert Metzger
Hi Noah, sadly there's no generic guide on how to approach Flink logs. What exactly do you mean by "the job hangs"? Did you verify via the metrics that it is not making any progress anymore at all? If so, are all operators affected, or just some? If your Flink cluster really is stuck, and you are

Re: how to enable metrics in Flink 1.11

2020-11-03 Thread Robert Metzger
Hey Diwakar, the logs you are providing still don't contain the full Flink logs. You cannot stop Flink on YARN using "yarn app -stop application_1603649952937_0002". To stop Flink on YARN, use: "yarn application -kill ". On Sat, Oct 31, 2020 at 6:26 PM Diwakar Jha wrote: > Hi, > > I wan

Re: Increase in parallelism has very bad impact on performance

2020-11-03 Thread Arvid Heise
Hi Sidney, you might want to recheck your first message. Either it's incorrectly written or you've fallen victim to a fallacy. With 1 slot, you have 1.6K events per slot = 1.6K overall. With parallelism 5, you have 1.2K events per slot, so 6K overall. That's a decent speedup. With 10, you still have 6K overa
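The arithmetic in the message is simply overall throughput = per-slot rate × parallelism. The hypothetical `ThroughputMath` class below just reproduces the figures quoted in the message, so the per-slot drop can be separated from the overall gain; it uses no Flink API.

```java
// Worked arithmetic from the message: overall throughput is the
// per-slot rate multiplied by the parallelism (number of slots).
class ThroughputMath {
    static long overall(int parallelism, long eventsPerSlot) {
        return parallelism * eventsPerSlot;
    }

    static long perSlot(int parallelism, long overallEvents) {
        return overallEvents / parallelism;
    }

    public static void main(String[] args) {
        System.out.println(overall(1, 1_600));  // 1600: one slot
        System.out.println(overall(5, 1_200));  // 6000: lower per slot, higher overall
        System.out.println(perSlot(10, 6_000)); // 600: still 6K overall with 10 slots
    }
}
```

So going from 1 to 5 slots is a 3.75× overall speedup even though each slot is slower. Only the step from 5 to 10 slots, where the overall number stays at 6K, indicates that something else has become the bottleneck.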

Re: Native kubernetes setup

2020-11-03 Thread Boris Lublinsky
Thanks a lot, this helped a lot, and I did make it work. It probably would have helped if the documentation explicitly gave an example of a role/rolebinding, something like: kubectl apply -f - < On Nov 3, 2020, at 7:02 AM, Yang Wang wrote: > > You could follow the guide[1] here to output the logs to the

Re: Error while retrieving the leader gateway after making Flink config changes

2020-11-03 Thread Robert Metzger
Thanks a lot for providing the logs. My theory of what is happening is the following: 1. You are probably increasing the memory for the JobManager, when changing the jobmanager.memory.flink.size configuration value 2. Due to this changed memory configuration, Kubernetes, Docker or the Linux kerne

Re: What does Kafka Error sending fetch request mean for the Kafka source?

2020-11-03 Thread Robert Metzger
Thanks a lot. Just to clarify: it's not the Kafka source that is configured as AT_LEAST_ONCE, it's the Flink checkpointing mode as a whole, for all operations. This has no effect during regular operation; only on recovery may records be sent multiple times, but it leads to lower latency. I guess

Re: Error while retrieving the leader gateway after making Flink config changes

2020-11-03 Thread Claude M
Thanks for your reply, Robert. Please see the attached log from the job manager; the last line is the only difference I see compared to a pod that starts up successfully. On Tue, Nov 3, 2020 at 10:41 AM Robert Metzger wrote: > Hi Claude, > > I agree that you should be able to restart individual pods w

Re: What does Kafka Error sending fetch request mean for the Kafka source?

2020-11-03 Thread John Smith
The Kafka source is configured as AT_LEAST_ONCE, and the JDBC sink handles duplicates with a unique key constraint and logs them in a separate SQL table. Essentially, this gives us EXACTLY_ONCE semantics. That's not a problem; it works great! 1- I was curious if that specific Kafka message was th
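The "at-least-once plus unique key" pattern described above can be modeled in memory to see why replays become harmless. This is a hypothetical sketch, not the Flink JDBC connector: a map stands in for the table with a unique key, and a list stands in for the separate duplicates table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model (hypothetical) of a sink that enforces a unique key:
// the first write per key is stored, and replays are recorded in a
// separate "duplicates" log instead of being written twice.
class DedupSink {
    final Map<String, String> rows = new HashMap<>();
    final List<String> duplicates = new ArrayList<>();

    void write(String key, String value) {
        // putIfAbsent returns the existing value if the key is already present.
        if (rows.putIfAbsent(key, value) != null) {
            duplicates.add(key);
        }
    }
}
```

If `order-1` is replayed after a recovery, the table still holds one row for it and the replay shows up in `duplicates`. This only yields effectively-once results when the key is deterministic, i.e. a replayed record produces exactly the same key as the original.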

Re: Run Flink Job with Confluent Schema Registry over SSL

2020-11-03 Thread Robert Metzger
Hi Patrick, The upcoming Flink 1.12 release will update the version to 5.4.2 at least: https://github.com/apache/flink/pull/12919/files This is closer to what you need, but still not there :( What you can try is compile your own version of flink-avro-confluent-registry, where you pass -Dconfluent

Re: Is Apache Flink suitable for an application where messages are routed to different services

2020-11-03 Thread Raghavendar T S
Hi Thamidu, your use case fits well in Flink, and it looks simple based on your post. You can also check out Kafka Streams, which can run as a standalone application. If your use case is simple, Flink is not necessary. Thank you On Tue, Nov 3, 2020 at 7:42 PM Ardhani Narasimha S

Re: Is a flink-1.11.2 compiled job run on flink-1.11.0 cluster?

2020-11-03 Thread Robert Metzger
Hi, I agree that our docs are not mentioning this anywhere. I would have expected it on this page: https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html. I filed a ticket to address this: https://issues.apache.org/jira/browse/FLINK-19955 The only thing Flink officially guarante

Re: I have some interesting result with my test code

2020-11-03 Thread Kevin Kwon
Looks like the event time that I've specified in the consumer is not being respected. Does the timestamp assigner actually work in Kafka consumers? .withTimestampAssigner(new SerializableTimestampAssigner[Order] { override def extractTimestamp(order: Order, recordTimestamp: Long): Lo
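To reason about whether the extracted timestamps are being respected, it can help to model what a bounded-out-of-orderness strategy does with them. The sketch below is a plain-Java approximation, not Flink's code: as far as I understand, `WatermarkStrategy.forBoundedOutOfOrderness` combined with a timestamp assigner emits a watermark equal to the largest timestamp seen minus the allowed out-of-orderness minus one millisecond. The class `EventTimeModel` is hypothetical.

```java
import java.util.function.ToLongFunction;

// Hypothetical model of event-time assignment: a timestamp is extracted
// from each record (instead of using the Kafka record timestamp), and the
// watermark trails the largest timestamp seen by a fixed bound.
class EventTimeModel<T> {
    private final ToLongFunction<T> extractTimestamp;
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp;

    EventTimeModel(ToLongFunction<T> extractTimestamp, long maxOutOfOrdernessMillis) {
        this.extractTimestamp = extractTimestamp;
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the watermark computation cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    // Extracts the record's event time and tracks the maximum seen so far.
    long onEvent(T record) {
        long ts = extractTimestamp.applyAsLong(record);
        maxTimestamp = Math.max(maxTimestamp, ts);
        return ts;
    }

    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMillis - 1;
    }
}
```

With a 5-second bound, after events at 10000 ms and 8000 ms the watermark is 4999 ms. If windows never fire, a common cause is that the watermark never advances, for example because some source subtasks receive no data.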

Re: What does Kafka Error sending fetch request mean for the Kafka source?

2020-11-03 Thread Robert Metzger
How did you configure the Kafka source as at least once? AFAIK the source is always exactly-once (as long as there aren't any restarts). Are you seeing the duplicates in the context of restarts of the Flink job? On Tue, Nov 3, 2020 at 1:54 AM John Smith wrote: > Sorry, got confused with your re

Re: Error while retrieving the leader gateway after making Flink config changes

2020-11-03 Thread Robert Metzger
Hi Claude, I agree that you should be able to restart individual pods with a changed memory configuration. Can you share the full Jobmanager log of the failed restart attempt? I don't think that the log statement you've posted explains a start failure. Regards, Robert On Tue, Nov 3, 2020 at 2:3

Re: Increase in parallelism has very bad impact on performance

2020-11-03 Thread Sidney Feiner
Hey 🙂 1. I have 150 partitions in the Kafka topic. 2. I'll check that soon, but why doesn't the same happen when I use a smaller parallelism? If that were the reason, I'd expect the same behavior with a parallelism of 5 as well. How does the increase in parallelism decrease the throughput

Re: Is Apache Flink suitable for an application where messages are routed to different services

2020-11-03 Thread Ardhani Narasimha Swamy
Hi Thamidu, I have been working on Flink for the past few months; below is some analysis based on your use case. A few questions: 1. What is the volume of data in the context? 2. What is the TPS that you are looking for? 3. Is it fine if the same message gets forwarded to 3rd party service multiple

Is a flink-1.11.2 compiled job run on flink-1.11.0 cluster?

2020-11-03 Thread Marco Villalobos
SITUATION Amazon EMR 6.1 supports Flink 1.11.0 and JDK 11. The Docker Flink images for Flink 1.11.0 only support JDK 8. Much of the testing that I perform uses Docker and Docker Compose. It's much easier for me to develop with Docker with Flink 1.11.2 and JDK 11. QUESTION Would a flink

Run Flink Job with Confluent Schema Registry over SSL

2020-11-03 Thread Patrick Eifler
Hello, I'm running a Flink Session Cluster on K8s and deploy the Flink jobs using the Flink REST API. The jobs use Avro for the producers and consumers. The jobs consume from and produce to a secured Kafka cluster via TLS and SCRAM-SHA. Everything works as expected. Now I need to introdu

Is Apache Flink suitable for an application where messages are routed to different services

2020-11-03 Thread Thamidu Muthukumarana
Hello, I'm exploring different options for my use case, which I will describe in detail below. A few of the options considered include Apache Kafka and Flink. Use Case Description: When a message comes to the application, it needs to extract the configurations set by the user who authored the messa
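The core of the described use case is a lookup from a message's author to that author's configured destinations. The sketch below models only that lookup in plain Java; `MessageRouter` and the service names in the example are hypothetical, and the real application would still need to fetch the user configurations and deliver to the third-party services.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical router: each message carries its author's id, and the
// author's configuration lists the downstream services to forward it to.
class MessageRouter {
    private final Map<String, List<String>> servicesByAuthor;

    MessageRouter(Map<String, List<String>> servicesByAuthor) {
        this.servicesByAuthor = servicesByAuthor;
    }

    // Returns the target services for a message; unknown authors get none.
    List<String> route(String authorId) {
        return servicesByAuthor.getOrDefault(authorId, Collections.emptyList());
    }
}
```

In Flink, this lookup would typically run per message inside a function keyed by author, with the configuration held in state; in Kafka Streams, it could be a stream-table join. Either way, the routing logic itself stays this small.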

Re: Native kubernetes setup

2020-11-03 Thread Yang Wang
You could follow the guide[1] here to output the logs to the console so that they can be accessed via "kubectl logs". From 1.12 on, we will make this the default. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#log-files Best, Yang Chesnay

Re: Flink 1.11 not showing logs

2020-11-03 Thread Yang Wang
You could issue "ps -ef | grep container_id_for_some_tm". And then you will find the following java options about log4j. -Dlog.file=/var/log/hadoop-yarn/containers/application_xx/container_xx/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.prope

Re: Re: How to use both of SQL and DataStream in 1.11

2020-11-03 Thread Danny Chan
You can still use .sqlQuery(...) to create a common table there, then convert the table into a DataStream; on this DataStream, you can add as many sink functions as you like. izual wrote on Mon, Nov 2, 2020 at 5:18 PM: > Hi, Danny: > > > Thanks for your help. > > > As in the question, some result wa

Re: Increase in parallelism has very bad impact on performance

2020-11-03 Thread Sidney Feiner
Hey, I just ran a simple consumer that does nothing but consume events (without aggregating), and every slot handles above 3K per second; with parallelism set to 15, it successfully handles 45K events per second Sidney Feiner / Data Platform Developer M: +972.528197720 / Skype: sidney.f

Re: Increase in parallelism has very bad impact on performance

2020-11-03 Thread Arvid Heise
Hi Sidney, there could be a couple of reasons why scaling actually hurts. Let's go through them one by one. First, you need to make sure that your source actually supports scaling. Thus, your Kafka topic needs at least as many partitions as you want to scale. So if you want to scale at some point
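Why the partition count caps source scaling can be shown with a simplified model. Assumption: partition `p` is read by source subtask `p % parallelism`; Flink's actual assignment also offsets the starting subtask per topic, but the round-robin shape is the same. The class `PartitionAssignment` is hypothetical.

```java
// Simplified model (an assumption, not Flink's exact assignment code):
// Kafka partition p is read by source subtask p % parallelism, so any
// subtask beyond the number of partitions receives no partition at all.
class PartitionAssignment {
    static int[] partitionsPerSubtask(int partitions, int parallelism) {
        int[] counts = new int[parallelism];
        for (int p = 0; p < partitions; p++) {
            counts[p % parallelism]++;
        }
        return counts;
    }

    // Number of source subtasks that get no partition and therefore sit idle.
    static int idleSubtasks(int partitions, int parallelism) {
        int idle = 0;
        for (int c : partitionsPerSubtask(partitions, parallelism)) {
            if (c == 0) {
                idle++;
            }
        }
        return idle;
    }
}
```

With 4 partitions and parallelism 5, one subtask idles. With the 150 partitions mentioned in this thread, parallelism 10 or 15 leaves no subtask idle and splits evenly (15 or 10 partitions each). At parallelism 20, the split becomes uneven: 10 subtasks read 8 partitions and 10 read 7.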

Re: Configure vvp 2.3 with file blob storage

2020-11-03 Thread Fabian Hueske
Hi Laurent, Thanks for trying out Ververica platform! However, please note that this is the mailing list of the Apache Flink project. Please post further questions using the "Community Edition Feedback" button on this page: https://ververica.zendesk.com/hc/en-us We are working on setting up a bett

Re: unknown process kdevtmpfsi is taking more cpu

2020-11-03 Thread Chesnay Schepler
As far as I understand, this issue is only common with Redis because Redis is common and allows remote code execution; it's not really specific to Redis in that sense; I've found similar articles for Apache Solr. If the Flink cluster were accessible from the outside, then the same thing might

Re: Native kubernetes setup

2020-11-03 Thread Chesnay Schepler
1) -Dkubernetes.namespace 2) The -D syntax is actually just a way to specify configuration options from the command line. As such, the configuration page lists all options. 3) if the diff between the conf
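To illustrate the point that `-D` is just configuration passed on the command line, here is a hypothetical parser that turns `-Dkey=value` arguments into key/value entries. It is not Flink's CLI code. `kubernetes.namespace` and `taskmanager.numberOfTaskSlots` are real option names, while the value `flink-jobs` is made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration: turn "-Dkey=value" command-line arguments into
// configuration entries, e.g. -Dkubernetes.namespace=flink-jobs.
class DynamicProperties {
    static Map<String, String> parse(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String arg : args) {
            if (arg.startsWith("-D")) {
                String kv = arg.substring(2);
                int eq = kv.indexOf('=');
                // Ignore malformed entries without a key or "=".
                if (eq > 0) {
                    conf.put(kv.substring(0, eq), kv.substring(eq + 1));
                }
            }
        }
        return conf;
    }
}
```

Because each key is just a configuration option name, any option listed on the configuration page can be supplied this way instead of in the configuration file.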

Re: Dependency injection and flink.

2020-11-03 Thread Arvid Heise
Hi Santhosh, Flink does not support automatic DI on task level and there is no immediate plan as of now to support it out-of-the-box. In general, there are quite a few implications of using automatic DI in a distributed setting. For example, how is a singleton supposed to work? Nevertheless, Flink
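The singleton question can be made concrete with plain Java serialization, which Flink uses to ship user functions to the task managers. In this simplified model (the classes `InjectedService`, `UserFunction`, and `DistributionDemo` are hypothetical), every parallel task instance is a deserialized copy of the function, so each copy carries its own "singleton".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// A dependency that would be "injected" as a singleton on the client side.
class InjectedService implements Serializable {}

// A user function holding the injected dependency in a field.
class UserFunction implements Serializable {
    final InjectedService service;

    UserFunction(InjectedService service) { this.service = service; }
}

class DistributionDemo {
    // Serializes and deserializes an object, as happens when a function is
    // shipped to a parallel task instance; returns the independent copy.
    static <T extends Serializable> T copy(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T result = (T) in.readObject();
                return result;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Two "tasks" created from the same function end up with two different service objects, and neither is the client's original. A true per-JVM singleton therefore has to be created on the task side, for example lazily at task initialization, rather than injected before the job is submitted.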