I was having a problem with sending the code, so here it is. Hope it looks OK now.
This is my main job (some parts of the code are abbreviated; this is the main part):

public class MyJob {

  private StreamExecutionEnvironment env;

  private static final Integer NUM_OF_PARALLEL_OPERATORS = 1;

  public static void main(String[] args) throws Exception {
    var myJob = new MyJob();
    myJob.withExecutionEnvironment();
    myJob.init();
  }

  public void init() throws Exception {
    var settings = getApplicationProperties();

    SourceFunction<ProcessingResult<MyEvent>> customerAccountSource =
        getSource(settings, settings.getProperty("my.topic"));

    DataStream<ProcessingResult<MyEvent>> mySource =
        env.addSource(customerAccountSource).setParallelism(NUM_OF_PARALLEL_OPERATORS);

    mySource.print();

    env.execute();
  }

  private static SourceFunction<ProcessingResult<MyEvent>> getSource(
      Properties settings, String topic) {
    var kafkaProps = getKafkaSourceProperties(settings);
    // Read the topic from the beginning, attach watermarks for records that are
    // at most 100 ms out of order, and commit offsets only on checkpoints.
    return new FlinkKafkaConsumer<>(topic, new MyKafkaDeserializer(), kafkaProps)
        .setStartFromEarliest()
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(100)))
        .setCommitOffsetsOnCheckpoints(true);
  }
}

And this is MyKafkaDeserializer:

public class MyKafkaDeserializer
    implements KafkaDeserializationSchema<ProcessingResult<MyEvent>> {

  private final ObjectMapper objectMapper;

  public MyKafkaDeserializer() {
    this.objectMapper = JsonUtils.objectMapper();
  }

  @Override
  public boolean isEndOfStream(ProcessingResult<MyEvent> processingResult) {
    return false;
  }

  @Override
  public ProcessingResult<MyEvent> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord)
      throws Exception {
    try {
      // Jackson parse, then bean validation; both run once per record.
      var event = objectMapper.readValue(consumerRecord.value(), MyEvent.class);
      var violation = ObjectValidator.of().getValidator().validate(event);
      if (!violation.isEmpty()) {
        return ProcessingResult.error(
            new ProcessingError(
                asTransformationError(
                    "constraint.violation", toCustomerAccountValidationErrorString(violation))));
      }
      return ProcessingResult.success(event);
    } catch (JsonProcessingException e) {
      return ProcessingResult.error(
          new ProcessingError(
              asTransformationError("constraint.violation", toStacktraceString(e))));
    }
  }

  @Override
  public TypeInformation<ProcessingResult<MyEvent>> getProducedType() {
    return TypeInformation.of(new TypeHint<ProcessingResult<MyEvent>>() {});
  }

  private static String toCustomerAccountValidationErrorString(
      Set<ConstraintViolation<MyEvent>> errors) {
    return errors.stream().map(ConstraintViolation::getMessage).collect(Collectors.joining(";"));
  }

  // asTransformationError and toStacktraceString are abbreviated here too.
}
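About the enableObjectReuse suggestion below: if I read the execution configuration docs right, we would switch it on while setting up the environment, i.e. in withExecutionEnvironment(). The real body of that method is abbreviated above, so this is only a sketch of where the call would go:

private void withExecutionEnvironment() {
  // Sketch: assuming this method does nothing more than create the environment.
  env = StreamExecutionEnvironment.getExecutionEnvironment();
  // Lets Flink pass records between operators without defensive copies;
  // safe only as long as no operator mutates or caches its input records.
  env.getConfig().enableObjectReuse();
}

Would that be the right place for it?
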
On Tue, May 25, 2021 at 5:51 PM Arvid Heise <ar...@apache.org> wrote:

> Could you share your KafkaDeserializationSchema? We might be able to spot
> some optimization potential. You could also try out enableObjectReuse [1],
> which avoids copying data between tasks (not sure if you have any
> non-chained tasks).
>
> If you are on 1.13, you could check out the flamegraph to see where the
> bottleneck occurs. [2]
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/execution/execution_configuration/
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>
> On Tue, May 25, 2021 at 5:12 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> Hi,
>>
>> That's a throughput of 700 records/second, which should be well below the
>> theoretical limits of any deserializer (from hundreds of thousands up to
>> tens of millions of records per second per single operator), unless your
>> records are huge or very complex.
>>
>> Long story short, I don't know of a magic bullet to help you solve your
>> problem. As always, you have two options: either optimise/speed up your
>> code/job, or scale up.
>>
>> If you choose the former, think about Flink as just another Java
>> application. Check metrics and resource usage, and understand which
>> resource is the problem (CPU? Memory? Is the machine swapping? IO?). You
>> might be able to guess what your bottleneck is (reading from Kafka?
>> Deserialisation? Something else? Flink itself?) by looking at some of the
>> metrics (busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help with
>> that), or you can simplify your job to the bare minimum and test the
>> performance of independent components. You can also always attach a code
>> profiler and simply look at what's happening. First identify the source of
>> the bottleneck, then try to understand what's causing it.
>>
>> Best,
>> Piotrek
>>
>> [1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also
>> comes with nice tools for analysing bottlenecks in the WebUI (coloring
>> nodes in the job graph based on busy/back-pressured status, and Flamegraph
>> support).
>>
>> On Tue, May 25, 2021 at 3:44 PM B.B. <bijela.vr...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am in the process of optimizing my job, which we currently think is
>>> too slow.
>>>
>>> We are deploying the job in Kubernetes with 1 job manager with 1 GB RAM
>>> and 1 CPU, and 1 task manager with 4 GB RAM and 2 CPUs (i.e. 2 task slots
>>> and a parallelism of two).
>>>
>>> The main problem is one Kafka source with 3.8 million events that we
>>> have to process. As a test we made a simple job that connects to Kafka
>>> using a custom implementation of KafkaDeserializationSchema. There we use
>>> an ObjectMapper that maps the input values, e.g.
>>>
>>> var event = objectMapper.readValue(consumerRecord.value(), MyClass.class);
>>>
>>> This is then validated with the Hibernate validator, and the output of
>>> this source is printed to the console.
>>>
>>> The time the job needed to consume all the events was one and a half
>>> hours, which seems a bit long. Is there a way we can speed up this
>>> process?
>>>
>>> Would more CPU cores or memory be the solution? Should we switch to an
>>> Avro deserialization schema?
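
PS: Following Piotrek's suggestion above to test the performance of independent components, we will first time just the Jackson + validation step completely outside Flink. Roughly like this (only a sketch: SAMPLE_JSON is a placeholder for a real payload from the topic, and it uses a plain ObjectMapper and the default validator factory instead of our JsonUtils / ObjectValidator helpers):

import java.nio.charset.StandardCharsets;
import javax.validation.Validation;
import javax.validation.Validator;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DeserializerBenchmark {

  // Placeholder payload; replace with a real record captured from the topic.
  private static final byte[] SAMPLE_JSON = "{}".getBytes(StandardCharsets.UTF_8);

  public static void main(String[] args) throws Exception {
    ObjectMapper objectMapper = new ObjectMapper();
    Validator validator = Validation.buildDefaultValidatorFactory().getValidator();

    int n = 1_000_000;
    long start = System.nanoTime();
    for (int i = 0; i < n; i++) {
      // The same per-record work the Kafka deserializer does: parse, then validate.
      MyEvent event = objectMapper.readValue(SAMPLE_JSON, MyEvent.class);
      validator.validate(event);
    }
    long elapsedMs = (System.nanoTime() - start) / 1_000_000;
    System.out.printf("%d records in %d ms (%.0f records/s)%n",
        n, elapsedMs, n * 1000.0 / elapsedMs);
  }
}

Numbers from a loop like this are rough (no JIT warm-up handling), but they should tell us whether this step is anywhere near the ~700 records/second we are currently seeing.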