RE: Any issues with reinterpretAsKeyedStream when scaling partitions?

2021-10-14 Thread Schwalbe Matthias
… didn’t mean to hit the send button so soon 😊 I guess we are getting closer to a solution Thias From: Schwalbe Matthias Sent: Friday, 15 October 2021 08:49 To: 'Dan Hill' ; user Subject: RE: Any issues with reinterpretAsKeyedStream when scaling partitions? Hi Dan again 😊, I shed a secon

RE: Any issues with reinterpretAsKeyedStream when scaling partitions?

2021-10-14 Thread Schwalbe Matthias
Hi Dan again 😊, I took a second look … from your call stack I conclude that you do indeed have a network shuffle between your two operators, in which case reinterpretAsKeyedStream wouldn’t work ($StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277 indicates t

Re: Any issues with reinterpretAsKeyedStream when scaling partitions?

2021-10-14 Thread Dan Hill
Hi JING ZHANG! I changed StreamExecutionEnvironment.setParallelism(...). If I savepoint and start with the same parallelism, then it's fine. I hit the error with the parallelism values that I've tried. The failing rows change depending on the checkpoint that I use. Is there a good job template

Re: Any issues with reinterpretAsKeyedStream when scaling partitions?

2021-10-14 Thread JING ZHANG
Hi Hill, Would you please give more detail about "When I savepoint and start again with a different number of partitions"? Did you change the max parallelism or the partitioner strategy? Also, does this problem always happen, or only occasionally when you restore from the savepoint? Would you

Re: Table API joining 2 streams with periodic updates

2021-10-14 Thread JING ZHANG
Hi Robert, The exception is caused by converting a Table that contains retractions and updates to a DataStream. During table-to-stream conversion, this can lead to an exception similar to Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...]. In the case you n
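A minimal illustration of the point above, as a plain-Java model rather than Flink code: a join over streams that receive updates emits a changelog of inserts, update-before/update-after pairs and deletes, and only a consumer that applies those kinds correctly ends up with the current result. The enum `Kind` is a hypothetical stand-in that mirrors the constant names of Flink's `RowKind`; the keys and values are made up. On the Flink side, `StreamTableEnvironment#toChangelogStream` (added in 1.13) and the older `toRetractStream` are the conversions that keep this information instead of requiring an insert-only stream.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a changelog; Kind only mirrors the names of Flink's RowKind.
final class ChangelogDemo {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Change {
        final Kind kind;
        final String key;
        final String value;

        Change(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    /** Applies each change to a key -> value map, as an updating sink would have to. */
    static Map<String, String> materialize(List<Change> changes) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Change c : changes) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    state.put(c.key, c.value);   // new or replacement value
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    state.remove(c.key);         // retract the previous value
                    break;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Change> changes = new ArrayList<>();
        changes.add(new Change(Kind.INSERT, "host-1", "v1"));
        changes.add(new Change(Kind.UPDATE_BEFORE, "host-1", "v1"));
        changes.add(new Change(Kind.UPDATE_AFTER, "host-1", "v2"));
        System.out.println(changes.size() + " changelog rows"); // 3 changelog rows
        System.out.println(materialize(changes));                // {host-1=v2}
    }
}
```

An append-only consumer would keep all three rows, including the stale `v1`, which is the situation the "doesn't support consuming update changes" error guards against.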

NoClassDefFoundError if Kafka classes are assembled in application jar

2021-10-14 Thread L . C . Hsieh
Hi, Flink developers, Has anyone encountered the following error? java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322) at org.apa
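One common cause of this kind of `NoClassDefFoundError` (an assumption here, since the thread is truncated) is that the class referencing Kafka, such as the connector, is loaded by a different class loader than the one that can see the Kafka client classes, for example when the connector sits in Flink's `lib/` directory while `kafka-clients` only exists in the application jar. The hypothetical helper below uses only `Class.forName` and could be called from inside the job (for instance in a rich function's `open()` method) to check which loader can actually see the class.

```java
// Hypothetical diagnostic helper (plain JDK): can a given class loader see a class?
final class ClassVisibility {

    /** True if {@code loader} can load {@code className}; the class is not initialized. */
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false; // missing, or present but not linkable from this loader
        }
    }

    public static void main(String[] args) {
        String kafkaClass = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        System.out.println("context loader sees Kafka: " + isVisible(kafkaClass, contextLoader));
        System.out.println("this class's loader sees Kafka: "
                + isVisible(kafkaClass, ClassVisibility.class.getClassLoader()));
    }
}
```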

Re: How to refresh topics to ingest with KafkaSource?

2021-10-14 Thread Prasanna kumar
Yes, you are right. We recently tested this and found that the Flink jobs do not pick up new topics created with the same pattern that was provided to the Flink Kafka consumer. The topics are set only when the jobs start. Prasanna. On Fri, 15 Oct 2021, 05:44 Preston Price, wrote: > Okay so t

Removing metrics

2021-10-14 Thread Mason Chen
Hi all, Suppose I have a short lived process within a UDF that defines metrics. After the process has completed, the underlying resources should be cleaned up. Is there an API to remove/unregister metrics? Best, Mason

Re: How to refresh topics to ingest with KafkaSource?

2021-10-14 Thread Preston Price
Okay so topic discovery is possible with topic patterns, and maybe topic lists. However I don't believe it's possible to change the configured topic list, or topic pattern after the source is created. On Thu, Oct 14, 2021, 3:52 PM Denis Nutiu wrote: > There is a setting for dynamic topic discove
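To make the distinction concrete, here is a hypothetical plain-Java model (not the connector's implementation) of pattern-based discovery: each discovery round picks up new topics that match the pattern, but the pattern itself is fixed when the source is built. A set of topics that an external system switches on and off therefore has to be encoded in the topic names for this approach to work. For the SQL connector, we believe the interval of these rounds is the `scan.topic-partition-discovery.interval` option described on the page Denis links; the topic names below are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical model of periodic, pattern-based topic discovery.
final class TopicDiscoveryDemo {

    private final Pattern pattern;                          // fixed when the source is built
    private final Set<String> subscribed = new LinkedHashSet<>();

    TopicDiscoveryDemo(Pattern pattern) {
        this.pattern = pattern;
    }

    /** One discovery round: returns topics that match the pattern and were not seen before. */
    List<String> discover(List<String> topicsInCluster) {
        List<String> added = new ArrayList<>();
        for (String topic : topicsInCluster) {
            if (pattern.matcher(topic).matches() && subscribed.add(topic)) {
                added.add(topic);
            }
        }
        return added;
    }

    public static void main(String[] args) {
        TopicDiscoveryDemo source = new TopicDiscoveryDemo(Pattern.compile("events-.*"));
        System.out.println(source.discover(List.of("events-a", "audit")));             // [events-a]
        System.out.println(source.discover(List.of("events-a", "events-b", "audit"))); // [events-b]
    }
}
```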

Re: How to refresh topics to ingest with KafkaSource?

2021-10-14 Thread Denis Nutiu
There is a setting for dynamic topic discovery https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#topic-and-partition-discovery Best, Denis On Fri, Oct 15, 2021 at 12:48 AM Denis Nutiu wrote: > Hi, > > In my experience with the librdkafka client and the Go

Re: How to refresh topics to ingest with KafkaSource?

2021-10-14 Thread Denis Nutiu
Hi, In my experience with the librdkafka client and the Go wrapper, the topic-pattern subscribe is reactive. The Flink Kafka connector might behave similarly. Best, Denis On Fri, Oct 15, 2021 at 12:34 AM Preston Price wrote: > No, the topic-pattern won't work for my case. Topics that I should

Re: How to refresh topics to ingest with KafkaSource?

2021-10-14 Thread Preston Price
No, the topic-pattern won't work for my case. Topics that I should subscribe to can be enabled/disabled based on settings I read from another system, so there's no way to craft a single regular expression that would fit the state of all potential topics. Additionally the documentation you linked se

Table API joining 2 streams with periodic updates

2021-10-14 Thread Robert Cullen
I have a job that joins 2 streams using the Table API: DataStream stream = env.fromSource(dataSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source"); DataStream logTableStream = env.fromSource(logTableSource, WatermarkStrategy.forMonotonousTimestamps(), "Log Table Sou

Any issues with reinterpretAsKeyedStream when scaling partitions?

2021-10-14 Thread Dan Hill
I have a job that uses reinterpretAsKeyedStream across a simple map to avoid a shuffle. When changing the number of partitions, I'm hitting an issue with registerEventTimeTimer complaining that "key group from 110 to 119 does not contain 186". I'm using Flink v1.12.3. Any thoughts on this? I do
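Some background, as a hedged sketch: Flink hashes each key into one of `maxParallelism` key groups, and each subtask owns a contiguous key-group range that depends on the current parallelism. The plain-Java code below reimplements only that range arithmetic (we believe it matches Flink's `KeyGroupRangeAssignment`, but treat the formulas as an assumption; hashing a key into a group is omitted). It shows why `reinterpretAsKeyedStream` is only safe when the input is already partitioned exactly as `keyBy` would partition it at the current parallelism: the subtask that owns a key group moves when the parallelism changes. The class name and the example numbers are hypothetical.

```java
// Hypothetical sketch of Flink-style key-group range arithmetic (formulas assumed, see lead-in).
final class KeyGroupDemo {

    /** Inclusive key-group range [start, end] owned by one subtask. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(int keyGroup) {
            return keyGroup >= start && keyGroup <= end;
        }

        @Override
        public String toString() {
            return start + ".." + end;
        }
    }

    /** Key groups owned by subtask {@code operatorIndex} out of {@code parallelism}. */
    static Range rangeForSubtask(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new Range(start, end);
    }

    /** Index of the subtask that owns {@code keyGroup} at the given parallelism. */
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // must stay the same across restores of keyed state
        int keyGroup = 40;        // arbitrary example key group

        for (int parallelism : new int[] {4, 3}) {
            int owner = subtaskForKeyGroup(maxParallelism, parallelism, keyGroup);
            System.out.println("parallelism " + parallelism + ": key group " + keyGroup
                    + " -> subtask " + owner + " (range "
                    + rangeForSubtask(maxParallelism, parallelism, owner) + ")");
        }
        // Routed to subtask 1 under the parallelism-4 layout, but subtask 1 at parallelism 3
        // owns 43..85, so a key-group check there rejects it.
        System.out.println(rangeForSubtask(maxParallelism, 3, 1).contains(keyGroup)); // false
    }
}
```

Running it prints that key group 40 belongs to subtask 1 (range 32..63) at parallelism 4 but to subtask 0 (range 0..42) at parallelism 3. This only illustrates how a record can reach a subtask whose key-group range does not contain it; in this thread the likely trigger is the network shuffle between the two operators.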

Re: Flink 1.11 loses track of event timestamps and watermarks after process function

2021-10-14 Thread Arvid Heise
Hi Ahmad, The ProcessFunction simply forwards the watermark [1]. So I don't have any explanation for why it would stop advancing as soon as you emit data. My assumption was that emitting in the process function causes backpressure and thus halts the advancement of the watermark upst

Re: Issue with Flink UI for Flink 1.14.0

2021-10-14 Thread Dawid Wysakowicz
I am afraid it is a bug in Flink 1.14. I created a ticket for it, FLINK-24550 [1]. I believe we should pick it up soonish. Thanks for reporting the issue! Best, Dawid [1] https://issues.apache.org/jira/browse/FLINK-24550 On 13/10/2021 20:32, Peter Westermann wrote: > > Hello, > > I just sta

Re: Flame Graph not showing up in UI

2021-10-14 Thread Shilpa Shankar
We are using Flink 1.13.1 and it's running in a Kubernetes environment. - Shilpa On Thu, Oct 14, 2021 at 9:44 AM Shilpa Shankar wrote: > Hi Ingo, > > I am using google chrome and there are no errors on the console. > > Thanks, > Shilpa > > On Thu, Oct 14, 2021 at 9:41 AM Ingo Bürk wrote: > >>

Re: Flame Graph not showing up in UI

2021-10-14 Thread Shilpa Shankar
Hi Ingo, I am using Google Chrome and there are no errors on the console. Thanks, Shilpa On Thu, Oct 14, 2021 at 9:41 AM Ingo Bürk wrote: > Hi Shilpa, > > what browser are you using? Are there any errors in the browser's > developer console? > > > Ingo > > On Thu, Oct 14, 2021 at 3:17 PM Shilp

Re: Flame Graph not showing up in UI

2021-10-14 Thread Ingo Bürk
Hi Shilpa, what browser are you using? Are there any errors in the browser's developer console? Ingo On Thu, Oct 14, 2021 at 3:17 PM Shilpa Shankar wrote: > We enabled flame graphs to troubleshoot an issue with our job by adding > rest.flamegraph.enabled: > true to flink.conf . The UI doe

Re: a question about flink table catalog.

2021-10-14 Thread Martijn Visser
Hi, You could also consider using the JDBC Catalog implementation (via Postgres) or building your own custom one, see https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/catalogs/#user-defined-catalog Curious if that would help you out or if you need anything else. Best regards

Re: Exception: SequenceNumber is treated as a generic type

2021-10-14 Thread Ori Popowski
Thanks for answering. I'm not sure I understood the hack suggestion. If I copy SequenceNumber over to my job, how will the original Flink Kinesis lib use that class? It's fixed to a specific package (in this case org.apache.flink.streaming.connectors.kinesis.model). Unless you meant to somehow hack th

Flame Graph not showing up in UI

2021-10-14 Thread Shilpa Shankar
We enabled flame graphs to troubleshoot an issue with our job by adding rest.flamegraph.enabled: true to flink.conf. The UI does not display anything when we select an operator and go to FlameGraph. Is there something else that needs to be enabled on the Flink cluster? Than

Re: a question about flink table catalog.

2021-10-14 Thread Dawid Wysakowicz
I don't think so. As the name tells it is stored "in-memory" which intrinsically means transient. If you need a persistent catalog you can use e.g. Hive or Postgres. You can also try to implement a catalog backed by e.g. a property file. This could potentially end up in Flink / flink-packages, but
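Dawid's property-file idea could look roughly like the following plain-Java sketch, which covers only the persistence half: table definitions (here, DDL strings) are written to a `java.util.Properties` file and reloaded in the next session. `PropertiesTableStore` and its methods are hypothetical; a real integration would implement Flink's `Catalog` interface (for example by extending `AbstractCatalog`) on top of something like this, which the sketch does not attempt.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Properties;

// Hypothetical sketch: persist catalog entries (table name -> DDL) in a properties file.
// It does NOT implement Flink's Catalog interface.
final class PropertiesTableStore {

    private final Properties tables = new Properties();

    void putTable(String name, String ddl) {
        tables.setProperty(name, ddl);
    }

    /** Returns the stored DDL, or null if the table is unknown. */
    String getTable(String name) {
        return tables.getProperty(name);
    }

    void save(Writer out) throws IOException {
        tables.store(out, "catalog snapshot"); // escapes '=', ':', '#', etc. as needed
    }

    static PropertiesTableStore load(Reader in) throws IOException {
        PropertiesTableStore store = new PropertiesTableStore();
        store.tables.load(in);
        return store;
    }

    public static void main(String[] args) throws IOException {
        PropertiesTableStore session1 = new PropertiesTableStore();
        session1.putTable("orders",
                "CREATE TABLE orders (id BIGINT) WITH ('connector' = 'datagen')");

        StringWriter file = new StringWriter(); // stands in for a file on disk
        session1.save(file);

        PropertiesTableStore session2 = PropertiesTableStore.load(new StringReader(file.toString()));
        System.out.println(session2.getTable("orders"));
    }
}
```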

Re:Re: a question about flink table catalog.

2021-10-14 Thread Yuepeng Pan
Dawid Wysakowicz, thanks for your reply. Does the community plan to implement this feature? Best, Roc At 2021-10-14 21:00:37, "Dawid Wysakowicz" wrote: If I understand your question correctly, you're asking if you can somehow persist the GenericInMemoryCatalog. I am afraid i

Re: a question about flink table catalog.

2021-10-14 Thread Dawid Wysakowicz
If I understand your question correctly, you're asking if you can somehow persist the GenericInMemoryCatalog. I am afraid it is not possible. The idea of the GenericInMemoryCatalog is that it is transient and is stored purely in memory. Best, Dawid On 14/10/2021 13:44, Yuepeng Pan wrote: > Hi, 

Re: [External] metric collision using datadog and standalone Kubernetes HA mode

2021-10-14 Thread Chesnay Schepler
I think you are misunderstanding a few things. a) When you include a variable in the scope format, Flink fills it in before the metric reaches Datadog. If you set it to "flink.<job_name>", then what we send to Datadog is "flink.myAwesomeJob". b) The exception you see is not coming from Datadog. They occ
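Point (a) can be illustrated with a hypothetical sketch of scope-format expansion: placeholders such as `<job_name>` are replaced locally, so the reporter only ever sends the finished name. This is a plain-Java model, not Flink's implementation; the regex and the rule of leaving unknown variables untouched are assumptions.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical model of scope-format expansion: <variable> tokens are substituted locally.
final class ScopeFormatDemo {

    private static final Pattern VARIABLE = Pattern.compile("<([a-z_]+)>");

    static String expand(String format, Map<String, String> variables) {
        Matcher m = VARIABLE.matcher(format);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            // Unknown variables are kept verbatim in this sketch.
            String value = variables.getOrDefault(m.group(1), m.group());
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // prints flink.myAwesomeJob
        System.out.println(expand("flink.<job_name>", Map.of("job_name", "myAwesomeJob")));
    }
}
```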

Re: Exception: SequenceNumber is treated as a generic type

2021-10-14 Thread Dawid Wysakowicz
Hey Ori, As for the SequenceNumber issue, I'd say yes, it can be seen as a bug. In its current state, one cannot use the Kinesis consumer with pipeline.generic-types=false. The problem is that we use the TypeInformation.of(SequenceNumber.class) method, which will in this case always fall back to

Re: hook a callback on checkpointing failure.

2021-10-14 Thread Martijn Visser
Hi, One way to do it would be to use the Flink Metrics [1] and use something like Prometheus to scrape the metrics and use them to create alerts? Thanks, Martijn [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#checkpointing On Thu, 14 Oct 2021 at 14:45, Mathieu D
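If polling is acceptable instead of a true callback, the alert condition is simply "the failed-checkpoint counter went up". The hypothetical class below models that check in plain Java; the `LongSupplier` stands in for however you read Flink's `numberOfFailedCheckpoints` checkpointing metric (a metrics reporter, the REST API, or Prometheus), which the sketch does not implement. Treating a drop in the counter as a reset is an assumption (for example after a job restart).

```java
import java.util.function.LongSupplier;

// Hypothetical alert check: report how much a failure counter increased since the last poll.
final class FailedCheckpointWatcher {

    private final LongSupplier failedCheckpoints; // e.g. reads numberOfFailedCheckpoints
    private long lastSeen;

    FailedCheckpointWatcher(LongSupplier failedCheckpoints) {
        this.failedCheckpoints = failedCheckpoints;
        this.lastSeen = failedCheckpoints.getAsLong();
    }

    /** Number of new failures since the previous poll; 0 if none or if the counter was reset. */
    long poll() {
        long current = failedCheckpoints.getAsLong();
        long delta = Math.max(0, current - lastSeen);
        lastSeen = current;
        return delta;
    }

    public static void main(String[] args) {
        long[] counter = {0};
        FailedCheckpointWatcher watcher = new FailedCheckpointWatcher(() -> counter[0]);
        System.out.println(watcher.poll()); // 0
        counter[0] = 2;
        System.out.println(watcher.poll()); // 2, so this is where an alert would fire
    }
}
```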

Re:

2021-10-14 Thread Rui Li
Hey Andrew, Could you try something like the following? It worked for me to connect to a kerberized Hive. UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab); ugi.doAs((PrivilegedAction) () -> { HiveCatalog hiveCatalog = new HiveCatalog(.

Re: JM / TM Starting With Modified Classpath Causes log4j2 Exception

2021-10-14 Thread Chesnay Schepler
Could you try setting that via env.java.opts in the Flink configuration? I remember a similar thread; we have a sort of bootstrap application which runs with a very limited classpath for some configuration stuff, and by setting the log4j configuration you are also affecting said application, w

hook a callback on checkpointing failure.

2021-10-14 Thread Mathieu D
Hey there, We have some instabilities around checkpointing that we don't quite understand. In general, as soon as a checkpoint fails, our cluster does not recover to a proper state. But to better understand the mechanism, we'd like to be notified as soon as this happens, so we can jump on ou

Re: JM / TM Starting With Modified Classpath Causes log4j2 Exception

2021-10-14 Thread Yuval Itzchakov
Yes I am On Thu, Oct 14, 2021, 13:32 Chesnay Schepler wrote: > Are you by chance explicitly setting the -Dlog4j.configurationFile option > (or however it is called?) > > On 14/10/2021 11:59, Yuval Itzchakov wrote: > > Just tried adding the jars to lib/, I still receive the same error message. >

Re:

2021-10-14 Thread Dawid Wysakowicz
I hope Rui (in cc) will be able to help you. Best, Dawid On 12/10/2021 15:32, Andrew Otto wrote: > Hello, > > I'm trying to use HiveCatalog with Kerberos.  Our Hadoop cluster, our > Hive Metastore, and our Hive Server are kerberized.  I can > successfully submit Flink jobs to Yarn authenticated

Re: Migrating createTemporaryView to new Table api.

2021-10-14 Thread Niels Basjes
Thanks. That works like a charm. Niels On Thu, Oct 14, 2021 at 5:17 AM Caizhi Weng wrote: > Hi! > > To implement the renaming of fields with the new API, try this: > > tableEnv.createTemporaryView( > "AgentStream", > inputStream, > Schema.newBuilder() >

a question about flink table catalog.

2021-10-14 Thread Yuepeng Pan
Hi, Community. I want to save the catalog state after operating on a GenericInMemoryCatalog, so that I can conveniently recover the last catalog instance the next time I open a session or TableEnvironment. Does Flink support this feature? Thank you. Best, Roc

Re: FlinkJobNotFoundException

2021-10-14 Thread Matthias Pohl
Hi Doug, sorry for not being responsive the last two weeks. Other stuff kept me busy. A few things to note on your issue: It looks like the job result is requested while the job is executed synchronously. Flink will try to access the ArchivedExecutionGraphStore to get the job's result af

Re: Exception: SequenceNumber is treated as a generic type

2021-10-14 Thread Ori Popowski
I'd appreciate it if someone could advise on this issue. Thanks On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski wrote: > Hi, > > I have a large backpressure in a somewhat simple Flink application in > Scala. Using Flink version 1.12.1. > > To find the source of the problem, I want to eliminate all cl

Re: JM / TM Starting With Modified Classpath Causes log4j2 Exception

2021-10-14 Thread Chesnay Schepler
Are you by chance explicitly setting the -Dlog4j.configurationFile option (or however it is called?) On 14/10/2021 11:59, Yuval Itzchakov wrote: Just tried adding the jars to lib/, I still receive the same error message. On Thu, Oct 14, 2021 at 12:53 PM Yuval Itzchakov wrote: I have a

Let PubSubSource support changing subscriptions?

2021-10-14 Thread Shiao-An Yuan
Hi community, Google Cloud PubSub has a feature called snapshot[1], which allows us to apply snapshots to subscriptions. I recently needed to update the "filter" of a subscription, but a filter cannot be modified once the subscription is created. Therefore, I create a snapshot on the current subscri

Re: JM / TM Starting With Modified Classpath Causes log4j2 Exception

2021-10-14 Thread Yuval Itzchakov
Just tried adding the jars to lib/, I still receive the same error message. On Thu, Oct 14, 2021 at 12:53 PM Yuval Itzchakov wrote: > I have an UBER jar that contains all the dependencies I require for the > execution, I find it weird that I need to add an external library to lib to > support th

Re: JM / TM Starting With Modified Classpath Causes log4j2 Exception

2021-10-14 Thread Yuval Itzchakov
I have an UBER jar that contains all the dependencies I require for the execution, I find it weird that I need to add an external library to lib to support this. On Thu, Oct 14, 2021 at 12:38 PM Chesnay Schepler wrote: > You need to add the jars to the lib directory. > This is independent of how

Re: JM / TM Starting With Modified Classpath Causes log4j2 Exception

2021-10-14 Thread Chesnay Schepler
You need to add the jars to the lib directory. This is independent of how Flink works; the JSONLayout needs jackson, so you need to make sure it is on the classpath. On 14/10/2021 11:34, Yuval Itzchakov wrote: Scala 2.12 Java 11 Flink 1.13.2 Running in Kubernetes Hi, While trying to use a co

Re: Flink-1.12 Sql on Job two SQL sink control order

2021-10-14 Thread Francesco Guardiani
I'm not aware of any way to control the sink order; AFAIK each Table#executeInsert generates a separate job of its own. You may be able to work around it with a custom DynamicTableSink that sends each record to TiDB and then to Kafka. May I ask why you need that? If the notificatio
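Francesco's workaround, one sink that writes each record to TiDB and then to Kafka, boils down to ordering two writes inside a single per-record call. The plain-Java sketch below shows only that ordering; `RecordWriter` and `DbThenNotifySink` are hypothetical names, not Flink's `DynamicTableSink` API, and the lambdas stand in for a JDBC client and a Kafka producer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "write to TiDB, then notify via Kafka" inside one sink.
final class OrderedSinkDemo {

    /** Stand-in for a real client (JDBC for TiDB, a Kafka producer, ...). */
    interface RecordWriter {
        void write(String record) throws Exception;
    }

    static final class DbThenNotifySink {
        private final RecordWriter database;
        private final RecordWriter notifier;

        DbThenNotifySink(RecordWriter database, RecordWriter notifier) {
            this.database = database;
            this.notifier = notifier;
        }

        void invoke(String record) throws Exception {
            database.write(record); // if this throws, the notification below is skipped
            notifier.write(record); // reached only after the database write returned
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> log = new ArrayList<>();
        DbThenNotifySink sink = new DbThenNotifySink(
                r -> log.add("tidb:" + r),
                r -> log.add("kafka:" + r));
        sink.invoke("order-1");
        System.out.println(log); // [tidb:order-1, kafka:order-1]
    }
}
```

This orders the writes per record only. If the job fails after the TiDB write but before the Kafka send, the record is typically replayed on recovery (assuming at-least-once delivery), so the database write should be idempotent, for example an upsert.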

JM / TM Starting With Modified Classpath Causes log4j2 Exception

2021-10-14 Thread Yuval Itzchakov
Scala 2.12 Java 11 Flink 1.13.2 Running in Kubernetes Hi, While trying to use a configuration with JSONLayout in log4j2 with XML format, we receive the following error when bootstrapping the JM / TM: flink-adte-f7925524a2d37c92873ff0020dcd46c3-22b605b8-jm-946v7qt jobmanager Exception in thread "

Re: How to deserialize Avro enum type in Flink SQL?

2021-10-14 Thread Francesco Guardiani
It reproduces on my machine, so I've opened a JIRA issue about that: FLINK-24544 . Unfortunately, I don't have any ready to use workarounds for you. On Wed, Oct 13, 2021 at 8:43 PM Dongwon Kim wrote: > Can you provide a minimal reproducer (witho

Flink-1.12 Sql on Job two SQL sink control order

2021-10-14 Thread WuKong
Hi all: I have two Flink SQL statements that read from the same Kafka source: one sinks data into TiDB, and the other sinks to Kafka to notify a downstream system. How can I control the sink order? When data arrives from the source Kafka topic, I want it written to TiDB first and only then to Kafka. --- Best, WuKong

Re: pyflink keyed stream checkpoint error

2021-10-14 Thread Dian Fu
Hi Curt, Could you try if it works by reducing python.fn-execution.bundle.size to 1000 or 100? Regards, Dian On Thu, Oct 14, 2021 at 2:47 AM Curt Buechter wrote: > Hi guys, > I'm still running into this problem. I checked the logs, and there is no > evidence that the python process crashed. I