Queue System with Bull

JifiJs includes a powerful queue system built on Bull, a Redis-based queue for Node.js. This allows you to process tasks asynchronously in the background, improving application performance and user experience.

Why Use Queues?

Queues are essential for:

  • Email sending - Don't make users wait for emails to be sent
  • File processing - Handle uploads, conversions, and optimizations asynchronously
  • Data exports - Generate large reports without blocking requests
  • Scheduled tasks - Execute jobs at specific times
  • Rate-limited operations - Control API calls to third-party services
  • Heavy computations - Offload CPU-intensive tasks

Architecture

┌─────────────────┐
│     Client      │
└────────┬────────┘
         │ HTTP Request
         ▼
┌─────────────────┐
│   Controller    │
└────────┬────────┘
         │ Add Job
         ▼
┌─────────────────┐      ┌─────────────┐
│  QueueService   │◄────►│    Redis    │
└────────┬────────┘      └─────────────┘
         │ Process Job
         ▼
┌─────────────────┐
│   Job Handler   │
└─────────────────┘
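
The flow in the diagram can be illustrated without Redis. The sketch below is a hypothetical in-memory stand-in (InMemoryQueue, handleSignup, and demoQueue are names invented for illustration, not part of JifiJs or Bull): the "controller" only enqueues work and returns, and the handler runs later when the queue is drained.

```typescript
// Hypothetical in-memory stand-in for the Redis-backed queue; illustration only.
type Handler<T, R> = (data: T) => R;

class InMemoryQueue<T, R> {
  private pending: T[] = [];
  private handler: Handler<T, R> | null = null;

  // Controller side: enqueue the job data and return immediately.
  add(data: T): number {
    this.pending.push(data);
    return this.pending.length; // number of jobs now waiting
  }

  // Worker side: register the job handler.
  process(handler: Handler<T, R>): void {
    this.handler = handler;
  }

  // Run all waiting jobs in FIFO order and collect the handler results.
  drain(): R[] {
    if (!this.handler) throw new Error('No handler registered');
    const results: R[] = [];
    while (this.pending.length > 0) {
      const data = this.pending.shift() as T;
      results.push(this.handler(data));
    }
    return results;
  }
}

interface WelcomeJob {
  email: string;
}

const demoQueue = new InMemoryQueue<WelcomeJob, string>();
demoQueue.process((job) => `sent welcome to ${job.email}`);

// "Controller": accepts the request and only enqueues the work.
function handleSignup(email: string): { status: string; waiting: number } {
  const waiting = demoQueue.add({ email });
  return { status: 'accepted', waiting };
}
```

In the real system Redis holds the pending jobs, so the process that adds a job and the process that handles it can be different.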

QueueService

JifiJs provides a built-in QueueService that wraps Bull queues with additional functionality.

Getting Queue Instance

import QueueService from './utils/bases/queues/queue.service';

// Get or create a queue instance
const emailQueue = QueueService.getInstance('emailQueue');
const fileQueue = QueueService.getInstance('fileQueue');

The QueueService keeps one instance per queue name: calling getInstance('queueName') multiple times with the same name returns the same instance, while a different name yields a separate queue.
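
A per-name instance registry of this kind can be sketched as follows. This is not the QueueService source; NamedQueue is a hypothetical class that only shows how a static Map can back getInstance.

```typescript
// Hypothetical registry; not the actual QueueService implementation.
class NamedQueue {
  // One shared map from queue name to instance.
  private static instances = new Map<string, NamedQueue>();

  // Private constructor: instances are only created through getInstance.
  private constructor(public readonly name: string) {}

  static getInstance(name: string): NamedQueue {
    let instance = NamedQueue.instances.get(name);
    if (!instance) {
      instance = new NamedQueue(name);
      NamedQueue.instances.set(name, instance);
    }
    return instance;
  }
}
```

Because the registry is keyed by name, a typo in the name silently creates a second queue, so it may be worth keeping queue names in shared constants.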

Queue Configuration

Queues are automatically configured from your .env file:

# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-password
REDIS_QUEUE=myapp # Base name for all queues
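
How the QueueService reads these variables is not shown here. As a rough sketch, connection options could be derived from the environment like this; buildRedisOptions and RedisOptions are hypothetical names, and the fallback values (localhost, 6379) are assumptions matching the example above.

```typescript
// Hypothetical shape of the connection options; illustration only.
interface RedisOptions {
  host: string;
  port: number;
  password?: string;
}

// Build connection options from an environment-like object.
// Pass process.env in a Node.js application.
function buildRedisOptions(env: Record<string, string | undefined>): RedisOptions {
  const rawPort = env['REDIS_PORT'] ?? '6379'; // assumed default
  const port = Number(rawPort);
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${rawPort}`);
  }

  const options: RedisOptions = {
    host: env['REDIS_HOST'] ?? 'localhost', // assumed default
    port,
  };

  // Only set a password when one is configured.
  const password = env['REDIS_PASSWORD'];
  if (password) {
    options.password = password;
  }
  return options;
}
```

Validating the port at startup turns a misconfigured .env file into an immediate error instead of a connection failure later.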

Adding Jobs to Queue

Basic Job

import QueueService from './utils/bases/queues/queue.service';

const emailQueue = QueueService.getInstance('email');

// Add job to queue
const job = await emailQueue.add({
  type: 'welcome-email',
  userId: user.id,
  email: user.email,
  name: user.name,
});

console.log(`Job ${job.id} added to queue`);

Job with Options

// Add job with custom options
const job = await emailQueue.add(
  {
    type: 'notification',
    userId: user.id,
    message: 'Your report is ready',
  },
  {
    attempts: 5, // Try up to 5 times in total (first run + 4 retries)
    delay: 5000, // Wait 5 seconds before processing
    priority: 1, // Highest priority (lower number = higher priority)
    timeout: 30000, // Timeout after 30 seconds
    removeOnComplete: true, // Remove when done
    removeOnFail: false, // Keep failed jobs for debugging
  }
);

Default Job Options

The QueueService sets these defaults:

{
  attempts: 3, // Try up to 3 times in total (first run + 2 retries)
  backoff: {
    type: 'exponential', // Exponential backoff
    delay: 2000, // Start with 2 seconds
  },
  removeOnComplete: 10, // Keep last 10 completed jobs
  removeOnFail: 5, // Keep last 5 failed jobs
  timeout: 60000, // 60 second timeout
}
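
One plausible way per-job options combine with these defaults is a shallow merge in which the values passed to add() win. That merge strategy is an assumption, not confirmed QueueService behavior; JobOptionsSketch, DEFAULT_JOB_OPTIONS, and resolveJobOptions are hypothetical names.

```typescript
// Hypothetical option types mirroring the fields used in this guide.
interface BackoffSketch {
  type: 'fixed' | 'exponential';
  delay: number;
}

interface JobOptionsSketch {
  attempts?: number;
  backoff?: BackoffSketch;
  delay?: number;
  priority?: number;
  timeout?: number;
  removeOnComplete?: boolean | number;
  removeOnFail?: boolean | number;
}

// Defaults copied from the list above.
const DEFAULT_JOB_OPTIONS: JobOptionsSketch = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 2000 },
  removeOnComplete: 10,
  removeOnFail: 5,
  timeout: 60000,
};

// Assumption: shallow merge, per-job values override defaults.
function resolveJobOptions(overrides: JobOptionsSketch = {}): JobOptionsSketch {
  return { ...DEFAULT_JOB_OPTIONS, ...overrides };
}
```

With a shallow merge, passing a backoff object replaces the whole default backoff rather than merging field by field, so both type and delay should be supplied together.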

Processing Jobs

Job Processor

Create a job processor to handle queue items:

import QueueService from './utils/bases/queues/queue.service';
import { Job } from 'bull';

const emailQueue = QueueService.getInstance('email');

// Process jobs from the queue
emailQueue.process(async (job: Job) => {
  console.log(`Processing job ${job.id}`);
  const { type, email, name } = job.data;

  if (type === 'welcome-email') {
    await sendWelcomeEmail(email, name);
  } else if (type === 'notification') {
    await sendNotification(email, job.data.message);
  }

  return { success: true, processedAt: new Date() };
});

Typed Job Processing

interface EmailJobData {
  type: 'welcome' | 'reset' | 'notification';
  email: string;
  name: string;
  data?: any;
}

const emailQueue = QueueService.getInstance('email');

emailQueue.process<EmailJobData>(async (job: Job<EmailJobData>) => {
  const { type, email, name, data } = job.data;

  switch (type) {
    case 'welcome':
      await sendWelcomeEmail(email, name);
      break;
    case 'reset':
      await sendPasswordReset(email, data.token);
      break;
    case 'notification':
      await sendNotification(email, data.message);
      break;
  }

  return { success: true };
});
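
The interface above leaves data untyped, so a missing token or message is only discovered at runtime. A stricter alternative, sketched here under the assumption that each job type has a fixed payload, is a discriminated union. EmailJob and describeEmailJob are hypothetical names, and the function returns a description string instead of sending mail so the example stays self-contained.

```typescript
// Each job type declares exactly the fields it needs.
type EmailJob =
  | { type: 'welcome'; email: string; name: string }
  | { type: 'reset'; email: string; token: string }
  | { type: 'notification'; email: string; message: string };

// Stub dispatcher: describes the work instead of sending an email.
function describeEmailJob(job: EmailJob): string {
  switch (job.type) {
    case 'welcome':
      return `welcome -> ${job.email} (${job.name})`;
    case 'reset':
      return `reset -> ${job.email} (token ${job.token})`;
    case 'notification':
      return `notification -> ${job.email}: ${job.message}`;
    default: {
      // Compile-time exhaustiveness check: adding a new job type
      // without a case makes this assignment a type error.
      const unreachable: never = job;
      throw new Error(`Unhandled job: ${JSON.stringify(unreachable)}`);
    }
  }
}
```

The same union could be used as the type parameter in place of EmailJobData, so the compiler checks both the code that adds jobs and the code that processes them.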

Job Lifecycle and Events

The QueueService automatically sets up event handlers for monitoring:

Event Handlers

// error - Queue error occurred
queue.on('error', (error) => {
  console.error('Queue error:', error);
});

// waiting - Job added to queue
queue.on('waiting', (jobId) => {
  console.log(`Job ${jobId} is waiting`);
});

// active - Job started processing
queue.on('active', (job) => {
  console.log(`Job ${job.id} started (Attempt ${job.attemptsMade + 1})`);
});

// completed - Job finished successfully
queue.on('completed', (job) => {
  console.log(`Job ${job.id} completed`);
});

// failed - Job failed
queue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err.message);
});

// stalled - Job stalled and will be reprocessed
queue.on('stalled', (job) => {
  console.warn(`Job ${job.id} stalled. Reprocessing...`);
});

// removed - Job removed from queue
queue.on('removed', (job) => {
  console.log(`Job ${job.id} removed`);
});

These event handlers are automatically configured by the QueueService.

Job States

Each job is in one of the following states. A typical job moves from waiting (or delayed, then waiting) to active, and ends in completed or failed:

  1. waiting - Job added to queue, waiting to be processed
  2. active - Job is currently being processed
  3. completed - Job finished successfully
  4. failed - Job failed (retried first if attempts remain)
  5. delayed - Job scheduled for future execution, or waiting out a retry backoff
  6. stalled - Job's lock expired while it was active; it will be reprocessed
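
The states listed above can be modelled as a small transition table. This is a simplified sketch, not Bull's internal state machine: JobState, NEXT_STATES, and canTransition are hypothetical names, and a stall is treated here as a move from active back to waiting rather than as a separate state.

```typescript
type JobState = 'waiting' | 'delayed' | 'active' | 'completed' | 'failed';

// Simplified model of which state can follow which.
const NEXT_STATES: Record<JobState, JobState[]> = {
  delayed: ['waiting'], // delay elapsed
  waiting: ['active'], // picked up by a worker
  // finished, out of attempts, retry with backoff, or stalled/immediate retry
  active: ['completed', 'failed', 'delayed', 'waiting'],
  completed: [],
  failed: ['waiting'], // manual retry
};

function canTransition(from: JobState, to: JobState): boolean {
  return NEXT_STATES[from].indexOf(to) !== -1;
}
```

A table like this can be handy in tests or dashboards that need to flag an unexpected sequence of job events.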

Queue Management

Get Job Counts

const counts = await emailQueue.getJobCounts();
console.log(counts);
// {
//   waiting: 10,
//   active: 2,
//   completed: 145,
//   failed: 3,
//   delayed: 5
// }

Get Queue Statistics

const stats = await emailQueue.getStats();
console.log(stats);
// {
//   name: 'email',
//   isActive: true,
//   waiting: 10,
//   active: 2,
//   completed: 145,
//   failed: 3,
//   delayed: 5
// }

Clean Old Jobs

// Remove completed jobs older than 1 hour
await emailQueue.clean(3600 * 1000, 'completed');

// Remove failed jobs older than 24 hours
await emailQueue.clean(24 * 3600 * 1000, 'failed');

// Remove all completed jobs
await emailQueue.clean(0, 'completed');

Access Bull Queue Directly

const bullQueue = emailQueue.getQueue();

// Now you can use any Bull queue methods
const job = await bullQueue.getJob('job-id');
await bullQueue.pause();
await bullQueue.resume();
const isPaused = await bullQueue.isPaused();

Real-World Examples

Email Queue Service

import QueueService from './utils/bases/queues/queue.service';
import mailService from './utils/bases/mail.service';
import { Job } from 'bull';

interface EmailJobData {
  type: 'mail';
  data: {
    receivers: string | string[];
    subject: string;
    content: string;
    html: any;
    sender: any;
    attachments: any[];
  };
}

class EmailQueueService {
  private queue: QueueService;

  constructor() {
    this.queue = QueueService.getInstance('mailQueue');
    this.setupProcessor();
  }

  private setupProcessor() {
    this.queue.process<EmailJobData>(async (job: Job<EmailJobData>) => {
      const { type, data } = job.data;

      if (type === 'mail') {
        const { receivers, subject, content, html, sender, attachments } = data;

        const result = await mailService.send(
          receivers,
          subject,
          content,
          html,
          sender,
          attachments
        );

        // Throwing marks the job as failed so it can be retried
        if (result.error) {
          throw new Error(result.message);
        }

        return result;
      }
    });
  }

  async sendEmail(
    receivers: string | string[],
    subject: string,
    content: string,
    html: any = null,
    sender: any = null,
    attachments: any[] = []
  ) {
    return await this.queue.add<EmailJobData>({
      type: 'mail',
      data: { receivers, subject, content, html, sender, attachments },
    });
  }

  async getStats() {
    return await this.queue.getStats();
  }
}

export default new EmailQueueService();

File Processing Queue

import QueueService from './utils/bases/queues/queue.service';
import { Job } from 'bull';

interface FileJobData {
  type: 'image-resize' | 'pdf-generate' | 'csv-export';
  userId: string;
  filePath: string;
  options: any;
}

const fileQueue = QueueService.getInstance('fileProcessing');

fileQueue.process<FileJobData>(async (job: Job<FileJobData>) => {
  const { type, userId, filePath, options } = job.data;

  switch (type) {
    case 'image-resize':
      await resizeImage(filePath, options);
      break;
    case 'pdf-generate':
      await generatePDF(userId, options);
      break;
    case 'csv-export':
      await generateCSVExport(userId, filePath);
      break;
  }

  return { success: true, processedAt: new Date() };
});

// Add job to queue
export async function queueImageResize(userId: string, imagePath: string) {
  return await fileQueue.add({
    type: 'image-resize',
    userId,
    filePath: imagePath,
    options: { width: 800, height: 600 },
  });
}

Error Handling and Retries

Exponential Backoff

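The QueueService uses exponential backoff by default. Bull's built-in exponential strategy waits (2^attemptsMade - 1) * delay before each retry, so with the default delay of 2000 ms:

Attempt 1: Immediate
Attempt 2: 2 seconds after the first failure ((2^1 - 1) * 2000ms)
Attempt 3: 6 seconds after the second failure ((2^2 - 1) * 2000ms)
Attempt 4: 14 seconds after the third failure ((2^3 - 1) * 2000ms), only if attempts is raised to 4 or more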

Custom Retry Logic

emailQueue.process(async (job: Job) => {
  try {
    await processJob(job.data);
    return { success: true };
  } catch (error) {
    // Check if it's a permanent error
    if ((error as { code?: string }).code === 'INVALID_DATA') {
      // Don't retry: discard() tells Bull to skip the remaining attempts,
      // and throwing marks the job as failed
      await job.discard();
      throw new Error('Invalid data');
    }

    // Temporary error - throw to trigger retry
    throw error;
  }
});
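
The decision between "fail now" and "retry" is easier to test when it lives in its own function. The sketch below is one way to do that; PERMANENT_ERROR_CODES and isPermanentError are hypothetical names, and the codes other than INVALID_DATA are placeholders to adjust to whatever your services actually throw.

```typescript
// Hypothetical list of error codes that retrying cannot fix.
const PERMANENT_ERROR_CODES: string[] = ['INVALID_DATA', 'NOT_FOUND', 'UNAUTHORIZED'];

// Returns true when the error carries a code from the permanent list.
// Anything else (timeouts, network errors, unknown values) counts as temporary.
function isPermanentError(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) {
    return false;
  }
  const code = (error as { code?: unknown }).code;
  return typeof code === 'string' && PERMANENT_ERROR_CODES.indexOf(code) !== -1;
}
```

Treating unknown errors as temporary means an unclassified failure is retried up to the attempts limit rather than dropped on the first error.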

Graceful Shutdown

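The QueueService automatically handles graceful shutdown on SIGTERM and SIGINT, so you do not need to register a handler yourself. The snippet below only illustrates what happens:

// Illustration only - do not add this handler to your application
process.on('SIGTERM', () => {
  // QueueService.closeAll() is called automatically
});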

Manual Shutdown

import QueueService from './utils/bases/queues/queue.service';

// Close specific queue
const emailQueue = QueueService.getInstance('email');
await emailQueue.close();

// Close all queues
await QueueService.closeAll();
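
What closeAll() does internally is not shown here. As a rough sketch of the idea, the function below closes every queue in turn and collects the names of those that fail to close instead of stopping at the first error. Closable and closeAllQueues are hypothetical names; any object with a name and an async close() method fits.

```typescript
// Hypothetical minimal shape of something that can be shut down.
interface Closable {
  name: string;
  close(): Promise<void>;
}

// Close each queue in order; return the names of queues that failed to close.
async function closeAllQueues(queues: Closable[]): Promise<string[]> {
  const failed: string[] = [];
  for (const queue of queues) {
    try {
      await queue.close();
    } catch {
      // Keep going so one broken queue does not block the others.
      failed.push(queue.name);
    }
  }
  return failed;
}
```

Returning the failures lets the caller decide whether to log them, retry, or exit with a non-zero status.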

Monitoring and Debugging

Check Queue Health

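import QueueService from './utils/bases/queues/queue.service';

async function checkQueueHealth() {
  const emailQueue = QueueService.getInstance('email');
  const stats = await emailQueue.getStats();

  if (stats.failed > 100) {
    console.error('Too many failed jobs!');
    // Send alert
  }

  if (stats.waiting > 1000) {
    console.warn('Queue backlog detected');
    // Scale workers
  }

  return stats;
}

// Run every minute; log errors so a failed check does not become an unhandled rejection
setInterval(() => {
  checkQueueHealth().catch((error) => console.error('Health check failed:', error));
}, 60000);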
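
The threshold checks can also be separated from the polling so they are easy to unit test and reuse across queues. In this sketch, QueueStatsSketch mirrors the getStats() output shown earlier; HealthThresholds and evaluateQueueHealth are hypothetical names, and the default limits simply repeat the 100 and 1000 used above.

```typescript
// Mirrors the fields of the getStats() output shown earlier.
interface QueueStatsSketch {
  name: string;
  isActive: boolean;
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
}

interface HealthThresholds {
  maxFailed: number;
  maxWaiting: number;
}

// Pure function: turns a stats snapshot into a list of alert messages.
function evaluateQueueHealth(
  stats: QueueStatsSketch,
  thresholds: HealthThresholds = { maxFailed: 100, maxWaiting: 1000 }
): string[] {
  const alerts: string[] = [];
  if (stats.failed > thresholds.maxFailed) {
    alerts.push(`${stats.name}: too many failed jobs (${stats.failed})`);
  }
  if (stats.waiting > thresholds.maxWaiting) {
    alerts.push(`${stats.name}: backlog of ${stats.waiting} waiting jobs`);
  }
  return alerts;
}
```

An empty array means the queue is within limits; anything else can be forwarded to whatever alerting channel the application already uses.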

Get Specific Job

const bullQueue = emailQueue.getQueue();
const job = await bullQueue.getJob('job-id');

if (job) {
console.log('Job state:', await job.getState());
console.log('Job data:', job.data);
console.log('Attempts:', job.attemptsMade);
console.log('Failed reason:', job.failedReason);
}

Best Practices

1. Use Separate Queues

Create dedicated queues for different job types:

const emailQueue = QueueService.getInstance('email');
const fileQueue = QueueService.getInstance('file-processing');
const notificationQueue = QueueService.getInstance('notifications');

2. Set Appropriate Timeouts

await queue.add(data, {
  timeout: 300000, // 5 minutes for heavy operations
});

3. Monitor Queue Health

setInterval(async () => {
  const stats = await queue.getStats();

  if (stats.failed > 100) {
    // Alert: Too many failed jobs
  }

  if (stats.waiting > 1000) {
    // Alert: Queue backlog
  }
}, 60000);

4. Idempotent Jobs

Design jobs to be safely retried:

emailQueue.process(async (job: Job) => {
  const { userId, emailType } = job.data;

  // Check if already processed
  const sent = await EmailLog.findOne({ userId, emailType });
  if (sent) {
    return { status: 'already_sent' };
  }

  await sendEmail(userId, emailType);

  // Record that we sent it
  await EmailLog.create({ userId, emailType, sentAt: new Date() });

  return { status: 'sent' };
});
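
Duplicates can also be reduced at the point where jobs are added. Bull's add() accepts a jobId option and does not add a job whose ID already exists in the queue. Whether QueueService.add forwards that option unchanged is an assumption here, and emailJobId is a hypothetical helper that derives a stable ID from the fields that define "the same work".

```typescript
// Hypothetical helper: same user and email type always produce the same ID.
// encodeURIComponent escapes "/" inside the parts, so two different
// (emailType, userId) pairs cannot collapse into the same string.
function emailJobId(userId: string, emailType: string): string {
  return `email/${encodeURIComponent(emailType)}/${encodeURIComponent(userId)}`;
}
```

Under that assumption, a call such as emailQueue.add(data, { jobId: emailJobId(user.id, 'welcome') }) would enqueue the welcome email at most once while the earlier job is still stored. This complements the check inside the processor rather than replacing it, because a job removed after completion no longer blocks a new one with the same ID.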

5. Clean Up Old Jobs

// Clean up completed jobs daily
setInterval(async () => {
  await emailQueue.clean(24 * 3600 * 1000, 'completed');
  await emailQueue.clean(7 * 24 * 3600 * 1000, 'failed');
}, 24 * 3600 * 1000);

Configuration Reference

Redis Configuration (.env)

REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-password
REDIS_QUEUE=myapp

Default Job Options

{
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 2000,
  },
  removeOnComplete: 10,
  removeOnFail: 5,
  timeout: 60000,
}

Queue Settings

{
  lockDuration: 30000, // Job lock expires after 30s
  stalledInterval: 30000, // Check for stalled jobs every 30s
  maxStalledCount: 1 // Reprocess a stalled job at most once; fail it if it stalls again
}

Pro Tip: Always monitor your queues in production. Failed jobs can accumulate and cause issues. Set up alerts for high failure rates and implement regular cleanup of old jobs.