Preface
A thread is the smallest unit of execution that the operating system can schedule. It lives inside a process and is the unit that actually does the work within that process. Because of CPython's GIL (global interpreter lock), only one thread executes Python bytecode at a time, so multithreading brings little benefit for CPU-bound work and is mostly used for IO-bound tasks.
Python programs generally use the standard library threading module for multithreaded programming.
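As a rough illustration of why threads still help with IO-bound work, the sketch below uses time.sleep as a stand-in for a blocking IO call (an assumption made for this example; real IO such as network or disk access behaves similarly because the GIL is released while waiting). The names fake_io and run_threaded are hypothetical helpers.

```python
import threading
import time


def fake_io(delay: float) -> None:
    # time.sleep stands in for a blocking IO call; the GIL is released while it waits.
    time.sleep(delay)


def run_threaded(n_tasks: int, delay: float) -> float:
    """Run n_tasks sleeping tasks in separate threads and return the elapsed seconds."""
    threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(n_tasks)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    elapsed = run_threaded(n_tasks=4, delay=0.2)
    # Run one after another, these tasks would need about 0.8 s in total.
    print(f"4 tasks of 0.2 s each took {elapsed:.2f} s with threads")
```

The waits overlap, so the total time should stay close to the duration of a single task rather than the sum of all of them.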
Basic usage
- Approach 1: create instances of the threading.Thread class
import threading
import time

def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # Create three threads
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    # Start the threads
    t1.start()
    t2.start()
    t3.start()
    # join() blocks the main thread until the child thread has finished
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
Sample Execution Output
main thread: MainThread, start time: 2024-10-26 12:42:37
thread: Thread-1 (task1), args: 7, start time: 2024-10-26 12:42:37
thread: Thread-2 (task1), args: 5, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, end time: 2024-10-26 12:42:46
thread: Thread-2 (task1), args: 5, end time: 2024-10-26 12:42:52
thread: Thread-1 (task1), args: 7, end time: 2024-10-26 12:42:58
main thread: MainThread, end time: 2024-10-26 12:42:58
- Approach 2: subclass threading.Thread and override the run() and __init__() methods
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, counter: int):
        super().__init__()
        self.counter = counter

    def run(self):
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, start time: {time.strftime('%F %T')}")
        num = self.counter
        while num > 0:
            time.sleep(3)
            num -= 1
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, end time: {time.strftime('%F %T')}")

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # Create three threads
    t1 = MyThread(7)
    t2 = MyThread(5)
    t3 = MyThread(3)
    # Start the threads
    t1.start()
    t2.start()
    t3.start()
    # join() blocks the main thread until the child thread has finished
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
The subclass can also be written so that it calls an external function, like this:
import threading
import time

def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")

class MyThread(threading.Thread):
    def __init__(self, target, args: tuple):
        super().__init__()
        self.target = target
        self.args = args

    def run(self):
        # Call the external function with the stored arguments
        self.target(*self.args)

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # Create three threads
    t1 = MyThread(target=task1, args=(7,))
    t2 = MyThread(target=task1, args=(5,))
    t3 = MyThread(target=task1, args=(3,))
    # Start the threads
    t1.start()
    t2.start()
    t3.start()
    # join() blocks the main thread until the child thread has finished
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
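One thing the subclassing approach makes easy is keeping the return value of the external function, which a plain Thread discards. The following is a minimal sketch under that assumption; ResultThread and square are hypothetical names introduced only for this example.

```python
import threading


class ResultThread(threading.Thread):
    """Hypothetical helper: runs func(*args) and keeps the return value."""

    def __init__(self, func, args: tuple = ()):
        super().__init__()
        self.func = func
        self.args = args
        self.result = None  # filled in by run()

    def run(self):
        self.result = self.func(*self.args)


def square(n: int) -> int:
    return n * n


if __name__ == "__main__":
    workers = [ResultThread(square, (n,)) for n in (7, 5, 3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()  # the result is only safe to read after join()
    print([w.result for w in workers])  # prints [49, 25, 9]
```

Reading result before join() returns could still see None, so the join() calls are part of the pattern rather than an optional extra.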
Thread synchronization
If multiple threads modify the same data at the same time, the outcome can be unpredictable, so some synchronization mechanism is needed. For example, the result of the following code is nondeterministic (on my machine Python 3.13 actually printed 0 every time, while the older Python 3.6 did produce varying results).
import threading
import time

num = 0

def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    global num
    for _ in range(100000000):
        # Each statement is a read-modify-write that another thread can interleave with
        num = num + counter
        num = num - counter
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # Create five threads
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    t4 = threading.Thread(target=task1, args=(6,))
    t5 = threading.Thread(target=task1, args=(8,))
    # Start the threads
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()
    # join() blocks the main thread until the child thread has finished
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    t5.join()
    print(f"num: {num}")
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
Lock
A mutex lets one thread access the data while blocking other threads until the lock is released. In the threading module, both Lock() and RLock() provide this locking behavior.
import threading
import time

num = 0
mutex = threading.Lock()

def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    global num
    mutex.acquire()  # Only one thread at a time gets past this point
    for _ in range(100000):
        num = num + counter
        num = num - counter
    mutex.release()
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # Create three threads
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    # Start the threads
    t1.start()
    t2.start()
    t3.start()
    # join() blocks the main thread until the child thread has finished
    t1.join()
    t2.join()
    t3.join()
    print(f"num: {num}")
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
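Both lock types also work as context managers, which releases the lock even if the guarded code raises. The sketch below, with the hypothetical helpers count_with_lock, outer and inner, shows the with form and the one practical difference of RLock: the thread that already holds it can acquire it again.

```python
import threading


def count_with_lock(n_threads: int, times: int) -> int:
    """Increment a shared counter from several threads, guarded by a Lock."""
    total = 0
    lock = threading.Lock()

    def add_many() -> None:
        nonlocal total
        for _ in range(times):
            with lock:  # acquired here, released when the block is left
                total += 1

    threads = [threading.Thread(target=add_many) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return total


rlock = threading.RLock()


def inner() -> str:
    with rlock:  # second acquire by the same thread succeeds with RLock
        return "nested acquire ok"


def outer() -> str:
    with rlock:
        return inner()  # with a plain Lock this nested acquire would block forever


if __name__ == "__main__":
    print(count_with_lock(n_threads=4, times=10_000))  # prints 40000
    print(outer())  # prints nested acquire ok
```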
Semaphore
A mutex allows only one thread at a time to access shared data, while a semaphore allows a fixed number of threads to access it simultaneously. For example, if a bank has 5 counters, 5 customers can be served at the same time, and everyone else has to wait until a counter becomes free.
import threading
import time
from random import randint

semaphore = threading.Semaphore(5)

def business(name: str):
    semaphore.acquire()  # Blocks while all 5 slots are taken
    print(f"{time.strftime('%F %T')} {name} is handling")
    time.sleep(randint(3, 10))
    print(f"{time.strftime('%F %T')} {name} is done")
    semaphore.release()

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    threads = []
    for i in range(10):
        t = threading.Thread(target=business, args=(f"thread-{i}",))
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
Execution output
main thread: MainThread, start time: 2024-10-26 17:40:10
2024-10-26 17:40:10 thread-0 is handling
2024-10-26 17:40:10 thread-1 is handling
2024-10-26 17:40:10 thread-2 is handling
2024-10-26 17:40:10 thread-3 is handling
2024-10-26 17:40:10 thread-4 is handling
2024-10-26 17:40:15 thread-2 is done
2024-10-26 17:40:15 thread-5 is handling
2024-10-26 17:40:16 thread-0 is done
2024-10-26 17:40:16 thread-6 is handling
2024-10-26 17:40:19 thread-3 is done
2024-10-26 17:40:19 thread-4 is done
2024-10-26 17:40:19 thread-7 is handling
2024-10-26 17:40:19 thread-8 is handling
2024-10-26 17:40:20 thread-1 is done
2024-10-26 17:40:20 thread-9 is handling
2024-10-26 17:40:21 thread-6 is done
2024-10-26 17:40:23 thread-7 is done
2024-10-26 17:40:24 thread-5 is done
2024-10-26 17:40:24 thread-8 is done
2024-10-26 17:40:30 thread-9 is done
main thread: MainThread, end time: 2024-10-26 17:40:30
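To check the "at most N at a time" behavior programmatically rather than by reading timestamps, one option is to count how many threads are inside the guarded section. This is a minimal sketch; run_with_limit and its counters are hypothetical names, and the short sleep merely stands in for the work done at the counter.

```python
import threading
import time


def run_with_limit(n_threads: int, limit: int) -> int:
    """Start n_threads workers and return the peak number inside the guarded section."""
    semaphore = threading.Semaphore(limit)
    state_lock = threading.Lock()  # protects the two counters below
    active = 0
    peak = 0

    def worker() -> None:
        nonlocal active, peak
        with semaphore:  # acquire on entry, release on exit
            with state_lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.05)  # stands in for the actual work
            with state_lock:
                active -= 1

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


if __name__ == "__main__":
    # The peak never exceeds the semaphore's initial value.
    print(run_with_limit(n_threads=10, limit=3))
```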
Condition
A Condition object lets a thread A stop and wait for other threads; thread A resumes once another thread sends a notification.
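import threading
import time
import random

class Employee(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        self.username = username
        self.cond = cond
        super().__init__()

    def run(self):
        with self.cond:
            print(f"{time.strftime('%F %T')} {self.username} arrived at the company")
            self.cond.wait()  # Wait for the notification (releases the lock while waiting)
            print(f"{time.strftime('%F %T')} {self.username} started working")
            time.sleep(random.randint(1, 5))
            print(f"{time.strftime('%F %T')} {self.username} finished working")

class Boss(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        self.username = username
        self.cond = cond
        super().__init__()

    def run(self):
        with self.cond:
            print(f"{time.strftime('%F %T')} {self.username} sent the notification")
            self.cond.notify_all()  # Notify all waiting threads
        time.sleep(2)

if __name__ == "__main__":
    cond = threading.Condition()
    boss = Boss("Lao Wang", cond)
    employees = []
    for i in range(5):
        employees.append(Employee(f"Employee {i}", cond))
    # Note: an employee that reached wait() only after notify_all() would block forever;
    # here the employees are started first and normally get there in time.
    for employee in employees:
        employee.start()
    boss.start()
    boss.join()
    for employee in employees:
        employee.join()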
Execution output
2024-10-26 21:16:20 Employee 0 arrived at the company
2024-10-26 21:16:20 Employee 1 arrived at the company
2024-10-26 21:16:20 Employee 2 arrived at the company
2024-10-26 21:16:20 Employee 3 arrived at the company
2024-10-26 21:16:20 Employee 4 arrived at the company
2024-10-26 21:16:20 Lao Wang sent the notification
2024-10-26 21:16:20 Employee 4 started working
2024-10-26 21:16:23 Employee 4 finished working
2024-10-26 21:16:23 Employee 1 started working
2024-10-26 21:16:28 Employee 1 finished working
2024-10-26 21:16:28 Employee 2 started working
2024-10-26 21:16:30 Employee 2 finished working
2024-10-26 21:16:30 Employee 0 started working
2024-10-26 21:16:31 Employee 0 finished working
2024-10-26 21:16:31 Employee 3 started working
2024-10-26 21:16:32 Employee 3 finished working
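A notification sent with notify_all() is not remembered, so a thread that starts waiting afterwards misses it. A common way to guard against that is to pair the Condition with a flag and wait with wait_for(), which re-checks a predicate. The sketch below illustrates this under that assumption; the Gate class is a hypothetical helper, not part of the threading module.

```python
import threading
from typing import Optional


class Gate:
    """Hypothetical helper: a boolean flag guarded by a Condition."""

    def __init__(self) -> None:
        self.cond = threading.Condition()
        self.opened = False

    def open(self) -> None:
        with self.cond:
            self.opened = True
            self.cond.notify_all()

    def wait_open(self, timeout: Optional[float] = None) -> bool:
        with self.cond:
            # wait_for checks the predicate first, so it returns at once
            # if the gate was opened before this thread started waiting.
            return self.cond.wait_for(lambda: self.opened, timeout=timeout)


if __name__ == "__main__":
    gate = Gate()
    gate.open()  # the notification happens before anyone waits
    waiters = [threading.Thread(target=gate.wait_open) for _ in range(3)]
    for t in waiters:
        t.start()
    for t in waiters:
        t.join()  # returns promptly because the flag is already set
    print("all waiters passed the gate")
```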
Event
In Python's threading module, Event is a thread synchronization primitive for simple communication between threads. An Event object maintains an internal flag. A thread can call the wait() method to block until another thread calls the set() method to set the flag to True. Once the flag is True, all waiting threads are woken up and continue executing.
Main methods of Event:
- set(): sets the internal flag to True and wakes up all waiting threads.
- clear(): sets the internal flag to False.
- is_set(): returns whether the internal flag is True.
- wait(timeout=None): if the internal flag is False, blocks the current thread until the flag is set to True or the timeout (if one is given) expires.
import threading
import time
import random

class Employee(threading.Thread):
    def __init__(self, username: str, cond: threading.Event):
        self.username = username
        self.cond = cond
        super().__init__()

    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} arrived at the company")
        self.cond.wait()  # Wait until the event flag is True
        print(f"{time.strftime('%F %T')} {self.username} started working")
        time.sleep(random.randint(1, 5))
        print(f"{time.strftime('%F %T')} {self.username} finished working")

class Boss(threading.Thread):
    def __init__(self, username: str, cond: threading.Event):
        self.username = username
        self.cond = cond
        super().__init__()

    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} sent the notification")
        self.cond.set()

if __name__ == "__main__":
    cond = threading.Event()
    boss = Boss("Lao Wang", cond)
    employees = []
    for i in range(5):
        employees.append(Employee(f"Employee {i}", cond))
    for employee in employees:
        employee.start()
    boss.start()
    boss.join()
    for employee in employees:
        employee.join()
Execution output
2024-10-26 21:22:28 Employee 0 arrived at the company
2024-10-26 21:22:28 Employee 1 arrived at the company
2024-10-26 21:22:28 Employee 2 arrived at the company
2024-10-26 21:22:28 Employee 3 arrived at the company
2024-10-26 21:22:28 Employee 4 arrived at the company
2024-10-26 21:22:28 Lao Wang sent the notification
2024-10-26 21:22:28 Employee 0 started working
2024-10-26 21:22:28 Employee 1 started working
2024-10-26 21:22:28 Employee 3 started working
2024-10-26 21:22:28 Employee 4 started working
2024-10-26 21:22:28 Employee 2 started working
2024-10-26 21:22:30 Employee 3 finished working
2024-10-26 21:22:31 Employee 4 finished working
2024-10-26 21:22:31 Employee 2 finished working
2024-10-26 21:22:32 Employee 0 finished working
2024-10-26 21:22:32 Employee 1 finished working
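The four Event methods can also be exercised in a single thread to see what each one returns. The function demo_event below is a hypothetical helper written for this illustration; the short timeout is an arbitrary choice.

```python
import threading


def demo_event() -> list:
    """Return the values observed while setting and clearing an Event."""
    event = threading.Event()
    observed = []
    observed.append(event.wait(timeout=0.05))  # flag is False, so this times out
    event.set()
    observed.append(event.is_set())            # flag is now True
    observed.append(event.wait(timeout=0.05))  # returns immediately
    event.clear()
    observed.append(event.is_set())            # flag is False again
    return observed


if __name__ == "__main__":
    print(demo_event())  # prints [False, True, True, False]
```

Because wait() returns the flag value, a caller that passes a timeout can tell a real notification apart from a timeout by checking the return value.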
Using queues
Python's queue module provides synchronized, thread-safe queue classes. The following example implements the producer-consumer model using a queue.
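import threading
import time
import random
import queue

class Producer(threading.Thread):
    """Multithreaded producer class."""

    def __init__(
        self, tname: str, channel: queue.Queue, done: threading.Event
    ):
        self.tname = tname
        self.channel = channel
        self.done = done
        super().__init__()

    def run(self) -> None:
        """Method representing the thread's activity."""
        while True:
            if self.done.is_set():
                print(
                    f"{time.strftime('%F %T')} {self.tname} received the stop signal"
                )
                break
            # Note: full() is only a snapshot; put() may still block briefly
            # if another producer fills the last slot first.
            if self.channel.full():
                print(
                    f"{time.strftime('%F %T')} {self.tname} report: the queue is full, stopping all production"
                )
                self.done.set()
            else:
                num = random.randint(100, 1000)
                self.channel.put(f"{self.tname}-{num}")
                print(
                    f"{time.strftime('%F %T')} {self.tname} produced data {num}, queue size: {self.channel.qsize()}"
                )
                time.sleep(random.randint(1, 5))

class Consumer(threading.Thread):
    """Multithreaded consumer class."""

    def __init__(
        self, tname: str, channel: queue.Queue, done: threading.Event
    ):
        self.tname = tname
        self.channel = channel
        self.done = done
        self.counter = 0  # Number of consecutive times the queue was found empty
        super().__init__()

    def run(self) -> None:
        """Method representing the thread's activity."""
        while True:
            if self.done.is_set():
                print(
                    f"{time.strftime('%F %T')} {self.tname} received the stop signal"
                )
                break
            if self.counter >= 3:
                print(
                    f"{time.strftime('%F %T')} {self.tname} report: stopping all consumption"
                )
                self.done.set()
                continue
            # Note: empty() is only a snapshot; get() can block if another
            # consumer takes the last item first.
            if self.channel.empty():
                print(
                    f"{time.strftime('%F %T')} {self.tname} report: the queue is empty, counter: {self.counter}"
                )
                self.counter += 1
                time.sleep(1)
                continue
            else:
                data = self.channel.get()
                print(
                    f"{time.strftime('%F %T')} {self.tname} consumed data {data}, queue size: {self.channel.qsize()}"
                )
                time.sleep(random.randint(1, 5))
                self.counter = 0

if __name__ == "__main__":
    done_p = threading.Event()
    done_c = threading.Event()
    channel = queue.Queue(30)
    threads_producer = []
    threads_consumer = []
    for i in range(8):
        threads_producer.append(Producer(f"producer-{i}", channel, done_p))
    for i in range(6):
        threads_consumer.append(Consumer(f"consumer-{i}", channel, done_c))
    for t in threads_producer:
        t.start()
    for t in threads_consumer:
        t.start()
    for t in threads_producer:
        t.join()
    for t in threads_consumer:
        t.join()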
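A smaller variant of the same producer-consumer idea relies on the blocking behavior of put() and get() instead of polling full() and empty(), and uses a sentinel value to signal the end of the data. This is a sketch under those assumptions, not a replacement for the example above; SENTINEL, producer, consumer and run_pipeline are hypothetical names.

```python
import queue
import threading

SENTINEL = None  # hypothetical marker meaning "no more items"


def producer(channel: queue.Queue, items: list) -> None:
    for item in items:
        channel.put(item)  # blocks while the queue is full
    channel.put(SENTINEL)  # tell the consumer to stop


def consumer(channel: queue.Queue, out: list) -> None:
    while True:
        item = channel.get()  # blocks while the queue is empty
        if item is SENTINEL:
            break
        out.append(item * 2)


def run_pipeline(items: list) -> list:
    """Pass items through a bounded queue from one producer to one consumer."""
    channel = queue.Queue(maxsize=2)
    out = []
    threads = [
        threading.Thread(target=producer, args=(channel, items)),
        threading.Thread(target=consumer, args=(channel, out)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out


if __name__ == "__main__":
    print(run_pipeline([1, 2, 3, 4, 5]))  # prints [2, 4, 6, 8, 10]
```

With several consumers, one sentinel per consumer would be needed so that each of them sees a stop marker.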
Thread pool
In object-oriented programming, creating and destroying objects takes time, because creating an object requires memory and possibly other resources. In a multithreaded program, repeatedly creating a thread, destroying it, and then creating another one is inefficient. Thread pools were introduced to address this.
When a task is added to a thread pool, the pool automatically assigns an idle thread to run it. Once the maximum number of threads is in use, further tasks wait until a thread becomes free. In Python, the Pool class from the multiprocessing.dummy module, which offers the multiprocessing Pool API on top of threads, can be used to create a thread pool.
import time
from multiprocessing.dummy import Pool as ThreadPool

def foo(n):
    time.sleep(2)

if __name__ == "__main__":
    start = time.time()
    for n in range(5):
        foo(n)
    print("single thread time: ", time.time() - start)
    start = time.time()
    t_pool = ThreadPool(processes=5)  # create a thread pool with 5 threads (defaults to the number of CPUs)
    rst = t_pool.map(foo, range(5))  # use map to apply foo to each element
    t_pool.close()  # prevent any new tasks from being submitted to the pool
    t_pool.join()  # wait for all submitted tasks to complete
    print("thread pool time: ", time.time() - start)
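The pool's map() also collects return values, and it returns them in input order even when later inputs finish first. A minimal sketch, assuming the hypothetical helpers slow_square and squares_in_pool:

```python
import time
from multiprocessing.dummy import Pool as ThreadPool


def slow_square(n: int) -> int:
    time.sleep(0.05 * (5 - n))  # larger inputs finish earlier
    return n * n


def squares_in_pool(values, workers: int = 5) -> list:
    # map() blocks until every result is available, so leaving the
    # with-block afterwards does not cut any task short.
    with ThreadPool(processes=workers) as pool:
        return pool.map(slow_square, values)


if __name__ == "__main__":
    print(squares_in_pool(range(5)))  # prints [0, 1, 4, 9, 16]
```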
Thread Pool Executor
Python's built-in concurrent.futures module provides the ThreadPoolExecutor class. This class combines the advantages of threads and queues and can be used to run tasks concurrently.
import time
from random import randint
from concurrent.futures import ThreadPoolExecutor

def foo() -> int:
    time.sleep(2)
    return randint(1, 100)

if __name__ == "__main__":
    start = time.time()
    futures = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        for n in range(10):
            futures.append(executor.submit(foo))  # Fan out
        for future in futures:  # Fan in
            print(future.result())
    print("thread pool executor time: ", time.time() - start)
Execution output
44
19
86
48
35
74
59
99
58
53
thread pool executor time: 4.001955032348633
The great thing about the ThreadPoolExecutor class is that when the caller hands it a task through the submit method, the caller gets back a Future instance, and when the caller then calls the result method on that Future, any exception raised while the task was running is automatically re-raised to the caller. The disadvantage of ThreadPoolExecutor is that its IO parallelism is limited: even with max_workers set to 100, it does not scale efficiently to workloads with a very large number of concurrent IO operations. For more demanding IO workloads, consider switching to asynchronous coroutines.
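The exception behavior described above can be seen in a short sketch: a task that raises does not crash the pool, and the exception surfaces only when result() is called on its Future. The functions parse_positive and collect are hypothetical names used for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def parse_positive(text: str) -> int:
    value = int(text)  # raises ValueError for non-numeric text
    if value <= 0:
        raise ValueError(f"not positive: {value}")
    return value


def collect(inputs: list) -> list:
    """Submit one task per input and pair each input with its result or error."""
    results = []
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(parse_positive, text) for text in inputs]
        for text, future in zip(inputs, futures):
            try:
                results.append((text, future.result()))
            except ValueError as exc:  # re-raised here, in the calling thread
                results.append((text, f"error: {exc}"))
    return results


if __name__ == "__main__":
    for text, outcome in collect(["3", "-1", "abc"]):
        print(text, "->", outcome)
```

Handling the exception at the result() call keeps the error next to the input that caused it, which is usually easier to follow than catching it inside the worker function.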