In the world of SaaS (software as a service), it is common to have a single main server handling requests from multiple clients. Usually, one needs to know how much workload the server can handle within a given time frame, since this provides crucial information for deploying a better server-client structure. In this post, we focus on stress testing a server that clients reach through a gRPC-based communication mechanism. The demonstration code is written in Python.
Suppose we have an example API named example_api, with a class named ExampleService. Our gRPC server is defined in server.py with the following content:
import time
import grpc
import logging
from concurrent import futures

import protos.example_api_pb2_grpc as example_grpc
from example_api.service.example_service import ExampleService

MAX_MESSAGE_IN_MB = 10
options = [
    ('grpc.max_send_message_length', MAX_MESSAGE_IN_MB * 1024 * 1024),
    ('grpc.max_receive_message_length', MAX_MESSAGE_IN_MB * 1024 * 1024)
]

def start_server():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_grpc.add_exampleAPIServicer_to_server(ExampleService(), server)
    logging.info('Starting server...')
    server.add_insecure_port('localhost:50057')
    server.start()
    logging.info('Server started successfully. Listening on port localhost:50057...')
    try:
        while True:
            time.sleep(86400)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == "__main__":
    start_server()
The gRPC proto files are generated with the gRPC tooling (grpcio-tools), as described in the official documentation.
To investigate the performance of this server, such as how its response delay grows as we increase the client count, we use Locust, an easy-to-use, scriptable, and scalable performance testing tool that lets us define the target behavior in Python.
Installation
First, let’s install Locust by:
$ pip3 install locust==2.5.1
Here we use version 2.5.1, but you may change it as needed; if no version is pinned, the latest release will be installed.
We can check our installed Locust version by:
$ locust -V
Preparation
Now, under the directory that contains server.py, let's create a file named locustfile.py. This filename matters: it is the default name Locust searches for and from which it runs the tasks defined inside. If you want to use a custom file name, call locust with the -f option in your terminal as follows:
$ locust -f custom_file.py
Some of the following code is adapted from the official Locust documentation.
In our Locust file, we need to include the related packages first:
from locust import events, User, task
from locust.exception import LocustError
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, WorkerRunner
In order for gRPC to cooperate with Locust's gevent-based concurrency, we need to patch gRPC to use gevent:
# patch grpc so that it uses gevent instead of asyncio
import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()
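As a quick standalone illustration of why this matters (unrelated to the server itself): Locust drives its users as gevent greenlets, which are cooperative and run in a single thread, so blocking calls must yield to the gevent loop; the patch above makes gRPC's blocking calls do exactly that.

```python
import gevent

def worker(n):
    # gevent.sleep is cooperative: it yields control to other greenlets
    # instead of blocking the whole thread the way time.sleep would.
    gevent.sleep(0.01)
    return n * n

# All four greenlets run concurrently in a single thread.
jobs = [gevent.spawn(worker, n) for n in range(4)]
gevent.joinall(jobs)
print([job.value for job in jobs])  # [0, 1, 4, 9]
```

An unpatched gRPC call would block the whole thread and stall every simulated user at once.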
In server.py, the function start_server is responsible for initiating the server, so we import it into locustfile.py and spawn it in a greenlet when Locust initializes:
import gevent

from server import start_server

@events.init.add_listener
def run_grpc_server(environment, **_kwargs):
    gevent.spawn(start_server)
Next, we create a class named GrpcClient as follows:
class GrpcClient:
    def __init__(self, stub):
        self._stub_class = stub.__class__
        self._stub = stub

    def __getattr__(self, name):
        func = self._stub_class.__getattribute__(self._stub, name)

        def wrapper(*args, **kwargs):
            request_meta = {
                "request_type": "grpc",
                "name": name,
                "start_time": time.time(),
                "response_length": 0,
                "exception": None,
                "context": None,
                "response": None,
            }
            start_perf_counter = time.perf_counter()
            try:
                request_meta["response"] = func(*args, **kwargs)
                request_meta["response_length"] = len(str(request_meta["response"].change_rate))
            except grpc.RpcError as e:
                request_meta["exception"] = e
            request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
            events.request.fire(**request_meta)
            return request_meta["response"]

        return wrapper
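To see the interception pattern in isolation: GrpcClient relies on __getattr__ to wrap every stub method in a timing wrapper. The same idea with a plain object (TimingClient and FakeStub are hypothetical names used only for this sketch):

```python
import time

class TimingClient:
    def __init__(self, stub):
        self._stub = stub  # stored in __dict__, so __getattr__ is not triggered for it

    def __getattr__(self, name):
        # Only called for attributes not found normally, i.e. stub methods.
        func = getattr(self._stub, name)

        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            response = func(*args, **kwargs)
            elapsed_ms = (time.perf_counter() - start) * 1000
            print(f"{name} took {elapsed_ms:.3f} ms")
            return response

        return wrapper

class FakeStub:
    def detect(self, request):
        return {"change_rate": 0.5}

client = TimingClient(FakeStub())
response = client.detect(None)
print(response["change_rate"])  # 0.5
```

In the real GrpcClient, the wrapper additionally records the metadata dictionary and reports it to Locust via events.request.fire.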
Remember that when calculating the response length, the attribute you read must actually exist on the response message (change_rate in this case); refer to your gRPC proto file if needed.
Subsequently, we create a class named GrpcUser, derived from locust.User. Set its abstract attribute to True so that Locust does not instantiate it as an actual user during the test; Locust needs at least one concrete class derived from locust.User:
class GrpcUser(User):
    abstract = True
    stub_class = None

    def __init__(self, environment):
        super().__init__(environment)
        for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
            if attr_value is None:
                raise LocustError(f"You must specify the {attr_name}.")
        self._channel = grpc.insecure_channel(self.host)
        self._channel_closed = False
        stub = self.stub_class(self._channel)
        self.client = GrpcClient(stub)
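The attribute check in __init__ exists to fail fast when a subclass forgets to set host or stub_class. The same pattern in isolation (BaseClient and Incomplete are hypothetical names for this sketch):

```python
class BaseClient:
    host = None
    stub_class = None

    def __init__(self):
        # Fail fast if a subclass did not override a required class attribute.
        for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
            if attr_value is None:
                raise ValueError(f"You must specify the {attr_name}.")

class Incomplete(BaseClient):
    host = "localhost:50057"  # stub_class intentionally left unset

try:
    Incomplete()
except ValueError as e:
    print(e)  # You must specify the stub_class.
```

Failing at construction time gives a clear error instead of an obscure AttributeError deep inside the first request.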
In this example API, the Processor class in client.py is responsible for sending client requests to the server and receiving the corresponding responses, so its member methods are the ones we want to stress test. Modify this class to inherit from our previously created GrpcUser class:
class Processor(GrpcUser):
    host = "localhost:50057"
    stub_class = example_grpc.exampleServiceStub

    def __init__(self, environment) -> None:
        super().__init__(environment)
        ...
The method(s) targeted by Locust should be decorated with @task; Locust creates a greenlet (micro-thread) per user for each such method. In this scenario, our target method is run_test of the Processor class, which corresponds to the actual main function of the original client.py of our example API:
    @task
    def run_test(self):
        ...
Note: it is not straightforward to pass input parameters to functions decorated with @task in Locust. One simple approach is therefore to create a method that takes no required parameters and handles everything internally. For instance, suppose the Processor class has a method named process that requires an np.ndarray input. It would be difficult to target process directly because of that parameter, so one can instead create another method, e.g. dummy, decorated with @task and taking no input, and call process from there.
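A minimal sketch of that workaround, with Locust stripped out so it runs standalone (Processor here is a stand-in and the body of process is invented for illustration; in the real locustfile, dummy would carry the @task decorator):

```python
import numpy as np

class Processor:
    def process(self, image: np.ndarray) -> float:
        # Hypothetical body: fraction of non-zero pixels in the image.
        return float(np.count_nonzero(image)) / image.size

    def dummy(self) -> float:
        # Takes no parameters, so it is easy to target with @task;
        # the input is constructed (or loaded) inside the method.
        image = np.zeros((4, 4), dtype=np.uint8)
        image[0, 0] = 255
        return self.process(image)

p = Processor()
print(p.dummy())  # 0.0625 (1 of 16 pixels is non-zero)
```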
Moreover, to stop the Locust run once the failure ratio exceeds some threshold (e.g. 0.2), we can add the following:
# Stop the Locust run if a threshold (in this case the failure ratio) is exceeded
def checker(environment):
    while environment.runner.state not in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
        time.sleep(1)
        if environment.runner.stats.total.fail_ratio > 0.2:
            print(f"fail ratio was {environment.runner.stats.total.fail_ratio}, quitting")
            environment.runner.quit()
            return

@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(checker, environment)
As a result, the complete locustfile.py would look as follows (suppose we're sending images to our server and receiving some kind of processing result):
...
from example_api.config import CONFIG
import protos.example_api_pb2 as example_pb2
import protos.example_api_pb2_grpc as example_grpc
from locust import events, User, task
from locust.exception import LocustError
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, WorkerRunner
import time
import gevent
from server import start_server

# patch grpc so that it uses gevent instead of asyncio
import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()

@events.init.add_listener
def run_grpc_server(environment, **_kwargs):
    gevent.spawn(start_server)

class GrpcClient:
    def __init__(self, stub):
        self._stub_class = stub.__class__
        self._stub = stub

    def __getattr__(self, name):
        func = self._stub_class.__getattribute__(self._stub, name)

        def wrapper(*args, **kwargs):
            request_meta = {
                "request_type": "grpc",
                "name": name,
                "start_time": time.time(),
                "response_length": 0,
                "exception": None,
                "context": None,
                "response": None,
            }
            start_perf_counter = time.perf_counter()
            try:
                request_meta["response"] = func(*args, **kwargs)
                request_meta["response_length"] = len(str(request_meta["response"].change_rate))
            except grpc.RpcError as e:
                request_meta["exception"] = e
            request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
            events.request.fire(**request_meta)
            return request_meta["response"]

        return wrapper

class GrpcUser(User):
    abstract = True
    stub_class = None

    def __init__(self, environment):
        super().__init__(environment)
        for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
            if attr_value is None:
                raise LocustError(f"You must specify the {attr_name}.")
        self._channel = grpc.insecure_channel(self.host)
        self._channel_closed = False
        stub = self.stub_class(self._channel)
        self.client = GrpcClient(stub)

class Processor(GrpcUser):
    host = "localhost:50057"
    stub_class = example_grpc.exampleServiceStub

    def __init__(self, environment) -> None:
        """
        Default constructor
        """
        super().__init__(environment)
        ...

    def send_img_as_request(self, orig_frame: np.ndarray, target_frame: np.ndarray) -> float:
        ...
        request = example_pb2.exampleAPIRequest(orig_frame=img1, target_frame=img2)
        response = self.client.detect(request)
        change_rate = response.change_rate
        return change_rate

    def process(self, image: np.ndarray) -> float:
        ...
        return change_rate

    @task
    def run_test(self):
        if CONFIG['SOURCE'] == 'video':
            cam = cv2.VideoCapture(CONFIG['VIDEO_NAME'])
            while True:
                ret, frame = cam.read()
                if not ret:
                    logging.error('failed to grab frame!')
                    break
                if frame is None:
                    logging.error('Empty frame detected!')
                    break
                change_rate = self.process(frame)
                if change_rate > self.thresh:
                    logging.debug('input image changed by {} percent!'.format(change_rate * 100))
                    result = {'id': CONFIG['ID'], 'original image': self.orig_frame,
                              'target image': frame}
        else:
            orig_img = cv2.imread('original_img.jpg')
            target_img = cv2.imread('target_img.jpg')
            _rate = self.send_img_as_request(orig_img, target_img)
            if _rate > self.thresh:
                logging.debug('input image changed by {} percent!'.format(_rate * 100))

# Stop the Locust run if a threshold (in this case the failure ratio) is exceeded
def checker(environment):
    while environment.runner.state not in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
        time.sleep(1)
        if environment.runner.stats.total.fail_ratio > 0.2:
            print(f"fail ratio was {environment.runner.stats.total.fail_ratio}, quitting")
            environment.runner.quit()
            return

@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(checker, environment)
Start Testing
Once locustfile.py is ready, open a terminal in the same directory and call locust:
$ locust
There should be some messages like this in your terminal:
[2022-01-11 20:50:11,488] my-laptop/INFO/locust.main: Starting web interface at http://0.0.0.0:8089 (accepting connections from all network interfaces)
[2022-01-11 20:50:11,496] my-laptop/INFO/locust.main: Starting Locust 2.5.1
[2022-01-11 20:50:12,438] my-laptop/INFO/root: Starting server...
[2022-01-11 20:50:12,439] my-laptop/INFO/root: Server started successfully. Listening on port localhost:50057...
In your browser, navigate to http://0.0.0.0:8089; you should see a page like this:

Number of users corresponds to how many users we simulate as clients. Spawn rate indicates how many users are added per second until the number of users is reached. Host is the address of our target server; in this case it is localhost:50057, since our example API communicates over port 50057 via gRPC.
If we want to simulate a maximum of 40 clients sending requests to our server simultaneously, with a spawn rate of 1, then after clicking the Start swarming button Locust will ramp up from 1 client to 40 within 40 seconds and keep sending requests until we hit the Stop button:

The test results are broken down in detail across several sections. One way to obtain the results as a whole is to select Download Data → Download Report, which produces a detailed report similar to this one:


Run without Web UI
If you need to run the Locust analysis from the command line without the web UI, pass the --headless option as follows:
$ locust -f locustfile.py --headless -u 40 -r 1 --run-time 10m
-u: total number of users to simulate
-r: spawn rate
--run-time: total execution time of the test (10 minutes in this case)
Testing with Multiple CPUs
In case we need to test using multiple CPU cores, we need one master with multiple workers. First, start Locust in master mode:
$ locust -f locustfile.py --master --expect-workers 8
The master will wait until the worker count reaches --expect-workers before starting the actual test.
Subsequently, start Locust in worker mode. Each worker runs on one CPU core:
$ locust -f locustfile.py --worker --master-host=192.168.0.1
Here, --master-host is the IP address of the machine hosting the master. If the master and all workers are on the same machine, there is no need to pass this parameter.
Cheers!