Hey Krzysztof,

Re 1. I believe you are asking where the state is kept. It is stored in memory, but bear in mind that state is only ever kept for the current key. Once all records for a key have been processed, the corresponding state is discarded, as it won't be needed anymore.
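To illustrate the idea, here is a minimal Python sketch. It is not Flink code: process_grouped, init_state and update are hypothetical names, and the in-memory sorted() call merely stands in for the real sorter. It only shows that, once the input is grouped by key, state for a single key at a time is enough.

```python
def process_grouped(records, init_state, update):
    """Yield (key, final_state) per key, holding state for one key at a time.

    Hypothetical illustration only; `records` is an iterable of (key, value).
    """
    no_key = object()          # marker meaning "no key seen yet"
    current_key = no_key
    state = None
    # Sorting by key groups equal keys together (assumption: keys are orderable).
    for key, value in sorted(records, key=lambda r: r[0]):
        if key != current_key:
            if current_key is not no_key:
                # A different key arrived, so the previous key is complete:
                # emit its result and drop its state.
                yield current_key, state
            current_key, state = key, init_state()
        state = update(state, value)
    if current_key is not no_key:
        yield current_key, state   # flush the last key


records = [("b", 1), ("a", 2), ("b", 3), ("a", 4)]
sums = list(process_grouped(records, lambda: 0, lambda s, v: s + v))
print(sums)  # [('a', 6), ('b', 4)]
```

At no point does the loop hold more than one key's state, which is why no general-purpose state backend is needed in this mode.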
Re 2. The sorting algorithm keeps records in serialized form in the managed memory of an operator [2]. It may spill intermediate results to local disk once it reaches the sort spilling threshold [1].

Re 2.5. We know there are no more records for a given key once, after sorting, we receive a record with a key different from the previous one. (Sorting is applied to the keys; it is more of a grouping than a true sort.) This is leveraged e.g. in BatchExecutionKeyedStateBackend#setCurrentKey.

ExternalSorter is not a public class, so there are no usage examples or user-facing documentation. Unfortunately, the best you can get is the javadocs/comments in the class itself.

Best,

Dawid

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#taskmanager-runtime-sort-spilling-threshold
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#configure-heap-and-managed-memory

On 11/01/2022 17:07, Chesnay Schepler wrote:
> Looping in Dawid who can hopefully answer your questions.
>
> On 11/01/2022 13:00, Krzysztof Chmielewski wrote:
>> Hi,
>> I'm reading the docs and FLIP-140 for BATCH mode [1][2], where it
>> reads that
>> "In BATCH mode, the configured state backend is ignored. Instead,
>> the input of a keyed operation is grouped by key (using sorting) and
>> then we process all records of a key in turn." [1]
>>
>> I would like to ask:
>> 1. Where (Heap, OffHeap) does Flink keep records for BATCH streams if the
>> configured state backend is ignored? In FLIP-140 I see there was a
>> new State implementation created that is prepared to keep only one
>> key value, but there is no information on "where" in memory it is
>> kept.
>>
>> 2. Where does the sorting algorithm keep its intermediate results?
>> How/who knows that there will be no more records for a given key?
>>
>> If I get it right, sorting is done through the ExternalSorter class. Is
>> there any documentation or usage example for ExternalSorter and a
>> description of SortStage values like READ, SORT, SPILL?
>>
>> Regards,
>> Krzysztof Chmielewski
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>> [2]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams#FLIP140:Introducebatchstyleexecutionforboundedkeyedstreams-Howtosort/groupkeys