Compare commits

...

2 Commits

Author SHA1 Message Date
brobert 5efcbbc98b refactor: usar getLastChangedActive y eliminar changedActive de syncGroups
Co-authored-by: aider (openrouter/openai/gpt-5) <aider@aider.chat>
1 week ago
brobert 28264f9369 feat: sincronizar solo grupos cambiados y aprender usuario al mensaje
Co-authored-by: aider (openrouter/openai/gpt-5) <aider@aider.chat>
1 week ago

@ -209,9 +209,14 @@ export class WebhookServer {
console.log(' Handling groups upsert event:', { rawEvent: evt });
}
try {
await GroupSyncService.syncGroups();
const res = await GroupSyncService.syncGroups();
GroupSyncService.refreshActiveGroupsCache();
const changed = GroupSyncService.getLastChangedActive();
if (changed.length > 0) {
await GroupSyncService.syncMembersForGroups(changed);
} else {
await GroupSyncService.syncMembersForActiveGroups();
}
} catch (e) {
console.error('❌ Error handling groups.upsert:', e);
}
@ -435,15 +440,14 @@ export class WebhookServer {
return;
}
if (process.env.NODE_ENV !== 'test') {
console.log(' Group not active in cache — ensuring group and triggering quick members sync');
console.log(' Group not active in cache — ensuring group (no immediate members sync)');
}
try {
GroupSyncService.ensureGroupExists(data.key.remoteJid);
GroupSyncService.refreshActiveGroupsCache();
await GroupSyncService.syncMembersForGroup(data.key.remoteJid);
try { GroupSyncService.upsertMemberSeen(data.key.remoteJid, normalizedSenderId); } catch {}
} catch (e) {
if (process.env.NODE_ENV !== 'test') {
console.error('⚠️ Failed to ensure/sync group on-the-fly:', e);
console.error('⚠️ Failed to ensure group on-the-fly:', e);
}
}
}

@ -74,6 +74,8 @@ export class GroupSyncService {
private static _membersSchedulerRunning = false;
private static _groupsIntervalMs: number | null = null;
private static _groupsNextTickAt: number | null = null;
private static _membersGlobalCooldownUntil: number = 0;
private static _lastChangedActive: string[] = [];
static async syncGroups(force: boolean = false): Promise<{ added: number; updated: number }> {
if (!this.shouldSync(force)) {
@ -82,6 +84,7 @@ export class GroupSyncService {
const startedAt = Date.now();
Metrics.inc('sync_runs_total');
let newlyActivatedIds: string[] = [];
try {
const groups = await this.fetchGroupsFromAPI();
console.log(' Grupos crudos de la API:', JSON.stringify(groups, null, 2));
@ -106,6 +109,17 @@ export class GroupSyncService {
afterMap.set(String(r.id), { active: Number(r.active || 0), archived: Number(r.archived || 0), is_community: Number((r as any).is_community || 0), name: r.name ? String(r.name) : null });
}
// Determinar grupos que pasaron a estar activos (nuevos o reactivados)
const newlyActivatedLocal: string[] = [];
for (const [id, a] of afterMap.entries()) {
const b = beforeMap.get(id);
const becameActive = Number(a.active) === 1 && Number(a.archived) === 0 && Number((a as any).is_community || 0) === 0;
if (becameActive && (!b || Number(b.active) !== 1)) {
newlyActivatedLocal.push(id);
}
}
newlyActivatedIds = newlyActivatedLocal;
const newlyDeactivated: Array<{ id: string; name: string | null }> = [];
for (const [id, b] of beforeMap.entries()) {
const a = afterMap.get(id);
@ -176,6 +190,8 @@ export class GroupSyncService {
// Duración opcional
Metrics.set('last_sync_duration_ms', Date.now() - (typeof startedAt !== 'undefined' ? startedAt : Date.now()));
// Guardar lista de grupos que han pasado a activos para consumo externo
this._lastChangedActive = Array.isArray(newlyActivatedIds) ? newlyActivatedIds : [];
return result;
} catch (error) {
console.error('Group sync failed:', error);
@ -485,6 +501,13 @@ export class GroupSyncService {
// Fetch members for a single group from Evolution API. Uses a robust parser to accept multiple payload shapes.
private static async fetchGroupMembersFromAPI(groupId: string): Promise<Array<{ userId: string; isAdmin: boolean }>> {
// Cooldown global por rate limit 429 (evitar ráfagas)
try {
if (this._membersGlobalCooldownUntil && Date.now() < this._membersGlobalCooldownUntil) {
console.warn('⚠️ Skipping members fetch due to global cooldown');
return [];
}
} catch {}
// En tests se recomienda simular fetch; no retornamos temprano para permitir validar el parser
// 1) Intento preferente: endpoint de Evolution "Find Group Members"
@ -560,6 +583,11 @@ export class GroupSyncService {
console.warn('⚠️ /group/participants responded without participants array, falling back to fetchAllGroups');
} else {
const body = await r1.text().catch(() => '');
if (r1.status === 429) {
console.warn(`⚠️ /group/participants rate-limited (429): ${body.slice(0, 200)}`);
this._membersGlobalCooldownUntil = Date.now() + 2 * 60 * 1000;
return [];
}
console.warn(`⚠️ /group/participants failed: ${r1.status} ${r1.statusText} - ${body.slice(0, 200)}. Falling back to fetchAllGroups`);
}
} catch (e) {
@ -578,6 +606,11 @@ export class GroupSyncService {
});
if (!response.ok) {
const body = await response.text().catch(() => '');
if (response.status === 429) {
console.warn(`⚠️ fetchAllGroups(getParticipants=true) rate-limited (429): ${body.slice(0, 200)}`);
this._membersGlobalCooldownUntil = Date.now() + 2 * 60 * 1000;
return [];
}
throw new Error(`Failed to fetch groups with participants: ${response.status} ${response.statusText} - ${body.slice(0,200)}`);
}
const raw = await response.text();
@ -650,6 +683,22 @@ export class GroupSyncService {
return resolved;
}
/**
* Upsert optimista de la membresía a partir de un mensaje recibido en el grupo.
* Marca al usuario como activo y actualiza last_seen_at sin consultar Evolution API.
*/
static upsertMemberSeen(groupId: string, userId: string, nowIso?: string): void {
if (!groupId || !userId) return;
const now = nowIso || new Date().toISOString().replace('T', ' ').replace('Z', '');
this.dbInstance.prepare(`
INSERT INTO group_members (group_id, user_id, is_admin, is_active, first_seen_at, last_seen_at)
VALUES (?, ?, 0, 1, ?, ?)
ON CONFLICT(group_id, user_id) DO UPDATE SET
is_active = 1,
last_seen_at = excluded.last_seen_at
`).run(groupId, userId, now, now);
}
/**
* Reconciles current DB membership state for a group with a fresh snapshot.
* Idempotente y atómico por grupo.
@ -921,10 +970,57 @@ export class GroupSyncService {
return { groups, added, updated, deactivated };
}
static async syncMembersForGroups(ids: string[]): Promise<{ groups: number; added: number; updated: number; deactivated: number }> {
if (process.env.NODE_ENV === 'test') {
return { groups: 0, added: 0, updated: 0, deactivated: 0 };
}
if (!Array.isArray(ids) || ids.length === 0) {
return { groups: 0, added: 0, updated: 0, deactivated: 0 };
}
const mode = String(process.env.GROUP_GATING_MODE || 'off').toLowerCase();
const enforce = mode === 'enforce';
if (enforce) {
try { (AllowedGroups as any).dbInstance = this.dbInstance; } catch {}
}
let groups = 0, added = 0, updated = 0, deactivated = 0;
for (const groupId of ids) {
try {
if (enforce) {
try {
if (!AllowedGroups.isAllowed(groupId)) {
try { Metrics.inc('sync_skipped_group_total'); } catch {}
continue;
}
} catch {}
}
const snapshot = await this.fetchGroupMembersFromAPI(groupId);
const res = this.reconcileGroupMembers(groupId, snapshot);
groups++;
added += res.added;
updated += res.updated;
deactivated += res.deactivated;
} catch (e) {
console.error(`❌ Failed to sync members for group ${groupId}:`, e instanceof Error ? e.message : String(e));
}
}
console.log(' Targeted members sync summary:', { groups, added, updated, deactivated });
return { groups, added, updated, deactivated };
}
public static refreshActiveGroupsCache(): void {
this.cacheActiveGroups();
}
public static getLastChangedActive(): string[] {
try {
return Array.from(this._lastChangedActive || []);
} catch {
return [];
}
}
public static startGroupsScheduler(): void {
if (process.env.NODE_ENV === 'test') return;
if (this._groupsSchedulerRunning) return;

@ -70,13 +70,29 @@ export class WebhookManager {
}
private static getConfig(): { webhook: WebhookConfig } {
const fromEnv = String(process.env.WEBHOOK_EVENTS || '')
.split(',')
.map(s => s.trim())
.filter(Boolean);
const allow = [
'APPLICATION_STARTUP',
'MESSAGES_UPSERT',
'GROUPS_UPSERT',
'MESSAGES_UPDATE',
'MESSAGES_DELETE',
'PRESENCE_UPDATE',
'CONTACTS_UPDATE',
'CHATS_UPDATE'
];
const events = (fromEnv.length ? fromEnv : ['APPLICATION_STARTUP','MESSAGES_UPSERT','GROUPS_UPSERT'])
.filter(e => allow.includes(e));
return {
webhook: {
url: process.env.WEBHOOK_URL!,
enabled: true,
webhook_by_events: true,
webhook_base64: true,
events: this.REQUIRED_EVENTS,
events
}
};
}

Loading…
Cancel
Save