Building Scalable Job Queues with NestJS and Bull Queue

https://abhex.dev/blogs/nestjs-bull-queue

Learn how to implement robust background job processing in NestJS using Bull Queue, including job scheduling, retries, progress tracking, and concurrency management.

NestJS · Bull Queue · Redis · TypeScript · Background Jobs · Performance

Background job processing is essential for handling time-consuming tasks in modern applications. In this guide, we'll explore how to implement scalable job queues using NestJS and Bull Queue, with a focus on job scheduling, retries, progress tracking, and concurrency management.

Setting Up Bull Queue

npm install @nestjs/bull bull        

Configuring Bull Module

// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
        password: process.env.REDIS_PASSWORD,
      },
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
        removeOnComplete: true,
      },
    }),
    // Register queues
    BullModule.registerQueue(
      { name: 'video-processing' },
      { name: 'email' },
      { name: 'notifications' }
    ),
  ],
})
export class AppModule {}        
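
The root configuration above registers the queues globally. In larger applications it is common to register each queue in the feature module that owns its processor and service instead; a minimal sketch, assuming a VideoModule for the video-processing queue used throughout this post:

// video/video.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { VideoProcessor } from './video-processor.processor';
import { VideoService } from './video.service';

@Module({
  imports: [
    // registerQueue makes the 'video-processing' queue injectable within this module
    BullModule.registerQueue({ name: 'video-processing' }),
  ],
  providers: [VideoProcessor, VideoService],
  exports: [VideoService],
})
export class VideoModule {}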

Implementing Job Processors

// video/video-processor.processor.ts
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { Job } from 'bull';

@Processor('video-processing')
export class VideoProcessor {
  private readonly logger = new Logger(VideoProcessor.name);

  @Process('transcode')
  async handleTranscode(job: Job) {
    this.logger.debug('Start transcoding...');
    this.logger.debug(job.data);

    let progress = 0;
    while (progress < 100) {
      await this.doTranscoding(job.data);
      progress += 10;
      await job.progress(progress);
    }

    return { success: true };
  }

  @Process('thumbnail')
  async handleThumbnail(job: Job) {
    this.logger.debug('Start generating thumbnail...');
    this.logger.debug(job.data);

    await this.generateThumbnail(job.data);
    return { success: true };
  }

  private async doTranscoding(data: any) {
    // Implement actual video transcoding logic
    await new Promise(resolve => setTimeout(resolve, 1000));
  }

  private async generateThumbnail(data: any) {
    // Implement thumbnail generation logic
    await new Promise(resolve => setTimeout(resolve, 500));
  }
}        
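
The handlers above receive Job with untyped data. A small set of payload interfaces (the names below are illustrative, not part of the original code) makes job.data self-documenting and lets the compiler catch mismatches between producers and consumers:

// video/video-job.types.ts
export interface TranscodeJobData {
  videoId: string;
  format: string;
  resolution: string;
}

export interface ThumbnailJobData {
  videoId: string;
  timestamp: number;
}

// In the processor, the generic parameter types job.data:
// @Process('transcode')
// async handleTranscode(job: Job<TranscodeJobData>) { ... }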

Adding Job Events and Listeners

// video/video-processor.processor.ts (additions to the VideoProcessor class defined above)
import { OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';

@Processor('video-processing')
export class VideoProcessor {
  @OnQueueActive()
  onActive(job: Job) {
    this.logger.debug(
      `Processing job ${job.id} of type ${job.name}. Data: ${JSON.stringify(job.data)}`,
    );
  }

  @OnQueueCompleted()
  onComplete(job: Job, result: any) {
    this.logger.debug(
      `Completed job ${job.id} of type ${job.name}. Result: ${JSON.stringify(result)}`,
    );
  }

  @OnQueueFailed()
  onError(job: Job<any>, error: any) {
    this.logger.error(
      `Failed job ${job.id} of type ${job.name}: ${error.message}`,
      error.stack,
    );
  }
}        

Implementing the Queue Service

// video/video.service.ts
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';

@Injectable()
export class VideoService {
  constructor(
    @InjectQueue('video-processing') private videoQueue: Queue,
  ) {}

  async processVideo(videoData: any) {
    // Add transcoding job
    const transcodingJob = await this.videoQueue.add(
      'transcode',
      {
        videoId: videoData.id,
        format: videoData.format,
        resolution: videoData.resolution,
      },
      {
        priority: 1,
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
        timeout: 3600000, // 1 hour
      },
    );

    // Add thumbnail job
    const thumbnailJob = await this.videoQueue.add(
      'thumbnail',
      {
        videoId: videoData.id,
        timestamp: videoData.thumbnailTime,
      },
      {
        priority: 2,
        attempts: 2,
      },
    );

    return {
      transcodingJobId: transcodingJob.id,
      thumbnailJobId: thumbnailJob.id,
    };
  }

  async getJobStatus(jobId: string) {
    const job = await this.videoQueue.getJob(jobId);
    if (!job) {
      return { status: 'not_found' };
    }

    const state = await job.getState();
    const progress = job.progress();

    return {
      id: job.id,
      status: state,
      progress,
      data: job.data,
      returnvalue: job.returnvalue,
      failedReason: job.failedReason,
    };
  }
}        
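
To make these operations reachable over HTTP, a thin controller can delegate to the service. The controller below is a sketch; the route paths and the shape of the request body are assumptions rather than part of the original service:

// video/video.controller.ts
import { Body, Controller, Get, Param, Post } from '@nestjs/common';
import { VideoService } from './video.service';

@Controller('videos')
export class VideoController {
  constructor(private readonly videoService: VideoService) {}

  // Enqueue the transcoding and thumbnail jobs, returning their ids immediately
  @Post('process')
  process(@Body() videoData: any) {
    return this.videoService.processVideo(videoData);
  }

  // Let clients poll job state and progress with the ids returned above
  @Get('jobs/:id')
  status(@Param('id') id: string) {
    return this.videoService.getJobStatus(id);
  }
}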

Implementing Concurrent Processing

// video/video-processor.processor.ts
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { Worker } from 'worker_threads';

@Processor('video-processing')
export class VideoProcessor {
  constructor() {
    // Optionally offload CPU-heavy work to a worker thread so the Node event loop stays responsive
    const worker = new Worker('./video-worker.js', {
      workerData: {
        // Worker configuration
      },
    });

    worker.on('message', (result) => {
      // Handle worker results
    });
  }

  @Process({
    name: 'transcode',
    concurrency: 3, // Bull runs up to 3 'transcode' jobs from this instance at the same time
  })
  async handleTranscode(job: Job) {
    // Implementation
  }
}

Implementing Job Scheduling

// video/video.service.ts (additional methods on VideoService)

// Schedule a one-off transcode for a specific point in time using a delay
async scheduleVideoProcessing(videoData: any, scheduleTime: Date) {
  const delay = Math.max(scheduleTime.getTime() - Date.now(), 0);

  return this.videoQueue.add(
    'transcode',
    {
      videoId: videoData.id,
      format: videoData.format,
    },
    { delay },
  );
}

// Schedule recurring processing with a cron expression instead of a delay
async scheduleDailyProcessing(videoData: any) {
  return this.videoQueue.add(
    'transcode',
    {
      videoId: videoData.id,
      format: videoData.format,
    },
    {
      repeat: {
        cron: '0 0 * * *', // Daily at midnight
      },
    },
  );
}
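
Repeatable jobs registered with a cron expression persist in Redis until they are removed explicitly. A sketch of listing and cancelling them, assuming the same injected videoQueue and the repeat options used above:

// video/video.service.ts (additional methods on VideoService)
async listRepeatableJobs() {
  // Returns the cron pattern, next run time, and key for each repeatable job on the queue
  return this.videoQueue.getRepeatableJobs();
}

async cancelDailyProcessing() {
  // The repeat options must match the ones used when the repeatable job was added
  await this.videoQueue.removeRepeatable('transcode', { cron: '0 0 * * *' });
}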

Implementing Progress Tracking

// video/video.gateway.ts
import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';

@WebSocketGateway()
export class VideoGateway {
  @WebSocketServer()
  server: Server;

  // Push a progress update into the room named after the job owner's user id
  sendProgress(userId: string, payload: { jobId: string | number; progress: number }) {
    this.server.to(userId).emit('processingProgress', payload);
  }
}

// video/video-processor.processor.ts (addition to the VideoProcessor class)
// Queue event listeners such as @OnQueueProgress only fire inside a @Processor class,
// so the processor injects the gateway and forwards progress updates through it.
// Add OnQueueProgress to the @nestjs/bull import and inject the gateway:
// constructor(private readonly videoGateway: VideoGateway) {}
@OnQueueProgress()
onProgress(job: Job, progress: number) {
  this.videoGateway.sendProgress(job.data.userId, {
    jobId: job.id,
    progress,
  });
}
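
For server.to(userId) to reach anyone, each client socket has to join a room named after its user id. One way to do that, assuming the client passes its user id as a handshake query parameter, is a connection handler on the gateway:

// video/video.gateway.ts (continued) - VideoGateway can implement OnGatewayConnection
// import { OnGatewayConnection } from '@nestjs/websockets'; import { Socket } from 'socket.io';
handleConnection(client: Socket) {
  const userId = client.handshake.query.userId as string;
  if (userId) {
    // Join a per-user room so progress events targeted with server.to(userId) reach this client
    client.join(userId);
  }
}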

Best Practices and Optimization

  • Implement proper error handling and retries
  • Use job priorities for important tasks
  • Implement job cleanup strategies
  • Monitor queue health and performance
  • Implement rate limiting for job processing (see the queue-options sketch after this list)
  • Use job events for monitoring and logging
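
Several of these practices map directly onto queue options. The sketch below shows rate limiting and cleanup configured on the video-processing queue; the specific limits are illustrative, not recommendations:

// Rate limiting and cleanup expressed as queue options
BullModule.registerQueue({
  name: 'video-processing',
  // Process at most 10 jobs per second on this queue
  limiter: { max: 10, duration: 1000 },
  defaultJobOptions: {
    removeOnComplete: 100, // keep only the 100 most recent completed jobs
    removeOnFail: 500,     // keep the 500 most recent failed jobs for debugging
  },
});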

By following these patterns and implementing proper error handling and monitoring, you can create a robust job processing system that scales well with your application's needs.
