Python Ray Deployments
Source: Scaling Python with Ray
Deployments with Ray
Ray Serve is a general-purpose microservice framework that can also be used for model serving. It is implemented on top of Ray using Ray actors. Three kinds of actors are created to make up a Serve instance:
| Ray Actor | Description |
| --- | --- |
| Controller | The controller is responsible for creating, updating, and destroying other actors. All of the Serve API calls (e.g., creating or getting a deployment) use the controller for their execution. |
| Router | There is one router per node. Each router is an HTTP server that accepts incoming requests, forwards them to replicas, and responds after they are completed. |
| Worker Replica | Worker replicas execute the user-defined code in response to a request. |
Simple Deployment
A deployment defines the logic that will handle incoming requests and the way this logic is exposed over HTTP or in Python. A deployment is defined with the @serve.deployment annotation, and since it starts Ray actors it can be vertically scaled accordingly. Here we can also define the route prefix for the HTTP endpoint we want to use for API calls to the actor:

```python
@serve.deployment(ray_actor_options={"num_cpus": 2, "num_gpus": 1}, route_prefix="/converter")
```
```python
import ray
from ray import serve
import requests
from starlette.requests import Request

# start Ray
ray.init()
# start Serve
serve.start()

# define deployment
@serve.deployment
class Converter:
    def __call__(self, request):
        if request.query_params["conversion"] == 'CF':
            return {"INFO :: Fahrenheit temperature":
                    9.0/5.0 * float(request.query_params["temp"]) + 32.0}
        elif request.query_params["conversion"] == 'FC':
            return {"INFO :: Celsius temperature":
                    (float(request.query_params["temp"]) - 32.0) * 5.0/9.0}
        else:
            return {"ERROR :: Unknown conversion code": request.query_params["conversion"]}

Converter.deploy()

# list current deployment
print(serve.list_deployments())
```
With serve.start(), an ingress HTTP server is started on the head node only. If your deployments begin to exceed about three thousand requests per second, you can start an HTTP server on every node by using:

```python
serve.start(http_options={"location": "EveryNode"})
```
The rest of the code starts an API server that converts incoming temperature readings into degrees Celsius or Fahrenheit, depending on the appended parameter. The HTTP server runs on port 8000 by default and serves our function on the endpoint /Converter for us. When running this code I ran into an error message that FastAPI was missing as a dependency:

```
ModuleNotFoundError: No module named 'fastapi'. You can run
`pip install "ray[serve]"` to install all Ray Serve dependencies.
```
With ray[serve] installed, I can now run three requests to try out the three possible API responses:

Converting:
- Celsius -> Fahrenheit
- Fahrenheit -> Celsius
- an unknown unit
```python
# query our endpoint over HTTP
print(requests.get("http://127.0.0.1:8000/Converter?temp=999.0&conversion=CF").text)
print(requests.get("http://127.0.0.1:8000/Converter?temp=999.0&conversion=FC").text)
print(requests.get("http://127.0.0.1:8000/Converter?temp=999.0&conversion=CC").text)
```
This returns the following responses:

```
python 01_ray_simple_deployments.py
{'Converter': Deployment(name=Converter,version=None,route_prefix=/Converter)}
{"INFO :: Fahrenheit temperature": 1830.2}
{"INFO :: Celsius temperature": 537.2222222222222}
{"ERROR :: Unknown conversion code": "CC"}
```
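As a quick sanity check (plain Python, no Serve required), the conversion arithmetic behind these responses can be reproduced directly; the function names here are just illustrative:

```python
# Reproduce the conversion arithmetic used by the Converter endpoint.
def celsius_to_fahrenheit(temp: float) -> float:
    return 9.0/5.0 * temp + 32.0

def fahrenheit_to_celsius(temp: float) -> float:
    return (temp - 32.0) * 5.0/9.0

print(celsius_to_fahrenheit(999.0))  # 1830.2, matching the first response
print(fahrenheit_to_celsius(999.0))  # 537.2222222222222, matching the second
```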
The API request can also be handled with a regular ray.get() by using the deployment handle - in our case Converter:

```python
# direct invocation
handle = serve.get_deployment('Converter').get_handle()
print(ray.get(handle.remote(Request({"type": "http", "query_string": b"temp=666.0&conversion=CF"}))))
print(ray.get(handle.remote(Request({"type": "http", "query_string": b"temp=666.0&conversion=FC"}))))
print(ray.get(handle.remote(Request({"type": "http", "query_string": b"temp=666.0&conversion=CC"}))))
```
The return is identical to the HTTP request earlier (we now get two sets of responses - one for the value 999.0 and one for 666.0):

```
python 01_ray_simple_deployments.py
{'Converter': Deployment(name=Converter,version=None,route_prefix=/Converter)}
{"INFO :: Fahrenheit temperature": 1830.2}
{"INFO :: Celsius temperature": 537.2222222222222}
{"ERROR :: Unknown conversion code": "CC"}
{'INFO :: Fahrenheit temperature': 1230.8}
{'INFO :: Celsius temperature': 352.22222222222223}
{'ERROR :: Unknown conversion code': 'CC'}
```
All of this can be simplified to:

```python
# simplifying the converter
@serve.deployment
class ConverterSimple:
    def celcius_fahrenheit(self, temp):
        return 9.0/5.0 * temp + 32.0

    def fahrenheit_celcius(self, temp):
        return (temp - 32.0) * 5.0/9.0

ConverterSimple.deploy()

# list current deployments
print(serve.list_deployments())

handleSimple = serve.get_deployment('ConverterSimple').get_handle()
print(ray.get(handleSimple.celcius_fahrenheit.remote(333.0)))
print(ray.get(handleSimple.fahrenheit_celcius.remote(333.0)))
```
And we get the following return:

```
{'Converter': Deployment(name=Converter,version=None,route_prefix=/Converter), 'ConverterSimple': Deployment(name=ConverterSimple,version=None,route_prefix=/ConverterSimple)}
631.4
167.22222222222223
```
Scaled Deployment
By default, deployment.deploy() creates a single instance of a deployment. By specifying the number of replicas in @serve.deployment, you can scale out a deployment. I will also add a UUID to each instance that is created so we can differentiate them:
```python
# simplifying the converter
# adding scale factor
from uuid import uuid4

@serve.deployment(num_replicas=2)
class ConverterSimple:
    # generate instance id
    def __init__(self):
        self.id = str(uuid4())

    def celcius_fahrenheit(self, temp):
        output = self.id, "INFO :: Fahrenheit temperature", 9.0/5.0 * temp + 32.0
        return output

    def fahrenheit_celcius(self, temp):
        output = self.id, "INFO :: Celsius temperature", (temp - 32.0) * 5.0/9.0
        return output
```
If we run the code again, we can now see that two instances are created and incoming requests are load-balanced in a round-robin fashion (the replies come from two different IDs):
```
(ServeController pid=53210) INFO 2023-01-29 16:39:57,500 controller 53210 deployment_state.py:1310 - Adding 2 replicas to deployment 'ConverterSimple'.
{'Converter': Deployment(name=Converter,version=None,route_prefix=/Converter), 'ConverterSimple': Deployment(name=ConverterSimple,version=None,route_prefix=/ConverterSimple)}
('cf9a204c-8337-4fe9-b301-43cd0f4fbeb1', 'INFO :: Fahrenheit temperature', 631.4)
('6b150f9a-ba6d-48b0-95ac-bbbd6de2e5ae', 'INFO :: Celsius temperature', 167.22222222222223)
('cf9a204c-8337-4fe9-b301-43cd0f4fbeb1', 'INFO :: Fahrenheit temperature', 431.6)
('6b150f9a-ba6d-48b0-95ac-bbbd6de2e5ae', 'INFO :: Celsius temperature', 105.55555555555556)
```
Request Routing with FastAPI
In the previous example we had to specify a conversion type to select the correct conversion method for our input value. A cleaner option is to use FastAPI to generate a different URL endpoint for every method:
```python
# define deployment
from fastapi import FastAPI

app = FastAPI()

@serve.deployment(route_prefix="/converter")
@serve.ingress(app)
class FastConverter:
    @app.get("/to_fahrenheit")
    def celcius_fahrenheit(self, temp):
        return {"INFO :: Fahrenheit temperature": 9.0/5.0 * float(temp) + 32.0}

    @app.get("/to_celsius")
    def fahrenheit_celcius(self, temp):
        return {"INFO :: Celsius temperature": (float(temp) - 32.0) * 5.0/9.0}

FastConverter.deploy()
```
This changes the API endpoints to /converter/to_fahrenheit and /converter/to_celsius, and we can use them with:

```python
# Query our endpoints over HTTP.
print(requests.get("http://127.0.0.1:8000/converter/to_fahrenheit?temp=1111.0").text)
print(requests.get("http://127.0.0.1:8000/converter/to_celsius?temp=1111.0").text)
```
The direct invocation is not affected by this change:

```python
handle = serve.get_deployment('FastConverter').get_handle()
print(ray.get(handle.celcius_fahrenheit.remote(555.0)))
print(ray.get(handle.fahrenheit_celcius.remote(555.0)))
```
```
python 02_ray_fastAPI_deployments.py
{'FastConverter': Deployment(name=FastConverter,version=None,route_prefix=/converter)}
{"INFO :: Fahrenheit temperature":2031.8}
{"INFO :: Celsius temperature":599.4444444444445}
{'INFO :: Fahrenheit temperature': 1031.0}
{'INFO :: Celsius temperature': 290.55555555555554}
```
Deployment Composition
Deployments can also be composed of other deployments, with one deployment forwarding requests to others.
Canary Deployments
One example is the slow rollout of a new version of your service to a limited share of incoming requests, e.g.:
```python
@serve.deployment
def version_one(data):
    return {"version": "I am the old version..."}

version_one.deploy()

@serve.deployment
def version_two(data):
    return {"version": "I am the new version!"}

version_two.deploy()
```
To implement the canary deployment we can use the following code (note that the random import has to live at module level, not inside __init__, so that random() is visible in __call__):

```python
from random import random

@serve.deployment(route_prefix="/get_version")
class Canary:
    def __init__(self, canary_percent):
        self.version_one = version_one.get_handle()
        self.version_two = version_two.get_handle()
        self.canary_percent = canary_percent

    # This method can be called concurrently!
    async def __call__(self, request):
        data = await request.body()
        if random() > self.canary_percent:
            return await self.version_one.remote(data=data)
        else:
            return await self.version_two.remote(data=data)

# invocation with canary_percent of 20%
Canary.deploy(.2)
```
Now we can call the API 10 times and see the new/old distribution of the responses we get:

```python
# call the API 10 times
for _ in range(10):
    resp = requests.get("http://127.0.0.1:8000/get_version", data="some string, doesn't matter")
    print(resp.json())
```
This makes sure that if there is an issue with the new version, only a small share of requests will be affected by it:

```
{'version': 'I am the new version!'}
{'version': 'I am the old version...'}
{'version': 'I am the old version...'}
{'version': 'I am the old version...'}
{'version': 'I am the old version...'}
{'version': 'I am the old version...'}
{'version': 'I am the old version...'}
{'version': 'I am the new version!'}
{'version': 'I am the old version...'}
{'version': 'I am the old version...'}
```
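The 20/80 split is probabilistic, so any single run of 10 requests will deviate from it. As a rough sanity check (plain Python, seeded for reproducibility, no Serve required), the routing condition from Canary.__call__ can be simulated over many requests; pick_version here is just an illustrative helper:

```python
import random

def pick_version(canary_percent: float, rng: random.Random) -> str:
    # Same routing condition as in Canary.__call__ above.
    return "old" if rng.random() > canary_percent else "new"

rng = random.Random(42)
n = 10_000
new_count = sum(pick_version(0.2, rng) == "new" for _ in range(n))
print(new_count / n)  # close to 0.2
```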