
Paimon lookup store implementation


The Lookup Store is mainly used for Paimon's lookup compaction and lookup join scenarios. It converts a remote lookup file into a local KV lookup format.

There are two implementations, a hash-based store and a sort-based store; this post looks at the sort-based one.

Hash

  • /linkedin/PalDB

Sort

  • /dain/leveldb
  • /apache/paimon/pull/3770

[Figure: overall file structure]
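As a textual stand-in for the figure, the layout implied by the write path below is, in order:

  • data block + 5-byte trailer (repeated for each flushed block)
  • bloom filter (optional)
  • index block + 5-byte trailer
  • footer recording the bloom filter handle and the index block handle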


Advantages over Hash File

  • Written only once, so no file merging is needed.
  • Sequential writes preserve the original key order; if later lookups also proceed in key order, cache efficiency improves.

SortLookupStoreWriter

SortLookupStoreWriter#put

put

@Override
public void put(byte[] key, byte[] value) throws IOException {
	dataBlockWriter.add(key, value);
	if (bloomFilter != null) {
		bloomFilter.addHash(MurmurHashUtils.hashBytes(key));
	}

	lastKey = key;

	// When the data buffered in the BlockWriter exceeds a threshold, flush it.
	// The default threshold is cache-page-size = 64 KB.
	if (dataBlockWriter.memory() > blockSize) {
		flush();
	}

	recordCount++;
}

flush

private void flush() throws IOException {
    if (dataBlockWriter.size() == 0) {
        return;
    }

    // Write the data block to the data file and record its position and length.
    BlockHandle blockHandle = writeBlock(dataBlockWriter);
    MemorySlice handleEncoding = writeBlockHandle(blockHandle);
    // Write the BlockHandle into the index writer, which is itself also a BlockWriter.
    indexBlockWriter.add(lastKey, handleEncoding.copyBytes());
}

writeBlock

private BlockHandle writeBlock(BlockWriter blockWriter) throws IOException {
	// close the block
	// Get the block's complete byte array; the blockWriter's internal array is not
	// released here, it keeps being reused for the next block.
	MemorySlice block = blockWriter.finish();

	totalUncompressedSize += block.length();

	// attempt to compress the block
	BlockCompressionType blockCompressionType = BlockCompressionType.NONE;
	if (blockCompressor != null) {
		int maxCompressedSize = blockCompressor.getMaxCompressedSize(block.length());
		byte[] compressed = allocateReuseBytes(maxCompressedSize + 5);
		int offset = encodeInt(compressed, 0, block.length());
		int compressedSize =
				offset
						+ blockCompressor.compress(
								block.getHeapMemory(),
								block.offset(),
								block.length(),
								compressed,
								offset);

		// Don't use the compressed data if compressed less than 12.5%
		if (compressedSize < block.length() - (block.length() / 8)) {
			block = new MemorySlice(MemorySegment.wrap(compressed), 0, compressedSize);
			blockCompressionType = compressionType;
		}
	}

	totalCompressedSize += block.length();

	// create block trailer
	// Every block is followed by a trailer recording the compression type and a crc32 checksum.
	BlockTrailer blockTrailer =
			new BlockTrailer(blockCompressionType, crc32c(block, blockCompressionType));
	MemorySlice trailer = writeBlockTrailer(blockTrailer);

	// create a handle to this block
	// The BlockHandle records the block's actual position and length in the file.
	BlockHandle blockHandle = new BlockHandle(position, block.length());

	// write data
	// Append the block data to the file on disk.
	writeSlice(block);

	// write trailer: 5 bytes
	writeSlice(trailer);

	// clean up state
	blockWriter.reset();

	return blockHandle;
}
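Putting it together, each block occupies [block bytes (possibly compressed)] [5-byte trailer: compression type + crc32c] in the file, and its BlockHandle records only the position and length of the block bytes.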

close

public SortContext close() throws IOException {
	// flush current data block
	flush();

	LOG.info("Number of record: {}", recordCount);

	// write bloom filter
	@Nullable BloomFilterHandle bloomFilterHandle = null;
	if (bloomFilter != null) {
		MemorySegment buffer = bloomFilter.getBuffer();
		bloomFilterHandle =
				new BloomFilterHandle(position, buffer.size(), bloomFilter.expectedEntries());
		writeSlice(MemorySlice.wrap(buffer));
		LOG.info("Bloom filter size: {} bytes", buffer.size());
	}

	// write index block
	// Write the index data into the file.
	BlockHandle indexBlockHandle = writeBlock(indexBlockWriter);

	// write footer
	// The footer records the bloom filter handle and the index block handle.
	Footer footer = new Footer(bloomFilterHandle, indexBlockHandle);
	MemorySlice footerEncoding = writeFooter(footer);
	writeSlice(footerEncoding);

	// Finally, close the file.
	fileOutputStream.close();

	LOG.info("totalUncompressedSize: {}", MemorySize.ofBytes(totalUncompressedSize));
	LOG.info("totalCompressedSize: {}", MemorySize.ofBytes(totalCompressedSize));
	return new SortContext(position);
}

BlockWriter

add

public void add(byte[] key, byte[] value) {
	int startPosition = block.size();
	// Write the length of the key
	block.writeVarLenInt(key.length);
	// Write the key
	block.writeBytes(key);
	// Write the length of the value
	block.writeVarLenInt(value.length);
	// Write the value
	block.writeBytes(value);
	int endPosition = block.size();

	// Use an int array to record the start position of each KV pair as an index.
	positions.add(startPosition);

	// Track whether the block is still aligned: alignment requires every KV pair
	// to have exactly the same serialized length.
	if (aligned) {
		int currentSize = endPosition - startPosition;
		if (alignedSize == 0) {
			alignedSize = currentSize;
		} else {
			aligned = alignedSize == currentSize;
		}
	}
}
  • The block here is backed by an expandable MemorySegment (i.e., a byte[]); when a write would exceed the current array's length, the array is grown.
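A minimal, self-contained sketch of that grow-on-write behaviour (illustrative only, not Paimon's actual class):

import java.util.Arrays;

// Illustrative growable buffer: re-allocates and copies the backing byte[] when
// a write would overflow it, which is what the expandable segment does conceptually.
final class GrowableBuffer {
    private byte[] bytes = new byte[64];
    private int size;

    void writeBytes(byte[] src) {
        ensureCapacity(size + src.length);
        System.arraycopy(src, 0, bytes, size, src.length);
        size += src.length;
    }

    private void ensureCapacity(int required) {
        if (required <= bytes.length) {
            return;
        }
        // double the capacity until it is large enough, then copy the old contents over
        int newCapacity = Math.max(bytes.length * 2, required);
        bytes = Arrays.copyOf(bytes, newCapacity);
    }
}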

finish

public MemorySlice finish() throws IOException {
	if (positions.isEmpty()) {
		throw new IllegalStateException();
	}
	// When every KV pair written to this BlockWriter has the same length (aligned),
	// there is no need to store the position of each record; a single aligned size
	// is enough, and the reader can compute every offset from it.
	if (aligned) {
		block.writeInt(alignedSize);
	} else {
		for (int i = 0; i < positions.size(); i++) {
			block.writeInt(positions.get(i));
		}
		block.writeInt(positions.size());
	}
	block.writeByte(aligned ? ALIGNED.toByte() : UNALIGNED.toByte());
	return block.toSlice();
}

wrap-up

The write-out process is simple: data is written out block by block, and the position of each block is recorded in an index, as sketched below.
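In miniature, the pattern looks like the following self-contained sketch (illustrative only, not Paimon code): append each block, remember its (position, length) handle, and finish by writing an index of those handles.

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: sequential block appends plus a trailing index of handles,
// mirroring the data block / index block / footer structure described above.
final class BlockFileSketch {
    record Handle(int position, int length) {}

    private final ByteArrayOutputStream file = new ByteArrayOutputStream();
    private final List<Handle> handles = new ArrayList<>();

    void writeBlock(byte[] block) {
        handles.add(new Handle(file.size(), block.length));
        file.writeBytes(block); // sequential append; a block is never rewritten or merged
    }

    byte[] finish() {
        // in the real format the index is itself a block, located via the footer
        for (Handle h : handles) {
            file.writeBytes(String.format("%d,%d;", h.position(), h.length()).getBytes());
        }
        return file.toByteArray();
    }
}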

SortLookupStoreReader

The read path mainly needs to determine whether a key exists and, if so, return what it maps to (a value or a row position).

public byte[] lookup(byte[] key) throws IOException {
	// First consult the bloom filter to rule out missing keys early.
	if (bloomFilter != null && !bloomFilter.testHash(MurmurHashUtils.hashBytes(key))) {
		return null;
	}

	MemorySlice keySlice = MemorySlice.wrap(key);
	// seek the index to the block containing the key
	indexBlockIterator.seekTo(keySlice);

	// if indexIterator does not have a next, it means the key does not exist in this iterator
	if (indexBlockIterator.hasNext()) {
		// seek the current iterator to the key
		// Using the BlockHandle stored as the value in the index block, read the corresponding data block.
		BlockIterator current = getNextBlock();
		// Binary search within the data block's iterator; if a matching key exists, return its value.
		if (current.seekTo(keySlice)) {
			return current.next().getValue().copyBytes();
		}
	}
	return null;
}
  • A single key lookup therefore costs two binary searches (one in the index block, one in the data block).
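The two-level search can be pictured with this self-contained sketch (illustrative only; it uses plain long keys instead of serialized row keys):

import java.util.Arrays;

// Illustrative only: the two binary searches behind one lookup.
final class TwoLevelSearchSketch {
    // lastKeyPerBlock[i] is the largest key stored in block i (blocks are key-ordered),
    // playing the role of the index block entries.
    static int findBlock(long[] lastKeyPerBlock, long key) {
        int pos = Arrays.binarySearch(lastKeyPerBlock, key);
        // a negative result means "not an exact match"; the insertion point is the first
        // block whose last key is greater than the target (or length if the key is too large)
        return pos >= 0 ? pos : -pos - 1;
    }

    // second binary search inside the chosen block
    static boolean containsKey(long[] keysInBlock, long key) {
        return Arrays.binarySearch(keysInBlock, key) >= 0;
    }
}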

BlockReader

// Create an iterator from a block.
public BlockIterator iterator() {
	BlockAlignedType alignedType =
			BlockAlignedType.fromByte(block.readByte(block.length() - 1));
	int intValue = block.readInt(block.length() - 5);
	if (alignedType == ALIGNED) {
		return new AlignedIterator(block.slice(0, block.length() - 5), intValue, comparator);
	} else {
		int indexLength = intValue * 4;
		int indexOffset = block.length() - 5 - indexLength;
		MemorySlice data = block.slice(0, indexOffset);
		MemorySlice index = block.slice(indexOffset, indexLength);
		return new UnalignedIterator(data, index, comparator);
	}
}

SliceComparator

A key comparator is passed in to compare keys; it is what the binary search in the index uses. The comparison is not done on deserialized rows directly; the keys are compared as MemorySlice values.

During a comparison, each key field is deserialized from the underlying MemorySegment, cast to Comparable, and compared field by field.

public SliceComparator(RowType rowType) {
	int bitSetInBytes = calculateBitSetInBytes(rowType.getFieldCount());
	this.reader1 = new RowReader(bitSetInBytes);
	this.reader2 = new RowReader(bitSetInBytes);
	this.fieldReaders = new FieldReader[rowType.getFieldCount()];
	for (int i = 0; i < rowType.getFieldCount(); i++) {
		fieldReaders[i] = createFieldReader(rowType.getTypeAt(i));
	}
}

@Override
public int compare(MemorySlice slice1, MemorySlice slice2) {
	reader1.pointTo(slice1.segment(), slice1.offset());
	reader2.pointTo(slice2.segment(), slice2.offset());
	for (int i = 0; i < fieldReaders.length; i++) {
		boolean isNull1 = reader1.isNullAt(i);
		boolean isNull2 = reader2.isNullAt(i);
		if (!isNull1 || !isNull2) {
			if (isNull1) {
				return -1;
			} else if (isNull2) {
				return 1;
			} else {
				FieldReader fieldReader = fieldReaders[i];
				Object o1 = fieldReader.readField(reader1, i);
				Object o2 = fieldReader.readField(reader2, i);
				@SuppressWarnings({"unchecked", "rawtypes"})
				int comp = ((Comparable) o1).compareTo(o2);
				if (comp != 0) {
					return comp;
				}
			}
		}
	}
	return 0;
}

Since the keys are written in sorted order, the search is implemented as a binary search.

public boolean seekTo(MemorySlice targetKey) {
	int left = 0;
	int right = recordCount - 1;

	while (left <= right) {
		int mid = left + (right - left) / 2;

		// For an aligned iterator, seeking is just record * recordSize.
		// For an unaligned iterator, the jump uses the index table written by the writer.
		seekTo(mid);
		// Read one key-value pair.
		BlockEntry midEntry = readEntry();
		int compare = comparator.compare(midEntry.getKey(), targetKey);

		if (compare == 0) {
			polled = midEntry;
			return true;
		} else if (compare > 0) {
			polled = midEntry;
			right = mid - 1;
		} else {
			left = mid + 1;
		}
	}

	return false;
}
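A self-contained illustration of the two per-record seek strategies mentioned in the comments above (illustrative only, not Paimon's AlignedIterator/UnalignedIterator; a ByteBuffer stands in for the MemorySlice index):

import java.nio.ByteBuffer;

// Illustrative only: how a block iterator can locate record i before reading it.
final class SeekExamples {
    // Aligned case: every KV pair has the same serialized length,
    // so the record's offset is a simple multiplication.
    static int alignedOffset(int record, int recordSize) {
        return record * recordSize;
    }

    // Unaligned case: the block carries a trailing int index (4 bytes per record,
    // written by BlockWriter#finish), so the offset is read from position record * 4.
    static int unalignedOffset(ByteBuffer index, int record) {
        return index.getInt(record * 4);
    }
}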

wrap-up

search process

  • First, check the bloom filter; if it reports the key as absent, stop.
  • Binary-search the index block to find the BlockHandle of the block that may contain the key.
  • Using that handle, read the corresponding data block and binary-search it for the key's value.