feat: habilitar reintentos con backoff exponencial + jitter en cola

Co-authored-by: aider (openrouter/openai/gpt-5) <aider@aider.chat>
pull/1/head
borja 2 months ago
parent e98e4ce1d0
commit 79b5d83f5b

@ -117,6 +117,44 @@ export function initializeDatabase(instance: Database) {
} catch (e) {
console.warn('[initializeDatabase] Skipped adding tasks.completed_by column:', e);
}
// Migration: ensure reliability columns exist on response_queue (next_attempt_at, lease_until, last_status_code)
try {
const cols = instance.query(`PRAGMA table_info('response_queue')`).all() as any[];
const hasNextAttempt = Array.isArray(cols) && cols.some((c: any) => c.name === 'next_attempt_at');
if (!hasNextAttempt) {
instance.exec(`ALTER TABLE response_queue ADD COLUMN next_attempt_at TEXT NULL;`);
}
const hasLeaseUntil = Array.isArray(cols) && cols.some((c: any) => c.name === 'lease_until');
if (!hasLeaseUntil) {
instance.exec(`ALTER TABLE response_queue ADD COLUMN lease_until TEXT NULL;`);
}
const hasLastStatus = Array.isArray(cols) && cols.some((c: any) => c.name === 'last_status_code');
if (!hasLastStatus) {
instance.exec(`ALTER TABLE response_queue ADD COLUMN last_status_code INTEGER NULL;`);
}
} catch (e) {
console.warn('[initializeDatabase] Skipped ensuring response_queue reliability columns:', e);
}
// Ensure supporting indexes exist
try {
instance.exec(`
CREATE INDEX IF NOT EXISTS idx_response_queue_status_next_attempt
ON response_queue (status, next_attempt_at);
`);
} catch (e) {
console.warn('[initializeDatabase] Skipped creating idx_response_queue_status_next_attempt:', e);
}
try {
instance.exec(`
CREATE INDEX IF NOT EXISTS idx_response_queue_status_lease_until
ON response_queue (status, lease_until);
`);
} catch (e) {
console.warn('[initializeDatabase] Skipped creating idx_response_queue_status_lease_until:', e);
}
}
/**

@ -12,6 +12,7 @@ type ClaimedItem = {
recipient: string;
message: string;
metadata?: string | null; // JSON-encoded metadata (e.g., { mentioned: [...] })
attempts: number;
};
export const ResponseQueue = {
@ -26,8 +27,26 @@ export const ResponseQueue = {
BATCH_SIZE: 10,
SLEEP_MS: 500,
// Reintentos con backoff exponencial + jitter (valores por defecto, override opcional por env)
MAX_ATTEMPTS: process.env.RQ_MAX_ATTEMPTS ? Number(process.env.RQ_MAX_ATTEMPTS) : 6,
BASE_BACKOFF_MS: process.env.RQ_BASE_BACKOFF_MS ? Number(process.env.RQ_BASE_BACKOFF_MS) : 5000,
MAX_BACKOFF_MS: process.env.RQ_MAX_BACKOFF_MS ? Number(process.env.RQ_MAX_BACKOFF_MS) : 3600000,
_running: false,
nowIso(): string {
return new Date().toISOString().replace('T', ' ').replace('Z', '');
},
futureIso(ms: number): string {
return new Date(Date.now() + ms).toISOString().replace('T', ' ').replace('Z', '');
},
computeDelayMs(attempt: number): number {
const exp = Math.min(this.MAX_BACKOFF_MS, this.BASE_BACKOFF_MS * 2 ** Math.max(0, attempt - 1));
return Math.floor(Math.random() * exp); // full jitter
},
async add(responses: QueuedResponse[]) {
try {
const botNumber = process.env.CHATBOT_PHONE_NUMBER;
@ -42,8 +61,8 @@ export const ResponseQueue = {
}
const insert = this.dbInstance.prepare(`
INSERT INTO response_queue (recipient, message, metadata)
VALUES (?, ?, ?)
INSERT INTO response_queue (recipient, message, metadata, next_attempt_at)
VALUES (?, ?, ?, ?)
`);
this.dbInstance.transaction((rows: QueuedResponse[]) => {
@ -52,7 +71,7 @@ export const ResponseQueue = {
r.mentions && r.mentions.length > 0
? JSON.stringify({ mentioned: r.mentions })
: null;
insert.run(r.recipient, r.message, metadata);
insert.run(r.recipient, r.message, metadata, this.nowIso());
}
})(filtered);
@ -70,12 +89,13 @@ export const ResponseQueue = {
};
},
async sendOne(item: ClaimedItem): Promise<boolean> {
async sendOne(item: ClaimedItem): Promise<{ ok: boolean; status?: number; error?: string }> {
const baseUrl = process.env.EVOLUTION_API_URL;
const instance = process.env.EVOLUTION_API_INSTANCE;
if (!baseUrl || !instance) {
console.error('Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE');
return false;
const msg = 'Missing EVOLUTION_API_URL or EVOLUTION_API_INSTANCE';
console.error(msg);
return { ok: false, error: msg };
}
// Endpoint típico de Evolution API para texto simple
@ -107,14 +127,16 @@ export const ResponseQueue = {
if (!response.ok) {
const body = await response.text().catch(() => '');
console.warn('Send failed:', { status: response.status, body: body?.slice(0, 200) });
return false;
const errTxt = body?.slice(0, 200) || `HTTP ${response.status}`;
console.warn('Send failed:', { status: response.status, body: errTxt });
return { ok: false, status: response.status, error: errTxt };
}
console.log(`✅ Sent message to with this as payload: ${JSON.stringify(payload)}`);
return true;
console.log(`✅ Sent message with payload: ${JSON.stringify(payload)}`);
return { ok: true, status: response.status };
} catch (err) {
console.error('Network error sending message:', err);
return false;
const errMsg = (err instanceof Error ? err.message : String(err));
console.error('Network error sending message:', errMsg);
return { ok: false, error: errMsg };
}
},
@ -127,33 +149,51 @@ export const ResponseQueue = {
WHERE id IN (
SELECT id FROM response_queue
WHERE status = 'queued'
ORDER BY created_at, id
AND (next_attempt_at IS NULL OR next_attempt_at <= strftime('%Y-%m-%d %H:%M:%f', 'now'))
ORDER BY COALESCE(next_attempt_at, created_at), id
LIMIT ?
)
RETURNING id, recipient, message, metadata
RETURNING id, recipient, message, metadata, attempts
`).all(limit) as ClaimedItem[];
return rows || [];
},
markSent(id: number) {
markSent(id: number, statusCode?: number) {
this.dbInstance.prepare(`
UPDATE response_queue
SET status = 'sent',
last_status_code = ?,
updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now')
WHERE id = ?
`).run(id);
`).run(statusCode ?? null, id);
},
markFailed(id: number, errorMsg: string) {
markFailed(id: number, errorMsg: string, statusCode?: number, attempts?: number) {
const msg = (errorMsg || '').toString().slice(0, 500);
this.dbInstance.prepare(`
UPDATE response_queue
SET status = 'failed',
attempts = COALESCE(?, attempts),
last_error = ?,
last_status_code = ?,
updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now')
WHERE id = ?
`).run(attempts ?? null, msg, statusCode ?? null, id);
},
requeueWithBackoff(id: number, nextAttempts: number, nextAttemptAt: string, statusCode?: number | null, errorMsg?: string) {
const msg = (errorMsg || '').toString().slice(0, 500) || null;
this.dbInstance.prepare(`
UPDATE response_queue
SET status = 'queued',
attempts = ?,
next_attempt_at = ?,
last_error = COALESCE(?, last_error),
last_status_code = COALESCE(?, last_status_code),
updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now')
WHERE id = ?
`).run(msg, id);
`).run(nextAttempts, nextAttemptAt, msg, statusCode ?? null, id);
},
async workerLoop(workerId: number) {
@ -167,12 +207,31 @@ export const ResponseQueue = {
}
for (const item of batch) {
const ok = await this.sendOne(item);
if (ok) {
this.markSent(item.id);
} else {
this.markFailed(item.id, 'send failed');
const result = await this.sendOne(item);
if (result.ok) {
this.markSent(item.id, result.status);
continue;
}
const status = result.status;
const attemptsNow = (item.attempts || 0) + 1;
const errMsg = result.error || 'send failed';
// 4xx = fallo definitivo
if (typeof status === 'number' && status >= 400 && status < 500) {
this.markFailed(item.id, errMsg, status, attemptsNow);
continue;
}
// 5xx o error de red: reintento con backoff si no superó el máximo
if (attemptsNow >= this.MAX_ATTEMPTS) {
this.markFailed(item.id, errMsg, status, attemptsNow);
continue;
}
const delayMs = this.computeDelayMs(attemptsNow);
const when = this.futureIso(delayMs);
this.requeueWithBackoff(item.id, attemptsNow, when, status ?? null, errMsg);
}
} catch (err) {
console.error(`ResponseQueue worker ${workerId} error:`, err);

Loading…
Cancel
Save