Queue System with Bull
JifiJs includes a powerful queue system built on Bull, a Redis-based queue for Node.js. This allows you to process tasks asynchronously in the background, improving application performance and user experience.
Why Use Queues?
Queues are essential for:
- Email sending - Don't make users wait for emails to be sent
- File processing - Handle uploads, conversions, and optimizations asynchronously
- Data exports - Generate large reports without blocking requests
- Scheduled tasks - Execute jobs at specific times
- Rate-limited operations - Control API calls to third-party services
- Heavy computations - Offload CPU-intensive tasks
Architecture
┌─────────────┐
│ Client │
└──────┬──────┘
│ HTTP Request
▼
┌─────────────────┐
│ Controller │
└──────┬──────────┘
│ Add Job
▼
┌─────────────────┐ ┌─────────────┐
│ QueueService │◄────►│ Redis │
└─ ─────┬──────────┘ └─────────────┘
│
│ Process Job
▼
┌─────────────────┐
│ Job Handler │
└─────────────────┘
QueueService
JifiJs provides a built-in QueueService that wraps Bull queues with additional functionality.
Getting Queue Instance
import QueueService from './utils/bases/queues/queue.service';
// Get or create a queue instance
const emailQueue = QueueService.getInstance('emailQueue');
const fileQueue = QueueService.getInstance('fileQueue');
The QueueService uses a singleton pattern - calling getInstance('queueName') multiple times with the same name returns the same instance.
Queue Configuration
Queues are automatically configured from your .env file:
# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-password
REDIS_QUEUE=myapp # Base name for all queues
Adding Jobs to Queue
Basic Job
import QueueService from './utils/bases/queues/queue.service';
const emailQueue = QueueService.getInstance('email');
// Add job to queue
const job = await emailQueue.add({
type: 'welcome-email',
userId: user.id,
email: user.email,
name: user.name,
});
console.log(`Job ${job.id} added to queue`);
Job with Options
// Add job with custom options
const job = await emailQueue.add(
{
type: 'notification',
userId: user.id,
message: 'Your report is ready',
},
{
attempts: 5, // Retry up to 5 times
delay: 5000, // Wait 5 seconds before processing
priority: 1, // Higher priority (lower number = higher priority)
timeout: 30000, // Timeout after 30 seconds
removeOnComplete: true, // Remove when done
removeOnFail: false, // Keep failed jobs for debugging
}
);
Default Job Options
The QueueService sets these defaults:
{
attempts: 3, // Retry 3 times
backoff: {
type: 'exponential', // Exponential backoff
delay: 2000, // Start with 2 seconds
},
removeOnComplete: 10, // Keep last 10 completed jobs
removeOnFail: 5, // Keep last 5 failed jobs
timeout: 60000, // 60 second timeout
}
Processing Jobs
Job Processor
Create a job processor to handle queue items:
import QueueService from './utils/bases/queues/queue.service';
import { Job } from 'bull';
const emailQueue = QueueService.getInstance('email');
// Process jobs from the queue
emailQueue.process(async (job: Job) => {
console.log(`Processing job ${job.id}`);
const { type, email, name } = job.data;
if (type === 'welcome-email') {
await sendWelcomeEmail(email, name);
} else if (type === 'notification') {
await sendNotification(email, job.data.message);
}
return { success: true, processedAt: new Date() };
});
Typed Job Processing
interface EmailJobData {
type: 'welcome' | 'reset' | 'notification';
email: string;
name: string;
data?: any;
}
const emailQueue = QueueService.getInstance('email');
emailQueue.process<EmailJobData>(async (job: Job<EmailJobData>) => {
const { type, email, name, data } = job.data;
switch (type) {
case 'welcome':
await sendWelcomeEmail(email, name);
break;
case 'reset':
await sendPasswordReset(email, data.token);
break;
case 'notification':
await sendNotification(email, data.message);
break;
}
return { success: true };
});
Job Lifecycle and Events
The QueueService automatically sets up event handlers for monitoring:
Event Handlers
// error - Queue error occurred
queue.on('error', (error) => {
console.error('Queue error:', error);
});
// waiting - Job added to queue
queue.on('waiting', (jobId) => {
console.log(`Job ${jobId} is waiting`);
});
// active - Job started processing
queue.on('active', (job) => {
console.log(`Job ${job.id} started (Attempt ${job.attemptsMade + 1})`);
});
// completed - Job finished successfully
queue.on('completed', (job) => {
console.log(`Job ${job.id} completed`);
});
// failed - Job failed
queue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err.message);
});
// stalled - Job stalled and will be reprocessed
queue.on('stalled', (job) => {
console.warn(`Job ${job.id} stalled. Reprocessing...`);
});
// removed - Job removed from queue
queue.on('removed', (job) => {
console.log(`Job ${job.id} removed`);
});
These event handlers are automatically configured by the QueueService.
Job States
Jobs go through these states:
- waiting - Job added to queue, waiting to be processed
- active - Job is currently being processed
- completed - Job finished successfully
- failed - Job failed (may retry based on attempts setting)
- delayed - Job scheduled for future execution
- stalled - Job stalled (locks expired, will be reprocessed)
Queue Management
Get Job Counts
const counts = await emailQueue.getJobCounts();
console.log(counts);
// {
// waiting: 10,
// active: 2,
// completed: 145,
// failed: 3,
// delayed: 5
// }
Get Queue Statistics
const stats = await emailQueue.getStats();
console.log(stats);
// {
// name: 'email',
// isActive: true,
// waiting: 10,
// active: 2,
// completed: 145,
// failed: 3,
// delayed: 5
// }
Clean Old Jobs
// Remove completed jobs older than 1 hour
await emailQueue.clean(3600 * 1000, 'completed');
// Remove failed jobs older than 24 hours
await emailQueue.clean(24 * 3600 * 1000, 'failed');
// Remove all completed jobs
await emailQueue.clean(0, 'completed');
Access Bull Queue Directly
const bullQueue = emailQueue.getQueue();
// Now you can use any Bull queue methods
const job = await bullQueue.getJob('job-id');
await bullQueue.pause();
await bullQueue.resume();
const isPaused = await bullQueue.isPaused();
Real-World Examples
Email Queue Service
import QueueService from './utils/bases/queues/queue.service';
import mailService from './utils/bases/mail.service';
import { Job } from 'bull';
interface EmailJobData {
type: 'mail';
data: {
receivers: string | string[];
subject: string;
content: string;
html: any;
sender: any;
attachments: any[];
};
}
class EmailQueueService {
private queue: QueueService;
constructor() {
this.queue = QueueService.getInstance('mailQueue');
this.setupProcessor();
}
private setupProcessor() {
this.queue.process<EmailJobData>(async (job: Job<EmailJobData>) => {
const { type, data } = job.data;
if (type === 'mail') {
const { receivers, subject, content, html, sender, attachments } = data;
const result = await mailService.send(
receivers,
subject,
content,
html,
sender,
attachments
);
if (result.error) {
throw new Error(result.message);
}
return result;
}
});
}
async sendEmail(
receivers: string | string[],
subject: string,
content: string,
html: any = null,
sender: any = null,
attachments: any[] = []
) {
return await this.queue.add<EmailJobData>({
type: 'mail',
data: { receivers, subject, content, html, sender, attachments },
});
}
async getStats() {
return await this.queue.getStats();
}
}
export default new EmailQueueService();
File Processing Queue
import QueueService from './utils/bases/queues/queue.service';
import { Job } from 'bull';
interface FileJobData {
type: 'image-resize' | 'pdf-generate' | 'csv-export';
userId: string;
filePath: string;
options: any;
}
const fileQueue = QueueService.getInstance('fileProcessing');
fileQueue.process<FileJobData>(async (job: Job<FileJobData>) => {
const { type, userId, filePath, options } = job.data;
switch (type) {
case 'image-resize':
await resizeImage(filePath, options);
break;
case 'pdf-generate':
await generatePDF(userId, options);
break;
case 'csv-export':
await generateCSVExport(userId, filePath);
break;
}
return { success: true, processedAt: new Date() };
});
// Add job to queue
export async function queueImageResize(userId: string, imagePath: string) {
return await fileQueue.add({
type: 'image-resize',
userId,
filePath: imagePath,
options: { width: 800, height: 600 },
});
}
Error Handling and Retries
Exponential Backoff
The QueueService uses exponential backoff by default:
Attempt 1: Immediate
Attempt 2: 2 seconds delay
Attempt 3: 4 seconds delay (2^1 * 2000ms)
Attempt 4: 8 seconds delay (2^2 * 2000ms)
Custom Retry Logic
emailQueue.process(async (job: Job) => {
try {
await processJob(job.data);
return { success: true };
} catch (error) {
// Check if it's a permanent error
if (error.code === 'INVALID_DATA') {
// Don't retry, mark as failed
await job.moveToFailed({ message: 'Invalid data' }, true);
return;
}
// Temporary error - throw to trigger retry
throw error;
}
});
Graceful Shutdown
The QueueService automatically handles graceful shutdown:
// Shutdown is automatically handled on SIGTERM and SIGINT
process.on('SIGTERM', () => {
// QueueService.closeAll() is called automatically
});
Manual Shutdown
import QueueService from './utils/bases/queues/queue.service';
// Close specific queue
const emailQueue = QueueService.getInstance('email');
await emailQueue.close();
// Close all queues
await QueueService.closeAll();
Monitoring and Debugging
Check Queue Health
async function checkQueueHealth() {
const emailQueue = QueueService.getInstance('email');
const stats = await emailQueue.getStats();
if (stats.failed > 100) {
console.error('Too many failed jobs!');
// Send alert
}
if (stats.waiting > 1000) {
console.warn('Queue backlog detected');
// Scale workers
}
return stats;
}
// Run every minute
setInterval(checkQueueHealth, 60000);
Get Specific Job
const bullQueue = emailQueue.getQueue();
const job = await bullQueue.getJob('job-id');
if (job) {
console.log('Job state:', await job.getState());
console.log('Job data:', job.data);
console.log('Attempts:', job.attemptsMade);
console.log('Failed reason:', job.failedReason);
}
Best Practices
1. Use Separate Queues
Create dedicated queues for different job types:
const emailQueue = QueueService.getInstance('email');
const fileQueue = QueueService.getInstance('file-processing');
const notificationQueue = QueueService.getInstance('notifications');
2. Set Appropriate Timeouts
await queue.add(data, {
timeout: 300000, // 5 minutes for heavy operations
});
3. Monitor Queue Health
setInterval(async () => {
const stats = await queue.getStats();
if (stats.failed > 100) {
// Alert: Too many failed jobs
}
if (stats.waiting > 1000) {
// Alert: Queue backlog
}
}, 60000);
4. Idempotent Jobs
Design jobs to be safely retried:
emailQueue.process(async (job: Job) => {
const { userId, emailType } = job.data;
// Check if already processed
const sent = await EmailLog.findOne({ userId, emailType });
if (sent) {
return { status: 'already_sent' };
}
await sendEmail(userId, emailType);
// Record that we sent it
await EmailLog.create({ userId, emailType, sentAt: new Date() });
return { status: 'sent' };
});
5. Clean Up Old Jobs
// Clean up completed jobs daily
setInterval(async () => {
await emailQueue.clean(24 * 3600 * 1000, 'completed');
await emailQueue.clean(7 * 24 * 3600 * 1000, 'failed');
}, 24 * 3600 * 1000);
Configuration Reference
Redis Configuration (.env)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-password
REDIS_QUEUE=myapp_base
Default Job Options
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
},
removeOnComplete: 10,
removeOnFail: 5,
timeout: 60000,
}
Queue Settings
{
lockDuration: 30000, // Job lock expires after 30s
stalledInterval: 30000, // Check for stalled jobs every 30s
maxStalledCount: 1 // Move to failed after 1 stall
}
Related Documentation
Pro Tip: Always monitor your queues in production. Failed jobs can accumulate and cause issues. Set up alerts for high failure rates and implement regular cleanup of old jobs.