Lookup Store is mainly used for Lookup Compaction and Lookup join scenarios in Paimon. It converts remote lookup files into KV lookup format locally.
There are two Lookup Store implementations:
- Hash: based on /linkedin/PalDB
- Sort: based on /dain/leveldb (see /apache/paimon/pull/3770)
[Figure: overall file structure]
Advantages over Hash File
- Write once, avoiding file merges.
- Sequential writes preserve the original key order, which improves cache efficiency when subsequent lookups also arrive in key order.
SortLookupStoreWriter
put
@Override
public void put(byte[] key, byte[] value) throws IOException {
    dataBlockWriter.add(key, value);
    if (bloomFilter != null) {
        bloomFilter.addHash(MurmurHashUtils.hashBytes(key));
    }
    lastKey = key;
    // When the data written by the BlockWriter reaches a threshold, flush.
    // The default is cache-page-size=64kb.
    if (dataBlockWriter.memory() > blockSize) {
        flush();
    }
    recordCount++;
}
flush
private void flush() throws IOException {
    if (dataBlockWriter.size() == 0) {
        return;
    }
    // Write the data block to the data file, recording its position and length
    BlockHandle blockHandle = writeBlock(dataBlockWriter);
    MemorySlice handleEncoding = writeBlockHandle(blockHandle);
    // Write the BlockHandle to the index writer, which is itself backed by a BlockWriter
    indexBlockWriter.add(lastKey, handleEncoding.copyBytes());
}
writeBlock
private BlockHandle writeBlock(BlockWriter blockWriter) throws IOException {
    // close the block
    // Get the finished block as a slice; the blockWriter's internal array is
    // not released here but is reused for the next block
    MemorySlice block = blockWriter.finish();
    totalUncompressedSize += block.length();
    // attempt to compress the block
    BlockCompressionType blockCompressionType = BlockCompressionType.NONE;
    if (blockCompressor != null) {
        int maxCompressedSize = blockCompressor.getMaxCompressedSize(block.length());
        byte[] compressed = allocateReuseBytes(maxCompressedSize + 5);
        int offset = encodeInt(compressed, 0, block.length());
        int compressedSize =
                offset
                        + blockCompressor.compress(
                                block.segment().getHeapMemory(),
                                block.offset(),
                                block.length(),
                                compressed,
                                offset);
        // Don't use the compressed data if it saves less than 12.5%
        if (compressedSize < block.length() - (block.length() / 8)) {
            block = new MemorySlice(MemorySegment.wrap(compressed), 0, compressedSize);
            blockCompressionType = this.compressionType;
        }
    }
    totalCompressedSize += block.length();
    // create block trailer
    // Each block has a trailer recording the compression type and a crc32 checksum
    BlockTrailer blockTrailer =
            new BlockTrailer(blockCompressionType, crc32c(block, blockCompressionType));
    MemorySlice trailer = BlockTrailer.writeBlockTrailer(blockTrailer);
    // create a handle to this block
    // The BlockHandle records each block's actual position and length
    BlockHandle blockHandle = new BlockHandle(position, block.length());
    // write data
    // Append the block data to the file on disk
    writeSlice(block);
    // write trailer: 5 bytes
    writeSlice(trailer);
    // clean up state
    blockWriter.reset();
    return blockHandle;
}
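The 12.5% rule above can be demonstrated in isolation. This sketch uses java.util.zip.Deflater as a stand-in for Paimon's BlockCompressor (the class and method names are illustrative): the compressed form is kept only if it saves at least 1/8 of the original size.

```java
import java.util.zip.Deflater;

// Sketch of the compression decision in writeBlock(), with Deflater
// standing in for the real block compressor.
class CompressionChoice {
    static boolean worthCompressing(byte[] block) {
        Deflater deflater = new Deflater();
        deflater.setInput(block);
        deflater.finish();
        // Output buffer large enough even for incompressible input
        byte[] out = new byte[block.length * 2 + 64];
        int compressedSize = deflater.deflate(out);
        deflater.end();
        // Same decision as writeBlock(): keep only if it saves >= 12.5%
        return compressedSize < block.length - (block.length / 8);
    }
}
```

Highly repetitive blocks pass the check; random (incompressible) blocks fail it, so the writer falls back to storing them uncompressed.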
close
public SortContext close() throws IOException {
    // flush current data block
    flush();
    LOG.info("Number of record: {}", recordCount);
    // write bloom filter
    @Nullable BloomFilterHandle bloomFilterHandle = null;
    if (bloomFilter != null) {
        MemorySegment buffer = bloomFilter.getBuffer();
        bloomFilterHandle =
                new BloomFilterHandle(position, buffer.size(), bloomFilter.expectedEntries());
        writeSlice(MemorySlice.wrap(buffer));
        LOG.info("Bloom filter size: {} bytes", buffer.size());
    }
    // write index block
    // Write the index data to the file
    BlockHandle indexBlockHandle = writeBlock(indexBlockWriter);
    // write footer
    // The Footer records the bloom filter handle and the index handle
    Footer footer = new Footer(bloomFilterHandle, indexBlockHandle);
    MemorySlice footerEncoding = Footer.writeFooter(footer);
    writeSlice(footerEncoding);
    // finally, close the file
    fileOutputStream.close();
    LOG.info("totalUncompressedSize: {}", MemorySize.ofBytes(totalUncompressedSize));
    LOG.info("totalCompressedSize: {}", MemorySize.ofBytes(totalCompressedSize));
    return new SortContext(position);
}
BlockWriter
add
public void add(byte[] key, byte[] value) {
    int startPosition = block.size();
    // Write the length of the key
    block.writeVarLenInt(key.length);
    // Write the key
    block.writeBytes(key);
    // Write the length of the value
    block.writeVarLenInt(value.length);
    // Write the value
    block.writeBytes(value);
    int endPosition = block.size();
    // Use an int array to record the start position of each KV pair as an index
    positions.add(startPosition);
    // Track whether the block is aligned; alignment requires every KV pair
    // to have the same length
    if (aligned) {
        int currentSize = endPosition - startPosition;
        if (alignedSize == 0) {
            alignedSize = currentSize;
        } else {
            aligned = alignedSize == currentSize;
        }
    }
}
- The block here is backed by an expandable MemorySegment (a byte[]): when a write exceeds the length of the current array, the array is expanded.
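The expandable buffer described above can be sketched as a byte[] that doubles its capacity whenever a write would overflow (the class and method names are illustrative, not Paimon's):

```java
import java.util.Arrays;

// Toy sketch of an expandable write buffer backing a block.
class GrowableBuffer {
    private byte[] data = new byte[16];
    private int size = 0;

    void writeBytes(byte[] bytes) {
        int required = size + bytes.length;
        if (required > data.length) {
            // Double the capacity until the write fits
            int newCapacity = data.length;
            while (newCapacity < required) {
                newCapacity *= 2;
            }
            data = Arrays.copyOf(data, newCapacity);
        }
        System.arraycopy(bytes, 0, data, size, bytes.length);
        size += bytes.length;
    }

    int size() {
        return size;
    }

    int capacity() {
        return data.length;
    }
}
```

Doubling keeps the amortized cost of a write constant, which is why the writer can keep reusing the same array across blocks.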
finish
public MemorySlice finish() throws IOException {
    if (positions.isEmpty()) {
        throw new IllegalStateException();
    }
    // When the records written by the BlockWriter are aligned, there is no need
    // to record the position of each one; a single aligned length is enough,
    // and each position can be computed from it at read time.
    if (aligned) {
        block.writeInt(alignedSize);
    } else {
        for (int i = 0; i < positions.size(); i++) {
            block.writeInt(positions.get(i));
        }
        block.writeInt(positions.size());
    }
    block.writeByte(aligned ? BlockAlignedType.ALIGNED.toByte() : BlockAlignedType.UNALIGNED.toByte());
    return block.toSlice();
}
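The tail layout that finish() produces can be sketched concretely. In this toy version (names and constants are illustrative, not Paimon's), an aligned block ends with 5 bytes: a 4-byte int (the aligned record size) followed by a 1-byte type marker, which is exactly what the reader peels off:

```java
import java.nio.ByteBuffer;

// Toy sketch of the block tail written by finish() for the aligned case.
class BlockTail {
    static final byte ALIGNED = 1;    // illustrative marker values
    static final byte UNALIGNED = 2;

    // Append the aligned tail: [alignedSize:int][ALIGNED:byte]
    static byte[] finishAligned(byte[] records, int alignedSize) {
        ByteBuffer buf = ByteBuffer.allocate(records.length + 5);
        buf.put(records).putInt(alignedSize).put(ALIGNED);
        return buf.array();
    }

    // Recover the int stored 5 bytes from the end, as the reader does
    static int tailInt(byte[] block) {
        return ByteBuffer.wrap(block).getInt(block.length - 5);
    }

    // The last byte tells the reader which iterator to build
    static byte tailType(byte[] block) {
        return block[block.length - 1];
    }
}
```

For an aligned block, record i then simply starts at offset i * alignedSize, so no per-record index is needed.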
wrap-up
The whole process of writing a file is simple: data is written out block by block, and the position of each block is recorded as an index.
SortLookupStoreReader
The main purpose of the read process is to find out whether a key exists and, if so, to return the corresponding value (or row number).
public byte[] lookup(byte[] key) throws IOException {
    // First check the bloom filter
    if (bloomFilter != null && !bloomFilter.testHash(MurmurHashUtils.hashBytes(key))) {
        return null;
    }
    MemorySlice keySlice = MemorySlice.wrap(key);
    // seek the index to the block containing the key
    indexBlockIterator.seekTo(keySlice);
    // if the index iterator has no next entry, the key does not exist in this file
    if (indexBlockIterator.hasNext()) {
        // Read the corresponding value block based on the key's location
        // (BlockHandle) stored in the index block
        BlockIterator current = getNextBlock();
        // Binary search in the value block's iterator for a matching key;
        // if one exists, return the corresponding data
        if (current.seekTo(keySlice)) {
            return current.next().getValue().copyBytes();
        }
    }
    return null;
}
- A single key lookup performs two binary searches: one in the index block and one in the value block.
BlockReader
// Create an iterator from the block
public BlockIterator iterator() {
    BlockAlignedType alignedType =
            BlockAlignedType.fromByte(block.readByte(block.length() - 1));
    int intValue = block.readInt(block.length() - 5);
    if (alignedType == ALIGNED) {
        return new AlignedIterator(block.slice(0, block.length() - 5), intValue, comparator);
    } else {
        int indexLength = intValue * 4;
        int indexOffset = block.length() - 5 - indexLength;
        MemorySlice data = block.slice(0, indexOffset);
        MemorySlice index = block.slice(indexOffset, indexLength);
        return new UnalignedIterator(data, index, comparator);
    }
}
SliceComparator
A key comparator is passed in for key comparisons; it is used for the binary search in the index. The comparison is not done directly on the raw bytes: keys are sorted based on MemorySlice.
The comparison process deserializes the fields of the key from the MemorySegment, casts them to Comparable, and compares them.
public SliceComparator(RowType rowType) {
    int bitSetInBytes = calculateBitSetInBytes(rowType.getFieldCount());
    this.reader1 = new RowReader(bitSetInBytes);
    this.reader2 = new RowReader(bitSetInBytes);
    this.fieldReaders = new FieldReader[rowType.getFieldCount()];
    for (int i = 0; i < rowType.getFieldCount(); i++) {
        fieldReaders[i] = createFieldReader(rowType.getTypeAt(i));
    }
}
@Override
public int compare(MemorySlice slice1, MemorySlice slice2) {
    reader1.pointTo(slice1.segment(), slice1.offset());
    reader2.pointTo(slice2.segment(), slice2.offset());
    for (int i = 0; i < fieldReaders.length; i++) {
        boolean isNull1 = reader1.isNullAt(i);
        boolean isNull2 = reader2.isNullAt(i);
        if (!isNull1 || !isNull2) {
            if (isNull1) {
                return -1;
            } else if (isNull2) {
                return 1;
            } else {
                FieldReader fieldReader = fieldReaders[i];
                Object o1 = fieldReader.readField(reader1, i);
                Object o2 = fieldReader.readField(reader2, i);
                @SuppressWarnings({"unchecked", "rawtypes"})
                int comp = ((Comparable) o1).compareTo(o2);
                if (comp != 0) {
                    return comp;
                }
            }
        }
    }
    return 0;
}
The lookup is implemented as a binary search, since the keys are written in sorted order.
public boolean seekTo(MemorySlice targetKey) {
    int left = 0;
    int right = recordCount - 1;
    while (left <= right) {
        int mid = left + (right - left) / 2;
        // For an aligned iterator, seek directly to mid * recordSize;
        // for an unaligned iterator, jump via the position index written by the writer
        seekTo(mid);
        // Read one key-value pair
        BlockEntry midEntry = readEntry();
        int compare = comparator.compare(midEntry.getKey(), targetKey);
        if (compare == 0) {
            polled = midEntry;
            return true;
        } else if (compare > 0) {
            polled = midEntry;
            right = mid - 1;
        } else {
            left = mid + 1;
        }
    }
    return false;
}
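The same search can be shown over a plain sorted int array (the class name is illustrative): on an exact hit it returns true; otherwise, because the compare > 0 branch records the candidate, "polled" is left at the smallest entry greater than the target, which is what lets the index seek land on the right block.

```java
// Toy sketch of the seekTo() binary search over sorted ints.
class SeekTo {
    Integer polled; // last entry seen that is >= target, if any

    boolean seekTo(int[] records, int target) {
        int left = 0;
        int right = records.length - 1;
        while (left <= right) {
            int mid = left + (right - left) / 2;
            int compare = Integer.compare(records[mid], target);
            if (compare == 0) {
                polled = records[mid];
                return true;            // exact match
            } else if (compare > 0) {
                polled = records[mid];  // candidate: smallest entry > target so far
                right = mid - 1;
            } else {
                left = mid + 1;
            }
        }
        return false;                   // no exact match; polled may still be set
    }
}
```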
wrap-up
search process
- First check the bloom filter.
- Binary search the index to find the block handle for the key.
- Using the handle from the previous step, read the corresponding block and binary search for the key's value within it.
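The three steps above can be simulated end to end with a toy two-level store (all names here are illustrative, not Paimon's API): a sorted key space is split into fixed-size blocks, an index maps each block's last key to the block (standing in for the index block of BlockHandles), and a lookup binary-searches the index and then the block.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy two-level lookup: index search, then in-block search.
class TwoLevelStore {
    private final List<int[]> blocks = new ArrayList<>();     // block keys
    private final List<String[]> values = new ArrayList<>();  // block values
    // lastKey -> block id, like the index entries written at flush()
    private final TreeMap<Integer, Integer> index = new TreeMap<>();

    TwoLevelStore(int[] keys, String[] vals, int blockSize) {
        for (int start = 0; start < keys.length; start += blockSize) {
            int end = Math.min(start + blockSize, keys.length);
            blocks.add(Arrays.copyOfRange(keys, start, end));
            values.add(Arrays.copyOfRange(vals, start, end));
            index.put(keys[end - 1], blocks.size() - 1);
        }
    }

    String lookup(int key) {
        // "Seek the index": first block whose last key >= target
        Map.Entry<Integer, Integer> e = index.ceilingEntry(key);
        if (e == null) {
            return null; // key is beyond every block
        }
        // Search inside the chosen block
        int[] blockKeys = blocks.get(e.getValue());
        int pos = Arrays.binarySearch(blockKeys, key);
        return pos >= 0 ? values.get(e.getValue())[pos] : null;
    }
}
```

The bloom filter step is omitted here; in the real reader it short-circuits the two searches for most absent keys.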