timoninmaxim commented on code in PR #10798: URL: https://github.com/apache/ignite/pull/10798#discussion_r1251848192
########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java: ########## @@ -184,16 +142,13 @@ protected void advance() throws IgniteCheckedException { return; } - else { Review Comment: Here and below - non-required changes. Let's revert them. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInput.java: ########## @@ -36,4 +39,230 @@ public interface ByteBufferBackedDataInput extends DataInput { * @throws IOException If failed. */ public void ensure(int requested) throws IOException; + + /** + * @return Position in the stream. + */ + int position(); Review Comment: Methods in interfaces must be public. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInput.java: ########## @@ -36,4 +39,230 @@ public interface ByteBufferBackedDataInput extends DataInput { * @throws IOException If failed. */ public void ensure(int requested) throws IOException; + + /** + * @return Position in the stream. + */ + int position(); + + /** + * @param skipCheck If CRC check should be skipped. + * @return autoclosable fileInput, after its closing crc32 will be calculated and compared with saved one + */ + CrcCheckingDataInput startRead(boolean skipCheck); + + /** + * Checking of CRC32. + */ + public class CrcCheckingDataInput implements ByteBufferBackedDataInput, AutoCloseable { Review Comment: Crc32? ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIterator.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Optional; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.NotNull; + +/** Byte Buffer WAL Iterator */ +public class ByteBufferWalIterator extends WalRecordsIteratorAdaptor { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final transient ByteBuffer buf; Review Comment: Why transient, let's remove this. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalRecordsIteratorAdaptor.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.NotNull; + +/** + * Iterator over WAL segments. This abstract class provides most functionality for reading records. + */ +public abstract class WalRecordsIteratorAdaptor + extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> implements WALIterator { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Current record preloaded, to be returned on next()<br> Normally this should be not null because advance() method + * should already prepare some value<br> + */ + protected IgniteBiTuple<WALPointer, WALRecord> curRec; + + /** + * The exception which can be thrown during reading next record. It holds until the next calling of next record. + */ + private IgniteCheckedException curException; + + /** Logger */ + @NotNull protected final IgniteLogger log; + + /** Position of last read valid record. */ + private WALPointer lastRead; Review Comment: unused variable ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java: ########## @@ -49,22 +47,10 @@ * Iterator over WAL segments. This abstract class provides most functionality for reading records in log. Subclasses * are to override segment switching functionality */ -public abstract class AbstractWalRecordsIterator - extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> implements WALIterator { +public abstract class AbstractWalRecordsIterator extends WalRecordsIteratorAdaptor { Review Comment: AbstractFileWalRecordsIterator? ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java: ########## @@ -133,8 +133,8 @@ private void clearBuffer() { /** * @return Position in the stream. */ - @Override public long position() { - return pos - buf.remaining(); + @Override public int position() { Review Comment: Why this change? ########## modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIteratorTest.java: ########## @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.testframework.wal.record.RecordUtils; +import org.apache.ignite.testframework.wal.record.UnsupportedWalRecord; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +/** + * + */ +public class ByteBufferWalIteratorTest extends GridCommonAbstractTest { Review Comment: I'd like to have 2 additional tests: 1. Check that empty buffer is handled correctly 2. WAL segment read from disk into a buffer is handled correctly. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java: ########## @@ -386,7 +385,8 @@ static WALRecord readWithCrc( long size = -1; try { - size = in0.io().size(); + if (in0 instanceof FileInput) Review Comment: Why do you calculate size only for file? I think we can calculate it for the buffer also. ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInput.java: ########## @@ -36,4 +39,230 @@ public interface ByteBufferBackedDataInput extends DataInput { * @throws IOException If failed. */ public void ensure(int requested) throws IOException; + + /** + * @return Position in the stream. + */ + int position(); + + /** + * @param skipCheck If CRC check should be skipped. + * @return autoclosable fileInput, after its closing crc32 will be calculated and compared with saved one + */ + CrcCheckingDataInput startRead(boolean skipCheck); + + /** + * Checking of CRC32. + */ + public class CrcCheckingDataInput implements ByteBufferBackedDataInput, AutoCloseable { + /** */ + private final FastCrc crc = new FastCrc(); + + /** Last calc position. */ + private int lastCalcPosition; + + /** Skip crc check. */ + private boolean skipCheck; + + /** */ + private ByteBufferBackedDataInput delegate; + + /** + */ + public CrcCheckingDataInput(ByteBufferBackedDataInput delegate, boolean skipCheck) { + this.delegate = delegate; + this.lastCalcPosition = position(); + this.skipCheck = skipCheck; + } + + /** {@inheritDoc} */ + @Override public int position() { + return delegate.buffer().position(); + } + + /** {@inheritDoc} */ + @Override public void ensure(int requested) throws IOException { + int available = buffer().remaining(); + + if (available >= requested) + return; + + updateCrc(); + + delegate.ensure(requested); + + lastCalcPosition = 0; + } + + /** {@inheritDoc} */ + @Override public CrcCheckingDataInput startRead(boolean skipCheck) { Review Comment: Very strange behavior. Let's remove this method at all, and replace it with explicit constructor. It looks like that this method was created as an attempt to incapsulate CRC logic. Also let's make this class a full citizen class (non-nested). ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIterator.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Optional; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.NotNull; + +/** Byte Buffer WAL Iterator */ +public class ByteBufferWalIterator extends WalRecordsIteratorAdaptor { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final transient ByteBuffer buf; + + /** */ + private final transient RecordSerializer serializer; + + /** */ + private final transient ByteBufferBackedDataInputImpl dataInput; + + /** */ + public ByteBufferWalIterator( + @NotNull IgniteLogger log, + GridCacheSharedContext<?, ?> cctx, + ByteBuffer byteBuf) throws IgniteCheckedException { + super(log); + + buf = byteBuf; + + RecordSerializerFactory rsf = new RecordSerializerFactoryImpl(cctx, + (t, p) -> t.purpose() == WALRecord.RecordPurpose.LOGICAL).skipPositionCheck(true); Review Comment: Why do you skip position check? ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalRecordsIteratorAdaptor.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.NotNull; + +/** + * Iterator over WAL segments. This abstract class provides most functionality for reading records. + */ +public abstract class WalRecordsIteratorAdaptor Review Comment: AbstractWalRecordsIteratorApdator ########## modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIterator.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Optional; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.NotNull; + +/** Byte Buffer WAL Iterator */ +public class ByteBufferWalIterator extends WalRecordsIteratorAdaptor { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final transient ByteBuffer buf; + + /** */ + private final transient RecordSerializer serializer; + + /** */ + private final transient ByteBufferBackedDataInputImpl dataInput; + + /** */ + public ByteBufferWalIterator( + @NotNull IgniteLogger log, + GridCacheSharedContext<?, ?> cctx, + ByteBuffer byteBuf) throws IgniteCheckedException { + super(log); + + buf = byteBuf; + + RecordSerializerFactory rsf = new RecordSerializerFactoryImpl(cctx, + (t, p) -> t.purpose() == WALRecord.RecordPurpose.LOGICAL).skipPositionCheck(true); Review Comment: Why do you filter physical records? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org