Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Caizhi Weng
Hi Gyula. I'm afraid there is currently no way to override all Flink configurations. The SQL Client YAML file can only override some of them: the configuration entries can only set Table-specific configs, while the deployment entries are used to set the result fetching address and

Re: RuntimeException: Could not instantiate generated class 'StreamExecCalc$23166'

2020-04-23 Thread Caizhi Weng
This plan does look complicated; however, it is hard to see what the SQL is doing because the plan is so long. Could you share your SQL with us? Also, what version of Flink are you using? It seems that there is a very long method in the generated code, but Flink should have split it into many short

Re: Re: RuntimeException: Could not instantiate generated class 'StreamExecCalc$23166'

2020-04-29 Thread Caizhi Weng
t(c))).EXPR$0 AS EXPR$0, > CAST((my_test(a) ROW my_test(c))).EXPR$1 AS EXPR$1]) > > > the column `body` will be "generated" twice. > > > In my real case, the column `body` has many columns, and if the SQL tries to > SELECT body.EXPR$0, body.EXPR$1, ..body.EXPR$n, the

What's the best practice to determine whether a job has finished or not?

2020-05-07 Thread Caizhi Weng
Hi Flink community, I would like to determine in my code whether a job has finished (either successfully or exceptionally). I used to think that JobClient#getJobStatus was a good approach, but I found that it behaves quite differently under different execution environments. For example, under

Re: What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Caizhi Weng
e for standalone > mode, the Flink cluster is always up. > > > Caizhi Weng wrote on Fri, May 8, 2020 at 2:47 PM: > >> Hi dear Flink community, >> >> I would like to determine whether a job has finished (no matter >> successfully or exceptionally) in my code. >> >

Re: How to use Hbase Connector Sink

2020-06-11 Thread Caizhi Weng
Hi, the stack trace indicates that your query schema does not match your sink schema. It seems that `active_ratio*25 score` in your query is a DOUBLE value, not the `ROW` you declared in your sink. op <520075...@qq.com> wrote on Thu, Jun 11, 2020 at 3:31 PM: > Hi, > Flink 1.10, when I want to sink data to HBase

Re: How do we debug on a local task manager

2019-08-21 Thread Caizhi Weng
Hi Raj, have you restarted the cluster? You need to restart the cluster to apply changes in flink-conf.yaml. You can also set suspend=y in the debug argument so that task managers pause and wait for IntelliJ to connect before continuing. Raj, Smriti wrote on Thu, Aug 22, 2019 at 11:06 AM: > Hello

Re: Metrics for Task States

2019-11-25 Thread Caizhi Weng
Hi Kelly, as far as I know, Flink currently does not have metrics that monitor the number of tasks in each state. See https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html for the complete metrics list. (It seems that `taskSlotsAvailable` in the metrics list is the m

Re: Apache Flink - Throttling stream flow

2019-11-25 Thread Caizhi Weng
Hi, as far as I know, Flink currently doesn't have a built-in throttling function. You can write your own user-defined function to achieve this: the function simply emits every record it reads, while limiting the rate at which it emits them. If you're not familiar with user-defined funct
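A minimal sketch of the pacing logic such a pass-through function could use, written as plain Java so it compiles without Flink. RecordThrottle and its methods are hypothetical names, not a Flink API; the assumption is that you call throttle(record) from your own map or process function and emit whatever it returns. The limit applies per parallel instance, so the overall rate is roughly maxRecordsPerSecond times the operator's parallelism.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical pacing helper (not a Flink API): spaces emitted records evenly so that
 * at most maxRecordsPerSecond records leave this instance per second.
 */
final class RecordThrottle {
    private final long intervalNanos;               // minimum spacing between two records
    private long nextAllowedNanos = Long.MIN_VALUE; // earliest time the next record may go out

    RecordThrottle(long maxRecordsPerSecond) {
        if (maxRecordsPerSecond <= 0) {
            throw new IllegalArgumentException("maxRecordsPerSecond must be positive");
        }
        this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / maxRecordsPerSecond;
    }

    /** Reserves a slot for a record arriving at nowNanos; returns how long to wait before emitting it. */
    long reserve(long nowNanos) {
        long emitAt = Math.max(nowNanos, nextAllowedNanos);
        nextAllowedNanos = emitAt + intervalNanos;
        return emitAt - nowNanos;
    }

    /** Blocking pass-through: waits for the reserved slot, then returns the record unchanged. */
    <T> T throttle(T record) throws InterruptedException {
        long waitNanos = reserve(System.nanoTime());
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
        return record;
    }
}
```

Sleeping on the task thread also creates backpressure towards the upstream operators, which is usually what a throttle is meant to do. The reserve method takes the current time as a parameter so the pacing can be checked without real sleeps.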

Re: Problem loading JDBC driver

2019-11-26 Thread Caizhi Weng
Hi Nick, the "Test" after "org.apache.derby" % "derby" % "10.15.1.3" seems suspicious. Is that intended? (In sbt, % Test puts the dependency on the test classpath only, so the Derby driver would not be available when the job runs.) Nicholas Walton wrote on Tue, Nov 26, 2019 at 4:46 PM: > Hi, > > *I have a pipeline which is sinking into an Apache Derby database, but I’m > constantly receiving the error* > > java.lang.IllegalArgument

Re: How to customize (or use) a window that treats the beginning of each day as the watermark?

2019-11-26 Thread Caizhi Weng
Hi Rock, I think you can write your own trigger that fires when the date of the current record's processing time differs from that of the last record. Pinging @Jark Wu for a more authoritative answer. Rock wrote on Tue, Nov 26, 2019 at 3:37 PM: > I need my job to aggregate every device's metric as d
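A rough sketch of the comparison such a trigger would make, assuming it keeps the previous record's processing time in trigger state and returns FIRE when the calendar date changes. Only the date check is shown, as plain Java; DayBoundary is a hypothetical name. Note that "the same date" depends on the time zone you choose.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

/** Hypothetical helper: decides whether a record falls on a new calendar day. */
final class DayBoundary {
    private DayBoundary() {}

    /** Calendar date of an epoch-millisecond timestamp in the given time zone. */
    static LocalDate dateOf(long epochMillis, ZoneId zone) {
        return Instant.ofEpochMilli(epochMillis).atZone(zone).toLocalDate();
    }

    /** True if the current record's date differs from the previous record's date. */
    static boolean shouldFire(long previousMillis, long currentMillis, ZoneId zone) {
        return !dateOf(previousMillis, zone).equals(dateOf(currentMillis, zone));
    }
}
```

The same pair of timestamps can cross midnight in one zone and not in another, which is why the zone is an explicit parameter rather than the JVM default.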

Re: Some doubts about window start time and end time

2019-11-27 Thread Caizhi Weng
Hi Jun, how do you define your window? Could you please show us the code? Thanks. Jun Zhang <825875...@qq.com> wrote on Wed, Nov 27, 2019 at 5:22 PM: > Hi: > I defined a tumbling window and set its size to one hour, and the > resulting windows are [00:00:00-01:00:00], [01:00:00-02:00:00]. ...
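For reference when reasoning about window boundaries: Flink aligns tumbling windows to the epoch plus an optional offset, which is why hourly windows come out as [00:00:00, 01:00:00), [01:00:00, 02:00:00) and so on, with the end exclusive. The sketch below re-implements that start-time arithmetic as we understand it from TimeWindow#getWindowStartWithOffset in recent Flink versions, so it compiles without Flink; treat it as an illustration rather than the library code. WindowAlignment is a hypothetical name.

```java
/** Plain-Java copy of the tumbling-window start arithmetic (illustrative, not the Flink class). */
final class WindowAlignment {
    private WindowAlignment() {}

    /** Start of the window of length windowSize (shifted by offset) that contains timestamp. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // Java's % keeps the sign of the dividend, so negative remainders need adjusting.
        return remainder < 0 ? timestamp - (remainder + windowSize) : timestamp - remainder;
    }
}
```

For example, a one-day window for a UTC+8 time zone needs an offset of -8 hours; without it, daily windows start at 00:00 UTC, which is 08:00 local time.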

Re: Some doubts about window start time and end time

2019-11-27 Thread Caizhi Weng
.aggregate(new MyAggre(), new WindowResultFunction()) > .print(); > > I added a trigger for quick output > > > On 11/27/2019 17:54, Caizhi Weng > wrote: > > Hi Jun, > > How do you define your window? Could you please show us the c

Re: Converting streaming to batch execution

2019-11-27 Thread Caizhi Weng
Hi Nick, it seems to me that the slow part of the whole pipeline is the Derby sink. Could you replace it with another sink (for example, a CSV sink or even a sink that discards everything) and see whether the throughput improves? If so, are you using the JDBC connector? If yes, you might conside

Re: Auto Scaling in Flink

2019-11-28 Thread Caizhi Weng
Hi Akash, Flink core doesn't currently support auto scaling. It may be supported in the future, once the new scheduling architecture (https://issues.apache.org/jira/browse/FLINK-10407) is implemented. You can do it externally by cancelling the job with a savepoint, updating the parallelism, and resta

Re: How to convert flat JSON to nested complex JSON in Flink SQL?

2021-07-09 Thread Caizhi Weng
Hi! You can define your sink with the following schema: CREATE TABLE kafka_sink ( employee ROW ) WITH ( 'connector' = 'kafka', 'format' = 'json' -- other properties... ); You can then insert into this sink with the following SQL: INSERT INTO kafka_sink SELECT ROW(id, name) FROM kafka_so

Re: Custom type for env.readCsvFile

2021-07-11 Thread Caizhi Weng
Hi! If your custom type is a POJO, readCsvFile should work; otherwise, as far as I know, there is no way to have the parser directly parse a custom type for you. You can of course implement your own CSV input format by extending GenericCsvInputFormat, but I think it would be better to just read th

Re: Flink UDF Scalar Function called only once for all rows of select SQL in case of no parameter passed

2021-07-13 Thread Caizhi Weng
Hi! You should override the isDeterministic() method and return false; its default return value is true. From the Javadoc of this method: > Furthermore, return false if the planner should always execute this > function on the cluster side. In other words: the planner should not

Re: Stateful Functions PersistentTable duration

2021-07-13 Thread Caizhi Weng
Hi! By PersistentTable, do you mean the state backend? If so, the answer differs between operators and state backends. For keyed state, the duration applies per key. However, the exact time at which a key is cleaned up really depends on the operator and the state backend. Most operators will register a time

Re: Watermark UI after checkpoint failure

2021-07-18 Thread Caizhi Weng
Hi! This does not sound like expected behavior. Could you share your code / SQL and Flink configuration so that others can help diagnose the issue? Dan Hill wrote on Mon, Jul 19, 2021 at 1:41 PM: > After my dev Flink job hits a checkpoint failure (e.g. timeout) and then > has successful checkpoints, the fl

Re: Can we share state between different keys in the same window?

2021-07-20 Thread Caizhi Weng
Hi! For this use case, it seems that we'll either have to use a custom connector for that external database (if Flink currently has no such connector), or first load the data into some other source supported by Flink. Sweta Kalakuntla wrote on Wed, Jul 21, 2021 at 10:39 AM: > That is my unders

Re: Kafka data sources, multiple interval joins and backfilling

2021-07-20 Thread Caizhi Weng
Hi! Streaming joins will not throw away records in the state unless they exceed the TTL. Have you tried increasing the parallelism of the join operators (and maybe decreasing the parallelism of the large Kafka source)? Dan Hill wrote on Wed, Jul 21, 2021 at 4:19 AM: > Hi. My team's Flink job has cascading interval

Re: connect to schema registry kafka SSL with flink 1.11.2

2021-07-20 Thread Caizhi Weng
Hi! You'll need to set props.put("schema.registry.basic.auth.user.info", ":"); tkg_cangkul wrote on Wed, Jul 21, 2021 at 12:06 AM: > Hi, > > > I'm trying to connect to Kafka with a schema registry that uses SSL with > Flink 1.11.2 > > I got this error message when I tried to submit the job. > > Caused by: io

Re: Move already processed file from one folder to another folder in flink

2021-07-23 Thread Caizhi Weng
Hi! JobListener#onJobExecuted might help if your job is not a never-ending streaming job. See https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html Samir Vasani wrote on Fri, Jul 23, 2021 at 3:22 PM: > Hi, > > I am a newbie to Flink and facing so

Re: Move already processed file from one folder to another folder in flink

2021-07-23 Thread Caizhi Weng
DF monitors the data flowing through it, and when it receives the EOF record it moves the file. Samir Vasani wrote on Fri, Jul 23, 2021 at 3:44 PM: > Hi Caizhi Weng, > > Thanks for your input. > I would like to explain the requirement in a little more detail. > The Flink pipeline will be running forever (until som
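A minimal sketch of that idea in plain Java, assuming the source emits a special end-of-file marker record after the last line of each file. That convention, the __EOF__ payload and the MoveOnEof class are all hypothetical. Inside Flink this logic would sit in a map or process function that forwards ordinary records unchanged.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical pass-through step that moves a source file once its EOF marker arrives. */
final class MoveOnEof {
    static final String EOF_MARKER = "__EOF__"; // assumed marker payload emitted by the source

    private final Path processedDir;

    MoveOnEof(Path processedDir) {
        this.processedDir = processedDir;
    }

    /**
     * @param sourceFile the file the record came from
     * @param payload    the record content
     * @return true if the record was the EOF marker and the file was moved
     */
    boolean onRecord(Path sourceFile, String payload) throws IOException {
        if (!EOF_MARKER.equals(payload)) {
            return false; // ordinary record: the real function would just emit it downstream
        }
        Files.createDirectories(processedDir);
        Files.move(sourceFile, processedDir.resolve(sourceFile.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
        return true;
    }
}
```

If the pipeline runs with parallelism greater than one, the marker and the file's records need to reach the same subtask (for example by keying on the file path); otherwise the move could happen before other subtasks have finished processing that file's records.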

Re: Move already processed file from one folder to another folder in flink

2021-07-25 Thread Caizhi Weng
UDF as I did not understand it. >> >> Thanks & Regards, >> Samir Vasani >> >> >> >> On Fri, Jul 23, 2021 at 1:22 PM Caizhi Weng wrote: >> >>> Hi! >>> >>> In this case it won't work, as JobListener#onJobExecuted will o

Re: How to keep flink batch job running continously on local

2021-07-25 Thread Caizhi Weng
Hi! Flink 1.7.2 is a very old version. I suppose by batch job you mean the DataSet API? A socket source might be what you need. However, I'm not sure whether the socket source provided in 1.7 supports the DataSet API. If not, you might need to write your own socket source. Samir Vasani wrote on Sat, Jul 24, 2021 at 9:31 PM

Re: FlinkKinesis consumer

2021-07-25 Thread Caizhi Weng
Hi! It's stated on the line just below that in the documentation: > It is recommended to monitor the shard distribution and adjust assignment > appropriately. A custom assigner implementation can be set via > setShardAssigner(KinesisShardAssigner) to optimize the hash function or use > static overrides

Re: foreach exec sql

2021-07-27 Thread Caizhi Weng
Hi! Try this: sql.zipWithIndex.foreach { case (sql, idx) => val result = tableEnv.executeSql(sql); if (idx == 7) { result.print() } } igyu wrote on Tue, Jul 27, 2021 at 4:38 PM: > tableEnv.executeSql(sql(0)) > tableEnv.executeSql(sql(1)) > tableEnv.executeSql(sql(2)) > tableEnv.ex

Re: Session Windows should have a max size

2021-07-28 Thread Caizhi Weng
Hi! You can use DynamicProcessingTimeSessionWindows with your own SessionWindowTimeGapExtractor implementation. You can count the number of records processed in the extractor and return a time gap of almost zero (but not exactly zero, as it is invalid) if the number of records exceeds the limit.

Re: Over Window Aggregation Tuning

2021-08-01 Thread Caizhi Weng
Hi! As the state grows, processing will slow down somewhat. Which state backend are you using? Is mini-batch enabled[1]? [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-mini-batch-enabled Wanghui (HiCampus) wrote on Fri, Jul 30, 2021 at 3:59 PM: > Hi : > > W

Re: Flink 1.13 Tumble Window Setting Time Attribute Column

2021-08-02 Thread Caizhi Weng
Hi! The precision of a time attribute can only be 3, so try casting the proctime column to TIMESTAMP(3); that should work. Pranav Patil wrote on Tue, Aug 3, 2021 at 8:51 AM: > Hi, > > I'm upgrading a repository from Flink 1.11 to Flink 1.13. I have a Flink SQL > command that used to do tumbling windows us

Re: Flink SQL support array transform function

2021-08-03 Thread Caizhi Weng
Hi! Currently there is no such built-in function in Flink SQL. You can try writing your own user-defined function[1] to achieve this. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/ Xuekui wrote on Tue, Aug 3, 2021 at 3:22 PM: > Hi, > > I'm using Flink SQL and

Re: Output[StreamRecord[T]] thread safety

2021-08-03 Thread Caizhi Weng
Hi! As far as I know, output.collect is not thread-safe, and you should never run your operator's main logic (from reading in a record to writing the results out) in a separate thread. Flink's runtime expects the whole operator chain to run in a single thread. Yuval Itzchakov wrote in 2021
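To make the single-thread rule concrete, one common pattern is to let background threads only hand results over through a thread-safe queue, and to have the task thread itself drain the queue and call the collector. The sketch below is plain Java with hypothetical names (HandOffBuffer, offerFromWorker, drainTo); in Flink you would still need to trigger the drain from the task thread, and for asynchronous requests Flink's Async I/O API is the built-in alternative.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical hand-off buffer: workers enqueue, only the task thread emits. */
final class HandOffBuffer<T> {
    private final Queue<T> pending = new ConcurrentLinkedQueue<>();

    /** May be called from any thread: only enqueues, never touches the collector. */
    void offerFromWorker(T result) {
        pending.add(result);
    }

    /** Call only from the task thread: forwards everything buffered so far to the collector. */
    int drainTo(Consumer<? super T> collector) {
        int forwarded = 0;
        T next;
        while ((next = pending.poll()) != null) {
            collector.accept(next);
            forwarded++;
        }
        return forwarded;
    }
}
```

The design choice is that the non-thread-safe object (the collector) is only ever reached from one thread, while the only shared structure is a queue that is designed for concurrent access.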

Re: Output[StreamRecord[T]] thread safety

2021-08-03 Thread Caizhi Weng
Hi! It is not documented. If you're interested, you can refer to the OperatorChain and StreamTask classes.

Re: Understanding the semantics of SourceContext.collect

2021-08-04 Thread Caizhi Weng
Hi! There is no such guarantee unless the whole DAG is a single node. Flink's runtime runs the same node (task) in the same thread, while different nodes (tasks) are executed in different threads, even on different machines. Yuval Itzchakov wrote on Thu, Aug 5, 2021 at 2:10 AM: > Hi, > > I have a question reg

Re: Understanding the semantics of SourceContext.collect

2021-08-04 Thread Caizhi Weng
needed. Caizhi Weng wrote on Thu, Aug 5, 2021 at 10:19 AM: > Hi! > > There is no such guarantee unless the whole DAG is a single node. Flink's > runtime runs the same node (task) in the same thread, while different nodes > (tasks) are executed in different threads, even in different machines.

Re: KafkaDeserializationSchema.open() is not called after task state change

2021-08-08 Thread Caizhi Weng
Hi! This does not sound like expected behavior. However, there can be many reasons why some values stay uninitialized (for example, I once hit a bug where a thread was created and started in the open method before some values were initialized). You can always add some logging at the beginnin

Re: Approach to test custom Source/Sink

2021-08-09 Thread Caizhi Weng
Hi! Currently there is no general principle for testing sources and sinks. However, you might want to check out the unit tests and IT cases of the Flink connectors. For example, the flink-connector-jdbc module has a lot of tests for the JDBC source and sink. Following the ideas in these tests should be enough

Re: [External] Re: KafkaDeserializationSchema.open() is not called after task state change

2021-08-09 Thread Caizhi Weng
reply. > I already have some traces in the open() methods, and I do not see that it > is being called. > We are using Flink version 1.11.2. > Should I open a bug for that? > > > On Mon, Aug 9, 2021 at 5:24 AM Caizhi Weng wrote: > >> Hi! >> >> This does no

Re: Sliding window with filtering

2021-08-10 Thread Caizhi Weng
Hi! It seems that you want to filter some records out before they go into the window. So why not filter them before applying the window? Dario Heinisch wrote on Tue, Aug 10, 2021 at 6:26 PM: > Hey there, > > So I have a stream of data, let the stream be a_1, a_2, a_3, a_4, a_5. > Now I would like to have a sli
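A plain-Java illustration of filtering before windowing: records rejected by the predicate are never assigned to a window, so they cannot influence any window result. The window-start arithmetic mirrors, as we understand it, what Flink's sliding event-time assigner does with a zero offset (each element lands in size / slide windows); FilterThenSlide and its methods are hypothetical. In a DataStream job the equivalent shape would be a filter(...) call placed before keyBy(...) and window(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;

/** Hypothetical illustration: filter first, then assign to sliding windows. */
final class FilterThenSlide {
    private FilterThenSlide() {}

    /** Starts of all sliding windows [start, start + size) that contain ts (offset 0). */
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window start at or before ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Counts kept records per window start; rejected records never reach any window. */
    static <T> Map<Long, Integer> countPerWindow(List<T> events, ToLongFunction<T> timestampOf,
                                                 Predicate<T> keep, long size, long slide) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (T event : events) {
            if (!keep.test(event)) {
                continue; // filtered out before windowing
            }
            for (long start : windowStarts(timestampOf.applyAsLong(event), size, slide)) {
                counts.merge(start, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```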

Re: Scaling Flink for batch jobs

2021-08-15 Thread Caizhi Weng
Hi! > if I use parallelism of 2 or 4 - it takes the same time. It might be that some parallel instances receive no data. You can click on the nodes in the Flink web UI and check whether this is the case for each parallel instance, or you can check the metrics of each operator. > if I don't increase parallelism and

Re: Handling HTTP Requests for Keyed Streams

2021-08-16 Thread Caizhi Weng
Hi! Since you mentioned that configuration fetching is very infrequent, why not send HTTP requests and receive responses in a blocking way? That seems like a more reasonable solution to me. Rion Williams wrote on Tue, Aug 17, 2021 at 4:00 AM: > Hi all, > > I've been exploring a few different o

Re: Can I contribute to the Flink docs?

2021-08-16 Thread Caizhi Weng
Hi! Thanks for your interest in contributing to Flink. Currently most of the committers are busy with the upcoming Flink 1.14 release, so few people may be looking at new PRs, especially PRs without a corresponding JIRA issue. Please follow Jing Zhang's advice by first creating the cor

Re: Timer Service vs Custom Triggers

2021-08-19 Thread Caizhi Weng
Hi! If you'd like to aggregate the records before the timeout, you may want to consider using a session window (instead of writing your own trigger). However, if aggregation is not needed, I would prefer using a process function and handling the watermark myself, as the registered timer in the

Re: Theory question on PROCESS_CONTINUOUSLY processing mode and watermarks

2021-08-19 Thread Caizhi Weng
Hi! FileProcessingMode.PROCESS_CONTINUOUSLY means that the source continuously scans the file for updates; it should have nothing to do with stopping the streaming job. I suspect that the column on which you defined the watermark contains some data that exceeds Long.MAX_VALUE. A Long.MAX_VALUE watermark

Re: How can I build the flink docker image from source code?

2021-08-19 Thread Caizhi Weng
Hi! If you only modified Java code, use mvn clean package to build Flink from source. After that, COPY all jars in flink-dist/target/flink-/lib into the lib directory of the latest Flink image. Chenyu Zheng wrote on Thu, Aug 19, 2021 at 7:36 PM: > Hi contributors, > > > > I’ve changed a little bit of code in f

Re: DataStream to Table API

2021-08-19 Thread Caizhi Weng
Hi! This is because TypeExtractor#getMapReturnTypes does not handle row types (see that method and also TypeExtractor#privateGetForClass). You might want to open a JIRA ticket for this. Matthias Broecheler wrote on Fri, Aug 20, 2021 at 7:01 AM: > Hey Flinkers, > > I am trying to follow the docs >

Re: How can I achieve 'sink.partition-commit.policy.kind'='metastore,success-file' with batch Hive sink?

2021-08-19 Thread Caizhi Weng
Hi! As far as I know, Flink batch jobs will not add the _SUCCESS file. However, for batch jobs you can register a JobListener and add the _SUCCESS file yourself in JobListener#onJobExecuted. See the registerJobListener method in StreamExecutionEnvironment. Yik San Chan wrote on Fri, Aug 20, 2021 at 10:26 AM: > H
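A minimal sketch of the file-writing part, assuming the partitions are directories reachable through java.nio (a local or mounted filesystem); for HDFS or S3 you would use Flink's or Hadoop's FileSystem classes instead. SuccessMarker is a hypothetical helper. The idea is to call it from JobListener#onJobExecuted only when the Throwable argument is null, meaning the job succeeded.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical helper: drops an empty _SUCCESS marker into each written partition directory. */
final class SuccessMarker {
    static final String SUCCESS_FILE = "_SUCCESS";

    private SuccessMarker() {}

    /** Returns the number of markers newly created. */
    static int markPartitions(Path tableRoot, List<String> partitionPaths) throws IOException {
        int written = 0;
        for (String partition : partitionPaths) {
            Path dir = tableRoot.resolve(partition); // e.g. "dt=2021-08-19"
            if (!Files.isDirectory(dir)) {
                continue; // nothing was written for this partition
            }
            Path marker = dir.resolve(SUCCESS_FILE);
            if (Files.notExists(marker)) {
                Files.createFile(marker); // empty file, as downstream schedulers usually expect
                written++;
            }
        }
        return written;
    }
}
```

Creating the marker only if it does not exist yet keeps the helper idempotent, so calling it twice for the same partitions is harmless.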

Re: DataStream to Table API

2021-08-19 Thread Caizhi Weng
Hi! I've created a JIRA ticket[1] for this issue. Please check it out and track the progress there. [1] https://issues.apache.org/jira/browse/FLINK-23885 Caizhi Weng wrote on Fri, Aug 20, 2021 at 10:47 AM: > Hi! > > This is because TypeExtractor#getMapReturnTypes are not dealing with row >

Re: aggregation, triggers, and no activity

2021-08-22 Thread Caizhi Weng
Hi! If I'm not mistaken, you would like your window to be triggered every 15 minutes, or when there has been no activity for 15 minutes? This seems like a combination of a tumbling window and a session window. You can refer to ProcessingTimeSessionWindows for the implementation of a session window and modi

Re: Sending the partition request to 'null' failed on Kubernetes cluster, session mode

2021-08-22 Thread Caizhi Weng
Hi! This might be because some task managers cannot reach the job manager in time. Has any task manager instance restarted after this failure? If so, what do the logs (Flink log and Kubernetes log) of the failed task manager say? Zbyszko Papierski wrote on Fri, Aug 20, 2021 at 11:07 PM: > Hi! > >

Re: Using RMQ connector in pyflink

2021-08-22 Thread Caizhi Weng
Hi! You can first use the Table & SQL API to create an RMQ source table[1]. Then you can use the to_data_stream method of the TableEnvironment to convert the table into a data stream. [1] https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/python/table/python_table_api_connectors/ Nadia Mos

Re: Creating a generic ARRAY_AGG aggregate function for Flink SQL

2021-08-23 Thread Caizhi Weng
Hi! As far as I know, returning an array of values in the external data format from the getValue method is OK; Flink will do the conversion for you. Are you encountering any exception when using this array_agg? If so, what is the stack trace? You can also open a JIRA ticket to request built-in supp

Re: Flink SQL: Custom exception handling External

2021-08-25 Thread Caizhi Weng
Hi! As far as I know, the JDBC connector does not have this error-handling mechanism. Also, very few connectors / formats support skipping erroneous records (the csv format is one example). Which type of exception are you encountering? As JDBC connectors, unlike message queue connectors, rarely (if ev

Re: Do we have date_sub function in flink sql?

2021-08-25 Thread Caizhi Weng
Hi! Try ts - interval '1' day, where ts is your timestamp or date column. 1095193...@qq.com <1095193...@qq.com> wrote on Wed, Aug 25, 2021 at 5:20 PM: > Hi > I want to subtract 1 day from the current date with Flink SQL. Is there > a function like date_sub()? > > -- > 1

Re: Not able to avoid Dynamic Class Loading

2021-08-25 Thread Caizhi Weng
Hi! What Flink version are you using? In the current Flink code base, FlinkKafkaConsumer does not contain any Avro-related fields. Jars in usrlib have a higher loading priority than jars in lib, so if there is another FlinkKafkaConsumer class in your user jar, it might affect class loading and

Re: 1.9 to 1.11 Managed Memory Migration Questions

2021-08-25 Thread Caizhi Weng
Hi! > Why does this ~30% memory reduction happen? I don't know how memory was calculated in Flink 1.9, but this 1.11 memory allocation result is reasonable. This is because managed memory, network memory and JVM overhead memory in 1.11 all have default sizes or fractions (managed memory 40%, ne

Re: Flink SQL: Custom exception handling External

2021-08-25 Thread Caizhi Weng
wrote: > Hi, > > Thanks for the quick response. > The use case is not specific to JDBC (JDBC is just an example) but more > for custom error handling in all connectors. > How would we go about proposing such a new feature to be added to Flink? > > On 2021/08/25 09:02:31, Ca

Re: 1.9 to 1.11 Managed Memory Migration Questions

2021-08-26 Thread Caizhi Weng
jects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-task-off-heap-size > > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_setup.html#configure-off-heap-memory-direct-or-native > > > > *// *ah > > > > *From:* Caiz

Re: Can anyone help me? How do I connect to an offline Hadoop cluster and a realtime Hadoop cluster with different Hive catalogs?

2021-08-26 Thread Caizhi Weng
Hi! It seems that your Flink cluster cannot connect to realtime-cluster-master001/xx.xx.xx.xx:8050. Please check your network and port status. Jim Chen wrote on Fri, Aug 27, 2021 at 2:20 PM: > Hi, All > My Flink version is 1.13.1 and my company has two Hadoop clusters, > an offline Hadoop cluster and a realtime

Re: Flink performance with multiple operators reshuffling data

2021-08-30 Thread Caizhi Weng
Hi! Key-by operations scale with parallelism. Flink shuffles each record to a subtask according to the hash value of its key modulo the parallelism (strictly speaking, the hash selects one of maxParallelism key groups, and each subtask owns a range of key groups), so the higher the parallelism, the faster Flink can process data, unless there is data skew. Jason Liu wrote on Tue, Aug 31, 2021, AM
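To illustrate the routing a little more precisely: Flink first maps a key's hash to one of maxParallelism key groups, and each subtask owns a contiguous range of key groups. The sketch below shows that two-step routing in plain Java. The key-group-to-subtask formula follows Flink's KeyGroupRangeAssignment as we understand it, but keyGroupFor uses a simple stand-in hash rather than Flink's internal murmur hash, so the concrete assignments will differ from a real job. KeyRouting is a hypothetical name.

```java
/** Hypothetical sketch of keyed routing: key hash -> key group -> subtask index. */
final class KeyRouting {
    private KeyRouting() {}

    /** Key -> key group. The mixing step is a stand-in, NOT Flink's murmur hash. */
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode() * 0x9E3779B9; // stand-in bit mixing
        h ^= (h >>> 16);
        return Math.floorMod(h, maxParallelism);
    }

    /** Key group -> subtask index: each subtask owns a contiguous range of key groups. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** How many of the given keys each subtask would receive. */
    static int[] recordsPerSubtask(Object[] keys, int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (Object key : keys) {
            int group = keyGroupFor(key, maxParallelism);
            counts[operatorIndexFor(group, maxParallelism, parallelism)]++;
        }
        return counts;
    }
}
```

The skew case is visible directly: if most records share one key, they all land in one key group and therefore on one subtask, no matter how high the parallelism is. The maxParallelism also caps how far a keyed operator can usefully scale.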

Re: Session windows - how to get the last value from a window using FlinkSQL?

2021-08-31 Thread Caizhi Weng
Hi! Yes, it is possible, using the last_value() aggregate function. For example: SELECT last_value(v) FROM T GROUP BY k, session(ts, interval '1' minute) Michał Rudko wrote on Tue, Aug 31, 2021 at 11:40 PM: > Hi, > > I had trouble to find in the documentation of Flink/Ververica which > aggregations or analytical fun

Re: Clarifying Documentation on Custom Triggers

2021-08-31 Thread Caizhi Weng
Hi! I don't quite understand the problem, but if you look into WindowedStream#trigger, you'll find that calling that method changes the trigger of WindowOperatorBuilder, so the default trigger is overwritten by WindowedStream#trigger. Aeden Jameson wrote on Wed, Sep 1, 2021 at 12:

Re: When does the app print?

2021-08-31 Thread Caizhi Weng
Hi! With the streaming API, you need to call env.execute() at the end to start the job. igyu wrote on Wed, Sep 1, 2021 at 11:29 AM: > this is my data > > {"timestamp":"2021-08-01 15:31:56,895","msg":" INFO > org.apache.hadoop.hive.metastore.HiveMetaStore: [pool-9-thread-97]: 88: > get_table : db=hivetest tbl=ch

Re: De/Serialization API to tear-down user code

2021-09-01 Thread Caizhi Weng
Hi! The (De)serializationSchema is only a helper for converting data objects to and from another format. What's your use case? If you're creating a (De)serializationSchema for a source / sink, you might want to open and close resources in the open / close methods of the source / sink, not in the (De)se

Re: Flink restarts on Checkpoint failure

2021-09-01 Thread Caizhi Weng
Hi! There are many possible reasons for a checkpoint failure. The most likely ones are: * The JVM is busy with garbage collection while performing the checkpoints. This can be checked by looking into the GC logs of the task managers. * The state suddenly becomes quite large due to some sp

Re: Clarifying Documentation on Custom Triggers

2021-09-01 Thread Caizhi Weng
n becomes > where are those timers coming from? The documentation states, > > "By specifying a trigger using trigger() you are overwriting the default > trigger of a WindowAssigner." > > > This is the puzzling part to me about the above statement. > > Thanks

Re: Reuse in Blink execution plan

2021-09-02 Thread Caizhi Weng
Hi! Reusing common sub-plans is an optimization in Flink. Flink really does reuse them at runtime, and the results of the reused tasks are computed only once. Vasily Melnik wrote on Thu, Sep 2, 2021 at 6:32 PM: > > Hi all. > > Using SQL with the Blink planner for batch calculations, I see *Reused* > nodes in Op

Re: Verifying max-parallelism value

2021-09-02 Thread Caizhi Weng
Hi! Do you mean pipeline.max-parallelism or some other config option? If so, you should see it on the "Job Manager > Configuration" page of the Flink UI. Which config option are you setting, and how are you setting it? Niklas Wilcke wrote on Fri, Sep 3, 2021 at 12:53 AM: > Hi Flink community, > > most likely I'm mis

Re: Triggers for windowed aggregations in Table API

2021-09-02 Thread Caizhi Weng
Hi! You might want to use a custom trigger to achieve this. Tumbling windows use EventTimeTrigger by default. Flink has another built-in trigger called CountTrigger, but it only fires every X records and completely ignores event time. You might want to create your own trigger to combi
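A rough plain-Java model of such a combined trigger: it fires early every N elements and fires once more when the watermark reaches the window's last timestamp (end - 1, as for TimeWindow#maxTimestamp). A real Flink Trigger would keep the count in partitioned trigger state and register an event-time timer instead of receiving watermarks directly; CountOrEventTimeTrigger is a hypothetical name and only the decision logic is shown.

```java
/** Hypothetical decision logic for an "every N elements or at window end" trigger. */
final class CountOrEventTimeTrigger {
    enum Result { CONTINUE, FIRE }

    private final long maxCount;
    private final long windowMaxTimestamp; // last timestamp inside [start, end)
    private long count = 0;
    private boolean finalFired = false;

    CountOrEventTimeTrigger(long maxCount, long windowEndExclusive) {
        if (maxCount <= 0) {
            throw new IllegalArgumentException("maxCount must be positive");
        }
        this.maxCount = maxCount;
        this.windowMaxTimestamp = windowEndExclusive - 1;
    }

    /** Called per element: fires early every maxCount elements. */
    Result onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // the next early firing needs another maxCount elements
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    /** Called as the watermark advances: fires once when it reaches the window's last timestamp. */
    Result onWatermark(long watermark) {
        if (!finalFired && watermark >= windowMaxTimestamp) {
            finalFired = true;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }
}
```

Early firings emit partial results, so whether a real trigger should return FIRE or FIRE_AND_PURGE depends on whether downstream expects cumulative or incremental values.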

Re: Job manager crash

2021-09-05 Thread Caizhi Weng
Hi! There is a message saying "java.lang.NoClassDefFoundError: org/apache/hadoop/hdfs/HdfsConfiguration" in your log file. Are you accessing HDFS in your job? If so, it seems that your Flink distribution or your cluster is missing the Hadoop classes. Please make sure that there are Hadoop jars in the

Re: pyflink table to datastream

2021-09-05 Thread Caizhi Weng
Hi! I don't quite understand this question, but I suppose you first run the Table program, then run the DataStream program, and you want the results of the two programs to be identical? If that is the case, the job will run twice, as Flink does not cache the result of a job, so in each run the

Re: Broadcast data to all keyed streams

2021-09-06 Thread Caizhi Weng
Hi! Your RefDataPriceJoiner should extend KeyedBroadcastProcessFunction instead of KeyedCoProcessFunction. See the Javadoc of DataStream#connect. By the way, which Flink version are you using? James Sandys-Lumsdaine wrote on Tue, Sep 7, 2021 at 12:02 AM: > Hello, > > I have a Flink workflow which is partitioned o

Re: When using the batch api, the sink task is always in the created state.

2021-09-06 Thread Caizhi Weng
Hi! If you mean batch SQL, you'll need to provide enough task slots for all subtasks. The number of task slots needed is the sum of the parallelisms of all tasks, as there is no slot reuse in batch jobs. lec ssmi wrote on Tue, Sep 7, 2021 at 2:13 PM: > And my Flink version is 1.11.0 > > lec ssmi wrote on Sep 7, 2021

Re: When using the batch api, the sink task is always in the created state.

2021-09-06 Thread Caizhi Weng
<= 1.11) or "ALL_EDGES_PIPELINED" (Flink >= 1.12). Caizhi Weng wrote on Tue, Sep 7, 2021 at 2:47 PM: > Hi! > > If you mean batch SQL then you'll need to prepare enough task slots for > all subtasks. The number of task slots needed is the sum of parallelism of > all subtasks as there is

Re: When using the batch api, the sink task is always in the created state.

2021-09-07 Thread Caizhi Weng
ply! > But doesn't Flink use streaming to perform batch calculations? As you said > above, to some extent, it is the same as real batch computing, e.g. Spark. > > Caizhi Weng wrote on Tue, Sep 7, 2021 at 2:53 PM: > >> My previous mail intends to answer what is needed for all subtasks in a >

Re: JVM Metaspace capacity planning

2021-09-13 Thread Caizhi Weng
Hi! Which API are you using: the DataStream API or the Table / SQL API? If it is the Table / SQL API, then Java classes for some operators (for example aggregations, projections, filters, etc.) are generated when compiling user code into executable Java code. These Java classes are new to the

Re: Error while fetching data from Apache Kafka

2021-09-13 Thread Caizhi Weng
Hi! This seems to be caused by mismatched types between your source definition and your workflow. If possible, could you describe the schema of your Kafka source and paste your DataStream / Table / SQL code here? Dhiru wrote on Tue, Sep 14, 2021 at 3:49 AM: > *I am not sure when we try to receive data from Ap

Re: Table program cannot be compiled. This is a bug. Please file an issue

2021-09-13 Thread Caizhi Weng
Hi! This is because Java has a maximum method length of 64 KB. For Flink <= 1.13, please set table.generated-code.max-length to less than 65536 (around 8192 is preferred) to limit the length of each generated method. If this doesn't help, we've (hopefully) completely fixed this issue in Flink 1.14 by cr

Re: Stream join with (changing) dimension in Kafka

2021-09-22 Thread Caizhi Weng
Hi! What type of time attribute is u_ts? If it is an event-time attribute, then the query you're running is an event-time temporal table join, which holds back output records until the watermarks from both inputs have risen above the row time of each record. As the dimension table is changing q

Re: byte array as keys in Flink

2021-09-24 Thread Caizhi Weng
Hi! It depends on the state backend you use. For example, the heap state backend is backed by a hash map and uses the hash code of the byte[] to compare two byte[] keys (see HeapMapState#put); since Java arrays use identity-based hashCode/equals, two arrays with the same content are treated as different keys. However, the RocksDB state backend uses the serialized bytes (that is to say, the conte
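A small plain-Java demonstration of the heap-backend behavior: Java arrays inherit identity-based equals/hashCode from Object, so two byte[] with identical contents count as two different keys in a HashMap, while a wrapper with content-based equality (ByteBuffer here, chosen only to keep the demo short) treats them as one. ByteArrayKeys is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical demo: identity-based byte[] keys vs content-based wrapped keys. */
final class ByteArrayKeys {
    private ByteArrayKeys() {}

    /** Distinct keys when raw arrays are used (identity equality). */
    static int distinctRawKeys(byte[]... keys) {
        Map<byte[], Integer> map = new HashMap<>();
        for (byte[] k : keys) {
            map.merge(k, 1, Integer::sum);
        }
        return map.size();
    }

    /** Distinct keys when the bytes are wrapped (ByteBuffer compares remaining content). */
    static int distinctWrappedKeys(byte[]... keys) {
        Map<ByteBuffer, Integer> map = new HashMap<>();
        for (byte[] k : keys) {
            map.merge(ByteBuffer.wrap(k.clone()), 1, Integer::sum); // copy so later mutation cannot change the key
        }
        return map.size();
    }
}
```

In a Flink job, a key type with value semantics, such as a String or a small POJO whose equals/hashCode use Arrays.equals/Arrays.hashCode, gives the same result on every state backend.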

Re: Relation between Flink Configuration and TableEnv

2021-09-24 Thread Caizhi Weng
Hi! TableConfig is for configuration related to the Table and SQL API, especially the options in OptimizerConfigOptions and ExecutionConfigOptions. By Flink Configuration, I guess you mean the options held in the Configuration class. Sadly, as you say, it can be configured only once when creating t

Re: Write Streaming data to S3 in Parquet files

2021-09-25 Thread Caizhi Weng
Hi! Try the PARTITIONED BY clause. See https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/formats/parquet/ Harshvardhan Shinde wrote on Fri, Sep 24, 2021 at 5:52 PM: > Hi, > I wanted to know if we can write streaming data to S3 in parquet format > with partitioning. > Here's what I w

Re: Exceeded Checkpoint tolerable failure threshold Exception

2021-10-07 Thread Caizhi Weng
Hi! You need to find the root cause of the checkpoint failures. Check the "Checkpoint" tab to see whether checkpoints are timing out, or the "Exception" tab for exception messages other than this one. You can also dig into the logs for suspicious information. If checkpoint failures are rar

Re: jdbc connector configuration

2021-10-07 Thread Caizhi Weng
Hi! These options are not required merely to read from a database. They exist to speed up reads by allowing the source to read data in parallel. This optimization works by dividing the data into several (scan.partition.num) partitions, each of which is read by a task slot (n
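To illustrate the idea, here is a simple even split of [scan.partition.lower-bound, scan.partition.upper-bound] into scan.partition.num contiguous ranges; each range would correspond to one query of the form "WHERE partition_column BETWEEN lo AND hi". The split arithmetic is written for this example and may place boundaries slightly differently from Flink's JDBC connector; JdbcRangeSplit is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical even split of a numeric range into contiguous inclusive sub-ranges. */
final class JdbcRangeSplit {
    private JdbcRangeSplit() {}

    /** Splits [lowerBound, upperBound] (inclusive) into at most numPartitions ranges {lo, hi}. */
    static List<long[]> split(long lowerBound, long upperBound, int numPartitions) {
        if (numPartitions <= 0 || upperBound < lowerBound) {
            throw new IllegalArgumentException("need numPartitions > 0 and lowerBound <= upperBound");
        }
        long total = upperBound - lowerBound + 1;    // values covered (overflows only for extreme bounds)
        long parts = Math.min(numPartitions, total); // never create empty ranges
        long base = total / parts;
        long extra = total % parts;                  // the first `extra` ranges get one more value
        List<long[]> ranges = new ArrayList<>();
        long start = lowerBound;
        for (long i = 0; i < parts; i++) {
            long length = base + (i < extra ? 1 : 0);
            ranges.add(new long[] {start, start + length - 1}); // inclusive [lo, hi]
            start += length;
        }
        return ranges;
    }
}
```

For example, bounds 1 and 10 with three partitions give [1, 4], [5, 7] and [8, 10], which three task slots could then read concurrently.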

Re: Can BroadcastProcessFunction invoke both methods concurrently?

2021-10-07 Thread Caizhi Weng
Hi! Just as you said, they won't be invoked concurrently. Flink uses the actor model at runtime, so the methods of an operator are never called at the same time. By the caching layer, I suppose you would like to keep the broadcast messages in a Java map for some time and periodically store
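Since the two methods are never called concurrently, such a cache can be a plain HashMap with no locking. A minimal sketch, assuming the periodic store is a time-based flush of changed entries; the class, the store callback, and the explicit nowMs parameter are hypothetical. In a real BroadcastProcessFunction, processBroadcastElement would call onBroadcast and processElement would call lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Single-threaded by assumption: no synchronization or concurrent collections needed.
final class BroadcastCacheSketch {
    private final Map<String, String> cache = new HashMap<>(); // everything seen so far
    private final Map<String, String> dirty = new HashMap<>(); // changed since last flush
    private final long flushIntervalMs;
    private final Consumer<Map<String, String>> store;
    private long lastFlushMs;

    BroadcastCacheSketch(long flushIntervalMs, long nowMs, Consumer<Map<String, String>> store) {
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = nowMs;
        this.store = store;
    }

    /** Records a broadcast message and flushes changed entries once the interval has elapsed. */
    void onBroadcast(String key, String value, long nowMs) {
        cache.put(key, value);
        dirty.put(key, value);
        if (nowMs - lastFlushMs >= flushIntervalMs) {
            store.accept(new HashMap<>(dirty)); // hand over a copy, then reset
            dirty.clear();
            lastFlushMs = nowMs;
        }
    }

    String lookup(String key) {
        return cache.get(key);
    }
}
```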

Re: Installing signalfx on flink image

2021-10-08 Thread Caizhi Weng
Hi! Just as you said, you need to build your own custom image based on the official Flink image and copy the signalfx jar into the lib directory. Deniz Koçak wrote on Sat, Oct 9, 2021 at 3:58 AM: > Hi, > > In order to install SignalFX on Flink it says on Flink Packages page [1] > `In order to use this report

Re: Flink production configuration

2021-10-08 Thread Caizhi Weng
Hi! The number of task slots needed is the sum of the parallelisms of all the jobs running at the same time (if all the jobs are streaming jobs). The number of task managers is that total number of task slots divided by the number of task slots per task manager (taskmanager.numberOfTaskSlots), rounded up. So
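The arithmetic as a small Java sketch, assuming streaming jobs with default slot sharing so that each job needs as many slots as its parallelism. The division rounds up because a partially used TaskManager still has to be started. The class name is hypothetical.

```java
final class SlotPlanner {
    /** Total slots: the sum of the parallelisms of all concurrently running streaming jobs. */
    static int requiredSlots(int... jobParallelisms) {
        int total = 0;
        for (int p : jobParallelisms) {
            total += p;
        }
        return total;
    }

    /** TaskManagers needed: ceil(totalSlots / taskmanager.numberOfTaskSlots). */
    static int requiredTaskManagers(int totalSlots, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        return (totalSlots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }
}
```

For example, three jobs with parallelism 4, 8 and 2 need 14 slots, which means 4 TaskManagers with 4 slots each.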

Re: OVER IGNORE NULLS support

2021-10-08 Thread Caizhi Weng
Hi! Currently all built-in aggregate functions ignore null input values, so I guess this is why Flink doesn't support this syntax. I'm curious about it: does it come from the SQL standard? What's the opposite of IGNORE NULLS? Is there a NOT IGNORE NULLS and if the user

Re: Does the flink sql support checkpoints

2021-10-11 Thread Caizhi Weng
(Forwarding this to the user mailing list as this mail is written in English) Hi! I think problem 1 is the expected behavior. Is this behavior inconvenient for you? If so, why? For problem 2, could you explain in detail how you run the word count program and where you store

Re: Timeout settings for Flink jobs?

2021-10-11 Thread Caizhi Weng
Hi! There is currently no such setting. You need to rely on an external system that reads the execution time (from Flink's job information, see [1]) and cancels the job once it exceeds the time limit. Could you elaborate on your use case? Are you running a streaming job or a batch job? For strea
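A minimal sketch of the check such an external watchdog could make, assuming Flink's REST API: GET /jobs/:jobid reports the job's start time, and PATCH /jobs/:jobid?mode=cancel cancels the job. The class name, base URL, job ID and limit are hypothetical; only the request is built here.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

final class JobTimeoutWatchdog {
    /** True once the job has been running longer than the allowed limit. */
    static boolean exceedsLimit(long jobStartMs, long nowMs, Duration limit) {
        return nowMs - jobStartMs > limit.toMillis();
    }

    /** Builds the cancel request: PATCH {restBaseUrl}/jobs/{jobId}?mode=cancel. */
    static HttpRequest cancelRequest(String restBaseUrl, String jobId) {
        return HttpRequest.newBuilder(URI.create(restBaseUrl + "/jobs/" + jobId + "?mode=cancel"))
            .method("PATCH", HttpRequest.BodyPublishers.noBody())
            .build();
    }
}
```

Sending it would look like HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding()), run on a schedule outside the Flink job.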

Re: Yarn job not exit when flink job exit

2021-10-11 Thread Caizhi Weng
Hi! yarn-cluster is the mode for a YARN session cluster, which means the cluster remains even after the job has finished. If you want the YARN application to finish together with the Flink job, use yarn-per-job mode instead. Jake wrote on Sat, Oct 9, 2021 at 5:53 PM: > Hi > > When submit job in yarn-cluster model, f

Re: Checkpoint size increasing even i enable increasemental checkpoint

2021-10-11 Thread Caizhi Weng
Hi! Checkpoint size depends heavily on your job. Incremental checkpointing only helps when the values in the state are converging (for example, a distinct count aggregation). If possible, could you provide your user code or explain what jobs you are running? Lei Wang wrote on Mon, Oct 11, 2021, p.m. 4:1

Re: jdbc connector configuration

2021-10-12 Thread Caizhi Weng
Caizhi! >> >> On Thu, Oct 7, 2021 at 7:27 PM Caizhi Weng wrote: >> >>> Hi! >>> >>> These configurations are not required to merely read from a database. >>> They are here to accelerate the reads by allowing sources to read data in

Re: How to refresh topics to ingest with KafkaSource?

2021-10-13 Thread Caizhi Weng
Hi! I suppose you want to read from different topics every now and then? Does the topic-pattern option [1] in the Table API Kafka connector meet your needs? [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/kafka/#topic-pattern Preston Price wrote on Thu, Oct 14, 2021 at 1:34 AM:
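The topic-pattern option takes a Java regular expression, so the set of topics it covers can be checked with plain java.util.regex. A small sketch with hypothetical topic names; for topics created after the job starts to be picked up, partition discovery ('scan.topic-partition-discovery.interval') also needs to be enabled.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class TopicPatternSketch {
    /** Returns the topics whose full name matches the regex. */
    static List<String> matchingTopics(String topicPattern, List<String> topics) {
        Pattern p = Pattern.compile(topicPattern);
        return topics.stream()
            .filter(t -> p.matcher(t).matches()) // whole-name match, not a substring search
            .collect(Collectors.toList());
    }
}
```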

Re: Migrating createTemporaryView to new Table api.

2021-10-13 Thread Caizhi Weng
Hi! To implement the renaming of fields with the new API, try this: tableEnv.createTemporaryView( "AgentStream", inputStream, Schema.newBuilder() .columnByExpression("useragent", "f0") .columnByExpression("expectedDeviceClass", "f1")

Re: dataStream can not use multiple classloaders

2021-10-17 Thread Caizhi Weng
Hi! By default there is only one classloader for user code at runtime. The main method of your code is executed only on the client side: it generates a job graph and sends it to the cluster. To avoid class-loading conflicts, it is recommended to shade the dependencies of your source and sink functi

Re: Catching SIGINT With flink Jobs

2021-10-17 Thread Caizhi Weng
Hi! This is essentially the general question of "how to capture SIGINT in Java". See [1] for the answer. By the way, could you briefly explain why you want to do this in your custom source and sink? [1] https://stackoverflow.com/questions/2541475/capture-sigint-in-java Vijay Bhaskar wrote on Sat, Oct 16, 2021 at 10:54 PM: > Can we re
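For reference, the usual answer is a JVM shutdown hook, which runs on SIGINT and SIGTERM but not on SIGKILL. A minimal sketch with a hypothetical class name; keep in mind that a TaskManager JVM may run tasks of several jobs, so a hook is JVM-wide rather than tied to one source or sink.

```java
final class ShutdownHookSketch {
    /** Registers cleanup to run when the JVM shuts down (normal exit, SIGINT, SIGTERM). */
    static Thread register(Runnable cleanup) {
        Thread hook = new Thread(cleanup, "cleanup-on-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook; // keep a handle so the hook can be removed again
    }
}
```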

Re: Display time in UTC on the UI

2021-10-17 Thread Caizhi Weng
Hi! Append -Duser.timezone=UTC to env.java.opts.jobmanager and env.java.opts.taskmanager. These two options hold the Java options used to start the JobManager / TaskManager JVMs (see [1]). Note that this changes the time zone of the whole JVM, so if you have any time zone related o
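To see why this affects everything time-zone related, here is a small sketch: any code that formats timestamps with ZoneId.systemDefault() follows the JVM default zone. In the test, TimeZone.setDefault is used only to simulate what -Duser.timezone does at JVM startup; the class name is hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class DefaultZoneDemo {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Formats epoch millis in the JVM default zone, which -Duser.timezone sets at startup. */
    static String formatInDefaultZone(long epochMillis) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(ZoneId.systemDefault()));
    }
}
```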
