diff --git a/src/clients/evolution.ts b/src/clients/evolution.ts new file mode 100644 index 0000000..5cef4a5 --- /dev/null +++ b/src/clients/evolution.ts @@ -0,0 +1,63 @@ +export type EvolutionResult = { ok: boolean; status?: number; error?: string }; + +export function buildHeaders(): HeadersInit { + return { + apikey: process.env.EVOLUTION_API_KEY || '', + 'Content-Type': 'application/json' + }; +} + +export async function sendText(payload: { number: string; text: string; mentioned?: string[] }): Promise { + const baseUrl = process.env.EVOLUTION_API_URL; + const instance = process.env.EVOLUTION_API_INSTANCE; + if (!baseUrl || !instance) { + const msg = 'Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE'; + return { ok: false, error: msg }; + } + const url = `${baseUrl}/message/sendText/${instance}`; + try { + const res = await fetch(url, { + method: 'POST', + headers: buildHeaders(), + body: JSON.stringify(payload) + }); + if (!res.ok) { + const body = await res.text().catch(() => ''); + const errTxt = body?.slice(0, 200) || `HTTP ${res.status}`; + return { ok: false, status: res.status, error: errTxt }; + } + return { ok: true, status: res.status }; + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + return { ok: false, error: errMsg }; + } +} + +export async function sendReaction(payload: { + key: { remoteJid: string; id: string; fromMe: boolean; participant?: string }; + reaction: string; +}): Promise { + const baseUrl = process.env.EVOLUTION_API_URL; + const instance = process.env.EVOLUTION_API_INSTANCE; + if (!baseUrl || !instance) { + const msg = 'Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE'; + return { ok: false, error: msg }; + } + const url = `${baseUrl}/message/sendReaction/${instance}`; + try { + const res = await fetch(url, { + method: 'POST', + headers: buildHeaders(), + body: JSON.stringify(payload) + }); + if (!res.ok) { + const body = await res.text().catch(() => ''); + const errTxt = body?.slice(0, 200) || `HTTP ${res.status}`; + return { ok: false, status: res.status, error: errTxt }; + } + return { ok: true, status: res.status }; + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + return { ok: false, error: errMsg }; + } +} diff --git a/src/services/queue/cleanup.ts b/src/services/queue/cleanup.ts new file mode 100644 index 0000000..e9b6b0b --- /dev/null +++ b/src/services/queue/cleanup.ts @@ -0,0 +1,74 @@ +import type { Database } from 'bun:sqlite'; +import { toIsoSqlUTC } from '../../utils/datetime'; + +export type CleanupOptions = { + retentionDaysSent: number; + retentionDaysFailed: number; + batchSize: number; + optimize: boolean; + vacuum: boolean; + vacuumEveryNRuns: number; + cleanupRunCount: number; +}; + +export async function runCleanupOnce( + db: Database, + opts: CleanupOptions, + now: Date = new Date() +): Promise<{ deletedSent: number; deletedFailed: number; totalDeleted: number; nextCleanupRunCount: number }> { + const msPerDay = 24 * 60 * 60 * 1000; + const sentThresholdIso = toIsoSqlUTC(new Date(now.getTime() - opts.retentionDaysSent * msPerDay)); + const failedThresholdIso = toIsoSqlUTC(new Date(now.getTime() - opts.retentionDaysFailed * msPerDay)); + + const cleanStatus = (status: 'sent' | 'failed', thresholdIso: string, batch: number): number => { + let deleted = 0; + const selectStmt = db.prepare(` + SELECT id + FROM response_queue + WHERE status = ? AND updated_at < ? + ORDER BY updated_at + LIMIT ? + `); + + while (true) { + const rows = selectStmt.all(status, thresholdIso, batch) as Array<{ id: number }>; + if (!rows || rows.length === 0) break; + + const ids = rows.map((r) => r.id); + const placeholders = ids.map(() => '?').join(','); + db.prepare(`DELETE FROM response_queue WHERE id IN (${placeholders})`).run(...ids); + deleted += ids.length; + + if (rows.length < batch) break; + } + return deleted; + }; + + const deletedSent = cleanStatus('sent', sentThresholdIso, opts.batchSize); + const deletedFailed = cleanStatus('failed', failedThresholdIso, opts.batchSize); + const totalDeleted = deletedSent + deletedFailed; + + // Mantenimiento ligero tras limpieza + if (opts.optimize && totalDeleted > 0) { + try { + db.exec('PRAGMA optimize;'); + } catch (e) { + console.warn('PRAGMA optimize failed:', e); + } + } + + // VACUUM opcional + let nextCleanupRunCount = opts.cleanupRunCount; + if (opts.vacuum && totalDeleted > 0) { + nextCleanupRunCount++; + if (nextCleanupRunCount % Math.max(1, opts.vacuumEveryNRuns) === 0) { + try { + db.exec('VACUUM;'); + } catch (e) { + console.warn('VACUUM failed:', e); + } + } + } + + return { deletedSent, deletedFailed, totalDeleted, nextCleanupRunCount }; +} diff --git a/src/services/queue/metadata.ts b/src/services/queue/metadata.ts new file mode 100644 index 0000000..35eef6a --- /dev/null +++ b/src/services/queue/metadata.ts @@ -0,0 +1,46 @@ +export type OnboardingMeta = { + kind: 'onboarding'; + variant: 'initial' | 'reminder'; + part: 1 | 2; + bundle_id: string; + group_id?: string | null; + task_id?: number | null; + display_code?: number | null; +}; + +export type ReactionMeta = { + kind: 'reaction'; + emoji: string; + chatId: string; + messageId: string; + participant?: string; + fromMe?: boolean; +}; + +export type QueueMetadata = OnboardingMeta | ReactionMeta | Record; + +export function parseQueueMetadata(raw: string | null | undefined): QueueMetadata | null { + if (!raw) return null; + try { + const obj = JSON.parse(String(raw)); + if (!obj || typeof obj !== 'object') return null; + const kind = String((obj as any).kind || ''); + if (kind === 'reaction') { + // Validación mínima + return { + kind: 'reaction', + emoji: String((obj as any).emoji || ''), + chatId: String((obj as any).chatId || ''), + messageId: String((obj as any).messageId || ''), + participant: typeof (obj as any).participant === 'string' ? String((obj as any).participant) : undefined, + fromMe: typeof (obj as any).fromMe === 'boolean' ? Boolean((obj as any).fromMe) : undefined + } as ReactionMeta; + } + if (kind === 'onboarding') { + return obj as OnboardingMeta; + } + return obj as Record; + } catch { + return null; + } +} diff --git a/src/services/response-queue.ts b/src/services/response-queue.ts index d6acee9..0503b3c 100644 --- a/src/services/response-queue.ts +++ b/src/services/response-queue.ts @@ -4,6 +4,9 @@ import { IdentityService } from './identity'; import { normalizeWhatsAppId } from '../utils/whatsapp'; import { Metrics } from './metrics'; import { toIsoSqlUTC } from '../utils/datetime'; +import * as EvolutionClient from '../clients/evolution'; +import { runCleanupOnce as cleanupRunOnce } from './queue/cleanup'; +import { parseQueueMetadata } from './queue/metadata'; const MAX_FALLBACK_DIGITS = (() => { const raw = (process.env.ONBOARDING_FALLBACK_MAX_DIGITS || '').trim(); @@ -260,10 +263,8 @@ export const ResponseQueue = { } // Detectar jobs de reacción - let meta: any = null; - try { meta = item.metadata ? JSON.parse(item.metadata) : null; } catch {} + const meta = parseQueueMetadata(item.metadata); if (meta && meta.kind === 'reaction') { - const reactionUrl = `${baseUrl}/message/sendReaction/${instance}`; const chatId = String(meta.chatId || ''); const messageId = String(meta.messageId || ''); const emoji = String(meta.emoji || ''); @@ -276,32 +277,17 @@ export const ResponseQueue = { if (meta.participant) { key.participant = String(meta.participant); } - const payload = { - key, - reaction: emoji - }; - try { - const response = await fetch(reactionUrl, { - method: 'POST', - headers: this.getHeaders(), - body: JSON.stringify(payload), - }); - if (!response.ok) { - const body = await response.text().catch(() => ''); - const errTxt = body?.slice(0, 200) || `HTTP ${response.status}`; - console.warn('Send reaction failed:', { status: response.status, body: errTxt }); - try { Metrics.inc('reactions_failed_total', 1, { emoji: emojiLabel }); } catch {} - return { ok: false, status: response.status, error: errTxt }; - } - console.log(`✅ Sent reaction with payload: ${JSON.stringify(payload)}`); - try { Metrics.inc('reactions_sent_total', 1, { emoji: emojiLabel }); } catch {} - return { ok: true, status: response.status }; - } catch (err) { - const errMsg = (err instanceof Error ? err.message : String(err)); - console.error('Network error sending reaction:', errMsg); + const payload = { key, reaction: emoji }; + const result = await EvolutionClient.sendReaction(payload); + if (!result.ok) { + const errTxt = result.error || (typeof result.status === 'number' ? `HTTP ${result.status}` : 'unknown_error'); + console.warn('Send reaction failed:', { status: result.status, body: errTxt }); try { Metrics.inc('reactions_failed_total', 1, { emoji: emojiLabel }); } catch {} - return { ok: false, error: errMsg }; + return { ok: false, status: result.status, error: errTxt }; } + console.log(`✅ Sent reaction with payload: ${JSON.stringify(payload)}`); + try { Metrics.inc('reactions_sent_total', 1, { emoji: emojiLabel }); } catch {} + return { ok: true, status: result.status }; } // Endpoint típico de Evolution API para texto simple @@ -372,20 +358,16 @@ export const ResponseQueue = { } } - const response = await fetch(url, { - method: 'POST', - headers: this.getHeaders(), - body: JSON.stringify(payload), - }); - - if (!response.ok) { - const body = await response.text().catch(() => ''); - const errTxt = body?.slice(0, 200) || `HTTP ${response.status}`; - console.warn('Send failed:', { status: response.status, body: errTxt }); - return { ok: false, status: response.status, error: errTxt }; + { + const result = await EvolutionClient.sendText(payload); + if (!result.ok) { + const errTxt = result.error || (typeof result.status === 'number' ? `HTTP ${result.status}` : 'unknown_error'); + console.warn('Send failed:', { status: result.status, body: errTxt }); + return { ok: false, status: result.status, error: errTxt }; + } + console.log(`✅ Sent message with payload: ${JSON.stringify(payload)}`); + return { ok: true, status: result.status }; } - console.log(`✅ Sent message with payload: ${JSON.stringify(payload)}`); - return { ok: true, status: response.status }; } catch (err) { const errMsg = (err instanceof Error ? err.message : String(err)); console.error('Network error sending message:', errMsg); @@ -576,66 +558,23 @@ export const ResponseQueue = { const startedAt = Date.now(); try { - const msPerDay = 24 * 60 * 60 * 1000; - - const sentThresholdIso = toIsoSqlUTC(new Date(now.getTime() - this.RETENTION_DAYS_SENT * msPerDay)); - const failedThresholdIso = toIsoSqlUTC(new Date(now.getTime() - this.RETENTION_DAYS_FAILED * msPerDay)); - - const cleanStatus = (status: 'sent' | 'failed', thresholdIso: string): number => { - let deleted = 0; - const selectStmt = this.dbInstance.prepare(` - SELECT id - FROM response_queue - WHERE status = ? AND updated_at < ? - ORDER BY updated_at - LIMIT ? - `); - - while (true) { - const rows = selectStmt.all(status, thresholdIso, this.CLEANUP_BATCH) as Array<{ id: number }>; - if (!rows || rows.length === 0) break; - - const ids = rows.map(r => r.id); - const placeholders = ids.map(() => '?').join(','); - this.dbInstance.prepare(`DELETE FROM response_queue WHERE id IN (${placeholders})`).run(...ids); - deleted += ids.length; - - // Si el lote es menor que el batch, no quedan más candidatos - if (rows.length < this.CLEANUP_BATCH) break; - } - return deleted; - }; - - const deletedSent = cleanStatus('sent', sentThresholdIso); - const deletedFailed = cleanStatus('failed', failedThresholdIso); - const totalDeleted = deletedSent + deletedFailed; - - // Mantenimiento ligero tras limpieza - if (this.OPTIMIZE_ENABLED && totalDeleted > 0) { - try { - this.dbInstance.exec('PRAGMA optimize;'); - } catch (e) { - console.warn('PRAGMA optimize failed:', e); - } - } - - // VACUUM opcional (desactivado por defecto) - if (this.VACUUM_ENABLED && totalDeleted > 0) { - this._cleanupRunCount++; - if (this._cleanupRunCount % Math.max(1, this.VACUUM_EVERY_N_RUNS) === 0) { - try { - this.dbInstance.exec('VACUUM;'); - } catch (e) { - console.warn('VACUUM failed:', e); - } - } - } + const res = await cleanupRunOnce(this.dbInstance, { + retentionDaysSent: this.RETENTION_DAYS_SENT, + retentionDaysFailed: this.RETENTION_DAYS_FAILED, + batchSize: this.CLEANUP_BATCH, + optimize: this.OPTIMIZE_ENABLED, + vacuum: this.VACUUM_ENABLED, + vacuumEveryNRuns: this.VACUUM_EVERY_N_RUNS, + cleanupRunCount: this._cleanupRunCount + }, now); + + this._cleanupRunCount = res.nextCleanupRunCount; const tookMs = Date.now() - startedAt; if (process.env.NODE_ENV !== 'test') { - console.log(`🧹 Cleanup done in ${tookMs}ms: sent=${deletedSent}, failed=${deletedFailed}, total=${totalDeleted}`); + console.log(`🧹 Cleanup done in ${tookMs}ms: sent=${res.deletedSent}, failed=${res.deletedFailed}, total=${res.totalDeleted}`); } - return { deletedSent, deletedFailed, totalDeleted, skipped: false }; + return { deletedSent: res.deletedSent, deletedFailed: res.deletedFailed, totalDeleted: res.totalDeleted, skipped: false }; } catch (err) { console.error('Cleanup error:', err); return { deletedSent: 0, deletedFailed: 0, totalDeleted: 0, skipped: false };