Heidy, which state backend are you using? With RocksDB, Flink has to serialize and deserialize the state on every access and update. With the FsStateBackend, your sparse matrix sits in memory as a Java object and only has to be serialized during checkpointing.
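
To make the difference concrete, here is a toy sketch in plain Java. It is not Flink code: `HeapState`, `SerializedState`, and the use of `java.io` serialization are hypothetical stand-ins chosen only for illustration, and Flink's own serializers and backends work differently in detail. It only counts how often the whole value gets (de)serialized under each access pattern.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

final class StateAccessSketch {

    // Serialize a value with plain Java serialization (a stand-in for a real state serializer).
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T> T fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        }
    }

    // Hypothetical heap-style state: the object stays live and is mutated in place.
    static final class HeapState {
        final HashMap<String, Integer> value = new HashMap<>();
        int serdeCalls = 0;

        void increment(String key) {
            value.merge(key, 1, Integer::sum); // no serialization involved
        }

        byte[] checkpoint() throws IOException {
            serdeCalls++; // serialized only when a snapshot is taken
            return toBytes(value);
        }
    }

    // Hypothetical serialized-style state: only bytes are stored, so every
    // update is deserialize, modify, serialize on the whole value.
    static final class SerializedState {
        byte[] stored;
        int serdeCalls = 0;

        SerializedState() throws IOException {
            stored = toBytes(new HashMap<String, Integer>());
        }

        void increment(String key) throws IOException, ClassNotFoundException {
            HashMap<String, Integer> value = fromBytes(stored);
            value.merge(key, 1, Integer::sum);
            stored = toBytes(value);
            serdeCalls += 2; // one deserialization plus one serialization
        }

        byte[] checkpoint() {
            return stored; // already in serialized form
        }
    }

    public static void main(String[] args) throws Exception {
        HeapState heap = new HeapState();
        SerializedState serialized = new SerializedState();
        for (int i = 0; i < 1000; i++) {
            String key = "item-" + (i % 10);
            heap.increment(key);
            serialized.increment(key);
        }
        heap.checkpoint();
        System.out.println("heap-style ser/de calls:       " + heap.serdeCalls);
        System.out.println("serialized-style ser/de calls: " + serialized.serdeCalls);
    }
}
```

With a large value held in a single ValueState, the second pattern pays the full serialization cost of the whole object on each update, which is why the choice of backend and serializer matters so much here.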

David

On Wed, Sep 9, 2020 at 2:41 PM Heidi Hazem Mohamed <h.ha...@nu.edu.eg> wrote:

> Hi Walther,
>
> Many thanks for your answer. I declared the state type as below:
>
> ValueStateDescriptor<SparseBinaryMatrix> descriptor =
>     new ValueStateDescriptor<SparseBinaryMatrix>(
>         "Rating Matrix",
>         TypeInformation.of(new TypeHint<SparseBinaryMatrix>() {}));
>
> Is there a better way?
>
> Regards,
>
> Heidy
> ------------------------------
> *From:* Timo Walther <twal...@apache.org>
> *Sent:* Wednesday, September 9, 2020 1:58 PM
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Slow Performance inquiry
>
> Hi Hazem,
>
> I guess your performance is mostly driven by the serialization overhead
> in this case. How do you declare your state type?
>
> Flink comes with different serializers. Not all of them are extracted
> automatically when using reflective extraction methods:
>
> - Note that a `Serializable` declaration has no effect for Flink, other
>   than NOT using Flink's efficient serializers.
> - Flink's POJO serializer only works with a default constructor present.
> - Row needs explicit declaration of fields.
>
> Regards,
> Timo
>
>
> On 09.09.20 13:08, Heidi Hazem Mohamed wrote:
> > Dear,
> >
> > I am writing a Flink program (a recommender system) that needs a matrix
> > as state, namely the rating matrix. Since the matrix is very sparse, I
> > implemented a sparse binary matrix that stores only the ones rather than
> > the whole matrix, to save memory. I use it as a data type and keep it in
> > a ValueState, but unexpectedly the performance became terrible and the
> > job became very slow. Any suggestion as to what the problem is?
> >
> > My first implementation of the rating matrix state:
> >
> > MapState<String, Map<String, Float>> ratingMatrix;
> >
> > The second implementation (the slow one) of the rating matrix state:
> >
> > ValueState<SparseBinaryMatrix> userItemRatingHistory;
> >
> > and this is a part of the SparseBinaryMatrix class:
> >
> > public class SparseBinaryMatrix implements Serializable {
> >
> >     private ArrayList<Row> content;
> >
> >     private int rowLength;
> >
> >     private HashMap<String, Integer> columnLabels;
> >     private HashMap<Integer, String> inverseColumnLabels;
> >
> >     private HashMap<String, Integer> rowLabels;
> >     private HashMap<Integer, String> inverseRowLabels;
> >
> >     private enum LabelerType { Row, Column };
> >
> >     public Integer colNumber;
> >     public Integer rowNumber;
> >
> >     // This constructor initializes the matrix with zeros
> >     public SparseBinaryMatrix(int rows, int columns)
> >     {
> >         content = new ArrayList<>(rows);
> >         rowLength = columns;
> >         // for (int i = 0; i < rows; i++)
> >         //     content.add(new Row(columns));
> >     }
> >
> > Could depending on another class (Row) lead to this terrible
> > performance? Row is a class I have implemented, and this is part of it:
> >
> > public class Row implements Serializable {
> >     // This is an alternating sorted array
> >     private ArrayList<Integer> content;
> >     private int length = 0;
> >
> >     public Row(int numbColumns)
> >     {
> >         length = numbColumns;
> >         for (int i = 0; i < numbColumns; i++)
> >             setColumnToZero(i);
> >     }
> >
> >     public Row(int[] initialValues)
> >     {
> >         length = initialValues.length;
> >         content = new ArrayList<>(length);
> >         for (int i = 0; i < length; i++)
> >             setColumn(i, initialValues[i]);
> >     }
> >
> > Regards,
> >
> > Heidy