refactor: enable FKs, drop default baseline, add persistent migration log

Co-authored-by: aider (openrouter/openai/gpt-5) <aider@aider.chat>
pull/1/head
borja 2 months ago
parent df82dbfe33
commit b686d20caa

@@ -58,6 +58,9 @@ A WhatsApp chatbot for managing tasks in groups, integrated with Evolution
- RQ_MAX_ATTEMPTS: maximum retry attempts; default 6.
- RQ_BASE_BACKOFF_MS: base backoff in ms; default 5000.
- RQ_MAX_BACKOFF_MS: maximum backoff in ms; default 3600000 (see the sketch after this list).
- Optional: migrations
- MIGRATOR_CHECKSUM_STRICT: if "false", disables strict checksum validation of migrations; default "true".
- MIGRATIONS_LOG_PATH: path of the migrations log file; default data/migrations.log.
- Environment
- NODE_ENV: production | development | test.
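A minimal sketch of how these RQ_* variables could combine into a retry delay (exponential backoff with full jitter, as the queue notes below describe); the queue's actual formula is not shown in this diff, so `backoffDelayMs` is illustrative only:

```ts
// Illustrative only: reads the documented env vars with their defaults.
const MAX_ATTEMPTS = Number(process.env.RQ_MAX_ATTEMPTS ?? 6);
const BASE_BACKOFF_MS = Number(process.env.RQ_BASE_BACKOFF_MS ?? 5000);
const MAX_BACKOFF_MS = Number(process.env.RQ_MAX_BACKOFF_MS ?? 3600000);

function backoffDelayMs(attempt: number): number | null {
  // attempt is 1-based; a null return means the caller should mark the item failed
  if (attempt > MAX_ATTEMPTS) return null;
  // cap exponential growth at MAX_BACKOFF_MS
  const capped = Math.min(BASE_BACKOFF_MS * 2 ** (attempt - 1), MAX_BACKOFF_MS);
  // full jitter: uniform delay in [0, capped)
  return Math.floor(Math.random() * capped);
}
```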

@@ -12,7 +12,7 @@ Overall status: ready for a pilot with the board; 170 tests passing. Rie
- Initialization with PRAGMA FK and high-precision timestamps.
- Schema: users, tasks, task_assignments, user_preferences, response_queue (with metadata).
- WAL journal mode enabled (busy_timeout and autocheckpoint configured) to improve concurrency and performance.
- Up-only migrator with a schema_migrations table; automatic backup with VACUUM INTO; baseline if a prior schema exists.
- Up-only migrator with a schema_migrations table; FKs always ON at open; automatic backup with VACUUM INTO; no baseline by default; checksum validation; persistent log at data/migrations.log; status summary at startup.
- Response queue
- Persistent, with background workers, retries with exponential backoff + jitter, recovery of processing items after restarts, and cleanup/retention.
- 2xx=sent, 4xx=permanent failure, 5xx/network=retry; avoids sending to the bot itself (see the sketch after this list).
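A minimal sketch of the status-code classification described above; `classifyResponse` is a hypothetical helper name, not taken from the codebase:

```ts
type QueueOutcome = 'sent' | 'failed' | 'retry';

// Hypothetical helper mirroring the documented rules:
// 2xx => sent, 4xx => permanent failure, 5xx or network error => retry.
function classifyResponse(statusCode: number | null): QueueOutcome {
  if (statusCode === null) return 'retry'; // network error, no HTTP status
  if (statusCode >= 200 && statusCode < 300) return 'sent';
  if (statusCode >= 400 && statusCode < 500) return 'failed';
  return 'retry'; // 5xx and anything unexpected
}
```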
@@ -78,6 +78,7 @@ Overall status: ready for a pilot with the board; 170 tests passing. Rie
## Deployment/operations notes
- SQLite in WAL mode; consistent backups via a prior VACUUM INTO; keep the -wal and -shm files in mind.
- Persistent migration log at data/migrations.log (one line per event, JSONL format); review it on deployments (a sample line follows this list).
- Persistence under data/; map it to a volume and monitor its size; run VACUUM periodically where appropriate.
- Log rotation via the orchestrator; consider levels/tags for searching.
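As an illustration, an apply_success event in data/migrations.log could look like the line below; the field names match the logEvent calls in this diff, while the concrete values (timestamp, name, checksum) are made up:

```json
{"ts":"2025-01-01 12:00:00.000","level":"info","event":"apply_success","version":2,"name":"example_migration","checksum":"abc123","duration_ms":12}
```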

@@ -2,6 +2,7 @@ import { Database } from 'bun:sqlite';
import { normalizeWhatsAppId } from './utils/whatsapp';
import { mkdirSync } from 'fs';
import { join } from 'path';
import { Migrator } from './db/migrator';
function applyDefaultPragmas(instance: Database): void {
try {
@@ -10,6 +11,8 @@ function applyDefaultPragmas(instance: Database): void {
instance.query(`PRAGMA journal_mode = WAL`).get();
instance.exec(`PRAGMA synchronous = NORMAL;`);
instance.exec(`PRAGMA wal_autocheckpoint = 1000;`);
// Ensure foreign keys are always enabled
instance.exec(`PRAGMA foreign_keys = ON;`);
} catch (e) {
console.warn('[db] Could not apply PRAGMAs (WAL, busy_timeout...):', e);
}
@@ -31,172 +34,17 @@ export function getDb(filename: string = 'tasks.db'): Database {
// Default export for the main application database
export const db = getDb();
// Initialize function now accepts a database instance
export function initializeDatabase(instance: Database) {
// Apply default PRAGMAs (WAL, busy_timeout, etc.)
// Apply default PRAGMAs (WAL, busy_timeout, FK, etc.)
applyDefaultPragmas(instance);
// Enable foreign key constraints
instance.exec(`PRAGMA foreign_keys = ON;`);
// Create users table first as others depend on it
// Use TEXT for timestamps to store higher precision ISO8601 format easily
instance.exec(`
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY, -- WhatsApp user ID (normalized)
first_seen TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
last_seen TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now'))
);
`);
// Create groups table
instance.exec(`
CREATE TABLE IF NOT EXISTS groups (
id TEXT PRIMARY KEY, -- Group ID (normalized)
community_id TEXT NOT NULL,
name TEXT,
last_verified TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
active BOOLEAN DEFAULT TRUE
);
`);
// Create group_members table
instance.exec(`
CREATE TABLE IF NOT EXISTS group_members (
group_id TEXT NOT NULL,
user_id TEXT NOT NULL,
is_admin BOOLEAN NOT NULL DEFAULT 0,
is_active BOOLEAN NOT NULL DEFAULT 1,
first_seen_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
last_seen_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
last_role_change_at TEXT NULL,
PRIMARY KEY (group_id, user_id),
FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
`);
// Indexes for membership lookups
instance.exec(`
CREATE INDEX IF NOT EXISTS idx_group_members_group_active
ON group_members (group_id, is_active);
`);
instance.exec(`
CREATE INDEX IF NOT EXISTS idx_group_members_user_active
ON group_members (user_id, is_active);
`);
// Create tasks table
instance.exec(`
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
description TEXT NOT NULL,
created_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
due_date TEXT NULL, -- Store dates as ISO8601 strings or YYYY-MM-DD
completed BOOLEAN DEFAULT FALSE,
completed_at TEXT NULL,
group_id TEXT NULL, -- Normalized group ID
created_by TEXT NOT NULL, -- Normalized user ID
completed_by TEXT NULL, -- Normalized user ID who completed the task
FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (completed_by) REFERENCES users(id) ON DELETE SET NULL,
FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE SET NULL -- Optional: Link task to group
);
`);
// Create task_assignments table
instance.exec(`
CREATE TABLE IF NOT EXISTS task_assignments (
task_id INTEGER NOT NULL,
user_id TEXT NOT NULL, -- Normalized user ID
assigned_by TEXT NOT NULL, -- Normalized user ID
assigned_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
PRIMARY KEY (task_id, user_id),
FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (assigned_by) REFERENCES users(id) ON DELETE CASCADE
);
`);
// Create response_queue table (persistent outbox for replies)
instance.exec(`
CREATE TABLE IF NOT EXISTS response_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
recipient TEXT NOT NULL,
message TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued' CHECK (status IN ('queued','processing','sent','failed')),
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT NULL,
metadata TEXT NULL,
created_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
updated_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now'))
);
`);
// Index to fetch pending items efficiently
instance.exec(`
CREATE INDEX IF NOT EXISTS idx_response_queue_status_created_at
ON response_queue (status, created_at);
`);
// Migration: ensure 'metadata' column exists on response_queue for message options (e.g., mentions)
// Run up-only migrations (no baseline by default). Avoid a duplicate backup here.
try {
const cols = instance.query(`PRAGMA table_info('response_queue')`).all() as any[];
const hasMetadata = Array.isArray(cols) && cols.some((c: any) => c.name === 'metadata');
if (!hasMetadata) {
instance.exec(`ALTER TABLE response_queue ADD COLUMN metadata TEXT NULL;`);
}
} catch (e) {
console.warn('[initializeDatabase] Skipped adding response_queue.metadata column:', e);
}
// Migration: ensure 'completed_by' column exists on tasks (to record who completed)
try {
const cols = instance.query(`PRAGMA table_info('tasks')`).all() as any[];
const hasCompletedBy = Array.isArray(cols) && cols.some((c: any) => c.name === 'completed_by');
if (!hasCompletedBy) {
instance.exec(`ALTER TABLE tasks ADD COLUMN completed_by TEXT NULL;`);
}
} catch (e) {
console.warn('[initializeDatabase] Skipped adding tasks.completed_by column:', e);
}
// Migration: ensure reliability columns exist on response_queue (next_attempt_at, lease_until, last_status_code)
try {
const cols = instance.query(`PRAGMA table_info('response_queue')`).all() as any[];
const hasNextAttempt = Array.isArray(cols) && cols.some((c: any) => c.name === 'next_attempt_at');
if (!hasNextAttempt) {
instance.exec(`ALTER TABLE response_queue ADD COLUMN next_attempt_at TEXT NULL;`);
}
const hasLeaseUntil = Array.isArray(cols) && cols.some((c: any) => c.name === 'lease_until');
if (!hasLeaseUntil) {
instance.exec(`ALTER TABLE response_queue ADD COLUMN lease_until TEXT NULL;`);
}
const hasLastStatus = Array.isArray(cols) && cols.some((c: any) => c.name === 'last_status_code');
if (!hasLastStatus) {
instance.exec(`ALTER TABLE response_queue ADD COLUMN last_status_code INTEGER NULL;`);
}
} catch (e) {
console.warn('[initializeDatabase] Skipped ensuring response_queue reliability columns:', e);
}
// Ensure supporting indexes exist
try {
instance.exec(`
CREATE INDEX IF NOT EXISTS idx_response_queue_status_next_attempt
ON response_queue (status, next_attempt_at);
`);
} catch (e) {
console.warn('[initializeDatabase] Skipped creating idx_response_queue_status_next_attempt:', e);
}
try {
instance.exec(`
CREATE INDEX IF NOT EXISTS idx_response_queue_status_lease_until
ON response_queue (status, lease_until);
`);
Migrator.migrateToLatest(instance, { withBackup: false, allowBaseline: false });
} catch (e) {
console.warn('[initializeDatabase] Skipped creating idx_response_queue_status_lease_until:', e);
console.error('[initializeDatabase] Error applying migrations:', e);
throw e;
}
}

@@ -1,5 +1,5 @@
import type { Database } from 'bun:sqlite';
import { mkdirSync } from 'fs';
import { mkdirSync, appendFileSync } from 'fs';
import { join } from 'path';
import { migrations, type Migration } from './migrations';
@@ -7,6 +7,16 @@ function nowIso(): string {
return new Date().toISOString().replace('T', ' ').replace('Z', '');
}
function logEvent(level: 'info' | 'error', event: string, data: any = {}) {
try {
mkdirSync('data', { recursive: true });
} catch {}
try {
const line = JSON.stringify({ ts: nowIso(), level, event, ...data });
appendFileSync(join('data', 'migrations.log'), line + '\n');
} catch {}
}
function ensureMigrationsTable(db: Database) {
db.exec(`
CREATE TABLE IF NOT EXISTS schema_migrations (
@@ -44,7 +54,7 @@ function insertMigrationRow(db: Database, mig: Migration) {
).run(mig.version, mig.name, mig.checksum, nowIso());
}
async function backupDatabaseIfNeeded(db: Database): Promise<string | null> {
function backupDatabaseIfNeeded(db: Database): string | null {
if (process.env.NODE_ENV === 'test') return null;
try {
mkdirSync('data', { recursive: true });
@@ -66,14 +76,33 @@ export const Migrator = {
ensureMigrationsTable,
getAppliedVersions,
async migrateToLatest(db: Database, options?: { withBackup?: boolean; allowBaseline?: boolean }) {
migrateToLatest(db: Database, options?: { withBackup?: boolean; allowBaseline?: boolean }) {
const withBackup = options?.withBackup !== false;
const allowBaseline = options?.allowBaseline !== false;
const allowBaseline = options?.allowBaseline === true;
ensureMigrationsTable(db);
const applied = getAppliedVersions(db);
const pending = migrations.filter(m => !applied.has(m.version)).sort((a, b) => a.version - b.version);
// Checksum validation (strict by default, configurable)
const strict = (process.env.MIGRATOR_CHECKSUM_STRICT ?? 'true').toLowerCase() !== 'false';
for (const [version, info] of applied) {
const codeMig = migrations.find(m => m.version === version);
if (codeMig && codeMig.checksum !== info.checksum) {
const msg = `❌ Checksum mismatch in migration v${version}: applied=${info.checksum}, code=${codeMig.checksum}`;
console.error(msg);
try { logEvent('error', 'checksum_mismatch', { version, applied_checksum: info.checksum, code_checksum: codeMig.checksum }); } catch {}
if (strict) throw new Error(msg);
}
}
// Startup summary
const jmRow = db.query(`PRAGMA journal_mode`).get() as any;
const journalMode = jmRow ? (jmRow.journal_mode || jmRow.value || jmRow.mode || 'unknown') : 'unknown';
const currentVersion = applied.size ? Math.max(...Array.from(applied.keys())) : 0;
console.log(` Migrator: journal_mode=${journalMode}, current_version=${currentVersion}, pending=${pending.length}`);
try { logEvent('info', 'startup_summary', { journal_mode: journalMode, current_version: currentVersion, pending: pending.length }); } catch {}
if (applied.size === 0 && allowBaseline && detectExistingSchema(db)) {
// Baseline to v1 if the schema already exists but has no record
const v1 = migrations.find(m => m.version === 1)!;
@@ -81,22 +110,27 @@
insertMigrationRow(db, v1);
})();
console.log(' Baseline applied: schema_migrations marked at v1 (without running up)');
try { logEvent('info', 'baseline_applied', { version: 1 }); } catch {}
// Recompute pending
pending.splice(0, pending.length, ...migrations.filter(m => m.version > 1));
}
if (pending.length === 0) {
console.log(' No pending migrations');
try { logEvent('info', 'no_pending', {}); } catch {}
return;
}
if (withBackup) {
await backupDatabaseIfNeeded(db);
const backupPath = backupDatabaseIfNeeded(db);
try { logEvent('info', 'backup', { path: backupPath }); } catch {}
}
for (const mig of pending) {
console.log(`➡️ Applying migration v${mig.version} - ${mig.name}`);
try {
try { logEvent('info', 'apply_start', { version: mig.version, name: mig.name, checksum: mig.checksum }); } catch {}
const t0 = Date.now();
db.transaction(() => {
// Run up
const res = mig.up(db);
@ -106,9 +140,12 @@ export const Migrator = {
// Record it
insertMigrationRow(db, mig);
})();
console.log(`✅ Migration v${mig.version} applied`);
const ms = Date.now() - t0;
console.log(`✅ Migration v${mig.version} applied (${ms} ms)`);
try { logEvent('info', 'apply_success', { version: mig.version, name: mig.name, checksum: mig.checksum, duration_ms: ms }); } catch {}
} catch (e) {
console.error(`❌ Error applying migration v${mig.version}:`, e);
try { logEvent('error', 'apply_error', { version: mig.version, name: mig.name, checksum: mig.checksum, error: String(e) }); } catch {}
throw e;
}
}
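To round off, a minimal sketch of invoking the migrator standalone with the options this diff exposes; `Migrator.migrateToLatest`, `withBackup`, and `allowBaseline` come from the code above, while the script itself and the database path are assumptions:

```ts
// Hypothetical one-off migration script (path and setup are illustrative).
import { Database } from 'bun:sqlite';
import { Migrator } from './db/migrator';

const db = new Database('data/tasks.db');

// withBackup: true -> VACUUM INTO backup before applying pending migrations.
// allowBaseline: true -> mark v1 as applied when a pre-existing schema is found.
// Checksum validation is strict unless MIGRATOR_CHECKSUM_STRICT=false.
Migrator.migrateToLatest(db, { withBackup: true, allowBaseline: false });
```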
