Server stress testing with Locust

Ekberjan Derman
Aug 19, 2022

In the world of SaaS (software as a service), it is common for a single main server to handle requests from multiple clients. Usually, one needs to know how much workload the server can handle within a given time frame. This information is crucial for designing a better server-client architecture.

In this post, we will focus on stress testing a server that clients reach through a gRPC-based communication mechanism. The demonstration code is written in Python.

Suppose we have an example API named example_api, with a class named ExampleService. Our gRPC server is defined in our server.py file with the following content:

import time
import grpc
import logging
from concurrent import futures
import protos.example_api_pb2_grpc as example_grpc
from example_api.service.example_service import ExampleService

MAX_MESSAGE_IN_MB = 10

options = [
    ('grpc.max_send_message_length', MAX_MESSAGE_IN_MB * 1024 * 1024),
    ('grpc.max_receive_message_length', MAX_MESSAGE_IN_MB * 1024 * 1024)
]

def start_server():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_grpc.add_exampleAPIServicer_to_server(ExampleService(), server)

    logging.info('Starting server...')
    server.add_insecure_port('localhost:50057')
    server.start()
    logging.info('Server started successfully. Listening on port localhost:50057...')

    try:
        while True:
            time.sleep(86400)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == "__main__":
    start_server()
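As a rough check on the message-size options above, the following sketch converts the limit to bytes and compares it with the size of an uncompressed image payload. The frame dimensions and the helper names (raw_image_bytes, fits_in_message) are illustrative assumptions and not part of the example API; real messages also carry some protobuf overhead on top of the payload.

```python
MAX_MESSAGE_IN_MB = 10
MAX_MESSAGE_BYTES = MAX_MESSAGE_IN_MB * 1024 * 1024  # same arithmetic as in server.py


def raw_image_bytes(width: int, height: int, channels: int = 3) -> int:
    """Size of an uncompressed 8-bit image: one byte per channel per pixel."""
    return width * height * channels


def fits_in_message(payload_bytes: int, limit: int = MAX_MESSAGE_BYTES) -> bool:
    """True if the payload alone stays within the configured limit."""
    return payload_bytes <= limit


full_hd = raw_image_bytes(1920, 1080)   # hypothetical frame size
ultra_hd = raw_image_bytes(3840, 2160)  # hypothetical frame size

print(MAX_MESSAGE_BYTES)              # 10485760
print(fits_in_message(full_hd))       # True  (6220800 bytes)
print(fits_in_message(2 * full_hd))   # False (two raw frames in one request)
print(fits_in_message(ultra_hd))      # False
```

If the payloads in your own API are larger than the limit, either raise MAX_MESSAGE_IN_MB on both server and client or send compressed images.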

The Python modules for the gRPC proto files are generated with the standard gRPC tooling, which is described in the official gRPC documentation.

To investigate the performance of this server, such as how its response delay changes as we increase the client count, we use Locust, an easy-to-use, scriptable and scalable performance testing tool. With Locust, we define the behavior of our simulated clients in Python.

Installation

First, let’s install Locust by:

$ pip3 install locust==2.5.1

Here, we used version 2.5.1, but you may change it as needed. If no version is specified, the latest version will be installed.

We can check our installed Locust version by:

$ locust -V

Preparation

Now, in the directory that contains our server.py, let’s create a file named locustfile.py. The filename matters: when we run Locust, it looks for a file with this name by default and carries out the tasks defined in it. To use a custom file name, call locust with the -f option in your terminal as follows:

$ locust -f custom_file.py

Some of the following code is adapted from the official Locust documentation.

In our Locust file, we need to include the related packages first:

from locust import events, User, task
from locust.exception import LocustError
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, WorkerRunner

Locust is built on gevent, so for the gRPC calls to work inside Locust we need to initialize gRPC’s gevent support:

# patch grpc so that it uses gevent instead of asyncio
import grpc.experimental.gevent as grpc_gevent

grpc_gevent.init_gevent()

In our server.py, the start_server function is responsible for starting the server. Therefore, we import it into our locustfile.py and spawn it in a greenlet when Locust initializes:

import gevent
from server import start_server

@events.init.add_listener
def run_grpc_server(environment, **_kwargs):
    gevent.spawn(start_server)

Next, we need to create a class named GrpcClient as follows:

class GrpcClient:
    def __init__(self, stub):
        self._stub_class = stub.__class__
        self._stub = stub

    def __getattr__(self, name):
        func = self._stub_class.__getattribute__(self._stub, name)

        def wrapper(*args, **kwargs):
            request_meta = {
                "request_type": "grpc",
                "name": name,
                "start_time": time.time(),
                "response_length": 0,
                "exception": None,
                "context": None,
                "response": None,
            }
            start_perf_counter = time.perf_counter()
            try:
                request_meta["response"] = func(*args, **kwargs)
                request_meta["response_length"] = len(str(request_meta["response"].change_rate))
            except grpc.RpcError as e:
                request_meta["exception"] = e
            request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
            events.request.fire(**request_meta)
            return request_meta["response"]

        return wrapper

Remember that the attribute used to calculate the response length (change_rate in this case) must exist on your response message. Refer to your gRPC proto file if needed.
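The wrapping technique used by GrpcClient can be tried without gRPC or Locust installed. The sketch below is a simplified stand-in, not the class above: FakeStub, TimedClient and the on_request callback are hypothetical names, and the callback plays the role of events.request.fire.

```python
import time


class FakeStub:
    """Hypothetical stand-in for a generated gRPC stub."""

    def detect(self, request):
        return {"change_rate": 0.25, "echo": request}


class TimedClient:
    """Wraps every stub method and records timing metadata for each call."""

    def __init__(self, stub, on_request):
        self._stub = stub
        self._on_request = on_request  # plays the role of events.request.fire

    def __getattr__(self, name):
        # Only called for attributes that TimedClient itself does not define.
        func = getattr(self._stub, name)

        def wrapper(*args, **kwargs):
            meta = {"name": name, "response": None, "exception": None}
            start = time.perf_counter()
            try:
                meta["response"] = func(*args, **kwargs)
            except Exception as exc:  # the real client catches grpc.RpcError only
                meta["exception"] = exc
            meta["response_time"] = (time.perf_counter() - start) * 1000  # ms
            self._on_request(**meta)
            return meta["response"]

        return wrapper


recorded = []
client = TimedClient(FakeStub(), on_request=lambda **meta: recorded.append(meta))
response = client.detect("frame-1")
print(response["change_rate"], recorded[0]["name"], recorded[0]["exception"])
# 0.25 detect None
```

Because the interception happens in __getattr__, every RPC exposed by the stub is measured without writing one wrapper per method.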

Next, we create a class named GrpcUser, which must derive from locust.User. We also set its abstract attribute to True so that Locust does not instantiate it as an actual user during testing. Locust needs at least one class derived from locust.User:

class GrpcUser(User):
    abstract = True

    stub_class = None

    def __init__(self, environment):
        super().__init__(environment)
        for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
            if attr_value is None:
                raise LocustError(f"You must specify the {attr_name}.")
        self._channel = grpc.insecure_channel(self.host)
        self._channel_closed = False
        stub = self.stub_class(self._channel)
        self.client = GrpcClient(stub)

In this example API, the Processor class in client.py is responsible for sending client requests to the server and receiving the corresponding responses. Therefore, its methods are what we want to stress test. Modify this class so that it inherits from our previously created GrpcUser class:

class Processor(GrpcUser):

    host = "localhost:50057"
    stub_class = example_grpc.exampleServiceStub

    def __init__(self, environment) -> None:
        super().__init__(environment)
...

The method(s) that Locust should execute must be decorated with @task. For every running user, Locust creates a greenlet (a micro-thread) that calls the methods decorated with @task. In this scenario, our target method is run_test of our Processor class. This method corresponds to the main function of the original client.py of our example API:

@task
def run_test(self):
...

Note: it is hard to pass input parameters to functions decorated with @task in Locust. One straightforward approach is to create a method without any required input parameter and handle the rest inside it. For instance, suppose the Processor class has a function named process that requires an input of type np.ndarray. It would be difficult to target process directly because of its required input parameter. Instead, one can create another function named dummy, decorated with @task and taking no input parameter, and call process from there.
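The idea can be sketched as follows. This is a Locust-free illustration under stated assumptions: ProcessorSketch and _make_input are hypothetical names, a nested list stands in for np.ndarray, and the body of process is a placeholder computation rather than the real one.

```python
import random


class ProcessorSketch:
    """Hypothetical stand-in for the Processor user class."""

    def process(self, image):
        """Needs an input argument, so it cannot serve as a task directly."""
        pixels = [value for row in image for value in row]
        return sum(pixels) / (255 * len(pixels))  # placeholder computation

    def _make_input(self, height=4, width=4):
        """Build the input inside the class instead of passing it in."""
        return [[random.randint(0, 255) for _ in range(width)]
                for _ in range(height)]

    # In the real locustfile this method would carry the @task decorator.
    def dummy(self):
        image = self._make_input()
        return self.process(image)


rate = ProcessorSketch().dummy()
print(0.0 <= rate <= 1.0)  # True
```

In a real test the input would more likely be loaded once (for example in the constructor) and reused, so that the measured time reflects the server call rather than input preparation.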

Moreover, to stop the Locust run once the failure ratio exceeds some threshold (e.g. 0.2), we can add the following:

# Stopping the locust if a threshold (in this case the failure ratio) is exceeded
def checker(environment):
    while environment.runner.state not in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
        time.sleep(1)
        if environment.runner.stats.total.fail_ratio > 0.2:
            print(f"fail ratio was {environment.runner.stats.total.fail_ratio}, quitting")
            environment.runner.quit()
            return

@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(checker, environment)
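The stop condition boils down to simple arithmetic: failed requests divided by total requests, compared against the threshold. The sketch below illustrates it with hypothetical helper names (fail_ratio, should_stop); it is not Locust’s own implementation, and it assumes the ratio is treated as 0 before any request has been sent.

```python
def fail_ratio(num_requests: int, num_failures: int) -> float:
    """Fraction of requests that failed; 0.0 when nothing has been sent yet."""
    if num_requests == 0:
        return 0.0
    return num_failures / num_requests


def should_stop(num_requests: int, num_failures: int,
                threshold: float = 0.2) -> bool:
    """Mirrors the `fail_ratio > 0.2` condition used in checker()."""
    return fail_ratio(num_requests, num_failures) > threshold


print(should_stop(100, 20))  # False: exactly 0.2 does not exceed the threshold
print(should_stop(100, 21))  # True
print(should_stop(0, 0))     # False
```

Note that the comparison is strict, so a run sitting at exactly the threshold keeps going.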

As a result, the final locustfile.py as a whole would be as follows (suppose we’re sending images to our server and receiving some kind of processing results):

...
from example_api.config import CONFIG
import protos.example_api_pb2 as example_pb2
import protos.example_api_pb2_grpc as example_grpc

from locust import events, User, task
from locust.exception import LocustError
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, WorkerRunner

import time
import gevent
import grpc  # needed for grpc.RpcError and grpc.insecure_channel below

from server import start_server

# patch grpc so that it uses gevent instead of asyncio
import grpc.experimental.gevent as grpc_gevent

grpc_gevent.init_gevent()

@events.init.add_listener
def run_grpc_server(environment, **_kwargs):
    gevent.spawn(start_server)

class GrpcClient:
    def __init__(self, stub):
        self._stub_class = stub.__class__
        self._stub = stub

    def __getattr__(self, name):
        func = self._stub_class.__getattribute__(self._stub, name)

        def wrapper(*args, **kwargs):
            request_meta = {
                "request_type": "grpc",
                "name": name,
                "start_time": time.time(),
                "response_length": 0,
                "exception": None,
                "context": None,
                "response": None,
            }
            start_perf_counter = time.perf_counter()
            try:
                request_meta["response"] = func(*args, **kwargs)
                request_meta["response_length"] = len(str(request_meta["response"].change_rate))
            except grpc.RpcError as e:
                request_meta["exception"] = e
            request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
            events.request.fire(**request_meta)
            return request_meta["response"]

        return wrapper

class GrpcUser(User):
    abstract = True

    stub_class = None

    def __init__(self, environment):
        super().__init__(environment)
        for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
            if attr_value is None:
                raise LocustError(f"You must specify the {attr_name}.")
        self._channel = grpc.insecure_channel(self.host)
        self._channel_closed = False
        stub = self.stub_class(self._channel)
        self.client = GrpcClient(stub)

class Processor(GrpcUser):

    host = "localhost:50057"
    stub_class = example_grpc.exampleServiceStub

    def __init__(self, environment) -> None:
        """
        Default constructor
        """
        super().__init__(environment)
        ...

    def send_img_as_request(self, orig_frame: np.ndarray, target_frame: np.ndarray) -> float:
        ...
        request = example_pb2.exampleAPIRequest(orig_frame=img1, target_frame=img2)
        response = self.client.detect(request)
        change_rate = response.change_rate
        return change_rate

    def process(self, image: np.ndarray) -> float:
        ...
        return change_rate

    @task
    def run_test(self):

        if CONFIG['SOURCE'] == 'video':
            cam = cv2.VideoCapture(CONFIG['VIDEO_NAME'])

            while True:
                ret, frame = cam.read()

                if not ret:
                    logging.error('failed to grab frame!')
                    break

                if frame is None:
                    logging.error('Empty frame detected!')
                    break

                change_rate = self.process(frame)

                if change_rate > self.thresh:
                    logging.debug('input image changed by {} percent!'.format(change_rate * 100))
                    result = {'id': CONFIG['ID'], 'original image': self.orig_frame,
                              'target image': frame}

                
        else:
            orig_img = cv2.imread('original_img.jpg')
            target_img = cv2.imread('target_img.jpg')
            _rate = self.send_img_as_request(orig_img, target_img)

            if _rate > self.thresh:
                logging.debug('input image changed by {} percent!'.format(_rate * 100))

# Stopping the locust if a threshold (in this case the failure ratio) is exceeded
def checker(environment):
    while environment.runner.state not in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
        time.sleep(1)
        if environment.runner.stats.total.fail_ratio > 0.2:
            print(f"fail ratio was {environment.runner.stats.total.fail_ratio}, quitting")
            environment.runner.quit()
            return

@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(checker, environment)

Start Testing

Once the locustfile.py is ready, open a terminal under the same directory and call locust:

$ locust

There should be some messages like this in your terminal:

[2022-01-11 20:50:11,488] my-laptop/INFO/locust.main: Starting web interface at http://0.0.0.0:8089 (accepting connections from all network interfaces)
[2022-01-11 20:50:11,496] my-laptop/INFO/locust.main: Starting Locust 2.5.1
[2022-01-11 20:50:12,438] my-laptop/INFO/root: Starting server...
[2022-01-11 20:50:12,439] my-laptop/INFO/root: Server started successfully. Listening on port localhost:50057...

In your browser, go to http://0.0.0.0:8089. You should see a page like this:

Number of users is how many users we are going to simulate as clients. Spawn rate is how many users are added per second until the amount given by Number of users is reached. Host is the address of our target server; in this case it is localhost:50057, since our API communicates via gRPC on port 50057.

If we want to simulate a maximum of 40 clients sending requests to our server simultaneously, with a spawn rate of 1, then after we click the Start swarming button, Locust will start with 1 client, ramp up to 40 within 40 seconds, and continue sending requests until we hit the Stop button:
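The ramp-up arithmetic can be written down as a small sketch. It assumes a simple linear ramp (spawn rate times elapsed seconds, capped at the target); the function names are hypothetical and the actual spawning in Locust may deviate slightly from this idealized model.

```python
import math


def ramp_up_seconds(target_users: int, spawn_rate: float) -> float:
    """Time needed to reach the target user count."""
    return target_users / spawn_rate


def users_at(elapsed_seconds: float, target_users: int, spawn_rate: float) -> int:
    """Approximate number of running users after a given time."""
    return min(target_users, math.floor(elapsed_seconds * spawn_rate))


print(ramp_up_seconds(40, 1))  # 40.0
print(users_at(10, 40, 1))     # 10
print(users_at(60, 40, 1))     # 40
```

With a spawn rate of 4 instead of 1, the same 40 users would be running after about 10 seconds.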

The test results are presented in detail in several sections under different names. One way to obtain the results as a whole is to select Download Data → Download Report, which produces a detailed report similar to this one:

Run without Web UI

If you need to run the Locust analysis from the command line without the web UI, pass the --headless option as follows:

$ locust -f locustfile.py --headless -u 40 -r 1 --run-time 10m
  • -u: total number of users to simulate
  • -r: spawn rate (users started per second)
  • --run-time: total execution time for the test (10 minutes in this case)
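When the headless run is started from a script (for example in a CI job), it can help to assemble the command programmatically. The following is a minimal sketch; headless_command is a hypothetical helper, and only the flags shown above are used.

```python
import shlex


def headless_command(users: int, spawn_rate: int, run_time: str,
                     locustfile: str = "locustfile.py") -> list:
    """Build the argument list for a headless Locust run."""
    return [
        "locust", "-f", locustfile, "--headless",
        "-u", str(users),        # total number of simulated users
        "-r", str(spawn_rate),   # users started per second
        "--run-time", run_time,  # e.g. "10m"
    ]


cmd = headless_command(users=40, spawn_rate=1, run_time="10m")
print(shlex.join(cmd))
# locust -f locustfile.py --headless -u 40 -r 1 --run-time 10m

# To actually launch it from Python:
#   import subprocess
#   subprocess.run(cmd, check=True)
```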

Testing with Multiple CPU Cores

To test using multiple CPU cores, we need one master with multiple workers. First, start Locust in master mode:

$ locust -f locustfile.py --master --expect-workers 8

The master waits until the number of connected workers reaches --expect-workers before starting the actual test. Note that this option only takes effect in headless mode; with the web UI, the test starts when you click Start swarming.

Next, start Locust in worker mode. Each worker process uses one CPU core, so start one worker per core you want to use:

$ locust -f locustfile.py --worker --master-host=192.168.0.1

Here, --master-host is the IP address of the machine hosting the master. If the master and all workers run on the same machine, this parameter can be omitted, since it defaults to 127.0.0.1.
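As a small illustration of the one-worker-per-core rule, the sketch below derives the worker count from the number of available cores and builds the matching commands. distributed_commands is a hypothetical helper, and the default master host assumes everything runs on one machine.

```python
import os


def distributed_commands(worker_count: int, master_host: str = "127.0.0.1",
                         locustfile: str = "locustfile.py"):
    """Return the master command and one command per worker process."""
    master = ["locust", "-f", locustfile, "--master"]
    worker = ["locust", "-f", locustfile, "--worker",
              f"--master-host={master_host}"]
    return master, [list(worker) for _ in range(worker_count)]


# One worker per available core; os.cpu_count() may return None.
cores = os.cpu_count() or 1
master_cmd, worker_cmds = distributed_commands(cores)

print(" ".join(master_cmd))        # locust -f locustfile.py --master
print(len(worker_cmds) == cores)   # True
```

Each command in worker_cmds would then be started as its own process, for example with subprocess.Popen.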

Cheers!
