Batch version of StreamingFileSink.forRowFormat(...)

2020-08-10 Thread Dan Hill
Hi. I have a streaming job that writes to
StreamingFileSink.forRowFormat(...) with an encoder that converts protocol
buffers to byte arrays.

How do read this data back in during a batch pipeline (using DataSet)?  Do
I use env.readFile with a custom DelimitedInputFormat?  The streamfile sink
documentation

is a bit vague.

These files are used as raw logs.  They're processed offline and the whole
record is read and used at the same time.

Thanks!
- Dan



Issues with Flink Batch and Hadoop dependency

2020-08-28 Thread Dan Hill
I'm assuming I have a simple, common setup problem.  I've spent 6 hours
debugging and haven't been able to figure it out.  Any help would be
greatly appreciated.


*Problem*
I have a Flink Streaming job setup that writes SequenceFiles in S3.  When I
try to create a Flink Batch job to read these Sequence files, I get the
following error:

NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat

It fails on this readSequenceFile.

env.createInput(HadoopInputs.readSequenceFile(Text.class,
ByteWritable.class, INPUT_FILE))

If I directly depend on org-apache-hadoop/hadoop-mapred when building the
job, I get the following error when trying to run the job:

Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
FileSystem for scheme "s3"
at
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:209)
at
org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:48)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:254)
at
org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
at
org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:257)


*Extra context*
I'm using this Helm chart  for
creating Flink.  I'm using v1.10.1.


*Questions*
Are there any existing projects that read batch Hadoop file formats from S3?

I've looked at these instructions for Hadoop Integration
.
I'm assuming my configuration is wrong.  I'm also assuming I need the
hadoop dependency properly setup in the jobmanager and taskmanager (not in
the job itself).  If I use this Helm chart, do I need to download a hadoop
common jar into the Flink images for jobmanager and taskmanager?  Are there
pre-built images which I can use that already have the dependencies setup?


- Dan


Re: Batch version of StreamingFileSink.forRowFormat(...)

2020-08-28 Thread Dan Hill
Thanks!  I'll take a look.

On Tue, Aug 11, 2020 at 1:33 AM Timo Walther  wrote:

> Hi Dan,
>
> InputFormats are the connectors of the DataSet API. Yes, you can use
> either readFile, readCsvFile, readFileOfPrimitives etc. However, I would
> recommend to also give Table API a try. The unified TableEnvironment is
> able to perform batch processing and is integrated with a bunch of
> connectors such as for filesystems [1] and through Hive abstractions [2].
>
> I hope this helps.
>
> Regards,
> Timo
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_read_write.html
>
> On 11.08.20 00:13, Dan Hill wrote:
> > Hi. I have a streaming job that writes to
> > StreamingFileSink.forRowFormat(...) with an encoder that converts
> > protocol buffers to byte arrays.
> >
> > How do read this data back in during a batch pipeline (using DataSet)?
> > Do I use env.readFile with a custom DelimitedInputFormat?  The
> > streamfile sink documentation
> > <
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html>
>
> > is a bit vague.
> >
> > These files are used as raw logs.  They're processed offline and the
> > whole record is read and used at the same time.
> >
> > Thanks!
> > - Dan
> > <
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> >
>
>


Re: Issues with Flink Batch and Hadoop dependency

2020-08-29 Thread Dan Hill
I was able to get a basic version to work by including a bunch of hadoop
and s3 dependencies in the job jar and hacking in some hadoop config
values.  It's probably not optimal but it looks like I'm unblocked.

On Fri, Aug 28, 2020 at 12:11 PM Dan Hill  wrote:

> I'm assuming I have a simple, common setup problem.  I've spent 6 hours
> debugging and haven't been able to figure it out.  Any help would be
> greatly appreciated.
>
>
> *Problem*
> I have a Flink Streaming job setup that writes SequenceFiles in S3.  When
> I try to create a Flink Batch job to read these Sequence files, I get the
> following error:
>
> NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat
>
> It fails on this readSequenceFile.
>
> env.createInput(HadoopInputs.readSequenceFile(Text.class,
> ByteWritable.class, INPUT_FILE))
>
> If I directly depend on org-apache-hadoop/hadoop-mapred when building the
> job, I get the following error when trying to run the job:
>
> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
> FileSystem for scheme "s3"
> at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
> at
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:209)
> at
> org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:48)
> at
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:254)
> at
> org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
> at
> org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:257)
>
>
> *Extra context*
> I'm using this Helm chart <https://hub.helm.sh/charts/riskfocus/flink>
> for creating Flink.  I'm using v1.10.1.
>
>
> *Questions*
> Are there any existing projects that read batch Hadoop file formats from
> S3?
>
> I've looked at these instructions for Hadoop Integration
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html#add-hadoop-classpaths>.
> I'm assuming my configuration is wrong.  I'm also assuming I need the
> hadoop dependency properly setup in the jobmanager and taskmanager (not in
> the job itself).  If I use this Helm chart, do I need to download a hadoop
> common jar into the Flink images for jobmanager and taskmanager?  Are there
> pre-built images which I can use that already have the dependencies setup?
>
>
> - Dan
>


Flink Table API and not recognizing s3 plugins

2020-09-10 Thread Dan Hill
*Background*
I'm converting some prototype Flink v1.11.1 code that uses
DataSet/DataTable APIs to use the Table API.

*Problem*
When switching to using the Table API, my s3 plugins stopped working.  I
don't know why.  I've added the required maven table dependencies to the
job.

I've tried us moving both the presto and/or the hadoop s3 jars to plugin
subfolders.  No luck.

Any ideas what is wrong?  I'm guessing I'm missing something simple.


*Error*

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 's3p'. The scheme is
directly supported by Flink through the following plugin:
flink-s3-fs-presto. Please ensure that each plugin resides within its own
subfolder within the plugins directory. See
https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for
more information. If you want to use a Hadoop file system for that scheme,
please add the scheme to the configuration fs.allowed-fallback-filesystems.
For a full list of supported file systems, please see
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.

at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:473)

at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)

at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)

at
org.apache.flink.table.filesystem.FileSystemTableSink.toStagingPath(FileSystemTableSink.java:232)

... 35 more

*ls of plugins directory (same for taskmanager)*

kubectl exec pod/flink-jobmanager-0  -- ls -l
/opt/flink/plugins/s3-fs-hadoop

total 19520

-rw-r--r-- 1 root root 19985452 Sep 10 06:27 flink-s3-fs-hadoop-1.11.1.jar


Re: Flink Table API and not recognizing s3 plugins

2020-09-10 Thread Dan Hill
Ah, sorry, it's a copy/paste issue with this email.  I've tried both:
1) using s3a uri with flink-s3-fs-hadoop jar
in /opt/flink/plugins/s3-fs-hadoop.
2) using s3p uri with flink-s3-fs-presto jar
in /opt/flink/plugins/s3-fs-presto.
3) loading both 1 and 2
4) trying s3 uri.

When doing 1)

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 's3a'. The scheme is
directly supported by Flink through the following plugin:
flink-s3-fs-hadoop. Please ensure that each plugin resides within its own
subfolder within the plugins directory. See
https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for
more information. If you want to use a Hadoop file system for that scheme,
please add the scheme to the configuration fs.allowed-fallback-filesystems.
For a full list of supported file systems, please see
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.

When doing 2)

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 's3p'. The scheme is
directly supported by Flink through the following plugin:
flink-s3-fs-presto. Please ensure that each plugin resides within its own
subfolder within the plugins directory. See
https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for
more information. If you want to use a Hadoop file system for that scheme,
please add the scheme to the configuration fs.allowed-fallback-filesystems.
For a full list of supported file systems, please see
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.

etc

On Thu, Sep 10, 2020 at 8:15 AM Arvid Heise  wrote:

> Hi Dan,
>
> s3p is only provided by flink-s3-fs-presto plugin. The plugin you used
> provides s3a.
> (and both provide s3, but it's good to use the more specific prefix).
>
> Best,
>
> Arvid
>
> On Thu, Sep 10, 2020 at 9:24 AM Dan Hill  wrote:
>
>> *Background*
>> I'm converting some prototype Flink v1.11.1 code that uses
>> DataSet/DataTable APIs to use the Table API.
>>
>> *Problem*
>> When switching to using the Table API, my s3 plugins stopped working.  I
>> don't know why.  I've added the required maven table dependencies to the
>> job.
>>
>> I've tried us moving both the presto and/or the hadoop s3 jars to plugin
>> subfolders.  No luck.
>>
>> Any ideas what is wrong?  I'm guessing I'm missing something simple.
>>
>>
>> *Error*
>>
>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>> Could not find a file system implementation for scheme 's3p'. The scheme is
>> directly supported by Flink through the following plugin:
>> flink-s3-fs-presto. Please ensure that each plugin resides within its own
>> subfolder within the plugins directory. See
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
>> for more information. If you want to use a Hadoop file system for that
>> scheme, please add the scheme to the configuration
>> fs.allowed-fallback-filesystems. For a full list of supported file systems,
>> please see
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>>
>> at
>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:473)
>>
>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
>>
>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
>>
>> at
>> org.apache.flink.table.filesystem.FileSystemTableSink.toStagingPath(FileSystemTableSink.java:232)
>>
>> ... 35 more
>>
>> *ls of plugins directory (same for taskmanager)*
>>
>> kubectl exec pod/flink-jobmanager-0  -- ls -l
>> /opt/flink/plugins/s3-fs-hadoop
>>
>> total 19520
>>
>> -rw-r--r-- 1 root root 19985452 Sep 10 06:27 flink-s3-fs-hadoop-1.11.1.jar
>>
>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Flink Table API and not recognizing s3 plugins

2020-09-10 Thread Dan Hill
I changed the levels to DEBUG.  I don't see useful data in the logs.

https://drive.google.com/file/d/1ua1zsr3BInY_8xdsWwA__F0uloAqy-vG/view?usp=sharing

On Thu, Sep 10, 2020 at 8:45 AM Arvid Heise  wrote:

> Could you try 1) or 2) and enable debug logging* and share the log with us?
>
> *Usually by adjusting FLINK_HOME/conf/log4j.properties.
>
> On Thu, Sep 10, 2020 at 5:38 PM Dan Hill  wrote:
>
>> Ah, sorry, it's a copy/paste issue with this email.  I've tried both:
>> 1) using s3a uri with flink-s3-fs-hadoop jar
>> in /opt/flink/plugins/s3-fs-hadoop.
>> 2) using s3p uri with flink-s3-fs-presto jar
>> in /opt/flink/plugins/s3-fs-presto.
>> 3) loading both 1 and 2
>> 4) trying s3 uri.
>>
>> When doing 1)
>>
>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>> Could not find a file system implementation for scheme 's3a'. The scheme is
>> directly supported by Flink through the following plugin:
>> flink-s3-fs-hadoop. Please ensure that each plugin resides within its own
>> subfolder within the plugins directory. See
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
>> for more information. If you want to use a Hadoop file system for that
>> scheme, please add the scheme to the configuration
>> fs.allowed-fallback-filesystems. For a full list of supported file systems,
>> please see
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>>
>> When doing 2)
>>
>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>> Could not find a file system implementation for scheme 's3p'. The scheme is
>> directly supported by Flink through the following plugin:
>> flink-s3-fs-presto. Please ensure that each plugin resides within its own
>> subfolder within the plugins directory. See
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
>> for more information. If you want to use a Hadoop file system for that
>> scheme, please add the scheme to the configuration
>> fs.allowed-fallback-filesystems. For a full list of supported file systems,
>> please see
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>>
>> etc
>>
>> On Thu, Sep 10, 2020 at 8:15 AM Arvid Heise  wrote:
>>
>>> Hi Dan,
>>>
>>> s3p is only provided by flink-s3-fs-presto plugin. The plugin you used
>>> provides s3a.
>>> (and both provide s3, but it's good to use the more specific prefix).
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Thu, Sep 10, 2020 at 9:24 AM Dan Hill  wrote:
>>>
>>>> *Background*
>>>> I'm converting some prototype Flink v1.11.1 code that uses
>>>> DataSet/DataTable APIs to use the Table API.
>>>>
>>>> *Problem*
>>>> When switching to using the Table API, my s3 plugins stopped working.
>>>> I don't know why.  I've added the required maven table dependencies to the
>>>> job.
>>>>
>>>> I've tried us moving both the presto and/or the hadoop s3 jars to
>>>> plugin subfolders.  No luck.
>>>>
>>>> Any ideas what is wrong?  I'm guessing I'm missing something simple.
>>>>
>>>>
>>>> *Error*
>>>>
>>>> Caused by:
>>>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
>>>> find a file system implementation for scheme 's3p'. The scheme is directly
>>>> supported by Flink through the following plugin: flink-s3-fs-presto. Please
>>>> ensure that each plugin resides within its own subfolder within the plugins
>>>> directory. See
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
>>>> for more information. If you want to use a Hadoop file system for that
>>>> scheme, please add the scheme to the configuration
>>>> fs.allowed-fallback-filesystems. For a full list of supported file systems,
>>>> please see
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/
>>>> .
>>>>
>>>> at
>>>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:473)
>>>>
>>>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
>>>>
>>>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
>>>>
>>>> at
>>>> org.apache.fl

Re: Flink Table API and not recognizing s3 plugins

2020-09-10 Thread Dan Hill
 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property:
s3a.aws.credentials.provider,
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

2020-09-10 19:50:17,802 INFO  org.apache.flink.client.cli.CliFrontend
[] - Loading FallbackYarnSessionCli

2020-09-10 19:50:17,929 INFO  org.apache.flink.core.fs.FileSystem
[] - Hadoop is not in the classpath/dependencies. The
extended set of supported File Systems via Hadoop is not available.

2020-09-10 19:50:18,102 INFO
org.apache.flink.runtime.security.modules.HadoopModuleFactory
[] - Cannot create Hadoop Security Module because Hadoop cannot be found in
the Classpath.

2020-09-10 19:50:18,126 INFO
org.apache.flink.runtime.security.modules.JaasModule
[] - Jaas file will be created as
/tmp/jaas-1506212733867615019.conf.

2020-09-10 19:50:18,161 INFO
org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory
[] - Cannot install HadoopSecurityContext because Hadoop cannot be found in
the Classpath.

2020-09-10 19:50:18,163 INFO  org.apache.flink.client.cli.CliFrontend
[] - Running 'list' command.

2020-09-10 19:50:18,226 INFO
org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
[] - Could not load factory due to missing dependencies.

2020-09-10 19:50:19,107 INFO  org.apache.flink.client.cli.CliFrontend
[] - Waiting for response...

2020-09-10 19:50:19,414 INFO  org.apache.flink.client.cli.CliFrontend
[] - Successfully retrieved list of jobs

On Thu, Sep 10, 2020 at 10:50 AM Dan Hill  wrote:

> I changed the levels to DEBUG.  I don't see useful data in the logs.
>
>
> https://drive.google.com/file/d/1ua1zsr3BInY_8xdsWwA__F0uloAqy-vG/view?usp=sharing
>
> On Thu, Sep 10, 2020 at 8:45 AM Arvid Heise  wrote:
>
>> Could you try 1) or 2) and enable debug logging* and share the log with
>> us?
>>
>> *Usually by adjusting FLINK_HOME/conf/log4j.properties.
>>
>> On Thu, Sep 10, 2020 at 5:38 PM Dan Hill  wrote:
>>
>>> Ah, sorry, it's a copy/paste issue with this email.  I've tried both:
>>> 1) using s3a uri with flink-s3-fs-hadoop jar
>>> in /opt/flink/plugins/s3-fs-hadoop.
>>> 2) using s3p uri with flink-s3-fs-presto jar
>>> in /opt/flink/plugins/s3-fs-presto.
>>> 3) loading both 1 and 2
>>> 4) trying s3 uri.
>>>
>>> When doing 1)
>>>
>>> Caused by:
>>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
>>> find a file system implementation for scheme 's3a'. The scheme is directly
>>> supported by Flink through the following plugin: flink-s3-fs-hadoop. Please
>>> ensure that each plugin resides within its own subfolder within the plugins
>>> directory. See
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
>>> for more information. If you want to use a Hadoop file system for that
>>> scheme, please add the scheme to the configuration
>>> fs.allowed-fallback-filesystems. For a full list of supported file systems,
>>> please see
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>>>
>>> When doing 2)
>>>
>>> Caused by:
>>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
>>> find a file system implementation for scheme 's3p'. The scheme is directly
>>> supported by Flink through the following plugin: flink-s3-fs-presto. Please
>>> ensure that each plugin resides within its own subfolder within the plugins
>>> directory. See
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
>>> for more information. If you want to use a Hadoop file system for that
>>> scheme, please add the scheme to the configuration
>>> fs.allowed-fallback-filesystems. For a full list of supported file systems,
>>> please see
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>>>
>>> etc
>>>
>>> On Thu, Sep 10, 2020 at 8:15 AM Arvid Heise  wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> s3p is only provided by flink-s3-fs-presto plugin. The plugin you used
>>>> provides s3a.
>>>> (and both provide s3, but it's good to use the more specific prefix).
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>> On Thu, Sep 10, 2020 at 9:24 AM Dan Hill  wrote:
>>>>
>>>>> *Background*
>>>>> I'm converting some prototype Flink v1.11.1 code that uses
>>>>> DataSet/DataTabl

Re: Flink Table API and not recognizing s3 plugins

2020-09-10 Thread Dan Hill
 line interface.
>
> How do you execute the job? Do you start it from your machine? Can you try
> out to also add the respective s3 plugin there?
>
> Best,
>
> Arvid
>
> On Thu, Sep 10, 2020 at 7:50 PM Dan Hill  wrote:
>
>> I changed the levels to DEBUG.  I don't see useful data in the logs.
>>
>>
>> https://drive.google.com/file/d/1ua1zsr3BInY_8xdsWwA__F0uloAqy-vG/view?usp=sharing
>>
>> On Thu, Sep 10, 2020 at 8:45 AM Arvid Heise  wrote:
>>
>>> Could you try 1) or 2) and enable debug logging* and share the log with
>>> us?
>>>
>>> *Usually by adjusting FLINK_HOME/conf/log4j.properties.
>>>
>>> On Thu, Sep 10, 2020 at 5:38 PM Dan Hill  wrote:
>>>
>>>> Ah, sorry, it's a copy/paste issue with this email.  I've tried both:
>>>> 1) using s3a uri with flink-s3-fs-hadoop jar
>>>> in /opt/flink/plugins/s3-fs-hadoop.
>>>> 2) using s3p uri with flink-s3-fs-presto jar
>>>> in /opt/flink/plugins/s3-fs-presto.
>>>> 3) loading both 1 and 2
>>>> 4) trying s3 uri.
>>>>
>>>> When doing 1)
>>>>
>>>> Caused by:
>>>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
>>>> find a file system implementation for scheme 's3a'. The scheme is directly
>>>> supported by Flink through the following plugin: flink-s3-fs-hadoop. Please
>>>> ensure that each plugin resides within its own subfolder within the plugins
>>>> directory. See
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
>>>> for more information. If you want to use a Hadoop file system for that
>>>> scheme, please add the scheme to the configuration
>>>> fs.allowed-fallback-filesystems. For a full list of supported file systems,
>>>> please see
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/
>>>> .
>>>>
>>>> When doing 2)
>>>>
>>>> Caused by:
>>>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
>>>> find a file system implementation for scheme 's3p'. The scheme is directly
>>>> supported by Flink through the following plugin: flink-s3-fs-presto. Please
>>>> ensure that each plugin resides within its own subfolder within the plugins
>>>> directory. See
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
>>>> for more information. If you want to use a Hadoop file system for that
>>>> scheme, please add the scheme to the configuration
>>>> fs.allowed-fallback-filesystems. For a full list of supported file systems,
>>>> please see
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/
>>>> .
>>>>
>>>> etc
>>>>
>>>> On Thu, Sep 10, 2020 at 8:15 AM Arvid Heise 
>>>> wrote:
>>>>
>>>>> Hi Dan,
>>>>>
>>>>> s3p is only provided by flink-s3-fs-presto plugin. The plugin you used
>>>>> provides s3a.
>>>>> (and both provide s3, but it's good to use the more specific prefix).
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> On Thu, Sep 10, 2020 at 9:24 AM Dan Hill 
>>>>> wrote:
>>>>>
>>>>>> *Background*
>>>>>> I'm converting some prototype Flink v1.11.1 code that uses
>>>>>> DataSet/DataTable APIs to use the Table API.
>>>>>>
>>>>>> *Problem*
>>>>>> When switching to using the Table API, my s3 plugins stopped
>>>>>> working.  I don't know why.  I've added the required maven table
>>>>>> dependencies to the job.
>>>>>>
>>>>>> I've tried us moving both the presto and/or the hadoop s3 jars to
>>>>>> plugin subfolders.  No luck.
>>>>>>
>>>>>> Any ideas what is wrong?  I'm guessing I'm missing something simple.
>>>>>>
>>>>>>
>>>>>> *Error*
>>>>>>
>>>>>> Caused by:
>>>>>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
>>>>>> find a file system implementation for scheme 's3p'. The scheme is 
>>>>>> directly
>>>>

Re: Flink Table API and not recognizing s3 plugins

2020-09-10 Thread Dan Hill
I'm using this Helm chart
<https://github.com/riskfocus/helm-charts-public/tree/master/flink>.  I
start the job by building an image with the job jar and using kubectl apply
to do a flink run with the jar.

The log4j.properties on jobmanager and taskmanager have debug level set and
are pretty embedded into the Helm chart.  My log4j-cli.properties is hacked
on the CLI side.

I thought I just needed the s3 plugins in the jobmanager and taskmanager.
Do I need to have a similar plugin structure from the image where I run
'flink run'?


On Thu, Sep 10, 2020 at 1:03 PM Dan Hill  wrote:

> Copying more of the log
>
> 2020-09-10 19:50:17,712 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -
> 
>
> 2020-09-10 19:50:17,718 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -  Starting Command Line Client (Version: 1.11.1,
> Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
>
> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -  OS current user: root
>
> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -  Current Hadoop/Kerberos user:  dependency found>
>
> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -  JVM: OpenJDK 64-Bit Server VM - Oracle
> Corporation - 1.8/25.265-b01
>
> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -  Maximum heap size: 2167 MiBytes
>
> tail: log/flink--client-flink-jobmanager-0.log: file truncated
>
> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -  JAVA_HOME: /usr/local/openjdk-8
>
> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -  No Hadoop Dependency available
>
> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -  JVM Options:
>
> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -
> -Djava.security.properties=/opt/flink/conf/security.properties
>
> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -
> -Dlog.file=/opt/flink/log/flink--client-flink-jobmanager-0.log
>
> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties
>
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -
> -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-cli.properties
>
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -
> -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
>
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -  Program Arguments:
>
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>   [] - list
>
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>   [] - --jobmanager
>
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>   [] - localhost:8081
>
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -  Classpath:
> /opt/flink/lib/flink-csv-1.11.1.jar:/opt/flink/lib/flink-json-1.11.1.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.11.1.jar:/opt/flink/lib/flink-table_2.12-1.11.1.jar:/opt/flink/lib/jna-5.4.0.jar:/opt/flink/lib/jna-platform-5.4.0.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/oshi-core-3.4.0.jar:/opt/flink/lib/flink-dist_2.12-1.11.1.jar:::
>
> 2020-09-10 19:50:17,722 INFO  org.apache.flink.client.cli.CliFrontend
>   [] -
> 
>
> 2020-09-10 19:50:17,731 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   [] - Loading configuration property:
> taskmanager.numberOfTaskSlots, 2
>
> 2020-09-10 19:50:17,732 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   [] - Loading configuration property: blob.server.port, 6124
>
> 2020-09-10 19:50:17,732 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>   [] - Loading configuration property: taskmanager.rpc.port, 6122
>
> 2020-09-10 19:50:17,732 INFO  
> org.apache.flink.configuration.GlobalConf

Re: Flink Table API and not recognizing s3 plugins

2020-09-10 Thread Dan Hill
This is running on my local minikube and is trying to hit minio.

On Thu, Sep 10, 2020 at 1:10 PM Dan Hill  wrote:

> I'm using this Helm chart
> <https://github.com/riskfocus/helm-charts-public/tree/master/flink>.  I
> start the job by building an image with the job jar and using kubectl apply
> to do a flink run with the jar.
>
> The log4j.properties on jobmanager and taskmanager have debug level set
> and are pretty embedded into the Helm chart.  My log4j-cli.properties is
> hacked on the CLI side.
>
> I thought I just needed the s3 plugins in the jobmanager and taskmanager.
> Do I need to have a similar plugin structure from the image where I run
> 'flink run'?
>
>
> On Thu, Sep 10, 2020 at 1:03 PM Dan Hill  wrote:
>
>> Copying more of the log
>>
>> 2020-09-10 19:50:17,712 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -
>> 
>>
>> 2020-09-10 19:50:17,718 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -  Starting Command Line Client (Version: 1.11.1,
>> Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
>>
>> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -  OS current user: root
>>
>> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -  Current Hadoop/Kerberos user: > dependency found>
>>
>> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -  JVM: OpenJDK 64-Bit Server VM - Oracle
>> Corporation - 1.8/25.265-b01
>>
>> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -  Maximum heap size: 2167 MiBytes
>>
>> tail: log/flink--client-flink-jobmanager-0.log: file truncated
>>
>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -  JAVA_HOME: /usr/local/openjdk-8
>>
>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -  No Hadoop Dependency available
>>
>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -  JVM Options:
>>
>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -
>> -Djava.security.properties=/opt/flink/conf/security.properties
>>
>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -
>> -Dlog.file=/opt/flink/log/flink--client-flink-jobmanager-0.log
>>
>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -
>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties
>>
>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -
>> -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-cli.properties
>>
>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -
>> -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
>>
>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -  Program Arguments:
>>
>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] - list
>>
>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] - --jobmanager
>>
>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] - localhost:8081
>>
>> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -  Classpath:
>> /opt/flink/lib/flink-csv-1.11.1.jar:/opt/flink/lib/flink-json-1.11.1.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.11.1.jar:/opt/flink/lib/flink-table_2.12-1.11.1.jar:/opt/flink/lib/jna-5.4.0.jar:/opt/flink/lib/jna-platform-5.4.0.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/oshi-core-3.4.0.jar:/opt/flink/lib/flink-dist_2.12-1.11.1.jar:::
>>
>> 2020-09-10 19:50:17,722 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] -
>> 
>>
>> 2020-09-10 19:50:17,731 INFO  
>> org.apache.flink.configuration.GlobalConfiguration
>>   [] - Lo

I hit a bad jobmanager address when trying to use Flink SQL Client

2020-09-10 Thread Dan Hill
I just tried using the Flink SQL Client.  A simple job is not running
because it cannot hit jobmanager.  I'm not sure why Flink SQL Client is
hitting "flink-jobmanager/10.98.253.58:8081".  I'd expect either
"flink-jobmanager:8081" or "10.98.253.58:8081" (which should work with my
kubernetes setup).

I'm using riskfocus's Flink helm chart
.

The last SELECT errors out.  I have an environment file that indicates this
"execution.type: batch".  My setup works when using DataSet and
DataStream.  The jobmanager and taskmanager logs look fine.  This seems
like a weird configuration with SQL Client that is either broken with that
Flink helm chart or with SQL Client.


Flink SQL> DROP TABLE `default_catalog.mydb.user`;

[INFO] Table has been removed.


Flink SQL> CREATE TABLE `default_catalog.mydb.user` (`platformId` BIGINT,
`userId` STRING) WITH ('connector' = 'filesystem', 'path' =
's3://mys3bucket/users.csv','format' = 'csv');

[INFO] Table has been created.


Flink SQL> SELECT * FROM `default_catalog.mydb.user` LIMIT 10;

*[ERROR] Could not execute SQL statement. Reason:*

*org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException:
connection timed out: flink-jobmanager/10.98.253.58:8081
*

My config logs from jobmanager.

2020-09-11 02:33:07,962 INFO
org.apache.flink.contrib.streaming.state.RocksDBStateBackend
[] - Using predefined options: DEFAULT.

2020-09-11 02:33:07,962 INFO
org.apache.flink.contrib.streaming.state.RocksDBStateBackend
[] - Using default options factory:
DefaultConfigurableOptionsFactory{configuredOptions={}}.

2020-09-11 02:33:08,100 INFO
org.apache.flink.table.client.gateway.local.ProgramDeployer  [] -
Submitting job org.apache.flink.streaming.api.graph.StreamGraph@3bd08435
for query default: SELECT * FROM `default_catalog.mydb.user` LIMIT 10`

2020-09-11 02:33:08,112 INFO  org.apache.flink.configuration.Configuration
  [] - Config uses fallback configuration key
'jobmanager.rpc.address' instead of key 'rest.address'

tail: log/flink--sql-client-flink-jobmanager-0.log: file truncated

2020-09-11 02:34:35,848 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property:
taskmanager.numberOfTaskSlots, 2

2020-09-11 02:34:35,884 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property: blob.server.port, 6124

2020-09-11 02:34:35,884 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property: taskmanager.rpc.port, 6122

2020-09-11 02:34:35,885 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property: jobmanager.heap.size, 1g

2020-09-11 02:34:35,886 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property:
taskmanager.memory.process.size, 1g

2020-09-11 02:34:35,886 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property: state.backend, rocksdb

2020-09-11 02:34:35,886 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property: state.checkpoints.dir,
file:///flink_state/checkpoints

2020-09-11 02:34:35,887 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property: state.savepoints.dir,
file:///flink_state/savepoints

2020-09-11 02:34:35,887 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property: state.backend.async, true

2020-09-11 02:34:35,887 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property:
state.backend.fs.memory-threshold, 1024

2020-09-11 02:34:35,888 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property:
state.backend.fs.write-buffer-size, 4096

2020-09-11 02:34:35,888 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property: state.backend.incremental,
true

2020-09-11 02:34:35,888 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property:
state.backend.local-recovery, true

2020-09-11 02:34:35,889 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property:
state.checkpoints.num-retained, 1

2020-09-11 02:34:35,889 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property:
taskmanager.state.local.root-dirs, file:///flink_state/local-recovery

2020-09-11 02:34:35,889 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property:
state.backend.rocksdb.checkpoint.transfer.thread.num, 1

2020-09-11 02:34:35,890 INFO
org.apache.flink.configuration.GlobalConfiguration
  [] - Loading configuration property:
state.backend.rocksdb.localdir, /flink_state/rocksdb

2020

Re: I hit a bad jobmanager address when trying to use Flink SQL Client

2020-09-11 Thread Dan Hill
Hi Robert!

I have Flink running locally on minikube.  I'm running SQL client using
exec on the jobmanager.

kubectl exec pod/flink-jobmanager-0 -i -t -- /opt/flink/bin/sql-client.sh
embedded -e /opt/flink/sql-client-defaults.yaml

Here's the sql-client-defaults.yaml.  I didn't specify a session.
execution:
  type: batch
  result-mode: table
  max-table-result-rows: 100

I'm prototyping the Table SQL interface.  I got blocked using the Table SQL
interface and figured I'd try the SQL Client to see if I could get
unblocked.


On Fri, Sep 11, 2020 at 11:18 AM Robert Metzger  wrote:

> Hi Dan,
>
> the notation of "flink-jobmanager/10.98.253.58:8081" is not a problem. It
> is how java.net.InetAddress stringifies a resolved address (with both
> hostname and IP).
>
> How did you configure the SQL client to work with a Kubernetes Session?
> Afaik this is not a documented, tested and officially supported feature
> (this doesn't mean we should not support it -- apparently it is something
> we should do rather soon ;) ).
>
> Best,
> Robert
>
> On Fri, Sep 11, 2020 at 5:25 AM Dan Hill  wrote:
>
>> I just tried using the Flink SQL Client.  A simple job is not running
>> because it cannot hit jobmanager.  I'm not sure why Flink SQL Client is
>> hitting "flink-jobmanager/10.98.253.58:8081".  I'd expect either
>> "flink-jobmanager:8081" or "10.98.253.58:8081" (which should work with
>> my kubernetes setup).
>>
>> I'm using riskfocus's Flink helm chart
>> <https://github.com/riskfocus/helm-charts-public/tree/master/flink>.
>>
>> The last SELECT errors out.  I have an environment file that indicates
>> this "execution.type: batch".  My setup works when using DataSet and
>> DataStream.  The jobmanager and taskmanager logs look fine.  This seems
>> like a weird configuration with SQL Client that is either broken with that
>> Flink helm chart or with SQL Client.
>>
>>
>> Flink SQL> DROP TABLE `default_catalog.mydb.user`;
>>
>> [INFO] Table has been removed.
>>
>>
>> Flink SQL> CREATE TABLE `default_catalog.mydb.user` (`platformId`
>> BIGINT, `userId` STRING) WITH ('connector' = 'filesystem', 'path' =
>> 's3://mys3bucket/users.csv','format' = 'csv');
>>
>> [INFO] Table has been created.
>>
>>
>> Flink SQL> SELECT * FROM `default_catalog.mydb.user` LIMIT 10;
>>
>> *[ERROR] Could not execute SQL statement. Reason:*
>>
>> *org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException:
>> connection timed out: flink-jobmanager/10.98.253.58:8081
>> <http://10.98.253.58:8081>*
>>
>> My config logs from jobmanager.
>>
>> 2020-09-11 02:33:07,962 INFO  
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend
>> [] - Using predefined options: DEFAULT.
>>
>> 2020-09-11 02:33:07,962 INFO  
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend
>> [] - Using default options factory:
>> DefaultConfigurableOptionsFactory{configuredOptions={}}.
>>
>> 2020-09-11 02:33:08,100 INFO
>> org.apache.flink.table.client.gateway.local.ProgramDeployer  [] -
>> Submitting job org.apache.flink.streaming.api.graph.StreamGraph@3bd08435
>> for query default: SELECT * FROM `default_catalog.mydb.user` LIMIT 10`
>>
>> 2020-09-11 02:33:08,112 INFO  org.apache.flink.configuration.Configuration
>> [] - Config uses fallback configuration key
>> 'jobmanager.rpc.address' instead of key 'rest.address'
>>
>> tail: log/flink--sql-client-flink-jobmanager-0.log: file truncated
>>
>> 2020-09-11 02:34:35,848 INFO  
>> org.apache.flink.configuration.GlobalConfiguration
>>   [] - Loading configuration property:
>> taskmanager.numberOfTaskSlots, 2
>>
>> 2020-09-11 02:34:35,884 INFO  
>> org.apache.flink.configuration.GlobalConfiguration
>>   [] - Loading configuration property: blob.server.port, 6124
>>
>> 2020-09-11 02:34:35,884 INFO  
>> org.apache.flink.configuration.GlobalConfiguration
>>   [] - Loading configuration property: taskmanager.rpc.port, 6122
>>
>> 2020-09-11 02:34:35,885 INFO  
>> org.apache.flink.configuration.GlobalConfiguration
>>   [] - Loading configuration property: jobmanager.heap.size, 1g
>>
>> 2020-09-11 02:34:35,886 INFO  
>> org.apache.flink.configuration.GlobalConfiguration
>>   [] - Loading configuration property:
>> taskmanager.memory.process.size, 

Re: Flink Table API and not recognizing s3 plugins

2020-09-14 Thread Dan Hill
Thanks for the update!

I'm trying a bunch of combinations on the client side to get the S3
Filesystem to be picked up correctly.  Most of my attempts involved
building into the job jar (which I'm guessing won't work).  I then started
getting issues with ClassCastExceptions.

I might try a little more tomorrow (e.g. modifying the custom image).  If I
can't get it, I'll roll back to a previous Flink version that works.

Caused by: java.lang.ClassCastException:
org.codehaus.janino.CompilerFactory cannot be cast to
org.codehaus.commons.compiler.ICompilerFactory

at
org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)

at
org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)

at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:431)

... 51 more


On Mon, Sep 14, 2020 at 7:03 PM Jingsong Li  wrote:

> Hi Dan,
>
> I think Arvid and Dawid are right, as a workaround, you can try making
> S3Filesystem works in the client. But for a long term solution, we can fix
> it.
>
> I created https://issues.apache.org/jira/browse/FLINK-19228 for tracking
> this.
>
> Best,
> Jingsong
>
> On Mon, Sep 14, 2020 at 3:57 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Dan,
>>
>> As far as I checked in the code, the FileSystemSink will try to create
>> staging directories from the client. I think it might be problematic, as
>> your case shows. We might need to revisit that part. I am cc'ing Jingsong
>> who worked on the FileSystemSink.
>>
>> As a workaround you might try putting the s3 plugin on the CLI classpath
>> (not sure if plugins work for the CLI through the /plugins directory).
>>
>> Best,
>>
>> Dawid
>> On 10/09/2020 22:13, Dan Hill wrote:
>>
>> This is running on my local minikube and is trying to hit minio.
>>
>> On Thu, Sep 10, 2020 at 1:10 PM Dan Hill  wrote:
>>
>>> I'm using this Helm chart
>>> <https://github.com/riskfocus/helm-charts-public/tree/master/flink>.  I
>>> start the job by building an image with the job jar and using kubectl apply
>>> to do a flink run with the jar.
>>>
>>> The log4j.properties on jobmanager and taskmanager have debug level set
>>> and are pretty embedded into the Helm chart.  My log4j-cli.properties is
>>> hacked on the CLI side.
>>>
>>> I thought I just needed the s3 plugins in the jobmanager and
>>> taskmanager.  Do I need to have a similar plugin structure from the image
>>> where I run 'flink run'?
>>>
>>>
>>> On Thu, Sep 10, 2020 at 1:03 PM Dan Hill  wrote:
>>>
>>>> Copying more of the log
>>>>
>>>> 2020-09-10 19:50:17,712 INFO  org.apache.flink.client.cli.CliFrontend
>>>> [] -
>>>> 
>>>>
>>>> 2020-09-10 19:50:17,718 INFO  org.apache.flink.client.cli.CliFrontend
>>>> [] -  Starting Command Line Client (Version:
>>>> 1.11.1, Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
>>>>
>>>> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>>>> [] -  OS current user: root
>>>>
>>>> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>>>> [] -  Current Hadoop/Kerberos user: >>> dependency found>
>>>>
>>>> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>>>> [] -  JVM: OpenJDK 64-Bit Server VM - Oracle
>>>> Corporation - 1.8/25.265-b01
>>>>
>>>> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend
>>>> [] -  Maximum heap size: 2167 MiBytes
>>>>
>>>> tail: log/flink--client-flink-jobmanager-0.log: file truncated
>>>>
>>>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>>> [] -  JAVA_HOME: /usr/local/openjdk-8
>>>>
>>>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>>> [] -  No Hadoop Dependency available
>>>>
>>>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>>>> [] -  JVM Options:
>>>>
>>>> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend
>

Re: Flink Table API and not recognizing s3 plugins

2020-09-14 Thread Dan Hill
Yes, the client runs in K8.  It uses a different K8 config than the Helm
chart and does not load the plugins.  Does the client use the same plugin
structure as the Flink job/task manager?  I can try using it tomorrow.

Cool, that link would work too.

Thanks, Arvid!


On Mon, Sep 14, 2020 at 10:59 PM Arvid Heise  wrote:

> Hi Dan,
>
> Are you running the client also in K8s? If so you need an initialization
> step, where you add the library to the plugins directory. Putting it into
> lib or into the user jar doesn't work anymore as we removed the shading in
> s3 in Flink 1.10.
>
> The official Flink docker image has an easy way to add these plugins [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#using-plugins
>
> On Tue, Sep 15, 2020 at 6:40 AM Dan Hill  wrote:
>
>> Thanks for the update!
>>
>> I'm trying a bunch of combinations on the client side to get the S3
>> Filesystem to be picked up correctly.  Most of my attempts involved
>> building into the job jar (which I'm guessing won't work).  I then started
>> getting issues with ClassCastExceptions.
>>
>> I might try a little more tomorrow (e.g. modifying the custom image).  If
>> I can't get it, I'll roll back to a previous Flink version that works.
>>
>> Caused by: java.lang.ClassCastException:
>> org.codehaus.janino.CompilerFactory cannot be cast to
>> org.codehaus.commons.compiler.ICompilerFactory
>>
>> at
>> org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
>>
>> at
>> org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
>>
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:431)
>>
>> ... 51 more
>>
>>
>> On Mon, Sep 14, 2020 at 7:03 PM Jingsong Li 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> I think Arvid and Dawid are right, as a workaround, you can try making
>>> S3Filesystem works in the client. But for a long term solution, we can fix
>>> it.
>>>
>>> I created https://issues.apache.org/jira/browse/FLINK-19228 for
>>> tracking this.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Mon, Sep 14, 2020 at 3:57 PM Dawid Wysakowicz 
>>> wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> As far as I checked in the code, the FileSystemSink will try to create
>>>> staging directories from the client. I think it might be problematic, as
>>>> your case shows. We might need to revisit that part. I am cc'ing Jingsong
>>>> who worked on the FileSystemSink.
>>>>
>>>> As a workaround you might try putting the s3 plugin on the CLI
>>>> classpath (not sure if plugins work for the CLI through the /plugins
>>>> directory).
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>> On 10/09/2020 22:13, Dan Hill wrote:
>>>>
>>>> This is running on my local minikube and is trying to hit minio.
>>>>
>>>> On Thu, Sep 10, 2020 at 1:10 PM Dan Hill  wrote:
>>>>
>>>>> I'm using this Helm chart
>>>>> <https://github.com/riskfocus/helm-charts-public/tree/master/flink>.
>>>>> I start the job by building an image with the job jar and using kubectl
>>>>> apply to do a flink run with the jar.
>>>>>
>>>>> The log4j.properties on jobmanager and taskmanager have debug level
>>>>> set and are pretty embedded into the Helm chart.  My log4j-cli.properties
>>>>> is hacked on the CLI side.
>>>>>
>>>>> I thought I just needed the s3 plugins in the jobmanager and
>>>>> taskmanager.  Do I need to have a similar plugin structure from the image
>>>>> where I run 'flink run'?
>>>>>
>>>>>
>>>>> On Thu, Sep 10, 2020 at 1:03 PM Dan Hill 
>>>>> wrote:
>>>>>
>>>>>> Copying more of the log
>>>>>>
>>>>>> 2020-09-10 19:50:17,712 INFO  org.apache.flink.client.cli.CliFrontend
>>>>>> [] -
>>>>>> 
>>>>>>
>>>>>> 2020-09-10 19:50:17,718 INFO  org.apache.flink.client.cli.CliFrontend
>>>>>>

Re: Flink Table API and not recognizing s3 plugins

2020-09-15 Thread Dan Hill
Sweet, this was the issue.  I got this to work by copying the s3 jar over
to plugins for the client container.

Thanks for all of the help!  The Table API is sweet!

On Mon, Sep 14, 2020 at 11:14 PM Dan Hill  wrote:

> Yes, the client runs in K8.  It uses a different K8 config than the Helm
> chart and does not load the plugins.  Does the client use the same plugin
> structure as the Flink job/task manager?  I can try using it tomorrow.
>
> Cool, that link would work too.
>
> Thanks, Arvid!
>
>
> On Mon, Sep 14, 2020 at 10:59 PM Arvid Heise  wrote:
>
>> Hi Dan,
>>
>> Are you running the client also in K8s? If so you need an initialization
>> step, where you add the library to the plugins directory. Putting it into
>> lib or into the user jar doesn't work anymore as we removed the shading in
>> s3 in Flink 1.10.
>>
>> The official Flink docker image has an easy way to add these plugins [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#using-plugins
>>
>> On Tue, Sep 15, 2020 at 6:40 AM Dan Hill  wrote:
>>
>>> Thanks for the update!
>>>
>>> I'm trying a bunch of combinations on the client side to get the S3
>>> Filesystem to be picked up correctly.  Most of my attempts involved
>>> building into the job jar (which I'm guessing won't work).  I then started
>>> getting issues with ClassCastExceptions.
>>>
>>> I might try a little more tomorrow (e.g. modifying the custom image).
>>> If I can't get it, I'll roll back to a previous Flink version that works.
>>>
>>> Caused by: java.lang.ClassCastException:
>>> org.codehaus.janino.CompilerFactory cannot be cast to
>>> org.codehaus.commons.compiler.ICompilerFactory
>>>
>>> at
>>> org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
>>>
>>> at
>>> org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
>>>
>>> at
>>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:431)
>>>
>>> ... 51 more
>>>
>>>
>>> On Mon, Sep 14, 2020 at 7:03 PM Jingsong Li 
>>> wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> I think Arvid and Dawid are right, as a workaround, you can try making
>>>> S3Filesystem works in the client. But for a long term solution, we can fix
>>>> it.
>>>>
>>>> I created https://issues.apache.org/jira/browse/FLINK-19228 for
>>>> tracking this.
>>>>
>>>> Best,
>>>> Jingsong
>>>>
>>>> On Mon, Sep 14, 2020 at 3:57 PM Dawid Wysakowicz <
>>>> dwysakow...@apache.org> wrote:
>>>>
>>>>> Hi Dan,
>>>>>
>>>>> As far as I checked in the code, the FileSystemSink will try to create
>>>>> staging directories from the client. I think it might be problematic, as
>>>>> your case shows. We might need to revisit that part. I am cc'ing Jingsong
>>>>> who worked on the FileSystemSink.
>>>>>
>>>>> As a workaround you might try putting the s3 plugin on the CLI
>>>>> classpath (not sure if plugins work for the CLI through the /plugins
>>>>> directory).
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>> On 10/09/2020 22:13, Dan Hill wrote:
>>>>>
>>>>> This is running on my local minikube and is trying to hit minio.
>>>>>
>>>>> On Thu, Sep 10, 2020 at 1:10 PM Dan Hill 
>>>>> wrote:
>>>>>
>>>>>> I'm using this Helm chart
>>>>>> <https://github.com/riskfocus/helm-charts-public/tree/master/flink>.
>>>>>> I start the job by building an image with the job jar and using kubectl
>>>>>> apply to do a flink run with the jar.
>>>>>>
>>>>>> The log4j.properties on jobmanager and taskmanager have debug level
>>>>>> set and are pretty embedded into the Helm chart.  My log4j-cli.properties
>>>>>> is hacked on the CLI side.
>>>>>>
>>>>>> I thought I just needed the s3 plugins in the jobmanager and
>>>>>> taskmanager.  Do I need to have a similar plugin structure from the image
>>>>>> where I run 

Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-15 Thread Dan Hill
How do I avoid unnecessary reshuffles when using Kafka as input?  My keys
in Kafka are ~userId.  The first few stages do joins that are usually
(userId, someOtherKeyId).  It makes sense for these joins to stay on the
same machine and avoid unnecessary shuffling.

What's the best way to avoid unnecessary shuffling when using Table SQL
interface?  I see PARTITION BY on TABLE.  I'm not sure how to specify the
keys for Kafka.


Error: Avro "CREATE TABLE" with nested rows and missing fields

2020-09-16 Thread Dan Hill
I might be misunderstanding Flink Avro support.  I assumed not including a
field in "CREATE TABLE" would work fine.  If I leave out any field before a
nested row, "CREATE TABLE" fails.  If I include all of the fields, this
succeeds.  I assumed fields would be optional.

I'm using Flink v1.11.1 with the Table SQL API.

*Problem*
If I do not include one of the fields, I get the following exception.  If I
add back the missing field, "contentId", this works.

"CREATE TABLE `default.mydb.mytable` (\n" +
"`userId` STRING, \n" +
"`timeEpochMillis` BIGINT, \n" +
//"`contentId` BIGINT, \n" +
"`contentDetails` ROW<\n" +
"`contentId` BIGINT >\n" +
") WITH (...)\n"


Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to
org.apache.avro.generic.IndexedRecord
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema
.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:
203)
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema
.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema
.java:221)
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema
.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:
206)
at org.apache.flink.formats.avro.
AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(
AvroFileSystemFormatFactory.java:204)
at org.apache.flink.streaming.api.functions.source.
InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
at org.apache.flink.streaming.api.operators.StreamSource.run(
StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(
StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.
SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)


Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-16 Thread Dan Hill
Hi Piotr!  Yes, that's what I'm using with DataStream.  It works well in my
prototype.

On Wed, Sep 16, 2020 at 8:58 AM Piotr Nowojski  wrote:

> Hi,
>
> Have you seen "Reinterpreting a pre-partitioned data stream as keyed
> stream" feature? [1] However I'm not sure if and how can it be integrated
> with the Table API. Maybe someone more familiar with the Table API can help
> with that?
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>
> śr., 16 wrz 2020 o 05:35 Dan Hill  napisał(a):
>
>> How do I avoid unnecessary reshuffles when using Kafka as input?  My keys
>> in Kafka are ~userId.  The first few stages do joins that are usually
>> (userId, someOtherKeyId).  It makes sense for these joins to stay on the
>> same machine and avoid unnecessary shuffling.
>>
>> What's the best way to avoid unnecessary shuffling when using Table SQL
>> interface?  I see PARTITION BY on TABLE.  I'm not sure how to specify the
>> keys for Kafka.
>>
>>
>>
>>
>>


Re: Error: Avro "CREATE TABLE" with nested rows and missing fields

2020-09-16 Thread Dan Hill
Interesting.  How does schema evolution work with Avro and Flink?  E.g.
adding new fields or enum values.

On Wed, Sep 16, 2020 at 12:13 PM Dawid Wysakowicz 
wrote:

> Hi Dan,
>
> I'd say this is a result of a few assumptions.
>
>1. We try to separate the concept of format from the connector.
>Therefore we did not make too many assumption which connector does a format
>work with.
>2. Avro needs the original schema that the incoming record was
>serialized with. It will not work with just an expected schema. Even if it
>is "compatible" with the old one.
>3. The most common use case for Avro we see is that it is used for
>stream processing (e.g. Avro encoded messages in Kafka). In that scenario
>we do not have the schema encoded alongside the data. Therefore we assume
>the DDL is the only source of truth for the schema of the data (that is
>also true for other formats such as e.g. JSON or CSV). I agree in case of
>Avro files we have the original schema encoded in files. It would
>contradict with the assumption that DDL is the original schema.
>
> However I think it is a valid scenario to support such a case for the
> combination of avro + filesystem. Honestly we do that already for
> avro-schema-registry format (where we look up the writer schema in SR and
> convert to the schema of DDL). Moreover it should be relatively easy to do
> that. @Jingsong What do you think?
>
> Best,
>
> Dawid
>
>
> On 16/09/2020 09:49, Dan Hill wrote:
>
> I might be misunderstanding Flink Avro support.  I assumed not including a
> field in "CREATE TABLE" would work fine.  If I leave out any field before a
> nested row, "CREATE TABLE" fails.  If I include all of the fields, this
> succeeds.  I assumed fields would be optional.
>
> I'm using Flink v1.11.1 with the Table SQL API.
>
> *Problem*
> If I do not include one of the fields, I get the following exception.  If
> I add back the missing field, "contentId", this works.
>
> "CREATE TABLE `default.mydb.mytable` (\n" +
> "`userId` STRING, \n" +
> "`timeEpochMillis` BIGINT, \n" +
> //"`contentId` BIGINT, \n" +"`contentDetails` ROW<\n" +
> "`contentId` BIGINT >\n" +") WITH (...)\n"
>
>
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to
> org.apache.avro.generic.IndexedRecord
> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema
> .lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema
> .java:203)
> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema
> .lambda$createNullableConverter$c3bac5d8$1(
> AvroRowDataDeserializationSchema.java:221)
> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema
> .lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema
> .java:206)
> at org.apache.flink.formats.avro.
> AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(
> AvroFileSystemFormatFactory.java:204)
> at org.apache.flink.streaming.api.functions.source.
> InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
> at org.apache.flink.streaming.api.operators.StreamSource.run(
> StreamSource.java:100)
> at org.apache.flink.streaming.api.operators.StreamSource.run(
> StreamSource.java:63)
> at org.apache.flink.streaming.runtime.tasks.
> SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>
>


Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-16 Thread Dan Hill
Hi Dawid!

I see.  Yea, this would break my job after I move away from the prototype.

How do other Flink devs avoid unnecessary reshuffles when sourcing data
from Kafka?  Is the Table API early or not used often?




On Wed, Sep 16, 2020 at 12:31 PM Dawid Wysakowicz 
wrote:

> Hi Dan,
>
> I am afraid there is no mechanism to do that purely in the Table API yet.
> Or I am not aware of one. If the reinterpretAsKeyedStream works for you,
> you could use this approach and convert a DataStream (with the
> reinterpretAsKeyedStream applied) to a Table[1] and then continue with the
> Table API.
>
> On the topic of reinterpretAsKeyedStream, I wanted to stress out one
> thing. I'd like to bring your attention to this warning:
>
> *WARNING*: The re-interpreted data stream *MUST* already be
> pre-partitioned in *EXACTLY* the same way Flink’s keyBy would partition
> the data in a shuffle w.r.t. key-group assignment.
>
> I think it is not trivial(or even not possible?) to achieve unless both
> the producer and the consumer are Flink jobs with the same parallelism.
>
> Best,
>
> Dawid
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-datastream-or-dataset-into-a-table
> On 16/09/2020 18:22, Dan Hill wrote:
>
> Hi Piotr!  Yes, that's what I'm using with DataStream.  It works well in
> my prototype.
>
> On Wed, Sep 16, 2020 at 8:58 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Have you seen "Reinterpreting a pre-partitioned data stream as keyed
>> stream" feature? [1] However I'm not sure if and how can it be integrated
>> with the Table API. Maybe someone more familiar with the Table API can help
>> with that?
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>>
>> śr., 16 wrz 2020 o 05:35 Dan Hill  napisał(a):
>>
>>> How do I avoid unnecessary reshuffles when using Kafka as input?  My
>>> keys in Kafka are ~userId.  The first few stages do joins that are usually
>>> (userId, someOtherKeyId).  It makes sense for these joins to stay on the
>>> same machine and avoid unnecessary shuffling.
>>>
>>> What's the best way to avoid unnecessary shuffling when using Table SQL
>>> interface?  I see PARTITION BY on TABLE.  I'm not sure how to specify the
>>> keys for Kafka.
>>>
>>>
>>>
>>>
>>>


Re: Error: Avro "CREATE TABLE" with nested rows and missing fields

2020-09-17 Thread Dan Hill
Thanks Dawid!

You answered most of my questions:
1) Kafka to Flink - Is the most common practice to use the Confluent Schema
Registry and then use ConfluentRegistryAvroDeserializationSchema?
2) Flink to State - great.
3) Flink to Avro file output - great.
4) Avro file output to Flink (batch) - Yes, I assumed this convert from the
schema in the file to the SQL schema.

I'm sorry if this was a basic question.  The previous systems I've designed
use Protobufs.  Evolution was a lot easier (if tags are backwards
compatible).



On Thu, Sep 17, 2020 at 3:33 AM Dawid Wysakowicz 
wrote:

> Hi Dan,
>
> It depends which part of the system you have in mind. Generally though
> Avro itself does need the original schema of the record it was written
> with. There are a couple of alternatives. You have
> RegistryAvroDeserializationSchema for DataStream, which looks up the old
> schema in schema registry (It will be exposed in SQL in 1.12[1]). If we are
> talking about schema migration for state objects, we persist the schema
> with which the state was written. So that upon restore we have the old
> schema and possibly new schema.
>
> In case of avro files we could/should probably use the schema from the
> file.
>
> Best,
>
> Dawid
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-16048
> On 16/09/2020 21:20, Dan Hill wrote:
>
> Interesting.  How does schema evolution work with Avro and Flink?  E.g.
> adding new fields or enum values.
>
> On Wed, Sep 16, 2020 at 12:13 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Dan,
>>
>> I'd say this is a result of a few assumptions.
>>
>>1. We try to separate the concept of format from the connector.
>>Therefore we did not make too many assumption which connector does a 
>> format
>>work with.
>>2. Avro needs the original schema that the incoming record was
>>serialized with. It will not work with just an expected schema. Even if it
>>is "compatible" with the old one.
>>3. The most common use case for Avro we see is that it is used for
>>stream processing (e.g. Avro encoded messages in Kafka). In that scenario
>>we do not have the schema encoded alongside the data. Therefore we assume
>>the DDL is the only source of truth for the schema of the data (that is
>>also true for other formats such as e.g. JSON or CSV). I agree in case of
>>Avro files we have the original schema encoded in files. It would
>>contradict with the assumption that DDL is the original schema.
>>
>> However I think it is a valid scenario to support such a case for the
>> combination of avro + filesystem. Honestly we do that already for
>> avro-schema-registry format (where we look up the writer schema in SR and
>> convert to the schema of DDL). Moreover it should be relatively easy to do
>> that. @Jingsong What do you think?
>>
>> Best,
>>
>> Dawid
>>
>>
>> On 16/09/2020 09:49, Dan Hill wrote:
>>
>> I might be misunderstanding Flink Avro support.  I assumed not including
>> a field in "CREATE TABLE" would work fine.  If I leave out any field before
>> a nested row, "CREATE TABLE" fails.  If I include all of the fields, this
>> succeeds.  I assumed fields would be optional.
>>
>> I'm using Flink v1.11.1 with the Table SQL API.
>>
>> *Problem*
>> If I do not include one of the fields, I get the following exception.  If
>> I add back the missing field, "contentId", this works.
>>
>> "CREATE TABLE `default.mydb.mytable` (\n" +
>> "`userId` STRING, \n" +
>> "`timeEpochMillis` BIGINT, \n" +
>> //"`contentId` BIGINT, \n" +"`contentDetails` ROW<\n" +
>> "`contentId` BIGINT >\n" +") WITH (...)\n"
>>
>>
>> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast
>> to org.apache.avro.generic.IndexedRecord
>> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema
>> .lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema
>> .java:203)
>> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema
>> .lambda$createNullableConverter$c3bac5d8$1(
>> AvroRowDataDeserializationSchema.java:221)
>> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema
>> .lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema
>> .java:206)
>> at org.apache.flink.formats.avro.
>> AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(
>> AvroFileSystemFormatFactory.java:204)
>> at org.apache.flink.streaming.api.functions.source.
>> InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>> at org.apache.flink.streaming.api.operators.StreamSource.run(
>> StreamSource.java:100)
>> at org.apache.flink.streaming.api.operators.StreamSource.run(
>> StreamSource.java:63)
>> at org.apache.flink.streaming.runtime.tasks.
>> SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201
>> )
>>
>>


Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-17 Thread Dan Hill
Hi Godfrey!

I'll describe the overall setup and then I'll describe the joins.

One of the goals of my Flink jobs is to join incoming log records  (User,
Session, PageView, Requests, Insertions, Impressions, etc) and do useful
things with the joined results.

Input = Kafka.  Value = batch log records.  Key = userId.

The volume for any single user will be pretty small.  All of the events
could be stored on the same machine for the same user and joins could be
done locally.

My first stream implementation keeps userId as the key and then I have my
own left join logic using MapState.  For my batch job, I use
DataSet.leftOuterJoins.  I'm okay if the batch job does more shuffling
(pretty simple and less latency sensitive).

I'm evaluating Table API based on a recommendation around raw log output.
I started evaluating it for more of the Flink jobs.









On Thu, Sep 17, 2020 at 5:12 AM godfrey he  wrote:

> Hi Dan,
>
> What kind of joins [1] you are using? Currently, only temporal join and
> join with table function
> do not reshuffle the input data in Table API and SQL, other joins always
> reshuffle the input data
> based on join keys.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins
>
> Best,
> Godfrey
>
>
> Dan Hill  于2020年9月17日周四 上午3:44写道:
>
>> Hi Dawid!
>>
>> I see.  Yea, this would break my job after I move away from the prototype.
>>
>> How do other Flink devs avoid unnecessary reshuffles when sourcing data
>> from Kafka?  Is the Table API early or not used often?
>>
>>
>>
>>
>> On Wed, Sep 16, 2020 at 12:31 PM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> I am afraid there is no mechanism to do that purely in the Table API
>>> yet. Or I am not aware of one. If the reinterpretAsKeyedStream works for
>>> you, you could use this approach and convert a DataStream (with the
>>> reinterpretAsKeyedStream applied) to a Table[1] and then continue with the
>>> Table API.
>>>
>>> On the topic of reinterpretAsKeyedStream, I wanted to stress out one
>>> thing. I'd like to bring your attention to this warning:
>>>
>>> *WARNING*: The re-interpreted data stream *MUST* already be
>>> pre-partitioned in *EXACTLY* the same way Flink’s keyBy would partition
>>> the data in a shuffle w.r.t. key-group assignment.
>>>
>>> I think it is not trivial(or even not possible?) to achieve unless both
>>> the producer and the consumer are Flink jobs with the same parallelism.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-datastream-or-dataset-into-a-table
>>> On 16/09/2020 18:22, Dan Hill wrote:
>>>
>>> Hi Piotr!  Yes, that's what I'm using with DataStream.  It works well in
>>> my prototype.
>>>
>>> On Wed, Sep 16, 2020 at 8:58 AM Piotr Nowojski 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Have you seen "Reinterpreting a pre-partitioned data stream as keyed
>>>> stream" feature? [1] However I'm not sure if and how can it be integrated
>>>> with the Table API. Maybe someone more familiar with the Table API can help
>>>> with that?
>>>>
>>>> Piotrek
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>>>>
>>>> śr., 16 wrz 2020 o 05:35 Dan Hill  napisał(a):
>>>>
>>>>> How do I avoid unnecessary reshuffles when using Kafka as input?  My
>>>>> keys in Kafka are ~userId.  The first few stages do joins that are usually
>>>>> (userId, someOtherKeyId).  It makes sense for these joins to stay on the
>>>>> same machine and avoid unnecessary shuffling.
>>>>>
>>>>> What's the best way to avoid unnecessary shuffling when using Table
>>>>> SQL interface?  I see PARTITION BY on TABLE.  I'm not sure how to specify
>>>>> the keys for Kafka.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>


Re: I hit a bad jobmanager address when trying to use Flink SQL Client

2020-09-17 Thread Dan Hill
Hi Robert!  Sorry for the delay.  This worked!  Thanks!  I used slightly
different deployment parameters.

deployment:
  gateway-address: flink-jobmanager
  gateway-port: 8081

On Mon, Sep 14, 2020 at 6:21 AM Robert Metzger  wrote:

> Hi Dan,
>
> I don't think the SQL Client officially supports running against
> Kubernetes.
> What you could try is using an undocumented, untested feature:
> Put something like
> jobmanager: kubernetes
> into the "deployment:" section of the Sql Client configuration
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html#environment-files>
> .
> Proper support for Kubernetes, YARN etc. is (probably stalled) work in
> progress
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html#limitations--future>
> .
>
>
> On Mon, Sep 14, 2020 at 2:18 PM Arvid Heise  wrote:
>
>> Hi Dan,
>>
>> Can you verify from the pod that jobmanager and *10.98.253.58:8081
>> <http://10.98.253.58:8081> *is actually accessible (e.g., with curl)?
>> I'd probably also try out localhost:8081 as you are connecting to the
>> respective pod directly.
>>
>> On Fri, Sep 11, 2020 at 9:59 PM Dan Hill  wrote:
>>
>>> Hi Robert!
>>>
>>> I have Flink running locally on minikube.  I'm running SQL client using
>>> exec on the jobmanager.
>>>
>>> kubectl exec pod/flink-jobmanager-0 -i -t --
>>> /opt/flink/bin/sql-client.sh embedded -e /opt/flink/sql-client-defaults.yaml
>>>
>>>
>>> Here's the sql-client-defaults.yaml.  I didn't specify a session.
>>> execution:
>>>   type: batch
>>>   result-mode: table
>>>   max-table-result-rows: 100
>>>
>>> I'm prototyping the Table SQL interface.  I got blocked using the Table
>>> SQL interface and figured I'd try the SQL Client to see if I could get
>>> unblocked.
>>>
>>>
>>> On Fri, Sep 11, 2020 at 11:18 AM Robert Metzger 
>>> wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> the notation of "flink-jobmanager/10.98.253.58:8081" is not a problem.
>>>> It is how java.net.InetAddress stringifies a resolved address (with
>>>> both hostname and IP).
>>>>
>>>> How did you configure the SQL client to work with a Kubernetes Session?
>>>> Afaik this is not a documented, tested and officially supported feature
>>>> (this doesn't mean we should not support it -- apparently it is something
>>>> we should do rather soon ;) ).
>>>>
>>>> Best,
>>>> Robert
>>>>
>>>> On Fri, Sep 11, 2020 at 5:25 AM Dan Hill  wrote:
>>>>
>>>>> I just tried using the Flink SQL Client.  A simple job is not running
>>>>> because it cannot hit jobmanager.  I'm not sure why Flink SQL Client is
>>>>> hitting "flink-jobmanager/10.98.253.58:8081".  I'd expect either
>>>>> "flink-jobmanager:8081" or "10.98.253.58:8081" (which should work
>>>>> with my kubernetes setup).
>>>>>
>>>>> I'm using riskfocus's Flink helm chart
>>>>> <https://github.com/riskfocus/helm-charts-public/tree/master/flink>.
>>>>>
>>>>> The last SELECT errors out.  I have an environment file that indicates
>>>>> this "execution.type: batch".  My setup works when using DataSet and
>>>>> DataStream.  The jobmanager and taskmanager logs look fine.  This seems
>>>>> like a weird configuration with SQL Client that is either broken with that
>>>>> Flink helm chart or with SQL Client.
>>>>>
>>>>>
>>>>> Flink SQL> DROP TABLE `default_catalog.mydb.user`;
>>>>>
>>>>> [INFO] Table has been removed.
>>>>>
>>>>>
>>>>> Flink SQL> CREATE TABLE `default_catalog.mydb.user` (`platformId`
>>>>> BIGINT, `userId` STRING) WITH ('connector' = 'filesystem', 'path' =
>>>>> 's3://mys3bucket/users.csv','format' = 'csv');
>>>>>
>>>>> [INFO] Table has been created.
>>>>>
>>>>>
>>>>> Flink SQL> SELECT * FROM `default_catalog.mydb.user` LIMIT 10;
>>>>>
>>>>> *[ERROR] Could not execute SQL statement. Reason:*
>>>>>
>>>>> *org.apache.flink.shaded.netty4.

Flink Table SQL and writing nested Avro files

2020-09-18 Thread Dan Hill
Hi!

I want to join two tables and write the results to Avro where the left and
right rows are nested in the avro output.  Is it possible to do this with
the SQL interface?

Thanks!
- Dan

 CREATE TABLE `flat_avro` (
   `left` ROW,
   `right` ROW
) WITH (
   'connector' = 'filesystem',
   'path' = 's3p://blah/blah',
   'format' = 'avro'
);

INSERT INTO `flat_avro`
SELECT left.*, right.*
FROM `left`
LEFT JOIN `right`
ON `left`.`id` = `right`.`id`
);


Flink SQL - can I have multiple outputs per job?

2020-09-18 Thread Dan Hill
I have a few results that I want to produce.
- A join B
- A join B join C
- A join B join C join D
- A join B join C join D join E

When I use the DataSet API directly, I can execute all of these in the same
job to reduce redundancy.  When I use the SQL interface, it looks like
separate jobs are created for each of these (duplicating join calculations).

Is there a way to merge these joins?


Re: Flink SQL - can I have multiple outputs per job?

2020-09-19 Thread Dan Hill
I figured it out.  TableEnvironment.StatementSet.

Semi-related, query optimizers can mess up the reuse depending on which
tables the join IDs come from.






On Fri, Sep 18, 2020 at 9:40 PM Dan Hill  wrote:

> I have a few results that I want to produce.
> - A join B
> - A join B join C
> - A join B join C join D
> - A join B join C join D join E
>
> When I use the DataSet API directly, I can execute all of these in the
> same job to reduce redundancy.  When I use the SQL interface, it looks like
> separate jobs are created for each of these (duplicating join calculations).
>
> Is there a way to merge these joins?
>


Flink Table SQL and Job Names

2020-09-19 Thread Dan Hill
I'm getting names like "insert-into_{catalog}.{db}.{table}".  Is there a
way to customize this with non-deprecated methods?


How to stop multiple Flink jobs of the same name from being created?

2020-09-20 Thread Dan Hill
I'm prototyping with Flink SQL.  I'm iterating on a client job with
multiple INSERT INTOs.  Whenever I have an error, my Kubernetes job
retries.  This creates multiple stream jobs with the same names.

Is it up to clients to delete the existing jobs?  I see Flink CLI functions
for this.  Do most people usually do this from inside their client jar or
their wrapper code (e.g. Kubernetes job).

- Dan


Re: How to stop multiple Flink jobs of the same name from being created?

2020-09-20 Thread Dan Hill
I've read the following upgrade application page
<https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html>.
This seems to focus on doing this in a wrapper layer (e.g. Kubernetes).
Just checking to see if this is the common practice or do people do this
from their client jars.



On Sun, Sep 20, 2020 at 5:13 PM Dan Hill  wrote:

> I'm prototyping with Flink SQL.  I'm iterating on a client job with
> multiple INSERT INTOs.  Whenever I have an error, my Kubernetes job
> retries.  This creates multiple stream jobs with the same names.
>
> Is it up to clients to delete the existing jobs?  I see Flink CLI
> functions for this.  Do most people usually do this from inside their
> client jar or their wrapper code (e.g. Kubernetes job).
>
> - Dan
>


Re: How to stop multiple Flink jobs of the same name from being created?

2020-09-22 Thread Dan Hill
Hi Yang!

The multiple "INSERT INTO" jobs all go to the same Flink cluster.  I'm
using this Helm chart
<https://github.com/riskfocus/helm-charts-public/tree/master/flink> (which
looks like the standalone option).  I deploy the job using a simple k8
Job.  Sounds like I should do this myself.  Thanks!

Thanks!
- Dan



On Tue, Sep 22, 2020 at 5:37 AM Yang Wang  wrote:

> Hi Dan,
>
> First, I want to get more information about your submission so that we
> could make the question clear.
>
> Are you using TableEnvironment to execute multiple "INSERT INTO" sentences
> and find that each one will
> be executed in a separated Flink cluster? It is really strange, and I want
> to know how your are deploying your
> Flink cluster on Kubernetes, via standalone[1] or native integration[2].
> If it is the former, I am afraid you need
> `kubectl` to start/stop your Flink application manually. If it is the
> latter, I think the Flink cluster will be destroyed
> automatically when the Flink job failed. Also all the SQL jobs will be
> executed in a shared Flink application.
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>
>
> Best,
> Yang
>
> Dan Hill  于2020年9月21日周一 上午8:15写道:
>
>> I've read the following upgrade application page
>> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html>.
>> This seems to focus on doing this in a wrapper layer (e.g. Kubernetes).
>> Just checking to see if this is the common practice or do people do this
>> from their client jars.
>>
>>
>>
>> On Sun, Sep 20, 2020 at 5:13 PM Dan Hill  wrote:
>>
>>> I'm prototyping with Flink SQL.  I'm iterating on a client job with
>>> multiple INSERT INTOs.  Whenever I have an error, my Kubernetes job
>>> retries.  This creates multiple stream jobs with the same names.
>>>
>>> Is it up to clients to delete the existing jobs?  I see Flink CLI
>>> functions for this.  Do most people usually do this from inside their
>>> client jar or their wrapper code (e.g. Kubernetes job).
>>>
>>> - Dan
>>>
>>


Re: Flink Table SQL and writing nested Avro files

2020-09-22 Thread Dan Hill
Nice!  I'll try that.  Thanks, Dawid!

On Mon, Sep 21, 2020 at 2:37 AM Dawid Wysakowicz 
wrote:

> Hi Dan,
>
> I think the best what I can suggest is this:
>
> SELECT
>
> ROW(left.field0, left.field1, left.field2, ...),
>
> ROW(right.field0, right.field1, right.field2, ...)
>
> FROM ...
>
> You will need to list all the fields manually, as SQL does not allow for
> asterisks in regular function calls.
>
> If you are willing to give the Table API a try you might workaround some
> of the manual work with the Column Function[1]
>
> Table join = t1.join(t2).where($("id1").isEqual($("id2")));
> join
> .select(
> row(withColumns(range(1, t1.getSchema().getFieldCount(,
> row(withColumns(range(
> t1.getSchema().getFieldCount() + 1,
> t1.getSchema().getFieldCount() +
> t2.getSchema().getFieldCount(
> )
> .executeInsert("flat_avro")
> .await();
>
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#column-functions
> On 18/09/2020 09:47, Dan Hill wrote:
>
> Hi!
>
> I want to join two tables and write the results to Avro where the left and
> right rows are nested in the avro output.  Is it possible to do this with
> the SQL interface?
>
> Thanks!
> - Dan
>
>  CREATE TABLE `flat_avro` (
>`left` ROW,
>`right` ROW) WITH (
>'connector' = 'filesystem',
>'path' = 's3p://blah/blah',
>'format' = 'avro');INSERT INTO `flat_avro` SELECT left.*, right.* FROM 
> `left` LEFT JOIN `right`ON `left`.`id` = `right`.`id`);
>
>


Back pressure with multiple joins

2020-09-22 Thread Dan Hill
Hi!

My goal is to better understand how my code impacts streaming throughput.

I have a streaming job where I join multiple tables (A, B, C, D) using
interval joins.

Case 1) If I have 3 joins in the same query, I don't hit back pressure.

SELECT ...
FROM A
LEFT JOIN B
ON...
LEFT JOIN C
ON...
LEFT JOIN D
ON...


Case 2) If I create temporary views for two of the joins (for reuse with
another query), I hit back a lot of back pressure.  This is selecting
slightly more fields than the first.

CREATE TEMPORARY VIEW `AB`

SELECT ...
FROM A
LEFT JOIN B
...

CREATE TEMPORARY VIEW `ABC`
SELECT ...
FROM AB
LEFT JOIN C
...



Can Temporary Views increase back pressure?

If A, B, C and D are roughly the same size (fake data), does the join order
matter?  E.g. I assume reducing the size of the columns in each join stage
would help.

Thanks!
- Dan


Re: Back pressure with multiple joins

2020-09-22 Thread Dan Hill
When I use DataStream and implement the join myself, I can get 50x the
throughput.  I assume I'm doing something wrong with Flink's Table API and
SQL interface.

On Tue, Sep 22, 2020 at 11:21 PM Dan Hill  wrote:

> Hi!
>
> My goal is to better understand how my code impacts streaming throughput.
>
> I have a streaming job where I join multiple tables (A, B, C, D) using
> interval joins.
>
> Case 1) If I have 3 joins in the same query, I don't hit back pressure.
>
> SELECT ...
> FROM A
> LEFT JOIN B
> ON...
> LEFT JOIN C
> ON...
> LEFT JOIN D
> ON...
>
>
> Case 2) If I create temporary views for two of the joins (for reuse with
> another query), I hit back a lot of back pressure.  This is selecting
> slightly more fields than the first.
>
> CREATE TEMPORARY VIEW `AB`
>
> SELECT ...
> FROM A
> LEFT JOIN B
> ...
>
> CREATE TEMPORARY VIEW `ABC`
> SELECT ...
> FROM AB
> LEFT JOIN C
> ...
>
>
>
> Can Temporary Views increase back pressure?
>
> If A, B, C and D are roughly the same size (fake data), does the join
> order matter?  E.g. I assume reducing the size of the columns in each join
> stage would help.
>
> Thanks!
> - Dan
>
>
>


Reusing Flink SQL Client's Environment for Flink pipelines

2020-09-23 Thread Dan Hill
Has anyone tried to reused the Flink SQL Client's yaml Environment

configuration
for their production setups?  It seems pretty flexible.

I like most of the logic inside ExecutionContext.java
.
I'm curious to learn about issues from other Flink developers.


Best way to resolve bottlenecks with Flink?

2020-09-24 Thread Dan Hill
My job has very slow throughput.  What are the best signals that will
indicate if there are performance issues?

Is there an overall health summary that would indicate the most likely
issues impacting performance?

I found a variety of pages and metrics.  I resolved some of the
backpressure in my job.  I looked at memory issues.  My direct "Outside
JVM" is full (but I'm not sure if that's normal).


Re: Back pressure with multiple joins

2020-09-26 Thread Dan Hill
I can't reproduce the issue now.  I'm using the same commits so I'm
guessing another environment factor is at play.  The explains are pretty
similar (there's an extra LogicalProject when there is an extra view).

On Fri, Sep 25, 2020 at 12:55 AM Timo Walther  wrote:

> Hi Dan,
>
> could you share the plan with us using `TableEnvironment.explainSql()`
> for both queries?
>
> In general, views should not have an impact on the performance. They are
> a logical concept that gives a bunch of operations a name. The contained
> operations are inlined into the bigger query during optimization.
>
> Unless you execute multiple queries in a StatementSet, the the data is
> read twice from the source. How do you execute the SQL stataments?
>
> For deterministic plans, join reordering is disabled by default. You can
> set it via:
>
>
> org.apache.flink.table.api.config.OptimizerConfigOptions#TABLE_OPTIMIZER_JOIN_REORDER_ENABLED
>
> Regards,
> Timo
>
> On 23.09.20 08:23, Dan Hill wrote:
> > When I use DataStream and implement the join myself, I can get 50x the
> > throughput.  I assume I'm doing something wrong with Flink's Table API
> > and SQL interface.
> >
> > On Tue, Sep 22, 2020 at 11:21 PM Dan Hill  > <mailto:quietgol...@gmail.com>> wrote:
> >
> > Hi!
> >
> > My goal is to better understand how my code impacts streaming
> > throughput.
> >
> > I have a streaming job where I join multiple tables (A, B, C, D)
> > using interval joins.
> >
> > Case 1) If I have 3 joins in the same query, I don't hit back
> pressure.
> >
> > SELECT ...
> > FROM A
> > LEFT JOIN B
> > ON...
> > LEFT JOIN C
> > ON...
> > LEFT JOIN D
> > ON...
> >
> >
> > Case 2) If I create temporary views for two of the joins (for reuse
> > with another query), I hit back a lot of back pressure.  This is
> > selecting slightly more fields than the first.
> >
> > CREATE TEMPORARY VIEW `AB`
> >
> > SELECT ...
> > FROM A
> > LEFT JOIN B
> > ...
> >
> > CREATE TEMPORARY VIEW `ABC`
> > SELECT ...
> > FROM AB
> > LEFT JOIN C
> > ...
> >
> >
> >
> > Can Temporary Views increase back pressure?
> >
> > If A, B, C and D are roughly the same size (fake data), does the
> > join order matter?  E.g. I assume reducing the size of the columns
> > in each join stage would help.
> >
> > Thanks!
> > - Dan
> >
> >
>
>


Hiring Flink developers

2020-09-26 Thread Dan Hill
I'm looking to hire Flink developers (full time or contractors) to work on
a specialized user event logging system.  Besides for the usual developer
hiring websites, what are good hiring sources for Flink developers?

Thanks!
- Dan


Re: Flink SQL - can I force the planner to reuse a temporary view to be reused across multiple queries that access different fields?

2020-10-02 Thread Dan Hill
Thanks, Timo and Piotr!

I figured out my issue.  I called env.disableOperatorChaining(); in my
developer mode.  Disabling operator chaining created the redundant joins.



On Mon, Sep 28, 2020 at 6:41 AM Timo Walther  wrote:

> Hi Dan,
>
> unfortunetely, it is very difficult to read you plan? Maybe you can
> share a higher resolution and highlight which part of the pipeline is A,
> B etc. In general, the planner should be smart enough to reuse subplans
> where appropriate. Maybe this is a bug or shortcoming in the optimizer
> rules that we can fix.
>
> Piotr's suggestion would work to "materialize" a part of the plan to
> DataStream API such that this part is a black box for the optimizer and
> read only once. Currently, there is no API for performing this in the
> Table API itself.
>
> Regards,
> Timo
>
> On 28.09.20 15:13, Piotr Nowojski wrote:
> > Hi Dan,
> >
> > Are we talking about Streaming SQL (from the presence of IntervalJoin
> > node I presume so)? Are you using blink planner?
> >
> > I'm not super familiar with the Flink SQL, but my best guess would be
> > that if you would "export" the view "A" as a DataStream, then
> > re-register it as a new table "A2" and use "A2" in your query, it could
> > do the trick. [1]
> > But I might be wrong or there might be a better way to do it (maybe
> > someone else can help here?).
> >
> > Piotrek
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#integration-with-datastream-and-dataset-api
> >
> > sob., 26 wrz 2020 o 00:02 Dan Hill  > <mailto:quietgol...@gmail.com>> napisał(a):
> >
> > I have a temporary views, A and B, and I want to output a union like
> > the following:
> > SELECT * FROM ((SELECT ... FROM A) UNION ALL (SELECT ... FROM B JOIN
> > A ...))
> >
> > Since the columns being requested in both parts of the union are
> > different, the planner appears to be separating these out.  A is
> > pretty complex so I want to reuse A.  Here's the graph for A.  A
> > bunch of extra join nodes are introduced.
> >
> > Just A.
> > Screen Shot 2020-09-22 at 11.14.07 PM.png
> >
> > How the planner currently handles the union.  It creates a bunch of
> > inefficient extra join nodes since the columns are slightly
> different.
> > Screen Shot 2020-09-23 at 12.24.59 PM.png
> >
>
>


Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-05 Thread Dan Hill
I'm writing a test for a batch job using MiniClusterResourceConfiguration.

Here's a simple description of my working test case:
1) I use TableEnvironment.executeSql(...) to create a source and sink table
using tmp filesystem directory.
2) I use executeSql to insert some test data into the source tabel.
3) I use executeSql to select from source and insert into sink.
4) I use executeSql from the same source to a different sink.

When I do these steps, it works.  If I remove step 4, no data gets written
to the sink.  My actual code is more complex than this (has create view,
join and more tables).  This is a simplified description but highlights the
weird error.

Has anyone hit issues like this?  I'm assuming I have a small code bug in
my queries that's causing issues.  These queries appear to work in
production so I'm confused.  Are there ways of viewing failed jobs or
queries with MiniClusterResourceConfiguration?

Thanks!
- Dan


Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-06 Thread Dan Hill
I've tried to enable additional logging for a few hours today.  I think
something with junit5 is swallowing the logs.  I'm using Bazel and junit5.
I setup MiniClusterResourceConfiguration using a custom extension.  Are
there any known issues with Flink and junit5?  I can try switching to
junit4.

When I've binary searched this issue, this failure happens if my query in
step 3 has a join it.  If I remove the join, I can remove step 4 and the
code still works.  I've renamed a bunch of my tables too and the problem
still exists.





On Tue, Oct 6, 2020, 00:42 Aljoscha Krettek  wrote:

> Hi Dan,
>
> there were some bugs and quirks in the MiniCluster that we recently fixed:
>
>   - https://issues.apache.org/jira/browse/FLINK-19123
>   - https://issues.apache.org/jira/browse/FLINK-19264
>
> But I think they are probably unrelated to your case. Could you enable
> logging and see from the logs whether the 2) and 3) jobs execute
> correctly on the MiniCluster?
>
> Best,
> Aljoscha
>
> On 06.10.20 08:08, Dan Hill wrote:
> > I'm writing a test for a batch job using
> MiniClusterResourceConfiguration.
> >
> > Here's a simple description of my working test case:
> > 1) I use TableEnvironment.executeSql(...) to create a source and sink
> table
> > using tmp filesystem directory.
> > 2) I use executeSql to insert some test data into the source tabel.
> > 3) I use executeSql to select from source and insert into sink.
> > 4) I use executeSql from the same source to a different sink.
> >
> > When I do these steps, it works.  If I remove step 4, no data gets
> written
> > to the sink.  My actual code is more complex than this (has create view,
> > join and more tables).  This is a simplified description but highlights
> the
> > weird error.
> >
> > Has anyone hit issues like this?  I'm assuming I have a small code bug in
> > my queries that's causing issues.  These queries appear to work in
> > production so I'm confused.  Are there ways of viewing failed jobs or
> > queries with MiniClusterResourceConfiguration?
> >
> > Thanks!
> > - Dan
> >
>
>


Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-06 Thread Dan Hill
I don't think any of the gotchas apply to me (at the bottom of this link).
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-miniclusterwithclientresource

I'm assuming for a batch job that I don't have to do anything for: "You can
implement a custom parallel source function for emitting watermarks if your
job uses event time timers."

On Tue, Oct 6, 2020 at 2:42 PM Dan Hill  wrote:

> I've tried to enable additional logging for a few hours today.  I think
> something with junit5 is swallowing the logs.  I'm using Bazel and junit5.
> I setup MiniClusterResourceConfiguration using a custom extension.  Are
> there any known issues with Flink and junit5?  I can try switching to
> junit4.
>
> When I've binary searched this issue, this failure happens if my query in
> step 3 has a join it.  If I remove the join, I can remove step 4 and the
> code still works.  I've renamed a bunch of my tables too and the problem
> still exists.
>
>
>
>
>
> On Tue, Oct 6, 2020, 00:42 Aljoscha Krettek  wrote:
>
>> Hi Dan,
>>
>> there were some bugs and quirks in the MiniCluster that we recently fixed:
>>
>>   - https://issues.apache.org/jira/browse/FLINK-19123
>>   - https://issues.apache.org/jira/browse/FLINK-19264
>>
>> But I think they are probably unrelated to your case. Could you enable
>> logging and see from the logs whether the 2) and 3) jobs execute
>> correctly on the MiniCluster?
>>
>> Best,
>> Aljoscha
>>
>> On 06.10.20 08:08, Dan Hill wrote:
>> > I'm writing a test for a batch job using
>> MiniClusterResourceConfiguration.
>> >
>> > Here's a simple description of my working test case:
>> > 1) I use TableEnvironment.executeSql(...) to create a source and sink
>> table
>> > using tmp filesystem directory.
>> > 2) I use executeSql to insert some test data into the source tabel.
>> > 3) I use executeSql to select from source and insert into sink.
>> > 4) I use executeSql from the same source to a different sink.
>> >
>> > When I do these steps, it works.  If I remove step 4, no data gets
>> written
>> > to the sink.  My actual code is more complex than this (has create view,
>> > join and more tables).  This is a simplified description but highlights
>> the
>> > weird error.
>> >
>> > Has anyone hit issues like this?  I'm assuming I have a small code bug
>> in
>> > my queries that's causing issues.  These queries appear to work in
>> > production so I'm confused.  Are there ways of viewing failed jobs or
>> > queries with MiniClusterResourceConfiguration?
>> >
>> > Thanks!
>> > - Dan
>> >
>>
>>


Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-06 Thread Dan Hill
Thanks!

Great to know.  I copied this junit5-jupiter-starter-bazel
<https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
rule
into my repository (I don't think junit5 is supported directly with
java_test yet).  I tried a few ways of bundling `log4j.properties` into the
jar and didn't get them to work.  My current iteration hacks the
log4j.properties file as an absolute path.  My failed attempts would spit
an error saying log4j.properties file was not found.  This route finds it
but the log properties are not used for the java logger.

Are there a better set of rules to use for junit5?

# build rule
java_junit5_test(
name = "tests",
srcs = glob(["*.java"]),
test_package = "ai.promoted.logprocessor.batch",
deps = [...],
jvm_flags =
["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
)

# log4j.properties
status = error
name = Log4j2PropertiesConfig
appenders = console
appender.console.type = Console
appender.console.name = LogToConsole
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
rootLogger.level = info
rootLogger.appenderRefs = stdout
rootLogger.appenderRef.stdout.ref = LogToConsole

On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Oops, this is actually the JOIN issue thread [1]. Guess I should revise my
> previous "haven't had issues" statement hah. Sorry for the spam!
>
> [1]:
> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html
>
> On Tue, Oct 6, 2020 at 6:32 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Unless it's related to this issue[1], which was w/ my JOIN and time
>> characteristics, though not sure that applies for batch.
>>
>> Best,
>> Austin
>>
>> [1]:
>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Duplicates-td37764.html
>>
>>
>> On Tue, Oct 6, 2020 at 6:20 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey Dan,
>>>
>>> We use Junit5 and Bazel to run Flink SQL tests on a mini cluster and
>>> haven’t had issues, though we’re only testing on streaming jobs.
>>>
>>> Happy to help setting up logging with that if you’d like.
>>>
>>> Best,
>>> Austin
>>>
>>> On Tue, Oct 6, 2020 at 6:02 PM Dan Hill  wrote:
>>>
>>>> I don't think any of the gotchas apply to me (at the bottom of this
>>>> link).
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-miniclusterwithclientresource
>>>>
>>>> I'm assuming for a batch job that I don't have to do anything for: "You
>>>> can implement a custom parallel source function for emitting watermarks if
>>>> your job uses event time timers."
>>>>
>>>> On Tue, Oct 6, 2020 at 2:42 PM Dan Hill  wrote:
>>>>
>>>>> I've tried to enable additional logging for a few hours today.  I
>>>>> think something with junit5 is swallowing the logs.  I'm using Bazel and
>>>>> junit5.  I setup MiniClusterResourceConfiguration using a custom
>>>>> extension.  Are there any known issues with Flink and junit5?  I can try
>>>>> switching to junit4.
>>>>>
>>>>> When I've binary searched this issue, this failure happens if my query
>>>>> in step 3 has a join it.  If I remove the join, I can remove step 4 and 
>>>>> the
>>>>> code still works.  I've renamed a bunch of my tables too and the problem
>>>>> still exists.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 6, 2020, 00:42 Aljoscha Krettek 
>>>>> wrote:
>>>>>
>>>>>> Hi Dan,
>>>>>>
>>>>>> there were some bugs and quirks in the MiniCluster that we recently
>>>>>> fixed:
>>>>>>
>>>>>>   - https://issues.apache.org/jira/browse/FLINK-19123
>>>>>>   - https://issues.apache.org/jira/browse/FLINK-19264
>>>>>>
>>>>>> But I think they are probably unrelated to your case. Could you
>>>>>> enable
>>>>>> logging and see from the logs whether the 2) and 3) jobs execute
>&g

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-06 Thread Dan Hill
I'm trying to use Table API for my job.  I'll soon try to get a test
working for my stream job.
- I'll parameterize so I can have different sources and sink for tests.
How should I mock out a Kafka source?  For my test, I was planning on
changing the input to be from a temp file (instead of Kafka).
- What's a good way of forcing a watermark using the Table API?


On Tue, Oct 6, 2020 at 3:35 PM Dan Hill  wrote:

> Thanks!
>
> Great to know.  I copied this junit5-jupiter-starter-bazel
> <https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
>  rule
> into my repository (I don't think junit5 is supported directly with
> java_test yet).  I tried a few ways of bundling `log4j.properties` into the
> jar and didn't get them to work.  My current iteration hacks the
> log4j.properties file as an absolute path.  My failed attempts would spit
> an error saying log4j.properties file was not found.  This route finds it
> but the log properties are not used for the java logger.
>
> Are there a better set of rules to use for junit5?
>
> # build rule
> java_junit5_test(
> name = "tests",
> srcs = glob(["*.java"]),
> test_package = "ai.promoted.logprocessor.batch",
> deps = [...],
> jvm_flags =
> ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
> )
>
> # log4j.properties
> status = error
> name = Log4j2PropertiesConfig
> appenders = console
> appender.console.type = Console
> appender.console.name = LogToConsole
> appender.console.layout.type = PatternLayout
> appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
> rootLogger.level = info
> rootLogger.appenderRefs = stdout
> rootLogger.appenderRef.stdout.ref = LogToConsole
>
> On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Oops, this is actually the JOIN issue thread [1]. Guess I should revise
>> my previous "haven't had issues" statement hah. Sorry for the spam!
>>
>> [1]:
>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html
>>
>> On Tue, Oct 6, 2020 at 6:32 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Unless it's related to this issue[1], which was w/ my JOIN and time
>>> characteristics, though not sure that applies for batch.
>>>
>>> Best,
>>> Austin
>>>
>>> [1]:
>>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Duplicates-td37764.html
>>>
>>>
>>> On Tue, Oct 6, 2020 at 6:20 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hey Dan,
>>>>
>>>> We use Junit5 and Bazel to run Flink SQL tests on a mini cluster and
>>>> haven’t had issues, though we’re only testing on streaming jobs.
>>>>
>>>> Happy to help setting up logging with that if you’d like.
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> On Tue, Oct 6, 2020 at 6:02 PM Dan Hill  wrote:
>>>>
>>>>> I don't think any of the gotchas apply to me (at the bottom of this
>>>>> link).
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-miniclusterwithclientresource
>>>>>
>>>>> I'm assuming for a batch job that I don't have to do anything for:
>>>>> "You can implement a custom parallel source function for emitting
>>>>> watermarks if your job uses event time timers."
>>>>>
>>>>> On Tue, Oct 6, 2020 at 2:42 PM Dan Hill  wrote:
>>>>>
>>>>>> I've tried to enable additional logging for a few hours today.  I
>>>>>> think something with junit5 is swallowing the logs.  I'm using Bazel and
>>>>>> junit5.  I setup MiniClusterResourceConfiguration using a custom
>>>>>> extension.  Are there any known issues with Flink and junit5?  I can try
>>>>>> switching to junit4.
>>>>>>
>>>>>> When I've binary searched this issue, this failure happens if my
>>>>>> query in step 3 has a join it.  If I remove the join, I can remove step 4
>>>>>> and the code still works.  I've renamed a bunch of my tables too and the
>>>>>> problem still exists.
>&

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-06 Thread Dan Hill
@Aljoscha - Thanks!  That setup lets fixing the hacky absolute path
reference.  However, the actual log calls are not printing to the console.
Only errors appear in my terminal window and the test logs.  Maybe console
logger does not work for this junit setup.  I'll see if the file version
works.

On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> What Aljoscha suggested is what works for us!
>
> On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek 
> wrote:
>
>> Hi Dan,
>>
>> to make the log properties file work this should do it: assuming the
>> log4j.properties is in //src/main/resources. You will need a BUILD.bazel
>> in that directory that has only the line
>> "exports_files(["log4j.properties"]). Then you can reference it in your
>> test via "resources = ["//src/main/resources:log4j.properties"],". Of
>> course you also need to have the right log4j deps (or slf4j if you're
>> using that)
>>
>> Hope that helps!
>>
>> Aljoscha
>>
>> On 07.10.20 00:41, Dan Hill wrote:
>> > I'm trying to use Table API for my job.  I'll soon try to get a test
>> > working for my stream job.
>> > - I'll parameterize so I can have different sources and sink for tests.
>> > How should I mock out a Kafka source?  For my test, I was planning on
>> > changing the input to be from a temp file (instead of Kafka).
>> > - What's a good way of forcing a watermark using the Table API?
>> >
>> >
>> > On Tue, Oct 6, 2020 at 3:35 PM Dan Hill  wrote:
>> >
>> >> Thanks!
>> >>
>> >> Great to know.  I copied this junit5-jupiter-starter-bazel
>> >> <
>> https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
>> rule
>> >> into my repository (I don't think junit5 is supported directly with
>> >> java_test yet).  I tried a few ways of bundling `log4j.properties`
>> into the
>> >> jar and didn't get them to work.  My current iteration hacks the
>> >> log4j.properties file as an absolute path.  My failed attempts would
>> spit
>> >> an error saying log4j.properties file was not found.  This route finds
>> it
>> >> but the log properties are not used for the java logger.
>> >>
>> >> Are there a better set of rules to use for junit5?
>> >>
>> >> # build rule
>> >> java_junit5_test(
>> >>  name = "tests",
>> >>  srcs = glob(["*.java"]),
>> >>  test_package = "ai.promoted.logprocessor.batch",
>> >>  deps = [...],
>> >>  jvm_flags =
>> >>
>> ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
>> >> )
>> >>
>> >> # log4j.properties
>> >> status = error
>> >> name = Log4j2PropertiesConfig
>> >> appenders = console
>> >> appender.console.type = Console
>> >> appender.console.name = LogToConsole
>> >> appender.console.layout.type = PatternLayout
>> >> appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
>> >> rootLogger.level = info
>> >> rootLogger.appenderRefs = stdout
>> >> rootLogger.appenderRef.stdout.ref = LogToConsole
>> >>
>> >> On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
>> >> austin.caw...@gmail.com> wrote:
>> >>
>> >>> Oops, this is actually the JOIN issue thread [1]. Guess I should
>> revise
>> >>> my previous "haven't had issues" statement hah. Sorry for the spam!
>> >>>
>> >>> [1]:
>> >>>
>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html
>> >>>
>> >>> On Tue, Oct 6, 2020 at 6:32 PM Austin Cawley-Edwards <
>> >>> austin.caw...@gmail.com> wrote:
>> >>>
>> >>>> Unless it's related to this issue[1], which was w/ my JOIN and time
>> >>>> characteristics, though not sure that applies for batch.
>> >>>>
>> >>>> Best,
>> >>>> Austin
>> >>>>
>> >>>> [1]:
>> >>>>
>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Duplicates-td37764.html

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-07 Thread Dan Hill
Switching to junit4 did not help.

If I make a request to the url returned from
MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
I get
{"errors":["Not found."]}.  I'm not sure if this is intentional.




On Tue, Oct 6, 2020 at 4:16 PM Dan Hill  wrote:

> @Aljoscha - Thanks!  That setup lets fixing the hacky absolute path
> reference.  However, the actual log calls are not printing to the console.
> Only errors appear in my terminal window and the test logs.  Maybe console
> logger does not work for this junit setup.  I'll see if the file version
> works.
>
> On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> What Aljoscha suggested is what works for us!
>>
>> On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> to make the log properties file work this should do it: assuming the
>>> log4j.properties is in //src/main/resources. You will need a BUILD.bazel
>>> in that directory that has only the line
>>> "exports_files(["log4j.properties"]). Then you can reference it in your
>>> test via "resources = ["//src/main/resources:log4j.properties"],". Of
>>> course you also need to have the right log4j deps (or slf4j if you're
>>> using that)
>>>
>>> Hope that helps!
>>>
>>> Aljoscha
>>>
>>> On 07.10.20 00:41, Dan Hill wrote:
>>> > I'm trying to use Table API for my job.  I'll soon try to get a test
>>> > working for my stream job.
>>> > - I'll parameterize so I can have different sources and sink for tests.
>>> > How should I mock out a Kafka source?  For my test, I was planning on
>>> > changing the input to be from a temp file (instead of Kafka).
>>> > - What's a good way of forcing a watermark using the Table API?
>>> >
>>> >
>>> > On Tue, Oct 6, 2020 at 3:35 PM Dan Hill  wrote:
>>> >
>>> >> Thanks!
>>> >>
>>> >> Great to know.  I copied this junit5-jupiter-starter-bazel
>>> >> <
>>> https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
>>> rule
>>> >> into my repository (I don't think junit5 is supported directly with
>>> >> java_test yet).  I tried a few ways of bundling `log4j.properties`
>>> into the
>>> >> jar and didn't get them to work.  My current iteration hacks the
>>> >> log4j.properties file as an absolute path.  My failed attempts would
>>> spit
>>> >> an error saying log4j.properties file was not found.  This route
>>> finds it
>>> >> but the log properties are not used for the java logger.
>>> >>
>>> >> Are there a better set of rules to use for junit5?
>>> >>
>>> >> # build rule
>>> >> java_junit5_test(
>>> >>  name = "tests",
>>> >>  srcs = glob(["*.java"]),
>>> >>  test_package = "ai.promoted.logprocessor.batch",
>>> >>  deps = [...],
>>> >>  jvm_flags =
>>> >>
>>> ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
>>> >> )
>>> >>
>>> >> # log4j.properties
>>> >> status = error
>>> >> name = Log4j2PropertiesConfig
>>> >> appenders = console
>>> >> appender.console.type = Console
>>> >> appender.console.name = LogToConsole
>>> >> appender.console.layout.type = PatternLayout
>>> >> appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
>>> >> rootLogger.level = info
>>> >> rootLogger.appenderRefs = stdout
>>> >> rootLogger.appenderRef.stdout.ref = LogToConsole
>>> >>
>>> >> On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
>>> >> austin.caw...@gmail.com> wrote:
>>> >>
>>> >>> Oops, this is actually the JOIN issue thread [1]. Guess I should
>>> revise
>>> >>> my previous "haven't had issues" statement hah. Sorry for the spam!
>>> >>>
>>> >>> [1]:
>>> >>>
>>> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-07 Thread Dan Hill
I was able to get finer grained logs showing.  I switched from
-Dlog4j.configuration to -Dlog4j.configurationFile and it worked.  With my
larger test case, I was hitting a silent log4j error.  When I created a
small test case to just test logging, I received a log4j error.

Here is a tar
<https://drive.google.com/file/d/1b6vJR_hfaRZwA28jKNlUBxDso7YiTIbk/view?usp=sharing>
with the info logs for:
- (test-nojoin.log) this one works as expected
- (test-join.log) this does not work as expected

I don't see an obvious issue just by scanning the logs.  I'll take a deeper
in 9 hours.




On Wed, Oct 7, 2020 at 8:28 PM Dan Hill  wrote:

> Switching to junit4 did not help.
>
> If I make a request to the url returned from
> MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
> I get
> {"errors":["Not found."]}.  I'm not sure if this is intentional.
>
>
>
>
> On Tue, Oct 6, 2020 at 4:16 PM Dan Hill  wrote:
>
>> @Aljoscha - Thanks!  That setup lets fixing the hacky absolute path
>> reference.  However, the actual log calls are not printing to the console.
>> Only errors appear in my terminal window and the test logs.  Maybe console
>> logger does not work for this junit setup.  I'll see if the file version
>> works.
>>
>> On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> What Aljoscha suggested is what works for us!
>>>
>>> On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek 
>>> wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> to make the log properties file work this should do it: assuming the
>>>> log4j.properties is in //src/main/resources. You will need a
>>>> BUILD.bazel
>>>> in that directory that has only the line
>>>> "exports_files(["log4j.properties"]). Then you can reference it in your
>>>> test via "resources = ["//src/main/resources:log4j.properties"],". Of
>>>> course you also need to have the right log4j deps (or slf4j if you're
>>>> using that)
>>>>
>>>> Hope that helps!
>>>>
>>>> Aljoscha
>>>>
>>>> On 07.10.20 00:41, Dan Hill wrote:
>>>> > I'm trying to use Table API for my job.  I'll soon try to get a test
>>>> > working for my stream job.
>>>> > - I'll parameterize so I can have different sources and sink for
>>>> tests.
>>>> > How should I mock out a Kafka source?  For my test, I was planning on
>>>> > changing the input to be from a temp file (instead of Kafka).
>>>> > - What's a good way of forcing a watermark using the Table API?
>>>> >
>>>> >
>>>> > On Tue, Oct 6, 2020 at 3:35 PM Dan Hill 
>>>> wrote:
>>>> >
>>>> >> Thanks!
>>>> >>
>>>> >> Great to know.  I copied this junit5-jupiter-starter-bazel
>>>> >> <
>>>> https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
>>>> rule
>>>> >> into my repository (I don't think junit5 is supported directly with
>>>> >> java_test yet).  I tried a few ways of bundling `log4j.properties`
>>>> into the
>>>> >> jar and didn't get them to work.  My current iteration hacks the
>>>> >> log4j.properties file as an absolute path.  My failed attempts would
>>>> spit
>>>> >> an error saying log4j.properties file was not found.  This route
>>>> finds it
>>>> >> but the log properties are not used for the java logger.
>>>> >>
>>>> >> Are there a better set of rules to use for junit5?
>>>> >>
>>>> >> # build rule
>>>> >> java_junit5_test(
>>>> >>  name = "tests",
>>>> >>  srcs = glob(["*.java"]),
>>>> >>  test_package = "ai.promoted.logprocessor.batch",
>>>> >>  deps = [...],
>>>> >>  jvm_flags =
>>>> >>
>>>> ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
>>>> >> )
>>>> >>
>>>> >> # log4j.properties
>>>> >> status = error
>>>> >> name = Log4j2PropertiesConfig
>>>> >> appenders = console
>>>

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-08 Thread Dan Hill
I figured out the issue.  The join caused part of the job's execution to be
delayed.  I added my own hacky wait condition into the test to make sure
the join job finishes first and it's fine.

What common test utilities exist for Flink?  I found
flink/flink-test-utils-parent.  I implemented a simple sleep loop to wait
for jobs to finish.  I'm guessing this can be done with one of the other
utilities.

Are there any open source test examples?

How are watermarks usually sent with Table API in tests?

After I collect some answers, I'm fine updating the Flink testing page.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs

On Thu, Oct 8, 2020 at 8:52 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Can't comment on the SQL issues, but here's our exact setup for Bazel and
> Junit5 w/ the resource files approach:
> https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020/tree/master/tools/junit
>
> Best,
> Austin
>
> On Thu, Oct 8, 2020 at 2:41 AM Dan Hill  wrote:
>
>> I was able to get finer grained logs showing.  I switched from
>> -Dlog4j.configuration to -Dlog4j.configurationFile and it worked.  With my
>> larger test case, I was hitting a silent log4j error.  When I created a
>> small test case to just test logging, I received a log4j error.
>>
>> Here is a tar
>> <https://drive.google.com/file/d/1b6vJR_hfaRZwA28jKNlUBxDso7YiTIbk/view?usp=sharing>
>> with the info logs for:
>> - (test-nojoin.log) this one works as expected
>> - (test-join.log) this does not work as expected
>>
>> I don't see an obvious issue just by scanning the logs.  I'll take a
>> deeper in 9 hours.
>>
>>
>>
>>
>> On Wed, Oct 7, 2020 at 8:28 PM Dan Hill  wrote:
>>
>>> Switching to junit4 did not help.
>>>
>>> If I make a request to the url returned from
>>> MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
>>> I get
>>> {"errors":["Not found."]}.  I'm not sure if this is intentional.
>>>
>>>
>>>
>>>
>>> On Tue, Oct 6, 2020 at 4:16 PM Dan Hill  wrote:
>>>
>>>> @Aljoscha - Thanks!  That setup lets fixing the hacky absolute path
>>>> reference.  However, the actual log calls are not printing to the console.
>>>> Only errors appear in my terminal window and the test logs.  Maybe console
>>>> logger does not work for this junit setup.  I'll see if the file version
>>>> works.
>>>>
>>>> On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
>>>> austin.caw...@gmail.com> wrote:
>>>>
>>>>> What Aljoscha suggested is what works for us!
>>>>>
>>>>> On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek 
>>>>> wrote:
>>>>>
>>>>>> Hi Dan,
>>>>>>
>>>>>> to make the log properties file work this should do it: assuming the
>>>>>> log4j.properties is in //src/main/resources. You will need a
>>>>>> BUILD.bazel
>>>>>> in that directory that has only the line
>>>>>> "exports_files(["log4j.properties"]). Then you can reference it in
>>>>>> your
>>>>>> test via "resources = ["//src/main/resources:log4j.properties"],". Of
>>>>>> course you also need to have the right log4j deps (or slf4j if you're
>>>>>> using that)
>>>>>>
>>>>>> Hope that helps!
>>>>>>
>>>>>> Aljoscha
>>>>>>
>>>>>> On 07.10.20 00:41, Dan Hill wrote:
>>>>>> > I'm trying to use Table API for my job.  I'll soon try to get a test
>>>>>> > working for my stream job.
>>>>>> > - I'll parameterize so I can have different sources and sink for
>>>>>> tests.
>>>>>> > How should I mock out a Kafka source?  For my test, I was planning
>>>>>> on
>>>>>> > changing the input to be from a temp file (instead of Kafka).
>>>>>> > - What's a good way of forcing a watermark using the Table API?
>>>>>> >
>>>>>> >
>>>>>> > On Tue, Oct 6, 2020 at 3:35 PM Dan Hill 
>>>>>> wrote:
>>>>>> >
>>>>>> >> Thanks!
>>>>>> >>
>>>>&g

Any testing issues when using StreamTableEnvironment.createTemporaryView?

2020-10-08 Thread Dan Hill
*Summary*
I'm hitting an error when running a test that is related to using
createTemporaryView to convert a Protobuf input stream to Flink Table API.
I'm not sure how to debug "SourceConversion$5.processElement(Unknown
Source)" line.  Is this generated code?  How can I debug this?

Any help would be appreciated.  Thanks! - Dan

*Details*
My current input is a protocol buffer stream.  I convert it to the Table
API spec using createTemporaryView.  The code is hacky.  I want to get some
tests implemented before cleaning it up.

KeyedStream batchLogStream =
env.fromElements(BatchLog.class, new
LogGenerator.BatchLogIterator().next())
.keyBy((logRequest) -> logRequest.getUserId());

tableEnv.createTemporaryView(
"input_user",
batchLogStream.flatMap(new ToUsers()),
$("userId"),
$("timeEpochMillis"),
$("userTime").rowtime());

This appears to work in my prototype (maybe serialization is broken).  In a
Flink test, I hit the following error.

org.apache.flink.runtime.taskmanager.Task: Flat Map -> Map ->
SourceConversion(table=[default.mydb.input_user], fields=[userId,
timeEpochMillis, userTime]) -> Calc(select=[userId, timeEpochMillis]) ->
StreamingFileWriter (2/7) (ae67114dd4175c6fd87063f73706c8ec) switched from
RUNNING to FAILED. java.lang.NullPointerException
at SourceConversion$5.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at
ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:18)
at
ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:11)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:834)


I wasn't able to find this exact stacktrace when looking on Google.


Re: Any testing issues when using StreamTableEnvironment.createTemporaryView?

2020-10-09 Thread Dan Hill
I figured out my issue.  I needed to assign watermarks (e.g.
assignTimestampsAndWatermarks) after the fromElements.  I could not figure
out how the auto-generated code worked.  I hooked up a debugger and guessed
at the issue.

On Thu, Oct 8, 2020 at 11:09 PM Dan Hill  wrote:

> *Summary*
> I'm hitting an error when running a test that is related to using
> createTemporaryView to convert a Protobuf input stream to Flink Table API.
> I'm not sure how to debug "SourceConversion$5.processElement(Unknown
> Source)" line.  Is this generated code?  How can I debug this?
>
> Any help would be appreciated.  Thanks! - Dan
>
> *Details*
> My current input is a protocol buffer stream.  I convert it to the Table
> API spec using createTemporaryView.  The code is hacky.  I want to get some
> tests implemented before cleaning it up.
>
> KeyedStream batchLogStream =
> env.fromElements(BatchLog.class, new
> LogGenerator.BatchLogIterator().next())
> .keyBy((logRequest) -> logRequest.getUserId());
>
> tableEnv.createTemporaryView(
> "input_user",
> batchLogStream.flatMap(new ToUsers()),
> $("userId"),
> $("timeEpochMillis"),
> $("userTime").rowtime());
>
> This appears to work in my prototype (maybe serialization is broken).  In
> a Flink test, I hit the following error.
>
> org.apache.flink.runtime.taskmanager.Task: Flat Map -> Map ->
> SourceConversion(table=[default.mydb.input_user], fields=[userId,
> timeEpochMillis, userTime]) -> Calc(select=[userId, timeEpochMillis]) ->
> StreamingFileWriter (2/7) (ae67114dd4175c6fd87063f73706c8ec) switched from
> RUNNING to FAILED. java.lang.NullPointerException
> at SourceConversion$5.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
> at
> ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:18)
> at
> ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:11)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.base/java.lang.Thread.run(Thread.java:834)
>
>
> I wasn't able to find this exact stacktrace when looking on Google.
>


Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-10 Thread Dan Hill
No, thanks!  I used JobClient to getJobStatus and sleep if it was not
terminal.  I'll switch to this.


On Sat, Oct 10, 2020 at 12:50 AM Aljoscha Krettek 
wrote:

> Hi Dan,
>
> did you try using the JobClient you can get from the TableResult to wait
> for job completion? You can get a CompletableFuture for the JobResult
> which should help you.
>
> Best,
> Aljoscha
>
> On 08.10.20 23:55, Dan Hill wrote:
> > I figured out the issue.  The join caused part of the job's execution to
> be
> > delayed.  I added my own hacky wait condition into the test to make sure
> > the join job finishes first and it's fine.
> >
> > What common test utilities exist for Flink?  I found
> > flink/flink-test-utils-parent.  I implemented a simple sleep loop to wait
> > for jobs to finish.  I'm guessing this can be done with one of the other
> > utilities.
> >
> > Are there any open source test examples?
> >
> > How are watermarks usually sent with Table API in tests?
> >
> > After I collect some answers, I'm fine updating the Flink testing page.
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs
> >
> > On Thu, Oct 8, 2020 at 8:52 AM Austin Cawley-Edwards <
> > austin.caw...@gmail.com> wrote:
> >
> >> Can't comment on the SQL issues, but here's our exact setup for Bazel
> and
> >> Junit5 w/ the resource files approach:
> >>
> https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020/tree/master/tools/junit
> >>
> >> Best,
> >> Austin
> >>
> >> On Thu, Oct 8, 2020 at 2:41 AM Dan Hill  wrote:
> >>
> >>> I was able to get finer grained logs showing.  I switched from
> >>> -Dlog4j.configuration to -Dlog4j.configurationFile and it worked.
> With my
> >>> larger test case, I was hitting a silent log4j error.  When I created a
> >>> small test case to just test logging, I received a log4j error.
> >>>
> >>> Here is a tar
> >>> <
> https://drive.google.com/file/d/1b6vJR_hfaRZwA28jKNlUBxDso7YiTIbk/view?usp=sharing
> >
> >>> with the info logs for:
> >>> - (test-nojoin.log) this one works as expected
> >>> - (test-join.log) this does not work as expected
> >>>
> >>> I don't see an obvious issue just by scanning the logs.  I'll take a
> >>> deeper in 9 hours.
> >>>
> >>>
> >>>
> >>>
> >>> On Wed, Oct 7, 2020 at 8:28 PM Dan Hill  wrote:
> >>>
> >>>> Switching to junit4 did not help.
> >>>>
> >>>> If I make a request to the url returned from
> >>>>
> MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
> >>>> I get
> >>>> {"errors":["Not found."]}.  I'm not sure if this is intentional.
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Oct 6, 2020 at 4:16 PM Dan Hill 
> wrote:
> >>>>
> >>>>> @Aljoscha - Thanks!  That setup lets fixing the hacky absolute path
> >>>>> reference.  However, the actual log calls are not printing to the
> console.
> >>>>> Only errors appear in my terminal window and the test logs.  Maybe
> console
> >>>>> logger does not work for this junit setup.  I'll see if the file
> version
> >>>>> works.
> >>>>>
> >>>>> On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
> >>>>> austin.caw...@gmail.com> wrote:
> >>>>>
> >>>>>> What Aljoscha suggested is what works for us!
> >>>>>>
> >>>>>> On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek <
> aljos...@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Dan,
> >>>>>>>
> >>>>>>> to make the log properties file work this should do it: assuming
> the
> >>>>>>> log4j.properties is in //src/main/resources. You will need a
> >>>>>>> BUILD.bazel
> >>>>>>> in that directory that has only the line
> >>>>>>> "exports_files(["log4j.properties"]). Then you can reference it in
> >>>>>>> your
> >>>>>>> test via "resources = ["//src/main/reso

Table SQL and updating rows

2020-10-15 Thread Dan Hill
*Context*
I'm working on a user event logging system using Flink.   I'm tracking some
info that belongs to the user's current session (e.g. location, device,
experiment info).  I don't have a v1 requirement to support mutability but
I want to plan for it.  I think the most likely reason for mutation will be
that different parts of the system update session info (e.g. location might
not be accessible until later if the user does not give permission for it
initially).

*Ideas*
1) Introduce an aggregation step.  E.g. key by sessionId and run a
TableAggregateFunction.
2) Split the session into smaller parts that can only be inserted once.  Do
a join to create a combined session.

How have other people solved this with Flink Table SQL?  If I was using
DataStream directly, I'd group by and have a map function that uses key
state to keep track of the latest values.


Flink Kubernetes / Helm

2020-10-16 Thread Dan Hill
What libraries do people use for running Flink on Kubernetes?

Some links I've found:

   - Flink official documentation
   

   - Ververica documentation
   
   - https://github.com/lightbend/flink-operator
   - https://github.com/riskfocus/helm-charts-public
   - https://github.com/docker-flink/examples
   - different K8 proposal
   



Trying to run Flink tests

2020-10-20 Thread Dan Hill
I forked Flink to work on a PR.  When I run `mvn clean package` from a
clean branch, Maven says the runtime tests failed but the logs do not
appear to have details on the failure.  Do I have to do anything to run
these?



...

[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.24
s - in org.apache.flink.runtime.taskexecutor.BackPressureSampleServiceTest

[INFO] Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
1.872 s - in org.apache.flink.runtime.taskexecutor.partition.
PartitionTableTest

[DEBUG] Forking command line: /bin/sh -c cd
/Users/quietgolfer/code/dan-flink/flink/flink-runtime &&
/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/jre/bin/java
-Xms256m -Xmx2048m -Dmvn.forkNumber=9 -XX:+UseG1GC -jar
/Users/quietgolfer/code/dan-flink/flink/flink-runtime/target/surefire/surefirebooter3345042301183877750.jar
/Users/quietgolfer/code/dan-flink/flink/flink-runtime/target/surefire
2020-10-19T23-54-59_239-jvmRun9 surefire7884081050263655575tmp
surefire_7142433009722615751420tmp

[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2 s
- in org.apache.flink.runtime.taskexecutor.slot.TaskSlotTest

[INFO] Running org.apache.flink.runtime.taskexecutor.
NettyShuffleEnvironmentConfigurationTest

[INFO] Running org.apache.flink.runtime.taskexecutor.slot.
TaskSlotTableImplTest

[INFO] Running org.apache.flink.runtime.taskexecutor.
TaskExecutorToResourceManagerConnectionTest

[INFO] Running org.apache.flink.runtime.taskexecutor.slot.TimerServiceTest

[INFO] Running akka.actor.RobustActorSystemTest

[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
2.077 s - in org.apache.flink.runtime.taskexecutor.
NettyShuffleEnvironmentConfigurationTest

[INFO] Tests run: 15, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
3.009 s - in org.apache.flink.runtime.taskexecutor.slot.
TaskSlotTableImplTest

[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
4.088 s - in org.apache.flink.runtime.taskexecutor.slot.TimerServiceTest

[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
4.986 s - in org.apache.flink.runtime.taskexecutor.
TaskExecutorToResourceManagerConnectionTest

[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
5.146 s - in akka.actor.RobustActorSystemTest

[INFO] Tests run: 27, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
11.309 s - in org.apache.flink.runtime.taskexecutor.TaskExecutorTest

[INFO] Tests run: 7, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
11.068 s - in org.apache.flink.runtime.taskexecutor.TaskManagerRunnerTest

[INFO]

[INFO] Results:

[INFO]

[WARNING] Tests run: 4813, Failures: 0, Errors: 0, Skipped: 50

[INFO]

[INFO]


[INFO] Reactor Summary for Flink : 1.12-SNAPSHOT:

[INFO]

[INFO] Flink : Tools : Force Shading .. SUCCESS [  1.736
s]

[INFO] Flink :  SUCCESS [
15.601 s]

[INFO] Flink : Annotations  SUCCESS [
14.502 s]

[INFO] Flink : Test utils : ... SUCCESS [
12.505 s]

[INFO] Flink : Test utils : Junit . SUCCESS [
15.152 s]

[INFO] Flink : Metrics : .. SUCCESS [
12.443 s]

[INFO] Flink : Metrics : Core . SUCCESS [
14.250 s]

[INFO] Flink : Core ... SUCCESS [01:23
min]

[INFO] Flink : Java ... SUCCESS [
43.248 s]

[INFO] Flink : Queryable state : .. SUCCESS [
10.139 s]

[INFO] Flink : Queryable state : Client Java .. SUCCESS [
16.857 s]

[INFO] Flink : FileSystems : .. SUCCESS [
13.203 s]

[INFO] Flink : FileSystems : Hadoop FS  SUCCESS [
25.514 s]

[INFO] Flink : Runtime  FAILURE [11:05
min]

[INFO] Flink : Scala .. SKIPPED


Re: Trying to run Flink tests

2020-10-20 Thread Dan Hill
Hi Xintong!

No changes.  I tried -X and no additional log information is logged.
-DfailIfNoTests=false does not help.  `-DskipTests` works fine.  I'm going
to go ahead and create a PR and see if it fails.

Thanks!
- Dan

On Tue, Oct 20, 2020 at 8:22 AM Xintong Song  wrote:

> Hi Dan,
>
> The 'mvn package' command automatically includes 'mvn verify', which
> triggers the test cases. You can skip the tests with 'mvn package
> -DskipTests'. You can rely on the ci-tests running on Azure Pipeline,
> either in your own workspace or in the PR.
>
> If it is intended to execute the tests locally, you can try the following
> actions. I'm not sure whether that helps though.
> - Try to add '-DfailIfNoTests=false' to your maven command.
> - Execute the maven command with '-X' to print all the debug logs.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Oct 20, 2020 at 3:48 PM Dan Hill  wrote:
>
>> I forked Flink to work on a PR.  When I run `mvn clean package` from a
>> clean branch, Maven says the runtime tests failed but the logs do not
>> appear to have details on the failure.  Do I have to do anything to run
>> these?
>>
>>
>>
>> ...
>>
>> [INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>> 2.24 s - in org.apache.flink.runtime.taskexecutor.
>> BackPressureSampleServiceTest
>>
>> [INFO] Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>> 1.872 s - in org.apache.flink.runtime.taskexecutor.partition.
>> PartitionTableTest
>>
>> [DEBUG] Forking command line: /bin/sh -c cd
>> /Users/quietgolfer/code/dan-flink/flink/flink-runtime &&
>> /Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/jre/bin/java
>> -Xms256m -Xmx2048m -Dmvn.forkNumber=9 -XX:+UseG1GC -jar
>> /Users/quietgolfer/code/dan-flink/flink/flink-runtime/target/surefire/surefirebooter3345042301183877750.jar
>> /Users/quietgolfer/code/dan-flink/flink/flink-runtime/target/surefire
>> 2020-10-19T23-54-59_239-jvmRun9 surefire7884081050263655575tmp
>> surefire_7142433009722615751420tmp
>>
>> [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2
>> s - in org.apache.flink.runtime.taskexecutor.slot.TaskSlotTest
>>
>> [INFO] Running org.apache.flink.runtime.taskexecutor.
>> NettyShuffleEnvironmentConfigurationTest
>>
>> [INFO] Running org.apache.flink.runtime.taskexecutor.slot.
>> TaskSlotTableImplTest
>>
>> [INFO] Running org.apache.flink.runtime.taskexecutor.
>> TaskExecutorToResourceManagerConnectionTest
>>
>> [INFO] Running org.apache.flink.runtime.taskexecutor.slot.
>> TimerServiceTest
>>
>> [INFO] Running akka.actor.RobustActorSystemTest
>>
>> [INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>> 2.077 s - in org.apache.flink.runtime.taskexecutor.
>> NettyShuffleEnvironmentConfigurationTest
>>
>> [INFO] Tests run: 15, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>> 3.009 s - in org.apache.flink.runtime.taskexecutor.slot.
>> TaskSlotTableImplTest
>>
>> [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>> 4.088 s - in org.apache.flink.runtime.taskexecutor.slot.TimerServiceTest
>>
>> [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>> 4.986 s - in org.apache.flink.runtime.taskexecutor.
>> TaskExecutorToResourceManagerConnectionTest
>>
>> [INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>> 5.146 s - in akka.actor.RobustActorSystemTest
>>
>> [INFO] Tests run: 27, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>> 11.309 s - in org.apache.flink.runtime.taskexecutor.TaskExecutorTest
>>
>> [INFO] Tests run: 7, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>> 11.068 s - in org.apache.flink.runtime.taskexecutor.TaskManagerRunnerTest
>>
>> [INFO]
>>
>> [INFO] Results:
>>
>> [INFO]
>>
>> [WARNING] Tests run: 4813, Failures: 0, Errors: 0, Skipped: 50
>>
>> [INFO]
>>
>> [INFO]
>> 
>>
>> [INFO] Reactor Summary for Flink : 1.12-SNAPSHOT:
>>
>> [INFO]
>>
>> [INFO] Flink : Tools : Force Shading .. SUCCESS [  1.736
>> s]
>>
>> [INFO] Flink :  SUCCESS [
>> 15.601 s]
>>
>> [INFO] Flink : Annotations  SUCCESS [
>> 14.502 s]
>>
>> [INFO] Flink : Test utils : ..

Re: Trying to run Flink tests

2020-10-21 Thread Dan Hill
Sure, here's a link
<https://drive.google.com/file/d/13Q7h77zG-2vp7gJOke8QAzLtKLKIPuTf/view?usp=sharing>
to
the output.  I think for this one I used either:
- `mvn package -e -X -DfailIfNoTests=false`
- or added a `clean` before package.

On Wed, Oct 21, 2020 at 2:24 AM Xintong Song  wrote:

> Would you be able to share the complete maven logs and the command? And
> what is the maven version?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Oct 21, 2020 at 1:37 AM Dan Hill  wrote:
>
>> Hi Xintong!
>>
>> No changes.  I tried -X and no additional log information is logged.
>> -DfailIfNoTests=false does not help.  `-DskipTests` works fine.  I'm going
>> to go ahead and create a PR and see if it fails.
>>
>> Thanks!
>> - Dan
>>
>> On Tue, Oct 20, 2020 at 8:22 AM Xintong Song 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> The 'mvn package' command automatically includes 'mvn verify', which
>>> triggers the test cases. You can skip the tests with 'mvn package
>>> -DskipTests'. You can rely on the ci-tests running on Azure Pipeline,
>>> either in your own workspace or in the PR.
>>>
>>> If it is intended to execute the tests locally, you can try the
>>> following actions. I'm not sure whether that helps though.
>>> - Try to add '-DfailIfNoTests=false' to your maven command.
>>> - Execute the maven command with '-X' to print all the debug logs.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Tue, Oct 20, 2020 at 3:48 PM Dan Hill  wrote:
>>>
>>>> I forked Flink to work on a PR.  When I run `mvn clean package` from a
>>>> clean branch, Maven says the runtime tests failed but the logs do not
>>>> appear to have details on the failure.  Do I have to do anything to run
>>>> these?
>>>>
>>>>
>>>>
>>>> ...
>>>>
>>>> [INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>>>> 2.24 s - in org.apache.flink.runtime.taskexecutor.
>>>> BackPressureSampleServiceTest
>>>>
>>>> [INFO] Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>>>> 1.872 s - in org.apache.flink.runtime.taskexecutor.partition.
>>>> PartitionTableTest
>>>>
>>>> [DEBUG] Forking command line: /bin/sh -c cd
>>>> /Users/quietgolfer/code/dan-flink/flink/flink-runtime &&
>>>> /Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/jre/bin/java
>>>> -Xms256m -Xmx2048m -Dmvn.forkNumber=9 -XX:+UseG1GC -jar
>>>> /Users/quietgolfer/code/dan-flink/flink/flink-runtime/target/surefire/surefirebooter3345042301183877750.jar
>>>> /Users/quietgolfer/code/dan-flink/flink/flink-runtime/target/surefire
>>>> 2020-10-19T23-54-59_239-jvmRun9 surefire7884081050263655575tmp
>>>> surefire_7142433009722615751420tmp
>>>>
>>>> [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>>>> 2 s - in org.apache.flink.runtime.taskexecutor.slot.TaskSlotTest
>>>>
>>>> [INFO] Running org.apache.flink.runtime.taskexecutor.
>>>> NettyShuffleEnvironmentConfigurationTest
>>>>
>>>> [INFO] Running org.apache.flink.runtime.taskexecutor.slot.
>>>> TaskSlotTableImplTest
>>>>
>>>> [INFO] Running org.apache.flink.runtime.taskexecutor.
>>>> TaskExecutorToResourceManagerConnectionTest
>>>>
>>>> [INFO] Running org.apache.flink.runtime.taskexecutor.slot.
>>>> TimerServiceTest
>>>>
>>>> [INFO] Running akka.actor.RobustActorSystemTest
>>>>
>>>> [INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>>>> 2.077 s - in org.apache.flink.runtime.taskexecutor.
>>>> NettyShuffleEnvironmentConfigurationTest
>>>>
>>>> [INFO] Tests run: 15, Failures: 0, Errors: 0, Skipped: 0, Time
>>>> elapsed: 3.009 s - in org.apache.flink.runtime.taskexecutor.slot.
>>>> TaskSlotTableImplTest
>>>>
>>>> [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>>>> 4.088 s - in org.apache.flink.runtime.taskexecutor.slot.
>>>> TimerServiceTest
>>>>
>>>> [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
>>>> 4.986 s - in org.apache.flink.runtime.taskexecutor.
>>>>

Re: Trying to run Flink tests

2020-10-21 Thread Dan Hill
1) I don't see anything useful in it
<https://drive.google.com/file/d/12q0m2L2YTueTRszc2pNN8GKzwGv-Andy/view?usp=sharing>
.
2) This PR <https://github.com/apache/flink/pull/13711>.

Thanks for replying, Xintong!

On Wed, Oct 21, 2020 at 7:11 PM Xintong Song  wrote:

> Hi Dan,
>
> It looks like while your tests are executed and passed, the java processes
> executing those tests did not exit properly.
> - Could you try execute the command manually and see if there's any useful
> outputs? You can find the commands by searching "Command was" in the maven
> logs.
> - Quick question: which PR are you working on? By any chance you called
> `System.exit()` in your codes?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Oct 22, 2020 at 5:59 AM Dan Hill  wrote:
>
>> Sure, here's a link
>> <https://drive.google.com/file/d/13Q7h77zG-2vp7gJOke8QAzLtKLKIPuTf/view?usp=sharing>
>>  to
>> the output.  I think for this one I used either:
>> - `mvn package -e -X -DfailIfNoTests=false`
>> - or added a `clean` before package.
>>
>> On Wed, Oct 21, 2020 at 2:24 AM Xintong Song 
>> wrote:
>>
>>> Would you be able to share the complete maven logs and the command? And
>>> what is the maven version?
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Wed, Oct 21, 2020 at 1:37 AM Dan Hill  wrote:
>>>
>>>> Hi Xintong!
>>>>
>>>> No changes.  I tried -X and no additional log information is logged.
>>>> -DfailIfNoTests=false does not help.  `-DskipTests` works fine.  I'm going
>>>> to go ahead and create a PR and see if it fails.
>>>>
>>>> Thanks!
>>>> - Dan
>>>>
>>>> On Tue, Oct 20, 2020 at 8:22 AM Xintong Song 
>>>> wrote:
>>>>
>>>>> Hi Dan,
>>>>>
>>>>> The 'mvn package' command automatically includes 'mvn verify', which
>>>>> triggers the test cases. You can skip the tests with 'mvn package
>>>>> -DskipTests'. You can rely on the ci-tests running on Azure Pipeline,
>>>>> either in your own workspace or in the PR.
>>>>>
>>>>> If it is intended to execute the tests locally, you can try the
>>>>> following actions. I'm not sure whether that helps though.
>>>>> - Try to add '-DfailIfNoTests=false' to your maven command.
>>>>> - Execute the maven command with '-X' to print all the debug logs.
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 20, 2020 at 3:48 PM Dan Hill 
>>>>> wrote:
>>>>>
>>>>>> I forked Flink to work on a PR.  When I run `mvn clean package` from
>>>>>> a clean branch, Maven says the runtime tests failed but the logs do not
>>>>>> appear to have details on the failure.  Do I have to do anything to run
>>>>>> these?
>>>>>>
>>>>>>
>>>>>>
>>>>>> ...
>>>>>>
>>>>>> [INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time
>>>>>> elapsed: 2.24 s - in org.apache.flink.runtime.taskexecutor.
>>>>>> BackPressureSampleServiceTest
>>>>>>
>>>>>> [INFO] Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time
>>>>>> elapsed: 1.872 s - in org.apache.flink.runtime.taskexecutor.partition.
>>>>>> PartitionTableTest
>>>>>>
>>>>>> [DEBUG] Forking command line: /bin/sh -c cd
>>>>>> /Users/quietgolfer/code/dan-flink/flink/flink-runtime &&
>>>>>> /Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/jre/bin/java
>>>>>> -Xms256m -Xmx2048m -Dmvn.forkNumber=9 -XX:+UseG1GC -jar
>>>>>> /Users/quietgolfer/code/dan-flink/flink/flink-runtime/target/surefire/surefirebooter3345042301183877750.jar
>>>>>> /Users/quietgolfer/code/dan-flink/flink/flink-runtime/target/surefire
>>>>>> 2020-10-19T23-54-59_239-jvmRun9 surefire7884081050263655575tmp
>>>>>> surefire_7142433009722615751420tmp
>>>>>>
>>>>>> [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time
>>>>>> elapsed: 2 s - in org.apache.flink.runtime.taskexecutor.slot.
>>>

Flink Table SQL and MongoDB connector?

2020-10-21 Thread Dan Hill
Has anyone connected these two?

Looking through previous emails and the Flink docs, I've see two mentions
of how to hook up MongoDB to Flink.

1) https://github.com/okkam-it/flink-mongodb-test
2) Debezium->Kafka->Flink
https://debezium.io/documentation/reference/1.3/connectors/mongodb.html

The debezium route has more setup and maintenance work but scales better
and probably has an easier hook for Table SQL (since it's Kafka).


Re: Trying to run Flink tests

2020-10-23 Thread Dan Hill
unning ci tests on AZP for PRs. Your maven log suggests the maven version
> on your machine is 3.6.3.
> I'm not sure whether the maven version is related, but maybe you can try
> it out with 3.2.5. And if it turns out worked, we may fire a issue at the
> Apache Maven community.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Oct 22, 2020 at 12:31 PM Dan Hill  wrote:
>
>> 1) I don't see anything useful in it
>> <https://drive.google.com/file/d/12q0m2L2YTueTRszc2pNN8GKzwGv-Andy/view?usp=sharing>
>> .
>> 2) This PR <https://github.com/apache/flink/pull/13711>.
>>
>> Thanks for replying, Xintong!
>>
>> On Wed, Oct 21, 2020 at 7:11 PM Xintong Song 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> It looks like while your tests are executed and passed, the java
>>> processes executing those tests did not exit properly.
>>> - Could you try execute the command manually and see if there's any
>>> useful outputs? You can find the commands by searching "Command was" in the
>>> maven logs.
>>> - Quick question: which PR are you working on? By any chance you called
>>> `System.exit()` in your codes?
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Thu, Oct 22, 2020 at 5:59 AM Dan Hill  wrote:
>>>
>>>> Sure, here's a link
>>>> <https://drive.google.com/file/d/13Q7h77zG-2vp7gJOke8QAzLtKLKIPuTf/view?usp=sharing>
>>>>  to
>>>> the output.  I think for this one I used either:
>>>> - `mvn package -e -X -DfailIfNoTests=false`
>>>> - or added a `clean` before package.
>>>>
>>>> On Wed, Oct 21, 2020 at 2:24 AM Xintong Song 
>>>> wrote:
>>>>
>>>>> Would you be able to share the complete maven logs and the command?
>>>>> And what is the maven version?
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Oct 21, 2020 at 1:37 AM Dan Hill 
>>>>> wrote:
>>>>>
>>>>>> Hi Xintong!
>>>>>>
>>>>>> No changes.  I tried -X and no additional log information is logged.
>>>>>> -DfailIfNoTests=false does not help.  `-DskipTests` works fine.  I'm 
>>>>>> going
>>>>>> to go ahead and create a PR and see if it fails.
>>>>>>
>>>>>> Thanks!
>>>>>> - Dan
>>>>>>
>>>>>> On Tue, Oct 20, 2020 at 8:22 AM Xintong Song 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Dan,
>>>>>>>
>>>>>>> The 'mvn package' command automatically includes 'mvn verify', which
>>>>>>> triggers the test cases. You can skip the tests with 'mvn package
>>>>>>> -DskipTests'. You can rely on the ci-tests running on Azure Pipeline,
>>>>>>> either in your own workspace or in the PR.
>>>>>>>
>>>>>>> If it is intended to execute the tests locally, you can try the
>>>>>>> following actions. I'm not sure whether that helps though.
>>>>>>> - Try to add '-DfailIfNoTests=false' to your maven command.
>>>>>>> - Execute the maven command with '-X' to print all the debug logs.
>>>>>>>
>>>>>>> Thank you~
>>>>>>>
>>>>>>> Xintong Song
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Oct 20, 2020 at 3:48 PM Dan Hill 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I forked Flink to work on a PR.  When I run `mvn clean package`
>>>>>>>> from a clean branch, Maven says the runtime tests failed but the logs 
>>>>>>>> do
>>>>>>>> not appear to have details on the failure.  Do I have to do anything 
>>>>>>>> to run
>>>>>>>> these?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> ...
>>>>>>>>
>>>>>>>> [INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time
>>>>>>>> elapsed: 2.24 s - in org.ap

How to debug a Flink Exception that does not have a stack trace?

2020-12-09 Thread Dan Hill
In the Flink dashboard, my job is failing with a NullPointerException but
the Exception is not showing a stack trace.  I do not see any
NullPointerExceptions in any of the flink-jobmanager and flink-taskmanager
logs.

Is this a normal issue?

[image: Screen Shot 2020-12-09 at 4.29.30 PM.png]


Re: How to debug a Flink Exception that does not have a stack trace?

2020-12-09 Thread Dan Hill
in.onTimer(RowTimeIntervalJoin.java:30)
~[flink-table-blink_2.12-1.11.1.jar:1.11.1]

at
org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95)
~[flink-dist_2.12-1.11.1.jar:1.11.1]

at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
~[flink-dist_2.12-1.11.1.jar:1.11.1]

On Wed, Dec 9, 2020 at 4:33 PM Dan Hill  wrote:

> In the Flink dashboard, my job is failing with a NullPointerException but
> the Exception is not showing a stack trace.  I do not see any
> NullPointerExceptions in any of the flink-jobmanager and flink-taskmanager
> logs.
>
> Is this a normal issue?
>
> [image: Screen Shot 2020-12-09 at 4.29.30 PM.png]
>


Re: How to debug a Flink Exception that does not have a stack trace?

2020-12-10 Thread Dan Hill
Yea, the error makes sense and was an easy fix.

Any idea what happened with the hidden stacktrace?  The hidden stacktrace
made this 100x more difficult.

On Thu, Dec 10, 2020 at 12:59 AM Flavio Pompermaier 
wrote:

> It looks like the problem is that there's a problem in reading a null
> value in the AvroRowDataDeserializationSchema (see below for the snippet of
> code from Flink 1.11.1).
> The problem is due to the fact that there's a bad typing of the source so
> the call to createConverter() within the createNullableConverter() returns
> null, creating a null on  fieldConverters[i] and, in the end, a NullPointer
> in  fieldConverters[i].convert(). Does it make sense?
>
> static DeserializationRuntimeConverter createRowConverter(RowType rowType)
> {
> final DeserializationRuntimeConverter[] fieldConverters =
> rowType.getFields().stream()
> .map(RowType.RowField::getType)
> .map(AvroRowDataDeserializationSchema::createNullableConverter)
> .toArray(DeserializationRuntimeConverter[]::new);
> final int arity = rowType.getFieldCount();
> return avroObject -> {
> IndexedRecord record = (IndexedRecord) avroObject;
> GenericRowData row = new GenericRowData(arity);
> for (int i = 0; i < arity; ++i) {
> row.setField(i, fieldConverters[i].convert(record.get(i)));
> }
> return row;
> };
> }
>
> Best,
> Flavio
>
> On Thu, Dec 10, 2020 at 8:39 AM Dan Hill  wrote:
>
>> One of the Exception instances finally reported a stacktrace.  I'm not
>> sure why it's so infrequent.
>>
>> java.lang.NullPointerException: null
>>
>> at
>> org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$7(RowData.java:338)
>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.formats.avro.AvroRowDataSerializationSchema.lambda$createRowConverter$6827278$1(AvroRowDataSerializationSchema.java:177)
>> ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
>>
>> at
>> org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.addElement(AvroFileSystemFormatFactory.java:251)
>> ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
>>
>> at
>> org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.addElement(AvroFileSystemFormatFactory.java:247)
>> ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
>>
>> at
>> org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:498)
>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:494)
>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:202)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.

Re: How to debug a Flink Exception that does not have a stack trace?

2020-12-11 Thread Dan Hill
Hmm, I don't have a good job I can separate for reproduction.  I was using
Table SQL and inserting a long field (which was null) into a table that
sinked out to avro.  The exception was being thrown from this Avro
function.  I can watch to see if it keeps happening.

https://github.com/apache/avro/blob/e982f2e6ee57c362a7fae21ba7373c1cfc964fce/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java#L127

On Fri, Dec 11, 2020 at 3:31 AM Till Rohrmann  wrote:

> Hi Dan,
>
> Do you have an example job and some sample data to reproduce this problem?
> I couldn't reproduce it locally with a simple example job.
>
> Cheers,
> Till
>
> On Thu, Dec 10, 2020 at 5:51 PM Dan Hill  wrote:
>
>> Yea, the error makes sense and was an easy fix.
>>
>> Any idea what happened with the hidden stacktrace?  The hidden stacktrace
>> made this 100x more difficult.
>>
>> On Thu, Dec 10, 2020 at 12:59 AM Flavio Pompermaier 
>> wrote:
>>
>>> It looks like the problem is that there's a problem in reading a null
>>> value in the AvroRowDataDeserializationSchema (see below for the snippet of
>>> code from Flink 1.11.1).
>>> The problem is due to the fact that there's a bad typing of the source
>>> so the call to createConverter() within the createNullableConverter()
>>> returns null, creating a null on  fieldConverters[i] and, in the end, a
>>> NullPointer in  fieldConverters[i].convert(). Does it make sense?
>>>
>>> static DeserializationRuntimeConverter createRowConverter(RowType
>>> rowType) {
>>> final DeserializationRuntimeConverter[] fieldConverters =
>>> rowType.getFields().stream()
>>> .map(RowType.RowField::getType)
>>> .map(AvroRowDataDeserializationSchema::createNullableConverter)
>>> .toArray(DeserializationRuntimeConverter[]::new);
>>> final int arity = rowType.getFieldCount();
>>> return avroObject -> {
>>> IndexedRecord record = (IndexedRecord) avroObject;
>>> GenericRowData row = new GenericRowData(arity);
>>> for (int i = 0; i < arity; ++i) {
>>> row.setField(i, fieldConverters[i].convert(record.get(i)));
>>> }
>>> return row;
>>> };
>>> }
>>>
>>> Best,
>>> Flavio
>>>
>>> On Thu, Dec 10, 2020 at 8:39 AM Dan Hill  wrote:
>>>
>>>> One of the Exception instances finally reported a stacktrace.  I'm not
>>>> sure why it's so infrequent.
>>>>
>>>> java.lang.NullPointerException: null
>>>>
>>>> at
>>>> org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
>>>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>>
>>>> at
>>>> org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$7(RowData.java:338)
>>>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>>
>>>> at
>>>> org.apache.flink.formats.avro.AvroRowDataSerializationSchema.lambda$createRowConverter$6827278$1(AvroRowDataSerializationSchema.java:177)
>>>> ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
>>>>
>>>> at
>>>> org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.addElement(AvroFileSystemFormatFactory.java:251)
>>>> ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
>>>>
>>>> at
>>>> org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.addElement(AvroFileSystemFormatFactory.java:247)
>>>> ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
>>>>
>>>> at
>>>> org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:498)
>>>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>>
>>>> at
>>>> org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:494)
>>>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>>
>>>> at
>>>> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)
>>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>>
>>>> at
>>>> org.apa

Re: How to debug a Flink Exception that does not have a stack trace?

2020-12-12 Thread Dan Hill
Thanks!  That makes sense.

On Sat, Dec 12, 2020 at 11:13 AM Steven Wu  wrote:

> This is a performance optimization in JVM when the same exception is
> thrown too frequently. You can set `-XX:-OmitStackTraceInFastThrow` to
> disable the feature. You can typically find the full stack trace in the log
> before the optimization kicks in.
>
> On Sat, Dec 12, 2020 at 2:05 AM Till Rohrmann 
> wrote:
>
>> Ok, then let's see whether it reoccurs. What you could do is to revert
>> the fix and check the stack trace again.
>>
>> Cheers,
>> Till
>>
>> On Sat, Dec 12, 2020, 02:16 Dan Hill  wrote:
>>
>>> Hmm, I don't have a good job I can separate for reproduction.  I was
>>> using Table SQL and inserting a long field (which was null) into a table
>>> that sinked out to avro.  The exception was being thrown from this Avro
>>> function.  I can watch to see if it keeps happening.
>>>
>>>
>>> https://github.com/apache/avro/blob/e982f2e6ee57c362a7fae21ba7373c1cfc964fce/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java#L127
>>>
>>> On Fri, Dec 11, 2020 at 3:31 AM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> Do you have an example job and some sample data to reproduce this
>>>> problem? I couldn't reproduce it locally with a simple example job.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Dec 10, 2020 at 5:51 PM Dan Hill  wrote:
>>>>
>>>>> Yea, the error makes sense and was an easy fix.
>>>>>
>>>>> Any idea what happened with the hidden stacktrace?  The hidden
>>>>> stacktrace made this 100x more difficult.
>>>>>
>>>>> On Thu, Dec 10, 2020 at 12:59 AM Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> It looks like the problem is that there's a problem in reading a null
>>>>>> value in the AvroRowDataDeserializationSchema (see below for the snippet 
>>>>>> of
>>>>>> code from Flink 1.11.1).
>>>>>> The problem is due to the fact that there's a bad typing of the
>>>>>> source so the call to createConverter()
>>>>>> within the createNullableConverter() returns null, creating a null on
>>>>>> fieldConverters[i] and, in the end, a NullPointer in
>>>>>> fieldConverters[i].convert(). Does it make sense?
>>>>>>
>>>>>> static DeserializationRuntimeConverter createRowConverter(RowType
>>>>>> rowType) {
>>>>>> final DeserializationRuntimeConverter[] fieldConverters =
>>>>>> rowType.getFields().stream()
>>>>>> .map(RowType.RowField::getType)
>>>>>>
>>>>>> .map(AvroRowDataDeserializationSchema::createNullableConverter)
>>>>>> .toArray(DeserializationRuntimeConverter[]::new);
>>>>>> final int arity = rowType.getFieldCount();
>>>>>> return avroObject -> {
>>>>>> IndexedRecord record = (IndexedRecord) avroObject;
>>>>>> GenericRowData row = new GenericRowData(arity);
>>>>>> for (int i = 0; i < arity; ++i) {
>>>>>> row.setField(i,
>>>>>> fieldConverters[i].convert(record.get(i)));
>>>>>> }
>>>>>> return row;
>>>>>> };
>>>>>> }
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Thu, Dec 10, 2020 at 8:39 AM Dan Hill 
>>>>>> wrote:
>>>>>>
>>>>>>> One of the Exception instances finally reported a stacktrace.  I'm
>>>>>>> not sure why it's so infrequent.
>>>>>>>
>>>>>>> java.lang.NullPointerException: null
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
>>>>>>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$7(RowData.java:338)
>>>>>>> ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>>>>>

Flink - Create Temporary View and "Rowtime attributes must not be in the input rows of a regular join"

2020-12-15 Thread Dan Hill
When I try to refactor my joins into a temporary view to share joins and
state, I get the following error.  I tried a few variations of the code
snippets below (adding TIMESTAMP casts based on Google searches).  I
removed a bunch of fields to simplify this example.

Is this a known issue?  Do I have a simple coding bug?

CREATE TEMPORARY VIEW `flat_impression_view` AS

SELECT

DATE_FORMAT(input_impression.ts, '-MM-dd') AS dt,

input_insertion.log_user_id AS insertion_log_user_id,

COALESCE(CAST(input_insertion.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3)))
AS insertion_ts,

input_insertion.insertion_id AS insertion_insertion_id,

COALESCE(CAST(input_impression.ts AS TIMESTAMP(3)), CAST(0 AS
TIMESTAMP(3))) AS impression_ts,

input_impression.impression_id AS impression_impression_id,

input_impression.insertion_id AS impression_insertion_id,

FROM input_insertion

JOIN input_impression

ON input_insertion.insertion_id = input_impression.insertion_id

AND CAST(input_insertion.ts AS TIMESTAMP) BETWEEN CAST(input_impression.ts
AS TIMESTAMP) - INTERVAL '12' HOUR AND CAST(input_impression.ts AS
TIMESTAMP) + INTERVAL '1' HOUR


INSERT INTO `flat_impression_w_click`

SELECT

dt,

insertion_log_user_id,

CAST(insertion_ts AS TIMESTAMP(3)) AS insertion_ts,

insertion_insertion_id,

CAST(impression_ts AS TIMESTAMP(3)) AS mpression_ts,

impression_impression_id,

impression_insertion_id,

COALESCE(CAST(input_click.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS
click_ts,

COALESCE(input_click.click_id, EmptyByteArray()) AS click_click_id,

COALESCE(input_click.impression_id, EmptyByteArray()) AS
click_impression_id,

FROM flat_impression_view

LEFT JOIN input_click

ON flat_impression_view.impression_impression_id = input_click.impression_id


AND CAST(flat_impression_view.impression_ts AS TIMESTAMP) BETWEEN
CAST(input_click.ts AS TIMESTAMP) - INTERVAL '12' HOUR AND
CAST(input_click.ts AS TIMESTAMP) + INTERVAL '12' HOUR


java.lang.RuntimeException: Failed to executeSql=...

...

Caused by: org.apache.flink.table.api.TableException: Cannot generate a
valid execution plan for the given query:

FlinkLogicalLegacySink(name=[...])

+- FlinkLogicalCalc(select=[...])

   +- FlinkLogicalJoin(condition=[AND(=($36, $45),
>=(CAST($35):TIMESTAMP(6) NOT NULL, -(CAST($43):TIMESTAMP(6),
4320:INTERVAL HOUR)), <=(CAST($35):TIMESTAMP(6) NOT NULL,
+(CAST($43):TIMESTAMP(6), 4320:INTERVAL HOUR)))], joinType=[left])

  :- FlinkLogicalCalc(select=[...])

  :  +- FlinkLogicalJoin(condition=[AND(=($5, $35),
>=(CAST($4):TIMESTAMP(6), -(CAST($33):TIMESTAMP(6), 4320:INTERVAL
HOUR)), <=(CAST($4):TIMESTAMP(6), +(CAST($33):TIMESTAMP(6),
360:INTERVAL HOUR)))], joinType=[inner])

  : :- FlinkLogicalDataStreamTableScan(table=[[default, mydb,
input_insertion]])

  : +- FlinkLogicalDataStreamTableScan(table=[[default, mydb,
input_impression]])

  +- FlinkLogicalDataStreamTableScan(table=[[default, mydb,
input_click]])


Rowtime attributes must not be in the input rows of a regular join. As a
workaround you can cast the time attributes of input tables to TIMESTAMP
before.

Please check the documentation for the set of currently supported SQL
features.

at
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)

at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)

at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)

at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)

at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)

at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)

at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)

at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)

at
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)

at
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)

at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)

Flink - sending clicks+impressions to AWS Personalize

2020-12-15 Thread Dan Hill
I want to try using AWS Personalize 
to get content recommendations.  One of the fields on the input (click)
event is a list of recent impressions.

E.g.
{
  ...
  eventType: 'click',
  eventId: 'click-1',
  itemId: 'item-1'
  impression: ['item-2', 'item-3', 'item-4', 'item-5', ],
}

Is there a way to produce this output using Flink SQK?

I tried doing a version of this but get the following error:
"Rowtime attributes must not be in the input rows of a regular join. As a
workaround you can cast the time attributes of input tables to TIMESTAMP
before."

Here is a simplified version of the query.


SELECT

"user".user_id AS userId,

"view".session_id AS sessionId,  click.click_id AS eventId,

CAST(click.ts AS BIGINT) AS sentAt,

insertion.content_id AS itemId,

impression_content_ids AS impression

FROM "user"

RIGHT JOIN "view"

ON "user".log_user_id = "view".log_user_id

AND "user".ts BETWEEN "view".ts - INTERVAL '30' DAY AND "view".ts +
INTERVAL '1' HOUR

JOIN insertion

ON view.view_id = insertion.view_id

AND view.ts BETWEEN insertion.ts - INTERVAL '1' HOUR   AND insertion.ts
+ INTERVAL '1' HOUR

JOIN impression  ON insertion.insertion_id = impression.insertion_id

AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND
impression.ts + INTERVAL '1' HOUR

JOIN (

SELECT log_user_id, CAST(COLLECT(DISTINCT impression_content_id) AS
ARRAY) AS impression_content_ids

FROM (

SELECT insertion.log_user_id AS log_user_id,

ROW_NUMBER() OVER (PARTITION BY insertion.log_user_id ORDER BY
impression.ts DESC) AS row_num,

  insertion.content_id AS impression_content_id

FROM insertion

JOIN impression

ON insertion.insertion_id = impression.insertion_id

AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND
impression.ts + INTERVAL '1' HOUR

GROUP BY insertion.log_user_id, impression.ts, insertion.content_id

) WHERE row_num <= 25

GROUP BY log_user_id

) ON insertion.insertion_id = impression.insertion_id

AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND
impression.ts + INTERVAL '1' HOUR  LEFT JOIN click

ON impression.impression_id = click.impression_id

AND impression.ts BETWEEN click.ts - INTERVAL '12' HOUR AND click.ts +
INTERVAL '12' HOUR"


Flink SQL, temporal joins and backfilling data

2020-12-29 Thread Dan Hill
Hi!

I have a Flink SQL job that does a few temporal joins and has been running
for over a month on regular data.  No issues.  Ran well.

I'm trying to re-run the Flink SQL job on the same data set but it's
failing to checkpoint and very slow to make progress.  I've modified some
of the checkpoint settings.

What else do I have to modify?

My data size is really small so I'm guessing it's still keeping state for
data outside the temporal join time windows.  Do I have to set Idle State
Retention Time to forget older data?

- Dan


Flink SQL - IntervalJoin doesn't support consuming update and delete - trying to deduplicate rows

2020-12-30 Thread Dan Hill
Hi!

I'm using Flink SQL to do an interval join.  Rows in one of the tables are
not unique.  I'm fine using either the first or last row.  When I try to
deduplicate

and
then interval join, I get the following error.

IntervalJoin doesn't support consuming update and delete changes which is
produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
rankRange=[rankStart=1, rankEnd=1], partitionBy=[log_user_id], orderBy=[ts
ASC], select=[platform_id, user_id, log_user_id, client_log_ts,
event_api_ts, ts])

Is there a way to combine these in this order?  I could do the
deduplication afterwards but this will result in more state.

- Dan


Comparing Flink vs Materialize

2021-01-04 Thread Dan Hill
Has anyone compared Flink with Materialize?  A friend recommended me switch
to Materialize.

In one of their blog posts, it says that Flink splits operators across CPUs
(instead of splitting partitions across CPUs).  Is this true?  Is it
configurable?

https://materialize.com/blog-rocksdb/


Re: Flink SQL - IntervalJoin doesn't support consuming update and delete - trying to deduplicate rows

2021-01-14 Thread Dan Hill
Hey, sorry for the late reply.  I'm using v1.11.1.

Cool.  I did a non-SQL way of using the first row.  I'll try to see if I
can do this in the SQL version.

On Wed, Jan 13, 2021 at 11:26 PM Jark Wu  wrote:

> Hi Dan,
>
> Sorry for the late reply.
>
> I guess you applied a "deduplication with keeping last row" before the
> interval join?
> That will produce an updating stream and interval join only supports
> append-only input.
> You can try to apply "deduplication with keeping *first* row" before the
> interval join.
> That should produce an append-only stream and interval join can consume
> from it.
>
> Best,
> Jark
>
>
>
> On Tue, 5 Jan 2021 at 20:07, Arvid Heise  wrote:
>
>> Hi Dan,
>>
>> Which Flink version are you using? I know that there has been quite a bit
>> of optimization of deduplication in 1.12, which would reduce the required
>> state tremendously.
>> I'm pulling in Jark who knows more.
>>
>> On Thu, Dec 31, 2020 at 6:54 AM Dan Hill  wrote:
>>
>>> Hi!
>>>
>>> I'm using Flink SQL to do an interval join.  Rows in one of the tables
>>> are not unique.  I'm fine using either the first or last row.  When I try
>>> to deduplicate
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#deduplication>
>>>  and
>>> then interval join, I get the following error.
>>>
>>> IntervalJoin doesn't support consuming update and delete changes which
>>> is produced by node Rank(strategy=[UndefinedStrategy],
>>> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
>>> partitionBy=[log_user_id], orderBy=[ts ASC], select=[platform_id, user_id,
>>> log_user_id, client_log_ts, event_api_ts, ts])
>>>
>>> Is there a way to combine these in this order?  I could do the
>>> deduplication afterwards but this will result in more state.
>>>
>>> - Dan
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>


Re: Flink SQL, temporal joins and backfilling data

2021-01-17 Thread Dan Hill
Hi Timo.  Sorry for the delay.  I'll message this message the next time I
hit this.  I haven't restarted my job in 12 days.  I'll check the
watermarks the next time I restart.

On Tue, Jan 5, 2021 at 4:47 AM Timo Walther  wrote:

> Hi Dan,
>
> are you sure that your watermarks are still correct during reprocessing?
> As far as I know, idle state retention is not used for temporal joins.
> The watermark indicates when state can be removed in this case.
>
> Maybe you can give us some more details about which kind of temporal
> join you are using (event-time or processing-time?) and checkpoint
> settings?
>
> Regards,
> Timo
>
> On 30.12.20 08:30, Dan Hill wrote:
> > Hi!
> >
> > I have a Flink SQL job that does a few temporal joins and has been
> > running for over a month on regular data.  No issues.  Ran well.
> >
> > I'm trying to re-run the Flink SQL job on the same data set but it's
> > failing to checkpoint and very slow to make progress.  I've modified
> > some of the checkpoint settings.
> >
> > What else do I have to modify?
> >
> > My data size is really small so I'm guessing it's still keeping state
> > for data outside the temporal join time windows.  Do I have to set Idle
> > State Retention Time to forget older data?
> >
> > - Dan
>
>


Flink SQL and checkpoints and savepoints

2021-01-17 Thread Dan Hill
How well does Flink SQL work with checkpoints and savepoints?  I tried to
find documentation for it in v1.11 but couldn't find it.

E.g. what happens if the Flink SQL is modified between releases?  New
columns?  Change columns?  Adding joins?


Re: Flink SQL and checkpoints and savepoints

2021-01-18 Thread Dan Hill
Thanks Timo!

The reason makes sense.

Do any of the techniques make it easy to support exactly once?

I'm inferring what is meant by dry out.  Are there any documented patterns
for it?  E.g. sending data to new kafka topics between releases?




On Mon, Jan 18, 2021, 01:04 Timo Walther  wrote:

> Hi Dan,
>
> currently, we cannot provide any savepoint guarantees between releases.
> Because of the nature of SQL that abstracts away runtime operators, it
> might be that a future execution plan will look completely different and
> thus we cannot map state anymore. This is not avoidable because the
> optimizer might get smarter when adding new optimizer rules.
>
> For such cases, we recommend to dry out the old pipeline and/or warm up
> a new pipeline with historic data when upgrading Flink. A change in
> columns sometimes works but even this depends on the used operators.
>
> Regards,
> Timo
>
>
> On 18.01.21 04:46, Dan Hill wrote:
> > How well does Flink SQL work with checkpoints and savepoints?  I tried
> > to find documentation for it in v1.11 but couldn't find it.
> >
> > E.g. what happens if the Flink SQL is modified between releases?  New
> > columns?  Change columns?  Adding joins?
> >
> >
>
>


Re: How to safely update jobs in-flight using Apache Flink on AWS EMR?

2020-06-10 Thread Dan Hill
Hi!  I'm assuming this question comes up regularly with AWS EMR.  I posted
it on Stack Overflow.

https://stackoverflow.com/questions/62309400/how-to-safely-update-jobs-in-flight-using-apache-flink-on-aws-emr



I was not able to find instructions for how to update code safely. I see
Flink docs on how to use savepoints. I'd expect an easy solution for
updating Flink jobs in AWS EMR.

https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/aws.html

https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/upgrading.html

https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html

I was expecting instructions like the following (but not for Dataflow and
Apache Beam):

https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline

https://medium.com/google-cloud/restarting-cloud-dataflow-in-flight-9c688c49adfd

>


Does anyone have an example of Bazel working with Flink?

2020-06-11 Thread Dan Hill
I took the Flink playground and I'm trying to swap out Maven for Bazel.  I
got to the point where I'm hitting the following error.  I want to diff my
code with an existing, working setup.

Thanks! - Dan


client_1|
org.apache.flink.client.program.ProgramInvocationException:
Neither a 'Main-Class', nor a 'program-class' entry was found in the jar
file.

client_1| at
org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596)

client_1| at
org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:190)

client_1| at
org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:128)

client_1| at
org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862)

client_1| at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204)

client_1| at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)

client_1| at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)

client_1| at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)

client_1| at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)


Re: Does anyone have an example of Bazel working with Flink?

2020-06-15 Thread Dan Hill
Thanks for the replies!  I was able to use the provided answers to get a
setup working (maybe not the most efficiently).  The main change I made was
to switch to including the deploy jar in the image (rather than the default
one).

I'm open to contributing to a "rules_flink" project.  I don't know enough
yet to help design it.

On Sat, Jun 13, 2020 at 4:39 AM Till Rohrmann  wrote:

> Hi Dan,
>
> if you want to run a Flink job without specifying the main class via
> `bin/flink run --class org.a.b.Foobar` then you have to add a MANIFEST.MF
> file to your jar under META-INF and this file needs to contain `Main-Class:
> org.a.b.Foobar`.
>
> Cheers,
> Till
>
> On Fri, Jun 12, 2020 at 12:30 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey all,
>>
>> Adding to Aaron's response, we use Bazel to build our Flink apps. We've
>> open-sourced some of our setup here[1] though a bit outdated. There are
>> definitely rough edges/ probably needs a good deal of work to fit other
>> setups. We have written a wrapper around the `java_library` and
>> `java_binary` and could do the same for `rules_scala`, though we just
>> started using Bazel last November and have a lot to learn in terms of best
>> practices there.
>>
>> If you're interested in contributing to a `rules_flink` project, I would
>> be as well!
>>
>> Best,
>> Austin
>>
>> [1]: https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020
>>
>> On Thu, Jun 11, 2020 at 6:14 PM Aaron Levin 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> We use Bazel to compile our Flink applications. We're using
>>> "rules_scala" (https://github.com/bazelbuild/rules_scala) to manage the
>>> dependencies and produce jars. We haven't had any issues. However, I have
>>> found that sometimes it's difficult to figure out exactly what Flink target
>>> or dependency my application needs.
>>>
>>> Unfortunately I'm not sure what issue you're seeing here. I would guess
>>> either your flink application wasn't compiled into the jar
>>> you're executing. If you can paste the bazel target used to generate your
>>> jar and how you're launching the application, that will be helpful
>>> for diagnosis.
>>>
>>> On Thu, Jun 11, 2020 at 5:21 PM Dan Hill  wrote:
>>>
>>>> I took the Flink playground and I'm trying to swap out Maven for
>>>> Bazel.  I got to the point where I'm hitting the following error.  I want
>>>> to diff my code with an existing, working setup.
>>>>
>>>> Thanks! - Dan
>>>>
>>>>
>>>> client_1| 
>>>> org.apache.flink.client.program.ProgramInvocationException:
>>>> Neither a 'Main-Class', nor a 'program-class' entry was found in the jar
>>>> file.
>>>>
>>>> client_1| at
>>>> org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596)
>>>>
>>>> client_1| at
>>>> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:190)
>>>>
>>>> client_1| at
>>>> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:128)
>>>>
>>>> client_1| at
>>>> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862)
>>>>
>>>> client_1| at
>>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204)
>>>>
>>>> client_1| at
>>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>>>
>>>> client_1| at
>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>>>
>>>> client_1| at
>>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>
>>>> client_1| at
>>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>>>
>>>


Re: Flink SQL and checkpoints and savepoints

2021-01-28 Thread Dan Hill
Is this savepoint recovery issue also true with the Flink Table API?  I'd
assume so.  Just doublechecking.

On Mon, Jan 18, 2021 at 1:58 AM Timo Walther  wrote:

> I would check the past Flink Forward conference talks and blog posts. A
> couple of companies have developed connectors or modified existing
> connectors to make this work. Usually, based on event timestamps or some
> external control stream (DataStream API around the actual SQL pipeline
> for handling this).
>
> Also there is FLIP-150 which goes into this direction.
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
>
> Regards,
> Timo
>
>
> On 18.01.21 10:40, Dan Hill wrote:
> > Thanks Timo!
> >
> > The reason makes sense.
> >
> > Do any of the techniques make it easy to support exactly once?
> >
> > I'm inferring what is meant by dry out.  Are there any documented
> > patterns for it?  E.g. sending data to new kafka topics between releases?
> >
> >
> >
> >
> > On Mon, Jan 18, 2021, 01:04 Timo Walther  > <mailto:twal...@apache.org>> wrote:
> >
> > Hi Dan,
> >
> > currently, we cannot provide any savepoint guarantees between
> releases.
> > Because of the nature of SQL that abstracts away runtime operators,
> it
> > might be that a future execution plan will look completely different
> > and
> > thus we cannot map state anymore. This is not avoidable because the
> > optimizer might get smarter when adding new optimizer rules.
> >
> > For such cases, we recommend to dry out the old pipeline and/or warm
> up
> > a new pipeline with historic data when upgrading Flink. A change in
> > columns sometimes works but even this depends on the used operators.
> >
> > Regards,
> > Timo
> >
> >
> > On 18.01.21 04:46, Dan Hill wrote:
> >  > How well does Flink SQL work with checkpoints and savepoints?  I
> > tried
> >  > to find documentation for it in v1.11 but couldn't find it.
> >  >
> >  > E.g. what happens if the Flink SQL is modified between releases?
> > New
> >  > columns?  Change columns?  Adding joins?
> >  >
> >  >
> >
>
>


Re: Flink SQL and checkpoints and savepoints

2021-01-28 Thread Dan Hill
I went through a few of the recent Flink Forward videos and didn't see
solutions to this problem.  It sounds like some companies have solutions
but they didn't talk about them in enough detail to do something similar.

On Thu, Jan 28, 2021 at 11:45 PM Dan Hill  wrote:

> Is this savepoint recovery issue also true with the Flink Table API?  I'd
> assume so.  Just doublechecking.
>
> On Mon, Jan 18, 2021 at 1:58 AM Timo Walther  wrote:
>
>> I would check the past Flink Forward conference talks and blog posts. A
>> couple of companies have developed connectors or modified existing
>> connectors to make this work. Usually, based on event timestamps or some
>> external control stream (DataStream API around the actual SQL pipeline
>> for handling this).
>>
>> Also there is FLIP-150 which goes into this direction.
>>
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
>>
>> Regards,
>> Timo
>>
>>
>> On 18.01.21 10:40, Dan Hill wrote:
>> > Thanks Timo!
>> >
>> > The reason makes sense.
>> >
>> > Do any of the techniques make it easy to support exactly once?
>> >
>> > I'm inferring what is meant by dry out.  Are there any documented
>> > patterns for it?  E.g. sending data to new kafka topics between
>> releases?
>> >
>> >
>> >
>> >
>> > On Mon, Jan 18, 2021, 01:04 Timo Walther > > <mailto:twal...@apache.org>> wrote:
>> >
>> > Hi Dan,
>> >
>> > currently, we cannot provide any savepoint guarantees between
>> releases.
>> > Because of the nature of SQL that abstracts away runtime operators,
>> it
>> > might be that a future execution plan will look completely different
>> > and
>> > thus we cannot map state anymore. This is not avoidable because the
>> > optimizer might get smarter when adding new optimizer rules.
>> >
>> > For such cases, we recommend to dry out the old pipeline and/or
>> warm up
>> > a new pipeline with historic data when upgrading Flink. A change in
>> > columns sometimes works but even this depends on the used operators.
>> >
>> > Regards,
>> > Timo
>> >
>> >
>> > On 18.01.21 04:46, Dan Hill wrote:
>> >  > How well does Flink SQL work with checkpoints and savepoints?  I
>> > tried
>> >  > to find documentation for it in v1.11 but couldn't find it.
>> >  >
>> >  > E.g. what happens if the Flink SQL is modified between releases?
>> > New
>> >  > columns?  Change columns?  Adding joins?
>> >  >
>> >  >
>> >
>>
>>


Minicluster Flink tests, checkpoints and inprogress part files

2021-02-04 Thread Dan Hill
Hi Flink user group,

*Background*
I'm changing a Flink SQL job to use Datastream.  I'm updating an existing
Minicluster test in my code.  It has a similar structure to other tests in
flink-tests.  I call StreamExecutionEnvironment.execute.  My tests sink
using StreamingFileSink Bulk Formats to tmp local disk.

*Issue*
When I try to check the files on local disk, I see
".part-0-0.inprogress.1234abcd-5678-uuid...".

*Question*
What's the best way to get the test to complete the outputs?  I tried
checkpointing very frequently, sleeping, etc but these didn't work.

Thanks!
- Dan


Re: Minicluster Flink tests, checkpoints and inprogress part files

2021-02-05 Thread Dan Hill
Thanks Aljoscha!

On Fri, Feb 5, 2021 at 1:48 AM Aljoscha Krettek  wrote:

> Hi Dan,
>
> I'm afraid this is not easily possible using the DataStream API in
> STREAMING execution mode today. However, there is one possible solution
> and we're introducing changes that will also make this work on STREAMING
> mode.
>
> The possible solution is to use the `FileSink` instead of the
> `StreamingFileSink`. This is an updated version of the sink that works
> in both BATCH and STREAMING mode (see [1]). If you use BATCH execution
> mode all your files should be "completed" at the end.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_execution_mode.html
>
> The thing we're currently working on is FLIP-147 [2], which will allow
> sinks (and other operators) to always do one final checkpoint before
> shutting down. This will allow them to move the last outstanding
> inprogress files over to finished as well.
>
> [2] https://cwiki.apache.org/confluence/x/mw-ZCQ
>
> I hope that helps!
>
> Best,
> Aljoscha
>
> On 2021/02/04 21:37, Dan Hill wrote:
> >Hi Flink user group,
> >
> >*Background*
> >I'm changing a Flink SQL job to use Datastream.  I'm updating an existing
> >Minicluster test in my code.  It has a similar structure to other tests in
> >flink-tests.  I call StreamExecutionEnvironment.execute.  My tests sink
> >using StreamingFileSink Bulk Formats to tmp local disk.
> >
> >*Issue*
> >When I try to check the files on local disk, I see
> >".part-0-0.inprogress.1234abcd-5678-uuid...".
> >
> >*Question*
> >What's the best way to get the test to complete the outputs?  I tried
> >checkpointing very frequently, sleeping, etc but these didn't work.
> >
> >Thanks!
> >- Dan
>


Re: Minicluster Flink tests, checkpoints and inprogress part files

2021-02-06 Thread Dan Hill
Ah looks like I need to use 1.12 for this.  I'm still on 1.11.

On Fri, Feb 5, 2021, 08:37 Dan Hill  wrote:

> Thanks Aljoscha!
>
> On Fri, Feb 5, 2021 at 1:48 AM Aljoscha Krettek 
> wrote:
>
>> Hi Dan,
>>
>> I'm afraid this is not easily possible using the DataStream API in
>> STREAMING execution mode today. However, there is one possible solution
>> and we're introducing changes that will also make this work on STREAMING
>> mode.
>>
>> The possible solution is to use the `FileSink` instead of the
>> `StreamingFileSink`. This is an updated version of the sink that works
>> in both BATCH and STREAMING mode (see [1]). If you use BATCH execution
>> mode all your files should be "completed" at the end.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_execution_mode.html
>>
>> The thing we're currently working on is FLIP-147 [2], which will allow
>> sinks (and other operators) to always do one final checkpoint before
>> shutting down. This will allow them to move the last outstanding
>> inprogress files over to finished as well.
>>
>> [2] https://cwiki.apache.org/confluence/x/mw-ZCQ
>>
>> I hope that helps!
>>
>> Best,
>> Aljoscha
>>
>> On 2021/02/04 21:37, Dan Hill wrote:
>> >Hi Flink user group,
>> >
>> >*Background*
>> >I'm changing a Flink SQL job to use Datastream.  I'm updating an existing
>> >Minicluster test in my code.  It has a similar structure to other tests
>> in
>> >flink-tests.  I call StreamExecutionEnvironment.execute.  My tests sink
>> >using StreamingFileSink Bulk Formats to tmp local disk.
>> >
>> >*Issue*
>> >When I try to check the files on local disk, I see
>> >".part-0-0.inprogress.1234abcd-5678-uuid...".
>> >
>> >*Question*
>> >What's the best way to get the test to complete the outputs?  I tried
>> >checkpointing very frequently, sleeping, etc but these didn't work.
>> >
>> >Thanks!
>> >- Dan
>>
>


Re: Minicluster Flink tests, checkpoints and inprogress part files

2021-02-06 Thread Dan Hill
I changed the test to use ExecutionMode.BATCH in v1.11 and it still doesn't
work.  How did devs write minicluster tests before for similar code?  Did
they not?

On Sat, Feb 6, 2021 at 5:38 PM Dan Hill  wrote:

> Ah looks like I need to use 1.12 for this.  I'm still on 1.11.
>
> On Fri, Feb 5, 2021, 08:37 Dan Hill  wrote:
>
>> Thanks Aljoscha!
>>
>> On Fri, Feb 5, 2021 at 1:48 AM Aljoscha Krettek 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> I'm afraid this is not easily possible using the DataStream API in
>>> STREAMING execution mode today. However, there is one possible solution
>>> and we're introducing changes that will also make this work on STREAMING
>>> mode.
>>>
>>> The possible solution is to use the `FileSink` instead of the
>>> `StreamingFileSink`. This is an updated version of the sink that works
>>> in both BATCH and STREAMING mode (see [1]). If you use BATCH execution
>>> mode all your files should be "completed" at the end.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_execution_mode.html
>>>
>>> The thing we're currently working on is FLIP-147 [2], which will allow
>>> sinks (and other operators) to always do one final checkpoint before
>>> shutting down. This will allow them to move the last outstanding
>>> inprogress files over to finished as well.
>>>
>>> [2] https://cwiki.apache.org/confluence/x/mw-ZCQ
>>>
>>> I hope that helps!
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 2021/02/04 21:37, Dan Hill wrote:
>>> >Hi Flink user group,
>>> >
>>> >*Background*
>>> >I'm changing a Flink SQL job to use Datastream.  I'm updating an
>>> existing
>>> >Minicluster test in my code.  It has a similar structure to other tests
>>> in
>>> >flink-tests.  I call StreamExecutionEnvironment.execute.  My tests sink
>>> >using StreamingFileSink Bulk Formats to tmp local disk.
>>> >
>>> >*Issue*
>>> >When I try to check the files on local disk, I see
>>> >".part-0-0.inprogress.1234abcd-5678-uuid...".
>>> >
>>> >*Question*
>>> >What's the best way to get the test to complete the outputs?  I tried
>>> >checkpointing very frequently, sleeping, etc but these didn't work.
>>> >
>>> >Thanks!
>>> >- Dan
>>>
>>


UUID in part files

2021-02-07 Thread Dan Hill
Hi.

*Context*
I'm migrating my Flink SQL job to DataStream.  When switching to
StreamingFileSink, I noticed that the part files now do not have a uuid in
them.  "part-0-0" vs "part-{uuid string}-0-0".  This is easy to add with
OutputFileConfig.

*Question*
Is there a reason why the base OutputFileConfig doesn't add the uuid
automatically?  Is this just a legacy issue?  Or do most people not have
the uuid in the file outputs?


Optimizing Flink joins

2021-02-10 Thread Dan Hill
Hi!  I was curious if there are docs on how to optimize Flink joins.  I
looked around and on the Flink docs and didn't see much.  I see a little on
the Configuration page.

E.g. one of my jobs has an interval join.  Does left vs right matter for
interval join?


Re: Optimizing Flink joins

2021-02-11 Thread Dan Hill
Hi Timo!  I'm moving away from SQL to DataStream.

On Thu, Feb 11, 2021 at 9:11 AM Timo Walther  wrote:

> Hi Dan,
>
> the order of all joins depends on the order in the SQL query by default.
>
> You can also check the following example (not interval joins though) and
> swap e.g. b and c:
>
> env.createTemporaryView("a", env.fromValues(1, 2, 3));
> env.createTemporaryView("b", env.fromValues(4, 5, 6));
> env.createTemporaryView("c", env.fromValues(7, 8, 9));
>
> System.out.println(env.sqlQuery("SELECT * FROM c, b, a").explain());
>
> So you can reorder the tables in the query if that improves performance.
> For interval joins, we currently don't provide additional algorithms or
> options.
>
> Regards,
> Timo
>
> On 11.02.21 05:04, Dan Hill wrote:
> > Hi!  I was curious if there are docs on how to optimize Flink joins.  I
> > looked around and on the Flink docs and didn't see much.  I see a little
> > on the Configuration page.
> >
> > E.g. one of my jobs has an interval join.  Does left vs right matter for
> > interval join?
>
>


Re: Optimizing Flink joins

2021-02-12 Thread Dan Hill
Flink SQL is missing a reliable, commonly-used way of evolving a job (e.g.
savepointing and checkpointing).

The other concepts I heard about are not shared publicly enough to rely on
(e.g. roll off).  I wasn't able to find anything useful on this.

On Fri, Feb 12, 2021, 02:05 Timo Walther  wrote:

> Hi Dan,
>
> thanks for letting us know. Could you give us some feedback what is
> missing in SQL for this use case? Are you looking for some broadcast
> joining or which kind of algorithm would help you?
>
> Regards,
> Timo
>
> On 11.02.21 20:32, Dan Hill wrote:
> > Hi Timo!  I'm moving away from SQL to DataStream.
> >
> > On Thu, Feb 11, 2021 at 9:11 AM Timo Walther  > <mailto:twal...@apache.org>> wrote:
> >
> > Hi Dan,
> >
> > the order of all joins depends on the order in the SQL query by
> default.
> >
> > You can also check the following example (not interval joins though)
> > and
> > swap e.g. b and c:
> >
> > env.createTemporaryView("a", env.fromValues(1, 2, 3));
> > env.createTemporaryView("b", env.fromValues(4, 5, 6));
> > env.createTemporaryView("c", env.fromValues(7, 8, 9));
> >
> > System.out.println(env.sqlQuery("SELECT * FROM c, b, a").explain());
> >
> > So you can reorder the tables in the query if that improves
> > performance.
> > For interval joins, we currently don't provide additional algorithms
> or
> > options.
> >
> > Regards,
> > Timo
> >
> > On 11.02.21 05:04, Dan Hill wrote:
> >  > Hi!  I was curious if there are docs on how to optimize Flink
> > joins.  I
> >  > looked around and on the Flink docs and didn't see much.  I see a
> > little
> >  > on the Configuration page.
> >  >
> >  > E.g. one of my jobs has an interval join.  Does left vs right
> > matter for
> >  > interval join?
> >
>
>


Best practices around checkpoint intervals and sizes?

2021-02-17 Thread Dan Hill
Hi.  I'm playing around with optimizing our checkpoint intervals and sizes.

Are there any best practices around this?  I have a ~7 sequential joins and
a few sinks.  I'm curious what would result in the better throughput and
latency trade offs.  I'd assume less frequent checkpointing would increase
throughput (but constrained by how frequently I want checkedpointed sinks
written).


Debugging long Flink checkpoint durations

2021-03-01 Thread Dan Hill
Hi.  Are there good ways to debug long Flink checkpoint durations?

I'm running a backfill job that runs ~10 days of data and then starts
checkpointing failing.  Since I only see the last 10 checkpoints in the
jobmaster UI, I don't see when it starts.

I looked through the text logs and didn't see much.

I assume:
1) I have something misconfigured that is causing old state is sticking
around.
2) I don't have enough resources.


Re: Debugging long Flink checkpoint durations

2021-03-02 Thread Dan Hill
Thanks!  Yes, I've looked at these.   My job is facing backpressure
starting at an early join step.  I'm unclear if more time is fine for the
backfill or if I need more resources.

On Tue, Mar 2, 2021 at 12:50 AM Yun Gao  wrote:

> Hi Dan,
>
> I think you could see the detail of the checkpoints via the checkpoint
> UI[1]. Also, if you see in the
> pending checkpoints some tasks do not take snapshot,  you might have a
> look whether this task
> is backpressuring the previous tasks [2].
>
> Best,
> Yun
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/monitoring/checkpoint_monitoring.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
>
> --
> Sender:Dan Hill
> Date:2021/03/02 04:34:56
> Recipient:user
> Theme:Debugging long Flink checkpoint durations
>
> Hi.  Are there good ways to debug long Flink checkpoint durations?
>
> I'm running a backfill job that runs ~10 days of data and then starts
> checkpointing failing.  Since I only see the last 10 checkpoints in the
> jobmaster UI, I don't see when it starts.
>
> I looked through the text logs and didn't see much.
>
> I assume:
> 1) I have something misconfigured that is causing old state is sticking
> around.
> 2) I don't have enough resources.
>
>
>


Flink, local development, finish processing a stream of Kafka data

2021-03-02 Thread Dan Hill
Hi.

For local and tests development, I want to flush the events in my system to
make sure I'm processing everything.  My watermark does not progress to
finish all of the data.

What's the best practice for local development or tests?

If I use idle sources for 1 Kafka partition, this appears broken.  I'm
guessing there is logic to prevent removing an idle partition if it's the
only partition.  Is there a version of this I can enable for local
development that supports 1 partition?

I see this tech talk.  Are there other talks to watch?
https://www.youtube.com/watch?v=bQmz7JOmE_4&feature=youtu.be

Do I need to write my own watermark generator?  Or change my test data to
have a way of generating watermarks?

I've tried a few variants of the following source code.  The watermark
doesn't progress in the operator right after creating the source.

SingleOutputStreamOperator viewInput = env.addSource(...)
.uid("source-view")
.assignTimestampsAndWatermarks(

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));


Re: Debugging long Flink checkpoint durations

2021-03-04 Thread Dan Hill
I dove deeper into it and made a little more progress (by giving more
resources).

Here is a screenshot of one bottleneck:
https://drive.google.com/file/d/1CIatEuIJwmKjBE9__RihVlxSilchtKS1/view

My job isn't making any progress.  It's checkpointing and failing.  The
taskmaster text logs are empty during the checkpoint.  It's not clear if
the checkpoint is making any progress.
https://drive.google.com/file/d/1slLO6PJVhXfoAN5OrSqsE9G7kvHPXJnl/view?usp=sharing

I spent some time changing the memory parameters but it's unclear if I'm
making forward progress.
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html




On Tue, Mar 2, 2021 at 3:45 PM Dan Hill  wrote:

> Thanks!  Yes, I've looked at these.   My job is facing backpressure
> starting at an early join step.  I'm unclear if more time is fine for the
> backfill or if I need more resources.
>
> On Tue, Mar 2, 2021 at 12:50 AM Yun Gao  wrote:
>
>> Hi Dan,
>>
>> I think you could see the detail of the checkpoints via the checkpoint
>> UI[1]. Also, if you see in the
>> pending checkpoints some tasks do not take snapshot,  you might have a
>> look whether this task
>> is backpressuring the previous tasks [2].
>>
>> Best,
>> Yun
>>
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/monitoring/checkpoint_monitoring.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
>>
>> --
>> Sender:Dan Hill
>> Date:2021/03/02 04:34:56
>> Recipient:user
>> Theme:Debugging long Flink checkpoint durations
>>
>> Hi.  Are there good ways to debug long Flink checkpoint durations?
>>
>> I'm running a backfill job that runs ~10 days of data and then starts
>> checkpointing failing.  Since I only see the last 10 checkpoints in the
>> jobmaster UI, I don't see when it starts.
>>
>> I looked through the text logs and didn't see much.
>>
>> I assume:
>> 1) I have something misconfigured that is causing old state is sticking
>> around.
>> 2) I don't have enough resources.
>>
>>
>>


Re: Debugging long Flink checkpoint durations

2021-03-04 Thread Dan Hill
The checkpoint was only acknowledged shortly after it was started.

On Thu, Mar 4, 2021 at 12:38 PM Dan Hill  wrote:

> I dove deeper into it and made a little more progress (by giving more
> resources).
>
> Here is a screenshot of one bottleneck:
> https://drive.google.com/file/d/1CIatEuIJwmKjBE9__RihVlxSilchtKS1/view
>
> My job isn't making any progress.  It's checkpointing and failing.  The
> taskmaster text logs are empty during the checkpoint.  It's not clear if
> the checkpoint is making any progress.
>
> https://drive.google.com/file/d/1slLO6PJVhXfoAN5OrSqsE9G7kvHPXJnl/view?usp=sharing
>
> I spent some time changing the memory parameters but it's unclear if I'm
> making forward progress.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html
>
>
>
>
> On Tue, Mar 2, 2021 at 3:45 PM Dan Hill  wrote:
>
>> Thanks!  Yes, I've looked at these.   My job is facing backpressure
>> starting at an early join step.  I'm unclear if more time is fine for the
>> backfill or if I need more resources.
>>
>> On Tue, Mar 2, 2021 at 12:50 AM Yun Gao  wrote:
>>
>>> Hi Dan,
>>>
>>> I think you could see the detail of the checkpoints via the checkpoint
>>> UI[1]. Also, if you see in the
>>> pending checkpoints some tasks do not take snapshot,  you might have a
>>> look whether this task
>>> is backpressuring the previous tasks [2].
>>>
>>> Best,
>>> Yun
>>>
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/monitoring/checkpoint_monitoring.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
>>>
>>> --
>>> Sender:Dan Hill
>>> Date:2021/03/02 04:34:56
>>> Recipient:user
>>> Theme:Debugging long Flink checkpoint durations
>>>
>>> Hi.  Are there good ways to debug long Flink checkpoint durations?
>>>
>>> I'm running a backfill job that runs ~10 days of data and then starts
>>> checkpointing failing.  Since I only see the last 10 checkpoints in the
>>> jobmaster UI, I don't see when it starts.
>>>
>>> I looked through the text logs and didn't see much.
>>>
>>> I assume:
>>> 1) I have something misconfigured that is causing old state is sticking
>>> around.
>>> 2) I don't have enough resources.
>>>
>>>
>>>


Gradually increasing checkpoint size

2021-03-07 Thread Dan Hill
Hi!

I'm running a backfill Flink stream job over older data.  It has multiple
interval joins.  I noticed my checkpoint is regularly gaining in size.  I'd
expect my checkpoints to stabilize and not grow.

Is there a setting to prune useless data from the checkpoint?  My top guess
is that my checkpoint has a bunch of useless state in it.

- Dan


Re: Gradually increasing checkpoint size

2021-03-08 Thread Dan Hill
Hi Yun!

Thanks for the quick reply.

One of the lowerBounds is large but the table being joined with is ~500
rows.  I also have my own operator that only outputs the first value.

public class OnlyFirstUser extends
RichFlatMapFunction {


private transient ValueState alreadyOutputted;


@Override

public void flatMap(T value, Collector out) throws Exception {

if (!alreadyOutputted.value()) {

alreadyOutputted.update(true);

out.collect(value);

}

}


@Override

public void open(Configuration config) {

ValueStateDescriptor descriptor =

new ValueStateDescriptor<>(

"alreadyOutputted", // the state name

TypeInformation.of(new TypeHint() {}), //
type information

false); // default value of the state, if nothing
was set

alreadyOutputted = getRuntimeContext().getState(descriptor);

}

}

All of my inputs have this watermark strategy.  In the Flink UI, early in
the job run, I see "Low Watermarks" on each node and they increase.  After
some checkpoint failures, low watermarks stop appearing in the UI
<https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>
.


.assignTimestampsAndWatermarks(


WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));



Thanks Yun!


On Mon, Mar 8, 2021 at 7:27 AM Yun Gao  wrote:

> Hi Dan,
>
> Have you use a too large upperBound or lowerBound?
>
> If not, could you also check the watermark strategy ?
> The interval join operator depends on the event-time
> timer for cleanup, and the event-time timer would be
> triggered via watermark.
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*Dan Hill 
> *Send Date:*Mon Mar 8 14:59:48 2021
> *Recipients:*user 
> *Subject:*Gradually increasing checkpoint size
>
>> Hi!
>>
>> I'm running a backfill Flink stream job over older data.  It has multiple
>> interval joins.  I noticed my checkpoint is regularly gaining in size.  I'd
>> expect my checkpoints to stabilize and not grow.
>>
>> Is there a setting to prune useless data from the checkpoint?  My top
>> guess is that my checkpoint has a bunch of useless state in it.
>>
>> - Dan
>>
>


Re: Re: Gradually increasing checkpoint size

2021-03-08 Thread Dan Hill
Hi Yun!

That advice was useful.  The state for that operator is very small (31kb).
Most of the checkpoint size is in a couple simple DataStream.intervalJoin
operators.  The time intervals are fairly short.

I'm going to try running the code with some small configuration changes.
One thing I did notice is that I set a positive value for the
relativeUpperBound.  I'm not sure if I found a bug in IntervalJoinOperator
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>.
The logic in IntervalJoinOperator.onEventTime needs an exact timestamp for
clean up.  It has some logic around cleaning up the right side that
uses timerTimestamp
+ lowerBound
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L310>.
However, processElement doesn’t use the same logic when creating a timer (I
only see + lowerBound
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L253>).
Maybe I'm misreading the code.  It feels like a bug.


On Mon, Mar 8, 2021 at 10:29 PM Yun Gao  wrote:

> Hi Dan,
>
> Regarding the original checkpoint size problem, could you also have a
> check
> which tasks' state are increasing from the checkpoint UI ? For example,
> the
> attached operator has a `alreadyOutputed` value state, which seems to keep
> increasing if there are always new keys ?
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*Dan Hill 
> *Send Date:*Tue Mar 9 00:59:24 2021
> *Recipients:*Yun Gao 
> *CC:*user 
> *Subject:*Re: Gradually increasing checkpoint size
>
>> Hi Yun!
>>
>> Thanks for the quick reply.
>>
>> One of the lowerBounds is large but the table being joined with is ~500
>> rows.  I also have my own operator that only outputs the first value.
>>
>> public class OnlyFirstUser extends
>> RichFlatMapFunction {
>>
>>
>> private transient ValueState alreadyOutputted;
>>
>>
>> @Override
>>
>> public void flatMap(T value, Collector out) throws Exception {
>>
>> if (!alreadyOutputted.value()) {
>>
>> alreadyOutputted.update(true);
>>
>> out.collect(value);
>>
>> }
>>
>> }
>>
>>
>> @Override
>>
>> public void open(Configuration config) {
>>
>> ValueStateDescriptor descriptor =
>>
>> new ValueStateDescriptor<>(
>>
>> "alreadyOutputted", // the state name
>>
>> TypeInformation.of(new TypeHint() {}),
>> // type information
>>
>> false); // default value of the state, if
>> nothing was set
>>
>> alreadyOutputted = getRuntimeContext().getState(descriptor);
>>
>> }
>>
>> }
>>
>> All of my inputs have this watermark strategy.  In the Flink UI, early in
>> the job run, I see "Low Watermarks" on each node and they increase.  After
>> some checkpoint failures, low watermarks stop appearing in the UI
>> <https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>
>> .
>>
>>
>> .assignTimestampsAndWatermarks(
>>
>>
>> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));
>>
>>
>>
>> Thanks Yun!
>>
>>
>> On Mon, Mar 8, 2021 at 7:27 AM Yun Gao  wrote:
>>
>>> Hi Dan,
>>>
>>> Have you use a too large upperBound or lowerBound?
>>>
>>> If not, could you also check the watermark strategy ?
>>> The interval join operator depends on the event-time
>>> timer for cleanup, and the event-time timer would be
>>> triggered via watermark.
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> --Original Mail --
>>> *Sender:*Dan Hill 
>>> *Send Date:*Mon Mar 8 14:59:48 2021
>>> *Recipients:*user 
>>> *Subject:*Gradually increasing checkpoint size
>>>
>>>> Hi!
>>>>
>>>> I'm running a backfill Flink stream job over older data.  It has
>>>> multiple interval joins.  I noticed my checkpoint is regularly gaining in
>>>> size.  I'd expect my checkpoints to stabilize and not grow.
>>>>
>>>> Is there a setting to prune useless data from the checkpoint?  My top
>>>> guess is that my checkpoint has a bunch of useless state in it.
>>>>
>>>> - Dan
>>>>
>>>


Best practices for complex state manipulation

2021-03-09 Thread Dan Hill
Hi!

I'm working on a join setup that does fuzzy matching in case the client
does not send enough parameters to join by a foreign key.  There's a few
ways I can store the state.  I'm curious about best practices around this.
I'm using rocksdb as the state storage.

I was reading the code for IntervalJoin

and was a little shocked by the implementation.  It feels designed for very
short join intervals.

I read this set of pages

but I'm looking for one level deeper.  E.g. what are performance
characteristics of different types of state crud operations with rocksdb?
E.g. I could create extra MapState to act as an index.  When is this worth
it?


Re: Best practices for complex state manipulation

2021-03-10 Thread Dan Hill
Thanks Gordon and Seth!

On Wed, Mar 10, 2021, 21:55 Tzu-Li (Gordon) Tai  wrote:

> Hi Dan,
>
> For a deeper dive into state backends and how they manage state, or
> performance critical aspects such as state serialization and choosing
> appropriate state structures, I highly recommend starting from this webinar
> done by my colleague Seth Weismann:
> https://www.youtube.com/watch?v=9GF8Hwqzwnk.
>
> Cheers,
> Gordon
>
> On Wed, Mar 10, 2021 at 1:58 AM Dan Hill  wrote:
>
>> Hi!
>>
>> I'm working on a join setup that does fuzzy matching in case the client
>> does not send enough parameters to join by a foreign key.  There's a few
>> ways I can store the state.  I'm curious about best practices around this.
>> I'm using rocksdb as the state storage.
>>
>> I was reading the code for IntervalJoin
>> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>
>> and was a little shocked by the implementation.  It feels designed for very
>> short join intervals.
>>
>> I read this set of pages
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html>
>> but I'm looking for one level deeper.  E.g. what are performance
>> characteristics of different types of state crud operations with rocksdb?
>> E.g. I could create extra MapState to act as an index.  When is this worth
>> it?
>>
>>
>>


Re: Gradually increasing checkpoint size

2021-03-12 Thread Dan Hill
I figured it out.  I have some records with the same key and I was doing an
IntervalJoin.  One of the IntervalJoin implementations that I found looks
like it the runtime increases exponentially when there are duplicate keys.
I introduced a de-duping step and it works a lot faster.

On Thu, Mar 11, 2021 at 5:30 AM Dawid Wysakowicz 
wrote:

> Hey Dan,
>
> I think the logic should be correct. Mind that in the processElement we
> are using *relative*Upper/LowerBound, which are inverted global bound:
>
> relativeUpperBound = upperBound for left and -lowerBound for right
>
> relativeLowerBound = lowerBound for left and -upperBound for right
>
> Therefore the cleaning logic in onTimer effectively uses the same logic.
> If I understand it correctly, this trick was introduced to deduplicate the
> method.
>
> There might be a bug somewhere, but I don't think it's where you pointed.
> I'd suggest to first investigate the progress of watermarks.
>
> Best,
>
> Dawid
> On 09/03/2021 08:36, Dan Hill wrote:
>
> Hi Yun!
>
> That advice was useful.  The state for that operator is very small
> (31kb).  Most of the checkpoint size is in a couple simple
> DataStream.intervalJoin operators.  The time intervals are fairly short.
>
> I'm going to try running the code with some small configuration changes.
> One thing I did notice is that I set a positive value for the
> relativeUpperBound.  I'm not sure if I found a bug in IntervalJoinOperator
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>.
> The logic in IntervalJoinOperator.onEventTime needs an exact timestamp for
> clean up.  It has some logic around cleaning up the right side that uses 
> timerTimestamp
> + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L310>.
> However, processElement doesn’t use the same logic when creating a timer (I
> only see + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L253>).
> Maybe I'm misreading the code.  It feels like a bug.
>
>
> On Mon, Mar 8, 2021 at 10:29 PM Yun Gao  wrote:
>
>> Hi Dan,
>>
>> Regarding the original checkpoint size problem, could you also have a
>> check
>> which tasks' state are increasing from the checkpoint UI ? For example,
>> the
>> attached operator has a `alreadyOutputed` value state, which seems to keep
>> increasing if there are always new keys ?
>>
>> Best,
>> Yun
>>
>>
>> --Original Mail --
>> *Sender:*Dan Hill 
>> *Send Date:*Tue Mar 9 00:59:24 2021
>> *Recipients:*Yun Gao 
>> *CC:*user 
>> *Subject:*Re: Gradually increasing checkpoint size
>>
>>> Hi Yun!
>>>
>>> Thanks for the quick reply.
>>>
>>> One of the lowerBounds is large but the table being joined with is ~500
>>> rows.  I also have my own operator that only outputs the first value.
>>>
>>> public class OnlyFirstUser extends
>>> RichFlatMapFunction {
>>>
>>>
>>> private transient ValueState alreadyOutputted;
>>>
>>>
>>> @Override
>>>
>>> public void flatMap(T value, Collector out) throws Exception {
>>>
>>> if (!alreadyOutputted.value()) {
>>>
>>> alreadyOutputted.update(true);
>>>
>>> out.collect(value);
>>>
>>> }
>>>
>>> }
>>>
>>>
>>> @Override
>>>
>>> public void open(Configuration config) {
>>>
>>> ValueStateDescriptor descriptor =
>>>
>>> new ValueStateDescriptor<>(
>>>
>>> "alreadyOutputted", // the state name
>>>
>>> TypeInformation.of(new TypeHint() {}),
>>> // type information
>>>
>>> false); // default value of the state, if
>>> nothing was set
>>>
>>> alreadyOutputted = getRuntimeContext().getState(descriptor);
>>>
>>> }
>>>
>>> }
>>>
>>> All of my inputs have this watermark strategy.  In the Flink UI, early
>>> in the job run, I see "Low Watermarks" on each node and they increase.
>>> After some checkpoin

Does WatermarkStrategy.withIdleness work?

2021-03-12 Thread Dan Hill
I haven't been able to get WatermarkStrategy.withIdleness to work.  Is it
broken?  None of my timers trigger when I'd expect idleness to take over.

On Tue, Mar 2, 2021 at 11:15 PM Dan Hill  wrote:

> Hi.
>
> For local and tests development, I want to flush the events in my system
> to make sure I'm processing everything.  My watermark does not progress to
> finish all of the data.
>
> What's the best practice for local development or tests?
>
> If I use idle sources for 1 Kafka partition, this appears broken.  I'm
> guessing there is logic to prevent removing an idle partition if it's the
> only partition.  Is there a version of this I can enable for local
> development that supports 1 partition?
>
> I see this tech talk.  Are there other talks to watch?
> https://www.youtube.com/watch?v=bQmz7JOmE_4&feature=youtu.be
>
> Do I need to write my own watermark generator?  Or change my test data to
> have a way of generating watermarks?
>
> I've tried a few variants of the following source code.  The watermark
> doesn't progress in the operator right after creating the source.
>
> SingleOutputStreamOperator viewInput = env.addSource(...)
> .uid("source-view")
> .assignTimestampsAndWatermarks(
>
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));
>


Re: Does WatermarkStrategy.withIdleness work?

2021-03-12 Thread Dan Hill
Thanks David!

On Fri, Mar 12, 2021, 01:54 David Anderson  wrote:

> WatermarkStrategy.withIdleness works by marking idle streams as idle, so
> that downstream operators will ignore those streams and allow the
> watermarks to progress based only on the advancement of the watermarks of
> the still active streams. As you suspected, this mechanism does not provide
> for the watermark to be advanced in situations where all of the streams are
> idle.
>
> If your goal is ensure that all of the events are processed and all
> event-time timers are fired (and all event-time windows are closed) before
> a job ends, Flink already includes a mechanism for this purpose. If you are
> using a bounded source, then when that source reaches the end of its input,
> a final Watermark of value Watermark.MAX_WATERMARK will be automatically
> emitted. The --drain option, as in
>
> ./bin/flink stop --drain 
>
> also has this effect [1].
>
> With a Kafka source, you can arrange for this to happen by having your
> kafka deserializer return true from its isEndOfStream() method. Or you
> could use the new KafkaSource connector included in Flink 1.12 with
> its setBounded option.
>
> On the other hand, if you really did need to advance the watermark despite
> a (possibly temporary) total lack of events, you could implement a
> watermark strategy that artificially advances the watermark based on the
> passage of processing time. You'll find an example in [2], though it hasn't
> been updated to use the new watermark strategy interface.
>
> Regards,
> David
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#stopping-a-job-gracefully-creating-a-final-savepoint
> [2]
> https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java
>
> On Fri, Mar 12, 2021 at 9:47 AM Dan Hill  wrote:
>
>> I haven't been able to get WatermarkStrategy.withIdleness to work.  Is it
>> broken?  None of my timers trigger when I'd expect idleness to take over.
>>
>> On Tue, Mar 2, 2021 at 11:15 PM Dan Hill  wrote:
>>
>>> Hi.
>>>
>>> For local and tests development, I want to flush the events in my system
>>> to make sure I'm processing everything.  My watermark does not progress to
>>> finish all of the data.
>>>
>>> What's the best practice for local development or tests?
>>>
>>> If I use idle sources for 1 Kafka partition, this appears broken.  I'm
>>> guessing there is logic to prevent removing an idle partition if it's the
>>> only partition.  Is there a version of this I can enable for local
>>> development that supports 1 partition?
>>>
>>> I see this tech talk.  Are there other talks to watch?
>>> https://www.youtube.com/watch?v=bQmz7JOmE_4&feature=youtu.be
>>>
>>> Do I need to write my own watermark generator?  Or change my test data
>>> to have a way of generating watermarks?
>>>
>>> I've tried a few variants of the following source code.  The watermark
>>> doesn't progress in the operator right after creating the source.
>>>
>>> SingleOutputStreamOperator viewInput = env.addSource(...)
>>> .uid("source-view")
>>> .assignTimestampsAndWatermarks(
>>>
>>> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));
>>>
>>


  1   2   >