Is there a way to avoid submitting hive-udf's resources when we submit a job?

2020-09-18 Thread Husky Zeng
When we submit a job that uses Hive UDFs, the job depends on the UDFs'
jars and configuration files.

We have already stored the UDFs' jars and configuration files in the Hive
metastore, so we expect that Flink could get those files' HDFS paths through
the hive-connector and fetch the files from HDFS by those paths at runtime.

In this code, it seems we already get those UDF resources' paths in
FunctionInfo, but don't use them.

  
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java#L80

Right now we submit the UDFs' jars and configuration files to YARN together
with the job via the client, and we are trying to find a way to avoid
submitting the UDFs' resources at job-submission time. Is it possible?
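For context, a minimal sketch of how the Hive UDFs resolve today via
HiveModule; the Hive version string, table, and UDF names below are
assumptions for illustration, and the UDF jars still have to be shipped
with the job:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.module.hive.HiveModule

// Hypothetical setup: load the Hive module so Hive UDFs resolve by name.
// Today the UDF jars must already be on the client/cluster classpath;
// nothing is fetched from HDFS automatically.
val settings = EnvironmentSettings.newInstance().build()
val tableEnv = TableEnvironment.create(settings)
tableEnv.loadModule("hive", new HiveModule("2.3.4"))
tableEnv.executeSql("SELECT my_hive_udf(col) FROM some_table")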





Flink Table SQL and writing nested Avro files

2020-09-18 Thread Dan Hill
Hi!

I want to join two tables and write the results to Avro, where the left and
right rows are nested in the Avro output.  Is it possible to do this with
the SQL interface?

Thanks!
- Dan

 CREATE TABLE `flat_avro` (
   `left` ROW,
   `right` ROW
) WITH (
   'connector' = 'filesystem',
   'path' = 's3p://blah/blah',
   'format' = 'avro'
);

INSERT INTO `flat_avro`
SELECT `left`.*, `right`.*
FROM `left`
LEFT JOIN `right`
ON `left`.`id` = `right`.`id`;
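A hedged sketch of one way to get the nesting from the SQL interface: build
each side explicitly with the ROW value constructor. The table and column
names (left_t, right_t, id, val) are placeholders, not from the original post:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Sketch only: nest each join side into a ROW column before writing to the
// Avro filesystem sink. Assumes both tables have columns (id, val).
val tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build())
tEnv.executeSql(
  """
    |INSERT INTO `flat_avro`
    |SELECT ROW(l.id, l.val) AS `left`, ROW(r.id, r.val) AS `right`
    |FROM left_t l LEFT JOIN right_t r ON l.id = r.id
    |""".stripMargin)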


On Flink job re-submission, observing error, "the rpc invocation size exceeds the maximum akka framesize"

2020-09-18 Thread shravan
Hi,

This is in continuation of an already raised request (we replied to the
same thread but haven't received a response yet, hence this new post):
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-rpc-invocation-size-exceeds-the-maximum-akka-framesize-when-the-job-was-re-submitted-td37507.html

We are observing the same error, "The rpc invocation size exceeds the
maximum akka framesize.", and have follow-up questions.

Why do we face this issue, and how can we know the size for which it is
failing? The error message does not indicate that. Does the operator state
have any impact on the required Akka frame size? What is the impact of
increasing it?

Awaiting a response.

Regards,
Shravan





Re: Automatically restore from checkpoint

2020-09-18 Thread David Anderson
If your job crashes, Flink will automatically restart from the latest
checkpoint, without any manual intervention. JobManager HA is only needed
for automatic recovery after the failure of the Job Manager.

You only need externalized checkpoints and "-s :checkpointPath" if you want
to use checkpoints to manually restart a job after manually cancelling or
stopping it. Also, it's not necessary that you have read access to the
checkpoints, but the job manager and task managers must be able to read
(and write) them.
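For reference, a minimal sketch of the configuration being discussed; the
interval and restart values below are invented for illustration:

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Sketch: checkpoint every 10s, retain checkpoints on cancellation so a
// manual "-s <checkpointPath>" restart stays possible, and restart
// automatically on task failure without manual intervention.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000L)
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)))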

Regards,
David

On Fri, Sep 18, 2020 at 6:23 AM Arpith P  wrote:

> Hi,
>
> I'm running a Flink job in distributed mode deployed on YARN; I've enabled
> externalized checkpoints saved to HDFS, but I don't have access to read the
> checkpoints folder. Is it possible to restart the Flink job from the last
> saved checkpoint without passing "-s :checkpointPath"? If that is not
> possible, how can I restore state after the job crashes? Would enabling
> JobManager HA help me in any way?
>
> Thanks,
> Arpith
>


valuestate(descriptor) using a custom caseclass

2020-09-18 Thread Martin Frank Hansen
Hi,

I am trying to calculate some different metrics, using the state backend to
check whether events have been seen before. I am using the
ProcessWindowFunction, but nothing gets through; it is as if the
.process function is ignored. Is it not possible to store a custom case
class as ValueState? Or do I need to implement a serializer for the
case class? Or ...

Any help is much appreciated.

My code:

class MetricsProcessFunction extends
    ProcessWindowFunction[Event, PageviewBasedMetrics, PageviewBasedMetricsGroup, TimeWindow] {

  var pageviewMetricState: ValueState[PageviewBasedMetrics] = _

  override def open(parameters: Configuration): Unit = {
    pageviewMetricState = this.getRuntimeContext.getState(
      new ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics",
        classOf[PageviewBasedMetrics]))
  }

  override def process(key: PageviewBasedMetricsGroup, context: Context,
      elements: Iterable[Event], out: Collector[PageviewBasedMetrics]): Unit = {

    // pwbm was undefined in the snippet as posted; presumably it is the
    // current state value, read back before updating.
    val pwbm = pageviewMetricState.value()

    if (elements.head.event.getOrElse("") == "page_view") {
      val tmp = pwbm.pageviews + 1
      val tmpPBM = pwbm.copy(
        pageviews = tmp,
        startTime = Instant.ofEpochMilli(context.window.getStart).atOffset(ZoneOffset.UTC).toInstant,
        endTime = Instant.ofEpochMilli(context.window.getEnd).atOffset(ZoneOffset.UTC).toInstant)

      pageviewMetricState.update(SnowplowPickler.write(tmpPBM))
    }
    out.collect(SnowplowPickler.read(pageviewMetricState.value()))
  }
}


object AggregateMultipleMetrics  {


  def main(args: Array[String]) {
val env: StreamEnvironment =
StreamEnvironment.getStreamEnvironment("AggregateMetrics")
val executionEnv: StreamExecutionEnvironment = env.streamExecutionEnv
val appProps: Properties = env.appProps

val inputStream: String = appProps.getProperty("input_topic")
val outputTopic1Min: String = appProps.getProperty("output_topic_1_min")
val outputSerializer1Min:
KafkaSerializationSchemaPageviewBasedMetrics = new
KafkaSerializationSchemaPageviewBasedMetrics(outputTopic1Min)
val partitioner: FlinkKafkaPartitioner[PageviewBasedMetrics] = new
FlinkKafkaKeyPartitioner[PageviewBasedMetrics]()

val mainDataStream = new
SnowplowEventSource().getStream(inputStream, appProps, executionEnv)

val target1Min: SinkFunction[PageviewBasedMetrics] = new
KafkaSink[PageviewBasedMetrics,
KafkaSerializationSchemaPageviewBasedMetrics]().getSinkFunction(
  outputTopic1Min,
  outputSerializer1Min,
  partitioner,
  appProps)

mainDataStream
  .keyBy[PageviewBasedMetricsGroup]((e: Event) =>
Util.getPageviewBasedMetricsGroup(e))
  .timeWindow(Time.minutes(1))
  .process(new MetricsProcessFunction)
  .addSink(target1Min)

// execute program
executionEnv.execute("Count pageview-based metrics")

  }
}





-- 

Martin Frank Hansen

Data Engineer
Digital Service
M: +45 25 57 14 18
E: m...@berlingskemedia.dk


Re: On Flink job re-submission, observing error, "the rpc invocation size exceeds the maximum akka framesize"

2020-09-18 Thread Chesnay Schepler

> how can we know the expected size for which it is failing?


If you did not configure akka.framesize yourself then it is set to the 
documented default value. See the configuration documentation for the 
release you are using.


> Does the operator state have any impact on the expected Akka frame size?

If you are using the MemoryStateBackend, yes. Otherwise, the impact of 
using any form of state on the framesize should be negligible.


> What is the impact of increasing it?

Increase in memory consumption, probably around 1-2x the increased
amount. (So, increase it by 1 MB and memory usage goes up by 1-2 MB.)
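If it does need to be raised, a rough sketch of setting it programmatically
for a local test setup (in a real deployment it belongs in flink-conf.yaml);
the 20 MB value is only an example:

import org.apache.flink.configuration.{AkkaOptions, Configuration}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Sketch: raise akka.framesize from the 1.11 default of "10485760b" (10 MB).
val conf = new Configuration()
conf.setString(AkkaOptions.FRAMESIZE, "20971520b") // 20 MB, example value
val env = StreamExecutionEnvironment.createLocalEnvironment(1, conf)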







Re: Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-18 Thread Yun Gao
Great! Many thanks @ZhuZhu for driving this, and thanks to everyone who
contributed to the release!

Best,
 Yun


 --Original Mail --
Sender:Jingsong Li 
Send Date:Thu Sep 17 13:31:41 2020
Recipients:user-zh 
CC:dev , user , Apache Announce 
List 
Subject:Re: [ANNOUNCE] Apache Flink 1.11.2 released

Thanks ZhuZhu for driving the release.

Best,
Jingsong
On Thu, Sep 17, 2020 at 1:29 PM Zhu Zhu  wrote:
The Apache Flink community is very happy to announce the release of Apache
Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2020/09/17/release-1.11.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348575

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Thanks,
Zhu


-- 
Best, Jingsong Lee

Re: Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-18 Thread Guowei Ma
Thanks ZhuZhu for driving the release!!!

Best,
Guowei


On Fri, Sep 18, 2020 at 5:10 PM Yun Gao  wrote:

> Great! Many thanks @ZhuZhu for driving this, and thanks to everyone who
> contributed to the release!
>
> Best,
>  Yun
>


Error on deploying Flink docker image with Kubernetes (minikube) and automatically launching a stream WordCount job.

2020-09-18 Thread Felipe Gutierrez
Hi community,

I am trying to deploy the default WordCount stream application on
minikube using the official documentation at [1]. I am using minikube
v1.13.0 on Ubuntu 18.04 and Kubernetes v1.19.0 on Docker 19.03.8. I
could successfully start 1 job manager and 3 task managers using the
yaml files flink-configuration-configmap.yaml,
jobmanager-service.yaml, jobmanager-rest-service.yaml,
jobmanager-session-deployment.yaml, and
taskmanager-session-deployment.yaml (all available in the Appendix of
this link [1]).

Then I would like to start the word-count stream job available in the
Flink image [2], which I believe is present since it is built into the
default Flink distribution. What I understood is that I have to create
the objects based on the files jobmanager-job.yaml and
taskmanager-job-deployment.yaml (also available in link [1]), and that
I have to replace this line below in the object jobmanager-job.yaml
(spec.template.spec.containers.name[jobmanager]):

args: ["standalone-job", "--job-classname",
"org.apache.flink.streaming.examples.wordcount.WordCount"]

Is this correct? I am not sure if this is my only error. I am
getting the message "Could not find the provided job class
(org.apache.flink.streaming.examples.wordcount.WordCount) in the user
lib directory (/opt/flink/usrlib)". As far as I know,
"/opt/flink/usrlib" is the default directory. I am not sure if I have
to change the property "path: /host/path/to/job/artifacts". Below is the
log message of the failing pod.
Do you have any idea what I am missing in my configuration?

Thanks, Felipe

$ kubectl get pods
NAME READY   STATUS RESTARTS   AGE
flink-jobmanager-ftgg9   0/1 CrashLoopBackOff   3  83s
$ kubectl logs flink-jobmanager-ftgg9
Starting Job Manager
sed: couldn't open temporary file /opt/flink/conf/sedA699Jt: Read-only
file system
sed: couldn't open temporary file /opt/flink/conf/sedvZhs0w: Read-only
file system
/docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create
/opt/flink/conf/flink-conf.yaml: Permission denied
/docker-entrypoint.sh: 91: /docker-entrypoint.sh: cannot create
/opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
Starting standalonejob as a console application on host flink-jobmanager-ftgg9.
2020-09-18 09:15:58,801 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -

2020-09-18 09:15:58,804 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Preconfiguration:
2020-09-18 09:15:58,804 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -


JM_RESOURCE_PARAMS extraction logs:
jvm_params: -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456
logs: INFO  [] - Loading configuration property:
jobmanager.rpc.address, flink-jobmanager
INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 4
INFO  [] - Loading configuration property: blob.server.port, 6124
INFO  [] - Loading configuration property: jobmanager.rpc.port, 6123
INFO  [] - Loading configuration property: taskmanager.rpc.port, 6122
INFO  [] - Loading configuration property: queryable-state.proxy.ports, 6125
INFO  [] - Loading configuration property: jobmanager.memory.process.size, 1600m
INFO  [] - Loading configuration property:
taskmanager.memory.process.size, 1728m
INFO  [] - Loading configuration property: parallelism.default, 2
INFO  [] - The derived from fraction jvm overhead memory (160.000mb
(167772162 bytes)) is less than its min value 192.000mb (201326592
bytes), min value will be used instead
INFO  [] - Final Master Memory configuration:
INFO  [] -   Total Process Memory: 1.563gb (1677721600 bytes)
INFO  [] - Total Flink Memory: 1.125gb (1207959552 bytes)
INFO  [] -   JVM Heap: 1024.000mb (1073741824 bytes)
INFO  [] -   Off-heap: 128.000mb (134217728 bytes)
INFO  [] - JVM Metaspace:  256.000mb (268435456 bytes)
INFO  [] - JVM Overhead:   192.000mb (201326592 bytes)

2020-09-18 09:15:58,805 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -

2020-09-18 09:15:58,805 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Starting StandaloneApplicationClusterEntryPoint (Version: 1.11.0,
Scala: 2.11, Rev:d04872d, Date:2020-06-29T16:13:14+02:00)
2020-09-18 09:15:58,805 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  OS
current user: flink
2020-09-18 09:15:58,805 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Current Hadoop/Kerberos user: 
2020-09-18 09:15:58,805 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.262-b10
2020-09-18 09:15:58,806 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Maximum heap size: 989 MiBytes

Re: On Flink job re-submission, observing error, "the rpc invocation size exceeds the maximum akka framesize"

2020-09-18 Thread shravan
Thanks for the quick response. 

I might have wrongly phrased one of the questions.

/"> how can we know the expected size for which it is failing?

If you did not configure akka.framesize yourself then it is set to the
documented default value. See the configuration documentation for the
release you are using."/

We found the default size in the configuration, but we are unable to
identify the size for which it actually fails. Could you help out on this?

Awaiting a response.

Regards,
Shravan










Re: On Flink job re-submission, observing error, "the rpc invocation size exceeds the maximum akka framesize"

2020-09-18 Thread Chesnay Schepler
If you use 1.10.0 or above, the framesize for which it failed is part of
the exception message; see FLINK-14618.


If you are using an older version, then I'm afraid there is no way to tell.






Re: valuestate(descriptor) using a custom caseclass

2020-09-18 Thread Dawid Wysakowicz
Hi Martin,

I am not sure what the exact problem is. Is it that the ProcessFunction
is not invoked, or is the problem with the values in your state?

As for the question of the case class and ValueState: the best way to do
it is to provide the TypeInformation explicitly. If you do not provide
the TypeInformation, the ValueStateDescriptor will use the Java type
extraction stack, which cannot handle case classes well, and if I am not
mistaken they will end up serialized with a generic serializer using Kryo.

You can create a proper TypeInformation for a Scala case class like this:

    import org.apache.flink.streaming.api.scala._ // important: brings the implicit Scala type extraction into scope

    val caseClassTypeInfo =
      implicitly[TypeInformation[PageviewBasedMetrics]]
    this.pageviewMetricState = this.getRuntimeContext
      .getState(new ValueStateDescriptor[PageviewBasedMetrics](
        "PageviewMetrics", caseClassTypeInfo))

If the problem is that the function is not being invoked, I'd recommend
checking which TimeCharacteristic you are using (I don't necessarily know
what is going on in StreamEnvironment.getStreamEnvironment). If you
use ProcessingTime, the results will be emitted only after a minute
passes (you are using Time.minutes(1)).
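As a side note, a short sketch of making that choice explicit at environment
setup (assuming processing time is what is intended here; with event time a
watermark strategy would also be needed, or the windows never fire):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Sketch: pin the time semantics so one-minute windows fire predictably.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)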

Hope that helps.

Best,

Dawid

On 18/09/2020 10:42, Martin Frank Hansen wrote:
> Hi,
>
> I am trying to calculate some different metrics using the state
> backend to control if events have been seen before. I am using the
> ProcessWindowFunction, but nothing gets through, it is as if the
> .process-function is ignored. Is it not possible to store a custom
> case class as ValueState? Or do I need to implement a serializer for
> the case-class? Or ...
>
> Any help is much appreciated.
>




Re: On Flink job re-submission, observing error, "the rpc invocation size exceeds the maximum akka framesize"

2020-09-18 Thread shravan
Thanks again for the quick response.

In that case, could you tell me what possible factors warrant a framesize
increase? The official documentation simply states "If Flink fails because
messages exceed this limit, then you should increase it", which isn't very
informative.

Regards,
M S Shravan
Chesnay Schepler wrote
> If you use 1.10.0 or above, the framesize for which it failed is part of
> the exception message; see FLINK-14618.
>
> If you are using an older version, then I'm afraid there is no way to tell.
> 







Re: [DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-18 Thread Aljoscha Krettek

On 14.09.20 02:20, Steven Wu wrote:

Right now, we use UnionState to store the `nextCheckpointId` in the Iceberg
sink use case, because we can't retrieve the checkpointId from
the FunctionInitializationContext during the restore case. But we can move
away from it if the restore context provides the checkpointId.


Is the code for this available in the open source? I checked the Iceberg 
sink that's available in Iceberg proper and the one in Netflix 
Skunkworks: 
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L228


Both of them are only using operator state, not the union variant.
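For readers following along, a sketch of the distinction being discussed;
the descriptor names and types are illustrative, not from the Iceberg code:

import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.runtime.state.FunctionInitializationContext

def initializeState(context: FunctionInitializationContext): Unit = {
  val unionDesc = new ListStateDescriptor[java.lang.Long]("nextCheckpointId-union", classOf[java.lang.Long])
  val plainDesc = new ListStateDescriptor[java.lang.Long]("nextCheckpointId", classOf[java.lang.Long])
  // Union list state: every subtask receives the COMPLETE restored list.
  val unionState = context.getOperatorStateStore.getUnionListState(unionDesc)
  // Plain operator list state: the restored list is split across subtasks.
  val plainState = context.getOperatorStateStore.getListState(plainDesc)
}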

Best,
Aljoscha


Re: On Flink job re-submission, observing error, "the rpc invocation size exceeds the maximum akka framesize"

2020-09-18 Thread Chesnay Schepler

There are quite a few reason why the framesize could be exceeded.

The most common one we see is due to the parallelism being so high that 
tasks can't be deployed in the first place. When a task is deployed the 
RPC payload also contains information about all downstream tasks this 
task sends data to; when those are a few thousand (usually in case of a 
shuffle) the amount of data can quickly add up.


Other causes could be tasks having thousands of accumulators or there 
being too many metrics on one TaskExecutor (which would result in 
metrics not being queryable from the WebUI/REST API).


Overall though, the documentation is pretty accurate. The framesize
being exceeded is usually not because the user did anything wrong, but
because they are operating at a scale that the default framesize cannot
support. The only solution to that is to increase the framesize.


On 9/18/2020 12:34 PM, shravan wrote:

Thanks again for the quick response.

In that case, could you tell me what possible factors warrant a framesize
increase? The official documentation simply states "If Flink fails because
messages exceed this limit, then you should increase it", which isn't very
informative.

Regards,
M S Shravan





Re: valuestate(descriptor) using a custom caseclass

2020-09-18 Thread Martin Frank Hansen
Hi Dawid,

Thanks for your reply, much appreciated.

I tried using your implementation for TypeInformation, but still nothing
gets through. There are no errors either; it simply runs without
sending data to the sink. I have checked that there is data in the input
topic, and I have used the same code to run a similar job (with a simple
string type as ValueState).

I have added a print statement to the process function, but nothing gets
written to the console, which suggests that the method is never called,
even though it should be, by this:

mainDataStream
  .keyBy[PageviewBasedMetricsGroup]((e: Event) =>
Util.getPageviewBasedMetricsGroup(e))
  .timeWindow(Time.minutes(1))
  .process(new MetricsProcessFunction)
  .addSink(target1Min)


Could the problem be in the open-method?

best regards


On Fri, 18 Sep 2020 at 12:30, Dawid Wysakowicz <
dwysakow...@apache.org> wrote:

> Hi Martin,
>
> I am not sure what is the exact problem. Is it that the ProcessFunction is
> not invoked or is the problem with values in your state?
>
> As for the question of the case class and ValueState. The best way to do
> it, is to provide the TypeInformation explicitly. If you do not provide the
> TypeInformation, the ValueStateDescriptor will use java type extraction
> stack, which can not handle case classes well and if I am not mistaken they
> will end up serialized with a generic serializer using Kryo.
>
> You can create a proper TypeInformation for a scala case class like this:
>
>import org.apache.flink.streaming.api.scala._ // important to import
> for the implicit scala type extraction
>
> val caseClassTypeInfo =
> implicitly[TypeInformation[PageviewBasedMetrics]]
> this.pageviewMetricState = this.getRuntimeContext
>   .getState(new
> ValueStateDescriptor[PageviewBasedMetrics]("PageviewMetrics",
> caseClassTypeInfo))
>
> If the problem is that the function is not being invoked, I'd recommend
> checking what TimeCharacteristic you are using (I don't necessarily know
> what is going on in the StreamEnvironment.getStreamEnvironment). If you use
> the ProcessingTime the results will be emitted only after a minute passes.
> (You are using TimeWindow.minutes(1))
>
> Hope that helps.
>
> Best,
>
> Dawid

Re: valuestate(descriptor) using a custom caseclass

2020-09-18 Thread Martin Frank Hansen
Another note: the case class in question has about 40 fields. Is there a
maximum limit on the number of fields?

best regards

On Fri, 18 Sep 2020 at 13:05, Martin Frank Hansen <
m...@berlingskemedia.dk> wrote:

> Hi Dawid,
>
> Thanks for your reply, much appreciated.
>
> I tried using your implementation for TypeInformation, but still nothing
> gets through. There are no errors either; it simply runs without
> sending data to the sink. I have checked that there is data in the input
> topic, and I have used the same code to run a similar job (with a simple
> string type as ValueState).
>
> I have added a print statement to the process function, but nothing gets
> written to the console, which suggests that the method is never called,
> even though it should be, by this:
>
> mainDataStream
>   .keyBy[PageviewBasedMetricsGroup]((e: Event) => 
> Util.getPageviewBasedMetricsGroup(e))
>   .timeWindow(Time.minutes(1))
>   .process(new MetricsProcessFunction)
>   .addSink(target1Min)
>
>
> Could the problem be in the open-method?
>
> best regards
>

Re: Disable WAL in RocksDB recovery

2020-09-18 Thread Yu Li
Thanks for bringing this up Juha, and good catch.

We actually disable WAL for routine writes by default when using
RocksDB and have never encountered segfault issues. However, per the
history in FLINK-8922, a segfault occurs during restore if WAL is
disabled, so I guess the root cause lies in RocksDB's batch write
(org.rocksdb.WriteBatch). And IMHO this is a RocksDB bug (it should work
well when WAL is disabled, whether under single or batch writes).

+1 for opening a new JIRA to figure out the root cause, fix it, and disable
WAL during restore by default (checking the fixes around WriteBatch in
later RocksDB versions may help locate the issue more quickly), and thanks
for volunteering to take this on. I will follow up and help review any
findings / PR submissions.
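For concreteness, the RocksDB JNI switch under discussion looks roughly
like this; a sketch of what a restore-path batch write might do, not
Flink's actual internal code:

import org.rocksdb.{RocksDB, WriteBatch, WriteOptions}

// Sketch: batch-write restored key/value pairs with the WAL disabled.
// FLINK-8922 reverted exactly this kind of change because of segfaults.
def restoreBatch(db: RocksDB, kvs: Seq[(Array[Byte], Array[Byte])]): Unit = {
  val writeOptions = new WriteOptions().setDisableWAL(true)
  val batch = new WriteBatch()
  try {
    kvs.foreach { case (k, v) => batch.put(k, v) }
    db.write(writeOptions, batch)
  } finally {
    batch.close()
    writeOptions.close()
  }
}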

Best Regards,
Yu


On Wed, 16 Sep 2020 at 13:58, Juha Mynttinen 
wrote:

> Hello there,
>
> I'd like to bring to discussion a previously discussed topic - disabling
> WAL in RocksDB recovery.
>
> It's clear that WAL is not needed during the process, the reason being
> that the WAL is never read, so there's no need to write it.
>
> AFAIK the last thing that was done with WAL during recovery is an attempt
> to remove it and a later revert of that removal (
> https://issues.apache.org/jira/browse/FLINK-8922). If I interpret the
> comments in the ticket correctly, what happened was that a) WAL was kept in
> the recovery, and b) it's unknown why removing WAL causes the segfault.
>
> What can be seen in the ticket is that having WAL causes a significant
> performance penalty. Thus, getting rid of WAL would be a very nice
> performance improvement. I think it'd be worth creating a new JIRA
> ticket, at least as a reminder that WAL should be removed.
>
> I'm planning to add an experimental flag that removes WAL in the
> environment where I'm using Flink and to try it out. If the flag is made
> configurable, WAL can always be re-enabled if removing it causes issues.
>
> Thoughts?
>
> Regards,
> Juha
>
>


Re: Automatically restore from checkpoint

2020-09-18 Thread Arpith P
Thanks David. In the case of a manual restart, to get the checkpoint path
programmatically I'm using the following code to retrieve the JobId and
CheckpointID so I can pass them along when restarting with "-s", but it
seems I'm missing something, as I'm getting an empty TimestampedFileSplit
array.

GlobFilePathFilter filePathFilter = new GlobFilePathFilter(
   Collections.singletonList("[0-9a-fA-F]{32}/chk-[\\d]+"),
   Collections.emptyList());
TextInputFormat inputFormat = new TextInputFormat(
   new org.apache.flink.core.fs.Path(inputFolderPath));
inputFormat.setNestedFileEnumeration(true);
inputFormat.setFilesFilter(filePathFilter);

ContinuousFileMonitoringFunction<String> monitoringFunction = new
ContinuousFileMonitoringFunction<>(
   inputFormat,
   FileProcessingMode.PROCESS_CONTINUOUSLY,
   inputFolderParallelism,
   pollInterval);

DataStream<TimestampedFileInputSplit> splits =
env.addSource(monitoringFunction);
splits.addSink(new PrintSinkFunction<>());


Arpith

On Fri, Sep 18, 2020 at 2:09 PM David Anderson 
wrote:

> If your job crashes, Flink will automatically restart from the latest
> checkpoint, without any manual intervention. JobManager HA is only needed
> for automatic recovery after the failure of the Job Manager.
>
> You only need externalized checkpoints and "-s :checkpointPath" if you
> want to use checkpoints to manually restart a job after manually cancelling
> or stopping it. Also, it's not necessary that you have read access to the
> checkpoints, but the job manager and task managers must be able to read
> (and write) them.
>
> Regards,
> David
>


metaspace out-of-memory & error while retrieving the leader gateway

2020-09-18 Thread Claude M
Hello,

I upgraded from Flink 1.7.2 to 1.10.2.  One of the jobs running on the task
managers is periodically crashing with the following error:

java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
has occurred. This can mean two things: either the job requires a larger
size of JVM metaspace to load classes or there is a class loading leak. In
the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
should be increased. If the error persists (usually in cluster after
several job (re-)submissions) then there is probably a class loading leak
which has to be investigated and fixed. The task executor has to be
shutdown.

I found this issue regarding it:
https://issues.apache.org/jira/browse/FLINK-16406

I have tried increasing taskmanager.memory.jvm-metaspace.size to 256M and
512M, and the problem persists.

I then added the following to flink-conf.yaml to try to get more information
about the error:
env.java.opts: -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/opt/flink/log

When I deployed the change which is in a Kubernetes cluster, the jobmanager
pod fails to start up and the following message shows repeatedly:

2020-09-18 17:03:46,255 WARN
 org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever  -
Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@flink-jobmanager:50010/user/dispatcher.

The only way I can resolve this is to delete the folder from ZooKeeper,
which I shouldn't have to do.

Any ideas on these issues?


Re: Maximum query and refresh rate for metrics from REST API

2020-09-18 Thread Piper Piper
Thank you, Chesnay!

On Thu, Sep 17, 2020, 3:59 AM Chesnay Schepler  wrote:

> By default metrics are only updated every 10 seconds; this can be
> controlled via
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#metrics-fetcher-update-interval
> .
>
> On 9/17/2020 12:22 AM, Piper Piper wrote:
> > Hello,
> >
> > What is the recommended way to get metrics (such as CPU, Memory and
> > user defined meters and gauges) at the highest frequency rate (i.e.
> > with the highest/fastest refresh rate) such as every 500 milliseconds
> > or less?
> >
> > Is there any rate limiting by default on querying the REST API for
> > metrics? I am querying the REST API every second but not seeing any
> > change in the CPU load for every second, so I was wondering if there
> > is any maximum frequency at which I can query it.
> >
> > Thanks,
> >
> > Piper
>
>
>


Flink SQL - can I have multiple outputs per job?

2020-09-18 Thread Dan Hill
I have a few results that I want to produce.
- A join B
- A join B join C
- A join B join C join D
- A join B join C join D join E

When I use the DataSet API directly, I can execute all of these in the same
job to reduce redundancy.  When I use the SQL interface, it looks like a
separate job is created for each of them (duplicating the join calculations).

Is there a way to merge these joins?
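A hedged sketch of one option in Flink 1.11: group the INSERTs into a
StatementSet so they are optimized and submitted as a single job. Table and
sink names here are placeholders:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Sketch: all INSERTs added to a StatementSet are executed as ONE job, so
// common subplans such as "a JOIN b" can be reused rather than recomputed.
val tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build())
val set = tEnv.createStatementSet()
set.addInsertSql("INSERT INTO sink_ab  SELECT * FROM a JOIN b ON a.id = b.id")
set.addInsertSql("INSERT INTO sink_abc SELECT * FROM a JOIN b ON a.id = b.id JOIN c ON a.id = c.id")
set.execute() // single submission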