Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | 12x 12x 12x 12x 12x 1x 1x 1x 1x 2x 2x 2x 2x 1x 1x 1x 1x 1x 1x | import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { Logger } from '@nestjs/common';
import { NotificationsService } from './notifications.service';
import { PrismaService } from '@app/modules/prisma/prisma.service';
/**
 * Worker for the `notifications` queue.
 *
 * Each job carries a `notificationId` in its data. The processor loads that
 * notification through Prisma, reads the owning user's notification
 * preferences, and sends a push notification when the push channel is enabled.
 */
@Processor('notifications')
export class NotificationsProcessor extends WorkerHost {
  private readonly logger = new Logger(NotificationsProcessor.name);

  constructor(
    private readonly notificationsService: NotificationsService,
    private readonly prisma: PrismaService,
  ) {
    super();
  }

  /**
   * Delivers the notification referenced by the job.
   *
   * The job is skipped (a warning is logged and the method returns normally)
   * when the payload has no `notificationId` or when no notification with that
   * id exists. Errors raised by the lookup, the preferences query, or the push
   * send are not caught here and propagate to the caller.
   *
   * @param job Queue job whose `data.notificationId` identifies the notification.
   * @returns Always resolves to `undefined`.
   */
  async process(job: Job<any>): Promise<any> {
    const notificationId = job.data?.notificationId;

    // A payload without an id can never succeed: passing `undefined` to
    // `findUnique` is rejected by Prisma. Skip it the same way a missing
    // record is skipped instead of failing with an opaque validation error.
    if (notificationId == null) {
      this.logger.warn(`Job ${job.id} has no notificationId in its payload; skipping`);
      return;
    }

    this.logger.debug(`Processing notification delivery: ${notificationId}`);

    const notification = await this.prisma.notification.findUnique({
      where: { id: notificationId },
    });

    if (!notification) {
      this.logger.warn(`Notification ${notificationId} not found`);
      return;
    }

    // Get user's notification preferences
    const prefs = await this.notificationsService.getPreferences(notification.userId);

    // Send push notification if enabled. `prefs` is null-guarded so a missing
    // preferences record means "no channels enabled" rather than a TypeError.
    if (this.isPushEnabled(prefs?.channels)) {
      await this.notificationsService.sendPush(notification.userId, notification);
    }

    // TODO: Send email if enabled
    // if (prefs.channels && (prefs.channels as any).email) {
    // await this.emailService.sendNotificationEmail(notification);
    // }

    // TODO: Emit via WebSocket if user is online (integrate with MessagingGateway)
    // if (messagingGateway.isUserOnline(notification.userId)) {
    // messagingGateway.sendToUser(notification.userId, 'notification:new', notification);
    // }

    this.logger.debug(`Notification ${notificationId} delivered successfully`);
  }

  /**
   * Reports whether the `push` entry of a channels value is truthy.
   *
   * Takes `unknown` so the caller does not need an `any` cast; anything that
   * is not a non-null object is treated as "push disabled".
   */
  private isPushEnabled(channels: unknown): boolean {
    if (typeof channels !== 'object' || channels === null) {
      return false;
    }
    return Boolean((channels as { push?: unknown }).push);
  }
}
|