You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

489 lines
17 KiB
TypeScript

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

//// <reference types="bun-types" />
import type { Database } from 'bun:sqlite';
import { CommandService } from './services/command';
import { GroupSyncService } from './services/group-sync';
import { ResponseQueue } from './services/response-queue';
import { TaskService } from './tasks/service';
import { WebhookManager } from './services/webhook-manager';
import { normalizeWhatsAppId, isGroupId } from './utils/whatsapp';
import { ensureUserExists, db } from './db';
import { ContactsService } from './services/contacts';
import { Migrator } from './db/migrator';
import { RateLimiter } from './services/rate-limit';
import { RemindersService } from './services/reminders';
import { Metrics } from './services/metrics';
import { MaintenanceService } from './services/maintenance';
import { IdentityService } from './services/identity';
import { AllowedGroups } from './services/allowed-groups';
// Ambient declaration of the global `Bun` object so this module type-checks
// when it calls `Bun.serve`. The value itself is provided by the Bun runtime;
// nothing is defined here at run time.
declare global {
var Bun: typeof import('bun');
}
/**
 * Names of the environment variables that must be set (non-empty) for the
 * server to start. Consumers look each name up in `process.env`.
 */
export const REQUIRED_ENV: string[] = [
  'EVOLUTION_API_URL',
  'EVOLUTION_API_KEY',
  'EVOLUTION_API_INSTANCE',
  'CHATBOT_PHONE_NUMBER',
  'WEBHOOK_URL',
];
/**
 * Minimal shape of a webhook request body as consumed by this module.
 * The Evolution API sends additional fields that are not modelled here.
 */
interface WebhookPayload {
  /** Event name as sent by the caller. */
  event: string;
  /** Instance identifier as sent by the caller. */
  instance: string;
  /** Event-specific body; its structure depends on `event`. */
  data: any;
}
/**
 * HTTP entry point of the bot.
 *
 * Serves `/metrics` and `/health`, and accepts webhook events (POST + JSON)
 * which it routes to the matching service. All members are static.
 */
export class WebhookServer {
  /**
   * Database handle used for the queries in this class. It is also assigned to
   * `AllowedGroups`, `CommandService` and `TaskService` before they are used,
   * so replacing it redirects those services as well.
   */
  static dbInstance: Database = db;

  /**
   * Builds `<proto>://<host>` for a request, preferring the `x-forwarded-*`
   * headers. Falls back to the `host` header and finally to the host of the
   * request URL, so the result never contains the literal string "null".
   */
  private static getBaseUrl(request: Request): string {
    const proto = request.headers.get('x-forwarded-proto') || 'http';
    const host =
      request.headers.get('x-forwarded-host') ||
      request.headers.get('host') ||
      new URL(request.url).host;
    return `${proto}://${host}`;
  }

  /**
   * Extracts the trimmed text of a message object.
   *
   * Looks, in order, at `conversation`, `extendedTextMessage.text`,
   * `imageMessage.caption` and `videoMessage.caption`; the first truthy value
   * wins.
   *
   * @returns The trimmed text, or `''` when `message` is not an object or the
   *          selected value is not a string.
   */
  private static getMessageText(message: any): string {
    if (!message || typeof message !== 'object') return '';
    const text =
      message.conversation ||
      message?.extendedTextMessage?.text ||
      message?.imageMessage?.caption ||
      message?.videoMessage?.caption ||
      '';
    return typeof text === 'string' ? text.trim() : '';
  }

  /** True outside the test environment; gates the informational logging. */
  private static shouldLog(): boolean {
    return process.env.NODE_ENV !== 'test';
  }

  /** Resolves after `ms` milliseconds. */
  private static delay(ms: number): Promise<void> {
    return new Promise<void>(resolve => setTimeout(resolve, ms));
  }

  /**
   * Handles every HTTP request received by the server.
   *
   * - path ending in `/metrics`: GET only; 404 when metrics are disabled.
   * - path ending in `/health`: plain `OK`, or a JSON report with `?full=1`.
   * - anything else: must be a POST with a JSON content type carrying a
   *   payload with `event` and `instance`.
   *
   * @returns 200 `OK` once the event has been routed (unknown events are
   *          accepted and ignored); 405/400/403 for the validation failures;
   *          400 `Invalid request` when parsing or a handler throws.
   */
  static async handleRequest(request: Request): Promise<Response> {
    const url = new URL(request.url);
    if (url.pathname.endsWith('/metrics')) {
      return WebhookServer.handleMetricsRequest(request);
    }
    if (url.pathname.endsWith('/health')) {
      // /health?full=1 returns a JSON document with details.
      return url.searchParams.get('full') === '1'
        ? WebhookServer.buildFullHealthResponse()
        : new Response('OK', { status: 200 });
    }

    if (WebhookServer.shouldLog()) {
      console.log(' Incoming webhook request:');
    }

    // 1. Method validation
    if (request.method !== 'POST') {
      return new Response('🚫 Method not allowed', { status: 405 });
    }

    // 2. Content-Type validation. Media types are case-insensitive, so the
    //    header is lower-cased before matching.
    const contentType = request.headers.get('content-type');
    if (!contentType?.toLowerCase().includes('application/json')) {
      return new Response('🚫 Invalid content type', { status: 400 });
    }

    try {
      // 3. Parse and validate payload
      const payload = (await request.json()) as WebhookPayload;
      if (!payload.event || !payload.instance) {
        return new Response('🚫 Invalid payload', { status: 400 });
      }

      // 4. Verify instance matches (skipped in the test environment unless
      //    TEST_VERIFY_INSTANCE is set)
      const mustVerifyInstance = Boolean(
        process.env.NODE_ENV !== 'test' || process.env.TEST_VERIFY_INSTANCE
      );
      if (mustVerifyInstance && payload.instance !== process.env.EVOLUTION_API_INSTANCE) {
        return new Response('🚫 Invalid instance', { status: 403 });
      }

      // 5. Route events
      await WebhookServer.dispatchEvent(payload);
      return new Response('OK', { status: 200 });
    } catch (error) {
      console.error('❌ Error processing webhook:', {
        error: error instanceof Error ? error.message : String(error),
        stack: error instanceof Error ? error.stack : undefined,
        time: new Date().toISOString()
      });
      // Best-effort counter: a metrics failure must not mask the response.
      try { Metrics.inc('webhook_errors_total'); } catch {}
      return new Response('Invalid request', { status: 400 });
    }
  }

  /** Serves the metrics endpoint in the format chosen by METRICS_FORMAT (`json`, otherwise `prom`). */
  private static handleMetricsRequest(request: Request): Response {
    if (request.method !== 'GET') {
      return new Response('🚫 Method not allowed', { status: 405 });
    }
    if (!Metrics.enabled()) {
      return new Response('Metrics disabled', { status: 404 });
    }
    const format = (process.env.METRICS_FORMAT || 'prom').toLowerCase() === 'json' ? 'json' : 'prom';
    const body = Metrics.render(format as any);
    return new Response(body, {
      status: 200,
      headers: { 'Content-Type': format === 'json' ? 'application/json' : 'text/plain; version=0.0.4' }
    });
  }

  /**
   * Builds the detailed health report: counts of active groups and members,
   * the newest `last_verified` value, its age, and whether that age is within
   * MAX_MEMBERS_SNAPSHOT_AGE_MS (default 24 hours).
   *
   * @returns 200 with the JSON report, or 500 `{"status":"error"}` if
   *          anything throws while building it.
   */
  private static buildFullHealthResponse(): Response {
    const jsonHeaders = { 'Content-Type': 'application/json' };
    try {
      const groupsRow = WebhookServer.dbInstance
        .prepare(`SELECT COUNT(*) AS c, MAX(last_verified) AS lv FROM groups WHERE active = 1`)
        .get() as any;
      const membersRow = WebhookServer.dbInstance
        .prepare(`SELECT COUNT(*) AS c FROM group_members WHERE is_active = 1`)
        .get() as any;

      const active_groups = Number(groupsRow?.c || 0);
      const active_members = Number(membersRow?.c || 0);
      const lv = groupsRow?.lv ? String(groupsRow.lv) : null;
      const last_sync_at: string | null = lv;

      let snapshot_age_ms: number | null = null;
      if (lv) {
        // Values without a 'T' separator are rewritten to ISO form and read
        // as UTC; values that already contain 'T' are parsed as they are.
        const iso = lv.includes('T') ? lv : (lv.replace(' ', 'T') + 'Z');
        const ms = Date.parse(iso);
        if (Number.isFinite(ms)) {
          snapshot_age_ms = Date.now() - ms;
        }
      }

      const lastSyncMetric = Metrics.get('last_sync_ok');
      const maxAgeRaw = Number(process.env.MAX_MEMBERS_SNAPSHOT_AGE_MS);
      const maxAgeMs = Number.isFinite(maxAgeRaw) && maxAgeRaw > 0 ? maxAgeRaw : 24 * 60 * 60 * 1000;
      const snapshot_fresh = typeof snapshot_age_ms === 'number' ? (snapshot_age_ms <= maxAgeMs) : false;

      // With an explicit metric, both the metric and the freshness must agree;
      // without one, freshness alone decides.
      const last_sync_ok: number = typeof lastSyncMetric === 'number'
        ? ((lastSyncMetric === 1 && snapshot_fresh) ? 1 : 0)
        : (snapshot_fresh ? 1 : 0);

      const payload = { status: 'ok', active_groups, active_members, last_sync_at, snapshot_age_ms, snapshot_fresh, last_sync_ok };
      return new Response(JSON.stringify(payload), { status: 200, headers: jsonHeaders });
    } catch (e) {
      if (WebhookServer.shouldLog()) {
        console.error('❌ Error building health report:', e);
      }
      return new Response(JSON.stringify({ status: 'error' }), { status: 500, headers: jsonHeaders });
    }
  }

  /**
   * Routes a validated payload to its handler. The event name is lower-cased
   * and underscores become dots, so `MESSAGES_UPSERT` and `messages.upsert`
   * are treated alike. Events without a case below are ignored.
   */
  private static async dispatchEvent(payload: WebhookPayload): Promise<void> {
    const evt = String(payload.event);
    const evtNorm = evt.toLowerCase().replace(/_/g, '.');

    // Count webhook events per type. The event name comes from the request
    // body, so every character outside [a-z0-9_] is mapped to '_' to keep the
    // metric name well-formed. Best-effort: metric failures are ignored.
    try {
      Metrics.inc(`webhook_events_total_${evtNorm.replace(/[^a-z0-9_]/g, '_')}`);
    } catch {}

    switch (evtNorm) {
      case 'messages.upsert':
        if (WebhookServer.shouldLog()) {
          console.log(' Handling message upsert:', {
            groupId: payload.data?.key?.remoteJid,
            message: payload.data?.message?.conversation,
            rawEvent: evt
          });
        }
        await WebhookServer.handleMessageUpsert(payload.data);
        break;
      case 'contacts.update':
      case 'chats.update':
        if (WebhookServer.shouldLog()) {
          console.log(' Handling contacts/chats update event:', { rawEvent: evt });
        }
        ContactsService.updateFromWebhook(payload.data);
        break;
      case 'groups.upsert':
        if (WebhookServer.shouldLog()) {
          console.log(' Handling groups upsert event:', { rawEvent: evt });
        }
        // Sync failures are logged but do not fail the webhook request.
        try {
          await GroupSyncService.syncGroups();
          GroupSyncService.refreshActiveGroupsCache();
          await GroupSyncService.syncMembersForActiveGroups();
        } catch (e) {
          console.error('❌ Error handling groups.upsert:', e);
        }
        break;
      // Other events will be added later
    }
  }

  /**
   * Processes the `data` of a `messages.upsert` event.
   *
   * The message is dropped (the method returns without side effects beyond
   * those noted) when: required fields are missing, it has no text, it is a
   * broadcast/status message, it was sent by the bot, the sender cannot be
   * normalized or stored, or the group is unknown while GROUP_GATING_MODE is
   * `discover` (in which case the group is recorded as pending). Otherwise,
   * text starting with `/t` is rate-limited and handed to `CommandService`,
   * whose responses are queued.
   *
   * @param data Event body; expected to carry `key.remoteJid` and `message`.
   */
  static async handleMessageUpsert(data: any): Promise<void> {
    if (!data?.key?.remoteJid || !data.message) {
      if (WebhookServer.shouldLog()) {
        console.log('⚠️ Invalid message format - missing required fields');
        console.log(data);
      }
      return;
    }

    const messageText = WebhookServer.getMessageText(data.message);
    if (!messageText) {
      if (WebhookServer.shouldLog()) {
        console.log('⚠️ Empty or unsupported message content');
      }
      return;
    }

    const remoteJid = data.key.remoteJid;
    const participant = data.key.participant;
    const fromMe = !!data.key.fromMe;

    // Ignore broadcasts/status
    if (remoteJid === 'status@broadcast' || (typeof remoteJid === 'string' && remoteJid.endsWith('@broadcast'))) {
      if (WebhookServer.shouldLog()) {
        console.log(' Ignoring broadcast/status message');
      }
      return;
    }

    // Ignore our own messages
    if (fromMe) {
      if (WebhookServer.shouldLog()) {
        console.log(' Ignoring message sent by the bot (fromMe=true)');
      }
      return;
    }

    // Sender JID depends on the chat type: in groups it is the participant
    // (participantAlt preferred when available, due to a Baileys change);
    // in direct messages it is the chat itself.
    const inGroup = isGroupId(remoteJid);
    const senderRaw = inGroup ? (data.key.participantAlt || participant) : remoteJid;

    if (inGroup) {
      WebhookServer.learnSenderAlias(participant, data.key.participantAlt);
    }

    // Normalize sender ID for consistency and validation
    const normalizedSenderId = normalizeWhatsAppId(senderRaw);
    if (!normalizedSenderId) {
      if (WebhookServer.shouldLog()) {
        console.debug('⚠️ Invalid sender ID, ignoring message', { remoteJid, participant, fromMe });
      }
      return;
    }

    // Avoid processing messages from the bot number
    if (process.env.CHATBOT_PHONE_NUMBER && normalizedSenderId === process.env.CHATBOT_PHONE_NUMBER) {
      if (WebhookServer.shouldLog()) {
        console.log(' Ignoring message from the bot number');
      }
      return;
    }

    // Ensure user exists in database (DB errors are swallowed so the webhook
    // still answers 200)
    let userId: string | null = null;
    try {
      userId = ensureUserExists(senderRaw, WebhookServer.dbInstance);
    } catch (e) {
      if (WebhookServer.shouldLog()) {
        console.error('⚠️ Error ensuring user exists, ignoring message:', e);
      }
      return;
    }
    if (!userId) {
      if (WebhookServer.shouldLog()) {
        console.log('⚠️ Failed to ensure user exists, ignoring message');
      }
      return;
    }

    // Stage 2: safe group discovery ('discover' mode)
    if (inGroup && WebhookServer.deferUnknownGroup(remoteJid, normalizedSenderId)) {
      return;
    }

    // Check/ensure group exists (DMs are always allowed)
    if (inGroup && !GroupSyncService.isGroupActive(remoteJid)) {
      // In tests, keep the previous behaviour: ignore messages from inactive groups.
      if (process.env.NODE_ENV === 'test') {
        return;
      }
      console.log(' Group not active in cache — ensuring group and triggering quick members sync');
      try {
        GroupSyncService.ensureGroupExists(remoteJid);
        GroupSyncService.refreshActiveGroupsCache();
        await GroupSyncService.syncMembersForGroup(remoteJid);
      } catch (e) {
        console.error('⚠️ Failed to ensure/sync group on-the-fly:', e);
      }
    }

    // Only text starting with "/t" is forwarded. That prefix already covers
    // "/tarea", so a separate check for it is unnecessary.
    // NOTE(review): it also matches any other "/t…" text; confirm that
    // CommandService is expected to sort those out.
    if (messageText.trim().startsWith('/t')) {
      await WebhookServer.dispatchCommand(data, normalizedSenderId, messageText);
    }
  }

  /**
   * Records an alias→number mapping when a group message carries both
   * `participant` and `participantAlt` as strings and they normalize to
   * different non-empty IDs. Best-effort: any failure is ignored.
   */
  private static learnSenderAlias(participant: unknown, participantAlt: unknown): void {
    const alt = typeof participantAlt === 'string' ? participantAlt : null;
    const primary = typeof participant === 'string' ? participant : null;
    if (!alt || !primary) return;
    try {
      const normalizedAlt = normalizeWhatsAppId(alt);
      const normalizedPrimary = normalizeWhatsAppId(primary);
      if (normalizedAlt && normalizedPrimary && normalizedAlt !== normalizedPrimary) {
        IdentityService.upsertAlias(primary, alt, 'message.key');
      }
    } catch {}
  }

  /**
   * Applies the `discover` gating mode to a group message.
   *
   * When GROUP_GATING_MODE is `discover` and the group has no row in
   * `allowed_groups` (or the lookup itself fails, e.g. because the table is
   * missing), the group is recorded as pending and a counter is incremented.
   *
   * @returns `true` when the message must not be processed any further.
   */
  private static deferUnknownGroup(groupId: string, discoveredBy: string): boolean {
    try { (AllowedGroups as any).dbInstance = WebhookServer.dbInstance; } catch {}

    const gatingMode = String(process.env.GROUP_GATING_MODE || 'off').toLowerCase();
    if (gatingMode !== 'discover') return false;

    let known: boolean;
    try {
      known = Boolean(
        WebhookServer.dbInstance
          .prepare(`SELECT 1 FROM allowed_groups WHERE group_id = ? LIMIT 1`)
          .get(groupId)
      );
    } catch {
      // A failed lookup is treated like an unknown group.
      known = false;
    }
    if (known) return false;

    // Both steps are best-effort and independent of each other.
    try { AllowedGroups.upsertPending(groupId, null, discoveredBy); } catch {}
    try { Metrics.inc('unknown_groups_discovered_total'); } catch {}
    return true;
  }

  /**
   * Rate-limits a command message (outside tests), passes it to
   * `CommandService` and queues whatever responses it returns.
   */
  private static async dispatchCommand(data: any, senderId: string, messageText: string): Promise<void> {
    // Basic per-user rate limiting (disabled in tests)
    if (process.env.NODE_ENV !== 'test' && !RateLimiter.checkAndConsume(senderId)) {
      // The limiter decides whether the user is told about it (at most once
      // per minute, per the original note).
      if (RateLimiter.shouldNotify(senderId)) {
        const configured = Number(process.env.RATE_LIMIT_PER_MIN);
        const limit = Number.isFinite(configured) && configured > 0 ? configured : 15;
        await ResponseQueue.add([{
          recipient: senderId,
          message: `Has superado el límite de ${limit} comandos por minuto. Inténtalo de nuevo en un momento.`
        }]);
      }
      return;
    }

    // Mentions can arrive in several places depending on the message kind.
    const mentions = data.message?.contextInfo?.mentionedJid
      || data.message?.extendedTextMessage?.contextInfo?.mentionedJid
      || data.message?.imageMessage?.contextInfo?.mentionedJid
      || data.message?.videoMessage?.contextInfo?.mentionedJid
      || [];

    // Make CommandService and TaskService use the same DB as this class.
    (CommandService as any).dbInstance = WebhookServer.dbInstance;
    (TaskService as any).dbInstance = WebhookServer.dbInstance;

    const responses = await CommandService.handle({
      sender: senderId,
      groupId: data.key.remoteJid,
      message: messageText,
      mentions
    });

    if (responses.length > 0) {
      await ResponseQueue.add(responses);
    }
  }

  /**
   * Logs the state of the main environment variables and terminates the
   * process (exit code 1) when a variable from `REQUIRED_ENV` is missing or
   * CHATBOT_PHONE_NUMBER contains anything but digits.
   */
  static validateEnv() {
    console.log(' Checking environment variables...');
    console.log('EVOLUTION_API_URL:', process.env.EVOLUTION_API_URL ? '***' : 'MISSING');
    console.log('EVOLUTION_API_INSTANCE:', process.env.EVOLUTION_API_INSTANCE || 'MISSING');
    console.log('WEBHOOK_URL:', process.env.WEBHOOK_URL ? `${process.env.WEBHOOK_URL.substring(0, 20)}...` : 'NOT SET');
    console.log('WHATSAPP_COMMUNITY_ID:', process.env.WHATSAPP_COMMUNITY_ID ? '***' : 'NOT SET (se mostrarán comunidades disponibles)');

    const missing = REQUIRED_ENV.filter(name => !process.env[name]);
    if (missing.length) {
      console.error('❌ Missing required environment variables:');
      for (const name of missing) {
        console.error(`- ${name}`);
      }
      console.error('Add these to your CapRover environment configuration');
      process.exit(1);
    }

    const botNumber = process.env.CHATBOT_PHONE_NUMBER;
    if (botNumber && !/^\d+$/.test(botNumber)) {
      console.error('❌ CHATBOT_PHONE_NUMBER must contain only digits');
      process.exit(1);
    }
  }

  /**
   * Boots the application: validates the environment, runs migrations,
   * (outside tests) registers and verifies the webhook and starts the
   * background services, then starts the HTTP server on PORT (default 3007).
   *
   * Failure to register/verify the webhook or to run the initial group check
   * terminates the process; failures of the individual schedulers are only
   * logged.
   *
   * @returns The server object returned by `Bun.serve`.
   */
  static async start() {
    this.validateEnv();

    // Run database migrations (up-only) before starting services
    await Migrator.migrateToLatest(this.dbInstance);

    const PORT = process.env.PORT || '3007';
    console.log('✅ Environment variables validated');

    if (process.env.NODE_ENV !== 'test') {
      try {
        await WebhookManager.registerWebhook();
        // Small delay to allow the webhook to propagate
        await WebhookServer.delay(1000);

        let webhookActive = await WebhookManager.verifyWebhook();
        if (!webhookActive) {
          console.error('❌ Webhook verification failed - retrying in 2 seconds...');
          await WebhookServer.delay(2000);
          webhookActive = await WebhookManager.verifyWebhook();
          if (!webhookActive) {
            console.error('❌ Webhook verification failed after retry');
            process.exit(1);
          }
        }

        // Initialize groups - critical for operation
        await GroupSyncService.checkInitialGroups();

        // Start groups scheduler (periodic sync of groups)
        try {
          GroupSyncService.startGroupsScheduler();
          console.log('✅ Group scheduler started');
        } catch (e) {
          console.error('⚠️ Failed to start Group scheduler:', e);
        }

        // Initial members sync (a failure here does not abort startup)
        try {
          await GroupSyncService.syncMembersForActiveGroups();
          GroupSyncService.startMembersScheduler();
          console.log('✅ Group members scheduler started');
        } catch (e) {
          console.error('⚠️ Failed to run initial members sync or start scheduler:', e);
        }

        // Start response queue worker, its cleanup scheduler and reminders.
        // NOTE(review): this awaits ResponseQueue.process(); confirm it
        // resolves once the worker is running, otherwise the steps below
        // are delayed until it finishes.
        try {
          await ResponseQueue.process();
          console.log('✅ ResponseQueue worker started');
          ResponseQueue.startCleanupScheduler();
          console.log('✅ ResponseQueue cleanup scheduler started');
          RemindersService.start();
          console.log('✅ RemindersService started');
        } catch (e) {
          console.error('❌ Failed to start ResponseQueue worker or cleanup scheduler:', e);
        }

        // Maintenance (cleanup of inactive members)
        try {
          MaintenanceService.start();
          console.log('✅ MaintenanceService started');
        } catch (e) {
          console.error('⚠️ Failed to start MaintenanceService:', e);
        }
      } catch (error) {
        console.error('❌ Failed to setup webhook:', error instanceof Error ? error.message : error);
        process.exit(1);
      }
    }

    const server = Bun.serve({
      // Explicit radix so the port is always read as a decimal number.
      port: Number.parseInt(PORT, 10),
      fetch: (request) => WebhookServer.handleRequest(request)
    });
    console.log(`Server running on port ${PORT}`);
    return server;
  }
}