Mastering Continuous Integration: Building a Scalable Test Automation System from Scratch in Python.
What is a Continuous Integration System?
When building software, it's important to ensure that new features or bug fixes are safe and work as expected. To do this, we run tests on our code. Developers might test their changes locally, but they can't always test the software on every system it will run on. Additionally, as the number of tests grows, running them all locally becomes time-consuming. This is where continuous integration (CI) systems come in.
A Continuous Integration (CI) system is a dedicated tool that automatically tests new code changes. Whenever a developer commits code to the repository, the CI system fetches these changes, runs the tests, and reports the results. It ensures that new code doesn't break existing functionality. A good CI system is also reliable, meaning it can recover from failures and continue testing.
To handle heavy workloads, such as when multiple commits are made quickly, the CI system should distribute and run tests in parallel. This ensures that test results are delivered promptly. This project will showcase a simple, distributed CI system designed to be flexible and easy to expand.
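This article covers the project's limitations, the architecture, the repository setup, the dispatcher and test runner components, error handling, and possible extensions.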
Project Limitations and Notes
This project is designed to use Git for the code repository that needs testing. While it uses basic source code management functions, if you're familiar with other version control systems like SVN or Mercurial, you should still find it easy to follow.
To keep things simple, the system will only run tests located in a tests directory within the repository. Also, instead of monitoring a remote master repository (as is typical for CI systems), this example project uses a local repository.
Testing Schedule
Continuous Integration (CI) systems usually monitor a repository for changes. They can run tests after every commit or at specific intervals. For this project, the CI system will check for changes periodically (e.g., every five seconds). It will only test the latest commit made during that interval, not all commits.
Real-world CI systems are often triggered by notifications, such as Git hooks or GitHub webhooks, but this project uses a simpler observer model: the observer polls for changes rather than waiting for a notification.
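The polling approach can be sketched as a small function. This is an illustrative sketch, not the project's observer: check_for_new_commit and the get_head callable are hypothetical names, and in a real observer get_head would ask Git for the latest commit ID of the clone.

```python
def check_for_new_commit(get_head, last_seen):
    """Return (new_commit, latest); new_commit is None when nothing changed.

    get_head is a hypothetical callable returning the current commit ID.
    """
    latest = get_head()
    if latest != last_seen:
        return latest, latest
    return None, last_seen


# Simulate three polls. Two commits land before the second poll,
# so only the latest one ("c3") is reported for that interval.
heads = iter(["c1", "c3", "c3"])
last_seen = None
reported = []
for _ in range(3):
    new_commit, last_seen = check_for_new_commit(lambda: next(heads), last_seen)
    if new_commit is not None:
        reported.append(new_commit)
print(reported)  # ['c1', 'c3']
```

Because only the current head is compared with the last one seen, any commits made between two polls are skipped, which matches the testing schedule described above.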
Test Results Reporting
CI systems typically report test results in a way that's easy for developers to access, such as on a webpage. In this project, results are stored as files on the local file system of the dispatcher process.
Architecture Overview
This project demonstrates a simplified architecture for a CI system, consisting of three main components: the repository observer, the dispatcher, and the test runner.
Single Machine vs. Distributed Systems
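A simple CI system might run all components as a single process on one machine. However, this setup has limitations: it cannot spread a heavy test load across machines, and a single failure stops the whole system.
To address these issues, this project separates each component into its own process. This allows the components to run on separate machines and several test runners to work in parallel.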
Each component is assigned a unique host/port address for communication. If a machine fails, you can replace it with another, making the system more resilient.
Setup and Files
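The files referenced in this article are repo_observer.py (the repository observer), dispatcher.py (the dispatcher), test_runner.py (the test runner), a helpers module that provides the communicate function, the test_runner_script.sh shell script, and the example tests directory.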
Initial Setup
To start, all components will run locally on one computer. This avoids network-related issues while you're learning how the system works. Later, you can run each component on separate machines for a true distributed setup.
Setting Up the Repository for Continuous Integration
To get started with a Continuous Integration (CI) system, we need a repository for the CI system to monitor for changes. Let's create a repository named test_repo:
$ mkdir test_repo
$ cd test_repo
$ git init
This will serve as the master repository, where developers commit their code. The CI system will pull this repository, monitor for new commits, and run tests. The component responsible for detecting new commits is the repository observer.
Adding Initial Commits
To monitor for new changes, the repository observer requires at least one commit in the master repository. Let's add our example tests as the initial commit:
$ cp -r /this/directory/tests /path/to/test_repo/
$ cd /path/to/test_repo
$ git add tests/
$ git commit -m "add tests"
With this, the master repository is ready. Next, we'll create clones for the repository observer and the test runner components.
Creating Clones for CI Components
The repository observer requires its own clone of the repository to monitor for changes. Create a clone named test_repo_clone_obs:
$ git clone /path/to/test_repo test_repo_clone_obs
Similarly, the test runner needs its own clone to run tests on specific commits. Create another clone, test_repo_clone_runner:
$ git clone /path/to/test_repo test_repo_clone_runner
Key Components of the CI System
The Repository Observer (repo_observer.py)
The repository observer runs indefinitely, polling its clone of the repository and notifying the dispatcher when it finds a new commit. To stop the process, use a keyboard interrupt (Ctrl+C) or send it a kill signal.
The Dispatcher (dispatcher.py)
The dispatcher is a dedicated service responsible for delegating testing tasks. It listens on a specified port for requests from test runners and the repository observer. Test runners can register themselves with the dispatcher, which assigns test runners to handle new commits provided by the repository observer. The dispatcher also monitors the status of test runners and redistributes tasks if issues arise.
When dispatcher.py is executed, the serve function initializes the dispatcher. It parses command-line arguments to determine the host and port on which the dispatcher will operate:
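import argparse
import re
import socket
import socketserver
import threading
import time

import helpers  # project module that provides communicate()

def serve():
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="dispatcher's host, default is localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="dispatcher's port, default is 8888",
                        default=8888,
                        action="store")
    args = parser.parse_args()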
The dispatcher starts a server and spawns two threads: one for monitoring test runners (runner_checker) and another for redistributing pending tasks (redistribute).
    # Create the server, then start the two background threads
    server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
    print(f"serving on {args.host}:{args.port}")
    runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
    redistributor = threading.Thread(target=redistribute, args=(server,))
    try:
        runner_heartbeat.start()
        redistributor.start()
        # Serve until interrupted with Ctrl+C
        server.serve_forever()
    except (KeyboardInterrupt, Exception):
        # On any exception, signal the threads to stop and wait for them
        server.dead = True
        runner_heartbeat.join()
        redistributor.join()
Monitoring Test Runners
The runner_checker function ensures all registered test runners remain responsive. If a runner becomes unresponsive, it is removed from the pool, and its associated tasks are reassigned.
def runner_checker(server):
    def manage_commit_lists(runner):
        # Move the commit assigned to this runner back to the pending list
        for commit, assigned_runner in list(server.dispatched_commits.items()):
            if assigned_runner == runner:
                del server.dispatched_commits[commit]
                server.pending_commits.append(commit)
                break
        server.runners.remove(runner)

    while not server.dead:
        time.sleep(1)
        for runner in list(server.runners):
            try:
                response = helpers.communicate(runner["host"],
                                               int(runner["port"]),
                                               "ping")
                if response != "pong":
                    print(f"removing runner {runner}")
                    manage_commit_lists(runner)
            except socket.error:
                # The runner could not be reached at all
                manage_commit_lists(runner)
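To see the effect of removing a runner, the same bookkeeping can be run on plain data structures. This is a minimal sketch that assumes the same list and dictionary layout as the server attributes; requeue_runner is a hypothetical stand-alone name, not a function from the project.

```python
def requeue_runner(runners, dispatched_commits, pending_commits, runner):
    """Drop an unresponsive runner and put its commit back on the pending list."""
    for commit, assigned_runner in list(dispatched_commits.items()):
        if assigned_runner == runner:
            del dispatched_commits[commit]
            pending_commits.append(commit)
            break
    runners.remove(runner)


runner_a = {"host": "localhost", "port": "8900"}
runner_b = {"host": "localhost", "port": "8901"}
runners = [runner_a, runner_b]
dispatched = {"abc123": runner_a, "def456": runner_b}
pending = []

requeue_runner(runners, dispatched, pending, runner_a)
print(runners)     # only runner_b remains
print(dispatched)  # only def456 is still assigned
print(pending)     # abc123 waits for redistribution
```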
Redistributing Tasks
The redistribute function assigns pending commit IDs to available runners. If a commit ID exists in pending_commits, it is dispatched using dispatch_tests.
def redistribute(server):
    while not server.dead:
        for commit in list(server.pending_commits):
            print("running redistribute")
            dispatch_tests(server, commit)
        # Pause between passes so an empty pending list does not busy-wait
        time.sleep(5)
Dispatching Tests
The dispatch_tests function identifies an available runner and assigns it a commit ID. It keeps retrying until a runner becomes available.
def dispatch_tests(server, commit_id):
    while True:
        print("trying to dispatch to runners")
        for runner in server.runners:
            response = helpers.communicate(runner["host"],
                                           int(runner["port"]),
                                           f"runtest:{commit_id}")
            if response == "OK":
                print(f"adding id {commit_id}")
                server.dispatched_commits[commit_id] = runner
                if commit_id in server.pending_commits:
                    server.pending_commits.remove(commit_id)
                return
        # No runner accepted the commit; wait before trying again
        time.sleep(2)
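Because the loop above never gives up, a caller blocks for as long as every runner is busy. One possible variation, shown here only as a sketch, caps the number of attempts. The send callable, max_attempts, and delay are assumptions introduced for illustration; send stands in for helpers.communicate.

```python
import time


def dispatch_with_limit(runners, commit_id, send, max_attempts=3, delay=0.0):
    """Try each runner in turn; return the runner that accepted, or None."""
    for _ in range(max_attempts):
        for runner in runners:
            response = send(runner["host"], int(runner["port"]),
                            f"runtest:{commit_id}")
            if response == "OK":
                return runner
        time.sleep(delay)
    return None


# A fake transport: the runner on port 8901 accepts, the other is busy.
def fake_send(host, port, message):
    return "OK" if port == 8901 else "BUSY"


runners = [{"host": "localhost", "port": "8900"},
           {"host": "localhost", "port": "8901"}]
accepted = dispatch_with_limit(runners, "abc123", fake_send)
print(accepted)  # the runner on port 8901
nobody = dispatch_with_limit(runners[:1], "abc123", fake_send)
print(nobody)    # None: the only runner stayed busy
```

A commit that is not accepted could then be left in pending_commits for the redistribute thread to pick up later.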
Leveraging a Threaded Server
The dispatcher uses a ThreadingTCPServer class to handle simultaneous connections from multiple sources. This enables the dispatcher to communicate concurrently with test runners and the repository observer.
# In Python 3 the module is named socketserver (SocketServer in Python 2)
class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    runners = []             # Pool of registered test runners
    dead = False             # Signals other threads to stop
    dispatched_commits = {}  # Tracks assignments of commits to runners
    pending_commits = []     # Tracks unassigned commits
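The pattern can be tried in isolation with a small server that answers only the status command. This is a self-contained sketch: StatusHandler and ThreadedServer are illustrative names, and the communicate function is a guess at what helpers.communicate does, since the article does not show that module.

```python
import socket
import socketserver
import threading


class StatusHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # Sockets carry bytes, so decode the request and encode the reply
        command = self.request.recv(1024).decode().strip()
        reply = "OK" if command == "status" else "Invalid command"
        self.request.sendall(reply.encode())


class ThreadedServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    allow_reuse_address = True
    daemon_threads = True


def communicate(host, port, request):
    """Send one request string and return the reply as a string."""
    with socket.create_connection((host, port), timeout=5) as s:
        s.sendall(request.encode())
        return s.recv(1024).decode()


# Port 0 asks the operating system for any free port
server = ThreadedServer(("localhost", 0), StatusHandler)
host, port = server.server_address
threading.Thread(target=server.serve_forever, daemon=True).start()

status_reply = communicate(host, port, "status")
other_reply = communicate(host, port, "hello")
print(status_reply, "|", other_reply)

server.shutdown()
server.server_close()
```

ThreadingMixIn starts a new thread for every incoming connection, which is why a slow request does not block the others.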
Handling Commands
The dispatcher defines a custom handler (DispatcherHandler) to process incoming requests. The handler uses the handle method to parse and execute commands. Supported commands include status, register, dispatch, and results.
Here’s a simplified look at the DispatcherHandler class:
class DispatcherHandler(socketserver.BaseRequestHandler):
    command_re = re.compile(r"(\w+)(:.+)*")
    BUF_SIZE = 1024

    def handle(self):
        # Sockets carry bytes in Python 3: decode on receipt, send bytes back
        self.data = self.request.recv(self.BUF_SIZE).decode().strip()
        command_groups = self.command_re.match(self.data)
        if not command_groups:
            self.request.sendall(b"Invalid command")
            return
        command = command_groups.group(1)
        if command == "status":
            self.request.sendall(b"OK")
        elif command == "register":
            # Message format: register:<host>:<port>
            address = command_groups.group(2)
            host, port = re.findall(r":(\w*)", address)
            runner = {"host": host, "port": port}
            self.server.runners.append(runner)
            self.request.sendall(b"OK")
        elif command == "dispatch":
            # Message format: dispatch:<commit ID>
            commit_id = command_groups.group(2)[1:]
            if not self.server.runners:
                self.request.sendall(b"No runners are registered")
            else:
                self.request.sendall(b"OK")
                dispatch_tests(self.server, commit_id)
        elif command == "results":
            results = command_groups.group(2)[1:]
            # Process results here
            self.request.sendall(b"Results received")
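The command parsing can be tried on its own. The sketch below reuses the two regular expressions from the handler; the parse helper and the sample messages are made up for illustration.

```python
import re

command_re = re.compile(r"(\w+)(:.+)*")


def parse(message):
    """Split a message into its command name and raw argument string."""
    match = command_re.match(message)
    return match.group(1), match.group(2)


print(parse("status"))           # ('status', None)
print(parse("dispatch:abc123"))  # ('dispatch', ':abc123')

command, address = parse("register:localhost:8900")
host, port = re.findall(r":(\w*)", address)
print(command, host, port)       # register localhost 8900

# \w does not match ".", so a dotted address is cut short
print(re.findall(r":(\w*)", ":127.0.0.1:8900"))  # ['127', '8900']
```

The last line shows a limitation of the pattern as written: it works for a host name such as localhost but not for a dotted host name or IP address.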
By combining these components, the dispatcher effectively manages and coordinates testing tasks, ensuring reliability and scalability.
The Test Runner (test_runner.py)
The test runner is designed to execute tests for a given commit ID and relay the results back to the dispatcher. Its role is crucial in ensuring commits are validated systematically. test_runner.py interacts exclusively with the dispatcher, which assigns commit IDs for testing and receives the corresponding test outcomes.
Initialization and Dispatcher Monitoring
Upon execution, test_runner.py invokes the serve function, which starts the test runner server and spawns a thread to run the dispatcher_checker function. Every five seconds, this function checks whether more than ten seconds have passed since the dispatcher last made contact and, if so, sends it a status request. If the dispatcher does not answer OK or cannot be reached, the test runner shuts down, as it depends on the dispatcher for tasks and reporting.
def dispatcher_checker(server):
    while not server.dead:
        time.sleep(5)
        # last_communication is updated whenever the dispatcher pings us
        if (time.time() - server.last_communication) > 10:
            try:
                response = helpers.communicate(
                    server.dispatcher_server["host"],
                    int(server.dispatcher_server["port"]),
                    "status"
                )
                if response != "OK":
                    print("Dispatcher is no longer functional")
                    server.shutdown()
                    return
            except socket.error as e:
                print(f"Can't communicate with dispatcher: {e}")
                server.shutdown()
                return
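The check above subtracts server.last_communication from the current time, and that attribute starts out as None until the first ping arrives. A small helper can make the "no ping yet" case explicit. This is a sketch only: dispatcher_is_silent is a hypothetical name, and treating None as silent is an assumption rather than the project's behavior.

```python
def dispatcher_is_silent(last_communication, now, timeout=10):
    """Return True when no ping has arrived within `timeout` seconds.

    A last_communication of None (no ping received yet) counts as silent,
    which avoids subtracting None from a float.
    """
    if last_communication is None:
        return True
    return (now - last_communication) > timeout


print(dispatcher_is_silent(None, now=100.0))  # True
print(dispatcher_is_silent(95.0, now=100.0))  # False
print(dispatcher_is_silent(80.0, now=100.0))  # True
```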
Threading for Concurrent Tasks
The test runner employs a ThreadingTCPServer, enabling multi-threaded operations. This is essential as the test runner needs to manage test execution while simultaneously responding to dispatcher pings.
class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    dispatcher_server = None   # Stores dispatcher server details
    last_communication = None  # Tracks last contact with the dispatcher
    busy = False               # Indicates whether the runner is occupied
    dead = False               # Status flag for server termination
Communication with the Dispatcher
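The communication protocol involves two primary messages: ping, which the dispatcher sends to check that the runner is alive, and runtest:<commit ID>, which asks the runner to test a commit.
The TestHandler class processes these commands. For a ping request, it acknowledges with a pong and updates its last communication timestamp.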
class TestHandler(socketserver.BaseRequestHandler):
    ...
    def handle(self):
        ...
        if command == "ping":
            print("Ping received")
            self.server.last_communication = time.time()
            # Replies are sent as bytes in Python 3
            self.request.sendall(b"pong")
For runtest:<commit ID>, the test runner checks its availability. If it is busy, it replies BUSY. If idle, it replies OK, marks itself as busy, and runs the tests. Because ThreadingTCPServer handles each request in its own thread, the runner can still answer pings while the tests run.
        elif command == "runtest":
            print(f"Received runtest command: Am I busy? {self.server.busy}")
            if self.server.busy:
                self.request.sendall(b"BUSY")
            else:
                self.request.sendall(b"OK")
                commit_id = command_groups.group(2)[1:]
                # Mark the runner as busy for the duration of the test run
                self.server.busy = True
                self.run_tests(commit_id, self.server.repo_folder)
                self.server.busy = False
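Since every request runs in its own handler thread, two runtest requests that arrive close together could both read busy as False before either sets it. One possible hardening, sketched here under that assumption, is to make the check and the claim a single atomic step with a lock. RunnerState, try_start, and finish are hypothetical names, not part of the project.

```python
import threading


class RunnerState:
    """Hypothetical helper: an atomic busy flag built on a lock."""

    def __init__(self):
        self._lock = threading.Lock()

    def try_start(self):
        # Non-blocking acquire: True means this caller claimed the runner
        return self._lock.acquire(blocking=False)

    def finish(self):
        self._lock.release()


state = RunnerState()
first = state.try_start()    # claims the runner
second = state.try_start()   # rejected while the first job runs
state.finish()
third = state.try_start()    # accepted again after the job finished
print(first, second, third)  # True False True
```

In a handler, a False result from try_start would correspond to replying BUSY, and finish would be called once the tests are done.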
Executing Tests
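The run_tests function orchestrates the test execution workflow: it runs test_runner_script.sh with the repository folder and commit ID, discovers and runs the tests in the tests directory with unittest, writes the report to a results file, and sends the contents of that file to the dispatcher.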
    def run_tests(self, commit_id, repo_folder):
        # Run the shell script with the repository folder and commit ID
        output = subprocess.check_output(["./test_runner_script.sh", repo_folder, commit_id])
        print(output)
        # Discover and run the tests, writing the report to a file
        test_folder = os.path.join(repo_folder, "tests")
        suite = unittest.TestLoader().discover(test_folder)
        with open("results", "w") as result_file:
            unittest.TextTestRunner(result_file).run(suite)
        with open("results", "r") as result_file:
            output = result_file.read()
        # Send the report back to the dispatcher
        helpers.communicate(
            self.server.dispatcher_server["host"],
            int(self.server.dispatcher_server["port"]),
            f"results:{commit_id}:{len(output)}:{output}"
        )
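On the receiving side, a message in the results:<commit ID>:<length>:<output> format has to be split and stored. The sketch below shows one way this could be done. parse_results and save_results are hypothetical helpers, and naming the file after the commit ID is an assumption; the article only says that results are stored as files on the dispatcher's file system.

```python
import os
import tempfile


def parse_results(message):
    """Split 'results:<commit ID>:<length>:<output>' into its parts."""
    # maxsplit=3 keeps any colons inside the test output intact
    command, commit_id, length, output = message.split(":", 3)
    if command != "results":
        raise ValueError(f"not a results message: {command}")
    return commit_id, int(length), output


def save_results(directory, commit_id, output):
    """Write the output to <directory>/<commit ID> and return the path."""
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, commit_id)
    with open(path, "w") as f:
        f.write(output)
    return path


report = "Ran 2 tests in 0.001s\n\nOK: all passed\n"
message = f"results:abc123:{len(report)}:{report}"
commit_id, length, output = parse_results(message)
path = save_results(tempfile.mkdtemp(), commit_id, output)
print(commit_id, length == len(output), os.path.basename(path))
```

The length field lets the receiver tell whether it has read the whole report, which matters because a single 1024-byte recv may not hold a long test output.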
Running the Test Runner
To execute test_runner.py, provide it with a cloned repository for testing. By default, it listens on a port within the range 8900-9000 and connects to the dispatcher at localhost:8888. You can override these settings using optional arguments like --host, --port, and --dispatcher-server.
Example commands:
$ python dispatcher.py
$ python test_runner.py <path/to/test_repo_clone_runner>
$ python repo_observer.py --dispatcher-server=localhost:8888 <path/to/test_repo_clone_obs>
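The default port range mentioned above suggests a simple search for a port that is not already taken. The following is a sketch of that idea, not the project's code: find_free_port is a hypothetical helper, and treating the range as 8900 up to but not including 9000 is an assumption.

```python
import socket


def find_free_port(host="localhost", start=8900, end=9000):
    """Return the first port in [start, end) that can be bound, else None."""
    for port in range(start, end):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind((host, port))
            except OSError:
                continue  # port is taken, try the next one
            return port
    return None


port = find_free_port()
print(port)
```

Another process could still take the port between this check and the moment the server binds to it, so a runner may prefer to attempt creating its server on each port in turn and keep the first one that succeeds.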
Error Handling
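The system includes mechanisms to address process failures: the dispatcher removes test runners that stop answering pings and returns their commits to the pending list for redistribution, and a test runner shuts itself down when it can no longer reach the dispatcher.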
Enhancements and Extensions
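This CI system, while functional, offers opportunities for improvement. The limitations noted earlier suggest natural extensions: reporting results on a web page rather than in local files, reacting to commit notifications instead of polling, and monitoring a remote repository rather than a local one.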
By distributing responsibilities across processes and enabling socket-based communication, this architecture lays the groundwork for a scalable and reliable CI system.