Joblib is an open-source Python library for efficient parallel computing. It provides easy-to-use tools for caching function results, memory mapping, and distributing tasks across multiple worker processes. Joblib is particularly well suited to tasks that involve repetitive computation or large-scale data processing. The official repository is: joblib. For the official documentation, see: joblib-doc.
Joblib can be installed, and its version checked, as follows:
pip install joblib
# View version
import joblib
joblib.__version__
'1.4.2'
- 1 Instructions for use
  - 1.1 Memory class
  - 1.2 Parallel class
  - 1.3 Serialization
- 2 Examples
  - 2.1 Joblib caching and parallelism
  - 2.2 Serialization
  - 2.3 Memory monitoring
- 3 Reference
1 Instructions for use
The main functionality of the Joblib library covers the following three main blocks:
- Caching: the Memory class caches the return value of a function to disk. The next time the function is called with unchanged input parameters, the result is loaded directly from the cache, avoiding repeated calculations.
- Parallel computing: the Parallel class splits a task across multiple processes or threads that run in parallel, speeding up the calculation.
- Efficient serialization: fast serialization and deserialization optimized for large data objects such as NumPy arrays.
1.1 Memory class
The Memory class stores the result of a function call on disk so that it can be reused directly the next time the function is called with the same arguments. The advantages of this mechanism include faster computation, lower resource usage, and simpler management of intermediate results.
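The mechanism can be illustrated with a small standard-library sketch. This is a conceptual illustration only, not joblib's implementation: the decorator name disk_cache and the hashing scheme are assumptions made for this example.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path

def disk_cache(cache_dir):
    """Hypothetical decorator: cache return values on disk, keyed by arguments."""
    cache_path = Path(cache_dir)

    def decorator(func):
        def wrapper(*args, **kwargs):
            # Derive a file name from the function name and its pickled arguments.
            key_bytes = pickle.dumps((func.__name__, args, sorted(kwargs.items())))
            key = hashlib.sha256(key_bytes).hexdigest()
            result_file = cache_path / f"{key}.pkl"
            if result_file.exists():
                # Cache hit: load the stored result instead of recomputing.
                return pickle.loads(result_file.read_bytes())
            result = func(*args, **kwargs)
            result_file.write_bytes(pickle.dumps(result))
            return result
        return wrapper
    return decorator

calls = []  # records every real execution of the function body

@disk_cache(tempfile.mkdtemp())
def slow_square(x):
    calls.append(x)
    return x * x

first = slow_square(4)
second = slow_square(4)  # same argument: served from disk
third = slow_square(5)   # new argument: the body runs again
print(first, second, third, calls)
```

The body runs only for arguments that have not been seen before, which is the behaviour the Memory class provides in a far more robust form.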
The Memory class constructor is as follows:
class joblib.Memory(location=None, backend='local', mmap_mode=None, compress=False, verbose=1, bytes_limit=None, backend_options=None)
The parameters are described below:
- location: The directory where the cache files are stored. If set to None, no caching is done.
- backend: The storage backend of the cache. The default, "local", uses the local file system.
- mmap_mode: A string giving the memory-mapping mode used when loading cached arrays (None, 'r+', 'r', 'w+', 'c').
- compress: Whether to compress the cached files. Compression saves disk space but increases the time of I/O operations.
- verbose: An integer setting the level of detail of the log. 0 means no output; higher values print progressively more detail.
- bytes_limit: An integer or None giving the byte limit of the cache. If the cache exceeds this limit, the oldest cached files are deleted.
- backend_options: Options passed to the cache backend.
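The effect of the location parameter can be checked with a short sketch. The function double, the executions list and the temporary directory are assumptions made for this example.

```python
import tempfile
from joblib import Memory

executions = []  # records each time the function body really runs

def double(x):
    executions.append(x)
    return 2 * x

# location=None: no caching, the function is simply called every time.
no_cache = Memory(location=None, verbose=0).cache(double)
no_cache(3)
no_cache(3)
runs_without_cache = len(executions)

# A real directory (here a temporary one) enables on-disk caching.
executions.clear()
cached = Memory(location=tempfile.mkdtemp(), verbose=0).cache(double)
cached(3)
cached(3)
runs_with_cache = len(executions)

print(runs_without_cache, runs_with_cache)
```

With caching disabled the body should run on both calls; with a cache directory it should run only on the first.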
Simple use of the Memory class
The following code shows the first call to the function and caches the result:
from joblib import Memory
import os, shutil
# Create a Memory object, specifying the cache directory as the run folder in the current directory.
# verbose=0 to turn off detailed output
cachedir = './run'
if os.path.exists(cachedir):
    shutil.rmtree(cachedir)
memory = Memory(cachedir, verbose=0)
# Cache the result of the function f using the @memory.cache decorator
@memory.cache
def f(x):
    # Execute the code in the body of the function only if its input parameter x is not cached
    print('Running f(%s)' % x)
    return x
# The first call to f(1) executes the code within the function and caches the result
print(f(1))
Running f(1)
1
Second call to the function:
# The second time f(1) is called, since the result is already cached, the code within the function is not executed again, but the result is read directly from the cache
print(f(1))
1
Calling the function with a different argument:
# Call f(2), which, because of the different input parameters, executes the code within the function again and caches the result
print(f(2))
Running f(2)
2
Applying Memory classes to numpy arrays
import numpy as np
from joblib import Memory
import os, shutil
cachedir = './run'
if os.path.exists(cachedir):
    shutil.rmtree(cachedir)
memory = Memory(cachedir, verbose=0)
@memory.cache
def g(x):
    print('A long-running calculation, with parameter %s' % x)
    # Return the Hamming window
    return np.hamming(x)
@memory.cache
def h(x):
    print('A second long-running calculation, using g(x)')
    # Generate the Vandermonde matrix
    return np.vander(x)
# Call function g, passing in parameter 3 and storing the result in variable a
a = g(3)
# Print the value of variable a
print(a)
# Call the function g again, passing in the same argument 3 and not recalculating since the result is cached
print(g(3))
A long-running calculation, with parameter 3
[0.08 1. 0.08]
[0.08 1. 0.08]
Direct computation and cached results are equivalent:
# Call the function h, passing in the variable a as an argument and storing the result in the variable b
b = h(a)
# Call function h again, passing in the same argument a. Since the result is cached, it is not recalculated
b2 = h(a)
# Use numpy's allclose function to check if b and b2 are close enough, i.e., if they are equal
print(np.allclose(b, b2))
A second long-running calculation, using g(x)
True
Calling cached results directly
import numpy as np
from joblib import Memory
import os, shutil
# Set the path to the cache directory.
cachedir = './run'
# Check if the cache directory exists.
if os.path.exists(cachedir):
    # If the cache directory exists, use shutil.rmtree to delete the directory and its contents.
    shutil.rmtree(cachedir)
# Initialize the Memory object, set the cache directory to cachedir as defined above, and set mmap_mode to 'r' for read-only mode.
memory = Memory(cachedir, mmap_mode='r', verbose=0)
# Cache the results of np.square using memory.cache.
square = memory.cache(np.square)
a = np.vander(np.arange(3)).astype(float)
# Print the result of applying the cached square function to the matrix a.
print(square(a))
# Get a reference to the cached result for a
result = square.call_and_shelve(a)
print(result.get()) # Get and print the cached result.
[[ 0. 0. 1.]
[ 1. 1. 1.]
[16. 4. 1.]]
[[ 0. 0. 1.]
[ 1. 1. 1.]
[16. 4. 1.]]
Using Cache in Classes
It is not recommended to apply Memory caching directly to class methods. To use caching in a class, the recommended pattern is to define the cached function separately and call it from the class, as shown below:
@memory.cache
def compute_func(arg1, arg2, arg3):
    pass
class Foo(object):
    def __init__(self, args):
        self.data = None
    def compute(self):
        # Call the cached function from within the class
        self.data = compute_func(self.arg1, self.arg2, 40)
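A runnable variant of this pattern might look as follows. The names scaled_sum and Report, and the use of a temporary cache directory, are assumptions made for this sketch.

```python
import tempfile
from joblib import Memory

# Temporary cache directory (an assumption for this sketch).
memory = Memory(tempfile.mkdtemp(), verbose=0)

@memory.cache
def scaled_sum(values, factor):
    # Module-level function: its cache key depends only on its arguments.
    return factor * sum(values)

class Report:
    def __init__(self, values):
        self.values = values
        self.total = None

    def compute(self):
        # The method delegates to the cached function instead of being cached itself.
        self.total = scaled_sum(self.values, 10)
        return self.total

first_total = Report([1, 2, 3]).compute()
# A new instance holding the same data can reuse the cached result.
second_total = Report([1, 2, 3]).compute()
print(first_total, second_total)
```

Because the cache key is built from the function arguments rather than from the instance, separate objects holding equal data share one cached entry.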
1.2 Parallel class
The Parallel class of the Joblib library is used to simply and quickly decompose a task into multiple subtasks and assign them to different CPU cores or machines for execution, thus significantly improving the efficiency of a program's operation.
The Parallel class constructor and main parameters are as follows:
class joblib.Parallel(n_jobs=default(None), backend=default(None), return_as='list', verbose=default(0), timeout=None, batch_size='auto', pre_dispatch='2 * n_jobs', temp_folder=default(None), max_nbytes=default('1M'), require=default(None))
The parameters are described below:
- n_jobs: the number of parallel jobs. -1 means use all available CPU cores; None means a single job unless configured otherwise.
- backend: the parallelization backend. Options:
  - 'loky': multiprocessing implemented with the loky library, which is developed by the joblib developers. This is the default.
  - 'threading': multithreading implemented with the threading library.
  - 'multiprocessing': multiprocessing implemented with the multiprocessing library.
- return_as: the format in which results are returned. Options:
  - 'list': a list.
  - 'generator': a generator that yields results in the order in which the tasks were submitted.
  - 'generator_unordered': a generator that yields results in the order in which the tasks complete.
- verbose: an integer setting the level of detail of the log. 0 means no output; higher values print progress messages more frequently.
- timeout: the maximum running time of a single task; exceeding it raises a TimeoutError. Only applies when n_jobs is not 1.
- batch_size: the Parallel class dispatches tasks in batches; batch_size determines the number of tasks in each batch.
- pre_dispatch: the number of batches of tasks that are dispatched to the workers in advance. The default value is '2 * n_jobs', meaning that twice as many batches as there are jobs are queued ahead of time.
- temp_folder: the storage path for temporary files.
- max_nbytes: the size threshold above which arrays passed to the workers are memory mapped.
- require: a hard constraint on the backend, either None or 'sharedmem'. 'sharedmem' forces a thread-based backend so that tasks share memory, which may reduce computational performance.
Simple example
The following code runs a computationally intensive task sequentially in a single process:
from joblib import Parallel, delayed
import numpy as np
import time
start = time.time()
# Define a computationally intensive function
def compute_heavy_task(data):
    # Simulate processing time
    time.sleep(1)
    # Numerical computation
    result = np.sum(np.square(data))
    return result
# Generate some simulated data
# Set the seed for the random number generator
np.random.seed(42)
data = np.random.rand(10, 1000) # 10 vectors of 1000 dimensions
results = [compute_heavy_task(d) for d in data]
# Print the sum of the results
print(f"Result: {sum(results)}")
print(f"Time taken: {time.time()-start}s")
Result: 3269.16485027708
Time taken: 10.101513624191284s
The following code runs the same computationally intensive task in multiple processes using the Parallel class:
from joblib import Parallel, delayed
import numpy as np
import time
start = time.time()
# Define a computationally intensive function
def compute_heavy_task(data):
    # Simulate processing time
    time.sleep(1)
    # Numerical computation
    result = np.sum(np.square(data))
    return result
# Set the seed for the random number generator
np.random.seed(42)
# Generate some simulated data
data = np.random.rand(10, 1000) # 10 vectors of 1000 dimensions
# Use Parallel to execute tasks in parallel
results = Parallel(n_jobs=8, return_as="generator")(delayed(compute_heavy_task)(d) for d in data)
# Print the sum of the results
print(f"Result: {sum(results)}")
print(f"Time taken: {time.time()-start}s")
Result: 3269.16485027708
Time taken: 2.381772041320801s
It can be seen that joblib uses multiprocessing to significantly shorten task execution time. However, for I/O-intensive tasks or tasks with very short execution times, the advantages of multithreading or multiprocessing may not be obvious, because the overhead of creating workers and of context switching can exceed the execution time of the task itself. For the compute_heavy_task function above, for example, removing the time.sleep call would leave so little work per task that the multiprocess version would lose this speed-up and could even be slower than the plain loop.
In addition, the number of CPU cores (logical processors) of the current system can be obtained as follows:
import joblib
# Get the number of cpu cores in the current system
n_cores = joblib.cpu_count()
print(f"The number of cores in the system is: {n_cores}")
The number of cores in the system is: 16
Comparison of different parallelism methods
The following code shows the application of different parallelism methods in the Parallel class. By default, loky multiprocessing is used:
# Use loky multiprocessing
from joblib import Parallel, delayed
import numpy as np
import time
start = time.time()
# Define a computationally intensive function
def compute_heavy_task(data):
    # Simulate processing time
    time.sleep(1)
    # Numerical computation
    result = np.sum(np.square(data))
    return result
# Generate some simulated data
data = np.random.rand(10, 1000) # 10 vectors of 1000 dimensions
results = Parallel(n_jobs=8, return_as="generator", backend='loky')(delayed(compute_heavy_task)(d) for d in data)
# Print the sum of the results
print(f"Result: {sum(results)}")
print(f"Time taken: {time.time()-start}s")
Result: 3382.3336437893217
Time taken: 2.042675256729126s
The following code uses the threading backend. Note that Python's Global Interpreter Lock (GIL) ensures that only one thread executes Python bytecode at any given moment, so even on multicore processors Python threads cannot run pure-Python computation truly in parallel. Multithreading is still advantageous for I/O-intensive tasks, for code that releases the GIL (such as the time.sleep call in this example, which is why the timing below matches the loky run), and for small tasks that need fast responses:
# Use threading multithreading
start = time.time()
results = Parallel(n_jobs=8, return_as="generator", backend='threading')(delayed(compute_heavy_task)(d) for d in data)
# Print the sum of the results
print(f"Result: {sum(results)}")
print(f"Elapsed time: {time.time()-start}s")
Result: 3382.3336437893217
Elapsed time: 2.040527105331421s
The following code uses the multiprocessing backend. Note that on Windows the multiprocessing-related code must be placed inside a main function guarded by if __name__ == '__main__':
from joblib import Parallel, delayed
import numpy as np
import time
# Define a computationally intensive function
def compute_heavy_task(data):
    # Simulate processing time
    time.sleep(1)
    # Numerical computation
    result = np.sum(np.square(data))
    return result
def main():
    start = time.time()
    # Generate some simulated data
    data = np.random.rand(10, 1000) # 10 vectors of 1000 dimensions
    # The multiprocessing backend does not support returning a generator
    results = Parallel(n_jobs=8, return_as="list", backend='multiprocessing')(delayed(compute_heavy_task)(d) for d in data)
    # Print the sum of the results
    print(f"Result: {sum(results)}")
    print(f"Time taken: {time.time()-start}s")
if __name__ == '__main__':
    main()
Result: 3304.6651996375645
Time taken: 2.4303956031799316s
The following table compares some key features of loky, threading and multiprocessing:
Feature | loky | threading | multiprocessing |
---|---|---|---|
Applicable platforms | Cross-platform | Cross-platform | Cross-platform, but limited on Windows |
Process/thread model | Process | Thread | Process |
GIL impact | No | Yes | No |
Applicable scenarios | CPU-intensive tasks | I/O-intensive tasks | CPU-intensive tasks |
Startup overhead | Relatively small | Relatively small | Relatively large |
Memory usage | High | Relatively low | High |
Inter-process communication | Through pipes, queues, etc. | Through shared data structures | Through pipes, queues, etc. |
Inter-thread communication | Shared data structures | Shared data structures | Not applicable |
Exception handling | Independent per process | Shared between threads | Independent per process |
Debugging difficulty | High | Relatively low | High |
Application scope | General-purpose | General-purpose | General-purpose |
A simple comparison of threads and processes in Python is as follows:
- Resource sharing: Threads share the memory and resources of the same process, while processes have separate memory spaces.
- GIL impact: threads are subject to GIL, processes are not.
- Overhead: Threads are created and switched with low overhead, processes are created and switched with high overhead.
- Applicability: Threads are suitable for I/O intensive tasks, processes are suitable for CPU intensive tasks.
- Communication: inter-thread communication is simple but needs to deal with synchronization issues, inter-process communication is complex but naturally isolated.
In practice, the choice of whether to use threads or processes depends on the characteristics and performance needs of the task. If the task is primarily I/O-intensive, using threads can improve performance; if the task is CPU-intensive, using processes can better utilize the computational power of multi-core processors.
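The claim that threads help with I/O-bound work can be checked with a standard-library sketch. The function fake_io is an assumption made for this example; its time.sleep call stands in for a network or disk wait, during which the GIL is released.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(i):
    time.sleep(0.2)  # simulated wait; other threads can run meanwhile
    return i

# Run the four waits one after another.
start = time.perf_counter()
sequential = [fake_io(i) for i in range(4)]
sequential_time = time.perf_counter() - start

# Run the four waits in four threads.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    threaded = list(pool.map(fake_io, range(4)))
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

The threaded run should finish in roughly the time of a single wait, whereas a pure-Python CPU-bound loop would gain little from threads because of the GIL.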
Shared memory
By default, tasks executed by the Parallel class do not share memory with one another, as shown below:
from joblib import Parallel, delayed
shared_set = set()
def collect(x):
    shared_set.add(x)
Parallel(n_jobs=2)(delayed(collect)(i) for i in range(5))
print(sorted(shared_set))
[]
Memory sharing can be achieved by setting require='sharedmem':
# require='sharedmem' forces a shared-memory (thread-based) backend so that all tasks can access the shared_set collection
Parallel(n_jobs=2, require='sharedmem')(delayed(collect)(i) for i in range(5))
print(sorted(shared_set))
[0, 1, 2, 3, 4]
Context manager
Some algorithms require multiple successive calls to a parallel function, but calling Parallel repeatedly in a loop is suboptimal because it creates and destroys a set of worker processes each time, resulting in significant performance overhead.
For this case, it is more efficient to use the context manager API of the Parallel class, which reuses the same set of worker processes across multiple calls to the Parallel object. This is shown below:
from joblib import Parallel, delayed
import math
with Parallel(n_jobs=2) as parallel:
    accumulator = 0.
    n_iter = 0
    while accumulator < 1000:
        results = parallel(delayed(math.sqrt)(accumulator + i ** 2) for i in range(5))
        accumulator += sum(results)
        n_iter += 1
print(accumulator, n_iter)
1136.5969161564717 14
parallel_config
Joblib provides the parallel_config class for configuring parallel execution parameters such as the backend type and the number of jobs. The configuration applies to all Parallel instances created while it is active, so it is usually set up before calling the Parallel class. For more information on the use of parallel_config, see: parallel_config.
1.3 Serialization
joblib.dump() and joblib.load() provide an alternative to the pickle library for efficiently serializing arbitrary Python objects that contain large amounts of data, especially large NumPy arrays. For pickle library usage, see: Python data serialization module pickle using notes. The two compare as follows:
Feature | pickle | joblib |
---|---|---|
Performance | Average | Optimized for large data types such as NumPy arrays, usually faster |
Parallel processing | Not supported | Built-in parallel processing to speed up tasks |
Memory mapping | Not supported | Supports memory mapping for efficient handling of large files |
Compression | Supported | Supports compression, which can reduce storage space |
Additional functionality | None | Provides additional features such as caching and lazy loading |
The following code demonstrates the basic use of joblib.dump:
from tempfile import mkdtemp
# Use mkdtemp to create a temporary directory and store the directory path in the variable savedir.
savedir = mkdtemp(dir='./')
import os
# Path of the file to save (the file name 'test.joblib' is illustrative; the original name is not shown)
filename = os.path.join(savedir, 'test.joblib')
import numpy as np
import pandas as pd
import joblib
# Create a list of (key, value) pairs to be persisted
to_persist = [('a', [1, 2, 3]), ('b', np.arange(10)), ('c', pd.DataFrame(np.ones((5,5))))]
# Use joblib.dump to serialize to_persist and save it to the file specified by filename
# Note that pickle can also serialize NumPy data; joblib is simply more efficient for large arrays
joblib.dump(to_persist, filename)
['./tmp82ms1z5w\\']
Use the joblib.load function to load the previously saved serialized data from the specified file:
joblib.load(filename)
[('a', [1, 2, 3]),
('b', array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])),
('c',
0 1 2 3 4
0 1.0 1.0 1.0 1.0 1.0
1 1.0 1.0 1.0 1.0 1.0
2 1.0 1.0 1.0 1.0 1.0
3 1.0 1.0 1.0 1.0 1.0
4 1.0 1.0 1.0 1.0 1.0)]
The joblib.dump and joblib.load functions also accept file objects:
with open(filename, 'wb') as fo:
    # Use joblib to serialize the object to_persist and write it to the file
    joblib.dump(to_persist, fo)
with open(filename, 'rb') as fo:
    joblib.load(fo)
Setting the compress parameter for data compression is also supported:
# compress sets the compression level, from 0 to 9. The larger the value, the stronger the compression. 0 means no compression and is the default.
joblib.dump(to_persist, filename, compress=1)
['./tmp82ms1z5w\\']
By default, the zlib compression method is used, as it provides a good balance between speed and disk space. Other supported compression methods include "gzip", "bz2", "lzma" and "xz". Passing a tuple of compression method and compression level as the compress parameter selects a different method:
joblib.dump(to_persist, filename + '.gz', compress=('gzip', 3))
joblib.load(filename + '.gz')
joblib.dump(to_persist, filename + '.bz2', compress=('bz2', 5))
joblib.load(filename + '.bz2')
[('a', [1, 2, 3]),
('b', array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])),
('c',
0 1 2 3 4
0 1.0 1.0 1.0 1.0 1.0
1 1.0 1.0 1.0 1.0 1.0
2 1.0 1.0 1.0 1.0 1.0
3 1.0 1.0 1.0 1.0 1.0
4 1.0 1.0 1.0 1.0 1.0)]
In addition to the default compression method, the lz4 compression algorithm can also be used for data compression. The prerequisite is that the lz4 compression library needs to be installed:
pip install lz4
Of these compression methods, lz4 and the default method work best. lz4 is used in the same way as the other compression methods:
joblib.dump(to_persist, filename, compress=('lz4', 3))
['./tmp82ms1z5w\\']
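The effect of the compress parameter on file size can be checked with a short sketch. The all-zeros array (which compresses unusually well), the file names and the temporary folder are assumptions made for this example; real data will compress less.

```python
import os
import tempfile
import numpy as np
import joblib

data = np.zeros((1000, 1000))  # highly compressible dummy data
folder = tempfile.mkdtemp()

sizes = {}
for label, compress in [("raw", 0), ("zlib", ("zlib", 3)), ("gzip", ("gzip", 3))]:
    path = os.path.join(folder, f"data_{label}.joblib")
    joblib.dump(data, path, compress=compress)
    sizes[label] = os.path.getsize(path)
    # Loading needs no extra argument: the compressor is detected from the file.
    assert np.array_equal(joblib.load(path), data)

print(sizes)
```

Comparing the sizes dictionary on representative data is a simple way to choose a method and level before committing to one.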
2 Examples
2.1 Joblib caching and parallelism
This example demonstrates the use of joblib caching and parallelism to speed up task execution. The following code demonstrates a highly time-consuming task:
# Import the time module to implement the time delay function.
import time
# Define a function to simulate a time-consuming computation
def costly_compute(data, column):
    # Sleep for 1 second to simulate a time-consuming operation
    time.sleep(1)
    # Return the part of the incoming data selected by the index column
    return data[column]
# Define a function that calculates the mean of the selected data
def data_processing_mean(data, column):
    # Call the costly_compute function to get the selected data.
    column_data = costly_compute(data, column)
    # Calculate and return the mean of the selected data
    return column_data.mean()
# Import the numpy library and create a seeded random number generator to ensure reproducible results
import numpy as np
rng = np.random.RandomState(42)
# Generate a random data matrix of 1000 rows and 4 columns
data = rng.randn(int(1000), 4)
# Record the start time
start = time.time()
# Calculate the mean for each column index and store the results in the results list
results = [data_processing_mean(data, col) for col in range(data.shape[1])]
# Record the end time
stop = time.time()
# Print a description of the processing
print('\nSequential processing')
# Print the elapsed time for the entire processing
print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))
Sequential processing
Elapsed time for the entire processing: 4.05 s
The following code demonstrates how to use the joblib library to cache and parallelize the computation of the above tasks:
# Import the time module for simulating time-consuming operations.
import time
# Define a function that uses cache to compute the mean of the data.
def data_processing_mean_using_cache(data, column):
    return costly_compute_cached(data, column).mean()
# Import the Memory class from the joblib library to cache the output of a function.
from joblib import Memory
# Set the storage location and level of detail of the cache
location = './cachedir'
memory = Memory(location, verbose=0)
# Use the cache method of the Memory object to cache the output of the costly_compute function.
costly_compute_cached = memory.cache(costly_compute)
# Import Parallel and delayed from the joblib library for executing functions in parallel.
from joblib import Parallel, delayed
# Record the start time.
start = time.time()
# Execute the data_processing_mean_using_cache function in parallel using the Parallel class for each column of data.
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, col)
    for col in range(data.shape[1]))
# Record the end time.
stop = time.time()
# Print information about how long the first round of processing took, including caching the data.
print('\nFirst round - caching the data')
print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))
First round - caching the data
Elapsed time for the entire processing: 2.05 s
Executing the same procedure again, you can see that the results are cached instead of re-executing the function:
start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, col)
    for col in range(data.shape[1]))
stop = time.time()
print('\nSecond round - reloading from the cache')
print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))
# If you don't want to keep the cached results, the cache can be cleared
memory.clear(warn=False)
Second round - reloading from the cache
Elapsed time for the entire processing: 0.02 s
2.2 Serialization
The following example shows how to use memory mapping with joblib.load (through its mmap_mode parameter) when processing serialized data in parallel. Memory mapping lets a large data set stay on disk and be loaded into memory in smaller chunks only when needed, which reduces memory usage and can increase processing speed.
Define time-consuming functions:
import numpy as np
data = np.random.random((int(1e7),))
window_size = int(5e5)
slices = [slice(start, start + window_size)
          for start in range(0, data.size - window_size, int(1e5))]
import time
def slow_mean(data, sl):
    time.sleep(0.01)
    return data[sl].mean()
The following code calls the function directly, one slice at a time:
tic = time.time()
results = [slow_mean(data, sl) for sl in slices]
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'.format(toc - tic))
Elapsed time computing the average of couple of slices 1.49 s
The following code runs the same computation with the Parallel class using 2 processes. Because the overall computation is short, the parallel version gains only a modest speed-up over calling the function directly, since starting and shutting down the worker processes takes extra time:
from joblib import Parallel, delayed
tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data, sl) for sl in slices)
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'.format(toc - tic))
Elapsed time computing the average of couple of slices 1.00 s
The following code uses the dump and load functions to speed up data access. The dump function serializes the data object to a file on disk; the load function, called with mmap_mode, then opens that file as a memory map so that it can be accessed like an in-memory array without being read in full:
import os
from joblib import dump, load # Import dump and load functions from the joblib library to create and load memory-mapped files.
# Set the path to the folder for the memory-mapped file
folder = './memmap'
os.makedirs(folder, exist_ok=True)
# Combine the name of the memory-mapped file with the path
data_filename_memmap = os.path.join(folder, 'data_memmap.joblib')
# Save the data object 'data' to the file using the dump function
dump(data, data_filename_memmap)
# Use the load function to open the file as a memory map; mmap_mode='r' means read-only mode
data_ = load(data_filename_memmap, mmap_mode='r')
# Record the start time
tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data_, sl) for sl in slices)
# Record the end time
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s\n'.format(toc - tic))
import shutil
# Delete the mapping file at the end
try:
    shutil.rmtree(folder)
except OSError:
    pass
Elapsed time computing the average of couple of slices 0.77 s
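The memory-mapped round trip can be reduced to a few lines. The array contents, the file name array.joblib and the temporary folder are assumptions made for this sketch.

```python
import os
import tempfile
import numpy as np
from joblib import dump, load

array = np.arange(1_000_000, dtype=np.float64)
path = os.path.join(tempfile.mkdtemp(), "array.joblib")  # hypothetical file name

dump(array, path)                   # uncompressed, so the file can be memory mapped
mapped = load(path, mmap_mode="r")  # read-only view backed by the file on disk

kind = type(mapped).__name__
window_mean = float(mapped[10:20].mean())  # only the touched region is read
print(kind, window_mean)
```

Because workers can open the same file instead of receiving a pickled copy of the array, this approach avoids duplicating large inputs across processes.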
2.3 Memory monitoring
This example shows the memory consumption of different parallelism methods.
Creating a Memory Monitor
import time
from psutil import Process
from threading import Thread
class MemoryMonitor(Thread):
    """Monitors memory usage (in bytes) in a separate thread."""
    def __init__(self):
        super().__init__() # Call the constructor of the parent class Thread
        self.stop = False # Flag used to control thread stopping
        self.memory_buffer = [] # List used to store memory usage logs
        self.start() # Start the thread
    def get_memory(self):
        """Get the memory usage of a process and its children."""
        p = Process() # Get the current process
        memory = p.memory_info().rss # Get the memory usage of the current process
        for c in p.children(): # Iterate through all child processes
            memory += c.memory_info().rss # Accumulate memory usage of child processes
        return memory
    def run(self):
        """The main method of the thread, periodically logging memory usage."""
        memory_start = self.get_memory() # Get initial memory usage
        while not self.stop: # Loop until the stop flag is set
            self.memory_buffer.append(self.get_memory() - memory_start) # Record the difference between current and initial memory usage
            time.sleep(0.2) # Sleep for 0.2 seconds
    def join(self):
        """Override the join method to set the stop flag and wait for the thread to finish."""
        self.stop = True # Set the stop flag
        super().join() # Call parent method to wait for thread to end
Parallel task
A parallel task that returns its results as a list:
import time
import numpy as np
def return_big_object(i):
    """Generate and return a large NumPy array object."""
    time.sleep(.1) # Sleep for 0.1 seconds to simulate time-consuming operations
    return i * np.ones((10000, 200), dtype=np.float64)
def accumulator_sum(generator):
    """Accumulate all the values produced by the generator."""
    result = 0
    for value in generator:
        result += value
        # print(".", end="", flush=True) # Print a dot and flush the output
    # print("") # Print the newline character
    return result
from joblib import Parallel, delayed
monitor = MemoryMonitor() # Create an instance of the memory monitor and start the monitor
print("Running tasks with return_as='list'...") # Print information about the running tasks
res = Parallel(n_jobs=2, return_as="list")(
delayed(return_big_object)(i) for i in range(150) # Execute tasks in parallel using joblib's Parallel function
)
res = accumulator_sum(res) # accumulate results
print('All tasks completed and reduced successfully.') # Print the task completion information
# Report memory usage
del res # Clean up the results to avoid memory boundary effects
monitor.join() # Wait for the memory monitor thread to finish
peak = max(monitor.memory_buffer) / 1e9 # Calculate peak memory usage and convert to GB
print(f"Peak memory usage: {peak:.2f}GB") # Print peak memory usage
Running tasks with return_as='list'...
All tasks completed and reduced successfully.
Peak memory usage: 2.44GB
If it is changed to an output generator, then the memory usage will be greatly reduced:
monitor_gen = MemoryMonitor() # Create an instance of the memory monitor and start the monitoring
print("Running tasks with return_as='generator'...") # Print information about the running tasks
res = Parallel(n_jobs=2, return_as="generator")(
delayed(return_big_object)(i) for i in range(150)
)
res = accumulator_sum(res) # accumulate results
print('All tasks completed and reduced successfully.') # Print the task completion information
# Report memory usage
del res # Clean up the results to avoid memory bounds effects
monitor_gen.join() # Wait for the memory monitor thread to end
peak = max(monitor_gen.memory_buffer) / 1e9 # Calculate peak memory usage and convert to GBs
print(f"Peak memory usage: {peak:.2f}GB") # Print peak memory usage
Running tasks with return_as='generator'...
All tasks completed and reduced successfully.
Peak memory usage: 0.19GB
The following figure shows the memory consumption of the two approaches above. In the first case all results are stored in memory until processing is complete, so memory usage grows roughly linearly over time. In the second case the generator streams the results: each one is processed as soon as it is available, so the results never need to be held in memory at the same time, which reduces memory usage:
import matplotlib.pyplot as plt
plt.figure(0)
plt.semilogy(
    np.maximum.accumulate(monitor.memory_buffer),
    label='return_as="list"'
)
plt.semilogy(
    np.maximum.accumulate(monitor_gen.memory_buffer),
    label='return_as="generator"'
)
plt.xlabel("Time")
plt.xticks([], [])
plt.ylabel("Memory usage")
plt.yticks([1e7, 1e8, 1e9], ['10MB', '100MB', '1GB'])
plt.legend()
plt.show()
Further memory savings
The generator in the previous example preserves the order in which tasks were submitted. If a task that was submitted later finishes earlier than others, its result is kept in memory until the earlier tasks have finished. If the order of the results does not matter, for example when all results are simply summed at the end, generator_unordered can be used to reduce memory consumption. This is shown below:
# Create a handler function that may take a different amount of time per task
def return_big_object_delayed(i):
    if (i + 20) % 60:
        time.sleep(0.1)
    else:
        time.sleep(5)
    return i * np.ones((10000, 200), dtype=np.float64)
Memory usage with return_as='generator':
monitor_delayed_gen = MemoryMonitor()
print("Create result generator on delayed tasks with return_as='generator'...")
res = Parallel(n_jobs=2, return_as="generator")(
delayed(return_big_object_delayed)(i) for i in range(150)
)
res = accumulator_sum(res)
print('All tasks completed and reduced successfully.')
del res
monitor_delayed_gen.join()
peak = max(monitor_delayed_gen.memory_buffer) / 1e6
print(f"Peak memory usage: {peak:.2f}MB")
Create result generator on delayed tasks with return_as='generator'...
All tasks completed and reduced successfully.
Peak memory usage: 784.23MB
Memory usage with return_as='generator_unordered':
monitor_delayed_gen_unordered = MemoryMonitor()
print(
"Create result generator on delayed tasks with "
"return_as='generator_unordered'..."
)
res = Parallel(n_jobs=2, return_as="generator_unordered")(
delayed(return_big_object_delayed)(i) for i in range(150)
)
res = accumulator_sum(res)
print('All tasks completed and reduced successfully.')
del res
monitor_delayed_gen_unordered.join()
peak = max(monitor_delayed_gen_unordered.memory_buffer) / 1e6
print(f"Peak memory usage: {peak:.2f}MB")
Create result generator on delayed tasks with return_as='generator_unordered'...
All tasks completed and reduced successfully.
Peak memory usage: 175.22MB
The memory usage of the two runs is compared below. With the generator_unordered option, each result can be processed as soon as its task finishes, without waiting for other tasks to complete. Note, however, that the order in which results are returned is not deterministic, because the order in which tasks finish depends on factors such as system load and the backend implementation:
plt.figure(1)
plt.semilogy(
    np.maximum.accumulate(monitor_delayed_gen.memory_buffer),
    label='return_as="generator"'
)
plt.semilogy(
    np.maximum.accumulate(monitor_delayed_gen_unordered.memory_buffer),
    label='return_as="generator_unordered"'
)
plt.xlabel("Time")
plt.xticks([], [])
plt.ylabel("Memory usage")
plt.yticks([1e7, 1e8, 1e9], ['10MB', '100MB', '1GB'])
plt.legend()
plt.show()
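The difference between consuming results in submission order and in completion order can be illustrated with the standard library's concurrent.futures. This is an analogy only, not joblib's implementation; the function task and its sleep times are assumptions made for this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(i):
    # Task 0 is deliberately slow; the others finish quickly.
    time.sleep(0.5 if i == 0 else 0.05)
    return i

with ThreadPoolExecutor(max_workers=2) as pool:
    # Submission order: fast results must wait behind the slow task 0.
    ordered = list(pool.map(task, range(4)))

with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(task, i) for i in range(4)]
    # Completion order: each result is consumed as soon as it is ready.
    unordered = [f.result() for f in as_completed(futures)]

print(ordered)
print(unordered)
```

In the ordered case the quick results have to be buffered until task 0 completes, which mirrors why return_as='generator' can hold more results in memory than return_as='generator_unordered'.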
3 Reference
- joblib
- joblib-doc
- loky
- parallel_config
- Python data serialization module pickle using notes