feat: BullMQ Worker — Suppression-Check vor SMTP, ClickHouse-Event-Insert, RFC-8058-Header

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-17 12:09:04 +00:00
parent 5340e76630
commit 8bf143735e
2 changed files with 191 additions and 0 deletions

View File

@@ -0,0 +1,120 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'
vi.mock('../server/db/campaigns', () => ({
getCampaign: vi.fn(),
}))
vi.mock('../server/suppression/check', () => ({
checkSuppression: vi.fn(),
}))
vi.mock('../server/smtp/client', () => ({
sendEmail: vi.fn(),
}))
vi.mock('../server/clickhouse/client', () => ({
clickhouse: { insert: vi.fn().mockResolvedValue(undefined) },
}))
import { processEmailSendJob } from './email-send.worker'
import { getCampaign } from '../server/db/campaigns'
import { checkSuppression } from '../server/suppression/check'
import { sendEmail } from '../server/smtp/client'
import { clickhouse } from '../server/clickhouse/client'
const mockCampaign = {
id: 'campaign-1',
subject: 'Newsletter April',
htmlBody: '<p>Hallo</p>',
plainBody: 'Hallo',
name: 'April Newsletter',
status: 'sending' as const,
scheduledAt: null,
cronExpression: null,
createdAt: new Date(),
updatedAt: new Date(),
}
const jobData = {
tenantId: 'tenant1',
campaignId: 'campaign-1',
recipientEmail: 'empfaenger@example.com',
recipientHash: 'abc123hash',
}
describe('processEmailSendJob', () => {
beforeEach(() => vi.clearAllMocks())
it('sendet E-Mail wenn nicht suppressed', async () => {
vi.mocked(getCampaign).mockResolvedValue({ ok: true, data: mockCampaign })
vi.mocked(checkSuppression).mockResolvedValue(false)
vi.mocked(sendEmail).mockResolvedValue({ ok: true, data: undefined })
const result = await processEmailSendJob(jobData)
expect(result.ok).toBe(true)
expect(sendEmail).toHaveBeenCalledOnce()
expect(clickhouse.insert).toHaveBeenCalledOnce()
})
it('überspringt SMTP wenn Empfänger suppressed ist', async () => {
vi.mocked(getCampaign).mockResolvedValue({ ok: true, data: mockCampaign })
vi.mocked(checkSuppression).mockResolvedValue(true)
const result = await processEmailSendJob(jobData)
expect(result.ok).toBe(true)
expect(sendEmail).not.toHaveBeenCalled()
expect(clickhouse.insert).toHaveBeenCalledWith(
expect.objectContaining({
values: expect.arrayContaining([
expect.objectContaining({ event_type: 'suppressed' }),
]),
})
)
})
it('gibt err zurück wenn Kampagne nicht gefunden', async () => {
vi.mocked(getCampaign).mockResolvedValue({ ok: false, error: new Error('nicht gefunden') })
const result = await processEmailSendJob(jobData)
expect(result.ok).toBe(false)
if (!result.ok) expect(result.error.message).toContain('nicht gefunden')
})
it('gibt err zurück wenn SMTP fehlschlägt', async () => {
vi.mocked(getCampaign).mockResolvedValue({ ok: true, data: mockCampaign })
vi.mocked(checkSuppression).mockResolvedValue(false)
vi.mocked(sendEmail).mockResolvedValue({ ok: false, error: new Error('SMTP-Fehler') })
const result = await processEmailSendJob(jobData)
expect(result.ok).toBe(false)
if (!result.ok) expect(result.error.message).toBe('SMTP-Fehler')
})
it('ClickHouse-Event enthält recipient_hash (kein Klartext)', async () => {
vi.mocked(getCampaign).mockResolvedValue({ ok: true, data: mockCampaign })
vi.mocked(checkSuppression).mockResolvedValue(false)
vi.mocked(sendEmail).mockResolvedValue({ ok: true, data: undefined })
await processEmailSendJob(jobData)
const insertCall = vi.mocked(clickhouse.insert).mock.calls[0][0]
const eventValue = (insertCall.values as Array<Record<string, unknown>>)[0]
expect(eventValue.recipient_hash).toBe('abc123hash')
expect(JSON.stringify(eventValue)).not.toContain('empfaenger@example.com')
})
it('sendEmail erhält List-Unsubscribe-Header', async () => {
vi.mocked(getCampaign).mockResolvedValue({ ok: true, data: mockCampaign })
vi.mocked(checkSuppression).mockResolvedValue(false)
vi.mocked(sendEmail).mockResolvedValue({ ok: true, data: undefined })
await processEmailSendJob(jobData)
expect(sendEmail).toHaveBeenCalledWith(
expect.objectContaining({
listUnsubscribeHeader: expect.stringContaining('unsub'),
})
)
})
})

View File

@@ -0,0 +1,71 @@
import { Worker } from 'bullmq'
import { getCampaign } from '../server/db/campaigns'
import { checkSuppression } from '../server/suppression/check'
import { sendEmail } from '../server/smtp/client'
import { clickhouse } from '../server/clickhouse/client'
import { ok, err, type Result } from '../lib/result'
import type { EmailSendJobData } from './email-send.queue'
export async function processEmailSendJob(data: EmailSendJobData): Promise<Result<void>> {
const campaignResult = await getCampaign(data.tenantId, data.campaignId)
if (!campaignResult.ok) return err(campaignResult.error)
const campaign = campaignResult.data
// Suppression-Check ist PFLICHT — kein Opt-out-Empfänger darf E-Mail erhalten
const suppressed = await checkSuppression(data.tenantId, data.recipientEmail)
if (suppressed) {
await insertEvent('suppressed', data)
return ok(undefined)
}
const appUrl = process.env.APP_URL ?? 'http://localhost:3000'
const unsubUrl = `${appUrl}/unsub?tid=${data.tenantId}&cid=${data.campaignId}&r=${data.recipientHash}`
const sendResult = await sendEmail({
to: data.recipientEmail,
subject: campaign.subject,
html: campaign.htmlBody,
text: campaign.plainBody,
listUnsubscribeHeader: `<${unsubUrl}>`,
})
if (!sendResult.ok) return err(sendResult.error)
await insertEvent('sent', data)
return ok(undefined)
}
async function insertEvent(eventType: string, data: EmailSendJobData): Promise<void> {
await clickhouse.insert({
table: 'email_events',
values: [
{
event_type: eventType,
tenant_id: data.tenantId,
campaign_id: data.campaignId,
// Datenschutz: nur Hash wird gespeichert — keine Klartext-E-Mail-Adresse in ClickHouse
recipient_hash: data.recipientHash,
timestamp: new Date().toISOString(),
metadata: {},
},
],
format: 'JSONEachRow',
})
}
const connection = {
host: process.env.REDIS_HOST ?? 'localhost',
port: Number(process.env.REDIS_PORT ?? 6379),
}
// Worker nur außerhalb von Tests starten
if (process.env.NODE_ENV !== 'test') {
new Worker<EmailSendJobData>(
'email-send',
async (job) => {
const result = await processEmailSendJob(job.data)
if (!result.ok) throw result.error
},
{ connection, concurrency: 10 }
)
}