From 28264f9369050090b3b30fcb7047f0e19b090473 Mon Sep 17 00:00:00 2001 From: brobert Date: Tue, 21 Oct 2025 19:06:16 +0200 Subject: [PATCH] feat: sincronizar solo grupos cambiados y aprender usuario al mensaje Co-authored-by: aider (openrouter/openai/gpt-5) --- src/server.ts | 13 ++--- src/services/group-sync.ts | 89 ++++++++++++++++++++++++++++++++- src/services/webhook-manager.ts | 18 ++++++- 3 files changed, 111 insertions(+), 9 deletions(-) diff --git a/src/server.ts b/src/server.ts index 6d833ae..48ecb29 100644 --- a/src/server.ts +++ b/src/server.ts @@ -209,9 +209,11 @@ export class WebhookServer { console.log('ℹ️ Handling groups upsert event:', { rawEvent: evt }); } try { - await GroupSyncService.syncGroups(); + const res = await GroupSyncService.syncGroups(); GroupSyncService.refreshActiveGroupsCache(); - await GroupSyncService.syncMembersForActiveGroups(); + if (Array.isArray((res as any).changedActive) && (res as any).changedActive.length > 0) { + await GroupSyncService.syncMembersForGroups((res as any).changedActive); + } } catch (e) { console.error('❌ Error handling groups.upsert:', e); } @@ -435,15 +437,14 @@ export class WebhookServer { return; } if (process.env.NODE_ENV !== 'test') { - console.log('ℹ️ Group not active in cache — ensuring group and triggering quick members sync'); + console.log('ℹ️ Group not active in cache — ensuring group (no immediate members sync)'); } try { GroupSyncService.ensureGroupExists(data.key.remoteJid); - GroupSyncService.refreshActiveGroupsCache(); - await GroupSyncService.syncMembersForGroup(data.key.remoteJid); + try { GroupSyncService.upsertMemberSeen(data.key.remoteJid, normalizedSenderId); } catch {} } catch (e) { if (process.env.NODE_ENV !== 'test') { - console.error('⚠️ Failed to ensure/sync group on-the-fly:', e); + console.error('⚠️ Failed to ensure group on-the-fly:', e); } } } diff --git a/src/services/group-sync.ts b/src/services/group-sync.ts index 1578d09..caaf5df 100644 --- a/src/services/group-sync.ts +++ b/src/services/group-sync.ts @@ -74,14 +74,16 @@ export class GroupSyncService { private static _membersSchedulerRunning = false; private static _groupsIntervalMs: number | null = null; private static _groupsNextTickAt: number | null = null; + private static _membersGlobalCooldownUntil: number = 0; - static async syncGroups(force: boolean = false): Promise<{ added: number; updated: number }> { + static async syncGroups(force: boolean = false): Promise<{ added: number; updated: number; changedActive: string[] }> { if (!this.shouldSync(force)) { return { added: 0, updated: 0 }; } const startedAt = Date.now(); Metrics.inc('sync_runs_total'); + let newlyActivatedIds: string[] = []; try { const groups = await this.fetchGroupsFromAPI(); console.log('ℹ️ Grupos crudos de la API:', JSON.stringify(groups, null, 2)); @@ -106,6 +108,17 @@ export class GroupSyncService { afterMap.set(String(r.id), { active: Number(r.active || 0), archived: Number(r.archived || 0), is_community: Number((r as any).is_community || 0), name: r.name ? String(r.name) : null }); } + // Determinar grupos que pasaron a estar activos (nuevos o reactivados) + const newlyActivatedLocal: string[] = []; + for (const [id, a] of afterMap.entries()) { + const b = beforeMap.get(id); + const becameActive = Number(a.active) === 1 && Number(a.archived) === 0 && Number((a as any).is_community || 0) === 0; + if (becameActive && (!b || Number(b.active) !== 1)) { + newlyActivatedLocal.push(id); + } + } + newlyActivatedIds = newlyActivatedLocal; + const newlyDeactivated: Array<{ id: string; name: string | null }> = []; for (const [id, b] of beforeMap.entries()) { const a = afterMap.get(id); @@ -176,7 +189,7 @@ export class GroupSyncService { // Duración opcional Metrics.set('last_sync_duration_ms', Date.now() - (typeof startedAt !== 'undefined' ? startedAt : Date.now())); - return result; + return { ...result, changedActive: newlyActivatedIds }; } catch (error) { console.error('Group sync failed:', error); Metrics.inc('sync_errors_total'); @@ -485,6 +498,13 @@ export class GroupSyncService { // Fetch members for a single group from Evolution API. Uses a robust parser to accept multiple payload shapes. private static async fetchGroupMembersFromAPI(groupId: string): Promise> { + // Cooldown global por rate limit 429 (evitar ráfagas) + try { + if (this._membersGlobalCooldownUntil && Date.now() < this._membersGlobalCooldownUntil) { + console.warn('⚠️ Skipping members fetch due to global cooldown'); + return []; + } + } catch {} // En tests se recomienda simular fetch; no retornamos temprano para permitir validar el parser // 1) Intento preferente: endpoint de Evolution "Find Group Members" @@ -560,6 +580,11 @@ export class GroupSyncService { console.warn('⚠️ /group/participants responded without participants array, falling back to fetchAllGroups'); } else { const body = await r1.text().catch(() => ''); + if (r1.status === 429) { + console.warn(`⚠️ /group/participants rate-limited (429): ${body.slice(0, 200)}`); + this._membersGlobalCooldownUntil = Date.now() + 2 * 60 * 1000; + return []; + } console.warn(`⚠️ /group/participants failed: ${r1.status} ${r1.statusText} - ${body.slice(0, 200)}. Falling back to fetchAllGroups`); } } catch (e) { @@ -578,6 +603,11 @@ export class GroupSyncService { }); if (!response.ok) { const body = await response.text().catch(() => ''); + if (response.status === 429) { + console.warn(`⚠️ fetchAllGroups(getParticipants=true) rate-limited (429): ${body.slice(0, 200)}`); + this._membersGlobalCooldownUntil = Date.now() + 2 * 60 * 1000; + return []; + } throw new Error(`Failed to fetch groups with participants: ${response.status} ${response.statusText} - ${body.slice(0,200)}`); } const raw = await response.text(); @@ -650,6 +680,22 @@ export class GroupSyncService { return resolved; } + /** + * Upsert optimista de la membresía a partir de un mensaje recibido en el grupo. + * Marca al usuario como activo y actualiza last_seen_at sin consultar Evolution API. + */ + static upsertMemberSeen(groupId: string, userId: string, nowIso?: string): void { + if (!groupId || !userId) return; + const now = nowIso || new Date().toISOString().replace('T', ' ').replace('Z', ''); + this.dbInstance.prepare(` + INSERT INTO group_members (group_id, user_id, is_admin, is_active, first_seen_at, last_seen_at) + VALUES (?, ?, 0, 1, ?, ?) + ON CONFLICT(group_id, user_id) DO UPDATE SET + is_active = 1, + last_seen_at = excluded.last_seen_at + `).run(groupId, userId, now, now); + } + /** * Reconciles current DB membership state for a group with a fresh snapshot. * Idempotente y atómico por grupo. @@ -921,6 +967,45 @@ export class GroupSyncService { return { groups, added, updated, deactivated }; } + static async syncMembersForGroups(ids: string[]): Promise<{ groups: number; added: number; updated: number; deactivated: number }> { + if (process.env.NODE_ENV === 'test') { + return { groups: 0, added: 0, updated: 0, deactivated: 0 }; + } + if (!Array.isArray(ids) || ids.length === 0) { + return { groups: 0, added: 0, updated: 0, deactivated: 0 }; + } + + const mode = String(process.env.GROUP_GATING_MODE || 'off').toLowerCase(); + const enforce = mode === 'enforce'; + if (enforce) { + try { (AllowedGroups as any).dbInstance = this.dbInstance; } catch {} + } + + let groups = 0, added = 0, updated = 0, deactivated = 0; + for (const groupId of ids) { + try { + if (enforce) { + try { + if (!AllowedGroups.isAllowed(groupId)) { + try { Metrics.inc('sync_skipped_group_total'); } catch {} + continue; + } + } catch {} + } + const snapshot = await this.fetchGroupMembersFromAPI(groupId); + const res = this.reconcileGroupMembers(groupId, snapshot); + groups++; + added += res.added; + updated += res.updated; + deactivated += res.deactivated; + } catch (e) { + console.error(`❌ Failed to sync members for group ${groupId}:`, e instanceof Error ? e.message : String(e)); + } + } + console.log('ℹ️ Targeted members sync summary:', { groups, added, updated, deactivated }); + return { groups, added, updated, deactivated }; + } + public static refreshActiveGroupsCache(): void { this.cacheActiveGroups(); } diff --git a/src/services/webhook-manager.ts b/src/services/webhook-manager.ts index 6988e44..73f3086 100644 --- a/src/services/webhook-manager.ts +++ b/src/services/webhook-manager.ts @@ -70,13 +70,29 @@ export class WebhookManager { } private static getConfig(): { webhook: WebhookConfig } { + const fromEnv = String(process.env.WEBHOOK_EVENTS || '') + .split(',') + .map(s => s.trim()) + .filter(Boolean); + const allow = [ + 'APPLICATION_STARTUP', + 'MESSAGES_UPSERT', + 'GROUPS_UPSERT', + 'MESSAGES_UPDATE', + 'MESSAGES_DELETE', + 'PRESENCE_UPDATE', + 'CONTACTS_UPDATE', + 'CHATS_UPDATE' + ]; + const events = (fromEnv.length ? fromEnv : ['APPLICATION_STARTUP','MESSAGES_UPSERT','GROUPS_UPSERT']) + .filter(e => allow.includes(e)); return { webhook: { url: process.env.WEBHOOK_URL!, enabled: true, webhook_by_events: true, webhook_base64: true, - events: this.REQUIRED_EVENTS, + events } }; }