@ -7,6 +7,10 @@ import { AllowedGroups } from './allowed-groups';
import { ResponseQueue } from './response-queue' ;
import { toIsoSqlUTC } from '../utils/datetime' ;
import { publishGroupCoveragePrompt } from './onboarding' ;
import { fetchGroupsFromAPI as apiFetchGroups , fetchGroupMembersFromAPI as apiFetchMembers } from './group-sync/api' ;
import { upsertGroups as repoUpsertGroups } from './group-sync/repo' ;
import { cacheActiveGroups as computeActiveCache } from './group-sync/cache' ;
import { reconcileGroupMembers as reconcileMembers } from './group-sync/reconcile' ;
// In-memory cache for active groups
// const activeGroupsCache = new Map<string, string>(); // groupId -> groupName
@ -219,85 +223,14 @@ export class GroupSyncService {
}
private static async fetchGroupsFromAPI ( ) : Promise < EvolutionGroup [ ] > {
const url = ` ${ process . env . EVOLUTION_API_URL } /group/fetchAllGroups/ ${ process . env . EVOLUTION_API_INSTANCE } ?getParticipants=false ` ;
console . log ( 'ℹ ️ Fetching groups from API:' , {
url : ` ${ url } ... ` , // Log partial URL for security
communityId : process.env.WHATSAPP_COMMUNITY_ID ,
time : new Date ( ) . toISOString ( )
} ) ;
try {
const response = await fetch ( url , {
method : 'GET' ,
headers : {
apikey : String ( process . env . EVOLUTION_API_KEY || '' ) ,
} ,
httpVersion : '2' ,
timeout : 320000 // 120 second timeout
} ) ;
if ( ! response . ok ) {
const errorBody = await response . text ( ) . catch ( ( ) = > 'Unable to read error body' ) ;
console . error ( '❌ API request failed:' , {
status : response.status ,
statusText : response.statusText ,
headers : Object.fromEntries ( response . headers . entries ( ) ) ,
body : errorBody
} ) ;
throw new Error ( ` API request failed: ${ response . status } ${ response . statusText } ` ) ;
}
const rawResponse = await response . text ( ) ;
console . log ( 'ℹ ️ Raw API response length:' , rawResponse . length ) ;
// Parse response which could be either:
// 1. Direct array of groups: [{group1}, {group2}]
// 2. Or wrapped response: {status, message, response}
let groups : EvolutionGroup [ ] = [ ] ;
try {
const parsed = JSON . parse ( rawResponse ) ;
if ( Array . isArray ( parsed ) ) {
// Case 1: Direct array response
groups = parsed as EvolutionGroup [ ] ;
console . log ( 'ℹ ️ Received direct array of' , groups . length , 'groups' ) ;
} else if ( parsed . response && Array . isArray ( parsed . response ) ) {
// Case 2: Wrapped response
if ( parsed . status !== 'success' ) {
throw new Error ( ` API error: ${ parsed . message || 'Unknown error' } ` ) ;
}
groups = parsed . response as EvolutionGroup [ ] ;
console . log ( 'ℹ ️ Received wrapped response with' , groups . length , 'groups' ) ;
} else {
throw new Error ( 'Invalid API response format - expected array or wrapped response' ) ;
}
} catch ( e ) {
console . error ( '❌ Failed to parse API response:' , {
error : e instanceof Error ? e.message : String ( e ) ,
responseSample : rawResponse.substring ( 0 , 100 ) + '...'
} ) ;
throw e ;
}
if ( ! groups . length ) {
console . warn ( '⚠️ API returned empty group list' ) ;
}
return groups ;
} catch ( error ) {
console . error ( '❌ Failed to fetch groups:' , {
error : error instanceof Error ? error.message : String ( error ) ,
stack : error instanceof Error ? error.stack : undefined
} ) ;
throw error ;
}
return await apiFetchGroups ( ) as unknown as EvolutionGroup [ ] ;
}
private static cacheActiveGroups ( ) : void {
const groups = this . dbInstance
. prepare ( 'SELECT id, name FROM groups WHERE active = TRUE AND COALESCE(is_community,0) = 0 AND COALESCE(archived,0) = 0' )
. all ( ) as Array < { id : string ; name : string | null } > ;
const map = computeActiveCache ( this . dbInstance ) ;
this . activeGroupsCache . clear ( ) ;
for ( const group of groups ) {
this . activeGroupsCache . set ( String ( group . id ) , String ( group . name ? ? '' ) ) ;
for ( const [ id , name ] of map . entries ( ) ) {
this . activeGroupsCache . set ( id , name ) ;
}
console . log ( ` Cached ${ this . activeGroupsCache . size } active groups ` ) ;
}
@ -415,78 +348,8 @@ export class GroupSyncService {
}
private static async upsertGroups ( groups : EvolutionGroup [ ] ) : Promise < { added : number ; updated : number } > {
let added = 0 ;
let updated = 0 ;
const transactionResult = this . dbInstance . transaction ( ( ) = > {
// First mark all groups as inactive and update verification timestamp
const inactiveResult = this . dbInstance . prepare ( `
UPDATE groups
SET active = FALSE ,
last_verified = CURRENT_TIMESTAMP
WHERE active = TRUE
` ).run();
console . log ( 'ℹ ️ Grupos marcados como inactivos:' , {
count : inactiveResult.changes ,
lastId : inactiveResult.lastInsertRowid
} ) ;
for ( const group of groups as EvolutionGroup [ ] ) {
const existing = this . dbInstance . prepare ( 'SELECT 1 FROM groups WHERE id = ?' ) . get ( group . id ) ;
console . log ( 'Checking group:' , group . id , 'exists:' , ! ! existing ) ;
const isCommunityFlag = ! ! ( ( ( group as any ) ? . isCommunity ) || ( ( group as any ) ? . is_community ) || ( ( group as any ) ? . isCommunityAnnounce ) || ( ( group as any ) ? . is_community_announce ) ) ;
if ( existing ) {
const updateResult = this . dbInstance . prepare (
'UPDATE groups SET name = ?, community_id = COALESCE(?, community_id), is_community = ?, active = TRUE, last_verified = CURRENT_TIMESTAMP WHERE id = ?'
) . run ( ( group as EvolutionGroup ) . subject , ( group as EvolutionGroup ) . linkedParent || null , isCommunityFlag ? 1 : 0 , ( group as EvolutionGroup ) . id ) ;
console . log ( 'Updated group:' , ( group as EvolutionGroup ) . id , 'result:' , updateResult ) ;
updated ++ ;
} else {
const insertResult = this . dbInstance . prepare (
'INSERT INTO groups (id, community_id, name, active, is_community) VALUES (?, ?, ?, TRUE, ?)'
) . run ( ( group as EvolutionGroup ) . id , ( ( ( group as EvolutionGroup ) . linkedParent ? ? '' ) ) , ( group as EvolutionGroup ) . subject , isCommunityFlag ? 1 : 0 ) ;
console . log ( 'Added group:' , ( group as EvolutionGroup ) . id , 'result:' , insertResult ) ;
added ++ ;
}
// Propagar subject a allowed_groups:
// - Si es grupo "comunidad/announce", bloquearlo.
// - En caso contrario, upsert pending y label.
try {
AllowedGroups . dbInstance = this . dbInstance ;
if ( isCommunityFlag ) {
AllowedGroups . setStatus ( group . id , 'blocked' , group . subject ) ;
} else {
AllowedGroups . upsertPending ( group . id , group . subject , null ) ;
}
} catch { }
// Si es grupo de comunidad, limpiar residuos: revocar tokens y desactivar membresías
if ( isCommunityFlag ) {
try {
this . dbInstance . prepare ( `
UPDATE calendar_tokens
SET revoked_at = strftime ( '%Y-%m-%d %H:%M:%f' , 'now' )
WHERE group_id = ? AND revoked_at IS NULL
` ).run(group.id);
} catch { }
try {
this . dbInstance . prepare ( `
UPDATE group_members
SET is_active = 0
WHERE group_id = ? AND is_active = 1
` ).run(group.id);
} catch { }
}
}
return { added , updated } ;
} ) ;
try {
const result = transactionResult ( ) ;
console . log ( ` Group sync completed: ${ result . added } added, ${ result . updated } updated ` ) ;
return result ;
return await repoUpsertGroups ( this . dbInstance , groups as any ) ;
} catch ( error ) {
console . error ( 'Error in upsertGroups:' , error ) ;
throw error ;
@ -505,186 +368,7 @@ export class GroupSyncService {
// Fetch members for a single group from Evolution API. Uses a robust parser to accept multiple payload shapes.
private static async fetchGroupMembersFromAPI ( groupId : string ) : Promise < Array < { userId : string ; isAdmin : boolean } > > {
// Cooldown global por rate limit 429 (evitar ráfagas)
try {
if ( this . _membersGlobalCooldownUntil && Date . now ( ) < this . _membersGlobalCooldownUntil ) {
console . warn ( '⚠️ Skipping members fetch due to global cooldown' ) ;
return [ ] ;
}
} catch { }
// En tests se recomienda simular fetch; no retornamos temprano para permitir validar el parser
// 1) Intento preferente: endpoint de Evolution "Find Group Members"
// Documentación provista: GET /group/participants/{instance}
// Suponemos soporte de query param groupJid
try {
const url1 = ` ${ process . env . EVOLUTION_API_URL } /group/participants/ ${ process . env . EVOLUTION_API_INSTANCE } ?groupJid= ${ encodeURIComponent ( groupId ) } ` ;
console . log ( 'ℹ ️ Fetching members via /group/participants:' , { groupId } ) ;
const r1 = await fetch ( url1 , {
method : 'GET' ,
headers : { apikey : String ( process . env . EVOLUTION_API_KEY || '' ) } ,
httpVersion : '2' ,
timeout : 320000
} ) ;
if ( r1 . ok ) {
const raw1 = await r1 . text ( ) ;
let parsed1 : any ;
try {
parsed1 = JSON . parse ( raw1 ) ;
} catch ( e ) {
console . error ( '❌ Failed to parse /group/participants JSON:' , String ( e ) ) ;
throw e ;
}
const participantsArr = Array . isArray ( parsed1 ? . participants ) ? parsed1.participants : null ;
if ( participantsArr ) {
const result : Array < { userId : string ; isAdmin : boolean } > = [ ] ;
for ( const p of participantsArr ) {
let jid : string | null = null ;
let isAdmin = false ;
if ( typeof p === 'string' ) {
jid = p ;
} else if ( p && typeof p === 'object' ) {
const rawId = p . id || p ? . user ? . id || p . user || null ;
const rawJid = p . jid || null ; // preferir .jid cuando exista
jid = rawJid || rawId || null ;
// Aprender mapping alias→número si vienen ambos
if ( rawId && rawJid ) {
try { IdentityService . upsertAlias ( String ( rawId ) , String ( rawJid ) , 'group.participants' ) ; } catch { }
}
if ( typeof p . isAdmin === 'boolean' ) {
isAdmin = p . isAdmin ;
} else if ( typeof p . admin === 'string' ) {
isAdmin = p . admin === 'admin' || p . admin === 'superadmin' ;
} else if ( typeof p . role === 'string' ) {
isAdmin = p . role . toLowerCase ( ) . includes ( 'admin' ) ;
}
}
let norm = normalizeWhatsAppId ( jid ) ;
if ( ! norm ) {
const digits = ( jid || '' ) . replace ( /\D+/g , '' ) ;
norm = digits || null ;
}
if ( ! norm ) continue ;
result . push ( { userId : norm , isAdmin } ) ;
}
let resolved : Array < { userId : string ; isAdmin : boolean } > ;
try {
const map = IdentityService . resolveMany ( result . map ( r = > r . userId ) ) ;
resolved = result . map ( r = > ( { userId : map.get ( r . userId ) || r . userId , isAdmin : r.isAdmin } ) ) ;
} catch {
resolved = result ;
}
return resolved ;
}
// Si no viene en el formato esperado, caemos al plan B
console . warn ( '⚠️ /group/participants responded without participants array, falling back to fetchAllGroups' ) ;
} else {
const body = await r1 . text ( ) . catch ( ( ) = > '' ) ;
if ( r1 . status === 429 ) {
console . warn ( ` ⚠️ /group/participants rate-limited (429): ${ body . slice ( 0 , 200 ) } ` ) ;
this . _membersGlobalCooldownUntil = Date . now ( ) + 2 * 60 * 1000 ;
return [ ] ;
}
console . warn ( ` ⚠️ /group/participants failed: ${ r1 . status } ${ r1 . statusText } - ${ body . slice ( 0 , 200 ) } . Falling back to fetchAllGroups ` ) ;
}
} catch ( e ) {
console . warn ( '⚠️ Error calling /group/participants, falling back to fetchAllGroups:' , e instanceof Error ? e.message : String ( e ) ) ;
}
// 2) Fallback robusto: fetchAllGroups(getParticipants=true) y filtrar por groupId
const url = ` ${ process . env . EVOLUTION_API_URL } /group/fetchAllGroups/ ${ process . env . EVOLUTION_API_INSTANCE } ?getParticipants=true ` ;
console . log ( 'ℹ ️ Fetching members via fetchAllGroups (participants=true):' , { groupId } ) ;
const response = await fetch ( url , {
method : 'GET' ,
headers : { apikey : String ( process . env . EVOLUTION_API_KEY || '' ) } ,
httpVersion : '2' ,
timeout : 320000
} ) ;
if ( ! response . ok ) {
const body = await response . text ( ) . catch ( ( ) = > '' ) ;
if ( response . status === 429 ) {
console . warn ( ` ⚠️ fetchAllGroups(getParticipants=true) rate-limited (429): ${ body . slice ( 0 , 200 ) } ` ) ;
this . _membersGlobalCooldownUntil = Date . now ( ) + 2 * 60 * 1000 ;
return [ ] ;
}
throw new Error ( ` Failed to fetch groups with participants: ${ response . status } ${ response . statusText } - ${ body . slice ( 0 , 200 ) } ` ) ;
}
const raw = await response . text ( ) ;
let parsed : any ;
try {
parsed = JSON . parse ( raw ) ;
} catch ( e ) {
console . error ( '❌ Failed to parse members response JSON:' , String ( e ) ) ;
throw e ;
}
let groups : any [ ] = [ ] ;
if ( Array . isArray ( parsed ) ) {
groups = parsed ;
} else if ( parsed && Array . isArray ( parsed . response ) ) {
groups = parsed . response ;
} else {
throw new Error ( 'Invalid response format for groups with participants' ) ;
}
const g = groups . find ( ( g : any ) = > g ? . id === groupId ) ;
if ( ! g ) {
console . warn ( ` ⚠️ Group ${ groupId } not present in fetchAllGroups(getParticipants=true) response ` ) ;
return [ ] ;
}
const participants = Array . isArray ( g . participants ) ? g . participants : [ ] ;
const result : Array < { userId : string ; isAdmin : boolean } > = [ ] ;
for ( const p of participants ) {
let jid : string | null = null ;
let isAdmin = false ;
if ( typeof p === 'string' ) {
jid = p ;
} else if ( p && typeof p === 'object' ) {
const rawId = p . id || p ? . user ? . id || p . user || null ;
const rawJid = p . jid || null ; // preferir .jid cuando exista
jid = rawJid || rawId || null ;
// Aprender mapping alias→número si vienen ambos
if ( rawId && rawJid ) {
try { IdentityService . upsertAlias ( String ( rawId ) , String ( rawJid ) , 'group.participants' ) ; } catch { }
}
if ( typeof p . isAdmin === 'boolean' ) {
isAdmin = p . isAdmin ;
} else if ( typeof p . admin === 'string' ) {
// common shapes: 'admin', 'superadmin' or null
isAdmin = p . admin === 'admin' || p . admin === 'superadmin' ;
} else if ( typeof p . role === 'string' ) {
isAdmin = p . role . toLowerCase ( ) . includes ( 'admin' ) ;
}
}
let norm = normalizeWhatsAppId ( jid ) ;
if ( ! norm ) {
const digits = ( jid || '' ) . replace ( /\D+/g , '' ) ;
norm = digits || null ;
}
if ( ! norm ) continue ;
result . push ( { userId : norm , isAdmin } ) ;
}
let resolved : Array < { userId : string ; isAdmin : boolean } > ;
try {
const map = IdentityService . resolveMany ( result . map ( r = > r . userId ) ) ;
resolved = result . map ( r = > ( { userId : map.get ( r . userId ) || r . userId , isAdmin : r.isAdmin } ) ) ;
} catch {
resolved = result ;
}
return resolved ;
return await apiFetchMembers ( groupId ) ;
}
/ * *
@ -708,82 +392,9 @@ export class GroupSyncService {
* Idempotente y at ó mico por grupo .
* /
static reconcileGroupMembers ( groupId : string , snapshot : Array < { userId : string ; isAdmin : boolean } > , nowIso? : string ) : { added : number ; updated : number ; deactivated : number } {
if ( ! groupId || ! Array . isArray ( snapshot ) ) {
throw new Error ( 'Invalid arguments for reconcileGroupMembers' ) ;
}
const now = nowIso || toIsoSqlUTC ( new Date ( ) ) ;
let added = 0 , updated = 0 , deactivated = 0 ;
// Build quick lookup from snapshot
const incoming = new Map < string , { isAdmin : boolean } > ( ) ;
for ( const m of snapshot ) {
if ( ! m ? . userId ) continue ;
incoming . set ( m . userId , { isAdmin : ! ! m . isAdmin } ) ;
}
this . dbInstance . transaction ( ( ) = > {
// Load existing membership for group
const existingRows = this . dbInstance . prepare ( `
SELECT user_id , is_admin , is_active
FROM group_members
WHERE group_id = ?
` ).all(groupId) as Array<{ user_id: string; is_admin: number; is_active: number }>;
const existing = new Map ( existingRows . map ( r = > [ r . user_id , { isAdmin : ! ! r . is_admin , isActive : ! ! r . is_active } ] ) ) ;
// Upsert present members
for ( const [ userId , { isAdmin } ] of incoming . entries ( ) ) {
// Ensure user exists (FK)
ensureUserExists ( userId , this . dbInstance ) ;
const row = existing . get ( userId ) ;
if ( ! row ) {
// insert
this . dbInstance . prepare ( `
INSERT INTO group_members ( group_id , user_id , is_admin , is_active , first_seen_at , last_seen_at )
VALUES ( ? , ? , ? , 1 , ? , ? )
` ).run(groupId, userId, isAdmin ? 1 : 0, now, now);
added ++ ;
} else {
// update if needed
let roleChanged = row . isAdmin !== isAdmin ;
if ( ! row . isActive || roleChanged ) {
this . dbInstance . prepare ( `
UPDATE group_members
SET is_active = 1 ,
is_admin = ? ,
last_seen_at = ? ,
last_role_change_at = CASE WHEN ? THEN ? ELSE last_role_change_at END
WHERE group_id = ? AND user_id = ?
` ).run(isAdmin ? 1 : 0, now, roleChanged ? 1 : 0, roleChanged ? now : null, groupId, userId);
updated ++ ;
} else {
// still update last_seen_at to reflect presence
this . dbInstance . prepare ( `
UPDATE group_members
SET last_seen_at = ?
WHERE group_id = ? AND user_id = ?
` ).run(now, groupId, userId);
}
}
}
// Deactivate absent members
for ( const [ userId , state ] of existing . entries ( ) ) {
if ( ! incoming . has ( userId ) && state . isActive ) {
this . dbInstance . prepare ( `
UPDATE group_members
SET is_active = 0 ,
last_seen_at = ?
WHERE group_id = ? AND user_id = ?
` ).run(now, groupId, userId);
deactivated ++ ;
}
}
} ) ( ) ;
const res = reconcileMembers ( this . dbInstance , groupId , snapshot , nowIso || toIsoSqlUTC ( new Date ( ) ) ) ;
try { this . computeAndPublishAliasCoverage ( groupId ) ; } catch { }
return { added , updated , deactivated } ;
return res ;
}
private static computeAndPublishAliasCoverage ( groupId : string ) : void {