
Python Ray Remote Functions

Guangzhou, China

Source: Scaling Python with Ray

Ray lets you run functions as remote tasks in a cluster. A remote function can run in a separate process on the local machine, spreading the workload over several cores, or on remote machines in your server cluster. Remote functions are used for the parallel execution of stateless functions.

Waiting for Results

To execute a function remotely, decorate it with @ray.remote and call it with .remote() instead of calling it directly. The remote call immediately returns a future, a so-called Ray object reference (ObjectRef), whose value you can fetch with ray.get - which blocks the calling process until the result is available.

When working with futures that have different execution times - e.g. ML training runs with different batch sizes - you should use ray.wait instead. It returns two lists: the futures that have already completed (up to the requested number) and the ones that are still pending. We can simulate this with a simple variable sleep timer time.sleep(x):


import ray
import time
import timeit

# simulate remote functions with
# different execution times
@ray.remote
def remote_task(x):
    time.sleep(x)
    return x

# create a list of things
things = list(range(10))
# ensure that the futures won't complete in order
things.sort(reverse=True)


## get results when all results are available
def in_order():
    ### use remote function to retrieve futures
    futures = list(map(lambda x: remote_task.remote(x), things))
    ### ray.get will block your function until all futures are returned
    values = ray.get(futures)
    ### loop over results and print
    for v in values:
        print(f" Completed {v}")
        ### simulate some business logic
        time.sleep(1)

# call in_order and see how long it takes to complete
print("GET took: ", timeit.timeit(lambda: in_order(), number=1))

Executing this function returns:

2023-01-23 13:42:34,328 INFO -- Started a local Ray instance.
View the dashboard at
Completed 9
Completed 8
Completed 7
Completed 6
Completed 5
Completed 4
Completed 3
Completed 2
Completed 1
Completed 0
GET took: 22.13837863399931


With ray.wait we can process the incoming results as they become available. Since we are simulating some business logic in our local process (time.sleep(1)), this part can now run in parallel with the remote functions that are still executing. This reduces the overall execution time dramatically:


## process as results become available
def as_available():
    ### use remote function to retrieve futures
    futures = list(map(lambda x: remote_task.remote(x), things))
    ### while still futures left
    while len(futures) > 0:
        ### call ray.wait to get the next future
        ready_futures, rest_futures = ray.wait(futures)
        ### show progress
        print(f"Ready {len(ready_futures)} rest {len(rest_futures)}")
        ### print results
        for ref in ready_futures:
            print(f'completed value {ref}, result {ray.get(ref)}')
            ### simulate some business logic
            time.sleep(1)
        ### wait on the ones that are not yet available
        futures = rest_futures

## call as_available and see how long it takes to complete
print("WAIT took: ", timeit.timeit(lambda: as_available(), number=1))

As predicted, we get a much shorter overall execution time - a reduction of ~45%:

Ready 1 rest 9
completed value ObjectRef(c54e76759b2a0c10ffffffffffffffffffffffff0100000001000000), result 2
Ready 1 rest 8
completed value ObjectRef(5d4b8d1788f12d2dffffffffffffffffffffffff0100000001000000), result 3
Ready 1 rest 7
completed value ObjectRef(239c2f70c73fbf73ffffffffffffffffffffffff0100000001000000), result 1
Ready 1 rest 6
completed value ObjectRef(71b133a11e1c461cffffffffffffffffffffffff0100000001000000), result 4
Ready 1 rest 5
completed value ObjectRef(1e360ffa862f8fe3ffffffffffffffffffffffff0100000001000000), result 0
Ready 1 rest 4
completed value ObjectRef(d695f922effe6d99ffffffffffffffffffffffff0100000001000000), result 6
Ready 1 rest 3
completed value ObjectRef(85748392bcd969ccffffffffffffffffffffffff0100000001000000), result 7
Ready 1 rest 2
completed value ObjectRef(1e8ff6d236132784ffffffffffffffffffffffff0100000001000000), result 8
Ready 1 rest 1
completed value ObjectRef(359ec6ce30d3ca2dffffffffffffffffffffffff0100000001000000), result 9
Ready 1 rest 0
completed value ObjectRef(2751d69548dba956ffffffffffffffffffffffff0100000001000000), result 5
WAIT took: 12.03411061800034
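The two timings can be sanity-checked with a back-of-the-envelope model. The sketch below uses plain Python and no Ray. It assumes an idealised setup in which all ten tasks start at t=0 on their own worker and scheduling costs nothing. That assumption is mine, and real runs add start-up and scheduling overhead on top.

```python
# Idealised timing model (assumption: every task starts at t=0 on its own worker)
task_durations = list(range(10))   # task x sleeps for x seconds
business_logic = 1                 # seconds of local work per result

# ray.get: wait for the slowest task, then process all results one by one
get_total = max(task_durations) + business_logic * len(task_durations)

# ray.wait: process each result as soon as it is ready
clock = 0
for finished_at in sorted(task_durations):
    clock = max(clock, finished_at) + business_logic
wait_total = clock

# measured values taken from the two runs above
measured_get, measured_wait = 22.13837863399931, 12.03411061800034
reduction = (measured_get - measured_wait) / measured_get

print(get_total, wait_total, round(reduction * 100, 1))  # 19 10 45.6
```

The model predicts 19 s versus 10 s. The measured 22.1 s and 12.0 s show the same gap of roughly 10 s, plus a few seconds of overhead in both runs.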

Optional Parameters

  • num_returns: Number of ObjectRef objects that must complete before ray.wait returns. Must be less than or equal to the length of the input list; defaults to 1.
  • timeout: The maximum amount of time in seconds to wait before returning. Defaults to None, which means wait indefinitely. On timeout, ray.get raises a GetTimeoutError, while ray.wait just stops waiting and returns whatever has completed so far.
  • fetch_local: If set to False, the results are not fetched to the local node. You only get confirmation that the remote functions have produced their results.

Additionally, you can use ray.cancel to terminate remotely executed functions - a timeout only ends the waiting, it does not stop the tasks themselves.

## same as above but with timeout and remote cancel
def as_available():
    futures = list(map(lambda x: remote_task.remote(x), things))
    ### while still futures left
    while len(futures) > 0:
        ### call ray.wait to get the next futures
        ### but with a 10s timeout and always collect 5 results before returning anything
        ### (num_returns must not exceed the number of futures left)
        num_returns = min(5, len(futures))
        ready_futures, rest_futures = ray.wait(futures, timeout=10, num_returns=num_returns)
        ### print whatever completed in time
        for ref in ready_futures:
            print(f'completed value {ref}, result {ray.get(ref)}')
        # if we get back less than num_returns the wait has timed out
        if len(ready_futures) < num_returns:
            print(f"Timed out on {rest_futures}")
            # cancel remote functions, e.g. if the task is using a lot of resources
            for ref in rest_futures:
                ray.cancel(ref)
            # stop waiting since the timeout was exceeded
            break
        futures = rest_futures

## call as_available and see how long it takes to complete
print("WAIT took: ", timeit.timeit(lambda: as_available(), number=1))

Since our code does not run into the timeout, we get the same results as before. But with num_returns set to 5, ray.wait collects five results before returning - so our 10 tasks arrive in two chunks of five.


The two most common methods of composing remote functions in Ray are Pipelining and Nested Parallelism.


Pipelining

Pipelined functions use ObjectRef objects returned by an earlier remote call as parameters for a new remote function call. Ray coordinates the pipeline and sends the result of the first remote function directly to the remote host (or process in a local setup) that needs it.

In the following example we have two remote functions. The first one gets an integer value from "somewhere" and the second takes three integers and does some math on them. We can resolve the whole chain with a single ray.get by passing the results of the first function into the second - and Ray handles the rest for us:

import random
import ray
import time

# first remote function
## retrieve a value from somewhere
@ray.remote
def generate_number(s: int, limit: int, sl: float) -> int:
    # create a random seed from value of 's'
    random.seed(s)
    # wait for value of 'sl'
    time.sleep(sl)
    # create a random number between '0' and the value of 'limit'
    return random.randint(0, limit)

# second remote function
## take values and do some math
@ray.remote
def sum_values(v1: int, v2: int, v3: int) -> int:
    # take three integers and add them up
    return v1+v2+v3

# use first remote function return as input for second remote function
print(ray.get(sum_values.remote(generate_number.remote(1, 10, .1), generate_number.remote(5, 20, .2), generate_number.remote(7, 15, .3))))
2023-01-25 13:49:46,733 INFO -- Started a local Ray instance.
View the dashboard at

Limitation: Ray only resolves an ObjectRef that is passed directly as an argument. ObjectRefs nested inside a Python construct like a list are not resolved - they are forwarded to the function as ObjectRef objects, not as values. Passing the results of the first remote function as a single list therefore does not work here - you will get a TypeError because sum_values expects three arguments and receives one list instead:

print(ray.get(sum_values.remote([generate_number.remote(1, 10, .1), generate_number.remote(5, 20, .2), generate_number.remote(7, 15, .3)])))

Nested Parallelism

Ray allows you to start additional remote functions from within a remote function. There are two ways to leverage this feature. In the first case, remote_objrefs(), we return all the ObjectRef objects to the invoker of the aggregating function, so we need additional code that loops over the returned futures and waits for the results.

In the second case, remote_values(), the aggregating function waits for all the remote functions to finish and returns the actual execution results:

import random
import ray
import time

# retrieve a value from somewhere
@ray.remote
def generate_number(s: int, limit: int) -> int:
    # create a random seed from value of 's'
    random.seed(s)
    # simulate processing time (0.1 s is an assumed value)
    time.sleep(.1)
    # create a random number between '0' and the value of 'limit'
    return random.randint(0, limit)

# invoke multiple remote functions and return the 'ObjectRef's
@ray.remote
def remote_objrefs():
    results = []
    for n in range(4):
        results.append(generate_number.remote(n, 4*n))
    return results

# store returned 'ObjectRef's from remote_objrefs()
futures = ray.get(remote_objrefs.remote())

# invoke multiple remote functions and return the resulting values directly
@ray.remote
def remote_values():
    results = []
    for n in range(4):
        results.append(generate_number.remote(n, 4*n))
    return ray.get(results)

# print returned results
print(ray.get(remote_values.remote()))

# take returned 'ObjectRef's and get results
while len(futures) > 0:
    ready_futures, rest_futures = ray.wait(futures, timeout=600, num_returns=1)
    # break when the return is smaller than num_returns
    if len(ready_futures) < 1:
        break
    for ref in ready_futures:
        print(f'completed result {ray.get(ref)}')
    futures = rest_futures
2023-01-25 15:14:11,235 INFO -- Started a local Ray instance.
View the dashboard at
[0, 1, 0, 3]
completed result 0
completed result 3
completed result 1
completed result 0