From efe8aaef894a29576befa72ecb60426192c0fd2d Mon Sep 17 00:00:00 2001 From: borja Date: Sat, 6 Sep 2025 18:48:05 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20a=C3=B1adir=20migrador=20de=20migracion?= =?UTF-8?q?es=20up-only=20para=20SQLite?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: aider (openrouter/openai/gpt-5) --- src/db/migrations/index.ts | 119 +++++++++++++++++++++++++++++++++++++ src/db/migrator.ts | 116 ++++++++++++++++++++++++++++++++++++ src/server.ts | 7 ++- 3 files changed, 239 insertions(+), 3 deletions(-) create mode 100644 src/db/migrations/index.ts create mode 100644 src/db/migrator.ts diff --git a/src/db/migrations/index.ts b/src/db/migrations/index.ts new file mode 100644 index 0000000..480bd94 --- /dev/null +++ b/src/db/migrations/index.ts @@ -0,0 +1,119 @@ +import type { Database } from 'bun:sqlite'; + +export type Migration = { + version: number; + name: string; + checksum: string; // estático para trazabilidad básica + up: (db: Database) => void | Promise; +}; + +function tableHasColumn(db: Database, table: string, column: string): boolean { + const cols = db.query(`PRAGMA table_info(${table})`).all() as any[]; + return Array.isArray(cols) && cols.some((c: any) => c.name === column); +} + +export const migrations: Migration[] = [ + { + version: 1, + name: 'initial-schema', + checksum: 'v1-initial-schema-2025-09-06', + up: (db: Database) => { + // Esquema inicial (equivalente al initializeDatabase actual) + db.exec(`PRAGMA foreign_keys = ON;`); + + db.exec(` + CREATE TABLE IF NOT EXISTS users ( + id TEXT PRIMARY KEY, + first_seen TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')), + last_seen TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')) + ); + `); + + db.exec(` + CREATE TABLE IF NOT EXISTS groups ( + id TEXT PRIMARY KEY, + community_id TEXT NOT NULL, + name TEXT, + last_verified TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')), + active BOOLEAN DEFAULT TRUE + ); + `); + + db.exec(` + CREATE TABLE IF NOT EXISTS tasks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + description TEXT NOT NULL, + created_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')), + due_date TEXT NULL, + completed BOOLEAN DEFAULT FALSE, + completed_at TEXT NULL, + group_id TEXT NULL, + created_by TEXT NOT NULL, + completed_by TEXT NULL, + FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE CASCADE, + FOREIGN KEY (completed_by) REFERENCES users(id) ON DELETE SET NULL, + FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE SET NULL + ); + `); + + db.exec(` + CREATE TABLE IF NOT EXISTS task_assignments ( + task_id INTEGER NOT NULL, + user_id TEXT NOT NULL, + assigned_by TEXT NOT NULL, + assigned_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')), + PRIMARY KEY (task_id, user_id), + FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, + FOREIGN KEY (assigned_by) REFERENCES users(id) ON DELETE CASCADE + ); + `); + + db.exec(` + CREATE TABLE IF NOT EXISTS response_queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + recipient TEXT NOT NULL, + message TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'queued' CHECK (status IN ('queued','processing','sent','failed')), + attempts INTEGER NOT NULL DEFAULT 0, + last_error TEXT NULL, + metadata TEXT NULL, + created_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')), + updated_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')) + ); + `); + + db.exec(` + CREATE INDEX IF NOT EXISTS idx_response_queue_status_created_at + ON response_queue (status, created_at); + `); + } + }, + { + version: 2, + name: 'response-queue-reliability', + checksum: 'v2-rq-reliability-2025-09-06', + up: (db: Database) => { + // Añadir columnas necesarias si no existen (idempotente) + if (!tableHasColumn(db, 'response_queue', 'next_attempt_at')) { + db.exec(`ALTER TABLE response_queue ADD COLUMN next_attempt_at TEXT NULL;`); + } + if (!tableHasColumn(db, 'response_queue', 'lease_until')) { + db.exec(`ALTER TABLE response_queue ADD COLUMN lease_until TEXT NULL;`); + } + if (!tableHasColumn(db, 'response_queue', 'last_status_code')) { + db.exec(`ALTER TABLE response_queue ADD COLUMN last_status_code INTEGER NULL;`); + } + + // Índices complementarios + db.exec(` + CREATE INDEX IF NOT EXISTS idx_response_queue_status_next_attempt + ON response_queue (status, next_attempt_at); + `); + db.exec(` + CREATE INDEX IF NOT EXISTS idx_response_queue_status_lease_until + ON response_queue (status, lease_until); + `); + } + } +]; diff --git a/src/db/migrator.ts b/src/db/migrator.ts new file mode 100644 index 0000000..97821eb --- /dev/null +++ b/src/db/migrator.ts @@ -0,0 +1,116 @@ +import type { Database } from 'bun:sqlite'; +import { mkdirSync } from 'fs'; +import { join } from 'path'; +import { migrations, type Migration } from './migrations'; + +function nowIso(): string { + return new Date().toISOString().replace('T', ' ').replace('Z', ''); +} + +function ensureMigrationsTable(db: Database) { + db.exec(` + CREATE TABLE IF NOT EXISTS schema_migrations ( + version INTEGER PRIMARY KEY, + name TEXT NOT NULL, + checksum TEXT NOT NULL, + applied_at TEXT NOT NULL + ); + `); +} + +function getAppliedVersions(db: Database): Map { + const rows = db.query(`SELECT version, name, checksum, applied_at FROM schema_migrations ORDER BY version`).all() as any[]; + const map = new Map(); + for (const r of rows) { + map.set(Number(r.version), { name: String(r.name), checksum: String(r.checksum), applied_at: String(r.applied_at) }); + } + return map; +} + +function tableExists(db: Database, table: string): boolean { + const row = db.query(`SELECT name FROM sqlite_master WHERE type='table' AND name=?`).get(table) as any; + return !!row; +} + +function detectExistingSchema(db: Database): boolean { + // Consideramos esquema "existente" si ya hay tablas clave + const coreTables = ['users', 'tasks', 'response_queue']; + return coreTables.every(t => tableExists(db, t)); +} + +function insertMigrationRow(db: Database, mig: Migration) { + db.prepare( + `INSERT INTO schema_migrations (version, name, checksum, applied_at) VALUES (?, ?, ?, ?)` + ).run(mig.version, mig.name, mig.checksum, nowIso()); +} + +async function backupDatabaseIfNeeded(db: Database): Promise { + if (process.env.NODE_ENV === 'test') return null; + try { + mkdirSync('data', { recursive: true }); + } catch {} + const stamp = new Date().toISOString().replace(/[-:T.Z]/g, '').slice(0, 14); + const backupPath = join('data', `tasks.db.bak-${stamp}.sqlite`); + try { + // VACUUM INTO hace copia consistente del estado actual + db.exec(`VACUUM INTO '${backupPath.replace(/'/g, "''")}'`); + console.log(`ℹ️ Backup de base de datos creado en: ${backupPath}`); + return backupPath; + } catch (e) { + console.warn('⚠️ No se pudo crear el backup con VACUUM INTO (continuando de todos modos):', e); + return null; + } +} + +export const Migrator = { + ensureMigrationsTable, + getAppliedVersions, + + async migrateToLatest(db: Database, options?: { withBackup?: boolean; allowBaseline?: boolean }) { + const withBackup = options?.withBackup !== false; + const allowBaseline = options?.allowBaseline !== false; + + ensureMigrationsTable(db); + const applied = getAppliedVersions(db); + const pending = migrations.filter(m => !applied.has(m.version)).sort((a, b) => a.version - b.version); + + if (applied.size === 0 && allowBaseline && detectExistingSchema(db)) { + // Baseline a v1 si ya existe el esquema pero no hay registro + const v1 = migrations.find(m => m.version === 1)!; + db.transaction(() => { + insertMigrationRow(db, v1); + })(); + console.log('ℹ️ Baseline aplicado: schema_migrations marcada en v1 (sin ejecutar up)'); + // Recalcular pendientes + pending.splice(0, pending.length, ...migrations.filter(m => m.version > 1)); + } + + if (pending.length === 0) { + console.log('ℹ️ No hay migraciones pendientes'); + return; + } + + if (withBackup) { + await backupDatabaseIfNeeded(db); + } + + for (const mig of pending) { + console.log(`➡️ Aplicando migración v${mig.version} - ${mig.name}`); + try { + db.transaction(() => { + // Ejecutar up + const res = mig.up(db); + if (res instanceof Promise) { + throw new Error('Las migraciones up no deben ser asíncronas en este migrador'); + } + // Registrar + insertMigrationRow(db, mig); + })(); + console.log(`✅ Migración v${mig.version} aplicada`); + } catch (e) { + console.error(`❌ Error aplicando migración v${mig.version}:`, e); + throw e; + } + } + } +}; diff --git a/src/server.ts b/src/server.ts index 524fb2a..b079807 100644 --- a/src/server.ts +++ b/src/server.ts @@ -6,8 +6,9 @@ import { ResponseQueue } from './services/response-queue'; import { TaskService } from './tasks/service'; import { WebhookManager } from './services/webhook-manager'; import { normalizeWhatsAppId, isGroupId } from './utils/whatsapp'; -import { ensureUserExists, db, initializeDatabase } from './db'; +import { ensureUserExists, db } from './db'; import { ContactsService } from './services/contacts'; +import { Migrator } from './db/migrator'; // Bun is available globally when running under Bun runtime declare global { @@ -265,8 +266,8 @@ export class WebhookServer { static async start() { this.validateEnv(); - // Ensure database schema and migrations are applied - initializeDatabase(this.dbInstance); + // Run database migrations (up-only) before starting services + await Migrator.migrateToLatest(this.dbInstance); const PORT = process.env.PORT || '3007'; console.log('✅ Environment variables validated');