diff --git a/src/queues/email-send.worker.test.ts b/src/queues/email-send.worker.test.ts new file mode 100644 index 0000000..c599936 --- /dev/null +++ b/src/queues/email-send.worker.test.ts @@ -0,0 +1,120 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' + +vi.mock('../server/db/campaigns', () => ({ + getCampaign: vi.fn(), +})) +vi.mock('../server/suppression/check', () => ({ + checkSuppression: vi.fn(), +})) +vi.mock('../server/smtp/client', () => ({ + sendEmail: vi.fn(), +})) +vi.mock('../server/clickhouse/client', () => ({ + clickhouse: { insert: vi.fn().mockResolvedValue(undefined) }, +})) + +import { processEmailSendJob } from './email-send.worker' +import { getCampaign } from '../server/db/campaigns' +import { checkSuppression } from '../server/suppression/check' +import { sendEmail } from '../server/smtp/client' +import { clickhouse } from '../server/clickhouse/client' + +const mockCampaign = { + id: 'campaign-1', + subject: 'Newsletter April', + htmlBody: '

Hallo

', + plainBody: 'Hallo', + name: 'April Newsletter', + status: 'sending' as const, + scheduledAt: null, + cronExpression: null, + createdAt: new Date(), + updatedAt: new Date(), +} + +const jobData = { + tenantId: 'tenant1', + campaignId: 'campaign-1', + recipientEmail: 'empfaenger@example.com', + recipientHash: 'abc123hash', +} + +describe('processEmailSendJob', () => { + beforeEach(() => vi.clearAllMocks()) + + it('sendet E-Mail wenn nicht suppressed', async () => { + vi.mocked(getCampaign).mockResolvedValue({ ok: true, data: mockCampaign }) + vi.mocked(checkSuppression).mockResolvedValue(false) + vi.mocked(sendEmail).mockResolvedValue({ ok: true, data: undefined }) + + const result = await processEmailSendJob(jobData) + + expect(result.ok).toBe(true) + expect(sendEmail).toHaveBeenCalledOnce() + expect(clickhouse.insert).toHaveBeenCalledOnce() + }) + + it('überspringt SMTP wenn Empfänger suppressed ist', async () => { + vi.mocked(getCampaign).mockResolvedValue({ ok: true, data: mockCampaign }) + vi.mocked(checkSuppression).mockResolvedValue(true) + + const result = await processEmailSendJob(jobData) + + expect(result.ok).toBe(true) + expect(sendEmail).not.toHaveBeenCalled() + expect(clickhouse.insert).toHaveBeenCalledWith( + expect.objectContaining({ + values: expect.arrayContaining([ + expect.objectContaining({ event_type: 'suppressed' }), + ]), + }) + ) + }) + + it('gibt err zurück wenn Kampagne nicht gefunden', async () => { + vi.mocked(getCampaign).mockResolvedValue({ ok: false, error: new Error('nicht gefunden') }) + + const result = await processEmailSendJob(jobData) + + expect(result.ok).toBe(false) + if (!result.ok) expect(result.error.message).toContain('nicht gefunden') + }) + + it('gibt err zurück wenn SMTP fehlschlägt', async () => { + vi.mocked(getCampaign).mockResolvedValue({ ok: true, data: mockCampaign }) + vi.mocked(checkSuppression).mockResolvedValue(false) + vi.mocked(sendEmail).mockResolvedValue({ ok: false, error: new Error('SMTP-Fehler') }) + + const result = await processEmailSendJob(jobData) + + expect(result.ok).toBe(false) + if (!result.ok) expect(result.error.message).toBe('SMTP-Fehler') + }) + + it('ClickHouse-Event enthält recipient_hash (kein Klartext)', async () => { + vi.mocked(getCampaign).mockResolvedValue({ ok: true, data: mockCampaign }) + vi.mocked(checkSuppression).mockResolvedValue(false) + vi.mocked(sendEmail).mockResolvedValue({ ok: true, data: undefined }) + + await processEmailSendJob(jobData) + + const insertCall = vi.mocked(clickhouse.insert).mock.calls[0][0] + const eventValue = (insertCall.values as Array>)[0] + expect(eventValue.recipient_hash).toBe('abc123hash') + expect(JSON.stringify(eventValue)).not.toContain('empfaenger@example.com') + }) + + it('sendEmail erhält List-Unsubscribe-Header', async () => { + vi.mocked(getCampaign).mockResolvedValue({ ok: true, data: mockCampaign }) + vi.mocked(checkSuppression).mockResolvedValue(false) + vi.mocked(sendEmail).mockResolvedValue({ ok: true, data: undefined }) + + await processEmailSendJob(jobData) + + expect(sendEmail).toHaveBeenCalledWith( + expect.objectContaining({ + listUnsubscribeHeader: expect.stringContaining('unsub'), + }) + ) + }) +}) diff --git a/src/queues/email-send.worker.ts b/src/queues/email-send.worker.ts new file mode 100644 index 0000000..820c2b3 --- /dev/null +++ b/src/queues/email-send.worker.ts @@ -0,0 +1,71 @@ +import { Worker } from 'bullmq' +import { getCampaign } from '../server/db/campaigns' +import { checkSuppression } from '../server/suppression/check' +import { sendEmail } from '../server/smtp/client' +import { clickhouse } from '../server/clickhouse/client' +import { ok, err, type Result } from '../lib/result' +import type { EmailSendJobData } from './email-send.queue' + +export async function processEmailSendJob(data: EmailSendJobData): Promise> { + const campaignResult = await getCampaign(data.tenantId, data.campaignId) + if (!campaignResult.ok) return err(campaignResult.error) + const campaign = campaignResult.data + + // Suppression-Check ist PFLICHT — kein Opt-out-Empfänger darf E-Mail erhalten + const suppressed = await checkSuppression(data.tenantId, data.recipientEmail) + if (suppressed) { + await insertEvent('suppressed', data) + return ok(undefined) + } + + const appUrl = process.env.APP_URL ?? 'http://localhost:3000' + const unsubUrl = `${appUrl}/unsub?tid=${data.tenantId}&cid=${data.campaignId}&r=${data.recipientHash}` + + const sendResult = await sendEmail({ + to: data.recipientEmail, + subject: campaign.subject, + html: campaign.htmlBody, + text: campaign.plainBody, + listUnsubscribeHeader: `<${unsubUrl}>`, + }) + + if (!sendResult.ok) return err(sendResult.error) + + await insertEvent('sent', data) + return ok(undefined) +} + +async function insertEvent(eventType: string, data: EmailSendJobData): Promise { + await clickhouse.insert({ + table: 'email_events', + values: [ + { + event_type: eventType, + tenant_id: data.tenantId, + campaign_id: data.campaignId, + // Datenschutz: nur Hash wird gespeichert — keine Klartext-E-Mail-Adresse in ClickHouse + recipient_hash: data.recipientHash, + timestamp: new Date().toISOString(), + metadata: {}, + }, + ], + format: 'JSONEachRow', + }) +} + +const connection = { + host: process.env.REDIS_HOST ?? 'localhost', + port: Number(process.env.REDIS_PORT ?? 6379), +} + +// Worker nur außerhalb von Tests starten +if (process.env.NODE_ENV !== 'test') { + new Worker( + 'email-send', + async (job) => { + const result = await processEmailSendJob(job.data) + if (!result.ok) throw result.error + }, + { connection, concurrency: 10 } + ) +}