Queue System with Bull
JifiJs includes a powerful queue system built on Bull, a Redis-based queue for Node.js. This allows you to process tasks asynchronously in the background, improving application performance and user experience.
Why Use Queues?
Queues are essential for:
- Email sending - Don't make users wait for emails to be sent
- File processing - Handle uploads, conversions, and optimizations asynchronously
- Data exports - Generate large reports without blocking requests
- Scheduled tasks - Execute jobs at specific times
- Rate-limited operations - Control API calls to third-party services
- Heavy computations - Offload CPU-intensive tasks
Architecture
┌─────────────┐
│   Client    │
└──────┬──────┘
       │ HTTP Request
       ▼
┌─────────────────┐
│   Controller    │
└──────┬──────────┘
       │ Add Job
       ▼
┌─────────────────┐      ┌─────────────┐
│   Bull Queue    │◄────►│    Redis    │
└──────┬──────────┘      └─────────────┘
       │
       │ Process Job
       ▼
┌─────────────────┐
│   Job Handler   │
└─────────────────┘
Queue Configuration
Environment Setup
Configure the Redis connection in .env:
# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
# Queue Configuration
QUEUE_REDIS_HOST=localhost
QUEUE_REDIS_PORT=6379
QUEUE_REDIS_DB=1
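The queue instance below reads its settings from `config.queue.redis`. One way the mapping from these environment variables could look is sketched here. `loadQueueRedisConfig`, `QueueRedisConfig`, and `parseIntOr` are hypothetical names, and `QUEUE_REDIS_PASSWORD` is an assumed variable that the listing above does not define.

```typescript
// Hypothetical helper: maps QUEUE_REDIS_* variables to the shape used by config.queue.redis.
interface QueueRedisConfig {
  host: string;
  port: number;
  password?: string;
  db: number;
}

type Env = Record<string, string | undefined>;

// Parses a base-10 integer and falls back when the value is missing or invalid.
function parseIntOr(value: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(value ?? '', 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

function loadQueueRedisConfig(env: Env): QueueRedisConfig {
  return {
    host: env.QUEUE_REDIS_HOST ?? 'localhost',
    port: parseIntOr(env.QUEUE_REDIS_PORT, 6379),
    // Assumed variable name; an empty string is treated as "no password".
    password: env.QUEUE_REDIS_PASSWORD || undefined,
    db: parseIntOr(env.QUEUE_REDIS_DB, 1),
  };
}
```

In an application this would typically be called once at startup with `process.env`, so that a malformed port or database index falls back to a default instead of reaching Redis as `NaN`.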
Queue Instance
Create a queue instance in queues/email.queue.ts:
import Bull from 'bull';
import config from '../config';
export const emailQueue = new Bull('email-processing', {
redis: {
host: config.queue.redis.host,
port: config.queue.redis.port,
password: config.queue.redis.password,
db: config.queue.redis.db,
},
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 5000,
},
removeOnComplete: true,
removeOnFail: false,
},
});
Creating Jobs
Adding Jobs to the Queue
import { emailQueue } from '../queues/email.queue';
// In your service or controller
export class UserService {
async registerUser(userData: IUser) {
const user = await User.create(userData);
// Add email job to queue
await emailQueue.add('welcome-email', {
userId: user.id,
email: user.email,
name: user.name,
}, {
delay: 5000, // Send after 5 seconds
priority: 1, // 1 is the highest priority; larger numbers are processed later
});
return user;
}
}
Job Priorities
// Highest priority (1) - processed first among prioritized jobs
await emailQueue.add('critical-alert', data, { priority: 1 });
// No explicit priority (default)
await emailQueue.add('newsletter', data);
// Low priority (10) - larger numbers are processed later
await emailQueue.add('cleanup', data, { priority: 10 });
Processing Jobs
Job Processor
Create a processor in queues/processors/email.processor.ts:
import { Job } from 'bull';
import { emailQueue } from '../email.queue';
import { EmailService } from '../../services/email.service';
interface WelcomeEmailData {
userId: string;
email: string;
name: string;
}
// Process welcome emails
emailQueue.process('welcome-email', async (job: Job<WelcomeEmailData>) => {
const { email, name } = job.data;
console.log(`Processing welcome email for ${email}`);
try {
await EmailService.sendWelcomeEmail(email, name);
// Update progress
await job.progress(100);
return { success: true, email };
} catch (error) {
console.error('Failed to send welcome email:', error);
throw error; // Will trigger retry
}
});
// Process password reset emails
emailQueue.process('password-reset', async (job: Job) => {
const { email, token } = job.data;
await EmailService.sendPasswordResetEmail(email, token);
return { success: true };
});
Concurrent Processing
Control how many jobs are processed simultaneously:
// Process 5 jobs concurrently
emailQueue.process('newsletter', 5, async (job: Job) => {
await EmailService.sendNewsletter(job.data);
});
Job Lifecycle
Job States
Jobs go through several states:
- waiting - Job added to queue
- active - Job being processed
- completed - Job finished successfully
- failed - Job threw an error and has no retry attempts left
- delayed - Job scheduled for future execution
Monitoring Jobs
import { emailQueue } from './queues/email.queue';
// Get job counts
const counts = await emailQueue.getJobCounts();
console.log(counts);
// {
// waiting: 10,
// active: 2,
// completed: 145,
// failed: 3,
// delayed: 5
// }
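// Get a specific job (getJob resolves to null when the ID is unknown)
const job = await emailQueue.getJob('job-id');
if (job) {
  console.log(job.progress());
  console.log(await job.getState()); // getState() returns a promise
}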
Advanced Features
Scheduled Jobs (Cron-like)
import { emailQueue } from './queues/email.queue';
// Run every day at 9 AM
await emailQueue.add('daily-report', {}, {
  repeat: {
    cron: '0 9 * * *',
    tz: 'America/New_York',
  },
});
// Run every hour
await emailQueue.add('cleanup', {}, {
  repeat: {
    every: 3600000, // milliseconds
  },
});
Job Events
Listen to queue events:
emailQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result:`, result);
});
emailQueue.on('failed', (job, error) => {
console.error(`Job ${job.id} failed:`, error.message);
// Send alert to monitoring system
});
emailQueue.on('progress', (job, progress) => {
console.log(`Job ${job.id} progress: ${progress}%`);
});
emailQueue.on('stalled', (job) => {
console.warn(`Job ${job.id} stalled`);
});
Rate Limiting
Limit job processing rate:
const apiQueue = new Bull('api-calls', {
redis: redisConfig,
limiter: {
max: 100, // Max 100 jobs
duration: 60000 // Per 60 seconds
},
});
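To get a feel for what a `limiter` setting implies for throughput, the arithmetic can be sketched as follows. This is a simplified fixed-window model, not Bull's actual implementation, and `earliestLastStartMs` is a hypothetical helper name.

```typescript
// Simplified model: at most `max` jobs may start in each window of `durationMs`.
// Returns the earliest offset (in ms) at which the last of `jobCount` jobs can start.
function earliestLastStartMs(jobCount: number, max: number, durationMs: number): number {
  if (max <= 0) throw new RangeError('max must be positive');
  if (jobCount <= 0) return 0;
  // Jobs 1..max start in window 0, jobs max+1..2*max in window 1, and so on.
  const windowIndex = Math.floor((jobCount - 1) / max);
  return windowIndex * durationMs;
}

// 250 queued jobs at 100 jobs per 60 seconds need three windows.
console.log(earliestLastStartMs(250, 100, 60000)); // 120000
```

Under this model a backlog of 250 API calls cannot finish in less than about two minutes, regardless of how many workers are running.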
Job Progress Tracking
emailQueue.process('bulk-email', async (job: Job) => {
const emails = job.data.recipients;
const total = emails.length;
for (let i = 0; i < total; i++) {
await sendEmail(emails[i]);
// Update progress
const progress = Math.round((i + 1) / total * 100);
await job.progress(progress);
}
return { sent: total };
});
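For large recipient lists, the loop above writes progress on every iteration even when the rounded percentage has not changed. A minimal sketch of a wrapper that only reports when the value changes is shown here; `createProgressReporter` and `ReportProgress` are hypothetical names, not part of Bull.

```typescript
// Hypothetical helper: wraps a progress callback so it only fires when the
// rounded percentage actually changes.
type ReportProgress = (percent: number) => Promise<void> | void;

function createProgressReporter(
  total: number,
  report: ReportProgress,
): (completed: number) => Promise<void> {
  let lastReported = -1;
  return async (completed: number): Promise<void> => {
    // An empty batch is treated as already complete.
    const percent = total === 0 ? 100 : Math.round((completed / total) * 100);
    if (percent !== lastReported) {
      lastReported = percent;
      await report(percent);
    }
  };
}

// Inside a processor this could be wired up as:
//   const tick = createProgressReporter(emails.length, (p) => job.progress(p));
//   for (let i = 0; i < emails.length; i++) {
//     await sendEmail(emails[i]);
//     await tick(i + 1);
//   }
```

With 100,000 recipients this reduces the number of progress updates from 100,000 to at most 101.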
Error Handling
Retry Strategy
await emailQueue.add('important-task', data, {
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000, // Base delay in ms; the wait grows exponentially across the (at most) 4 retries
},
});
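For a concrete sense of the schedule, the delays can be computed by hand. The sketch below assumes the formula `delay * (2^attemptsMade - 1)`, which is what Bull's built-in `exponential` strategy appears to use. The helper name `exponentialBackoffDelays` is hypothetical, so verify the result against your installed version.

```typescript
// Assumption: mirrors the formula of Bull's built-in 'exponential' strategy,
// delay * (2^attemptsMade - 1). Check your installed version before relying on it.
function exponentialBackoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // A job with `attempts: n` can be retried at most n - 1 times.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round((Math.pow(2, attemptsMade) - 1) * baseDelayMs));
  }
  return delays;
}

console.log(exponentialBackoffDelays(5, 2000)); // [2000, 6000, 14000, 30000]
```

Under that assumption, a job with `attempts: 5` and a 2-second base delay spends roughly 52 seconds in backoff before it finally fails.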
Custom Retry Logic
emailQueue.process('api-call', async (job: Job) => {
  try {
    return await makeApiCall(job.data);
  } catch (error: any) {
    if (error.statusCode === 429) {
      // Rate limited: throw so the job is retried after its backoff delay
      throw new Error('Rate limited, retry later');
    }
    if (error.statusCode >= 500) {
      // Server error: rethrow to trigger a retry
      throw error;
    }
    // Client error: mark the job as non-retryable, then fail it
    await job.discard();
    throw error;
  }
});
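The branching in the processor above can be pulled into a small pure function, which makes the retry policy easy to unit test without Redis. This is a minimal sketch; `classifyHttpFailure` and `RetryDecision` are hypothetical names, and treating a missing status code as retryable is an assumption.

```typescript
type RetryDecision = 'retry' | 'discard';

// Hypothetical helper: decides whether a failed HTTP call is worth another attempt.
function classifyHttpFailure(statusCode: number | undefined): RetryDecision {
  // Assumption: no status code means a network-level failure, which is usually transient.
  if (statusCode === undefined) return 'retry';
  // Rate limiting and server errors may succeed on a later attempt.
  if (statusCode === 429 || statusCode >= 500) return 'retry';
  // Remaining failures are treated as client errors that will not succeed on retry.
  return 'discard';
}

console.log(classifyHttpFailure(503)); // retry
console.log(classifyHttpFailure(404)); // discard
```

A processor could call this in its `catch` block and only invoke `job.discard()` when the result is `'discard'`.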
Queue Management
Cleaning Old Jobs
// Remove completed jobs older than 24 hours
await emailQueue.clean(24 * 3600 * 1000, 'completed');
// Remove failed jobs older than 7 days
await emailQueue.clean(7 * 24 * 3600 * 1000, 'failed');
Pausing and Resuming
// Pause the queue
await emailQueue.pause();
// Resume processing
await emailQueue.resume();
// Check if paused
const isPaused = await emailQueue.isPaused();
Emptying the Queue
// Remove all waiting jobs
await emailQueue.empty();
// Remove the queue and all of its data; force: true is required if jobs are still active
await emailQueue.obliterate({ force: true });
Bull Board (UI Dashboard)
Add a web UI to monitor queues:
import { createBullBoard } from '@bull-board/api';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { emailQueue } from './queues/email.queue';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [new BullAdapter(emailQueue)],
serverAdapter,
});
app.use('/admin/queues', serverAdapter.getRouter());
Access the dashboard at http://localhost:3000/admin/queues
Best Practices
1. Use Separate Queues
Create dedicated queues for different job types:
const emailQueue = new Bull('emails');
const fileQueue = new Bull('file-processing');
const notificationQueue = new Bull('notifications');
2. Set Appropriate Timeouts
await queue.add('long-task', data, {
timeout: 300000, // 5 minutes
});
3. Monitor Queue Health
// Check queue health periodically
setInterval(async () => {
const counts = await emailQueue.getJobCounts();
if (counts.failed > 100) {
// Alert: Too many failed jobs
console.error('Queue health check failed');
}
}, 60000);
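The check itself can be separated from the timer so that thresholds are configurable and the logic is testable. The sketch below is one possible shape. `evaluateQueueHealth`, `JobCounts`, and `HealthThresholds` are hypothetical names, and the backlog threshold is an addition that the example above does not include.

```typescript
// Shape of the counts shown in the monitoring example.
interface JobCounts {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
}

// Hypothetical thresholds; tune these to your own workload.
interface HealthThresholds {
  maxFailed: number;
  maxWaiting: number;
}

// Returns a list of human-readable problems; an empty list means healthy.
function evaluateQueueHealth(counts: JobCounts, thresholds: HealthThresholds): string[] {
  const problems: string[] = [];
  if (counts.failed > thresholds.maxFailed) {
    problems.push(`failed jobs: ${counts.failed} > ${thresholds.maxFailed}`);
  }
  if (counts.waiting > thresholds.maxWaiting) {
    problems.push(`backlog: ${counts.waiting} > ${thresholds.maxWaiting}`);
  }
  return problems;
}

// Inside the interval callback this could be used as:
//   const problems = evaluateQueueHealth(await emailQueue.getJobCounts(), { maxFailed: 100, maxWaiting: 1000 });
//   if (problems.length > 0) console.error('Queue health check failed:', problems);
```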
4. Idempotent Jobs
Design jobs to be safely retried:
emailQueue.process('send-email', async (job: Job) => {
const { userId, emailId } = job.data;
// Check if already sent
const sent = await EmailLog.findOne({ userId, emailId });
if (sent) {
return { status: 'already_sent' };
}
await sendEmail(userId);
// Record that we sent it
await EmailLog.create({ userId, emailId, sentAt: new Date() });
return { status: 'sent' };
});
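A complementary guard is to stop duplicates from being enqueued in the first place. Bull accepts a `jobId` job option, and a job added with an ID that already exists in the queue is not added again. The sketch below derives a deterministic ID; `sendEmailJobId` is a hypothetical helper name and the ID format is an assumption.

```typescript
// Hypothetical helper: builds a deterministic job ID so that adding the same
// logical job twice maps to the same Bull job.
function sendEmailJobId(userId: string, emailId: string): string {
  // The prefix keeps IDs readable and distinct from Bull's default integer IDs.
  return `send-email_${userId}_${emailId}`;
}

console.log(sendEmailJobId('42', 'welcome')); // send-email_42_welcome

// When enqueueing, the ID would be passed through the job options:
//   await emailQueue.add('send-email', { userId, emailId }, {
//     jobId: sendEmailJobId(userId, emailId),
//   });
```

This only deduplicates while the earlier job still exists in Redis. With `removeOnComplete: true` a finished job is deleted and its ID can be reused, so the `EmailLog` check in the processor remains necessary.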
Debugging
Enable debug logging:
// In development
if (process.env.NODE_ENV === 'development') {
emailQueue.on('global:completed', (jobId) => {
console.log(`Job ${jobId} completed`);
});
emailQueue.on('global:failed', (jobId, err) => {
console.error(`Job ${jobId} failed:`, err);
});
}
Pro Tip: Always monitor your queues in production. With removeOnFail: false, failed jobs stay in Redis, so they can accumulate and consume memory. Set up alerts for high failure rates.