import type { Database } from 'bun:sqlite';
import { db } from '../db';
import { IdentityService } from './identity';
import { normalizeWhatsAppId } from '../utils/whatsapp';
import { Metrics } from './metrics';
import { toIsoSqlUTC } from '../utils/datetime';

/**
 * Upper bound used when validating a digits-only recipient.
 * Read from ONBOARDING_FALLBACK_MAX_DIGITS; falls back to 15 when unset or invalid.
 */
const MAX_FALLBACK_DIGITS = (() => {
  const raw = (process.env.ONBOARDING_FALLBACK_MAX_DIGITS || '').trim();
  const n = parseInt(raw || '15', 10);
  return Number.isFinite(n) && n > 0 ? n : 15;
})();

/** Largest delay accepted by timers before the runtime falls back to a 1 ms delay. */
const MAX_TIMER_DELAY_MS = 2 ** 31 - 1;

/** True when `s` consists only of ASCII digits (and is non-empty). */
const isDigits = (s: string) => /^\d+$/.test(s);

/**
 * Read a numeric environment variable.
 *
 * @param name     Environment variable name.
 * @param fallback Value returned when the variable is unset, blank, not a finite
 *                 number, or below `min`.
 * @param min      Smallest accepted value (inclusive). Defaults to no lower bound.
 * @returns The parsed number, or `fallback`.
 */
function envNumber(name: string, fallback: number, min: number = Number.NEGATIVE_INFINITY): number {
  const raw = (process.env[name] ?? '').trim();
  if (raw === '') return fallback;
  const parsed = Number(raw);
  return Number.isFinite(parsed) && parsed >= min ? parsed : fallback;
}

/** Parse a JSON string, returning null for empty input or malformed JSON. */
function parseJsonOrNull(raw: unknown): any {
  if (!raw) return null;
  try {
    return JSON.parse(String(raw));
  } catch {
    return null;
  }
}

/**
 * Convert a stored timestamp to epoch milliseconds.
 * Strings without a 'T' are treated as 'YYYY-MM-DD HH:MM:SS' in UTC; returns NaN when unparseable.
 */
function parseSqlUtcMs(raw: string): number {
  const iso = raw.includes('T') ? raw : `${raw.replace(' ', 'T')}Z`;
  return Date.parse(iso);
}

/** Map a reaction emoji to the label used in the reaction metrics. */
function reactionEmojiLabel(emoji: string): string {
  switch (emoji) {
    case '✅':
      return 'check';
    case '🤖':
      return 'robot';
    case '⚠️':
      return 'warn';
    default:
      return 'other';
  }
}

/** Best-effort counter increment: a metrics failure must never break queue processing. */
function incMetric(...args: Parameters<typeof Metrics.inc>): void {
  try {
    Metrics.inc(...args);
  } catch {
    // metrics are best-effort
  }
}

/** Best-effort gauge update: a metrics failure must never break queue processing. */
function setMetric(...args: Parameters<typeof Metrics.set>): void {
  try {
    Metrics.set(...args);
  } catch {
    // metrics are best-effort
  }
}

type SendResult = { ok: boolean; status?: number; error?: string };

/** Count the skip in metrics and build the 422 result for a recipient that cannot be resolved. */
function rejectRecipient(reason: 'non_numeric' | 'too_long' | 'bad_domain'): SendResult {
  incMetric('responses_skipped_unresolvable_recipient_total', 1, { reason });
  return { ok: false, status: 422, error: `unresolvable_recipient_${reason}` };
}

/**
 * Resolve the value to send as the `number` field of a text message.
 *
 * - `...@g.us` recipients are used verbatim.
 * - `...@s.whatsapp.net` recipients are normalized with `normalizeWhatsAppId`.
 * - Any other domain is rejected.
 * - Recipients without '@' go through `IdentityService.resolveAliasOrNull`, falling back to the raw value.
 *
 * Non-group results must be digits-only and shorter than MAX_FALLBACK_DIGITS.
 */
function resolveSendTarget(rawRecipient: string): { target: string } | { rejection: SendResult } {
  let candidate: string;

  if (rawRecipient.includes('@')) {
    if (rawRecipient.endsWith('@g.us')) {
      return { target: rawRecipient };
    }
    if (!rawRecipient.endsWith('@s.whatsapp.net')) {
      return { rejection: rejectRecipient('bad_domain') };
    }
    const normalized = normalizeWhatsAppId(rawRecipient);
    if (!normalized || !isDigits(normalized)) {
      return { rejection: rejectRecipient('non_numeric') };
    }
    candidate = normalized;
  } else {
    candidate = IdentityService.resolveAliasOrNull(rawRecipient) || rawRecipient;
    if (!isDigits(candidate)) {
      return { rejection: rejectRecipient('non_numeric') };
    }
  }

  // NOTE(review): `>=` also rejects values with exactly MAX_FALLBACK_DIGITS digits — confirm this is intended.
  if (candidate.length >= MAX_FALLBACK_DIGITS) {
    return { rejection: rejectRecipient('too_long') };
  }
  return { target: candidate };
}

type QueuedResponse = {
  recipient: string;
  message: string;
  mentions?: string[]; // full JIDs to mention (e.g., '346xxx@s.whatsapp.net')
};

type ClaimedItem = {
  id: number;
  recipient: string;
  message: string;
  metadata?: string | null; // JSON-encoded metadata (e.g., { mentioned: [...] })
  attempts: number;
};

/**
 * Persistent outbound queue backed by the `response_queue` table.
 * Rows are inserted by the enqueue methods, claimed in batches by background workers,
 * sent over HTTP, and then marked sent/failed or re-queued with backoff.
 */
export const ResponseQueue = {
  // Allows injecting a different DB in tests if needed
  dbInstance: db as Database,

  // In-memory queue kept for compatibility; it is not used for persistence
  queue: [] as QueuedResponse[],

  // Fixed configuration (MVP)
  WORKERS: 2,
  BATCH_SIZE: 10,
  SLEEP_MS: 500,

  // Retries with exponential backoff + jitter (defaults, optionally overridden via env).
  // Invalid (non-numeric) values fall back to the default instead of becoming NaN.
  MAX_ATTEMPTS: envNumber('RQ_MAX_ATTEMPTS', 6),
  BASE_BACKOFF_MS: envNumber('RQ_BASE_BACKOFF_MS', 5000, 0),
  MAX_BACKOFF_MS: envNumber('RQ_MAX_BACKOFF_MS', 3600000, 0),
  // null (or 0) means "use MAX_ATTEMPTS for reactions too"
  REACTIONS_MAX_ATTEMPTS: (envNumber('RQ_REACTIONS_MAX_ATTEMPTS', 0) || null) as number | null,

  // Cleanup/retention (configurable via environment)
  CLEANUP_ENABLED: process.env.RQ_CLEANUP_ENABLED !== 'false',
  RETENTION_DAYS_SENT: envNumber('RQ_RETENTION_DAYS_SENT', 14, 0),
  RETENTION_DAYS_FAILED: envNumber('RQ_RETENTION_DAYS_FAILED', 30, 0),
  CLEANUP_INTERVAL_MS: envNumber('RQ_CLEANUP_INTERVAL_MS', 24 * 60 * 60 * 1000, 1), // 24h
  CLEANUP_BATCH: Math.floor(envNumber('RQ_CLEANUP_BATCH', 1000, 1)),
  OPTIMIZE_ENABLED: process.env.RQ_OPTIMIZE_ENABLED !== 'false',
  VACUUM_ENABLED: process.env.RQ_VACUUM_ENABLED === 'true',
  VACUUM_EVERY_N_RUNS: envNumber('RQ_VACUUM_EVERY_N_RUNS', 28, 1),

  _running: false,
  _cleanupTimer: null as ReturnType<typeof setInterval> | null,
  _cleanupRunning: false,
  _cleanupRunCount: 0,

  /** Current time formatted by `toIsoSqlUTC`. */
  nowIso(): string {
    return toIsoSqlUTC(new Date());
  },

  /** Time `ms` milliseconds from now (negative values yield a past time), formatted by `toIsoSqlUTC`. */
  futureIso(ms: number): string {
    return toIsoSqlUTC(new Date(Date.now() + ms));
  },

  /**
   * Delay before the next retry: a random value in [0, cap) where
   * cap = min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2^(attempt - 1)).
   */
  computeDelayMs(attempt: number): number {
    const cap = Math.min(this.MAX_BACKOFF_MS, this.BASE_BACKOFF_MS * 2 ** Math.max(0, attempt - 1));
    return Math.floor(Math.random() * cap); // full jitter
  },

  /**
   * Persist a batch of responses in a single transaction.
   * Entries without recipient or message, or addressed to CHATBOT_PHONE_NUMBER, are dropped.
   *
   * @throws Re-throws any persistence error after logging it.
   */
  async add(responses: QueuedResponse[]) {
    try {
      // Trimmed for consistency with enqueueOnboarding, so stray whitespace in the env value
      // does not disable the "never message the bot itself" filter.
      const botNumber = (process.env.CHATBOT_PHONE_NUMBER || '').trim();
      const filtered = responses.filter(
        r => r.recipient && r.message && (!botNumber || r.recipient !== botNumber)
      );

      if (filtered.length === 0) {
        return;
      }

      const insert = this.dbInstance.prepare(`
        INSERT INTO response_queue (recipient, message, metadata, next_attempt_at)
        VALUES (?, ?, ?, ?)
      `);

      this.dbInstance.transaction((rows: QueuedResponse[]) => {
        for (const r of rows) {
          const metadata = r.mentions && r.mentions.length > 0
            ? JSON.stringify({ mentioned: r.mentions })
            : null;
          insert.run(r.recipient, r.message, metadata, this.nowIso());
        }
      })(filtered);

      console.log('Queued responses (persisted):', filtered.length);
    } catch (err) {
      console.error('Failed to persist queued responses:', err);
      throw err;
    }
  },

  /**
   * Enqueue an onboarding DM, tagged with `kind: 'onboarding'` metadata.
   * The row becomes eligible immediately, or after `delayMs` when a positive delay is given.
   * Nothing is enqueued when recipient/message are empty or the recipient is the bot number.
   */
  enqueueOnboarding(
    recipient: string,
    message: string,
    metadata: {
      variant: 'initial' | 'reminder';
      part: 1 | 2;
      bundle_id: string;
      group_id?: string | null;
      task_id?: number | null;
      display_code?: number | null;
    },
    delayMs?: number
  ): void {
    if (!recipient || !message) return;

    const botNumber = (process.env.CHATBOT_PHONE_NUMBER || '').trim();
    if (botNumber && recipient === botNumber) {
      incMetric('onboarding_dm_skipped_total', 1, { reason: 'bot_number', group_id: String(metadata.group_id || '') });
      return;
    }

    const metaObj: any = {
      kind: 'onboarding',
      variant: metadata.variant,
      part: metadata.part,
      bundle_id: metadata.bundle_id,
      group_id: metadata.group_id ?? null,
      task_id: metadata.task_id ?? null,
      display_code: metadata.display_code ?? null
    };

    const nextAt = delayMs && delayMs > 0 ? this.futureIso(delayMs) : this.nowIso();

    this.dbInstance.prepare(`
      INSERT INTO response_queue (recipient, message, metadata, next_attempt_at)
      VALUES (?, ?, ?, ?)
    `).run(recipient, message, JSON.stringify(metaObj), nextAt);

    // Counted at enqueue time, not at delivery time.
    incMetric('onboarding_dm_sent_total', 1, {
      variant: metadata.variant,
      part: String(metadata.part),
      group_id: String(metadata.group_id || '')
    });
  },

  /**
   * Onboarding statistics for one recipient, computed from `response_queue` rows
   * whose metadata has `kind: 'onboarding'` (any status).
   *
   * @returns `total` rows; `lastSentAt`/`lastVariant` from the row with the latest
   *          updated_at (or created_at); `firstInitialAt` = earliest created_at among
   *          rows with variant 'initial' (undefined when there is none).
   */
  getOnboardingStats(recipient: string): { total: number; lastSentAt: string | null; firstInitialAt?: string | null; lastVariant?: 'initial' | 'reminder' | null } {
    if (!recipient) return { total: 0, lastSentAt: null, firstInitialAt: undefined, lastVariant: null };

    const rows = this.dbInstance.prepare(`
      SELECT status, created_at, updated_at, metadata
      FROM response_queue
      WHERE recipient = ? AND metadata IS NOT NULL
    `).all(recipient) as Array<{ status: string; created_at: string; updated_at: string; metadata: string | null }>;

    let total = 0;
    let lastSentAt: string | null = null;
    let firstInitialAt: string | null | undefined = undefined;
    let lastVariant: 'initial' | 'reminder' | null = null;
    let lastTsMs = -1;

    for (const r of rows) {
      const meta = parseJsonOrNull(r.metadata);
      if (!meta || meta.kind !== 'onboarding') continue;

      total++;

      // Reference timestamp: prefer updated_at, fall back to created_at
      const tRaw = (r.updated_at || r.created_at || '').toString();
      const ts = parseSqlUtcMs(tRaw);
      if (Number.isFinite(ts) && ts > lastTsMs) {
        lastTsMs = ts;
        lastSentAt = tRaw || null;
        lastVariant = meta.variant === 'reminder' ? 'reminder' : 'initial';
      }

      if (meta.variant !== 'initial') continue;

      // Track the oldest 'initial' row
      const created = (r.created_at || '').toString();
      if (!firstInitialAt) {
        firstInitialAt = created || null;
        continue;
      }
      const curMs = parseSqlUtcMs(firstInitialAt);
      const newMs = parseSqlUtcMs(created);
      if (Number.isFinite(newMs) && (!Number.isFinite(curMs) || newMs < curMs)) {
        firstInitialAt = created || null;
      }
    }

    return { total, lastSentAt, firstInitialAt, lastVariant };
  },

  /**
   * Enqueue a reaction job, skipping it when a row with identical metadata in status
   * queued/processing/sent was created or updated within the last 24 hours.
   *
   * @throws Re-throws any persistence error after logging it.
   */
  async enqueueReaction(chatId: string, messageId: string, emoji: string, opts?: { participant?: string; fromMe?: boolean }): Promise<void> {
    try {
      if (!chatId || !messageId || !emoji) return;

      // Canonical JSON: key order is fixed so equal jobs serialize to equal strings
      const metaObj: any = { kind: 'reaction', emoji, chatId, messageId };
      if (typeof opts?.fromMe === 'boolean') metaObj.fromMe = !!opts.fromMe;
      if (opts?.participant) metaObj.participant = opts.participant;
      const metadata = JSON.stringify(metaObj);

      // 24h idempotency window
      const cutoff = this.futureIso(-24 * 60 * 60 * 1000);

      const exists = this.dbInstance.prepare(`
        SELECT 1
        FROM response_queue
        WHERE metadata = ?
          AND status IN ('queued','processing','sent')
          AND (updated_at > ? OR created_at > ?)
        LIMIT 1
      `).get(metadata, cutoff, cutoff) as any;

      if (exists) {
        return;
      }

      this.dbInstance.prepare(`
        INSERT INTO response_queue (recipient, message, metadata, next_attempt_at)
        VALUES (?, ?, ?, ?)
      `).run(chatId, '', metadata, this.nowIso());

      incMetric('reactions_enqueued_total', 1, { emoji: reactionEmojiLabel(emoji) });
    } catch (err) {
      console.error('Failed to enqueue reaction:', err);
      throw err;
    }
  },

  /** HTTP headers for API requests; the API key comes from EVOLUTION_API_KEY. */
  getHeaders(): HeadersInit {
    return {
      apikey: process.env.EVOLUTION_API_KEY || '',
      'Content-Type': 'application/json',
    };
  },

  /**
   * Send one claimed item: a reaction when its metadata has `kind: 'reaction'`,
   * otherwise a text message (with resolved mentions when present).
   *
   * Never throws for send problems: failures are reported in the result, with `status`
   * set for HTTP errors and for locally rejected recipients (422), and unset for
   * network/configuration errors.
   */
  async sendOne(item: ClaimedItem): Promise<{ ok: boolean; status?: number; error?: string }> {
    const baseUrl = process.env.EVOLUTION_API_URL;
    const instance = process.env.EVOLUTION_API_INSTANCE;

    if (!baseUrl || !instance) {
      const msg = 'Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE';
      console.error(msg);
      return { ok: false, error: msg };
    }

    const meta = parseJsonOrNull(item.metadata);

    // Reaction jobs
    if (meta && meta.kind === 'reaction') {
      const reactionUrl = `${baseUrl}/message/sendReaction/${instance}`;
      const chatId = String(meta.chatId || '');
      const messageId = String(meta.messageId || '');
      const emoji = String(meta.emoji || '');
      const emojiLabel = reactionEmojiLabel(emoji);

      if (!chatId || !messageId || !emoji) {
        return { ok: false, error: 'invalid_reaction_metadata' };
      }

      const key: any = { remoteJid: chatId, fromMe: !!meta.fromMe, id: messageId };
      if (meta.participant) {
        key.participant = String(meta.participant);
      }
      const payload = { key, reaction: emoji };

      try {
        const response = await fetch(reactionUrl, {
          method: 'POST',
          headers: this.getHeaders(),
          body: JSON.stringify(payload),
        });

        if (!response.ok) {
          const body = await response.text().catch(() => '');
          const errTxt = body?.slice(0, 200) || `HTTP ${response.status}`;
          console.warn('Send reaction failed:', { status: response.status, body: errTxt });
          incMetric('reactions_failed_total', 1, { emoji: emojiLabel });
          return { ok: false, status: response.status, error: errTxt };
        }

        console.log(`✅ Sent reaction with payload: ${JSON.stringify(payload)}`);
        incMetric('reactions_sent_total', 1, { emoji: emojiLabel });
        return { ok: true, status: response.status };
      } catch (err) {
        const errMsg = (err instanceof Error ? err.message : String(err));
        console.error('Network error sending reaction:', errMsg);
        incMetric('reactions_failed_total', 1, { emoji: emojiLabel });
        return { ok: false, error: errMsg };
      }
    }

    // Plain text messages
    const url = `${baseUrl}/message/sendText/${instance}`;

    try {
      // Resolve and validate the effective recipient before building the payload
      const resolution = resolveSendTarget(String(item.recipient || ''));
      if ('rejection' in resolution) {
        return resolution.rejection;
      }

      const payload: any = {
        number: resolution.target,
        text: item.message,
      };

      // Attach mentioned JIDs when present in metadata
      if (meta && Array.isArray(meta.mentioned) && meta.mentioned.length > 0) {
        try {
          const mentionJids: string[] = [];
          for (const m of meta.mentioned) {
            const n = normalizeWhatsAppId(String(m));
            if (!n) continue;
            const r = IdentityService.resolveAliasOrNull(n) || n;
            if (!isDigits(r)) continue;
            mentionJids.push(`${r}@s.whatsapp.net`);
          }
          // De-duplicate while keeping first-seen order
          payload.mentioned = Array.from(new Set(mentionJids));
        } catch {
          // mentions are optional: send the message without them
        }
      }

      const response = await fetch(url, {
        method: 'POST',
        headers: this.getHeaders(),
        body: JSON.stringify(payload),
      });

      if (!response.ok) {
        const body = await response.text().catch(() => '');
        const errTxt = body?.slice(0, 200) || `HTTP ${response.status}`;
        console.warn('Send failed:', { status: response.status, body: errTxt });
        return { ok: false, status: response.status, error: errTxt };
      }

      console.log(`✅ Sent message with payload: ${JSON.stringify(payload)}`);
      return { ok: true, status: response.status };
    } catch (err) {
      const errMsg = (err instanceof Error ? err.message : String(err));
      console.error('Network error sending message:', errMsg);
      return { ok: false, error: errMsg };
    }
  },

  /**
   * Claim up to `limit` due rows: selects queued rows whose next_attempt_at is null or
   * in the past and marks them 'processing' in a single statement to avoid races.
   */
  claimNextBatch(limit: number): ClaimedItem[] {
    const rows = this.dbInstance.prepare(`
      UPDATE response_queue
      SET status = 'processing',
          updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now')
      WHERE id IN (
        SELECT id
        FROM response_queue
        WHERE status = 'queued'
          AND (next_attempt_at IS NULL OR next_attempt_at <= strftime('%Y-%m-%d %H:%M:%f', 'now'))
        ORDER BY COALESCE(next_attempt_at, created_at), id
        LIMIT ?
      )
      RETURNING id, recipient, message, metadata, attempts
    `).all(limit) as ClaimedItem[];
    return rows || [];
  },

  /** Mark a row as sent; for onboarding rows, also refresh the aggregate onboarding gauges. */
  markSent(id: number, statusCode?: number) {
    this.dbInstance.prepare(`
      UPDATE response_queue
      SET status = 'sent',
          last_status_code = ?,
          updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now')
      WHERE id = ?
    `).run(statusCode ?? null, id);

    // Aggregate metrics are best-effort and must not turn a successful send into an error
    try {
      const row = this.dbInstance.prepare(`SELECT metadata FROM response_queue WHERE id = ?`).get(id) as any;
      const meta = parseJsonOrNull(row?.metadata);
      if (meta && meta.kind === 'onboarding') {
        this.setOnboardingAggregatesMetrics();
      }
    } catch {
      // ignore: metrics only
    }
  },

  /** Mark a row as failed, storing the error (truncated to 500 chars) and optionally the attempt count. */
  markFailed(id: number, errorMsg: string, statusCode?: number, attempts?: number) {
    const msg = (errorMsg || '').toString().slice(0, 500);
    this.dbInstance.prepare(`
      UPDATE response_queue
      SET status = 'failed',
          attempts = COALESCE(?, attempts),
          last_error = ?,
          last_status_code = ?,
          updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now')
      WHERE id = ?
    `).run(attempts ?? null, msg, statusCode ?? null, id);
  },

  /**
   * Put a row back in 'queued' with a new attempt count and next_attempt_at.
   * last_error / last_status_code are only overwritten when new values are supplied.
   */
  requeueWithBackoff(id: number, nextAttempts: number, nextAttemptAt: string, statusCode?: number | null, errorMsg?: string) {
    const msg = (errorMsg || '').toString().slice(0, 500) || null;
    this.dbInstance.prepare(`
      UPDATE response_queue
      SET status = 'queued',
          attempts = ?,
          next_attempt_at = ?,
          last_error = COALESCE(?, last_error),
          last_status_code = COALESCE(?, last_status_code),
          updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now')
      WHERE id = ?
    `).run(nextAttempts, nextAttemptAt, msg, statusCode ?? null, id);
  },

  /**
   * Recompute the onboarding gauges from the database: messages sent, distinct recipients,
   * users whose last_command_at is later than their first sent onboarding, and the
   * resulting conversion rate (clamped to [0, 1]). Errors are swallowed.
   */
  setOnboardingAggregatesMetrics(): void {
    try {
      // Total onboarding messages sent
      const sentRow = this.dbInstance.prepare(`
        SELECT COUNT(*) AS c
        FROM response_queue
        WHERE status = 'sent' AND metadata LIKE '%"kind":"onboarding"%'
      `).get() as any;
      const sentAbs = Number(sentRow?.c || 0);

      // Distinct recipients with at least one onboarding message sent
      const rcptRow = this.dbInstance.prepare(`
        SELECT COUNT(DISTINCT recipient) AS c
        FROM response_queue
        WHERE status = 'sent' AND metadata LIKE '%"kind":"onboarding"%'
      `).get() as any;
      const recipientsAbs = Number(rcptRow?.c || 0);

      // Converted users: last_command_at > first onboarding sent
      const convRow = this.dbInstance.prepare(`
        SELECT COUNT(*) AS c
        FROM users u
        JOIN (
          SELECT recipient, MIN(created_at) AS first_at
          FROM response_queue
          WHERE status = 'sent' AND metadata LIKE '%"kind":"onboarding"%'
          GROUP BY recipient
        ) f ON f.recipient = u.id
        WHERE u.last_command_at IS NOT NULL AND u.last_command_at > f.first_at
      `).get() as any;
      const convertedAbs = Number(convRow?.c || 0);

      const rate = recipientsAbs > 0
        ? Math.max(0, Math.min(1, convertedAbs / recipientsAbs))
        : 0;

      setMetric('onboarding_dm_sent_abs', sentAbs);
      setMetric('onboarding_recipients_abs', recipientsAbs);
      setMetric('onboarding_converted_users_abs', convertedAbs);
      setMetric('onboarding_conversion_rate', rate);
    } catch {
      // no-op: metrics only
    }
  },

  /**
   * Worker body: while the queue is running, claim a batch, send each item, and record
   * the outcome (sent, permanently failed, or re-queued with backoff).
   */
  async workerLoop(workerId: number) {
    while (this._running) {
      try {
        const batch = this.claimNextBatch(this.BATCH_SIZE);
        if (batch.length === 0) {
          await new Promise(r => setTimeout(r, this.SLEEP_MS));
          continue;
        }

        for (const item of batch) {
          // Isolate each item: an unexpected error on one (e.g. a DB write failing) must not
          // abandon the remaining, already-claimed items of the batch in 'processing'.
          try {
            const result = await this.sendOne(item);

            if (result.ok) {
              this.markSent(item.id, result.status);
              continue;
            }

            const status = result.status;
            const attemptsNow = (item.attempts || 0) + 1;
            const errMsg = result.error || 'send failed';

            // 4xx = permanent failure.
            // NOTE(review): this includes 408/429, which are usually transient — confirm intended.
            if (typeof status === 'number' && status >= 400 && status < 500) {
              this.markFailed(item.id, errMsg, status, attemptsNow);
              continue;
            }

            // 5xx or network error: retry with backoff unless the limit was reached
            // (reactions may have their own limit)
            const metaForMax = parseJsonOrNull(item.metadata);
            const isReactionJob = !!(metaForMax && metaForMax.kind === 'reaction');
            const effectiveMax = isReactionJob && this.REACTIONS_MAX_ATTEMPTS
              ? this.REACTIONS_MAX_ATTEMPTS
              : this.MAX_ATTEMPTS;

            if (attemptsNow >= effectiveMax) {
              this.markFailed(item.id, errMsg, status, attemptsNow);
              continue;
            }

            const delayMs = this.computeDelayMs(attemptsNow);
            const when = this.futureIso(delayMs);
            this.requeueWithBackoff(item.id, attemptsNow, when, status ?? null, errMsg);
          } catch (itemErr) {
            console.error(`ResponseQueue worker ${workerId} error:`, itemErr);
          }
        }
      } catch (err) {
        console.error(`ResponseQueue worker ${workerId} error:`, err);
        // Avoid a tight loop on repeated errors
        await new Promise(r => setTimeout(r, this.SLEEP_MS));
      }
    }
  },

  /** Start WORKERS background worker loops and return immediately; no-op if already running. */
  async process() {
    if (this._running) {
      return;
    }
    this._running = true;
    console.log(`Starting ResponseQueue with ${this.WORKERS} workers`);
    for (let i = 0; i < this.WORKERS; i++) {
      // Intentionally not awaited: workers run in parallel in the background
      void this.workerLoop(i + 1);
    }
  },

  /**
   * Delete 'sent' and 'failed' rows whose updated_at is older than their retention period,
   * in batches of CLEANUP_BATCH, then optionally run PRAGMA optimize / VACUUM.
   *
   * @param now Reference time for the retention thresholds (defaults to the current time).
   * @returns Deletion counts; `skipped: true` when another cleanup run is in progress.
   *          On error the counts are reported as zero.
   */
  async runCleanupOnce(now: Date = new Date()): Promise<{ deletedSent: number; deletedFailed: number; totalDeleted: number; skipped: boolean }> {
    // Avoid overlapping runs
    if (this._cleanupRunning) {
      return { deletedSent: 0, deletedFailed: 0, totalDeleted: 0, skipped: true };
    }
    this._cleanupRunning = true;
    const startedAt = Date.now();

    try {
      const msPerDay = 24 * 60 * 60 * 1000;
      const sentThresholdIso = toIsoSqlUTC(new Date(now.getTime() - this.RETENTION_DAYS_SENT * msPerDay));
      const failedThresholdIso = toIsoSqlUTC(new Date(now.getTime() - this.RETENTION_DAYS_FAILED * msPerDay));

      const cleanStatus = (status: 'sent' | 'failed', thresholdIso: string): number => {
        let deleted = 0;
        const selectStmt = this.dbInstance.prepare(`
          SELECT id FROM response_queue
          WHERE status = ? AND updated_at < ?
          ORDER BY updated_at
          LIMIT ?
        `);

        while (true) {
          const rows = selectStmt.all(status, thresholdIso, this.CLEANUP_BATCH) as Array<{ id: number }>;
          if (!rows || rows.length === 0) break;

          const ids = rows.map(r => r.id);
          const placeholders = ids.map(() => '?').join(',');
          this.dbInstance.prepare(`DELETE FROM response_queue WHERE id IN (${placeholders})`).run(...ids);
          deleted += ids.length;

          // A short batch means there are no more candidates
          if (rows.length < this.CLEANUP_BATCH) break;
        }
        return deleted;
      };

      const deletedSent = cleanStatus('sent', sentThresholdIso);
      const deletedFailed = cleanStatus('failed', failedThresholdIso);
      const totalDeleted = deletedSent + deletedFailed;

      // Light maintenance after cleanup
      if (this.OPTIMIZE_ENABLED && totalDeleted > 0) {
        try {
          this.dbInstance.exec('PRAGMA optimize;');
        } catch (e) {
          console.warn('PRAGMA optimize failed:', e);
        }
      }

      // Optional VACUUM (disabled by default), every VACUUM_EVERY_N_RUNS effective runs
      if (this.VACUUM_ENABLED && totalDeleted > 0) {
        this._cleanupRunCount++;
        if (this._cleanupRunCount % Math.max(1, this.VACUUM_EVERY_N_RUNS) === 0) {
          try {
            this.dbInstance.exec('VACUUM;');
          } catch (e) {
            console.warn('VACUUM failed:', e);
          }
        }
      }

      const tookMs = Date.now() - startedAt;
      if (process.env.NODE_ENV !== 'test') {
        console.log(`🧹 Cleanup done in ${tookMs}ms: sent=${deletedSent}, failed=${deletedFailed}, total=${totalDeleted}`);
      }

      return { deletedSent, deletedFailed, totalDeleted, skipped: false };
    } catch (err) {
      console.error('Cleanup error:', err);
      return { deletedSent: 0, deletedFailed: 0, totalDeleted: 0, skipped: false };
    } finally {
      this._cleanupRunning = false;
    }
  },

  /**
   * Start the periodic cleanup timer. No-op in the test environment, when cleanup is
   * disabled, or when a timer is already active.
   */
  startCleanupScheduler() {
    if (process.env.NODE_ENV === 'test') return;
    if (!this.CLEANUP_ENABLED) return;
    if (this._cleanupTimer) return;

    // Delays above the timer maximum would otherwise be treated as 1 ms by the runtime
    const interval = Math.min(this.CLEANUP_INTERVAL_MS, MAX_TIMER_DELAY_MS);
    this._cleanupTimer = setInterval(() => {
      this.runCleanupOnce().catch(err => console.error('Scheduled cleanup error:', err));
    }, interval);
    console.log(`🗓️ Cleanup scheduler started (every ${Math.round(interval / (60 * 60 * 1000))}h)`);
  },

  /** Stop the periodic cleanup timer if it is active. */
  stopCleanupScheduler() {
    if (this._cleanupTimer) {
      clearInterval(this._cleanupTimer);
      this._cleanupTimer = null;
      if (process.env.NODE_ENV !== 'test') {
        console.log('🛑 Cleanup scheduler stopped');
      }
    }
  },

  /** Signal the worker loops to exit after their current iteration. */
  stop() {
    this._running = false;
  }
};