diff --git a/README.md b/README.md index a21ca37..31c28bd 100644 --- a/README.md +++ b/README.md @@ -186,7 +186,7 @@ bun test - [x] Añadir reintentos con backoff exponencial y jitter. - [x] Recuperar ítems en estado `processing` tras reinicios (lease o expiración y requeue). - [ ] Métricas y logging mejorado (contadores de enviados/fallidos, tiempos). -- [ ] Limpieza/retención de historiales. +- [x] Limpieza/retención de historiales. ### Phase 4 — Desglose y estado - Etapa 1 — Reintentos con backoff exponencial + jitter (Completada) @@ -194,7 +194,7 @@ bun test - Lógica: 2xx → sent; 4xx → failed definitivo; 5xx/red → reintento con `next_attempt_at` hasta MAX_ATTEMPTS. - Etapa 2 — Recuperación de items en `processing` mediante lease/expiración (Completada) - Etapa 3 — Métricas y observabilidad (Pendiente) -- Etapa 4 — Limpieza/retención (Pendiente) +- Etapa 4 — Limpieza/retención (Completada) ### Phase 5: Advanced Features (Low Priority) - [ ] Add task reminders system. diff --git a/STATUS.md b/STATUS.md index 0a5348c..fef6001 100644 --- a/STATUS.md +++ b/STATUS.md @@ -40,9 +40,7 @@ - **Gestión de Tareas** - Eliminación opcional de tareas y mejoras de edición - **Cola de Respuestas** - - Recuperación de ítems en estado `processing` tras caídas (lease/expiración) - Métricas/observabilidad - - Limpieza/retención - **Validaciones** - Permisos de usuario (roles) y pertenencia a grupos (si se requiere política estricta) - **Menciones y nombres** @@ -68,7 +66,7 @@ - Etapa 1 — Reintentos con backoff exponencial + jitter: COMPLETADA. - Etapa 2 — Recuperación de items en `processing` (lease/expiración): COMPLETADA. - Etapa 3 — Métricas y observabilidad: POSTERGADA. -- Etapa 4 — Limpieza/retención: PENDIENTE. +- Etapa 4 — Limpieza/retención: COMPLETADA. ## Commit history and status - Latest status: All unit tests passing; Phase 2 completada; ACK to creator always in single-line format; optional group notify disabled by default; ContactsService avoids network calls under tests; basic name resolution via ContactsService integrated. diff --git a/src/db/migrations/index.ts b/src/db/migrations/index.ts index 480bd94..d84b558 100644 --- a/src/db/migrations/index.ts +++ b/src/db/migrations/index.ts @@ -115,5 +115,16 @@ export const migrations: Migration[] = [ ON response_queue (status, lease_until); `); } + }, + { + version: 3, + name: 'response-queue-retention-index', + checksum: 'v3-rq-retention-index-2025-09-06', + up: (db: Database) => { + db.exec(` + CREATE INDEX IF NOT EXISTS idx_response_queue_status_updated_at + ON response_queue (status, updated_at); + `); + } } ]; diff --git a/src/server.ts b/src/server.ts index b079807..d3d1e20 100644 --- a/src/server.ts +++ b/src/server.ts @@ -295,8 +295,11 @@ export class WebhookServer { try { await ResponseQueue.process(); console.log('✅ ResponseQueue worker started'); + // Start cleanup scheduler (daily retention) + ResponseQueue.startCleanupScheduler(); + console.log('✅ ResponseQueue cleanup scheduler started'); } catch (e) { - console.error('❌ Failed to start ResponseQueue worker:', e); + console.error('❌ Failed to start ResponseQueue worker or cleanup scheduler:', e); } } catch (error) { console.error('❌ Failed to setup webhook:', error instanceof Error ? error.message : error); diff --git a/src/services/response-queue.ts b/src/services/response-queue.ts index 504acdc..461f25d 100644 --- a/src/services/response-queue.ts +++ b/src/services/response-queue.ts @@ -32,7 +32,20 @@ export const ResponseQueue = { BASE_BACKOFF_MS: process.env.RQ_BASE_BACKOFF_MS ? Number(process.env.RQ_BASE_BACKOFF_MS) : 5000, MAX_BACKOFF_MS: process.env.RQ_MAX_BACKOFF_MS ? Number(process.env.RQ_MAX_BACKOFF_MS) : 3600000, + // Limpieza/retención (configurable por entorno) + CLEANUP_ENABLED: process.env.RQ_CLEANUP_ENABLED !== 'false', + RETENTION_DAYS_SENT: process.env.RQ_RETENTION_DAYS_SENT ? Number(process.env.RQ_RETENTION_DAYS_SENT) : 14, + RETENTION_DAYS_FAILED: process.env.RQ_RETENTION_DAYS_FAILED ? Number(process.env.RQ_RETENTION_DAYS_FAILED) : 30, + CLEANUP_INTERVAL_MS: process.env.RQ_CLEANUP_INTERVAL_MS ? Number(process.env.RQ_CLEANUP_INTERVAL_MS) : 24 * 60 * 60 * 1000, // 24h + CLEANUP_BATCH: process.env.RQ_CLEANUP_BATCH ? Number(process.env.RQ_CLEANUP_BATCH) : 1000, + OPTIMIZE_ENABLED: process.env.RQ_OPTIMIZE_ENABLED !== 'false', + VACUUM_ENABLED: process.env.RQ_VACUUM_ENABLED === 'true', + VACUUM_EVERY_N_RUNS: process.env.RQ_VACUUM_EVERY_N_RUNS ? Number(process.env.RQ_VACUUM_EVERY_N_RUNS) : 28, + _running: false, + _cleanupTimer: null as any, + _cleanupRunning: false, + _cleanupRunCount: 0, nowIso(): string { return new Date().toISOString().replace('T', ' ').replace('Z', ''); @@ -255,6 +268,108 @@ export const ResponseQueue = { } }, + // Limpieza/retención de historiales + async runCleanupOnce(now: Date = new Date()): Promise<{ deletedSent: number; deletedFailed: number; totalDeleted: number; skipped: boolean }> { + // Evitar solapes + if (this._cleanupRunning) { + return { deletedSent: 0, deletedFailed: 0, totalDeleted: 0, skipped: true }; + } + this._cleanupRunning = true; + const startedAt = Date.now(); + + try { + const toIso = (d: Date) => d.toISOString().replace('T', ' ').replace('Z', ''); + const msPerDay = 24 * 60 * 60 * 1000; + + const sentThresholdIso = toIso(new Date(now.getTime() - this.RETENTION_DAYS_SENT * msPerDay)); + const failedThresholdIso = toIso(new Date(now.getTime() - this.RETENTION_DAYS_FAILED * msPerDay)); + + const cleanStatus = (status: 'sent' | 'failed', thresholdIso: string): number => { + let deleted = 0; + const selectStmt = this.dbInstance.prepare(` + SELECT id + FROM response_queue + WHERE status = ? AND updated_at < ? + ORDER BY updated_at + LIMIT ? + `); + + while (true) { + const rows = selectStmt.all(status, thresholdIso, this.CLEANUP_BATCH) as Array<{ id: number }>; + if (!rows || rows.length === 0) break; + + const ids = rows.map(r => r.id); + const placeholders = ids.map(() => '?').join(','); + this.dbInstance.prepare(`DELETE FROM response_queue WHERE id IN (${placeholders})`).run(...ids); + deleted += ids.length; + + // Si el lote es menor que el batch, no quedan más candidatos + if (rows.length < this.CLEANUP_BATCH) break; + } + return deleted; + }; + + const deletedSent = cleanStatus('sent', sentThresholdIso); + const deletedFailed = cleanStatus('failed', failedThresholdIso); + const totalDeleted = deletedSent + deletedFailed; + + // Mantenimiento ligero tras limpieza + if (this.OPTIMIZE_ENABLED && totalDeleted > 0) { + try { + this.dbInstance.exec('PRAGMA optimize;'); + } catch (e) { + console.warn('PRAGMA optimize failed:', e); + } + } + + // VACUUM opcional (desactivado por defecto) + if (this.VACUUM_ENABLED && totalDeleted > 0) { + this._cleanupRunCount++; + if (this._cleanupRunCount % Math.max(1, this.VACUUM_EVERY_N_RUNS) === 0) { + try { + this.dbInstance.exec('VACUUM;'); + } catch (e) { + console.warn('VACUUM failed:', e); + } + } + } + + const tookMs = Date.now() - startedAt; + if (process.env.NODE_ENV !== 'test') { + console.log(`🧹 Cleanup done in ${tookMs}ms: sent=${deletedSent}, failed=${deletedFailed}, total=${totalDeleted}`); + } + return { deletedSent, deletedFailed, totalDeleted, skipped: false }; + } catch (err) { + console.error('Cleanup error:', err); + return { deletedSent: 0, deletedFailed: 0, totalDeleted: 0, skipped: false }; + } finally { + this._cleanupRunning = false; + } + }, + + startCleanupScheduler() { + if (process.env.NODE_ENV === 'test') return; + if (!this.CLEANUP_ENABLED) return; + if (this._cleanupTimer) return; + + const interval = this.CLEANUP_INTERVAL_MS; + this._cleanupTimer = setInterval(() => { + this.runCleanupOnce().catch(err => console.error('Scheduled cleanup error:', err)); + }, interval); + + console.log(`🗓️ Cleanup scheduler started (every ${Math.round(interval / (60 * 60 * 1000))}h)`); + }, + + stopCleanupScheduler() { + if (this._cleanupTimer) { + clearInterval(this._cleanupTimer); + this._cleanupTimer = null; + if (process.env.NODE_ENV !== 'test') { + console.log('🛑 Cleanup scheduler stopped'); + } + } + }, + stop() { this._running = false; } diff --git a/tests/unit/services/response-queue.cleanup.test.ts b/tests/unit/services/response-queue.cleanup.test.ts new file mode 100644 index 0000000..2d778d4 --- /dev/null +++ b/tests/unit/services/response-queue.cleanup.test.ts @@ -0,0 +1,124 @@ +import { describe, test, expect, beforeAll, afterAll, beforeEach } from 'bun:test'; +import { Database } from 'bun:sqlite'; +import { initializeDatabase } from '../../../src/db'; +import { ResponseQueue } from '../../../src/services/response-queue'; + +let testDb: Database; +let originalDbInstance: Database; + +function toIso(dt: Date): string { + return dt.toISOString().replace('T', ' ').replace('Z', ''); +} + +describe('ResponseQueue cleanup/retention', () => { + beforeAll(() => { + testDb = new Database(':memory:'); + initializeDatabase(testDb); + originalDbInstance = (ResponseQueue as any).dbInstance; + (ResponseQueue as any).dbInstance = testDb; + }); + + afterAll(() => { + (ResponseQueue as any).dbInstance = originalDbInstance; + testDb.close(); + }); + + beforeEach(() => { + testDb.exec('DELETE FROM response_queue'); + }); + + test('does not delete queued or processing items regardless of age', async () => { + const old = toIso(new Date(2000, 0, 1)); + testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`).run('u1','m1','queued', old); + testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`).run('u2','m2','processing', old); + testDb.prepare(`INSERT INTO response_queue (recipient, message, status) VALUES (?,?,?)`).run('u3','m3','queued'); + + const res = await (ResponseQueue as any).runCleanupOnce(new Date()); + expect(res.totalDeleted).toBe(0); + + const counts = testDb.query(`SELECT status, COUNT(*) as c FROM response_queue GROUP BY status ORDER BY status`).all() as any[]; + const map = Object.fromEntries(counts.map(r => [r.status, r.c])); + expect(map['queued']).toBe(2); + expect(map['processing']).toBe(1); + }); + + test('deletes sent older than 14 days but keeps recent', async () => { + const now = new Date(); + const days14Ago = new Date(now.getTime() - 14 * 24 * 60 * 60 * 1000); + const days13Ago = new Date(now.getTime() - 13 * 24 * 60 * 60 * 1000); + const thresholdExact = toIso(days14Ago); // exact boundary + + // exactly at threshold (should NOT delete because comparison is strict <) + testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`) + .run('u1','m1','sent', thresholdExact); + // older than threshold + testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`) + .run('u2','m2','sent', toIso(new Date(days14Ago.getTime() - 1000))); + // newer than threshold + testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`) + .run('u3','m3','sent', toIso(days13Ago)); + + const res = await (ResponseQueue as any).runCleanupOnce(now); + expect(res.deletedSent).toBe(1); + + const rows = testDb.query(`SELECT status, updated_at FROM response_queue WHERE status='sent' ORDER BY updated_at`).all() as any[]; + expect(rows.length).toBe(2); + expect(rows[0].updated_at).toBe(thresholdExact); + expect(rows[1].updated_at).toBe(toIso(days13Ago)); + }); + + test('deletes failed older than 30 days but keeps newer', async () => { + const now = new Date(); + const days30Ago = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000); + const days29Ago = new Date(now.getTime() - 29 * 24 * 60 * 60 * 1000); + + testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`) + .run('u1','m1','failed', toIso(new Date(days30Ago.getTime() - 1000))); + testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`) + .run('u2','m2','failed', toIso(days29Ago)); + + const res = await (ResponseQueue as any).runCleanupOnce(now); + expect(res.deletedFailed).toBe(1); + + const rows = testDb.query(`SELECT COUNT(*) as c FROM response_queue WHERE status='failed'`).get() as any; + expect(rows.c).toBe(1); + }); + + test('batch deletes large sets in multiple passes', async () => { + (ResponseQueue as any).CLEANUP_BATCH = 500; // reduce for test + const old = toIso(new Date(2000, 0, 1)); + const total = 1200; + + const insert = testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`); + testDb.transaction(() => { + for (let i = 0; i < total; i++) { + insert.run(`u${i}`, `m${i}`, 'sent', old); + } + })(); + + const res = await (ResponseQueue as any).runCleanupOnce(new Date()); + expect(res.deletedSent).toBe(total); + const count = testDb.query(`SELECT COUNT(*) as c FROM response_queue WHERE status='sent'`).get() as any; + expect(count.c).toBe(0); + }); + + test('concurrent cleanup calls do not overlap', async () => { + const old = toIso(new Date(2000, 0, 1)); + for (let i = 0; i < 50; i++) { + testDb.prepare(`INSERT INTO response_queue (recipient, message, status, updated_at) VALUES (?,?,?,?)`) + .run(`u${i}`, `m${i}`, 'sent', old); + } + + // Trigger two cleanups concurrently + const [r1, r2] = await Promise.all([ + (ResponseQueue as any).runCleanupOnce(new Date()), + (ResponseQueue as any).runCleanupOnce(new Date()), + ]); + + const total = (r1.totalDeleted || 0) + (r2.totalDeleted || 0); + expect(total).toBe(50); // no double-deletes + + const remain = testDb.query(`SELECT COUNT(*) as c FROM response_queue`).get() as any; + expect(remain.c).toBe(0); + }); +});