|
|
|
|
@ -2,125 +2,125 @@ import type { Database } from 'bun:sqlite';
|
|
|
|
|
import { db } from '../db';
|
|
|
|
|
|
|
|
|
|
type QueuedResponse = {
|
|
|
|
|
recipient: string;
|
|
|
|
|
message: string;
|
|
|
|
|
mentions?: string[]; // full JIDs to mention (e.g., '346xxx@s.whatsapp.net')
|
|
|
|
|
recipient: string;
|
|
|
|
|
message: string;
|
|
|
|
|
mentions?: string[]; // full JIDs to mention (e.g., '346xxx@s.whatsapp.net')
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
type ClaimedItem = {
|
|
|
|
|
id: number;
|
|
|
|
|
recipient: string;
|
|
|
|
|
message: string;
|
|
|
|
|
metadata?: string | null; // JSON-encoded metadata (e.g., { mentioned: [...] })
|
|
|
|
|
id: number;
|
|
|
|
|
recipient: string;
|
|
|
|
|
message: string;
|
|
|
|
|
metadata?: string | null; // JSON-encoded metadata (e.g., { mentioned: [...] })
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
export const ResponseQueue = {
|
|
|
|
|
// Permite inyectar una DB distinta en tests si se necesita
|
|
|
|
|
dbInstance: db as Database,
|
|
|
|
|
// Permite inyectar una DB distinta en tests si se necesita
|
|
|
|
|
dbInstance: db as Database,
|
|
|
|
|
|
|
|
|
|
// Conservamos la cola en memoria por compatibilidad, aunque no se usa para persistencia
|
|
|
|
|
queue: [] as QueuedResponse[],
|
|
|
|
|
// Conservamos la cola en memoria por compatibilidad, aunque no se usa para persistencia
|
|
|
|
|
queue: [] as QueuedResponse[],
|
|
|
|
|
|
|
|
|
|
// Configuración fija (MVP)
|
|
|
|
|
WORKERS: 2,
|
|
|
|
|
BATCH_SIZE: 10,
|
|
|
|
|
SLEEP_MS: 500,
|
|
|
|
|
// Configuración fija (MVP)
|
|
|
|
|
WORKERS: 2,
|
|
|
|
|
BATCH_SIZE: 10,
|
|
|
|
|
SLEEP_MS: 500,
|
|
|
|
|
|
|
|
|
|
_running: false,
|
|
|
|
|
_running: false,
|
|
|
|
|
|
|
|
|
|
async add(responses: QueuedResponse[]) {
|
|
|
|
|
try {
|
|
|
|
|
const botNumber = process.env.CHATBOT_PHONE_NUMBER;
|
|
|
|
|
const filtered = responses.filter(r =>
|
|
|
|
|
r.recipient &&
|
|
|
|
|
r.message &&
|
|
|
|
|
(!botNumber || r.recipient !== botNumber)
|
|
|
|
|
);
|
|
|
|
|
async add(responses: QueuedResponse[]) {
|
|
|
|
|
try {
|
|
|
|
|
const botNumber = process.env.CHATBOT_PHONE_NUMBER;
|
|
|
|
|
const filtered = responses.filter(r =>
|
|
|
|
|
r.recipient &&
|
|
|
|
|
r.message &&
|
|
|
|
|
(!botNumber || r.recipient !== botNumber)
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
if (filtered.length === 0) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (filtered.length === 0) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const insert = this.dbInstance.prepare(`
|
|
|
|
|
const insert = this.dbInstance.prepare(`
|
|
|
|
|
INSERT INTO response_queue (recipient, message, metadata)
|
|
|
|
|
VALUES (?, ?, ?)
|
|
|
|
|
`);
|
|
|
|
|
|
|
|
|
|
this.dbInstance.transaction((rows: QueuedResponse[]) => {
|
|
|
|
|
for (const r of rows) {
|
|
|
|
|
const metadata =
|
|
|
|
|
r.mentions && r.mentions.length > 0
|
|
|
|
|
? JSON.stringify({ mentioned: r.mentions })
|
|
|
|
|
: null;
|
|
|
|
|
insert.run(r.recipient, r.message, metadata);
|
|
|
|
|
}
|
|
|
|
|
})(filtered);
|
|
|
|
|
|
|
|
|
|
console.log('Queued responses (persisted):', filtered.length);
|
|
|
|
|
} catch (err) {
|
|
|
|
|
console.error('Failed to persist queued responses:', err);
|
|
|
|
|
throw err;
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
getHeaders(): HeadersInit {
|
|
|
|
|
return {
|
|
|
|
|
apikey: process.env.EVOLUTION_API_KEY || '',
|
|
|
|
|
'Content-Type': 'application/json',
|
|
|
|
|
};
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
async sendOne(item: ClaimedItem): Promise<boolean> {
|
|
|
|
|
const baseUrl = process.env.EVOLUTION_API_URL;
|
|
|
|
|
const instance = process.env.EVOLUTION_API_INSTANCE;
|
|
|
|
|
if (!baseUrl || !instance) {
|
|
|
|
|
console.error('Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE');
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Endpoint típico de Evolution API para texto simple
|
|
|
|
|
const url = `${baseUrl}/message/sendText/${instance}`;
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
// Build payload, adding mentioned JIDs if present in metadata
|
|
|
|
|
const payload: any = {
|
|
|
|
|
number: item.recipient,
|
|
|
|
|
text: item.message,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if (item.metadata) {
|
|
|
|
|
try {
|
|
|
|
|
const parsed = JSON.parse(item.metadata);
|
|
|
|
|
if (parsed && Array.isArray(parsed.mentioned) && parsed.mentioned.length > 0) {
|
|
|
|
|
payload.mentioned = parsed.mentioned;
|
|
|
|
|
}
|
|
|
|
|
} catch {
|
|
|
|
|
// ignore bad metadata
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const response = await fetch(url, {
|
|
|
|
|
method: 'POST',
|
|
|
|
|
headers: this.getHeaders(),
|
|
|
|
|
body: JSON.stringify(payload),
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
if (!response.ok) {
|
|
|
|
|
const body = await response.text().catch(() => '');
|
|
|
|
|
console.warn('Send failed:', { status: response.status, body: body?.slice(0, 200) });
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
} catch (err) {
|
|
|
|
|
console.error('Network error sending message:', err);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
claimNextBatch(limit: number): ClaimedItem[] {
|
|
|
|
|
// Selecciona y marca como 'processing' en una sola sentencia para evitar carreras
|
|
|
|
|
const rows = this.dbInstance.prepare(`
|
|
|
|
|
this.dbInstance.transaction((rows: QueuedResponse[]) => {
|
|
|
|
|
for (const r of rows) {
|
|
|
|
|
const metadata =
|
|
|
|
|
r.mentions && r.mentions.length > 0
|
|
|
|
|
? JSON.stringify({ mentioned: r.mentions })
|
|
|
|
|
: null;
|
|
|
|
|
insert.run(r.recipient, r.message, metadata);
|
|
|
|
|
}
|
|
|
|
|
})(filtered);
|
|
|
|
|
|
|
|
|
|
console.log('Queued responses (persisted):', filtered.length);
|
|
|
|
|
} catch (err) {
|
|
|
|
|
console.error('Failed to persist queued responses:', err);
|
|
|
|
|
throw err;
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
getHeaders(): HeadersInit {
|
|
|
|
|
return {
|
|
|
|
|
apikey: process.env.EVOLUTION_API_KEY || '',
|
|
|
|
|
'Content-Type': 'application/json',
|
|
|
|
|
};
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
async sendOne(item: ClaimedItem): Promise<boolean> {
|
|
|
|
|
const baseUrl = process.env.EVOLUTION_API_URL;
|
|
|
|
|
const instance = process.env.EVOLUTION_API_INSTANCE;
|
|
|
|
|
if (!baseUrl || !instance) {
|
|
|
|
|
console.error('Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE');
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Endpoint típico de Evolution API para texto simple
|
|
|
|
|
const url = `${baseUrl}/message/sendText/${instance}`;
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
// Build payload, adding mentioned JIDs if present in metadata
|
|
|
|
|
const payload: any = {
|
|
|
|
|
number: item.recipient,
|
|
|
|
|
text: item.message,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if (item.metadata) {
|
|
|
|
|
try {
|
|
|
|
|
const parsed = JSON.parse(item.metadata);
|
|
|
|
|
if (parsed && Array.isArray(parsed.mentioned) && parsed.mentioned.length > 0) {
|
|
|
|
|
payload.mentioned = parsed.mentioned;
|
|
|
|
|
}
|
|
|
|
|
} catch {
|
|
|
|
|
// ignore bad metadata
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const response = await fetch(url, {
|
|
|
|
|
method: 'POST',
|
|
|
|
|
headers: this.getHeaders(),
|
|
|
|
|
body: JSON.stringify(payload),
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
if (!response.ok) {
|
|
|
|
|
const body = await response.text().catch(() => '');
|
|
|
|
|
console.warn('Send failed:', { status: response.status, body: body?.slice(0, 200) });
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
console.log(`✅ Sent message to with this as payload: ${JSON.stringify(payload)}`);
|
|
|
|
|
return true;
|
|
|
|
|
} catch (err) {
|
|
|
|
|
console.error('Network error sending message:', err);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
claimNextBatch(limit: number): ClaimedItem[] {
|
|
|
|
|
// Selecciona y marca como 'processing' en una sola sentencia para evitar carreras
|
|
|
|
|
const rows = this.dbInstance.prepare(`
|
|
|
|
|
UPDATE response_queue
|
|
|
|
|
SET status = 'processing',
|
|
|
|
|
updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now')
|
|
|
|
|
@ -133,70 +133,70 @@ export const ResponseQueue = {
|
|
|
|
|
RETURNING id, recipient, message, metadata
|
|
|
|
|
`).all(limit) as ClaimedItem[];
|
|
|
|
|
|
|
|
|
|
return rows || [];
|
|
|
|
|
},
|
|
|
|
|
return rows || [];
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
markSent(id: number) {
|
|
|
|
|
this.dbInstance.prepare(`
|
|
|
|
|
markSent(id: number) {
|
|
|
|
|
this.dbInstance.prepare(`
|
|
|
|
|
UPDATE response_queue
|
|
|
|
|
SET status = 'sent',
|
|
|
|
|
updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now')
|
|
|
|
|
WHERE id = ?
|
|
|
|
|
`).run(id);
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
markFailed(id: number, errorMsg: string) {
|
|
|
|
|
const msg = (errorMsg || '').toString().slice(0, 500);
|
|
|
|
|
this.dbInstance.prepare(`
|
|
|
|
|
markFailed(id: number, errorMsg: string) {
|
|
|
|
|
const msg = (errorMsg || '').toString().slice(0, 500);
|
|
|
|
|
this.dbInstance.prepare(`
|
|
|
|
|
UPDATE response_queue
|
|
|
|
|
SET status = 'failed',
|
|
|
|
|
last_error = ?,
|
|
|
|
|
updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now')
|
|
|
|
|
WHERE id = ?
|
|
|
|
|
`).run(msg, id);
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
async workerLoop(workerId: number) {
|
|
|
|
|
while (this._running) {
|
|
|
|
|
try {
|
|
|
|
|
const batch = this.claimNextBatch(this.BATCH_SIZE);
|
|
|
|
|
|
|
|
|
|
if (batch.length === 0) {
|
|
|
|
|
await new Promise(r => setTimeout(r, this.SLEEP_MS));
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (const item of batch) {
|
|
|
|
|
const ok = await this.sendOne(item);
|
|
|
|
|
if (ok) {
|
|
|
|
|
this.markSent(item.id);
|
|
|
|
|
} else {
|
|
|
|
|
this.markFailed(item.id, 'send failed');
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (err) {
|
|
|
|
|
console.error(`ResponseQueue worker ${workerId} error:`, err);
|
|
|
|
|
// Evitar bucle apretado ante errores
|
|
|
|
|
await new Promise(r => setTimeout(r, this.SLEEP_MS));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
async process() {
|
|
|
|
|
// Inicia N workers en background, retorna inmediatamente
|
|
|
|
|
if (this._running) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
this._running = true;
|
|
|
|
|
console.log(`Starting ResponseQueue with ${this.WORKERS} workers`);
|
|
|
|
|
|
|
|
|
|
for (let i = 0; i < this.WORKERS; i++) {
|
|
|
|
|
// No await: correr en paralelo
|
|
|
|
|
this.workerLoop(i + 1);
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
stop() {
|
|
|
|
|
this._running = false;
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
async workerLoop(workerId: number) {
|
|
|
|
|
while (this._running) {
|
|
|
|
|
try {
|
|
|
|
|
const batch = this.claimNextBatch(this.BATCH_SIZE);
|
|
|
|
|
|
|
|
|
|
if (batch.length === 0) {
|
|
|
|
|
await new Promise(r => setTimeout(r, this.SLEEP_MS));
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (const item of batch) {
|
|
|
|
|
const ok = await this.sendOne(item);
|
|
|
|
|
if (ok) {
|
|
|
|
|
this.markSent(item.id);
|
|
|
|
|
} else {
|
|
|
|
|
this.markFailed(item.id, 'send failed');
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (err) {
|
|
|
|
|
console.error(`ResponseQueue worker ${workerId} error:`, err);
|
|
|
|
|
// Evitar bucle apretado ante errores
|
|
|
|
|
await new Promise(r => setTimeout(r, this.SLEEP_MS));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
async process() {
|
|
|
|
|
// Inicia N workers en background, retorna inmediatamente
|
|
|
|
|
if (this._running) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
this._running = true;
|
|
|
|
|
console.log(`Starting ResponseQueue with ${this.WORKERS} workers`);
|
|
|
|
|
|
|
|
|
|
for (let i = 0; i < this.WORKERS; i++) {
|
|
|
|
|
// No await: correr en paralelo
|
|
|
|
|
this.workerLoop(i + 1);
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
stop() {
|
|
|
|
|
this._running = false;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|