diff --git a/src/services/response-queue.ts b/src/services/response-queue.ts index 26b3cf6..8563a5a 100644 --- a/src/services/response-queue.ts +++ b/src/services/response-queue.ts @@ -1,5 +1,6 @@ import type { Database } from 'bun:sqlite'; import { db } from '../db'; +import { getDb } from '../db/locator'; import { IdentityService } from './identity'; import { normalizeWhatsAppId } from '../utils/whatsapp'; import { Metrics } from './metrics'; @@ -63,6 +64,15 @@ export const ResponseQueue = { _cleanupRunning: false, _cleanupRunCount: 0, + getDbInstance(): Database { + const anyThis = this as any; + try { + return (anyThis.dbInstance as Database) ?? getDb(); + } catch { + return anyThis.dbInstance as Database; + } + }, + nowIso(): string { return toIsoSqlUTC(new Date()); }, @@ -89,12 +99,12 @@ export const ResponseQueue = { return; } - const insert = this.dbInstance.prepare(` + const insert = this.getDbInstance().prepare(` INSERT INTO response_queue (recipient, message, metadata, next_attempt_at) VALUES (?, ?, ?, ?) `); - this.dbInstance.transaction((rows: QueuedResponse[]) => { + this.getDbInstance().transaction((rows: QueuedResponse[]) => { for (const r of rows) { const metadata = r.mentions && r.mentions.length > 0 @@ -141,7 +151,7 @@ export const ResponseQueue = { display_code: metadata.display_code ?? null }; const nextAt = delayMs && delayMs > 0 ? this.futureIso(delayMs) : this.nowIso(); - this.dbInstance.prepare(` + this.getDbInstance().prepare(` INSERT INTO response_queue (recipient, message, metadata, next_attempt_at) VALUES (?, ?, ?, ?) `).run(recipient, message, JSON.stringify(metaObj), nextAt); @@ -151,7 +161,7 @@ export const ResponseQueue = { // Estadísticas de onboarding por destinatario (consulta simple sobre response_queue) getOnboardingStats(recipient: string): { total: number; lastSentAt: string | null; firstInitialAt?: string | null; lastVariant?: 'initial' | 'reminder' | null } { if (!recipient) return { total: 0, lastSentAt: null, lastVariant: null }; - const rows = this.dbInstance.prepare(` + const rows = this.getDbInstance().prepare(` SELECT status, created_at, updated_at, metadata FROM response_queue WHERE recipient = ? AND metadata IS NOT NULL @@ -222,7 +232,7 @@ export const ResponseQueue = { const cutoff = this.futureIso(-24 * 60 * 60 * 1000); // Idempotencia: existe job igual reciente en estados activos? - const exists = this.dbInstance.prepare(` + const exists = this.getDbInstance().prepare(` SELECT 1 FROM response_queue WHERE metadata = ? @@ -235,7 +245,7 @@ export const ResponseQueue = { return; } - this.dbInstance.prepare(` + this.getDbInstance().prepare(` INSERT INTO response_queue (recipient, message, metadata, next_attempt_at) VALUES (?, ?, ?, ?) `).run(chatId, '', metadata, this.nowIso()); @@ -385,7 +395,7 @@ export const ResponseQueue = { claimNextBatch(limit: number): ClaimedItem[] { // Selecciona y marca como 'processing' en una sola sentencia para evitar carreras - const rows = this.dbInstance.prepare(` + const rows = this.getDbInstance().prepare(` UPDATE response_queue SET status = 'processing', updated_at = strftime('%Y-%m-%d %H:%M:%f', 'now') @@ -403,7 +413,7 @@ export const ResponseQueue = { }, markSent(id: number, statusCode?: number) { - this.dbInstance.prepare(` + this.getDbInstance().prepare(` UPDATE response_queue SET status = 'sent', last_status_code = ?, @@ -412,7 +422,7 @@ export const ResponseQueue = { `).run(statusCode ?? null, id); // Recalcular métricas agregadas de onboarding si aplica try { - const row = this.dbInstance.prepare(`SELECT metadata FROM response_queue WHERE id = ?`).get(id) as { metadata?: string | null } | undefined; + const row = this.getDbInstance().prepare(`SELECT metadata FROM response_queue WHERE id = ?`).get(id) as { metadata?: string | null } | undefined; let meta: any = null; try { meta = row?.metadata ? JSON.parse(String(row.metadata)) : null; } catch {} if (meta && meta.kind === 'onboarding') { @@ -423,7 +433,7 @@ export const ResponseQueue = { markFailed(id: number, errorMsg: string, statusCode?: number, attempts?: number) { const msg = (errorMsg || '').toString().slice(0, 500); - this.dbInstance.prepare(` + this.getDbInstance().prepare(` UPDATE response_queue SET status = 'failed', attempts = COALESCE(?, attempts), @@ -436,7 +446,7 @@ export const ResponseQueue = { requeueWithBackoff(id: number, nextAttempts: number, nextAttemptAt: string, statusCode?: number | null, errorMsg?: string) { const msg = (errorMsg || '').toString().slice(0, 500) || null; - this.dbInstance.prepare(` + this.getDbInstance().prepare(` UPDATE response_queue SET status = 'queued', attempts = ?, @@ -451,7 +461,7 @@ export const ResponseQueue = { setOnboardingAggregatesMetrics(): void { try { // Total de mensajes de onboarding enviados - const sentRow = this.dbInstance.prepare(` + const sentRow = this.getDbInstance().prepare(` SELECT COUNT(*) AS c FROM response_queue WHERE status = 'sent' AND metadata LIKE '%"kind":"onboarding"%' @@ -459,7 +469,7 @@ export const ResponseQueue = { const sentAbs = Number(sentRow?.c || 0); // Destinatarios únicos con al menos 1 onboarding enviado - const rcptRow = this.dbInstance.prepare(` + const rcptRow = this.getDbInstance().prepare(` SELECT COUNT(DISTINCT recipient) AS c FROM response_queue WHERE status = 'sent' AND metadata LIKE '%"kind":"onboarding"%' @@ -467,7 +477,7 @@ export const ResponseQueue = { const recipientsAbs = Number(rcptRow?.c || 0); // Usuarios convertidos: last_command_at > primer onboarding enviado - const convRow = this.dbInstance.prepare(` + const convRow = this.getDbInstance().prepare(` SELECT COUNT(*) AS c FROM users u JOIN ( @@ -566,7 +576,7 @@ export const ResponseQueue = { const startedAt = Date.now(); try { - const res = await cleanupRunOnce(this.dbInstance, { + const res = await cleanupRunOnce(this.getDbInstance(), { retentionDaysSent: this.RETENTION_DAYS_SENT, retentionDaysFailed: this.RETENTION_DAYS_FAILED, batchSize: this.CLEANUP_BATCH, diff --git a/src/tasks/service.ts b/src/tasks/service.ts index 593e8f3..ac18e7e 100644 --- a/src/tasks/service.ts +++ b/src/tasks/service.ts @@ -1,5 +1,6 @@ import type { Database } from 'bun:sqlite'; import { db, ensureUserExists } from '../db'; +import { getDb as getGlobalDb } from '../db/locator'; import { AllowedGroups } from '../services/allowed-groups'; import { isGroupId } from '../utils/whatsapp'; import { pickNextDisplayCode } from './display-code'; @@ -21,15 +22,19 @@ type CreateAssignmentInput = { export class TaskService { static dbInstance: Database = db; + private static getDb(): Database { + return ((this as any).dbInstance as Database) ?? getGlobalDb(); + } + static createTask(task: CreateTaskInput, assignments: CreateAssignmentInput[] = []): number { - const runTx = this.dbInstance.transaction(() => { + const runTx = this.getDb().transaction(() => { - const insertTask = this.dbInstance.prepare(` + const insertTask = this.getDb().prepare(` INSERT INTO tasks (description, due_date, group_id, created_by, display_code) VALUES (?, ?, ?, ?, ?) `); - const ensuredCreator = ensureUserExists(task.created_by, this.dbInstance); + const ensuredCreator = ensureUserExists(task.created_by, this.getDb()); if (!ensuredCreator) { throw new Error('No se pudo asegurar created_by'); } @@ -49,14 +54,14 @@ export class TaskService { } catch {} if (groupIdToInsert) { - const exists = this.dbInstance.prepare(`SELECT 1 FROM groups WHERE id = ? AND COALESCE(is_community,0) = 0`).get(groupIdToInsert); + const exists = this.getDb().prepare(`SELECT 1 FROM groups WHERE id = ? AND COALESCE(is_community,0) = 0`).get(groupIdToInsert); if (!exists) { groupIdToInsert = null; } } // Elegir display_code global reutilizable entre tareas activas - const displayCode = pickNextDisplayCode(this.dbInstance); + const displayCode = pickNextDisplayCode(this.getDb()); const runResult = insertTask.run( task.description, @@ -68,7 +73,7 @@ export class TaskService { const taskId = Number((runResult as { lastInsertRowid?: number | bigint }).lastInsertRowid); if (assignments.length > 0) { - const insertAssignment = this.dbInstance.prepare(` + const insertAssignment = this.getDb().prepare(` INSERT INTO task_assignments (task_id, user_id, assigned_by) VALUES (?, ?, ?) `); @@ -76,13 +81,13 @@ export class TaskService { // Evitar duplicados por (task_id, user_id) tras asegurar usuarios const seen = new Set(); for (const a of assignments) { - const ensuredUser = ensureUserExists(a.user_id, this.dbInstance); + const ensuredUser = ensureUserExists(a.user_id, this.getDb()); if (!ensuredUser) continue; if (seen.has(ensuredUser)) continue; seen.add(ensuredUser); const ensuredAssigner = - ensureUserExists(a.assigned_by || ensuredCreator, this.dbInstance) || ensuredCreator; + ensureUserExists(a.assigned_by || ensuredCreator, this.getDb()) || ensuredCreator; insertAssignment.run(taskId, ensuredUser, ensuredAssigner); } @@ -103,7 +108,7 @@ export class TaskService { display_code: number | null; assignees: string[]; }> { - const rows = this.dbInstance + const rows = this.getDb() .prepare(` SELECT id, description, due_date, group_id, display_code FROM tasks @@ -117,7 +122,7 @@ export class TaskService { `) .all(groupId, limit) as Array<{ id: number; description: string; due_date: string | null; group_id: string | null; display_code: number | null }>; - const getAssignees = this.dbInstance.prepare(` + const getAssignees = this.getDb().prepare(` SELECT user_id FROM task_assignments WHERE task_id = ? ORDER BY assigned_at ASC @@ -139,7 +144,7 @@ export class TaskService { display_code: number | null; assignees: string[]; }> { - const rows = this.dbInstance + const rows = this.getDb() .prepare(` SELECT t.id, t.description, t.due_date, t.group_id, t.display_code FROM tasks t @@ -154,7 +159,7 @@ export class TaskService { `) .all(userId, limit) as Array<{ id: number; description: string; due_date: string | null; group_id: string | null; display_code: number | null }>; - const getAssignees = this.dbInstance.prepare(` + const getAssignees = this.getDb().prepare(` SELECT user_id FROM task_assignments WHERE task_id = ? ORDER BY assigned_at ASC @@ -169,7 +174,7 @@ export class TaskService { // Contar pendientes del grupo (sin límite) static countGroupPending(groupId: string): number { - const row = this.dbInstance + const row = this.getDb() .prepare(` SELECT COUNT(*) as cnt FROM tasks @@ -182,7 +187,7 @@ export class TaskService { // Contar pendientes asignadas al usuario (sin límite) static countUserPending(userId: string): number { - const row = this.dbInstance + const row = this.getDb() .prepare(` SELECT COUNT(*) as cnt FROM tasks t @@ -199,9 +204,9 @@ export class TaskService { status: 'updated' | 'already' | 'not_found'; task?: { id: number; description: string; due_date: string | null; display_code: number | null }; } { - const ensured = ensureUserExists(completedBy, this.dbInstance); + const ensured = ensureUserExists(completedBy, this.getDb()); - const existing = this.dbInstance + const existing = this.getDb() .prepare(` SELECT id, description, due_date, completed, completed_at, display_code, group_id FROM tasks @@ -225,7 +230,7 @@ export class TaskService { }; } - this.dbInstance + this.getDb() .prepare(` UPDATE tasks SET completed = 1, @@ -237,7 +242,7 @@ export class TaskService { // Fase 2: reacción ✅ al completar dentro del TTL y con gating try { - enqueueCompletionReactionIfEligible(this.dbInstance, taskId); + enqueueCompletionReactionIfEligible(this.getDb(), taskId); } catch {} return { @@ -260,7 +265,7 @@ export class TaskService { display_code: number | null; assignees: string[]; }> { - const rows = this.dbInstance + const rows = this.getDb() .prepare(` SELECT id, description, due_date, group_id, display_code FROM tasks @@ -282,7 +287,7 @@ export class TaskService { // Contar pendientes sin dueño del grupo (sin límite) static countGroupUnassigned(groupId: string): number { - const row = this.dbInstance + const row = this.getDb() .prepare(` SELECT COUNT(*) as cnt FROM tasks t @@ -301,12 +306,12 @@ export class TaskService { status: 'claimed' | 'already' | 'not_found' | 'completed'; task?: { id: number; description: string; due_date: string | null; display_code: number | null }; } { - const ensuredUser = ensureUserExists(userId, this.dbInstance); + const ensuredUser = ensureUserExists(userId, this.getDb()); if (!ensuredUser) { throw new Error('No se pudo asegurar el usuario'); } - const existing = this.dbInstance + const existing = this.getDb() .prepare(` SELECT id, description, due_date, group_id, completed, completed_at, display_code FROM tasks @@ -330,7 +335,7 @@ export class TaskService { }; } - const already = this.dbInstance + const already = this.getDb() .prepare(`SELECT 1 FROM task_assignments WHERE task_id = ? AND user_id = ?`) .get(taskId, ensuredUser); @@ -346,12 +351,12 @@ export class TaskService { }; } - const insertAssignment = this.dbInstance.prepare(` + const insertAssignment = this.getDb().prepare(` INSERT OR IGNORE INTO task_assignments (task_id, user_id, assigned_by) VALUES (?, ?, ?) `); - this.dbInstance.transaction(() => { + this.getDb().transaction(() => { insertAssignment.run(taskId, ensuredUser, ensuredUser); })(); @@ -372,12 +377,12 @@ export class TaskService { task?: { id: number; description: string; due_date: string | null; display_code: number | null }; now_unassigned?: boolean; // true si tras soltar no quedan asignados } { - const ensuredUser = ensureUserExists(userId, this.dbInstance); + const ensuredUser = ensureUserExists(userId, this.getDb()); if (!ensuredUser) { throw new Error('No se pudo asegurar el usuario'); } - const existing = this.dbInstance + const existing = this.getDb() .prepare(` SELECT id, description, due_date, group_id, completed, completed_at, display_code FROM tasks @@ -403,7 +408,7 @@ export class TaskService { // Regla: no permitir soltar si es tarea personal y el usuario es el único asignatario try { - const stats = this.dbInstance.prepare(` + const stats = this.getDb().prepare(` SELECT COUNT(*) AS cnt, SUM(CASE WHEN user_id = ? THEN 1 ELSE 0 END) AS mine FROM task_assignments @@ -425,14 +430,14 @@ export class TaskService { } } catch {} - const deleteStmt = this.dbInstance.prepare(` + const deleteStmt = this.getDb().prepare(` DELETE FROM task_assignments WHERE task_id = ? AND user_id = ? `); const result = deleteStmt.run(taskId, ensuredUser) as { changes?: number }; - const cntRow = this.dbInstance + const cntRow = this.getDb() .prepare(`SELECT COUNT(*) as cnt FROM task_assignments WHERE task_id = ?`) .get(taskId) as { cnt?: number } | undefined; const remaining = Number(cntRow?.cnt || 0); @@ -474,7 +479,7 @@ export class TaskService { completed: number; completed_at: string | null; } | null { - const row = this.dbInstance.prepare(` + const row = this.getDb().prepare(` SELECT id, description, @@ -492,7 +497,7 @@ export class TaskService { // Buscar tarea activa por display_code global static getActiveTaskByDisplayCode(displayCode: number): { id: number; description: string; due_date: string | null; display_code: number | null } | null { - const row = this.dbInstance.prepare(` + const row = this.getDb().prepare(` SELECT id, description, due_date, display_code FROM tasks WHERE display_code = ? AND COALESCE(completed, 0) = 0 AND completed_at IS NULL @@ -542,7 +547,7 @@ export class TaskService { group_name: string | null; display_code: number | null; }> { - const rows = this.dbInstance + const rows = this.getDb() .prepare(` SELECT t.id, t.description, t.due_date, t.group_id, t.display_code, g.name AS group_name FROM tasks t @@ -567,7 +572,7 @@ export class TaskService { } static countAllActive(): number { - const row = this.dbInstance + const row = this.getDb() .prepare(` SELECT COUNT(*) AS cnt FROM tasks t