refactor: modularizar WebhookServer y endpoints /metrics /health

Co-authored-by: aider (openrouter/openai/gpt-5) <aider@aider.chat>
main
brobert 1 month ago
parent ffad59f18f
commit 46bec524a2

@ -0,0 +1,90 @@
import type { Database } from 'bun:sqlite';
import { WebhookManager } from '../services/webhook-manager';
import { GroupSyncService } from '../services/group-sync';
import { ResponseQueue } from '../services/response-queue';
import { RemindersService } from '../services/reminders';
import { MaintenanceService } from '../services/maintenance';
export async function startServices(_db: Database): Promise<void> {
await WebhookManager.registerWebhook();
// Add small delay to allow webhook to propagate
await new Promise(resolve => setTimeout(resolve, 1000));
const isActive = await WebhookManager.verifyWebhook();
if (!isActive) {
console.error('❌ Webhook verification failed - retrying in 2 seconds...');
await new Promise(resolve => setTimeout(resolve, 2000));
const isActiveRetry = await WebhookManager.verifyWebhook();
if (!isActiveRetry) {
console.error('❌ Webhook verification failed after retry');
process.exit(1);
}
}
// Initialize groups - critical for operation
await GroupSyncService.checkInitialGroups();
// Start groups scheduler (periodic sync of groups)
try {
GroupSyncService.startGroupsScheduler();
console.log('✅ Group scheduler started');
} catch (e) {
console.error('⚠️ Failed to start Group scheduler:', e);
}
// Initial members sync (non-blocking if fails)
try {
await GroupSyncService.syncMembersForActiveGroups();
GroupSyncService.startMembersScheduler();
console.log('✅ Group members scheduler started');
} catch (e) {
console.error('⚠️ Failed to run initial members sync or start scheduler:', e);
}
// Start response queue worker (background)
try {
await ResponseQueue.process();
console.log('✅ ResponseQueue worker started');
// Start cleanup scheduler (daily retention)
ResponseQueue.startCleanupScheduler();
console.log('✅ ResponseQueue cleanup scheduler started');
RemindersService.start();
console.log('✅ RemindersService started');
} catch (e) {
console.error('❌ Failed to start ResponseQueue worker or cleanup scheduler:', e);
}
// Mantenimiento (cleanup de miembros inactivos)
try {
MaintenanceService.start();
console.log('✅ MaintenanceService started');
// Ejecutar reconciliación de alias una vez al arranque (one-shot)
try {
await MaintenanceService.reconcileAliasUsersOnce();
console.log('✅ MaintenanceService: reconciliación de alias ejecutada (one-shot)');
} catch (e2) {
console.error('⚠️ Failed to run alias reconciliation one-shot:', e2);
}
} catch (e) {
console.error('⚠️ Failed to start MaintenanceService:', e);
}
}
export function stopServices(): void {
try {
ResponseQueue.stopCleanupScheduler();
} catch {}
try {
// No existe un "stop" público de workers; paramos el lazo
(ResponseQueue as any).stop?.();
} catch {}
try {
RemindersService.stop();
} catch {}
try {
GroupSyncService.stopGroupsScheduler();
GroupSyncService.stopMembersScheduler();
} catch {}
try {
MaintenanceService.stop();
} catch {}
}

@ -0,0 +1,46 @@
import type { Database } from 'bun:sqlite';
import { Metrics } from '../services/metrics';
export async function handleHealthRequest(url: URL, db: Database): Promise<Response> {
// /health?full=1 devuelve JSON con detalles
if (url.searchParams.get('full') === '1') {
try {
const rowG = db.prepare(`SELECT COUNT(*) AS c, MAX(last_verified) AS lv FROM groups WHERE active = 1`).get() as { c?: number; lv?: string | null } | undefined;
const rowM = db.prepare(`SELECT COUNT(*) AS c FROM group_members WHERE is_active = 1`).get() as { c?: number } | undefined;
const active_groups = Number(rowG?.c || 0);
const active_members = Number(rowM?.c || 0);
const lv = rowG?.lv ? String(rowG.lv) : null;
let last_sync_at: string | null = lv;
let snapshot_age_ms: number | null = null;
if (lv) {
const iso = lv.includes('T') ? lv : (lv.replace(' ', 'T') + 'Z');
const ms = Date.parse(iso);
if (Number.isFinite(ms)) {
snapshot_age_ms = Date.now() - ms;
}
}
const lastSyncMetric = Metrics.get('last_sync_ok');
const maxAgeRaw = Number(process.env.MAX_MEMBERS_SNAPSHOT_AGE_MS);
const maxAgeMs = Number.isFinite(maxAgeRaw) && maxAgeRaw > 0 ? maxAgeRaw : 24 * 60 * 60 * 1000;
const snapshot_fresh = typeof snapshot_age_ms === 'number' ? (snapshot_age_ms <= maxAgeMs) : false;
let last_sync_ok: number;
if (typeof lastSyncMetric === 'number') {
last_sync_ok = (lastSyncMetric === 1 && snapshot_fresh) ? 1 : 0;
} else {
// Si no hay métrica explícita, nos basamos exclusivamente en la frescura de la snapshot
last_sync_ok = snapshot_fresh ? 1 : 0;
}
const payload = { status: 'ok', active_groups, active_members, last_sync_at, snapshot_age_ms, snapshot_fresh, last_sync_ok };
return new Response(JSON.stringify(payload), {
status: 200,
headers: { 'Content-Type': 'application/json' }
});
} catch (e) {
return new Response(JSON.stringify({ status: 'error' }), {
status: 500,
headers: { 'Content-Type': 'application/json' }
});
}
}
return new Response('OK', { status: 200 });
}

@ -0,0 +1,44 @@
import type { Database } from 'bun:sqlite';
import { Metrics } from '../services/metrics';
import { GroupSyncService } from '../services/group-sync';
export async function handleMetricsRequest(request: Request, db: Database): Promise<Response> {
if (request.method !== 'GET') {
return new Response('🚫 Method not allowed', { status: 405 });
}
if (!Metrics.enabled()) {
return new Response('Metrics disabled', { status: 404 });
}
// Gauges de allowed_groups por estado (best-effort)
try {
const rows = db
.prepare(`SELECT status, COUNT(*) AS c FROM allowed_groups GROUP BY status`)
.all() as Array<{ status: string; c: number }>;
let pending = 0, allowed = 0, blocked = 0;
for (const r of rows) {
const s = String(r?.status || '');
const c = Number(r?.c || 0);
if (s === 'pending') pending = c;
else if (s === 'allowed') allowed = c;
else if (s === 'blocked') blocked = c;
}
Metrics.set('allowed_groups_total_pending', pending);
Metrics.set('allowed_groups_total_allowed', allowed);
Metrics.set('allowed_groups_total_blocked', blocked);
} catch {}
// Exponer métrica con el tiempo restante hasta el próximo group sync (o -1 si scheduler inactivo)
try {
const secs = GroupSyncService.getSecondsUntilNextGroupSync();
const val = (secs == null || !Number.isFinite(secs)) ? -1 : secs;
Metrics.set('group_sync_seconds_until_next', val);
} catch {}
const format = (process.env.METRICS_FORMAT || 'prom').toLowerCase() === 'json' ? 'json' : 'prom';
const body = Metrics.render(format as any);
return new Response(body, {
status: 200,
headers: { 'Content-Type': format === 'json' ? 'application/json' : 'text/plain; version=0.0.4' }
});
}

@ -1,21 +1,14 @@
//// <reference types="bun-types" />
import type { Database } from 'bun:sqlite';
import { CommandService } from './services/command';
import { GroupSyncService } from './services/group-sync';
import { ResponseQueue } from './services/response-queue';
import { TaskService } from './tasks/service';
import { WebhookManager } from './services/webhook-manager';
import { normalizeWhatsAppId, isGroupId } from './utils/whatsapp';
import { ensureUserExists, db } from './db';
import { ContactsService } from './services/contacts';
import { Migrator } from './db/migrator';
import { RateLimiter } from './services/rate-limit';
import { RemindersService } from './services/reminders';
import { Metrics } from './services/metrics';
import { MaintenanceService } from './services/maintenance';
import { IdentityService } from './services/identity';
import { AllowedGroups } from './services/allowed-groups';
import { AdminService } from './services/admin';
import { db } from './db';
import { handleMetricsRequest } from './http/metrics';
import { handleHealthRequest } from './http/health';
import { startServices } from './http/bootstrap';
// Bun is available globally when running under Bun runtime
declare global {
@ -61,84 +54,10 @@ export class WebhookServer {
// Health check endpoint y métricas
const url = new URL(request.url);
if (url.pathname.endsWith('/metrics')) {
if (request.method !== 'GET') {
return new Response('🚫 Method not allowed', { status: 405 });
}
if (!Metrics.enabled()) {
return new Response('Metrics disabled', { status: 404 });
}
// Gauges de allowed_groups por estado (best-effort)
try {
const rows = WebhookServer.dbInstance
.prepare(`SELECT status, COUNT(*) AS c FROM allowed_groups GROUP BY status`)
.all() as Array<{ status: string; c: number }>;
let pending = 0, allowed = 0, blocked = 0;
for (const r of rows) {
const s = String(r?.status || '');
const c = Number(r?.c || 0);
if (s === 'pending') pending = c;
else if (s === 'allowed') allowed = c;
else if (s === 'blocked') blocked = c;
}
Metrics.set('allowed_groups_total_pending', pending);
Metrics.set('allowed_groups_total_allowed', allowed);
Metrics.set('allowed_groups_total_blocked', blocked);
} catch {}
// Exponer métrica con el tiempo restante hasta el próximo group sync (o -1 si scheduler inactivo)
try {
const secs = GroupSyncService.getSecondsUntilNextGroupSync();
const val = (secs == null || !Number.isFinite(secs)) ? -1 : secs;
Metrics.set('group_sync_seconds_until_next', val);
} catch {}
const format = (process.env.METRICS_FORMAT || 'prom').toLowerCase() === 'json' ? 'json' : 'prom';
const body = Metrics.render(format as any);
return new Response(body, {
status: 200,
headers: { 'Content-Type': format === 'json' ? 'application/json' : 'text/plain; version=0.0.4' }
});
return await handleMetricsRequest(request, WebhookServer.dbInstance);
}
if (url.pathname.endsWith('/health')) {
// /health?full=1 devuelve JSON con detalles
if (url.searchParams.get('full') === '1') {
try {
const rowG = WebhookServer.dbInstance.prepare(`SELECT COUNT(*) AS c, MAX(last_verified) AS lv FROM groups WHERE active = 1`).get() as { c?: number; lv?: string | null } | undefined;
const rowM = WebhookServer.dbInstance.prepare(`SELECT COUNT(*) AS c FROM group_members WHERE is_active = 1`).get() as { c?: number } | undefined;
const active_groups = Number(rowG?.c || 0);
const active_members = Number(rowM?.c || 0);
const lv = rowG?.lv ? String(rowG.lv) : null;
let last_sync_at: string | null = lv;
let snapshot_age_ms: number | null = null;
if (lv) {
const iso = lv.includes('T') ? lv : (lv.replace(' ', 'T') + 'Z');
const ms = Date.parse(iso);
if (Number.isFinite(ms)) {
snapshot_age_ms = Date.now() - ms;
}
}
const lastSyncMetric = Metrics.get('last_sync_ok');
const maxAgeRaw = Number(process.env.MAX_MEMBERS_SNAPSHOT_AGE_MS);
const maxAgeMs = Number.isFinite(maxAgeRaw) && maxAgeRaw > 0 ? maxAgeRaw : 24 * 60 * 60 * 1000;
const snapshot_fresh = typeof snapshot_age_ms === 'number' ? (snapshot_age_ms <= maxAgeMs) : false;
let last_sync_ok: number;
if (typeof lastSyncMetric === 'number') {
last_sync_ok = (lastSyncMetric === 1 && snapshot_fresh) ? 1 : 0;
} else {
// Si no hay métrica explícita, nos basamos exclusivamente en la frescura de la snapshot
last_sync_ok = snapshot_fresh ? 1 : 0;
}
const payload = { status: 'ok', active_groups, active_members, last_sync_at, snapshot_age_ms, snapshot_fresh, last_sync_ok };
return new Response(JSON.stringify(payload), {
status: 200,
headers: { 'Content-Type': 'application/json' }
});
} catch (e) {
return new Response(JSON.stringify({ status: 'error' }), {
status: 500,
headers: { 'Content-Type': 'application/json' }
});
}
}
return new Response('OK', { status: 200 });
return await handleHealthRequest(url, WebhookServer.dbInstance);
}
if (process.env.NODE_ENV !== 'test') {
@ -588,67 +507,7 @@ export class WebhookServer {
if (process.env.NODE_ENV !== 'test') {
try {
await WebhookManager.registerWebhook();
// Add small delay to allow webhook to propagate
await new Promise(resolve => setTimeout(resolve, 1000));
const isActive = await WebhookManager.verifyWebhook();
if (!isActive) {
console.error('❌ Webhook verification failed - retrying in 2 seconds...');
await new Promise(resolve => setTimeout(resolve, 2000));
const isActiveRetry = await WebhookManager.verifyWebhook();
if (!isActiveRetry) {
console.error('❌ Webhook verification failed after retry');
process.exit(1);
}
}
// Initialize groups - critical for operation
await GroupSyncService.checkInitialGroups();
// Start groups scheduler (periodic sync of groups)
try {
GroupSyncService.startGroupsScheduler();
console.log('✅ Group scheduler started');
} catch (e) {
console.error('⚠️ Failed to start Group scheduler:', e);
}
// Initial members sync (non-blocking if fails)
try {
await GroupSyncService.syncMembersForActiveGroups();
GroupSyncService.startMembersScheduler();
console.log('✅ Group members scheduler started');
} catch (e) {
console.error('⚠️ Failed to run initial members sync or start scheduler:', e);
}
// Start response queue worker (background)
try {
await ResponseQueue.process();
console.log('✅ ResponseQueue worker started');
// Start cleanup scheduler (daily retention)
ResponseQueue.startCleanupScheduler();
console.log('✅ ResponseQueue cleanup scheduler started');
RemindersService.start();
console.log('✅ RemindersService started');
} catch (e) {
console.error('❌ Failed to start ResponseQueue worker or cleanup scheduler:', e);
}
// Mantenimiento (cleanup de miembros inactivos)
try {
MaintenanceService.start();
console.log('✅ MaintenanceService started');
// Ejecutar reconciliación de alias una vez al arranque (one-shot)
try {
await MaintenanceService.reconcileAliasUsersOnce();
console.log('✅ MaintenanceService: reconciliación de alias ejecutada (one-shot)');
} catch (e2) {
console.error('⚠️ Failed to run alias reconciliation one-shot:', e2);
}
} catch (e) {
console.error('⚠️ Failed to start MaintenanceService:', e);
}
await startServices(this.dbInstance);
} catch (error) {
console.error('❌ Failed to setup webhook:', error instanceof Error ? error.message : error);
process.exit(1);

Loading…
Cancel
Save