OMG! Thank you! Thank you! I didn't think this could be a problem. When I
removed validation the time needed to ingest all events reduced to 10min.

BR,
BB

On Thu, May 27, 2021 at 11:50 AM Arvid Heise <ar...@apache.org> wrote:

> Hi,
>
> The implementation looks good. I'd probably cache the
> *ObjectValidator.of().getValidator()* in a field to be sure that it's not
> a pricey construction.
> Did you evaluate what happens when you skip the validation entirely in
> terms of records/s?
>
> On Thu, May 27, 2021 at 11:18 AM B.B. <bijela.vr...@gmail.com> wrote:
>
>> I am having a problem with sending code. So here it is. Hope this now
>> looks ok
>>
>> This is my main job (some parts of codes are abbreviated and this is the
>> main part):
>>
>> *public class MyJob {*
>>
>> *  private StreamExecutionEnvironment env;*
>>
>> *  private static final Integer NUM_OF_PARALLEL_OPERATORS = 1;*
>>
>>
>> *  public static void main(String[] args) throws Exception {*
>>
>>
>> *   var myJob = new MyJob();*
>>
>> *   myJob.withExecutionEnvironment();*
>>
>> *   myJob.init();*
>>
>> *  }*
>>
>> *  public void init() throws Exception {*
>>
>> *    var settings = getApplicationProperties();*
>>
>>
>> *    SourceFunction<ProcessingResult<MyEvent>> customerAccountSource =*
>>
>> *        getSource(settings, settings.getProperty("my.topic"));*
>>
>>
>> *    DataStream<ProcessingResult<MyEvent>> mySource =*
>>
>> *
>> env.addSource(customerAccountSource).setParallelism(NUM_OF_PARALLEL_OPERATORS);*
>>
>>
>> *    mySource.print();*
>>
>> *    env.execute();*
>>
>> *  }*
>>
>>
>>
>>
>> *  private static SourceFunction<ProcessingResult<MyEvent>> getSource(
>>   Properties settings, String topic) {*
>>
>> *    var kafkaProps = getKafkaSourceProperties(settings);*
>>
>> *    return new FlinkKafkaConsumer<>(topic, new MyKafkaDeserializer(),
>> kafkaProps)*
>> *        .setStartFromEarliest()*
>> *        .assignTimestampsAndWatermarks(*
>> *
>> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(100)))*
>> *        .setCommitOffsetsOnCheckpoints(true);*
>> *  }*
>>
>> *}*
>>
>> And this is myKafkaDeserializer:
>>
>>
>> *public class MyKafkaDeserializer implements
>> KafkaDeserializationSchema<ProcessingResult<MyEvent>> {*
>>
>> *  private final ObjectMapper objectMapper;*
>>
>> *  public MyKafkaDeserializer() {*
>>
>> *    this.objectMapper = JsonUtils.objectMapper();*
>>
>> *  }*
>>
>> *  @Override*
>> *  public boolean isEndOfStream(ProcessingResult<MyEvent>
>> processingResult) {*
>>
>> *    return false;*
>>
>> *  }*
>>
>> *  @Override*
>> *  public ProcessingResult<MyEvent> deserialize(ConsumerRecord<byte[],
>> byte[]> consumerRecord)*
>> *      throws Exception {*
>>
>> *    try {*
>>
>> *      var event = objectMapper.readValue(consumerRecord.value(),
>> MyEvent.class);*
>>
>> *      var violation =
>> ObjectValidator.of().getValidator().validate(event);*
>>
>>
>> *      if (!violation.isEmpty()) {*
>>
>> *        return ProcessingResult.error(*
>>
>> *            new ProcessingError(*
>>
>> *                asTransformationError(*
>>
>> *                    "constraint.violation",
>> toCustomerAccountValidationErrorString(violation))));*
>>
>> *      }*
>>
>>
>> *      return ProcessingResult.success(event);*
>>
>>
>> *    } catch (JsonProcessingException e) {*
>>
>> *      return ProcessingResult.error(*
>>
>> *          new ProcessingError(*
>>
>> *              asTransformationError("constraint.violation",
>> toStacktraceString(e))));*
>>
>> *    }*
>>
>> *  }*
>>
>> *  @Override*
>> *  public TypeInformation<ProcessingResult<MyEvent>> getProducedType() {*
>>
>> *    return TypeInformation.of(new TypeHint<ProcessingResult<MyEvent>>()
>> {});*
>>
>> *  }*
>>
>> *  private static String toCustomerAccountValidationErrorString(*
>> *      Set<ConstraintViolation<MyEvent>> errors) {*
>>
>> *    return
>> errors.stream().map(ConstraintViolation::getMessage).collect(Collectors.joining(";"));*
>>
>> *  }*
>>
>>
>>
>>
>>
>> On Tue, May 25, 2021 at 5:51 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Could you share your KafkaDeserializationSchema, we might be able to
>>> spot some optimization potential. You could also try out enableObjectReuse
>>> [1], which avoids copying data between tasks (not sure if you have any
>>> non-chained tasks).
>>>
>>> If you are on 1.13, you could check out the flamegraph to see where the
>>> bottleneck occurs. [2]
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/execution/execution_configuration/
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>>>
>>> On Tue, May 25, 2021 at 5:12 PM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> That's a throughput of 700 records/second, which should be well below
>>>> theoretical limits of any deserializer (from hundreds thousands up to tens
>>>> of millions records/second/per single operator), unless your records are
>>>> huge or very complex.
>>>>
>>>> Long story short, I don't know of a magic bullet to help you solve your
>>>> problem. As always you have two options, either optimise/speed up your
>>>> code/job, or scale up.
>>>>
>>>> If you choose the former, think about Flink as just another Java
>>>> application. Check metrics and resource usage, and understand what resource
>>>> is the problem (cpu? memory? machine is swapping? io?). You might be able
>>>> to guess what's your bottleneck (reading from kafka? deserialisation?
>>>> something else? Flink itself?) by looking at some of the metrics
>>>> (busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help with that), or
>>>> you can also simplify your job to bare minimum and test performance of
>>>> independent components. Also you can always attach a code profiler and
>>>> simply look at what's happening. First identify what's the source of the
>>>> bottleneck and then try to understand what's causing it.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> [1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also
>>>> comes with nice tools to analyse bottlenecks in the WebUI (coloring nodes
>>>> in the job graph based on busy/back pressured status and Flamegraph
>>>> support)
>>>>
>>>> wt., 25 maj 2021 o 15:44 B.B. <bijela.vr...@gmail.com> napisaƂ(a):
>>>>
>>>>> Hi,
>>>>>
>>>>> I am in the process of optimizing my job which at the moment by our
>>>>> thinking is too slow.
>>>>>
>>>>> We are deploying job in kubernetes with 1 job manager with 1gb ram and
>>>>> 1 cpu and 1 task manager with 4gb ram and 2 cpu-s (eg. 2 task slots and
>>>>> parallelism of two).
>>>>>
>>>>> The main problem is one kafka source that has 3,8 million events that
>>>>> we have to process.
>>>>> As a test we made a simple job that connects to kafka using a custom
>>>>> implementation of KafkaDeserializationSchema. There we are using
>>>>> ObjectMapper that mapps input values eg.
>>>>>
>>>>> *var event = objectMapper.readValue(consumerRecord.value(),
>>>>> MyClass.class);*
>>>>>
>>>>> This is then validated with hibernate validator and output of this
>>>>> source is printed on the console.
>>>>>
>>>>> The time needed for the job to consume all the events was one and a
>>>>> half hours, which seems a bit long.
>>>>> Is there a way we can speed up this process?
>>>>>
>>>>> Is more cpu cores or memory solution?
>>>>> Should we switch to avro deserialization schema?
>>>>>
>>>>>
>>>>>
>>>>>
>>
>> --
>> Everybody wants to be a winner
>> Nobody wants to lose their game
>> Its insane for me
>> Its insane for you
>> Its insane
>>
>

-- 
Everybody wants to be a winner
Nobody wants to lose their game
Its insane for me
Its insane for you
Its insane

Reply via email to