Re: StreamingFileSink Not Flushing All Data

2020-03-02 Thread Rafi Aroch
Hi, This happens because StreamingFileSink does not support a finite input stream. In the docs it's mentioned under "Important Considerations". This behaviour often surprises users... There's a FLIP
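
A minimal sketch of the behaviour being described, assuming a Flink 1.9/1.10-style job: StreamingFileSink finalizes in-progress part files only when checkpoints complete, so checkpointing must be enabled, and with a finite input the records after the last checkpoint can be left in an in-progress file (paths and values below are illustrative):

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class StreamingFileSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Part files are moved to "finished" only on checkpoints, so without
        // this line the sink may never flush/commit any data.
        env.enableCheckpointing(10_000);

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        // With a finite source, whatever arrives after the last successful
        // checkpoint can remain in an in-progress file when the job ends.
        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("streaming-file-sink-sketch");
    }
}
```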

Re: Table API connect method timestamp watermark assignment problem

2020-03-02 Thread Lu Weizheng
Thanks a lot, hope it will be fixed soon! From: Jark Wu Sent: March 3, 2020 11:25 To: Lu Weizheng Cc: user@flink.apache.org Subject: Re: Table API connect method timestamp watermark assignment problem Hi Lu, DDL and Schema descriptor do not share the same code path. I g

Re: Table API connect method timestamp watermark assignment problem

2020-03-02 Thread Jark Wu
Hi Lu, DDL and Schema descriptor do not share the same code path. I guess the reason the Schema descriptor doesn't work is FLINK-16160. We will fix that in the next minor release. Please use DDL to define the watermark, which is also the suggested way to do that. The current Schema descripto
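
A sketch of the DDL-based watermark definition recommended here, assuming Flink 1.10 with the Blink planner; the table name, fields, and connector options are illustrative:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class DdlWatermarkSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // The WATERMARK clause declares ts as the event-time attribute with a
        // 5-second bounded-out-of-orderness watermark.
        tableEnv.sqlUpdate(
                "CREATE TABLE user_actions (" +
                "  user_name STRING," +
                "  action STRING," +
                "  ts TIMESTAMP(3)," +
                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector.type' = 'kafka'," +
                "  'connector.version' = 'universal'," +
                "  'connector.topic' = 'user_actions'," +
                "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
                "  'format.type' = 'json'" +
                ")");
    }
}
```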

Use flink to calculate sum of the inventory under certain conditions

2020-03-02 Thread Jiawei Wu
Hi flink users, We have a problem and think flink may be a good solution for it. But I'm new to flink and hope to get some insights from the flink community :) Here is the problem. Suppose we have a DynamoDB table which stores the inventory data, the schema is like: * vendorId (primary key) * inve
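
Since the message is cut off, the following is only a hedged sketch of how a conditional per-vendor sum could look in the DataStream API; the schema fields and the filter predicate are invented for illustration:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class VendorInventorySumSketch {
    // Hypothetical schema -- the original message is truncated after "inve...".
    public static class InventoryRecord {
        public String vendorId;
        public int quantity;
        public InventoryRecord() {}
        public InventoryRecord(String vendorId, int quantity) {
            this.vendorId = vendorId;
            this.quantity = quantity;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(
                new InventoryRecord("v1", 10),
                new InventoryRecord("v1", 5),
                new InventoryRecord("v2", 7))
                .filter(r -> r.quantity > 0)   // placeholder for "certain conditions"
                .keyBy(r -> r.vendorId)
                .sum("quantity")               // running sum per vendor
                .print();
        env.execute("vendor-inventory-sum-sketch");
    }
}
```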

Table API connect method timestamp watermark assignment problem

2020-03-02 Thread Lu Weizheng
Hey guys, I am using the Flink Table API recently. I want to use EventTime with a Kafka Table Connector. I think in my code Flink cannot recognize the event time timestamp field. Here is my code: public static void main(String[] args) throws Exception { EnvironmentSettings fsSettings = En

Re: SHOW CREATE TABLE in Flink SQL

2020-03-02 Thread JingsongLee
Hi, Some previous discussion in [1], FYI [1] https://issues.apache.org/jira/browse/FLINK-10230 Best, Jingsong Lee -- From: Jark Wu Send Time: Monday, March 2, 2020 22:42 To: Jeff Zhang Cc: "Gyula Fóra" ; user Subject: Re: SHOW CREATE TABLE

Re: java.time.LocalDateTime in POJO type

2020-03-02 Thread JingsongLee
Hi, I've introduced LocalDateTime type information to flink-core. But for compatibility reasons, I reverted the modification in TypeExtractor. It seems that at present you can only use Types.LOCAL_DATE_TIME explicitly. [1] http://jira.apache.org/jira/browse/FLINK-12850 Best, Jingsong Lee -
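
A short sketch of the explicit type-hint workaround mentioned here (the parsing pipeline is illustrative):

```java
import java.time.LocalDateTime;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalDateTimeTypeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("2020-03-02T10:15:30")
                .map(s -> LocalDateTime.parse(s))
                // Without this hint the TypeExtractor treats LocalDateTime as a
                // generic type (Kryo); Types.LOCAL_DATE_TIME supplies the proper
                // type information explicitly, as suggested above.
                .returns(Types.LOCAL_DATE_TIME)
                .print();
        env.execute("local-date-time-type-sketch");
    }
}
```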

Re: Providing hdfs name node IP for streaming file sink

2020-03-02 Thread Yang Wang
It may work. However, you need to set your own retry policy (similar to `ConfiguredFailoverProxyProvider` in Hadoop). Also, if you directly use the namenode address and do not load the HDFS configuration, some HDFS client configuration (e.g. dfs.client.*) will not take effect. Best, Yang Nick Bendtner wrote on 2
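
For reference, the standard Hadoop HA client settings this reply alludes to look roughly like the following (nameservice and namenode ids are hypothetical). With these in an hdfs-site.xml on the classpath, a path such as hdfs://mycluster/data/out resolves to whichever namenode is active, and failover is handled by the HDFS client rather than by catching exceptions in the job:

```
# hdfs-site.xml equivalents, shown as key = value for brevity (ids hypothetical)
dfs.nameservices                             = mycluster
dfs.ha.namenodes.mycluster                   = nn1,nn2
dfs.namenode.rpc-address.mycluster.nn1       = namenode1:8020
dfs.namenode.rpc-address.mycluster.nn2       = namenode2:8020
dfs.client.failover.proxy.provider.mycluster = org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
```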

Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread Robert Metzger
side note: this question has been asked on SO as well: https://stackoverflow.com/questions/60487571/flink-eventtime-processing-watermark-is-always-coming-as-9223372036854725808 (I'm mentioning this here so that we are not wasting support resources in our community on double-debugging issues) On Mo

Re: [Kubernetes] java.lang.OutOfMemoryError: Metaspace ?

2020-03-02 Thread Niels Basjes
I've put some information about my situation in the ticket https://issues.apache.org/jira/browse/FLINK-16142?focusedCommentId=17049679&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17049679 On Mon, Mar 2, 2020 at 2:55 PM Arvid Heise wrote: > Hi Niels, > > to add t

Re: java.time.LocalDateTime in POJO type

2020-03-02 Thread KristoffSC
Hi Tzu-Li, I think you misunderstood Oskar's question. The question was whether there are any plans to support Java's LocalDateTime in Flink's "native" de/serialization mechanism. As we can read in [1], for basic types, Flink supports all Java primitives and their boxed form, plus void, String, D

Re: what is the hash function that Flink creates the UID?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi, Flink currently performs a 128-bit murmur hash on the user-provided uids to generate the final node hashes in the stream graph. Specifically, this library is being used [1] as the hash function. If what you are looking for is for Flink to use exactly the provided hash, you can use `setUidHash
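
A sketch of such a hash computed with Guava; whether Flink seeds and feeds the hasher in exactly this way is an assumption, so treat the output as illustrative of the shape rather than a guaranteed match:

```java
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;

public class UidHashSketch {
    public static void main(String[] args) {
        // 128-bit murmur hash, as referenced in the reply. Assumption: seed 0
        // and hashing the raw uid string; Flink's internal scheme may differ.
        HashFunction murmur = Hashing.murmur3_128(0);
        String hex = murmur.hashString("operator_name", StandardCharsets.UTF_8).toString();
        System.out.println(hex); // 32 hex chars -- the shape setUidHash() expects
    }
}
```

`setUidHash(...)` accepts exactly such a 32-character hex string and makes Flink use it verbatim instead of a computed hash.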

Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread aj
Hi David, Currently, I am testing it with a single source and parallelism 1 only, so I am not able to understand this behavior. On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz wrote: > Hi Anuj, > > What parallelism has your source? Do all of your source tasks produce > records? Watermark is always th

Re: java.time.LocalDateTime in POJO type

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi, What that LOG means (i.e. "must be processed as a Generic Type") is that Flink will have to fall back to using Kryo for the serialization of that type. You should be concerned about that if: 1) That type is being used for some persisted state in snapshots. That would be the case if you've reg
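
One way to surface such Kryo fallbacks early, rather than discovering them in the logs, is to disable generic types during development (a common debugging aid, not something this reply prescribes):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With generic types disabled, any type that would be serialized with Kryo
// fails fast with an exception instead of silently degrading performance.
env.getConfig().disableGenericTypes();
```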

Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi Kaymak, To answer your last question: there will be no data loss in the scenario you described, but there could be duplicated (re-processed) records. With checkpointing enabled, the Flink Kafka consumer does not commit offsets back to Kafka until offsets in Flink checkpoints have been persisted. T
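
A minimal sketch of the setup being described, using the universal Kafka connector of that era; topic, group id, and addresses are illustrative:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaOffsetCommitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // offsets for recovery live in these checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // With checkpointing enabled, offsets are committed back to Kafka only
        // once the corresponding checkpoint completes (the default behaviour).
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("kafka-offset-commit-sketch");
    }
}
```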

Re: Providing hdfs name node IP for streaming file sink

2020-03-02 Thread Nick Bendtner
Thanks a lot Yang. What are your thoughts on catching the exception when a name node is down and retrying with the secondary name node ? Best, Nick. On Sun, Mar 1, 2020 at 9:05 PM Yang Wang wrote: > Hi Nick, > > Certainly you could directly use "namenode:port" as the schema of you HDFS > path.

what is the hash function that Flink creates the UID?

2020-03-02 Thread Felipe Gutierrez
Hi there! I am tracking the latency of my operators using "setLatencyTrackingInterval(1)" and I can see the latency metrics in the browser at http://127.0.0.1:8081/jobs//metrics . For each logical operator I set a .uid("operator_name") and I know that Flink uses the UidHash to create a string for

Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread Dawid Wysakowicz
Hi Anuj, What parallelism does your source have? Do all of your source tasks produce records? The watermark is always the minimum of the timestamps seen from all the upstream operators. Therefore, if some of them do not produce records, the watermark will not progress. You can read more about Watermarks and how t
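
For context, a Flink 1.10-style timestamp/watermark assigner along the lines of the linked docs; `MyEvent` and its accessor are hypothetical:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Per-subtask watermarks are merged with min() downstream, so an idle source
// subtask holds the overall watermark back (hence the parallelism question).
DataStream<MyEvent> withTimestamps = stream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent event) {
                return event.getTimestampMillis(); // hypothetical accessor
            }
        });
```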

Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-02 Thread aj
I am trying to use a process function to do some processing on a set of events. I am using event time and a keyed stream. The issue I am facing is that the watermark value always comes as 9223372036854725808. I have put print statements to debug and it shows like this: timestamp--1583128014000 extractedTi

Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Kaymak, Tobias
Thank you! One last question regarding Gordon's response. When a pipeline stops consuming and cleanly shuts down and there is no error during that process, and then it gets started again and uses the last committed offset in Kafka - there should be no data loss - or am I missing something? In what

Re: StreamingFileSink Not Flushing All Data

2020-03-02 Thread Austin Cawley-Edwards
Hi Dawid and Kostas, Sorry for the late reply + thank you for the troubleshooting. I put together an example repo that reproduces the issue[1], because I did have checkpointing enabled in my previous case -- still must be doing something wrong with that config though. Thanks! Austin [1]: https:/

Re: SHOW CREATE TABLE in Flink SQL

2020-03-02 Thread Gyula Fóra
Thanks for the positive feedback and creating the JIRA ticket :) Gyula On Mon, Mar 2, 2020 at 3:15 PM Jark Wu wrote: > big +1 for this. I created an issue for "SHOW CREATE TABLE" [1]. Many > database systems also support this. > We can also introduce "describe extended table" in the future but

Re: SHOW CREATE TABLE in Flink SQL

2020-03-02 Thread Jark Wu
big +1 for this. I created an issue for "SHOW CREATE TABLE" [1]. Many database systems also support this. We can also introduce "describe extended table" in the future, but that is an orthogonal requirement. Best, Jark [1]: https://issues.apache.org/jira/browse/FLINK-16384 On Mon, 2 Mar 2020 at 22:09

java.time.LocalDateTime in POJO type

2020-03-02 Thread OskarM
Hi all, I wanted to use a LocalDateTime field in my POJO class used in Flink's pipeline. However, when I run the job I can see the following statements in the logs: class java.time.LocalDateTime does not contain a getter for field date class java.time.LocalDateTime does not contain a setter for field d

Re: SHOW CREATE TABLE in Flink SQL

2020-03-02 Thread Jeff Zhang
+1 for this, maybe we can add 'describe extended table' like Hive. Gyula Fóra wrote on Monday, March 2, 2020 at 8:49 PM: > Hi All! > > I am looking for the functionality to show how a table was created or show > all the properties (connector, etc.) > > I could only find DESCRIBE at this point which only shows the sc

Re: [Kubernetes] java.lang.OutOfMemoryError: Metaspace ?

2020-03-02 Thread Arvid Heise
Hi Niels, to add to Yang: 96m is plenty of space and was heavily tested by Alibaba. The most likely reason, and the motivation for the change, is that you probably have a classloader leak in your pipeline, quite possibly from one of our connectors. For example, see FLINK-16142 [1]. If you could give

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Nitish Pant
Hi Arvid, It’s actually the second case. I just wanted to build a scalable generic case where I can pass a set of kafka topics and my consumer can use the same AvroDeserializationSchema. But yeah, I think I’ll do the fetching latest schema part in main() separately. Thanks for the help! > On

Re: How JobManager and TaskManager find each other?

2020-03-02 Thread Yang Wang
Hi KristoffSC, Regarding your questions inline. > 1. task deployment descriptor The `TaskDeploymentDescriptor` is used by the JobMaster to submit a task to a TaskManager. Since the JobMaster knows all the operators and their locations, it will put the upstream operator location in the `TaskDeploymentDescri

Re: Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-02 Thread Benchao Li
I just ran it in my IDE. sunfulin wrote on Monday, March 2, 2020 at 9:04 PM: > > > Hi, > Yep, I am using 1.10 > Did you submit the job in the cluster or just run it in your IDE? Because > I can also run it successfully in my IDE, but cannot run it through the cluster > with a shaded jar. So I think maybe the problem is

Re: Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-02 Thread sunfulin
Hi, Yep, I am using 1.10 Did you submit the job in the cluster or just run it in your IDE? Because I can also run it successfully in my IDE, but cannot run it through the cluster with a shaded jar. So I think maybe the problem is related to the Maven jar classpath. But I'm not sure about that. If y

SHOW CREATE TABLE in Flink SQL

2020-03-02 Thread Gyula Fóra
Hi All! I am looking for the functionality to show how a table was created or show all the properties (connector, etc.) I could only find DESCRIBE at this point which only shows the schema. Is there anything similar to "SHOW CREATE TABLE" or is this something that we should maybe add in the futu

Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Piotr Nowojski
Hi, Sorry for my previous slightly confusing response, please take a look at the response from Gordon. Piotrek > On 2 Mar 2020, at 12:05, Kaymak, Tobias wrote: > > Hi, > > let me refine my question: My pipeline is generated from Beam, so the Flink > pipeline is a translated Beam pipeline. W

Re: Flink on AWS - ActiveMQ connector

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi, The connectors that are listed in the AWS documentation page that you referenced are not provided by AWS. They are bundled connectors shipped by the Apache Flink community as part of official Flink releases, and are discoverable as artifacts from the Maven central repository. See the respectiv

Re: Re: Re: Re: Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF

2020-03-02 Thread Benchao Li
Hi fulin, I cannot reproduce your exception on current master using your SQLs. I searched the error message, and it seems that this issue[1] is similar to yours, but the current compile util does not seem to have this issue. BTW, are you using 1.10? [1] https://issues.apache.org/jira/browse/FLINK

Re: How is state stored in rocksdb?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi, First of all, state is only managed by Flink (and therefore Flink's state backends) if the state is registered by the user. You can take a look at the documents here [1] for details on how to register state. A state has to be registered for it to be persisted in checkpoints / savepoints, and be
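
A minimal sketch of registering managed keyed state in the way described; names are illustrative, and the function must run on a keyed stream:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Only state registered through a descriptor like this is managed by the
// state backend and persisted in checkpoints / savepoints.
public class CountPerKey extends RichFlatMapFunction<String, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        Long current = count.value();           // null on first access per key
        long next = current == null ? 1L : current + 1L;
        count.update(next);
        out.collect(next);
    }
}
```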

Re: [Kubernetes] java.lang.OutOfMemoryError: Metaspace ?

2020-03-02 Thread Yang Wang
From 1.10, Flink will enable the metaspace limit via "-XX:MaxMetaspaceSize" by default. The default value is 96m; loading too many classes will cause "OutOfMemoryError: Metaspace"[1]. So you need to increase the configured value. [1]. https://ci.apache.org/projects/flink/flink-docs-release-1.10/
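
If the larger footprint is legitimate, the limit can be raised in flink-conf.yaml; the value below is illustrative, and a classloader leak should be ruled out first, as noted elsewhere in this thread:

```
# flink-conf.yaml (Flink 1.10): raise the TaskManager metaspace limit
taskmanager.memory.jvm-metaspace.size: 256m
```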

Re: Correct way to e2e test a Flink application?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi Laurent, You can take a look at Flink's MiniClusterResource JUnit test rule, and its usages in the codebase for that. The rule launches a Flink MiniCluster within the same JVM, and submission to the mini cluster resembles how it would be submitting to an actual Flink cluster, so you would alrea
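
A sketch of that rule-based setup; in releases of that era the rule is shipped as `MiniClusterWithClientResource` in flink-test-utils, and the resource counts are illustrative:

```java
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class PipelineIT {
    @ClassRule
    public static final MiniClusterWithClientResource FLINK =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());

    @Test
    public void runsEndToEnd() throws Exception {
        // While the rule is active, getExecutionEnvironment() submits to the
        // in-JVM mini cluster, resembling a real cluster submission.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();
        env.execute();
    }
}
```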

Re: Question about runtime filter

2020-03-02 Thread faaron zheng
I set sql.exec.runtime-filter.wait to true. HiveTableSource takes much longer but gets the same result. I think the reason is that the preAggregateAccumulator is not committed, but I don't know why that happens. JingsongLee wrote on Monday, March 2, 2020 at 3:22 PM: > Hi, > > Does runtime filter probe side wait for building runtime

How is state stored in rocksdb?

2020-03-02 Thread kant kodali
Hi All, I am wondering how Flink serializes and deserializes state from RocksDB? What is the format used? For example, say I am doing some stateful streaming and say an object of my class below represents a state. How does Flink serialize and deserialize the object of MyClass below? Is it just

Correct way to e2e test a Flink application?

2020-03-02 Thread Laurent Exsteens
Hello, I would like to test a Flink application, including any problem that would happen when deployed on a distributed cluster. The way we do this currently is to launch a Flink cluster in Docker and run the job on it. This setup seems heavy and might not be necessary. Is there a way to simulat

Re: Flink Session Window to enrich Event with unique id

2020-03-02 Thread aj
Hi, Is using the session window to implement the above logic a good idea, or should I use a process function? On Sun, Mar 1, 2020 at 11:39 AM aj wrote: > Hi , > > I am working on a use case where i have a stream of events. I want to > attach a unique id to all the events happened in a session. > B

[Kubernetes] java.lang.OutOfMemoryError: Metaspace ?

2020-03-02 Thread Niels Basjes
Hi, I'm running a lot of batch jobs on Kubernetes, and once in a while I get this exception. What is causing this? How can I fix this? Niels Basjes java.lang.OutOfMemoryError: Metaspace at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:

Flink on AWS - ActiveMQ connector

2020-03-02 Thread KristoffSC
Hi all, In the AWS documentation [1] we can see that AWS provides a set of connectors for Flink. I would need to use an ActiveMQ one provided by [2]. Currently I'm using a Docker-based standalone Job Cluster and not an AWS one. What's up with those connectors provided by AWS? Will I be able to use my c

Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Kaymak, Tobias
Hi, let me refine my question: My pipeline is generated from Beam, so the Flink pipeline is a translated Beam pipeline. When I update my Apache Beam pipeline code, working with a snapshot in Flink to stop the pipeline is not an option, as the snapshot will use the old representation of the Fli

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Arvid Heise
I didn't get the use case completely. Are you using several sensors with different schemas? Are you processing them jointly? Let's assume some cases: 1) Only one format: it would be best to generate a case class with avrohugger. That is especially true if your processing actually requires specific fiel

Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Piotr Nowojski
Hi Tobi, No, FlinkKafkaConsumer is not using committed Kafka offsets for recovery. The offsets to start from are stored in the checkpoint itself. Updating the offsets back to Kafka is an optional, purely cosmetic thing from Flink's perspective, so the job will start from the correct off

Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Tzu-Li Tai
Hi Tobi, In this case, the job would indeed continue from the last offset that has been committed in Kafka (assuming that you are using the `startFromGroupOffsets` start position) for the specified group id. However, do keep in mind that those offsets are not consistent with the offsets written in
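
For reference, the start-position methods on the consumer (a fragment; `consumer` is the `FlinkKafkaConsumer` discussed in this thread):

```java
consumer.setStartFromGroupOffsets(); // default: committed group offsets in Kafka
// consumer.setStartFromEarliest();  // alternatives, ignored when restoring
// consumer.setStartFromLatest();    // from a checkpoint/savepoint
```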

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Nitish Pant
Hi, So I am building a data pipeline that takes input from sensors via an MQTT broker and passes it to Kafka. Before it goes to Kafka, I am filtering and serializing the filtered data into avro format and keeping the schema in the registry. Now I want to get that data in flink to process it using

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Arvid Heise
Could you please give more background on your use case? It's hard to give any advice with the little information you gave us. Usually, the consumer should know the schema or else it's hard to do meaningful processing. If it's something completely generic, then there is no way around it, but that s

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Nitish Pant
Hi, Thanks for the replies. I get that it is not wise to use GenericRecord and that it is what is causing the Kryo fallback, but then if not this, how should I go about writing an AvroSchemaRegistrySchema for when I don't know the schema. Without knowledge of the schema, I can't create a class. Can

Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Kaymak, Tobias
Thank you Piotr! One last question - let's assume my source is a Kafka topic - if I stop via the CLI with a savepoint in Flink 1.9, but do not use that savepoint when restarting my job - the job would continue from the last offset that has been committed in Kafka and thus I would also not experien

Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-02 Thread kant kodali
Hi Arvid, Yes, I got it, and it works as said in my previous email. Thanks! On Mon, Mar 2, 2020 at 12:10 AM Arvid Heise wrote: > Hi Kant, > > I think Dawid meant to not add the Kafka version number like this: > > flinkShadowJar > "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion

Re: Question about runtime filter

2020-03-02 Thread faaron zheng
Thanks for replying, Lee. I followed your method to debug the code and I find the build side only calls addPreAggregatedAccumulator but does not call the commit method. Furthermore, I added a breakpoint at future.handleAsync in the asyncGetBroadcastBloomFilter method. But when the program stops at if(e==null && accumulat

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Dawid Wysakowicz
Hi Nitish, Just to slightly extend on Arvid's reply. As Arvid said the Kryo serializer comes from the call to TypeExtractor.getForClass(classOf[GenericRecord]). As a GenericRecord is not a pojo this call will produce a GenericTypeInfo which uses Kryo serialization. For a reference example I would
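
A sketch of that reference approach using the registry-aware schema from flink-avro-confluent-registry; the reader schema and registry URL are hypothetical:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

// A reader schema is still required up front; the writer schema used for each
// record is fetched from the registry by id and resolved against it.
Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Sensor\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"}]}"); // hypothetical schema

ConfluentRegistryAvroDeserializationSchema<GenericRecord> deser =
        ConfluentRegistryAvroDeserializationSchema.forGeneric(
                readerSchema, "http://schema-registry:8081");
```

This sidesteps the Kryo fallback because the produced GenericRecords carry proper Avro type information instead of a GenericTypeInfo.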

Re: Exceptions in Web UI do not appear in logs

2020-03-02 Thread Arvid Heise
If an exception is unhandled in connectors, it will eventually be handled by the runtime, where it is logged and the task fails. Doing both logging and throwing an exception is an anti-pattern as the consumer of an exception should have the sole responsibility of handling it correctly. In your cas

Re: Do I need to use Table API or DataStream API to construct sources?

2020-03-02 Thread Arvid Heise
Hi Kant, I think Dawid meant to not add the Kafka version number like this: flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}" On Sun, Mar 1, 2020 at 7:31 PM kant kodali wrote: > * What went wrong: > Could not determine the dependencies of task ':shadowJar'. > >

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Arvid Heise
Hi Nitish, Kryo is the fallback serializer of Flink when everything else fails. In general, performance suffers quite a bit and it's not always applicable as in your case. Especially, in production code, it's best to avoid it completely. In your case, the issue is that your provided type informat