From 79b5d83f5bbe644c043ada8490add563dc6fe9c7 Mon Sep 17 00:00:00 2001 From: borja Date: Sat, 6 Sep 2025 19:06:04 +0200 Subject: [PATCH] feat: habilitar reintentos con backoff exponencial + jitter en cola Co-authored-by: aider (openrouter/openai/gpt-5) --- src/db.ts | 38 ++++++++++++ src/services/response-queue.ts | 105 +++++++++++++++++++++++++-------- 2 files changed, 120 insertions(+), 23 deletions(-) diff --git a/src/db.ts b/src/db.ts index 7ca548a..3902d84 100644 --- a/src/db.ts +++ b/src/db.ts @@ -117,6 +117,44 @@ export function initializeDatabase(instance: Database) { } catch (e) { console.warn('[initializeDatabase] Skipped adding tasks.completed_by column:', e); } + + // Migration: ensure reliability columns exist on response_queue (next_attempt_at, lease_until, last_status_code) + try { + const cols = instance.query(`PRAGMA table_info('response_queue')`).all() as any[]; + const hasNextAttempt = Array.isArray(cols) && cols.some((c: any) => c.name === 'next_attempt_at'); + if (!hasNextAttempt) { + instance.exec(`ALTER TABLE response_queue ADD COLUMN next_attempt_at TEXT NULL;`); + } + const hasLeaseUntil = Array.isArray(cols) && cols.some((c: any) => c.name === 'lease_until'); + if (!hasLeaseUntil) { + instance.exec(`ALTER TABLE response_queue ADD COLUMN lease_until TEXT NULL;`); + } + const hasLastStatus = Array.isArray(cols) && cols.some((c: any) => c.name === 'last_status_code'); + if (!hasLastStatus) { + instance.exec(`ALTER TABLE response_queue ADD COLUMN last_status_code INTEGER NULL;`); + } + } catch (e) { + console.warn('[initializeDatabase] Skipped ensuring response_queue reliability columns:', e); + } + + // Ensure supporting indexes exist + try { + instance.exec(` + CREATE INDEX IF NOT EXISTS idx_response_queue_status_next_attempt + ON response_queue (status, next_attempt_at); + `); + } catch (e) { + console.warn('[initializeDatabase] Skipped creating idx_response_queue_status_next_attempt:', e); + } + + try { + instance.exec(` + CREATE INDEX IF NOT EXISTS idx_response_queue_status_lease_until + ON response_queue (status, lease_until); + `); + } catch (e) { + console.warn('[initializeDatabase] Skipped creating idx_response_queue_status_lease_until:', e); + } } /** diff --git a/src/services/response-queue.ts b/src/services/response-queue.ts index 67c385e..504acdc 100644 --- a/src/services/response-queue.ts +++ b/src/services/response-queue.ts @@ -12,6 +12,7 @@ type ClaimedItem = { recipient: string; message: string; metadata?: string | null; // JSON-encoded metadata (e.g., { mentioned: [...] }) + attempts: number; }; export const ResponseQueue = { @@ -26,8 +27,26 @@ export const ResponseQueue = { BATCH_SIZE: 10, SLEEP_MS: 500, + // Reintentos con backoff exponencial + jitter (valores por defecto, override opcional por env) + MAX_ATTEMPTS: process.env.RQ_MAX_ATTEMPTS ? Number(process.env.RQ_MAX_ATTEMPTS) : 6, + BASE_BACKOFF_MS: process.env.RQ_BASE_BACKOFF_MS ? Number(process.env.RQ_BASE_BACKOFF_MS) : 5000, + MAX_BACKOFF_MS: process.env.RQ_MAX_BACKOFF_MS ? Number(process.env.RQ_MAX_BACKOFF_MS) : 3600000, + _running: false, + nowIso(): string { + return new Date().toISOString().replace('T', ' ').replace('Z', ''); + }, + + futureIso(ms: number): string { + return new Date(Date.now() + ms).toISOString().replace('T', ' ').replace('Z', ''); + }, + + computeDelayMs(attempt: number): number { + const exp = Math.min(this.MAX_BACKOFF_MS, this.BASE_BACKOFF_MS * 2 ** Math.max(0, attempt - 1)); + return Math.floor(Math.random() * exp); // full jitter + }, + async add(responses: QueuedResponse[]) { try { const botNumber = process.env.CHATBOT_PHONE_NUMBER; @@ -42,8 +61,8 @@ export const ResponseQueue = { } const insert = this.dbInstance.prepare(` - INSERT INTO response_queue (recipient, message, metadata) - VALUES (?, ?, ?) + INSERT INTO response_queue (recipient, message, metadata, next_attempt_at) + VALUES (?, ?, ?, ?) `); this.dbInstance.transaction((rows: QueuedResponse[]) => { @@ -52,7 +71,7 @@ export const ResponseQueue = { r.mentions && r.mentions.length > 0 ? JSON.stringify({ mentioned: r.mentions }) : null; - insert.run(r.recipient, r.message, metadata); + insert.run(r.recipient, r.message, metadata, this.nowIso()); } })(filtered); @@ -70,12 +89,13 @@ export const ResponseQueue = { }; }, - async sendOne(item: ClaimedItem): Promise { + async sendOne(item: ClaimedItem): Promise<{ ok: boolean; status?: number; error?: string }> { const baseUrl = process.env.EVOLUTION_API_URL; const instance = process.env.EVOLUTION_API_INSTANCE; if (!baseUrl || !instance) { - console.error('Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE'); - return false; + const msg = 'Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE'; + console.error(msg); + return { ok: false, error: msg }; } // Endpoint típico de Evolution API para texto simple @@ -107,14 +127,16 @@ export const ResponseQueue = { if (!response.ok) { const body = await response.text().catch(() => ''); - console.warn('Send failed:', { status: response.status, body: body?.slice(0, 200) }); - return false; + const errTxt = body?.slice(0, 200) || `HTTP ${response.status}`; + console.warn('Send failed:', { status: response.status, body: errTxt }); + return { ok: false, status: response.status, error: errTxt }; } - console.log(`✅ Sent message to with this as payload: ${JSON.stringify(payload)}`); - return true; + console.log(`✅ Sent message with payload: ${JSON.stringify(payload)}`); + return { ok: true, status: response.status }; } catch (err) { - console.error('Network error sending message:', err); - return false; + const errMsg = (err instanceof Error ? err.message : String(err)); + console.error('Network error sending message:', errMsg); + return { ok: false, error: errMsg }; } }, @@ -127,33 +149,51 @@ export const ResponseQueue = { WHERE id IN ( SELECT id FROM response_queue WHERE status = 'queued' - ORDER BY created_at, id + AND (next_attempt_at IS NULL OR next_attempt_at <= strftime('%Y-%m-%d %H:%M:%f', 'now')) + ORDER BY COALESCE(next_attempt_at, created_at), id LIMIT ? ) - RETURNING id, recipient, message, metadata + RETURNING id, recipient, message, metadata, attempts `).all(limit) as ClaimedItem[]; return rows || []; }, - markSent(id: number) { + markSent(id: number, statusCode?: number) { this.dbInstance.prepare(` UPDATE response_queue SET status = 'sent', + last_status_code = ?, updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now') WHERE id = ? - `).run(id); + `).run(statusCode ?? null, id); }, - markFailed(id: number, errorMsg: string) { + markFailed(id: number, errorMsg: string, statusCode?: number, attempts?: number) { const msg = (errorMsg || '').toString().slice(0, 500); this.dbInstance.prepare(` UPDATE response_queue SET status = 'failed', + attempts = COALESCE(?, attempts), last_error = ?, + last_status_code = ?, updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now') WHERE id = ? - `).run(msg, id); + `).run(attempts ?? null, msg, statusCode ?? null, id); + }, + + requeueWithBackoff(id: number, nextAttempts: number, nextAttemptAt: string, statusCode?: number | null, errorMsg?: string) { + const msg = (errorMsg || '').toString().slice(0, 500) || null; + this.dbInstance.prepare(` + UPDATE response_queue + SET status = 'queued', + attempts = ?, + next_attempt_at = ?, + last_error = COALESCE(?, last_error), + last_status_code = COALESCE(?, last_status_code), + updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now') + WHERE id = ? + `).run(nextAttempts, nextAttemptAt, msg, statusCode ?? null, id); }, async workerLoop(workerId: number) { @@ -167,12 +207,31 @@ export const ResponseQueue = { } for (const item of batch) { - const ok = await this.sendOne(item); - if (ok) { - this.markSent(item.id); - } else { - this.markFailed(item.id, 'send failed'); + const result = await this.sendOne(item); + if (result.ok) { + this.markSent(item.id, result.status); + continue; } + + const status = result.status; + const attemptsNow = (item.attempts || 0) + 1; + const errMsg = result.error || 'send failed'; + + // 4xx = fallo definitivo + if (typeof status === 'number' && status >= 400 && status < 500) { + this.markFailed(item.id, errMsg, status, attemptsNow); + continue; + } + + // 5xx o error de red: reintento con backoff si no superó el máximo + if (attemptsNow >= this.MAX_ATTEMPTS) { + this.markFailed(item.id, errMsg, status, attemptsNow); + continue; + } + + const delayMs = this.computeDelayMs(attemptsNow); + const when = this.futureIso(delayMs); + this.requeueWithBackoff(item.id, attemptsNow, when, status ?? null, errMsg); } } catch (err) { console.error(`ResponseQueue worker ${workerId} error:`, err);