Hi,

The implementation looks good. I'd probably cache the result of ObjectValidator.of().getValidator() in a field, to make sure it isn't an expensive construction that is repeated for every record. Did you evaluate what happens to the throughput (records/s) when you skip the validation entirely?
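To illustrate what I mean, here is a rough sketch. It assumes that ObjectValidator.of().getValidator() hands out a standard javax.validation.Validator (thread-safe, but not necessarily Serializable, hence the transient field), and that you are on Flink 1.12+ where KafkaDeserializationSchema has an open() hook; otherwise, lazily initializing the field inside deserialize() works just as well:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.validation.Validator;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MyKafkaDeserializer
        implements KafkaDeserializationSchema<ProcessingResult<MyEvent>> {

    private final ObjectMapper objectMapper = JsonUtils.objectMapper();

    // Built once per task in open() instead of once per record in deserialize().
    private transient Validator validator;

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        validator = ObjectValidator.of().getValidator();
    }

    @Override
    public ProcessingResult<MyEvent> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord)
            throws Exception {
        try {
            var event = objectMapper.readValue(consumerRecord.value(), MyEvent.class);
            // Reuses the cached Validator instead of calling ObjectValidator.of().getValidator() per record.
            var violations = validator.validate(event);
            if (!violations.isEmpty()) {
                return ProcessingResult.error(
                    new ProcessingError(asTransformationError(
                        "constraint.violation", toCustomerAccountValidationErrorString(violations))));
            }
            return ProcessingResult.success(event);
        } catch (JsonProcessingException e) {
            return ProcessingResult.error(
                new ProcessingError(asTransformationError("constraint.violation", toStacktraceString(e))));
        }
    }

    // isEndOfStream(), getProducedType() and the private helpers stay exactly as in your version.
}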
On Thu, May 27, 2021 at 11:18 AM B.B. <bijela.vr...@gmail.com> wrote:

> I was having a problem with sending the code, so here it is. Hope it looks OK now.
>
> This is my main job (some parts of the code are abbreviated; this is the main part):
>
> public class MyJob {
>
>     private StreamExecutionEnvironment env;
>
>     private static final Integer NUM_OF_PARALLEL_OPERATORS = 1;
>
>     public static void main(String[] args) throws Exception {
>         var myJob = new MyJob();
>         myJob.withExecutionEnvironment();
>         myJob.init();
>     }
>
>     public void init() throws Exception {
>         var settings = getApplicationProperties();
>
>         SourceFunction<ProcessingResult<MyEvent>> customerAccountSource =
>             getSource(settings, settings.getProperty("my.topic"));
>
>         DataStream<ProcessingResult<MyEvent>> mySource =
>             env.addSource(customerAccountSource).setParallelism(NUM_OF_PARALLEL_OPERATORS);
>
>         mySource.print();
>         env.execute();
>     }
>
>     private static SourceFunction<ProcessingResult<MyEvent>> getSource(
>             Properties settings, String topic) {
>         var kafkaProps = getKafkaSourceProperties(settings);
>         return new FlinkKafkaConsumer<>(topic, new MyKafkaDeserializer(), kafkaProps)
>             .setStartFromEarliest()
>             .assignTimestampsAndWatermarks(
>                 WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(100)))
>             .setCommitOffsetsOnCheckpoints(true);
>     }
> }
>
> And this is MyKafkaDeserializer:
>
> public class MyKafkaDeserializer
>         implements KafkaDeserializationSchema<ProcessingResult<MyEvent>> {
>
>     private final ObjectMapper objectMapper;
>
>     public MyKafkaDeserializer() {
>         this.objectMapper = JsonUtils.objectMapper();
>     }
>
>     @Override
>     public boolean isEndOfStream(ProcessingResult<MyEvent> processingResult) {
>         return false;
>     }
>
>     @Override
>     public ProcessingResult<MyEvent> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord)
>             throws Exception {
>         try {
>             var event = objectMapper.readValue(consumerRecord.value(), MyEvent.class);
>             var violations = ObjectValidator.of().getValidator().validate(event);
>
>             if (!violations.isEmpty()) {
>                 return ProcessingResult.error(
>                     new ProcessingError(
>                         asTransformationError(
>                             "constraint.violation",
>                             toCustomerAccountValidationErrorString(violations))));
>             }
>
>             return ProcessingResult.success(event);
>
>         } catch (JsonProcessingException e) {
>             return ProcessingResult.error(
>                 new ProcessingError(
>                     asTransformationError("constraint.violation", toStacktraceString(e))));
>         }
>     }
>
>     @Override
>     public TypeInformation<ProcessingResult<MyEvent>> getProducedType() {
>         return TypeInformation.of(new TypeHint<ProcessingResult<MyEvent>>() {});
>     }
>
>     private static String toCustomerAccountValidationErrorString(
>             Set<ConstraintViolation<MyEvent>> errors) {
>         return errors.stream()
>             .map(ConstraintViolation::getMessage)
>             .collect(Collectors.joining(";"));
>     }
>
>     // asTransformationError and toStacktraceString are omitted here
> }
>
> On Tue, May 25, 2021 at 5:51 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Could you share your KafkaDeserializationSchema? We might be able to spot some
>> optimization potential. You could also try out enableObjectReuse [1], which avoids
>> copying data between tasks (not sure if you have any non-chained tasks).
>>
>> If you are on 1.13, you could check out the flamegraph to see where the
>> bottleneck occurs [2].
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/execution/execution_configuration/
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
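In case you have not tried the object reuse suggestion from that mail yet, it is a one-line change on the execution config. A minimal sketch, assuming the environment is set up the usual way as in MyJob above:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Lets chained operators pass records by reference instead of deep-copying them.
// Only safe as long as no operator mutates or holds on to incoming records.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();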
>>
>> On Tue, May 25, 2021 at 5:12 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> That's a throughput of 700 records/second, which should be well below the
>>> theoretical limit of any deserializer (from hundreds of thousands up to tens of
>>> millions of records per second per single operator), unless your records are huge
>>> or very complex.
>>>
>>> Long story short, I don't know of a magic bullet to solve your problem. As always,
>>> you have two options: either optimise/speed up your code/job, or scale up.
>>>
>>> If you choose the former, think about Flink as just another Java application.
>>> Check metrics and resource usage, and understand which resource is the problem
>>> (CPU? memory? is the machine swapping? I/O?). You might be able to guess what your
>>> bottleneck is (reading from Kafka? deserialisation? something else? Flink itself?)
>>> by looking at some of the metrics (busyTimeMsPerSecond [1] or idleTimeMsPerSecond
>>> could help with that), or you can simplify your job to the bare minimum and test
>>> the performance of the independent components. You can also always attach a code
>>> profiler and simply look at what's happening. First identify the source of the
>>> bottleneck, then try to understand what's causing it.
>>>
>>> Best,
>>> Piotrek
>>>
>>> [1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also comes with
>>> nice tools for analysing bottlenecks in the WebUI (coloring nodes in the job graph
>>> based on their busy/back-pressured status, and flame graph support).
>>>
>>> On Tue, May 25, 2021 at 3:44 PM B.B. <bijela.vr...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am in the process of optimizing my job, which by our estimation is currently
>>>> too slow.
>>>>
>>>> We deploy the job in Kubernetes with one job manager (1 GB RAM, 1 CPU) and one
>>>> task manager (4 GB RAM, 2 CPUs), i.e. 2 task slots and a parallelism of two.
>>>>
>>>> The main problem is one Kafka source with 3.8 million events that we have to
>>>> process. As a test we made a simple job that connects to Kafka using a custom
>>>> implementation of KafkaDeserializationSchema. There we use an ObjectMapper that
>>>> maps the input values, e.g.
>>>>
>>>> var event = objectMapper.readValue(consumerRecord.value(), MyClass.class);
>>>>
>>>> This is then validated with Hibernate Validator, and the output of this source
>>>> is printed to the console.
>>>>
>>>> The time needed for the job to consume all the events was one and a half hours,
>>>> which seems a bit long. Is there a way we can speed up this process?
>>>>
>>>> Are more CPU cores or more memory the solution? Should we switch to an Avro
>>>> deserialization schema?
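Along the lines of Piotr's suggestion to test the independent components: a rough standalone harness for measuring just the Jackson + validation path outside of Flink and Kafka. This is only a sketch; it assumes the MyEvent, JsonUtils and ObjectValidator classes from the job above and a representative payload in a hypothetical sample.json file. For anything rigorous use JMH, but this already gives a ballpark records/s figure to compare against the observed ~700 records/s:

import java.nio.file.Files;
import java.nio.file.Path;

public class DeserializationBenchmark {

    public static void main(String[] args) throws Exception {
        // One representative Kafka record value, stored as a local file for the test.
        byte[] payload = Files.readAllBytes(Path.of("sample.json"));

        var objectMapper = JsonUtils.objectMapper();
        var validator = ObjectValidator.of().getValidator();

        // Not a rigorous benchmark (no warm-up separation), but enough for a ballpark figure.
        int iterations = 1_000_000;
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            var event = objectMapper.readValue(payload, MyEvent.class);
            validator.validate(event);
        }
        double seconds = (System.nanoTime() - start) / 1_000_000_000.0;

        System.out.printf("~%.0f records/s for deserialization + validation%n", iterations / seconds);
    }
}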