diff --git a/README.md b/README.md index 09558ed..8c2a52e 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,9 @@ Un chatbot de WhatsApp para gestionar tareas en grupos, integrado con Evolution - RQ_MAX_ATTEMPTS: reintentos máximos; por defecto 6. - RQ_BASE_BACKOFF_MS: backoff base en ms; por defecto 5000. - RQ_MAX_BACKOFF_MS: backoff máximo en ms; por defecto 3600000. +- Opcionales — migraciones + - MIGRATOR_CHECKSUM_STRICT: si "false" desactiva validación estricta de checksum de migraciones; por defecto "true". + - MIGRATIONS_LOG_PATH: ruta del fichero de log de migraciones; por defecto data/migrations.log. - Entorno - NODE_ENV: production | development | test. diff --git a/STATUS.md b/STATUS.md index a3306d5..39d576d 100644 --- a/STATUS.md +++ b/STATUS.md @@ -12,7 +12,7 @@ Estado general: listo para piloto con la junta directiva; 170 tests pasando. Rie - Inicialización con PRAGMA FK y timestamps de alta precisión. - Esquema: users, tasks, task_assignments, user_preferences, response_queue (con metadata). - Modo journal WAL activado (busy_timeout y autocheckpoint configurados) para mejorar concurrencia y rendimiento. - - Migrador up-only con tabla schema_migrations; backup automático con VACUUM INTO; baseline si existe esquema previo. + - Migrador up-only con tabla schema_migrations; FK siempre ON al abrir; backup automático con VACUUM INTO; sin baseline por defecto; validación de checksum; log persistente en data/migrations.log; resumen de estado al arrancar. - Cola de respuestas - Persistente con workers en background, reintentos con backoff exponencial + jitter, recuperación de items processing tras reinicios, y limpieza/retención. - 2xx=sent, 4xx=failed definitivo, 5xx/red=reintento; evita enviar al propio bot. @@ -78,6 +78,7 @@ Estado general: listo para piloto con la junta directiva; 170 tests pasando. Rie ## Notas de despliegue/operación - SQLite en modo WAL; backups consistentes con VACUUM INTO previo; tener en cuenta archivos -wal y -shm. +- Log persistente de migraciones en data/migrations.log (una línea por evento, formato JSONL); revisar en despliegues. - Persistencia en data/; mapear a volumen y monitorizar tamaño; ejecutar VACUUM periódicamente si procede. - Rotación de logs vía orquestador; considerar niveles/etiquetas para búsquedas. diff --git a/src/db.ts b/src/db.ts index 4081380..2c25bb8 100644 --- a/src/db.ts +++ b/src/db.ts @@ -2,6 +2,7 @@ import { Database } from 'bun:sqlite'; import { normalizeWhatsAppId } from './utils/whatsapp'; import { mkdirSync } from 'fs'; import { join } from 'path'; +import { Migrator } from './db/migrator'; function applyDefaultPragmas(instance: Database): void { try { @@ -10,6 +11,8 @@ function applyDefaultPragmas(instance: Database): void { instance.query(`PRAGMA journal_mode = WAL`).get(); instance.exec(`PRAGMA synchronous = NORMAL;`); instance.exec(`PRAGMA wal_autocheckpoint = 1000;`); + // Asegurar claves foráneas siempre activas + instance.exec(`PRAGMA foreign_keys = ON;`); } catch (e) { console.warn('[db] No se pudieron aplicar PRAGMAs (WAL, busy_timeout...):', e); } @@ -31,172 +34,17 @@ export function getDb(filename: string = 'tasks.db'): Database { // Default export for the main application database export const db = getDb(); -// Initialize function now accepts a database instance + // Initialize function now accepts a database instance export function initializeDatabase(instance: Database) { - // Aplicar PRAGMAs por defecto (WAL, busy_timeout, etc.) + // Aplicar PRAGMAs por defecto (WAL, busy_timeout, FK, etc.) applyDefaultPragmas(instance); - // Enable foreign key constraints - instance.exec(`PRAGMA foreign_keys = ON;`); - - // Create users table first as others depend on it - // Use TEXT for timestamps to store higher precision ISO8601 format easily - instance.exec(` - CREATE TABLE IF NOT EXISTS users ( - id TEXT PRIMARY KEY, -- WhatsApp user ID (normalized) - first_seen TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')), - last_seen TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')) - ); - `); - - // Create groups table - instance.exec(` - CREATE TABLE IF NOT EXISTS groups ( - id TEXT PRIMARY KEY, -- Group ID (normalized) - community_id TEXT NOT NULL, - name TEXT, - last_verified TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')), - active BOOLEAN DEFAULT TRUE - ); - `); - - // Create group_members table - instance.exec(` - CREATE TABLE IF NOT EXISTS group_members ( - group_id TEXT NOT NULL, - user_id TEXT NOT NULL, - is_admin BOOLEAN NOT NULL DEFAULT 0, - is_active BOOLEAN NOT NULL DEFAULT 1, - first_seen_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')), - last_seen_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')), - last_role_change_at TEXT NULL, - PRIMARY KEY (group_id, user_id), - FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE CASCADE, - FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE - ); - `); - - // Indexes for membership lookups - instance.exec(` - CREATE INDEX IF NOT EXISTS idx_group_members_group_active - ON group_members (group_id, is_active); - `); - instance.exec(` - CREATE INDEX IF NOT EXISTS idx_group_members_user_active - ON group_members (user_id, is_active); - `); - - // Create tasks table - instance.exec(` - CREATE TABLE IF NOT EXISTS tasks ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - description TEXT NOT NULL, - created_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')), - due_date TEXT NULL, -- Store dates as ISO8601 strings or YYYY-MM-DD - completed BOOLEAN DEFAULT FALSE, - completed_at TEXT NULL, - group_id TEXT NULL, -- Normalized group ID - created_by TEXT NOT NULL, -- Normalized user ID - completed_by TEXT NULL, -- Normalized user ID who completed the task - FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE CASCADE, - FOREIGN KEY (completed_by) REFERENCES users(id) ON DELETE SET NULL, - FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE SET NULL -- Optional: Link task to group - ); - `); - - // Create task_assignments table - instance.exec(` - CREATE TABLE IF NOT EXISTS task_assignments ( - task_id INTEGER NOT NULL, - user_id TEXT NOT NULL, -- Normalized user ID - assigned_by TEXT NOT NULL, -- Normalized user ID - assigned_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')), - PRIMARY KEY (task_id, user_id), - FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE, - FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, - FOREIGN KEY (assigned_by) REFERENCES users(id) ON DELETE CASCADE - ); - `); - - // Create response_queue table (persistent outbox for replies) - instance.exec(` - CREATE TABLE IF NOT EXISTS response_queue ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - recipient TEXT NOT NULL, - message TEXT NOT NULL, - status TEXT NOT NULL DEFAULT 'queued' CHECK (status IN ('queued','processing','sent','failed')), - attempts INTEGER NOT NULL DEFAULT 0, - last_error TEXT NULL, - metadata TEXT NULL, - created_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')), - updated_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')) - ); - `); - - // Index to fetch pending items efficiently - instance.exec(` - CREATE INDEX IF NOT EXISTS idx_response_queue_status_created_at - ON response_queue (status, created_at); - `); - - // Migration: ensure 'metadata' column exists on response_queue for message options (e.g., mentions) + // Ejecutar migraciones up-only (sin baseline por defecto). Evitar backup duplicado aquí. try { - const cols = instance.query(`PRAGMA table_info('response_queue')`).all() as any[]; - const hasMetadata = Array.isArray(cols) && cols.some((c: any) => c.name === 'metadata'); - if (!hasMetadata) { - instance.exec(`ALTER TABLE response_queue ADD COLUMN metadata TEXT NULL;`); - } - } catch (e) { - console.warn('[initializeDatabase] Skipped adding response_queue.metadata column:', e); - } - - // Migration: ensure 'completed_by' column exists on tasks (to record who completed) - try { - const cols = instance.query(`PRAGMA table_info('tasks')`).all() as any[]; - const hasCompletedBy = Array.isArray(cols) && cols.some((c: any) => c.name === 'completed_by'); - if (!hasCompletedBy) { - instance.exec(`ALTER TABLE tasks ADD COLUMN completed_by TEXT NULL;`); - } - } catch (e) { - console.warn('[initializeDatabase] Skipped adding tasks.completed_by column:', e); - } - - // Migration: ensure reliability columns exist on response_queue (next_attempt_at, lease_until, last_status_code) - try { - const cols = instance.query(`PRAGMA table_info('response_queue')`).all() as any[]; - const hasNextAttempt = Array.isArray(cols) && cols.some((c: any) => c.name === 'next_attempt_at'); - if (!hasNextAttempt) { - instance.exec(`ALTER TABLE response_queue ADD COLUMN next_attempt_at TEXT NULL;`); - } - const hasLeaseUntil = Array.isArray(cols) && cols.some((c: any) => c.name === 'lease_until'); - if (!hasLeaseUntil) { - instance.exec(`ALTER TABLE response_queue ADD COLUMN lease_until TEXT NULL;`); - } - const hasLastStatus = Array.isArray(cols) && cols.some((c: any) => c.name === 'last_status_code'); - if (!hasLastStatus) { - instance.exec(`ALTER TABLE response_queue ADD COLUMN last_status_code INTEGER NULL;`); - } - } catch (e) { - console.warn('[initializeDatabase] Skipped ensuring response_queue reliability columns:', e); - } - - // Ensure supporting indexes exist - try { - instance.exec(` - CREATE INDEX IF NOT EXISTS idx_response_queue_status_next_attempt - ON response_queue (status, next_attempt_at); - `); - } catch (e) { - console.warn('[initializeDatabase] Skipped creating idx_response_queue_status_next_attempt:', e); - } - - try { - instance.exec(` - CREATE INDEX IF NOT EXISTS idx_response_queue_status_lease_until - ON response_queue (status, lease_until); - `); + Migrator.migrateToLatest(instance, { withBackup: false, allowBaseline: false }); } catch (e) { - console.warn('[initializeDatabase] Skipped creating idx_response_queue_status_lease_until:', e); + console.error('[initializeDatabase] Error al aplicar migraciones:', e); + throw e; } } diff --git a/src/db/migrator.ts b/src/db/migrator.ts index 97821eb..fab6b21 100644 --- a/src/db/migrator.ts +++ b/src/db/migrator.ts @@ -1,5 +1,5 @@ import type { Database } from 'bun:sqlite'; -import { mkdirSync } from 'fs'; +import { mkdirSync, appendFileSync } from 'fs'; import { join } from 'path'; import { migrations, type Migration } from './migrations'; @@ -7,6 +7,16 @@ function nowIso(): string { return new Date().toISOString().replace('T', ' ').replace('Z', ''); } +function logEvent(level: 'info' | 'error', event: string, data: any = {}) { + try { + mkdirSync('data', { recursive: true }); + } catch {} + try { + const line = JSON.stringify({ ts: nowIso(), level, event, ...data }); + appendFileSync(join('data', 'migrations.log'), line + '\n'); + } catch {} +} + function ensureMigrationsTable(db: Database) { db.exec(` CREATE TABLE IF NOT EXISTS schema_migrations ( @@ -44,7 +54,7 @@ function insertMigrationRow(db: Database, mig: Migration) { ).run(mig.version, mig.name, mig.checksum, nowIso()); } -async function backupDatabaseIfNeeded(db: Database): Promise { +function backupDatabaseIfNeeded(db: Database): string | null { if (process.env.NODE_ENV === 'test') return null; try { mkdirSync('data', { recursive: true }); @@ -66,14 +76,33 @@ export const Migrator = { ensureMigrationsTable, getAppliedVersions, - async migrateToLatest(db: Database, options?: { withBackup?: boolean; allowBaseline?: boolean }) { + migrateToLatest(db: Database, options?: { withBackup?: boolean; allowBaseline?: boolean }) { const withBackup = options?.withBackup !== false; - const allowBaseline = options?.allowBaseline !== false; + const allowBaseline = options?.allowBaseline === true; ensureMigrationsTable(db); const applied = getAppliedVersions(db); const pending = migrations.filter(m => !applied.has(m.version)).sort((a, b) => a.version - b.version); + // Validación de checksum (estricta por defecto, configurable) + const strict = (process.env.MIGRATOR_CHECKSUM_STRICT ?? 'true').toLowerCase() !== 'false'; + for (const [version, info] of applied) { + const codeMig = migrations.find(m => m.version === version); + if (codeMig && codeMig.checksum !== info.checksum) { + const msg = `❌ Checksum mismatch en migración v${version}: aplicado=${info.checksum}, código=${codeMig.checksum}`; + console.error(msg); + try { logEvent('error', 'checksum_mismatch', { version, applied_checksum: info.checksum, code_checksum: codeMig.checksum }); } catch {} + if (strict) throw new Error(msg); + } + } + + // Resumen inicial + const jmRow = db.query(`PRAGMA journal_mode`).get() as any; + const journalMode = jmRow ? (jmRow.journal_mode || jmRow.value || jmRow.mode || 'unknown') : 'unknown'; + const currentVersion = applied.size ? Math.max(...Array.from(applied.keys())) : 0; + console.log(`ℹ️ Migrador — journal_mode=${journalMode}, versión_actual=${currentVersion}, pendientes=${pending.length}`); + try { logEvent('info', 'startup_summary', { journal_mode: journalMode, current_version: currentVersion, pending: pending.length }); } catch {} + if (applied.size === 0 && allowBaseline && detectExistingSchema(db)) { // Baseline a v1 si ya existe el esquema pero no hay registro const v1 = migrations.find(m => m.version === 1)!; @@ -81,22 +110,27 @@ export const Migrator = { insertMigrationRow(db, v1); })(); console.log('ℹ️ Baseline aplicado: schema_migrations marcada en v1 (sin ejecutar up)'); + try { logEvent('info', 'baseline_applied', { version: 1 }); } catch {} // Recalcular pendientes pending.splice(0, pending.length, ...migrations.filter(m => m.version > 1)); } if (pending.length === 0) { console.log('ℹ️ No hay migraciones pendientes'); + try { logEvent('info', 'no_pending', {}); } catch {} return; } if (withBackup) { - await backupDatabaseIfNeeded(db); + const backupPath = backupDatabaseIfNeeded(db); + try { logEvent('info', 'backup', { path: backupPath }); } catch {} } for (const mig of pending) { console.log(`➡️ Aplicando migración v${mig.version} - ${mig.name}`); try { + try { logEvent('info', 'apply_start', { version: mig.version, name: mig.name, checksum: mig.checksum }); } catch {} + const t0 = Date.now(); db.transaction(() => { // Ejecutar up const res = mig.up(db); @@ -106,9 +140,12 @@ export const Migrator = { // Registrar insertMigrationRow(db, mig); })(); - console.log(`✅ Migración v${mig.version} aplicada`); + const ms = Date.now() - t0; + console.log(`✅ Migración v${mig.version} aplicada (${ms} ms)`); + try { logEvent('info', 'apply_success', { version: mig.version, name: mig.name, checksum: mig.checksum, duration_ms: ms }); } catch {} } catch (e) { console.error(`❌ Error aplicando migración v${mig.version}:`, e); + try { logEvent('error', 'apply_error', { version: mig.version, name: mig.name, checksum: mig.checksum, error: String(e) }); } catch {} throw e; } }