OMG! Thank you! Thank you! I didn't think this could be a problem. When I removed the validation, the time needed to ingest all events dropped to 10 min.
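For scale, here is a quick back-of-the-envelope check of what that means in records/s. It assumes the 3.8 million events and the one and a half hour runtime from the original question further down the thread, and treats the 10 min figure as exact. The class and method names are made up for illustration only.

```java
import java.time.Duration;

class ThroughputCheck {
    // Average records per second for a record count and a wall-clock duration.
    static double recordsPerSecond(long records, Duration elapsed) {
        return (double) records / elapsed.getSeconds();
    }

    public static void main(String[] args) {
        long events = 3_800_000L; // event count quoted in the original question

        double withValidation = recordsPerSecond(events, Duration.ofMinutes(90));
        double withoutValidation = recordsPerSecond(events, Duration.ofMinutes(10));

        System.out.println("with validation:    " + Math.round(withValidation) + " records/s");
        System.out.println("without validation: " + Math.round(withoutValidation) + " records/s");
        System.out.println("speed-up factor:    " + withoutValidation / withValidation);
    }
}
```

That works out to roughly 704 records/s before (matching the ~700 mentioned in the thread) and roughly 6,333 records/s after, so about a 9x improvement on the same hardware.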
BR,
BB

On Thu, May 27, 2021 at 11:50 AM Arvid Heise <ar...@apache.org> wrote:

> Hi,
>
> The implementation looks good. I'd probably cache the
> *ObjectValidator.of().getValidator()* in a field to be sure that it's not
> a pricey construction.
> Did you evaluate what happens when you skip the validation entirely in
> terms of records/s?
>
> On Thu, May 27, 2021 at 11:18 AM B.B. <bijela.vr...@gmail.com> wrote:
>
>> I am having a problem with sending code. So here it is. Hope this now
>> looks ok
>>
>> This is my main job (some parts of codes are abbreviated and this is the
>> main part):
>>
>> public class MyJob {
>>
>>   private StreamExecutionEnvironment env;
>>
>>   private static final Integer NUM_OF_PARALLEL_OPERATORS = 1;
>>
>>   public static void main(String[] args) throws Exception {
>>     var myJob = new MyJob();
>>     myJob.withExecutionEnvironment();
>>     myJob.init();
>>   }
>>
>>   public void init() throws Exception {
>>     var settings = getApplicationProperties();
>>
>>     SourceFunction<ProcessingResult<MyEvent>> customerAccountSource =
>>         getSource(settings, settings.getProperty("my.topic"));
>>
>>     DataStream<ProcessingResult<MyEvent>> mySource =
>>         env.addSource(customerAccountSource).setParallelism(NUM_OF_PARALLEL_OPERATORS);
>>
>>     mySource.print();
>>     env.execute();
>>   }
>>
>>   private static SourceFunction<ProcessingResult<MyEvent>> getSource(
>>       Properties settings, String topic) {
>>     var kafkaProps = getKafkaSourceProperties(settings);
>>
>>     return new FlinkKafkaConsumer<>(topic, new MyKafkaDeserializer(), kafkaProps)
>>         .setStartFromEarliest()
>>         .assignTimestampsAndWatermarks(
>>             WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(100)))
>>         .setCommitOffsetsOnCheckpoints(true);
>>   }
>> }
>>
>> And this is myKafkaDeserializer:
>>
>> public class MyKafkaDeserializer implements
>>     KafkaDeserializationSchema<ProcessingResult<MyEvent>> {
>>
>>   private final ObjectMapper objectMapper;
>>
>>   public MyKafkaDeserializer() {
>>     this.objectMapper = JsonUtils.objectMapper();
>>   }
>>
>>   @Override
>>   public boolean isEndOfStream(ProcessingResult<MyEvent> processingResult) {
>>     return false;
>>   }
>>
>>   @Override
>>   public ProcessingResult<MyEvent> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord)
>>       throws Exception {
>>     try {
>>       var event = objectMapper.readValue(consumerRecord.value(), MyEvent.class);
>>       var violation = ObjectValidator.of().getValidator().validate(event);
>>
>>       if (!violation.isEmpty()) {
>>         return ProcessingResult.error(
>>             new ProcessingError(
>>                 asTransformationError(
>>                     "constraint.violation",
>>                     toCustomerAccountValidationErrorString(violation))));
>>       }
>>
>>       return ProcessingResult.success(event);
>>     } catch (JsonProcessingException e) {
>>       return ProcessingResult.error(
>>           new ProcessingError(
>>               asTransformationError("constraint.violation", toStacktraceString(e))));
>>     }
>>   }
>>
>>   @Override
>>   public TypeInformation<ProcessingResult<MyEvent>> getProducedType() {
>>     return TypeInformation.of(new TypeHint<ProcessingResult<MyEvent>>() {});
>>   }
>>
>>   private static String toCustomerAccountValidationErrorString(
>>       Set<ConstraintViolation<MyEvent>> errors) {
>>     return errors.stream().map(ConstraintViolation::getMessage).collect(Collectors.joining(";"));
>>   }
>>
>> On Tue, May 25, 2021 at 5:51 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Could you share your KafkaDeserializationSchema, we might be able to
>>> spot some optimization potential. You could also try out enableObjectReuse
>>> [1], which avoids copying data between tasks (not sure if you have any
>>> non-chained tasks).
>>>
>>> If you are on 1.13, you could check out the flamegraph to see where the
>>> bottleneck occurs. [2]
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/execution/execution_configuration/
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>>>
>>> On Tue, May 25, 2021 at 5:12 PM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> That's a throughput of 700 records/second, which should be well below
>>>> the theoretical limits of any deserializer (from hundreds of thousands up
>>>> to tens of millions of records/second per single operator), unless your
>>>> records are huge or very complex.
>>>>
>>>> Long story short, I don't know of a magic bullet to help you solve your
>>>> problem. As always you have two options: either optimise/speed up your
>>>> code/job, or scale up.
>>>>
>>>> If you choose the former, think about Flink as just another Java
>>>> application. Check metrics and resource usage, and understand which
>>>> resource is the problem (cpu? memory? machine is swapping? io?). You might
>>>> be able to guess what your bottleneck is (reading from kafka?
>>>> deserialisation? something else? Flink itself?) by looking at some of the
>>>> metrics (busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help with
>>>> that), or you can also simplify your job to the bare minimum and test the
>>>> performance of independent components. Also you can always attach a code
>>>> profiler and simply look at what's happening. First identify the source of
>>>> the bottleneck and then try to understand what's causing it.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> [1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also
>>>> comes with nice tools to analyse bottlenecks in the WebUI (coloring nodes
>>>> in the job graph based on busy/back pressured status and Flamegraph
>>>> support)
>>>>
>>>> On Tue, 25 May 2021 at 15:44, B.B. <bijela.vr...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am in the process of optimizing my job, which at the moment is, by
>>>>> our thinking, too slow.
>>>>>
>>>>> We are deploying the job in kubernetes with 1 job manager with 1gb ram
>>>>> and 1 cpu, and 1 task manager with 4gb ram and 2 cpus (i.e. 2 task slots
>>>>> and a parallelism of two).
>>>>>
>>>>> The main problem is one kafka source that has 3.8 million events that
>>>>> we have to process.
>>>>> As a test we made a simple job that connects to kafka using a custom
>>>>> implementation of KafkaDeserializationSchema. There we are using an
>>>>> ObjectMapper that maps the input values, e.g.
>>>>>
>>>>>   var event = objectMapper.readValue(consumerRecord.value(), MyClass.class);
>>>>>
>>>>> This is then validated with hibernate validator, and the output of this
>>>>> source is printed on the console.
>>>>>
>>>>> The time needed for the job to consume all the events was one and a
>>>>> half hours, which seems a bit long.
>>>>> Is there a way we can speed up this process?
>>>>>
>>>>> Are more cpu cores or more memory the solution?
>>>>> Should we switch to an avro deserialization schema?
>>>>>
>>
>> --
>> Everybody wants to be a winner
>> Nobody wants to lose their game
>> Its insane for me
>> Its insane for you
>> Its insane
>>
>

--
Everybody wants to be a winner
Nobody wants to lose their game
Its insane for me
Its insane for you
Its insane
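
The suggestion in the thread to cache the validator in a field, rather than building it for every record, can be sketched roughly as below. This is only an illustration of the pattern, not the poster's actual code: `CostlyValidator`, `PerRecordSchema` and `CachingSchema` are hypothetical stand-ins (the real job would hold a `javax.validation.Validator` inside its `KafkaDeserializationSchema`), and the sketch only counts how often the costly object gets built. Whether construction or the validation itself was the expensive part in the real job is not known from the thread.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical stand-in for an object that is costly to build, such as a bean validator.
final class CostlyValidator {
    static final AtomicInteger CONSTRUCTIONS = new AtomicInteger();

    CostlyValidator() {
        CONSTRUCTIONS.incrementAndGet(); // count how often the costly setup runs
    }

    boolean isValid(String payload) {
        return payload != null && !payload.isEmpty();
    }
}

// Builds a new validator for every record, like the deserialize() method posted in the thread.
final class PerRecordSchema implements Serializable {
    boolean deserialize(String payload) {
        return new CostlyValidator().isValid(payload);
    }
}

// Keeps one validator per schema instance and builds it lazily on first use.
final class CachingSchema implements Serializable {
    // transient: the validator is not serialized along with the schema;
    // it is rebuilt the first time deserialize() runs after deserialization.
    private transient CostlyValidator validator;

    boolean deserialize(String payload) {
        if (validator == null) {
            validator = new CostlyValidator();
        }
        return validator.isValid(payload);
    }
}

class CachedValidatorDemo {
    // Feeds `records` payloads through the schema and returns how many validators were built.
    static int countConstructions(Predicate<String> schema, int records) {
        int before = CostlyValidator.CONSTRUCTIONS.get();
        for (int i = 0; i < records; i++) {
            schema.test("record-" + i);
        }
        return CostlyValidator.CONSTRUCTIONS.get() - before;
    }

    public static void main(String[] args) {
        System.out.println("per record: "
                + countConstructions(new PerRecordSchema()::deserialize, 1000)); // prints 1000
        System.out.println("cached:     "
                + countConstructions(new CachingSchema()::deserialize, 1000));   // prints 1
    }
}
```

The field is `transient` and filled lazily because Flink serializes the schema when it ships the job to the task managers, and a validator instance is typically not serializable. A flame graph or profiler run, as recommended in the thread, would show whether this change alone recovers most of the lost throughput.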