Flink SQL join use case

2020-05-12 Thread shadowell
Hi, I am new to Flink SQL. I want to know whether Flink SQL (Flink 1.10) supports the following join syntax: ``` select a.id, a.col_1, b.col_1, c.col_1 from topic_a a inner join topic_b b on a.id = b.id left join topic_c c on a.id = c.id and a.col_1 = c.col_1 and b.col_
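
A minimal sketch of this kind of chained join in the Flink 1.10 Table API, assuming topic_a, topic_b and topic_c are already registered tables with id and col_1 columns (the trailing condition on b above is cut off and is not reproduced here):

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class MultiWayJoinSketch {
    // topic_a, topic_b and topic_c are assumed to be already registered
    // tables with columns id and col_1; the names mirror the question.
    public static Table query(StreamTableEnvironment tEnv) {
        return tEnv.sqlQuery(
            "SELECT a.id, a.col_1, b.col_1, c.col_1 "
                + "FROM topic_a a "
                + "INNER JOIN topic_b b ON a.id = b.id "
                + "LEFT JOIN topic_c c ON a.id = c.id AND a.col_1 = c.col_1");
    }
}
```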

Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-12 Thread Robert Metzger
Hi Peter, I filed a ticket for this feature request: https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your thoughts / requirements to the ticket). Best, Robert On Wed, May 6, 2020 at 3:41 AM Jingsong Li wrote: > Hi Peter, > > The troublesome part is how to know the "ending" for a

Re: Not able to implement a use case

2020-05-12 Thread Khachatryan Roman
AFAIK, yes, you can write streams. I'm pulling in Jingsong Li and Rui Li as they might know better. Regards, Roman On Mon, May 11, 2020 at 10:21 PM Jaswin Shah wrote: > If I go with the Table APIs, can I write the streams to Hive, or is it only for > batch processing as of now? > > Get Outlook for

Re: Not able to implement a use case

2020-05-12 Thread Jingsong Li
Thanks Roman for involving me. Hi Jaswin, FLIP-115 [1] will finish Kafka -> Hive/Filesystem and will be released in 1.11. We will provide two connectors in Table: - file system connector, this connector manages partitions and files by file system paths. You can define a file system table with par

Re: Not able to implement a use case

2020-05-12 Thread Rui Li
The Hive table sink is only for batch processing in Flink 1.10. There are some ongoing efforts to support writing streams to Hive, and we intend to make it available in 1.11. Stay tuned :) On Tue, May 12, 2020 at 3:52 PM Khachatryan Roman < khachatryan.ro...@gmail.com> wrote: > AFAIK, yes, you can

Tumbling window per key

2020-05-12 Thread Navneeth Krishnan
Hi All, I was looking at the documentation for windows and got a little confused. As per my understanding, a tumbling window per key will create non-overlapping windows based on when the data for that key arrived. For example, consider a tumbling window of 30 seconds: user1 - 10:01:01 user2 - 10:01:02

Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Gary Yao
Hi Nick, Are you able to upgrade to Flink 1.9? Beginning with Flink 1.9 you can use KafkaSerializationSchema to produce a ProducerRecord [1][2]. Best, Gary [1] https://issues.apache.org/jira/browse/FLINK-11693 [2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/f
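
A minimal sketch of what Gary describes, assuming a plain String stream and a placeholder topic name; the schema returns a full ProducerRecord, so the application controls topic, key and value:

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Maps every element to a full ProducerRecord (Flink 1.9+); topic name
// and String element type are placeholders.
public class StringKafkaSchema implements KafkaSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        byte[] bytes = element.getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>("output-topic", bytes, bytes);
    }

    // The sink is then built from the schema, the usual Kafka properties
    // and a delivery semantic.
    public static FlinkKafkaProducer<String> createSink(Properties props) {
        return new FlinkKafkaProducer<>(
                "output-topic",               // default topic
                new StringKafkaSchema(),
                props,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
    }
}
```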

Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-12 Thread Yun Gao
Hi Peter, Sorry for missing the question and responding late. I'm currently working together with Jingsong on the issue to support "global committing" (like writing a _SUCCESS file or adding partitions to the Hive store) after buckets are terminated. In 1.11 we may first support watermark/time relate

TableConfig TTL and Watermark TTL

2020-05-12 Thread lec ssmi
Hi: If I join two streams in SQL and a time range is used as a condition, similar to the time interval join in DataStream: will this join state expire as the watermark moves, or will it expire with the TTL time configured by TableConfig? Or both? Best, Lec Ssmi

Flink BLOB server port exposed externally

2020-05-12 Thread Omar Gawi
Hi All, I have Apache Flink running as part of our Java program on a Linux machine. Flink runs on thread(s) within the same Java process. I see that the machine has the BLOB server port 1098 exposed to the outside: davc@sdavc:~$ netstat -anp | grep LISTEN (Not all processes could be ident

Re: Tumbling window per key

2020-05-12 Thread Arvid Heise
Hi Navneeth, Your understanding is correct. In the image, all windows across the keys for the same timespan are grouped together, which makes sense from a logical perspective as you would talk about the first, second, ... window. But technically, there are 15 small windows involved instead of the
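
A sketch of the per-key layout, assuming a (user, count) stream; for a given 30-second span each active key gets its own window instance:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PerKeyTumblingSketch {
    // For the same 30s timespan there is one window instance per active
    // key, each aggregated independently - the "small windows" above.
    public static DataStream<Tuple2<String, Long>> perUserCounts(
            DataStream<Tuple2<String, Long>> events) {
        return events
                .keyBy(e -> e.f0)                                      // key = user
                .window(TumblingEventTimeWindows.of(Time.seconds(30)))
                .sum(1);                                               // sum counts per window
    }
}
```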

Re: Broadcast state vs data enrichment

2020-05-12 Thread Khachatryan Roman
Thanks for the clarification. Apparently, the second option (with the enricher) creates more load by adding the configuration to every event. Unless events are much bigger than the configuration, this will significantly increase network, memory, and CPU usage. Btw, I think you don't need a broadcast in th

Re: Flink BLOB server port exposed externally

2020-05-12 Thread Arvid Heise
Hi Omar, wouldn't it be possible to just create an iptables rule that allows access to 1098 only from localhost? I don't think you can open a socket just for localhost programmatically (at least not from Java). Best, Arvid On Tue, May 12, 2020 at 12:51 PM Omar Gawi wrote: > Hi All, > > I have

Re: Flink SQL join use case

2020-05-12 Thread Benchao Li
Yes, Flink SQL supports this syntax. shadowell wrote on Tue, May 12, 2020 at 3:25 PM: > Hi, > > I am new to Flink SQL. I want to know whether Flink SQL (Flink 1.10) > supports the following join syntax: > > ``` > select a.id, a.col_1, b.col_1, c.col_1 from topic_a a > inner join topic_b b on a.id =

Re: Need suggestion on Flink-Kafka stream processing design

2020-05-12 Thread Khachatryan Roman
Hello Hemant, Thanks for your reply. I think the partitioning you described (event type + protocol type) is subject to data skew. Including a device ID should solve this problem. Also, including "protocol_type" in the key and having a topic per protocol_type seems redundant. Furthermore, do you have

Re: TableConfig TTL and Watermark TTL

2020-05-12 Thread Benchao Li
The state will be cleaned with watermark movement. lec ssmi wrote on Tue, May 12, 2020 at 5:55 PM: > Hi: > If I join two streams in SQL and a time range is used as a condition, > similar to the time interval join in DataStream: will this join state > expire as the watermark moves, or will it expire with

Re: TableConfig TTL and Watermark TTL

2020-05-12 Thread lec ssmi
Then if I don't write time constraints, will it expire with the TTL time configured by TableConfig? Benchao Li wrote on Tue, May 12, 2020 at 20:27: > The state will be cleaned with watermark movement. > > lec ssmi wrote on Tue, May 12, 2020 at 5:55 PM: > >> Hi: >> If I join two streams in SQL and a time range is used as

Re: TableConfig TTL and Watermark TTL

2020-05-12 Thread Benchao Li
Yes, you are right. If you add time constraints, it will be translated into [Proc/Row]TimeBoundedStreamJoin; if not, it will be translated into StreamingJoinOperator. They are two totally different operators. lec ssmi wrote on Tue, May 12, 2020 at 8:43 PM: > Then if I don't write time constraints, > will it expire
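
A sketch contrasting the two shapes in Flink 1.10, with placeholder tables a/b, a rowtime attribute rt, and illustrative retention values:

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class JoinStateCleanupSketch {
    public static void sketch(StreamTableEnvironment tEnv) {
        // Time-bounded (interval) join: the time constraint makes this a
        // [Proc/Row]TimeBoundedStreamJoin, whose state is purged as the
        // watermark advances.
        Table intervalJoin = tEnv.sqlQuery(
            "SELECT * FROM a JOIN b ON a.id = b.id AND "
                + "a.rt BETWEEN b.rt - INTERVAL '10' MINUTE AND b.rt");

        // Regular join: a StreamingJoinOperator, whose state only expires
        // via the idle state retention configured on TableConfig.
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
        Table regularJoin = tEnv.sqlQuery("SELECT * FROM a JOIN b ON a.id = b.id");
    }
}
```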

Re: TableConfig TTL and Watermark TTL

2020-05-12 Thread lec ssmi
Thank you for the clarification. Benchao Li wrote on Tue, May 12, 2020 at 20:48: > Yes, you are right. > > If you add time constraints, it will be translated into > [Proc/Row]TimeBoundedStreamJoin; > if not, it will be translated into StreamingJoinOperator. > They are two totally different operators. > > lec ssmi

Re: Need suggestion on Flink-Kafka stream processing design

2020-05-12 Thread hemant singh
Hello Roman, Thanks for your response. I think the partitioning you described (event type + protocol type) is subject to data skew. Including a device ID should solve this problem. Also, including "protocol_type" in the key and having a topic per protocol_type seems redundant. Each protocol is in sin

Re: Flink Memory analysis on AWS EMR

2020-05-12 Thread Jacky D
hi, Xintong Thanks for the reply. I attached those lines below for the application master start command: 2020-05-11 21:16:16,635 DEBUG org.apache.hadoop.util.PerformanceAdvisory - Crypto codec org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec is not available. 2020-05-11 21:16:16,635 D

Flink Metrics in Kubernetes

2020-05-12 Thread Averell
Hi, I'm trying to configure Flink running in native Kubernetes to push some metrics to NewRelic (using a custom ScheduledDropwizardReporter). From the logs, I could see that an instance of ScheduledDropwizardReporter has already been created successfully (the overridden getReporter() method

Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Nick Bendtner
Hi Gary, Thanks for the info. I am aware this feature is available from 1.9.0 onwards. Our cluster is still very old and has CI/CD challenges; I was hoping not to bloat up the application jar by packaging even flink-core with it. If it's not possible to do this with an older version without writing our ow

Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-12 Thread Theo Diefenthal
Hi Yun, For me, that sounds quite nice. I implemented the same for my application a few weeks ago, but of course tailored only to my app. What I did: 1. I wrapped the Parquet StreamingFileSink into a ProcessFunction. 2. I extended the default ProcessOperator and instead of "notifyCheckpoint

Flink Streaming Job Tuning help

2020-05-12 Thread Senthil Kumar
Hello Flink Community! We have a fairly intensive Flink streaming application, processing 8-9 million records a minute, with each record being 10k. One of our steps is a keyBy operation. We are finding that Flink lags seriously behind when we introduce the keyBy (presumably because of shuffle ac

Re: Flink Streaming Job Tuning help

2020-05-12 Thread Senthil Kumar
I forgot to mention, we are consuming said records from AWS Kinesis and writing out to S3. From: Senthil Kumar Date: Tuesday, May 12, 2020 at 10:47 AM To: "user@flink.apache.org" Subject: Flink Streaming Job Tuning help Hello Flink Community! We have a fairly intensive Flink streaming applica

Re: Flink Memory analysis on AWS EMR

2020-05-12 Thread Arvid Heise
Hi Jacky, I suspect that the quotes are the actual issue. Could you try to remove them? See also [1]. [1] http://blogs.perl.org/users/tinita/2018/03/strings-in-yaml---to-quote-or-not-to-quote.html On Tue, May 12, 2020 at 4:03 PM Jacky D wrote: > hi, Xintong > > Thanks for reply , I attached th

Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-12 Thread Peter Groesbeck
Robert, Yun, and Theo, Thanks for the responses! I'm very much looking forward to upgrading once those changes are made. I hacked this in myself, but likely in a much less elegant way than Theo did. For anybody who is curious: I extended the DateTimeBucketAssigner class and overrode the getBucketId m

Re: Flink Memory analysis on AWS EMR

2020-05-12 Thread Jacky D
hi, Arvid Thanks for the advice. I removed the quotes and it did create a YARN session on EMR, but I didn't find any JIT log file generated. The config with quotes works on a standalone cluster. I also tried to dynamically pass the property within the yarn session command: flink-yarn-sessi

Incremental state with purging

2020-05-12 Thread Annemarie Burger
Hi, I'm trying to implement the most efficient way to incrementally put incoming DataStream elements in my (map) state, while removing old elements (older than x) from that same state. I then want to output the state every y seconds. I've looked into using the ProcessFunction with onTimer, or build
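
One possible shape of this, sketched with a hypothetical Event type whose timestamp is assumed unique per key: elements older than x are purged on each timer firing, and a snapshot of the state is emitted every y seconds via processing-time timers.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class IncrementalPurgingState extends
        KeyedProcessFunction<String, IncrementalPurgingState.Event, List<IncrementalPurgingState.Event>> {

    // Hypothetical element type; a real job would use its own POJO.
    public static class Event {
        public String key;
        public long timestamp;
        public String payload;
    }

    private static final long MAX_AGE_MS = 60_000;       // "x"
    private static final long EMIT_INTERVAL_MS = 10_000; // "y"

    private transient MapState<Long, Event> elements;

    @Override
    public void open(Configuration parameters) {
        elements = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("elements", Long.class, Event.class));
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<List<Event>> out)
            throws Exception {
        elements.put(e.timestamp, e);
        // Align the timer to the emit interval: timers with the same
        // timestamp are deduplicated per key, so this registers at most
        // one timer per interval and key.
        long now = ctx.timerService().currentProcessingTime();
        long nextEmit = now - (now % EMIT_INTERVAL_MS) + EMIT_INTERVAL_MS;
        ctx.timerService().registerProcessingTimeTimer(nextEmit);
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx, Collector<List<Event>> out)
            throws Exception {
        long cutoff = ts - MAX_AGE_MS;
        List<Event> snapshot = new ArrayList<>();
        Iterator<Map.Entry<Long, Event>> it = elements.iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Event> entry = it.next();
            if (entry.getKey() < cutoff) {
                it.remove();                 // purge elements older than x
            } else {
                snapshot.add(entry.getValue());
            }
        }
        out.collect(snapshot);
    }
}
```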

Re: Need suggestion on Flink-Kafka stream processing design

2020-05-12 Thread hemant singh
Hello Flink Users, Any views on this question of mine? Thanks, Hemant On Tue, May 12, 2020 at 7:00 PM hemant singh wrote: > Hello Roman, > > Thanks for your response. > > I think the partitioning you described (event type + protocol type) is subject > to data skew. Including a device ID should sol

Re: Flink Metrics in Kubernetes

2020-05-12 Thread Gary Yao
Hi Averell, If you are seeing the log message from [1] and Scheduled#report() is not called, the thread in the "Flink-MetricRegistry" thread pool might be blocked. You can use the jstack utility to see on which task the thread pool is blocked. Best, Gary [1] https://github.com/apache/flink/blob

Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Gary Yao
Hi Nick, Can you explain why it is required to package flink-core into your application jar? Usually flink-core is a dependency with provided scope [1] Best, Gary [1] https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#dependency-scope On Tue, May 12, 2020 at

Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Nick Bendtner
Hi Gary, It's because the Flink distribution of the cluster is 1.7.2. We use a standalone cluster, so in the lib directory in Flink the artifact is flink-core-1.7.2.jar. I need to pack flink-core-1.9.0.jar into the application and use child-first class loading to use the newer version of flink-core. If I

Re: Need suggestion on Flink-Kafka stream processing design

2020-05-12 Thread Arvid Heise
Hi Hemant, In general, you want to keep all data coming from one device in one Kafka partition, such that the timestamps of that device are monotonically increasing. Thus, when processing data from one device, you have ensured that no out-of-order events with respect to this device happen. If you
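
One way to get that layout on the producing side, sketched with (deviceId, payload) tuples and a placeholder topic: use the device ID as the Kafka record key, so the default hash partitioner pins each device to one partition and its events stay in order there.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DeviceKeyedSchema
        implements KafkaSerializationSchema<Tuple2<String, String>> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            Tuple2<String, String> deviceIdAndPayload, Long timestamp) {
        return new ProducerRecord<>(
                "device-events",                                         // placeholder topic
                deviceIdAndPayload.f0.getBytes(StandardCharsets.UTF_8),  // key = device ID
                deviceIdAndPayload.f1.getBytes(StandardCharsets.UTF_8)); // value = payload
    }
}
```

Downstream, keying the Flink stream by the same device ID then preserves this per-device ordering inside the job.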

Re: Window processing in Stateful Functions

2020-05-12 Thread Igal Shilman
Hi, One way to keep the state size under control would be: 1) attach to every incoming edge its "insertion time" in the vertex function's state. 2) in addition, the vertex function would send a delayed message, with a delay of insertion time + expiration duration. 3) once a delayed message arriv
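
A sketch of these three steps with the embedded Statefun 2.0 Java SDK; the AddEdge/Expire message types are hypothetical stand-ins for the application's real messages:

```java
import java.time.Duration;

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedTable;

public class VertexFn implements StatefulFunction {

    private static final Duration EXPIRATION = Duration.ofHours(1);

    @Persisted
    private final PersistedTable<String, Long> insertionTimes =
            PersistedTable.of("insertion-times", String.class, Long.class);

    @Override
    public void invoke(Context context, Object input) {
        if (input instanceof AddEdge) {
            String edge = ((AddEdge) input).edgeId;
            // 1) remember when the edge was inserted
            insertionTimes.set(edge, System.currentTimeMillis());
            // 2) schedule a delayed message back to ourselves
            context.sendAfter(EXPIRATION, context.self(), new Expire(edge));
        } else if (input instanceof Expire) {
            String edge = ((Expire) input).edgeId;
            Long inserted = insertionTimes.get(edge);
            // 3) drop the edge unless it was refreshed in the meantime
            if (inserted != null
                    && inserted + EXPIRATION.toMillis() <= System.currentTimeMillis()) {
                insertionTimes.remove(edge);
            }
        }
    }

    // Hypothetical message types.
    public static final class AddEdge {
        final String edgeId;
        AddEdge(String edgeId) { this.edgeId = edgeId; }
    }

    public static final class Expire {
        final String edgeId;
        Expire(String edgeId) { this.edgeId = edgeId; }
    }
}
```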

changing the output file names in StreamingFileSink from part-00 to something else

2020-05-12 Thread dhurandar S
We want to change the name of the files being generated as the output of our StreamingFileSink. When files are generated they are named part-00*; is there a way that we can change the name? In Hadoop, we can change RecordWriters and MultipleOutputs. May I please get some help in this regard? This is cau

Re: Statefun 2.0 questions

2020-05-12 Thread Wouter Zorgdrager
Hi Igal, all, In the meantime we found a way to serve Flink stateful functions in a frontend. We decided to add another (set of) Flask application(s) which link to Kafka topics. These Kafka topics then serve as ingress and egress for the statefun cluster. However, we're wondering how we can scale

Re: Statefun 2.0 questions

2020-05-12 Thread Igal Shilman
Hi Wouter, Triggering a stateful function from a frontend indeed requires an ingress between them, so the way you've approached this is also the way we were thinking of. As Gordon mentioned, a potential improvement might be an HTTP ingress that would allow triggering stateful functions directly fr

Re: Need suggestion on Flink-Kafka stream processing design

2020-05-12 Thread hemant singh
Hi Arvid, I don't want to aggregate all events; rather, I want to create a record for a device combining data from multiple events. Each of these events gives me a metric for a device, so for example if I want one record for device-id=1, the metric will look like metric1, metric2, metric3 where m

Register time attribute while converting a DataStream to Table

2020-05-12 Thread Jiahui Jiang
Hello Flink friends, I have a retract stream in the format of 'DataStream' that I want to register in my table environment, and also expose a processing time column in the table. For a regular DataStream, I have been doing 'tableEnvironment.createTemporaryView(path, dataStream, 'field1,field2,
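
For the regular-stream case Jiahui mentions, a minimal Flink 1.10 sketch with placeholder names; appending "pt.proctime" to the field list exposes a processing-time attribute as an extra last column:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ProctimeRegistration {
    // Field names and table path are placeholders; "pt" becomes a
    // processing-time attribute usable in windows and joins.
    public static void register(StreamTableEnvironment tableEnv,
                                DataStream<Tuple2<String, Integer>> dataStream) {
        tableEnv.createTemporaryView(
                "my_table", dataStream, "field1, field2, pt.proctime");
    }
}
```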

Re: Flink restart strategy on specific exception

2020-05-12 Thread Ken Krugler
Hi Till, Sorry to resurface an ancient question, but is there a working example anywhere of setting a custom restart strategy? Asking because I've been wandering through the Flink 1.9 code base for a while, and the restart strategy implementation is…pretty tangled. From what I've been able to f

Re: Flink restart strategy on specific exception

2020-05-12 Thread Ken Krugler
Hi Till, Sorry, missed the key question…in the RestartStrategy.restart() method, I don't see any good way to get at the underlying exception. I can cast the RestartCallback to an ExecutionGraphRestartCallback, but I still need access to the private execGraph to be able to get at the failure info

Re: Flink Metrics in Kubernetes

2020-05-12 Thread Averell
Hi Gary, Thanks for the help. Below is the output from jstack; it seems it is not blocked. In my JobManager log there's this WARN; I am not sure whether it's relevant at all. Attached is the full jstack dump: k8xDump.txt

Re: Flink Memory analysis on AWS EMR

2020-05-12 Thread Xintong Song
Hi Jacky, I don't think ${FLINK_LOG_PREFIX} is available for Flink YARN deployments. This is just my guess: the actual file name becomes ".jit". You can try to verify that by looking for the hidden file. If it is indeed this problem, you can try to replace "${FLINK_LOG_PREFIX}" with "/your-fi

Prometheus Pushgateway Reporter cannot DELETE metrics on pushgateway

2020-05-12 Thread 李佳宸
Hi, I got stuck using Prometheus Pushgateway to collect metrics. Here is my configuration for the reporter: metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter metrics.reporter.promgateway.host: localhost metrics.reporter.promgateway.port: 9091
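
For reference, the reporter's remaining flink-conf.yaml keys (values illustrative, and an assumption about the poster's missing config); note that deleteOnShutdown only removes metrics from the gateway when the reporter shuts down cleanly, so a killed or crashed job leaves its metrics behind:

```yaml
# flink-conf.yaml - remaining pushgateway keys (illustrative values)
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: true
```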

Re: changing the output file names in StreamingFileSink from part-00 to something else

2020-05-12 Thread Yun Gao
Hi Dhurandar: Currently StreamingFileSink should be able to change the prefix and suffix of the filename [1]; it could be changed to something like <prefix>-0-0<suffix>. Could this solve your problem? Best, Yun [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_si
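
A minimal Flink 1.10 sketch of the prefix/suffix option from [1], with a placeholder path and names; only the prefix and suffix around the subtask/counter indices are configurable:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class RenamedPartsSketch {
    // Produces files named my-output-<subtask>-<counter>.txt instead of
    // part-<subtask>-<counter>.
    public static StreamingFileSink<String> build(String outputPath) {
        return StreamingFileSink
                .forRowFormat(new Path(outputPath),
                              new SimpleStringEncoder<String>("UTF-8"))
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("my-output")
                                .withPartSuffix(".txt")
                                .build())
                .build();
    }
}
```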

Re: Prometheus Pushgateway Reporter cannot DELETE metrics on pushgateway

2020-05-12 Thread Thomas Huang
I met this issue three months ago. Finally, we reached the conclusion that the Prometheus pushgateway cannot handle high-throughput metric data. But we solved the issue via service discovery. We changed the Prometheus metric reporter code, adding the registration logic, so the job can expose the ho

Re: Flink Streaming Job Tuning help

2020-05-12 Thread Zhijiang
Hi Kumar, I can give some general ideas for further analysis. > We are finding that Flink lags seriously behind when we introduce the keyBy > (presumably because of shuffle across the network) The `keyBy` breaks the chained operators, so it might cause an obvious performance hit in pra

How to subscribe to a Kinesis Stream using enhanced fanout?

2020-05-12 Thread Xiaolong Wang
Hello Flink Community! I'm currently working on a project relying on AWS Kinesis. With the provided connector (flink-connector-kinesis_2.11:1.10.0), I can consume the messages. But as the main stream is used among several other teams, I was required to use the enhanced fanout of Kinesis.

Re: Flink restart strategy on specific exception

2020-05-12 Thread Zhu Zhu
Hi Ken, Custom restart strategies were an experimental feature and have been deprecated since 1.10 [1]. That's why you cannot find any documentation for them. The old RestartStrategy was deprecated and replaced by RestartBackoffTimeStrategy in 1.10 (unless you are using the legacy scheduler, which was als