feat: añadir limpieza/retención de response_queue (borrado duro)

Co-authored-by: aider (openrouter/openai/gpt-5) <aider@aider.chat>
pull/1/head
borja 2 months ago
parent a5daba241b
commit 3f9280eb1a

@ -186,7 +186,7 @@ bun test
- [x] Añadir reintentos con backoff exponencial y jitter.
- [x] Recuperar ítems en estado `processing` tras reinicios (lease o expiración y requeue).
- [ ] Métricas y logging mejorado (contadores de enviados/fallidos, tiempos).
- [ ] Limpieza/retención de historiales.
- [x] Limpieza/retención de historiales.
### Phase 4 — Desglose y estado
- Etapa 1 — Reintentos con backoff exponencial + jitter (Completada)
@ -194,7 +194,7 @@ bun test
- Lógica: 2xx → sent; 4xx → failed definitivo; 5xx/red → reintento con `next_attempt_at` hasta MAX_ATTEMPTS.
- Etapa 2 — Recuperación de items en `processing` mediante lease/expiración (Completada)
- Etapa 3 — Métricas y observabilidad (Pendiente)
- Etapa 4 — Limpieza/retención (Pendiente)
- Etapa 4 — Limpieza/retención (Completada)
### Phase 5: Advanced Features (Low Priority)
- [ ] Add task reminders system.

@ -40,9 +40,7 @@
- **Gestión de Tareas**
- Eliminación opcional de tareas y mejoras de edición
- **Cola de Respuestas**
- Recuperación de ítems en estado `processing` tras caídas (lease/expiración)
- Métricas/observabilidad
- Limpieza/retención
- **Validaciones**
- Permisos de usuario (roles) y pertenencia a grupos (si se requiere política estricta)
- **Menciones y nombres**
@ -68,7 +66,7 @@
- Etapa 1 — Reintentos con backoff exponencial + jitter: COMPLETADA.
- Etapa 2 — Recuperación de items en `processing` (lease/expiración): COMPLETADA.
- Etapa 3 — Métricas y observabilidad: POSTERGADA.
- Etapa 4 — Limpieza/retención: PENDIENTE.
- Etapa 4 — Limpieza/retención: COMPLETADA.
## Commit history and status
- Latest status: All unit tests passing; Phase 2 completada; ACK to creator always in single-line format; optional group notify disabled by default; ContactsService avoids network calls under tests; basic name resolution via ContactsService integrated.

@ -115,5 +115,16 @@ export const migrations: Migration[] = [
ON response_queue (status, lease_until);
`);
}
},
{
version: 3,
name: 'response-queue-retention-index',
checksum: 'v3-rq-retention-index-2025-09-06',
up: (db: Database) => {
db.exec(`
CREATE INDEX IF NOT EXISTS idx_response_queue_status_updated_at
ON response_queue (status, updated_at);
`);
}
}
];

@ -295,8 +295,11 @@ export class WebhookServer {
try {
await ResponseQueue.process();
console.log('✅ ResponseQueue worker started');
// Start cleanup scheduler (daily retention)
ResponseQueue.startCleanupScheduler();
console.log('✅ ResponseQueue cleanup scheduler started');
} catch (e) {
console.error('❌ Failed to start ResponseQueue worker:', e);
console.error('❌ Failed to start ResponseQueue worker or cleanup scheduler:', e);
}
} catch (error) {
console.error('❌ Failed to setup webhook:', error instanceof Error ? error.message : error);

@ -32,7 +32,20 @@ export const ResponseQueue = {
BASE_BACKOFF_MS: process.env.RQ_BASE_BACKOFF_MS ? Number(process.env.RQ_BASE_BACKOFF_MS) : 5000,
MAX_BACKOFF_MS: process.env.RQ_MAX_BACKOFF_MS ? Number(process.env.RQ_MAX_BACKOFF_MS) : 3600000,
// Limpieza/retención (configurable por entorno)
CLEANUP_ENABLED: process.env.RQ_CLEANUP_ENABLED !== 'false',
RETENTION_DAYS_SENT: process.env.RQ_RETENTION_DAYS_SENT ? Number(process.env.RQ_RETENTION_DAYS_SENT) : 14,
RETENTION_DAYS_FAILED: process.env.RQ_RETENTION_DAYS_FAILED ? Number(process.env.RQ_RETENTION_DAYS_FAILED) : 30,
CLEANUP_INTERVAL_MS: process.env.RQ_CLEANUP_INTERVAL_MS ? Number(process.env.RQ_CLEANUP_INTERVAL_MS) : 24 * 60 * 60 * 1000, // 24h
CLEANUP_BATCH: process.env.RQ_CLEANUP_BATCH ? Number(process.env.RQ_CLEANUP_BATCH) : 1000,
OPTIMIZE_ENABLED: process.env.RQ_OPTIMIZE_ENABLED !== 'false',
VACUUM_ENABLED: process.env.RQ_VACUUM_ENABLED === 'true',
VACUUM_EVERY_N_RUNS: process.env.RQ_VACUUM_EVERY_N_RUNS ? Number(process.env.RQ_VACUUM_EVERY_N_RUNS) : 28,
_running: false,
_cleanupTimer: null as any,
_cleanupRunning: false,
_cleanupRunCount: 0,
nowIso(): string {
return new Date().toISOString().replace('T', ' ').replace('Z', '');
@ -255,6 +268,108 @@ export const ResponseQueue = {
}
},
// Limpieza/retención de historiales
async runCleanupOnce(now: Date = new Date()): Promise<{ deletedSent: number; deletedFailed: number; totalDeleted: number; skipped: boolean }> {
// Evitar solapes
if (this._cleanupRunning) {
return { deletedSent: 0, deletedFailed: 0, totalDeleted: 0, skipped: true };
}
this._cleanupRunning = true;
const startedAt = Date.now();
try {
const toIso = (d: Date) => d.toISOString().replace('T', ' ').replace('Z', '');
const msPerDay = 24 * 60 * 60 * 1000;
const sentThresholdIso = toIso(new Date(now.getTime() - this.RETENTION_DAYS_SENT * msPerDay));
const failedThresholdIso = toIso(new Date(now.getTime() - this.RETENTION_DAYS_FAILED * msPerDay));
const cleanStatus = (status: 'sent' | 'failed', thresholdIso: string): number => {
let deleted = 0;
const selectStmt = this.dbInstance.prepare(`
SELECT id
FROM response_queue
WHERE status = ? AND updated_at < ?
ORDER BY updated_at
LIMIT ?
`);
while (true) {
const rows = selectStmt.all(status, thresholdIso, this.CLEANUP_BATCH) as Array<{ id: number }>;
if (!rows || rows.length === 0) break;
const ids = rows.map(r => r.id);
const placeholders = ids.map(() => '?').join(',');
this.dbInstance.prepare(`DELETE FROM response_queue WHERE id IN (${placeholders})`).run(...ids);
deleted += ids.length;
// Si el lote es menor que el batch, no quedan más candidatos
if (rows.length < this.CLEANUP_BATCH) break;
}
return deleted;
};
const deletedSent = cleanStatus('sent', sentThresholdIso);
const deletedFailed = cleanStatus('failed', failedThresholdIso);
const totalDeleted = deletedSent + deletedFailed;
// Mantenimiento ligero tras limpieza
if (this.OPTIMIZE_ENABLED && totalDeleted > 0) {
try {
this.dbInstance.exec('PRAGMA optimize;');
} catch (e) {
console.warn('PRAGMA optimize failed:', e);
}
}
// VACUUM opcional (desactivado por defecto)
if (this.VACUUM_ENABLED && totalDeleted > 0) {
this._cleanupRunCount++;
if (this._cleanupRunCount % Math.max(1, this.VACUUM_EVERY_N_RUNS) === 0) {
try {
this.dbInstance.exec('VACUUM;');
} catch (e) {
console.warn('VACUUM failed:', e);
}
}
}
const tookMs = Date.now() - startedAt;
if (process.env.NODE_ENV !== 'test') {
console.log(`🧹 Cleanup done in ${tookMs}ms: sent=${deletedSent}, failed=${deletedFailed}, total=${totalDeleted}`);
}
return { deletedSent, deletedFailed, totalDeleted, skipped: false };
} catch (err) {
console.error('Cleanup error:', err);
return { deletedSent: 0, deletedFailed: 0, totalDeleted: 0, skipped: false };
} finally {
this._cleanupRunning = false;
}
},
startCleanupScheduler() {
if (process.env.NODE_ENV === 'test') return;
if (!this.CLEANUP_ENABLED) return;
if (this._cleanupTimer) return;
const interval = this.CLEANUP_INTERVAL_MS;
this._cleanupTimer = setInterval(() => {
this.runCleanupOnce().catch(err => console.error('Scheduled cleanup error:', err));
}, interval);
console.log(`🗓️ Cleanup scheduler started (every ${Math.round(interval / (60 * 60 * 1000))}h)`);
},
stopCleanupScheduler() {
if (this._cleanupTimer) {
clearInterval(this._cleanupTimer);
this._cleanupTimer = null;
if (process.env.NODE_ENV !== 'test') {
console.log('🛑 Cleanup scheduler stopped');
}
}
},
stop() {
this._running = false;
}

@ -0,0 +1,124 @@
import { describe, test, expect, beforeAll, afterAll, beforeEach } from 'bun:test';
import { Database } from 'bun:sqlite';
import { initializeDatabase } from '../../../src/db';
import { ResponseQueue } from '../../../src/services/response-queue';
let testDb: Database;
let originalDbInstance: Database;
function toIso(dt: Date): string {
return dt.toISOString().replace('T', ' ').replace('Z', '');
}
describe('ResponseQueue cleanup/retention', () => {
beforeAll(() => {
testDb = new Database(':memory:');
initializeDatabase(testDb);
originalDbInstance = (ResponseQueue as any).dbInstance;
(ResponseQueue as any).dbInstance = testDb;
});
afterAll(() => {
(ResponseQueue as any).dbInstance = originalDbInstance;
testDb.close();
});
beforeEach(() => {
testDb.exec('DELETE FROM response_queue');
});
test('does not delete queued or processing items regardless of age', async () => {
const old = toIso(new Date(2000, 0, 1));
testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`).run('u1','m1','queued', old);
testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`).run('u2','m2','processing', old);
testDb.prepare(`INSERT INTO response_queue (recipient, message, status) VALUES (?,?,?)`).run('u3','m3','queued');
const res = await (ResponseQueue as any).runCleanupOnce(new Date());
expect(res.totalDeleted).toBe(0);
const counts = testDb.query(`SELECT status, COUNT(*) as c FROM response_queue GROUP BY status ORDER BY status`).all() as any[];
const map = Object.fromEntries(counts.map(r => [r.status, r.c]));
expect(map['queued']).toBe(2);
expect(map['processing']).toBe(1);
});
test('deletes sent older than 14 days but keeps recent', async () => {
const now = new Date();
const days14Ago = new Date(now.getTime() - 14 * 24 * 60 * 60 * 1000);
const days13Ago = new Date(now.getTime() - 13 * 24 * 60 * 60 * 1000);
const thresholdExact = toIso(days14Ago); // exact boundary
// exactly at threshold (should NOT delete because comparison is strict <)
testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`)
.run('u1','m1','sent', thresholdExact);
// older than threshold
testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`)
.run('u2','m2','sent', toIso(new Date(days14Ago.getTime() - 1000)));
// newer than threshold
testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`)
.run('u3','m3','sent', toIso(days13Ago));
const res = await (ResponseQueue as any).runCleanupOnce(now);
expect(res.deletedSent).toBe(1);
const rows = testDb.query(`SELECT status, updated_at FROM response_queue WHERE status='sent' ORDER BY updated_at`).all() as any[];
expect(rows.length).toBe(2);
expect(rows[0].updated_at).toBe(thresholdExact);
expect(rows[1].updated_at).toBe(toIso(days13Ago));
});
test('deletes failed older than 30 days but keeps newer', async () => {
const now = new Date();
const days30Ago = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000);
const days29Ago = new Date(now.getTime() - 29 * 24 * 60 * 60 * 1000);
testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`)
.run('u1','m1','failed', toIso(new Date(days30Ago.getTime() - 1000)));
testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`)
.run('u2','m2','failed', toIso(days29Ago));
const res = await (ResponseQueue as any).runCleanupOnce(now);
expect(res.deletedFailed).toBe(1);
const rows = testDb.query(`SELECT COUNT(*) as c FROM response_queue WHERE status='failed'`).get() as any;
expect(rows.c).toBe(1);
});
test('batch deletes large sets in multiple passes', async () => {
(ResponseQueue as any).CLEANUP_BATCH = 500; // reduce for test
const old = toIso(new Date(2000, 0, 1));
const total = 1200;
const insert = testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`);
testDb.transaction(() => {
for (let i = 0; i < total; i++) {
insert.run(`u${i}`, `m${i}`, 'sent', old);
}
})();
const res = await (ResponseQueue as any).runCleanupOnce(new Date());
expect(res.deletedSent).toBe(total);
const count = testDb.query(`SELECT COUNT(*) as c FROM response_queue WHERE status='sent'`).get() as any;
expect(count.c).toBe(0);
});
test('concurrent cleanup calls do not overlap', async () => {
const old = toIso(new Date(2000, 0, 1));
for (let i = 0; i < 50; i++) {
testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`)
.run(`u${i}`, `m${i}`, 'sent', old);
}
// Trigger two cleanups concurrently
const [r1, r2] = await Promise.all([
(ResponseQueue as any).runCleanupOnce(new Date()),
(ResponseQueue as any).runCleanupOnce(new Date()),
]);
const total = (r1.totalDeleted || 0) + (r2.totalDeleted || 0);
expect(total).toBe(50); // no double-deletes
const remain = testDb.query(`SELECT COUNT(*) as c FROM response_queue`).get() as any;
expect(remain.c).toBe(0);
});
});
Loading…
Cancel
Save