
[python] Multithreading Quickstart


Preamble

A thread is the smallest unit of execution that the operating system can schedule. It lives inside a process and is the actual unit of work within that process. Because of CPython's GIL, only one thread executes Python bytecode at a time, so multithreading is effectively serial for CPU-bound work and is mostly useful for IO-intensive tasks.

Python multithreaded programming is generally done with the standard library threading module.

Basic use

  • Mode 1: create an instance of the threading.Thread class directly
import threading
import time

def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # Create three threads
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))

    # Start the threads
    t1.start()
    t2.start()
    t3.start()

    # join() blocks the main thread until each child thread finishes
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

Sample execution output

main thread: MainThread, start time: 2024-10-26 12:42:37
thread: Thread-1 (task1), args: 7, start time: 2024-10-26 12:42:37
thread: Thread-2 (task1), args: 5, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, end time: 2024-10-26 12:42:46
thread: Thread-2 (task1), args: 5, end time: 2024-10-26 12:42:52
thread: Thread-1 (task1), args: 7, end time: 2024-10-26 12:42:58
main thread: MainThread, end time: 2024-10-26 12:42:58
  • Mode 2: subclass threading.Thread and override the __init__() and run() methods
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, counter: int):
        super().__init__()
        self.counter = counter

    def run(self):
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, start time: {time.strftime('%F %T')}")
        num = self.counter
        while num > 0:
            time.sleep(3)
            num -= 1
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, end time: {time.strftime('%F %T')}")

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # Create three threads
    t1 = MyThread(7)
    t2 = MyThread(5)
    t3 = MyThread(3)

    # Start the threads
    t1.start()
    t2.start()
    t3.start()

    # join() blocks the main thread until each child thread finishes
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

The subclass above can also be written so that run() calls an external function, like this:

import threading
import time

def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")

class MyThread(threading.Thread):
    def __init__(self, target, args: tuple):
        super().__init__()
        self.target = target
        self.args = args

    def run(self):
        self.target(*self.args)

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # Create three threads
    t1 = MyThread(target=task1, args=(7,))
    t2 = MyThread(target=task1, args=(5,))
    t3 = MyThread(target=task1, args=(3,))

    # Start the threads
    t1.start()
    t2.start()
    t3.start()

    # join() blocks the main thread until each child thread finishes
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

Multithreaded synchronization

If multiple threads modify shared data concurrently, the result can be unpredictable, so a synchronization mechanism is needed. For example, the result of the following code is random (on my machine Python 3.13 consistently prints 0, while an older Python 3.6 does produce random results).

import threading
import time

num = 0

def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    global num
    for _ in range(100000000):
        num = num + counter
        num = num - counter
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # Create five threads
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    t4 = threading.Thread(target=task1, args=(6,))
    t5 = threading.Thread(target=task1, args=(8,))

    # Start the threads
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    # join() blocks the main thread until each child thread finishes
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    t5.join()

    print(f"num: {num}")
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

Lock

A mutex lock lets one thread access shared data while all other threads are blocked until the lock is released. The threading module provides Lock() and RLock() for this.

import threading
import time

num = 0

mutex = threading.Lock()

def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    global num
    mutex.acquire()
    for _ in range(100000):
        num = num + counter
        num = num - counter
    mutex.release()
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # Create three threads
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))

    # Start the threads
    t1.start()
    t2.start()
    t3.start()

    # join() blocks the main thread until each child thread finishes
    t1.join()
    t2.join()
    t3.join()

    print(f"num: {num}")
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

Semaphore

A mutex lock allows only one thread to access shared data at a time, while a semaphore allows a fixed number of threads to do so simultaneously. For example, if a bank has 5 windows, 5 people can be served at once; everyone else must wait until a window is free before stepping up.

import threading
import time
from random import randint

semaphore = threading.Semaphore(5)

def business(name: str):
    semaphore.acquire()
    print(f"{time.strftime('%F %T')} {name} is handling")
    time.sleep(randint(3, 10))
    print(f"{time.strftime('%F %T')} {name} is done")
    semaphore.release()

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    threads = []
    for i in range(10):
        t = threading.Thread(target=business, args=(f"thread-{i}",))
        threads.append(t)

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

Sample execution output

main thread: MainThread, start time: 2024-10-26 17:40:10
2024-10-26 17:40:10 thread-0 is handling
2024-10-26 17:40:10 thread-1 is handling
2024-10-26 17:40:10 thread-2 is handling
2024-10-26 17:40:10 thread-3 is handling
2024-10-26 17:40:10 thread-4 is handling
2024-10-26 17:40:15 thread-2 is done
2024-10-26 17:40:15 thread-5 is handling
2024-10-26 17:40:16 thread-0 is done
2024-10-26 17:40:16 thread-6 is handling
2024-10-26 17:40:19 thread-3 is done
2024-10-26 17:40:19 thread-4 is done
2024-10-26 17:40:19 thread-7 is handling
2024-10-26 17:40:19 thread-8 is handling
2024-10-26 17:40:20 thread-1 is done
2024-10-26 17:40:20 thread-9 is handling
2024-10-26 17:40:21 thread-6 is done
2024-10-26 17:40:23 thread-7 is done
2024-10-26 17:40:24 thread-5 is done
2024-10-26 17:40:24 thread-8 is done
2024-10-26 17:40:30 thread-9 is done
main thread: MainThread, end time: 2024-10-26 17:40:30

Condition

A Condition object lets a thread A stop and wait for other threads; thread A resumes running only after another thread notifies it.

import threading
import time
import random

class Employee(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        self.username = username
        self.cond = cond
        super().__init__()

    def run(self):
        with self.cond:
            print(f"{time.strftime('%F %T')} {self.username} arrives at the company")
            self.cond.wait()  # wait for the notification
            print(f"{time.strftime('%F %T')} {self.username} starts working")
            time.sleep(random.randint(1, 5))
            print(f"{time.strftime('%F %T')} {self.username} finishes work")

class Boss(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        self.username = username
        self.cond = cond
        super().__init__()

    def run(self):
        with self.cond:
            print(f"{time.strftime('%F %T')} {self.username} sends the notice")
            self.cond.notify_all()  # notify all waiting threads
        time.sleep(2)

if __name__ == "__main__":
    cond = threading.Condition()
    boss = Boss("Lao Wang", cond)

    employees = []
    for i in range(5):
        employees.append(Employee(f"employee-{i}", cond))

    for employee in employees:
        employee.start()
    time.sleep(1)  # make sure every employee is already waiting before the boss notifies

    boss.start()
    for employee in employees:
        employee.join()

Sample execution output

2024-10-26 21:16:20 employee-0 arrives at the company
2024-10-26 21:16:20 employee-1 arrives at the company
2024-10-26 21:16:20 employee-2 arrives at the company
2024-10-26 21:16:20 employee-3 arrives at the company
2024-10-26 21:16:20 employee-4 arrives at the company
2024-10-26 21:16:20 Lao Wang sends the notice
2024-10-26 21:16:20 employee-4 starts working
2024-10-26 21:16:23 employee-4 finishes work
2024-10-26 21:16:23 employee-1 starts working
2024-10-26 21:16:28 employee-1 finishes work
2024-10-26 21:16:28 employee-2 starts working
2024-10-26 21:16:30 employee-2 finishes work
2024-10-26 21:16:30 employee-0 starts working
2024-10-26 21:16:31 employee-0 finishes work
2024-10-26 21:16:31 employee-3 starts working
2024-10-26 21:16:32 employee-3 finishes work

Event

Event in Python's threading module is a thread synchronization primitive for simple communication between threads. An Event object maintains an internal flag; threads can block on the wait() method until another thread calls set() to set the flag to True. Once the flag is True, all waiting threads are woken up and continue executing.

The main methods of Event (a short sketch of them follows the list):

  1. set(): set the event's internal flag to True and wake up all waiting threads.
  2. clear(): set the event's internal flag to False.
  3. is_set(): return whether the event's internal flag is True.
  4. wait(timeout=None): if the internal flag is False, block the current thread until the flag is set to True or the timeout expires (if a timeout is specified).
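
A minimal sketch of these four methods (the 2-second timeout is only for illustration):

import threading

event = threading.Event()

print(event.is_set())         # False: the flag starts out cleared
print(event.wait(timeout=2))  # blocks for up to 2 seconds, returns False because nothing set the flag

event.set()                   # flag becomes True; all waiting threads wake up
print(event.is_set())         # True
print(event.wait())           # returns True immediately

event.clear()                 # flag back to False; later wait() calls block again
print(event.is_set())         # False

The fuller example below rewrites the earlier Condition demo with an Event instead:
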
import threading
import time
import random

class Employee(threading.Thread):
    def __init__(self, username: str, cond: threading.Event):
        self.username = username
        self.cond = cond
        super().__init__()

    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} arrives at the company")
        self.cond.wait()  # block until the event flag becomes True
        print(f"{time.strftime('%F %T')} {self.username} starts working")
        time.sleep(random.randint(1, 5))
        print(f"{time.strftime('%F %T')} {self.username} finishes work")

class Boss(threading.Thread):
    def __init__(self, username: str, cond: threading.Event):
        self.username = username
        self.cond = cond
        super().__init__()

    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} sends the notice")
        self.cond.set()  # set the event flag to True

if __name__ == "__main__":
    cond = threading.Event()
    boss = Boss("Lao Wang", cond)

    employees = []
    for i in range(5):
        employees.append(Employee(f"employee-{i}", cond))

    for employee in employees:
        employee.start()
    time.sleep(1)  # not strictly required for an Event, kept for symmetry with the Condition example

    boss.start()
    for employee in employees:
        employee.join()

Sample execution output

2024-10-26 21:22:28 employee-0 arrives at the company
2024-10-26 21:22:28 employee-1 arrives at the company
2024-10-26 21:22:28 employee-2 arrives at the company
2024-10-26 21:22:28 employee-3 arrives at the company
2024-10-26 21:22:28 employee-4 arrives at the company
2024-10-26 21:22:28 Lao Wang sends the notice
2024-10-26 21:22:28 employee-0 starts working
2024-10-26 21:22:28 employee-1 starts working
2024-10-26 21:22:28 employee-3 starts working
2024-10-26 21:22:28 employee-4 starts working
2024-10-26 21:22:28 employee-2 starts working
2024-10-26 21:22:30 employee-3 finishes work
2024-10-26 21:22:31 employee-4 finishes work
2024-10-26 21:22:31 employee-2 finishes work
2024-10-26 21:22:32 employee-0 finishes work
2024-10-26 21:22:32 employee-1 finishes work

Using Queues

Python's queue module provides synchronized, thread-safe queue classes. The following example implements the producer-consumer model with a queue:

import threading
import time
import random
import queue


class Producer(threading.Thread):
    """Multithreaded producer class."""

    def __init__(
        self, tname: str, channel: queue.Queue, done: threading.Event
    ):
        self.tname = tname
        self.channel = channel
        self.done = done
        super().__init__()

    def run(self) -> None:
        """Method representing the thread's activity."""

        while True:
            if self.done.is_set():
                print(
                    f"{time.strftime('%F %T')} {self.tname} received the stop-signal event"
                )
                break
            if self.channel.full():
                print(
                    f"{time.strftime('%F %T')} {self.tname} reports: the queue is full, stopping all production"
                )
                self.done.set()
            else:
                num = random.randint(100, 1000)
                self.channel.put(f"{self.tname}-{num}")
                print(
                    f"{time.strftime('%F %T')} {self.tname} produced data {num}, queue size: {self.channel.qsize()}"
                )
                time.sleep(random.randint(1, 5))


class Consumer(threading.Thread):
    """Multithreaded consumer class."""

    def __init__(
        self, tname: str, channel: queue.Queue, done: threading.Event
    ):
        self.tname = tname
        self.channel = channel
        self.done = done
        self.counter = 0
        super().__init__()

    def run(self) -> None:
        """Method representing the thread's activity."""
        while True:
            if self.done.is_set():
                print(
                    f"{time.strftime('%F %T')} {self.tname} received the stop-signal event"
                )
                break
            if self.counter >= 3:
                print(
                    f"{time.strftime('%F %T')} {self.tname} reports: stopping all consumption"
                )
                self.done.set()
                continue
            if self.channel.empty():
                print(
                    f"{time.strftime('%F %T')} {self.tname} reports: the queue is empty, counter: {self.counter}"
                )
                self.counter += 1
                time.sleep(1)
                continue
            else:
                data = self.channel.get()
                print(
                    f"{time.strftime('%F %T')} {self.tname} consumed data {data}, queue size: {self.channel.qsize()}"
                )
                time.sleep(random.randint(1, 5))
                self.counter = 0


if __name__ == "__main__":
    done_p = threading.Event()
    done_c = threading.Event()
    channel = queue.Queue(30)
    threads_producer = []
    threads_consumer = []

    for i in range(8):
        threads_producer.append(Producer(f"producer-{i}", channel, done_p))

    for i in range(6):
        threads_consumer.append(Consumer(f"consumer-{i}", channel, done_c))

    for t in threads_producer:
        t.start()

    for t in threads_consumer:
        t.start()

    for t in threads_producer:
        t.join()

    for t in threads_consumer:
        t.join()
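
The example above polls full()/empty() and coordinates shutdown with Event flags. For simpler cases the blocking put()/get() calls of queue.Queue, together with task_done() and join(), are usually enough; below is a minimal sketch of that pattern (the sentinel value None, the worker function, and the counts are only illustrative).

import queue
import threading

channel: queue.Queue = queue.Queue(maxsize=10)

def worker() -> None:
    while True:
        item = channel.get()      # blocks until an item is available
        if item is None:          # sentinel value: time to shut down
            channel.task_done()
            break
        print(f"processing {item}")
        channel.task_done()       # mark this item as fully handled

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

for n in range(20):
    channel.put(n)                # blocks while the queue is full

channel.join()                    # wait until every queued item has been marked done
for _ in threads:
    channel.put(None)             # one sentinel per worker
for t in threads:
    t.join()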

Thread pool

In object-oriented programming, creating and destroying objects takes time, because creating an object means acquiring memory or other resources. In a multithreaded program it is likewise inefficient to spawn a thread, destroy it, and then spawn another. Pooled multithreading, i.e. the thread pool, was created to avoid this overhead.

When a task is added to the thread pool, the pool automatically assigns an idle thread to execute it; once the maximum number of threads is busy, the task waits until a thread becomes free. In Python a thread pool can be created with Pool from the multiprocessing.dummy module, which exposes the multiprocessing API backed by threads.

import time

from multiprocessing.dummy import Pool as ThreadPool

def foo(n):
    time.sleep(2)


if __name__ == "__main__":
    start = time.time()
    for n in range(5):
        foo(n)
    print("single thread time: ", time.time() - start)

    start = time.time()
    t_pool = ThreadPool(processes=5)  # create a thread pool with 5 threads (defaults to the number of CPUs)
    rst = t_pool.map(foo, range(5))   # apply foo to each element with map
    t_pool.close()                    # stop accepting new tasks
    t_pool.join()                     # wait for all submitted tasks to complete
    print("thread pool time: ", time.time() - start)

Thread Pool Executor

Python's built-in concurrent.futures module offers the ThreadPoolExecutor class. It combines the advantages of threads and queues and can be used to execute tasks in parallel.

import time
from random import randint
from concurrent.futures import ThreadPoolExecutor

def foo() -> int:
    time.sleep(2)
    return randint(1, 100)

if __name__ == "__main__":
    start = time.time()
    futures = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        for n in range(10):
            futures.append(executor.submit(foo))  # Fan out

    for future in futures:  # Fan in
        print(future.result())
    print("thread pool executor time: ", time.time() - start)

Sample execution output

44
19
86
48
35
74
59
99
58
53
thread pool executor time:  4.001955032348633

The nice thing about ThreadPoolExecutor is that when the caller hands it a task via the submit method, it gets back a Future instance, and when the caller later calls that future's result method, ThreadPoolExecutor re-raises any exception the task hit during execution back to the caller. The drawback of ThreadPoolExecutor is that its IO parallelism is limited: even setting max_workers to 100 will not make it handle a very large number of IO tasks efficiently. For more demanding IO workloads, consider an asynchronous coroutine approach instead.
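
As a minimal sketch of that exception behaviour (the function might_fail is made up for illustration): an exception raised inside the worker thread is stored on the Future and re-raised when result() is called.

from concurrent.futures import ThreadPoolExecutor

def might_fail(n: int) -> int:
    if n == 3:
        raise ValueError(f"bad input: {n}")
    return n * 2

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(might_fail, n) for n in range(5)]

    for future in futures:
        try:
            print(future.result())  # re-raises whatever the task raised
        except ValueError as exc:
            print(f"task failed: {exc}")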
