How Scheduling Scheduled Us Weeks Behind Our Schedule?

Initially, we believed the problem was straightforward. Our previous setup used goroutines to schedule database queries, which let us run the entire system with minimal setup: SQLite and a single Go service. When we decided to bring this feature to our SaaS platform, however, we ran into a fresh set of difficulties around dynamic scheduling and concurrent task execution.

We needed a way to sync data in a scheduled manner from the client's data warehouse to our data store.

Challenges with replicating the previous setup:

To understand the issue, let's look at the previous architecture more closely. It allowed users to connect their data warehouses, run database queries, and synchronize subscribers on a preset schedule (e.g., hourly, daily).

This scenario appeared straightforward initially, given that we used an embedded SQLite database within the service and anticipated limited occurrences of simultaneous executions, as most customers opted to update individual tables.

Also, since we used Go, we didn't need a separate process to handle scheduling; it was done efficiently using goroutines and Asynq, a lightweight library built around this concept.

Goroutines and SQLite-based setup for task scheduling

However, the complexity surfaced when we moved this functionality to our SaaS platform. We faced two challenges: dynamic scheduling from different processes, and concurrent execution of those schedules, each of which would execute the query, process the results, and sync the subscribers.

How did we arrive at the solution?

Before going through how we arrived at the solution, let’s understand the problems more deeply.

Dynamic Scheduling

Imagine a scenario where anyone can schedule a task at any time, and it can be set to run via a cron schedule or at fixed intervals. The scheduler should be capable of prioritizing tasks at all times and managing them efficiently. Since the schedule can change dynamically, the heartbeat process should adapt to every change.

Conceptually, this can be achieved with the help of Redis’s sorted set data structure.

Redis's Sorted Set is a powerful data structure that significantly aids in scheduling tasks by enabling efficient storage and retrieval of tasks based on their execution time or priority.

The Sorted Set stores elements as unique strings (members) along with their associated scores, which are floating-point numbers. Internally, Redis implements sorted sets using a combination of a hash table and a skip list.

The hash table provides fast access to elements based on their value, while the skip list maintains the sorted order of the elements based on their scores. This dual structure allows Redis to perform operations on sorted sets efficiently.

In the context of task scheduling, scores typically represent the execution time or priority of tasks. Redis maintains the Sorted Set in ascending order of score, so the priority of a task is determined by its score, with lower scores having higher priority. This allows for fast lookup and retrieval of tasks that are due for execution. If two tasks have the same score, they are sorted lexicographically by member.

Redis-based schedulers typically use the ZADD command (to add a task to the sorted set) and the ZRANGEBYSCORE command (to retrieve the tasks whose scores fall within a given range, lowest score first).
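To make these semantics concrete, here is a toy in-memory model of a sorted set. It is only a sketch for illustration: Redis itself uses a skip list rather than a sorted Python list, and the class name `ToySortedSet` and the task names are made up for this example.

```python
import bisect


class ToySortedSet:
    """Toy model of a sorted set: a dict for lookups plus a sorted list for ordering."""

    def __init__(self):
        self._scores = {}  # member -> score (plays the role of the hash table)
        self._order = []   # sorted (score, member) pairs (stands in for the skip list)

    def zadd(self, member, score):
        # Re-adding an existing member replaces its score, as ZADD does.
        if member in self._scores:
            self._order.remove((self._scores[member], member))
        self._scores[member] = score
        bisect.insort(self._order, (score, member))

    def zrangebyscore(self, min_score, max_score):
        # Members with min_score <= score <= max_score, in ascending order.
        start = bisect.bisect_left(self._order, (min_score,))
        result = []
        for score, member in self._order[start:]:
            if score > max_score:
                break
            result.append(member)
        return result


tasks = ToySortedSet()
tasks.zadd("sync:b", 200.0)
tasks.zadd("sync:a", 200.0)  # same score as sync:b, so ordered by member name
tasks.zadd("sync:c", 100.0)

due = tasks.zrangebyscore(float("-inf"), 200.0)
print(due)  # ['sync:c', 'sync:a', 'sync:b']
```

The lowest score comes first, and the two members that share a score are ordered lexicographically, which is the behavior the scheduler relies on.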

Let’s understand with an example:

Suppose we have a task scheduling system with different priorities (low, medium, high) and execution times. We want to schedule tasks such that high-priority tasks are executed before low-priority tasks, even if a low-priority task has an earlier execution time. To achieve this, we can use a scoring algorithm that combines the priority and execution time into a single score.

Example scoring algorithm:

from datetime import datetime

# Width of one priority band. It must be larger than any UNIX timestamp we store
# (current timestamps are about 1.7 * 10**9), so that bands never overlap.
PRIORITY_BAND = 10**10

# Numeric values for priorities (lower value means higher priority)
PRIORITY_VALUES = {'high': 1, 'medium': 2, 'low': 3}

def calculate_score(priority, execution_time):
    # Convert execution_time to a UNIX timestamp
    # (a naive datetime is interpreted as local time)
    unix_timestamp = execution_time.timestamp()

    # Combine the priority band and the UNIX timestamp: the band orders tasks by
    # priority, and the timestamp orders tasks within a band
    return PRIORITY_BAND * PRIORITY_VALUES[priority] + unix_timestamp

Now, let's add tasks to the Redis Sorted Set using the ZADD command:

import redis

# Connect to Redis
r = redis.Redis()

# Add tasks with their calculated scores
r.zadd('scheduled_tasks', {
    'Task A (low)': calculate_score('low', datetime(2023, 3, 15, 10, 0, 0)),
    'Task B (medium)': calculate_score('medium', datetime(2023, 3, 15, 10, 15, 0)),
    'Task C (high)': calculate_score('high', datetime(2023, 3, 15, 10, 30, 0)),
    'Task D (low)': calculate_score('low', datetime(2023, 3, 15, 10, 45, 0)),
})

To retrieve tasks due for execution, we can use the ZRANGEBYSCORE command once per priority band, from the highest priority to the lowest. The minimum score is the start of the band, and the maximum score is the start of the band plus the current UNIX timestamp, so only tasks whose execution time has passed are returned:

# Get the current UNIX timestamp
current_timestamp = datetime.now().timestamp()

# Retrieve tasks due for execution, highest priority first
due_tasks = []
for priority_value in sorted(PRIORITY_VALUES.values()):
    band_start = PRIORITY_BAND * priority_value
    due_tasks += r.zrangebyscore('scheduled_tasks', band_start, band_start + current_timestamp)

This approach ensures that due tasks with higher priority are executed before due tasks with lower priority, even if they have later execution times.

Now that the scoring and scheduling part is clear, let's see how we can use it to build a robust system in which a separate producer process schedules tasks while the scheduler, the workers, and Redis stay in sync.

  • We would need one or more producer processes to put tasks into Redis's sorted set using ZADD.
  • We would need a scheduler that continuously polls Redis for due tasks using ZRANGEBYSCORE with the current timestamp and assigns them to available workers.
  • Finally, we would need worker processes that execute the tasks and send heartbeats when a task is completed, so that the scheduler can update the execution progress.

In our case, the API server was our producer.
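The three roles above can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the implementation we shipped: scores are plain UNIX timestamps, the key name and function names are hypothetical, the worker returns a simple status report instead of sending heartbeats, and `InMemorySortedSets` is a stand-in that mimics only the three redis-py calls used here (`zadd`, `zrangebyscore`, `zrem`) so the example runs without a Redis server.

```python
import time

QUEUE_KEY = "scheduled_tasks"  # hypothetical key name


class InMemorySortedSets:
    """Stand-in for a Redis client; mimics zadd, zrangebyscore and zrem only."""

    def __init__(self):
        self._sets = {}

    def zadd(self, name, mapping):
        self._sets.setdefault(name, {}).update(mapping)

    def zrangebyscore(self, name, lo, hi):
        members = self._sets.get(name, {})
        hits = [(score, member) for member, score in members.items()
                if float(lo) <= score <= float(hi)]
        return [member for _, member in sorted(hits)]

    def zrem(self, name, *values):
        members = self._sets.get(name, {})
        return sum(1 for value in values if members.pop(value, None) is not None)


def enqueue(client, task_id, run_at):
    """Producer: schedule task_id for the UNIX timestamp run_at (re-adding updates it)."""
    client.zadd(QUEUE_KEY, {task_id: run_at})


def claim_due_tasks(client, now=None):
    """Scheduler: one polling pass that claims every task due at or before now."""
    now = time.time() if now is None else now
    claimed = []
    for task_id in client.zrangebyscore(QUEUE_KEY, "-inf", now):
        # ZREM returns how many members it removed, so only one poller claims a task.
        if client.zrem(QUEUE_KEY, task_id) == 1:
            claimed.append(task_id)
    return claimed


def run_tasks(task_ids, handler):
    """Worker: run handler on each task and report a status per task."""
    report = {}
    for task_id in task_ids:
        try:
            handler(task_id)
            report[task_id] = "done"
        except Exception as exc:
            report[task_id] = f"failed: {exc}"
    return report


client = InMemorySortedSets()
enqueue(client, "sync:customer-1", run_at=100.0)
enqueue(client, "sync:customer-2", run_at=500.0)
enqueue(client, "sync:customer-1", run_at=50.0)  # the schedule changed dynamically

due = claim_due_tasks(client, now=200.0)
report = run_tasks(due, handler=lambda task_id: None)
```

With a real `redis.Redis()` client in place of the stand-in, `zrangebyscore` returns bytes unless the client is created with `decode_responses=True`. Checking the return value of `zrem` is one common way to keep two pollers from running the same task.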

Implementation:

We evaluated several Python libraries that build on this Redis functionality and found that RQ Scheduler (rq_scheduler) ticks all the boxes. Here is how the candidates compared:

  1. APScheduler: It lacked separate scheduler and worker processes, which we needed in order to decouple scheduling from our main API server.
  2. Celery beat: It didn't support dynamic scheduling and hence wasn't ideal.
  3. RQ Scheduler: It implements exactly the algorithm explained above and was ideal for our use case; its Django integration was a plus.

This is what the final architecture looked like:

New architecture for task scheduling with Redis-based schedulers

Concurrent DB Writes

Our previous setup, SQLite, wouldn’t work for distributed applications like ours because:

  1. Concurrency Limitations: SQLite's file-based locking can cause contention issues in scenarios with high concurrent writes.
  2. File-based Locking: SQLite's reliance on file-level locks impedes concurrent write operations in a distributed environment.
  3. Limited Scalability: SQLite's serverless design becomes a bottleneck as the number of nodes and concurrent writes increases.
  4. ACID Compliance Challenges: Ensuring ACID properties across distributed nodes introduces complexities for SQLite.
  5. Data Integrity Concerns: File-based locking can lead to data corruption or loss of integrity in distributed systems.
  6. No Built-in Network Protocol: SQLite's direct communication with the local file system limits its use in distributed environments.
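The write-lock contention described in the first two points can be reproduced on a single machine with Python's built-in `sqlite3` module. This is a small sketch under assumptions: the table name `syncs` is made up, the database uses SQLite's default journal mode, and `timeout=0` makes the second writer fail immediately instead of waiting for the lock.

```python
import os
import sqlite3
import tempfile


def second_writer_is_blocked():
    """Return True if a second connection cannot write while the first holds the write lock."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # isolation_level=None gives autocommit mode, so transactions are explicit.
        first = sqlite3.connect(path, timeout=0, isolation_level=None)
        second = sqlite3.connect(path, timeout=0, isolation_level=None)
        try:
            first.execute("CREATE TABLE syncs (id INTEGER PRIMARY KEY, state TEXT)")
            first.execute("BEGIN IMMEDIATE")  # the first writer takes the write lock
            first.execute("INSERT INTO syncs (state) VALUES ('running')")
            try:
                # The second writer needs the same database-wide lock.
                second.execute("INSERT INTO syncs (state) VALUES ('queued')")
            except sqlite3.OperationalError as exc:
                return "locked" in str(exc)
            return False
        finally:
            if first.in_transaction:
                first.execute("ROLLBACK")
            first.close()
            second.close()


blocked = second_writer_is_blocked()
```

Only one connection can hold the write lock on the database file at a time, so every additional writer either waits or fails, and the effect grows with the number of concurrent sync jobs.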

Because we had to handle distributed writes from multiple processes to the same database, we considered Redis and Postgres for our distributed application. Since each query execution involved handling multiple states and processing results in batches to alleviate server load, we opted for Postgres as our database.

Postgres solves all of the issues mentioned above around distributed and concurrent writes, and it scales with our workload, which made it ideal for our use case. The only drawback was a potentially higher compute cost from our cloud provider. Still, the cost of a bad customer experience is much larger and potentially catastrophic.

With the architecture in place, processing the queries, which can sometimes fetch a billion rows or more, was the next critical problem. We solved it by creating a separate service, shown in the architecture diagram, that processes the tasks and sends events to SuprSend internally for subscriber addition.
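Processing results in batches can be sketched with a small generator that never holds the full result set in memory. This is an illustration only: `iter_batches`, `sync_subscribers`, the `send_batch` callable, and the batch size are hypothetical names and values, not the actual service code.

```python
from itertools import islice


def iter_batches(rows, batch_size):
    """Yield lists of at most batch_size items from any iterable, lazily."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def sync_subscribers(rows, send_batch, batch_size=1000):
    """Pass rows to send_batch one batch at a time; return the number of rows sent."""
    sent = 0
    for batch in iter_batches(rows, batch_size):
        send_batch(batch)
        sent += len(batch)
    return sent


received = []
total = sync_subscribers(range(5), received.append, batch_size=2)
print(received)  # [[0, 1], [2, 3], [4]]
```

Because `rows` can be any iterable, the same loop works with a database cursor that streams rows, so memory use stays bounded by the batch size.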

SuprSend is a powerful product that helps you build your notification system with ease. This is how it works:

SuprSend capabilities

