From 8bdb8510622905174efb5ecec855e0c2b694eb7e Mon Sep 17 00:00:00 2001 From: borja Date: Fri, 5 Sep 2025 19:41:29 +0200 Subject: [PATCH] feat: iniciar worker de ResponseQueue y mejorar procesamiento Co-authored-by: aider (openrouter/openai/gpt-5) --- src/server.ts | 8 ++ src/services/response-queue.ts | 136 ++++++++++++++++++++++++++++++++- 2 files changed, 142 insertions(+), 2 deletions(-) diff --git a/src/server.ts b/src/server.ts index cf62a1c..c30d056 100644 --- a/src/server.ts +++ b/src/server.ts @@ -271,6 +271,14 @@ export class WebhookServer { // Initialize groups - critical for operation await GroupSyncService.checkInitialGroups(); + + // Start response queue worker (background) + try { + await ResponseQueue.process(); + console.log('✅ ResponseQueue worker started'); + } catch (e) { + console.error('❌ Failed to start ResponseQueue worker:', e); + } } catch (error) { console.error('❌ Failed to setup webhook:', error instanceof Error ? error.message : error); process.exit(1); diff --git a/src/services/response-queue.ts b/src/services/response-queue.ts index 6a9409a..c4bd4c8 100644 --- a/src/services/response-queue.ts +++ b/src/services/response-queue.ts @@ -6,13 +6,26 @@ type QueuedResponse = { message: string; }; +type ClaimedItem = { + id: number; + recipient: string; + message: string; +}; + export const ResponseQueue = { // Permite inyectar una DB distinta en tests si se necesita dbInstance: db as Database, // Conservamos la cola en memoria por compatibilidad, aunque no se usa para persistencia queue: [] as QueuedResponse[], - + + // Configuración fija (MVP) + WORKERS: 2, + BATCH_SIZE: 10, + SLEEP_MS: 500, + + _running: false, + async add(responses: QueuedResponse[]) { try { const botNumber = process.env.CHATBOT_PHONE_NUMBER; @@ -44,7 +57,126 @@ export const ResponseQueue = { } }, + getHeaders(): HeadersInit { + return { + apikey: process.env.EVOLUTION_API_KEY || '', + 'Content-Type': 'application/json', + }; + }, + + async sendOne(item: ClaimedItem): Promise { + const baseUrl = process.env.EVOLUTION_API_URL; + const instance = process.env.EVOLUTION_API_INSTANCE; + if (!baseUrl || !instance) { + console.error('Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE'); + return false; + } + + // Endpoint típico de Evolution API para texto simple + const url = `${baseUrl}/message/sendText/${instance}`; + + try { + const response = await fetch(url, { + method: 'POST', + headers: this.getHeaders(), + body: JSON.stringify({ + number: item.recipient, + text: item.message, + }), + }); + + if (!response.ok) { + const body = await response.text().catch(() => ''); + console.warn('Send failed:', { status: response.status, body: body?.slice(0, 200) }); + return false; + } + + return true; + } catch (err) { + console.error('Network error sending message:', err); + return false; + } + }, + + claimNextBatch(limit: number): ClaimedItem[] { + // Selecciona y marca como 'processing' en una sola sentencia para evitar carreras + const rows = this.dbInstance.prepare(` + UPDATE response_queue + SET status = 'processing', + updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now') + WHERE id IN ( + SELECT id FROM response_queue + WHERE status = 'queued' + ORDER BY created_at, id + LIMIT ? + ) + RETURNING id, recipient, message + `).all(limit) as ClaimedItem[]; + + return rows || []; + }, + + markSent(id: number) { + this.dbInstance.prepare(` + UPDATE response_queue + SET status = 'sent', + updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now') + WHERE id = ? + `).run(id); + }, + + markFailed(id: number, errorMsg: string) { + const msg = (errorMsg || '').toString().slice(0, 500); + this.dbInstance.prepare(` + UPDATE response_queue + SET status = 'failed', + last_error = ?, + updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now') + WHERE id = ? + `).run(msg, id); + }, + + async workerLoop(workerId: number) { + while (this._running) { + try { + const batch = this.claimNextBatch(this.BATCH_SIZE); + + if (batch.length === 0) { + await new Promise(r => setTimeout(r, this.SLEEP_MS)); + continue; + } + + for (const item of batch) { + const ok = await this.sendOne(item); + if (ok) { + this.markSent(item.id); + } else { + this.markFailed(item.id, 'send failed'); + } + } + } catch (err) { + console.error(`ResponseQueue worker ${workerId} error:`, err); + // Evitar bucle apretado ante errores + await new Promise(r => setTimeout(r, this.SLEEP_MS)); + } + } + }, + async process() { - // Will implement actual processing later + // Inicia N workers en background, retorna inmediatamente + if (this._running) { + return; + } + this._running = true; + console.log(`Starting ResponseQueue with ${this.WORKERS} workers`); + + for (let i = 0; i < this.WORKERS; i++) { + // No await: correr en paralelo + this.workerLoop(i + 1); + } + }, + + stop() { + this._running = false; } };