feat: integrar EvolutionClient, limpieza de cola y parseo de metadata

Co-authored-by: aider (openrouter/openai/gpt-5) <aider@aider.chat>
main
brobert 1 month ago
parent a25fd4ee3b
commit 1b7420e123

@ -0,0 +1,63 @@
export type EvolutionResult = { ok: boolean; status?: number; error?: string };
export function buildHeaders(): HeadersInit {
return {
apikey: process.env.EVOLUTION_API_KEY || '',
'Content-Type': 'application/json'
};
}
export async function sendText(payload: { number: string; text: string; mentioned?: string[] }): Promise<EvolutionResult> {
const baseUrl = process.env.EVOLUTION_API_URL;
const instance = process.env.EVOLUTION_API_INSTANCE;
if (!baseUrl || !instance) {
const msg = 'Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE';
return { ok: false, error: msg };
}
const url = `${baseUrl}/message/sendText/${instance}`;
try {
const res = await fetch(url, {
method: 'POST',
headers: buildHeaders(),
body: JSON.stringify(payload)
});
if (!res.ok) {
const body = await res.text().catch(() => '');
const errTxt = body?.slice(0, 200) || `HTTP ${res.status}`;
return { ok: false, status: res.status, error: errTxt };
}
return { ok: true, status: res.status };
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
return { ok: false, error: errMsg };
}
}
export async function sendReaction(payload: {
key: { remoteJid: string; id: string; fromMe: boolean; participant?: string };
reaction: string;
}): Promise<EvolutionResult> {
const baseUrl = process.env.EVOLUTION_API_URL;
const instance = process.env.EVOLUTION_API_INSTANCE;
if (!baseUrl || !instance) {
const msg = 'Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE';
return { ok: false, error: msg };
}
const url = `${baseUrl}/message/sendReaction/${instance}`;
try {
const res = await fetch(url, {
method: 'POST',
headers: buildHeaders(),
body: JSON.stringify(payload)
});
if (!res.ok) {
const body = await res.text().catch(() => '');
const errTxt = body?.slice(0, 200) || `HTTP ${res.status}`;
return { ok: false, status: res.status, error: errTxt };
}
return { ok: true, status: res.status };
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
return { ok: false, error: errMsg };
}
}

@ -0,0 +1,74 @@
import type { Database } from 'bun:sqlite';
import { toIsoSqlUTC } from '../../utils/datetime';
export type CleanupOptions = {
retentionDaysSent: number;
retentionDaysFailed: number;
batchSize: number;
optimize: boolean;
vacuum: boolean;
vacuumEveryNRuns: number;
cleanupRunCount: number;
};
export async function runCleanupOnce(
db: Database,
opts: CleanupOptions,
now: Date = new Date()
): Promise<{ deletedSent: number; deletedFailed: number; totalDeleted: number; nextCleanupRunCount: number }> {
const msPerDay = 24 * 60 * 60 * 1000;
const sentThresholdIso = toIsoSqlUTC(new Date(now.getTime() - opts.retentionDaysSent * msPerDay));
const failedThresholdIso = toIsoSqlUTC(new Date(now.getTime() - opts.retentionDaysFailed * msPerDay));
const cleanStatus = (status: 'sent' | 'failed', thresholdIso: string, batch: number): number => {
let deleted = 0;
const selectStmt = db.prepare(`
SELECT id
FROM response_queue
WHERE status = ? AND updated_at < ?
ORDER BY updated_at
LIMIT ?
`);
while (true) {
const rows = selectStmt.all(status, thresholdIso, batch) as Array<{ id: number }>;
if (!rows || rows.length === 0) break;
const ids = rows.map((r) => r.id);
const placeholders = ids.map(() => '?').join(',');
db.prepare(`DELETE FROM response_queue WHERE id IN (${placeholders})`).run(...ids);
deleted += ids.length;
if (rows.length < batch) break;
}
return deleted;
};
const deletedSent = cleanStatus('sent', sentThresholdIso, opts.batchSize);
const deletedFailed = cleanStatus('failed', failedThresholdIso, opts.batchSize);
const totalDeleted = deletedSent + deletedFailed;
// Mantenimiento ligero tras limpieza
if (opts.optimize && totalDeleted > 0) {
try {
db.exec('PRAGMA optimize;');
} catch (e) {
console.warn('PRAGMA optimize failed:', e);
}
}
// VACUUM opcional
let nextCleanupRunCount = opts.cleanupRunCount;
if (opts.vacuum && totalDeleted > 0) {
nextCleanupRunCount++;
if (nextCleanupRunCount % Math.max(1, opts.vacuumEveryNRuns) === 0) {
try {
db.exec('VACUUM;');
} catch (e) {
console.warn('VACUUM failed:', e);
}
}
}
return { deletedSent, deletedFailed, totalDeleted, nextCleanupRunCount };
}

@ -0,0 +1,46 @@
export type OnboardingMeta = {
kind: 'onboarding';
variant: 'initial' | 'reminder';
part: 1 | 2;
bundle_id: string;
group_id?: string | null;
task_id?: number | null;
display_code?: number | null;
};
export type ReactionMeta = {
kind: 'reaction';
emoji: string;
chatId: string;
messageId: string;
participant?: string;
fromMe?: boolean;
};
export type QueueMetadata = OnboardingMeta | ReactionMeta | Record<string, any>;
export function parseQueueMetadata(raw: string | null | undefined): QueueMetadata | null {
if (!raw) return null;
try {
const obj = JSON.parse(String(raw));
if (!obj || typeof obj !== 'object') return null;
const kind = String((obj as any).kind || '');
if (kind === 'reaction') {
// Validación mínima
return {
kind: 'reaction',
emoji: String((obj as any).emoji || ''),
chatId: String((obj as any).chatId || ''),
messageId: String((obj as any).messageId || ''),
participant: typeof (obj as any).participant === 'string' ? String((obj as any).participant) : undefined,
fromMe: typeof (obj as any).fromMe === 'boolean' ? Boolean((obj as any).fromMe) : undefined
} as ReactionMeta;
}
if (kind === 'onboarding') {
return obj as OnboardingMeta;
}
return obj as Record<string, any>;
} catch {
return null;
}
}

@ -4,6 +4,9 @@ import { IdentityService } from './identity';
import { normalizeWhatsAppId } from '../utils/whatsapp';
import { Metrics } from './metrics';
import { toIsoSqlUTC } from '../utils/datetime';
import * as EvolutionClient from '../clients/evolution';
import { runCleanupOnce as cleanupRunOnce } from './queue/cleanup';
import { parseQueueMetadata } from './queue/metadata';
const MAX_FALLBACK_DIGITS = (() => {
const raw = (process.env.ONBOARDING_FALLBACK_MAX_DIGITS || '').trim();
@ -260,10 +263,8 @@ export const ResponseQueue = {
}
// Detectar jobs de reacción
let meta: any = null;
try { meta = item.metadata ? JSON.parse(item.metadata) : null; } catch {}
const meta = parseQueueMetadata(item.metadata);
if (meta && meta.kind === 'reaction') {
const reactionUrl = `${baseUrl}/message/sendReaction/${instance}`;
const chatId = String(meta.chatId || '');
const messageId = String(meta.messageId || '');
const emoji = String(meta.emoji || '');
@ -276,32 +277,17 @@ export const ResponseQueue = {
if (meta.participant) {
key.participant = String(meta.participant);
}
const payload = {
key,
reaction: emoji
};
try {
const response = await fetch(reactionUrl, {
method: 'POST',
headers: this.getHeaders(),
body: JSON.stringify(payload),
});
if (!response.ok) {
const body = await response.text().catch(() => '');
const errTxt = body?.slice(0, 200) || `HTTP ${response.status}`;
console.warn('Send reaction failed:', { status: response.status, body: errTxt });
const payload = { key, reaction: emoji };
const result = await EvolutionClient.sendReaction(payload);
if (!result.ok) {
const errTxt = result.error || (typeof result.status === 'number' ? `HTTP ${result.status}` : 'unknown_error');
console.warn('Send reaction failed:', { status: result.status, body: errTxt });
try { Metrics.inc('reactions_failed_total', 1, { emoji: emojiLabel }); } catch {}
return { ok: false, status: response.status, error: errTxt };
return { ok: false, status: result.status, error: errTxt };
}
console.log(`✅ Sent reaction with payload: ${JSON.stringify(payload)}`);
try { Metrics.inc('reactions_sent_total', 1, { emoji: emojiLabel }); } catch {}
return { ok: true, status: response.status };
} catch (err) {
const errMsg = (err instanceof Error ? err.message : String(err));
console.error('Network error sending reaction:', errMsg);
try { Metrics.inc('reactions_failed_total', 1, { emoji: emojiLabel }); } catch {}
return { ok: false, error: errMsg };
}
return { ok: true, status: result.status };
}
// Endpoint típico de Evolution API para texto simple
@ -372,20 +358,16 @@ export const ResponseQueue = {
}
}
const response = await fetch(url, {
method: 'POST',
headers: this.getHeaders(),
body: JSON.stringify(payload),
});
if (!response.ok) {
const body = await response.text().catch(() => '');
const errTxt = body?.slice(0, 200) || `HTTP ${response.status}`;
console.warn('Send failed:', { status: response.status, body: errTxt });
return { ok: false, status: response.status, error: errTxt };
{
const result = await EvolutionClient.sendText(payload);
if (!result.ok) {
const errTxt = result.error || (typeof result.status === 'number' ? `HTTP ${result.status}` : 'unknown_error');
console.warn('Send failed:', { status: result.status, body: errTxt });
return { ok: false, status: result.status, error: errTxt };
}
console.log(`✅ Sent message with payload: ${JSON.stringify(payload)}`);
return { ok: true, status: response.status };
return { ok: true, status: result.status };
}
} catch (err) {
const errMsg = (err instanceof Error ? err.message : String(err));
console.error('Network error sending message:', errMsg);
@ -576,66 +558,23 @@ export const ResponseQueue = {
const startedAt = Date.now();
try {
const msPerDay = 24 * 60 * 60 * 1000;
const sentThresholdIso = toIsoSqlUTC(new Date(now.getTime() - this.RETENTION_DAYS_SENT * msPerDay));
const failedThresholdIso = toIsoSqlUTC(new Date(now.getTime() - this.RETENTION_DAYS_FAILED * msPerDay));
const cleanStatus = (status: 'sent' | 'failed', thresholdIso: string): number => {
let deleted = 0;
const selectStmt = this.dbInstance.prepare(`
SELECT id
FROM response_queue
WHERE status = ? AND updated_at < ?
ORDER BY updated_at
LIMIT ?
`);
while (true) {
const rows = selectStmt.all(status, thresholdIso, this.CLEANUP_BATCH) as Array<{ id: number }>;
if (!rows || rows.length === 0) break;
const ids = rows.map(r => r.id);
const placeholders = ids.map(() => '?').join(',');
this.dbInstance.prepare(`DELETE FROM response_queue WHERE id IN (${placeholders})`).run(...ids);
deleted += ids.length;
// Si el lote es menor que el batch, no quedan más candidatos
if (rows.length < this.CLEANUP_BATCH) break;
}
return deleted;
};
const deletedSent = cleanStatus('sent', sentThresholdIso);
const deletedFailed = cleanStatus('failed', failedThresholdIso);
const totalDeleted = deletedSent + deletedFailed;
// Mantenimiento ligero tras limpieza
if (this.OPTIMIZE_ENABLED && totalDeleted > 0) {
try {
this.dbInstance.exec('PRAGMA optimize;');
} catch (e) {
console.warn('PRAGMA optimize failed:', e);
}
}
// VACUUM opcional (desactivado por defecto)
if (this.VACUUM_ENABLED && totalDeleted > 0) {
this._cleanupRunCount++;
if (this._cleanupRunCount % Math.max(1, this.VACUUM_EVERY_N_RUNS) === 0) {
try {
this.dbInstance.exec('VACUUM;');
} catch (e) {
console.warn('VACUUM failed:', e);
}
}
}
const res = await cleanupRunOnce(this.dbInstance, {
retentionDaysSent: this.RETENTION_DAYS_SENT,
retentionDaysFailed: this.RETENTION_DAYS_FAILED,
batchSize: this.CLEANUP_BATCH,
optimize: this.OPTIMIZE_ENABLED,
vacuum: this.VACUUM_ENABLED,
vacuumEveryNRuns: this.VACUUM_EVERY_N_RUNS,
cleanupRunCount: this._cleanupRunCount
}, now);
this._cleanupRunCount = res.nextCleanupRunCount;
const tookMs = Date.now() - startedAt;
if (process.env.NODE_ENV !== 'test') {
console.log(`🧹 Cleanup done in ${tookMs}ms: sent=${deletedSent}, failed=${deletedFailed}, total=${totalDeleted}`);
console.log(`🧹 Cleanup done in ${tookMs}ms: sent=${res.deletedSent}, failed=${res.deletedFailed}, total=${res.totalDeleted}`);
}
return { deletedSent, deletedFailed, totalDeleted, skipped: false };
return { deletedSent: res.deletedSent, deletedFailed: res.deletedFailed, totalDeleted: res.totalDeleted, skipped: false };
} catch (err) {
console.error('Cleanup error:', err);
return { deletedSent: 0, deletedFailed: 0, totalDeleted: 0, skipped: false };

Loading…
Cancel
Save