Rolling File Sink Exception

2018-09-03 Thread clay4444
When I want to write compressed string data to HDFS, I found that Flink only provides a StringWriter, so I used a custom writer, as follows: public class StringCompressWriter extends StreamWriterBase { private static final long serialVersionUID = 1L; private String charsetName; priv
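The truncated writer above can be sketched roughly as follows. This is a minimal, untested sketch against the Flink 1.6 flink-connector-filesystem `StreamWriterBase` API; GZIP is an assumed compression codec, not necessarily what the original used:

```scala
import java.util.zip.GZIPOutputStream
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.flink.streaming.connectors.fs.{StreamWriterBase, Writer}

// Sketch: wrap the base FSDataOutputStream in a GZIPOutputStream.
class StringCompressWriter(charsetName: String) extends StreamWriterBase[String] {

  @transient private var compressedStream: GZIPOutputStream = _

  override def open(fs: FileSystem, path: Path): Unit = {
    super.open(fs, path)                              // opens the underlying stream
    compressedStream = new GZIPOutputStream(getStream)
  }

  override def write(element: String): Unit =
    compressedStream.write(element.getBytes(charsetName))

  override def close(): Unit = {
    if (compressedStream != null) compressedStream.finish() // flush gzip trailer
    super.close()
  }

  override def duplicate(): Writer[String] = new StringCompressWriter(charsetName)
}
```

Used as `new BucketingSink[String](basePath).setWriter(new StringCompressWriter("UTF-8"))`.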

Re: Flink parquet read.write performance

2018-09-05 Thread clay4444
hi Regina, I've just started using Flink, and recently I've been asked to store Flink data on HDFS in parquet format. I've found several examples on GitHub and in the community, but there are always bugs. I see your storage directory, and that's what I want, so I'd like to ask you to reply to me for a sl

Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.

2018-09-25 Thread clay4444
hi: I am using Flink's Table API. I receive data from Kafka, register it as a table, process it with a SQL statement, and finally convert the result back to a stream and write it to a directory. The code looks like this: def main(args: Array[String]): Unit = { val sEnv = StreamExecutionEnv
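A minimal sketch of the pipeline described (topic name, broker address, and schema are illustrative; the descriptor API follows the Flink 1.6 Table API, and the JSON format is what triggers the DeserializationSchemaFactory lookup that fails here):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.types.Row

val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(sEnv)

// Registering a Kafka source with a JSON format requires a
// DeserializationSchemaFactory (provided by flink-json) at runtime.
tEnv.connect(
    new Kafka()
      .version("0.10")
      .topic("input-topic")
      .property("bootstrap.servers", "localhost:9092"))
  .withFormat(new Json().deriveSchema())   // needs flink-json on the classpath
  .withSchema(new Schema()
      .field("id", Types.LONG)
      .field("name", Types.STRING))
  .inAppendMode()
  .registerTableSource("my_table")

val result: Table = tEnv.sqlQuery("SELECT id, COUNT(*) FROM my_table GROUP BY id")
tEnv.toRetractStream[Row](result).writeAsText("/tmp/out")
sEnv.execute("kafka-sql-sketch")
```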

Re: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.

2018-09-25 Thread clay4444
hi Till: thank you for your reply. A few comments: I submit my task to YARN with the following command: ./bin/flink run -c org.clay.test.TestX flinkTools-1.0.jar My pom looks like this: 1.6.0 provided scala-tools.org Sc

Re: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.

2018-09-25 Thread clay4444
hi Till: I have solved the problem. The reason is that the flink-json dependency added to the pom didn't take effect; the flink-json-xxx.jar must be copied into Flink's ./lib/ directory ... -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
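Table factories are discovered via Java SPI from Flink's own classpath, which is why placing the format jar under ./lib works where a pom dependency alone may not. The paths below are assumed for illustration:

```
# copy the format jar into the Flink distribution's lib directory
cp ~/.m2/repository/org/apache/flink/flink-json/1.6.0/flink-json-1.6.0.jar \
   /data/flink1.6/lib/

# restart the standalone cluster (or redeploy the YARN session)
# so the new jar in ./lib is picked up
```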

About the retract of the calculation result of flink sql

2018-09-29 Thread clay4444
Hi everyone, I am having some problems in the process of using flink sql, my sql is as follows: SELECT COUNT(DISTINCT mm.student_id),sum(price) FROM ( SELECT a.student_id,a.confirm_date_time,sum(a.real_total_price)as price FROM ( SELECT DISTINCT po.id as order_id ,po.student_id ,po.create_id ,po.

Re: About the retract of the calculation result of flink sql

2018-09-29 Thread clay4444
My final calculation result is written to Kafka in the following way. Because KafkaTableSink does not support retract mode, I am not sure whether this method affects the calculation result. val userTest: Table = tEnv.sqlQuery(sql) val endStream = tEnv.toRetractStream[Row](use
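One common workaround (a sketch, not necessarily what the original code did) is to filter the retract stream down to accumulate messages before an append-only Kafka sink. Note that dropping retractions means downstream consumers may still see superseded intermediate values:

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val userTest: Table = tEnv.sqlQuery(sql)
val endStream: DataStream[(Boolean, Row)] = tEnv.toRetractStream[Row](userTest)

endStream
  .filter(_._1)            // true = add/accumulate, false = retraction
  .map(_._2.toString)
  .addSink(new FlinkKafkaProducer010[String](
    "broker:9092", "result-topic", new SimpleStringSchema()))
```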

Re: About the retract of the calculation result of flink sql

2018-10-01 Thread clay4444
hi Hequn, I don't understand the distinction you draw between group by and non-keyed group by. Can you explain it in a little more detail, or give me an example? Thank you. clay,
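The distinction being asked about can be illustrated with two query shapes (table and column names borrowed from the earlier SQL in this thread):

```sql
-- keyed GROUP BY: state is partitioned by student_id, so the
-- aggregation can run in parallel across keys
SELECT student_id, SUM(price) FROM orders GROUP BY student_id;

-- non-keyed (global) aggregation: no GROUP BY clause, a single global
-- accumulator, effectively parallelism 1 for that operator
SELECT COUNT(DISTINCT student_id), SUM(price) FROM orders;
```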

Re: About the retract of the calculation result of flink sql

2018-10-01 Thread clay4444
hi Timo, I use env.setParallelism(1) in my code to set the overall parallelism of the program to 1. Will some calculations still be parallelized despite this? clay,

About the issue caused by flink's dependency jar package submission method

2018-11-20 Thread clay4444
hi all: I know that when submitting Flink jobs, Flink's official recommendation is to put all the dependencies and business logic into a fat jar, but now our requirement is to separate the business logic from the dependencies and commit them dynamically. So I found one way: use the -yt and -C parameters to submit

Re: About the issue caused by flink's dependency jar package submission method

2018-11-20 Thread clay4444
hi, I have checked all the dependencies and didn't find any jar with a conflicting version, so I suspect the way I submit the jar has some issue. My command is like this: /data/flink1.6/bin/flink run -m yarn-cluster -ytm 8032 -yn 1 -ys 1 -yqu -yt /data/flink1.6//lib -c com.xxx.xxx.xxx.Launch -C
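For reference, a fuller (hypothetical) shape of that command: -yt ships a local directory to the YARN containers, while -C adds a URL to the classpath of the client and the cluster; -C takes a URL (not a bare path) and is repeated once per jar. Paths and class names below are illustrative:

```
/data/flink1.6/bin/flink run -m yarn-cluster \
  -yn 1 -ys 1 -ytm 8032 \
  -yt /data/deps \
  -C file:///data/deps/dep-a.jar \
  -C file:///data/deps/dep-b.jar \
  -c com.example.Launch /data/jobs/business-logic.jar
```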

Re: About the issue caused by flink's dependency jar package submission method

2018-11-21 Thread clay4444
hi yinhua, I considered that approach, but I don't think it is suitable, because I want each Flink job to have its own business logic and dependency jars, separate from other jobs. That's what I want to do.

If you are an expert in flink sql, then I really need your help...

2018-12-03 Thread clay4444
I am using Flink SQL to do some complicated calculations. I have encountered some very difficult problems in the process, so I would like to ask everyone for help. My goal is to build a data stream with a very accurate result, which is also in line with the Streaming Systems model. The core id

Re: If you are an expert in flink sql, then I really need your help...

2018-12-03 Thread clay4444
hi Timo: The LAST_VALUE function simply groups by id and then takes the latest row of data for each primary key. I was inspired by this answer: https://stackoverflow.com/questions/48554999/apache-flink-how-to-enable-upsert-mode-for-dynamic-tables Its implementation is also very simple: class Mid
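The approach from that Stack Overflow answer can be sketched as a Table API AggregateFunction whose accumulator simply keeps the most recent non-null value per group. Names and the String value type here are illustrative:

```scala
import org.apache.flink.table.functions.AggregateFunction

// Accumulator: just the latest value observed for the group.
class LastValueAccum { var last: String = _ }

class LastValue extends AggregateFunction[String, LastValueAccum] {
  override def createAccumulator(): LastValueAccum = new LastValueAccum
  override def getValue(acc: LastValueAccum): String = acc.last

  // Called once per input row; later rows overwrite earlier ones.
  def accumulate(acc: LastValueAccum, value: String): Unit =
    if (value != null) acc.last = value
}

// tEnv.registerFunction("LAST_VALUE", new LastValue)
// SELECT id, LAST_VALUE(name) FROM my_table GROUP BY id
```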

Re: If you are an expert in flink sql, then I really need your help...

2018-12-03 Thread clay4444
I have found out that the checkpoint is not triggered. Regarding the IN operation in Flink SQL: this SQL triggers checkpoints normally. select name,age from user where id in (5102,597816,597830,597817,597818,597819,597805,27,597820,597821,597822,597823,597825,597826,597827,597828,597839,597831,59

Re: If you are an expert in flink sql, then I really need your help...

2018-12-04 Thread clay4444
hi Timo: first, thank you very much, I have solved the problems. Regarding the problem of too-large state, I set the global parallelism of the program to 7, which solved my problem very well; checkpointing is now very fast. But I would like to ask if there is a way to set the parallelism for each operator (translat
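For the DataStream API the answer is yes: parallelism can be set per operator. For the operators generated by the SQL translation in Flink 1.6, there is no per-operator setting; they inherit the environment's default. A sketch with illustrative operator names:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(7)                       // job-wide default

env.socketTextStream("localhost", 9999)
  .map(_.toUpperCase).setParallelism(2)     // per-operator override
  .print().setParallelism(1)                // sinks can be overridden too

env.execute("per-operator parallelism sketch")
```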