Hi,
For a streaming job that uses the Kafka connector, this doc
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/json.html#format-options
shows that we can parse the json data format. However, it does not seem
like the Flink JDBC connector supports the json data type, at least fr
timestamp and report the latency.
>
>
>
> Julian
>
>
>
> *From: *Fanbin Bu
> *Date: *Thursday, December 10, 2020 at 3:41 PM
> *To: *user
> *Subject: *latency monitoring
>
>
>
> Hi,
>
>
>
> I would like to monitor the pipeline latency meas
Hi,
I would like to monitor the pipeline latency, measured as the timestamp
when the output is written to the sink minus the timestamp when the record
was ingested from the source.
Now I'm able to get the timestamp when writing to the sink, since the sink
implements a RichSinkFunction and therefore I can report a gauge there [1].
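Concretely, the sink side looks roughly like this (a minimal sketch; the
event type, the "ingestTs" field and the metric name are placeholders I
made up, not the real job):

import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Gauge
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

// Hypothetical event type; "ingestTs" carries the source ingestion time.
case class MyEvent(payload: String, ingestTs: Long)

class LatencyReportingSink extends RichSinkFunction[MyEvent] {
  @volatile private var lastLatencyMs: Long = 0L

  override def open(parameters: Configuration): Unit = {
    // expose the most recently observed latency as a gauge
    getRuntimeContext.getMetricGroup
      .gauge[Long, Gauge[Long]]("sink_latency_ms", new Gauge[Long] {
        override def getValue: Long = lastLatencyMs
      })
  }

  override def invoke(value: MyEvent, context: SinkFunction.Context[_]): Unit = {
    lastLatencyMs = System.currentTimeMillis() - value.ingestTs
    // ... the actual write to the external system goes here
  }
}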
But I h
'll file
> an issue to fix that.
>
> On 12/8/2020 4:42 AM, Fanbin Bu wrote:
>
> Hi,
>
> I followed [1] to define my own metric as:
>
> val dropwizardHistogram = new com.codahale.metrics.Histogram(new
> SlidingWindowReservoir(500))
> histogram = getRuntimeContext
Hi,
I followed [1] to define my own metric as:
val dropwizardHistogram = new com.codahale.metrics.Histogram(new
SlidingWindowReservoir(500))
histogram = getRuntimeContext
.getMetricGroup
.histogram("feature_latency", new
DropwizardHistogramWrapper(dropwizardHistogram))
and it is su
I have to put the keystore file on the nodes.
On Wed, Nov 18, 2020 at 4:29 PM Fanbin Bu wrote:
> Hi,
>
> This is a repost with modified subject per Sri Tummala's suggestion.
>
> I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I
> tried to put keys
Hi,
This is a repost with modified subject per Sri Tummala's suggestion.
I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I tried
to put keystore.jks location under /usr/lib/flink/... like:
export
SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/
$SecurityStore
.load(SslEngineBuilder.java:285)
... 23 more
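For completeness, the table definition currently points at that local path
roughly like this (a sketch; broker address, topic and passwords are
placeholders) - I suspect the file simply isn't present on the task manager
nodes, hence the SecurityStore failure above:

// assuming a Scala StreamTableEnvironment named tableEnv already exists
tableEnv.executeSql(
  """
    |CREATE TABLE events (
    |  user_id STRING,
    |  action  STRING
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'test',
    |  'properties.bootstrap.servers' = 'broker:9093',
    |  'properties.security.protocol' = 'SSL',
    |  'properties.ssl.keystore.location' = '/usr/lib/flink/kafka/config/ssl/keystore/kafka.keystore.jks',
    |  'properties.ssl.keystore.password' = 'changeit',
    |  'format' = 'json'
    |)
    |""".stripMargin)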
On Tue, Nov 17, 2020 at 10:01 PM Fanbin Bu wrote:
> let me try to put it on s3 and change code like:
> 'properties.ssl.keystore.location'='s3://my-bucket/keystore.jks
>
> Thanks,
> Fanbin
>
> On Tue, Nov 17, 20
Hi,
I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I tried
to put keystore.jks location under /usr/lib/flink/... like:
export
SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
Notice that this is on the EMR master node. Bo
> Please verify that:
> 1. kafka-connector is indeed in the fat jar (e.g. by "jar vtf
> your-program.jar | grep KafkaDynamicTableFactory")
> 2. kafka-connector version matches the version of Flink distribution on
> EMR.
>
> Regards,
> Roman
>
>
> On
Hi,
I could not launch my Flink 1.11.2 application on EMR; it fails with the exception
Caused by: org.apache.flink.table.api.ValidationException:
Could not find any factory for identifier 'kafka' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.
I attached the full
>
> Docs:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html
>
> Best,
> Jark
>
> On Wed, 11 Nov 2020 at 11:11, Fanbin Bu wrote:
>
>> Jark,
>>
>> Thanks for the quick response.
>> I tried to_timestamp(ts, ...), but go
> Q1: how does watermark know new_ts is a valid timestamp?
> > the framework will validate the return type of the computed column
> expression.
>Currently, it must be a type of TIMESTAMP(3).
>
> Q2: is it possible to reuse ts without introducing a new column?
> > Currently, it i
Could you please provide a concrete example for this?
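To be specific, this is the kind of DDL I have in mind (my own sketch,
untested; I'm not sure the TO_TIMESTAMP format pattern handles the
microseconds and the trailing 'Z' correctly, and the connector options are
trimmed down):

// assuming a blink-planner Scala StreamTableEnvironment named tableEnv
tableEnv.executeSql(
  """
    |CREATE TABLE t (
    |  user_id STRING,
    |  action  STRING,
    |  ts      STRING,
    |  -- computed column: parse the ISO-8601 string into a TIMESTAMP(3)
    |  new_ts AS TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS''Z'''),
    |  WATERMARK FOR new_ts AS new_ts - INTERVAL '5' SECOND
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'test',
    |  'properties.bootstrap.servers' = 'broker:9092',
    |  'format' = 'json'
    |)
    |""".stripMargin)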
Thanks
Fanbin
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
On Tue, Nov 10, 2020 at 6:18 PM Fanbin Bu wrote:
> i also tried:
> ts TIMESTAMP WITH LOCAL TIME ZONE
>
> but it
I also tried:
ts TIMESTAMP WITH LOCAL TIME ZONE
but it failed with
Rowtime attribute 'ts' must be of type TIMESTAMP but is of type
'TIMESTAMP(6) WITH LOCAL TIME ZONE'.
On Tue, Nov 10, 2020 at 5:42 PM Fanbin Bu wrote:
> Hi,
>
> I have source json data like:
> {&q
Hi,
I have source json data like:
{"ts": "2020-11-09T20:26:10.368123Z", "user_id": "user1", "action": "click"}
...
my sql is:
create table t (
user_id string,
action string,
ts timestamp,
watermark for ts as ts - interval '5' second
) with (
'connector' = 'kafka',
'topic' = 'test',
'json.timestam
Hi,
Does anyone have any idea about the following error msg? (It flooded my task
manager log.)
I do have Datadog metrics present, so this probably only happens for some
metrics.
2020-06-24 03:27:15,362 WARN
org.apache.flink.metrics.datadog.DatadogHttpClient- Failed
sending request to Datad
Jark,
Thanks for the reply. Do you know whether it's on the roadmap or what's the
plan?
On Mon, Jun 22, 2020 at 9:36 PM Jark Wu wrote:
> Hi Fanbin,
>
> Currently, over window aggregation doesn't support two-phase optimization.
>
> Best,
> Jark
>
> On Tue,
Hi,
Does over window aggregation support two-phase mode?
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html#table-optimizer-agg-phase-strategy
SELECT
user_id
, event_time
, listagg(event_type, '*') over w as names
FROM table
WINDOW w AS
( PARTITION BY user_id
ORD
>>
>> On 12/17/2019 08:10,Fanbin Bu
>> wrote:
>>
>> Hi,
>>
>> After I upgrade flink 1.9, I got the following error message on EMR, it
>> wor
Hi,
I've been debugging this issue for several days now and still can't get it
to work. I need to read the Kinesis historical data (7 days) using Flink
SQL. Here is my setup:
Flink version: 1.9.1
kinesis shard number: 32
Flink parallelism: 32
sql: select * from mytable (I purposely make this trivi
Hi,
I'm running Flink 1.9 on EMR using the Flink SQL blink planner, reading from
and writing to JDBC input/output. My SQL is just a listagg over a window for
the last 7 days. However, I notice that there are one or two subtasks that
take too long to finish. In this thread
http://mail-archives.apache.org/mod_m
Hi,
For savepoint, the dir looks like
s3://bucket/savepoint-jobid/*
To resume, I do:
flink run -s s3://bucket/savepoint-jobid/
perfect!
For checkpoint, the dir looks like
s3://bucket/jobid/chk-100
s3://bucket/jobid/shared. <-- what is this for?
To resume, which one should I do:
flink run -s
> on this as well.
>
> - Steve
>
> On Wed, Mar 11, 2020 at 5:13 AM Chesnay Schepler
> wrote:
>
>> Please open a JIRA; we may have to split the datatog report into several
>> chunks.
>>
>> On 09/03/2020 07:47, Fanbin Bu wrote:
>>
>> quote fro
ng as the prefix is
unique you can safely ignore this warning."
I do see from the log that my SQL operator name is too long and that it gets
truncated.
But I still failed to report to Datadog.
Thanks
Fanbin
On Sun, Mar 8, 2020 at 11:36 PM Fanbin Bu wrote:
> Hi,
>
> Has anybody se
Hi,
Has anybody seen this error before and what is the suggested way to solve
it?
2020-03-07 02:54:34,100 WARN
org.apache.flink.metrics.datadog.DatadogHttpClient- Failed to
send request to Datadog (response was Response{protocol=http/1.1, code=413,
message=Request Entity Too Large, u
filter to fetch rows which are greater than the last max ID or max created
> time.
> For (3), this is a changelog support, which will be supported natively in
> 1.11 in Flink SQL.
>
> Best,
> Jark
>
>
> On Fri, 21 Feb 2020 at 02:35, Fanbin Bu wrote:
>
>>
>>
https://stackoverflow.com/questions/48151881/how-to-run-apache-flink-streaming-job-continuously-on-flink-server
On Thu, Feb 20, 2020 at 3:14 AM Chesnay Schepler wrote:
> Can you show us where you found the suggestion to use iterate()?
>
> On 20/02/2020 02:08, Fanbin Bu wrote:
> >
Hi,
My app creates the source from a JDBC InputFormat, runs some SQL, and prints
the result. But the source terminates itself after the query is done. Is
there any way to keep the source running?
sample code:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.
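In the meantime, the workaround I'm thinking about is a custom source that
re-runs the query periodically with a filter on the last max id, as was
suggested. A rough sketch (untested; connection URL, query and column
positions are made up):

import java.sql.DriverManager
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.types.Row

class PollingJdbcSource(url: String, query: String, pollMillis: Long)
    extends RichSourceFunction[Row] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Row]): Unit = {
    var lastMaxId = 0L
    while (running) {
      val conn = DriverManager.getConnection(url)
      try {
        // query is expected to look like "SELECT id, payload FROM t WHERE id > ?"
        val stmt = conn.prepareStatement(query)
        stmt.setLong(1, lastMaxId)
        val rs = stmt.executeQuery()
        while (rs.next()) {
          lastMaxId = math.max(lastMaxId, rs.getLong(1))
          ctx.collect(Row.of(rs.getObject(1), rs.getObject(2)))
        }
      } finally {
        conn.close()
      }
      Thread.sleep(pollMillis)
    }
  }

  override def cancel(): Unit = running = false
}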
Hi,
Currently, "StreamTableEnvironment can not run in batch mode for now,
please use TableEnvironment."
Are there any plans on the batch/streaming unification roadmap to let
StreamTableEnvironment be used for both streamingMode and batchMode?
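As a reference point, what I understand works for batch today is the unified
TableEnvironment with the blink planner, along these lines (a sketch):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// blink planner in batch mode via the unified TableEnvironment
val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()
val batchTableEnv: TableEnvironment = TableEnvironment.create(settings)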
Thanks,
Fanbin
Hi,
According to
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/types.html#timestamp
,
the default Java bridging type for TIMESTAMP is java.time.LocalDateTime. Is
there a setting that can change it to use
java.sql.Timestamp instead?
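For context, what I'm hoping for is something along these lines (a sketch;
I'm not sure bridgedTo is honored in every code path):

import org.apache.flink.table.api.DataTypes

// ask the planner to use java.sql.Timestamp as the conversion class
// instead of the default java.time.LocalDateTime (unverified sketch)
val tsType = DataTypes.TIMESTAMP(3).bridgedTo(classOf[java.sql.Timestamp])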
Thanks,
Fanbin
Can you do
RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW?
On Tue, Feb 11, 2020 at 12:15 PM oleg wrote:
> Hi Community,
>
> I do streaming in event time and I want to preserve ordering and late
> events. I have a use case where I need to fire an aggregation function
> for events of la
Hi,
For the following implementation of merge,
https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java#L224
What if acc has some of the same keys as mergeAcc? The merg
> Best,
>
> Arvid
>
> On Tue, Jan 28, 2020 at 1:24 AM Fanbin Bu wrote:
>
>> I can build flink 1.10 and install it on to EMR
>> (flink-dist_2.11-1.10.0.jar). but what about other dependencies in my
>> project build.gradle, ie. flink-scala_2.11, flink-json, fli
u can describe
>> complete SQL and some data informations.
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu wrote:
>>
>>> Jingsong,
>>>
>>> Do you have any suggestions to debug the above mentioned
Hi,
I'm following
https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters
to debug a Flink program running on EMR.
How do I specify the host in the `edit configurations` dialog if the
terminal on the EMR master is
hadoop@ip-10-200-46-186
?
Thanks,
Fanbin
Jingsong,
Do you have any suggestions to debug the above mentioned
IndexOutOfBoundsException error?
Thanks,
Fanbin
On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu wrote:
> I got the following error when running another job. any suggestions?
>
> Caused by: java.lang.IndexOutOfBoundsExcep
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu wrote:
> Jingsong,
>
> I set the config value to be too large. After I change
nfig code?
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu wrote:
>
>> Jingsong,
>>
>> Great, now i got a different error:
>>
>> java.lang.NullPointerException: Initial Segment may not be null
>> at
>> org.apache.flink
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Is there any other config I should add?
thanks,
Fanbin
On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu wrote
Jingsong Lee
>
> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu wrote:
>
>> Jingsong,
>>
>> Thank you for the response.
>> Since I'm using flink on EMR and the latest version is 1.9 now. the
>> second option is ruled out. but will keep that in mind for futu
I saw the doc in
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html
.
Do I have to set that in the code or can I do it through flink-conf.yaml?
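For reference, the in-code way I know of looks roughly like this (a sketch;
the option key is just an example taken from that config page, and I don't
know whether flink-conf.yaml is also honored for table.* keys):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)

// table.* options can be set programmatically through TableConfig
tableEnv.getConfig.getConfiguration
  .setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")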
On Wed, Jan 22, 2020 at 7:54 PM Fanbin Bu wrote:
> Jingsong,
>
> Thank you for the response.
> Since I'm u
g-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
> [2] https://issues.apache.org/jira/browse/FLINK-15732
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu wrote:
>
>>
>> tried to increase memo
y id, hop(created_at, interval '30' second, interval
'1' minute)
On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu wrote:
> Hi,
>
> I have a batch job using blink planner. and got the following error. I was
> able to successfully run the same job with flink 1.8 on yarn
Hi,
I have a batch job using the blink planner and got the following error. I
was able to successfully run the same job with Flink 1.8 on YARN.
I set conf as:
taskmanager.heap.size: 5m
and the Flink UI gives me:
Last Heartbeat: 20-01-22 14:56:25, ID: container_1579720108062_0018_01_20,
Data Port: 41029
>
>
> On Fri, 17 Jan 2020 at 16:26, Congxian Qiu wrote:
>
>> Hi
>>
>> Currently, Checkpoint/savepoint only works if all operators/tasks are
>> still running., there is an issue[1] tracking this
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-
Hi,
I couldn't make a savepoint for the following graph:
[image: image.png]
with stacktrace:
Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException:
org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger
savepoint. Failure reason: Not all
alue of 'managedMemoryPerSlot (138m in
> your case) * numberOfSlots'.
>
> It's not clear to me why the exactly same code works on emr. Were you
> running the same version of flink?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Jan 8, 2020 at 8:18 AM F
Hi,
With Flink 1.9 running in Docker mode, I have a batch job and got the
following error message.
However, it works totally fine with the same code on EMR. I checked the log
and here is the only difference:
managedMemoryInMB=138 (the working ones have a value of 0).
Did anybody see this before?
Than
ly discuss the
> solutions
> in the following versions.
>
> Best,
> Kurt
>
> On Sat, Nov 2, 2019 at 7:24 AM Fanbin Bu wrote:
>
>> Kurt,
>>
>> What do you recommend for Flink SQL to use savepoints?
>>
>>
>>
>> On Thu, Oct 31, 2019 at
Hi,
After I upgraded to Flink 1.9, I got the following error message on EMR; it
works locally in IntelliJ.
I'm explicitly declaring the dependency as
implementation
'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1'
and I have
implementation group: 'com.amazonaws', name: 'aws-java-sdk-em
Hi
I have my sink defined as:
class MyAwesomeSink() extends RichSinkFunction[(Boolean, Row)] {
...
}
But the compiler complains when I use it like:
val sink = new MyAwesomeSink()
tableEnv.toRetractStream(queryResult, classOf[Row]).addSink(sink)
found : MyAwesomeSink
required:
org.apache.flink.str
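I suspect I'm mixing the Java and Scala Table APIs; the variant I would
expect to compile (untested sketch, assuming tableEnv is the Scala
StreamTableEnvironment and queryResult is the Table from before) is:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// use the Scala type parameter instead of classOf[Row], so the result is a
// DataStream[(Boolean, Row)] that matches the sink's type
val sink = new MyAwesomeSink()
tableEnv.toRetractStream[Row](queryResult).addSink(sink)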
> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/util/resource/StreamNodeUtil.java#L44
>
>
>
> Best
>
> Yun Tang
>
>
>
>
>
> *From: *Fanbin Bu
> *Date: *Thursday, October 31, 2019 at 1:17 PM
> *To:
Hi,
it is highly recommended that we assign a uid to each operator for the
sake of savepoints. How do we do this for Flink SQL? According to
https://stackoverflow.com/questions/55464955/how-to-add-uid-to-operator-in-flink-table-api,
it is not possible.
Does that mean, I can't use savepoint to res
olumnIndex);
What would be the best way to handle this on the Flink side?
On Thu, Oct 24, 2019 at 12:36 AM Fanbin Bu wrote:
> Hi there,
>
> Flink Version: 1.8.1
> JDBC driver: net.snowflake.client.jdbc.SnowflakeDriver
>
> Here is the code snippet:
>
> val rowTypeInf
Hi there,
Flink Version: 1.8.1
JDBC driver: net.snowflake.client.jdbc.SnowflakeDriver
Here is the code snippet:
val rowTypeInfo = new RowTypeInfo(
Array[TypeInformation[_]](
new RowTypeInfo(
Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TY
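For reference, the input format is then built along these lines (a sketch;
the JDBC URL and query are placeholders, not the real ones):

import org.apache.flink.api.java.io.jdbc.JDBCInputFormat

val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
  .setDrivername("net.snowflake.client.jdbc.SnowflakeDriver")
  .setDBUrl("jdbc:snowflake://<account>.snowflakecomputing.com/")
  .setQuery("SELECT ... FROM my_table")
  .setRowTypeInfo(rowTypeInfo)
  .finish()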
Hi,
Just found that count distinct is supported in streaming but not in batch
(version 1.8). Is there any plan to add this to batch?
SELECT
user_id
, hop_end(created_at, interval '30' second, interval '30' second) as bucket_ts
, count(distinct name)
FROM $table
GROUP BY
user_id
, hop(cr
Hi,
I have a table whose schema contains a Scala case class and a Map. How do I
access the fields?
I tried the following and it doesn't work.
case class MyObject(myField: String)
case class Event(myObject: MyObject, myMap: Map[String, String])
table = tableEnv.fromDataStream[Event](myStream, 'myObject, '
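What I expected to work (sketch, untested; "some_key" is just an
illustrative map key) was the composite-type accessors:

import org.apache.flink.table.api.scala._

// read a case-class field with .get and a map entry with .at
val result = table
  .select('myObject.get("myField") as 'myField,
          'myMap.at("some_key") as 'someValue)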
Hi,
Looks like the Flink table connectors do not include `kinesis` (only
FileSystem, Kafka, ES); see
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#table-connectors
.
I also found some examples for Kafka:
https://eventador.io/blog/flink_table_and_sql_api_with_apache_flin
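The workaround I'm considering (sketch, untested; stream name, region and
schema are placeholders) is to read Kinesis with the DataStream connector
and then register the stream as a table:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}

val props = new Properties()
props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[String] =
  env.addSource(new FlinkKinesisConsumer[String]("my-stream", new SimpleStringSchema(), props))
// ... parse the JSON payload, then register it, e.g.
// tableEnv.fromDataStream(parsed, 'user_id, 'action, 'ts)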
How about moving the query db filter to the outer select?
On Fri, Jul 26, 2019 at 9:31 AM Tony Wei wrote:
> Hi,
>
> If I have multiple where conditions in my SQL, is it possible to specify
> its order, so that the query
> can be executed more efficiently?
>
> For example, if I have the following SQL,
tion on stackoverflow, see more details
> here[1].
>
> Best, Hequn
>
> [1]
> https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger
>
> On Wed, Jul 24, 2019 at 4:38 AM Fanbin Bu wrote:
>
>> If I use proctime, the groupBy happens wit
If I use proctime, the groupBy happens without any delay.
On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu wrote:
> not sure whether this is related:
>
> public SingleOutputStreamOperator assignTimestampsAndWatermarks(
> AssignerWithPeriodicWatermarks timestampAndWater
t to 32
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(32)
and the command for launching the job is
flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu wrote:
> Thanks Fabian for the prompt reply. I just started using Fl
will
> be computed with approx. 10 minute delay.
>
> Best, Fabian
>
> Am Di., 23. Juli 2019 um 02:00 Uhr schrieb Fanbin Bu <
> fanbin...@coinbase.com>:
>
>> Hi,
>> I have a Flink sql streaming job defined by:
>>
>> SELECT
>> user_id
>>
Hi,
I have a Flink sql streaming job defined by:
SELECT
user_id
, hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
, count(name) as count
FROM event
WHERE name = 'signin'
GROUP BY
user_id
, hop(created_at, interval '30' second, interval '1' minute)
there is a