I was having a problem sending the code, so here it is. I hope it looks OK
now.
This is my main job (some parts of the code are abbreviated; this is the
main part):
public class MyJob {

    private StreamExecutionEnvironment env;
    private static final Integer NUM_OF_PARALLEL_OPERATORS = 1;

    public static void main(String[] args) throws Exception {
        var myJob = new MyJob();
        myJob.withExecutionEnvironment();
        myJob.init();
    }

    public void init() throws Exception {
        var settings = getApplicationProperties();
        SourceFunction<ProcessingResult<MyEvent>> customerAccountSource =
            getSource(settings, settings.getProperty("my.topic"));
        DataStream<ProcessingResult<MyEvent>> mySource =
            env.addSource(customerAccountSource).setParallelism(NUM_OF_PARALLEL_OPERATORS);
        mySource.print();
        env.execute();
    }

    private static SourceFunction<ProcessingResult<MyEvent>> getSource(
            Properties settings, String topic) {
        var kafkaProps = getKafkaSourceProperties(settings);
        return new FlinkKafkaConsumer<>(topic, new MyKafkaDeserializer(), kafkaProps)
            .setStartFromEarliest()
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(100)))
            .setCommitOffsetsOnCheckpoints(true);
    }
}
And this is MyKafkaDeserializer:
public class MyKafkaDeserializer
        implements KafkaDeserializationSchema<ProcessingResult<MyEvent>> {

    private final ObjectMapper objectMapper;

    public MyKafkaDeserializer() {
        this.objectMapper = JsonUtils.objectMapper();
    }

    @Override
    public boolean isEndOfStream(ProcessingResult<MyEvent> processingResult) {
        return false;
    }

    @Override
    public ProcessingResult<MyEvent> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord)
            throws Exception {
        try {
            var event = objectMapper.readValue(consumerRecord.value(), MyEvent.class);
            var violation = ObjectValidator.of().getValidator().validate(event);
            if (!violation.isEmpty()) {
                return ProcessingResult.error(
                    new ProcessingError(
                        asTransformationError(
                            "constraint.violation",
                            toCustomerAccountValidationErrorString(violation))));
            }
            return ProcessingResult.success(event);
        } catch (JsonProcessingException e) {
            return ProcessingResult.error(
                new ProcessingError(
                    asTransformationError("constraint.violation", toStacktraceString(e))));
        }
    }

    @Override
    public TypeInformation<ProcessingResult<MyEvent>> getProducedType() {
        return TypeInformation.of(new TypeHint<ProcessingResult<MyEvent>>() {});
    }

    private static String toCustomerAccountValidationErrorString(
            Set<ConstraintViolation<MyEvent>> errors) {
        return errors.stream().map(ConstraintViolation::getMessage).collect(Collectors.joining(";"));
    }
}
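As a side note on the enableObjectReuse suggestion quoted below: both that
setting and the source parallelism are configured on the execution
environment. A minimal, illustrative fragment assuming the MyJob setup above
(the parallelism of 2 is only an example value matching the two task slots
mentioned later in the thread; it is not from the original code):

```java
// Inside MyJob.init(), before building the topology.
// enableObjectReuse avoids defensive copies between chained operators;
// it is only safe if no user function mutates or holds on to input records.
env.getConfig().enableObjectReuse();

// The source is currently pinned to parallelism 1; raising it (up to the
// number of Kafka partitions) lets consumption scale across task slots.
DataStream<ProcessingResult<MyEvent>> mySource =
    env.addSource(customerAccountSource).setParallelism(2);
```

Note that a source parallelism above the number of Kafka partitions leaves
the extra subtasks idle, so the partition count is the effective ceiling.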
On Tue, May 25, 2021 at 5:51 PM Arvid Heise <[email protected]> wrote:
> Could you share your KafkaDeserializationSchema? We might be able to spot
> some optimization potential. You could also try out enableObjectReuse [1],
> which avoids copying data between tasks (not sure if you have any
> non-chained tasks).
>
> If you are on 1.13, you could check out the flamegraph to see where the
> bottleneck occurs. [2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/execution/execution_configuration/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>
> On Tue, May 25, 2021 at 5:12 PM Piotr Nowojski <[email protected]>
> wrote:
>
>> Hi,
>>
>> That's a throughput of 700 records/second, which is well below the
>> theoretical limit of any deserializer (from hundreds of thousands up to
>> tens of millions of records/second per single operator), unless your
>> records are huge or very complex.
>>
>> Long story short, I don't know of a magic bullet to solve your problem.
>> As always, you have two options: either optimise/speed up your code/job,
>> or scale up.
>>
>> If you choose the former, think about Flink as just another Java
>> application. Check metrics and resource usage, and work out which
>> resource is the problem (CPU? memory? Is the machine swapping? I/O?).
>> You might be able to guess the bottleneck (reading from Kafka?
>> deserialisation? something else? Flink itself?) by looking at some of
>> the metrics (busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help
>> with that), or you can simplify your job to the bare minimum and test
>> the performance of the independent components. You can also always
>> attach a code profiler and simply look at what's happening. First
>> identify the source of the bottleneck, and then try to understand
>> what's causing it.
>>
>> Best,
>> Piotrek
>>
>> [1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also
>> comes with nice tools to analyse bottlenecks in the WebUI (coloring nodes
>> in the job graph based on busy/back pressured status and Flamegraph
>> support)
>>
>> On Tue, 25 May 2021 at 15:44, B.B. <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I am in the process of optimizing my job, which we think is currently
>>> too slow.
>>>
>>> We are deploying the job on Kubernetes with one job manager (1 GB RAM,
>>> 1 CPU) and one task manager (4 GB RAM, 2 CPUs, i.e. 2 task slots and a
>>> parallelism of two).
>>>
>>> The main problem is one Kafka source with 3.8 million events that we
>>> have to process.
>>> As a test, we made a simple job that connects to Kafka using a custom
>>> implementation of KafkaDeserializationSchema. There we use an
>>> ObjectMapper that maps the input values, e.g.:
>>>
>>> var event = objectMapper.readValue(consumerRecord.value(), MyClass.class);
>>>
>>> This is then validated with Hibernate Validator, and the output of this
>>> source is printed to the console.
>>>
>>> The time needed for the job to consume all the events was one and a half
>>> hours, which seems a bit long.
>>> Is there a way we can speed up this process?
>>>
>>> Would more CPU cores or more memory be the solution?
>>> Should we switch to an Avro deserialization schema?
>>>
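Piotrek's point about testing independent components can be made concrete
with a small standalone harness: time the deserializer alone, outside Flink,
and compare against the observed job rate. A rough sketch, in which the real
Jackson call is replaced by a stand-in parser (to measure the actual schema,
swap in objectMapper.readValue(bytes, MyEvent.class)):

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class DeserThroughput {

    // Measures how many records per second the given parser sustains
    // when applied to the same payload n times.
    static double recordsPerSecond(Function<byte[], ?> parser, byte[] payload, int n) {
        long start = System.nanoTime();
        for (int i = 0; i < n; i++) {
            parser.apply(payload);
        }
        double seconds = (System.nanoTime() - start) / 1e9;
        return n / seconds;
    }

    public static void main(String[] args) {
        byte[] payload = "{\"id\":1,\"name\":\"test\"}".getBytes(StandardCharsets.UTF_8);
        // Stand-in parser; replace with the real Jackson readValue call
        // to benchmark the actual deserialization schema.
        Function<byte[], String> parser = bytes -> new String(bytes, StandardCharsets.UTF_8);
        double rps = recordsPerSecond(parser, payload, 1_000_000);
        System.out.println("parser records/second: " + (long) rps);
        // For comparison: 3.8 million events in 1.5 hours (5400 s) is
        // roughly 700 records/second, as Piotrek computed above.
        System.out.println("observed job rate: " + (3_800_000 / 5400) + " records/second");
    }
}
```

If the harness shows the deserializer itself is fast, the 700 records/second
observed in the job is more likely dominated by something else (Kafka fetch
configuration, network, the print sink, or the single-slot source).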