Python Ray Model Serving

Guangzhou, China

Source: Scaling Python with Ray

I earlier looked at the Wine Quality dataset and used a couple of scikit-learn classifiers to fit different models to this data, making a binary classification "Is this wine any good?" based on the set of features provided by the dataset.

Export the trained Classifier Models

To be able to serve such a model in Ray we need to export it using Pickle. Since we already found that only the Random Forest and the XGBoost classifiers give us usable results, we only need to export those by adding the following code (don't forget to import pickle) to the training file:

# EXPORT MODELS

RANDOM_FOREST_WHITE_MODEL_PATH = os.path.join("models", "wine-white-quality_random_forest.pkl")
RANDOM_FOREST_RED_MODEL_PATH = os.path.join("models", "wine-red-quality_random_forest.pkl")
XGBOOST_WHITE_MODEL_PATH = os.path.join("models", "wine-white-quality_xgboost.pkl")
XGBOOST_RED_MODEL_PATH = os.path.join("models", "wine-red-quality_xgboost.pkl")

# pickle the fitted classifiers (the variable names below stand in for
# whatever the training script calls its four fitted models)
with open(RANDOM_FOREST_WHITE_MODEL_PATH, "wb") as f:
    pickle.dump(random_forest_white, f)
with open(RANDOM_FOREST_RED_MODEL_PATH, "wb") as f:
    pickle.dump(random_forest_red, f)
with open(XGBOOST_WHITE_MODEL_PATH, "wb") as f:
    pickle.dump(xgboost_white, f)
with open(XGBOOST_RED_MODEL_PATH, "wb") as f:
    pickle.dump(xgboost_red, f)

Serving the Models

As in our previous deployments, we now use @serve.deployment to define a Ray remote actor that loads our exported model, defines the API (which feature inputs the model expects) and then runs predictions based on incoming API requests:

# XGBOOST - RED WINE

# define deployment
@serve.deployment(route_prefix="/xgboost/red_wines")
class XGBoostModelRed:
    def __init__(self, path):
        # load the pickled classifier from disk
        with open(path, "rb") as f:
            self.model = pickle.load(f)

    async def __call__(self, request):
        payload = await request.json()
        return self.serve(payload)

    def serve(self, request):
        # assemble the feature vector in the order used during training
        model_features = np.array([
            request["fixed acidity"],
            request["volatile acidity"],
            request["citric acid"],
            request["residual sugar"],
            request["chlorides"],
            request["free sulfur dioxide"],
            request["total sulfur dioxide"],
            request["density"],
            request["pH"],
            request["sulphates"],
            request["alcohol"],
        ])
        prediction = self.model.predict(model_features.reshape(1, 11))[0]
        return {"result": str(prediction)}

XGBoostModelRed.deploy(XGBOOST_RED_MODEL_PATH)

For the prediction I pretend that I received a CSV file with measurements, load it into a dataframe and apply the same scaling as I did for the training data (note that fitting a fresh StandardScaler on the prediction batch only approximates the scaler fitted on the training set):

# normalize prediction example
df_input = pd.read_csv("data/prediction_example.csv")
df_norm = StandardScaler().fit_transform(df_input)
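Strictly speaking, the scaler fitted during training should be persisted and reused at prediction time rather than re-fitted. A minimal sketch of that round trip, using toy data and an in-memory pickle (in practice the scaler would be dumped to a file next to the models):

```python
import pickle

import numpy as np
from sklearn.preprocessing import StandardScaler

# fit the scaler on the training data only
train = np.array([[10.0, 0.5], [12.0, 0.7], [11.0, 0.6]])
scaler = StandardScaler().fit(train)

# persist the fitted scaler (here in memory; normally to e.g. models/scaler.pkl)
blob = pickle.dumps(scaler)

# at prediction time: load and transform -- do not fit again
restored = pickle.loads(blob)
features = restored.transform(np.array([[11.0, 0.6]]))
print(features)  # the training-mean row maps to all zeros
```

This keeps the feature distribution seen by the served model identical to the one it was trained on.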

Now I can prepare an API request to my prediction service by taking the first row of the file:

sample_request_input_red = {
    "fixed acidity": df_norm[0][0],
    "volatile acidity": df_norm[0][1],
    "citric acid": df_norm[0][2],
    "residual sugar": df_norm[0][3],
    "chlorides": df_norm[0][4],
    "free sulfur dioxide": df_norm[0][5],
    "total sulfur dioxide": df_norm[0][6],
    "density": df_norm[0][7],
    "pH": df_norm[0][8],
    "sulphates": df_norm[0][9],
    "alcohol": df_norm[0][10],
}
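Writing this dict out by hand is error-prone. Since StandardScaler preserves column order, the same payload can be built from the dataframe's own column names; a sketch with a three-feature toy frame standing in for the full CSV:

```python
import pandas as pd
from sklearn.preprocessing import StandardScaler

# toy stand-in for pd.read_csv("data/prediction_example.csv")
df_input = pd.DataFrame({
    "fixed acidity": [7.4, 7.8],
    "volatile acidity": [0.70, 0.88],
    "pH": [3.51, 3.20],
})
df_norm = StandardScaler().fit_transform(df_input)

# zip the original column names with the first normalized row
sample_request = dict(zip(df_input.columns, df_norm[0]))
print(sample_request)
```

This stays correct even if columns are added to the CSV, as long as their order matches the training features.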

Since this measurement came from a red wine I will run it against a model trained on red wine data:

print(requests.get("http://localhost:8000/xgboost/red_wines", json=sample_request_input_red).text)

As a result I will receive either a 0 or a 1, the latter meaning that the quality of this wine was judged to be 7 out of 10 or better:

{"result": "1"}

Speculative Model Serving

Speculative Model Serving is an optimization technique where a computer system performs some task that may not be needed. Work is done before it is known whether it is actually needed, so as to prevent a delay that would have to be incurred by doing the work after it is known that it is needed.
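The same launch-early-and-wait idea drives the consensus deployment below: both model calls are started concurrently, so total latency is that of the slowest model, not the sum. A plain-asyncio sketch of the pattern (delays are illustrative stand-ins for model inference time):

```python
import asyncio
import time

async def fake_model(delay, result):
    # stand-in for a slow remote model call
    await asyncio.sleep(delay)
    return result

async def main():
    start = time.perf_counter()
    # launch both calls up front, before either result is known to be needed
    r1, r2 = await asyncio.gather(fake_model(0.1, "1"), fake_model(0.2, "1"))
    return r1, r2, time.perf_counter() - start

r1, r2, elapsed = asyncio.run(main())
print(r1, r2)  # both results arrive after ~0.2 s, not the 0.3 s sum
```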

This technique can be used to build consensus-based model serving. Currently I only use two models; this example would make more sense with at least three. But we can still build a prediction API that returns a 1 only if both models agree:

@serve.deployment(route_prefix="/speculative")
class Speculative:
    def __init__(self):
        # async handles to the two red wine deployments defined above
        self.rfhandle = RandomForestModelRed.get_handle(sync=False)
        self.xgboosthandle = XGBoostModelRed.get_handle(sync=False)

    async def __call__(self, request):
        payload = await request.json()
        # run both models in parallel, then wait for both results
        f1, f2 = await asyncio.gather(self.rfhandle.serve.remote(payload),
                                      self.xgboosthandle.serve.remote(payload))

        rfresult = ray.get(f1)["result"]
        xgresult = ray.get(f2)["result"]
        ones = []
        zeros = []
        if rfresult == "1":
            ones.append("Random forest")
        else:
            zeros.append("Random forest")
        if xgresult == "1":
            ones.append("XGBoost")
        else:
            zeros.append("XGBoost")
        # with two models, a "1" requires unanimity
        if len(ones) >= 2:
            return {"result": "1", "methods": ones}
        else:
            return {"result": "0", "methods": zeros}

The __call__ method here gets the payload, starts executing both models in parallel and then waits for both to complete. Once all the results are in, the consensus is calculated and the result returned along with the methods that voted for it:

:: Random Forest Classifier - Red Wines ::

{"result": "1"}

:: XGBoost Classifier - Red Wines ::

{"result": "1"}

:: Consensus Results ::

{"result": "1", "methods": ["Random forest", "XGBoost"]}