Re: StreamingFileSink.forBulkFormat() with CheckpointRollingPolicy issues

2021-01-06 Thread Yun Gao
Hi Mahendra,  

Sorry for the late reply. I noticed that in your code you implement a 
bucket assigner that switches to a new bucket every minute. Could that be 
related to the problems you are seeing? Since different buckets use 
different directories and files, new files are created and used whenever 
the bucket switches.
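
For illustration (in Java, and only if hourly buckets would fit your use case): Flink's built-in DateTimeBucketAssigner lets you choose how often the bucket id, and therefore the target directory and part file, changes. A minimal sketch, assuming GenericRecord elements as in your code:

import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

// the bucket id changes only once per hour here, so the bucket switch itself no
// longer forces a new part file every minute; the "yyyy-MM-dd--HH" pattern is just an example
DateTimeBucketAssigner<GenericRecord> hourlyAssigner =
    new DateTimeBucketAssigner<>("yyyy-MM-dd--HH");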


Best,
 Yun


 --Original Mail --
Sender:Mahendra Hegde 
Send Date:Tue Dec 29 20:23:33 2020
Recipients:user@flink.apache.org 
Subject:StreamingFileSink.forBulkFormat() with CheckpointRollingPolicy issues

Hello,

I am trying to use StreamingFileSink.forBulkFormat() for writing Avro to S3.
I have used 'CheckpointRollingPolicy' because DefaultRollingPolicy cannot be
used with bulk formats.

But when I use this I am facing 2 issues:
1. The 'shouldRollOnEvent' method is called on each record addition, but
getSize() always returns the size of a single message instead of the current
part-file size.
2. Files are rolled every minute even though my checkpoint interval is longer
(3 minutes), and I don't see any way to override this 1-minute default rolling.

Any suggestion would be appreciated.


Code:

val avroOcfFilesink: StreamingFileSink[GenericRecord] =
  StreamingFileSink.forBulkFormat(new Path(avroOutputPath),
    new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord]() {
      override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
        val schema: Schema = new Schema.Parser().parse(faultCodeOCFRecordSchema)
        val datumWriter = new ReflectDatumWriter[GenericRecord](schema)
        val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
        dataFileWriter.setCodec(CodecFactory.snappyCodec)
        dataFileWriter.create(schema, out)
        dataFileWriter
      }
    }))
    .withBucketAssigner(new BucketAssigner[GenericRecord, String] {
      override def getBucketId(in: GenericRecord, context: Context): String = {
        val bucketIdPrefix = configurationParameters.getRequired("s3.bucket.id.prefix")
        val currentProcessingTimeUTC = System.currentTimeMillis()
        bucketIdPrefix + TimeConversion.convertTimestampToRunDate_HHMM(currentProcessingTimeUTC)
      }
      override def getSerializer: SimpleVersionedSerializer[String] = {
        SimpleVersionedStringSerializer.INSTANCE
      }
    })
    .withBucketCheckInterval(12)
    .withRollingPolicy(
      new CheckpointRollingPolicy[GenericRecord, String] {
        override def shouldRollOnEvent(partFileState: PartFileInfo[String], element: GenericRecord): Boolean = {
          log.info("## PartFileState.getSize:" + partFileState.getSize + ", Creation" + partFileState.getCreationTime + ", Lastupdate:" + partFileState.getLastUpdateTime)
          false
        }
        override def shouldRollOnProcessingTime(partFileState: PartFileInfo[String], currentTime: Long): Boolean = {
          val result: Boolean = (currentTime - partFileState.getCreationTime) >= 1
          log.info("currentTime:" + currentTime + ", partFileState.getCreationTime" + partFileState.getCreationTime + ", Diff:" + (currentTime - partFileState.getCreationTime) + ", result:" + result)
          false
        }
      }
    ).build()


Thanks
MH

Re: "flink list" fails when zk-based HA is enabled in flink-conf.yaml

2021-01-06 Thread Dongwon Kim
Hi Yang,

I was wondering why the CLI accessed ZK because, as shown in the following
lines, the CLI already seemed to know the address of the JM by contacting the
AHS before connecting to ZK.

2021-01-06 18:35:32,351 INFO  org.apache.flink.client.cli.CliFrontend
[] - Running 'list' command.

2021-01-06 18:35:32,682 INFO  org.apache.hadoop.yarn.client.AHSProxy
[] - Connecting to Application History server at
mobdata-devflink-nm02.dakao.io/10.93.0.91:10200

2021-01-06 18:35:32,763 INFO  org.apache.flink.yarn.YarnClusterDescriptor
[] - Found Web Interface
mobdata-devflink-dn03.dakao.io:37098 of application
'application_1600163418174_0127'.

2021-01-06 18:35:32,773 INFO  org.apache.flink.runtime.util.ZooKeeperUtils
  [] - Enforcing default ACL for ZK connections

2021-01-06 18:35:32,774 INFO  org.apache.flink.runtime.util.ZooKeeperUtils
  [] - Using '/driving-habits/default' as Zookeeper namespace.
Anyway, the CLI needs to find out via a ZK node where the leader (i.e. active) JM
is located, and GenericCLI has to be informed of high-availability.cluster-id.
Thanks for the heads up!


You could also specify the "high-availability.cluster-id" so that leader
> retrieval could get the correct JobManager address.
> *flink list --target yarn-per-job -Dyarn.application.id
> =$application_id
> -Dhigh-availability.cluster-id=$application_id*

Okay, I checked that it works. Thank you very much :-) It would be nice for
other users if your answer were also explained on [1].


And the following
> command should work with/without ZooKeeper HA configured.
> *./bin/flink list -m yarn-cluster -yid $applicationId*

I'm a bit confused as there are different ways to specify YARN per-job
clusters:
- "--target yarn-per-job" is explained in the current documentation [1] and
looks like the most recent one, so I'd rather use this one together with
"-Dhigh-availability.cluster-id=$application_id".
- Is "--jobmanager yarn-per-job" a preferred way of specifying per-job
clusters, and is it going to co-exist with "--target yarn-per-job" in
future releases? It looks old-school to me.
- There's also "--executor yarn-per-job", which seems likely to be deprecated
soon (as explained in "flink help").

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#per-job-cluster-mode

Best,

Dongwon

On Wed, Jan 6, 2021 at 12:33 PM Yang Wang  wrote:

> Hi Dongwon,
>
> I think the root cause is that GenericCLI do not override the
> "high-availability.cluster-id" with specified application id.
> The GenericCLI is activated by "--target yarn-per-job". In
> the FlinkYarnSessionCli, we have done this. And the following
> command should work with/without ZooKeeper HA configured.
>
>
> *./bin/flink list -m yarn-cluster -yid $applicationId*
>
> You could also specify the "high-availability.cluster-id" so that leader
> retrieval could get the correct JobManager address.
>
>
> *flink list --target yarn-per-job -Dyarn.application.id
> =$application_id
> -Dhigh-availability.cluster-id=$application_id*
>
> BTW, this is not a new introduced behavior change in Flink 1.12. I believe
> it also could not work in 1.11 and 1.10.
>
>
> Best,
> Yang
>
>
> On Tue, Jan 5, 2021 at 11:22 PM, Dongwon Kim  wrote:
>
>> Hi,
>>
>> I'm using Flink-1.12.0 and running on Hadoop YARN.
>>
>> After setting HA-related properties in flink-conf.yaml,
>>
>> high-availability: zookeeper
>>
>> high-availability.zookeeper.path.root: /recovery
>>
>> high-availability.zookeeper.quorum: nm01:2181,nm02:2181,nm03:2181
>>
>> high-availability.storageDir: hdfs:///flink/recovery
>>
>> the following command hangs and fails:
>>
>> $ flink list --target yarn-per-job -Dyarn.application.id=$application_id
>>
>> Before setting the properties, I can see the following lines after
>> executing the above command:
>>
>> 2021-01-06 00:11:48,961 INFO  
>> org.apache.flink.runtime.security.modules.HadoopModule
>>   [] - Hadoop user set to deploy (auth:SIMPLE)
>>
>> 2021-01-06 00:11:48,968 INFO  
>> org.apache.flink.runtime.security.modules.JaasModule
>> [] - Jaas file will be created as
>> /tmp/jaas-8522045433029410483.conf.
>>
>> 2021-01-06 00:11:48,976 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] - Running 'list' command.
>>
>> 2021-01-06 00:11:49,316 INFO  org.apache.hadoop.yarn.client.AHSProxy
>>   [] - Connecting to Application History server at nm02/
>> 10.93.0.91:10200
>>
>> 2021-01-06 00:11:49,324 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>> [] - No path for the flink jar passed. Using the
>> location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the
>> jar
>>
>> 2021-01-06 00:11:49,333 WARN  org.apache.flink.yarn.YarnClusterDescriptor
>> [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR
>> environment variable is set.The Flink YARN Client needs one of these to be
>> set to properly load the Ha

Re: Batch with Flink Steraming API version 1.12.0

2021-01-06 Thread Arvid Heise
Hi Robert,

The most reliable way to use batch mode in streaming is to use event time
[1]. Processing-time or ingestion-time windows do not make a whole lot of
sense if you want to do some kind of reprocessing (non-deterministic results
and resource usage, because the timestamps of the records change with every
execution).

For windows to work in event time, you usually need to define a watermark
strategy [2]. Note that in your example you used the old source, which
doesn't support batch execution mode.

Here is a sketch of how I'd modify it:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

KafkaSource source =
    KafkaSource.builder()
        .setBootstrapServers(...)
        .setGroupId(...)
        .setTopics(...)
        .setDeserializer(...)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .build();

DataStream stream = env
    .fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");

Note that the specific watermark strategy depends on your data. I have
chosen the most common strategy for Kafka which assumes that in each
partition timestamps are (non-strictly) increasing. If you have some out of
order events, you probably need forBoundedOutOfOrderness.
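
For completeness, a bounded-out-of-orderness strategy would look roughly like this (a sketch only; the 10-second bound is a placeholder, and FluentdMessage is the record type from your own code):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// tolerate up to 10 seconds of out-of-orderness; the new Kafka source already
// uses the Kafka record timestamps as event timestamps, so no explicit
// timestamp assigner is needed here
WatermarkStrategy<FluentdMessage> strategy =
    WatermarkStrategy.<FluentdMessage>forBoundedOutOfOrderness(Duration.ofSeconds(10));

DataStream<FluentdMessage> stream = env.fromSource(source, strategy, "Kafka Source");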

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/timely-stream-processing.html#event-time-and-watermarks
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html#event-time-and-watermarks

On Tue, Jan 5, 2021 at 10:21 PM Robert Cullen  wrote:

> Arvid,
>
> I’m hoping to get your input on a process I’m working on. Originally I was
> using a streaming solution but noticed that the data in the sliding windows
> was getting too large over longer intervals and sometimes stopped
> processing altogether. Anyway, the total counts should be a fixed number so
> a batch process would be more acceptable.
>
> The use case is this: Get counts on keys for 30 minutes of data, take
> those totals and take a 30 second time slice on the same data, possibly
> consecutive time slices, take the results and run it through one function:
> Originally my code looked like this using Sliding Time Windows in streaming
> mode:
>
>  final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>
> DataStream stream = env
> .addSource(getConsumer(properties))
> .name("Kafka Source");
>
> DataStream> keyedCounts  = stream
> .filter(value -> value.getGrokName() != null)
> .map(new MapFunction>() {
> @Override
> public Tuple2 map(FluentdMessage value) 
> throws Exception {
> return Tuple2.of(value.getGrokName(), 1L);
> }
> })
> .keyBy(value -> value.f0)
> .window(SlidingProcessingTimeWindows.of(Time.minutes(30), 
> Time.seconds(30)))
> .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
> //.sum(2);
> .reduce((ReduceFunction>) (data1, data2) 
> -> Tuple2.of(data1.f0, data1.f1 + data2.f1));
>
>keyedCounts
> .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(30), 
> Time.seconds(30)))
> .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
> .process(new ProcessAllWindowFunction, 
> Tuple5, TimeWindow>() {
>
> private ValueState currentCount;
>
> @Override
> public void open(Configuration parameters) throws 
> Exception {
> currentCount = getRuntimeContext().getState(
> new ValueStateDescriptor<>("count", 
> Long.class));
> }
>
> @Override
> public void process(Context context,
> Iterable> 
> iterable,
> Collector String, Long>> out) throws Exception {
> long count = 
> StreamSupport.stream(iterable.spliterator(), false).count();
> if(currentCount.value() == null) {
> currentCount.update(0L);
> }
> Iterator> iterator = 
> iterable.iterator();
> Map map = new HashMap<>();
> Map> keyTotalMap = new HashMap<>();
>
> if(currentCount.value() < count) {
>   

Re: "flink list" fails when zk-based HA is enabled in flink-conf.yaml

2021-01-06 Thread Yang Wang
Hi Dongwon,

Please find the answer inline.

> why CLI accessed ZK?
This is a good question. Currently, when HA is enabled, we still have to
retrieve the leader information from ZooKeeper even though we could get the
JobManager REST endpoint from the YARN application report. Please find
more information in the class *RestClusterClient*. I am not aware of any
potential issues with retrieving the REST endpoint directly from the YARN
application report, so I think this could be a minor improvement.

> Specify the "high-availability.cluster-id" to list jobs
I have created a ticket for updating the documentation[1].

> About the "-m yarn-cluster"
You are right. "--target yarn-per-job" is the recommended way to start a
per-job cluster; the backing CLI option parser is *GenericCLI*, which is also
used for application mode and for K8s deployments. "-m yarn-cluster" is the old
way, in which all the CLI options are parsed by FlinkYarnSessionCli. Since it is
widely used, it cannot be deprecated or removed very soon. "--executor"
has exactly the same effect as "--target"; the only difference is the
naming.
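
Side by side, the two styles look like this (illustrative only, using exactly the options discussed in this thread):

# generic CLI (GenericCLI), the recommended style:
./bin/flink list --target yarn-per-job \
  -Dyarn.application.id=$application_id \
  -Dhigh-availability.cluster-id=$application_id

# old-style YARN CLI (FlinkYarnSessionCli), equivalent for listing jobs:
./bin/flink list -m yarn-cluster -yid $application_id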

[1]. https://issues.apache.org/jira/browse/FLINK-20866

Best,
Yang


On Wed, Jan 6, 2021 at 6:49 PM, Dongwon Kim  wrote:

> Hi Yang,
>
> I was wondering why CLI accessed ZK because, as shown in the following
> lines, CLI seemed to know the address of JM by contacting AHS before
> connecting to ZK.
>
> 2021-01-06 18:35:32,351 INFO  org.apache.flink.client.cli.CliFrontend
>   [] - Running 'list' command.
>
> 2021-01-06 18:35:32,682 INFO  org.apache.hadoop.yarn.client.AHSProxy
>   [] - Connecting to Application History server at
> mobdata-devflink-nm02.dakao.io/10.93.0.91:10200
>
> 2021-01-06 18:35:32,763 INFO  org.apache.flink.yarn.YarnClusterDescriptor
> [] - Found Web Interface
> mobdata-devflink-dn03.dakao.io:37098 of application
> 'application_1600163418174_0127'.
>
> 2021-01-06 18:35:32,773 INFO  org.apache.flink.runtime.util.ZooKeeperUtils
> [] - Enforcing default ACL for ZK connections
>
> 2021-01-06 18:35:32,774 INFO  org.apache.flink.runtime.util.ZooKeeperUtils
> [] - Using '/driving-habits/default' as Zookeeper
> namespace.
> Anyway CLI needs to know where the leader (=active) JM is located via a ZK
> node and GenericCLI has to be informed of high-availability.cluster-id.
> Thanks for the heads up!
>
>
> You could also specify the "high-availability.cluster-id" so that leader
>> retrieval could get the correct JobManager address.
>> *flink list --target yarn-per-job -Dyarn.application.id
>> =$application_id
>> -Dhigh-availability.cluster-id=$application_id*
>
> Okay, it checked that it works. Thank you very much :-) It will be nice
> for other users if your answer is also explained on [1].
>
>
> And the following
>> command should work with/without ZooKeeper HA configured.
>> *./bin/flink list -m yarn-cluster -yid $applicationId*
>
> I'm very confused as there's different ways to specify YARN per-job
> clusters:
> - "--target yarn-per-job" is explained in the current documentation [1]
> and it looks like the most recent one, so I'd rather use this one with
> "-Dhigh-availability.cluster-id=$application_id"
> - Is "--jobmanater yarn-per-job" a preferred way of specifying per-job
> clusters and is it going to co-exist with "--target yarn-per-job" for the
> future releases? but It looks old-school to me.
> - There's also "--executor yarn-per-job" which seems to be deprecated soon
> (explained in "flink help")
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#per-job-cluster-mode
>
> Best,
>
> Dongwon
>
> On Wed, Jan 6, 2021 at 12:33 PM Yang Wang  wrote:
>
>> Hi Dongwon,
>>
>> I think the root cause is that GenericCLI do not override the
>> "high-availability.cluster-id" with specified application id.
>> The GenericCLI is activated by "--target yarn-per-job". In
>> the FlinkYarnSessionCli, we have done this. And the following
>> command should work with/without ZooKeeper HA configured.
>>
>>
>> *./bin/flink list -m yarn-cluster -yid $applicationId*
>>
>> You could also specify the "high-availability.cluster-id" so that leader
>> retrieval could get the correct JobManager address.
>>
>>
>> *flink list --target yarn-per-job -Dyarn.application.id
>> =$application_id
>> -Dhigh-availability.cluster-id=$application_id*
>>
>> BTW, this is not a new introduced behavior change in Flink 1.12. I
>> believe it also could not work in 1.11 and 1.10.
>>
>>
>> Best,
>> Yang
>>
>>
>> On Tue, Jan 5, 2021 at 11:22 PM, Dongwon Kim  wrote:
>>
>>> Hi,
>>>
>>> I'm using Flink-1.12.0 and running on Hadoop YARN.
>>>
>>> After setting HA-related properties in flink-conf.yaml,
>>>
>>> high-availability: zookeeper
>>>
>>> high-availability.zookeeper.path.root: /recovery
>>>
>>> high-availability.zookeeper.quorum: nm01:2181,nm02:2181,nm03:2181
>>>
>>> high-availability.storageDir: hdfs:///flink/recove

Re: "flink list" fails when zk-based HA is enabled in flink-conf.yaml

2021-01-06 Thread Dongwon Kim
Thanks Yang for the very detailed explanation!

Wow, I really appreciate it.

Best,

Dongwon

On Wed, Jan 6, 2021 at 10:17 PM Yang Wang  wrote:

> Hi Dongwon,
>
> Please find the answer inline.
>
> > why CLI accessed ZK?
> This is a good question. Currently, when the HA is enabled, even though we
> could get the JobManager rest endpoint from Yarn application report, we
> still have to retrieve the leader information from ZooKeeper. Please find
> more information in the class *RestClusterClient*. I am not aware of any
> potential issues if we directly retrieve rest endpoint from Yarn
> application report. And I think this could be a minor improvement.
>
> > Specify the "high-availability.cluster-id" to list jobs
> I have created a ticket for updating the documentation[1].
>
> > About the "-m yarn-cluster"
> You are right. "--target yarn-per-job" is the recommended way to start a
> perjob cluster. The backend cli option parser is *GenericCLI*. It is also
> used for application mode and K8s deployment. "-m yarn-cluster" is the old
> way. All the cli options are parsed by FlinkYarnSessionCli. Since it is
> widely used, it could not be deprecated or removed very soon. "--executor"
> has the exactly same effect with "--target". The only different is the
> naming.
>
> [1]. https://issues.apache.org/jira/browse/FLINK-20866
>
> Best,
> Yang
>
>
> On Wed, Jan 6, 2021 at 6:49 PM, Dongwon Kim  wrote:
>
>> Hi Yang,
>>
>> I was wondering why CLI accessed ZK because, as shown in the following
>> lines, CLI seemed to know the address of JM by contacting AHS before
>> connecting to ZK.
>>
>> 2021-01-06 18:35:32,351 INFO  org.apache.flink.client.cli.CliFrontend
>>   [] - Running 'list' command.
>>
>> 2021-01-06 18:35:32,682 INFO  org.apache.hadoop.yarn.client.AHSProxy
>>   [] - Connecting to Application History server at
>> mobdata-devflink-nm02.dakao.io/10.93.0.91:10200
>>
>> 2021-01-06 18:35:32,763 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>> [] - Found Web Interface
>> mobdata-devflink-dn03.dakao.io:37098 of application
>> 'application_1600163418174_0127'.
>>
>> 2021-01-06 18:35:32,773 INFO  org.apache.flink.runtime.util.ZooKeeperUtils
>> [] - Enforcing default ACL for ZK connections
>>
>> 2021-01-06 18:35:32,774 INFO  org.apache.flink.runtime.util.ZooKeeperUtils
>> [] - Using '/driving-habits/default' as Zookeeper
>> namespace.
>> Anyway CLI needs to know where the leader (=active) JM is located via a
>> ZK node and GenericCLI has to be informed of high-availability.cluster-id.
>> Thanks for the heads up!
>>
>>
>> You could also specify the "high-availability.cluster-id" so that leader
>>> retrieval could get the correct JobManager address.
>>> *flink list --target yarn-per-job -Dyarn.application.id
>>> =$application_id
>>> -Dhigh-availability.cluster-id=$application_id*
>>
>> Okay, it checked that it works. Thank you very much :-) It will be nice
>> for other users if your answer is also explained on [1].
>>
>>
>> And the following
>>> command should work with/without ZooKeeper HA configured.
>>> *./bin/flink list -m yarn-cluster -yid $applicationId*
>>
>> I'm very confused as there's different ways to specify YARN per-job
>> clusters:
>> - "--target yarn-per-job" is explained in the current documentation [1]
>> and it looks like the most recent one, so I'd rather use this one with
>> "-Dhigh-availability.cluster-id=$application_id"
>> - Is "--jobmanater yarn-per-job" a preferred way of specifying per-job
>> clusters and is it going to co-exist with "--target yarn-per-job" for the
>> future releases? but It looks old-school to me.
>> - There's also "--executor yarn-per-job" which seems to be deprecated
>> soon (explained in "flink help")
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#per-job-cluster-mode
>>
>> Best,
>>
>> Dongwon
>>
>> On Wed, Jan 6, 2021 at 12:33 PM Yang Wang  wrote:
>>
>>> Hi Dongwon,
>>>
>>> I think the root cause is that GenericCLI do not override the
>>> "high-availability.cluster-id" with specified application id.
>>> The GenericCLI is activated by "--target yarn-per-job". In
>>> the FlinkYarnSessionCli, we have done this. And the following
>>> command should work with/without ZooKeeper HA configured.
>>>
>>>
>>> *./bin/flink list -m yarn-cluster -yid $applicationId*
>>>
>>> You could also specify the "high-availability.cluster-id" so that leader
>>> retrieval could get the correct JobManager address.
>>>
>>>
>>> *flink list --target yarn-per-job -Dyarn.application.id
>>> =$application_id
>>> -Dhigh-availability.cluster-id=$application_id*
>>>
>>> BTW, this is not a new introduced behavior change in Flink 1.12. I
>>> believe it also could not work in 1.11 and 1.10.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
>>> On Tue, Jan 5, 2021 at 11:22 PM, Dongwon Kim  wrote:
>>>
 Hi,

 I'm using Flink-1.12.0 and running on Hadoop Y

Re: question regarding flink local buffer pool

2021-01-06 Thread Arvid Heise
Hi Eleanore,

first I'd like to point to a related blog post, which explains most
concepts in a better way than I could write here [1].

Now let's go through your questions:

1. A buffer pool is just a place where a task gets a buffer from. So pool-A
is used by taskA for both reading (from network stack) and writing to
network stack. Now you have the special case of taskA and B being
co-located. In this case (and only in this case), the buffer of pool-A is
handed over to taskB for immediate consumption.

Back-pressure in general occurs when B consumes more slowly than A produces. For
distributed tasks that means B is not freeing buffers fast enough to
read from the network. There are two possible reasons: B processes more slowly
than A produces (buffers stuck in the network input), or B is itself
backpressured (buffers stuck in the output). For co-located tasks it's pretty
much the same, except that B does not use a buffer from its own pool to read the
data, so the buffers of pool-A are stuck in the network input.

2. Correct. Chained tasks do not use buffers at all. Chained tasks should
always be preferred. Note that if you have not enabled object reuse, the
StreamRecord would be cloned to ensure data integrity. You should enable
object reuse [2] for even better performance.
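
For example (a one-liner, assuming env is your StreamExecutionEnvironment):

env.getConfig().enableObjectReuse();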

3. The network buffer pool is created per task manager according to the memory
settings [3]. The network buffer pool then redistributes its buffers to the
local pools of tasks (there are also exclusive buffers for inputs that live
outside of local pools).

Related options are taskmanager.memory.network.fraction (= total size of
network buffer pool) and taskmanager.memory.segment-size (determines number
of buffers) as well as taskmanager.network.memory.buffers-per-channel
(exclusive buffers), taskmanager.network.memory.floating-buffers-per-gate
(additional buffers in local pool per input gate/result partition), and
taskmanager.network.memory.max-buffers-per-channel (max buffers per
outgoing channel in case of data skew).
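
As a rough illustration, these options go into flink-conf.yaml; the values below are placeholders close to the documented defaults, not tuning advice:

taskmanager.memory.network.fraction: 0.1
taskmanager.memory.segment-size: 32kb
taskmanager.network.memory.buffers-per-channel: 2
taskmanager.network.memory.floating-buffers-per-gate: 8
taskmanager.network.memory.max-buffers-per-channel: 10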

4. Yes, indirectly through (effective value of
taskmanager.memory.network.fraction) / taskmanager.memory.segment-size.
Yes, it's distributed evenly afaik.

[1] https://flink.apache.org/2019/06/05/flink-network-stack.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
[3]
https://flink.apache.org/news/2020/04/21/memory-management-improvements-flink-1.10.html

On Wed, Jan 6, 2021 at 7:25 AM Eleanore Jin  wrote:

> Hi experts,
>
> I am running flink 1.10, the flink job is stateless. I am trying to
> understand how local buffer pool works:
>
> 1. lets say taskA and taskB both run in the same TM JVM, each task will
> have its own local buffer pool, and taskA will write to pool-A, and taskB
> will read from pool-A and write to pool-b, if taskB consume slower from
> pool-A than taskA writes to it, it will cause backpressure.
>
> 2. If the above assumption is correct, then this works when taskA and
> taskB is not chained together, if chained, there is no buffer in between,
> the StreamRecord will be directly passed from taskA to taskB?
>
> 3. what is the configuration parameter for this local buffer pool? and
> what is the relationship between local buffer pool with network buffer pool?
>
> 4. is the configuration for the total local buffer per TM? and is it
> evenly spread between tasks?
>
> Thanks a lot!
> Eleanore
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Roadmap for Execution Mode (Batch/Streaming) and interaction with Table/SQL APIs

2021-01-06 Thread burkaygur
Hi Flink Community,

Really excited for the "true unification" of Batch and Stream APIs, and had
a few questions for clarification after reading the release notes and
watching the video by aljoscha.


1) How do these changes impact the Table and SQL APIs? Are they completely
orthogonal or can we get the benefits of the new Batch Mode with Flink SQL
as well?
2) What is the best ticket to follow the roadmap & track the progress of
this whole project. Specifically the parts about bootstrapping of state. I
would love to help contribute to it.

Best,
Burkay



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Using key.fields in 1.12

2021-01-06 Thread Piotr Nowojski
Hey,

have you added Kafka connector as the dependency? [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies

Best,
Piotrek

On Wed, Jan 6, 2021 at 04:37, Aeden Jameson  wrote:

> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
> feature of the Kafa SQL Connector. My current connector is configured
> as ,
>
> connector.type= 'kafka'
> connector.version = 'universal'
> connector.topic   = 'my-topic'
> connector.properties.group.id = 'my-consumer-group'
> connector.properties.bootstrap.servers = '...'
> format.type = 'avro'
> format.avro-schema = ''
>
> I tried adding
>
> key.fields = 'my_key_field'
>
> as well as
>
> key.format = 'avro'
> key.fields = 'my_key_field'
>
> but I get the exception
>
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Reason: No factory supports all properties.
>
> The matching candidates:
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> Unsupported property keys:
> key.fields
> key.format
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> at
> org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
> at
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
> at
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> at
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
> ... 21 more
>
> I have validated that the uber jar clearly contains the 1.12
> dependencies. What is that magic combination of properties to get
> key.fields to work? Or is it not supported with avro?
>
> --
> Thank You,
> Aeden
>
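
As a side note, the key.format/key.fields options belong to the new-style Kafka connector declared with 'connector' = 'kafka', not to the legacy connector.type properties handled by the KafkaTableSourceSinkFactory shown in the stack trace above. A rough DDL sketch (table, topic, and field names are placeholders):

CREATE TABLE my_table (
  my_key_field STRING,
  my_value_field STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = '...',
  'properties.group.id' = 'my-consumer-group',
  'key.format' = 'avro',
  'key.fields' = 'my_key_field',
  'value.format' = 'avro'
);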


reason for endless backpressure

2021-01-06 Thread 赵一旦
I've run into a problem many times: when a task suddenly starts to backpressure
continuously, the backpressured node no longer sends any records unless the
task is restarted. I can confirm that it's not due to high load; during the
backpressure period the CPU utilization of the machines goes down rather than
up.

At the moment I suspect it has something to do with the network. Does anyone
know what the underlying mechanism could be? Can network jitter cause this
phenomenon?


Re: Roadmap for Execution Mode (Batch/Streaming) and interaction with Table/SQL APIs

2021-01-06 Thread Piotr Nowojski
Hi,

1. I think those changes will mostly bring new features/functionality to
the existing streaming APIs in order to fully support batch execution, for
example one way or another of better handling "bounded data streams" in the
DataStream API.
2. I don't think there is, or is going to be, one single umbrella
ticket, as this whole idea is going to take quite a bit of time to fully
design. There are a couple of initial design proposals connected with
the batch and streaming unification:
FLIP-134 [1]
FLIP-147 [2]
but I'm pretty sure more will follow in the future.

Best,
Piotrek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

On Wed, Jan 6, 2021 at 16:20, burkaygur  wrote:

> Hi Flink Community,
>
> Really excited for the "true unification" of Batch and Stream APIs, and had
> a few questions for clarification after reading the release notes, and
> watching the video by aljoscha  here
>   .
>
>
> 1) How do these changes impact the Table and SQL APIs? Are they completely
> orthogonal or can we get the benefits of the new Batch Mode with Flink SQL
> as well?
> 2) What is the best ticket to follow the roadmap & track the progress of
> this whole project. Specifically the parts about bootstrapping of state. I
> would love to help contribute to it.
>
> Best,
> Burkay
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: reason for endless backpressure

2021-01-06 Thread Piotr Nowojski
Hi,

If you have an unstable network that is dropping packets in a weird way
(data is lost, but the connection is still kept alive from the perspective
of the underlying operating system), it could happen that a task becomes
perpetually blocked. But this is extremely rare. I would first suggest
looking in other directions, for example whether your task is
stuck/deadlocked somewhere in your code. I would suggest attaching a
debugger to the problematic TaskManager and/or gathering some stack traces.
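
For example, with standard JDK tools on the TaskManager host (a sketch; the grep pattern assumes a standalone TaskManagerRunner entry point, the class name differs on YARN):

jps -l | grep TaskManagerRunner        # find the TaskManager JVM pid
jstack <pid> > taskmanager-stacks.txt  # capture thread dumps and look for blocked task threads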

If it's indeed a network or hardware issue, a good question would be whether it
always happens on the same physical machine. The last time I saw a user
reporting a similar problem, it ended up being a faulty machine.

Best,
Piotrek



On Wed, Jan 6, 2021 at 17:16, 赵一旦  wrote:

> I've had a problem many times. When the task suddenly continues to back
> pressure, the back pressure node will no longer send any records unless the
> task is restarted. But I can confirm that it's not due to high pressure.
> During the back pressure period, the CPU utilization of the machine is all
> reduced, but not increased.
>
> At present, I initially suspect that it has something to do with the
> Internet. Does anyone know the principle? Can network jitter cause this
> phenomenon?
>


Re: Using key.fields in 1.12

2021-01-06 Thread Aeden Jameson
Yes, I do have that dependency. I see it in the dependency view of
IntelliJ and directly in the uber jar. Thanks for responding.

- Aeden

On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski  wrote:
>
> Hey,
>
> have you added Kafka connector as the dependency? [1]
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
>
> Best,
> Piotrek
>
> On Wed, Jan 6, 2021 at 04:37, Aeden Jameson  wrote:
>>
>> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
>> feature of the Kafa SQL Connector. My current connector is configured
>> as ,
>>
>> connector.type= 'kafka'
>> connector.version = 'universal'
>> connector.topic   = 'my-topic'
>> connector.properties.group.id = 'my-consumer-group'
>> connector.properties.bootstrap.servers = '...'
>> format.type = 'avro'
>> format.avro-schema = ''
>>
>> I tried adding
>>
>> key.fields = 'my_key_field'
>>
>> as well as
>>
>> key.format = 'avro'
>> key.fields = 'my_key_field'
>>
>> but I get the exception
>>
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>> Could not find a suitable table factory for
>> 'org.apache.flink.table.factories.TableSourceFactory' in
>> the classpath.
>>
>> Reason: No factory supports all properties.
>>
>> The matching candidates:
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>> Unsupported property keys:
>> key.fields
>> key.format
>>
>> The following factories have been considered:
>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>> at 
>> org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
>> at 
>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
>> at 
>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>> at 
>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>> at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
>> ... 21 more
>>
>> I have validated that the uber jar clearly contains the 1.12
>> dependencies. What is that magic combination of properties to get
>> key.fields to work? Or is it not supported with avro?
>>
>> --
>> Thank You,
>> Aeden



-- 
Cheers,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson
Blah Blah Blah: http://www.twitter.com/daliful


RE: Flink kafka exceptions handling

2021-01-06 Thread BELGHITH Amira (EXT)

Thank you for your answer.
I have now subscribed.

This is the previous topic I'm referring to: 
http://mail-archives.apache.org/mod_mbox/flink-user/202008.mbox/%3CCACzKVZQ093HixMewb_prtP41ceXgmxCv=cmpsbphw-9+h8b...@mail.gmail.com%3E

Our Flink job manager fails after multiple restarts, for example when the Kafka 
consumer does not find a topic. We get a Kafka TopicAuthorizationException. We 
listen to a list of topics, and whenever one of them is down, our whole 
streaming system goes down. Is there a way to handle those exceptions in the 
FlinkKafkaConsumer so that the job manager does not fail?


From: Amira Belghith 
Sent: Wednesday, January 6, 2021 18:36
To: BELGHITH Amira (EXT) ResgGtsOpmOptVdf ; amira.belghith-...@soge.com
Subject: Fwd: Flink kafka exceptions handling

[EXTERNAL SENDER]
Be cautious before opening attachments or clicking on any links. If in doubt, 
use the "Suspicious email" button or visit go/secu.



-- Forwarded message --
From: Piotr Nowojski 
Date: Wed, 6 Jan 2021 at 17:26
Subject: Re: Flink kafka exceptions handling
To: Amira Belghith 
Cc: buggi...@gmail.com

I think you first need to be subscribed, as explained here [1]. Could you 
also link to the previous topic you are referring to?

Piotrek

[1] https://flink.apache.org/community.html#mailing-lists

On Wed, 6 Jan 2021 at 17:09, Amira Belghith  wrote:
Hey,
Thanks for your fast reply.
The mail couldnt be delivered to the mailing list.

On Wed, 6 Jan 2021 at 16:59, Piotr Nowojski  wrote:
Hey,

could you post the question on the user@flink.apache.org mailing list?

Thanks,
Piotrek

On Wed, 6 Jan 2021 at 15:11, Amira Belghith  wrote:
Hi Nick, Piotr,

Im a software engineer working for Societe Generale bank.
I saw your discussion about FlinkKafkaConsumer and exceptions handling.
I have the same problem for a week now, and I wanted to know if you have found 
a solution.
Our flink job manager fails after multiple restarting, when the Kafka Consumer 
does not find a topic for example. We have a kafka exception 
TopicUnthaurizationException. We listen to a list a topics and whenever one is 
down , all our streaming system is down .. is there a way to handle those 
exceptions in the FlinkKafkaConsumer so the job manager does not fail?

Thanks a lot for your help,
Amira belghith

=

This message and any attachments (the "message") are confidential,
intended solely for the addresses, and may contain legally privileged
information. Any unauthorized use or dissemination is prohibited.
E-mails are susceptible to alteration. Neither SOCIETE GENERALE nor any
of its subsidiaries or affiliates shall be liable for the message
if altered, changed or falsified.

=


Re: Implementing a TarInputFormat based on FileInputFormat

2021-01-06 Thread Billy Bain
Hi Arvid,

Thanks for the response. I have created a sample application with input
data and uploaded it to google drive. The sample data is in the archive...
thus the large size. (27 mb)

https://drive.google.com/file/d/1dxpnDF3hPUPNlPO5p2tBf-88oOVV0qck/view?usp=sharing

To run it:
flink run  -Dexecution.runtime-mode=BATCH -c
com.billybobbain.AndroidTarReader
/path/to/flink-tar/build/libs/flink-tar-0.1.jar --input_path
/path/to/flink-tar/playstore-00.tar.gz --output_path /path/to/output/

The main class:
public class AndroidTarReader {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        String inputPath = parameter.get("input_path");
        String outputPath = parameter.get("output_path");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<AndroidData> android = env.readFile(
                new TarInputFormat<>(new Path(inputPath), AndroidData.class), inputPath);
        final FileSink<AndroidData> sink = FileSink
                .forRowFormat(new Path(outputPath), new AndroidDataEncoder())
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("filtered")
                        .withPartSuffix(".json")
                        .build())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(1024 * 1024)
                        .build())
                .build();
        android.sinkTo(sink);
        env.execute("zMarket Android");
    }
}

On Tue, Jan 5, 2021 at 5:59 AM Arvid Heise  wrote:

> Hi Billy,
>
> the exception is happening on the output side. Input side looks fine.
> Could you maybe post more information about the sink?
>
> On Mon, Dec 28, 2020 at 8:11 PM Billy Bain  wrote:
>
>> I am trying to implement a class that will work similar to
>> AvroFileFormat.
>>
>> This tar archive has a very specific format. It has only one file inside
>> and that file is line delimited JSON.
>>
>> I get this exception, but all the data is written to the temporary files.
>> I have checked that my code isn't closing the stream, which was my prior
>> issue.
>>
>> Caused by: java.nio.channels.ClosedChannelException
>> at
>> java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>> at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
>> at
>> org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
>> at
>> org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195)
>> at
>> org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
>> at
>> org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/00.run(Unknown
>> Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> at java.base/java.lang.Thread.run(Thread.java:836)
>>
>> public class TarInputFormat extends FileInputFormat implements
>> ResultTypeQueryable {
>>
>> private static final Logger logger =
>> LoggerFactory.getLogger(TarInputFormat.class);
>> private transient TarArchiveInputStream tarArchiveInputStream;
>> private TarArchiveEntry nextEntry;
>> private final Class valueType;
>> private long currentPosition = 0L;
>> private static final ObjectMapper objectMapper = new ObjectMapper();
>>
>> public TarInputFormat(Path filePath, Class valueType) {
>> super(filePath);
>> this.valueType = valueType;
>> this.unsplittable = true;
>> this.setNumSplits(1);
>> }
>>
>> @Ove

Question about Java lists in Flink SQL

2021-01-06 Thread Jeyhun G.
Dear Flink committers,

I have an issue with Flink SQL that I was wondering if you could help me
with.

I am trying to use Flink in my project and I ran into an issue. I have a
DataStream that consists of POJOs. One of the fields in the POJO is of type
java.util.List. I'm converting DataStream into Table, and I want to run
Flink SQL query on that table. I expected to be able to use square brackets [
] to address specific elements of the field that was of type java.util.List in
the original data stream. However, I get the following error when I try to
do so:

org.apache.flink.table.api.TableException: Type is not supported: ANY

I describe my issue in slightly more detail on StackOverflow.
Would you be able to tell me if there is a way to treat a field that was of
type java.util.List as an array in Flink SQL, in order to apply array
functions on that field?
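
In case it helps, below is a minimal sketch (names are made up) of the usual workaround: expose the field as a Java array instead of java.util.List, since array fields map to Flink SQL's ARRAY type and can be indexed with [ ] (1-based), whereas a List field ends up as a generic ("ANY") type.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ArrayFieldExample {

    // hypothetical POJO: the list-typed field is exposed as an array instead
    public static class Event {
        public String id;
        public String[] tags; // was java.util.List<String>

        public Event() {}

        public Event(String id, String[] tags) {
            this.id = id;
            this.tags = tags;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Event> events = env.fromElements(new Event("a", new String[] {"x", "y"}));

        Table table = tEnv.fromDataStream(events);
        tEnv.createTemporaryView("events", table);

        // array indexing in Flink SQL is 1-based
        tEnv.sqlQuery("SELECT id, tags[1] FROM events").execute().print();
    }
}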

Thank you


Re: Unexpected latency across operator instances

2021-01-06 Thread Paul Lam
Hi Antonis,

Did you try to profile the "bad" TaskManager to see what the task thread was 
busy doing?

Another possible culprit might be GC, if you haven't checked that already. I've 
seen GC threads eating up 30% of the CPU.
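
If it helps, GC logging for the TaskManagers can be switched on in flink-conf.yaml, for example (a sketch; the flag shown uses the JDK 9+ unified logging syntax, adjust it for JDK 8):

env.java.opts.taskmanager: "-Xlog:gc*:file=/tmp/taskmanager-gc.log:time,uptime"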

Best,
Paul Lam

> On Dec 14, 2020, at 06:24, Antonis Papaioannou  wrote:
> 
> Hi,
> 
> I experience a strange behaviour with our Flink application. So I created a 
> very simple sample application to demonstrate the problem.
> A simple Flink application reads data from Kakfa, perfoms a simple 
> transformation and accesses an external Redis database to read data within a 
> FlatMap operator. When running the application with parallelism higher than 
> 1, there is an unexpected high latency only on one operator instance (the 
> “bad” instance is not always the same, it is randomly “selected” across 
> multiple runs) that accesses the external database. There multiple Redis 
> instances, all running in standalone mode, so each Redis request is served by 
> the local instance. To demonstrate that the latency is not related to the 
> Redis, I completely removed the database access and simulated its latency 
> with a sleep operation for about 0.1 ms, resulting to the same strange 
> behavior.
> 
> Profiling the application by enabling the Flink monitoring mechanism, we see 
> that all instances of the upstream operator is backpressured and the input 
> buffer pool (and the input exclusive buffer pool) usage on the “bad” node are 
> 100% during the whole run.
> 
> There is no skew in the dataset. I also replaces the keyBy with rebalance 
> which follows a round-robbin data distribution but there is no difference. 
> 
> I expected all nodes to exhibit similar (either low or high) latency. So the 
> question is why only one operator instance exhibits high latency? Is there 
> any change there is a starvation problem due to credit-based flow control?
> 
> Removing the keyBy between the operators, the system exhibits the expected 
> behaviour.
> 
> I also attach a pdf with more details about the application and graphs with 
> monitoring data.
> 
> I hope someone could have an idea about this unexpected behaviour.
> 
> Thank you,
> Antonis
> 
> 
> 



Re: Flink Stateful Function: The program's entry point class not found in the jar file

2021-01-06 Thread Le Xu
Thanks Igal, the application runs perfectly now!

Le
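
For anyone who finds this thread later: the change amounts to pointing the main class configured for the shade plugin at the example's own entry point, roughly like this (a sketch, assuming the main class is set through the ManifestResourceTransformer):

<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  <mainClass>org.apache.flink.statefun.examples.datastream.Example</mainClass>
</transformer>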

On Mon, Jan 4, 2021 at 9:13 AM Igal Shilman  wrote:

> Hi Le,
>
> Looking at your pom.xml, you are pointing to the wrong main method here:
>
> https://github.com/flint-stone/flink-statefun/blob/lx-base/statefun-examples/statefun-flink-datastream-example/pom.xml#L161
>
> You need to change it to your Example class, this should work.
>
>
> On Tue, Dec 29, 2020 at 5:06 AM Le Xu  wrote:
>
>> Hi Igal:
>>
>> Thanks for pointing that out. I was able to add the dependency in and
>> submit the job. For statefun-greeter example [1] I was able to submit the
>> job to the cluster . But when I try out the statefun-data-stream example
>> [2] I got the complaints saying that "There are no ingress defined" (I'm
>> adding the full trace to the end of the email). (I'm also adding my pom to
>> [3] and source file to [4].) From the example it appears the job uses the
>> Datastream as ingress [5] so ideally the job should be able to receive
>> events as trigger periodically (the job works fine with local environment).
>> I also came across this issue [6] but I don't think it helps solving my
>> problem. By any chance you'd know what's going on?
>>
>>
>> [1]
>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>> [2]
>> https://github.com/flint-stone/flink-statefun/blob/3aba506242300d69e95ef339afeac8e561dc7a2d/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java#L82
>> [3]
>> https://github.com/flint-stone/flink-statefun/blob/lx-base/statefun-examples/statefun-flink-datastream-example/pom.xml
>> [4]
>> https://github.com/flint-stone/flink-statefun/blob/lx-base/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
>> [5]
>> https://github.com/flint-stone/flink-statefun/blob/3aba506242300d69e95ef339afeac8e561dc7a2d/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java#L82
>> [6]
>> https://stackoverflow.com/questions/61578082/flink-statefun-co-located-functions-communication
>>
>> Error Trace:
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: There are no ingress defined.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>> at
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>> Caused by: java.lang.IllegalStateException: There are no ingress defined.
>> at
>> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
>> at
>> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:76)
>> at
>> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:52)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> ... 8 more
>>
>>
>> Thanks for the help!
>>
>> Le
>>
>>
>> On Mon, Dec 28, 2020 at 6:24 AM Igal Shilman  wrote:
>>
>>> Hi Le,
>>> Indeed you have added the dependency correctly. But the resulting
>>> artifact doesn't contain the dependencies. You need to create a jar with
>>> dependencies ( via [1] or [2])
>>> Take a look at [3] for a usage example of the maven shade plugin.
>>>
>>> I hope this helps,
>>> Igal.
>>>
>>> [1] https://maven.apache.org/plugins/maven-assembly-plugin/usage.html
>>> [2] https://maven.apache.org/plugins/maven-shade-plugin/
>>> [3]
>>> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-distribution/pom.xml#L126,L199
>>>
>>> On Sat, Dec 26, 2020 at 11:52 PM Le Xu  wrote:
>>>
 Thanks Igal! I might be missing something here. I did place
 statefun-flink-distribution as part of my dependency in the pom (see
 line 46 at [1]).  Is there a correct way to include the jar? I'm having the