Implemented complete MVP for geofencing functionality with database, backend logic, MQTT integration, and API endpoints. **Phase 1: Database & Core Logic** - scripts/init-geofence-db.js: Database initialization for Geofence tables - lib/types.ts: TypeScript types for Geofence, GeofenceEvent, GeofenceStatus - lib/geofence-engine.ts: Core geofencing logic (Haversine distance, state tracking) - lib/geofence-db.ts: Database layer with CRUD operations - package.json: Added db:init:geofence script **Phase 2: MQTT Integration & Email Notifications** - emails/geofence-enter.tsx: React Email template for enter events - emails/geofence-exit.tsx: React Email template for exit events - lib/email-renderer.ts: Added geofence email rendering functions - lib/geofence-notifications.ts: Notification service for geofence events - lib/mqtt-subscriber.ts: Integrated automatic geofence checking on location updates **Phase 3: Minimal API** - app/api/geofences/route.ts: GET (list) and POST (create) endpoints - app/api/geofences/[id]/route.ts: DELETE endpoint - All endpoints with authentication and ownership checks **MVP Simplifications:** - No zone limit enforcement (unlimited for all users) - No notification flags (always send Enter + Exit emails) - Device assignment required (no NULL device logic) - Circular geofences only **Features:** ✅ Automatic geofence detection on MQTT location updates ✅ Email notifications for enter/exit events ✅ State tracking to prevent duplicate events ✅ REST API for geofence management ✅ Non-blocking async processing 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
248 lines
6.8 KiB
TypeScript
248 lines
6.8 KiB
TypeScript
// MQTT Subscriber Service für OwnTracks Location Updates
|
|
import mqtt from 'mqtt';
|
|
import { locationDb, Location } from './db';
|
|
import { checkGeofences } from './geofence-engine';
|
|
import { geofenceDb } from './geofence-db';
|
|
import { sendGeofenceNotifications } from './geofence-notifications';
|
|
|
|
// OwnTracks Message Format
|
|
interface OwnTracksMessage {
|
|
_type: 'location' | 'transition' | 'waypoint' | 'lwt';
|
|
tid?: string; // Tracker ID
|
|
lat: number;
|
|
lon: number;
|
|
tst: number; // Timestamp (Unix epoch)
|
|
batt?: number; // Battery level (0-100)
|
|
vel?: number; // Velocity/Speed in km/h
|
|
acc?: number; // Accuracy
|
|
alt?: number; // Altitude
|
|
cog?: number; // Course over ground
|
|
t?: string; // Trigger (p=ping, c=region, b=beacon, u=manual, t=timer, v=monitoring)
|
|
}
|
|
|
|
class MQTTSubscriber {
|
|
private client: mqtt.MqttClient | null = null;
|
|
private reconnectAttempts = 0;
|
|
private isConnecting = false;
|
|
|
|
constructor(
|
|
private brokerUrl: string,
|
|
private username?: string,
|
|
private password?: string
|
|
) {}
|
|
|
|
/**
|
|
* Verbinde zum MQTT Broker und subscribiere Topics
|
|
*/
|
|
connect(): void {
|
|
if (this.isConnecting || this.client?.connected) {
|
|
console.log('Already connecting or connected to MQTT broker');
|
|
return;
|
|
}
|
|
|
|
this.isConnecting = true;
|
|
console.log(`Connecting to MQTT broker: ${this.brokerUrl}`);
|
|
|
|
const options: mqtt.IClientOptions = {
|
|
clean: true,
|
|
reconnectPeriod: 5000,
|
|
connectTimeout: 30000,
|
|
};
|
|
|
|
if (this.username && this.password) {
|
|
options.username = this.username;
|
|
options.password = this.password;
|
|
}
|
|
|
|
this.client = mqtt.connect(this.brokerUrl, options);
|
|
|
|
this.client.on('connect', () => {
|
|
this.isConnecting = false;
|
|
this.reconnectAttempts = 0;
|
|
console.log('✓ Connected to MQTT broker');
|
|
|
|
// Subscribiere owntracks Topic (alle Devices) mit QoS 1
|
|
this.client?.subscribe('owntracks/+/+', { qos: 1 }, (err) => {
|
|
if (err) {
|
|
console.error('Failed to subscribe to owntracks topic:', err);
|
|
} else {
|
|
console.log('✓ Subscribed to owntracks/+/+ with QoS 1');
|
|
}
|
|
});
|
|
});
|
|
|
|
this.client.on('message', (topic, message) => {
|
|
this.handleMessage(topic, message);
|
|
});
|
|
|
|
this.client.on('error', (error) => {
|
|
console.error('MQTT client error:', error);
|
|
this.isConnecting = false;
|
|
});
|
|
|
|
this.client.on('reconnect', () => {
|
|
this.reconnectAttempts++;
|
|
// Log less frequently to avoid spam (every 10th attempt after attempt 10)
|
|
if (this.reconnectAttempts <= 10 || this.reconnectAttempts % 10 === 0) {
|
|
console.log(`Reconnecting to MQTT broker (attempt ${this.reconnectAttempts})`);
|
|
}
|
|
});
|
|
|
|
this.client.on('close', () => {
|
|
console.log('MQTT connection closed');
|
|
this.isConnecting = false;
|
|
});
|
|
|
|
this.client.on('offline', () => {
|
|
console.log('MQTT client offline');
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Verarbeite eingehende MQTT Nachricht
|
|
*/
|
|
private handleMessage(topic: string, message: Buffer): void {
|
|
try {
|
|
// Parse Topic: owntracks/user/device
|
|
const parts = topic.split('/');
|
|
if (parts.length !== 3 || parts[0] !== 'owntracks') {
|
|
console.log(`Ignoring non-owntracks topic: ${topic}`);
|
|
return;
|
|
}
|
|
|
|
const [, user, device] = parts;
|
|
|
|
// Parse Message Payload
|
|
const payload = JSON.parse(message.toString()) as OwnTracksMessage;
|
|
|
|
// Nur location messages verarbeiten
|
|
if (payload._type !== 'location') {
|
|
console.log(`Ignoring non-location message type: ${payload._type}`);
|
|
return;
|
|
}
|
|
|
|
// Konvertiere zu Location Format
|
|
const location: Location = {
|
|
latitude: payload.lat,
|
|
longitude: payload.lon,
|
|
timestamp: new Date(payload.tst * 1000).toISOString(),
|
|
user_id: 0, // MQTT Devices haben user_id 0
|
|
username: device, // Device ID als username
|
|
first_name: null,
|
|
last_name: null,
|
|
marker_label: payload.tid || device,
|
|
display_time: null,
|
|
chat_id: 0,
|
|
battery: payload.batt ?? null,
|
|
speed: payload.vel ?? null,
|
|
};
|
|
|
|
// Speichere in Datenbank
|
|
const saved = locationDb.create(location);
|
|
|
|
if (saved) {
|
|
console.log(`✓ Location saved: ${device} at (${payload.lat}, ${payload.lon})`);
|
|
|
|
// Geofence-Check asynchron ausführen (nicht blockieren)
|
|
setImmediate(async () => {
|
|
try {
|
|
const events = await checkGeofences(saved, device, geofenceDb);
|
|
|
|
if (events.length > 0) {
|
|
console.log(`[Geofence] Detected ${events.length} event(s) for device ${device}`);
|
|
|
|
// Events in Datenbank speichern
|
|
const savedEvents = events.map((eventData) =>
|
|
geofenceDb.createEvent(eventData)
|
|
);
|
|
|
|
// Benachrichtigungen versenden (asynchron)
|
|
await sendGeofenceNotifications(savedEvents);
|
|
}
|
|
} catch (error) {
|
|
console.error('[Geofence] Check failed:', error);
|
|
}
|
|
});
|
|
} else {
|
|
console.log(`⚠ Duplicate location ignored: ${device}`);
|
|
}
|
|
} catch (error) {
|
|
console.error('Failed to process MQTT message:', error);
|
|
console.error('Topic:', topic);
|
|
console.error('Message:', message.toString());
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Disconnect vom MQTT Broker
|
|
*/
|
|
disconnect(): void {
|
|
if (this.client) {
|
|
console.log('Disconnecting from MQTT broker');
|
|
this.client.end();
|
|
this.client = null;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Check ob verbunden
|
|
*/
|
|
isConnected(): boolean {
|
|
return this.client?.connected ?? false;
|
|
}
|
|
|
|
/**
|
|
* Hole Client Status Info
|
|
*/
|
|
getStatus(): {
|
|
connected: boolean;
|
|
reconnectAttempts: number;
|
|
brokerUrl: string;
|
|
} {
|
|
return {
|
|
connected: this.isConnected(),
|
|
reconnectAttempts: this.reconnectAttempts,
|
|
brokerUrl: this.brokerUrl,
|
|
};
|
|
}
|
|
}
|
|
|
|
// Singleton Instance
|
|
let mqttSubscriber: MQTTSubscriber | null = null;
|
|
|
|
/**
|
|
* Initialisiere und starte MQTT Subscriber
|
|
*/
|
|
export function initMQTTSubscriber(): MQTTSubscriber {
|
|
if (mqttSubscriber) {
|
|
return mqttSubscriber;
|
|
}
|
|
|
|
const brokerUrl = process.env.MQTT_BROKER_URL || 'mqtt://localhost:1883';
|
|
// Use admin credentials for backend subscriber (full access to all topics)
|
|
const username = process.env.MQTT_ADMIN_USERNAME || process.env.MQTT_USERNAME;
|
|
const password = process.env.MQTT_ADMIN_PASSWORD || process.env.MQTT_PASSWORD;
|
|
|
|
mqttSubscriber = new MQTTSubscriber(brokerUrl, username, password);
|
|
mqttSubscriber.connect();
|
|
|
|
return mqttSubscriber;
|
|
}
|
|
|
|
/**
|
|
* Hole existierende MQTT Subscriber Instance
|
|
*/
|
|
export function getMQTTSubscriber(): MQTTSubscriber | null {
|
|
return mqttSubscriber;
|
|
}
|
|
|
|
/**
|
|
* Stoppe MQTT Subscriber
|
|
*/
|
|
export function stopMQTTSubscriber(): void {
|
|
if (mqttSubscriber) {
|
|
mqttSubscriber.disconnect();
|
|
mqttSubscriber = null;
|
|
}
|
|
}
|