Re: Interactive Programming in Flink (Cache operation)

2021-05-03 Thread Jack Kolokasis
… in [2]. Best, Matthias [1] https://issues.apache.org/jira/browse/FLINK-19343 [2] https://flink.apache.org/news/2021/05/03/release-1.13.0.html On Mon, May 3, 2021 at 8:12 …

Interactive Programming in Flink (Cache operation)

2021-05-03 Thread Jack Kolokasis
Hello, does the new release of Flink 1.13.0 include the cache operation feature (https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink)? Thank you, Iacovos

Re: Caching Mechanism in Flink

2020-11-11 Thread Jack Kolokasis
…#managed-memory On Wed, Nov 11, 2020 at 12:00 PM Jack Kolokasis <koloka...@ics.forth.gr> wrote: Hi Matthias, thank you for your reply and the useful information. I find that off-heap is used when Flink uses HybridMemorySegments. How does Flink know when to use…

Re: Caching Mechanism in Flink

2020-11-11 Thread Jack Kolokasis
…ink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis <koloka...@ics.forth.gr> wrote: Thank you Xuannan for the reply. Also I want to ask about how Flink uses the off-heap memory. If I set taskmanager.me…

Re: Caching Mechanism in Flink

2020-11-09 Thread Jack Kolokasis
Thank you Xuannan for the reply. I also want to ask how Flink uses off-heap memory. If I set taskmanager.memory.task.off-heap.size, which data does Flink allocate off-heap? Is this handled by the programmer? Best, Iacovos On 10/11/20 4:42 AM, Xuannan Su wrote: Hi Jack, At…
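The options discussed in this thread live in flink-conf.yaml. A hypothetical fragment, with sizes chosen only for illustration (option names follow the Flink 1.10+ memory model; check the detailed-memory-model docs linked in the thread for your version):

```yaml
# Off-heap memory reserved for user code's direct/native allocations
# (this is the option the question asks about; the user allocates it).
taskmanager.memory.task.off-heap.size: 512mb

# Managed memory that Flink itself allocates off-heap (sorting, state
# backends, caching); not directly visible to user code.
taskmanager.memory.managed.size: 1gb
```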

Caching Mechanism in Flink

2020-11-09 Thread Jack Kolokasis
Hello all, I am new to Flink and I want to ask whether Flink supports a caching mechanism to store intermediate results in memory for machine learning workloads. If yes, how can I enable it and how can I use it? Thank you, Iacovos

Kafka source, committing and retries

2020-07-31 Thread Jack Phelan
Scenario === A partition that Flink is reading:
[ 1 - 2 - 3 - 4 - 5 - 6 - 7 | 8 _ 9 _ 10 _ 11 | 12 ~ 13 ]
[        committed          |    in flight    |  unread ]
Kafka basically breaks off pieces of the end of the queue and shoves them downstream for processing? So suppose whil…
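The bookkeeping the diagram describes can be sketched in plain Python (no Flink or Kafka APIs; `replay_start` is a hypothetical helper, and the numbers are the example offsets from the post):

```python
def replay_start(committed_up_to: int) -> int:
    """On failure/restart, consumption resumes after the last committed offset."""
    return committed_up_to + 1

committed_up_to = 7             # offsets 1-7: processing committed
in_flight = list(range(8, 12))  # offsets 8-11: handed downstream, not yet committed
unread = [12, 13]               # offsets 12-13: not yet fetched

# If the job fails before 8-11 are committed, they are re-read on restart:
assert replay_start(committed_up_to) == 8
```

This is why in-flight records may be processed more than once unless checkpointing coordinates the commits.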

Re: Re: Can the pyflink Table API route inserts to different sink tables based on a field's value?

2020-06-22 Thread jack
Hi jincheng, I have verified the separate-handling approach you suggested and it solves my problem. Many thanks for the explanation. Best, Jack On 2020-06-22 14:28:04, "jincheng sun" wrote: Hi jack: with the Table API you don't need if/else; logic like the following works: val t1 = table.filter('x > 2).groupBy(..) val t2 = table.filter('x <= 2).groupBy(..) t1.insert_into("…

Re: Can the pyflink Table API route inserts to different sink tables based on a field's value?

2020-06-18 Thread jack
I tried it: table.filter("logType=syslog").insert_into("sink1") takes effect, but the elif below it does not. Is the reason that table.filter("logType=syslog") (or table.where) already filters the data away while evaluating the condition, so the later branch is never reached? On 2020-06-19 10:08:25, "jack" wrote: > Can the pyflink Table API route inserts to different sink tables based on a field's value? …

Can the pyflink Table API route inserts to different sink tables based on a field's value?

2020-06-18 Thread jack
Can the pyflink Table API route inserts to different sink tables based on a field's value? Scenario: filter with a condition in pyflink and insert the result into a sink. For example, the two messages below differ in logType and can be filtered into a sink table via the filter interface: { "logType":"syslog", "message":"sla;flkdsjf" } { "logType":"alarm", "message":"sla;flkdsjf" } t_env.from_path("source")\ .filter("…

Re: Re: pyflink connection problem with Elasticsearch 5.4

2020-06-16 Thread jack
…e('taxiidcnt') .key_delimiter("$")) \ On 2020-06-16 15:38:28, "Dian Fu" wrote: > I guess it's because the ES version specified in the job is `6`, however, the jar used is `5`. >> On June 16, 2020 at 1:47 PM, jack wrote: …

pyflink connection problem with Elasticsearch 5.4

2020-06-15 Thread jack
I am working from a pyflink example that connects to Elasticsearch. My ES version is 5.4.1 and pyflink is 1.10.1; the connector jar I use is flink-sql-connector-elasticsearch5_2.11-1.10.1.jar. I also downloaded the kafka and json connector jars, and the kafka connection test succeeded. Connecting to ES fails with findAndCreateTableSink failed. Could the ES connector jar be the cause? If anyone has hit a similar problem, please advise. Thanks. Caused by: Could not find a suitable factory for 'org.apac…

Re: Querying data with pyflink

2020-06-15 Thread jack
Hi, thanks for the suggestion; I will try implementing a custom sink. Best, Jack On 2020-06-15 18:08:15, "godfrey he" wrote: hi jack, jincheng: Flink 1.11 supports collecting a select result directly to the client, e.g.: CloseableIterator it = tEnv.executeSql("select ...").collect(); while(it.hasNext()) { it.next() } but pyflink …

Querying data with pyflink

2020-06-09 Thread jack
Question: in pyflink I query and aggregate data from a source via SQL. Instead of writing the result to a sink, can it be used directly as the result, so that I can serve it straight from a web API without querying a sink? Can Flink do this? Thanks.

Re: Formatting problem when a pyflink udf produces a JSON string sent to kafka

2020-06-01 Thread jack
Many thanks for the explanation. I have only just started with pyflink and am not very familiar with it yet; much appreciated. On 2020-06-01 20:50:53, "Xingbo Huang" wrote: Hi, this is the effect of the CSV connector's optional quote_character parameter, whose default is ", which is why every field gets wrapped in a layer of quotes. If you set this parameter to \0 you will get the output you want. st_env.connect( Kafka() .version("0.11") .topic("logSink") …

Formatting problem when a pyflink udf produces a JSON string sent to kafka

2020-06-01 Thread jack
I use pyflink to consume JSON string data from kafka and send it back to kafka, processing the message by a single field. Input: {"topic": "logSource", "message": "x=1,y=1,z=1"} The result that arrives in kafka is: "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}" (wrapped in an extra layer of double quotes). My udf uses the json module; could someone show me how to turn this into a normal JSON string? @udf(input_types=[DataTypes.STRING()…

How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?

2019-03-06 Thread Jack Tuck
I currently have Flink set up with a job running on EMR, and I'm now trying to add monitoring by sending metrics off to Prometheus. I have come across an issue with running Flink on EMR. I'm using Terraform to provision EMR (I run ansible afterwards to download and run a job). Out of the box, it doe…
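For reference, wiring Flink's Prometheus reporter is a flink-conf.yaml change. A hypothetical fragment (it assumes the flink-metrics-prometheus jar has been placed in Flink's lib/ directory; the port range is illustrative):

```yaml
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
# A range, so each TaskManager on the same host can bind its own port:
metrics.reporter.prom.port: 9249-9260
```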

Re: Metrics not reported to graphite

2016-09-01 Thread Jack Huang
Found the solution to the follow-up question: https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#metrics On Thu, Sep 1, 2016 at 3:46 PM, Jack Huang wrote: > Hi Greg, > Following your hint, I found the solution here ( > https://issues.apache.org/jira/…

Re: Metrics not reported to graphite

2016-09-01 Thread Jack Huang
…their names starting with the host IP address? Thanks, Jack On Thu, Sep 1, 2016 at 3:04 PM, Greg Hogan wrote: > Have you copied the required jar files into your lib/ directory? Only JMX support is provided in the distribution. > On Thu, Sep 1, 2016 at 5:07 PM, Jack Huang wrote: …

Metrics not reported to graphite

2016-09-01 Thread Jack Huang
…ounter.inc return true; } }) However I don't see anything on my graphite server. What am I missing? Thanks, Jack
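Per the resolution in this thread (the Graphite jar must be added and the reporter configured), a hypothetical flink-conf.yaml fragment for the 1.1-era Graphite reporter looks like this. Host and port are placeholders; the flink-metrics-graphite jar must be copied into lib/, since only JMX support ships in the distribution:

```yaml
metrics.reporters: grph
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
```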

Re: Handle deserialization error

2016-09-01 Thread Jack Huang
Hi Yassine, For now my workaround is catching exceptions in my custom deserializer and producing some default object to the downstream. It would still be very nice to avoid this inefficiency by not producing an object at all. Thanks, Jack On Fri, Aug 26, 2016 at 6:51 PM, Yassine Marzougui …
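The workaround described here (catch in the deserializer, emit a placeholder, drop it downstream) can be sketched in plain Python, with no Flink APIs; `deserialize` and `MALFORMED` are hypothetical names:

```python
import json

MALFORMED = None  # placeholder the "deserializer" emits instead of an event

def deserialize(raw: bytes):
    """Catch parse errors instead of failing the job; log and continue."""
    try:
        return json.loads(raw)
    except (ValueError, UnicodeDecodeError):
        # In a real job, log the bad record here.
        return MALFORMED

stream = [b'{"x": 1}', b'not json', b'{"x": 2}']
# A downstream filter drops the placeholders:
events = [e for e in (deserialize(r) for r in stream) if e is not MALFORMED]
assert events == [{"x": 1}, {"x": 2}]
```

The inefficiency the post mentions is the placeholder object itself: one dummy value is materialized and shipped per bad record just to be filtered out.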

Re: Cannot pass objects with null-valued fields to the next operator

2016-09-01 Thread Jack Huang
…or nullable fields? > Stephan > On Mon, Aug 29, 2016 at 8:04 PM, Jack Huang wrote: >> Hi all, >> It seems like flink does not allow passing case class objects with null-valued fields to the next operators. I am getting the following error …

Cannot pass objects with null-valued fields to the next operator

2016-08-29 Thread Jack Huang
…case. Is there a way to make it work? I am using flink-1.1.1. Thanks, Jack

Handle deserialization error

2016-08-26 Thread Jack Huang
…formed, in which case I want to catch the exception, log some error message, and go on to the next message without producing an event to the downstream. It doesn't seem like the DeserializationSchema interface allows such behavior. How could I achieve this? Thanks, Jack

Re: Cannot use WindowedStream.fold with EventTimeSessionWindows

2016-08-17 Thread Jack Huang
…er than it should. Therefore I need to perform incremental aggregation. If I can assume ascending timestamps for the incoming events, would there be a workaround? Thanks, Jack On Wed, Aug 17, 2016 at 2:17 AM, Till Rohrmann wrote: > Hi Jack, > the problem with session windows and a fo…
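The workaround hinted at here rests on one observation: with ascending timestamps, a session can only ever be extended forward, never merged backwards, so incremental aggregation needs no merging window assigner. A plain-Python sketch under that assumption (hypothetical names; the gap value is illustrative):

```python
GAP = 30  # session gap in seconds (hypothetical)

def aggregate_sessions(timestamps, gap=GAP):
    """Incrementally count events per session, assuming ascending timestamps."""
    sessions = []  # list of (last_seen, count)
    for ts in timestamps:
        if sessions and ts - sessions[-1][0] <= gap:
            last_seen, count = sessions[-1]
            sessions[-1] = (ts, count + 1)  # within the gap: extend session
        else:
            sessions.append((ts, 1))        # gap exceeded: start a new session
    return [count for _, count in sessions]

# Events at t=0,10,20 form one session; t=100,110 form another:
assert aggregate_sessions([0, 10, 20, 100, 110]) == [3, 2]
```

Only the running accumulator per open session is kept, which is the incremental aggregation the post asks for.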

Cannot use WindowedStream.fold with EventTimeSessionWindows

2016-08-16 Thread Jack Huang
java.lang.UnsupportedOperationException: Fold cannot be used with a merging WindowAssigner. Does anyone have a workaround? Thanks, Jack

Re: Parsing source JSON String as Scala Case Class

2016-08-05 Thread Jack Huang
…tributing them). > Making a Scala 'val' a 'lazy val' often does the trick (at minimal performance cost). > On Thu, Aug 4, 2016 at 3:56 AM, Jack Huang wrote: >> Hi all, >> I want to read a source of JSON String as Scala Case Cla…

Parsing source JSON String as Scala Case Class

2016-08-03 Thread Jack Huang
…ackson and Gson have classes that are not serializable. I couldn't find any other solution to perform this JSON-to-case-class parsing, yet it seems a very basic need. What am I missing? Thanks, Jack

Re: Container running beyond physical memory limits when processing DataStream

2016-08-03 Thread Jack Huang
Hi Max, Changing the yarn.heap-cutoff-ratio seems to suffice for the time being. Thanks for your help. Regards, Jack On Tue, Aug 2, 2016 at 11:11 AM, Jack Huang wrote: > Hi Max, > Is there a way to limit the JVM memory usage (something like the -Xmx flag) for the task manage…
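The fix applied here is a one-line flink-conf.yaml change. A hypothetical fragment (the value 0.3 is illustrative; in Flink 1.1 the option was named yarn.heap-cutoff-ratio, later renamed containerized.heap-cutoff-ratio):

```yaml
# Fraction of the YARN container NOT given to the JVM heap, left as
# headroom for off-heap and native allocations so YARN does not kill
# the container for exceeding its physical memory limit.
yarn.heap-cutoff-ratio: 0.3
```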

Re: Container running beyond physical memory limits when processing DataStream

2016-08-02 Thread Jack Huang
…se I could have indefinitely many messages backed up in the source to be processed. Thanks, Jack On Tue, Aug 2, 2016 at 5:21 AM, Maximilian Michels wrote: > Your job creates a lot of String objects which need to be garbage collected. It could be that the JVM is not fast enough and Yarn kills t…

Container running beyond physical memory limits when processing DataStream

2016-07-28 Thread Jack Huang
…ssages to process, but not failing altogether. I am not storing any states either. Does anyone know the reason and the way to fix/avoid this issue? Thanks, Jack

Periodically evicting internal states when using mapWithState()

2016-06-06 Thread Jack Huang
…le way of knowing the end of a session, since events are likely to get lost. If I keep this process running, the number of stored sessions will keep growing until it fills up the disk. Is there a recommended way of periodically evicting sessions that are too old (e.g. a day old)? Thanks, Jack
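The eviction being asked for can be sketched independently of Flink's state API: keep a last-seen timestamp per session and periodically drop entries past a TTL. Plain Python, hypothetical names, TTL of one day as in the post:

```python
import time

SESSION_TTL = 24 * 60 * 60  # one day, in seconds

def evict_stale(sessions, now, ttl=SESSION_TTL):
    """Drop sessions whose last-seen timestamp is older than `ttl`."""
    return {sid: last_seen for sid, last_seen in sessions.items()
            if now - last_seen <= ttl}

now = time.time()
sessions = {"a": now - 10,               # recently active, kept
            "b": now - 2 * SESSION_TTL}  # over a day old, evicted
assert set(evict_stale(sessions, now)) == {"a"}
```

The open question in the thread is where to hang the periodic trigger when using mapWithState, since that operator only runs on incoming elements.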

Replays message in Kafka topics with FlinkKafkaConsumer09

2016-04-21 Thread Jack Huang
…"auto.offset.reset", "earliest") env.addSource(new FlinkKafkaConsumer09[String](input, new SimpleStringSchema, kafkaProp)) .print I thought auto.offset.reset was going to do the trick. What am I missing here? Thanks, Jack Huang
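One common explanation for this symptom (stated here as background, not as the thread's confirmed answer): in Kafka, auto.offset.reset only applies when the consumer group has no committed offset for a partition; once the group has committed, the consumer resumes from that position and the setting is ignored. A hypothetical consumer-properties fragment for forcing a replay:

```properties
# auto.offset.reset only takes effect when the group has no committed
# offsets, so replaying from the beginning usually also needs a fresh
# group.id (name below is illustrative).
group.id=replay-test-1
auto.offset.reset=earliest
```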

Re: Checkpoint and restore states

2016-04-21 Thread Jack Huang
…job manager automatically restarts the job under the same job ID 7. Observe from the output that the states are restored Jack Huang On Thu, Apr 21, 2016 at 1:40 AM, Aljoscha Krettek wrote: > Hi, yes Stefano is spot on! The state is only restored if a job is restarted because …

Re: Checkpoint and restore states

2016-04-20 Thread Jack Huang
…ount } def restoreState(state: Integer) { count = state } } Thanks, Jack Huang On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <stefano.bagh...@radicalbit.io> wrote: > My bad, thanks for pointing that out. > On Wed, Apr 20, 2016 at 1:49 PM, Alj…

Checkpoint and restore states

2016-04-19 Thread Jack Huang
…yBy({s => s}) .mapWithState((in: String, count: Option[Int]) => { val newCount = count.getOrElse(0) + 1; ((in, newCount), Some(newCount)) }) .print() Thanks, Jack Huang

Re: Immutable data

2015-09-23 Thread Jack
…23 Sep 2015, at 15:02, Aljoscha Krettek wrote: > Hi Jack, > Stephan is right, this should work. Unfortunately the TypeAnalyzer does not correctly detect that it cannot treat your Id class as a Pojo. I will add a Jira issue for that. For the time being you can use this co…

Re: Immutable data

2015-09-23 Thread Jack
…nitializeFields(PojoSerializer.java:232) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:221) ... 5 more > On 23 Sep 2015, at 14:37, Stephan Ewen wrote: > Hi Jack! > This should be supported, there is no strict requirement for muta…

Immutable data

2015-09-23 Thread Jack
Hi, I'm having trouble integrating existing Scala code with Flink, due to the POJO-only requirement. We're using AnyVal heavily for type safety, and immutable classes as a default. For example, the following does not work: object Test { class Id(val underlying: Int) extends AnyVal class X(va…

Many topologies

2015-09-23 Thread Jack
Hello, I need to implement an engine for running a large number (several hundred) of different kinds of topologies. The topologies are defined ad-hoc by customers, hence combining the topologies is difficult. How does Flink perform with so many topologies? Thanks