Questions regarding broadcast join in Flink

2021-09-09 Thread Gerald.Sula
Hello, I am trying to implement a broadcast join of two streams in Flink using the broadcast functionality. In my use case I have a large stream that will be enriched with a much smaller stream. To test my approach first, I have adapted the Taxi ride exercise in the official training rep
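The enrichment logic of a broadcast join can be modeled in a few lines of plain Java. This is a sketch, not Flink code: the class `BroadcastEnricher` and the `<missing>` marker are hypothetical, and the two methods only mirror the names of the `processBroadcastElement`/`processElement` callbacks of Flink's `BroadcastProcessFunction`. Each parallel instance keeps a full copy of the small stream in a map (the broadcast state) and looks up every element of the large stream in it.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of broadcast-state enrichment (hypothetical; no Flink dependency).
class BroadcastEnricher {
    // Stands in for the broadcast state: every parallel instance holds a full copy.
    private final Map<String, String> broadcastState = new HashMap<>();

    // Small (broadcast) stream: insert or overwrite the entry for this key.
    void processBroadcastElement(String key, String value) {
        broadcastState.put(key, value);
    }

    // Large stream: enrich the payload with the broadcast value for its key.
    // Elements that arrive before their broadcast entry are emitted unenriched.
    String processElement(String key, String payload) {
        String extra = broadcastState.get(key);
        return extra == null ? payload + "|<missing>" : payload + "|" + extra;
    }
}
```

One design point the sketch makes visible: a large-stream element can arrive before its matching broadcast entry, since Flink does not order the two inputs relative to each other. A real job may need to buffer such elements (for example in keyed state) or accept unenriched output, as this sketch does.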

How to mount PVC volumes using Flink Native Kubernetes ?

2021-09-09 Thread Xiaolong Wang
Hi, I'm facing a tough question. I want to start a Flink Native Kubernetes job with each of the task manager pods mounted with an aws-ebs PVC. My first thought was to use the pod-template file to do this, but it soon led to a dead end. Since the pod-template on each of the task manager pod i

Re: Documentation for deep diving into flink (data-streaming) job restart process

2021-09-09 Thread Robert Metzger
Thanks for the log. From the partial log that you shared with me, my assumption is that some external resource manager is shutting down your cluster. Multiple TaskManagers are disconnecting, and finally the job is switching into failed state. It seems that you are not stopping only one TaskManager

Re: Job crashing with RowSerializer EOF exception

2021-09-09 Thread Yuval Itzchakov
Hi Robert, There's no custom Kryo serializer. It's a RowSerializer that is generating the output of a Table -> DataStream conversion. On Thu, Sep 9, 2021, 21:42 Robert Metzger wrote: > Hi Yuval, > > EOF exceptions during serialization are usually an indication that some > serializers in the ser

Re: Usecase for flink

2021-09-09 Thread JING ZHANG
Hi Dipanjan, Based on your description, I think Flink can handle this use case. Don't worry about whether Flink can handle this data scale; Flink is a distributed engine. As long as the problem of data skew is carefully avoided, the input throughput can be handled through appropriate resourc
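Why skew matters even on a distributed engine can be shown with a toy model: if records are routed to parallel subtasks by hashing their key, every record with the same key lands on the same subtask, so one hot key limits throughput regardless of parallelism. The sketch uses a plain `hashCode` modulo for illustration; `SkewCheck` is hypothetical, and Flink's actual routing goes through key groups, so exact placements differ while the hot-key effect is the same.

```java
import java.util.List;

// Toy model of key-based partitioning (hypothetical helper, not Flink's key-group logic).
class SkewCheck {
    // Returns how many records each of `parallelism` subtasks receives
    // when records are routed by a hash of their key.
    static int[] loadPerSubtask(List<String> keys, int parallelism) {
        int[] load = new int[parallelism];
        for (String key : keys) {
            // floorMod keeps the index non-negative even for negative hash codes.
            load[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return load;
    }

    // Ratio of the busiest subtask's load to the average load (1.0 means perfectly even).
    static double skewRatio(int[] load) {
        int max = 0;
        long total = 0;
        for (int l : load) {
            max = Math.max(max, l);
            total += l;
        }
        double avg = (double) total / load.length;
        return avg == 0 ? 1.0 : max / avg;
    }
}
```

With 90 records for one key and 10 records spread over other keys at parallelism 4, the busiest subtask receives at least 90 of 100 records, a skew ratio of at least 3.6.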

Re: State processor API very slow reading a keyed state with RocksDB

2021-09-09 Thread Yun Tang
Hi David, I think Seth has shared some useful information. If you want to know what happens within RocksDB while you're reading, you can leverage async-profiler [1] to capture the RocksDB stacks; I suspect the index blocks might be evicted too frequently during your read. And we could use new r

Usecase for flink

2021-09-09 Thread Dipanjan Mazumder
Hi, I am working on a use case and am thinking of using Flink for it. The use case is that I will have many large resource graphs; I need to parse each graph for each node and edge and evaluate each one of them against some Siddhi rules. Right now the implementation for evaluating individ

Re: Documentation for deep diving into flink (data-streaming) job restart process

2021-09-09 Thread Puneet Duggal
Hi, Please find attached the log file for the job that did not get restarted on another task manager once the existing task manager was restarted. Just FYI: we are using Fixed Delay Restart (5 times, 10s delay). On Thu, Sep 9, 2021 at 4:29 PM Robert Metzger wrote: > Hi Puneet, > > Can you provide us with
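The fixed-delay strategy Puneet mentions corresponds to the Flink configuration keys `restart-strategy: fixed-delay`, `restart-strategy.fixed-delay.attempts: 5` and `restart-strategy.fixed-delay.delay: 10 s`. Its decision rule can be sketched in plain Java; `FixedDelayPolicy` is a hypothetical model, not Flink's implementation. What it illustrates is that the attempt budget is counted over the job's lifetime rather than reset between failures, so with 5 attempts the sixth failure moves the job to FAILED.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical model of a fixed-delay restart policy (not Flink's own class):
// at most `maxAttempts` restarts over the job's lifetime, each after the same delay.
class FixedDelayPolicy {
    private final int maxAttempts;
    private final Duration delay;
    private int failures = 0;

    FixedDelayPolicy(int maxAttempts, Duration delay) {
        this.maxAttempts = maxAttempts;
        this.delay = delay;
    }

    // Records a failure. Returns the delay before the next restart,
    // or empty once the budget is exhausted and the job should go to FAILED.
    Optional<Duration> onFailure() {
        failures++;
        return failures <= maxAttempts ? Optional.of(delay) : Optional.empty();
    }
}
```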

Re: Job manager crash

2021-09-09 Thread mejri houssem
Thanks for the response. With respect to the api-server, I don't think I can do much about it because I am just using a specific namespace in the Kubernetes cluster; I am not the one who administers the cluster. Otherwise, I will try the GC log option to see if I can find something useful in order to debu

Re: Job manager crash

2021-09-09 Thread houssem
Hello, with respect to the api-server I don't re On 2021/09/09 11:37:49, Yang Wang wrote: > I think @Robert Metzger is right. You need to check > whether your Kubernetes APIServer is working properly or not(e.g. > overloaded). > > Another hint is about the fullGC. Please use the following con

Re: Issue while creating Hive table from Kafka topic

2021-09-09 Thread Robert Metzger
Does the jar file you are trying to submit contain the org/apache/kafka/common/serialization/ByteArrayDeserializer class? On Thu, Sep 9, 2021 at 2:10 PM Harshvardhan Shinde < harshvardhan.shi...@oyorooms.com> wrote: > Here's the complete stack trace: > > Server Response:org.apache.flink.runtime.r
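One way to answer Robert's question is to look inside the submitted jar for the class file. The sketch below uses the JDK's `java.util.jar.JarFile`; the helper name `JarInspector.containsClass` is hypothetical. A class named `a.b.C` is stored as the entry `a/b/C.class`, which matches the slash-separated path Robert quotes. Listing the jar with `jar tf` and searching for the same path gives the same answer from the command line.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper: checks whether a jar bundles a given class.
class JarInspector {
    // className uses dots, e.g. "org.apache.kafka.common.serialization.ByteArrayDeserializer".
    static boolean containsClass(String jarPath, String className) throws IOException {
        // Class files are stored under slash-separated paths with a ".class" suffix.
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }
}
```

For example, `JarInspector.containsClass("target/my-job.jar", "org.apache.kafka.common.serialization.ByteArrayDeserializer")`, where the jar path is a placeholder for the jar actually being submitted.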

Re: Job crashing with RowSerializer EOF exception

2021-09-09 Thread Robert Metzger
Hi Yuval, EOF exceptions during serialization are usually an indication that some serializer in the serializer chain is somehow broken. What data type are you serializing? Does it include some type serialized by a custom serializer, or Kryo, ... ? On Thu, Sep 9, 2021 at 4:35 PM Yuval Itzchakov
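To see why a broken serializer chain can surface as an EOF exception rather than a clearer error, consider a writer and a reader that disagree on the byte layout. This plain `java.io` sketch uses a hypothetical class, not Flink's `RowSerializer`: it writes a 4-byte record but reads it with a layout that expects 12 bytes, so the reader runs off the end of the data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical demo of a writer/reader layout mismatch (not Flink's serializer code).
class SerializerMismatchDemo {
    // Writer side: the record is serialized as a single int (4 bytes).
    static byte[] writeRecord(int id) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(id);
        }
        return bytes.toByteArray();
    }

    // Reader side: a mismatched layout that expects an int followed by a long (12 bytes).
    // Returns true if reading hit the end of the data, i.e. an EOFException was thrown.
    static boolean readFailsWithEof(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readInt();   // consumes the 4 bytes that exist
            in.readLong();  // needs 8 more bytes that were never written
            return false;
        } catch (EOFException e) {
            return true;
        }
    }
}
```

In a running job the bytes come from network buffers rather than a byte array, but this is one typical failure mode: the reader asks for more bytes than the writer produced for that record.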

Re: State processor API very slow reading a keyed state with RocksDB

2021-09-09 Thread Seth Wiesman
Hi David, I was also able to reproduce the behavior, but I got significant performance improvements by reducing the number of slots on each TM to 1. My suspicion, as Piotr alluded to, is that it has to do with the different runtime execution of DataSet versus DataStream. In particular, Flink's DataS

DataStreamAPI and Stateful functions

2021-09-09 Thread Barry Higgins
Hi, I'm looking at using the DataStream API from a Flink application against a remote Python stateful function deployed on another machine. I would like to investigate how feasible it is to have all of the state management handled from the calling side, meaning that we don't need another in

Job crashing with RowSerializer EOF exception

2021-09-09 Thread Yuval Itzchakov
Hi, Flink 1.13.2 Scala 2.12.7 Running an app in production, I'm running into the following exception that frequently fails the job: switched from RUNNING to FAILED with failure cause: java.io.IOException: Can't get next record for channel InputChannelInfo{gateIdx=0, inputChannelIdx=2}\n\tat org.

Re: Duplicate copies of job in Flink UI/API

2021-09-09 Thread Chesnay Schepler
I'm afraid there's no real workaround. If the information for completed jobs isn't important to you then setting jobstore.expiration-time to a low value can reduce the impact, or setting jobstore.max-capacity to 0 would prevent any completed job from being displayed. Beyond that I can't thin
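The effect of the two options Chesnay names can be pictured as a small bounded store of completed jobs: `jobstore.expiration-time` (in seconds) drops entries older than a threshold, and `jobstore.max-capacity` caps how many entries are kept, so a capacity of 0 keeps nothing. The class below is a hypothetical plain-Java model of that behavior, not Flink's job store implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a completed-job store bounded by capacity and by age.
class CompletedJobStore {
    private static final class Entry {
        final String jobId;
        final long storedAtSeconds;

        Entry(String jobId, long storedAtSeconds) {
            this.jobId = jobId;
            this.storedAtSeconds = storedAtSeconds;
        }
    }

    private final int maxCapacity;
    private final long expirationSeconds;
    private final Deque<Entry> entries = new ArrayDeque<>(); // oldest first

    CompletedJobStore(int maxCapacity, long expirationSeconds) {
        this.maxCapacity = maxCapacity;
        this.expirationSeconds = expirationSeconds;
    }

    // Stores a completed job, evicting the oldest entries beyond the capacity limit.
    void put(String jobId, long nowSeconds) {
        entries.addLast(new Entry(jobId, nowSeconds));
        while (entries.size() > maxCapacity) {
            entries.removeFirst();
        }
    }

    // Drops expired entries, then returns the job ids that would still be displayed.
    List<String> visibleJobs(long nowSeconds) {
        while (!entries.isEmpty() && nowSeconds - entries.peekFirst().storedAtSeconds >= expirationSeconds) {
            entries.removeFirst();
        }
        List<String> ids = new ArrayList<>();
        for (Entry e : entries) {
            ids.add(e.jobId);
        }
        return ids;
    }
}
```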

Re: Duplicate copies of job in Flink UI/API

2021-09-09 Thread Peter Westermann
Thanks Chesnay. You are understanding this correctly. Your explanation makes sense to me. Is there anything we can do to prevent this? At least for us, most of the time when a leader election happens, the leader doesn’t actually change because the jobmanager is still healthy. Thanks, Peter From: Chesn

Re: Duplicate copies of job in Flink UI/API

2021-09-09 Thread Chesnay Schepler
Just to double-check that I'm understanding things correctly: You have a job with HA, then Zookeeper breaks down, the job gets suspended, ZK comes back online, and the _same_ JobManager becomes the leader? If so, then I can explain why this happens and hopefully reproduce it. In short, when

Re: Issue while creating Hive table from Kafka topic

2021-09-09 Thread Harshvardhan Shinde
Here's the complete stack trace: Server Response:org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application. at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108) at java.util.concurrent.CompletableFuture.uniHand

Re: Issue while creating Hive table from Kafka topic

2021-09-09 Thread Robert Metzger
Can you share the full stack trace, not just a part of it? On Thu, Sep 9, 2021 at 1:43 PM Harshvardhan Shinde < harshvardhan.shi...@oyorooms.com> wrote: > Hi, > > I added the dependencies while trying to resolve the same issue, thought I > was missing them. > > Thanks > > On Thu, Sep 9, 2021 at 4

Re: Allocation-preserving scheduling and task-local recovery

2021-09-09 Thread Robert Metzger
Hi, from my understanding of the code [1], the task scheduling first considers the state location, and then uses the evenly spread out scheduling strategy as a fallback. So, as I read the code, local recovery should take precedence over the evenly spread out strategy. If you can e
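Robert's description of the scheduling order (state location first, evenly spread out as the fallback) can be modeled as a two-step slot choice. The sketch is hypothetical: `SlotPicker`, its `TaskManager` class and the utilization formula are simplifications for illustration, and Flink's real slot selection takes more inputs into account.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical two-step slot choice: state locality first, then spread out.
class SlotPicker {
    // Minimal view of a TaskManager for this model.
    static final class TaskManager {
        final String id;
        final int freeSlots;
        final int totalSlots;

        TaskManager(String id, int freeSlots, int totalSlots) {
            this.id = id;
            this.freeSlots = freeSlots;
            this.totalSlots = totalSlots;
        }

        double utilization() {
            return 1.0 - (double) freeSlots / totalSlots;
        }
    }

    // Returns the id of the TaskManager to deploy to, or empty if no slot is free.
    static Optional<String> pick(String previousTmId, List<TaskManager> tms) {
        // Step 1: state locality. Reuse the previous TaskManager if it still has a free slot,
        // which is what allows task-local recovery.
        for (TaskManager tm : tms) {
            if (tm.id.equals(previousTmId) && tm.freeSlots > 0) {
                return Optional.of(tm.id);
            }
        }
        // Step 2: fallback. Choose the least utilized TaskManager that has a free slot.
        return tms.stream()
                .filter(tm -> tm.freeSlots > 0)
                .min(Comparator.comparingDouble(TaskManager::utilization))
                .map(tm -> tm.id);
    }
}
```

In this model the previous location wins even when another TaskManager is emptier, which is the preference order Robert describes.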

Re: Issue while creating Hive table from Kafka topic

2021-09-09 Thread Harshvardhan Shinde
Hi, I added the dependencies while trying to resolve the same issue; I thought I was missing them. Thanks On Thu, Sep 9, 2021 at 4:26 PM Robert Metzger wrote: > Hey, > > Why do you have these dependencies in your pom? > > > > org.apache.kafka > kafka-cli

Re: Job manager crash

2021-09-09 Thread Yang Wang
I think @Robert Metzger is right. You need to check whether your Kubernetes APIServer is working properly (e.g., whether it is overloaded). Another hint concerns full GC. Please use the following config option to enable the GC logs and check the full GC time. env.java.opts.jobmanager: -verbose:gc -XX:+

Re: Duplicate copies of job in Flink UI/API

2021-09-09 Thread Peter Westermann
Hi Piotr, Jobmanager logs are attached to this email. The only thing that jumps out to me is this: 09/08/2021 09:02:26.240 -0400 ERROR org.apache.flink.runtime.history.FsJobArchivist Failed to archive job. java.io.IOException: File already exists:s3p://flink-s3-bucket/history/2db4ee6397151a1

Re: Documentation for deep diving into flink (data-streaming) job restart process

2021-09-09 Thread Robert Metzger
Hi Puneet, Can you provide us with the JobManager logs of this incident? Jobs should not disappear; they should restart on other TaskManagers. On Wed, Sep 8, 2021 at 3:06 PM Puneet Duggal wrote: > Hi, > > So for past 2-3 days i have been looking for documentation which > elaborates how flink t

Re: Issue while creating Hive table from Kafka topic

2021-09-09 Thread Robert Metzger
Hey, Why do you have these dependencies in your pom? org.apache.kafka:kafka-clients:2.8.0 and org.apache.kafka:kafka_2.12:2.8.0. They are not needed for using the Kafka connector of

Re: Job manager crash

2021-09-09 Thread Robert Metzger
Is the Kubernetes server you are using particularly busy? Maybe these issues occur because the server is overloaded? "Triggering checkpoint 2193 (type=CHECKPOINT) @ 1630681482667 for job ." "Completed checkpoint 2193 for job (474 byt

Issue while creating Hive table from Kafka topic

2021-09-09 Thread Harshvardhan Shinde
Hi, I'm trying a simple Flink job that reads data from a Kafka topic and creates a Hive table. I'm following the steps from here. Here's my code: import org.apache.flink.table