How to override flink-conf parameters for SQL Client

2020-03-05 Thread Gyula Fóra
Hi All! I am trying to understand if there is any way to override flink configuration parameters when starting the SQL Client. It seems that the only way to pass any parameters is through the environment yaml. There I found 2 possible routes: configuration: this doesn't work as it only sets Tab

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Caizhi Weng
Hi Gyula. I'm afraid there is no way to override all Flink configurations currently. The SQL client YAML file can only override some of the Flink configurations. Configuration entries indeed can only set Table-specific configs, while deployment entries are used to set the result fetching address and

Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-05 Thread Yang Wang
Hi Andrey, Thanks for driving this significant FLIP. From the user ML we can also see that many users run Flink in container environments, so the docker image is a very basic requirement. Just as you say, we should provide a unified place for all the various usages (e.g. session,

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Gyula Fóra
Thanks Caizhi, This seems like a pretty big shortcoming for any multi-user/multi-app environment. I will open a jira for this. Gyula On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng wrote: > Hi Gyula. > > I'm afraid there is no way to override all Flink configurations currently. > SQL client yaml f

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jark Wu
Hi Gyula, Flink configurations can be overridden via `TableConfig#getConfiguration()`, however, the SQL CLI only allows setting Table-specific configs. I would treat this as a bug/improvement of the SQL CLI which should be fixed in 1.10.1. Best, Jark On Thu, 5 Mar 2020 at 18:12, Gyula Fóra wrote: > Thanks
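For reference, a minimal sketch of what overriding a setting through `TableConfig#getConfiguration()` can look like in a user program (not from this thread; the planner settings and the config key are only illustrative):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class TableConfigOverride {
        public static void main(String[] args) {
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            TableEnvironment tEnv = TableEnvironment.create(settings);

            // Override an option programmatically; as discussed in this thread,
            // only Table-specific options are guaranteed to take effect this way.
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.mini-batch.enabled", "true");
        }
    }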

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jingsong Li
Hi Gyula, Maybe the Blink planner has invoked "StreamExecutionEnvironment.configure"; which planner do you use? But "StreamExecutionEnvironment.configure" only covers partial configuration, not all configurations in flink-conf.yaml. So which config do you want to set? I know some config l

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Gyula Fóra
I could basically list a few things I want to set (execution.target for example), but it's fair to assume that I would like to be able to set anything :) Gyula On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li wrote: > Hi Gyula, > > Maybe Blink planner has invoked "StreamExecutionEnvironment.configur

Setting the operator-id to measure percentile latency over several jobs

2020-03-05 Thread Felipe Gutierrez
Hi community, I am tracking the latency of operators in Flink according to this reference [1]. When I am using Prometheus+Grafana I can issue a query using "flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency" and I can check the percentiles of each "operator_id" and

Re: Setting the operator-id to measure percentile latency over several jobs

2020-03-05 Thread Khachatryan Roman
Hi Felipe, Please find the answers to your questions below. > Each "operator_subtask_index" means each instance of the parallel physical operator, doesn't it? Yes. > How can I set a fixed ID for the "operator_id" in my code so I can identify quickly which operator I am measuring? You are using th
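As a side note, a small sketch of assigning an explicit name and uid to an operator on the DataStream API; setting a uid keeps the operator id stable across submissions, though, as noted later in this thread, the latency metric does not expose the operator name (the names below are hypothetical):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UidExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(1, 2, 3)
               .map(x -> x * 2)
               .name("double-quantity")   // operator name shown in most metrics and the web UI
               .uid("double-quantity")    // keeps the operator id stable across job submissions
               .print();

            env.execute("uid-example");
        }
    }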

Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
Hi All! Excuse my stupid question, I am pretty new to the Table/SQL API and I am trying to play around with it implementing and running a few use-cases. I have a simple window join + aggregation, grouped on some id that I want to write to Kafka but I am hitting the following error: "AppendStream

Re: Rocksdb Serialization issue

2020-03-05 Thread David Morin
Hello Arvid, After some investigations with the help of my colleague we finally found the root cause. In order to improve the init of the state, I've created some threads to parallelize the read of bucket files. This is a temporary solution because I've planned to use the State Processor API. Here

Re: Rocksdb Serialization issue

2020-03-05 Thread Arvid Heise
Hi David, could you please explain what you are actually trying to achieve? It seems like you are reading some files from S3 in SinkFunction#open and putting them into state (bootstrapping?). How many instances of the sink are executed? How do you shard the buckets / e.g. how do you avoid reading th

Re: Writing retract streams to Kafka

2020-03-05 Thread Khachatryan Roman
Hi Gyula, Could you provide the code of your Flink program, the error with stacktrace and the Flink version? Thanks, Roman On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra wrote: > Hi All! > > Excuse my stupid question, I am pretty new to the Table/SQL API and I am > trying to play around with it i

Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
Hi Roman, This is the core logic: CREATE TABLE QueryResult ( queryId BIGINT, itemId STRING, quantity INT ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'query.output.log.1', 'connector.properties.bootstrap.servers' = '', 'format.type' =

Re: Writing retract streams to Kafka

2020-03-05 Thread Benoît Paris
Hi Gyula, I'm afraid the conversion that distinguishes retractions from inserts can't be done in pure SQL (though I'd love that feature). You might want to go lower level and implement a RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink [3]. This will give you an emitDataStream(DataStrea
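A rough, hypothetical illustration of the same idea of separating inserts from retractions, here done on the Table API side with toRetractStream rather than a full RetractStreamTableSink; it assumes an existing StreamTableEnvironment tEnv and a resultTable produced by the windowed join/aggregation:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.types.Row;

    // Each element carries a change flag: true = insert/accumulate, false = retract.
    DataStream<Tuple2<Boolean, Row>> retractStream =
            tEnv.toRetractStream(resultTable, Row.class);

    // Keep only the inserts and hand them to any DataStream sink (e.g. a Kafka producer).
    DataStream<Row> insertsOnly = retractStream
            .filter(change -> change.f0)
            .map(new MapFunction<Tuple2<Boolean, Row>, Row>() {
                @Override
                public Row map(Tuple2<Boolean, Row> change) {
                    return change.f1;
                }
            });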

Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
Thanks Benoît! I can see now how I can implement this myself through the provided sink interfaces but I was trying to avoid having to write code for this :D My initial motivation was to see whether we are able to write out any kind of table to Kafka as a simple stream of "upserts". I also don't c

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jeff Zhang
Hi Gyula, I am working on integrating Flink with Zeppelin. One feature in Zeppelin is that users can override any configuration in flink-conf.yaml (actually any of the options listed here: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html). Of course you can run flink sql in Zeppelin, and cou

Re: Rocksdb Serialization issue

2020-03-05 Thread David Morin
Yes Arvid, the Sink is keyed by a String dbName::tableName. Kafka is the input, but to init the state we have to read Hive delta files before consuming Kafka records. These are ORC files we have to read to init the state, with one directory per table. A key (primary key) is only in one bucket file. So

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
> I also don't completely understand at this point why I can write the result of a group, tumble window aggregate to Kafka and not this window join / aggregate. If you are doing a tumble window aggregate with watermark enabled, Flink will only fire a final result for each window at once, no modifi
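For comparison, the append-only case described above could look roughly like this (a hedged sketch assuming an existing TableEnvironment tEnv; the table and column names are hypothetical and the sink table is assumed to have a matching schema):

    // Tumbling window aggregate: with watermarks, each window fires exactly one
    // final row, so the result can go to an append-only sink such as Kafka.
    tEnv.sqlUpdate(
        "INSERT INTO WindowedQuantities " +
        "SELECT t.itemId, " +
        "       TUMBLE_START(t.event_time, INTERVAL '5' SECOND) AS window_start, " +
        "       SUM(t.quantity) AS quantity " +
        "FROM ItemTransactions AS t " +
        "GROUP BY t.itemId, TUMBLE(t.event_time, INTERVAL '5' SECOND)");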

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
Back to this case, I assume you are expecting something like an "ignore all delete messages" flag? With this flag turned on, Flink would only send insert messages corresponding to the current correct results to Kafka and drop all retractions and deletes on the fly. Best, Kurt On Thu, Mar 5, 2020 at 1

Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
That's exactly the kind of behaviour I am looking for Kurt ("ignore all delete messages"). As for the data completion, in my above example it is basically an event time interval join. With watermarks defined Flink should be able to compute results once in exactly the same way as for the tumbling w

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
I think the issue is not caused by event time interval join, but the aggregation after the join: GROUP BY t.itemId, q.event_time, q.queryId; In this case, there is still no chance for Flink to determine whether the groups like (itemId, eventtime, queryId) have complete data or not. As a compar

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Kurt Young
IIRC the tricky thing here is that not all the config options belonging to flink-conf.yaml can be adjusted dynamically in the user's program. So it would end up with some of the configurations being overridable but some not. The experience is not great for users. Best, Kurt On Thu, Mar 5, 2020 at 10:1

Re: How to use self defined json format when create table from kafka stream?

2020-03-05 Thread Kurt Young
User-defined formats also sound like an interesting extension. Best, Kurt On Thu, Mar 5, 2020 at 3:06 PM Jark Wu wrote: > Hi Lei, > > Currently, Flink SQL doesn't support registering a binlog format (i.e. > just define "order_id" and "order_no", but the json schema has other binlog > fields).

Re: java.time.LocalDateTime in POJO type

2020-03-05 Thread KristoffSC
Thanks, do you have any example how I could use it? Basically I have a POJO class that has a LocalDateTime field in it. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
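One hypothetical way this can look, assuming the suggestion was to supply explicit TypeInformation so the LocalDateTime field is not serialized as a generic (Kryo) type; the class and field names are made up:

    import java.time.LocalDateTime;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;

    public class Event {
        public String id;
        public LocalDateTime timestamp;

        // Explicit POJO type information for this class; pass it to the stream
        // via .returns(Event.typeInfo()) or wherever TypeInformation is expected.
        public static TypeInformation<Event> typeInfo() {
            Map<String, TypeInformation<?>> fields = new HashMap<>();
            fields.put("id", Types.STRING);
            fields.put("timestamp", Types.LOCAL_DATE_TIME);
            return Types.POJO(Event.class, fields);
        }
    }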

Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
I see, maybe I just dont understand how to properly express what I am trying to compute. Basically I want to aggregate the quantities of the transactions that happened in the 5 seconds before the query. Every query.id belongs to a single query (event_time, itemid) but still I have to group :/ Gyu

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Gyula Fóra
Kurt, can you please explain which conf parameters you mean? In regular executions (Yarn for instance) we have dynamic config parameters overriding any flink-conf argument. So it is not about setting them in the user code, but it should happen before the ClusterDescriptors are created (i.e. in the

Re: checkpoint _metadata file has >20x different in size among different check-points

2020-03-05 Thread Congxian Qiu
Hi, Maybe there are some ByteStreamStateHandles in the checkpoint; to verify this, you can configure `state.backend.fs.memory-threshold`. Please be careful when setting this config, because it may produce many small files. Best, Congxian Arvid Heise wrote on 2020

Re: Setting the operator-id to measure percentile latency over several jobs

2020-03-05 Thread Felipe Gutierrez
Thanks! I was wondering why the operator name is not implemented for the latency metrics, because for the other metrics it is implemented. But thanks anyway! -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com

RE: Re: Teradata as JDBC Connection

2020-03-05 Thread Norm Vilmer (Contractor)
Thanks for the reply, Arvid. I changed the property names in my ConnectorDescriptor subclass to match what the validator wanted and now get: “Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath. Reason: No factory supports all propert

Flink Deployment failing with RestClientException

2020-03-05 Thread Samir Tusharbhai Chauhan
Hi, I am having an issue where, after deploying a few jobs, deployments start failing with the errors below. I don't have this issue in other environments. What should I check first in such a scenario? My environment is Azure Kubernetes 1.15.7, Flink 1.6.0, Zookeeper 3.4.10. The program finished with the following exc

How do I get the value of 99th latency inside an operator?

2020-03-05 Thread Felipe Gutierrez
Hi community, where in the Flink code can I get the value of the 99th percentile latency (flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{operator_id="93352199ce18d8917f20fdf82cedb1b4",quantile="0.99"})? Probably I will have to hack the Flink source code to export

Re: Flink Deployment failing with RestClientException

2020-03-05 Thread Andrey Zagrebin
Hi Samir, It may be a known issue [1][2] where some action during job submission takes too long but eventually completes in the job manager. Have you checked the job manager logs for any other failures besides the “Ask timed out"? Have you checked in the Web UI whether all the jobs have been starte

Re: StreamingFileSink Not Flushing All Data

2020-03-05 Thread Kostas Kloudas
Thanks Austin, If the CompressWriterFactory works for you in 1.10, then you can copy it as is into 1.9 and use it. The BulkWriter interfaces have not changed between the versions (as far as I recall). But please keep in mind that there is a bug in the CompressWriterFactory with a pending PR that

Re: Very large _metadata file

2020-03-05 Thread Kostas Kloudas
Hi Jacob, As I said previously I am not 100% sure what can be causing this behavior, but this is a related thread here: https://lists.apache.org/thread.html/r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d%40%3Cuser.flink.apache.org%3E Which you can re-post your problem and monitor for a

Weird behaviour testing TwoPhaseCommit

2020-03-05 Thread David Magalhães
I've implemented a CustomSink with TwoPhaseCommit. To test this I've created a test using the baseline of this [1] one, and it works fine. To test the integration with S3 (and with an exponential back-off), I've tried to implement a new test, using the following code: ... val invalidWriter = writ

Re: Very large _metadata file

2020-03-05 Thread Jacob Sevart
Thanks, I will monitor that thread. I'm having a hard time following the serialization code, but if you know anything about the layout, tell me if this makes sense. What I see in the hex editor is, first, many HDFS paths. Then gigabytes of unreadable data. Then finally another HDFS path at the end

Re: Weird behaviour testing TwoPhaseCommit

2020-03-05 Thread Arvid Heise
Hi David, bounded sources do not work well with checkpointing. As soon as the source is drained, no checkpoints are performed anymore. It's an unfortunate limitation that we want to get rid of, but haven't found the time (because it requires larger changes). So for your test to work, you need to

Re: Weird behaviour testing TwoPhaseCommit

2020-03-05 Thread David Magalhães
Awesome Arvid, thanks a lot! :) And I thought when doing this that I was simplifying the test ... On Thu, Mar 5, 2020 at 8:27 PM Arvid Heise wrote: > Hi David, > > bounded sources do not work well with checkpointing. As soon as the source > is drained, no checkpoints are performed anymore. It's

Backpressure and 99th percentile latency

2020-03-05 Thread Felipe Gutierrez
Hi, I am a bit confused about the topic of tracking latency in Flink [1]. It says that if I use latency tracking I am measuring Flink's network stack, but application code latencies can also influence it. For instance, if I am using metrics.latency.granularity: operator (default) and setLatency

Re: Single stream, two sinks

2020-03-05 Thread Gadi Katsovich
Guys, thanks for the great advice. It works! I used HttpAsyncClient from Apache Commons. At first I tried to implement the async http client by implementing AsyncFunction. I implemented the asyncInvoke method and used try-with-resources to instantiate the client (because it's CloseableHttpAsyncClient
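For anyone following along, a hedged sketch of that pattern: the async HTTP client is created once in open() and reused across calls, instead of being opened and closed with try-with-resources on every invocation (the endpoint, types and class name are placeholders, not the code from this thread):

    import java.util.Collections;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.concurrent.FutureCallback;
    import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
    import org.apache.http.impl.nio.client.HttpAsyncClients;

    public class HttpNotifyFunction extends RichAsyncFunction<String, String> {

        private transient CloseableHttpAsyncClient client;

        @Override
        public void open(Configuration parameters) {
            // Create and start the client once per task, not once per record.
            client = HttpAsyncClients.createDefault();
            client.start();
        }

        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            client.execute(new HttpGet("http://example.com/notify?id=" + input),
                    new FutureCallback<HttpResponse>() {
                        @Override
                        public void completed(HttpResponse response) {
                            // Forward the original element once the call succeeded.
                            resultFuture.complete(Collections.singleton(input));
                        }

                        @Override
                        public void failed(Exception ex) {
                            resultFuture.completeExceptionally(ex);
                        }

                        @Override
                        public void cancelled() {
                            resultFuture.complete(Collections.emptyList());
                        }
                    });
        }

        @Override
        public void close() throws Exception {
            if (client != null) {
                client.close();
            }
        }
    }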

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Kurt Young
If you already have a running Flink cluster and you want to submit another job to it, then all the configurations related to process parameters, like TM memory, slot number etc., cannot be modified. Best, Kurt On Thu, Mar 5, 2020 at 11:08 PM Gyula Fóra wrote: > Kurt can you please e

Re: Single stream, two sinks

2020-03-05 Thread Austin Cawley-Edwards
We have the same setup and it works quite well. One thing to take into account is that your HTTP call may happen multiple times if you’re using checkpointing/ fault tolerance mechanism, so it’s important that those calls are idempotent and won’t duplicate data. Also we’ve found that it’s important

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jeff Zhang
There are 2 kinds of configuration: job level & cluster level. I am afraid we don't have documentation differentiating them; it depends on how users understand these configurations. We may need to improve the documentation on that. Kurt Young wrote on Friday, March 6, 2020 at 8:34 AM: > If you already have a running flink cluster a

Re: Writing retract streams to Kafka

2020-03-05 Thread Jark Wu
Hi Gyula, Does tumbling 5 seconds for aggregation meet your need? For example: INSERT INTO QueryResult SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5' SECOND), sum(t.quantity) AS quantity FROM ItemTransactions AS t, Queries AS q WHERE t.itemId = q.itemId AND t.event_ti

Re: JobMaster does not register with ResourceManager in high availability setup

2020-03-05 Thread Xintong Song
Hi Abhinav, Thanks for the log. However, the attached log seems to be incomplete. The NoResourceAvailableException cannot be found in this log. Regarding connecting to ResourceManager, the log suggests that: - ZK was back to life and connected at 06:29:56. 2020-02-27 06:29:56.539 [main-Eve

Re: Re: Teradata as JDBC Connection

2020-03-05 Thread Jark Wu
Hi Norm, Here is the documentation for the JDBC connector, where you can find the supported properties: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector Regarding your exception: you don't need to call `inAppendMode`. The JDBC sink supports both append-mode and

RE: Re: Re: Teradata as JDBC Connection

2020-03-05 Thread Norm Vilmer (Contractor)
Thanks Jark. I’ll try removing the inAppendMode() ☺ Regarding the Teradata dialect, crossing my fingers and hoping insert queries work. From: Jark Wu Sent: Thursday, March 5, 2020 8:41 PM To: Norm Vilmer (Contractor) Cc: Arvid Heise ; user@flink.apache.org Subject: EXTERNAL - Re: Re: Teradata a

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
Actually this use case led me to start thinking about one question: if watermarks are enabled, could we also support GROUP BY event_time instead of forcing users to define a window based on the event_time? GROUP BY a standalone event_time can also be treated as a special window, which has both start_

Re: Backpressure and 99th percentile latency

2020-03-05 Thread Zhijiang
Hi Felipe, I'll try to answer your questions below. > I understand that I am tracking latency every 10 seconds for each physical > instance operator. Is that right? Generally right. The latency marker is emitted from the source and flows through all the intermediate operators to the sink. This interval c

Re: Backpressure and 99th percentile latency

2020-03-05 Thread Arvid Heise
Hi Felipe, latency under backpressure has to be carefully interpreted. Latency's semantics actually require that the data source is read in a timely manner; that is, there is no bottleneck in your pipeline where data is piling up. Thus, to measure latency in experiments you must ensure that the c