feat: agregar sincronización de miembros con Evolution API y webhooks

Co-authored-by: aider (openrouter/openai/gpt-5) <aider@aider.chat>
pull/1/head
borja 2 months ago
parent e7d3596005
commit a092a25234

@ -27,6 +27,7 @@ Un chatbot de WhatsApp para gestionar tareas en grupos, integrado con Evolution
- Recordatorios por DM (daily/weekly) por usuario; evita duplicados y respeta TZ.
- Cola de respuestas persistente con reintentos (backoff exponencial + jitter) y recuperación tras reinicios.
- Nombres amigables vía caché de contactos (sin llamadas de red en tests).
- Sincronización de miembros de grupos (snapshot periódica; tolerante a fallos en webhooks).
- Mensajes compactos con emojis y cursiva; fechas dd/MM; vencidas con ⚠️.
## Requisitos
@ -47,6 +48,7 @@ Un chatbot de WhatsApp para gestionar tareas en grupos, integrado con Evolution
- TZ: zona horaria para “hoy/mañana” y render de fechas; por defecto Europe/Madrid.
- NOTIFY_GROUP_ON_CREATE: si “true”, envía resumen al grupo al crear (por defecto false).
- GROUP_SYNC_INTERVAL_MS: intervalo de sync de grupos; por defecto 24h (mín 10s en desarrollo).
- GROUP_MEMBERS_SYNC_INTERVAL_MS: intervalo de sync de miembros; por defecto 6h (mín 10s en desarrollo).
- RATE_LIMIT_PER_MIN: límite por usuario (tokens/min); por defecto 15.
- RATE_LIMIT_BURST: capacidad del bucket; por defecto = RATE_LIMIT_PER_MIN.
- Opcionales — cola de respuestas

@ -6,7 +6,7 @@ Estado general: listo para piloto con la junta directiva; 170 tests pasando. Rie
- Servidor webhook
- Endpoint /health, validación de entorno, extracción robusta de texto (conversation/extended/captions).
- Detección DM vs grupo y política “solo DM”.
- Registro/verificación de webhooks y sincronización de grupos activos con caché.
- Registro/verificación de webhooks y sincronización de grupos activos con caché; sincronización periódica de miembros.
- Rate limiting por usuario (15/min por defecto; desactivado en tests; aviso con cooldown).
- Base de datos y migraciones
- Inicialización con PRAGMA FK y timestamps de alta precisión.
@ -39,7 +39,6 @@ Estado general: listo para piloto con la junta directiva; 170 tests pasando. Rie
- Permisos/roles y pertenencia a grupos (si se requiere).
- Edición/eliminación de tareas.
- Política de caché de contactos (TTL configurable, invalidación más fina).
- Sincronización mínima de miembros (cacheada; no bloqueante).
## Roadmap y próximas iteraciones
- Iteración A — Operativa lista para piloto

@ -117,6 +117,18 @@ export class WebhookServer {
}
ContactsService.updateFromWebhook(payload.data);
break;
case 'groups.upsert':
if (process.env.NODE_ENV !== 'test') {
console.log(' Handling groups upsert event:', { rawEvent: evt });
}
try {
await GroupSyncService.syncGroups();
GroupSyncService.refreshActiveGroupsCache();
await GroupSyncService.syncMembersForActiveGroups();
} catch (e) {
console.error('❌ Error handling groups.upsert:', e);
}
break;
// Other events will be added later
}
@ -307,6 +319,15 @@ export class WebhookServer {
// Initialize groups - critical for operation
await GroupSyncService.checkInitialGroups();
// Initial members sync (non-blocking if fails)
try {
await GroupSyncService.syncMembersForActiveGroups();
GroupSyncService.startMembersScheduler();
console.log('✅ Group members scheduler started');
} catch (e) {
console.error('⚠️ Failed to run initial members sync or start scheduler:', e);
}
// Start response queue worker (background)
try {
await ResponseQueue.process();

@ -66,6 +66,8 @@ export class GroupSyncService {
return interval;
}
private static lastSyncAttempt = 0;
private static _membersTimer: any = null;
private static _membersSchedulerRunning = false;
static async syncGroups(): Promise<{ added: number; updated: number }> {
if (!this.shouldSync()) {
@ -340,6 +342,70 @@ export class GroupSyncService {
// Fetch members for a single group from Evolution API. Uses a robust parser to accept multiple payload shapes.
private static async fetchGroupMembersFromAPI(groupId: string): Promise<Array<{ userId: string; isAdmin: boolean }>> {
// Evitar llamadas de red en tests
if (process.env.NODE_ENV === 'test') return [];
// 1) Intento preferente: endpoint de Evolution "Find Group Members"
// Documentación provista: GET /group/participants/{instance}
// Suponemos soporte de query param groupJid
try {
const url1 = `${env.EVOLUTION_API_URL}/group/participants/${env.EVOLUTION_API_INSTANCE}?groupJid=${encodeURIComponent(groupId)}`;
console.log(' Fetching members via /group/participants:', { groupId });
const r1 = await fetch(url1, {
method: 'GET',
headers: { apikey: env.EVOLUTION_API_KEY },
httpVersion: '2',
timeout: 320000
});
if (r1.ok) {
const raw1 = await r1.text();
let parsed1: any;
try {
parsed1 = JSON.parse(raw1);
} catch (e) {
console.error('❌ Failed to parse /group/participants JSON:', String(e));
throw e;
}
const participantsArr = Array.isArray(parsed1?.participants) ? parsed1.participants : null;
if (participantsArr) {
const result: Array<{ userId: string; isAdmin: boolean }> = [];
for (const p of participantsArr) {
let jid: string | null = null;
let isAdmin = false;
if (typeof p === 'string') {
jid = p;
} else if (p && typeof p === 'object') {
jid = p.id || p.jid || p.user || p?.user?.id || null;
if (typeof p.isAdmin === 'boolean') {
isAdmin = p.isAdmin;
} else if (typeof p.admin === 'string') {
isAdmin = p.admin === 'admin' || p.admin === 'superadmin';
} else if (typeof p.role === 'string') {
isAdmin = p.role.toLowerCase().includes('admin');
}
}
const norm = normalizeWhatsAppId(jid);
if (!norm) continue;
result.push({ userId: norm, isAdmin });
}
return result;
}
// Si no viene en el formato esperado, caemos al plan B
console.warn('⚠️ /group/participants responded without participants array, falling back to fetchAllGroups');
} else {
const body = await r1.text().catch(() => '');
console.warn(`⚠️ /group/participants failed: ${r1.status} ${r1.statusText} - ${body.slice(0, 200)}. Falling back to fetchAllGroups`);
}
} catch (e) {
console.warn('⚠️ Error calling /group/participants, falling back to fetchAllGroups:', e instanceof Error ? e.message : String(e));
}
// 2) Fallback robusto: fetchAllGroups(getParticipants=true) y filtrar por groupId
const url = `${env.EVOLUTION_API_URL}/group/fetchAllGroups/${env.EVOLUTION_API_INSTANCE}?getParticipants=true`;
console.log(' Fetching members via fetchAllGroups (participants=true):', { groupId });
@ -491,6 +557,9 @@ export class GroupSyncService {
* Devuelve contadores agregados.
*/
static async syncMembersForActiveGroups(): Promise<{ groups: number; added: number; updated: number; deactivated: number }> {
if (process.env.NODE_ENV === 'test') {
return { groups: 0, added: 0, updated: 0, deactivated: 0 };
}
// ensure cache is populated
if (this.activeGroupsCache.size === 0) {
this.cacheActiveGroups();
@ -511,4 +580,35 @@ export class GroupSyncService {
console.log(' Members sync summary:', { groups, added, updated, deactivated });
return { groups, added, updated, deactivated };
}
public static refreshActiveGroupsCache(): void {
this.cacheActiveGroups();
}
public static startMembersScheduler(): void {
if (process.env.NODE_ENV === 'test') return;
if (this._membersSchedulerRunning) return;
this._membersSchedulerRunning = true;
// Intervalo por defecto 6h; configurable por env; mínimo 10s en desarrollo
const raw = process.env.GROUP_MEMBERS_SYNC_INTERVAL_MS;
let interval = Number.isFinite(Number(raw)) && Number(raw) > 0 ? Number(raw) : 6 * 60 * 60 * 1000;
if (process.env.NODE_ENV === 'development' && interval < 10000) {
interval = 10000;
}
this._membersTimer = setInterval(() => {
this.syncMembersForActiveGroups().catch(err => {
console.error('❌ Members scheduler run error:', err);
});
}, interval);
}
public static stopMembersScheduler(): void {
this._membersSchedulerRunning = false;
if (this._membersTimer) {
clearInterval(this._membersTimer);
this._membersTimer = null;
}
}
}

Loading…
Cancel
Save