From 46bec524a2557c98a796f0c06759983806f4b06f Mon Sep 17 00:00:00 2001 From: brobert Date: Mon, 10 Nov 2025 14:44:28 +0100 Subject: [PATCH] refactor: modularizar WebhookServer y endpoints /metrics /health Co-authored-by: aider (openrouter/openai/gpt-5) --- src/http/bootstrap.ts | 90 ++++++++++++++++++++++++ src/http/health.ts | 46 +++++++++++++ src/http/metrics.ts | 44 ++++++++++++ src/server.ts | 155 ++---------------------------------------- 4 files changed, 187 insertions(+), 148 deletions(-) create mode 100644 src/http/bootstrap.ts create mode 100644 src/http/health.ts create mode 100644 src/http/metrics.ts diff --git a/src/http/bootstrap.ts b/src/http/bootstrap.ts new file mode 100644 index 0000000..256b211 --- /dev/null +++ b/src/http/bootstrap.ts @@ -0,0 +1,90 @@ +import type { Database } from 'bun:sqlite'; +import { WebhookManager } from '../services/webhook-manager'; +import { GroupSyncService } from '../services/group-sync'; +import { ResponseQueue } from '../services/response-queue'; +import { RemindersService } from '../services/reminders'; +import { MaintenanceService } from '../services/maintenance'; + +export async function startServices(_db: Database): Promise { + await WebhookManager.registerWebhook(); + // Add small delay to allow webhook to propagate + await new Promise(resolve => setTimeout(resolve, 1000)); + const isActive = await WebhookManager.verifyWebhook(); + if (!isActive) { + console.error('❌ Webhook verification failed - retrying in 2 seconds...'); + await new Promise(resolve => setTimeout(resolve, 2000)); + const isActiveRetry = await WebhookManager.verifyWebhook(); + if (!isActiveRetry) { + console.error('❌ Webhook verification failed after retry'); + process.exit(1); + } + } + + // Initialize groups - critical for operation + await GroupSyncService.checkInitialGroups(); + + // Start groups scheduler (periodic sync of groups) + try { + GroupSyncService.startGroupsScheduler(); + console.log('✅ Group scheduler started'); + } catch (e) { + console.error('⚠️ Failed to start Group scheduler:', e); + } + + // Initial members sync (non-blocking if fails) + try { + await GroupSyncService.syncMembersForActiveGroups(); + GroupSyncService.startMembersScheduler(); + console.log('✅ Group members scheduler started'); + } catch (e) { + console.error('⚠️ Failed to run initial members sync or start scheduler:', e); + } + + // Start response queue worker (background) + try { + await ResponseQueue.process(); + console.log('✅ ResponseQueue worker started'); + // Start cleanup scheduler (daily retention) + ResponseQueue.startCleanupScheduler(); + console.log('✅ ResponseQueue cleanup scheduler started'); + RemindersService.start(); + console.log('✅ RemindersService started'); + } catch (e) { + console.error('❌ Failed to start ResponseQueue worker or cleanup scheduler:', e); + } + + // Mantenimiento (cleanup de miembros inactivos) + try { + MaintenanceService.start(); + console.log('✅ MaintenanceService started'); + // Ejecutar reconciliación de alias una vez al arranque (one-shot) + try { + await MaintenanceService.reconcileAliasUsersOnce(); + console.log('✅ MaintenanceService: reconciliación de alias ejecutada (one-shot)'); + } catch (e2) { + console.error('⚠️ Failed to run alias reconciliation one-shot:', e2); + } + } catch (e) { + console.error('⚠️ Failed to start MaintenanceService:', e); + } +} + +export function stopServices(): void { + try { + ResponseQueue.stopCleanupScheduler(); + } catch {} + try { + // No existe un "stop" público de workers; paramos el lazo + (ResponseQueue as any).stop?.(); + } catch {} + try { + RemindersService.stop(); + } catch {} + try { + GroupSyncService.stopGroupsScheduler(); + GroupSyncService.stopMembersScheduler(); + } catch {} + try { + MaintenanceService.stop(); + } catch {} +} diff --git a/src/http/health.ts b/src/http/health.ts new file mode 100644 index 0000000..cfbd453 --- /dev/null +++ b/src/http/health.ts @@ -0,0 +1,46 @@ +import type { Database } from 'bun:sqlite'; +import { Metrics } from '../services/metrics'; + +export async function handleHealthRequest(url: URL, db: Database): Promise { + // /health?full=1 devuelve JSON con detalles + if (url.searchParams.get('full') === '1') { + try { + const rowG = db.prepare(`SELECT COUNT(*) AS c, MAX(last_verified) AS lv FROM groups WHERE active = 1`).get() as { c?: number; lv?: string | null } | undefined; + const rowM = db.prepare(`SELECT COUNT(*) AS c FROM group_members WHERE is_active = 1`).get() as { c?: number } | undefined; + const active_groups = Number(rowG?.c || 0); + const active_members = Number(rowM?.c || 0); + const lv = rowG?.lv ? String(rowG.lv) : null; + let last_sync_at: string | null = lv; + let snapshot_age_ms: number | null = null; + if (lv) { + const iso = lv.includes('T') ? lv : (lv.replace(' ', 'T') + 'Z'); + const ms = Date.parse(iso); + if (Number.isFinite(ms)) { + snapshot_age_ms = Date.now() - ms; + } + } + const lastSyncMetric = Metrics.get('last_sync_ok'); + const maxAgeRaw = Number(process.env.MAX_MEMBERS_SNAPSHOT_AGE_MS); + const maxAgeMs = Number.isFinite(maxAgeRaw) && maxAgeRaw > 0 ? maxAgeRaw : 24 * 60 * 60 * 1000; + const snapshot_fresh = typeof snapshot_age_ms === 'number' ? (snapshot_age_ms <= maxAgeMs) : false; + let last_sync_ok: number; + if (typeof lastSyncMetric === 'number') { + last_sync_ok = (lastSyncMetric === 1 && snapshot_fresh) ? 1 : 0; + } else { + // Si no hay métrica explícita, nos basamos exclusivamente en la frescura de la snapshot + last_sync_ok = snapshot_fresh ? 1 : 0; + } + const payload = { status: 'ok', active_groups, active_members, last_sync_at, snapshot_age_ms, snapshot_fresh, last_sync_ok }; + return new Response(JSON.stringify(payload), { + status: 200, + headers: { 'Content-Type': 'application/json' } + }); + } catch (e) { + return new Response(JSON.stringify({ status: 'error' }), { + status: 500, + headers: { 'Content-Type': 'application/json' } + }); + } + } + return new Response('OK', { status: 200 }); +} diff --git a/src/http/metrics.ts b/src/http/metrics.ts new file mode 100644 index 0000000..edc5133 --- /dev/null +++ b/src/http/metrics.ts @@ -0,0 +1,44 @@ +import type { Database } from 'bun:sqlite'; +import { Metrics } from '../services/metrics'; +import { GroupSyncService } from '../services/group-sync'; + +export async function handleMetricsRequest(request: Request, db: Database): Promise { + if (request.method !== 'GET') { + return new Response('🚫 Method not allowed', { status: 405 }); + } + if (!Metrics.enabled()) { + return new Response('Metrics disabled', { status: 404 }); + } + + // Gauges de allowed_groups por estado (best-effort) + try { + const rows = db + .prepare(`SELECT status, COUNT(*) AS c FROM allowed_groups GROUP BY status`) + .all() as Array<{ status: string; c: number }>; + let pending = 0, allowed = 0, blocked = 0; + for (const r of rows) { + const s = String(r?.status || ''); + const c = Number(r?.c || 0); + if (s === 'pending') pending = c; + else if (s === 'allowed') allowed = c; + else if (s === 'blocked') blocked = c; + } + Metrics.set('allowed_groups_total_pending', pending); + Metrics.set('allowed_groups_total_allowed', allowed); + Metrics.set('allowed_groups_total_blocked', blocked); + } catch {} + + // Exponer métrica con el tiempo restante hasta el próximo group sync (o -1 si scheduler inactivo) + try { + const secs = GroupSyncService.getSecondsUntilNextGroupSync(); + const val = (secs == null || !Number.isFinite(secs)) ? -1 : secs; + Metrics.set('group_sync_seconds_until_next', val); + } catch {} + + const format = (process.env.METRICS_FORMAT || 'prom').toLowerCase() === 'json' ? 'json' : 'prom'; + const body = Metrics.render(format as any); + return new Response(body, { + status: 200, + headers: { 'Content-Type': format === 'json' ? 'application/json' : 'text/plain; version=0.0.4' } + }); +} diff --git a/src/server.ts b/src/server.ts index 90719a2..ab2a21a 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,21 +1,14 @@ //// import type { Database } from 'bun:sqlite'; -import { CommandService } from './services/command'; import { GroupSyncService } from './services/group-sync'; -import { ResponseQueue } from './services/response-queue'; -import { TaskService } from './tasks/service'; -import { WebhookManager } from './services/webhook-manager'; -import { normalizeWhatsAppId, isGroupId } from './utils/whatsapp'; -import { ensureUserExists, db } from './db'; import { ContactsService } from './services/contacts'; import { Migrator } from './db/migrator'; -import { RateLimiter } from './services/rate-limit'; -import { RemindersService } from './services/reminders'; import { Metrics } from './services/metrics'; -import { MaintenanceService } from './services/maintenance'; -import { IdentityService } from './services/identity'; import { AllowedGroups } from './services/allowed-groups'; -import { AdminService } from './services/admin'; +import { db } from './db'; +import { handleMetricsRequest } from './http/metrics'; +import { handleHealthRequest } from './http/health'; +import { startServices } from './http/bootstrap'; // Bun is available globally when running under Bun runtime declare global { @@ -61,84 +54,10 @@ export class WebhookServer { // Health check endpoint y métricas const url = new URL(request.url); if (url.pathname.endsWith('/metrics')) { - if (request.method !== 'GET') { - return new Response('🚫 Method not allowed', { status: 405 }); - } - if (!Metrics.enabled()) { - return new Response('Metrics disabled', { status: 404 }); - } - // Gauges de allowed_groups por estado (best-effort) - try { - const rows = WebhookServer.dbInstance - .prepare(`SELECT status, COUNT(*) AS c FROM allowed_groups GROUP BY status`) - .all() as Array<{ status: string; c: number }>; - let pending = 0, allowed = 0, blocked = 0; - for (const r of rows) { - const s = String(r?.status || ''); - const c = Number(r?.c || 0); - if (s === 'pending') pending = c; - else if (s === 'allowed') allowed = c; - else if (s === 'blocked') blocked = c; - } - Metrics.set('allowed_groups_total_pending', pending); - Metrics.set('allowed_groups_total_allowed', allowed); - Metrics.set('allowed_groups_total_blocked', blocked); - } catch {} - // Exponer métrica con el tiempo restante hasta el próximo group sync (o -1 si scheduler inactivo) - try { - const secs = GroupSyncService.getSecondsUntilNextGroupSync(); - const val = (secs == null || !Number.isFinite(secs)) ? -1 : secs; - Metrics.set('group_sync_seconds_until_next', val); - } catch {} - const format = (process.env.METRICS_FORMAT || 'prom').toLowerCase() === 'json' ? 'json' : 'prom'; - const body = Metrics.render(format as any); - return new Response(body, { - status: 200, - headers: { 'Content-Type': format === 'json' ? 'application/json' : 'text/plain; version=0.0.4' } - }); + return await handleMetricsRequest(request, WebhookServer.dbInstance); } if (url.pathname.endsWith('/health')) { - // /health?full=1 devuelve JSON con detalles - if (url.searchParams.get('full') === '1') { - try { - const rowG = WebhookServer.dbInstance.prepare(`SELECT COUNT(*) AS c, MAX(last_verified) AS lv FROM groups WHERE active = 1`).get() as { c?: number; lv?: string | null } | undefined; - const rowM = WebhookServer.dbInstance.prepare(`SELECT COUNT(*) AS c FROM group_members WHERE is_active = 1`).get() as { c?: number } | undefined; - const active_groups = Number(rowG?.c || 0); - const active_members = Number(rowM?.c || 0); - const lv = rowG?.lv ? String(rowG.lv) : null; - let last_sync_at: string | null = lv; - let snapshot_age_ms: number | null = null; - if (lv) { - const iso = lv.includes('T') ? lv : (lv.replace(' ', 'T') + 'Z'); - const ms = Date.parse(iso); - if (Number.isFinite(ms)) { - snapshot_age_ms = Date.now() - ms; - } - } - const lastSyncMetric = Metrics.get('last_sync_ok'); - const maxAgeRaw = Number(process.env.MAX_MEMBERS_SNAPSHOT_AGE_MS); - const maxAgeMs = Number.isFinite(maxAgeRaw) && maxAgeRaw > 0 ? maxAgeRaw : 24 * 60 * 60 * 1000; - const snapshot_fresh = typeof snapshot_age_ms === 'number' ? (snapshot_age_ms <= maxAgeMs) : false; - let last_sync_ok: number; - if (typeof lastSyncMetric === 'number') { - last_sync_ok = (lastSyncMetric === 1 && snapshot_fresh) ? 1 : 0; - } else { - // Si no hay métrica explícita, nos basamos exclusivamente en la frescura de la snapshot - last_sync_ok = snapshot_fresh ? 1 : 0; - } - const payload = { status: 'ok', active_groups, active_members, last_sync_at, snapshot_age_ms, snapshot_fresh, last_sync_ok }; - return new Response(JSON.stringify(payload), { - status: 200, - headers: { 'Content-Type': 'application/json' } - }); - } catch (e) { - return new Response(JSON.stringify({ status: 'error' }), { - status: 500, - headers: { 'Content-Type': 'application/json' } - }); - } - } - return new Response('OK', { status: 200 }); + return await handleHealthRequest(url, WebhookServer.dbInstance); } if (process.env.NODE_ENV !== 'test') { @@ -588,67 +507,7 @@ export class WebhookServer { if (process.env.NODE_ENV !== 'test') { try { - await WebhookManager.registerWebhook(); - // Add small delay to allow webhook to propagate - await new Promise(resolve => setTimeout(resolve, 1000)); - const isActive = await WebhookManager.verifyWebhook(); - if (!isActive) { - console.error('❌ Webhook verification failed - retrying in 2 seconds...'); - await new Promise(resolve => setTimeout(resolve, 2000)); - const isActiveRetry = await WebhookManager.verifyWebhook(); - if (!isActiveRetry) { - console.error('❌ Webhook verification failed after retry'); - process.exit(1); - } - } - - // Initialize groups - critical for operation - await GroupSyncService.checkInitialGroups(); - - // Start groups scheduler (periodic sync of groups) - try { - GroupSyncService.startGroupsScheduler(); - console.log('✅ Group scheduler started'); - } catch (e) { - console.error('⚠️ Failed to start Group scheduler:', e); - } - - // Initial members sync (non-blocking if fails) - try { - await GroupSyncService.syncMembersForActiveGroups(); - GroupSyncService.startMembersScheduler(); - console.log('✅ Group members scheduler started'); - } catch (e) { - console.error('⚠️ Failed to run initial members sync or start scheduler:', e); - } - - // Start response queue worker (background) - try { - await ResponseQueue.process(); - console.log('✅ ResponseQueue worker started'); - // Start cleanup scheduler (daily retention) - ResponseQueue.startCleanupScheduler(); - console.log('✅ ResponseQueue cleanup scheduler started'); - RemindersService.start(); - console.log('✅ RemindersService started'); - } catch (e) { - console.error('❌ Failed to start ResponseQueue worker or cleanup scheduler:', e); - } - - // Mantenimiento (cleanup de miembros inactivos) - try { - MaintenanceService.start(); - console.log('✅ MaintenanceService started'); - // Ejecutar reconciliación de alias una vez al arranque (one-shot) - try { - await MaintenanceService.reconcileAliasUsersOnce(); - console.log('✅ MaintenanceService: reconciliación de alias ejecutada (one-shot)'); - } catch (e2) { - console.error('⚠️ Failed to run alias reconciliation one-shot:', e2); - } - } catch (e) { - console.error('⚠️ Failed to start MaintenanceService:', e); - } + await startServices(this.dbInstance); } catch (error) { console.error('❌ Failed to setup webhook:', error instanceof Error ? error.message : error); process.exit(1);