Re: How do I pass the aggregated results of an SQL TUMBLING Window as one set of data to a Process Function?

2020-12-14 Thread Timo Walther

Hi Marco,

sorry for the late reply. Have you looked into user-defined aggregate 
functions for SQL? I think your requirements can easily be implemented 
there. You can declare multiple aggregate functions per window. There is 
also the built-in function LISTAGG that might help for your use case. 
Beyond that, Flink SQL aggregate functions support arbitrary data types 
(e.g. arrays as the result type).
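
For illustration, here is a minimal sketch of such a user-defined aggregate 
function that collects all purchase amounts of a window group into one array. 
The class and column names are hypothetical (not from this thread), and 
depending on the Flink version you may need @FunctionHint/@DataTypeHint 
annotations for the accumulator and result types:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.functions.AggregateFunction;

// Collects all purchase amounts of one group/window into an array.
public class CollectPurchases extends AggregateFunction<Double[], List<Double>> {

    @Override
    public List<Double> createAccumulator() {
        return new ArrayList<>();
    }

    // Invoked once per input row of the group.
    public void accumulate(List<Double> acc, Double purchase) {
        acc.add(purchase);
    }

    @Override
    public Double[] getValue(List<Double> acc) {
        return acc.toArray(new Double[0]);
    }
}

It could then be registered via 
tableEnv.createTemporarySystemFunction("COLLECT_PURCHASES", CollectPurchases.class) 
and used next to SUM in the windowed GROUP BY query.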


Regarding `do I need to wait another 15 minutes to aggregate this`: this 
is another example of why event time is important. Actually you would 
like to process the data quicker than wall-clock time. If your example 
worked in event time, the watermark would be emitted after window 1 has 
been processed, and this watermark would also trigger the second window 
immediately, without the need to wait another 15 minutes in processing 
time.
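
As a hedged illustration (not code from Marco's job), working in event time 
would mean declaring a timestamp column with a watermark on the source table, 
roughly like this:

// Hypothetical DDL; the column names and connector are placeholders.
tableEnv.executeSql(
    "CREATE TABLE employee_purchases (\n" +
    "  employee_id STRING,\n" +
    "  purchase DOUBLE,\n" +
    "  ts TIMESTAMP(3),\n" +
    "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
    ") WITH ( 'connector' = '...' )");

// Windows can then group on event time: GROUP BY TUMBLE(ts, INTERVAL '15' MINUTE), ...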


I hope this helps.

Regards,
Timo

On 12.12.20 01:38, Marco Villalobos wrote:

Alright, maybe my example needs to be more concrete. How about this:
In this example, I don't want to create two windows just to re-combine 
what was just aggregated in SQL. Is there a way to transform the 
aggregate results into one datastream object so that I don't have to 
aggregate again?



// aggregate this stream for 15 minutes
final Table employeeDailyPurchasesTable = tableEnv.sqlQuery(
    "SELECT\n" +
    "  t.organization_id, t.department_id, s.date, s.employee_id, t.fullName, t.dob, SUM(s.purchase) AS purchases\n" +
    "FROM\n" +
    "  employee_purchases s\n" +
    "LEFT JOIN\n" +
    "  employees FOR SYSTEM_TIME AS OF s.procTime AS t ON t.organization = s.organization AND t.department = s.department AND t.employee_id = s.employee_id\n" +
    "GROUP BY\n" +
    "  TUMBLE(s.procTime, INTERVAL '15' MINUTE), t.organization_id, t.department_id, s.date, s.employee_id, t.fullName, t.dob");


// now I want everything that was just aggregated processed together,
// below gives me each row again in a stream
final DataStream<Row> employeeDailyPurchasesDataStream =
    tableEnv.toAppendStream(employeeDailyPurchasesTable, Row.class);

// so, do I need to wait another 15 minutes to aggregate this? It was just aggregated for 15 minutes above!
// how do I get the previous aggregated results into one object so that I don't have to wait and aggregate it again

final DataStream<DailyEmployeePurchases> aggregatedAgainBecauseINeedHelp =
    employeeDailyPurchasesDataStream
        .keyBy(0, 1, 2)
        .window(TumblingProcessingTimeWindows.of(Time.minutes(15)))
        .aggregate(new AggregateFunction<Row, DailyEmployeePurchases, DailyEmployeePurchases>() {

            @Override
            public DailyEmployeePurchases createAccumulator() {
                return new DailyEmployeePurchases();
            }

            @Override
            public DailyEmployeePurchases add(Row value, DailyEmployeePurchases accumulator) {
                return accumulator.add(value);
            }

            @Override
            public DailyEmployeePurchases getResult(DailyEmployeePurchases accumulator) {
                return accumulator;
            }

            @Override
            public DailyEmployeePurchases merge(DailyEmployeePurchases a, DailyEmployeePurchases b) {
                return a.merge(b);
            }
        });

// important business logic that needs to be applied to the group of employees

aggregatedAgainBecauseINeedHelp.keyBy("organizationId", "departmentId")
    .process(new KeyedProcessFunction<Tuple, DailyEmployeePurchases, DailyEmployeePurchases>() {

        @Override
        public void processElement(DailyEmployeePurchases value, Context ctx, Collector<DailyEmployeePurchases> out) throws Exception {
            // very important stuff here
        }
    });







Re: Is there any way to directly convert org.apache.flink.table.api.Table into my POJO.

2020-12-14 Thread Timo Walther

Hi,

first, we should clarify "continue to be put into the Flink table": A 
Flink Table object does not physically store the data. It is basically a 
view that contains a transformation pipeline.


When you are calling `collect()`, the pipeline is executed and all 
results from the cluster are streamed to one local machine (this might 
be a bottleneck when processing large data). It might also reveal a design 
issue in your pipeline, because ideally all logic should be expressed in 
Flink SQL or DataStream API transformations.


In general, Flink SQL comes with basic structured type support. A 
structured type is basically a business POJO. Starting from Flink 1.11, 
a structured type can be created and passed through UDFs. However, 
connectors and collect() cannot return them yet. If you really don't 
want to implement the conversion logic yourself, you can also take a look 
at the internal converters:

org.apache.flink.table.data.conversion.DataStructureConverters

In theory, you can convert from Row -> RowData -> POJO.
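
As a hedged aside (not part of the original reply): if the table's column 
names and types line up with the fields of the target POJO, the bridging 
methods of StreamTableEnvironment can often do the conversion for you, e.g.:

// MyLogPojo is a hypothetical POJO whose fields match the table's columns by name.
DataStream<MyLogPojo> stream = tableEnv.toAppendStream(resultTable, MyLogPojo.class);

// In the other direction, a DataStream of POJOs can be turned back into a Table:
Table pojoTable = tableEnv.fromDataStream(stream);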

I hope this helps.

Regards,
Timo


On 13.12.20 06:57, Luo Jason wrote:

Hello, I'm new to Flink. Thank you for your help.

My application scenario is to process logs with a Flink program and 
finally store them in HBase.



Through Kafka, my Flink application receives log information from other 
systems. This information cannot be sent to HBase immediately. I first 
store these logs in a Flink table. When new logs are received, the 
associated logs are selected from the Flink table for calculation. 
Depending on the calculation results, they are either stored in HBase or 
put back into the Flink table.


My problem is that when I use SQL statements to query the Flink table, the 
result is an org.apache.flink.table.api.Table. From the Flink documentation, 
the method I learned is to use an org.apache.flink.util.CloseableIterator<Row> 
and loop through each row to obtain the corresponding fields by position. But 
this is too troublesome. Is there any way to directly convert a Table into my 
business POJO?


In addition, is there a way to insert a POJO into the Flink table? I do not 
seem to see a suitable method.


thanks.

Json
12.13




Re: How does Flink cache values that also do not exist in the database?

2020-12-14 Thread Timo Walther

Hi Marco,

when you say "database", are you referring to the JDBC connector or would 
you like to perform a JDBC query within some UDF? In the latter case, I 
would recommend using Flink's ProcessFunction because you can store the 
cache hits in state (and thus keep them forever). The SQL/Table API does not 
expose state functionality. Aggregate functions are the only type of 
functions that are stateful, but I guess they are not useful in this case.
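
To illustrate the ProcessFunction idea, here is a minimal sketch (with 
hypothetical type choices and a made-up lookup method, not code from this 
thread): hits are cached in keyed state forever, while misses are only 
re-checked against the database after 15 minutes.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by the lookup key; emits the resolved value when it is (or becomes) known.
public class CachedLookupFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> cachedValue;    // positive cache, kept forever
    private transient ValueState<Long> missValidUntil;   // negative cache, expires after 15 min

    @Override
    public void open(Configuration parameters) {
        cachedValue = getRuntimeContext().getState(
            new ValueStateDescriptor<>("cachedValue", String.class));
        missValidUntil = getRuntimeContext().getState(
            new ValueStateDescriptor<>("missValidUntil", Long.class));
    }

    @Override
    public void processElement(String key, Context ctx, Collector<String> out) throws Exception {
        String hit = cachedValue.value();
        if (hit != null) {
            out.collect(hit);                              // cache hit: never re-queried
            return;
        }
        long now = ctx.timerService().currentProcessingTime();
        Long validUntil = missValidUntil.value();
        if (validUntil == null || now >= validUntil) {     // miss expired: query again
            String fromDb = queryDatabase(key);            // hypothetical JDBC lookup
            if (fromDb != null) {
                cachedValue.update(fromDb);
                out.collect(fromDb);
            } else {
                missValidUntil.update(now + 15 * 60 * 1000L);  // re-check in 15 minutes
            }
        }
    }

    private String queryDatabase(String key) {
        return null;  // placeholder for the actual database call
    }
}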


Regards,
Timo

On 07.12.20 17:14, Marco Villalobos wrote:

How does Flink cache values that also do not exist in the database?

I would like to cache hits forever, but I would like to check items that do not 
exist in the database only every 15 minutes. Is there a way to set that up in 
the SQL / Table API? Also, is there a way to set that up in Keyed State?





Re: How to debug a Flink Exception that does not have a stack trace?

2020-12-14 Thread Till Rohrmann
Thanks a lot for this information Steven. I learned again something :-)

Cheers,
Till

On Sat, Dec 12, 2020 at 9:02 PM Dan Hill  wrote:

> Thanks!  That makes sense.
>
> On Sat, Dec 12, 2020 at 11:13 AM Steven Wu  wrote:
>
>> This is a performance optimization in JVM when the same exception is
>> thrown too frequently. You can set `-XX:-OmitStackTraceInFastThrow` to
>> disable the feature. You can typically find the full stack trace in the log
>> before the optimization kicks in.
>>
>> On Sat, Dec 12, 2020 at 2:05 AM Till Rohrmann 
>> wrote:
>>
>>> Ok, then let's see whether it reoccurs. What you could do is to revert
>>> the fix and check the stack trace again.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sat, Dec 12, 2020, 02:16 Dan Hill  wrote:
>>>
 Hmm, I don't have a good job I can separate for reproduction.  I was
 using Table SQL and inserting a long field (which was null) into a table
 that sinked out to avro.  The exception was being thrown from this Avro
 function.  I can watch to see if it keeps happening.


 https://github.com/apache/avro/blob/e982f2e6ee57c362a7fae21ba7373c1cfc964fce/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java#L127

 On Fri, Dec 11, 2020 at 3:31 AM Till Rohrmann 
 wrote:

> Hi Dan,
>
> Do you have an example job and some sample data to reproduce this
> problem? I couldn't reproduce it locally with a simple example job.
>
> Cheers,
> Till
>
> On Thu, Dec 10, 2020 at 5:51 PM Dan Hill 
> wrote:
>
>> Yea, the error makes sense and was an easy fix.
>>
>> Any idea what happened with the hidden stacktrace?  The hidden
>> stacktrace made this 100x more difficult.
>>
>> On Thu, Dec 10, 2020 at 12:59 AM Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> It looks like the problem is that there's a problem in reading a
>>> null value in the AvroRowDataDeserializationSchema (see below for the
>>> snippet of code from Flink 1.11.1).
>>> The problem is due to the fact that there's a bad typing of the
>>> source so the call to createConverter()
>>> within the createNullableConverter() returns null, creating a null on
>>> fieldConverters[i] and, in the end, a NullPointer in
>>> fieldConverters[i].convert(). Does it make sense?
>>>
>>> static DeserializationRuntimeConverter createRowConverter(RowType
>>> rowType) {
>>> final DeserializationRuntimeConverter[] fieldConverters =
>>> rowType.getFields().stream()
>>> .map(RowType.RowField::getType)
>>>
>>> .map(AvroRowDataDeserializationSchema::createNullableConverter)
>>> .toArray(DeserializationRuntimeConverter[]::new);
>>> final int arity = rowType.getFieldCount();
>>> return avroObject -> {
>>> IndexedRecord record = (IndexedRecord) avroObject;
>>> GenericRowData row = new GenericRowData(arity);
>>> for (int i = 0; i < arity; ++i) {
>>> row.setField(i,
>>> fieldConverters[i].convert(record.get(i)));
>>> }
>>> return row;
>>> };
>>> }
>>>
>>> Best,
>>> Flavio
>>>
>>> On Thu, Dec 10, 2020 at 8:39 AM Dan Hill 
>>> wrote:
>>>
 One of the Exception instances finally reported a stacktrace.  I'm
 not sure why it's so infrequent.

 java.lang.NullPointerException: null

 at
 org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
 ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]

 at
 org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$7(RowData.java:338)
 ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]

 at
 org.apache.flink.formats.avro.AvroRowDataSerializationSchema.lambda$createRowConverter$6827278$1(AvroRowDataSerializationSchema.java:177)
 ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]

 at
 org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.addElement(AvroFileSystemFormatFactory.java:251)
 ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]

 at
 org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.addElement(AvroFileSystemFormatFactory.java:247)
 ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]

 at
 org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:498)
 ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]

 at
 org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTa

unsubscribe

2020-12-14 Thread yuguangyuan
unsubscribe



Re: flink-cdc-connector 使用场景和限制是什么?

2020-12-14 Thread Chesnay Schepler

Moving to the Chinese user mailing list.

On 12/14/2020 3:19 AM, 陈帅 wrote:
The traditional CDC approach goes mysql -> debezium -> kafka, which makes it 
easier for DBAs to manage resources, because a database like postgres needs 
replication slots to be created. But if, as with flink-cdc-connector, every 
table (or every few tables) creates its own CDC stream, the resource demands 
on the database become very high and hard to control. So my understanding is 
that flink-cdc-connector is better suited to a small amount of ad-hoc CDC 
processing rather than large-scale CDC processing. Is my understanding correct?







Re: Never terminating test ...

2020-12-14 Thread Chesnay Schepler
My guess would be that the consumer does not stop running once it has 
exhausted the Kinesis stream, which makes sense since this isn't a batch job.
(Wouldn't want the source to shut down just because it happened to catch 
up with your input ;) )


On 12/14/2020 8:09 AM, Avi Levi wrote:


I have the following test. The problem is that it never ends, meaning 
it never reaches the assertion point. What am I doing wrong?


"kinesis consumer" should "consume message from kinesis stream" in {
  import ExecutionContext.Implicits.global
  val sampleData = Seq("a", "b", "c")
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.addSource(new FlinkKinesisConsumer[String]("SampleStream", new SimpleStringSchema, consumerConfig))
    .addSink(new TestSink[String])
  Future(createSampleDataStream(sampleData)) // publish to kinesis stream
  env.execute()
  TestSink.values should contain theSameElementsAs (sampleData) // not executed
}





Trying to simulate the Split Distinct Aggregation optimizations from Table API

2020-12-14 Thread Felipe Gutierrez
Hi,

I am trying to simulate the "Split Distinct Aggregation" [1] with the
data from Taxi Ride. I am using the following query:

SELECT dayOfTheYear, COUNT(DISTINCT driverId) FROM TaxiRide GROUP BY
dayOfTheYear

and I am analyzing the different methods for optimizing it. So I started
with (1) no optimization, then (2) "table.exec.mini-batch.size" =
TRUE with "table.optimizer.agg-phase-strategy" = "ONE_PHASE", then I
changed to (3) "table.optimizer.agg-phase-strategy" = "TWO_PHASE", and
finally I used (4) "table.optimizer.distinct-agg.split.enabled" =
TRUE.
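
For reference, these options are typically set on the TableConfig before the
query is defined; a sketch (not Felipe's actual code, and the mini-batch
latency/size values are made-up examples):

// tableEnv is the existing (Stream)TableEnvironment
Configuration conf = tableEnv.getConfig().getConfiguration();
// mini-batch aggregation
conf.setString("table.exec.mini-batch.enabled", "true");
conf.setString("table.exec.mini-batch.allow-latency", "5 s");
conf.setString("table.exec.mini-batch.size", "5000");
// local-global (two-phase) aggregation
conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// split distinct aggregation
conf.setString("table.optimizer.distinct-agg.split.enabled", "true");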

What does the sentence "COUNT DISTINCT is not good at reducing records
if the value of distinct key (i.e. user_id) is sparse." mean on the
website? In my case the distinct key is the driverId. So, should I
change the data source to have a lot of null values on the driverId
column?

I am asking because, when I tested this query with optimizations (1) and (2),
I got ~20K r/s on each operator (parallelism of 8) when I set the workload to
~200K r/s, which is almost the total workload. Then I changed only the
optimization to (3, TWO_PHASE) and the maximum throughput reached only 4K.

I think the problem is in the data that the query with distinct is
consuming. So, how should I prepare the data to see the split-distinct
optimization take effect?

Thanks,
Felipe

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: Flink 1.12

2020-12-14 Thread Chesnay Schepler

1) It is compiled with Java 8 but runs on Java 8 & 11.
2) Docker images are not yet published.
3) It is mentioned at the top of the Kubernetes HA Services 
documentation that it also works for the native Kubernetes integration.


   Kubernetes high availability services can only be used when
   deploying to Kubernetes. Consequently, they can be configured when
   using standalone Flink on Kubernetes or the native Kubernetes
   integration.

From what I understand you only need to configure the 3 listed options; 
the documentation also contains an example configuration.
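
For convenience, a sketch of what those three options typically look like in 
flink-conf.yaml, following the linked documentation (the cluster id and 
storage path are placeholders):

kubernetes.cluster-id: my-flink-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink/recovery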


On 12/14/2020 4:52 AM, Boris Lublinsky wrote:

It is great that Flink 1.12 is out. Several questions:

1. The official Flink 1.12 distribution at 
https://flink.apache.org/downloads.html specifies Scala versions, but 
not Java versions. Is it Java 8?
2. I do not see any 1.12 docker images at 
https://hub.docker.com/_/flink. Are they somewhere else?
3. Flink 1.12 introduces Kubernetes HA support 
(https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html), 
but the Flink native Kubernetes documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html) 
has no mention of HA. Are the two integrated? Do you have any examples of 
starting an HA cluster using Flink native Kubernetes?






Re: unsubscribe

2020-12-14 Thread Chesnay Schepler
To unsubscribe from this mailing list, please send a mail to 
user-unsubscr...@flink.apache.org.


On 12/14/2020 10:25 AM, yuguangyuan wrote:

unsubscribe






Re: How to tell when flink is done restoring from a savepoint

2020-12-14 Thread Chesnay Schepler
I do not believe there is anything in the UI, or the logs for that 
matter, that give a definite answer to that.
I suppose if a new checkpoint was completed then you can be sure the 
state was restored.


FLINK-19013 

On 12/14/2020 6:40 AM, Rex Fenley wrote:

Hi,

Every time I restore from a savepoint it looks like it can take 20+ 
min to restore given the network i/o graphs I'm seeing. However, I 
can't find a way to see in the Flink UI if the savepoint is currently 
restoring or if it's finished to be sure. Is there a way to tell if 
flink is in the middle of restoring or not?


Thanks

--

Rex Fenley|Software Engineer - Mobile and Backend


Remind.com | BLOG  | 
FOLLOW US  | LIKE US 







upsert-kafka to do temporal table joins

2020-12-14 Thread guoliubi...@foxmail.com
I just tried to follow the example listed on the page
https://flink.apache.org/news/2020/12/10/release-1.12.0.html#table-apisql-support-for-temporal-table-joins-in-sql

Unfortunately, when I try to upload the Python file to the Flink cluster, this 
error occurred:

py4j.protocol.Py4JJavaError: An error occurred while calling o2.sqlQuery.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
"ON" at line 8, column 5.
Was expecting one of:

"EXCEPT" ...
"FETCH" ...
"GROUP" ...

 Maybe I missed something?


guoliubi...@foxmail.com


Re: Getting an exception while stopping Flink with savepoints on Kubernetes+Minio

2020-12-14 Thread Folani
Hi Piotrek,

Sorry for the late response.
I have another problem with setting up logging. I think the logging issue comes
from using Flink on my host machine and running the job on a jobmanager in
K8s. I'm working on that issue. But this is what I got in the /log folder of my
host machine:



2020-12-14 15:04:51,329 INFO  org.apache.flink.client.cli.CliFrontend [] -
2020-12-14 15:04:51,331 INFO  org.apache.flink.client.cli.CliFrontend [] -  Starting Command Line Client (Version: 1.12.0, Scala: 2.11, Rev:fc00492, Date:2020-12-02T08:49:16+01:00)
2020-12-14 15:04:51,331 INFO  org.apache.flink.client.cli.CliFrontend [] -  OS current user: folani
2020-12-14 15:04:51,332 INFO  org.apache.flink.client.cli.CliFrontend [] -  Current Hadoop/Kerberos user: 
2020-12-14 15:04:51,332 INFO  org.apache.flink.client.cli.CliFrontend [] -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.212-b04
2020-12-14 15:04:51,332 INFO  org.apache.flink.client.cli.CliFrontend [] -  Maximum heap size: 3538 MiBytes
2020-12-14 15:04:51,332 INFO  org.apache.flink.client.cli.CliFrontend [] -  JAVA_HOME: (not set)
2020-12-14 15:04:51,332 INFO  org.apache.flink.client.cli.CliFrontend [] -  No Hadoop Dependency available
2020-12-14 15:04:51,332 INFO  org.apache.flink.client.cli.CliFrontend [] -  JVM Options:
2020-12-14 15:04:51,332 INFO  org.apache.flink.client.cli.CliFrontend [] - -Dlog.file=/home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/log/flink-folani-client-hralaptop.log
2020-12-14 15:04:51,332 INFO  org.apache.flink.client.cli.CliFrontend [] - -Dlog4j.configuration=file:/home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/conf/log4j-cli.properties
2020-12-14 15:04:51,332 INFO  org.apache.flink.client.cli.CliFrontend [] - -Dlog4j.configurationFile=file:/home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/conf/log4j-cli.properties
2020-12-14 15:04:51,332 INFO  org.apache.flink.client.cli.CliFrontend [] - -Dlogback.configurationFile=file:/home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/conf/logback.xml
2020-12-14 15:04:51,332 INFO  org.apache.flink.client.cli.CliFrontend [] -  Program Arguments:
2020-12-14 15:04:51,333 INFO  org.apache.flink.client.cli.CliFrontend [] - stop
2020-12-14 15:04:51,334 INFO  org.apache.flink.client.cli.CliFrontend [] - -p
2020-12-14 15:04:51,334 INFO  org.apache.flink.client.cli.CliFrontend [] - cc4a9a04e164fff8628b3ac59e5fbe80
2020-12-14 15:04:51,334 INFO  org.apache.flink.client.cli.CliFrontend [] -  Classpath: /home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/lib/flink-csv-1.12.0.jar:/home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/lib/flink-json-1.12.0.jar:/home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/lib/flink-shaded-zookeeper-3.4.14.jar:/home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/lib/flink-table_2.11-1.12.0.jar:/home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/lib/flink-table-blink_2.11-1.12.0.jar:/home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/lib/log4j-1.2-api-2.12.1.jar:/home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/lib/log4j-api-2.12.1.jar:/home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/lib/log4j-core-2.12.1.jar:/home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar:/home/folani/Softwares/flink-1.12.0-bin-scala_2.11/flink-1.12.0/lib/flink-dist_2.11-1.12.0.jar:::
2020-12-14 15:04:51,334 INFO  org.apache.flink.client.cli.CliFrontend [] -
2020-12-14 15:04:51,336 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
2020-12-14 15:04:51,336 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-12-14 15:04:51,337 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1024m
2020-12-14 15:04:51,337 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1024m
2020-12-14 15:04:51,337 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-12-14 15:04:51,337 INFO  org.apache.flink.configuration.Glo

How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

2020-12-14 Thread Dongwon Kim
Hi,

The following program compiles and runs w/o exceptions:

> public class Test {
>
>   public static class A {
>     private int n;
>
>     public A() { }
>     public int getN() {  return n;  }
>     public void setN(int n) {  this.n = n;  }
>   }
>
>   public static class B {
>     private List<A> lst;
>
>     public B() { }
>     public List<A> getLst() {  return lst;  }
>     public void setLst(List<A> lst) {  this.lst = lst;  }
>   }
>
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment();
>
>     env.fromElements(new B())
>        .print();
>
>     env.execute();
>   }
> }
>

When I add the following line,

> env.getConfig().disableGenericTypes();

then the program shows me an exception:

> Exception in thread "main" java.lang.UnsupportedOperationException:
> Generic types have been disabled in the ExecutionConfig and type
> java.util.List is treated as a generic type.
> at
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
> at Test.main(Test.java:29)


To avoid this exception, I found that I have to declare a type factory like:

>   public static class BTypeFactory extends TypeInfoFactory<B> {
>     @Override
>     public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
>       return Types.POJO(
>         B.class,
>         ImmutableMap.<String, TypeInformation<?>>builder()
>           .put("lst", Types.LIST(Types.POJO(A.class)))
>           .build()
>       );
>     }
>   }

and give it to class B as follows:

>   @TypeInfo(BTypeFactory.class)
>   public static class B {


Is there no other way but to declare BTypeFactory in such cases?
I don't like the way I have to type a field name twice, once for the member
variable and once for the Map entry in the TypeInfoFactory.

Thanks in advance,

Dongwon


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-14 Thread Yun Gao

Hi all,

I would like to resume this discussion on supporting checkpoints after 
tasks finish :) Based on the previous discussion, we have now implemented a 
PoC [1] to try the idea. During the PoC we also ran into some possible 
issues:

1. To include EndOfPartition in the barrier alignment at the TM side, we now 
tend to decouple the logic for EndOfPartition from the normal alignment 
behaviors to avoid complex interference (which seems hard to track). We could 
do so by inserting suitable barriers for input channels that have received but 
not yet processed EndOfPartition. For example, if a task with four inputs has 
received barrier 2 from two input channels, but the other two inputs did not 
receive barrier 2 before EndOfPartition because the preceding tasks have 
finished, we could insert barrier 2 for the last two channels so that we can 
still finish checkpoint 2.
2. As we have discussed, if a task finishes while we are triggering the tasks, 
it would cause a checkpoint failure and we should re-trigger its descendants. 
But if possible we would like to skip this issue in the first version to reduce 
the implementation complexity, since it should not affect correctness. We could 
consider supporting it in following versions.

3. We would have to add a field isFinished to OperatorState so that we do not 
re-run finished sources after failover. However, this requires a new version of 
the checkpoint meta. Currently Flink has an abstract MetaV2V3SerializerBase, 
with V2 and V3 extending it to share some implementation. To add V4, which 
differs from V3 in only one field, the current PoC introduces a new 
MetaV3V4SerializerBase extending MetaV2V3SerializerBase to share implementation 
between V3 and V4. This might look a little complex, and we might need a 
general mechanism for extending the checkpoint meta format.

4. With this change, StreamTask would have two types of subclasses according to 
how triggerCheckpoint is implemented: source tasks that perform checkpoints 
immediately, and non-source tasks that notify the CheckpointBarrierHandler in 
some way. However, since we have multiple source tasks (legacy and new source) 
and multiple non-source tasks (one-input, two-input, multiple-input), this 
leads to multiple subclasses sharing the same implementation and causes code 
repetition. Currently the PoC introduces a new level of abstraction, namely 
SourceStreamTasks and NonSourceStreamTasks, but what makes it more complicated 
is that StreamingIterationHead extends OneInputStreamTask yet needs to perform 
checkpoints like a source task.

Glad to hear your opinions!

Best,
 Yun

[1] https://github.com/gaoyunhaii/flink/commits/try_checkpoint_6 , starts from 
commit f8005be1ab5e5124e981e56db7bdf2908f4a969a.

Fine-grained task recovery

2020-12-14 Thread Stanislav Borissov
Hi,

I'm running a simple, "embarrassingly parallel" ETL-type job. I noticed that
a failure in one subtask causes the entire job to restart. Even with the
region failover strategy, all subtasks of this task and connected ones
would fail. Is there any way to limit restarting to only the single subtask
that failed, so all other subtasks can stay alive and keep working?

For context, I use Flink 1.11 in AWS Kinesis Data Analytics, so some
configuration is not controlled by me.

Thanks


Re: Flink jobmanager TLS connectivity to Zookeeper

2020-12-14 Thread Azeem Mufti
Hey Matthias,

I have and it doesn't seem like there are any native properties
that support this interaction. I did try enabling the rest/internal SSL
properties to see if that would work but when my jobmanager tries to make a
connection to zookeeper, zookeeper is rejecting the connection saying it's
not a TLS/SSL record.

Thanks,
Azeem

On Thu, Dec 10, 2020 at 9:36 AM Matthias Pohl 
wrote:

> Hi Azeem,
> I haven't worked with Flink's SSL support, yet. But have you taken a look
> at the SSL configuration options listed under [1]?
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/security/security-ssl.html#complete-list-of-ssl-options
>
> On Tue, Dec 8, 2020 at 8:01 PM Azeem Mufti 
> wrote:
>
>>  I'm trying to figure out a way to make Flink jobmanager (in HA) connect
>> to zookeeper over SSL/TLS. It doesn't seem like there are native properties
>> like Kafka has that support this interaction yet. Is this true or is there
>> some way that I can go about doing this?
>>
>>
>
> --
>
> Matthias Pohl | Engineer
>
> Follow us @VervericaData Ververica 
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> Wehner
>


Re: How to tell when flink is done restoring from a savepoint

2020-12-14 Thread Rex Fenley
Ok, thank you.

On Mon, Dec 14, 2020 at 2:07 AM Chesnay Schepler  wrote:

> I do not believe there is anything in the UI, or the logs for that matter,
> that give a definite answer to that.
> I suppose if a new checkpoint was completed then you can be sure the state
> was restored.
>
> FLINK-19013 
>
> On 12/14/2020 6:40 AM, Rex Fenley wrote:
>
> Hi,
>
> Every time I restore from a savepoint it looks like it can take 20+ min to
> restore given the network i/o graphs I'm seeing. However, I can't find a
> way to see in the Flink UI if the savepoint is currently restoring or if
> it's finished to be sure. Is there a way to tell if flink is in the middle
> of restoring or not?
>
> Thanks
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: pause and resume flink stream job based on certain condition

2020-12-14 Thread Eleanore Jin
Hi Guowei and Arvid,

Thanks for the suggestion. I wonder if it makes sense (and is possible) for the
operator to produce a side-output message telling the source to 'pause', and
to feed that same side output back to the source as a side input, based on
which the source would pause and resume?

Thanks a lot!
Eleanore

On Sun, Nov 29, 2020 at 11:33 PM Arvid Heise  wrote:

> Hi Eleanore,
>
> if the external system is down, you could simply fail the job after a
> given timeout (for example, using asyncIO). Then the job would restart
> using the restarting policies.
>
> If your state is rather small (and thus recovery time okay), you would
> pretty much get your desired behavior. The job would stop to make progress
> until eventually the external system is responding again.
>
> On Mon, Nov 30, 2020 at 7:39 AM Guowei Ma  wrote:
>
>> Hi, Eleanore
>>
>> 1. AFAIK I think only the job could "pause" itself.  For example the
>> "query" external system could pause when the external system is down.
>> 2. Maybe you could try the "iterate" and send the failed message back to
>> retry if you use the DataStream api.
>>
>> Best,
>> Guowei
>>
>>
>> On Mon, Nov 30, 2020 at 1:01 PM Eleanore Jin 
>> wrote:
>>
>>> Hi experts,
>>>
>>> Here is my use case, it's a flink stateless streaming job for message
>>> validation.
>>> 1. read from a kafka topic
>>> 2. perform validation of message, which requires query external system
>>>2a. the metadata from the external system will be cached in
>>> memory for 15minutes
>>>2b. there is another stream that will send updates to update the
>>> cache if metadata changed within 15 minutes
>>> 3. if message is valid, publish to valid topic
>>> 4. if message is invalid, publish to error topic
>>> 5. if the external system is down, the message is marked as invalid with
>>> different error code, and published to the same error topic.
>>>
>>> Ask:
>>> For those messages that failed due to external system failures, it
>>> requires manual replay of those messages.
>>>
>>> Is there a way to pause the job if there is an external system failure,
>>> and resume once the external system is online?
>>>
>>> Or are there any other suggestions to allow auto retry such error?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Direct Memory full

2020-12-14 Thread Rex Fenley
Hello,

Our job consistently shows the following for direct memory:

Outside JVM
Type     Count    Used     Capacity
Direct   32,839   1.03 GB  1.03 GB

Is it typical for it to be full? What are the consequences that we may not
be noticing of direct memory being full?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US