Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

2020-04-16 Thread Till Rohrmann
Hi Kaan,

I'm not entirely sure what's going wrong w/o having a minimal code example
which is able to reproduce the problem. So if you could provide us with
this, that would allow us to look into it.

Cheers,
Till

On Wed, Apr 15, 2020 at 6:59 PM Kaan Sancak  wrote:

> Thanks that is working now!
>
> I have one last question.
>
> Going one step further, I have changed the vertex value type to be a POJO
> class. The structure is somewhat similar to this:
>
> class LocalStorage {
> Integer id;
> Long degree;
> Boolean active;
> List labels;
> Map neighborDegree;
>
>….
> }
>
> During the execution, I got an error saying that
> `org.apache.flink.api.common.InvalidProgramException: This type
> (GenericType) cannot be used as key`.
>
>  After doing some reading I have implemented the Value and Comparable
> interfaces. But now, after the scatter phase ends, the LocalStorage values
> seem not to be persistent. For example, I set the active flag to true in the scatter
> phase, but during the gather phase, the `active` flag seems to be null. (Note
> that I already know the degree and id are accessible within the vertex
> context, but I am just trying to see what I can do with the framework).
>
> I am guessing this is a serialization/deserialization related issue. I
> did some online digging and GitHub searching but I couldn’t really find a fix
> for it. I have tried some approaches suggested on SO but they didn’t work for
> me. Is this a problem related to my POJO class having List and Map fields?
> Is this supported?
>
> It would be great if someone can point out a similar example or an easy
> fix.
>
> Best
> Kaan
>
> On Apr 15, 2020, at 11:07 AM, Till Rohrmann  wrote:
>
> Hi Kaan,
>
> I think what you are proposing is something like this:
>
> Graph graph = ... // get first batch
>
> Graph graphAfterFirstSG =
> graph.runScatterGatherIteration();
>
> Graph secondBatch = ... // get second batch
>
> // Adjust the result of SG iteration with secondBatch
>
> Graph updatedGraph =
> graphAfterFirstSG.union/difference(secondBatch);
>
> updatedGraph.runScatterGatherIteration();
>
> Then I believe this should work.
>
> Cheers,
> Till
>
> On Wed, Apr 15, 2020 at 1:14 AM Kaan Sancak  wrote:
>
>> Thanks for the useful information! It seems like a good and fun idea to
>> experiment. I will definitely give it a try.
>>
>> I have a very close upcoming deadline and I have already implemented the
>> Scatter-Gather iteration algorithm.
>>
>> I have another question on whether we can chain Scatter-Gather or
>> Vertex-Centric iterations.
>> Let’s say that we have an initial batch/dataset, we run a Scatter-Gather
>> and obtain graph.
>> Using another batch we added/deleted vertices to the graph we obtained.
>> Now we run another Scatter-Gather on the modified graph.
>>
>> This is not streaming but a naive way to simulate batch updates that are
>> happening concurrently.
>> Do you think it is feasible to do it this way?
>>
>> Best
>> Kaan
>>
>> On Apr 13, 2020, at 11:16 PM, Tzu-Li (Gordon) Tai 
>> wrote:
>>
>> Hi,
>>
>> As you mentioned, Gelly Graphs are backed by Flink DataSets, and
>> therefore
>> work primarily on static graphs. I don't think it'll be possible to
>> implement the incremental algorithms described in your SO question.
>>
>> Have you tried looking at Stateful Functions, a recent new API added to
>> Flink?
>> It supports arbitrary messaging between functions, which may allow you to
>> build what you have in mind.
>> Take a look at Seth's and Igal's comments here [1], where there seems to
>> be a
>> similar incremental graph-processing use case for sessionization.
>>
>> Cheers,
>> Gordon
>>
>> [1]
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-graph-based-sessionization-potential-use-for-stateful-functions-td34000.html#a34017
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>
>>
>


Re: AvroParquetWriter issues writing to S3

2020-04-16 Thread Till Rohrmann
Hi Diogo,

thanks for reporting this issue. It looks quite strange to be honest.
flink-s3-fs-hadoop-1.10.0.jar contains the DateTimeParserBucket class. So
either this class wasn't loaded when starting the application from scratch
or there could be a problem with the plugin mechanism on restarts. I'm
pulling in Arvid who worked on the plugin mechanism and might be able to
tell us more. In the meantime, could you provide us with the logs? They
might tell us a bit more what happened.

Cheers,
Till

On Wed, Apr 15, 2020 at 5:54 PM Diogo Santos 
wrote:

> Hi guys,
>
> I'm using AvroParquetWriter to write Parquet files into S3. When I
> set up the cluster (starting fresh jobmanager/taskmanager instances etc.),
> the scheduled job starts executing without problems and can write the
> files into S3, but if the job is canceled and started again, the job throws
> the exception java.lang.NoClassDefFoundError:
> org/joda/time/format/DateTimeParserBucket
>
> *Caused by: java.lang.NoClassDefFoundError:
> org/joda/time/format/DateTimeParserBucket at 
> *org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:196) at
> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:88)
> at
> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:121)
> at
> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
> at
> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
> at
> com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:69)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1714)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleSuccessResponse(AmazonHttpClient.java:1434)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1356)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
> at
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5052)
> at
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4998)
> at
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1335)
> at
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1309)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555) at
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) at
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) at
> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
> at
> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
> at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
> at
> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
> at
> 
>
>
> Environment configuration:
> - apache flink 1.10
> - scala 2.12
> - the uber jar is in the application classloader (/lib)
> flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> - in plugins folder exists the folder s3-fs-hadoop with the jar
> flink-s3-fs-hadoop-1.10.0.jar
>
> I can fix this issue by adding the joda-time dependency to the Flink lib
> folder and excluding the joda-time dependency from the hadoop-aws artifact that is
> required by the application code.
>
> Do you know what the root cause of this is? Or is there something I could do
> other than adding the joda-time dependency to the Flink lib folder?
>
> Thanks
>
> --
> cumprimentos,
> Diogo Santos
>


Re: AvroParquetWriter issues writing to S3

2020-04-16 Thread Till Rohrmann
For future reference, here is the stack trace in an easier to read format:

Caused by: java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket
  at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
  at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:196)
  at com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:88)
  at com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:121)
  at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
  at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
  at com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:69)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1714)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleSuccessResponse(AmazonHttpClient.java:1434)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1356)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5052)
  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4998)
  at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1335)
  at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1309)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
  at org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
  at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
  at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
  at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
  at ...


On Thu, Apr 16, 2020 at 9:26 AM Till Rohrmann  wrote:

> Hi Diogo,
>
> thanks for reporting this issue. It looks quite strange to be honest.
> flink-s3-fs-hadoop-1.10.0.jar contains the DateTimeParserBucket class. So
> either this class wasn't loaded when starting the application from scratch
> or there could be a problem with the plugin mechanism on restarts. I'm
> pulling in Arvid who worked on the plugin mechanism and might be able to
> tell us more. In the meantime, could you provide us with the logs? They
> might tell us a bit more what happened.
>
> Cheers,
> Till
>
> On Wed, Apr 15, 2020 at 5:54 PM Diogo Santos 
> wrote:
>
>> Hi guys,
>>
>> I'm using AvroParquetWriter to write parquet files into S3 and when I
>> setup the cluster (starting fresh instances jobmanager/taskmanager etc),
>> the scheduled job starts executing without problems and could write the
>> files into S3 but if the job is canceled and starts again the job throws
>> the exception java.lang.NoClassDefFoundError:
>> org/joda/time/format/DateTimeParserBucket
>>
>> *Caused by: java.lang.NoClassDefFoundError:
>> org/joda/time/format/DateTimeParserBucket at 
>> *org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:196) at
>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:88)
>> at
>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:121)
>> at
>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>> at
>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>> at
>> com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:69)
>> at
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1714)
>> at
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleSuccessResponse(AmazonHttpClient.java:1434)
>> at
>> com.amazonaws.http.AmazonHttpClie

Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

2020-04-16 Thread Kaan Sancak
Thanks for the reply. Turns out that my serializer was writing one of the
fields wrong.
I fixed it and everything seems to be working correctly for now.


Best
Kaan
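
For anyone hitting the same issue: below is a minimal sketch of what a Value/Comparable vertex value along the lines of the LocalStorage POJO above could look like. It is illustrative only, not the poster's actual code, and the element types of the List and Map fields are assumptions. The point it shows is the one from this thread: write() and read() must serialize every field, in the same order, otherwise a skipped field such as `active` comes back as null/default after the scatter phase.

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;

public class LocalStorage implements Value, Comparable<LocalStorage> {

    private int id;
    private long degree;
    private boolean active;
    private List<Long> labels = new ArrayList<>();
    private Map<Long, Long> neighborDegree = new HashMap<>();

    @Override
    public void write(DataOutputView out) throws IOException {
        out.writeInt(id);
        out.writeLong(degree);
        out.writeBoolean(active);          // forgetting a field here is exactly the kind of bug described above
        out.writeInt(labels.size());
        for (Long label : labels) {
            out.writeLong(label);
        }
        out.writeInt(neighborDegree.size());
        for (Map.Entry<Long, Long> e : neighborDegree.entrySet()) {
            out.writeLong(e.getKey());
            out.writeLong(e.getValue());
        }
    }

    @Override
    public void read(DataInputView in) throws IOException {
        id = in.readInt();
        degree = in.readLong();
        active = in.readBoolean();
        int numLabels = in.readInt();
        labels = new ArrayList<>(numLabels);
        for (int i = 0; i < numLabels; i++) {
            labels.add(in.readLong());
        }
        int numNeighbors = in.readInt();
        neighborDegree = new HashMap<>(numNeighbors);
        for (int i = 0; i < numNeighbors; i++) {
            neighborDegree.put(in.readLong(), in.readLong());
        }
    }

    @Override
    public int compareTo(LocalStorage other) {
        return Integer.compare(id, other.id);
    }
}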


On Apr 16, 2020, at 3:05 AM, Till Rohrmann  wrote:

Hi Kaan,

I'm not entirely sure what's going wrong w/o having a minimal code example
which is able to reproduce the problem. So if you could provide us with
this, that would allow us to look into it.

Cheers,
Till

On Wed, Apr 15, 2020 at 6:59 PM Kaan Sancak  wrote:

> Thanks that is working now!
>
> I have one last question.
>
> Goin one step further, I have changed vertex value type to be a POJO
> class. The structure is somewhat similar to this,
>
> class LocalStorage {
> Integer id;
> Long degree;
> Boolean active;
> List labels;
> Map neighborDegree;
>
>….
> }
>
> During the execution, I got an error saying that
> `org.apache.flink.api.common.InvalidProgramException: This type
> (GenericType) cannot be used as key`.
>
>  After having some reading I have implemented Value and Comparable
> interfaces . But now, after scatter phase ends, the localstorage values
> seems to be not persistent. For, I set the the active flag true in scatter
> phase, but during gather phase, the `active` flag seems to be null. (Note
> that, I have already know the degree and id is accessible within vertex
> context, but I am just trying to see what can I do with the framework).
>
> I am guessing  this is a serialization/deserialization related issue. I
> had some online digging and github search but I couldn’t really find a fix
> it. I have tried some approaches suggested on SO but they didn’t work for
> me. Is this a problem related to my POJO class having list and map types?
> Is this supported?
>
> It would be great if someone can point out a similar example or an easy
> fix.
>
> Best
> Kaan
>
> On Apr 15, 2020, at 11:07 AM, Till Rohrmann  wrote:
>
> Hi Kaan,
>
> I think what you are proposing is something like this:
>
> Graph graph = ... // get first batch
>
> Graph graphAfterFirstSG =
> graph.runScatterGatherIteration();
>
> Graph secondBatch = ... // get second batch
>
> // Adjust the result of SG iteration with secondBatch
>
> Graph updatedGraph =
> graphAfterFirstSG.union/difference(secondBatch);
>
> updatedGraph.runScatterGatherIteration();
>
> Then I believe this should work.
>
> Cheers,
> Till
>
> On Wed, Apr 15, 2020 at 1:14 AM Kaan Sancak  wrote:
>
>> Thanks for the useful information! It seems like a good and fun idea to
>> experiment. I will definitely give it a try.
>>
>> I have a very close upcoming deadline and I have already implemented the
>> Scatter-Gather iteration algorithm.
>>
>> I have another question on whether we can chain Scatter-Gather or
>> Vertex-Centric iterations.
>> Let’s say that we have an initial batch/dataset, we run a Scatter-Gather
>> and obtain graph.
>> Using another batch we added/deleted vertices to the graph we obtained.
>> Now we run another Scatter-Gather on the modified graph.
>>
>> This is no streaming but a naive way to simulate batch updates that are
>> happening concurrently.
>> Do you think it is a feasible way to do this way?
>>
>> Best
>> Kaan
>>
>> On Apr 13, 2020, at 11:16 PM, Tzu-Li (Gordon) Tai 
>> wrote:
>>
>> Hi,
>>
>> As you mentioned, Gelly Graph's are backed by Flink DataSets, and
>> therefore
>> work primarily on static graphs. I don't think it'll be possible to
>> implement incremental algorithms described in your SO question.
>>
>> Have you tried looking at Stateful Functions, a recent new API added to
>> Flink?
>> It supports arbitrary messaging between functions, which may allow you to
>> build what you have in mind.
>> Take a look at Seth's an Igal's comments here [1], where there seems to
>> be a
>> similar incremental graph-processing use case for sessionization.
>>
>> Cheers,
>> Gordon
>>
>> [1]
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-graph-based-sessionization-potential-use-for-stateful-functions-td34000.html#a34017
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>
>>
>


FlinkSQL query error when specify json-schema.

2020-04-16 Thread wangl...@geekplus.com.cn

CREATE TABLE user_log(
`id` INT,
`timestamp`  BIGINT
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'wanglei_jsontest',
'connector.startup-mode' = 'latest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = '172.19.78.32:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = '172.19.78.32:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.json-schema' = '{
"type": "object",
"properties": {
   "id": {"type": "integer"},
   "timestamp": {"type": "number"}
}
}'
);

Then select * from user_log;

org.apache.flink.table.api.ValidationException: Type INT of table field 'id' 
does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'id' 
field of the TableSource return type.

It seems the specified types "integer" and "number" cannot be mapped to INT and BIGINT.

How can I solve this problem?

Thanks,
Lei



wangl...@geekplus.com.cn 



How to scale a streaming Flink pipeline without abusing parallelism for long computation tasks?

2020-04-16 Thread Elkhan Dadashov
Hi Flink users,

I have a basic Flink pipeline doing a flatmap.

Inside the flatmap, I get the input and pass it to a client library to compute
some result.

That library execution takes around 30 seconds to 2 minutes (depending on
the input) to produce the output from the given input (it is
a time-series based long-running computation).

As the library takes a long time to compute, the input payloads keep getting
buffered, and if not given enough parallelism, the job will crash/restart
(java.lang.RuntimeException: Buffer pool is destroyed.).

I wanted to check what other options there are for scaling a Flink streaming pipeline
without abusing parallelism for long-running computations in a Flink operator.

Is multi-threading inside the operator recommended? (Even though a
single input computation takes a long time, I can definitely run 4-8 of
them in parallel threads, instead of one by one, inside the same FlatMap
operator.)

1 core for each YARN slot (which will hold 1 flatmap operator) seems too
expensive. If we could launch more Flink operators with only 1 core, it
would have been easier.

If anyone has faced a similar issue please share your experience. I'm using
Flink version 1.6.3.

Thanks.


Flink SQL Gateway

2020-04-16 Thread Flavio Pompermaier
Hi Jeff,
FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL but
since then no progress has been made on that point. Do you think that
Zeppelin could be used somehow as a SQL Gateway towards Flink for the
moment?
Any chance that a Flink SQL Gateway could ever be developed? Is there
anybody interested in this?

Best,
Flavio

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client


Re: Flink SQL Gateway

2020-04-16 Thread godfrey he
Hi Flavio,

We proposed FLIP-91 [1] to support a SQL Gateway at the beginning of this year.
After a long discussion, we reached an agreement that
the SQL Gateway will be an ecosystem project under Ververica as a first step [2],
which could help the SQL Gateway move forward faster.
We have now almost finished the first version, and some users are trying it
out.
Any suggestions are welcome!

[1]
https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit?ts=5e4cd711#heading=h.sru3xzbt2i1k
[2] https://github.com/ververica/flink-sql-gateway

Best,
Godfrey

Flavio Pompermaier  于2020年4月16日周四 下午4:42写道:

> Hi Jeff,
> FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL but
> since then no progress has been made on that point. Do you think that
> Zeppelin could be used somehow as a SQL Gateway towards Flink for the
> moment?
> Any chance that a Flink SQL Gateway could ever be developed? Is there
> anybody interested in this?
>
> Best,
> Flavio
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>


Re: Flink SQL Gateway

2020-04-16 Thread Flavio Pompermaier
Great, I'm very interested in trying it out!
Maybe we can also help with the development because we need something like
that.
Thanks a lot for the pointers

On Thu, Apr 16, 2020 at 10:55 AM godfrey he  wrote:

> Hi Flavio,
>
> We proposed FLIP-91 [1] to support a SQL Gateway at the beginning of this year.
> After a long discussion, we reached an agreement that
> the SQL Gateway will be an ecosystem project under Ververica as a first step [2],
> which could help the SQL Gateway move forward faster.
> We have now almost finished the first version, and some users are trying it
> out.
> Any suggestions are welcome!
>
> [1]
> https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit?ts=5e4cd711#heading=h.sru3xzbt2i1k
> [2] https://github.com/ververica/flink-sql-gateway
>
> Best,
> Godfrey
>
> Flavio Pompermaier  于2020年4月16日周四 下午4:42写道:
>
>> Hi Jeff,
>> FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL but
>> since then no progress has been made on that point. Do you think that
>> Zeppelin could be used somehow as a SQL Gateway towards Flink for the
>> moment?
>> Any chance that a Flink SQL Gateway could ever be developed? Is there
>> anybody interested in this?
>>
>> Best,
>> Flavio
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>>
>


Flink upgrade to 1.10: function

2020-04-16 Thread seeksst
Hi, All


Recently I tried to upgrade Flink from 1.8.2 to 1.10, but I ran into a problem
with functions. In 1.8.2 there are just built-in functions and user-defined
functions, but in 1.10 there are 4 categories of functions.
I defined a function named JSON_VALUE in my system; it doesn’t exist in
1.8.2 but is present in 1.10.0. Of course I use it in SQL, something like 'select
JSON_VALUE(string, string) from table1', with no category or database. The problem
is that in 1.10.0 my function is recognized as SqlJsonValueFunction, and the args
do not match, so my SQL is invalid.
I read the documentation about Ambiguous Function Reference. In my understanding, my
function should be registered as a temporary system function, and it should be
chosen first. Isn’t it? I tried to debug it and found some information:
First, the SQL is parsed by ParseImpl, and JSON_VALUE is parsed as a
SqlBasicCall whose operator is SqlJsonValueFunction; it belongs to the SYSTEM catalog
and its kind is OTHER_FUNCTION. Then, SqlUtil.lookupRoutine does not find this
SqlFunction because it is not in BasicOperatorTable. My function is
in FunctionCatalog, but SqlJsonValueFunction belongs to SYSTEM, not to
USER_DEFINED, so the program will not search for it in FunctionCatalog.
How can I solve this problem without modifying the SQL and the function name? My
program can choose the Flink version and has many SQL jobs, so I don’t wish to
modify the SQL and function names.
Thanks.

Re: FlinkSQL query error when specify json-schema.

2020-04-16 Thread Benchao Li
Hi wanglei,

You don't need to specify 'format.json-schema', the format can derive
schema from the DDL.
Your exception above means the schema in 'format.json-schema' and the DDL do
not match.
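
For reference, a minimal sketch of the same table registered without 'format.json-schema' (Flink 1.10 Java API; the topic, broker/zookeeper addresses and planner choice are simply taken from the DDL in the question and may need adjusting). The JSON format then derives the physical types (INT, BIGINT) from the column definitions; 'format.derive-schema' = 'true' makes that explicit, as confirmed later in this thread:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class DeriveSchemaExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Blink planner assumed here; the old planner would work similarly.
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // No 'format.json-schema': the JSON format derives its schema from the DDL columns.
        tEnv.sqlUpdate(
                "CREATE TABLE user_log (\n" +
                "  `id` INT,\n" +
                "  `timestamp` BIGINT\n" +
                ") WITH (\n" +
                "  'connector.type' = 'kafka',\n" +
                "  'connector.version' = 'universal',\n" +
                "  'connector.topic' = 'wanglei_jsontest',\n" +
                "  'connector.startup-mode' = 'latest-offset',\n" +
                "  'connector.properties.0.key' = 'zookeeper.connect',\n" +
                "  'connector.properties.0.value' = '172.19.78.32:2181',\n" +
                "  'connector.properties.1.key' = 'bootstrap.servers',\n" +
                "  'connector.properties.1.value' = '172.19.78.32:9092',\n" +
                "  'update-mode' = 'append',\n" +
                "  'format.type' = 'json',\n" +
                "  'format.derive-schema' = 'true'\n" +
                ")");

        // The query from the question now validates against INT / BIGINT.
        Table result = tEnv.sqlQuery("SELECT * FROM user_log");
    }
}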

wangl...@geekplus.com.cn  于2020年4月16日周四 下午4:21写道:

>
> CREATE TABLE user_log(
> `id` INT,
> `timestamp`  BIGINT
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic' = 'wanglei_jsontest',
> 'connector.startup-mode' = 'latest-offset',
> 'connector.properties.0.key' = 'zookeeper.connect',
> 'connector.properties.0.value' = '172.19.78.32:2181',
> 'connector.properties.1.key' = 'bootstrap.servers',
> 'connector.properties.1.value' = '172.19.78.32:9092',
> 'update-mode' = 'append',
> 'format.type' = 'json',
> 'format.json-schema' = '{
> "type": "object",
> "properties": {
>"id": {"type": "integer"},
>"timestamp": {"type": "number"}
> }
> }'
> );
>
> Then select * from user_log;
>
> org.apache.flink.table.api.ValidationException: Type INT of table field
> 'id' does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of
> the 'id' field of the TableSource return type.
>
> Seems the specified type "integer", "number" can not be mapped to  INT,
> BIGINT
>
> How can i solve this problem?
>
> Thanks,
> Lei
>
> --
> wangl...@geekplus.com.cn
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

2020-04-16 Thread Kaan Sancak
If the vertex type is a POJO, what happens during the union of the graphs? Is there 
a persistence approach, or can we define a function to handle such occasions?

Would there be a performance difference between two cases:

1)
Graph graph = … // From edges list

graph = graph.runScatterGatherIteration();

Graph secondGraph = … // From second edge list

graph = graph.union(secondGraph).runScatterGatherIteration()

2)

Graph graph = … // From edges list

graph = graph.runScatterGatherIteration();

graph.addEdges(second_edge_list)

graph = graph.runScatterGatherIteration();


Before starting the second scatter-gather, I want to set/reset some fields of 
the vertex value of the vertices that are affected by edge additions/deletions 
(or union). It would be good to have a callback function that touches the 
end-points of the edges that are added/deleted.

Best
Kaan



> On Apr 15, 2020, at 11:07 AM, Till Rohrmann  wrote:
> 
> Hi Kaan,
> 
> I think what you are proposing is something like this:
> 
> Graph graph = ... // get first batch
> 
> Graph graphAfterFirstSG = 
> graph.runScatterGatherIteration();
> 
> Graph secondBatch = ... // get second batch
> 
> // Adjust the result of SG iteration with secondBatch
> 
> Graph updatedGraph = 
> graphAfterFirstSG.union/difference(secondBatch);
> 
> updatedGraph.runScatterGatherIteration();
> 
> Then I believe this should work.
> 
> Cheers,
> Till
> 
> On Wed, Apr 15, 2020 at 1:14 AM Kaan Sancak  > wrote:
> Thanks for the useful information! It seems like a good and fun idea to 
> experiment. I will definitely give it a try.
> 
> I have a very close upcoming deadline and I have already implemented the 
> Scatter-Gather iteration algorithm.
> 
> I have another question on whether we can chain Scatter-Gather or 
> Vertex-Centric iterations.
> Let’s say that we have an initial batch/dataset, we run a Scatter-Gather and 
> obtain graph.
> Using another batch we added/deleted vertices to the graph we obtained. 
> Now we run another Scatter-Gather on the modified graph.
> 
> This is no streaming but a naive way to simulate batch updates that are 
> happening concurrently.
> Do you think it is a feasible way to do this way? 
> 
> Best
> Kaan
> 
>> On Apr 13, 2020, at 11:16 PM, Tzu-Li (Gordon) Tai > > wrote:
>> 
>> Hi,
>> 
>> As you mentioned, Gelly Graph's are backed by Flink DataSets, and therefore
>> work primarily on static graphs. I don't think it'll be possible to
>> implement incremental algorithms described in your SO question.
>> 
>> Have you tried looking at Stateful Functions, a recent new API added to
>> Flink?
>> It supports arbitrary messaging between functions, which may allow you to
>> build what you have in mind.
>> Take a look at Seth's an Igal's comments here [1], where there seems to be a
>> similar incremental graph-processing use case for sessionization.
>> 
>> Cheers,
>> Gordon
>> 
>> [1]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-graph-based-sessionization-potential-use-for-stateful-functions-td34000.html#a34017
>>  
>> 
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
>> 
> 



Re: Flink SQL Gateway

2020-04-16 Thread godfrey he
Hi Flavio, that's great~

Best,
Godfrey

Flavio Pompermaier  于2020年4月16日周四 下午5:01写道:

> Great, I'm very interested in trying it out!
> Maybe we can also help with the development because we need something like
> that.
> Thanks a lot for the pointers
>
> On Thu, Apr 16, 2020 at 10:55 AM godfrey he  wrote:
>
>> Hi Flavio,
>>
>> We proposed FLIP-91 [1] to support a SQL Gateway at the beginning of this year.
>> After a long discussion, we reached an agreement that
>> the SQL Gateway will be an ecosystem project under Ververica as a first step [2],
>> which could help the SQL Gateway move forward faster.
>> We have now almost finished the first version, and some users are trying it
>> out.
>> Any suggestions are welcome!
>>
>> [1]
>> https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit?ts=5e4cd711#heading=h.sru3xzbt2i1k
>> [2] https://github.com/ververica/flink-sql-gateway
>>
>> Best,
>> Godfrey
>>
>> Flavio Pompermaier  于2020年4月16日周四 下午4:42写道:
>>
>>> Hi Jeff,
>>> FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL but
>>> since then no progress has been made on that point. Do you think that
>>> Zeppelin could be used somehow as a SQL Gateway towards Flink for the
>>> moment?
>>> Any chance that a Flink SQL Gateway could ever be developed? Is there
>>> anybody interested in this?
>>>
>>> Best,
>>> Flavio
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>>>
>>


instance number of user defined function

2020-04-16 Thread lec ssmi
Hi:
   I have always wondered how many instances of a function are initialized in the whole
Flink application.
   Suppose there is such a scenario:
   I have a UDTF called *mongo_join* through which a Flink
table can join with different external Mongo tables according to the
parameters passed in.
   So, I have a SQL table called *trade*. Throughout the
pipeline, I join the *trade* table with *item* and *payment*. The SQL
statements are as below:

   create view trade_payment as select trade_id, payment_id
from trade, lateral table (mongo_join('payment')) as T(payment_id);
   create view trade_item as select trade_id, item_id from trade,
lateral table (mongo_join('item')) as T(item_id);

As you might expect, I use some *member variables* to store the
different MongoConnections in the instance of the UDTF.
So, will there be concurrency problems? And how are the instances of
the function distributed?

  Thanks!


Re: Re: FlinkSQL query error when specify json-schema.

2020-04-16 Thread wangl...@geekplus.com.cn

Thanks, I have tried. 

 'format.derive-schema' = 'true' will work.

But if I insist on using format.json-schema, the CREATE TABLE must be written
as:

`id` DECIMAL(38,18),
`timestamp` DECIMAL(38,18)



wangl...@geekplus.com.cn 

 
From: Benchao Li
Date: 2020-04-16 16:56
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: FlinkSQL query error when specify json-schema.
Hi wanglei,

You don't need to specify 'format.json-schema', the format can derive schema 
from the DDL.
Your exception above means the schema in 'format.json-schema' and DDL are not 
match.

wangl...@geekplus.com.cn  于2020年4月16日周四 下午4:21写道:

CREATE TABLE user_log(
`id` INT,
`timestamp`  BIGINT
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'wanglei_jsontest',
'connector.startup-mode' = 'latest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = '172.19.78.32:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = '172.19.78.32:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.json-schema' = '{
"type": "object",
"properties": {
   "id": {"type": "integer"},
   "timestamp": {"type": "number"}
}
}'
);

Then select * from user_log;

org.apache.flink.table.api.ValidationException: Type INT of table field 'id' 
does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'id' 
field of the TableSource return type.

Seems the specified type "integer", "number" can not be mapped to  INT, BIGINT 

How can i solve this problem?

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Re: FlinkSQL query error when specify json-schema.

2020-04-16 Thread Benchao Li
Hi wanglei,

Yes, your observation is correct. Currently the type derivation relies on
legacy types, which only support (38, 18) as the decimal precision and scale.

wangl...@geekplus.com.cn  于2020年4月16日周四 下午6:54写道:

>
> Thanks, I have tried.
>
>  'format.derive-schema' = 'true' will work.
>
> But if i insist to use format.json-schema,  the CREATE TABLE must be
> writtten as:
>
> `id` DECIMAL(38,18),
> `timestamp` DECIMAL(38,18)
>
> --
> wangl...@geekplus.com.cn
>
>
> *From:* Benchao Li 
> *Date:* 2020-04-16 16:56
> *To:* wangl...@geekplus.com.cn
> *CC:* user 
> *Subject:* Re: FlinkSQL query error when specify json-schema.
> Hi wanglei,
>
> You don't need to specify 'format.json-schema', the format can derive
> schema from the DDL.
> Your exception above means the schema in 'format.json-schema' and DDL are
> not match.
>
> wangl...@geekplus.com.cn  于2020年4月16日周四
> 下午4:21写道:
>
>>
>> CREATE TABLE user_log(
>> `id` INT,
>> `timestamp`  BIGINT
>> ) WITH (
>> 'connector.type' = 'kafka',
>> 'connector.version' = 'universal',
>> 'connector.topic' = 'wanglei_jsontest',
>> 'connector.startup-mode' = 'latest-offset',
>> 'connector.properties.0.key' = 'zookeeper.connect',
>> 'connector.properties.0.value' = '172.19.78.32:2181',
>> 'connector.properties.1.key' = 'bootstrap.servers',
>> 'connector.properties.1.value' = '172.19.78.32:9092',
>> 'update-mode' = 'append',
>> 'format.type' = 'json',
>> 'format.json-schema' = '{
>> "type": "object",
>> "properties": {
>>"id": {"type": "integer"},
>>"timestamp": {"type": "number"}
>> }
>> }'
>> );
>>
>> Then select * from user_log;
>>
>> org.apache.flink.table.api.ValidationException: Type INT of table field
>> 'id' does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of
>> the 'id' field of the TableSource return type.
>>
>> Seems the specified type "integer", "number" can not be mapped to  INT,
>> BIGINT
>>
>> How can i solve this problem?
>>
>> Thanks,
>> Lei
>>
>> --
>> wangl...@geekplus.com.cn
>>
>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Flink SQL Gateway

2020-04-16 Thread godfrey he
Hi Flavio,

Since 1.11(master), Flink supports "CREATE CATALOG ..." [1],
so we can use this statement to create catalogs dynamically.

Currently, the Catalog API [2] does not support any operations on TRIGGERs.
Flink also can't use such info now. What's your use case?

[1] https://issues.apache.org/jira/browse/FLINK-15349
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/catalogs.html#catalog-api

Best,
Godfrey
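
For illustration, a rough sketch of how a Postgres catalog could be registered at runtime with that statement. This is a sketch only: the property keys follow the 1.11 JDBC catalog documentation and may differ between versions, and the catalog name, credentials and URL are placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Register a JDBC (Postgres) catalog dynamically, instead of at configuration time.
        tEnv.executeSql(
                "CREATE CATALOG my_postgres WITH (\n" +
                "  'type' = 'jdbc',\n" +
                "  'default-database' = 'mydb',\n" +
                "  'username' = 'flink',\n" +
                "  'password' = 'secret',\n" +
                "  'base-url' = 'jdbc:postgresql://localhost:5432/'\n" +
                ")");

        tEnv.executeSql("USE CATALOG my_postgres");
        // Tables of the Postgres database are now visible, e.g.:
        // tEnv.executeSql("SHOW TABLES").print();
    }
}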

Flavio Pompermaier  于2020年4月16日周四 下午6:16写道:

> Hi Godfrey,
> I'd like to use the SQL gateway as a data proxy in our architecture.
> However, catalogs in our use case are not known at configuration time...
> is there a way to register a JDBC catalog (for example when I
> want to connect to a Postgres database)?
> What if I want to add SHOW TRIGGERS? Do you think it could be interesting?
>
> On Thu, Apr 16, 2020 at 10:55 AM godfrey he  wrote:
>
>> Hi Flavio,
>>
>> We proposed FLIP-91 [1] to support a SQL Gateway at the beginning of this year.
>> After a long discussion, we reached an agreement that
>> the SQL Gateway will be an ecosystem project under Ververica as a first step [2],
>> which could help the SQL Gateway move forward faster.
>> We have now almost finished the first version, and some users are trying it
>> out.
>> Any suggestions are welcome!
>>
>> [1]
>> https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit?ts=5e4cd711#heading=h.sru3xzbt2i1k
>> [2] https://github.com/ververica/flink-sql-gateway
>>
>> Best,
>> Godfrey
>>
>> Flavio Pompermaier  于2020年4月16日周四 下午4:42写道:
>>
>>> Hi Jeff,
>>> FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL but
>>> since then no progress has been made on that point. Do you think that
>>> Zeppelin could be used somehow as a SQL Gateway towards Flink for the
>>> moment?
>>> Any chance that a Flink SQL Gateway could ever be developed? Is there
>>> anybody interested in this?
>>>
>>> Best,
>>> Flavio
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>>>
>>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809
>


Re: instance number of user defined function

2020-04-16 Thread godfrey he
Hi,

An UDTF will be wrapped into an operator, an operator instance will be
executed by a slot (or parallelism/thread) ,
About operator, task, slot, you can refer to [1] for more details.
A TM (a JVM process) may has multiple slots, that means a JVM process may
has multiple UDTF instances.
It's better to make sure your UDTF stateless, otherwise you should care
about thread-safe problem.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html#task-slots-and-resources

Best,
Godfrey
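
As a sketch of that thread-safety point (using the standard mongodb-driver-sync client as an example; the database name, URI and return type are placeholders, not the poster's actual code): keep the connection as a plain instance field and open/close it in open()/close(), so every parallel UDTF instance owns exactly one connection and nothing is shared across slots.

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class MongoJoin extends TableFunction<String> {

    // Per-instance, not static: each parallel slot gets its own client.
    private transient MongoClient client;
    private transient MongoDatabase database;

    @Override
    public void open(FunctionContext context) throws Exception {
        client = MongoClients.create("mongodb://localhost:27017");
        database = client.getDatabase("mydb");
    }

    public void eval(String collectionName) {
        // Emit one row per document of the requested collection.
        for (Document doc : database.getCollection(collectionName).find()) {
            collect(doc.toJson());
        }
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}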



lec ssmi  于2020年4月16日周四 下午6:20写道:

> Hi:
>I always wonder how much instance has been initialized in the whole
> flink application.
>Suppose there is such a scenario:
>I have a  UDTF  called '*mongo_join'*  through  which the flink
> table can join with external different mongo table  according to the
> parameters passed in.
>So ,I have a sql table called*trade . *Throughout  all the
> pipeline, I  join the *trade *table with  *item, * And *payment. *The sql
> statement as bellows:
>
>   * create view  trade_payment as  select trade_id, payment_id
> from trade , lateral table (mongo_join('payment')) as T(payment_id);*
> *  create view trade_item as  select trade_id,item_id from trade ,
> , lateral table (mongo_join('item')) as T(payment_id); *
>
> As everyone thinks, I use  some *member variables* to store  the
> different MongoConnection  in the  instance of the UDTF.
> So , will there be concurrency problems?  And how are the instances of
> the function distributed?
>
>   Thanks!
>
>


Re: Flink SQL Gateway

2020-04-16 Thread Flavio Pompermaier
Basically we want to give the user a UI to register their data sources
(i.e. catalogs in the Flink world), preview them (SELECT * LIMIT 100 for
example) but, in the case of JDBC catalogs, also to see relationships and
triggers.
We don't want to reinvent the wheel, so we would like to reuse and
contribute to Flink as much as possible (since we already use Flink for the
batch jobs and we don't like to do the same work twice).
In this way we can contribute to Flink if something is missing in the SQL
Gateway. However, I don't know how to extend the existing stuff (for example
if I want table relationships and triggers).

Best,
Flavio

On Thu, Apr 16, 2020 at 1:38 PM godfrey he  wrote:

> Hi Flavio,
>
> Since 1.11(master), Flink supports "CREATE CATALOG ..." [1],
> we can use this statement create catalog dynamically.
>
> Currently, Catalog[2] dose not supports any operations on TRIGGER.
> Flink can't also use such info now. What's your user scenario?
>
> [1] https://issues.apache.org/jira/browse/FLINK-15349
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/catalogs.html#catalog-api
>
> Best,
> Godfrey
>
> Flavio Pompermaier  于2020年4月16日周四 下午6:16写道:
>
>> Hi Godfrey,
>> I'd like to use the SQL gateway as a data proxy in our architecture.
>> However, catalogs in our use case are not know at configuration time..
>> is there a way to permit to register a JDBC catalog (for example when I
>> want to connect to a Postgres database)?
>> What if I want to add SHOW TRIGGERS? Do you think it could be interesting?
>>
>> On Thu, Apr 16, 2020 at 10:55 AM godfrey he  wrote:
>>
>>> Hi Flavio,
>>>
>>> We proposed FLIP-91 [1] to support a SQL Gateway at the beginning of this
>>> year.
>>> After a long discussion, we reached an agreement that
>>> the SQL Gateway will be an ecosystem project under Ververica as a first step [2],
>>> which could help the SQL Gateway move forward faster.
>>> We have now almost finished the first version, and some users are trying it
>>> out.
>>> Any suggestions are welcome!
>>>
>>> [1]
>>> https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit?ts=5e4cd711#heading=h.sru3xzbt2i1k
>>> [2] https://github.com/ververica/flink-sql-gateway
>>>
>>> Best,
>>> Godfrey
>>>
>>> Flavio Pompermaier  于2020年4月16日周四 下午4:42写道:
>>>
 Hi Jeff,
 FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL
 but since then no progress has been made on that point. Do you think that
 Zeppelin could be used somehow as a SQL Gateway towards Flink for the
 moment?
 Any chance that a Flink SQL Gateway could ever be developed? Is there
 anybody interested in this?

 Best,
 Flavio

 [1]
 https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client

>>>


Re: How to scale a streaming Flink pipeline without abusing parallelism for long computation tasks?

2020-04-16 Thread Theo Diefenthal
Hi, 

I think you could utilize AsyncIO in your case with just using a local thread 
pool [1]. 

Best regards 
Theo 

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
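
A minimal sketch of that idea (Flink 1.6-style API; the source, the pool size and the library call below are placeholders): the slow call runs on a private thread pool inside a RichAsyncFunction, and the capacity parameter of AsyncDataStream bounds how many inputs are in flight per parallel instance, so back-pressure replaces unbounded buffering.

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class LongComputationJob {

    // Runs the slow library call on a private thread pool so that a single
    // task slot can have several computations in flight at once.
    public static class LongComputationFunction extends RichAsyncFunction<String, String> {

        private transient ExecutorService pool;

        @Override
        public void open(Configuration parameters) {
            pool = Executors.newFixedThreadPool(8); // tune to your CPU budget
        }

        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            pool.submit(() -> {
                String result = runExpensiveLibraryCall(input); // the 30s-2min computation
                resultFuture.complete(Collections.singleton(result));
            });
        }

        @Override
        public void close() {
            pool.shutdown();
        }

        private String runExpensiveLibraryCall(String input) {
            return input; // placeholder for the real client-library call
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> inputs = env.socketTextStream("localhost", 9999); // placeholder source

        // At most 8 records in flight per parallel instance; once the capacity
        // is reached, back-pressure kicks in instead of unbounded buffering.
        AsyncDataStream
                .unorderedWait(inputs, new LongComputationFunction(), 10, TimeUnit.MINUTES, 8)
                .print();

        env.execute("long computation with async i/o");
    }
}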
 


Von: "Elkhan Dadashov"  
An: "user"  
Gesendet: Donnerstag, 16. April 2020 10:37:55 
Betreff: How to scale a streaming Flink pipeline without abusing parallelism 
for long computation tasks? 

Hi Flink users, 
I have a basic Flink pipeline, doing a flatmap. 

Inside the flatmap, I get the input and pass it to the client library to compute some 
result. 

That library execution takes around 30 seconds to 2 minutes (depending on the 
input ) for producing the output from the given input ( it is time-series based 
long-running computation). 

As it takes the library long time to compute, the input payloads keep buffered, 
and if not given enough parallelism, the job will crash/restart. 
(java.lang.RuntimeException: Buffer pool is destroyed.) 

Wanted to check what are other options for scaling Flink streaming pipeline 
without abusing parallelism for long-running computations in Flink operator? 

Is multi-threading inside the operator recommended? ( even though the single 
input computation takes a long time, but I can definitely run 4-8 of them in 
parallel threads, instead of one by one, inside the same FlatMap operator. 

1 core for each yarn slot ( which will hold 1 flatmap operator) seems too 
expensive. If we could launch more link operators with only 1 core, it could 
have been easier. 

If anyone has faced a similar issue please share your experience. I'm using Flink 
version 1.6.3. 

Thanks. 


Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

2020-04-16 Thread Yun Gao
Hi Kaan,

   For the first issue, I think the two implementations should differ 
and the first should be slower, but which one to use should depend 
on whether your algorithm can compute incrementally with only the changed 
edges. However, as far as I know, most graph algorithms do not satisfy 
this property, therefore I think you might have to use the first one. 

For the second issue, I think you might use Graph.getVertices() and 
graph.getEdges() to get the underlying vertices and edges dataset of the graph, 
then you could do any operations with the two datasets, like join the vertices 
dataset with the second edge list, and finally create a new Graph with new 
Graph(updated_vertices, edges, env).

Best,
 Yun
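
As an illustration of that second approach, a rough sketch (assuming Long vertex ids, Long vertex values and Double edge values; the reset logic is a placeholder, and it uses the Graph.fromDataSet factory plus union rather than the Graph constructor): build a graph from the new edge batch, union it with the result of the first iteration, and use joinWithVertices to reset the vertices touched by the new edges before running the next Scatter-Gather.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.util.Collector;

public class IncrementalUpdateSketch {

    public static Graph<Long, Long, Double> applyBatch(
            Graph<Long, Long, Double> graphAfterFirstSG,
            DataSet<Edge<Long, Double>> newEdges,
            ExecutionEnvironment env) {

        // Second graph built only from the new edges; previously unknown vertices start at 0.
        Graph<Long, Long, Double> batchGraph = Graph.fromDataSet(
                newEdges,
                new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long vertexId) {
                        return 0L;
                    }
                },
                env);

        // Union keeps the edge sets of both graphs and de-duplicates vertices.
        Graph<Long, Long, Double> merged = graphAfterFirstSG.union(batchGraph);

        // Every endpoint of a new edge counts as "touched".
        DataSet<Tuple2<Long, Long>> touched = newEdges
                .flatMap(new FlatMapFunction<Edge<Long, Double>, Tuple2<Long, Long>>() {
                    @Override
                    public void flatMap(Edge<Long, Double> e, Collector<Tuple2<Long, Long>> out) {
                        out.collect(Tuple2.of(e.getSource(), 1L));
                        out.collect(Tuple2.of(e.getTarget(), 1L));
                    }
                })
                .distinct(0);

        // Reset (here: zero out) the value of the touched vertices before the next iteration.
        return merged.joinWithVertices(
                touched,
                new VertexJoinFunction<Long, Long>() {
                    @Override
                    public Long vertexJoin(Long vertexValue, Long marker) {
                        return 0L;
                    }
                });
    }
}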


--
From:Kaan Sancak 
Send Time:2020 Apr. 16 (Thu.) 17:17
To:Till Rohrmann 
Cc:Tzu-Li (Gordon) Tai ; user 
Subject:Re: Question about Writing Incremental Graph Algorithms using Apache 
Flink Gelly

If the vertex type is POJO what happens during the union of the graph? Is there 
a persistent approach, or can we define a function handle such occasions?

Would there be a performance difference between two cases:

1)
 Graph graph = … // From edges list

 graph = graph.runScatterGatherIteration();

 Graph secondGraph = … // From second edge list

 graph = graph.union(secondGraph).runScatterGatherIteration()

2)
  
Graph graph = … // From edges list

 graph = graph.runScatterGatherIteration();

 graph.addEdges(second_edge_list)

 graph = graph.runScatterGatherIteration();


Before starting the second scatter-gather, I want to set/reset some fields of 
the vertex value of the vertices that are effected by edge additions/deletions 
(or union). It would be good to have a callback function that touches the 
end-points of the edges that are added/deleted.

Best
Kaan




On Apr 15, 2020, at 11:07 AM, Till Rohrmann  wrote:
Hi Kaan,

I think what you are proposing is something like this:

Graph graph = ... // get first batch

Graph graphAfterFirstSG = 
graph.runScatterGatherIteration();

Graph secondBatch = ... // get second batch

// Adjust the result of SG iteration with secondBatch

Graph updatedGraph = 
graphAfterFirstSG.union/difference(secondBatch);

updatedGraph.runScatterGatherIteration();

Then I believe this should work.

Cheers,
Till
On Wed, Apr 15, 2020 at 1:14 AM Kaan Sancak  wrote:
Thanks for the useful information! It seems like a good and fun idea to 
experiment. I will definitely give it a try.

I have a very close upcoming deadline and I have already implemented the 
Scatter-Gather iteration algorithm.

I have another question on whether we can chain Scatter-Gather or 
Vertex-Centric iterations.
Let’s say that we have an initial batch/dataset, we run a Scatter-Gather and 
obtain graph.
Using another batch we added/deleted vertices to the graph we obtained. 
Now we run another Scatter-Gather on the modified graph.

This is no streaming but a naive way to simulate batch updates that are 
happening concurrently.
Do you think it is a feasible way to do this way? 

Best
Kaan

On Apr 13, 2020, at 11:16 PM, Tzu-Li (Gordon) Tai  wrote:
Hi,

As you mentioned, Gelly Graph's are backed by Flink DataSets, and therefore
work primarily on static graphs. I don't think it'll be possible to
implement incremental algorithms described in your SO question.

Have you tried looking at Stateful Functions, a recent new API added to
Flink?
It supports arbitrary messaging between functions, which may allow you to
build what you have in mind.
Take a look at Seth's an Igal's comments here [1], where there seems to be a
similar incremental graph-processing use case for sessionization.

Cheers,
Gordon

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-graph-based-sessionization-potential-use-for-stateful-functions-td34000.html#a34017



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/




Re: Streaming Job eventually begins failing during checkpointing

2020-04-16 Thread Stephen Patel
I can't say that I ever call that directly.  The beam library that I'm
using does call it in a couple places:
https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L422-L429

But it seems to be the same descriptor every time.  Is that limit per
operator?  That is, can each operator host up to 32767 operator/broadcast
states?  I assume that's by name?

On Wed, Apr 15, 2020 at 10:46 PM Yun Tang  wrote:

> Hi  Stephen
>
> This is not related with RocksDB but with default on-heap operator state
> backend. From your exception stack trace, you have created too many
> operator states (more than 32767).
> How do you call context.getOperatorStateStore().getListState or
> context.getOperatorStateStore().getBroadcastState ? Did you pass a
> different operator state descriptor each time?
>
> Best
> Yun Tang
> --
> *From:* Stephen Patel 
> *Sent:* Thursday, April 16, 2020 2:09
> *To:* user@flink.apache.org 
> *Subject:* Streaming Job eventually begins failing during checkpointing
>
> I've got a flink (1.8.0, emr-5.26) streaming job running on yarn.  It's
> configured to use rocksdb, and checkpoint once a minute to hdfs.  This job
> operates just fine for around 20 days, and then begins failing with this
> exception (it fails, restarts, and fails again, repeatedly):
>
> 2020-04-15 13:15:02,920 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
> 2020-04-15 13:15:05,762 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes
> in 2667 ms).
> 2020-04-15 13:16:02,919 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> 2020-04-15 13:16:03,147 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph-
>  (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from
> RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 32702 for operator  (1/2).}
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for
> operator  (1/2).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.IllegalArgumentException
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> ... 5 more
> Caused by: java.lang.IllegalArgumentException
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> at
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> ... 7 more
>
> This application configured to retain external checkpoints.  When I
> attempt to restart from the last successful checkpoint, it will fail with
> the same error on the first checkpoint that happens after the restart.
>
> I haven't been able to find out why this might be. The source code doesn't
> seem particularly informative to my eyes:
> https://github.com/apache/flink/blob/release-1.8.0/

Re: Streaming Job eventually begins failing during checkpointing

2020-04-16 Thread Stephen Patel
Correction.  I've actually found a place where it potentially might be
creating a new operator state per checkpoint:
https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L105
https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L141-L149

This gives me something I can investigate locally at least.
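
For contrast, a minimal sketch of the pattern that stays well under that limit (class and state names are placeholders): a single operator state, registered once with a fixed descriptor name in initializeState() and reused on every checkpoint, rather than a newly named state per checkpoint, which is what appears to grow past the 32767 states that OperatorBackendSerializationProxy allows.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {

    private transient ListState<String> buffered;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Same descriptor name on every restore -> always the same single state.
        buffered = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffered-elements", String.class));
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        buffered.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Nothing new is registered here; the existing state is snapshotted as-is.
    }
}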

On Thu, Apr 16, 2020 at 9:03 AM Stephen Patel  wrote:

> I can't say that I ever call that directly.  The beam library that I'm
> using does call it in a couple places:
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L422-L429
>
> But it seems to be the same descriptor every time.  Is that limit per
> operator?  That is, can each operator host up to 32767 operator/broadcast
> states?  I assume that's by name?
>
> On Wed, Apr 15, 2020 at 10:46 PM Yun Tang  wrote:
>
>> Hi  Stephen
>>
>> This is not related with RocksDB but with default on-heap operator state
>> backend. From your exception stack trace, you have created too many
>> operator states (more than 32767).
>> How do you call context.getOperatorStateStore().getListState or
>> context.getOperatorStateStore().getBroadcastState ? Did you pass a
>> different operator state descriptor each time?
>>
>> Best
>> Yun Tang
>> --
>> *From:* Stephen Patel 
>> *Sent:* Thursday, April 16, 2020 2:09
>> *To:* user@flink.apache.org 
>> *Subject:* Streaming Job eventually begins failing during checkpointing
>>
>> I've got a flink (1.8.0, emr-5.26) streaming job running on yarn.  It's
>> configured to use rocksdb, and checkpoint once a minute to hdfs.  This job
>> operates just fine for around 20 days, and then begins failing with this
>> exception (it fails, restarts, and fails again, repeatedly):
>>
>> 2020-04-15 13:15:02,920 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>> checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>> 2020-04-15 13:15:05,762 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
>> checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes
>> in 2667 ms).
>> 2020-04-15 13:16:02,919 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>> checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>> 2020-04-15 13:16:03,147 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph-
>>  (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from
>> RUNNING to FAILED.
>> AsynchronousException{java.lang.Exception: Could not materialize
>> checkpoint 32702 for operator  (1/2).}
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 32702
>> for operator  (1/2).
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>> ... 6 more
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.IllegalArgumentException
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>> at
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>> ... 5 more
>> Caused by: java.lang.IllegalArgumentException
>> at
>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>> at
>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBacke

Re: Flink SQL Gateway

2020-04-16 Thread Jeff Zhang
Hi Flavio,

If you would like to have a UI to register data sources, run Flink SQL,
and preview the SQL results, then you can use Zeppelin directly. You can
check the tutorials here:
1) Get started: https://link.medium.com/oppqD6dIg5
2) Batch: https://link.medium.com/3qumbwRIg5
3) Streaming: https://link.medium.com/RBHa2lTIg5
4) Advanced usage: https://link.medium.com/CAekyoXIg5

And here's an article shared by someone else about how to use Flink on
Zeppelin.
https://medium.com/@abdelkrim.hadjidj/event-driven-supply-chain-for-crisis-with-flinksql-be80cb3ad4f9

Besides that, Zeppelin provides a REST API which you can use to integrate
with other systems, but it is not the standard JDBC protocol.
http://zeppelin.apache.org/docs/0.9.0-preview1/usage/rest_api/notebook.html
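
For example, a rough sketch of calling it from Java; the endpoint path and
note id below are assumptions for illustration only, please check the REST
API docs above for the exact contract:

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class ZeppelinRestExample {
    public static void main(String[] args) throws Exception {
        // Assumed Zeppelin server address and note id, for illustration only.
        URL url = new URL("http://localhost:8080/api/notebook/job/2F5AB1B1C");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST"); // triggers a run of all paragraphs in the note

        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name())) {
            // The response is a JSON status object, not a JDBC result set,
            // which is the limitation mentioned above.
            System.out.println(scanner.useDelimiter("\\A").next());
        }
    }
}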

I have also been making more improvements recently, and I will reveal more
details at next week's Flink Forward.
https://www.flink-forward.org/sf-2020/conference-program#it%E2%80%99s-finally-here--python-on-flink---flink-on-zeppelin





On Thu, Apr 16, 2020 at 8:24 PM Flavio Pompermaier  wrote:

> Basically we want to give the user a UI to register their data sources
> (i.e. catalogs in the Flink world) and preview them (SELECT * LIMIT 100, for
> example) but, in the case of JDBC catalogs, also to see relationships and
> triggers.
> We don't want to reinvent the wheel, so we would like to reuse and
> contribute to Flink as much as possible (since we already use Flink in our
> batch jobs and we don't want to do the same work twice).
> In this way we can contribute to Flink if something is missing in the SQL
> Gateway. However, I don't know how to extend the existing stuff (for example
> if I want table relationships and triggers).
>
> Best,
> Flavio
>
> On Thu, Apr 16, 2020 at 1:38 PM godfrey he  wrote:
>
>> Hi Flavio,
>>
>> Since 1.11 (master), Flink supports "CREATE CATALOG ..." [1];
>> we can use this statement to create catalogs dynamically.
>>
>> Currently, Catalog [2] does not support any operations on TRIGGERs,
>> and Flink also can't use such info now. What is your usage scenario?
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15349
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/catalogs.html#catalog-api
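
(As an aside, a minimal sketch of what that DDL-based registration might
look like for a Postgres-backed JDBC catalog; the option keys below are
assumptions and may differ by Flink version, so treat it as an illustration
only.)

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateCatalogExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // DDL-based catalog registration (FLINK-15349); connection values are fake.
        tEnv.executeSql(
                "CREATE CATALOG my_pg WITH (\n"
                + "  'type' = 'jdbc',\n"
                + "  'base-url' = 'jdbc:postgresql://localhost:5432',\n"
                + "  'default-database' = 'mydb',\n"
                + "  'username' = 'flink',\n"
                + "  'password' = 'secret'\n"
                + ")");

        tEnv.executeSql("USE CATALOG my_pg");
        tEnv.executeSql("SHOW TABLES").print();
    }
}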
>>
>> Best,
>> Godfrey
>>
>> On Thu, Apr 16, 2020 at 6:16 PM Flavio Pompermaier  wrote:
>>
>>> Hi Godfrey,
>>> I'd like to use the SQL gateway as a data proxy in our architecture.
>>> However, catalogs in our use case are not known at configuration time.
>>> Is there a way to register a JDBC catalog dynamically (for example when I
>>> want to connect to a Postgres database)?
>>> What if I want to add SHOW TRIGGERS? Do you think it could be
>>> interesting?
>>>
>>> On Thu, Apr 16, 2020 at 10:55 AM godfrey he  wrote:
>>>
 Hi Flavio,

 We proposed FLIP-91 [1] to support a SQL Gateway at the beginning of this
 year.
 After a long discussion, we reached an agreement that the
 SQL Gateway will live as an ecosystem project under Ververica as a first step [2],
 which could help the SQL Gateway move forward faster.
 We have now almost finished development of the first version, and some users
 are trying it out.
 Any suggestions are welcome!

 [1]
 https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit?ts=5e4cd711#heading=h.sru3xzbt2i1k
 [2] https://github.com/ververica/flink-sql-gateway

 Best,
 Godfrey

 On Thu, Apr 16, 2020 at 4:42 PM Flavio Pompermaier  wrote:

> Hi Jeff,
> FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL
> but since then no progress has been made on that point. Do you think that
> Zeppelin could be used somehow as a SQL Gateway towards Flink for the
> moment?
> Any chance that a Flink SQL Gateway could ever be developed? Is there
> anybody interested in this?
>
> Best,
> Flavio
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>


-- 
Best Regards

Jeff Zhang


UNSUBSCRIBE

2020-04-16 Thread JOHN MILLER
Greetings

Please unsubscribe me from your mailing list

JOhn M


Unsubscribe

2020-04-16 Thread Jose Cisneros
Unsubscribe


Re: AvroParquetWriter issues writing to S3

2020-04-16 Thread Diogo Santos
Hi Till,

It definitely seems to be a strange issue. The first time the job is loaded
(with a clean instance of the cluster) it runs fine, but if it is
canceled or started again, the issue comes back.

I built an example here https://github.com/congd123/flink-s3-example

You can build the Flink job artifact and start the cluster with
the configuration in the docker-compose file.

Thanks for helping

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Streaming Job eventually begins failing during checkpointing

2020-04-16 Thread Stephen Patel
I posted to the beam mailing list:
https://lists.apache.org/thread.html/rb2ebfad16d85bcf668978b3defd442feda0903c20db29c323497a672%40%3Cuser.beam.apache.org%3E

I think this is related to a Beam feature called RequiresStableInput (which
my pipeline is using).  It will create a new operator (or keyed) state per
checkpoint.  I'm not sure that there are any parameters that I have control
over to tweak its behavior (apart from increasing the checkpoint interval
to let the pipeline run longer before building up that many states).
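
(For completeness, the stop-gap I mean is just the regular checkpoint
interval setting, along these lines, with an illustrative interval:)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // A longer interval only slows down how fast the per-checkpoint states
        // accumulate; it does not remove the underlying growth.
        env.enableCheckpointing(10 * 60 * 1000L); // e.g. every 10 minutes
    }
}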

Perhaps this is something that can be fixed (maybe by unregistering operator
states in the RequiresStableInput code once they are no longer used).  It
seems to me that this isn't a Flink issue, but rather a Beam issue.

Thanks for pointing me in the right direction.

On Thu, Apr 16, 2020 at 11:29 AM Yun Tang  wrote:

> Hi Stephen
>
> I think the state name [1], which changes every time, might be the
> root cause. I am not familiar with the Beam code; would it be possible to
> create so many operator states? Did you configure some parameters wrongly?
>
>
> [1]
> https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L95
>
> Best
> Yun Tang
> --
> *From:* Stephen Patel 
> *Sent:* Thursday, April 16, 2020 22:30
> *To:* Yun Tang 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Streaming Job eventually begins failing during
> checkpointing
>
> Correction.  I've actually found a place where it potentially might be
> creating a new operator state per checkpoint:
>
> https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L105
> https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L141-L149
>
> This gives me something I can investigate locally at least.
>
> On Thu, Apr 16, 2020 at 9:03 AM Stephen Patel  wrote:
>
> I can't say that I ever call that directly.  The beam library that I'm
> using does call it in a couple places:
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L422-L429
>
> But it seems to be the same descriptor every time.  Is that limit per
> operator?  That is, can each operator host up to 32767 operator/broadcast
> states?  I assume that's by name?
>
> On Wed, Apr 15, 2020 at 10:46 PM Yun Tang  wrote:
>
> Hi  Stephen
>
> This is not related with RocksDB but with default on-heap operator state
> backend. From your exception stack trace, you have created too many
> operator states (more than 32767).
> How do you call context.getOperatorStateStore().getListState or
> context.getOperatorStateStore().getBroadcastState ? Did you pass a
> different operator state descriptor each time?
>
> Best
> Yun Tang
> --
> *From:* Stephen Patel 
> *Sent:* Thursday, April 16, 2020 2:09
> *To:* user@flink.apache.org 
> *Subject:* Streaming Job eventually begins failing during checkpointing
>
> I've got a flink (1.8.0, emr-5.26) streaming job running on yarn.  It's
> configured to use rocksdb, and checkpoint once a minute to hdfs.  This job
> operates just fine for around 20 days, and then begins failing with this
> exception (it fails, restarts, and fails again, repeatedly):
>
> 2020-04-15 13:15:02,920 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
> 2020-04-15 13:15:05,762 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes
> in 2667 ms).
> 2020-04-15 13:16:02,919 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> 2020-04-15 13:16:03,147 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph-
>  (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from
> RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 32702 for operator  (1/2).}
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concu

Re: Streaming Job eventually begins failing during checkpointing

2020-04-16 Thread Yun Tang
Hi Stephen

I think the state name [1], which changes every time, might be the root
cause. I am not familiar with the Beam code; would it be possible to create so
many operator states? Did you configure some parameters wrongly?


[1] 
https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L95

Best
Yun Tang

From: Stephen Patel 
Sent: Thursday, April 16, 2020 22:30
To: Yun Tang 
Cc: user@flink.apache.org 
Subject: Re: Streaming Job eventually begins failing during checkpointing

Correction.  I've actually found a place where it potentially might be creating 
a new operator state per checkpoint:
https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L105
https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L141-L149

This gives me something I can investigate locally at least.

On Thu, Apr 16, 2020 at 9:03 AM Stephen Patel  wrote:
I can't say that I ever call that directly.  The beam library that I'm using 
does call it in a couple places: 
https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L422-L429

But it seems to be the same descriptor every time.  Is that limit per operator? 
 That is, can each operator host up to 32767 operator/broadcast states?  I 
assume that's by name?

On Wed, Apr 15, 2020 at 10:46 PM Yun Tang  wrote:
Hi  Stephen

This is not related to RocksDB but to the default on-heap operator state
backend. From your exception stack trace, you have created too many operator
states (more than 32767).
How do you call context.getOperatorStateStore().getListState or
context.getOperatorStateStore().getBroadcastState? Did you pass a different
operator state descriptor each time?

Best
Yun Tang

From: Stephen Patel 
Sent: Thursday, April 16, 2020 2:09
To: user@flink.apache.org 
Subject: Streaming Job eventually begins failing during checkpointing

I've got a flink (1.8.0, emr-5.26) streaming job running on yarn.  It's 
configured to use rocksdb, and checkpoint once a minute to hdfs.  This job 
operates just fine for around 20 days, and then begins failing with this 
exception (it fails, restarts, and fails again, repeatedly):

2020-04-15 13:15:02,920 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
2020-04-15 13:15:05,762 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 
2667 ms).
2020-04-15 13:16:02,919 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
2020-04-15 13:16:03,147 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph-  
(1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
32702 for operator  (1/2).}
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for 
operator  (1/2).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: 
java.lang.IllegalArgumentException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(O

Re: instance number of user defined function

2020-04-16 Thread lec ssmi
Appreciate your reply.


Re: Flink SQL Gateway

2020-04-16 Thread godfrey he
Hi Flavio,

Thanks for the detailed explanation.

I think we should let Catalog know about this concept first;
then TableEnvironment or the SQL Gateway can do more on top of that.
But "trigger" is a database-domain concept, so I think it needs more discussion
whether Flink should support it. Also cc @bowenl...@gmail.com
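
Just to make the discussion concrete, a purely hypothetical sketch (this
interface does not exist in Flink; it only illustrates what "letting Catalog
know this concept" could mean):

import java.util.List;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;

// Hypothetical extension point, not part of Flink today.
public interface TriggerAwareCatalog extends Catalog {

    // Lists the names of the triggers defined on the given table, so that a
    // client such as the SQL Gateway could expose something like SHOW TRIGGERS.
    List<String> listTriggers(ObjectPath tablePath)
            throws TableNotExistException, CatalogException;
}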

Best,
Godfrey

On Thu, Apr 16, 2020 at 10:54 PM Jeff Zhang  wrote:

> Hi Flavio,
>
> If you would like to have a UI to register data sources, run Flink SQL,
> and preview the SQL results, then you can use Zeppelin directly. You can
> check the tutorials here:
> 1) Get started: https://link.medium.com/oppqD6dIg5
> 2) Batch: https://link.medium.com/3qumbwRIg5
> 3) Streaming: https://link.medium.com/RBHa2lTIg5
> 4) Advanced usage: https://link.medium.com/CAekyoXIg5
>
> And here's one article shared by someone else about how to use flink on
> zeppelin.
>
> https://medium.com/@abdelkrim.hadjidj/event-driven-supply-chain-for-crisis-with-flinksql-be80cb3ad4f9
>
> Besides that, Zeppelin provides a REST API which you can use to integrate
> with other systems, but it is not the standard JDBC protocol.
> http://zeppelin.apache.org/docs/0.9.0-preview1/usage/rest_api/notebook.html
>
> I have also been making more improvements recently, and I will reveal more
> details at next week's Flink Forward.
>
> https://www.flink-forward.org/sf-2020/conference-program#it%E2%80%99s-finally-here--python-on-flink---flink-on-zeppelin
>
>
>
>
>
> On Thu, Apr 16, 2020 at 8:24 PM Flavio Pompermaier  wrote:
>
>> Basically we want to give the user a UI to register their data sources
>> (i.e. catalogs in the Flink world) and preview them (SELECT * LIMIT 100, for
>> example) but, in the case of JDBC catalogs, also to see relationships and
>> triggers.
>> We don't want to reinvent the wheel, so we would like to reuse and
>> contribute to Flink as much as possible (since we already use Flink in our
>> batch jobs and we don't want to do the same work twice).
>> In this way we can contribute to Flink if something is missing in the SQL
>> Gateway. However, I don't know how to extend the existing stuff (for example
>> if I want table relationships and triggers).
>>
>> Best,
>> Flavio
>>
>> On Thu, Apr 16, 2020 at 1:38 PM godfrey he  wrote:
>>
>>> Hi Flavio,
>>>
>>> Since 1.11 (master), Flink supports "CREATE CATALOG ..." [1];
>>> we can use this statement to create catalogs dynamically.
>>>
>>> Currently, Catalog [2] does not support any operations on TRIGGERs,
>>> and Flink also can't use such info now. What is your usage scenario?
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-15349
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/catalogs.html#catalog-api
>>>
>>> Best,
>>> Godfrey
>>>
>>> On Thu, Apr 16, 2020 at 6:16 PM Flavio Pompermaier  wrote:
>>>
 Hi Godfrey,
 I'd like to use the SQL gateway as a data proxy in our architecture.
 However, catalogs in our use case are not known at configuration time.
 Is there a way to register a JDBC catalog dynamically (for example when I
 want to connect to a Postgres database)?
 What if I want to add SHOW TRIGGERS? Do you think it could be
 interesting?

 On Thu, Apr 16, 2020 at 10:55 AM godfrey he 
 wrote:

> Hi Flavio,
>
> We proposed FLIP-91 [1] to support a SQL Gateway at the beginning of this
> year.
> After a long discussion, we reached an agreement that the
> SQL Gateway will live as an ecosystem project under Ververica as a first step [2],
> which could help the SQL Gateway move forward faster.
> We have now almost finished development of the first version, and some users
> are trying it out.
> Any suggestions are welcome!
>
> [1]
> https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit?ts=5e4cd711#heading=h.sru3xzbt2i1k
> [2] https://github.com/ververica/flink-sql-gateway
>
> Best,
> Godfrey
>
> On Thu, Apr 16, 2020 at 4:42 PM Flavio Pompermaier  wrote:
>
>> Hi Jeff,
>> FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL
>> but since then no progress has been made on that point. Do you think that
>> Zeppelin could be used somehow as a SQL Gateway towards Flink for the
>> moment?
>> Any chance that a Flink SQL Gateway could ever be developed? Is there
>> anybody interested in this?
>>
>> Best,
>> Flavio
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Schema with TypeInformation or DataType

2020-04-16 Thread tison
Hi,

I notice that our type system has two branches: one is TypeInformation
and the other is DataType. It is said that the Table API will use DataType,
but I have a couple of questions about this statement:

1. Will TypeInformation be deprecated, so that we use DataType as the type
system everywhere?
2. Schema in the Table API currently supports only TypeInformation to register
a field; shall we support the DataType way as well?

Best,
tison.


Re: Schema with TypeInformation or DataType

2020-04-16 Thread godfrey he
Hi tison,

>1. Will TypeInformation be deprecated and we use DataType as type system
everywhere?
AFAIK, the runtime will still support TypeInformation, while the table module
supports DataType.

> 2. Schema in Table API currently support only TypeInformation to register
a field, shall we support
the DataType way as well?
Schema has also supported DataType since FLINK-14645 [1].

[1] https://issues.apache.org/jira/browse/FLINK-14645

Best,
Godfrey

On Fri, Apr 17, 2020 at 2:14 PM tison  wrote:

> Hi,
>
> I notice that our type system has two branches. One  is TypeInformation
> while the other is
> DataType. It is said that Table API will use DataType but there are
> several questions about
> this statement:
>
> 1. Will TypeInformation be deprecated and we use DataType as type system
> everywhere?
> 2. Schema in Table API currently support only TypeInformation to register
> a field, shall we support
> the DataType way as well?
>
> Best,
> tison.
>


Re: Schema with TypeInformation or DataType

2020-04-16 Thread Jark Wu
Hi Tison,

Migrating from TypeInformation to DataType is a large piece of work and will
span many releases. As far as I can tell, we will finalize the work in 1.11.
As Godfrey said above, Flink SQL & Table API should always use DataType,
while DataStream uses TypeInformation.

Schema already supports DataType to register a field, and the method
that uses TypeInformation to register a field has been deprecated since 1.10.
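
For reference, a minimal sketch of the two registration styles on the
descriptor Schema (the field names are made up):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.Schema;

public class SchemaFieldExample {
    public static void main(String[] args) {
        // Preferred since 1.10: DataType-based field registration.
        Schema schema = new Schema()
                .field("user_id", DataTypes.BIGINT())
                .field("user_name", DataTypes.STRING());

        // Deprecated since 1.10: TypeInformation-based field registration.
        Schema legacySchema = new Schema()
                .field("user_id", Types.LONG)
                .field("user_name", Types.STRING);
    }
}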

Best,
Jark

On Fri, 17 Apr 2020 at 14:14, tison  wrote:

> Hi,
>
> I notice that our type system has two branches. One  is TypeInformation
> while the other is
> DataType. It is said that Table API will use DataType but there are
> several questions about
> this statement:
>
> 1. Will TypeInformation be deprecated and we use DataType as type system
> everywhere?
> 2. Schema in Table API currently support only TypeInformation to register
> a field, shall we support
> the DataType way as well?
>
> Best,
> tison.
>