From 2f24806a06932755f5e4b5e016ae0f5fdecd052e Mon Sep 17 00:00:00 2001 From: brobert Date: Mon, 10 Nov 2025 14:26:17 +0100 Subject: [PATCH] =?UTF-8?q?refactor:=20modularizar=20group-sync=20y=20a?= =?UTF-8?q?=C3=B1adir=20API,=20cache=20y=20repo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: aider (openrouter/openai/gpt-5) --- src/services/group-sync.ts | 413 +-------------------------- src/services/group-sync/api.ts | 264 +++++++++++++++++ src/services/group-sync/cache.ts | 15 + src/services/group-sync/reconcile.ts | 79 +++++ src/services/group-sync/repo.ts | 83 ++++++ 5 files changed, 453 insertions(+), 401 deletions(-) create mode 100644 src/services/group-sync/api.ts create mode 100644 src/services/group-sync/cache.ts create mode 100644 src/services/group-sync/reconcile.ts create mode 100644 src/services/group-sync/repo.ts diff --git a/src/services/group-sync.ts b/src/services/group-sync.ts index 6c0aed0..af07582 100644 --- a/src/services/group-sync.ts +++ b/src/services/group-sync.ts @@ -7,6 +7,10 @@ import { AllowedGroups } from './allowed-groups'; import { ResponseQueue } from './response-queue'; import { toIsoSqlUTC } from '../utils/datetime'; import { publishGroupCoveragePrompt } from './onboarding'; +import { fetchGroupsFromAPI as apiFetchGroups, fetchGroupMembersFromAPI as apiFetchMembers } from './group-sync/api'; +import { upsertGroups as repoUpsertGroups } from './group-sync/repo'; +import { cacheActiveGroups as computeActiveCache } from './group-sync/cache'; +import { reconcileGroupMembers as reconcileMembers } from './group-sync/reconcile'; // In-memory cache for active groups // const activeGroupsCache = new Map(); // groupId -> groupName @@ -219,85 +223,14 @@ export class GroupSyncService { } private static async fetchGroupsFromAPI(): Promise { - const url = `${process.env.EVOLUTION_API_URL}/group/fetchAllGroups/${process.env.EVOLUTION_API_INSTANCE}?getParticipants=false`; - console.log('ℹ️ Fetching groups from API:', { - url: `${url}...`, // Log partial URL for security - communityId: process.env.WHATSAPP_COMMUNITY_ID, - time: new Date().toISOString() - }); - - try { - const response = await fetch(url, { - method: 'GET', - headers: { - apikey: String(process.env.EVOLUTION_API_KEY || ''), - }, - httpVersion: '2', - timeout: 320000 // 120 second timeout - }); - - if (!response.ok) { - const errorBody = await response.text().catch(() => 'Unable to read error body'); - console.error('❌ API request failed:', { - status: response.status, - statusText: response.statusText, - headers: Object.fromEntries(response.headers.entries()), - body: errorBody - }); - throw new Error(`API request failed: ${response.status} ${response.statusText}`); - } - - const rawResponse = await response.text(); - console.log('ℹ️ Raw API response length:', rawResponse.length); - - // Parse response which could be either: - // 1. Direct array of groups: [{group1}, {group2}] - // 2. Or wrapped response: {status, message, response} - let groups: EvolutionGroup[] = []; - try { - const parsed = JSON.parse(rawResponse); - if (Array.isArray(parsed)) { - // Case 1: Direct array response - groups = parsed as EvolutionGroup[]; - console.log('ℹ️ Received direct array of', groups.length, 'groups'); - } else if (parsed.response && Array.isArray(parsed.response)) { - // Case 2: Wrapped response - if (parsed.status !== 'success') { - throw new Error(`API error: ${parsed.message || 'Unknown error'}`); - } - groups = parsed.response as EvolutionGroup[]; - console.log('ℹ️ Received wrapped response with', groups.length, 'groups'); - } else { - throw new Error('Invalid API response format - expected array or wrapped response'); - } - } catch (e) { - console.error('❌ Failed to parse API response:', { - error: e instanceof Error ? e.message : String(e), - responseSample: rawResponse.substring(0, 100) + '...' - }); - throw e; - } - - if (!groups.length) { - console.warn('⚠️ API returned empty group list'); - } - return groups; - } catch (error) { - console.error('❌ Failed to fetch groups:', { - error: error instanceof Error ? error.message : String(error), - stack: error instanceof Error ? error.stack : undefined - }); - throw error; - } + return await apiFetchGroups() as unknown as EvolutionGroup[]; } private static cacheActiveGroups(): void { - const groups = this.dbInstance - .prepare('SELECT id, name FROM groups WHERE active = TRUE AND COALESCE(is_community,0) = 0 AND COALESCE(archived,0) = 0') - .all() as Array<{ id: string; name: string | null }>; + const map = computeActiveCache(this.dbInstance); this.activeGroupsCache.clear(); - for (const group of groups) { - this.activeGroupsCache.set(String(group.id), String(group.name ?? '')); + for (const [id, name] of map.entries()) { + this.activeGroupsCache.set(id, name); } console.log(`Cached ${this.activeGroupsCache.size} active groups`); } @@ -415,78 +348,8 @@ export class GroupSyncService { } private static async upsertGroups(groups: EvolutionGroup[]): Promise<{ added: number; updated: number }> { - let added = 0; - let updated = 0; - - const transactionResult = this.dbInstance.transaction(() => { - // First mark all groups as inactive and update verification timestamp - const inactiveResult = this.dbInstance.prepare(` - UPDATE groups - SET active = FALSE, - last_verified = CURRENT_TIMESTAMP - WHERE active = TRUE - `).run(); - console.log('ℹ️ Grupos marcados como inactivos:', { - count: inactiveResult.changes, - lastId: inactiveResult.lastInsertRowid - }); - - for (const group of groups as EvolutionGroup[]) { - const existing = this.dbInstance.prepare('SELECT 1 FROM groups WHERE id = ?').get(group.id); - console.log('Checking group:', group.id, 'exists:', !!existing); - - const isCommunityFlag = !!(((group as any)?.isCommunity) || ((group as any)?.is_community) || ((group as any)?.isCommunityAnnounce) || ((group as any)?.is_community_announce)); - - if (existing) { - const updateResult = this.dbInstance.prepare( - 'UPDATE groups SET name = ?, community_id = COALESCE(?, community_id), is_community = ?, active = TRUE, last_verified = CURRENT_TIMESTAMP WHERE id = ?' - ).run((group as EvolutionGroup).subject, (group as EvolutionGroup).linkedParent || null, isCommunityFlag ? 1 : 0, (group as EvolutionGroup).id); - console.log('Updated group:', (group as EvolutionGroup).id, 'result:', updateResult); - updated++; - } else { - const insertResult = this.dbInstance.prepare( - 'INSERT INTO groups (id, community_id, name, active, is_community) VALUES (?, ?, ?, TRUE, ?)' - ).run((group as EvolutionGroup).id, (((group as EvolutionGroup).linkedParent ?? '')), (group as EvolutionGroup).subject, isCommunityFlag ? 1 : 0); - console.log('Added group:', (group as EvolutionGroup).id, 'result:', insertResult); - added++; - } - // Propagar subject a allowed_groups: - // - Si es grupo "comunidad/announce", bloquearlo. - // - En caso contrario, upsert pending y label. - try { - AllowedGroups.dbInstance = this.dbInstance; - if (isCommunityFlag) { - AllowedGroups.setStatus(group.id, 'blocked', group.subject); - } else { - AllowedGroups.upsertPending(group.id, group.subject, null); - } - } catch {} - // Si es grupo de comunidad, limpiar residuos: revocar tokens y desactivar membresías - if (isCommunityFlag) { - try { - this.dbInstance.prepare(` - UPDATE calendar_tokens - SET revoked_at = strftime('%Y-%m-%d %H:%M:%f','now') - WHERE group_id = ? AND revoked_at IS NULL - `).run(group.id); - } catch {} - try { - this.dbInstance.prepare(` - UPDATE group_members - SET is_active = 0 - WHERE group_id = ? AND is_active = 1 - `).run(group.id); - } catch {} - } - } - - return { added, updated }; - }); - try { - const result = transactionResult(); - console.log(`Group sync completed: ${result.added} added, ${result.updated} updated`); - return result; + return await repoUpsertGroups(this.dbInstance, groups as any); } catch (error) { console.error('Error in upsertGroups:', error); throw error; @@ -505,186 +368,7 @@ export class GroupSyncService { // Fetch members for a single group from Evolution API. Uses a robust parser to accept multiple payload shapes. private static async fetchGroupMembersFromAPI(groupId: string): Promise> { - // Cooldown global por rate limit 429 (evitar ráfagas) - try { - if (this._membersGlobalCooldownUntil && Date.now() < this._membersGlobalCooldownUntil) { - console.warn('⚠️ Skipping members fetch due to global cooldown'); - return []; - } - } catch {} - // En tests se recomienda simular fetch; no retornamos temprano para permitir validar el parser - - // 1) Intento preferente: endpoint de Evolution "Find Group Members" - // Documentación provista: GET /group/participants/{instance} - // Suponemos soporte de query param groupJid - try { - const url1 = `${process.env.EVOLUTION_API_URL}/group/participants/${process.env.EVOLUTION_API_INSTANCE}?groupJid=${encodeURIComponent(groupId)}`; - console.log('ℹ️ Fetching members via /group/participants:', { groupId }); - - const r1 = await fetch(url1, { - method: 'GET', - headers: { apikey: String(process.env.EVOLUTION_API_KEY || '') }, - httpVersion: '2', - timeout: 320000 - }); - - if (r1.ok) { - const raw1 = await r1.text(); - let parsed1: any; - try { - parsed1 = JSON.parse(raw1); - } catch (e) { - console.error('❌ Failed to parse /group/participants JSON:', String(e)); - throw e; - } - - const participantsArr = Array.isArray(parsed1?.participants) ? parsed1.participants : null; - if (participantsArr) { - const result: Array<{ userId: string; isAdmin: boolean }> = []; - for (const p of participantsArr) { - let jid: string | null = null; - let isAdmin = false; - - if (typeof p === 'string') { - jid = p; - } else if (p && typeof p === 'object') { - const rawId = p.id || p?.user?.id || p.user || null; - const rawJid = p.jid || null; // preferir .jid cuando exista - jid = rawJid || rawId || null; - - // Aprender mapping alias→número si vienen ambos - if (rawId && rawJid) { - try { IdentityService.upsertAlias(String(rawId), String(rawJid), 'group.participants'); } catch {} - } - - if (typeof p.isAdmin === 'boolean') { - isAdmin = p.isAdmin; - } else if (typeof p.admin === 'string') { - isAdmin = p.admin === 'admin' || p.admin === 'superadmin'; - } else if (typeof p.role === 'string') { - isAdmin = p.role.toLowerCase().includes('admin'); - } - } - - let norm = normalizeWhatsAppId(jid); - if (!norm) { - const digits = (jid || '').replace(/\D+/g, ''); - norm = digits || null; - } - if (!norm) continue; - result.push({ userId: norm, isAdmin }); - } - let resolved: Array<{ userId: string; isAdmin: boolean }>; - try { - const map = IdentityService.resolveMany(result.map(r => r.userId)); - resolved = result.map(r => ({ userId: map.get(r.userId) || r.userId, isAdmin: r.isAdmin })); - } catch { - resolved = result; - } - return resolved; - } - // Si no viene en el formato esperado, caemos al plan B - console.warn('⚠️ /group/participants responded without participants array, falling back to fetchAllGroups'); - } else { - const body = await r1.text().catch(() => ''); - if (r1.status === 429) { - console.warn(`⚠️ /group/participants rate-limited (429): ${body.slice(0, 200)}`); - this._membersGlobalCooldownUntil = Date.now() + 2 * 60 * 1000; - return []; - } - console.warn(`⚠️ /group/participants failed: ${r1.status} ${r1.statusText} - ${body.slice(0, 200)}. Falling back to fetchAllGroups`); - } - } catch (e) { - console.warn('⚠️ Error calling /group/participants, falling back to fetchAllGroups:', e instanceof Error ? e.message : String(e)); - } - - // 2) Fallback robusto: fetchAllGroups(getParticipants=true) y filtrar por groupId - const url = `${process.env.EVOLUTION_API_URL}/group/fetchAllGroups/${process.env.EVOLUTION_API_INSTANCE}?getParticipants=true`; - console.log('ℹ️ Fetching members via fetchAllGroups (participants=true):', { groupId }); - - const response = await fetch(url, { - method: 'GET', - headers: { apikey: String(process.env.EVOLUTION_API_KEY || '') }, - httpVersion: '2', - timeout: 320000 - }); - if (!response.ok) { - const body = await response.text().catch(() => ''); - if (response.status === 429) { - console.warn(`⚠️ fetchAllGroups(getParticipants=true) rate-limited (429): ${body.slice(0, 200)}`); - this._membersGlobalCooldownUntil = Date.now() + 2 * 60 * 1000; - return []; - } - throw new Error(`Failed to fetch groups with participants: ${response.status} ${response.statusText} - ${body.slice(0,200)}`); - } - const raw = await response.text(); - let parsed: any; - try { - parsed = JSON.parse(raw); - } catch (e) { - console.error('❌ Failed to parse members response JSON:', String(e)); - throw e; - } - - let groups: any[] = []; - if (Array.isArray(parsed)) { - groups = parsed; - } else if (parsed && Array.isArray(parsed.response)) { - groups = parsed.response; - } else { - throw new Error('Invalid response format for groups with participants'); - } - - const g = groups.find((g: any) => g?.id === groupId); - if (!g) { - console.warn(`⚠️ Group ${groupId} not present in fetchAllGroups(getParticipants=true) response`); - return []; - } - const participants = Array.isArray(g.participants) ? g.participants : []; - - const result: Array<{ userId: string; isAdmin: boolean }> = []; - for (const p of participants) { - let jid: string | null = null; - let isAdmin = false; - - if (typeof p === 'string') { - jid = p; - } else if (p && typeof p === 'object') { - const rawId = p.id || p?.user?.id || p.user || null; - const rawJid = p.jid || null; // preferir .jid cuando exista - jid = rawJid || rawId || null; - - // Aprender mapping alias→número si vienen ambos - if (rawId && rawJid) { - try { IdentityService.upsertAlias(String(rawId), String(rawJid), 'group.participants'); } catch {} - } - - if (typeof p.isAdmin === 'boolean') { - isAdmin = p.isAdmin; - } else if (typeof p.admin === 'string') { - // common shapes: 'admin', 'superadmin' or null - isAdmin = p.admin === 'admin' || p.admin === 'superadmin'; - } else if (typeof p.role === 'string') { - isAdmin = p.role.toLowerCase().includes('admin'); - } - } - - let norm = normalizeWhatsAppId(jid); - if (!norm) { - const digits = (jid || '').replace(/\D+/g, ''); - norm = digits || null; - } - if (!norm) continue; - result.push({ userId: norm, isAdmin }); - } - let resolved: Array<{ userId: string; isAdmin: boolean }>; - try { - const map = IdentityService.resolveMany(result.map(r => r.userId)); - resolved = result.map(r => ({ userId: map.get(r.userId) || r.userId, isAdmin: r.isAdmin })); - } catch { - resolved = result; - } - return resolved; + return await apiFetchMembers(groupId); } /** @@ -708,82 +392,9 @@ export class GroupSyncService { * Idempotente y atómico por grupo. */ static reconcileGroupMembers(groupId: string, snapshot: Array<{ userId: string; isAdmin: boolean }>, nowIso?: string): { added: number; updated: number; deactivated: number } { - if (!groupId || !Array.isArray(snapshot)) { - throw new Error('Invalid arguments for reconcileGroupMembers'); - } - const now = nowIso || toIsoSqlUTC(new Date()); - let added = 0, updated = 0, deactivated = 0; - - // Build quick lookup from snapshot - const incoming = new Map(); - for (const m of snapshot) { - if (!m?.userId) continue; - incoming.set(m.userId, { isAdmin: !!m.isAdmin }); - } - - this.dbInstance.transaction(() => { - // Load existing membership for group - const existingRows = this.dbInstance.prepare(` - SELECT user_id, is_admin, is_active - FROM group_members - WHERE group_id = ? - `).all(groupId) as Array<{ user_id: string; is_admin: number; is_active: number }>; - - const existing = new Map(existingRows.map(r => [r.user_id, { isAdmin: !!r.is_admin, isActive: !!r.is_active }])); - - // Upsert present members - for (const [userId, { isAdmin }] of incoming.entries()) { - // Ensure user exists (FK) - ensureUserExists(userId, this.dbInstance); - - const row = existing.get(userId); - if (!row) { - // insert - this.dbInstance.prepare(` - INSERT INTO group_members (group_id, user_id, is_admin, is_active, first_seen_at, last_seen_at) - VALUES (?, ?, ?, 1, ?, ?) - `).run(groupId, userId, isAdmin ? 1 : 0, now, now); - added++; - } else { - // update if needed - let roleChanged = row.isAdmin !== isAdmin; - if (!row.isActive || roleChanged) { - this.dbInstance.prepare(` - UPDATE group_members - SET is_active = 1, - is_admin = ?, - last_seen_at = ?, - last_role_change_at = CASE WHEN ? THEN ? ELSE last_role_change_at END - WHERE group_id = ? AND user_id = ? - `).run(isAdmin ? 1 : 0, now, roleChanged ? 1 : 0, roleChanged ? now : null, groupId, userId); - updated++; - } else { - // still update last_seen_at to reflect presence - this.dbInstance.prepare(` - UPDATE group_members - SET last_seen_at = ? - WHERE group_id = ? AND user_id = ? - `).run(now, groupId, userId); - } - } - } - - // Deactivate absent members - for (const [userId, state] of existing.entries()) { - if (!incoming.has(userId) && state.isActive) { - this.dbInstance.prepare(` - UPDATE group_members - SET is_active = 0, - last_seen_at = ? - WHERE group_id = ? AND user_id = ? - `).run(now, groupId, userId); - deactivated++; - } - } - })(); - + const res = reconcileMembers(this.dbInstance, groupId, snapshot, nowIso || toIsoSqlUTC(new Date())); try { this.computeAndPublishAliasCoverage(groupId); } catch {} - return { added, updated, deactivated }; + return res; } private static computeAndPublishAliasCoverage(groupId: string): void { diff --git a/src/services/group-sync/api.ts b/src/services/group-sync/api.ts new file mode 100644 index 0000000..d49c59f --- /dev/null +++ b/src/services/group-sync/api.ts @@ -0,0 +1,264 @@ +import { normalizeWhatsAppId } from '../../utils/whatsapp'; +import { IdentityService } from '../identity'; + +export type ApiEvolutionGroup = { + id: string; + subject: string; + linkedParent?: string; + // flags que pueden venir en la API y usamos para filtrar comunidades/announce + isCommunity?: any; + is_community?: any; + isCommunityAnnounce?: any; + is_community_announce?: any; +}; + +/** + * Obtiene todos los grupos desde Evolution API. + * Acepta respuesta como array directo o envuelta { status, response }. + */ +export async function fetchGroupsFromAPI(): Promise { + const url = `${process.env.EVOLUTION_API_URL}/group/fetchAllGroups/${process.env.EVOLUTION_API_INSTANCE}?getParticipants=false`; + console.log('ℹ️ Fetching groups from API:', { + url: `${url}...`, + communityId: process.env.WHATSAPP_COMMUNITY_ID, + time: new Date().toISOString() + }); + + try { + const response = await fetch(url, { + method: 'GET', + headers: { apikey: String(process.env.EVOLUTION_API_KEY || '') }, + httpVersion: '2', + timeout: 320000 + }); + + if (!response.ok) { + const errorBody = await response.text().catch(() => 'Unable to read error body'); + console.error('❌ API request failed:', { + status: response.status, + statusText: response.statusText, + headers: Object.fromEntries(response.headers.entries()), + body: errorBody + }); + throw new Error(`API request failed: ${response.status} ${response.statusText}`); + } + + const rawResponse = await response.text(); + console.log('ℹ️ Raw API response length:', rawResponse.length); + + let groups: ApiEvolutionGroup[] = []; + try { + const parsed = JSON.parse(rawResponse); + if (Array.isArray(parsed)) { + groups = parsed as ApiEvolutionGroup[]; + console.log('ℹ️ Received direct array of', groups.length, 'groups'); + } else if (parsed?.response && Array.isArray(parsed.response)) { + if (parsed.status !== 'success') { + throw new Error(`API error: ${parsed.message || 'Unknown error'}`); + } + groups = parsed.response as ApiEvolutionGroup[]; + console.log('ℹ️ Received wrapped response with', groups.length, 'groups'); + } else { + throw new Error('Invalid API response format - expected array or wrapped response'); + } + } catch (e) { + console.error('❌ Failed to parse API response:', { + error: e instanceof Error ? e.message : String(e), + responseSample: rawResponse.substring(0, 100) + '...' + }); + throw e; + } + + if (!groups.length) { + console.warn('⚠️ API returned empty group list'); + } + return groups; + } catch (error) { + console.error('❌ Failed to fetch groups:', { + error: error instanceof Error ? error.message : String(error), + stack: error instanceof Error ? error.stack : undefined + }); + throw error; + } +} + +// Cooldown global simple para 429 de miembros +let membersGlobalCooldownUntil = 0; + +/** + * Obtiene miembros de un grupo desde Evolution API, con parser robusto y fallback a fetchAllGroups. + */ +export async function fetchGroupMembersFromAPI(groupId: string): Promise> { + try { + if (membersGlobalCooldownUntil && Date.now() < membersGlobalCooldownUntil) { + console.warn('⚠️ Skipping members fetch due to global cooldown'); + return []; + } + } catch {} + + // 1) Endpoint preferente: /group/participants + try { + const url1 = `${process.env.EVOLUTION_API_URL}/group/participants/${process.env.EVOLUTION_API_INSTANCE}?groupJid=${encodeURIComponent(groupId)}`; + console.log('ℹ️ Fetching members via /group/participants:', { groupId }); + + const r1 = await fetch(url1, { + method: 'GET', + headers: { apikey: String(process.env.EVOLUTION_API_KEY || '') }, + httpVersion: '2', + timeout: 320000 + }); + + if (r1.ok) { + const raw1 = await r1.text(); + let parsed1: any; + try { + parsed1 = JSON.parse(raw1); + } catch (e) { + console.error('❌ Failed to parse /group/participants JSON:', String(e)); + throw e; + } + + const participantsArr = Array.isArray(parsed1?.participants) ? parsed1.participants : null; + if (participantsArr) { + const result: Array<{ userId: string; isAdmin: boolean }> = []; + for (const p of participantsArr) { + let jid: string | null = null; + let isAdmin = false; + + if (typeof p === 'string') { + jid = p; + } else if (p && typeof p === 'object') { + const rawId = p.id || p?.user?.id || p.user || null; + const rawJid = p.jid || null; + jid = rawJid || rawId || null; + + if (rawId && rawJid) { + try { IdentityService.upsertAlias(String(rawId), String(rawJid), 'group.participants'); } catch {} + } + + if (typeof p.isAdmin === 'boolean') { + isAdmin = p.isAdmin; + } else if (typeof p.admin === 'string') { + isAdmin = p.admin === 'admin' || p.admin === 'superadmin'; + } else if (typeof p.role === 'string') { + isAdmin = p.role.toLowerCase().includes('admin'); + } + } + + let norm = normalizeWhatsAppId(jid); + if (!norm) { + const digits = (jid || '').replace(/\D+/g, ''); + norm = digits || null; + } + if (!norm) continue; + result.push({ userId: norm, isAdmin }); + } + let resolved: Array<{ userId: string; isAdmin: boolean }>; + try { + const map = IdentityService.resolveMany(result.map(r => r.userId)); + resolved = result.map(r => ({ userId: map.get(r.userId) || r.userId, isAdmin: r.isAdmin })); + } catch { + resolved = result; + } + return resolved; + } + console.warn('⚠️ /group/participants responded without participants array, falling back to fetchAllGroups'); + } else { + const body = await r1.text().catch(() => ''); + if (r1.status === 429) { + console.warn(`⚠️ /group/participants rate-limited (429): ${body.slice(0, 200)}`); + membersGlobalCooldownUntil = Date.now() + 2 * 60 * 1000; + return []; + } + console.warn(`⚠️ /group/participants failed: ${r1.status} ${r1.statusText} - ${body.slice(0, 200)}. Falling back to fetchAllGroups`); + } + } catch (e) { + console.warn('⚠️ Error calling /group/participants, falling back to fetchAllGroups:', e instanceof Error ? e.message : String(e)); + } + + // 2) Fallback: fetchAllGroups(getParticipants=true) + const url = `${process.env.EVOLUTION_API_URL}/group/fetchAllGroups/${process.env.EVOLUTION_API_INSTANCE}?getParticipants=true`; + console.log('ℹ️ Fetching members via fetchAllGroups (participants=true):', { groupId }); + + const response = await fetch(url, { + method: 'GET', + headers: { apikey: String(process.env.EVOLUTION_API_KEY || '') }, + httpVersion: '2', + timeout: 320000 + }); + if (!response.ok) { + const body = await response.text().catch(() => ''); + if (response.status === 429) { + console.warn(`⚠️ fetchAllGroups(getParticipants=true) rate-limited (429): ${body.slice(0, 200)}`); + membersGlobalCooldownUntil = Date.now() + 2 * 60 * 1000; + return []; + } + throw new Error(`Failed to fetch groups with participants: ${response.status} ${response.statusText} - ${body.slice(0,200)}`); + } + const raw = await response.text(); + let parsed: any; + try { + parsed = JSON.parse(raw); + } catch (e) { + console.error('❌ Failed to parse members response JSON:', String(e)); + throw e; + } + + let groups: any[] = []; + if (Array.isArray(parsed)) { + groups = parsed; + } else if (parsed && Array.isArray(parsed.response)) { + groups = parsed.response; + } else { + throw new Error('Invalid response format for groups with participants'); + } + + const g = groups.find((g: any) => g?.id === groupId); + if (!g) { + console.warn(`⚠️ Group ${groupId} not present in fetchAllGroups(getParticipants=true) response`); + return []; + } + const participants = Array.isArray(g.participants) ? g.participants : []; + + const result: Array<{ userId: string; isAdmin: boolean }> = []; + for (const p of participants) { + let jid: string | null = null; + let isAdmin = false; + + if (typeof p === 'string') { + jid = p; + } else if (p && typeof p === 'object') { + const rawId = p.id || p?.user?.id || p.user || null; + const rawJid = p.jid || null; + jid = rawJid || rawId || null; + + if (rawId && rawJid) { + try { IdentityService.upsertAlias(String(rawId), String(rawJid), 'group.participants'); } catch {} + } + + if (typeof p.isAdmin === 'boolean') { + isAdmin = p.isAdmin; + } else if (typeof p.admin === 'string') { + isAdmin = p.admin === 'admin' || p.admin === 'superadmin'; + } else if (typeof p.role === 'string') { + isAdmin = p.role.toLowerCase().includes('admin'); + } + } + + let norm = normalizeWhatsAppId(jid); + if (!norm) { + const digits = (jid || '').replace(/\D+/g, ''); + norm = digits || null; + } + if (!norm) continue; + result.push({ userId: norm, isAdmin }); + } + let resolved: Array<{ userId: string; isAdmin: boolean }>; + try { + const map = IdentityService.resolveMany(result.map(r => r.userId)); + resolved = result.map(r => ({ userId: map.get(r.userId) || r.userId, isAdmin: r.isAdmin })); + } catch { + resolved = result; + } + return resolved; +} diff --git a/src/services/group-sync/cache.ts b/src/services/group-sync/cache.ts new file mode 100644 index 0000000..f354cad --- /dev/null +++ b/src/services/group-sync/cache.ts @@ -0,0 +1,15 @@ +import type { Database } from 'bun:sqlite'; + +/** + * Construye un mapa de grupos activos (no comunidad, no archivados) id -> nombre. + */ +export function cacheActiveGroups(db: Database): Map { + const map = new Map(); + const rows = db + .prepare('SELECT id, name FROM groups WHERE active = TRUE AND COALESCE(is_community,0) = 0 AND COALESCE(archived,0) = 0') + .all() as Array<{ id: string; name: string | null }>; + for (const g of rows) { + map.set(String(g.id), String(g.name ?? '')); + } + return map; +} diff --git a/src/services/group-sync/reconcile.ts b/src/services/group-sync/reconcile.ts new file mode 100644 index 0000000..3fbee0e --- /dev/null +++ b/src/services/group-sync/reconcile.ts @@ -0,0 +1,79 @@ +import type { Database } from 'bun:sqlite'; +import { ensureUserExists } from '../../db'; +import { toIsoSqlUTC } from '../../utils/datetime'; + +/** + * Reconciliación idempotente de membresías de un grupo. + */ +export function reconcileGroupMembers( + db: Database, + groupId: string, + snapshot: Array<{ userId: string; isAdmin: boolean }>, + nowIso?: string +): { added: number; updated: number; deactivated: number } { + if (!groupId || !Array.isArray(snapshot)) { + throw new Error('Invalid arguments for reconcileGroupMembers'); + } + const now = nowIso || toIsoSqlUTC(new Date()); + let added = 0, updated = 0, deactivated = 0; + + const incoming = new Map(); + for (const m of snapshot) { + if (!m?.userId) continue; + incoming.set(m.userId, { isAdmin: !!m.isAdmin }); + } + + db.transaction(() => { + const existingRows = db.prepare(` + SELECT user_id, is_admin, is_active + FROM group_members + WHERE group_id = ? + `).all(groupId) as Array<{ user_id: string; is_admin: number; is_active: number }>; + const existing = new Map(existingRows.map(r => [r.user_id, { isAdmin: !!r.is_admin, isActive: !!r.is_active }])); + + for (const [userId, { isAdmin }] of incoming.entries()) { + ensureUserExists(userId, db); + const row = existing.get(userId); + if (!row) { + db.prepare(` + INSERT INTO group_members (group_id, user_id, is_admin, is_active, first_seen_at, last_seen_at) + VALUES (?, ?, ?, 1, ?, ?) + `).run(groupId, userId, isAdmin ? 1 : 0, now, now); + added++; + } else { + const roleChanged = row.isAdmin !== isAdmin; + if (!row.isActive || roleChanged) { + db.prepare(` + UPDATE group_members + SET is_active = 1, + is_admin = ?, + last_seen_at = ?, + last_role_change_at = CASE WHEN ? THEN ? ELSE last_role_change_at END + WHERE group_id = ? AND user_id = ? + `).run(isAdmin ? 1 : 0, now, roleChanged ? 1 : 0, roleChanged ? now : null, groupId, userId); + updated++; + } else { + db.prepare(` + UPDATE group_members + SET last_seen_at = ? + WHERE group_id = ? AND user_id = ? + `).run(now, groupId, userId); + } + } + } + + for (const [userId, state] of existing.entries()) { + if (!incoming.has(userId) && state.isActive) { + db.prepare(` + UPDATE group_members + SET is_active = 0, + last_seen_at = ? + WHERE group_id = ? AND user_id = ? + `).run(now, groupId, userId); + deactivated++; + } + } + })(); + + return { added, updated, deactivated }; +} diff --git a/src/services/group-sync/repo.ts b/src/services/group-sync/repo.ts new file mode 100644 index 0000000..6d856a1 --- /dev/null +++ b/src/services/group-sync/repo.ts @@ -0,0 +1,83 @@ +import type { Database } from 'bun:sqlite'; +import { AllowedGroups } from '../allowed-groups'; + +/** + * Upsert de grupos desde snapshot de Evolution API. + * - Marca todos como inactivos al inicio (manteniendo last_verified). + * - Inserta/actualiza cada grupo, detectando flags de comunidad. + * - Propaga subject a allowed_groups: comunidades -> blocked, resto -> pending con label. + * - Limpia residuos de comunidades (revocar tokens, desactivar membresías). + */ +export async function upsertGroups( + db: Database, + groups: Array<{ id: string; subject: string; linkedParent?: string } & Record> +): Promise<{ added: number; updated: number }> { + let added = 0; + let updated = 0; + + const tx = db.transaction(() => { + const inactiveResult = db.prepare(` + UPDATE groups + SET active = FALSE, + last_verified = CURRENT_TIMESTAMP + WHERE active = TRUE + `).run(); + console.log('ℹ️ Grupos marcados como inactivos:', { + count: inactiveResult.changes, + lastId: inactiveResult.lastInsertRowid + }); + + for (const group of groups) { + const existing = db.prepare('SELECT 1 FROM groups WHERE id = ?').get(group.id); + + const isCommunityFlag = !!(group?.isCommunity || group?.is_community || group?.isCommunityAnnounce || group?.is_community_announce); + + if (existing) { + const updateResult = db.prepare( + 'UPDATE groups SET name = ?, community_id = COALESCE(?, community_id), is_community = ?, active = TRUE, last_verified = CURRENT_TIMESTAMP WHERE id = ?' + ).run(group.subject, group.linkedParent || null, isCommunityFlag ? 1 : 0, group.id); + console.log('Updated group:', group.id, 'result:', updateResult); + updated++; + } else { + const insertResult = db.prepare( + 'INSERT INTO groups (id, community_id, name, active, is_community) VALUES (?, ?, ?, TRUE, ?)' + ).run(group.id, (group.linkedParent ?? ''), group.subject, isCommunityFlag ? 1 : 0); + console.log('Added group:', group.id, 'result:', insertResult); + added++; + } + + // Propagar sujeto a allowed_groups + try { + (AllowedGroups as any).dbInstance = db; + if (isCommunityFlag) { + AllowedGroups.setStatus(group.id, 'blocked', group.subject); + } else { + AllowedGroups.upsertPending(group.id, group.subject, null); + } + } catch {} + + if (isCommunityFlag) { + try { + db.prepare(` + UPDATE calendar_tokens + SET revoked_at = strftime('%Y-%m-%d %H:%M:%f','now') + WHERE group_id = ? AND revoked_at IS NULL + `).run(group.id); + } catch {} + try { + db.prepare(` + UPDATE group_members + SET is_active = 0 + WHERE group_id = ? AND is_active = 1 + `).run(group.id); + } catch {} + } + } + + return { added, updated }; + }); + + const result = tx(); + console.log(`Group sync completed: ${result.added} added, ${result.updated} updated`); + return result; +}