Python Ray Remote Functions
Source: Scaling Python with Ray
Ray lets you run functions as remote tasks in a cluster. Remote functions can run in separate processes on the local machine, which spreads the workload over several cores. They can also run on remote machines in your server cluster. They are used for the parallel execution of stateless functions.
Waiting for Results
To execute a function remotely, decorate it with @ray.remote. Then call that function with .remote() instead of calling it normally. This remote call returns a future, a so-called Ray object reference (ObjectRef), which you can then fetch with ray.get. ray.get blocks the calling process until the result is available.
When futures have different execution times, e.g. ML trainings with different batch sizes, you should use ray.wait instead. It returns two lists: the futures that have already completed, up to the requested number, and the futures that are still pending. We can simulate this with a simple variable sleep timer, time.sleep(x):
Ray GET
import ray
import time
import timeit
# simulate remote functions with
# different execution times
@ray.remote
def remote_task(x):
    time.sleep(x)
    return x
# create a list of things
things = list(range(10))
# ensure that the futures won’t complete in order
things.sort(reverse=True)
# GET
## get results when all results are available
def in_order():
    ### use remote function to retrieve futures
    futures = [remote_task.remote(x) for x in things]
    ### ray.get will block your function until all futures are returned
    values = ray.get(futures)
    ### loop over results and print
    for v in values:
        print(f" Completed {v}")
        ### simulate some business logic
        time.sleep(1)
# call order and see how long it takes to complete
print("GET took: ", timeit.timeit(lambda: in_order(), number=1))
Running the script prints:
python 01_ray-wait.py
2023-01-23 13:42:34,328 INFO worker.py:1529 -- Started a local Ray instance.
View the dashboard at 127.0.0.1:8265
Completed 9
Completed 8
Completed 7
Completed 6
Completed 5
Completed 4
Completed 3
Completed 2
Completed 1
Completed 0
GET took: 22.13837863399931
Ray WAIT
With ray.wait we can process results as they become available. We simulate some business logic in our local process (time.sleep(1)). That work now runs in parallel with the remote tasks that are still executing, instead of starting only after every task has finished. This reduces the overall execution time considerably:
# WAIT
## process as results become available
def as_available():
    ### use remote function to retrieve futures
    futures = [remote_task.remote(x) for x in things]
    ### while still futures left
    while len(futures) > 0:
        ### call ray.wait to get the next future
        ready_futures, rest_futures = ray.wait(futures)
        ### show progress
        print(f"Ready {len(ready_futures)} rest {len(rest_futures)}")
        ### print results
        for ref in ready_futures:
            print(f'completed value {ref}, result {ray.get(ref)}')
            ### simulate some business logic
            time.sleep(1)
        ### wait on the ones that are not yet available
        futures = rest_futures
## call order and see how long it takes to complete
print("WAIT took: ", timeit.timeit(lambda: as_available(), number=1))
As predicted, the overall execution time is much shorter: about 12 s instead of about 22 s, a reduction of roughly 45%:
Ready 1 rest 9
completed value ObjectRef(c54e76759b2a0c10ffffffffffffffffffffffff0100000001000000), result 2
Ready 1 rest 8
completed value ObjectRef(5d4b8d1788f12d2dffffffffffffffffffffffff0100000001000000), result 3
Ready 1 rest 7
completed value ObjectRef(239c2f70c73fbf73ffffffffffffffffffffffff0100000001000000), result 1
Ready 1 rest 6
completed value ObjectRef(71b133a11e1c461cffffffffffffffffffffffff0100000001000000), result 4
Ready 1 rest 5
completed value ObjectRef(1e360ffa862f8fe3ffffffffffffffffffffffff0100000001000000), result 0
Ready 1 rest 4
completed value ObjectRef(d695f922effe6d99ffffffffffffffffffffffff0100000001000000), result 6
Ready 1 rest 3
completed value ObjectRef(85748392bcd969ccffffffffffffffffffffffff0100000001000000), result 7
Ready 1 rest 2
completed value ObjectRef(1e8ff6d236132784ffffffffffffffffffffffff0100000001000000), result 8
Ready 1 rest 1
completed value ObjectRef(359ec6ce30d3ca2dffffffffffffffffffffffff0100000001000000), result 9
Ready 1 rest 0
completed value ObjectRef(2751d69548dba956ffffffffffffffffffffffff0100000001000000), result 5
WAIT took: 12.03411061800034
Optional Parameters
ray.wait accepts the following optional parameters:
num_returns: The number of ObjectRef objects Ray waits for before returning (default 1). It must be equal to or smaller than the length of the input list.
timeout: The maximum amount of time in seconds to wait before returning. By default, Ray waits indefinitely. ray.get accepts a timeout as well. When it expires, ray.get raises a GetTimeoutError, while ray.wait simply stops waiting and returns the futures that are ready so far.
fetch_local: If set to False, Ray does not fetch the results to the local node. ray.wait then only confirms that the remote functions have produced their results somewhere in the cluster.
You can also use ray.cancel to terminate remotely executed functions, because a timeout does not stop them:
## same as above but with timeout and remote cancel
def as_available():
    futures = [remote_task.remote(x) for x in things]
    ### while still futures left
    while len(futures) > 0:
        ### call ray.wait to get the next futures:
        ### wait up to 10 s for 5 results (or for all remaining ones, if fewer are left)
        ready_futures, rest_futures = ray.wait(futures, timeout=10, num_returns=min(5, len(futures)))
        # nothing finished before the timeout expired
        if len(ready_futures) < 1:
            print(f"Timed out on {rest_futures}")
            # cancel remote functions, e.g. if the tasks are using a lot of resources
            # (ray.cancel takes a single ObjectRef, so cancel them one by one)
            for ref in rest_futures:
                ray.cancel(ref)
            break
        for ref in ready_futures:
            print(f'completed value {ref}, result {ray.get(ref)}')
        futures = rest_futures
## call order and see how long it takes to complete
print("WAIT took: ", timeit.timeit(lambda: as_available(), number=1))
Our tasks finish before the timeout expires, so we get the same results as before. With num_returns set to 5, however, each ray.wait call collects 5 results before returning them. The 10 results therefore arrive in 2 chunks of 5.
Composition
The two most common ways to compose remote functions in Ray are pipelining and nested parallelism.
Pipelines
Pipelined functions use the ObjectRef objects returned by an earlier remote call as parameters for a new remote function call. Ray coordinates the pipeline. It waits until the upstream results are available and sends them directly to the remote host (or process, in a local setup) that runs the downstream function.
In the following example we have two remote functions. The first one gets an integer value from "somewhere", and the second expects three integers to do some math with. We can call both functions with a single ray.get and pipe the results of the first function into the second. Ray handles the rest for us:
import random
import ray
import time
# first remote function
## retrieve a value from somewhere
@ray.remote
def generate_number(s: int, limit: int, sl: float) -> int:
    # create a random seed from value of 's'
    random.seed(s)
    # sleep for 'sl' seconds
    time.sleep(sl)
    # create a random number between '0' and the value of 'limit'
    return random.randint(0, limit)
# second remote function
## take values and do some math
@ray.remote
def sum_values(v1: int, v2: int, v3: int) -> int:
    # take three integers and add them up
    return v1 + v2 + v3
# use first remote function return as input for second remote function
print(ray.get(sum_values.remote(
    generate_number.remote(1, 10, .1),
    generate_number.remote(5, 20, .2),
    generate_number.remote(7, 15, .3),
)))
python 02_ray-pipeline.py
2023-01-25 13:49:46,733 INFO worker.py:1529 -- Started a local Ray instance.
View the dashboard at 127.0.0.1:8265
31
Limitation: Ray only resolves ObjectRef objects that are passed directly as top-level arguments. ObjectRefs nested inside a Python construct like a list are not resolved. The downstream function receives the ObjectRef objects themselves, not their values. Pushing the results of the first remote function as a list therefore does not work. sum_values expects three integers but receives a single list, so you get a TypeError:
print(ray.get(sum_values.remote([generate_number.remote(1, 10, .1), generate_number.remote(5, 20, .2), generate_number.remote(7, 15, .3)])))
Nested Parallelism
Ray allows you to start additional remote functions from within a remote function. There are two ways to use this feature. In the first case, remote_objrefs(), we return all the ObjectRef objects to the caller of the aggregating function. We then need additional code that loops over the returned futures and waits for the results.
In the second case, remote_values(), the aggregating function waits for all the remote functions to finish and returns the actual results:
import random
import ray
import time
# retrieve a value from somewhere
@ray.remote
def generate_number(s: int, limit: int) -> int:
    # create a random seed from value of 's'
    random.seed(s)
    # simulate processing time
    time.sleep(.1)
    # create a random number between '0' and the value of 'limit'
    return random.randint(0, limit)
# invoke multiple remote functions and return the 'ObjectRef's
@ray.remote
def remote_objrefs():
    results = []
    for n in range(4):
        results.append(generate_number.remote(n, 4*n))
    return results
# store returned 'ObjectRef's from remote_objrefs()
futures = ray.get(remote_objrefs.remote())
# invoke multiple remote functions and return the resulting values directly
@ray.remote
def remote_values():
    results = []
    for n in range(4):
        results.append(generate_number.remote(n, 4*n))
    return ray.get(results)
# print returned results
print(ray.get(remote_values.remote()))
# take returned 'ObjectRef's and get results
while len(futures) > 0:
    ready_futures, rest_futures = ray.wait(futures, timeout=600, num_returns=1)
    # break when nothing finished before the timeout expired
    if len(ready_futures) < 1:
        # ray.cancel takes a single ObjectRef, so cancel them one by one
        for ref in rest_futures:
            ray.cancel(ref)
        break
    for ref in ready_futures:
        print(f'completed result {ray.get(ref)}')
    futures = rest_futures
python 03_ray_nested_parallelism.py
2023-01-25 15:14:11,235 INFO worker.py:1529 -- Started a local Ray instance.
View the dashboard at 127.0.0.1:8265
[0, 1, 0, 3]
completed result 0
completed result 3
completed result 1
completed result 0