
Python Ray Remote Actors

Guangzhou, China

Source: Scaling Python with Ray

Remote Actors let you make parallel remote procedure calls, just like Remote Functions. Unlike Remote Functions, they maintain state between invocations. To keep that state consistent, an actor processes one request at a time. An actor can:

  • Store data
  • Receive messages from other actors
  • Pass messages to other actors
  • Create additional child actors
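The "one request at a time" rule can be illustrated without Ray. The following is a minimal plain-Python sketch of the idea, not Ray's implementation: MiniActor and its mailbox are hypothetical names, and a single worker thread stands in for the actor process. Because only that thread touches the counter, concurrent callers need no lock.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor


class MiniActor:
    """Toy actor (hypothetical): private state plus a mailbox drained by one thread."""

    def __init__(self) -> None:
        self._count = 0  # private state, only touched by the worker thread
        self._mailbox: queue.Queue = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self) -> None:
        # Handle exactly one message at a time, in arrival order.
        while True:
            amount, reply = self._mailbox.get()
            if reply is None:  # (None, None) is the stop signal
                break
            self._count += amount
            reply.put(self._count)

    def add(self, amount: int) -> queue.Queue:
        # Returns immediately; the one-slot queue will later hold the result.
        reply: queue.Queue = queue.Queue(maxsize=1)
        self._mailbox.put((amount, reply))
        return reply

    def stop(self) -> None:
        self._mailbox.put((None, None))
        self._worker.join()


actor = MiniActor()

# Eight caller threads send 100 messages concurrently.
with ThreadPoolExecutor(max_workers=8) as callers:
    results = list(callers.map(lambda n: actor.add(n).get(), [1] * 100))

actor.stop()
final_count = max(results)
print(final_count)  # 100, no update was lost
```

Every caller sees a distinct running total between 1 and 100, which is the consistency guarantee that sequential message handling buys.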

Basic Ray Remote Actor

import ray

# start ray
ray.init()

@ray.remote
class Account:
    def __init__(self, balance: float, minimal_balance: float):
        # initialize account and balance
        self.minimal = minimal_balance
        if balance < minimal_balance:
            raise Exception("ERROR :: Starting balance is less than minimal balance")
        self._balance = balance

    def balance(self) -> float:
        # get current balance (private state)
        return self._balance

    def deposit(self, amount: float) -> float:
        # take deposit and update balance state
        if amount < 0:
            raise Exception("ERROR :: Cannot deposit negative amount")
        self._balance = self._balance + amount
        return self._balance

    def withdraw(self, amount: float) -> float:
        # release withdrawal and update balance state
        if amount < 0:
            raise Exception("ERROR :: Cannot withdraw negative amount")
        balance = self._balance - amount
        if balance < self.minimal:
            raise Exception("ERROR :: Withdrawal is not supported by current balance")
        self._balance = balance
        return balance

# create a named remote actor instance
account_actor = Account.options(name='Account').remote(balance=99., minimal_balance=11.)

# do procedure calls to interact with instance
print(f"INFO :: Current Balance: {ray.get(account_actor.balance.remote())}")
print(f"INFO :: Balance after Withdraw: {ray.get(account_actor.withdraw.remote(66.))}")
print(f"INFO :: Balance after Deposit: {ray.get(account_actor.deposit.remote(33.))}")
# a named actor can also be looked up by its name
print(f"INFO :: Balance after Deposit: {ray.get(ray.get_actor('Account').deposit.remote(33.))}")

python 01_ray-basic-remote-actor.py
2023-01-26 15:55:13,252 INFO worker.py:1529 -- Started a local Ray instance.
View the dashboard at 127.0.0.1:8265
INFO :: Current Balance: 99.0
INFO :: Balance after Withdraw: 33.0
INFO :: Balance after Deposit: 66.0
INFO :: Balance after Deposit: 99.0

Detached Actors

You can also run actors in detached mode and kill them manually:

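account_actor = Account.options(name='Account', lifetime='detached')\
    .remote(balance=100., minimal_balance=20.)

# the actor can be looked up by name while it is alive
print(ray.get_actor('Account'))

ray.kill(account_actor)

# once the actor is gone, this lookup is expected to raise a ValueError
print(ray.get_actor('Account'))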

A detached actor can also be terminated from inside one of its own methods by calling ray.actor.exit_actor().

Retry on Failure

The @ray.remote decorator accepts parameters that tell Ray what to do when a remote host or process goes down:

  • max_restarts: The maximum number of times the actor is restarted when it dies unexpectedly (default 0; -1 means unlimited).
  • max_task_retries: The number of times an actor’s task is retried if it fails because of a system error (default 0; -1 retries until the task succeeds or max_restarts is reached).
  • max_retries: The same as max_restarts, but for Remote Functions.

@ray.remote(max_restarts=5, max_task_retries=-1)
class Account:
    ...

Persistence

We can add persistence to the previous 'Account' actor, using the local file system to make sure that our actor's state is not lost. The changes we need to make:

  • Identify the account, so that its information can be retrieved if the state was lost.
  • Define a directory where the state is saved.
  • Add a function that is called on every state change and writes the state to a file.
  • Add another function that restores the state from that file.

import ray
from os.path import exists

ray.init()

@ray.remote
class Account:
    def __init__(self, balance: float, minimal_balance: float, account_key: str, basedir: str = '.'):
        # locate the file storage location
        self.basedir = basedir
        # set an account identification
        self.key = account_key
        # if state was not restored create new from props
        if not self.restorestate():
            if balance < minimal_balance:
                raise Exception("ERROR :: Starting balance is less than minimal balance")
            self._balance = balance
            self.minimal = minimal_balance
            # write generated state to file
            self.storestate()

    def balance(self) -> float:
        # get current balance (private state)
        return self._balance

    def deposit(self, amount: float) -> float:
        # take deposit and update balance state
        if amount < 0:
            raise Exception("ERROR :: Cannot deposit negative amount")
        self._balance = self._balance + amount
        self.storestate()
        return self._balance

    def withdraw(self, amount: float) -> float:
        # release withdrawal and update balance state
        if amount < 0:
            raise Exception("ERROR :: Cannot withdraw negative amount")
        balance = self._balance - amount
        if balance < self.minimal:
            raise Exception("ERROR :: Withdrawal is not supported by current balance")
        self._balance = balance
        self.storestate()
        return balance

    def restorestate(self) -> bool:
        # if a stored state for the account key exists, load it
        if exists(self.basedir + '/' + self.key):
            with open(self.basedir + '/' + self.key, "rb") as f:
                data = f.read()
            state = ray.cloudpickle.loads(data)
            self._balance = state['balance']
            self.minimal = state['minimal']
            return True
        else:
            return False

    def storestate(self):
        # store state to file
        data = ray.cloudpickle.dumps({'balance': self._balance, 'minimal': self.minimal})
        with open(self.basedir + '/' + self.key, "wb") as f:
            f.write(data)

We can now create an instance of the actor, make changes to its state, kill the instance and create a new one, which should restore the state written before:

# invoke an instance of the account worker
account_actor = Account.options(name='Account')\
    .remote(balance=99., minimal_balance=11., account_key='secretaccountkey')

# make changes to its default state
print(f"INFO :: Current balance: {ray.get(account_actor.balance.remote())}")
print(f"INFO :: Balance after Withdraw: {ray.get(account_actor.withdraw.remote(66.))}")
print(f"INFO :: Balance after Deposit: {ray.get(account_actor.deposit.remote(33.))}")

# print the actor handle (includes the actor id)
print(ray.get_actor('Account'))

# kill the first instance
ray.kill(account_actor)

# and create a new one
account_actor = Account.options(name='Account')\
    .remote(balance=99., minimal_balance=11., account_key='secretaccountkey')

# it should have restored the state from before
print(f"INFO :: Current balance {ray.get(account_actor.balance.remote())}")

# verify that this is a new actor (the actor id differs)
print(ray.get_actor('Account'))

# kill the second instance
ray.kill(account_actor)

python 02_ray-basic-remote-actor_persistence.py
2023-01-26 19:36:40,104 INFO worker.py:1529 -- Started a local Ray instance.
View the dashboard at 127.0.0.1:8265
INFO :: Current balance: 99.0
INFO :: Balance after Withdraw: 33.0
INFO :: Balance after Deposit: 66.0
Actor(Account, f90108cf73fc420607aed2ca01000000)
INFO :: Current balance 66.0
Actor(Account, 1fcc75b5d96c0c9a9f9b0a1e01000000)
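
The store-and-restore round trip itself does not depend on Ray, so it can be checked without starting a cluster. The sketch below makes two assumptions: it uses the standard pickle module in place of ray.cloudpickle, and PlainAccount is a hypothetical, trimmed-down stand-in for the Account actor rather than part of the original script.

```python
import os
import pickle
import tempfile


class PlainAccount:
    """Hypothetical non-Ray stand-in for the persistent Account actor."""

    def __init__(self, balance: float, minimal_balance: float,
                 account_key: str, basedir: str = "."):
        self.path = os.path.join(basedir, account_key)
        # Only fall back to the constructor arguments if nothing was stored.
        if not self.restorestate():
            if balance < minimal_balance:
                raise ValueError("Starting balance is less than minimal balance")
            self.balance = balance
            self.minimal = minimal_balance
            self.storestate()

    def withdraw(self, amount: float) -> float:
        new_balance = self.balance - amount
        if amount < 0 or new_balance < self.minimal:
            raise ValueError("Withdrawal is not supported")
        self.balance = new_balance
        self.storestate()  # persist every state change
        return self.balance

    def restorestate(self) -> bool:
        if not os.path.exists(self.path):
            return False
        with open(self.path, "rb") as f:
            state = pickle.load(f)
        self.balance = state["balance"]
        self.minimal = state["minimal"]
        return True

    def storestate(self) -> None:
        with open(self.path, "wb") as f:
            pickle.dump({"balance": self.balance, "minimal": self.minimal}, f)


with tempfile.TemporaryDirectory() as tmp:
    first = PlainAccount(99.0, 11.0, "secretaccountkey", basedir=tmp)
    first.withdraw(66.0)
    del first  # simulate losing the instance

    # Same key and directory: the constructor arguments are ignored
    # because the stored state is found and loaded instead.
    second = PlainAccount(99.0, 11.0, "secretaccountkey", basedir=tmp)
    restored_balance = second.balance

print(restored_balance)  # 33.0
```

Note that the second instance starts at 33.0 rather than 99.0: as in the actor above, stored state takes precedence over the values passed to the constructor.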

Horizontal Scaling

You can add more processes for horizontal scaling with Ray’s actor pool, provided by the ray.util module. The actor pool treats a fixed set of actors as a single entity and manages which actor in the pool gets the next request. The state of the pooled actors is not merged, however: pooling only works if each actor's state is created by the constructor and does not change over the actor's lifecycle.
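
To see why unmerged state is a problem, consider a plain-Python sketch. It is not ray.util.ActorPool: Counter and RoundRobinPool are hypothetical stand-ins, and requests are handed out in strict rotation, whereas a real pool picks whichever actor is free. Each worker mutates only its own counter, so no single worker ever holds the overall total.

```python
from itertools import cycle


class Counter:
    """Hypothetical worker with mutable state."""

    def __init__(self) -> None:
        self.total = 0

    def add(self, amount: int) -> int:
        self.total += amount
        return self.total


class RoundRobinPool:
    """Hypothetical pool that hands each request to the next worker in turn."""

    def __init__(self, workers) -> None:
        self.workers = list(workers)
        self._next_worker = cycle(self.workers)

    def submit(self, amount: int) -> int:
        return next(self._next_worker).add(amount)


pool = RoundRobinPool([Counter() for _ in range(3)])

# Six requests of 10 each: a single stateful worker would end at 60.
results = [pool.submit(10) for _ in range(6)]
totals = [worker.total for worker in pool.workers]

print(results)  # [10, 10, 10, 20, 20, 20]
print(totals)   # [20, 20, 20]
```

The answer a caller gets depends on which worker happened to serve the request, which is why pooled actors should only carry state that is fixed at construction time.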

The pool can be imported with from ray.util import ActorPool and added to our code above by creating an ActorPool of 3 identical FilePersistence actors. This assumes that the file-handling logic has been moved out of Account into a separate FilePersistence class, which is not shown here. Note that the persistence class now also needs to be a remote actor, so that it can be spread out over several processes or hosts:

pool = ActorPool([FilePersistence.remote(), FilePersistence.remote(), FilePersistence.remote()])

The persistence is now handled by our pool and replaces the direct call of FilePersistence() inside the Account class:

@ray.remote
class Account:
    def __init__(self, balance: float, minimal_balance: float, account_key: str, persistence: ActorPool):
        ...

And the actor handle has to be pointed to the pool as well:

# invoke an instance of the account worker
account_actor = Account.options(name='Account').remote(balance=99., minimal_balance=11.,
    account_key='secretaccountkey', persistence=pool)

Vertical Scaling

By default, Ray assumes that all functions and actors have the same resource requirements. For actors or functions with different resource requirements, you can specify the resources as parameters to the @ray.remote decorator. The defaults are one CPU and zero GPUs. The following decorator requests 1 CPU, 4 GPUs and 500 MiB of memory (given in bytes) for the remote actor:

@ray.remote(num_cpus=1, num_gpus=4, memory=500 * 1024 * 1024)