Re: How to parse list values in csv file

2020-12-06 Thread narasimha
Thanks for your email.

Translated the CSV to JSON, read it as a plain text file, and then processed
it into objects.
It solved my use case.
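
For anyone who finds this later, a minimal sketch of that workaround (the
Record POJO, the Jackson dependency, and the file path are placeholders for
illustration, not the actual code):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;

public class JsonLinesExample {

    // POJO with the list-typed field that the CSV input format could not handle.
    public static class Record {
        public String name;
        public List<String> tags;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Record> records = env
            .readTextFile("/path/to/records.jsonl") // the CSV converted to JSON lines
            .map(new RichMapFunction<String, Record>() {
                private transient ObjectMapper mapper;

                @Override
                public void open(Configuration parameters) {
                    mapper = new ObjectMapper(); // create once per task, not per record
                }

                @Override
                public Record map(String line) throws Exception {
                    return mapper.readValue(line, Record.class);
                }
            });
        records.print();
        env.execute("json-lines-example");
    }
}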



On Fri, Dec 4, 2020 at 12:24 PM Yun Gao  wrote:

>
> Hi,
>
> The CSV format only supports the types listed in [1] and must use the types
> in this list; for other types, some kind of workaround is needed, such as
> first parsing the field as a string and parsing it again later in the program.
>
> Best,
> Yun
>
>
>
> [1]
> https://github.com/apache/flink/blob/e10e548feb2bedf54c3863bbd49ed4f9140546cf/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java#L287
>
>
> --Original Mail --
> *Sender:*narasimha 
> *Send Date:*Fri Dec 4 00:45:53 2020
> *Recipients:*user 
> *Subject:*How to parse list values in csv file
>
>> Hi,
>>
>> Getting the below error when trying to read a CSV file; one of the fields is
>> a list type.
>>
>> Can someone help with fixing the issue?
>>
>> jobmanager_1   | Caused by: java.lang.IllegalArgumentException: The type
>> 'java.util.List' is not supported for the CSV input format.
>>
>> jobmanager_1   | at
>> org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:289)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>> jobmanager_1   | at
>> org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:67)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>> jobmanager_1   | at
>> org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:83)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>> jobmanager_1   | at
>> org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:87)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>> --
>> A.Narasimha Swamy
>>
>

-- 
A.Narasimha Swamy


Re: Application Mode support on VVP v2.3

2020-12-06 Thread narasimha
Thanks, Fabian, for responding.

Flink image: registry.ververica.com/v2.2/flink:1.11.1-stream1-scala_2.12

There are no errors as such, but it is only considering the first job.


On Thu, Dec 3, 2020 at 5:34 PM Fabian Paul 
wrote:

> Hi Narasimha,
>
> Nothing comes to my mind immediately why it should not work. We are using
> the StandaloneApplicationClusterEntryPoint to start the cluster. Can you
> provide some more information about which Flink image on vvp you are trying
> to use, and maybe show the error message?
>
> Best,
> Fabian



-- 
A.Narasimha Swamy


Flink logs with extra pipeline property

2020-12-06 Thread Sidney Feiner
Hi,

We're using Apache Flink 1.9.2 and we've started logging everything as JSON
with log4j (the standard log4j 1 that comes with Flink). When I say JSON
logging, I just mean that I've formatted it according to:

log4j.appender.console.layout.ConversionPattern={"level": "%p", "ts": 
"%d{ISO8601}", "class": "%c", "line": "%L", "message": "%m"}%n


Now I would like to somehow add a field to this JSON to indicate which pipeline
generated the log. At first I thought I'd add another field that logs some
environment variable, like so:

log4j.appender.console.layout.ConversionPattern={"level": "%p", "ts":
"%d{ISO8601}", "class": "%c", "line": "%L", "pipeline": "${PIPELINE}",
"message": "%m"}%n

But that doesn't seem to be working (is it because the TM is initialized before
the pipeline, and that's when the placeholders are set?).

Do you know of a way I could add a field with the name of the currently running
pipeline? In my "Main" I have access to the pipeline name, and I also have
access to this variable in the tasks themselves. I would prefer not to use this
variable explicitly when I log; it should be added automatically during logging.

If anybody has an idea, I'd love to hear it (we can use logback or anything
else if necessary).

Thanks :)



Flink UDF registration from jar at runtime

2020-12-06 Thread Jakub N
The current setup is: Data in Kafka -> Kafka Connector -> 
StreamTableEnvironment -> execute Flink SQL queries

I would like to register Flink's user-defined functions from a jar or Java
class file at runtime. What I have tried so far is using Java's ClassLoader
to get an instance of a ScalarFunction (UDF) and registering it in the
StreamTableEnvironment. When I try executing a query making use of the UDF, I
get the following exception:


Exception in thread "main" java.lang.ClassNotFoundException: myFunction
at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at 
java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
at 
java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at 
org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
at 
org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
at 
org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:285)
at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
...

I have verified that the generated instance of the UDF behaves as expected when 
invoking any of its methods.

Do you have any ideas on why this is failing?
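
For context, a rough sketch of the dynamic-registration approach described
above (the jar path, the UDF class name, and the exact registration call are
placeholders for illustration, not the exact code in question):

import java.net.URL;
import java.net.URLClassLoader;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class DynamicUdfRegistration {

    public static void register(StreamTableEnvironment tableEnv) throws Exception {
        // Load the UDF class from an external jar at runtime.
        URLClassLoader loader = new URLClassLoader(
            new URL[] { new URL("file:///path/to/udfs.jar") },
            Thread.currentThread().getContextClassLoader());
        Class<? extends ScalarFunction> udfClass = Class
            .forName("com.example.MyFunction", true, loader)
            .asSubclass(ScalarFunction.class);
        // Register an instance under the name used in the SQL query.
        tableEnv.registerFunction("myFunction", udfClass.newInstance());
    }
}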


How to filter kafka stream from multiple source csv files.

2020-12-06 Thread amrahmed
Hello,

I am a newbie in Flink, and I am stuck and looking for help. I want to join
streams A, B, C, D from CSV source files; some of the streams update
frequently, and I have another high-throughput stream K from Kafka that I need
to filter against [A, B, C, D]. I tried using the Flink Table API: union all
streams [A, B, C, D] and then execute the query SELECT * FROM K k WHERE k.key
NOT IN (SELECT Key FROM ALL_UNION_LIST), but I am stuck on the error that the
AppendStreamTableSink doesn't support consuming update and delete changes.
Only a select query without the join gets executed. Could you also let me know
how I can implement this solution in the DataStream API? All the streams have
a common key.
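
For reference, one shape this could take in the DataStream API (a rough
sketch; the Event type, the key field, and the stream variables a, b, c, d,
kafkaStream are placeholders, and the sketch ignores ordering races between
the two streams): key both streams by the shared key, remember every key seen
in the union of [A, B, C, D] in keyed state, and drop K records whose key is
present.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

DataStream<String> blockedKeys = a.union(b, c, d).map(e -> e.key);

DataStream<Event> filtered = kafkaStream
    .keyBy(e -> e.key)
    .connect(blockedKeys.keyBy(key -> key))
    .process(new KeyedCoProcessFunction<String, Event, String, Event>() {
        private transient ValueState<Boolean> blocked;

        @Override
        public void open(Configuration parameters) {
            blocked = getRuntimeContext().getState(
                new ValueStateDescriptor<>("blocked", Boolean.class));
        }

        @Override
        public void processElement1(Event k, Context ctx, Collector<Event> out)
                throws Exception {
            if (blocked.value() == null) {
                out.collect(k); // key never seen in [A, B, C, D]: keep it
            }
        }

        @Override
        public void processElement2(String key, Context ctx, Collector<Event> out)
                throws Exception {
            blocked.update(true); // remember this key from [A, B, C, D]
        }
    });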

Best,
Amr



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink UDF registration from jar at runtime

2020-12-06 Thread Guowei Ma
Hi, Jakub
In theory there should not be any problem, because you can register the
function object.
So would you like to share your code and the shell command with which you
submit your job?
Best,
Guowei


On Mon, Dec 7, 2020 at 3:19 AM Jakub N  wrote:

> The current setup is: Data in Kafka -> Kafka Connector ->
> StreamTableEnvironment -> execute Flink SQL queries
>
> I would like to register Flink's user-defined functions from a jar or Java
> class file at runtime. What I have tried so far is using Java's
> ClassLoader to get an instance of a ScalarFunction (UDF) and registering
> it in the StreamTableEnvironment. When I try executing a query making use
> of the UDF, I get the following exception:
>
>
> Exception in thread "main" java.lang.ClassNotFoundException: myFunction
>
> at
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>
> at
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>
> at java.base/java.lang.Class.forName0(Native Method)
>
> at java.base/java.lang.Class.forName(Class.java:398)
>
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>
> at
> java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
>
> at
> java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
>
> at
> java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
>
> at
> java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
>
> at
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
>
> at
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>
> at
> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
>
> at
> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
>
> at
> org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
>
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
>
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
>
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
>
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
>
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
>
> at
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
>
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
>
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>
> at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>
> at scala.collection.TraversableLike.map(TraversableLike.scala:285)
>
> at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
>
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> ...
>
>
> I have verified that the generated instance of the UDF behaves as expected
> when invoking any of its methods.
>
> Do you have any ideas on why this is failing?
>


Re: taskmanager.cpu.cores 1.7976931348623157E308

2020-12-06 Thread Yangze Guo
Hi, Rex,

Can you share more logs? Did you see something like "The
configuration option taskmanager.cpu.cores required for local
execution is not set, setting it to" in your logs?

Best,
Yangze Guo


On Sat, Dec 5, 2020 at 6:53 PM David Anderson  wrote:
>
> taskmanager.cpu.cores is intended for internal use only -- you aren't meant 
> to set this option. What happens if you leave it alone?
>
> Regards,
> David
>
>
> On Sat, Dec 5, 2020 at 8:04 AM Rex Fenley  wrote:
>>
>> We're running this in a local environment so that may be contributing to 
>> what we're seeing.
>>
>> On Fri, Dec 4, 2020 at 10:41 PM Rex Fenley  wrote:
>>>
>>> Hello,
>>>
>>> I'm tuning flink for parallelism right now and when I look at the 
>>> JobManager I see
>>> taskmanager.cpu.cores 1.7976931348623157E308
>>> Which looks like the maximum double number.
>>>
>>> We have 8 cpu cores, so we figured we'd bump to 16 for hyper threading. We 
>>> have 37 operators so we rounded up and set 40 task slots.
>>>
>>> Here is our configuration
>>>
>>> "vmArgs": "-Xmx16g -Xms16g -XX:MaxDirectMemorySize=1207959552 
>>> -XX:MaxMetaspaceSize=268435456 -Dlog.file=/tmp/flink.log 
>>> -Dtaskmanager.memory.framework.off-heap.size=134217728b 
>>> -Dtaskmanager.memory.network.max=1073741824b 
>>> -Dtaskmanager.memory.network.min=1073741824b 
>>> -Dtaskmanager.memory.framework.heap.size=134217728b 
>>> -Dtaskmanager.memory.managed.size=6335076856b 
>>> -Dtaskmanager.memory.task.heap.size=8160437768b 
>>> -Dtaskmanager.memory.task.off-heap.size=0b 
>>> -Dtaskmanager.numberOfTaskSlots=40 -Dtaskmanager.cpu.cores=16.0"
>>>
>>> We then tried with -Dtaskmanager.cpu.cores=7.0 and still ended up with that 
>>> very odd value for cpu cores.
>>>
>>> How do we correctly adjust this?
>>>
>>> Thanks!
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


Re: taskmanager.cpu.cores 1.7976931348623157E308

2020-12-06 Thread Yangze Guo
My gut feeling is that your "vmArgs" are not taking effect.

Best,
Yangze Guo

On Mon, Dec 7, 2020 at 10:32 AM Yangze Guo  wrote:
>
> Hi, Rex,
>
> Can you share more logs? Did you see something like "The
> configuration option taskmanager.cpu.cores required for local
> execution is not set, setting it to" in your logs?
>
> Best,
> Yangze Guo
>
>
> On Sat, Dec 5, 2020 at 6:53 PM David Anderson  wrote:
> >
> > taskmanager.cpu.cores is intended for internal use only -- you aren't meant 
> > to set this option. What happens if you leave it alone?
> >
> > Regards,
> > David
> >
> >
> > On Sat, Dec 5, 2020 at 8:04 AM Rex Fenley  wrote:
> >>
> >> We're running this in a local environment so that may be contributing to 
> >> what we're seeing.
> >>
> >> On Fri, Dec 4, 2020 at 10:41 PM Rex Fenley  wrote:
> >>>
> >>> Hello,
> >>>
> >>> I'm tuning flink for parallelism right now and when I look at the 
> >>> JobManager I see
> >>> taskmanager.cpu.cores 1.7976931348623157E308
> >>> Which looks like the maximum double number.
> >>>
> >>> We have 8 cpu cores, so we figured we'd bump to 16 for hyper threading. 
> >>> We have 37 operators so we rounded up and set 40 task slots.
> >>>
> >>> Here is our configuration
> >>>
> >>> "vmArgs": "-Xmx16g -Xms16g -XX:MaxDirectMemorySize=1207959552 
> >>> -XX:MaxMetaspaceSize=268435456 -Dlog.file=/tmp/flink.log 
> >>> -Dtaskmanager.memory.framework.off-heap.size=134217728b 
> >>> -Dtaskmanager.memory.network.max=1073741824b 
> >>> -Dtaskmanager.memory.network.min=1073741824b 
> >>> -Dtaskmanager.memory.framework.heap.size=134217728b 
> >>> -Dtaskmanager.memory.managed.size=6335076856b 
> >>> -Dtaskmanager.memory.task.heap.size=8160437768b 
> >>> -Dtaskmanager.memory.task.off-heap.size=0b 
> >>> -Dtaskmanager.numberOfTaskSlots=40 -Dtaskmanager.cpu.cores=16.0"
> >>>
> >>> We then tried with -Dtaskmanager.cpu.cores=7.0 and still ended up with 
> >>> that very odd value for cpu cores.
> >>>
> >>> How do we correctly adjust this?
> >>>
> >>> Thanks!
> >>> --
> >>>
> >>> Rex Fenley  |  Software Engineer - Mobile and Backend
> >>>
> >>>
> >>> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
> >>
> >>
> >>
> >> --
> >>
> >> Rex Fenley  |  Software Engineer - Mobile and Backend
> >>
> >>
> >> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


Re: Flink logs with extra pipeline property

2020-12-06 Thread Yang Wang
I think you could use the following config options to set the environment
variables for the JobManager and TaskManager.
You could then use those env variables in the log4j configuration file;
"${env:PIPELINE}" can be used in log4j2.

containerized.master.env.PIPELINE: my-flink-pipeline
containerized.taskmanager.env.PIPELINE: my-flink-pipeline


For log4j 1, I am afraid you need to set the Java dynamic options [1] to get a
similar effect.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/config.html#env-java-opts
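
For illustration, a minimal log4j2 properties sketch of the idea (assuming the
PIPELINE env variable has been set via the options above; all names here are
examples):

# Console appender whose JSON-style pattern pulls the pipeline name from the env.
appender.console.type = Console
appender.console.name = ConsoleAppender
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = {"level": "%p", "ts": "%d{ISO8601}", "class": "%c", "line": "%L", "pipeline": "${env:PIPELINE}", "message": "%m"}%n
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender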


Best,
Yang

On Sun, Dec 6, 2020 at 10:13 PM Sidney Feiner  wrote:

> Hi,
>
> We're using Apache Flink 1.9.2 and we've started logging everything as
> JSON with log4j (the standard log4j 1 that comes with Flink). When I say JSON
> logging, I just mean that I've formatted it according to:
>
> log4j.appender.console.layout.ConversionPattern={"level": "%p", "ts":
> "%d{ISO8601}", "class": "%c", "line": "%L", "message": "%m"}%n
>
>
> Now I would like to somehow add a field to this JSON to indicate which
> pipeline generated the log. At first I thought I'd add another field that
> logs some environment variable, like so:
>
> log4j.appender.console.layout.ConversionPattern={"level": "%p", "ts":
> "%d{ISO8601}", "class": "%c", "line": "%L", "pipeline": "${PIPELINE}",
> "message": "%m"}%n
>
> But that doesn't seem to be working (is it because the TM is initialized
> before the pipeline, and that's when the placeholders are set?).
>
> Do you know of a way I could add a field with the name of the currently
> running pipeline? In my "Main" I have access to the pipeline name, and I also
> have access to this variable in the tasks themselves. I would prefer not to
> use this variable explicitly when I log; it should be added automatically
> during logging.
>
> If anybody has an idea, I'd love to hear it (we can use logback or
> anything else if necessary).
>
> Thanks :)
>
>


Re: taskmanager.cpu.cores 1.7976931348623157E308

2020-12-06 Thread Xintong Song
Hi Rex,

We're running this in a local environment so that may be contributing to
> what we're seeing.
>
Just to double check on this. By `local environment`, you mean running
flink without setting up a standalone cluster or submitting it to a
K8s/Yarn cluster? (Typically executing from an IDE, running `flink run -t
local`, or running your own application that calls
`ExecutionEnvironment#execute`).
If yes, then this is kind of expected.

A couple of things that might help you understand this.

   - Running on a local environment means setting up the Flink cluster
   within the current process (the IDE process if executed from an IDE, the
   flink client process if using `flink run -t local`, or your own application
   process). That also means most of the resource configurations cannot take
   effect, because the resources of the JVM are already determined. Please
   refer to the memory configuration documents for options that still take
   effect in local execution. [1][2]
   - David is correct that `taskmanager.cpu.cores` is only intended for
   internal usage. I assume you learnt about this configuration by reading
   the source code? If true, please also be aware that the JavaDoc
   of `TaskManagerOption#CPU_CORES` says "DO NOT USE THIS CONFIG OPTION", and
   it is also annotated with `ExcludeFromDocumentation` so that users do not
   learn this option from the documents.
   - Flink does not really control how many cpu cores it uses. However,
   when running on an external resource management system (K8s, Yarn, Mesos),
   it requires a certain amount of cpu resources for its containers/pods, and
   allows the external system to control its cpu usage. You can use the
   following configuration options to control how many cpu cores are requested
   in such cases (see the sketch after this list).
  - kubernetes.jobmanager.cpu
  - kubernetes.taskmanager.cpu
  - yarn.appmaster.vcores
  - yarn.containers.vcores
  - mesos.resourcemanager.tasks.cpus
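
For illustration, a minimal flink-conf.yaml sketch of those options (the values
are examples, not recommendations):

# On Kubernetes:
kubernetes.jobmanager.cpu: 1.0
kubernetes.taskmanager.cpu: 8.0

# Or on YARN:
yarn.appmaster.vcores: 1
yarn.containers.vcores: 8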


Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#local-execution
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_jobmanager.html#local-execution


On Mon, Dec 7, 2020 at 10:32 AM Yangze Guo  wrote:

> Hi, Rex,
>
> Can you share more logs? Did you see something like "The
> configuration option taskmanager.cpu.cores required for local
> execution is not set, setting it to" in your logs?
>
> Best,
> Yangze Guo
>
>
> On Sat, Dec 5, 2020 at 6:53 PM David Anderson 
> wrote:
> >
> > taskmanager.cpu.cores is intended for internal use only -- you aren't
> meant to set this option. What happens if you leave it alone?
> >
> > Regards,
> > David
> >
> >
> > On Sat, Dec 5, 2020 at 8:04 AM Rex Fenley  wrote:
> >>
> >> We're running this in a local environment so that may be contributing
> to what we're seeing.
> >>
> >> On Fri, Dec 4, 2020 at 10:41 PM Rex Fenley  wrote:
> >>>
> >>> Hello,
> >>>
> >>> I'm tuning flink for parallelism right now and when I look at the
> JobManager I see
> >>> taskmanager.cpu.cores 1.7976931348623157E308
> >>> Which looks like the maximum double number.
> >>>
> >>> We have 8 cpu cores, so we figured we'd bump to 16 for hyper
> threading. We have 37 operators so we rounded up and set 40 task slots.
> >>>
> >>> Here is our configuration
> >>>
> >>> "vmArgs": "-Xmx16g -Xms16g -XX:MaxDirectMemorySize=1207959552
> -XX:MaxMetaspaceSize=268435456 -Dlog.file=/tmp/flink.log
> -Dtaskmanager.memory.framework.off-heap.size=134217728b
> -Dtaskmanager.memory.network.max=1073741824b
> -Dtaskmanager.memory.network.min=1073741824b
> -Dtaskmanager.memory.framework.heap.size=134217728b
> -Dtaskmanager.memory.managed.size=6335076856b
> -Dtaskmanager.memory.task.heap.size=8160437768b
> -Dtaskmanager.memory.task.off-heap.size=0b
> -Dtaskmanager.numberOfTaskSlots=40 -Dtaskmanager.cpu.cores=16.0"
> >>>
> >>> We then tried with -Dtaskmanager.cpu.cores=7.0 and still ended up with
> that very odd value for cpu cores.
> >>>
> >>> How do we correctly adjust this?
> >>>
> >>> Thanks!
> >>> --
> >>>
> >>> Rex Fenley  |  Software Engineer - Mobile and Backend
> >>>
> >>>
> >>> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
> >>
> >>
> >>
> >> --
> >>
> >> Rex Fenley  |  Software Engineer - Mobile and Backend
> >>
> >>
> >> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
>


Re: taskmanager.cpu.cores 1.7976931348623157E308

2020-12-06 Thread Xintong Song
I forgot to mention that it is by design that task managers always have
`Double#MAX_VALUE` cpu cores in local execution.


And I think Yangze is right. The log "The configuration option
taskmanager.cpu.cores required for local execution is not set, setting it
to" can be misleading for users. Will fire an issue on that.


Thank you~

Xintong Song




On Mon, Dec 7, 2020 at 11:03 AM Xintong Song  wrote:

> Hi Rex,
>
> We're running this in a local environment so that may be contributing to
>> what we're seeing.
>>
> Just to double check on this. By `local environment`, you mean running
> flink without setting up a standalone cluster or submitting it to a
> K8s/Yarn cluster? (Typically executing from an IDE, running `flink run -t
> local`, or running your own application that calls
> `ExecutionEnvironment#execute`).
> If yes, then this is kind of expected.
>
> A couple of things that might help you understand this.
>
>- Running on a local environment means setting up the Flink cluster
>within the current process (the IDE process if executed from an IDE, the
>flink client process if using `flink run -t local`, or your own application
>process). That also means most of the resource configurations cannot take
>effect, because the resources of the JVM are already determined. Please
>refer to the memory configuration documents for options that still take
>effect in local execution. [1][2]
>- David is correct that `taskmanager.cpu.cores` is only intended for
>internal usages. I assume you learnt about this configuration by reading
>the source codes? If true, please also be aware that the JavaDoc
>of `TaskManagerOption#CPU_CORES` says "DO NOT USE THIS CONFIG OPTION", and
>it is also annotated with `ExcludeFromDocumentation` so that users do not
>learn this option from the documents.
>- Flink does not really control how many cpu cores it uses. However,
>when running on an external resource management system (K8s, Yarn, Mesos),
>it requires a certain amount of cpu resources for its containers/pods, and
>allows the external system to control its cpu usage. You can use the
>following configuration options to control how many cpu cores are requested
>in such cases.
>   - kubernetes.jobmanager.cpu
>   - kubernetes.taskmanager.cpu
>   - yarn.appmaster.vcores
>   - yarn.containers.vcores
>   - mesos.resourcemanager.tasks.cpus
>
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#local-execution
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_jobmanager.html#local-execution
>
>
> On Mon, Dec 7, 2020 at 10:32 AM Yangze Guo  wrote:
>
>> Hi, Rex,
>>
>> Can you share more logs? Did you see something like "The
>> configuration option taskmanager.cpu.cores required for local
>> execution is not set, setting it to" in your logs?
>>
>> Best,
>> Yangze Guo
>>
>>
>> On Sat, Dec 5, 2020 at 6:53 PM David Anderson 
>> wrote:
>> >
>> > taskmanager.cpu.cores is intended for internal use only -- you aren't
>> meant to set this option. What happens if you leave it alone?
>> >
>> > Regards,
>> > David
>> >
>> >
>> > On Sat, Dec 5, 2020 at 8:04 AM Rex Fenley  wrote:
>> >>
>> >> We're running this in a local environment so that may be contributing
>> to what we're seeing.
>> >>
>> >> On Fri, Dec 4, 2020 at 10:41 PM Rex Fenley  wrote:
>> >>>
>> >>> Hello,
>> >>>
>> >>> I'm tuning flink for parallelism right now and when I look at the
>> JobManager I see
>> >>> taskmanager.cpu.cores 1.7976931348623157E308
>> >>> Which looks like the maximum double number.
>> >>>
>> >>> We have 8 cpu cores, so we figured we'd bump to 16 for hyper
>> threading. We have 37 operators so we rounded up and set 40 task slots.
>> >>>
>> >>> Here is our configuration
>> >>>
>> >>> "vmArgs": "-Xmx16g -Xms16g -XX:MaxDirectMemorySize=1207959552
>> -XX:MaxMetaspaceSize=268435456 -Dlog.file=/tmp/flink.log
>> -Dtaskmanager.memory.framework.off-heap.size=134217728b
>> -Dtaskmanager.memory.network.max=1073741824b
>> -Dtaskmanager.memory.network.min=1073741824b
>> -Dtaskmanager.memory.framework.heap.size=134217728b
>> -Dtaskmanager.memory.managed.size=6335076856b
>> -Dtaskmanager.memory.task.heap.size=8160437768b
>> -Dtaskmanager.memory.task.off-heap.size=0b
>> -Dtaskmanager.numberOfTaskSlots=40 -Dtaskmanager.cpu.cores=16.0"
>> >>>
>> >>> We then tried with -Dtaskmanager.cpu.cores=7.0 and still ended up
>> with that very odd value for cpu cores.
>> >>>
>> >>> How do we correctly adjust this?
>> >>>
>> >>> Thanks!
>> >>> --
>> >>>
>> >>> Rex Fenley  |  Software Engineer - Mobile and Backend
>> >>>
>> >>>
>> >>> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
>> >>
>> >>
>> >>
>> >> --
>> >>
>> >> Rex Fenley  |  Software Engineer - Mobile and Backend
>> >>
>> >>
>> >> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
>>
>


Re: Flink 1.9 Version State TTL parameter configuration does not work

2020-12-06 Thread Yun Tang
Hi Yang,


Why did your checkpoint fail? Was the checkpoint expired, or did it fail due to
an error?

Could you paste the jstack result showing what RocksDB is doing during the
checkpoint?

BTW, you could also use async-profiler [1] to see where the CPU time of your
operations goes; this tool can help to show what RocksDB is doing.

[1] https://github.com/jvm-profiling-tools/async-profiler

Best
Yun Tang


From: Andrey Zagrebin 
Sent: Friday, December 4, 2020 17:49
To: user 
Subject: Re: Flink 1.9Version State TTL parameter configuration it does not work

Hi Yang,

(redirecting this to user mailing list as this is not a dev question)

I am not sure why the state loading is stuck after enabling the compaction 
filter
but the background cleanup of RocksDB state with TTL will not work without 
activating the filter.
This happens on RocksDB opening in Flink, before any state is created and it 
starts to load.

Which version of Flink do you use?
Did you try to enable the filter without starting from the checkpoint, 
basically from the beginning of the job run?

Best,
Andrey

On Fri, Dec 4, 2020 at 11:27 AM Yang Peng  wrote:
Hi, I have some questions about state TTL to ask everybody; the
state backend is RocksDB. Below is my code:
-code begin-
private static final String EV_STATE_FLAG = "EV_EID_FLAG";

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(60))
.updateTtlOnCreateAndWrite()
.neverReturnExpired()
.cleanupInRocksdbCompactFilter(1000)
.build();
MapStateDescriptor<String, Integer> eidMapStateDesc = new
MapStateDescriptor<>(EV_STATE_FLAG, BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO);
eidMapStateDesc.enableTimeToLive(ttlConfig);
eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);

-code end-

I have set the TTL of the state to 60 minutes, but after 12 hours, through
the RocksDB metrics monitor, we found that the sst files of
CF EV_EID_FLAG kept increasing, with no decreasing trend.
Later we found some information in the taskmanager log: WARN
org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL compaction
filter for state < EV_EID_FLAG >: feature is disabled for the state backend
After I added the "state.backend.rocksdb.ttl.compaction.filter.enabled:
true" parameter, the warning disappeared, but after the job completed some
checkpoints, the next checkpoint would always fail. I checked with the jstack
command and found that the failing checkpoint was stuck acquiring state while
disk I/O was idle; after removing the
"state.backend.rocksdb.ttl.compaction.filter.enabled: true" parameter, the
job resumed checkpointing. So I'm asking everyone here: is my usage wrong?


Re: taskmanager.cpu.cores 1.7976931348623157E308

2020-12-06 Thread Xintong Song
FYI, I've opened FLINK-20503 for this.
https://issues.apache.org/jira/browse/FLINK-20503

Thank you~

Xintong Song



On Mon, Dec 7, 2020 at 11:10 AM Xintong Song  wrote:

> I forgot to mention that it is designed that task managers always have
> `Double#MAX_VALUE` cpu cores in local execution.
>
>
> And I think Yangze is right. The log "The configuration option
> taskmanager.cpu.cores required for local execution is not set, setting it
> to" can be misleading for users. Will fire an issue on that.
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
> On Mon, Dec 7, 2020 at 11:03 AM Xintong Song 
> wrote:
>
>> Hi Rex,
>>
>> We're running this in a local environment so that may be contributing to
>>> what we're seeing.
>>>
>> Just to double check on this. By `local environment`, you mean running
>> flink without setting up a standalone cluster or submitting it to a
>> K8s/Yarn cluster? (Typically executing from an IDE, running `flink run -t
>> local`, or running your own application that calls
>> `ExecutionEnvironment#execute`).
>> If yes, then this is kind of expected.
>>
>> A couple of things that might help you understand this.
>>
>>- Running on a local environment means setting up the Flink cluster
>>within the current process (the IDE process if executed from an IDE, the
>>flink client process if using `flink run -t local`, or your own 
>> application
>>process). That also means most of the resource configurations cannot take
>>effect, because the resources of the JVM are already determined. Please
>>refer to the memory configuration documents for options that still take
>>effect in local execution. [1][2]
>>- David is correct that `taskmanager.cpu.cores` is only intended for
>>internal usages. I assume you learnt about this configuration by reading
>>the source codes? If true, please also be aware that the JavaDoc
>>of `TaskManagerOption#CPU_CORES` says "DO NOT USE THIS CONFIG OPTION", and
>>it is also annotated with `ExcludeFromDocumentation` so that users do not
>>learn this option from the documents.
>>- Flink does not really control how many cpu cores it uses. However,
>>when running on an external resource management system (K8s, Yarn, Mesos),
>>it requires a certain amount of cpu resources for its containers/pods, and
>>allows the external system to control its cpu usage. You can use the
>>following configuration options to control how many cpu cores are 
>> requested
>>in such cases.
>>   - kubernetes.jobmanager.cpu
>>   - kubernetes.taskmanager.cpu
>>   - yarn.appmaster.vcores
>>   - yarn.containers.vcores
>>   - mesos.resourcemanager.tasks.cpus
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#local-execution
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_jobmanager.html#local-execution
>>
>>
>> On Mon, Dec 7, 2020 at 10:32 AM Yangze Guo  wrote:
>>
>>> Hi, Rex,
>>>
>>> Can you share more logs? Did you see something like "The
>>> configuration option taskmanager.cpu.cores required for local
>>> execution is not set, setting it to" in your logs?
>>>
>>> Best,
>>> Yangze Guo
>>>
>>>
>>> On Sat, Dec 5, 2020 at 6:53 PM David Anderson 
>>> wrote:
>>> >
>>> > taskmanager.cpu.cores is intended for internal use only -- you aren't
>>> meant to set this option. What happens if you leave it alone?
>>> >
>>> > Regards,
>>> > David
>>> >
>>> >
>>> > On Sat, Dec 5, 2020 at 8:04 AM Rex Fenley  wrote:
>>> >>
>>> >> We're running this in a local environment so that may be contributing
>>> to what we're seeing.
>>> >>
>>> >> On Fri, Dec 4, 2020 at 10:41 PM Rex Fenley  wrote:
>>> >>>
>>> >>> Hello,
>>> >>>
>>> >>> I'm tuning flink for parallelism right now and when I look at the
>>> JobManager I see
>>> >>> taskmanager.cpu.cores 1.7976931348623157E308
>>> >>> Which looks like the maximum double number.
>>> >>>
>>> >>> We have 8 cpu cores, so we figured we'd bump to 16 for hyper
>>> threading. We have 37 operators so we rounded up and set 40 task slots.
>>> >>>
>>> >>> Here is our configuration
>>> >>>
>>> >>> "vmArgs": "-Xmx16g -Xms16g -XX:MaxDirectMemorySize=1207959552
>>> -XX:MaxMetaspaceSize=268435456 -Dlog.file=/tmp/flink.log
>>> -Dtaskmanager.memory.framework.off-heap.size=134217728b
>>> -Dtaskmanager.memory.network.max=1073741824b
>>> -Dtaskmanager.memory.network.min=1073741824b
>>> -Dtaskmanager.memory.framework.heap.size=134217728b
>>> -Dtaskmanager.memory.managed.size=6335076856b
>>> -Dtaskmanager.memory.task.heap.size=8160437768b
>>> -Dtaskmanager.memory.task.off-heap.size=0b
>>> -Dtaskmanager.numberOfTaskSlots=40 -Dtaskmanager.cpu.cores=16.0"
>>> >>>
>>> >>> We then tried with -Dtaskmanager.cpu.cores=7.0 and still ended up
>>> with that very odd value for cpu cores.
>>> >>>
>>> >>> How do we correctly adjust this?
>>> >>>
>>> >>> Thanks!
>>> >>>

Re: How to filter kafka stream from multiple source csv files.

2020-12-06 Thread Guowei Ma
Hi, Amr
What sink do you use? I think the error means that the sink does not support
"upsert" changes.
If you use Kafka as a sink [1], I think you could try the upsert-kafka
connector after 1.12.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/upsert-kafka.html
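
For illustration, a hypothetical DDL sketch of such an upsert-kafka sink (the
topic, fields, and formats are examples, not from this thread):

CREATE TABLE filtered_sink (
  `key` STRING,
  `payload` STRING,
  PRIMARY KEY (`key`) NOT ENFORCED  -- upsert-kafka requires a primary key
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'filtered-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
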
Best,
Guowei


On Mon, Dec 7, 2020 at 9:19 AM amrahmed  wrote:

> Hello,
>
> I am a newbie in Flink, and I am stuck and looking for help. I want to join
> streams A, B, C, D from CSV source files; some of the streams update
> frequently, and I have another high-throughput stream K from Kafka that I
> need to filter against [A, B, C, D]. I tried using the Flink Table API:
> union all streams [A, B, C, D] and then execute the query SELECT * FROM K k
> WHERE k.key NOT IN (SELECT Key FROM ALL_UNION_LIST), but I am stuck on the
> error that the AppendStreamTableSink doesn't support consuming update and
> delete changes. Only a select query without the join gets executed. Could
> you also let me know how I can implement this solution in the DataStream
> API? All the streams have a common key.
>
> Best,
> Amr
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


many questions (kafka table KafkaDeserializationSchema support, recommended enrichment approach, preventing N+1 queries with a JDBC temporal dimension table, etc.)

2020-12-06 Thread Marco Villalobos
1. How can I create a Kafka table that can use headers and map them to columns?
Currently, I am using a KafkaDeserializationSchema to create a DataStream, and
then I convert that DataStream into a Table. I would like to use a more direct
approach.
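
For what it's worth, a hypothetical DDL sketch of the more direct approach I am
hoping for, based on the metadata columns that newer Flink versions (1.12+)
expose on the Kafka SQL connector (field names and options here are examples):

CREATE TABLE events (
  `headers` MAP<STRING, BYTES> METADATA,  -- Kafka record headers as a column
  `user_id` BIGINT,
  `message` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);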

2. What is the recommended way to enrich a Kafka table or data stream with
data from Postgres?
a) a Kafka table and a JDBC temporal dimension table, with a temporal join
and the lookup cache set up (see the DDL sketch after this list)
b) a data stream with async I/O which connects via JDBC (note that
async I/O does not support a keyed-state cache)
c) a data stream with a rich function or process function that uses keyed state
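
For option (a), a hypothetical DDL sketch of the lookup-cache setup on the JDBC
dimension table (the option names are from the Flink JDBC connector; the table,
fields, and values are examples):

CREATE TABLE jdbc_table_two (
  `id` BIGINT,
  `name` STRING,
  `metadata` STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/mydb',
  'table-name' = 't2',
  'lookup.cache.max-rows' = '10000',  -- cache lookup results per task
  'lookup.cache.ttl' = '10min'        -- expire cached rows after 10 minutes
);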

3. When using a Kafka table and a JDBC temporal dimension table, how do I
prevent N+1 queries per join row?

When I issued a query such as this:

SELECT k.name, t1.id, t2.metadata, SUM(k.cost)
FROM kafka_table AS k
JOIN jdbc_table_one AS t1 ON k.t1_id = t1.ID
LEFT JOIN jdbc_table_two FOR SYSTEM_TIME AS OF k.proc_time AS t2 ON 
t1.t2_id = t2.id AND t2.name = k.name
GROUP BY TUMBLE (k.proc_time, INTERVAL '3' MINUTE), k.name, t1.id, 
t2.metadata

My PostgreSQL logs show one query against jdbc_table_two for each
distinct t2.name.

In a real production system, that would be 200,000 queries!

4. When using a JDBC temporal dimension table, does Flink retrieve the rows
from the database asynchronously, or is it possible for Flink to look up
multiple join rows at a time with an IN (subquery) syntax?





Re: Re: Duplicate operators generated by plan

2020-12-06 Thread Yun Gao
Hi Rex,

   I tried a similar example [1] but did not reproduce the issue. Which version
of Flink are you using now?

Best,
 Yun


[1] The example code:
StreamExecutionEnvironment bsEnv =
    StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.setRestartStrategy(RestartStrategies.noRestart());
bsEnv.setParallelism(1);

EnvironmentSettings bsSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
    bsSettings);

DataStream<Tuple2<Integer, String>> source = bsEnv.addSource(
    new RichParallelSourceFunction<Tuple2<Integer, String>>() {

        @Override
        public void run(SourceContext<Tuple2<Integer, String>> sourceContext)
                throws Exception {
            sourceContext.collect(new Tuple2<>(0, "test"));
        }

        @Override
        public void cancel() {

        }
    });

Table table = bsTableEnv.fromDataStream(
    source, $("id"), $("name"));
Table table2 = table.select(call("abs", $("id")), $("name"))
    .as("new_id", "new_name");

bsTableEnv.createTemporaryView("view", table2);
Table handled = bsTableEnv.sqlQuery(
    "select new_id, FIRST_VALUE(new_name) as new_name from view group by new_id");

Table ret = table.join(handled)
    .where($("id").isEqual($("new_id")))
    .select($("id"), $("name"), $("new_name"));
System.out.println(ret.explain());

DataStream<Tuple2<Boolean, Row>> row = bsTableEnv.toRetractStream(ret,
    Row.class);
row.addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
    @Override
    public void invoke(Tuple2<Boolean, Row> value, Context context)
            throws Exception {

    }
});

System.out.println(bsEnv.getStreamGraph().getStreamingPlanAsJSON());
--
Sender:Rex Fenley
Date:2020/12/04 14:18:21
Recipient:Yun Gao
Cc:user; Brad Davis
Theme:Re: Duplicate operators generated by plan

cc Brad

On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley  wrote:

Yes, the same exact input operators go into both joins.

The chunk of code for the joins from the specific part of the plan I showed is
as follows. The orgUsersTable is later filtered into one table and aggregated,
and into another table and aggregated. The planner seems to duplicate
orgUsersTable into 2 operators even though I create only 1 of it.

// in the main function
val orgUsersTable = splatRoles(
 this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
 OrgUsersRoleSplatPrefix,
 this.tableEnv
 )

// helper function
 def splatRoles(
 table: Table,
 columnPrefix: String,
 tableEnv: TableEnvironment
 ): Table = {
 // Flink does not have a contains function so we have to splat out our role 
array's contents
 // and join it to the originating table.
 val func = new SplatRolesFunc()
 val splatted = table
 .map(func($"roles", $"id"))
 .as(
 "id_splatted",
 s"${columnPrefix}_is_admin",
 s"${columnPrefix}_is_teacher",
 s"${columnPrefix}_is_student",
 s"${columnPrefix}_is_parent"
 )
 // FIRST_VALUE is only available in SQL - so this is SQL.
 // Rationale: We have to group by after a map to preserve the pk inference, 
otherwise flink will
 // toss it out and all future joins will not have a unique key.
 tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
 val grouped = tableEnv.sqlQuery(s"""
 SELECT
 id_splatted,
 FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
 FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
 FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
 FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
 FROM ${columnPrefix}_splatted
 GROUP BY id_splatted
 """)
 return table
 .join(grouped, $"id" === $"id_splatted")
 .dropColumns($"id_splatted")
 .renameColumns($"roles".as(s"${columnPrefix}_roles"))
 }

@FunctionHint(
 output = new DataTypeHint(
 "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student 
BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
 )
)
class SplatRolesFunc extends ScalarFunction {
 def eval(roles: Array[String], id: java.lang.Long): Row = {
 val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
 val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
 val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
 val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
 return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
 }
 override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
 Types.ROW(
 Types.LONG,
 Types.BOOLEAN,
 Types.BOOLEAN,
 Types.BOOLEAN,
 Types.BOOLEAN
 )
}


On Thu, Dec 3, 2020 at 7:49 PM Yun Gao  wrote:

Hi Rex,

Could you also attach one example of these SQL queries / tables? And one
possible issue to confirm: do the operators with the same names also have the
same inputs?

Best,
Yun

 --Original Mail --
Sender:Rex Fenley 
Send Date:Fri Dec 4 02:55:41 2020
Recipients:user 
Subject:Duplicate operators generated by plan

Hello,

I'm running into an issue where my execution plan is creating the same exact 
join operator multiple times simply because the subsequent oper

Re: How to parse list values in csv file

2020-12-06 Thread Yun Gao
Glad to hear that you solved this issue!


Best,
 Yun
--
Sender:narasimha
Date:2020/12/06 21:35:33
Recipient:Yun Gao
Cc:user
Theme:Re: How to parse list values in csv file

Thanks for your email.

Translated the CSV to JSON, read it as a plain text file, and then processed
it into objects.
It solved my use case.


On Fri, Dec 4, 2020 at 12:24 PM Yun Gao  wrote:


Hi,

The CSV format only supports the types listed in [1] and must use the types in
this list; for other types, some kind of workaround is needed, such as first
parsing the field as a string and parsing it again later in the program.

Best,
Yun



[1] 
https://github.com/apache/flink/blob/e10e548feb2bedf54c3863bbd49ed4f9140546cf/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java#L287



 --Original Mail --
Sender:narasimha 
Send Date:Fri Dec 4 00:45:53 2020
Recipients:user 
Subject:How to parse list values in csv file

Hi,

Getting the below error when trying to read a CSV file; one of the fields is a
list type.

Can someone help with fixing the issue?

jobmanager_1   | Caused by: java.lang.IllegalArgumentException: The type 
'java.util.List' is not supported for the CSV input format.
jobmanager_1   | at 
org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:289)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:67)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:83)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
jobmanager_1   | at 
org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:87)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]

-- 
A.Narasimha Swamy


-- 
A.Narasimha Swamy



Re: test (harness and minicluster)

2020-12-06 Thread Martin Frank Hansen
Hi Till,
Thanks for your answer!
I will try the approach you suggested; I hope I can make it work.

Best regards
martin

On Wed, Dec 2, 2020 at 5:03 PM Till Rohrmann  wrote:

> Hi Martin,
>
> In general, Flink's MiniCluster should be able to run every complete Flink
> JobGraph. However, from what I read you are looking for a test harness for
> a processWindowFunction so that you can test this function in a more unit
> test style, right? What you can do is to use the
> OneInputStreamOperatorTestHarness and initialize it with a WindowOperator
> where you pass the ProcessWindowFunction to. That way you should be able to
> test the ProcessWindowFunction. Please also take a look
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest
> [1] for some examples how it could be done.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
>
> Cheers,
> Till
>
> On Wed, Dec 2, 2020 at 12:04 PM Martin Frank Hansen <
> m...@berlingskemedia.dk> wrote:
>
>> Hi,
>>
>> I am trying to make a test-suite for our flink cluster using harness and
>> minicluster. As we are using processWindowFunctions in our pipeline we need
>> some good ways of validating these functions. To my surprise,
>> processWindowFunctions are supported by neither test-harness nor minicluster
>> setups, so does anyone know if it will be supported in the future? (Or if
>> it is supported how should I use it?)
>>
>> Furthermore does anyone have some good ideas for a test-setup for
>> processWindowFunctions?
>>
>> best regards
>>
>>
>> Martin Frank Hansen
>>
>> Data Engineer
>>
>>

-- 

Martin Frank Hansen

Data Engineer
Digital Service
M: +45 25 57 14 18
E: m...@berlingskemedia.dk

Pilestræde 34 | DK-1147 København K | T: +45 33 75 75 75 |
berlingskemedia.dk