feat: iniciar worker de ResponseQueue y mejorar procesamiento

Co-authored-by: aider (openrouter/openai/gpt-5) <aider@aider.chat>
pull/1/head
borja 2 months ago
parent 49b21c6ead
commit 8bdb851062

@ -271,6 +271,14 @@ export class WebhookServer {
// Initialize groups - critical for operation // Initialize groups - critical for operation
await GroupSyncService.checkInitialGroups(); await GroupSyncService.checkInitialGroups();
// Start response queue worker (background)
try {
await ResponseQueue.process();
console.log('✅ ResponseQueue worker started');
} catch (e) {
console.error('❌ Failed to start ResponseQueue worker:', e);
}
} catch (error) { } catch (error) {
console.error('❌ Failed to setup webhook:', error instanceof Error ? error.message : error); console.error('❌ Failed to setup webhook:', error instanceof Error ? error.message : error);
process.exit(1); process.exit(1);

@ -6,6 +6,12 @@ type QueuedResponse = {
message: string; message: string;
}; };
type ClaimedItem = {
id: number;
recipient: string;
message: string;
};
export const ResponseQueue = { export const ResponseQueue = {
// Permite inyectar una DB distinta en tests si se necesita // Permite inyectar una DB distinta en tests si se necesita
dbInstance: db as Database, dbInstance: db as Database,
@ -13,6 +19,13 @@ export const ResponseQueue = {
// Conservamos la cola en memoria por compatibilidad, aunque no se usa para persistencia // Conservamos la cola en memoria por compatibilidad, aunque no se usa para persistencia
queue: [] as QueuedResponse[], queue: [] as QueuedResponse[],
// Configuración fija (MVP)
WORKERS: 2,
BATCH_SIZE: 10,
SLEEP_MS: 500,
_running: false,
async add(responses: QueuedResponse[]) { async add(responses: QueuedResponse[]) {
try { try {
const botNumber = process.env.CHATBOT_PHONE_NUMBER; const botNumber = process.env.CHATBOT_PHONE_NUMBER;
@ -44,7 +57,126 @@ export const ResponseQueue = {
} }
}, },
getHeaders(): HeadersInit {
return {
apikey: process.env.EVOLUTION_API_KEY || '',
'Content-Type': 'application/json',
};
},
async sendOne(item: ClaimedItem): Promise<boolean> {
const baseUrl = process.env.EVOLUTION_API_URL;
const instance = process.env.EVOLUTION_API_INSTANCE;
if (!baseUrl || !instance) {
console.error('Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE');
return false;
}
// Endpoint típico de Evolution API para texto simple
const url = `${baseUrl}/message/sendText/${instance}`;
try {
const response = await fetch(url, {
method: 'POST',
headers: this.getHeaders(),
body: JSON.stringify({
number: item.recipient,
text: item.message,
}),
});
if (!response.ok) {
const body = await response.text().catch(() => '');
console.warn('Send failed:', { status: response.status, body: body?.slice(0, 200) });
return false;
}
return true;
} catch (err) {
console.error('Network error sending message:', err);
return false;
}
},
claimNextBatch(limit: number): ClaimedItem[] {
// Selecciona y marca como 'processing' en una sola sentencia para evitar carreras
const rows = this.dbInstance.prepare(`
UPDATE response_queue
SET status = 'processing',
updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now')
WHERE id IN (
SELECT id FROM response_queue
WHERE status = 'queued'
ORDER BY created_at, id
LIMIT ?
)
RETURNING id, recipient, message
`).all(limit) as ClaimedItem[];
return rows || [];
},
markSent(id: number) {
this.dbInstance.prepare(`
UPDATE response_queue
SET status = 'sent',
updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now')
WHERE id = ?
`).run(id);
},
markFailed(id: number, errorMsg: string) {
const msg = (errorMsg || '').toString().slice(0, 500);
this.dbInstance.prepare(`
UPDATE response_queue
SET status = 'failed',
last_error = ?,
updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now')
WHERE id = ?
`).run(msg, id);
},
async workerLoop(workerId: number) {
while (this._running) {
try {
const batch = this.claimNextBatch(this.BATCH_SIZE);
if (batch.length === 0) {
await new Promise(r => setTimeout(r, this.SLEEP_MS));
continue;
}
for (const item of batch) {
const ok = await this.sendOne(item);
if (ok) {
this.markSent(item.id);
} else {
this.markFailed(item.id, 'send failed');
}
}
} catch (err) {
console.error(`ResponseQueue worker ${workerId} error:`, err);
// Evitar bucle apretado ante errores
await new Promise(r => setTimeout(r, this.SLEEP_MS));
}
}
},
async process() { async process() {
// Will implement actual processing later // Inicia N workers en background, retorna inmediatamente
if (this._running) {
return;
}
this._running = true;
console.log(`Starting ResponseQueue with ${this.WORKERS} workers`);
for (let i = 0; i < this.WORKERS; i++) {
// No await: correr en paralelo
this.workerLoop(i + 1);
}
},
stop() {
this._running = false;
} }
}; };

Loading…
Cancel
Save