[ https://issues.apache.org/jira/browse/FLINK-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828107#comment-15828107 ]
ASF GitHub Bot commented on FLINK-5530: --------------------------------------- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/3143#discussion_r96638710 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java --- @@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) { namespaceSerializer); int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups()); - writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1); - return backend.db.get(columnFamily, keySerializationStream.toByteArray()); + + // we cannot reuse the keySerializationStream member since this method + // is called concurrently to the other ones and it may thus contain garbage + ByteArrayOutputStreamWithPos tmpKeySerializationStream = + new ByteArrayOutputStreamWithPos(128); + DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView = + new DataOutputViewStreamWrapper(tmpKeySerializationStream); + + writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1, + tmpKeySerializationStream, tmpKeySerializationDateDataOutputView); + + return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray()); } protected void writeCurrentKeyWithGroupAndNamespace() throws IOException { - writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace); + writeKeyWithGroupAndNamespace( + backend.getCurrentKeyGroupIndex(), + backend.getCurrentKey(), + currentNamespace, + this.keySerializationStream, + this.keySerializationDateDataOutputView); } - protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException { + protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, + N namespace, + final ByteArrayOutputStreamWithPos keySerializationStream, + final DataOutputView keySerializationDateDataOutputView) throws + IOException { + 
keySerializationStream.reset(); - writeKeyGroup(keyGroup); - writeKey(key); - writeNameSpace(namespace); + writeKeyGroup(keyGroup, keySerializationDateDataOutputView); + writeKey(key, keySerializationStream, keySerializationDateDataOutputView); + writeNameSpace(namespace, keySerializationStream, keySerializationDateDataOutputView); } - private void writeKeyGroup(int keyGroup) throws IOException { + private void writeKeyGroup(int keyGroup, + final DataOutputView keySerializationDateDataOutputView) throws + IOException { + for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) { keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3)); } } - private void writeKey(K key) throws IOException { + private void writeKey(K key, + final ByteArrayOutputStreamWithPos keySerializationStream, + final DataOutputView keySerializationDateDataOutputView) throws + IOException { + //write key int beforeWrite = keySerializationStream.getPosition(); backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView); if (ambiguousKeyPossible) { //write size of key - writeLengthFrom(beforeWrite); + writeLengthFrom(beforeWrite, keySerializationStream, + keySerializationDateDataOutputView); } } - private void writeNameSpace(N namespace) throws IOException { + private void writeNameSpace(N namespace, + final ByteArrayOutputStreamWithPos keySerializationStream, + final DataOutputView keySerializationDateDataOutputView) throws + IOException { + int beforeWrite = keySerializationStream.getPosition(); namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView); if (ambiguousKeyPossible) { //write length of namespace - writeLengthFrom(beforeWrite); + writeLengthFrom(beforeWrite, keySerializationStream, + keySerializationDateDataOutputView); } } - private void writeLengthFrom(int fromPosition) throws IOException { + private static void writeLengthFrom(int fromPosition, + final ByteArrayOutputStreamWithPos keySerializationStream, + final DataOutputView 
keySerializationDateDataOutputView) throws + IOException { --- End diff -- I'll try to do that manually, too, as I didn't see a way to teach my IntelliJ style that behaviour > race condition in AbstractRocksDBState#getSerializedValue > --------------------------------------------------------- > > Key: FLINK-5530 > URL: https://issues.apache.org/jira/browse/FLINK-5530 > Project: Flink > Issue Type: Bug > Components: Queryable State > Affects Versions: 1.2.0 > Reporter: Nico Kruber > Assignee: Nico Kruber > Priority: Blocker > > AbstractRocksDBState#getSerializedValue() uses the same key serialisation > stream as the ordinary state access methods but is called concurrently during > state queries, thus violating the assumption of only one thread accessing it. > This may lead to either wrong results in queries or corrupt data while > queries are executed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)