Re: Flink SQL update-mode set to retract in env file.

2019-09-26 Thread Terry Wang
Hi srikanth~

The Flink SQL update-mode is inferred from the type of the target table.
For now, there are three StreamTableSink types: `AppendStreamTableSink`,
`UpsertStreamTableSink` and `RetractStreamTableSink`.
If the target table is a Kafka table, which implements AppendStreamTableSink,
the update-mode will be append-only.
So if you want to enable retract mode, you need to insert into some kind of
RetractStreamTableSink.
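For illustration, a minimal Java sketch of converting a result Table to a retract stream instead (the table name and query below are only placeholders):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // "source" stands for any registered table; the aggregation produces updates.
        Table resultTable = tableEnv.sqlQuery(
            "SELECT a, b, COUNT(*) AS cnt FROM source GROUP BY a, b");

        // toRetractStream emits Tuple2<Boolean, Row>: true = add, false = retract.
        DataStream<Tuple2<Boolean, Row>> retractStream =
            tableEnv.toRetractStream(resultTable, Row.class);

        retractStream.print();
        env.execute("retract sketch");
    }
}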
Hope it helps you ~



Best,
Terry Wang



> On Sep 26, 2019, at 14:50, srikanth flink wrote:
> 
> How could I configure environment file for Flink SQL, update-mode: retract?
> 
> I have this for append:
> properties: 
> - key: zookeeper.connect
>   value: localhost:2181
> - key: bootstrap.servers
>   value: localhost:9092
> - key: group.id 
>   value: reconMultiAttempFail
> format:
>   type: json
>   fail-on-missing-field: false
>   json-schema: >
> {
>   type: 'object',
>   properties: {
> 'a': {
>type: 'string'
> },
> 'b': {
>type: 'string'
> },
> 'cnt': {
>type: 'string'
> }
>   }
> }
>   derive-schema: false
> 
> schema:
>   - name: 'a'
> type: VARCHAR
>  - name: 'b'
> type: VARCHAR
>   - name: 'cnt'
> type: BIGINT
> 
> Couldn't find any document for the same. 
> 
> someone help me with the syntax.
> 
> Thanks
> Srikanth
> 



CSV Table source as data-stream in environment file

2019-09-26 Thread Nishant Gupta
Hi Team,

How do we define a CSV table source as a data-stream instead of a data-set in
the environment file?

Whether or not I mention update-mode: append, it takes the CSV file only as a
data-set.
Is there any detailed reference for the environment file configuration where
sinks and sources are defined?

 - name: badips
type: source-table
update-mode: append
connector:
  type: filesystem
  path: "/home/user/file.csv"
format:
  type: csv
  fields:
- name: col1
  type: VARCHAR
  comment-prefix: "#"
schema:
  - name: col1
type: VARCHAR


Re: Flink SQL update-mode set to retract in env file.

2019-09-26 Thread srikanth flink
Hi Terry Wang,

Thanks for quick reply.

I would like to understand more about your line "If the target table is a
type of Kafka which implements AppendStreamTableSink, the update-mode will
be append only".
If your statement means that retract mode cannot be used for Kafka sinks
because they implement AppendStreamTableSink, then why is the code below
working for me, dumping data to Kafka:
DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable, Row.class)
    .map(t -> {
        Row r = t.f1;
        ObjectNode node = mapper.createObjectNode();
        node.put("source.ip", r.getField(0).toString());
        node.put("destination.ip", r.getField(1).toString());
        node.put("cnt", Long.parseLong(r.getField(2).toString()));
        return node.toString();
    });
Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092");
kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

outStreamAgg.addSink(new FlinkKafkaProducer<String>("reconMultiAttempFail",
    new SimpleStringSchema(),
    kafkaProducerProperties));

Is it that the above functionality works only with Table API and not with
SQL?
Please explain.

Thanks
Srikanth



On Thu, Sep 26, 2019 at 1:57 PM Terry Wang  wrote:

> Hi srikanth~
>
> The Flink SQL update-mode is inferred from the target table type.
> For now, there are three StreamTableSink type, `AppendStreamTableSink`
> `UpsertStreamTableSink` and `RetractStreamTableSink`.
> If the target table is a type of Kafka which implments
> AppendStreamTableSink, the update-mode will be append only.
> So if you want enable retract-mode you may need to insert into one kind of
> RetractStreamTableSink.
> Hope it helps you ~
>
>
>
> Best,
> Terry Wang
>
>
>
> On Sep 26, 2019, at 14:50, srikanth flink wrote:
>
> How could I configure environment file for Flink SQL, update-mode: retract?
>
> I have this for append:
> properties:
> - key: zookeeper.connect
>   value: localhost:2181
> - key: bootstrap.servers
>   value: localhost:9092
> - key: group.id
>   value: reconMultiAttempFail
> format:
>   type: json
>   fail-on-missing-field: false
>   json-schema: >
> {
>   type: 'object',
>   properties: {
> 'a': {
>type: 'string'
> },
> 'b': {
>type: 'string'
> },
> 'cnt': {
>type: 'string'
> }
>   }
> }
>   derive-schema: false
>
> schema:
>   - name: 'a'
> type: VARCHAR
>  - name: 'b'
> type: VARCHAR
>   - name: 'cnt'
> type: BIGINT
>
> Couldn't find any document for the same.
>
> someone help me with the syntax.
>
> Thanks
> Srikanth
>
>
>


Re: Multiple Job Managers in Flink HA Setup

2019-09-26 Thread Yang Wang
Hi Steven,

I have tested the standalone cluster on Kubernetes with 2 jobmanagers.
Submitting Flink jobs through either the active or the standby jobmanager
web UI works fine. So I think the problem may be with your HAProxy. Does
your HAProxy forward traffic to both the active and the standby jobmanager?
I think a simple random or round-robin strategy is enough.

Best,
Yang
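Regarding the 'web.upload.dir' suggestion in the quoted reply below, a minimal flink-conf.yaml sketch could look like this (the paths are only placeholders for a shared volume mounted by both jobmanagers and created before they start):

# flink-conf.yaml on both jobmanagers
web.upload.dir: /mnt/flink-shared/web-upload
# optionally also point the temporary web directory at the shared volume
web.tmpdir: /mnt/flink-shared/web-tmp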

On Wed, Sep 25, 2019 at 9:10 PM, Gary Yao wrote:

> Hi Steve,
>
> > I also tried attaching a shared NFS folder between the two machines and
> > tried to set their web.tmpdir property to the shared folder, however it
> > appears that each job manager creates a seperate job inside that
> directory.
>
> You can create a fixed upload directory via the config option
> 'web.upload.dir'
> [1]. To avoid race conditions, it is probably best to make sure that the
> directory already exists before starting the JMs (if the path does not
> exist,
> both JMs may attempt to create it).
>
> Alternatively you can try one of the following:
> - Do not use stand-by masters
> - Find the leader address from ZooKeeper, and issue a request directly [2]
> - Use Flink CLI, which will resolve the leading JM from ZooKeeper. Note
> that
>  the CLI submits the job by uploading a serialized JobGraph [2][3][4][5]
> (you
>  could also rebuild that part of the CLI if you need programmatic job
>  submission).
>
> Lastly, I want to point out that the programmatic job submission is
> currently
> being reworked (see [6] for details).
>
> > 2) provide a persistent storage directory for the Jar file so I can
> perform
> > rescaling without needing to re-upload the jar file.
>
> Can you describe how are you rescaling?
>
>
> Best,
> Gary
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#web-upload-dir
> [2]
> https://github.com/apache/flink/blob/b6e32a317ec273ee3f3085728a02bc3922c22db6/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L162
> [3]
> https://github.com/apache/flink/blob/b6e32a317ec273ee3f3085728a02bc3922c22db6/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L215
> [4]
> https://github.com/apache/flink/blob/b6e32a317ec273ee3f3085728a02bc3922c22db6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java#L79
> [5]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-1
> [6]
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>
> On Fri, Sep 20, 2019 at 10:57 PM Steven Nelson 
> wrote:
>
>> Hello!
>>
>> I am having some difficulty with multiple job managers in an HA setup
>> using Flink 1.9.0.
>>
>> I have 2 job managers and have setup the HA setup with the following
>> config
>>
>> high-availability: zookeeper
>> high-availability.cluster-id: /imet-enhance
>> high-availability.storageDir: hdfs:///flink/ha/
>> high-availability.zookeeper.quorum:
>> flink-state-hdfs-zookeeper-1.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181,flink-state-hdfs-zookeeper-2.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181,flink-state-hdfs-zookeeper-0.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181
>> high-availability.zookeeper.path.root: /flink
>> high-availability.jobmanager.port: 5-50025
>>
>> I have the job managers behind a load balancer inside a kubernetes cluster
>>
>> They work great except for one thing. When I use the website (or API) to
>> upload the Jar file and start the job sometimes the request goes to a
>> different job manager, which doesn't have the jar file in it's temporary
>> directory, so it fails to start.
>>
>> In the 1.7 version of this setup the second Job Manager would return a
>> Redirect request. I put an HAProxy in front of it that only allowed traffic
>> to flow to the Job Manager that wasn't returning a 300 and this worked well
>> for everything. In 1.9 it appears that both Job Managers are able to
>> respond (via the internal proxy mechanism I have seen in prior emails).
>> However it appears the web file cache is still shared.
>>
>> I also tried attaching a shared NFS folder between the two machines and
>> tried to set their web.tmpdir property to the shared folder, however it
>> appears that each job manager creates a seperate job inside that directory.
>>
>> My end goals are:
>> 1) Provide a fault tolerant Flink Cluster
>> 2) provide a persistent storage directory for the Jar file so I can
>> perform rescaling without needing to re-upload the jar file.
>>
>> Thoughts?
>> -Steve
>>
>


Problems with java.utils

2019-09-26 Thread Nicholas Walton
I’m having a problem using ArrayList in Scala. The code is below:

import org.apache.flink.core.fs._
import org.apache.flink.streaming.api._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks._
import org.apache.http.HttpHost
import org.slf4j.LoggerFactory

import java.util.ArrayList

object Job {

  val httpHosts = new ArrayList[HttpHost]
  httpHosts.add(new HttpHost("samwise.local", 9200, "http"))
…..

}

Scala refuses to recognise ArrayList. The IDE IntelliJ likewise flags java.util
as not existing, even though it recognises ArrayList and suggests auto-insertion
of the import java.util.ArrayList statement. The compilation errors are:

[error] ……/src/main/scala/org/example/Job.scala:30:13: object util is not a 
member of package org.apache.flink.table.api.java
[error] import java.util.ArrayList
[error] ^
[error] ……/src/main/scala/org/example/Job.scala:34:23: not found: type 
ArrayList
[error]   val httpHosts = new ArrayList[HttpHost]
[error]   ^
[error] two errors found
[error] (Compile / compileIncremental) Compilation failed

Without the ArrayList reference the code compiles with no errors.

The sbt build file, in case it helps, is:

resolvers in ThisBuild ++= Seq("Apache Development Snapshot Repository" at
"https://repository.apache.org/content/repositories/snapshots/",
Resolver.mavenLocal)

name := "Flink MultiChannel Project"

version := "0.1-SNAPSHOT"

organization := "org.example"

scalaVersion in ThisBuild := "2.11.0"

val flinkVersion = "1.8.1"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion ,
  "org.apache.flink" %% "flink-table" % "1.7.2" ,
  "org.apache.flink" %% "flink-connector-elasticsearch" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.httpcomponents" % "httpclient" % "4.5.10",
  "org.apache.httpcomponents" % "httpcore" % "4.4.11")
// https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore

lazy val root = (project in file(".")).
  settings(
libraryDependencies ++= flinkDependencies)

mainClass in assembly := Some("org.example.Job")

// make run command include the provided dependencies
run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in 
(Compile, run), runner in (Compile, run))

// exclude Scala library from assembly
assemblyOption in assembly := (assemblyOption in 
assembly).value.copy(includeScala = false)


TIA

Nick

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-26 Thread Yang Wang
Hi, Aleksandar

The savepoint option in a standalone job cluster is optional. If you want to
always recover from the latest checkpoint, then, just as Aleksandar and Yun
Tang said, you could use the high-availability configuration. Make sure the
cluster-id is not changed; I think the job could then recover as expected,
both after an unexpected crash and after a restart.
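For reference, a minimal flink-conf.yaml sketch of those settings (hostnames and paths are placeholders; the option names mirror the ones quoted elsewhere in this thread):

high-availability: zookeeper
high-availability.cluster-id: /my-job-cluster        # keep this fixed across restarts
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: zk-0:2181,zk-1:2181,zk-2:2181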

@Aleksandar Mastilovic, we also have a ZooKeeper-less high-availability
implementation [1].
Maybe we could have some discussion and contribute this useful feature to
the community.

[1].
https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit

Best,
Yang

On Thu, Sep 26, 2019 at 4:11 AM, Aleksandar Mastilovic wrote:

> Would you guys (Flink devs) be interested in our solution for
> zookeeper-less HA? I could ask the managers how they feel about
> open-sourcing the improvement.
>
> On Sep 25, 2019, at 11:49 AM, Yun Tang  wrote:
>
> As Aleksandar said, k8s with HA configuration could solve your problem.
> There already have some discussion about how to implement such HA in k8s if
> we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
> Currently, you might only have to choose zookeeper as high-availability
> service.
>
> [1] https://issues.apache.org/jira/browse/FLINK-11105
> [2] https://issues.apache.org/jira/browse/FLINK-12884
>
> Best
> Yun Tang
> --
> *From:* Aleksandar Mastilovic 
> *Sent:* Thursday, September 26, 2019 1:57
> *To:* Sean Hester 
> *Cc:* Hao Sun ; Yuval Itzchakov ;
> user 
> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>
> Can’t you simply use JobManager in HA mode? It would pick up where it left
> off if you don’t provide a Savepoint.
>
> On Sep 25, 2019, at 6:07 AM, Sean Hester 
> wrote:
>
> thanks for all replies! i'll definitely take a look at the Flink k8s
> Operator project.
>
> i'll try to restate the issue to clarify. this issue is specific to
> starting a job from a savepoint in job-cluster mode. in these cases the Job
> Manager container is configured to run a single Flink job at start-up. the
> savepoint needs to be provided as an argument to the entrypoint. the Flink
> documentation for this approach is here:
>
>
> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>
> the issue is that taking this approach means that the job will *always* start
> from the savepoint provided as the start argument in the Kubernetes YAML.
> this includes unplanned restarts of the job manager, but we'd really prefer
> any *unplanned* restarts resume for the most recent checkpoint instead of
> restarting from the configured savepoint. so in a sense we want the
> savepoint argument to be transient, only being used during the initial
> deployment, but this runs counter to the design of Kubernetes which always
> wants to restore a deployment to the "goal state" as defined in the YAML.
>
> i hope this helps. if you want more details please let me know, and thanks
> again for your time.
>
>
> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun  wrote:
>
> I think I overlooked it. Good point. I am using Redis to save the path to
> my savepoint, I might be able to set a TTL to avoid such issue.
>
> Hao Sun
>
>
> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov  wrote:
>
> Hi Hao,
>
> I think he's exactly talking about the usecase where the JM/TM restart and
> they come back up from the latest savepoint which might be stale by that
> time.
>
> On Tue, 24 Sep 2019, 19:24 Hao Sun,  wrote:
>
> We always make a savepoint before we shutdown the job-cluster. So the
> savepoint is always the latest. When we fix a bug or change the job graph,
> it can resume well.
> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
> uncaught exception, etc.
>
> Maybe I do not understand your use case well, I do not see a need to start
> from checkpoint after a bug fix.
> From what I know, currently you can use checkpoint as a savepoint as well
>
> Hao Sun
>
>
> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov  wrote:
>
> AFAIK there's currently nothing implemented to solve this problem, but
> working on a possible fix can be implemented on top of
> https://github.com/lyft/flinkk8soperator which already has a pretty fancy
> state machine for rolling upgrades. I'd love to be involved as this is an
> issue I've been thinking about as well.
>
> Yuval
>
> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester 
> wrote:
>
> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
> when deploying Flink jobs to start from savepoints using the job-cluster
> mode in Kubernetes.
>
> we're running a ~15 different jobs, all in job-cluster mode, using a mix
> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
> all long-running streaming jobs, all essentially acting as microservices.
> we're using Helm charts to configure all of our deployments.
>
> we have a number of use cases where we want to restart jobs from a
> savepoint to r

Re: Flink SQL update-mode set to retract in env file.

2019-09-26 Thread Terry Wang
Hi, Srikanth~

In your code,
`DataStream<String> outStreamAgg = tableEnv.toRetractStream(resultTable,
Row.class).map(t -> {...});` has already converted the resultTable into a
DataStream that is unrelated to the Table API,
and the following code `outStreamAgg.addSink(...)` is just a normal stream
write to a FlinkKafka sink function.
Your program is a mixture of Table API and DataStream programming, not pure
Table API.

Best,
Terry Wang



> On Sep 26, 2019, at 17:47, srikanth flink wrote:
> 
> Hi Terry Wang,
> 
> Thanks for quick reply.
> 
> I would like to understand more on your line " If the target table is a type 
> of Kafka which implments AppendStreamTableSink, the update-mode will be 
> append only". 
> If your statement defines retract mode could not be used for Kafka sinks as 
> it implements AppendStreamTableSink, but then the below code is working for 
> me, dumping data to Kafka:
> DataStream outStreamAgg = tableEnv.toRetractStream(resultTable, 
> Row.class).map(t -> {
> Row r = t.f1;
> ObjectNode node = mapper.createObjectNode();
> node.put("source.ip", r.getField(0).toString());
> node.put("destination.ip", r.getField(1).toString());
> node.put("cnt", Long.parseLong(r.getField(2).toString()));
> return node.toString();
> }); 
> Properties kafkaProducerProperties = new Properties();
> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "host:9092");
> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
> 
> outStreamAgg.addSink(new FlinkKafkaProducer("reconMultiAttempFail", 
> new SimpleStringSchema(),
> kafkaProducerProperties));
> 
> Is it that the above functionality works only with Table API and not with SQL?
> Please explain.
> 
> Thanks
> Srikanth
> 
> 
> 
> On Thu, Sep 26, 2019 at 1:57 PM Terry Wang wrote:
> Hi srikanth~
> 
> The Flink SQL update-mode is inferred from the target table type.
> For now, there are three StreamTableSink type, `AppendStreamTableSink` 
> `UpsertStreamTableSink` and `RetractStreamTableSink`. 
> If the target table is a type of Kafka which implments AppendStreamTableSink, 
> the update-mode will be append only. 
> So if you want enable retract-mode you may need to insert into one kind of 
> RetractStreamTableSink.
> Hope it helps you ~
> 
> 
> 
> Best,
> Terry Wang
> 
> 
> 
>> On Sep 26, 2019, at 14:50, srikanth flink wrote:
>> 
>> How could I configure environment file for Flink SQL, update-mode: retract?
>> 
>> I have this for append:
>> properties: 
>> - key: zookeeper.connect
>>   value: localhost:2181
>> - key: bootstrap.servers
>>   value: localhost:9092
>> - key: group.id 
>>   value: reconMultiAttempFail
>> format:
>>   type: json
>>   fail-on-missing-field: false
>>   json-schema: >
>> {
>>   type: 'object',
>>   properties: {
>> 'a': {
>>type: 'string'
>> },
>> 'b': {
>>type: 'string'
>> },
>> 'cnt': {
>>type: 'string'
>> }
>>   }
>> }
>>   derive-schema: false
>> 
>> schema:
>>   - name: 'a'
>> type: VARCHAR
>>  - name: 'b'
>> type: VARCHAR
>>   - name: 'cnt'
>> type: BIGINT
>> 
>> Couldn't find any document for the same. 
>> 
>> someone help me with the syntax.
>> 
>> Thanks
>> Srikanth
>> 
> 



Re: Flink SQL update-mode set to retract in env file.

2019-09-26 Thread srikanth flink
Awesome, thanks!

On Thu, Sep 26, 2019 at 5:50 PM Terry Wang  wrote:

> Hi, Srikanth~
>
> In your code,
> DataStream outStreamAgg = tableEnv.toRetractStream(resultTable,
> Row.class).map(t -> {});  has converted the resultTable into a DataStream
> that’s unrelated with tableApi,
> And the following code `outStreamAgg.addSink(…)` is just a normall stream
> write to a FlinkKafka sink function.
> Your program code is a mixture of table api and dataStream programing not
> just single Table API.
>
> Best,
> Terry Wang
>
>
>
> On Sep 26, 2019, at 17:47, srikanth flink wrote:
>
> Hi Terry Wang,
>
> Thanks for quick reply.
>
> I would like to understand more on your line " If the target table is a
> type of Kafka which implments AppendStreamTableSink, the update-mode will
> be append only".
> If your statement defines retract mode could not be used for Kafka sinks
> as it implements AppendStreamTableSink, but then the below code is working
> for me, dumping data to Kafka:
> DataStream outStreamAgg = tableEnv.toRetractStream(resultTable,
> Row.class).map(t -> {
> Row r = t.f1;
> ObjectNode node = mapper.createObjectNode();
> node.put("source.ip", r.getField(0).toString());
> node.put("destination.ip", r.getField(1).toString());
> node.put("cnt", Long.parseLong(r.getField(2).toString()));
> return node.toString();
> });
> Properties kafkaProducerProperties = new Properties();
> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "host:9092");
> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>
> outStreamAgg.addSink(new
> FlinkKafkaProducer("reconMultiAttempFail", new SimpleStringSchema(),
> kafkaProducerProperties));
>
> Is it that the above functionality works only with Table API and not with
> SQL?
> Please explain.
>
> Thanks
> Srikanth
>
>
>
> On Thu, Sep 26, 2019 at 1:57 PM Terry Wang  wrote:
>
>> Hi srikanth~
>>
>> The Flink SQL update-mode is inferred from the target table type.
>> For now, there are three StreamTableSink type, `AppendStreamTableSink`
>> `UpsertStreamTableSink` and `RetractStreamTableSink`.
>> If the target table is a type of Kafka which implments
>> AppendStreamTableSink, the update-mode will be append only.
>> So if you want enable retract-mode you may need to insert into one kind
>> of RetractStreamTableSink.
>> Hope it helps you ~
>>
>>
>>
>> Best,
>> Terry Wang
>>
>>
>>
>> On Sep 26, 2019, at 14:50, srikanth flink wrote:
>>
>> How could I configure environment file for Flink SQL, update-mode:
>> retract?
>>
>> I have this for append:
>> properties:
>> - key: zookeeper.connect
>>   value: localhost:2181
>> - key: bootstrap.servers
>>   value: localhost:9092
>> - key: group.id
>>   value: reconMultiAttempFail
>> format:
>>   type: json
>>   fail-on-missing-field: false
>>   json-schema: >
>> {
>>   type: 'object',
>>   properties: {
>> 'a': {
>>type: 'string'
>> },
>> 'b': {
>>type: 'string'
>> },
>> 'cnt': {
>>type: 'string'
>> }
>>   }
>> }
>>   derive-schema: false
>>
>> schema:
>>   - name: 'a'
>> type: VARCHAR
>>  - name: 'b'
>> type: VARCHAR
>>   - name: 'cnt'
>> type: BIGINT
>>
>> Couldn't find any document for the same.
>>
>> someone help me with the syntax.
>>
>> Thanks
>> Srikanth
>>
>>
>>
>


Re: Problems with java.utils

2019-09-26 Thread Nicholas Walton
I’ve shrunk the problem down to a minimal size. The code 

package org.example

import org.apache.flink.table.api._
import org.apache.http.HttpHost

import java.util.ArrayList

object foo {

  val httpHosts = new ArrayList[HttpHost]
  httpHosts.add(new HttpHost("samwise.local", 9200, "http"))

}
will not compile, but remove the import org.apache.flink.table.api._ and all is 
well

Nick


> On 26 Sep 2019, at 12:53, Nicholas Walton  wrote:
> 
> I’m having a problem using ArrayList in Scala . The code is below
> 
> import org.apache.flink.core.fs._
> import org.apache.flink.streaming.api._
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api._
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.sinks._
> import org.apache.http.HttpHost
> import org.slf4j.LoggerFactory
> 
> import java.util.ArrayList
> 
> object Job {
> 
>   val httpHosts = new ArrayList[HttpHost]
>   httpHosts.add(new HttpHost("samwise.local", 9200, "http"))
> …..
> 
> }
> 
> Scala refuses to recognise ArrayList. The IDE InteliJ likewise flags 
> java.util as not existing, even though it recognises ArrayList and suggest 
> auto-insertion of the import java.utils.ArrayList statement. The compilation 
> errors are
> 
> [error] ……/src/main/scala/org/example/Job.scala:30:13: object util is not 
> a member of package org.apache.flink.table.api.java
> [error] import java.util.ArrayList
> [error] ^
> [error] ……/src/main/scala/org/example/Job.scala:34:23: not found: type 
> ArrayList
> [error]   val httpHosts = new ArrayList[HttpHost]
> [error]   ^
> [error] two errors found
> [error] (Compile / compileIncremental) Compilation failed
> 
> Without the ArrayList reference the code compiles with no errors.
> 
> The sbt build file, if it helps is,
> 
> resolvers in ThisBuild ++= Seq("Apache Development Snapshot Repository" at 
> "https://repository.apache.org/content/repositories/snapshots/ 
> ", 
> Resolver.mavenLocal)
> 
> name := "Flink MultiChannel Project"
> 
> version := "0.1-SNAPSHOT"
> 
> organization := "org.example"
> 
> scalaVersion in ThisBuild := "2.11.0"
> 
> val flinkVersion = "1.8.1"
> 
> val flinkDependencies = Seq(
>   "org.apache.flink" %% "flink-scala" % flinkVersion ,
>   "org.apache.flink" %% "flink-table" % "1.7.2" ,
>   "org.apache.flink" %% "flink-connector-elasticsearch" % flinkVersion,
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
>   "org.apache.httpcomponents" % "httpclient" % "4.5.10",
>   "org.apache.httpcomponents" % "httpcore" % "4.4.11")
> // https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore 
> 
> 
> lazy val root = (project in file(".")).
>   settings(
> libraryDependencies ++= flinkDependencies)
> 
> mainClass in assembly := Some("org.example.Job")
> 
> // make run command include the provided dependencies
> run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in 
> (Compile, run), runner in (Compile, run))
> 
> // exclude Scala library from assembly
> assemblyOption in assembly := (assemblyOption in 
> assembly).value.copy(includeScala = false)
> 
> 
> TIA
> 
> Nick



Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-26 Thread Sean Hester
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is
that there are some disaster recovery and data center migration use cases
where the continuity of the job managers is difficult to preserve. but
those are admittedly very edgy use cases. i think it's definitely worth
reviewing the SLAs with our site reliability engineers to see how likely it
would be to completely lose all job managers under an HA configuration.
that small a risk might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i
spotted a thread somewhere between Till and someone (perhaps you) about
that. feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang  wrote:

> Hi, Aleksandar
>
> Savepoint option in standalone job cluster is optional. If you want to
> always recover
> from the latest checkpoint, just as Aleksandar and Yun Tang said you could
> use the
> high-availability configuration. Make sure the cluster-id is not changed,
> i think the job
> could recover both at exceptionally crash and restart by expectation.
>
> @Aleksandar Mastilovic , we are also have
> an zookeeper-less high-availability implementation[1].
> Maybe we could have some discussion and contribute this useful feature to
> the community.
>
> [1].
> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>
> Best,
> Yang
>
> On Thu, Sep 26, 2019 at 4:11 AM, Aleksandar Mastilovic wrote:
>
>> Would you guys (Flink devs) be interested in our solution for
>> zookeeper-less HA? I could ask the managers how they feel about
>> open-sourcing the improvement.
>>
>> On Sep 25, 2019, at 11:49 AM, Yun Tang  wrote:
>>
>> As Aleksandar said, k8s with HA configuration could solve your problem.
>> There already have some discussion about how to implement such HA in k8s if
>> we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
>> Currently, you might only have to choose zookeeper as high-availability
>> service.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>
>> Best
>> Yun Tang
>> --
>> *From:* Aleksandar Mastilovic 
>> *Sent:* Thursday, September 26, 2019 1:57
>> *To:* Sean Hester 
>> *Cc:* Hao Sun ; Yuval Itzchakov ;
>> user 
>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>
>> Can’t you simply use JobManager in HA mode? It would pick up where it
>> left off if you don’t provide a Savepoint.
>>
>> On Sep 25, 2019, at 6:07 AM, Sean Hester 
>> wrote:
>>
>> thanks for all replies! i'll definitely take a look at the Flink k8s
>> Operator project.
>>
>> i'll try to restate the issue to clarify. this issue is specific to
>> starting a job from a savepoint in job-cluster mode. in these cases the Job
>> Manager container is configured to run a single Flink job at start-up. the
>> savepoint needs to be provided as an argument to the entrypoint. the Flink
>> documentation for this approach is here:
>>
>>
>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>
>> the issue is that taking this approach means that the job will *always* start
>> from the savepoint provided as the start argument in the Kubernetes YAML.
>> this includes unplanned restarts of the job manager, but we'd really prefer
>> any *unplanned* restarts resume for the most recent checkpoint instead
>> of restarting from the configured savepoint. so in a sense we want the
>> savepoint argument to be transient, only being used during the initial
>> deployment, but this runs counter to the design of Kubernetes which always
>> wants to restore a deployment to the "goal state" as defined in the YAML.
>>
>> i hope this helps. if you want more details please let me know, and
>> thanks again for your time.
>>
>>
>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun  wrote:
>>
>> I think I overlooked it. Good point. I am using Redis to save the path to
>> my savepoint, I might be able to set a TTL to avoid such issue.
>>
>> Hao Sun
>>
>>
>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov 
>> wrote:
>>
>> Hi Hao,
>>
>> I think he's exactly talking about the usecase where the JM/TM restart
>> and they come back up from the latest savepoint which might be stale by
>> that time.
>>
>> On Tue, 24 Sep 2019, 19:24 Hao Sun,  wrote:
>>
>> We always make a savepoint before we shutdown the job-cluster. So the
>> savepoint is always the latest. When we fix a bug or change the job graph,
>> it can resume well.
>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>> uncaught exception, etc.
>>
>> Maybe I do not understand your use case well, I do not see a need to
>> start from checkpoint after a bug fix.
>> From what I know, currently you can use checkpoint as a savepoint as well
>>
>> Hao Sun
>>
>>
>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov 
>> wrote:
>>
>> AF

Flink- Heap Space running out

2019-09-26 Thread Nishant Gupta
I am running a query to join a stream and a table, as below. It is running
out of heap space, even though there is plenty of heap space in the Flink
cluster (60 GB * 3).

Is an eviction strategy needed for this query?

SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON
sourceKafka.CC = DefaulterTable.CC;

Thanks

Nishant


Re: Problems with java.utils

2019-09-26 Thread Dian Fu
Hi Nick,

>> [error] ……/src/main/scala/org/example/Job.scala:30:13: object util is 
>> not a member of package org.apache.flink.table.api.java
>> [error] import java.util.ArrayList


The error message shows that it tries to find "util.ArrayList" under package 
"org.apache.flink.table.api.java". So you can try one of the following two 
solutions:
1) Don't import "org.apache.flink.table.api._"
2) Use absolute import: "import _root_.java.util.ArrayList"
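
Applied to the minimal example above, option 2) would look roughly like this:

package org.example

import org.apache.flink.table.api._
import org.apache.http.HttpHost

// _root_ forces an absolute import, so "java" is no longer resolved relative
// to org.apache.flink.table.api.java
import _root_.java.util.ArrayList

object foo {
  val httpHosts = new ArrayList[HttpHost]
  httpHosts.add(new HttpHost("samwise.local", 9200, "http"))
}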

Regards,
Dian

> On 26 Sep 2019, at 22:04, Nicholas Walton wrote:
> 
> I’ve shrunk the problem down to a minimal size. The code 
> 
> package org.example
> 
> import org.apache.flink.table.api._
> import org.apache.http.HttpHost
> 
> import java.util.ArrayList
> 
> object foo {
> 
>   val httpHosts = new ArrayList[HttpHost]
>   httpHosts.add(new HttpHost("samwise.local", 9200, "http"))
> 
> }
> will not compile, but remove the import org.apache.flink.table.api._ and all 
> is well
> 
> Nick
> 
> 
>> On 26 Sep 2019, at 12:53, Nicholas Walton > > wrote:
>> 
>> I’m having a problem using ArrayList in Scala . The code is below
>> 
>> import org.apache.flink.core.fs._
>> import org.apache.flink.streaming.api._
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.table.api._
>> import org.apache.flink.table.api.scala._
>> import org.apache.flink.table.sinks._
>> import org.apache.http.HttpHost
>> import org.slf4j.LoggerFactory
>> 
>> import java.util.ArrayList
>> 
>> object Job {
>> 
>>   val httpHosts = new ArrayList[HttpHost]
>>   httpHosts.add(new HttpHost("samwise.local", 9200, "http"))
>> …..
>> 
>> }
>> 
>> Scala refuses to recognise ArrayList. The IDE InteliJ likewise flags 
>> java.util as not existing, even though it recognises ArrayList and suggest 
>> auto-insertion of the import java.utils.ArrayList statement. The compilation 
>> errors are
>> 
>> [error] ……/src/main/scala/org/example/Job.scala:30:13: object util is 
>> not a member of package org.apache.flink.table.api.java
>> [error] import java.util.ArrayList
>> [error] ^
>> [error] ……/src/main/scala/org/example/Job.scala:34:23: not found: type 
>> ArrayList
>> [error]   val httpHosts = new ArrayList[HttpHost]
>> [error]   ^
>> [error] two errors found
>> [error] (Compile / compileIncremental) Compilation failed
>> 
>> Without the ArrayList reference the code compiles with no errors.
>> 
>> The sbt build file, if it helps is,
>> 
>> resolvers in ThisBuild ++= Seq("Apache Development Snapshot Repository" at 
>> "https://repository.apache.org/content/repositories/snapshots/ 
>> ", 
>> Resolver.mavenLocal)
>> 
>> name := "Flink MultiChannel Project"
>> 
>> version := "0.1-SNAPSHOT"
>> 
>> organization := "org.example"
>> 
>> scalaVersion in ThisBuild := "2.11.0"
>> 
>> val flinkVersion = "1.8.1"
>> 
>> val flinkDependencies = Seq(
>>   "org.apache.flink" %% "flink-scala" % flinkVersion ,
>>   "org.apache.flink" %% "flink-table" % "1.7.2" ,
>>   "org.apache.flink" %% "flink-connector-elasticsearch" % flinkVersion,
>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
>>   "org.apache.httpcomponents" % "httpclient" % "4.5.10",
>>   "org.apache.httpcomponents" % "httpcore" % "4.4.11")
>> // https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore 
>> 
>> 
>> lazy val root = (project in file(".")).
>>   settings(
>> libraryDependencies ++= flinkDependencies)
>> 
>> mainClass in assembly := Some("org.example.Job")
>> 
>> // make run command include the provided dependencies
>> run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in 
>> (Compile, run), runner in (Compile, run))
>> 
>> // exclude Scala library from assembly
>> assemblyOption in assembly := (assemblyOption in 
>> assembly).value.copy(includeScala = false)
>> 
>> 
>> TIA
>> 
>> Nick
> 



Re: Problems with java.utils

2019-09-26 Thread Nicholas Walton
Dian

That fixed the problem, thank you. It would appear that someone has taken it
upon themselves to redefine part of the Java standard library namespace in
org.apache.flink.table.api._

Nick

> On 26 Sep 2019, at 15:16, Dian Fu  wrote:
> 
> Hi Nick,
> 
>>> [error] ……/src/main/scala/org/example/Job.scala:30:13: object util is 
>>> not a member of package org.apache.flink.table.api.java
>>> [error] import java.util.ArrayList
> 
> 
> The error message shows that it tries to find "util.ArrayList" under package 
> "org.apache.flink.table.api.java". So you can try one of the following two 
> solutions:
> 1) Don't import "org.apache.flink.table.api._"
> 2) Use absolute import: "import _root_.java.util.ArrayList"
> 
> Regards,
> Dian
> 
>> On 26 Sep 2019, at 22:04, Nicholas Walton wrote:
>> 
>> I’ve shrunk the problem down to a minimal size. The code 
>> 
>> package org.example
>> 
>> import org.apache.flink.table.api._
>> import org.apache.http.HttpHost
>> 
>> import java.util.ArrayList
>> 
>> object foo {
>> 
>>   val httpHosts = new ArrayList[HttpHost]
>>   httpHosts.add(new HttpHost("samwise.local", 9200, "http"))
>> 
>> }
>> will not compile, but remove the import org.apache.flink.table.api._ and all 
>> is well
>> 
>> Nick
>> 
>> 
>>> On 26 Sep 2019, at 12:53, Nicholas Walton >> > wrote:
>>> 
>>> I’m having a problem using ArrayList in Scala . The code is below
>>> 
>>> import org.apache.flink.core.fs._
>>> import org.apache.flink.streaming.api._
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.table.api._
>>> import org.apache.flink.table.api.scala._
>>> import org.apache.flink.table.sinks._
>>> import org.apache.http.HttpHost
>>> import org.slf4j.LoggerFactory
>>> 
>>> import java.util.ArrayList
>>> 
>>> object Job {
>>> 
>>>   val httpHosts = new ArrayList[HttpHost]
>>>   httpHosts.add(new HttpHost("samwise.local", 9200, "http"))
>>> …..
>>> 
>>> }
>>> 
>>> Scala refuses to recognise ArrayList. The IDE InteliJ likewise flags 
>>> java.util as not existing, even though it recognises ArrayList and suggest 
>>> auto-insertion of the import java.utils.ArrayList statement. The 
>>> compilation errors are
>>> 
>>> [error] ……/src/main/scala/org/example/Job.scala:30:13: object util is 
>>> not a member of package org.apache.flink.table.api.java
>>> [error] import java.util.ArrayList
>>> [error] ^
>>> [error] ……/src/main/scala/org/example/Job.scala:34:23: not found: type 
>>> ArrayList
>>> [error]   val httpHosts = new ArrayList[HttpHost]
>>> [error]   ^
>>> [error] two errors found
>>> [error] (Compile / compileIncremental) Compilation failed
>>> 
>>> Without the ArrayList reference the code compiles with no errors.
>>> 
>>> The sbt build file, if it helps is,
>>> 
>>> resolvers in ThisBuild ++= Seq("Apache Development Snapshot Repository" at 
>>> "https://repository.apache.org/content/repositories/snapshots/ 
>>> ", 
>>> Resolver.mavenLocal)
>>> 
>>> name := "Flink MultiChannel Project"
>>> 
>>> version := "0.1-SNAPSHOT"
>>> 
>>> organization := "org.example"
>>> 
>>> scalaVersion in ThisBuild := "2.11.0"
>>> 
>>> val flinkVersion = "1.8.1"
>>> 
>>> val flinkDependencies = Seq(
>>>   "org.apache.flink" %% "flink-scala" % flinkVersion ,
>>>   "org.apache.flink" %% "flink-table" % "1.7.2" ,
>>>   "org.apache.flink" %% "flink-connector-elasticsearch" % flinkVersion,
>>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
>>>   "org.apache.httpcomponents" % "httpclient" % "4.5.10",
>>>   "org.apache.httpcomponents" % "httpcore" % "4.4.11")
>>> // https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore 
>>> 
>>> 
>>> lazy val root = (project in file(".")).
>>>   settings(
>>> libraryDependencies ++= flinkDependencies)
>>> 
>>> mainClass in assembly := Some("org.example.Job")
>>> 
>>> // make run command include the provided dependencies
>>> run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in 
>>> (Compile, run), runner in (Compile, run))
>>> 
>>> // exclude Scala library from assembly
>>> assemblyOption in assembly := (assemblyOption in 
>>> assembly).value.copy(includeScala = false)
>>> 
>>> 
>>> TIA
>>> 
>>> Nick
>> 
> 



Re: Problems with java.utils

2019-09-26 Thread Dian Fu
Hi Nick, 

There is a package named "org.apache.flink.table.api.java" in Flink, so the
import of "org.apache.flink.table.api._" brings
"org.apache.flink.table.api.java" into scope. After that, any import of a
package starting with "java", such as "import java.util.ArrayList", will try
to find the classes under "org.apache.flink.table.api.java", because Scala
uses relative imports by default. So I think nothing is wrong here; this is
just how the Scala language behaves. You need the "_root_" prefix to force an
absolute import.

Regards,
Dian

> On 26 Sep 2019, at 22:26, Nicholas Walton wrote:
> 
> Dian
> 
> That fixed the problem thanks you. It would appear that someone has taken it 
> upon themselves to redefine part of the Java standard library in 
> org.apache.flink.table.api._
> 
> NIck
> 
>> On 26 Sep 2019, at 15:16, Dian Fu > > wrote:
>> 
>> Hi Nick,
>> 
 [error] ……/src/main/scala/org/example/Job.scala:30:13: object util is 
 not a member of package org.apache.flink.table.api.java
 [error] import java.util.ArrayList
>> 
>> 
>> The error message shows that it tries to find "util.ArrayList" under package 
>> "org.apache.flink.table.api.java". So you can try one of the following two 
>> solutions:
>> 1) Don't import "org.apache.flink.table.api._"
>> 2) Use absolute import: "import _root_.java.util.ArrayList"
>> 
>> Regards,
>> Dian
>> 
>>> On 26 Sep 2019, at 22:04, Nicholas Walton wrote:
>>> 
>>> I’ve shrunk the problem down to a minimal size. The code 
>>> 
>>> package org.example
>>> 
>>> import org.apache.flink.table.api._
>>> import org.apache.http.HttpHost
>>> 
>>> import java.util.ArrayList
>>> 
>>> object foo {
>>> 
>>>   val httpHosts = new ArrayList[HttpHost]
>>>   httpHosts.add(new HttpHost("samwise.local", 9200, "http"))
>>> 
>>> }
>>> will not compile, but remove the import org.apache.flink.table.api._ and 
>>> all is well
>>> 
>>> Nick
>>> 
>>> 
 On 26 Sep 2019, at 12:53, Nicholas Walton >>> > wrote:
 
 I’m having a problem using ArrayList in Scala . The code is below
 
 import org.apache.flink.core.fs._
 import org.apache.flink.streaming.api._
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.sinks._
 import org.apache.http.HttpHost
 import org.slf4j.LoggerFactory
 
 import java.util.ArrayList
 
 object Job {
 
   val httpHosts = new ArrayList[HttpHost]
   httpHosts.add(new HttpHost("samwise.local", 9200, "http"))
 …..
 
 }
 
 Scala refuses to recognise ArrayList. The IDE InteliJ likewise flags 
 java.util as not existing, even though it recognises ArrayList and suggest 
 auto-insertion of the import java.utils.ArrayList statement. The 
 compilation errors are
 
 [error] ……/src/main/scala/org/example/Job.scala:30:13: object util is 
 not a member of package org.apache.flink.table.api.java
 [error] import java.util.ArrayList
 [error] ^
 [error] ……/src/main/scala/org/example/Job.scala:34:23: not found: type 
 ArrayList
 [error]   val httpHosts = new ArrayList[HttpHost]
 [error]   ^
 [error] two errors found
 [error] (Compile / compileIncremental) Compilation failed
 
 Without the ArrayList reference the code compiles with no errors.
 
 The sbt build file, if it helps is,
 
 resolvers in ThisBuild ++= Seq("Apache Development Snapshot Repository" at 
 "https://repository.apache.org/content/repositories/snapshots/ 
 ", 
 Resolver.mavenLocal)
 
 name := "Flink MultiChannel Project"
 
 version := "0.1-SNAPSHOT"
 
 organization := "org.example"
 
 scalaVersion in ThisBuild := "2.11.0"
 
 val flinkVersion = "1.8.1"
 
 val flinkDependencies = Seq(
   "org.apache.flink" %% "flink-scala" % flinkVersion ,
   "org.apache.flink" %% "flink-table" % "1.7.2" ,
   "org.apache.flink" %% "flink-connector-elasticsearch" % flinkVersion,
   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
   "org.apache.httpcomponents" % "httpclient" % "4.5.10",
   "org.apache.httpcomponents" % "httpcore" % "4.4.11")
 // https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore 
 
 
 lazy val root = (project in file(".")).
   settings(
 libraryDependencies ++= flinkDependencies)
 
 mainClass in assembly := Some("org.example.Job")
 
 // make run command include the provided dependencies
 run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in 
 (Compile, run), runner in (Compile, run))
 
 // exclude Scala libra

Re: Flink- Heap Space running out

2019-09-26 Thread miki haiat
You can configure the task manager memory in the config.yaml file.
What is the current configuration?
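(For reference, the relevant keys in flink-conf.yaml are the ones below; the values are only placeholders:)

taskmanager.heap.size: 16384m
jobmanager.heap.size: 2048m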

On Thu, Sep 26, 2019, 17:14 Nishant Gupta 
wrote:

>  am running a query to join a stream and a table as below. It is running
> out of heap space. Even though it has enough heap space in flink cluster
> (60GB * 3)
>
> Is there an eviction strategy needed for this query ?
>
> *SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON
> sourceKafka.CC=DefaulterTable.CC;  *
>
> Thanks
>
> Nishant
>


Re: Flink- Heap Space running out

2019-09-26 Thread Fabian Hueske
Hi,

I don't think that the memory configuration is the issue.
The problem is the join query. The join does not have any temporal
boundaries.
Therefore, both tables are completely stored in memory and never released.

You can configure a memory eviction strategy via idle state retention [1]
but you should make sure that this is really what you want.
Alternatively, try a time-windowed join or a join with a temporal table
function.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html#idle-state-retention-time
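
As a sketch of the idle state retention route from [1] (table and column names follow the query above; the retention times are arbitrary examples):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class IdleStateRetentionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        StreamQueryConfig qConfig = tableEnv.queryConfig();
        // State for join keys that stay inactive for 12-24 hours is cleaned up.
        qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

        // "sourceKafka" and "DefaulterTable" are assumed to be registered tables.
        Table result = tableEnv.sqlQuery(
            "SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable " +
            "ON sourceKafka.CC = DefaulterTable.CC");

        DataStream<Tuple2<Boolean, Row>> out =
            tableEnv.toRetractStream(result, Row.class, qConfig);
        out.print();
        env.execute("idle state retention sketch");
    }
}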

On Thu, Sep 26, 2019 at 5:08 PM, miki haiat wrote:

> You can configure the task manager memory in the config.yaml file.
> What is the current configuration?
>
> On Thu, Sep 26, 2019, 17:14 Nishant Gupta 
> wrote:
>
>>  am running a query to join a stream and a table as below. It is running
>> out of heap space. Even though it has enough heap space in flink cluster
>> (60GB * 3)
>>
>> Is there an eviction strategy needed for this query ?
>>
>> *SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON
>> sourceKafka.CC=DefaulterTable.CC;  *
>>
>> Thanks
>>
>> Nishant
>>
>


Re: Flink job manager doesn't remove stale checkmarks

2019-09-26 Thread Clay Teeter
I see, I'll try turning off incremental checkpoints to see if that helps.

Re: disk space, I could see a scenario with my application where I could get
10,000+ checkpoints if the checkpoints are additive. I'll let you know
what I see.

Thanks!
Clay
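
(For reference, that experiment is a small change to the flink-conf.yaml already quoted below:)

# disable incremental checkpoints for the experiment
state.backend.incremental: false
# keep only the last 3 completed checkpoints, as already configured
state.checkpoints.num-retained: 3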


On Wed, Sep 25, 2019 at 5:40 PM Fabian Hueske  wrote:

> Hi,
>
> You enabled incremental checkpoints.
> This means that parts of older checkpoints that did not change since the
> last checkpoint are not removed because they are still referenced by the
> incremental checkpoints.
> Flink will automatically remove them once they are not needed anymore.
>
> Are you sure that the size of your application's state is not growing too
> large?
>
> Best, Fabian
>
> On Tue, Sep 24, 2019 at 10:47 AM, Clay Teeter <clay.tee...@maalka.com> wrote:
>
>> Oh geez,  checkmarks  = checkpoints... sorry.
>>
>> What i mean by stale "checkpoints" are checkpoints that should be reaped
>> by: "state.checkpoints.num-retained: 3".
>>
>> What is happening is that directories:
>>   - state.checkpoints.dir: file:///opt/ha/49/checkpoints
>>   - high-availability.storageDir: file:///opt/ha/49/ha
>> are growing with every checkpoint and i'm running out of disk space.
>>
>> On Tue, Sep 24, 2019 at 4:55 AM Biao Liu  wrote:
>>
>>> Hi Clay,
>>>
>>> Sorry I don't get your point. I'm not sure what the "stale checkmarks"
>>> exactly means. The HA storage and checkpoint directory left after shutting
>>> down cluster?
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Tue, 24 Sep 2019 at 03:12, Clay Teeter 
>>> wrote:
>>>
 I'm trying to get my standalone cluster to remove stale checkmarks.

 The cluster is composed of a single job and task manager backed by
 rocksdb with high availability.

 The configuration on both the job and task manager are:

 state.backend: rocksdb
 state.checkpoints.dir: file:///opt/ha/49/checkpoints
 state.backend.incremental: true
 state.checkpoints.num-retained: 3
 jobmanager.heap.size: 1024m
 taskmanager.heap.size: 2048m
 taskmanager.numberOfTaskSlots: 24
 parallelism.default: 1
 high-availability.jobmanager.port: 6123
 high-availability.zookeeper.path.root: _49
 high-availability: zookeeper
 high-availability.storageDir: file:///opt/ha/49/ha
 high-availability.zookeeper.quorum: **t:2181

 Both machines have access to /opt/ha/49 and /opt/ha/49/checkpoints via
 NFS and are owned by the flink user.  Also, there are no errors that i can
 find.

 Does anyone have any ideas that i could try?




UnitTests and ProcessTimeWindows - Missing results

2019-09-26 Thread Clay Teeter
What is the best way to run unit tests on streams that contain
ProcessTimeWindows?

Example:

def bufferDataStreamByProcessId(ds: DataStream[MaalkaRecord]): DataStream[MaalkaRecord] = {
  ds.map { r =>
      println(s"data in: $r") // Data shows up here
      r
    }
    .keyBy { mr =>
      val r = mr.asInstanceOf[MaalkaDataRecord]
      r.dataProcessingId -> r.isBefore
    }
    .window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(10)))
    .reduce { (_, r2) =>
      println(s"Reducing: $r2") // Data does NOT show up here
      r2
    }
    .map { r =>
      println(s"Emitted: $r") // Data does NOT show up here
      r
    }
}


This stream completes about 100ms after the first element is received by
the ProcessingTimeSessionWindow but no data is emitted from the
sessionWindow.

If I change the window to use a
TumblingProcessingTimeWindows.of(Time.milliseconds(1)), some of the windows do
emit, but many of the expected values are missing.

Any ideas?

Cheers,
Clay


Debugging slow/failing checkpoints

2019-09-26 Thread Steven Nelson
I am working with an application that hasn't gone to production yet. We run
Flink as a cluster within a K8s environment. It has the following attributes

1) 2 Job Manager configured using HA, backed by Zookeeper and HDFS
2) 4 Task Managers
3) Configured to use RocksDB. The actual RocksDB files are configured to be
written to a locally attached NVMe drive.
4) We checkpoint every 15 seconds, with a minimum delay of 7.5 seconds.
5) There is currently very little load going through the system (it's in a
test environment). The web console indicates there isn't any Back Pressure
6) The cluster is running Flink 1.9.0
7) I don't see anything unexpected in the logs
8) Checkpoints take longer than 10 minutes with very little state (<1 mb),
they fail due to timeout
9) Eventually the job fails because it can't checkpoint.

What steps beyond what I have already done should I consider to debug this?

-Steve


Re: Flink job manager doesn't remove stale checkmarks

2019-09-26 Thread Clay Teeter
I looked into the disk issues and found that Fabian was on the right path.
The checkpoints that were lingering were in fact in use.

Thanks for the help!
Clay



On Thu, Sep 26, 2019 at 8:09 PM Clay Teeter  wrote:

> I see, I'll try turning off incremental checkpoints to see if that helps.
>
> re: Diskspace, i could see a scenario with my application where i could
> get 10,000+ checkpoints, if the checkpoints are additive.  I'll let you
> know what i see.
>
> Thanks!
> Clay
>
>
> On Wed, Sep 25, 2019 at 5:40 PM Fabian Hueske  wrote:
>
>> Hi,
>>
>> You enabled incremental checkpoints.
>> This means that parts of older checkpoints that did not change since the
>> last checkpoint are not removed because they are still referenced by the
>> incremental checkpoints.
>> Flink will automatically remove them once they are not needed anymore.
>>
>> Are you sure that the size of your application's state is not growing too
>> large?
>>
>> Best, Fabian
>>
>> On Tue, Sep 24, 2019 at 10:47 AM, Clay Teeter <clay.tee...@maalka.com> wrote:
>>
>>> Oh geez,  checkmarks  = checkpoints... sorry.
>>>
>>> What i mean by stale "checkpoints" are checkpoints that should be reaped
>>> by: "state.checkpoints.num-retained: 3".
>>>
>>> What is happening is that directories:
>>>   - state.checkpoints.dir: file:///opt/ha/49/checkpoints
>>>   - high-availability.storageDir: file:///opt/ha/49/ha
>>> are growing with every checkpoint and i'm running out of disk space.
>>>
>>> On Tue, Sep 24, 2019 at 4:55 AM Biao Liu  wrote:
>>>
 Hi Clay,

 Sorry I don't get your point. I'm not sure what the "stale checkmarks"
 exactly means. The HA storage and checkpoint directory left after shutting
 down cluster?

 Thanks,
 Biao /'bɪ.aʊ/



 On Tue, 24 Sep 2019 at 03:12, Clay Teeter 
 wrote:

> I'm trying to get my standalone cluster to remove stale checkmarks.
>
> The cluster is composed of a single job and task manager backed by
> rocksdb with high availability.
>
> The configuration on both the job and task manager are:
>
> state.backend: rocksdb
> state.checkpoints.dir: file:///opt/ha/49/checkpoints
> state.backend.incremental: true
> state.checkpoints.num-retained: 3
> jobmanager.heap.size: 1024m
> taskmanager.heap.size: 2048m
> taskmanager.numberOfTaskSlots: 24
> parallelism.default: 1
> high-availability.jobmanager.port: 6123
> high-availability.zookeeper.path.root: _49
> high-availability: zookeeper
> high-availability.storageDir: file:///opt/ha/49/ha
> high-availability.zookeeper.quorum: **t:2181
>
> Both machines have access to /opt/ha/49 and /opt/ha/49/checkpoints via
> NFS and are owned by the flink user.  Also, there are no errors that i can
> find.
>
> Does anyone have any ideas that i could try?
>
>


** Help need w.r.t parallelism settings in flink **

2019-09-26 Thread Akshay Iyangar
Hi
We are running a Beam pipeline that uses Flink as its execution engine. We
are currently on Flink 1.8.
Per the Flink documentation, I see that there are options that allow you to
set

Parallelism
and
maxParallelism.

We actually want to set both so that we can dynamically scale the pipeline if 
there is back pressure on it.

I want to clarify a few doubts I had –


  1.  Does maxParallelism only work for stateful operators, or does it work
for all operators?

  2.  Also, is the setting either/or? Can we only set parallelism or
maxParallelism, or can we set both?

  3.  Currently I have parallelism at 8 and maxParallelism at 32, but the UI
only shows that it is using a parallelism of 8. It would be great if someone
could help me understand the exact behavior of these parameters.

Thanks
Akshay Iyangar



Re: ** Help need w.r.t parallelism settings in flink **

2019-09-26 Thread Zhu Zhu
Hi Akshay,

For your questions:
1. One main purpose of maxParallelism is to decide the number of
keyGroups. A keyGroup is the bucket for keys when doing keyBy partitioning.
So a larger maxParallelism means a finer granularity for key
distribution, no matter whether the operator is stateful or not.

2. You can set only the parallelism and let Flink automatically decide the
maxParallelism for it.
But it is recommended to set the maxParallelism to a fixed, proper value.

3. The parallelism is the parallelism actually used.
maxParallelism is the upper bound on the parallelism when you try to
change the parallelism via manual rescaling.

Thanks,
Zhu Zhu
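
For illustration, a minimal sketch of setting both values on the Flink side (the numbers are just examples; with Beam they would normally be passed through the Flink runner's pipeline options instead):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism the job actually runs with.
        env.setParallelism(8);
        // Upper bound (number of key groups); later rescaling cannot exceed this.
        env.setMaxParallelism(32);
        // ... build and execute the pipeline ...
    }
}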

On Fri, Sep 27, 2019 at 4:20 AM, Akshay Iyangar wrote:

> Hi
>
> So we are running a beam pipeline that uses flink as its execution engine.
> We are currently on flink1.8
>
> So per the flink documentation I see that there is an option that allows u
> to set
>
>
>
> *Parallelism *
>
> and
>
> *maxParallelism*.
>
>
>
> We actually want to set both so that we can dynamically scale the pipeline
> if there is back pressure on it.
>
>
>
> I want to clarify a few doubts I had –
>
>
>
>1. Does the maxParallelism only work for stateful operators or does it
>work for all the operators?
>
>
>
>1. Also is the setting either or ? Like we can only set parallelism or
>maxParallelism? Or is it that we can set both?
>
>
>
>1. Because, Currently I have the parallelism at 8 and maxparallelism
>at 32 but the UI only shows me that it is using parallelism of 8. It would
>be great if someone helps me understand the exact behavior of these
>parameters.
>
>
>
> Thanks
>
> Akshay Iyangar
>
>
>


Re: Debugging slow/failing checkpoints

2019-09-26 Thread Congxian Qiu
Hi Steve,

1. Do you use exactly-once or at-least-once checkpointing?
2. Do you use incremental checkpoints or not?
3. Do you have any timers, and where are the timers stored (heap or RocksDB)?
You can refer to the config here [1]; you can try storing the timers in RocksDB.
4. Is the alignment time too long?
5. Check whether it is the sync duration or the async duration that takes too
long.
6. Check whether the IO/network during the checkpoint has reached its limit.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#rocksdb-state-backend-config-options
Best,
Congxian


On Fri, Sep 27, 2019 at 3:33 AM Steven Nelson wrote:

>
> I am working with an application that hasn't gone to production yet. We
> run Flink as a cluster within a K8s environment. It has the following
> attributes
>
> 1) 2 Job Manager configured using HA, backed by Zookeeper and HDFS
> 2) 4 Task Managers
> 3) Configured to use RocksDB. The actual RocksDB files are configured to
> be written to a locally attached NVMe drive.
> 4) We checkpoint every 15 seconds, with a minimum delay of 7.5 seconds.
> 5) There is currently very little load going through the system (it's in a
> test environment). The web console indicates there isn't any Back Pressure
> 6) The cluster is running Flink 1.9.0
> 7) I don't see anything unexpected in the logs
> 8) Checkpoints take longer than 10 minutes with very little state (<1 mb),
> they fail due to timeout
> 9) Eventually the job fails because it can't checkpoint.
>
> What steps beyond what I have already done should I consider to debug this?
>
> -Steve
>
>
>
>
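
To make items 1, 2 and 5 of the checklist above concrete, here is a minimal sketch of how a job with the described settings (15 s interval, 7.5 s minimum pause, RocksDB) might configure checkpointing, assuming the Flink 1.9 DataStream API; the checkpoint URI, timeout and placeholder pipeline are only illustrative:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Exactly-once needs barrier alignment; at-least-once skips alignment and
        // can help when the alignment phase is what makes checkpoints slow.
        env.enableCheckpointing(15_000L, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setMinPauseBetweenCheckpoints(7_500L);
        // Raise the timeout while investigating; the checkpoint details in the web UI
        // break each checkpoint down into sync/async duration and alignment time.
        checkpointConfig.setCheckpointTimeout(10 * 60 * 1000L);

        // The second argument enables incremental checkpoints. Timers can additionally
        // be moved into RocksDB via state.backend.rocksdb.timer-service.factory in
        // flink-conf.yaml, as covered by [1] above.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        env.fromElements(1, 2, 3).print();   // placeholder pipeline so the sketch runs
        env.execute("checkpoint-setup-sketch");
    }
}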


Re: Subscribing to the mailing list

2019-09-26 Thread Dian Fu
To subscribe to the mailing lists, you need to send an email to each of the
following addresses separately: dev-subscr...@flink.apache.org,
user-subscr...@flink.apache.org and user-zh-subscr...@flink.apache.org.

> On Sep 26, 2019, at 9:58 AM, 杨利君 wrote:
> 
> Subscribe to the Flink community mailing list



Re: CSV Table source as data-stream in environment file

2019-09-26 Thread Dian Fu
You need to add the following configuration to make it run in streaming
mode [1].

execution:
  type: streaming

Regards,
Dian

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html#environment-files
 

> On Sep 26, 2019, at 5:35 PM, Nishant Gupta wrote:
> 
> Hi Team,
> 
> How do we define csv table source as a data-stream instead of data-set in 
> environment file.?
> 
> Whether or not i mention  update-mode: append  or not... I takes only csv 
> file as data-set.
> Is there any detailed reference to environment file configuration where sinks 
> and sources are defined.
> 
>  - name: badips
> type: source-table
> update-mode: append
> connector:
>   type: filesystem
>   path: "/home/user/file.csv"
> format:
>   type: csv
>   fields:
> - name: col1
>   type: VARCHAR
>   comment-prefix: "#"
> schema:
>   - name: col1
> type: VARCHAR



Reading Key-Value from Kafka | Eviction policy.

2019-09-26 Thread srikanth flink
Hi,

My data source is Kafka; so far I have been reading the values from the
Kafka stream into a table. The table just keeps growing and runs into a heap issue.

I came across the eviction policy, which works only on keys, right?

I have researched how to configure the environment file (Flink SQL) to read both
the key and the value, so that eviction works on the keys and older data is
cleared. I have found nothing in the docs so far.

Could someone help with that?
If there's no support for reading both key and value, can someone help me
assign a key to the table I'm building from the stream?

Thanks
Srikanth


Re: Reading Key-Value from Kafka | Eviction policy.

2019-09-26 Thread miki haiat
I'm sure there are several ways to implement it. Can you elaborate more on
your use case?

On Fri, Sep 27, 2019, 08:37 srikanth flink  wrote:

> Hi,
>
> My data source is Kafka, all these days have been reading the values from
> Kafka stream to a table. The table just grows and runs into a heap issue.
>
> Came across the eviction policy that works on only keys, right?
>
> Have researched to configure the environment file(Flink SLQ) to read both
> key and value, so as the eviction works on the keys and older data is
> cleared. I found nothing in the docs, so far.
>
> Could someone help with that?
> If there's no support for reading key and value, can someone help me to
> assign a key to the table I'm building from stream?
>
> Thanks
> Srikanth
>


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-26 Thread Vijay Bhaskar
Suppose my cluster crashed and I need to bring the entire cluster back up.
Does HA still help to run the cluster from the latest savepoint?

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester 
wrote:

> thanks to everyone for all the replies.
>
> i think the original concern here with "just" relying on the HA option is
> that there are some disaster recovery and data center migration use cases
> where the continuity of the job managers is difficult to preserve. but
> those are admittedly very edgy use cases. i think it's definitely worth
> reviewing the SLAs with our site reliability engineers to see how likely it
> would be to completely lose all job managers under an HA configuration.
> that small a risk might be acceptable/preferable to a one-off solution.
>
> @Aleksander, would love to learn more about Zookeeper-less HA. i think i
> spotted a thread somewhere between Till and someone (perhaps you) about
> that. feel free to DM me.
>
> thanks again to everyone!
>
> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang  wrote:
>
>> Hi, Aleksandar
>>
>> Savepoint option in standalone job cluster is optional. If you want to
>> always recover
>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>> could use the
>> high-availability configuration. Make sure the cluster-id is not changed,
>> i think the job
>> could recover both at exceptionally crash and restart by expectation.
>>
>> @Aleksandar Mastilovic , we are also have
>> an zookeeper-less high-availability implementation[1].
>> Maybe we could have some discussion and contribute this useful feature to
>> the community.
>>
>> [1].
>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>
>> Best,
>> Yang
>>
>> On Thu, Sep 26, 2019 at 4:11 AM Aleksandar Mastilovic wrote:
>>
>>> Would you guys (Flink devs) be interested in our solution for
>>> zookeeper-less HA? I could ask the managers how they feel about
>>> open-sourcing the improvement.
>>>
>>> On Sep 25, 2019, at 11:49 AM, Yun Tang  wrote:
>>>
>>> As Aleksandar said, k8s with HA configuration could solve your problem.
>>> There already have some discussion about how to implement such HA in k8s if
>>> we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
>>> Currently, you might only have to choose zookeeper as high-availability
>>> service.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11105
>>> [2] https://issues.apache.org/jira/browse/FLINK-12884
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Aleksandar Mastilovic 
>>> *Sent:* Thursday, September 26, 2019 1:57
>>> *To:* Sean Hester 
>>> *Cc:* Hao Sun ; Yuval Itzchakov ;
>>> user 
>>> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>>>
>>> Can’t you simply use JobManager in HA mode? It would pick up where it
>>> left off if you don’t provide a Savepoint.
>>>
>>> On Sep 25, 2019, at 6:07 AM, Sean Hester 
>>> wrote:
>>>
>>> thanks for all replies! i'll definitely take a look at the Flink k8s
>>> Operator project.
>>>
>>> i'll try to restate the issue to clarify. this issue is specific to
>>> starting a job from a savepoint in job-cluster mode. in these cases the Job
>>> Manager container is configured to run a single Flink job at start-up. the
>>> savepoint needs to be provided as an argument to the entrypoint. the Flink
>>> documentation for this approach is here:
>>>
>>>
>>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>>
>>> the issue is that taking this approach means that the job will *always* 
>>> start
>>> from the savepoint provided as the start argument in the Kubernetes YAML.
>>> this includes unplanned restarts of the job manager, but we'd really prefer
>>> any *unplanned* restarts resume for the most recent checkpoint instead
>>> of restarting from the configured savepoint. so in a sense we want the
>>> savepoint argument to be transient, only being used during the initial
>>> deployment, but this runs counter to the design of Kubernetes which always
>>> wants to restore a deployment to the "goal state" as defined in the YAML.
>>>
>>> i hope this helps. if you want more details please let me know, and
>>> thanks again for your time.
>>>
>>>
>>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun  wrote:
>>>
>>> I think I overlooked it. Good point. I am using Redis to save the path
>>> to my savepoint, I might be able to set a TTL to avoid such issue.
>>>
>>> Hao Sun
>>>
>>>
>>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov 
>>> wrote:
>>>
>>> Hi Hao,
>>>
>>> I think he's exactly talking about the usecase where the JM/TM restart
>>> and they come back up from the latest savepoint which might be stale by
>>> that time.
>>>
>>> On Tue, 24 Sep 2019, 19:24 Hao Sun,  wrote:
>>>
>>> We always make a savepoint before we shutdown the job-cluster. So the
>>> savepoint is always the latest. When we fix a bug or change the job graph,
>>> it can resume well.
>>> We only use checkpoints for unpla

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-26 Thread Vijay Bhaskar
I don't think HA will help to recover from cluster crash, for that we
should take periodic savepoint right? Please correct me in case i am wrong

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar 
wrote:

> Suppose my cluster got crashed and need to bring up the entire cluster
> back? Does HA still helps to run the cluster from latest save point?
>
> Regards
> Bhaskar
>
> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester 
> wrote:
>
>> thanks to everyone for all the replies.
>>
>> i think the original concern here with "just" relying on the HA option is
>> that there are some disaster recovery and data center migration use cases
>> where the continuity of the job managers is difficult to preserve. but
>> those are admittedly very edgy use cases. i think it's definitely worth
>> reviewing the SLAs with our site reliability engineers to see how likely it
>> would be to completely lose all job managers under an HA configuration.
>> that small a risk might be acceptable/preferable to a one-off solution.
>>
>> @Aleksander, would love to learn more about Zookeeper-less HA. i think i
>> spotted a thread somewhere between Till and someone (perhaps you) about
>> that. feel free to DM me.
>>
>> thanks again to everyone!
>>
>> On Thu, Sep 26, 2019 at 7:32 AM Yang Wang  wrote:
>>
>>> Hi, Aleksandar
>>>
>>> Savepoint option in standalone job cluster is optional. If you want to
>>> always recover
>>> from the latest checkpoint, just as Aleksandar and Yun Tang said you
>>> could use the
>>> high-availability configuration. Make sure the cluster-id is not
>>> changed, i think the job
>>> could recover both at exceptionally crash and restart by expectation.
>>>
>>> @Aleksandar Mastilovic , we are also have
>>> an zookeeper-less high-availability implementation[1].
>>> Maybe we could have some discussion and contribute this useful feature
>>> to the community.
>>>
>>> [1].
>>> https://docs.google.com/document/d/1Z-VdJlPPEQoWT1WLm5woM4y0bFep4FrgdJ9ipQuRv8g/edit
>>>
>>> Best,
>>> Yang
>>>
>>> On Thu, Sep 26, 2019 at 4:11 AM Aleksandar Mastilovic wrote:
>>>
 Would you guys (Flink devs) be interested in our solution for
 zookeeper-less HA? I could ask the managers how they feel about
 open-sourcing the improvement.

 On Sep 25, 2019, at 11:49 AM, Yun Tang  wrote:

 As Aleksandar said, k8s with HA configuration could solve your problem.
 There already have some discussion about how to implement such HA in k8s if
 we don't have a zookeeper service: FLINK-11105 [1] and FLINK-12884 [2].
 Currently, you might only have to choose zookeeper as high-availability
 service.

 [1] https://issues.apache.org/jira/browse/FLINK-11105
 [2] https://issues.apache.org/jira/browse/FLINK-12884

 Best
 Yun Tang
 --
 *From:* Aleksandar Mastilovic 
 *Sent:* Thursday, September 26, 2019 1:57
 *To:* Sean Hester 
 *Cc:* Hao Sun ; Yuval Itzchakov ;
 user 
 *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes

 Can’t you simply use JobManager in HA mode? It would pick up where it
 left off if you don’t provide a Savepoint.

 On Sep 25, 2019, at 6:07 AM, Sean Hester 
 wrote:

 thanks for all replies! i'll definitely take a look at the Flink k8s
 Operator project.

 i'll try to restate the issue to clarify. this issue is specific to
 starting a job from a savepoint in job-cluster mode. in these cases the Job
 Manager container is configured to run a single Flink job at start-up. the
 savepoint needs to be provided as an argument to the entrypoint. the Flink
 documentation for this approach is here:


 https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint

 the issue is that taking this approach means that the job will *always*
  start from the savepoint provided as the start argument in the
 Kubernetes YAML. this includes unplanned restarts of the job manager, but
 we'd really prefer any *unplanned* restarts resume for the most recent
 checkpoint instead of restarting from the configured savepoint. so in a
 sense we want the savepoint argument to be transient, only being used
 during the initial deployment, but this runs counter to the design of
 Kubernetes which always wants to restore a deployment to the "goal state"
 as defined in the YAML.

 i hope this helps. if you want more details please let me know, and
 thanks again for your time.


 On Tue, Sep 24, 2019 at 1:09 PM Hao Sun  wrote:

 I think I overlooked it. Good point. I am using Redis to save the path
 to my savepoint, I might be able to set a TTL to avoid such issue.

 Hao Sun


 On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov 
 wrote:

 Hi Hao,

 I think he's exactly talking about the usecase where the JM/TM restart
 and they 

Re: Reading Key-Value from Kafka | Eviction policy.

2019-09-26 Thread srikanth flink
Hi Miki,

What are those several ways? Could you help me with references?

Use case:

We have a continuous credit card transaction stream flowing into a Kafka
topic, along with a set of credit card defaulters in a .csv file (which
gets updated every day).


Thanks

Srikanth


On Fri, Sep 27, 2019 at 11:11 AM miki haiat  wrote:

> I'm sure there is several ways to implement it. Can you elaborate more on
> your use case ?
>
> On Fri, Sep 27, 2019, 08:37 srikanth flink  wrote:
>
>> Hi,
>>
>> My data source is Kafka, all these days have been reading the values from
>> Kafka stream to a table. The table just grows and runs into a heap issue.
>>
>> Came across the eviction policy that works on only keys, right?
>>
>> Have researched to configure the environment file(Flink SLQ) to read both
>> key and value, so as the eviction works on the keys and older data is
>> cleared. I found nothing in the docs, so far.
>>
>> Could someone help with that?
>> If there's no support for reading key and value, can someone help me to
>> assign a key to the table I'm building from stream?
>>
>> Thanks
>> Srikanth
>>
>
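
One way to keep such a table from growing without bound, while the key/value question is being sorted out, is idle state retention on the query, so that state for keys that have not been seen for a while is dropped. Below is a minimal sketch using the Flink 1.9 Table API query configuration; the table name, column name and retention times are only illustrative assumptions, and a Kafka-backed table is assumed to be registered already:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class IdleStateRetentionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Drop state for keys that have been idle for between 12 and 24 hours;
        // the values must be tuned to the actual transaction volume.
        StreamQueryConfig queryConfig = tableEnv.queryConfig();
        queryConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

        // "transactions" and "cardNumber" are assumed names for the registered
        // Kafka-backed table and its key column.
        Table perCard = tableEnv.sqlQuery(
                "SELECT cardNumber, COUNT(*) AS cnt FROM transactions GROUP BY cardNumber");

        // Passing the query config to the conversion applies the retention to this query.
        DataStream<Tuple2<Boolean, Row>> result =
                tableEnv.toRetractStream(perCard, Row.class, queryConfig);
        result.print();

        env.execute("idle-state-retention-sketch");
    }
}

If the query runs in the SQL Client instead, the environment file's execution section exposes equivalent minimum/maximum idle state retention settings.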