Re: What's the advantage of using BroadcastState?

2018-08-20 Thread Fabian Hueske
Hi, I've recently published a blog post about Broadcast State [1]. Cheers, Fabian [1] https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink 2018-08-20 3:58 GMT+02:00 Paul Lam: > Hi Rong, Hequn > > Your answers are very helpful! Thank you! > > Best Regards, > Paul

Re: processWindowFunction

2018-08-20 Thread antonio saldivar
Hello, thank you for the information. For some reason KeyedProcessFunction is not found in my Flink version (1.4.2); I can only find ProcessFunction, and I work with it like this: public class TxnProcessFn extends ProcessFunction { public void open(Configuration parameters) throws Exception { state1 = ge

Re: Flink socketTextStream UDP connection

2018-08-20 Thread Soheil Pourbafrani
Thank you for the information. On Mon, Aug 13, 2018 at 1:51 PM Fabian Hueske wrote: > Hi, > > ExecutionEnvironment.socketTextStream is deprecated and it is very likely > that it will be removed because of its limited use. > I would recommend having a look at the implementation of the SourceFunction [1
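
Since `socketTextStream` reads text over a TCP connection, receiving UDP would need a custom `SourceFunction`, as suggested above. A minimal sketch of the receive loop such a source's `run()` method might contain is below, using only `java.net.DatagramSocket`. The class name `UdpTextReceiver`, the assumption that one datagram equals one text record, and the timeout-based stop condition are illustrative assumptions; the Flink wiring (`ctx.collect(...)`, `cancel()`) is not shown.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: the receive loop a custom UDP SourceFunction's run() might contain. */
public class UdpTextReceiver {

    /**
     * Passes each datagram's payload, decoded as UTF-8, to `emit`.
     * Stops after `maxPackets` datagrams or when the socket's receive timeout expires.
     * In a real source, `emit` would forward to ctx.collect(...) and the loop would run until cancel().
     */
    public static int receive(DatagramSocket socket, int maxPackets, Consumer<String> emit) throws Exception {
        byte[] buf = new byte[65_507]; // largest possible UDP payload over IPv4
        int received = 0;
        while (received < maxPackets) {
            DatagramPacket packet = new DatagramPacket(buf, buf.length);
            try {
                socket.receive(packet); // blocks until a datagram arrives or the timeout fires
            } catch (SocketTimeoutException e) {
                break; // nothing arrived within the timeout
            }
            emit.accept(new String(packet.getData(), packet.getOffset(), packet.getLength(), StandardCharsets.UTF_8));
            received++;
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(0, loopback); // port 0 = any free port
             DatagramSocket sender = new DatagramSocket()) {
            receiver.setSoTimeout(2_000);
            byte[] payload = "hello flink".getBytes(StandardCharsets.UTF_8);
            sender.send(new DatagramPacket(payload, payload.length, loopback, receiver.getLocalPort()));

            List<String> lines = new ArrayList<>();
            receive(receiver, 1, lines::add);
            System.out.println(lines);
        }
    }
}
```

Because UDP is connectionless, datagrams can be lost or reordered, so a source built this way offers no delivery guarantee and cannot replay data after a failure.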

Re: processWindowFunction

2018-08-20 Thread vino yang
Hi antonio, Oh, if you can't use KeyedProcessFunction, that is a pity. In that case you can use MapState, where the map key stores the key of your partition. But I am not sure if this will achieve the effect you want. Thanks, vino. antonio saldivar wrote on Mon, Aug 20, 2018 at 4:32 PM: > Hello > > Than

Re: processWindowFunction

2018-08-20 Thread antonio saldivar
Maybe the usage of that function changed; now I have to use it like this [1] [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction On Mon, Aug 20, 2018 at 5:56, vino yang wrote: > Hi antonio, > > Oh, if you can't us

How does flink know which data is modified in dynamic table?

2018-08-20 Thread 徐涛
Hi all, with code like the following, if I use a retract stream, I think Flink is able to know which item is modified (if praise has 1 items now, when one item comes to the stream, only a very small amount of data is written to the sink). var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU FR

Re: Flink Jobmanager Failover in HA mode

2018-08-20 Thread Helmut Zechmann
Hi Dominik, all jobs on the cluster (batch-only jobs without state) were in status FINISHED. Best, Helmut On Fri, Aug 17, 2018 at 8:04 PM Dominik Wosiński wrote: > I have faced this issue, but in 1.4.0 IIRC. This seems to be related to > https://issues.apache.org/jira/browse/FLINK-10011. Wha

Re: How does flink know which data is modified in dynamic table?

2018-08-20 Thread Hequn Cheng
Hi Henry, Both SQL queries output incrementally. However, there are some problems if you use a retract sink. You have to pay attention to the timestamp field, since its value is different each time. For example, if the value is updated from 1 to 2, previous row: add (a, 1, 2018-08-20 20:18:10.286) retract

Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Cliff Resnick
Hi Vino, Thanks for the explanation, but the job only ever uses the Avro (1.8.2) pulled in by flink-formats/avro, so it's not a class version conflict there. I'm using default child-first loading. It might be a further transitive dependency, though it's not clear from the stack trace or stepping throug

Cluster die when one of the TM killed

2018-08-20 Thread Siew Wai Yow
Hi, When one of the task managers is killed, the whole cluster dies. Is this expected? We are using Flink 1.4. Thank you. Regards, Yow

Re: Cluster die when one of the TM killed

2018-08-20 Thread Dominik Wosiński
Hey, Can you please provide a little more information about your setup, and maybe logs showing when the crash occurs? Best Regards, Dominik 2018-08-20 16:23 GMT+02:00 Siew Wai Yow: > Hi, > > > When one of the task manager is killed, the whole cluster die, is this > something expected? We are usin

Re: Cluster die when one of the TM killed

2018-08-20 Thread Lasse Nedergaard
Hi. We have seen the same behaviour on YARN. It turned out that the default setting was not optimal. yarn.maximum-failed-containers: The maximum number of failed containers the ApplicationMaster accepts until it fails the YARN session. Default: The number of initially requested TaskManage

Flink checkpointing to Google Cloud Storage

2018-08-20 Thread Oleksandr Serdiukov
Hello All! I am trying to configure checkpoints for Flink jobs in GCS. Now I am able to write checkpoints but cannot restore from them: java.lang.NoClassDefFoundError: com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl$6 at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(Goog

Re: Override CaseClassSerializer with custom serializer

2018-08-20 Thread Gerard Garcia
Hi Timo, I see. Yes, we already use the "Object Reuse" option. It was a nice performance improvement when we first set it! I guess another option we can try is to somehow make things "easier" for Flink so it can chain operators together. Most of them are not chained, I think it's because they

Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Cliff Resnick
Hi Vino, You were right in your assumption -- unshaded Avro was being added to our application jar via a third-party dependency. Excluding it in packaging fixed the issue. For the record, it looks like flink-avro must be loaded from lib or there will be errors in checkpoint restores. On Mon, Aug 20,

Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Jamie Grier
Hey Cliff, can you provide the stack trace of the issue you were seeing? We recently ran into a similar issue that we're still debugging. Did it look like this: java.lang.IllegalStateException: Could not initialize operator state > backend. > at > org.apache.flink.streaming.api.operators.Abstract

Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Cliff Resnick
Hi Jamie, No, it was nothing of the class not found variety, just parse errors. It had to do with Avro getting mixed up with different versions. -Cliff On Mon, Aug 20, 2018 at 4:18 PM Jamie Grier wrote: > Hey Cliff, can you provide the stack trace of the issue you were seeing? > We recently ra

Re: classloading strangeness with Avro in Flink

2018-08-20 Thread Cliff Resnick
Hi Vino, Unfortunately, I'm still stuck here. By moving the avro dependency chain to lib (and removing it from user jar), my OCFs decode but I get the error described here: https://github.com/confluentinc/schema-registry/pull/509 However, the Flink fix described in the PR above was to move the A

Access to kafka partition per record

2018-08-20 Thread John O
I am consuming data from a Kafka topic that has multiple partitions. I would like to keyBy(record.partition). What would be the best way to get access to the partition information? Jo

RE: Access to kafka partition per record

2018-08-20 Thread John O
Found it. Using KeyedDeserializationSchema, I can get access to the partition, offset, key and value information. From: John O Sent: Monday, August 20, 2018 3:15 PM To: user Subject: Access to kafka partition per record I am consuming data from a kafka topic that has multiple partitions. I wou
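
In the Flink Kafka connector of this era, `KeyedDeserializationSchema.deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)` receives the partition alongside each record, so the schema can copy it into the produced element. The sketch below mirrors that parameter list in a local, hypothetical interface (`PartitionAwareDeserializer`) so it runs without Flink on the classpath; the `PartitionedRecord` type and the `groupByPartition` helper (a local stand-in for `keyBy` on the partition field) are illustrative assumptions, not Flink API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class PartitionKeyingSketch {

    /** Hypothetical stand-in mirroring the parameters of KeyedDeserializationSchema.deserialize(...). */
    interface PartitionAwareDeserializer<T> {
        T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    }

    /** Hypothetical output type that keeps the Kafka coordinates next to the payload. */
    static final class PartitionedRecord {
        final String topic;
        final int partition;
        final long offset;
        final String value;

        PartitionedRecord(String topic, int partition, long offset, String value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    /** Decodes the message body as UTF-8 and records which topic/partition/offset it came from. */
    static final PartitionAwareDeserializer<PartitionedRecord> DESERIALIZER =
            (key, message, topic, partition, offset) ->
                    new PartitionedRecord(topic, partition, offset, new String(message, StandardCharsets.UTF_8));

    /** Local analogue of keyBy(partition): groups record values by their source partition. */
    static Map<Integer, List<String>> groupByPartition(List<PartitionedRecord> records) {
        return records.stream().collect(Collectors.groupingBy(
                r -> r.partition, TreeMap::new, Collectors.mapping(r -> r.value, Collectors.toList())));
    }

    public static void main(String[] args) throws IOException {
        List<PartitionedRecord> records = List.of(
                DESERIALIZER.deserialize(null, "x".getBytes(StandardCharsets.UTF_8), "events", 0, 10L),
                DESERIALIZER.deserialize(null, "y".getBytes(StandardCharsets.UTF_8), "events", 1, 7L),
                DESERIALIZER.deserialize(null, "z".getBytes(StandardCharsets.UTF_8), "events", 0, 11L));
        System.out.println(groupByPartition(records)); // {0=[x, z], 1=[y]}
    }
}
```

Keying by partition yields one key per Kafka partition, so at most as many keyed subtasks can receive data as the topic has partitions.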

Re: How does flink know which data is modified in dynamic table?

2018-08-20 Thread 徐涛
Hi Hequn, However, is it semantically correct? The SQL result is not equal to that of the bounded table. > On Aug 20, 2018, at 8:34 PM, Hequn Cheng wrote: > > Hi Henry, > > Both sql output incrementally. > > However there are some problems if you use retract sink. You have to pay > atte

Re: classloading strangeness with Avro in Flink

2018-08-20 Thread vino yang
Hi Cliff, If so, you can explicitly exclude Avro from the dependencies that pull it in (using ) and then directly add a dependency on the Avro version you need. Thanks, vino. Cliff Resnick wrote on Tue, Aug 21, 2018 at 5:13 AM: > Hi Vino, > > Unfortunately, I'm still stuck here. By moving the avro d

Re: Flink checkpointing to Google Cloud Storage

2018-08-20 Thread vino yang
Hi Oleksandr, From the exception log, could you be missing some relevant dependencies? You can check which dependency the related class belongs to. Thanks, vino. Oleksandr Serdiukov wrote on Tue, Aug 21, 2018 at 12:04 AM: > Hello All! > > I am trying to configure checkpoints for flink jobs in GCS. > Now I

Re: How does flink know which data is modified in dynamic table?

2018-08-20 Thread Hequn Cheng
Hi Henry, If you upsert by key 'article_id', the result is correct, i.e., the result is (a, 2, 2018-08-20 20:18:10.486). What do you think? Best, Hequn On Tue, Aug 21, 2018 at 9:44 AM, 徐涛 wrote: > Hi Hequn, > However is it semantically correct? because the sql result is not equal to > the bou
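
The difference between a whole-row retract sink and an upsert-by-`article_id` sink can be modeled without Flink. The sketch below assumes a sink that applies a retraction by deleting the stored row that matches it on every column, and compares it with a sink keyed by `article_id`. The timestamps `t1`, `t2`, `t3` are hypothetical placeholders for the per-row `update_timestamp` values, and the classes are illustrative, not Flink's sink interfaces.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of two sinks consuming the same change messages (not Flink code). */
public class RetractVsUpsertSketch {

    /** One change message; `add` is false for a retraction. Columns: article_id, PU, update timestamp. */
    static final class Change {
        final boolean add;
        final String articleId;
        final long pu;
        final String updateTs;

        Change(boolean add, String articleId, long pu, String updateTs) {
            this.add = add;
            this.articleId = articleId;
            this.pu = pu;
            this.updateTs = updateTs;
        }

        String row() {
            return "(" + articleId + ", " + pu + ", " + updateTs + ")";
        }
    }

    /** Retract-style sink: a retraction deletes a stored row only if EVERY column matches. */
    static List<String> applyByWholeRow(List<Change> changes) {
        List<String> table = new ArrayList<>();
        for (Change c : changes) {
            if (c.add) {
                table.add(c.row());
            } else {
                table.remove(c.row()); // silently does nothing when the timestamp column differs
            }
        }
        return table;
    }

    /** Upsert-style sink keyed by article_id: the last message per key determines the stored row. */
    static Map<String, String> applyByKey(List<Change> changes) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.add) {
                table.put(c.articleId, c.row());
            } else {
                table.remove(c.articleId);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // PU of article "a" goes from 1 to 2; t1, t2, t3 are hypothetical, distinct timestamps
        // because the timestamp column is recomputed for every emitted row.
        List<Change> changes = List.of(
                new Change(true, "a", 1, "t1"),   // add
                new Change(false, "a", 1, "t2"),  // retract the old PU, but carrying a new timestamp
                new Change(true, "a", 2, "t3"));  // add the new PU
        System.out.println(applyByWholeRow(changes)); // [(a, 1, t1), (a, 2, t3)]  (stale row survives)
        System.out.println(applyByKey(changes));      // {a=(a, 2, t3)}
    }
}
```

The keyed sink ends with the single row for `a` that the reply describes, while the whole-row sink keeps a stale row because the retraction's timestamp no longer matches the stored one.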

Re: How does flink know which data is modified in dynamic table?

2018-08-20 Thread 徐涛
Hi Hequn, Maybe I did not express myself clearly. I mean that updating only the update_timestamp of the incremental data is not enough, because the SQL expresses the idea that "all the times in the table are the same", but actually each item in the table may be different. It is a bit weird. B

Re: How does flink know which data is modified in dynamic table?

2018-08-20 Thread Hequn Cheng
Hi Henry, You are right that, in MySQL, SYSDATE returns the time at which it executes while LOCALTIMESTAMP returns a constant time that indicates the time at which the statement began to execute. But other database systems don't seem to have this constraint (correct me if I'm wrong). Sometimes we don'
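
The SYSDATE versus LOCALTIMESTAMP distinction can be illustrated without a database: a statement-level function is read once and shared by every row, while a per-row function is read again for each row. The sketch uses a hypothetical ticking clock (a counter that advances on every read) so the difference is deterministic; the class and method names are illustrative, and this is not how MySQL or Flink implement these functions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

public class TimestampEvaluationSketch {

    /** Stamps every row with one value read from the clock before the first row (LOCALTIMESTAMP-like). */
    static List<String> stampOncePerStatement(List<String> rows, LongSupplier clock) {
        long statementStart = clock.getAsLong(); // read exactly once
        List<String> out = new ArrayList<>();
        for (String row : rows) {
            out.add(row + "@" + statementStart);
        }
        return out;
    }

    /** Reads the clock again for every row (SYSDATE-like). */
    static List<String> stampPerRow(List<String> rows, LongSupplier clock) {
        List<String> out = new ArrayList<>();
        for (String row : rows) {
            out.add(row + "@" + clock.getAsLong());
        }
        return out;
    }

    /** Hypothetical clock that advances by one tick each time it is read. */
    static LongSupplier tickingClock() {
        long[] now = {0};
        return () -> ++now[0];
    }

    public static void main(String[] args) {
        List<String> rows = List.of("a", "b", "c");
        System.out.println(stampOncePerStatement(rows, tickingClock())); // [a@1, b@1, c@1]
        System.out.println(stampPerRow(rows, tickingClock()));           // [a@1, b@2, c@3]
    }
}
```

In a continuous query there is no single "statement start" after which the result is final, which is why a per-row timestamp keeps changing between the add and retract messages discussed in this thread.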

Re: Flink checkpointing to Google Cloud Storage

2018-08-20 Thread Oleksandr Serdiukov
Hi Vino, I don’t think this is a lack of dependencies. If you look at the last line before NoClassDefFoundError, you'll see that the class is actually GoogleCloudStorageImpl and the missing dependency is GoogleCloudStorageImpl$6. I can see both classes in the shaded jar. It seems like a classloader issue. B

Re: How does flink know which data is modified in dynamic table?

2018-08-20 Thread 徐涛
Hi Hequn, Another question: in some cases, I think updating the timestamp of the retract row is reasonable. For example, some users do not want a hard delete but a soft delete, so I wrote code that only does a soft delete when the retract row comes, but I want the update_timestamp di

Re: How does flink know which data is modified in dynamic table?

2018-08-20 Thread Hequn Cheng
Hi, You are right. We can make use of it to do soft deletes. But there will be problems in other cases, for example, when messages are retracted by the whole row. I opened a JIRA [1] about this problem. Thanks for bringing up this discussion. [1] https://issues.apache.org/jira/browse/FLINK-10188 Best, Hequn On

How to pass a dynamic path while writing to files using writeFileAsText(path)?

2018-08-20 Thread HarshithBolar
Let's say I have a stream with elements of type `String`. I want to write each element in the stream to a separate file in some folder. I'm using the following setup: `filteredStream.writeAsText(path).setParallelism(1);` How do I make this path variable? I even tried adding `System.nanotime(
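
Because `writeAsText(path)` receives an ordinary `String`, a `System.nanoTime()` call placed in that argument is evaluated only once, when the job is defined, so every element still goes to the same path. One way around this is a custom sink whose per-element method picks a fresh file name. The sketch below shows only that per-element logic with plain `java.nio.file`; the class name `OneFilePerElementWriter`, the `element-<n>.txt` naming scheme, and the per-instance counter are illustrative assumptions, and wrapping it in a Flink sink (for example, calling it from a `RichSinkFunction`'s `invoke`) is not shown.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-element writer: each call creates a new file in `dir`. */
public class OneFilePerElementWriter {
    private final Path dir;
    private final AtomicLong counter = new AtomicLong();

    public OneFilePerElementWriter(Path dir) throws IOException {
        this.dir = Files.createDirectories(dir);
    }

    /** Writes `element` to dir/element-<n>.txt; CREATE_NEW makes it fail rather than overwrite a file. */
    public Path write(String element) throws IOException {
        Path target = dir.resolve("element-" + counter.getAndIncrement() + ".txt");
        return Files.write(target, element.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE_NEW);
    }

    public static void main(String[] args) throws IOException {
        OneFilePerElementWriter writer = new OneFilePerElementWriter(Files.createTempDirectory("per-element"));
        for (String s : new String[] {"first", "second", "third"}) {
            System.out.println(writer.write(s).getFileName()); // element-0.txt, element-1.txt, element-2.txt
        }
    }
}
```

The counter is per writer instance, which is enough with `setParallelism(1)`; with higher parallelism, each parallel instance would need a distinct prefix (such as the subtask index from `getRuntimeContext().getIndexOfThisSubtask()`) to avoid name collisions.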