Building Scalable Job Queues with NestJS and Bull Queue
Learn how to implement robust background job processing in NestJS using Bull Queue, including job scheduling, retries, progress tracking, and concurrency management.
NestJS
Bull Queue
Redis
TypeScript
Background Jobs
Performance
Background job processing is essential for handling time-consuming tasks in modern applications. In this guide, we'll explore how to implement scalable job queues using NestJS and Bull Queue, with a focus on handling complex processing requirements.
Setting Up Bull Queue
npm install @nestjs/bull bull
Configuring Bull Module
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
/**
 * Root application module.
 *
 * Sets up the shared Redis connection and the default job options for Bull,
 * then registers the `video-processing`, `email` and `notifications` queues.
 *
 * Connection settings are read from the environment (`REDIS_HOST`,
 * `REDIS_PORT`, `REDIS_PASSWORD`); host and port fall back to
 * `localhost:6379` when the variables are not set.
 */
@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        // `||` rather than `??` on purpose: an empty or non-numeric value
        // should fall back to the default instead of yielding '' / 0 / NaN.
        host: process.env.REDIS_HOST || 'localhost',
        port: Number(process.env.REDIS_PORT) || 6379,
        password: process.env.REDIS_PASSWORD,
      },
      // Defaults for every job; individual `queue.add(...)` calls may
      // pass their own options.
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
        removeOnComplete: true,
      },
    }),
    // Register queues
    BullModule.registerQueue(
      { name: 'video-processing' },
      { name: 'email' },
      { name: 'notifications' },
    ),
  ],
})
export class AppModule {}
领英推荐
Implementing Job Processors
// video/video-processor.processor.ts
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { Job } from 'bull';
/**
 * Consumer for the `video-processing` queue.
 *
 * Declares handlers for two named job types, `transcode` and `thumbnail`.
 * The private helpers are placeholders that only wait for a fixed time.
 */
@Processor('video-processing')
export class VideoProcessor {
  private readonly logger = new Logger(VideoProcessor.name);

  /**
   * Handles a `transcode` job.
   *
   * Runs the transcoding step ten times and reports progress after each
   * pass (10, 20, ..., 100).
   *
   * @param job The queued job; its `data` is forwarded to the transcoding step.
   * @returns `{ success: true }` once all passes have finished.
   */
  @Process('transcode')
  async handleTranscode(job: Job) {
    this.logger.debug('Start transcoding...');
    this.logger.debug(job.data);

    for (let percent = 10; percent <= 100; percent += 10) {
      await this.doTranscoding(job.data);
      await job.progress(percent);
    }

    return { success: true };
  }

  /**
   * Handles a `thumbnail` job.
   *
   * @param job The queued job; its `data` is forwarded to the thumbnail step.
   * @returns `{ success: true }` once the thumbnail step has finished.
   */
  @Process('thumbnail')
  async handleThumbnail(job: Job) {
    this.logger.debug('Start generating thumbnail...');
    this.logger.debug(job.data);

    await this.generateThumbnail(job.data);

    return { success: true };
  }

  /** Placeholder for one transcoding pass: resolves after 1000 ms. */
  private doTranscoding(_data: any): Promise<void> {
    // Implement actual video transcoding logic
    return new Promise<void>((done) => {
      setTimeout(done, 1000);
    });
  }

  /** Placeholder for thumbnail generation: resolves after 500 ms. */
  private generateThumbnail(_data: any): Promise<void> {
    // Implement thumbnail generation logic
    return new Promise<void>((done) => {
      setTimeout(done, 500);
    });
  }
}
Adding Job Events and Listeners
// video/video-processor.processor.ts
import { OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';
/**
 * Queue event listeners for the `video-processing` consumer.
 *
 * Each listener only writes a log line through `this.logger`, which this
 * excerpt does not declare.
 */
@Processor('video-processing')
export class VideoProcessor {
  /** Logs the id, name and JSON-serialised data of a job that became active. */
  @OnQueueActive()
  onActive(job: Job) {
    const payload = JSON.stringify(job.data);
    const message = `Processing job ${job.id} of type ${job.name}. Data: ${payload}`;
    this.logger.debug(message);
  }

  /** Logs the id, name and JSON-serialised result of a completed job. */
  @OnQueueCompleted()
  onComplete(job: Job, result: any) {
    const outcome = JSON.stringify(result);
    const message = `Completed job ${job.id} of type ${job.name}. Result: ${outcome}`;
    this.logger.debug(message);
  }

  /** Logs the id, name, error message and stack trace of a failed job. */
  @OnQueueFailed()
  onError(job: Job<any>, error: any) {
    const message = `Failed job ${job.id} of type ${job.name}: ${error.message}`;
    this.logger.error(message, error.stack);
  }
}
Implementing the Queue Service
// video/video.service.ts
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
/**
 * Producer side of the `video-processing` queue: enqueues jobs and reports
 * on their state.
 */
@Injectable()
export class VideoService {
  constructor(
    @InjectQueue('video-processing') private videoQueue: Queue,
  ) {}

  /**
   * Enqueues a `transcode` job followed by a `thumbnail` job for one video.
   *
   * @param videoData Source of `id`, `format`, `resolution` and `thumbnailTime`.
   * @returns The ids of the two jobs that were added.
   */
  async processVideo(videoData: any) {
    // Add transcoding job
    const transcodePayload = {
      videoId: videoData.id,
      format: videoData.format,
      resolution: videoData.resolution,
    };
    const transcodeOptions = {
      priority: 1,
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 1000,
      },
      timeout: 3600000, // 1 hour
    };
    const transcodingJob = await this.videoQueue.add(
      'transcode',
      transcodePayload,
      transcodeOptions,
    );

    // Add thumbnail job
    const thumbnailPayload = {
      videoId: videoData.id,
      timestamp: videoData.thumbnailTime,
    };
    const thumbnailOptions = {
      priority: 2,
      attempts: 2,
    };
    const thumbnailJob = await this.videoQueue.add(
      'thumbnail',
      thumbnailPayload,
      thumbnailOptions,
    );

    return {
      transcodingJobId: transcodingJob.id,
      thumbnailJobId: thumbnailJob.id,
    };
  }

  /**
   * Looks up a job by id and summarises it.
   *
   * @param jobId Id of the job to look up.
   * @returns `{ status: 'not_found' }` when the queue has no such job;
   *   otherwise the job's id, state, progress, data, return value and
   *   failure reason.
   */
  async getJobStatus(jobId: string) {
    const job = await this.videoQueue.getJob(jobId);
    if (!job) {
      return { status: 'not_found' };
    }

    const status = await job.getState();

    return {
      id: job.id,
      status,
      progress: job.progress(),
      data: job.data,
      returnvalue: job.returnvalue,
      failedReason: job.failedReason,
    };
  }
}
Implementing Concurrent Processing
// video/video-processor.processor.ts
/**
 * `video-processing` consumer that starts a worker thread on construction
 * and handles `transcode` jobs with a concurrency of 3.
 */
@Processor('video-processing')
export class VideoProcessor {
  // NOTE(review): `Worker` is assumed to be the Node.js `worker_threads`
  // Worker (`import { Worker } from 'worker_threads'`); the import is not
  // part of this excerpt — confirm it is present in the real file.
  private readonly worker: Worker;

  constructor() {
    // Configure worker threads
    this.worker = new Worker('./video-worker.js', {
      workerData: {
        // Worker configuration
      },
    });
    this.worker.on('message', (result) => {
      // Handle worker results
    });
    // An uncaught exception inside the worker is emitted here as an 'error'
    // event. With no listener attached, that event is thrown as an uncaught
    // exception in the host process, so it has to be handled explicitly.
    this.worker.on('error', (error: Error) => {
      // Swap for the application's logger where one is available.
      console.error('Video worker failed', error);
    });
  }

  /**
   * Stops the worker thread when the module is torn down, so it does not
   * keep the process alive after shutdown.
   */
  async onModuleDestroy(): Promise<void> {
    await this.worker.terminate();
  }

  /**
   * Handles `transcode` jobs.
   *
   * @param job The queued job to process.
   */
  @Process({
    name: 'transcode',
    concurrency: 3, // Process 3 jobs simultaneously
  })
  async handleTranscode(job: Job) {
    // Implementation
  }
}
Implementing Job Scheduling
// video/video.service.ts
/**
 * Schedules a recurring `transcode` job for a video.
 *
 * The job repeats on the cron schedule `0 0 * * *`, starting with the first
 * occurrence after `scheduleTime`.
 *
 * @param videoData Source of the `id` and `format` stored in the job payload.
 * @param scheduleTime Earliest point in time at which the schedule applies.
 * @returns The promise returned by `videoQueue.add`.
 */
async scheduleVideoProcessing(videoData: any, scheduleTime: Date) {
  return this.videoQueue.add(
    'transcode',
    {
      videoId: videoData.id,
      format: videoData.format,
    },
    {
      repeat: {
        cron: '0 0 * * *', // Daily at midnight
        // Bull computes the delay of each repeatable run from the schedule,
        // so a separate `delay` option has no effect alongside `repeat`.
        // The start time has to be expressed as part of the repeat options.
        startDate: scheduleTime,
      },
    },
  );
}
Implementing Progress Tracking
// video/video.gateway.ts
import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';
import { OnQueueProgress, Processor } from '@nestjs/bull';
import { Job } from 'bull';
/**
 * WebSocket gateway that forwards job progress from the `video-processing`
 * queue to clients as `processingProgress` events.
 *
 * The class is also marked as a `@Processor` because Nest only registers
 * queue event listeners that are declared inside a processor class.
 *
 * NOTE(review): `@OnQueueProgress()` is the local (per-process) event. If
 * jobs are processed in a different process, the global variant is needed —
 * confirm against the deployment.
 */
@Processor('video-processing')
@WebSocketGateway()
export class VideoGateway {
  @WebSocketServer()
  server: Server;

  /**
   * Emits the job's id and progress to the room named after the job's
   * `userId`.
   *
   * @param job The job whose progress changed; `job.data.userId` selects the room.
   * @param progress The progress value reported for the job.
   */
  @OnQueueProgress()
  onProgress(job: Job, progress: number) {
    // Jobs enqueued without a `userId` have no room to notify; skip them
    // rather than emitting to an undefined room.
    const room = job.data?.userId;
    if (!room) {
      return;
    }

    this.server.to(room).emit('processingProgress', {
      jobId: job.id,
      progress,
    });
  }
}
Best Practices and Optimization
By following these patterns and implementing proper error handling and monitoring, you can create a robust job processing system that scales well with your application's needs.