feat: añadir migrador de migraciones up-only para SQLite

Co-authored-by: aider (openrouter/openai/gpt-5) <aider@aider.chat>
pull/1/head
borja 2 months ago
parent 4a305dc007
commit efe8aaef89

@ -0,0 +1,119 @@
import type { Database } from 'bun:sqlite';
export type Migration = {
version: number;
name: string;
checksum: string; // estático para trazabilidad básica
up: (db: Database) => void | Promise<void>;
};
function tableHasColumn(db: Database, table: string, column: string): boolean {
const cols = db.query(`PRAGMA table_info(${table})`).all() as any[];
return Array.isArray(cols) && cols.some((c: any) => c.name === column);
}
export const migrations: Migration[] = [
{
version: 1,
name: 'initial-schema',
checksum: 'v1-initial-schema-2025-09-06',
up: (db: Database) => {
// Esquema inicial (equivalente al initializeDatabase actual)
db.exec(`PRAGMA foreign_keys = ON;`);
db.exec(`
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
first_seen TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
last_seen TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now'))
);
`);
db.exec(`
CREATE TABLE IF NOT EXISTS groups (
id TEXT PRIMARY KEY,
community_id TEXT NOT NULL,
name TEXT,
last_verified TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
active BOOLEAN DEFAULT TRUE
);
`);
db.exec(`
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
description TEXT NOT NULL,
created_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
due_date TEXT NULL,
completed BOOLEAN DEFAULT FALSE,
completed_at TEXT NULL,
group_id TEXT NULL,
created_by TEXT NOT NULL,
completed_by TEXT NULL,
FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (completed_by) REFERENCES users(id) ON DELETE SET NULL,
FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE SET NULL
);
`);
db.exec(`
CREATE TABLE IF NOT EXISTS task_assignments (
task_id INTEGER NOT NULL,
user_id TEXT NOT NULL,
assigned_by TEXT NOT NULL,
assigned_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
PRIMARY KEY (task_id, user_id),
FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (assigned_by) REFERENCES users(id) ON DELETE CASCADE
);
`);
db.exec(`
CREATE TABLE IF NOT EXISTS response_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
recipient TEXT NOT NULL,
message TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued' CHECK (status IN ('queued','processing','sent','failed')),
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT NULL,
metadata TEXT NULL,
created_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
updated_at TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now'))
);
`);
db.exec(`
CREATE INDEX IF NOT EXISTS idx_response_queue_status_created_at
ON response_queue (status, created_at);
`);
}
},
{
version: 2,
name: 'response-queue-reliability',
checksum: 'v2-rq-reliability-2025-09-06',
up: (db: Database) => {
// Añadir columnas necesarias si no existen (idempotente)
if (!tableHasColumn(db, 'response_queue', 'next_attempt_at')) {
db.exec(`ALTER TABLE response_queue ADD COLUMN next_attempt_at TEXT NULL;`);
}
if (!tableHasColumn(db, 'response_queue', 'lease_until')) {
db.exec(`ALTER TABLE response_queue ADD COLUMN lease_until TEXT NULL;`);
}
if (!tableHasColumn(db, 'response_queue', 'last_status_code')) {
db.exec(`ALTER TABLE response_queue ADD COLUMN last_status_code INTEGER NULL;`);
}
// Índices complementarios
db.exec(`
CREATE INDEX IF NOT EXISTS idx_response_queue_status_next_attempt
ON response_queue (status, next_attempt_at);
`);
db.exec(`
CREATE INDEX IF NOT EXISTS idx_response_queue_status_lease_until
ON response_queue (status, lease_until);
`);
}
}
];

@ -0,0 +1,116 @@
import type { Database } from 'bun:sqlite';
import { mkdirSync } from 'fs';
import { join } from 'path';
import { migrations, type Migration } from './migrations';
function nowIso(): string {
return new Date().toISOString().replace('T', ' ').replace('Z', '');
}
function ensureMigrationsTable(db: Database) {
db.exec(`
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
name TEXT NOT NULL,
checksum TEXT NOT NULL,
applied_at TEXT NOT NULL
);
`);
}
function getAppliedVersions(db: Database): Map<number, { name: string; checksum: string; applied_at: string }> {
const rows = db.query(`SELECT version, name, checksum, applied_at FROM schema_migrations ORDER BY version`).all() as any[];
const map = new Map<number, { name: string; checksum: string; applied_at: string }>();
for (const r of rows) {
map.set(Number(r.version), { name: String(r.name), checksum: String(r.checksum), applied_at: String(r.applied_at) });
}
return map;
}
function tableExists(db: Database, table: string): boolean {
const row = db.query(`SELECT name FROM sqlite_master WHERE type='table' AND name=?`).get(table) as any;
return !!row;
}
function detectExistingSchema(db: Database): boolean {
// Consideramos esquema "existente" si ya hay tablas clave
const coreTables = ['users', 'tasks', 'response_queue'];
return coreTables.every(t => tableExists(db, t));
}
function insertMigrationRow(db: Database, mig: Migration) {
db.prepare(
`INSERT INTO schema_migrations (version, name, checksum, applied_at) VALUES (?, ?, ?, ?)`
).run(mig.version, mig.name, mig.checksum, nowIso());
}
async function backupDatabaseIfNeeded(db: Database): Promise<string | null> {
if (process.env.NODE_ENV === 'test') return null;
try {
mkdirSync('data', { recursive: true });
} catch {}
const stamp = new Date().toISOString().replace(/[-:T.Z]/g, '').slice(0, 14);
const backupPath = join('data', `tasks.db.bak-${stamp}.sqlite`);
try {
// VACUUM INTO hace copia consistente del estado actual
db.exec(`VACUUM INTO '${backupPath.replace(/'/g, "''")}'`);
console.log(` Backup de base de datos creado en: ${backupPath}`);
return backupPath;
} catch (e) {
console.warn('⚠️ No se pudo crear el backup con VACUUM INTO (continuando de todos modos):', e);
return null;
}
}
export const Migrator = {
ensureMigrationsTable,
getAppliedVersions,
async migrateToLatest(db: Database, options?: { withBackup?: boolean; allowBaseline?: boolean }) {
const withBackup = options?.withBackup !== false;
const allowBaseline = options?.allowBaseline !== false;
ensureMigrationsTable(db);
const applied = getAppliedVersions(db);
const pending = migrations.filter(m => !applied.has(m.version)).sort((a, b) => a.version - b.version);
if (applied.size === 0 && allowBaseline && detectExistingSchema(db)) {
// Baseline a v1 si ya existe el esquema pero no hay registro
const v1 = migrations.find(m => m.version === 1)!;
db.transaction(() => {
insertMigrationRow(db, v1);
})();
console.log(' Baseline aplicado: schema_migrations marcada en v1 (sin ejecutar up)');
// Recalcular pendientes
pending.splice(0, pending.length, ...migrations.filter(m => m.version > 1));
}
if (pending.length === 0) {
console.log(' No hay migraciones pendientes');
return;
}
if (withBackup) {
await backupDatabaseIfNeeded(db);
}
for (const mig of pending) {
console.log(`➡️ Aplicando migración v${mig.version} - ${mig.name}`);
try {
db.transaction(() => {
// Ejecutar up
const res = mig.up(db);
if (res instanceof Promise) {
throw new Error('Las migraciones up no deben ser asíncronas en este migrador');
}
// Registrar
insertMigrationRow(db, mig);
})();
console.log(`✅ Migración v${mig.version} aplicada`);
} catch (e) {
console.error(`❌ Error aplicando migración v${mig.version}:`, e);
throw e;
}
}
}
};

@ -6,8 +6,9 @@ import { ResponseQueue } from './services/response-queue';
import { TaskService } from './tasks/service'; import { TaskService } from './tasks/service';
import { WebhookManager } from './services/webhook-manager'; import { WebhookManager } from './services/webhook-manager';
import { normalizeWhatsAppId, isGroupId } from './utils/whatsapp'; import { normalizeWhatsAppId, isGroupId } from './utils/whatsapp';
import { ensureUserExists, db, initializeDatabase } from './db'; import { ensureUserExists, db } from './db';
import { ContactsService } from './services/contacts'; import { ContactsService } from './services/contacts';
import { Migrator } from './db/migrator';
// Bun is available globally when running under Bun runtime // Bun is available globally when running under Bun runtime
declare global { declare global {
@ -265,8 +266,8 @@ export class WebhookServer {
static async start() { static async start() {
this.validateEnv(); this.validateEnv();
// Ensure database schema and migrations are applied // Run database migrations (up-only) before starting services
initializeDatabase(this.dbInstance); await Migrator.migrateToLatest(this.dbInstance);
const PORT = process.env.PORT || '3007'; const PORT = process.env.PORT || '3007';
console.log('✅ Environment variables validated'); console.log('✅ Environment variables validated');

Loading…
Cancel
Save