Mastering Continuous Integration: Building a Scalable Test Automation System from Scratch in Python.

What is a Continuous Integration System?

When building software, it's important to ensure that new features or bug fixes are safe and work as expected. To do this, we run tests on our code. Developers might test their changes locally, but they can't always test the software on every system it will run on. Additionally, as the number of tests grows, running them all locally becomes time-consuming. This is where continuous integration (CI) systems come in.

A Continuous Integration (CI) system is a dedicated tool that automatically tests new code changes. Whenever a developer commits code to the repository, the CI system fetches these changes, runs the tests, and reports the results. It ensures that new code doesn't break existing functionality. A good CI system is also reliable, meaning it can recover from failures and continue testing.

To handle heavy workloads, such as when multiple commits are made quickly, the CI system should distribute and run tests in parallel. This ensures that test results are delivered promptly. This project will showcase a simple, distributed CI system designed to be flexible and easy to expand.

Things that we will discuss:

  • Project Limitations and Notes
  • Testing Schedule
  • Test Results Reporting
  • Architecture Overview
  • Single Machine vs. Distributed Systems
  • Setup and Files
      ◦ Initial Setup
  • The Dispatcher (dispatcher.py)
      ◦ Monitoring Test Runners
      ◦ Redistributing Tasks
      ◦ Dispatching Tests
      ◦ Leveraging a Threaded Server
      ◦ Handling Commands
  • The Test Runner (test_runner.py)
      ◦ Initialization and Dispatcher Monitoring
      ◦ Threading for Concurrent Tasks
      ◦ Communication with the Dispatcher
      ◦ Executing Tests
      ◦ Running the Test Runner
  • Error Handling
  • Enhancements and Extensions

Project Limitations and Notes

This project uses Git for the repository under test. Only basic source code management operations are involved, so if you are more familiar with another version control system such as SVN or Mercurial, you should still find it easy to follow.

To keep things simple, the system will only run tests located in a tests directory within the repository. Also, instead of monitoring a remote master repository (as is typical for CI systems), this example project uses a local repository for simplicity.

Testing Schedule

Continuous Integration (CI) systems usually monitor a repository for changes. They can run tests after every commit or at specific intervals. For this project, the CI system will check for changes periodically (e.g., every five seconds). It will only test the latest commit made during that interval, not all commits.

Real-world CI systems often rely on notifications, such as GitHub's webhooks, to trigger testing, but this project uses a simpler observer model: the observer polls the repository for changes rather than waiting for a notification.

Test Results Reporting

CI systems typically report test results in a way that's easy for developers to access, such as on a webpage. In this project, results are stored as files on the local file system of the dispatcher process.

Architecture Overview

This project demonstrates a simplified architecture for a CI system, consisting of three main components:

  1. Observer: Watches the repository for changes.
  2. Test Job Dispatcher: Assigns commits to test runners.
  3. Test Runner: Executes the tests for a specific commit.

Single Machine vs. Distributed Systems

A simple CI system might run all components as a single process on one machine. However, this setup has limitations:

  • It can't handle high workloads, leading to a backlog of tests.
  • It's not fault-tolerant; if the machine fails, no tests will run.

To address these issues, this project separates each component into its own process. This allows:

  • Parallel Test Execution: Multiple test runners can work simultaneously to prevent backlogs.
  • Distributed Architecture: Components can run on different machines and communicate over a network using sockets.

Each component is assigned a unique host/port address for communication. If a machine fails, you can replace it with another, making the system more resilient.

Setup and Files

This project includes the following files:

  • Python Files:
      ◦ repo_observer.py: Monitors the repository for changes.
      ◦ dispatcher.py: Distributes test jobs.
      ◦ test_runner.py: Runs tests.
      ◦ helpers.py: Shared communication functions for all components.
  • Bash Scripts: Simplify running Git and system commands.
  • Tests Directory: Contains example tests, one that passes and one that fails.
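The components talk to each other through helpers.py, which this article describes only as holding shared communication functions. The code later in this article calls helpers.communicate(host, port, request) and compares its reply with strings such as "pong" and "OK", so a minimal sketch of such a function might look like the following. It assumes one request and one reply per TCP connection and UTF-8 text; the timeout and bufsize parameters are hypothetical additions, not part of the original project.

```python
import socket


def communicate(host, port, request, timeout=5.0, bufsize=1024):
    """Send one request string and return the reply as a string (sketch).

    Assumes the simple protocol used throughout this article: open a
    connection, send a single message, read a single reply, close.
    """
    # create_connection raises OSError (socket.error) if the peer is down,
    # which is the exception the dispatcher's monitoring loop catches
    with socket.create_connection((host, int(port)), timeout=timeout) as sock:
        sock.sendall(request.encode("utf-8"))
        # A single recv() is a simplification: long replies may need a loop
        response = sock.recv(bufsize)
    return response.decode("utf-8")
```

Returning a str rather than bytes keeps caller-side comparisons such as response != "pong" working.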

Initial Setup

To start, all components will run locally on one computer. This avoids network-related issues while you're learning how the system works. Later, you can run each component on separate machines for a true distributed setup.

Setting Up the Repository for Continuous Integration

To get started with a Continuous Integration (CI) system, we need a repository for the CI system to monitor for changes. Let's create a repository named test_repo:

$ mkdir test_repo
$ cd test_repo
$ git init        

This will serve as the master repository, where developers commit their code. The CI system will pull this repository, monitor it for new commits, and run tests. The component responsible for detecting new commits is the repository observer.

Adding Initial Commits

To monitor for new changes, the repository observer requires at least one commit in the master repository. Let's add our example tests as the initial commit:

$ cp -r /this/directory/tests /path/to/test_repo/
$ cd /path/to/test_repo
$ git add tests/
$ git commit -m "add tests"        

With this, the master repository is ready. Next, we'll create clones for the repository observer and the test runner components.

Creating Clones for CI Components

The repository observer requires its own clone of the repository to monitor for changes. Create a clone named test_repo_clone_obs:

$ git clone /path/to/test_repo test_repo_clone_obs        

Similarly, the test runner needs its own clone to run tests on specific commits. Create another clone, test_repo_clone_runner:

$ git clone /path/to/test_repo test_repo_clone_runner        

Key Components of the CI System

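  1. Repository Observer (repo_observer.py): The repository observer continuously monitors the repository for new commits and notifies the dispatcher when changes are detected. Rather than relying on built-in notifications (which some version control systems offer), this observer polls periodically to detect updates.
  2. The update_repo.sh Script: A Bash helper script that the observer runs to update its clone of the repository and find out whether a new commit has arrived.
  3. Notifying the Dispatcher: When a new commit is found, the observer sends the dispatcher a dispatch:<commit ID> request so the commit can be assigned to a test runner.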

The observer runs indefinitely, polling the repository and notifying the dispatcher when new commits are found. To stop the process, you can use a keyboard interrupt (Ctrl+C) or send a kill signal.
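The observer's behavior described above (poll, and notify only when the newest commit changes) can be sketched as a loop. This is an illustrative sketch, not the project's repo_observer.py: get_latest_commit and notify are hypothetical callables standing in for running update_repo.sh and for sending the dispatcher a dispatch:<commit ID> message, and max_polls exists only so the loop can be stopped in a demo.

```python
import time


def observe(get_latest_commit, notify, interval=5.0, max_polls=None):
    """Poll for the newest commit and notify once per new commit ID (sketch).

    get_latest_commit: hypothetical callable returning the HEAD commit ID
        of the observer's clone, or None if nothing could be determined.
    notify: hypothetical callable that forwards a commit ID to the dispatcher.
    """
    last_seen = None
    polls = 0
    while max_polls is None or polls < max_polls:
        commit_id = get_latest_commit()
        # Only the newest commit per interval is reported; commits made
        # between two polls are skipped, as described in Testing Schedule
        if commit_id is not None and commit_id != last_seen:
            notify(commit_id)
            last_seen = commit_id
        polls += 1
        time.sleep(interval)
```

Repeated polls that return the same commit ID produce no new notification, which is what keeps the dispatcher from re-testing an unchanged repository.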

The Dispatcher (dispatcher.py)

The dispatcher is a dedicated service responsible for delegating testing tasks. It listens on a specified port for requests from test runners and the repository observer. Test runners can register themselves with the dispatcher, which assigns test runners to handle new commits provided by the repository observer. The dispatcher also monitors the status of test runners and redistributes tasks if issues arise.

When dispatcher.py is executed, the serve function initializes the dispatcher. It first parses command-line arguments to determine the host and port on which the dispatcher will listen:

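import argparse
import re
import socket
import socketserver
import threading
import time

import helpers  # the project's shared communication module (helpers.py)

def serve():
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="dispatcher's host, default is localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="dispatcher's port, default is 8888",
                        default=8888,
                        action="store")
    args = parser.parse_args()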

The dispatcher starts a server and spawns two threads: one for monitoring test runners (runner_checker) and another for redistributing pending tasks (redistribute).

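    server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
    print(f"serving on {args.host}:{args.port}")

    runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
    redistributor = threading.Thread(target=redistribute, args=(server,))
    try:
        runner_heartbeat.start()
        redistributor.start()
        # Blocks until interrupted (Ctrl+C raises KeyboardInterrupt here)
        server.serve_forever()
    except (KeyboardInterrupt, Exception):
        # Tell the background threads to stop, then wait for them to finish
        server.dead = True
        runner_heartbeat.join()
        redistributor.join()
    finally:
        server.server_close()  # release the listening socket


if __name__ == "__main__":
    serve()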
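The dispatcher stops its helper threads with a shared boolean (server.dead), which each loop only notices after its current time.sleep() call returns. A common alternative, swapped in here and not what dispatcher.py does, is threading.Event: Event.wait(timeout) sleeps like time.sleep() but returns early as soon as the event is set. A minimal sketch with a hypothetical periodic worker:

```python
import threading


def periodic(stop, interval, task):
    """Call task() every `interval` seconds until `stop` is set (sketch)."""
    # Event.wait() returns False on timeout and True as soon as stop.set()
    # is called, so shutdown does not wait out the rest of the interval
    while not stop.wait(interval):
        task()


# Usage: one background worker, stopped the way serve() sets server.dead
stop = threading.Event()
ticks = []
worker = threading.Thread(target=periodic, args=(stop, 0.01, lambda: ticks.append(1)))
worker.start()
stop.set()              # plays the role of server.dead = True
worker.join(timeout=1)  # returns promptly instead of after a full sleep
```

With this pattern, runner_checker and redistribute could exit within milliseconds of shutdown rather than after their next one- or five-second sleep.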

Monitoring Test Runners

The runner_checker function pings every registered test runner once per second. If a runner does not answer with pong, or cannot be reached at all, it is removed from the pool, and the commit it was working on is moved back to the pending list so it can be reassigned.

def runner_checker(server):
    def manage_commit_lists(runner):
        # Move the runner's in-flight commit back to the pending list
        for commit, assigned_runner in list(server.dispatched_commits.items()):
            if assigned_runner == runner:
                del server.dispatched_commits[commit]
                server.pending_commits.append(commit)
                break
        # Drop the unresponsive runner from the pool
        server.runners.remove(runner)

    while not server.dead:
        time.sleep(1)
        for runner in list(server.runners):
            try:
                response = helpers.communicate(runner["host"],
                                               int(runner["port"]),
                                               "ping")
                if response != "pong":
                    print(f"removing runner {runner}")
                    manage_commit_lists(runner)
            except socket.error:
                print(f"removing unreachable runner {runner}")
                manage_commit_lists(runner)

Redistributing Tasks

The redistribute function assigns pending commit IDs to available runners. If a commit ID exists in pending_commits, it is dispatched using dispatch_tests.

def redistribute(server):
    while not server.dead:
        for commit in list(server.pending_commits):
            print("running redistribute")
            dispatch_tests(server, commit)
        # Sleep once per pass so an empty pending list doesn't spin the CPU
        time.sleep(5)

Dispatching Tests

The dispatch_tests function identifies an available runner and assigns it a commit ID. It keeps retrying until a runner becomes available.

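def dispatch_tests(server, commit_id):
    # Keep trying until a runner accepts the commit or the server shuts down
    while not server.dead:
        print("trying to dispatch to runners")
        for runner in list(server.runners):  # copy: runner_checker may modify the pool
            try:
                response = helpers.communicate(runner["host"],
                                               int(runner["port"]),
                                               f"runtest:{commit_id}")
            except socket.error:
                continue  # unreachable; runner_checker will remove it
            if response == "OK":
                print(f"adding id {commit_id}")
                server.dispatched_commits[commit_id] = runner
                if commit_id in server.pending_commits:
                    server.pending_commits.remove(commit_id)
                return
        time.sleep(2)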

Leveraging a Threaded Server

The dispatcher uses a ThreadingTCPServer class to handle simultaneous connections from multiple sources. This enables the dispatcher to communicate concurrently with test runners and the repository observer.

class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    # Class-level attributes: shared state for the single dispatcher instance
    runners = []  # Pool of registered test runners
    dead = False  # Signals other threads to stop
    dispatched_commits = {}  # Tracks assignments of commits to runners
    pending_commits = []  # Tracks unassigned commits
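Because runners, dispatched_commits and pending_commits are class attributes, every instance of this server class shares the same list and dict objects. That is harmless with exactly one dispatcher per process, but if you ever create more than one (in tests, for example), giving each instance its own state in __init__ avoids surprises. A sketch, with DispatcherServer as a hypothetical class name:

```python
import socketserver


class DispatcherServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """Same state as the dispatcher's server class, created per instance (sketch)."""

    def __init__(self, server_address, handler_class, bind_and_activate=True):
        # Set up state before the base class binds and starts listening
        self.runners = []             # Pool of registered test runners
        self.dead = False             # Signals other threads to stop
        self.dispatched_commits = {}  # commit ID -> runner dict
        self.pending_commits = []     # commit IDs waiting for a runner
        super().__init__(server_address, handler_class, bind_and_activate)
```

The rest of the dispatcher code only reads these attributes through server.runners and similar, so it works the same with either version.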

Handling Commands

The dispatcher defines a custom handler (DispatcherHandler) to process incoming requests. The handler uses the?handle?method to parse and execute commands. Supported commands include:

  • status: Verifies if the dispatcher is running.
  • register: Registers a test runner by adding it to the pool.
  • dispatch: Assigns a commit ID to a runner for testing.
  • results: Receives and processes test results from runners.
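Every message is a plain string of the form command or command:arguments, which DispatcherHandler splits with the regular expression (\w+)(:.+)*. Pulling that step out into a hypothetical parse_command helper (not part of the project) shows what each command's arguments look like:

```python
import re

# Same pattern as DispatcherHandler; DOTALL lets arguments contain newlines
COMMAND_RE = re.compile(r"(\w+)(:.+)*", re.DOTALL)


def parse_command(message):
    """Split 'command:args' into (command, args or None) (sketch)."""
    match = COMMAND_RE.match(message)
    if not match:
        return None
    command, rest = match.group(1), match.group(2)
    # group(2) keeps the leading ':'; strip it off
    return command, (rest[1:] if rest else None)


print(parse_command("register:localhost:8900"))  # ('register', 'localhost:8900')
```

Because group(2) captures everything after the first colon, register still has to split its host:port argument, and results has to split commit ID, length and output.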

Here's a simplified look at the DispatcherHandler class:

class DispatcherHandler(socketserver.BaseRequestHandler):
    # DOTALL lets the argument part span lines (test output contains newlines)
    command_re = re.compile(r"(\w+)(:.+)*", re.DOTALL)
    BUF_SIZE = 1024

    def handle(self):
        # Sockets carry bytes in Python 3: decode before matching, encode replies
        self.data = self.request.recv(self.BUF_SIZE).strip().decode("utf-8")
        command_groups = self.command_re.match(self.data)
        if not command_groups:
            self.request.sendall(b"Invalid command")
            return
        command = command_groups.group(1)

        if command == "status":
            self.request.sendall(b"OK")
        elif command == "register":
            # Payload is ":<host>:<port>"; rsplit also copes with dotted IPs,
            # which a \w-based pattern would cut short
            address = command_groups.group(2)
            host, port = address[1:].rsplit(":", 1)
            runner = {"host": host, "port": port}
            self.server.runners.append(runner)
            self.request.sendall(b"OK")
        elif command == "dispatch":
            commit_id = command_groups.group(2)[1:]
            if not self.server.runners:
                self.request.sendall(b"No runners are registered")
            else:
                self.request.sendall(b"OK")
                dispatch_tests(self.server, commit_id)
        elif command == "results":
            results = command_groups.group(2)[1:]
            # Process results here
            self.request.sendall(b"Results received")
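The results branch above leaves the processing out. Since this project stores results as files on the dispatcher's local file system, one possible sketch parses the runner's results:<commit ID>:<length>:<output> payload and writes one file per commit. It assumes the whole message arrived in one recv() call; store_results and the test_results directory name are hypothetical.

```python
import os


def store_results(payload, results_dir="test_results"):
    """Write a 'commit_id:length:output' payload to results_dir/commit_id (sketch)."""
    # maxsplit=2 keeps any ':' characters inside the test output intact
    commit_id, _length, output = payload.split(":", 2)
    os.makedirs(results_dir, exist_ok=True)
    path = os.path.join(results_dir, commit_id)
    with open(path, "w") as f:
        f.write(output)
    return commit_id, path
```

The declared length is not checked here: handle() reads at most BUF_SIZE bytes and strips whitespace, so a fuller version would use it to keep reading until the whole report has arrived. After storing, the handler would also delete the commit from self.server.dispatched_commits.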

By combining these components, the dispatcher effectively manages and coordinates testing tasks, ensuring reliability and scalability.

The Test Runner (test_runner.py)

The Test Runner executes the tests for a given commit ID and relays the results back to the dispatcher server; it is the only component that actually runs tests. The runner (test_runner.py) interacts exclusively with the dispatcher, which assigns it commit IDs to test and receives the corresponding test results.

Initialization and Dispatcher Monitoring

Upon execution, test_runner.py invokes the serve function, which starts the test runner server. It also spawns a thread that runs the dispatcher_checker function to monitor the dispatcher's availability: every five seconds it checks when it last heard from the dispatcher, and if more than ten seconds have passed, it sends a status request. If the dispatcher does not answer OK, or cannot be reached, the test runner shuts down, since it depends on the dispatcher for both tasks and reporting.

def dispatcher_checker(server):
    while not server.dead:
        time.sleep(5)
        if (time.time() - server.last_communication) > 10:
            try:
                response = helpers.communicate(
                    server.dispatcher_server["host"],
                    int(server.dispatcher_server["port"]),
                    "status"
                )
                if response != "OK":
                    print("Dispatcher is no longer functional")
                    server.shutdown()
                    return
            except socket.error as e:
                print(f"Can't communicate with dispatcher: {e}")
                server.shutdown()
                return        

Threading for Concurrent Tasks

The test runner employs a ThreadingTCPServer, enabling multi-threaded operations. This is essential because the test runner needs to manage test execution while simultaneously responding to dispatcher pings.

class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    dispatcher_server = None  # Stores dispatcher server details
    last_communication = None  # Tracks last contact with the dispatcher
    busy = False  # Indicates whether the runner is occupied
    dead = False  # Status flag for server termination
    repo_folder = None  # Path to this runner's clone (used by run_tests)

Communication with the Dispatcher

The communication protocol involves two primary messages:

  1. ping: Ensures the test runner is active and responsive.
  2. runtest:<commit ID>: Assigns a commit ID for testing.

The TestHandler class processes these commands. For a ping request, it acknowledges with a pong and updates its last communication timestamp.

class TestHandler(socketserver.BaseRequestHandler):
    ...
    def handle(self):
        ...
        if command == "ping":
            print("Ping received")
            # Any contact from the dispatcher proves it is still alive
            self.server.last_communication = time.time()
            self.request.sendall(b"pong")

For runtest:<commit ID>, the test runner first checks whether it is busy. If it is idle, it replies OK, marks itself as busy, and runs the test job. Because the server mixes in ThreadingMixIn, each request is handled on its own thread, so the runner can keep answering pings while tests run.

elif command == "runtest":
    print(f"Received runtest command: Am I busy? {self.server.busy}")
    if self.server.busy:
        self.request.sendall(b"BUSY")
    else:
        self.request.sendall(b"OK")
        commit_id = command_groups.group(2)[1:]
        self.server.busy = True
        try:
            self.run_tests(commit_id, self.server.repo_folder)
        finally:
            # Become available again even if the test run raised an error
            self.server.busy = False
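Checking self.server.busy and then setting it are two separate steps, so two runtest requests arriving at nearly the same moment could both see busy == False. One way to close that gap, sketched here with a hypothetical RunnerState class and handle_runtest function rather than taken from test_runner.py, is a non-blocking threading.Lock acquire, which tests and claims the runner in a single atomic step:

```python
import threading


class RunnerState:
    """Tracks whether this runner is busy, safely across handler threads (sketch)."""

    def __init__(self):
        self._lock = threading.Lock()

    def try_claim(self):
        # acquire(blocking=False) returns True only if the lock was free,
        # so checking and claiming happen atomically
        return self._lock.acquire(blocking=False)

    def release(self):
        self._lock.release()

    @property
    def busy(self):
        return self._lock.locked()


def handle_runtest(state, reply, run):
    """Reply BUSY, or reply OK and run the job while holding the runner."""
    if not state.try_claim():
        reply(b"BUSY")
        return
    try:
        reply(b"OK")  # answer the dispatcher before the (slow) test run
        run()
    finally:
        state.release()
```

In a handler, reply would be self.request.sendall and run would call self.run_tests(commit_id, self.server.repo_folder).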

Executing Tests

The run_tests function orchestrates the test execution workflow:

  1. Updates the repository to the specified commit using the test_runner_script.sh script.
  2. Executes tests using Python's?unittest?framework.
  3. Collects results and sends them back to the dispatcher.

def run_tests(self, commit_id, repo_folder):
    # Check out the requested commit in this runner's clone
    output = subprocess.check_output(["./test_runner_script.sh", repo_folder, commit_id])
    print(output.decode())
    # Discover and run everything under the repository's tests/ directory
    test_folder = os.path.join(repo_folder, "tests")
    suite = unittest.TestLoader().discover(test_folder)
    with open("results", "w") as result_file:
        unittest.TextTestRunner(result_file).run(suite)
    with open("results", "r") as result_file:
        output = result_file.read()
    # Report back as results:<commit ID>:<length>:<output>
    helpers.communicate(
        self.server.dispatcher_server["host"],
        int(self.server.dispatcher_server["port"]),
        f"results:{commit_id}:{len(output)}:{output}"
    )

Running the Test Runner

To execute test_runner.py, provide it with a cloned repository for testing. By default, it listens on a port within the range 8900-9000 and connects to the dispatcher at localhost:8888. You can override these settings using optional arguments like --host, --port, and --dispatcher-server.
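Picking a port within the range 8900-9000 can be done by trying each port until the bind succeeds. A sketch, assuming the runner's server is built on socketserver as shown earlier; create_server_in_range is a hypothetical name, and binding inside the loop avoids the race of first probing for a free port and binding it later:

```python
import socketserver


def create_server_in_range(host, start, end, handler, server_cls=socketserver.TCPServer):
    """Return a server bound to the first free port in [start, end] (sketch)."""
    for port in range(start, end + 1):
        try:
            # TCPServer binds and listens in its constructor and raises
            # OSError (e.g. "address already in use") if the port is taken
            return server_cls((host, port), handler)
        except OSError:
            continue  # try the next port
    raise RuntimeError(f"no free port between {start} and {end}")
```

Passing the runner's own server class as server_cls would give each runner started on the same machine a different port automatically.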

Example commands (run each in its own terminal, since every component keeps running until it is stopped):

$ python dispatcher.py
$ python test_runner.py <path/to/test_repo_clone_runner>
$ python repo_observer.py --dispatcher-server=localhost:8888 <path/to/test_repo_clone_obs>

Error Handling

The system includes mechanisms to address process failures:

  • If the test runner stops unexpectedly, the dispatcher removes it from the pool.
  • If the dispatcher goes down, the repository observer and test runners will halt their operations.
  • If a test runner fails in the middle of a job, the dispatcher moves its commit back to the pending list and redistributes it to another available runner.

Enhancements and Extensions

This CI system, while functional, offers opportunities for improvement:

  • Per-Commit Testing: Ensure every commit undergoes testing instead of only the latest.
  • Resilient Test Runners: Allow test runners to pause during dispatcher down-times and resume upon recovery.

By distributing responsibilities across processes and enabling socket-based communication, this architecture lays the groundwork for a scalable and reliable CI system.

More articles by Shahwaiz Bukhari
