import time
import threading
class Snowflake:
    """Generate time-ordered, unique integer IDs (Snowflake scheme).

    Each ID packs four fields, from the most to the least significant bits:
    milliseconds elapsed since ``epoch``, the data center ID, the worker ID
    and a per-millisecond sequence number.

    NOTE(review): the three low fields occupy 10 + 10 + 12 = 32 bits, so an
    ID no longer fits a signed 64-bit integer once more than 2**31 ms
    (about 25 days) have elapsed since ``epoch``. Python integers are
    unbounded, so generation still works, but confirm the layout before
    storing IDs in fixed-width columns.
    """

    # Basic parameters
    epoch = 1609459200000  # Custom start time in ms (2021-01-01 00:00:00 UTC)
    worker_id_bits = 10  # Number of bits occupied by the worker ID
    datacenter_id_bits = 10  # Number of bits occupied by the data center ID
    sequence_bits = 12  # Number of bits occupied by the sequence number

    # Left-shift applied to each field when packing an ID
    worker_id_shift = sequence_bits
    datacenter_id_shift = sequence_bits + worker_id_bits
    timestamp_shift = sequence_bits + worker_id_bits + datacenter_id_bits

    # Largest value representable in each field (all bits set)
    max_worker_id = (1 << worker_id_bits) - 1
    max_datacenter_id = (1 << datacenter_id_bits) - 1
    sequence_mask = (1 << sequence_bits) - 1

    def __init__(self, worker_id, datacenter_id):
        """Create a generator bound to one worker / data center pair.

        Args:
            worker_id: Integer in ``[0, max_worker_id]``.
            datacenter_id: Integer in ``[0, max_datacenter_id]``.

        Raises:
            ValueError: If either ID is outside its allowed range.
        """
        if worker_id > self.max_worker_id or worker_id < 0:
            raise ValueError(f'worker_id must be between 0 and {self.max_worker_id}')
        if datacenter_id > self.max_datacenter_id or datacenter_id < 0:
            raise ValueError(f'datacenter_id must be between 0 and {self.max_datacenter_id}')
        self.worker_id = worker_id
        self.datacenter_id = datacenter_id
        self.sequence = 0  # Sequence number within the current millisecond
        self.last_timestamp = -1  # Timestamp (ms) of the most recent ID
        # Serializes the read-modify-write of sequence/last_timestamp so two
        # threads sharing one generator cannot produce the same ID.
        self._lock = threading.Lock()

    def _current_time_millis(self):
        """Return the current wall-clock time in whole milliseconds."""
        return int(time.time() * 1000)

    def generate_id(self):
        """Return the next unique ID for this worker.

        Up to ``sequence_mask + 1`` IDs can be issued per millisecond; once
        that budget is used up the call busy-waits for the next millisecond.

        Raises:
            RuntimeError: If the clock reads earlier than the timestamp of
                the previously generated ID. ``RuntimeError`` is a subclass
                of ``Exception``, so ``except Exception`` still catches it.
        """
        with self._lock:
            timestamp = self._current_time_millis()
            if timestamp < self.last_timestamp:
                raise RuntimeError('Clock moved backward, unable to generate ID')

            if timestamp == self.last_timestamp:
                # Same millisecond: advance the sequence, wrapping at the mask.
                self.sequence = (self.sequence + 1) & self.sequence_mask
                if self.sequence == 0:
                    # Sequence exhausted; spin until the next millisecond.
                    while timestamp <= self.last_timestamp:
                        timestamp = self._current_time_millis()
            else:
                # New millisecond: reset the sequence number.
                self.sequence = 0

            self.last_timestamp = timestamp

            # Pack the fields into a single integer.
            return (
                ((timestamp - self.epoch) << self.timestamp_shift)
                | (self.datacenter_id << self.datacenter_id_shift)
                | (self.worker_id << self.worker_id_shift)
                | self.sequence
            )
# Example: build a Snowflake generator and print a batch of IDs
if __name__ == "__main__":
    worker_id, datacenter_id = 1, 1  # Machine ID, data center ID
    snowflake = Snowflake(worker_id, datacenter_id)
    # Print 10 IDs, one per line
    id_count = 10
    for _ in range(id_count):
        print(snowflake.generate_id())