Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-12-03 Thread Pierre Oberholzer
Hi Xingbo, Wei, Dian,

Many thanks for this plus for the high quality and prompt support overall.
Let’s close this thread here. Looking forward to trying your approach.
Community, feel free to reach out with additional remarks and experiences
on structured streaming on complex/sparse objects.

Best regards,

On Thu, Dec 3, 2020 at 08:47, Xingbo Huang wrote:

> Hi Pierre,
>
> The serialization/deserialization of sparse Rows in Flink is specially
> optimized. The principle is that each Row carries a leading mask when it is
> serialized, identifying whether the field at a given position is
> NULL, with one bit per field. For example, if you have 10k
> fields, there will be a mask of 10,000 bits / 8 = 1,250 bytes. In this way,
> the serialization/deserialization overhead can be skipped for those field
> values that are NULL.
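>
> As a rough illustration of the idea (plain Java, a conceptual sketch only, not
> Flink's actual RowSerializer; see [1] below for the real implementation): a leading
> bitmask marks the NULL positions, and only the non-null values are written afterwards.
>
>     // Conceptual sketch of a leading null mask, not Flink's real code.
>     static void writeRow(java.io.DataOutput out, Object[] fields) throws java.io.IOException {
>         byte[] mask = new byte[(fields.length + 7) / 8];   // 10,000 fields -> 1,250 bytes
>         for (int i = 0; i < fields.length; i++) {
>             if (fields[i] == null) {
>                 mask[i / 8] |= 1 << (i % 8);               // one bit per field
>             }
>         }
>         out.write(mask);                                    // write the mask first ...
>         for (Object field : fields) {
>             if (field != null) {
>                 out.writeUTF(field.toString());             // ... then only the non-null values
>             }
>         }
>     }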
>
> For the specific optimization logic, you can refer to the Java implementation[1],
> or the Python[2] and Cython[3] implementations.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java#L185
> [2]
> https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py#L100
> [3]
> https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/coder_impl_fast.pyx#L697
>
> Best,
> Xingbo
>
> On Thu, Dec 3, 2020 at 3:08 PM, Pierre Oberholzer wrote:
>
>> Hi Xingbo, Community,
>>
>> Thanks a lot for your support.
>> May I ask two final questions to conclude this thread, also for the wider audience:
>> - Are serious performance issues to be expected with 100k fields per ROW
>> (i.e. due solely to metadata overhead, independently of the query logic)?
>> - Is sparse population (say 99% sparsity) already optimized in the ROW
>> object, or are sparse types on your roadmap?
>> Any experience with sparse Tables from other users (including benchmarks
>> vs. other frameworks) is also highly welcome.
>>
>> Thanks !
>>
>> Best
>>
>>
>> On Thu, Dec 3, 2020 at 02:53, Xingbo Huang wrote:
>>
>>> Hi Pierre,
>>>
>>> This example is written against the syntax of release-1.12, which is
>>> about to be released, and the test passed. In release-1.12, input_type can
>>> be omitted and expressions can be used directly. If you are using
>>> release-1.11, you only need to slightly adjust the UDF syntax
>>> according to the UDF documentation[1].
>>>
>>> The Flink table connectors support the Avro format; please refer to the
>>> documentation[2].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format
>>>
>>> Best,
>>> Xingbo
>>>
>>> On Thu, Dec 3, 2020 at 2:57 AM, Pierre Oberholzer wrote:
>>>
 Hi Xingbo,

 Nice! This looks a bit hacky, but shows that it can be done ;)

 I just got an exception preventing me from running your code, apparently
 from udf.py:

 TypeError: Invalid input_type: input_type should be DataType but
 contains None

 Can you please check again?
 If the schema is defined in a .avsc file, do we have to parse it and
 rebuild that syntax (DDL and UDF) ourselves, or is there an existing
 component that could be used?

 Thanks a lot !

 Best,


 On Wed, Dec 2, 2020 at 04:50, Xingbo Huang wrote:

> Hi Pierre,
>
> I wrote a PyFlink implementation, you can see if it meets your needs:
>
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
> from pyflink.table.udf import udf
>
>
> def test():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     t_env = StreamTableEnvironment.create(
>         env,
>         environment_settings=EnvironmentSettings.new_instance()
>             .in_streaming_mode().use_blink_planner().build())
>
>     t_env.get_config().get_configuration().set_string(
>         "taskmanager.memory.task.off-heap.size", '80m')
>
>     # 10k nested columns
>     num_field = 10_000
>     fields = ['f%s INT' % i for i in range(num_field)]
>     field_str = ','.join(fields)
>
>     t_env.execute_sql(f"""
>         CREATE TABLE source_table (
>             f0 BIGINT,
>             f1 DECIMAL(32,2),
>             f2 ROW<{field_str}>,
>             f3 TIMESTAMP(3)
>         ) WITH (
>             'connector' = 'datagen',
>             'number-of-rows' = '2'
>         )
>     """)
>
>     t_env.execute_sql(f"""
>         CREATE TABLE print_table (
>             f0 BIGINT,
>             f1 DECIMAL(32,2),
>             f2 ROW<{field_str}>,
>             f3 TIMESTAMP(3)
>         ) WITH (

Re: Routing events to different kafka topics dynamically

2020-12-03 Thread Prasanna kumar
Thanks Till,

I was able to deduce topics by extending the KafkaSerializationSchema class.
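
For reference, a minimal sketch of this approach (names are illustrative; it assumes
each record is a (target topic, payload) pair, so adapt it to your own event type):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Routes each record to the topic carried inside the record itself.
public class DynamicTopicSchema implements KafkaSerializationSchema<Tuple2<String, String>> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> element, Long timestamp) {
        String topic = element.f0;                                  // topic deduced from the message
        byte[] value = element.f1.getBytes(StandardCharsets.UTF_8); // payload
        return new ProducerRecord<>(topic, value);
    }
}

// Usage sketch: the first argument is only the default/fallback topic; the schema above
// decides the actual topic per record.
// stream.addSink(new FlinkKafkaProducer<>("fallback-topic", new DynamicTopicSchema(),
//         kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));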

Prasanna.

On Wed, Dec 2, 2020 at 11:18 PM Till Rohrmann  wrote:

> Hi Prasanna,
>
> I believe that what Aljoscha suggested in the linked discussion is still
> the best way to go forward. Given your description of the problem, this
> should actually be pretty straightforward, as you can deduce the topic from
> the message. Hence, you just need to create the ProducerRecord with the
> right target topic extracted from the record/message.
>
> Cheers,
> Till
>
> On Wed, Dec 2, 2020 at 5:28 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Hi,
>>
>> Events need to be routed to different kafka topics dynamically based upon
>> some info in the message.
>>
>> We have implemented this using KeyedSerializationSchema, similar to
>> https://stackoverflow.com/questions/49508508/apache-flink-how-to-sink-events-to-different-kafka-topics-depending-on-the-even.
>> But it's deprecated, and we cannot use it in production.
>>
>> I looked at the alternative, KafkaSerializationSchema, but I do not see
>> such an option there.
>>
>> Then I stumbled upon this
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Usage-of-KafkaDeserializationSchema-and-KafkaSerializationSchema-td32347.html.
>> which asks us to use KafkaContextAware.
>>
>> Is there a more intuitive/easier way to do the same?
>>
>> Thanks,
>> Prasanna.
>>
>>
>>


Re: Running Flink job as a rest

2020-12-03 Thread Chesnay Schepler
What you are asking for is an HTTP(s) source. This currently does not
exist for Flink, so you would have to implement it yourself.
Additionally, you would have to figure out yourself the host on which the source
runs.


It may be easier to set up a separate HTTP(s) server that accepts data,
which you then query from the source.
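
As an illustration only, a minimal sketch that assumes a plain-text HTTP endpoint you
host yourself (error handling, retries and HTTPS/TLS configuration omitted), such a
polling source could look roughly like this:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Periodically polls an external HTTP(S) endpoint and emits each returned line as a record.
public class HttpPollingSource extends RichSourceFunction<String> {

    private final String endpoint;        // e.g. "https://my-ingest-service/records" (placeholder)
    private final long pollIntervalMs;
    private volatile boolean running = true;

    public HttpPollingSource(String endpoint, long pollIntervalMs) {
        this.endpoint = endpoint;
        this.pollIntervalMs = pollIntervalMs;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(line);
                    }
                }
            } finally {
                conn.disconnect();
            }
            Thread.sleep(pollIntervalMs);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}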


On 12/2/2020 10:31 PM, dhurandar S wrote:

Can a Flink job run as a REST server, where the Apache Flink job is
listening on a port (443)? When a user calls this URL with a payload,
the data goes directly to the Apache Flink windowing function.

Right now Flink can ingest data from Kafka or Kinesis, but we have a use
case where we would like to push data to Flink, with Flink listening on
a port.





Query regarding HA mode and checkpointing

2020-12-03 Thread Kaushal Raj
Hello,

I am new to Flink. I have a few queries regarding HA mode with ZooKeeper
and checkpointing. When Flink is configured in HA mode with ZooKeeper,
where are the job checkpoints stored? Is ZooKeeper only used for recovering
the JobManager, or also for checkpoints? What is the significance of
"high-availability.storageDir" here?

Thanks,
Kaushal


Re: Application Mode support on VVP v2.3

2020-12-03 Thread Fabian Paul
Hi Narasimha,

Nothing comes to mind immediately as to why it should not work. We are using the
StandaloneApplicationClusterEntryPoint to start the cluster. Can you provide
some more information about which Flink image on VVP you are trying to use, and
maybe show the error message?

Best,
Fabian

Re: Routing events to different kafka topics dynamically

2020-12-03 Thread Till Rohrmann
Great to hear :-)

Cheers,
Till

On Thu, Dec 3, 2020 at 10:15 AM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Thanks Till,
>
> I was able to deduce topics by extending the KafkaSerializationSchema class.
>
> Prasanna.
>
> On Wed, Dec 2, 2020 at 11:18 PM Till Rohrmann 
> wrote:
>
>> Hi Prasanna,
>>
>> I believe that what Aljoscha suggested in the linked discussion is still
>> the best way to go forward. Given your description of the problem this
>> should actually be pretty straightforward as you can deduce the topic from
>> the message. Hence, you just need to create the ProducerRecord with the
>> right target topic you extracted from the record/message.
>>
>> Cheers,
>> Till
>>
>> On Wed, Dec 2, 2020 at 5:28 PM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Events need to be routed to different kafka topics dynamically based
>>> upon some info in the message.
>>>
>>> We have implemented using KeyedSerializationSchema similar to
>>> https://stackoverflow.com/questions/49508508/apache-flink-how-to-sink-events-to-different-kafka-topics-depending-on-the-even.
>>> But it's deprecated, and we cannot use it in production.
>>>
>>> I looked at the alternative, KafkaSerializationSchema, but I do not see
>>> such an option there.
>>>
>>> Then I stumbled upon this
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Usage-of-KafkaDeserializationSchema-and-KafkaSerializationSchema-td32347.html.
>>> which asks us to use KafkaContextAware.
>>>
>>> Is there a more intuitive/easier way to do the same ?
>>>
>>> Thanks,
>>> Prasanna.
>>>
>>>
>>>


Re: Partitioned tables in SQL client configuration.

2020-12-03 Thread Jark Wu
Only legacy connectors (`connector.type=kafka` instead of
`connector=kafka`) are supported in the YAML at the moment. You can use
regular DDL instead. There is a similar discussion in
https://issues.apache.org/jira/browse/FLINK-20260 these days.

Best,
Jark

On Thu, 3 Dec 2020 at 00:52, Till Rohrmann  wrote:

> Hi Maciek,
>
> I am pulling in Timo who might help you with this problem.
>
> Cheers,
> Till
>
> On Tue, Dec 1, 2020 at 6:51 PM Maciek Próchniak  wrote:
>
>> Hello,
>>
>> I am trying to configure the SQL Client to query partitioned ORC data on a local
>> filesystem. I have a directory structure like this:
>>
>> /tmp/table1/startdate=2020-11-28
>>
>> /tmp/table1/startdate=2020-11-27
>>
>> etc.
>>
>>
>> If I run an SQL Client session and create the table by hand:
>>
>> create table tst (column1 string, startdate string) partitioned by
>> (startdate) with ('connector'='filesystem', 'format'='orc',
>> 'path'='/tmp/table1');
>>
>> everything runs fine:
>>
>> explain select * from tst where startdate='2020-11-27'
>>
>> shows that only one partition is listed in 'readPartitions'.
>>
>>
>> However, I struggle to configure the table in the .yaml config.
>>
>> I tried it like this (after some struggle, as the "partition.keys" setting
>> doesn't seem to be documented...):
>>
>> tables:
>>   - name: tst2
>>     type: source-table
>>     connector: filesystem
>>     path: "/tmp/table1"
>>     format: orc
>>     partition.keys:
>>       - name: startdate
>>     schema:
>>       - name: column1
>>         data-type: string
>>       - name: startdate
>>         data-type: string
>>
>> and it more or less works - queries are executed properly. However,
>> partitions are not pruned:
>>
>> explain select * from tst2 where startdate='2020-11-27'
>>
>> shows all partitions in 'readPartitions'.
>>
>>
>> Any idea what could be wrong? I'm using Flink 1.11.2.
>>
>>
>> thanks,
>>
>> maciek
>>
>>
>>


Re: Question: How to avoid local execution being terminated before session window closes

2020-12-03 Thread Klemens Muthmann

Hi,

Thanks for the hint. The infinite loop was the solution and my pipeline 
works now.


Regards

    Klemens

Am 24.11.20 um 16:59 schrieb Timo Walther:
For debugging you can also implement a simple non-parallel source 
using 
`org.apache.flink.streaming.api.functions.source.SourceFunction`. You 
would need to implement the run() method with an endless loop after 
emitting all your records.
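
A minimal sketch of such a debugging source (assuming String records; the sample data
is a placeholder):

import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits a fixed set of records, then idles forever so processing-time windows can still fire.
public class BoundedThenIdleSource implements SourceFunction<String> {

    private volatile boolean running = true;
    private final List<String> records = Arrays.asList("a", "b", "c");  // sample data

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        for (String record : records) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(record);
            }
        }
        // Endless loop: keeps the (non-parallel) source alive instead of finishing the job.
        while (running) {
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}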


Regards,
Timo

On 24.11.20 16:07, Klemens Muthmann wrote:

Hi,

Thanks for your reply. I am using processing time instead of event 
time, since we do get the events in batches and some might arrive 
days later.


But for my current dev setup I just use a CSV dump of finite size as
input. I will hand over the pipeline to some other people, who will
need to integrate it with an Apache Kafka service. Output is written
to a Postgres database.


I'll have a look at your proposal and let you know if it worked, 
after having finished a few prerequisite parts.


Regards

 Klemens

Am 24.11.20 um 12:59 schrieb Timo Walther:

Hi Klemens,

what you are observing are reasons why event time should be
preferred over processing time. Event time uses the timestamp of
your data, while processing time is too basic for many use cases. Especially
when you want to reprocess historic data, you want to do that at
full speed instead of waiting 1 hour for 1-hour windows.


If you want to use processing time nevertheless, you need to use a
source that produces unbounded streams instead of bounded streams,
so that the pipeline execution is theoretically infinite. Some
documentation can be found here [1]; you would need to use
`FileProcessingMode.PROCESS_CONTINUOUSLY`. But what kind of
connector are you currently using?


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources 
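
For example, a continuously monitored file source could be set up roughly like this
(the path is a placeholder):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousFileRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TextInputFormat format = new TextInputFormat(new Path("/tmp/input"));

        // PROCESS_CONTINUOUSLY re-scans the path every second, so the stream never finishes
        // and processing-time windows keep firing.
        DataStream<String> lines = env.readFile(format, "/tmp/input",
                FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L);

        lines.print();
        env.execute("continuous-file-read");
    }
}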



On 24.11.20 09:59, Klemens Muthmann wrote:

Hi,

I have written an Apache Flink Pipeline containing the following 
piece of code (Java):


stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(50))).aggregate(new 
CustomAggregator()).print();


If I run the pipeline using local execution I see the following 
behavior. The "CustomAggregator" calls the `createAccumulator` and 
`add` methods correctly with the correct data. However, it never
calls `getResult` and my pipeline simply finishes.


So I did a little research and found out that it works if I change 
the code to:


stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(1))).aggregate(new 
CustomAggregator()).print();


Notice the reduced gap time for the processing-time session window.
So it seems that execution only continues if the window has been
closed, and if that takes too long, the execution simply aborts. I
guess another factor playing a part in the problem is that my
initial data is read in much faster than 50 seconds. This leaves
the pipeline in a state where it only waits for the window
to be closed and, having nothing else to do, it decides that there is
no work left and simply shuts down.


My question now is whether it is possible to tell the local execution
environment to wait for that window to be closed, instead of just
shutting down.


Thanks and Regards

 Klemens Muthmann






--
Mit freundlichen Grüßen
  Dr.-Ing. Klemens Muthmann

---
Cyface GmbH
Hertha-Lindner-Straße 10
01067 Dresden
web: www.cyface.de
email: klemens.muthm...@cyface.de



Re: Partitioned tables in SQL client configuration.

2020-12-03 Thread Maciek Próchniak

Hi Jark,

thanks for the answer. I'm a bit puzzled, because in my YAML I'm using
"connector: filesystem" (not connector.type). I don't think I end up using

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html#file-system-connector
(this connector), as partitioning and the ORC format are handled correctly.



It's also not clear to me what the "non-legacy" connector for reading
files directly from the filesystem (no Hive) would be. I don't see any implementation
of DynamicTableSourceFactory that would do this.


I assumed that the DDL I wrote below also gives me the
FileSystemTableFactory; am I wrong?



thanks,

maciek



On 03.12.2020 16:26, Jark Wu wrote:
Only legacy connectors (`connector.type=kafka` instead of 
`connector=kafka`) are supported in the YAML at the moment. You can 
use regular DDL instead. There is a similar discussion in 
https://issues.apache.org/jira/browse/FLINK-20260 these days.


Best,
Jark

On Thu, 3 Dec 2020 at 00:52, Till Rohrmann wrote:


Hi Maciek,

I am pulling in Timo who might help you with this problem.

Cheers,
Till

On Tue, Dec 1, 2020 at 6:51 PM Maciek Próchniak wrote:

Hello,

I try to configure SQL Client to query partitioned ORC data on
local
filesystem. I have directory structure like that:

/tmp/table1/startdate=2020-11-28

/tmp/table1/startdate=2020-11-27

etc.


If I run SQL Client session and create table by hand:

create table tst (column1 string, startdate string)
partitioned by
(startdate) with ('connector'='filesystem', 'format'='orc',
'path'='/tmp/table1');

everything runs fine:

explain select * from tst where startdate='2020-11-27'

shows that only one partition in 'readPartitions'


However, I struggle to configure table in .yaml config.

I tried like this (after some struggle, as "partition.keys"
setting
doesn't seem to be documented...) :

tables:
  - name: tst2
    type: source-table
    connector: filesystem
    path: "/tmp/table1"
    format: orc
    partition.keys:
      - name: startdate
    schema:
      - name: column1
        data-type: string
      - name: startdate
        data-type: string

and it more or less works - queries are executed properly.
However,
partitions are not pruned:

explain select * from tst2 where startdate='2020-11-27'

show all partitions in 'readPartitions'


Any idea what can be wrong? I'm using Flink 1.11.2


thanks,

maciek




How to parse list values in csv file

2020-12-03 Thread narasimha
Hi,

I am getting the below error when trying to read a CSV file; one of the fields is a
list type.

Can someone help with fixing the issue?

jobmanager_1   | Caused by: java.lang.IllegalArgumentException: The type
'java.util.List' is not supported for the CSV input format.

jobmanager_1   | at
org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:289)
~[flink-dist_2.11-1.11.2.jar:1.11.2]

jobmanager_1   | at
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:67)
~[flink-dist_2.11-1.11.2.jar:1.11.2]

jobmanager_1   | at
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:83)
~[flink-dist_2.11-1.11.2.jar:1.11.2]

jobmanager_1   | at
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:87)
~[flink-dist_2.11-1.11.2.jar:1.11.2]

-- 
A.Narasimha Swamy


FlinkKafkaProducer Fails with "Topic not present in metadata"

2020-12-03 Thread Joseph Lorenzini
Hi all,

I have a Flink job that uses FlinkKafkaConsumer to consume messages off a Kafka
topic and a FlinkKafkaProducer to produce records on a Kafka topic. The
consumer works fine. However, the Flink job eventually fails with the following
exception.

Caused by: org.apache.kafka.common.errors.TimeoutException: Topic XXX not 
present in metadata after 6 ms.

I did find this issue, but it didn't have any details, so I am not sure if it's
related or not.

https://issues.apache.org/jira/browse/FLINK-18757

Some details that might be important:

- Yes, I verified the topic exists.
- The Kafka cluster the Flink job integrates with is the Confluent Cloud platform at
version 5.5.0, which means it should be compatible with Apache Kafka 2.5.x. See here
for details:
https://docs.confluent.io/platform/current/installation/versions-interoperability.html
- ACLs and SASL SSL are turned on.
- A Spring Boot app that I wrote (which uses Spring Kafka) is able to write to this
same topic using the same credentials as the Flink job.
- I am on Flink 1.11.2 and using flink-connector-kafka_2.11 at version 1.11.2.
- I turned on trace logs and verified that metadata requests from the Flink job occur
and metadata responses from the Kafka broker are returned.
- I've set producer semantics to NONE and disabled checkpointing.





Re: Running Flink job as a rest

2020-12-03 Thread Jaffe, Julian
I can't vouch for it personally, but perhaps the Apache Bahir Netty Source for 
Flink could help you? It sounds like you want to use HTTPS, which this doesn't 
support directly, but the source might be a helpful starting point for adding
the functionality you need.

On 12/3/20, 1:33 AM, "Chesnay Schepler"  wrote:

What you are asking for is an HTTP(s) source. This currently does not 
exist for Flink, so you would have to implement it yourself.
Additionally you would have to figure out the host on which the source 
runs on yourself.

It may be easier to setup a separate HTTP(s) server that accepts data, 
which you then query from the source.

On 12/2/2020 10:31 PM, dhurandar S wrote:
> Can Flink job be running as Rest Server, Where Apache Flink job is
> listening on a port (443). When a user calls this URL with payload,
> data directly goes to the Apache Flink windowing function.
>
> Right now Flink can ingest data from Kafka or Kinesis, but we have a use
> case where we would like to push data to Flink, where Flink is listening 
on
> a port
>




Duplicate operators generated by plan

2020-12-03 Thread Rex Fenley
Hello,

I'm running into an issue where my execution plan is creating the same
exact join operator multiple times simply because the subsequent operator
filters on a different boolean value. This is a massive duplication of
storage and work. The filtered operators which follow result in only a
small set of elements filtered out per set too.

e.g. two separate operators that are equal:

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
organization_id, user_id, roles, id_splatted, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent]

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
organization_id, user_id, roles, id_splatted, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent])

Which are entirely the same datasets being processed.

The first one points to
GroupAggregate(groupBy=[user_id], select=[user_id,
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
TMP_0.f0 AS admin_organization_ids])

The second one points to
GroupAggregate(groupBy=[user_id], select=[user_id,
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
TMP_0.f0 AS teacher_organization_ids])

And these are both intersecting sets of data though slightly different. I
don't see why that would make the 1 join from before split into 2 though.
There's even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell flink
to not duplicate operators where it doesn't need to?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG  |  FOLLOW US  |  LIKE US



How to tell what mode a Table operator is in

2020-12-03 Thread Rex Fenley
Hi,

When I'm looking at the Flink plan in the UI and at an operator, is there a
way to tell if an operator is in Retract mode vs Upsert mode? Ideally we
want as many of our operators in Upsert mode as possible since our data
sources are all uniquely keyed.

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG  |  FOLLOW US  |  LIKE US



Re: How to tell what mode a Table operator is in

2020-12-03 Thread Rex Fenley
Our sinks are uniquely keyed as well. A couple of our joins, however, are not
until an aggregate is performed.

On Thu, Dec 3, 2020 at 12:46 PM Rex Fenley  wrote:

> Hi,
>
> When I'm looking at the Flink plan in the UI and at an operator, is there
> a way to tell if an operator is in Retract mode vs Upsert mode? Ideally we
> want as many of our operators in Upsert mode as possible since our data
> sources are all uniquely keyed.
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG  |  FOLLOW US  |  LIKE US



Re: How to tell what mode a Table operator is in

2020-12-03 Thread Danny Chan
If a stateful operator also has a stateful operator in its input
sub-pipeline, then it may receive retract messages. Operators like group
aggregate, stream-stream join, or rank are stateful.

We cannot currently show in the UI whether an operator is receiving retract
messages, but your request is reasonable.

On Fri, Dec 4, 2020 at 4:48 AM, Rex Fenley wrote:

> Our sinks are uniquely keyed as well. A couple of our joins are not until
> an aggregate is performed however.
>
> On Thu, Dec 3, 2020 at 12:46 PM Rex Fenley  wrote:
>
>> Hi,
>>
>> When I'm looking at the Flink plan in the UI and at an operator, is there
>> a way to tell if an operator is in Retract mode vs Upsert mode? Ideally we
>> want as many of our operators in Upsert mode as possible since our data
>> sources are all uniquely keyed.
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: Query regarding HA mode and checkpointing

2020-12-03 Thread Yang Wang
Hi Kaushal,

Only the state handle pointer is stored in the ZooKeeper node, since
ZooKeeper is built for small-data (KB-level) storage. The real data is
persisted in "high-availability.storageDir".
Note that it should be a distributed storage (HDFS, S3, etc.).
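
For illustration, a typical ZooKeeper HA setup in flink-conf.yaml looks roughly like
this (hosts and paths are placeholders):

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
    # Only pointers/metadata go to ZooKeeper; the actual HA data is written here:
    high-availability.storageDir: hdfs:///flink/ha/
    # Checkpoint data itself is controlled by the checkpointing settings, e.g.:
    state.backend: filesystem
    state.checkpoints.dir: hdfs:///flink/checkpoints/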

The ZooKeeper HA service (as well as the Kubernetes HA service) is used for the
following; you can find more information here[1]:
* Leader election/retrieval
* Running job registry
* Submitted job graph store
* Checkpoint store

[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-HAcomponents


Best,
Yang


On Thu, Dec 3, 2020 at 7:13 PM, Kaushal Raj wrote:

> Hello,
>
> I am new to flink. Have few queries regarding the HA mode with zookeeper
> and checkpointing. When flink is configured in HA mode with zookeeper,
> where do the job checkpoints are stored? zookeeper only used for recovering
> the jobmanager or even checkpoints? what is the significance of 
> *"**high-availability.storageDir"
> *here.
>
> Thanks,
> Kaushal
>


Re: Duplicate operators generated by plan

2020-12-03 Thread Yun Gao
Hi Rex,

Could you also attach an example of the SQL / Table program? And one possible
issue to confirm: do the operators with the same names also have the
same inputs?

Best,
Yun

 --Original Mail --
Sender:Rex Fenley 
Send Date:Fri Dec 4 02:55:41 2020
Recipients:user 
Subject:Duplicate operators generated by plan

Hello,

I'm running into an issue where my execution plan is creating the same exact 
join operator multiple times simply because the subsequent operator filters on 
a different boolean value. This is a massive duplication of storage and work. 
The filtered operators which follow result in only a small set of elements 
filtered out per set too.

eg. of two separate operators that are equal

 Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, 
organization_id, user_id, roles, id_splatted, org_user_is_admin, 
org_user_is_teacher, org_user_is_student, org_user_is_parent], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, 
user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, 
org_user_is_student, org_user_is_parent]

 Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, 
organization_id, user_id, roles, id_splatted, org_user_is_admin, 
org_user_is_teacher, org_user_is_student, org_user_is_parent], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, 
user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, 
org_user_is_student, org_user_is_parent]) 

Which are entirely the same datasets being processed.

The first one points to 
 GroupAggregate(groupBy=[user_id], select=[user_id, 
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS 
admin_organization_ids]) 

The second one points to
 GroupAggregate(groupBy=[user_id], select=[user_id, 
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS 
teacher_organization_ids]) 

And these are both intersecting sets of data though slightly different. I don't 
see why that would make the 1 join from before split into 2 though. There's 
even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell flink to 
not duplicate operators where it doesn't need to?

Thanks!

-- 

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Re: Duplicate operators generated by plan

2020-12-03 Thread Rex Fenley
Yes, the same exact input operators go into both joins.

The chunk of code for the joins from the specific part of the plan I showed
is as follows. The orgUsersTable is later filtered into one table and
aggregated, and into another table and aggregated. The planner seems to duplicate
orgUsersTable into 2 operators even though I create only 1 of it.

// in the main function
val orgUsersTable = splatRoles(
  this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
  OrgUsersRoleSplatPrefix,
  this.tableEnv
)

// helper function
def splatRoles(
    table: Table,
    columnPrefix: String,
    tableEnv: TableEnvironment
): Table = {
  // Flink does not have a contains function so we have to splat out our role
  // array's contents and join it to the originating table.
  val func = new SplatRolesFunc()
  val splatted = table
    .map(func($"roles", $"id"))
    .as(
      "id_splatted",
      s"${columnPrefix}_is_admin",
      s"${columnPrefix}_is_teacher",
      s"${columnPrefix}_is_student",
      s"${columnPrefix}_is_parent"
    )
  // FIRST_VALUE is only available in SQL - so this is SQL.
  // Rationale: We have to group by after a map to preserve the pk inference,
  // otherwise flink will toss it out and all future joins will not have a unique key.
  tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
  val grouped = tableEnv.sqlQuery(s"""
    SELECT
      id_splatted,
      FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
      FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
      FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
      FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
    FROM ${columnPrefix}_splatted
    GROUP BY id_splatted
  """)
  return table
    .join(grouped, $"id" === $"id_splatted")
    .dropColumns($"id_splatted")
    .renameColumns($"roles".as(s"${columnPrefix}_roles"))
}

@FunctionHint(
  output = new DataTypeHint(
    "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
  )
)
class SplatRolesFunc extends ScalarFunction {
  def eval(roles: Array[String], id: java.lang.Long): Row = {
    val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
    val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
    val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
    val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
    return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(
      Types.LONG,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN
    )
}


On Thu, Dec 3, 2020 at 7:49 PM Yun Gao  wrote:

> Hi Rex,
>
> Could  you also attach one example for these sql / table ? And one
> possible issue to confirm is that does the operators with the same names
> also have the same inputs ?
>
> Best,
> Yun
>
> --Original Mail --
> *Sender:*Rex Fenley 
> *Send Date:*Fri Dec 4 02:55:41 2020
> *Recipients:*user 
> *Subject:*Duplicate operators generated by plan
>
>> Hello,
>>
>> I'm running into an issue where my execution plan is creating the same
>> exact join operator multiple times simply because the subsequent operator
>> filters on a different boolean value. This is a massive duplication of
>> storage and work. The filtered operators which follow result in only a
>> small set of elements filtered out per set too.
>>
>> eg. of two separate operators that are equal
>>
>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>> leftInputSpec=[JoinKeyContainsUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent]
>>
>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>> leftInputSpec=[JoinKeyContainsUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>
>> Which are entirely the same datasets being processed.
>>
>> The first one points to
>> GroupAggregate(groupBy=[user_id], select=[user_id,
>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>> TMP_0.f0 AS admin_organization_ids])
>>
>> The second one points to
>> GroupAggregate(groupBy=[user_id], select=[user_id,
>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>> TMP_0.f0 AS teacher_organization_ids])
>>
>> And these are both intersecting sets of data though slightly different. I
>> don't see why that would make the 1 join from before split into 2 though.
>> There's even a case where I'm seeing a join tripled.
>

Re: Duplicate operators generated by plan

2020-12-03 Thread Rex Fenley
cc Brad

On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley  wrote:

> Yes, the same exact input operators go into both joins.
>
> The chunk of code for the joins from the specific part of the plan I
> showed is as follows. The orgUsersTable is later filtered into one table
> and aggregated and another table and aggregated. The planner seems to
> duplicate orgUsersTable into 2 operators even though I create only 1 of it.
>
> // in the main function
> val orgUsersTable = splatRoles(
> this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
> OrgUsersRoleSplatPrefix,
> this.tableEnv
> )
>
> // helper function
> def splatRoles(
> table: Table,
> columnPrefix: String,
> tableEnv: TableEnvironment
> ): Table = {
> // Flink does not have a contains function so we have to splat out our
> role array's contents
> // and join it to the originating table.
> val func = new SplatRolesFunc()
> val splatted = table
> .map(func($"roles", $"id"))
> .as(
> "id_splatted",
> s"${columnPrefix}_is_admin",
> s"${columnPrefix}_is_teacher",
> s"${columnPrefix}_is_student",
> s"${columnPrefix}_is_parent"
> )
> // FIRST_VALUE is only available in SQL - so this is SQL.
> // Rationale: We have to group by after a map to preserve the pk
> inference, otherwise flink will
> // toss it out and all future joins will not have a unique key.
> tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
> val grouped = tableEnv.sqlQuery(s"""
> SELECT
> id_splatted,
> FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
> FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
> FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
> FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
> FROM ${columnPrefix}_splatted
> GROUP BY id_splatted
> """)
> return table
> .join(grouped, $"id" === $"id_splatted")
> .dropColumns($"id_splatted")
> .renameColumns($"roles".as(s"${columnPrefix}_roles"))
> }
>
> @FunctionHint(
> output = new DataTypeHint(
> "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student
> BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
> )
> )
> class SplatRolesFunc extends ScalarFunction {
> def eval(roles: Array[String], id: java.lang.Long): Row = {
> val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
> val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
> val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
> val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
> return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
> }
>
> override def getResultType(signature: Array[Class[_]]):
> TypeInformation[_] =
> Types.ROW(
> Types.LONG,
> Types.BOOLEAN,
> Types.BOOLEAN,
> Types.BOOLEAN,
> Types.BOOLEAN
> )
> }
>
>
> On Thu, Dec 3, 2020 at 7:49 PM Yun Gao  wrote:
>
>> Hi Rex,
>>
>> Could  you also attach one example for these sql / table ? And one
>> possible issue to confirm is that does the operators with the same names
>> also have the same inputs ?
>>
>> Best,
>> Yun
>>
>> --Original Mail --
>> *Sender:*Rex Fenley 
>> *Send Date:*Fri Dec 4 02:55:41 2020
>> *Recipients:*user 
>> *Subject:*Duplicate operators generated by plan
>>
>>> Hello,
>>>
>>> I'm running into an issue where my execution plan is creating the same
>>> exact join operator multiple times simply because the subsequent operator
>>> filters on a different boolean value. This is a massive duplication of
>>> storage and work. The filtered operators which follow result in only a
>>> small set of elements filtered out per set too.
>>>
>>> eg. of two separate operators that are equal
>>>
>>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>>> leftInputSpec=[JoinKeyContainsUniqueKey],
>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent]
>>>
>>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>>> leftInputSpec=[JoinKeyContainsUniqueKey],
>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>>
>>> Which are entirely the same datasets being processed.
>>>
>>> The first one points to
>>> GroupAggregate(groupBy=[user_id], select=[user_id,
>>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>>> TMP_0.f0 AS admin_organization_ids])
>>>
>>> The second one points to
>>> GroupAggregate(groupBy=[user_id], select=[user_id,
>>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(sel

Re: How to parse list values in csv file

2020-12-03 Thread Yun Gao

Hi,

The CSV format only supports the types listed in [1], so for other types some
kind of workaround is needed, e.g. first reading the field as a string and
parsing it again later in the program.
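
A rough sketch of that workaround (the field layout and delimiter below are assumptions
for illustration): read the list column as a plain STRING via RowCsvInputFormat, then
split it into a java.util.List in a following map step.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class CsvListWorkaround {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assumed layout: "id,itemA;itemB;itemC" -- the list column is read as a plain String.
        TypeInformation<?>[] fieldTypes = {Types.INT, Types.STRING};
        RowCsvInputFormat format = new RowCsvInputFormat(new Path("/tmp/input.csv"), fieldTypes);

        DataStream<Row> rows = env.createInput(format, new RowTypeInfo(fieldTypes));

        // Second pass: split the String column into a java.util.List.
        DataStream<Tuple2<Integer, List<String>>> parsed = rows
                .map(row -> Tuple2.of(
                        (Integer) row.getField(0),
                        Arrays.asList(((String) row.getField(1)).split(";"))))
                .returns(Types.TUPLE(Types.INT, Types.LIST(Types.STRING)));

        parsed.print();
        env.execute("csv-list-workaround");
    }
}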

Best,
Yun



[1] 
https://github.com/apache/flink/blob/e10e548feb2bedf54c3863bbd49ed4f9140546cf/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java#L287



 --Original Mail --
Sender:narasimha 
Send Date:Fri Dec 4 00:45:53 2020
Recipients:user 
Subject:How to parse list values in csv file

Hi,

I am getting the below error when trying to read a CSV file; one of the fields is a
list type.

Can someone help with fixing the issue?

jobmanager_1   | Caused by: java.lang.IllegalArgumentException: The type 
'java.util.List' is not supported for the CSV input format.
jobmanager_1   | at 
org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:289)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:67)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:83)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:87)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]

-- 
A.Narasimha Swamy