Re: How to print log in flink-streaming-java module

2020-04-20 Thread Leonard Xu
Hi, Lee Flink ships with the following default properties files[1]: log4j-cli.properties: Used by the Flink command line client (e.g. flink run) (not code executed on the cluster) log4j-yarn-session.properties: Used by the Flink command line client when starting a YARN session (yarn-session.sh)

Re:Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread forideal
Hi Kurt: I hit the same error. sql: insert into dw_access_log select get_json_value(query_nor, query_nor_counter) as `value` from ods_access_log_source group by tumble (time_key, interval '1' MINUTE), group_key get_json_value public class GetJsonValue extends AggregateFunction> { @Ove
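The preview above cuts off the poster's UDF, so here is only a minimal sketch of a Flink table AggregateFunction with an invented accumulator and name (not the poster's actual GetJsonValue logic), assuming the 1.9/1.10 `org.apache.flink.table.functions.AggregateFunction` API:

```java
import org.apache.flink.table.functions.AggregateFunction;
import java.util.HashMap;
import java.util.Map;

// Hypothetical skeleton, not the poster's UDF: keeps the latest counter seen
// per key and emits the whole map as a string.
public class GetJsonValueSketch
        extends AggregateFunction<String, Map<String, Long>> {

    @Override
    public Map<String, Long> createAccumulator() {
        return new HashMap<>();
    }

    // accumulate() is resolved by reflection; the first argument is the accumulator.
    public void accumulate(Map<String, Long> acc, String key, Long counter) {
        if (key != null) {
            acc.put(key, counter);
        }
    }

    @Override
    public String getValue(Map<String, Long> acc) {
        return acc.toString();
    }
}
```

Registered, for example, via tableEnv.registerFunction("get_json_value", new GetJsonValueSketch()) on a StreamTableEnvironment.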

Re: How to print log in flink-streaming-java module

2020-04-20 Thread Lee Sysuke
Pls refer to conf/log4j.properties Polarisary wrote on Tue, Apr 21, 2020 at 11:08 AM: > Hi all, > when I add some log statements in org.apache.flink.streaming.api.environment ( > flink-streaming-java module ) and package flink-dist_2.11-1.10.0.jar, it > prints nothing in the jm or tm log. > > I did it like this: > > ``` >
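For context, log statements inside Flink's own classes follow the usual SLF4J pattern; whether they show up in the jm/tm logs is governed by the level configured for that package in conf/log4j.properties. A minimal sketch (the class name is just a placeholder for whatever class is being patched):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyPatchedClass {

    // Flink's code base declares SLF4J loggers like this.
    private static final Logger LOG = LoggerFactory.getLogger(MyPatchedClass.class);

    public void doSomething() {
        // Only printed if conf/log4j.properties sets this logger's
        // package to INFO (or lower).
        LOG.info("executeAsync() was called");

        if (LOG.isDebugEnabled()) {
            LOG.debug("extra details only visible at DEBUG level");
        }
    }
}
```

Besides the log level, the rebuilt flink-dist jar must actually be the one picked up by the JobManager/TaskManager processes, otherwise the new statements never run.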

Re: Enrich streaming data with small lookup data that slowly changes over time

2020-04-20 Thread Mu Kong
Hi Jark Wu, Thanks for your help! I gave the document a quick glimpse, it seems method [1] fits my purpose better. Let me give it a deeper look. Thank you very much!! Best, Mu On Tue, Apr 21, 2020 at 12:06 PM Jark Wu wrote: > Hi Mu, > > Flink SQL does support dimension table join. There are

Re: Checkpoints for kafka source sometimes get 55 GB size (instead of 2 MB) and flink job fails during restoring from such checkpoint

2020-04-20 Thread Yun Tang
Hi Oleg, Have you tried loading the _metadata via Checkpoints.loadCheckpointMetadata to see how many records are in the offsets meta? If there is only one record, as indicated by the logs, that would be really weird. Moreover, I have several comments based on your description: * state.backen
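A rough sketch of inspecting the _metadata file this way. It relies on Flink internals and the 1.10-era class names (Checkpoints, Savepoint, OperatorState); the exact package and return type of loadCheckpointMetadata moved around in later releases, so treat these as assumptions to verify against your Flink version:

```java
import java.io.DataInputStream;
import java.io.FileInputStream;

import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;

// Sketch only: loads a checkpoint's _metadata file and prints per-operator state sizes.
public class InspectCheckpointMetadata {

    public static void main(String[] args) throws Exception {
        String metadataPath = args[0]; // path to the checkpoint's _metadata file

        try (DataInputStream in = new DataInputStream(new FileInputStream(metadataPath))) {
            Savepoint metadata = Checkpoints.loadCheckpointMetadata(
                    in, InspectCheckpointMetadata.class.getClassLoader());

            // Roughly how much state each operator (e.g. the Kafka source) carries.
            for (OperatorState operatorState : metadata.getOperatorStates()) {
                System.out.println(operatorState.getOperatorID()
                        + " -> " + operatorState.getStateSize() + " bytes");
            }
        }
    }
}
```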

How to print log in flink-streaming-java module

2020-04-20 Thread Polarisary
Hi all, when I add some log statements in org.apache.flink.streaming.api.environment (flink-streaming-java module) and package flink-dist_2.11-1.10.0.jar, it prints nothing in the jm or tm log. I did it like this: ``` public JobClient executeAsync(StreamGraph streamGraph) throws Exception { ch

Re: Enrich streaming data with small lookup data that slowly changes over time

2020-04-20 Thread Jark Wu
Hi Mu, Flink SQL does support dimension table join. There are two ways to join the dimension table. If the data is in your database (e.g. MySQL, HBase), you can use this way [1] to join the data in your database in realtime and enrich fresh data. If the data is in a log stream (change stream), you
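A hedged sketch of the first option Jark points to: a processing-time lookup ("dimension table") join in the Blink planner. The table and column names here are invented for illustration; traffic_log (with a proctime attribute) and service_dim (backed by a connector that supports lookups, e.g. JDBC or HBase) are assumed to be registered already:

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Sketch only: enrich the traffic stream with service_name from a lookup table.
public final class DimensionJoinSketch {

    public static Table enrich(StreamTableEnvironment tableEnv) {
        return tableEnv.sqlQuery(
                "SELECT t.service_id, d.service_name, t.payload " +
                "FROM traffic_log AS t " +
                "JOIN service_dim FOR SYSTEM_TIME AS OF t.proctime AS d " +
                "ON t.service_id = d.service_id");
    }

    private DimensionJoinSketch() {}
}
```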

Enrich streaming data with small lookup data that slowly changes over time

2020-04-20 Thread Mu Kong
Hi community, I have a stream of traffic data with a service_id in it. I'm enriching this data with a map of (service_id, service_name), which only has 10 ~ 20 pairs and is read from a config file. The problem I'm facing now is that this map changes from time to time, and I don't want to redeploy the a

Re: Questions about Flink RichSinkFunction constructor VS open()

2020-04-20 Thread Jark Wu
Hi Jiawei, Yes, you should initialize the connection in the open() method, because the constructor is only called on the client side (which may not be able to connect to your database). Besides, after construction, the RichSinkFunction instance will be serialized into binary and shipped to the servers (TaskManagers) for deseriali
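A minimal sketch of the pattern Jark describes: keep only serializable configuration in the constructor, create the non-serializable connection in open() on the TaskManager, and release it in close(). The JDBC URL and table are placeholders:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class DatabaseSink extends RichSinkFunction<String> {

    private final String jdbcUrl;             // serializable config, set on the client
    private transient Connection connection;  // created on the TaskManager in open()
    private transient PreparedStatement statement;

    public DatabaseSink(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(jdbcUrl);
        statement = connection.prepareStatement("INSERT INTO events(payload) VALUES (?)");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        statement.setString(1, value);
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) {
            statement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
```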

Re: StreamQueryConfig vs TemporalTableFunction

2020-04-20 Thread Jark Wu
Hi Dom, The TemporalTableFunction join will also cleanup expired state if `setIdleStateRetentionTime()` is enabled in StreamQueryConfig or TableConfig. Best, Jark On Tue, 21 Apr 2020 at 04:47, Dominik Wosiński wrote: > Hey, > I wanted to ask whether the TemporalTableFunctions are subject to >
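For reference, a small sketch of enabling idle state retention on the TableConfig (the retention values are arbitrary examples); with this set, state that has not been touched for at least the minimum time becomes eligible for cleanup:

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;

public final class RetentionExample {

    public static void configure(TableEnvironment tableEnv) {
        TableConfig config = tableEnv.getConfig();
        // keep idle state for at least 12 hours, drop it after at most 24 hours
        config.setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    }

    private RetentionExample() {}
}
```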

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread Kurt Young
Thanks, once you can reproduce this issue locally, please open a jira with your testing program. Best, Kurt On Tue, Apr 21, 2020 at 8:48 AM 刘建刚 wrote: > Thank you. It is an online job and my input is huge. I checked the trace and > found that the array is resized when it is not big enough. The

Questions about Flink RichSinkFunction constructor VS open()

2020-04-20 Thread Jiawei Wu
I have posted this question in StackOverflow: https://stackoverflow.com/questions/61334549/flink-richsinkfunction-constructor-vs-open The question is: > Let's say I need to implement a custom sink using RichSinkFunction, and I need some variables like DBConnection in the sink. Where should I initia

Re: Changing number of partitions for a topic

2020-04-20 Thread Benchao Li
Hi Suraj, There is a config option[1] to enable partition discovery, which is disabled by default. The community discussed enabling it by default[2], but only for the new Source API. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#partition-discovery [2]
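A sketch of option [1]: setting the partition-discovery interval on the Kafka consumer so it picks up partitions added after the job started. Topic name, bootstrap servers, group id and the 30-second interval are placeholders:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

public final class PartitionDiscoveryExample {

    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "my-group");
        // discover newly added partitions every 30 seconds
        props.setProperty(
                FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                "30000");
        return new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    }

    private PartitionDiscoveryExample() {}
}
```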

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread 刘建刚
Thank you. It is an online job and my input is huge. I checked the trace and found that the array is resized when it is not big enough. The code is as below: public void add (int value) { int[] items = this.items; if (size == items.length) items = resize(Math.max(8, (int)(size * 1.75f)));

Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi, I tried to add the following cast, and it works. Doesn't look nice though /StreamingFileSink .forRowFormat(new Path(path), myEncoder) .withRollingPolicy(DefaultRollingPolicy.create().build()) .withBucketAssigner(myBucketAssigner)*.asInstan

Re: Job manager URI rpc address:port

2020-04-20 Thread Som Lima
This is the code I was looking for, which will allow me to programmatically connect to a remote jobmanager, same as the spark remote master. The spark master shares the compute load with slaves; in the case of flink, the jobmanager with taskmanagers. Configuration conf = new Configuration(); conf.s
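In the same spirit as the Configuration snippet above (which is cut off here), a sketch of connecting to a remote JobManager from a program; host, port and jar path are placeholders:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class RemoteEnvExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host",           // address of the JobManager / REST endpoint
                8081,                        // REST port, same one the web dashboard uses
                "/path/to/my-job-fat.jar");  // jar(s) containing the user code

        env.fromElements(1, 2, 3).print();

        env.execute("remote hello");
    }

    private RemoteEnvExample() {}
}
```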

Suppressing illegal Access Warnings

2020-04-20 Thread Zahid Rahman
Hi, *I was getting these warnings; I think these are due to a certain version of Maven libraries which is impacting Java frameworks everywhere.* WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/ho

RocksDB default logging configuration

2020-04-20 Thread Bajaj, Abhinav
Hi, Some of our teams ran into disk space issues because of the RocksDB default logging configuration - FLINK-15068. It seems the suggested workaround uses the OptionsFactory to set some of the parameters from inside the job. Since we provisio
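For reference, a sketch of the FLINK-15068 workaround mentioned above: an OptionsFactory that dials down RocksDB's own LOG output. The specific levels and limits below are a judgment call, not a recommendation from the thread:

```java
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

public class QuietRocksDbOptionsFactory implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions
                .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) // effectively silence the LOG file
                .setStatsDumpPeriodSec(0)                   // no periodic stats dumps
                .setMaxLogFileSize(10 * 1024 * 1024)        // cap each log file at ~10 MB
                .setKeepLogFileNum(2);                      // keep at most two rotated files
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions;
    }
}
```

It would then be wired in from the job via something like rocksDbStateBackend.setOptions(new QuietRocksDbOptionsFactory()), which is exactly the "from inside the job" part the question is about.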

Re: Problem getting watermark right with event time

2020-04-20 Thread Fabian Hueske
Hi Sudan, I noticed a few issues with your code: 1) Please check the computation of timestamps. Your code public long extractAscendingTimestamp(Eventi.Event element) { return element.getEventTime().getSeconds() * 1000; } only seems to look at the seconds of a timestamp. Typically, you wou
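Assuming getEventTime() returns a protobuf-style Timestamp with getSeconds()/getNanos() (the thread does not show the actual type, so this is an assumption), the fix Fabian hints at is to produce a full epoch-millisecond value rather than one truncated to whole seconds. A sketch:

```java
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

// Sketch only: Eventi.Event is the poster's own (protobuf) type.
public class EventTimestampExtractor extends AscendingTimestampExtractor<Eventi.Event> {

    @Override
    public long extractAscendingTimestamp(Eventi.Event element) {
        long seconds = element.getEventTime().getSeconds();
        long millisWithinSecond = element.getEventTime().getNanos() / 1_000_000L;
        // epoch milliseconds, including the sub-second part
        return seconds * 1000L + millisWithinSecond;
    }
}
```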

Re: FlinkKafakaProducer with Confluent SchemaRegistry and KafkaSerializationSchema

2020-04-20 Thread Fabian Hueske
Hi Anil, Here's a pointer to Flink's end-2-end test that's checking the integration with schema registry [1]. It was recently updated so I hope it works the same way in Flink 1.9. Best, Fabian [1] https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-confluent-schema-registry/

Running in LocalExecutionEnvironment in production

2020-04-20 Thread Suraj Puvvada
Hello, We currently have a lot of jobs running in LocalExecutionEnvironment and wanted to understand the limitations and whether it is recommended to run in this mode. Appreciate your thoughts on this. Thanks Suraj

Changing number of partitions for a topic

2020-04-20 Thread Suraj Puvvada
Hello, I have a flink job that reads from a source topic that currently has 4 partitions and I need to increase the partition count to 8. Do you need to restart the job for that to take effect ? How does it work in case there is persistent state (like a window operator) involved ? Any design doc

StreamQueryConfig vs TemporalTableFunction

2020-04-20 Thread Dominik Wosiński
Hey, I wanted to ask whether the TemporalTableFunctions are subject to StreamQueryConfig retention? I was pretty sure that they are not, but I have recently noticed weird behavior in one of my jobs that suggests that they indeed are. Thanks for answers, Best Regards, Dom.

Distributed Incremental Streaming Graph Analytics: State Accessing/Message Passing Options

2020-04-20 Thread burgeraw
I'm working on a system to process streaming graphs in Flink. I am trying to maintain the state of the graph within a time window, so I can then run graph algorithms on it. The goal is to do this with incremental updates, so the state does not have to be fully recomputed for each window. I figured

Re: Cannot register for Flink Forward Conference

2020-04-20 Thread Eleanore Jin
Hi Seth, Thanks for the prompt response! Yes all my colleagues were able to register. Best, Eleanore On Mon, Apr 20, 2020 at 12:49 PM Seth Wiesman wrote: > Hi Eleanore, > > There was a misconfiguration on the website if you try again everything > should work. > > Seth > > On Mon, Apr 20, 2020

Re: Cannot register for Flink Forward Conference

2020-04-20 Thread Seth Wiesman
Hi Eleanore, There was a misconfiguration on the website; if you try again everything should work. Seth On Mon, Apr 20, 2020 at 1:39 PM Eleanore Jin wrote: > Hi community, > > My colleagues tried to register for the Flink forward conference: > https://www.bigmarker.com/series/flink-forward-virt

Cannot register for Flink Forward Conference

2020-04-20 Thread Eleanore Jin
Hi community, My colleagues tried to register for the Flink forward conference: https://www.bigmarker.com/series/flink-forward-virtual-confer1/series_summit?__hssc=37051071.2.1587407738905&__hstc=37051071.db5b837955b42a71990541edc07d7beb.1587407738904.1587407738904.1587407738904.1&hsCtaTracking=33

Re: FLINK JOB solved

2020-04-20 Thread Som Lima
I found the problem. In the flink1.0.0/conf directory there are two files, masters and slaves. The masters file contains localhost:8081, the slaves file just localhost. I changed them both to the server IP address. Now the FLINK JOB link has the full :8081 link and displays the Apache Flink Dashboard in the browser. On Mon

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread Jark Wu
Hi, Are you using a version < 1.9.2? From the exception stack, it looks like it is caused by FLINK-13702, which is already fixed in 1.9.2 and 1.10.0. Could you try it with 1.9.2? Best, Jark On Mon, 20 Apr 2020 at 21:00, Kurt Young wrote: > Can you reproduce this in a local program with mini-cluster?

Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
Thanks Timo, I can see why this is pretty complicated to solve nicely at the moment (and in general). We will work around this for now, and look forward to helping make this better in the future! Gyula On Mon, Apr 20, 2020 at 4:37 PM Timo Walther wrote: > Hi Gyula, > > first of all the excepti

Re: Flink Serialization as stable (kafka) output format?

2020-04-20 Thread Theo Diefenthal
Hi Robert, Thank you very much for pointing me to the nice blog post. It aligns with my reading that the flink serializer is fast, outperforms avro (especially reflect) and still supports schema evolution well. Nicely done, @flink :) But as Arvid says, Avro is compatible with much more

Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Timo Walther
Hi Gyula, first of all the exception ``` org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. ``` is IMHO one of the biggest shortcomings that we cu

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-20 Thread Yang Wang
Hi Till, thanks for the feedback and suggestion. I think it makes sense to only support flink-dist-*.jar as the first step. Just as you suggested, the config option could be "yarn.submission.automatic-flink-dist-upload", default is true. Users could use "-yt/--yarnship" to specify a HDFS path th

Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
Thanks for the clarification, we can live with this restriction. I just wanted to make sure that I fully understand why we are getting these errors and whether there is any reasonable workaround. Thanks again :) Gyula On Mon, Apr 20, 2020 at 4:21 PM Kurt Young wrote: > According to the current implem

Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Kurt Young
According to the current implementation, yes, you are right that the hive table source will always be bounded. But conceptually, we can't make this assumption. For example, we might further improve the hive table source to also support unbounded cases, e.g. monitoring hive tables and always reading newly appeared da

Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
The HiveTableSource (and many others) return isBounded() -> true. In this case it is not even possible for it to change over time, so I am a bit confused. To me it sounds like you should always be able to join a stream against a bounded table; temporal or not, it is pretty well defined. Maybe there

Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Kurt Young
The reason here is that Flink doesn't know the hive table is static. After you create these two tables and try to join them, Flink will assume both tables will be changing with time. Best, Kurt On Mon, Apr 20, 2020 at 9:48 PM Gyula Fóra wrote: > Hi! > > The problem here is that I don't have a temp

Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
Hi! The problem here is that I don't have a temporal table. I have a regular stream from kafka (with an event time attribute) and a static table in hive. The Hive table is static, it doesn't change. It doesn't have any time attribute, it's not temporal. Gyula On Mon, Apr 20, 2020 at 3:43 PM godfrey

Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread godfrey he
Hi Gyula, Can you convert the regular join to a lookup join (temporal join) [1]? Then you can use a window aggregate. > I understand that the problem is that we cannot join with the Hive table and still maintain the watermark/event time column. But why is this? A regular join can't maintain the tim

StreamingFileSink to a S3 Bucket on a remote account using STS

2020-04-20 Thread orionemail
Hi, New to both AWS and Flink, but I currently have a need to write incoming data into an S3 bucket managed via AWS temporary credentials. I am unable to get this to work, and I am not entirely sure of the steps needed. I can write to S3 buckets that are not 'remote' and managed by STS temporary cred

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread Kurt Young
Can you reproduce this in a local program with mini-cluster? Best, Kurt On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman wrote: > You can read this for this type error. > > > https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#

Re: Flink incremental checkpointing - how long does data is kept in the share folder

2020-04-20 Thread Yun Tang
Hi Shachar You can refer to [1] to know the directory structure. The files (usually ByteStreamStateHandle) which are not in the shared folder are exclusive state like operator state or exclusive files uploaded during each incremental checkpoint. And actually I don't understand why you would say

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-20 Thread Till Rohrmann
Thanks for the clarification Yang. Now it makes sense to me. If it makes things easier, then I would still go first with the dead simple solution to turn automatic upload of local dist off via a configuration option before trying to implement a smart solution which relies on pattern matching or so

Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread Zahid Rahman
You can read this for this type of error. https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446 I would suggest you set breakpoints in your code. Step through the code; this method should show you which

Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi Sivaprasanna, That is a compile-time error, not a runtime error. /value build is not a member of ?0 possible cause: maybe a semicolon is missing before `value build'?/. There isn't any issue with either *withRollingPolicy*() or /withBucketAssigner/() alone, only when both are used. Thanks and regards, Av

Is there a good benchmark for Flink Stream API?

2020-04-20 Thread Felipe Gutierrez
Hi community, Is there a benchmark for the Flink Stream API to test varying workloads using a real data set? I was reading [2] but it does not say whether I can vary the workload on-the-fly, nor whether I can use a real data set like the Taxi Ride data from NYC [1]. [1] https://training.ververica.com/setup/taxiDa

Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Sivaprasanna
Hi Averell, Can you please share the complete stacktrace of the error? On Mon, Apr 20, 2020 at 4:48 PM Averell wrote: > Hi, > > I have the following code: > / StreamingFileSink > .forRowFormat(new Path(path), myEncoder) > .withRollingPolicy(DefaultRollingPolicy.

Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Averell
Hi, I have the following code: / StreamingFileSink .forRowFormat(new Path(path), myEncoder) .withRollingPolicy(DefaultRollingPolicy.create().build()) .withBucketAssigner(myBucketAssigner) .build()/ This is working fine in Flink 1.8
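For reference, the same builder chain written in Java compiles against the 1.10 API as far as I can tell; the error discussed later in this thread appears to be a Scala type-inference issue with the generified builder. The encoder and bucket assigner below are placeholder examples:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public final class FileSinkExample {

    public static StreamingFileSink<String> build(String path) {
        return StreamingFileSink
                .forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.create().build())
                .withBucketAssigner(new DateTimeBucketAssigner<String>())
                .build();
    }

    private FileSinkExample() {}
}
```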

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-20 Thread Yang Wang
Hi Till, Sorry that I did not give a detailed explanation of the optimization. Actually, the optimization contains the following two parts. * Use remotely uploaded jars to avoid unnecessary uploading (e.g. flink-dist-*.jar, user jars, dependencies). This could be done via enriching "-yt/--yarnsh

Re: FLINK JOB

2020-04-20 Thread Som Lima
Yes, exactly, that is the change I am having to make: changing the FLINK JOB default localhost to the IP of the server computer in the browser. I followed the instructions as per your link. https://medium.com/@zjffdu/flink-on-zeppelin-part-1-get-started-2591aaa6aa47 i.e. 0.0.0.0 of zeppelin.server.addr. for

Re: FLINK JOB

2020-04-20 Thread Jeff Zhang
I see, so you are running the flink interpreter in local mode. But you access zeppelin from a remote machine, right? Do you mean you can access it after changing localhost to the ip? If so, then I can add one configuration on the zeppelin side to replace localhost with the real ip. Som Lima wrote on Mon, Apr 20, 2020,

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-20 Thread Till Rohrmann
Shall we say that for the first version we can only deactivate the upload of local files, instead of doing some optimizations? I guess my problem is that I don't fully understand the optimizations yet. Maybe we introduce a power user config option `yarn.submission.automatic-flink-dist-upload` or so. Why

Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread 刘建刚
I am using Roaring64NavigableMap to compute uv. It works with the flink planner but not with the blink planner. The SQL is as follows: SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp, A, B, C, D, E, uv(bitmap(id)) as bmp FROM person GROUP BY TUMBLE(eventTi

Re: Flink 1.10 Out of memory

2020-04-20 Thread Zahid Rahman
As you can see from the task manager tab of the flink web dashboard: Physical Memory: 3.80 GB, JVM Heap Size: 1.78 GB, Flink Managed Memory: 128 MB. *Flink is only using 128 MB, which can easily cause an OOM* *error.* *These are the DEFAULT settings.* *I dusted off an old laptop so it only has 3.8 GB RAM.* What doe

Re: FLINK JOB

2020-04-20 Thread Som Lima
I am only running the zeppelin word count example by clicking the zeppelin run arrow. On Mon, 20 Apr 2020, 09:42 Jeff Zhang, wrote: > How do you run the flink job? It should not always be localhost:8081 > > Som Lima wrote on Mon, Apr 20, 2020 at 4:33 PM: > >> Hi, >> >> FLINK JOB url defaults to localhost >>

Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
Hi All! We hit the following problem with SQL and are trying to understand if there is a valid workaround. We have 2 tables: *Kafka* timestamp (ROWTIME) item quantity *Hive* item price So we basically have incoming (ts, id, quantity) and we want to join it with the hive table to get the total pr

Re: FLINK JOB

2020-04-20 Thread Jeff Zhang
How do you run the flink job? It should not always be localhost:8081 Som Lima wrote on Mon, Apr 20, 2020 at 4:33 PM: > Hi, > > FLINK JOB url defaults to localhost > > i.e. localhost:8081. > > I have to manually change it to server:8081 to get the Apache Flink Web > Dashboard to display. > > > > > -- Best Regar

FLINK JOB

2020-04-20 Thread Som Lima
Hi, FLINK JOB url defaults to localhost i.e. localhost:8081. I have to manually change it to server:8081 to get the Apache Flink Web Dashboard to display.

Re: Flink incremental checkpointing - how long does data is kept in the share folder

2020-04-20 Thread Shachar Carmeli
Hi Yun, I noticed that some files are related to the checkpoint but are not mentioned in the metadata file, and some of the files that are referenced in the metadata file (usually ByteStreamStateHandle) are not in the shared folder. Can you explain this behaviour? See the code I was using: final Savepo

Re: Flink 1.10 Out of memory

2020-04-20 Thread Lasse Nedergaard
Hi, Thanks for the reply. We will try it out and let everybody know. Med venlig hilsen / Best regards Lasse Nedergaard > On 20 Apr 2020, at 08:26, Xintong Song wrote: > > Hi Lasse, > > From what I understand, your problem is that the JVM tries to fork some native > process (if you look at t