Installation and Use of Ray, a Distributed Framework in Python

2024-08-28 11:25:56

Technical background

Suppose we have several workstations (not servers) on a LAN. Is there a simple way to turn them into a small cluster that accepts distributed tasks? Ray provides a good solution: it lets you flexibly build a cluster environment and submit distributed tasks using conda and Python.

[Figure: basic architecture of Ray]

This article briefly describes the installation and basic use of Ray.

Installation

Since it is a Python framework, Ray can be installed and managed directly using pip:

$ python3 -m pip install ray[default]

Note, however, that the Python and Ray versions must be identical on every machine in the cluster. It is therefore recommended to create the same virtual environment with conda on each machine and then install the same version of Ray in it. Otherwise, an error like the following may occur when adding a node to the cluster:

RuntimeError: Version mismatch: The cluster was started with:
    Ray: 2.7.2
    Python: 3.7.13
This process on node 172.17.0.2 was started with:
    Ray: 2.7.2
    Python: 3.7.5
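
A quick way to catch this before joining a node is to print the interpreter and Ray versions on each machine and compare them. The following is only a minimal sketch: the helper names environment_report and find_mismatches are made up for this illustration, and the sample values are copied from the error message above.

```python
import platform


def environment_report():
    """Return the Python and Ray versions of the current environment."""
    try:
        import ray  # imported lazily so the check also runs where Ray is missing
        ray_version = ray.__version__
    except ImportError:
        ray_version = None
    return {"python": platform.python_version(), "ray": ray_version}


def find_mismatches(head, worker):
    """List the keys whose versions differ between two reports."""
    return sorted(key for key in head if head[key] != worker.get(key))


if __name__ == "__main__":
    # Run this on every machine and compare the printed reports.
    print(environment_report())
    # Values copied from the error message above: only Python differs.
    head = {"python": "3.7.13", "ray": "2.7.2"}
    worker = {"python": "3.7.5", "ray": "2.7.2"}
    print(find_mismatches(head, worker))
```

Running the script inside the conda environment on each machine shows whether that environment is the one Ray will actually use.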

Starting and connecting to services

Before configuring the cluster, it is usually convenient to set up key-based SSH login first:

$ ssh-keygen -t rsa
$ ssh-copy-id user_name@ip_address

These two steps configure password-free SSH login to the remote machine (you may need to enter the password once during configuration). Then start the Ray service on the machine chosen as the master (head) node:

$ ray start --head --dashboard-host='0.0.0.0' --dashboard-port=8265
Usage stats collection is enabled. To disable this, add `--disable-usage-stats` to the command that starts the cluster, or run the following command: `ray disable-usage-stats` before starting the cluster. See /en/master/cluster/ for more details.

Local node IP: 

--------------------
Ray runtime started.
--------------------

Next steps
  To add another node to this Ray cluster, run
    ray start --address=':6379'

  To connect to this Ray cluster:
    import ray
    ray.init()

  To submit a Ray job using the Ray Jobs CLI:
    RAY_ADDRESS=':8265' ray job submit --working-dir . -- python my_script.py

  See /en/latest/cluster/running-applications/job-submission/
  for more information on submitting Ray jobs to the Ray cluster.

  To terminate the Ray runtime, run
    ray stop

  To view the status of the cluster, use
    ray status

  To monitor and debug Ray, view the dashboard at
    :8265

  If connection to the dashboard fails, check your firewall settings and network configuration.

Once startup completes, Ray prints instructions for the next steps. For example, to add another machine to the cluster, run the following command on that machine:

$ ray start --address=':6379'

As mentioned earlier, the Python and Ray versions must be the same on every node. If they are not, you will get this error:

RuntimeError: Version mismatch: The cluster was started with:
    Ray: 2.7.2
    Python: 3.7.13
This process on node 172.17.0.2 was started with:
    Ray: 2.7.2
    Python: 3.7.5

At this point the Ray cluster is deployed. The whole process is simple and convenient.
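
To confirm that a node has really joined, one option is to ask the cluster for its node list from Python on the head node. The sketch below is illustrative, not the author's method: the helper names are hypothetical, the sample addresses are placeholders, and it assumes the entries returned by ray.nodes() carry the "Alive" and "NodeManagerAddress" keys.

```python
def alive_node_addresses(nodes):
    """Return the addresses of the nodes that the cluster reports as alive.

    `nodes` is expected to look like the list returned by ray.nodes(),
    where each entry is a dict with "Alive" and "NodeManagerAddress" keys.
    """
    return [node["NodeManagerAddress"] for node in nodes if node.get("Alive")]


def connected_nodes():
    """Attach to the running cluster and list its alive nodes."""
    import ray  # imported lazily; needs Ray and a running cluster

    ray.init(address="auto")  # attach to the cluster already running on this machine
    try:
        return alive_node_addresses(ray.nodes())
    finally:
        ray.shutdown()


if __name__ == "__main__":
    # Placeholder data in the shape described above, so the helper can be
    # tried without a cluster; call connected_nodes() on a real head node.
    sample = [
        {"Alive": True, "NodeManagerAddress": "172.17.0.2"},
        {"Alive": False, "NodeManagerAddress": "172.17.0.3"},
    ]
    print(alive_node_addresses(sample))
```

The ray status command mentioned in the startup message gives a similar overview from the shell.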

Basic use

Let's start with one of the simplest cases to test it out:

# test_ray.py
import ray

# Connect to the cluster; a submitted job takes the address from RAY_ADDRESS
ray.init()

print('''This cluster consists of
    {} nodes in total
    {} CPU resources in total
'''.format(len(ray.nodes()), ray.cluster_resources()['CPU']))

This Python script prints the compute resources of the cluster. We can submit it from the local machine as a job like this:

$ RAY_ADDRESS=':8265' ray job submit --working-dir . -- python test_ray.py 
Job submission server address: :8265
2024-08-27 07:35:10,751 INFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_4b79155b5de665ce.zip.
2024-08-27 07:35:10,751 INFO :518 -- Creating a file package for local directory '.'.

-------------------------------------------------------
Job 'raysubmit_7Uqy8LjP4dxjZxGa' submitted successfully
-------------------------------------------------------

Next steps
  Query the logs of the job:
    ray job logs raysubmit_7Uqy8LjP4dxjZxGa
  Query the status of the job:
    ray job status raysubmit_7Uqy8LjP4dxjZxGa
  Request the job to be stopped:
    ray job stop raysubmit_7Uqy8LjP4dxjZxGa

Tailing logs until the job exits (disable with --no-wait):
2024-08-27 15:35:14,079 INFO :1330 -- Using address :6379 set in the environment variable RAY_ADDRESS
2024-08-27 15:35:14,079 INFO :1458 -- Connecting to existing Ray cluster at address: :6379...
2024-08-27 15:35:14,103 INFO :1639 -- Connected to Ray cluster. View the dashboard at :8265 
This cluster consists of
    1 nodes in total
    48.0 CPU resources in total


------------------------------------------
Job 'raysubmit_7Uqy8LjP4dxjZxGa' succeeded
------------------------------------------

The output shows that the remote cluster has a single node with 48 CPU cores available. This output is visible not only in the terminal window; the dashboard, at the address printed in the log, also offers more detailed job management.

[Figure: job view in the Ray dashboard]
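
The same submission can also be made from Python through Ray's job submission SDK (JobSubmissionClient in ray.job_submission). The following is a hedged sketch rather than a drop-in replacement for the CLI: the helper names are hypothetical, and the host passed in is a placeholder for the head node address.

```python
import time


def dashboard_url(host, port=8265):
    """Build the HTTP address of the Ray dashboard, which serves the Jobs API."""
    return "http://{}:{}".format(host, port)


def submit_and_wait(host, entrypoint="python test_ray.py", poll_seconds=1.0):
    """Submit the current directory as a job and block until it finishes."""
    # Imported lazily so this module can be loaded on machines without Ray.
    from ray.job_submission import JobStatus, JobSubmissionClient

    client = JobSubmissionClient(dashboard_url(host))
    job_id = client.submit_job(
        entrypoint=entrypoint,
        runtime_env={"working_dir": "."},  # upload the local directory, like --working-dir .
    )
    finished = {JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.STOPPED}
    while client.get_job_status(job_id) not in finished:
        time.sleep(poll_seconds)
    print(client.get_job_logs(job_id))
    return job_id


if __name__ == "__main__":
    # 127.0.0.1 is a placeholder; use the address of your head node and then
    # call submit_and_wait("127.0.0.1") to actually submit the job.
    print(dashboard_url("127.0.0.1"))
```

The SDK is convenient when jobs are submitted from another program; for interactive use the CLI shown above is usually enough.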

To confirm that the job really runs remotely and not locally, we can also submit a script that prints where a package is installed:

import ray

ray.init()

import numpy as np
# Print where numpy was imported from, to see which environment runs the job
print(np.__file__)

The returned log is:

$ RAY_ADDRESS=':8265' ray job submit --working-dir . -- python test_ray.py 
Job submission server address: :8265
2024-08-27 07:46:10,645 INFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_5bba1a7144beb522.zip.
2024-08-27 07:46:10,658 INFO :518 -- Creating a file package for local directory '.'.

-------------------------------------------------------
Job 'raysubmit_kQ3XgE4Hxp3dkmuU' submitted successfully
-------------------------------------------------------

Next steps
  Query the logs of the job:
    ray job logs raysubmit_kQ3XgE4Hxp3dkmuU
  Query the status of the job:
    ray job status raysubmit_kQ3XgE4Hxp3dkmuU
  Request the job to be stopped:
    ray job stop raysubmit_kQ3XgE4Hxp3dkmuU

Tailing logs until the job exits (disable with --no-wait):
2024-08-27 15:46:12,456 INFO :1330 -- Using address :6379 set in the environment variable RAY_ADDRESS
2024-08-27 15:46:12,457 INFO :1458 -- Connecting to existing Ray cluster at address: :6379...
2024-08-27 15:46:12,470 INFO :1639 -- Connected to Ray cluster. View the dashboard at :8265 
/home/dechin/anaconda3/envs/mindspore-latest/lib/python3.7/site-packages/numpy/__init__.py

------------------------------------------
Job 'raysubmit_kQ3XgE4Hxp3dkmuU' succeeded
------------------------------------------

$ python3 -m pip show numpy
Name: numpy
Version: 1.21.6
Summary: NumPy is the fundamental package for array computing with Python.
Home-page: 
Author: Travis E. Oliphant et al.
Author-email: 
License: BSD
Location: /usr/local/python-3.7.5/lib/python3.7/site-packages
Requires: 
Required-by: CyFES, h5py, hadder, matplotlib, mindinsight, mindspore, mindspore-serving, pandas, ray, scikit-learn, scipy

Here you can see that the numpy used by the submitted job lives in the mindspore-latest virtual environment, while the local numpy (shown by pip show) is not in a virtual environment. This indicates that the job is indeed executed remotely. The submission log can also be seen on the dashboard.

[Figure: job log in the Ray dashboard]
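
A more direct check is to have the job report the host name and interpreter it runs under. This is a small sketch using only the standard library; the function name execution_context is made up for this example.

```python
import socket
import sys


def execution_context():
    """Describe where the current Python process is running."""
    return {
        "hostname": socket.gethostname(),  # machine that executes the script
        "executable": sys.executable,      # interpreter running the script
        "prefix": sys.prefix,              # root of the active environment
    }


if __name__ == "__main__":
    for key, value in execution_context().items():
        print("{}: {}".format(key, value))
```

Submitted as a job, the script prints the values of the cluster node; run directly, it prints those of the local machine, so the two outputs can be compared.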

Next, test the concurrency of the Ray framework:

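import ray

ray.init()

@ray.remote(num_returns=1)
def cpu_task():
    import time
    time.sleep(2)  # simulate a task that takes 2 s
    import numpy as np
    nums = 100000
    # Sample points uniformly in the unit square
    arr = np.random.random((2, nums))
    arr2 = arr[1]**2 + arr[0]**2
    # Fraction of points inside the quarter circle, times 4, estimates pi
    pi = np.where(arr2<=1, 1, 0).sum() * 4 / nums
    return pi

num_conc = 10
# Launch all tasks at once and wait for their results
res = ray.get([cpu_task.remote() for _ in range(num_conc)])
print(sum(res) / num_conc)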
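
For reference, the estimator behind this script can be written out as follows. This is a sketch that assumes the points are drawn uniformly from the unit square; the symbols N (points per task), M (number of tasks) and N_in (points inside the quarter circle) are introduced here only for illustration.

```latex
% Points (x, y) are uniform on the unit square; the quarter disc
% x^2 + y^2 <= 1 covers a fraction pi/4 of its area.
\[
  P\left(x^2 + y^2 \le 1\right) = \frac{\pi}{4}
  \quad\Longrightarrow\quad
  \hat{\pi}_k = \frac{4\,N_{\mathrm{in},k}}{N}, \qquad N = 100000
\]
% The script averages the estimates of the M = 10 tasks:
\[
  \hat{\pi} = \frac{1}{M} \sum_{k=1}^{M} \hat{\pi}_k
\]
```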

This example estimates pi with a Monte Carlo algorithm. It submits 10 tasks at once; each task sleeps for 2 s and samples 100000 points. Executed sequentially, the tasks would need at least 20 s of sleep in total. After submitting the job, the output is as follows:

$ time RAY_ADDRESS=':8265' ray job submit --working-dir . --entrypoint-num-cpus 10 -- python test_ray.py
Job submission server address: :8265
2024-08-27 08:30:13,315 INFO dashboard_sdk.py:385 -- Package gcs://_ray_pkg_d66b052eb6944465.zip already exists, skipping upload.

-------------------------------------------------------
Job 'raysubmit_Ur6MAvP7DYiCT6Uz' submitted successfully
-------------------------------------------------------

Next steps
  Query the logs of the job:
    ray job logs raysubmit_Ur6MAvP7DYiCT6Uz
  Query the status of the job:
    ray job status raysubmit_Ur6MAvP7DYiCT6Uz
  Request the job to be stopped:
    ray job stop raysubmit_Ur6MAvP7DYiCT6Uz

Tailing logs until the job exits (disable with --no-wait):
2024-08-27 16:30:15,032 INFO :1330 -- Using address :6379 set in the environment variable RAY_ADDRESS
2024-08-27 16:30:15,033 INFO :1458 -- Connecting to existing Ray cluster at address: :6379...
2024-08-27 16:30:15,058 INFO :1639 -- Connected to Ray cluster. View the dashboard at :8265 
3.141656

------------------------------------------
Job 'raysubmit_Ur6MAvP7DYiCT6Uz' succeeded
------------------------------------------


real    0m7.656s
user    0m0.414s
sys     0m0.010s
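
The time command above measures the whole submission, including package upload and log tailing. To time only the concurrent part, one option is to measure inside the job script itself. The helper below is a hypothetical sketch using the standard library; in the job script it would wrap the ray.get call that collects the task results.

```python
import time


def timed(fn, *args, **kwargs):
    """Call fn and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    # Stand-in workload so the sketch runs without Ray; in the job script
    # the wrapped call would be the one that waits for the task results.
    _, seconds = timed(time.sleep, 0.1)
    print("elapsed: {:.2f} s".format(seconds))
```

Printing the measured time from the job separates the task time from the submission overhead instead of estimating it.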

The total runtime is 7.656 seconds, of which about 5 s comes from network delay. The concurrent tasks themselves therefore take only about 2 to 3 s, close to the 2 s sleep of a single task. In other words, the remotely submitted tasks are indeed executed concurrently. The returned results are averaged to give the estimate of pi: 3.141656. Besides regular CPU tasks, you can also submit GPU tasks:

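import ray

ray.init()

@ray.remote(num_returns=1, num_gpus=1)
def test_ms():
    import os
    os.environ['GLOG_v']='4'  # MindSpore log level
    # The standard variable name is CUDA_VISIBLE_DEVICES (plural)
    os.environ['CUDA_VISIBLE_DEVICES']='0'
    import mindspore as ms
    ms.set_context(device_target="GPU", device_id=0)
    a = ms.Tensor([1, 2, 3], ms.float32)
    return a.asnumpy().sum()

res = ray.get(test_ms.remote())
ray.shutdown()
print(res)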

This task simply creates a Tensor with MindSpore, computes its sum, and returns the result to the local machine. The output reads:

$ RAY_ADDRESS=':8265' ray job submit --working-dir . --entrypoint-num-gpus 1 -- python test_ray.py 
Job submission server address: :8265
2024-08-28 01:16:38,712 INFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_10019cd9fa9bdc38.zip.
2024-08-28 01:16:38,712 INFO :518 -- Creating a file package for local directory '.'.

-------------------------------------------------------
Job 'raysubmit_RUvkEqnkjNitKmnJ' submitted successfully
-------------------------------------------------------

Next steps
  Query the logs of the job:
    ray job logs raysubmit_RUvkEqnkjNitKmnJ
  Query the status of the job:
    ray job status raysubmit_RUvkEqnkjNitKmnJ
  Request the job to be stopped:
    ray job stop raysubmit_RUvkEqnkjNitKmnJ

Tailing logs until the job exits (disable with --no-wait):
2024-08-28 09:16:41,960 INFO :1330 -- Using address :6379 set in the environment variable RAY_ADDRESS
2024-08-28 09:16:41,960 INFO :1458 -- Connecting to existing Ray cluster at address: :6379...
2024-08-28 09:16:41,974 INFO :1639 -- Connected to Ray cluster. View the dashboard at :8265 
6.0

------------------------------------------
Job 'raysubmit_RUvkEqnkjNitKmnJ' succeeded
------------------------------------------

The returned result is 6.0, which is correct.
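
A task that requests a GPU can only be scheduled on a node that advertises one; otherwise it typically waits instead of running. A job can therefore check the cluster totals first. The sketch below uses a hypothetical helper, missing_resources, on a dict shaped like the one returned by ray.cluster_resources().

```python
def missing_resources(available, required):
    """Return the requested resources that exceed the cluster totals.

    Both arguments are dicts such as {"CPU": 48.0, "GPU": 1.0}.
    """
    return {
        name: amount
        for name, amount in required.items()
        if available.get(name, 0) < amount
    }


if __name__ == "__main__":
    # The CPU total matches the log earlier in this article; the absence
    # of a "GPU" entry is a made-up scenario for illustration.
    cluster = {"CPU": 48.0}
    print(missing_resources(cluster, {"GPU": 1}))
```

In a job script, the first argument would be ray.cluster_resources(), and a non-empty result is a reason to stop before submitting GPU tasks.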

Viewing and managing tasks

The output of each submission above contains a job ID. With this job ID, we can check the status of the job on the master node:

$ ray job status raysubmit_RUvkEqnkjNitKmnJ

You can view the output of this task:

$ ray job logs raysubmit_RUvkEqnkjNitKmnJ

It is also possible to terminate the task:

$ ray job stop raysubmit_RUvkEqnkjNitKmnJ
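
These three commands can also be driven from a script. The following is a minimal sketch built on subprocess; the function names are hypothetical, only the status, logs and stop subcommands shown above are covered, and it assumes the ray executable is on PATH.

```python
import os
import subprocess

JOB_ACTIONS = ("status", "logs", "stop")


def ray_job_command(action, job_id):
    """Build the argument list for `ray job <action> <job_id>`."""
    if action not in JOB_ACTIONS:
        raise ValueError("unsupported action: {!r}".format(action))
    return ["ray", "job", action, job_id]


def run_ray_job(action, job_id, address=None):
    """Run the command and return its output; needs the ray CLI on PATH."""
    env = dict(os.environ)
    if address is not None:
        env["RAY_ADDRESS"] = address  # same variable as used with ray job submit
    completed = subprocess.run(
        ray_job_command(action, job_id),
        env=env,
        check=True,
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    return completed.stdout


if __name__ == "__main__":
    # Only prints the command; call run_ray_job(...) on a machine with Ray.
    print(" ".join(ray_job_command("status", "raysubmit_RUvkEqnkjNitKmnJ")))
```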

Summary

This article introduced the basic installation and use of Ray, a Python-based distributed framework. With conda and Python, Ray makes it convenient to build a cluster. It also runs distributed tasks concurrently and supports submitting GPU tasks, which greatly reduces the work of hand-written distributed development.

Copyright statement

This article was first published at: /dechinphy/p/

Author ID: DechinPhy
