diff --git a/src/services/response-queue.ts b/src/services/response-queue.ts index a112952..67c385e 100644 --- a/src/services/response-queue.ts +++ b/src/services/response-queue.ts @@ -2,125 +2,125 @@ import type { Database } from 'bun:sqlite'; import { db } from '../db'; type QueuedResponse = { - recipient: string; - message: string; - mentions?: string[]; // full JIDs to mention (e.g., '346xxx@s.whatsapp.net') + recipient: string; + message: string; + mentions?: string[]; // full JIDs to mention (e.g., '346xxx@s.whatsapp.net') }; type ClaimedItem = { - id: number; - recipient: string; - message: string; - metadata?: string | null; // JSON-encoded metadata (e.g., { mentioned: [...] }) + id: number; + recipient: string; + message: string; + metadata?: string | null; // JSON-encoded metadata (e.g., { mentioned: [...] }) }; export const ResponseQueue = { - // Permite inyectar una DB distinta en tests si se necesita - dbInstance: db as Database, + // Permite inyectar una DB distinta en tests si se necesita + dbInstance: db as Database, - // Conservamos la cola en memoria por compatibilidad, aunque no se usa para persistencia - queue: [] as QueuedResponse[], + // Conservamos la cola en memoria por compatibilidad, aunque no se usa para persistencia + queue: [] as QueuedResponse[], - // Configuración fija (MVP) - WORKERS: 2, - BATCH_SIZE: 10, - SLEEP_MS: 500, + // Configuración fija (MVP) + WORKERS: 2, + BATCH_SIZE: 10, + SLEEP_MS: 500, - _running: false, + _running: false, - async add(responses: QueuedResponse[]) { - try { - const botNumber = process.env.CHATBOT_PHONE_NUMBER; - const filtered = responses.filter(r => - r.recipient && - r.message && - (!botNumber || r.recipient !== botNumber) - ); + async add(responses: QueuedResponse[]) { + try { + const botNumber = process.env.CHATBOT_PHONE_NUMBER; + const filtered = responses.filter(r => + r.recipient && + r.message && + (!botNumber || r.recipient !== botNumber) + ); - if (filtered.length === 0) { - return; - } + if (filtered.length === 0) { + return; + } - const insert = this.dbInstance.prepare(` + const insert = this.dbInstance.prepare(` INSERT INTO response_queue (recipient, message, metadata) VALUES (?, ?, ?) `); - this.dbInstance.transaction((rows: QueuedResponse[]) => { - for (const r of rows) { - const metadata = - r.mentions && r.mentions.length > 0 - ? JSON.stringify({ mentioned: r.mentions }) - : null; - insert.run(r.recipient, r.message, metadata); - } - })(filtered); - - console.log('Queued responses (persisted):', filtered.length); - } catch (err) { - console.error('Failed to persist queued responses:', err); - throw err; - } - }, - - getHeaders(): HeadersInit { - return { - apikey: process.env.EVOLUTION_API_KEY || '', - 'Content-Type': 'application/json', - }; - }, - - async sendOne(item: ClaimedItem): Promise { - const baseUrl = process.env.EVOLUTION_API_URL; - const instance = process.env.EVOLUTION_API_INSTANCE; - if (!baseUrl || !instance) { - console.error('Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE'); - return false; - } - - // Endpoint típico de Evolution API para texto simple - const url = `${baseUrl}/message/sendText/${instance}`; - - try { - // Build payload, adding mentioned JIDs if present in metadata - const payload: any = { - number: item.recipient, - text: item.message, - }; - - if (item.metadata) { - try { - const parsed = JSON.parse(item.metadata); - if (parsed && Array.isArray(parsed.mentioned) && parsed.mentioned.length > 0) { - payload.mentioned = parsed.mentioned; - } - } catch { - // ignore bad metadata - } - } - - const response = await fetch(url, { - method: 'POST', - headers: this.getHeaders(), - body: JSON.stringify(payload), - }); - - if (!response.ok) { - const body = await response.text().catch(() => ''); - console.warn('Send failed:', { status: response.status, body: body?.slice(0, 200) }); - return false; - } - - return true; - } catch (err) { - console.error('Network error sending message:', err); - return false; - } - }, - - claimNextBatch(limit: number): ClaimedItem[] { - // Selecciona y marca como 'processing' en una sola sentencia para evitar carreras - const rows = this.dbInstance.prepare(` + this.dbInstance.transaction((rows: QueuedResponse[]) => { + for (const r of rows) { + const metadata = + r.mentions && r.mentions.length > 0 + ? JSON.stringify({ mentioned: r.mentions }) + : null; + insert.run(r.recipient, r.message, metadata); + } + })(filtered); + + console.log('Queued responses (persisted):', filtered.length); + } catch (err) { + console.error('Failed to persist queued responses:', err); + throw err; + } + }, + + getHeaders(): HeadersInit { + return { + apikey: process.env.EVOLUTION_API_KEY || '', + 'Content-Type': 'application/json', + }; + }, + + async sendOne(item: ClaimedItem): Promise { + const baseUrl = process.env.EVOLUTION_API_URL; + const instance = process.env.EVOLUTION_API_INSTANCE; + if (!baseUrl || !instance) { + console.error('Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE'); + return false; + } + + // Endpoint típico de Evolution API para texto simple + const url = `${baseUrl}/message/sendText/${instance}`; + + try { + // Build payload, adding mentioned JIDs if present in metadata + const payload: any = { + number: item.recipient, + text: item.message, + }; + + if (item.metadata) { + try { + const parsed = JSON.parse(item.metadata); + if (parsed && Array.isArray(parsed.mentioned) && parsed.mentioned.length > 0) { + payload.mentioned = parsed.mentioned; + } + } catch { + // ignore bad metadata + } + } + + const response = await fetch(url, { + method: 'POST', + headers: this.getHeaders(), + body: JSON.stringify(payload), + }); + + if (!response.ok) { + const body = await response.text().catch(() => ''); + console.warn('Send failed:', { status: response.status, body: body?.slice(0, 200) }); + return false; + } + console.log(`✅ Sent message to with this as payload: ${JSON.stringify(payload)}`); + return true; + } catch (err) { + console.error('Network error sending message:', err); + return false; + } + }, + + claimNextBatch(limit: number): ClaimedItem[] { + // Selecciona y marca como 'processing' en una sola sentencia para evitar carreras + const rows = this.dbInstance.prepare(` UPDATE response_queue SET status = 'processing', updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now') @@ -133,70 +133,70 @@ export const ResponseQueue = { RETURNING id, recipient, message, metadata `).all(limit) as ClaimedItem[]; - return rows || []; - }, + return rows || []; + }, - markSent(id: number) { - this.dbInstance.prepare(` + markSent(id: number) { + this.dbInstance.prepare(` UPDATE response_queue SET status = 'sent', updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now') WHERE id = ? `).run(id); - }, + }, - markFailed(id: number, errorMsg: string) { - const msg = (errorMsg || '').toString().slice(0, 500); - this.dbInstance.prepare(` + markFailed(id: number, errorMsg: string) { + const msg = (errorMsg || '').toString().slice(0, 500); + this.dbInstance.prepare(` UPDATE response_queue SET status = 'failed', last_error = ?, updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now') WHERE id = ? `).run(msg, id); - }, - - async workerLoop(workerId: number) { - while (this._running) { - try { - const batch = this.claimNextBatch(this.BATCH_SIZE); - - if (batch.length === 0) { - await new Promise(r => setTimeout(r, this.SLEEP_MS)); - continue; - } - - for (const item of batch) { - const ok = await this.sendOne(item); - if (ok) { - this.markSent(item.id); - } else { - this.markFailed(item.id, 'send failed'); - } - } - } catch (err) { - console.error(`ResponseQueue worker ${workerId} error:`, err); - // Evitar bucle apretado ante errores - await new Promise(r => setTimeout(r, this.SLEEP_MS)); - } - } - }, - - async process() { - // Inicia N workers en background, retorna inmediatamente - if (this._running) { - return; - } - this._running = true; - console.log(`Starting ResponseQueue with ${this.WORKERS} workers`); - - for (let i = 0; i < this.WORKERS; i++) { - // No await: correr en paralelo - this.workerLoop(i + 1); - } - }, - - stop() { - this._running = false; - } + }, + + async workerLoop(workerId: number) { + while (this._running) { + try { + const batch = this.claimNextBatch(this.BATCH_SIZE); + + if (batch.length === 0) { + await new Promise(r => setTimeout(r, this.SLEEP_MS)); + continue; + } + + for (const item of batch) { + const ok = await this.sendOne(item); + if (ok) { + this.markSent(item.id); + } else { + this.markFailed(item.id, 'send failed'); + } + } + } catch (err) { + console.error(`ResponseQueue worker ${workerId} error:`, err); + // Evitar bucle apretado ante errores + await new Promise(r => setTimeout(r, this.SLEEP_MS)); + } + } + }, + + async process() { + // Inicia N workers en background, retorna inmediatamente + if (this._running) { + return; + } + this._running = true; + console.log(`Starting ResponseQueue with ${this.WORKERS} workers`); + + for (let i = 0; i < this.WORKERS; i++) { + // No await: correr en paralelo + this.workerLoop(i + 1); + } + }, + + stop() { + this._running = false; + } };