You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1308 lines
48 KiB
TypeScript

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import type { Database } from 'bun:sqlite';
import { db, ensureUserExists } from '../db';
import { normalizeWhatsAppId } from '../utils/whatsapp';
import { Metrics } from './metrics';
import { IdentityService } from './identity';
import { AllowedGroups } from './allowed-groups';
import { ResponseQueue } from './response-queue';
// In-memory cache for active groups
// const activeGroupsCache = new Map<string, string>(); // groupId -> groupName
/**
* Represents a group from the Evolution API response
*
* API returns an array of groups in this format:
* [
* {
* id: string, // Group ID in @g.us format (primary key)
* subject: string, // Group name (displayed to users)
* linkedParent?: string, // Parent community ID if group belongs to one
* size?: number, // Current member count (unused in our system)
* creation?: number, // Unix timestamp of group creation (unused)
* desc?: string, // Group description text (unused)
* // ...other fields exist but are ignored by our implementation
* }
* ]
*
* Required fields for our implementation:
* - id (used as database primary key)
* - subject (used as group display name)
* - linkedParent (used for community filtering)
*/
type EvolutionGroup = {
id: string;
subject: string;
linkedParent?: string;
};
export class GroupSyncService {
// Static property for DB instance injection (defaults to global db)
static dbInstance: Database = db;
// In-memory cache for active groups (made public for tests)
public static readonly activeGroupsCache = new Map<string, string>(); // groupId -> groupName
/**
* Gets the sync interval duration in milliseconds.
*
* Priority:
* 1. GROUP_SYNC_INTERVAL_MS environment variable if set
* 2. Default 24 hour interval
*
* In development mode, enforces minimum 10 second interval
* to prevent accidental excessive API calls.
*
* @returns {number} Sync interval in milliseconds
*/
private static get SYNC_INTERVAL_MS(): number {
const interval = process.env.GROUP_SYNC_INTERVAL_MS
? Number(process.env.GROUP_SYNC_INTERVAL_MS)
: 24 * 60 * 60 * 1000; // Default 24 hours
// Ensure minimum 10 second interval in development
if (process.env.NODE_ENV === 'development' && interval < 10000) {
console.warn(`Sync interval too low (${interval}ms), using 10s minimum`);
return 10000;
}
return interval;
}
private static lastSyncAttempt = 0;
private static _groupsTimer: any = null;
private static _groupsSchedulerRunning = false;
private static _membersTimer: any = null;
private static _membersSchedulerRunning = false;
private static _groupsIntervalMs: number | null = null;
private static _groupsNextTickAt: number | null = null;
private static _membersGlobalCooldownUntil: number = 0;
private static _lastChangedActive: string[] = [];
static async syncGroups(force: boolean = false): Promise<{ added: number; updated: number }> {
if (!this.shouldSync(force)) {
return { added: 0, updated: 0 };
}
const startedAt = Date.now();
Metrics.inc('sync_runs_total');
let newlyActivatedIds: string[] = [];
try {
const groups = await this.fetchGroupsFromAPI();
console.log(' Grupos crudos de la API:', JSON.stringify(groups, null, 2));
console.log(' Sin filtrar por comunidad (modo multicomunidad). Total grupos:', groups.length);
const dbGroupsBefore = this.dbInstance.prepare('SELECT id, active, COALESCE(archived,0) AS archived, COALESCE(is_community,0) AS is_community, name FROM groups').all();
console.log(' Grupos en DB antes de upsert:', dbGroupsBefore);
const result = await this.upsertGroups(groups);
const dbGroupsAfter = this.dbInstance.prepare('SELECT id, active, COALESCE(archived,0) AS archived, COALESCE(is_community,0) AS is_community, name FROM groups').all();
console.log(' Grupos en DB después de upsert:', dbGroupsAfter);
// Detectar grupos que pasaron de activos a inactivos (y no están archivados) en este sync
try {
const beforeMap = new Map<string, { active: number; archived: number; is_community: number; name?: string | null }>();
for (const r of dbGroupsBefore as any[]) {
beforeMap.set(String(r.id), { active: Number(r.active || 0), archived: Number(r.archived || 0), is_community: Number((r as any).is_community || 0), name: r.name ? String(r.name) : null });
}
const afterMap = new Map<string, { active: number; archived: number; is_community: number; name?: string | null }>();
for (const r of dbGroupsAfter as any[]) {
afterMap.set(String(r.id), { active: Number(r.active || 0), archived: Number(r.archived || 0), is_community: Number((r as any).is_community || 0), name: r.name ? String(r.name) : null });
}
// Determinar grupos que pasaron a estar activos (nuevos o reactivados)
const newlyActivatedLocal: string[] = [];
for (const [id, a] of afterMap.entries()) {
const b = beforeMap.get(id);
const becameActive = Number(a.active) === 1 && Number(a.archived) === 0 && Number((a as any).is_community || 0) === 0;
if (becameActive && (!b || Number(b.active) !== 1)) {
newlyActivatedLocal.push(id);
}
}
newlyActivatedIds = newlyActivatedLocal;
const newlyDeactivated: Array<{ id: string; name: string | null }> = [];
for (const [id, b] of beforeMap.entries()) {
const a = afterMap.get(id);
if (!a) continue;
if (Number(b.active) === 1 && Number(a.active) === 0 && Number(a.archived) === 0 && Number(a.is_community || 0) === 0 && Number(b.is_community || 0) === 0) {
newlyDeactivated.push({ id, name: a.name ?? b.name ?? null });
}
}
if (newlyDeactivated.length > 0) {
// Revocar tokens y desactivar membresía para estos grupos
this.dbInstance.transaction(() => {
for (const g of newlyDeactivated) {
this.dbInstance.prepare(`
UPDATE calendar_tokens
SET revoked_at = strftime('%Y-%m-%d %H:%M:%f','now')
WHERE group_id = ? AND revoked_at IS NULL
`).run(g.id);
this.dbInstance.prepare(`
UPDATE group_members
SET is_active = 0
WHERE group_id = ? AND is_active = 1
`).run(g.id);
}
})();
// Notificar a admins (omitir en tests)
if (String(process.env.NODE_ENV || '').toLowerCase() !== 'test') {
const adminSet = new Set<string>();
const rawAdmins = String(process.env.ADMIN_USERS || '');
for (const token of rawAdmins.split(',').map(s => s.trim()).filter(Boolean)) {
const n = normalizeWhatsAppId(token);
if (n) adminSet.add(n);
}
const admins = Array.from(adminSet);
if (admins.length > 0) {
const messages = [];
const makeMsg = (g: { id: string; name: string | null }) => {
const label = g.name ? `${g.name} (${g.id})` : g.id;
return `⚠️ El grupo ${label} parece haber dejado de existir o no está disponible.\n\nAcciones disponibles:\n- Archivar (recomendado): /admin archivar-grupo ${g.id}\n- Borrar definitivamente: /admin borrar-grupo ${g.id}`;
};
for (const g of newlyDeactivated) {
const msg = makeMsg(g);
for (const admin of admins) {
messages.push({ recipient: admin, message: msg });
}
}
if (messages.length > 0) {
try { await ResponseQueue.add(messages as any); } catch (e) { console.warn('No se pudo encolar notificación a admins:', e); }
}
}
}
}
} catch (e) {
console.warn('⚠️ Error al procesar grupos desactivados para notificación/limpieza:', e);
}
// Completar labels faltantes en allowed_groups usando todos los grupos devueltos por la API
try { (AllowedGroups as any).dbInstance = this.dbInstance; this.fillMissingAllowedGroupLabels(groups); } catch {}
// Actualizar métricas
this.cacheActiveGroups();
Metrics.set('active_groups', this.activeGroupsCache.size);
const rowM = this.dbInstance.prepare(`SELECT COUNT(*) AS c FROM group_members WHERE is_active = 1`).get() as any;
Metrics.set('active_members', Number(rowM?.c || 0));
Metrics.set('last_sync_timestamp_seconds', Math.floor(Date.now() / 1000));
Metrics.set('last_sync_ok', 1);
// Duración opcional
Metrics.set('last_sync_duration_ms', Date.now() - (typeof startedAt !== 'undefined' ? startedAt : Date.now()));
// Guardar lista de grupos que han pasado a activos para consumo externo
this._lastChangedActive = Array.isArray(newlyActivatedIds) ? newlyActivatedIds : [];
return result;
} catch (error) {
console.error('Group sync failed:', error);
Metrics.inc('sync_errors_total');
Metrics.set('last_sync_ok', 0);
throw error;
} finally {
this.lastSyncAttempt = Date.now();
}
}
private static shouldSync(force: boolean = false): boolean {
if (force) return true;
const timeSinceLastSync = Date.now() - this.lastSyncAttempt;
const shouldSync = timeSinceLastSync > this.SYNC_INTERVAL_MS;
if (!shouldSync) {
const nextSyncIn = this.SYNC_INTERVAL_MS - timeSinceLastSync;
console.debug(`Next sync available in ${Math.round(nextSyncIn / 1000)} seconds`);
}
return shouldSync;
}
private static async fetchGroupsFromAPI(): Promise<EvolutionGroup[]> {
const url = `${process.env.EVOLUTION_API_URL}/group/fetchAllGroups/${process.env.EVOLUTION_API_INSTANCE}?getParticipants=false`;
console.log(' Fetching groups from API:', {
url: `${url}...`, // Log partial URL for security
communityId: process.env.WHATSAPP_COMMUNITY_ID,
time: new Date().toISOString()
});
try {
const response = await fetch(url, {
method: 'GET',
headers: {
apikey: process.env.EVOLUTION_API_KEY,
},
httpVersion: '2',
timeout: 320000 // 120 second timeout
});
if (!response.ok) {
const errorBody = await response.text().catch(() => 'Unable to read error body');
console.error('❌ API request failed:', {
status: response.status,
statusText: response.statusText,
headers: Object.fromEntries(response.headers.entries()),
body: errorBody
});
throw new Error(`API request failed: ${response.status} ${response.statusText}`);
}
const rawResponse = await response.text();
console.log(' Raw API response length:', rawResponse.length);
// Parse response which could be either:
// 1. Direct array of groups: [{group1}, {group2}]
// 2. Or wrapped response: {status, message, response}
let groups;
try {
const parsed = JSON.parse(rawResponse);
if (Array.isArray(parsed)) {
// Case 1: Direct array response
groups = parsed;
console.log(' Received direct array of', groups.length, 'groups');
} else if (parsed.response && Array.isArray(parsed.response)) {
// Case 2: Wrapped response
if (parsed.status !== 'success') {
throw new Error(`API error: ${parsed.message || 'Unknown error'}`);
}
groups = parsed.response;
console.log(' Received wrapped response with', groups.length, 'groups');
} else {
throw new Error('Invalid API response format - expected array or wrapped response');
}
} catch (e) {
console.error('❌ Failed to parse API response:', {
error: e instanceof Error ? e.message : String(e),
responseSample: rawResponse.substring(0, 100) + '...'
});
throw e;
}
if (!groups.length) {
console.warn('⚠️ API returned empty group list');
}
return groups;
} catch (error) {
console.error('❌ Failed to fetch groups:', {
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined
});
throw error;
}
}
private static cacheActiveGroups(): void {
const groups = this.dbInstance.prepare('SELECT id, name FROM groups WHERE active = TRUE AND COALESCE(is_community,0) = 0 AND COALESCE(archived,0) = 0').all();
this.activeGroupsCache.clear();
for (const group of groups) {
this.activeGroupsCache.set(group.id, group.name);
}
console.log(`Cached ${this.activeGroupsCache.size} active groups`);
}
// Rellena labels faltantes en allowed_groups a partir de los grupos devueltos por la API.
private static fillMissingAllowedGroupLabels(allGroups: EvolutionGroup[]): number {
try {
if (!Array.isArray(allGroups) || allGroups.length === 0) return 0;
const nameById = new Map<string, string>();
for (const g of allGroups) {
// Omitir grupos "comunidad/announce" no operativos
const isComm = !!((g as any)?.isCommunity || (g as any)?.is_community || (g as any)?.isCommunityAnnounce || (g as any)?.is_community_announce);
if (isComm) continue;
if (!g?.id) continue;
const name = String(g.subject || '').trim();
if (!name) continue;
nameById.set(String(g.id), name);
}
if (nameById.size === 0) return 0;
const rows = this.dbInstance.prepare(`
SELECT group_id AS id
FROM allowed_groups
WHERE label IS NULL OR TRIM(label) = ''
`).all() as any[];
if (!rows || rows.length === 0) return 0;
let filled = 0;
for (const r of rows) {
const id = r?.id ? String(r.id) : null;
if (!id) continue;
const label = nameById.get(id);
if (label) {
try { (AllowedGroups as any).dbInstance = this.dbInstance; AllowedGroups.upsertPending(id, label, null); } catch {}
filled++;
}
}
if (filled > 0) {
try { Metrics.inc('allowed_groups_labels_filled_total', filled); } catch {}
console.log(` Rellenadas ${filled} labels faltantes en allowed_groups`);
}
return filled;
} catch (e) {
console.warn('⚠️ No se pudieron rellenar labels faltantes en allowed_groups:', e);
return 0;
}
}
private static getActiveGroupsCount(): number {
const result = this.dbInstance.prepare('SELECT COUNT(*) as count FROM groups WHERE active = TRUE AND COALESCE(is_community,0) = 0 AND COALESCE(archived,0) = 0').get();
return result?.count || 0;
}
static async checkInitialGroups(): Promise<void> {
const count = this.getActiveGroupsCount();
if (count > 0) {
this.cacheActiveGroups();
console.log(`✅ Using ${count} existing groups from database`);
return;
}
const communityId = process.env.WHATSAPP_COMMUNITY_ID;
if (!communityId) {
console.log(' WHATSAPP_COMMUNITY_ID no definido - mostrando comunidades disponibles');
try {
const allGroups = await this.fetchGroupsFromAPI();
const communities = allGroups.filter(g => g.linkedParent);
if (communities.length === 0) {
console.log('❌ No se encontraron comunidades (grupos con linkedParent)');
} else {
console.log('\n📋 Comunidades disponibles (copia el ID completo):');
console.log('='.repeat(80));
console.log('Nombre'.padEnd(30), 'ID Comunidad');
console.log('-'.repeat(30), '-'.repeat(48));
communities.forEach(c => {
console.log(c.subject.padEnd(30), c.id);
});
console.log('='.repeat(80));
console.log('⚠️ ATENCIÓN: Estos IDs son sensibles. No los compartas públicamente.');
console.log(`\n⏳ El proceso terminará automáticamente en 120 segundos...`);
// Cuenta regresiva de 120 segundos
await new Promise(resolve => {
setTimeout(resolve, 120000);
const interval = setInterval(() => {
const remaining = Math.ceil((120000 - (Date.now() - startTime)) / 1000);
process.stdout.write(`\r⏳ Tiempo restante: ${remaining}s `);
}, 1000);
const startTime = Date.now();
});
console.log('\n\n✅ Listado completado. Por favor configura WHATSAPP_COMMUNITY_ID');
}
process.exit(0);
} catch (error) {
console.error('❌ Error al obtener comunidades:', error instanceof Error ? error.message : error);
process.exit(1);
}
}
console.log('⚠️ No groups found in database - performing initial sync');
try {
const { added } = await this.syncGroups();
if (added === 0) {
throw new Error('Initial group sync completed but no groups were added');
}
this.cacheActiveGroups();
console.log(`✅ Initial group sync completed - added ${added} groups`);
} catch (error) {
console.error('❌ Critical: Initial group sync failed - no groups available');
console.error(error instanceof Error ? error.message : 'Unknown error');
process.exit(1);
}
}
private static async upsertGroups(groups: EvolutionGroup[]): Promise<{ added: number; updated: number }> {
let added = 0;
let updated = 0;
const transactionResult = this.dbInstance.transaction(() => {
// First mark all groups as inactive and update verification timestamp
const inactiveResult = this.dbInstance.prepare(`
UPDATE groups
SET active = FALSE,
last_verified = CURRENT_TIMESTAMP
WHERE active = TRUE
`).run();
console.log(' Grupos marcados como inactivos:', {
count: inactiveResult.changes,
lastId: inactiveResult.lastInsertRowid
});
for (const group of groups) {
const existing = this.dbInstance.prepare('SELECT 1 FROM groups WHERE id = ?').get(group.id);
console.log('Checking group:', group.id, 'exists:', !!existing);
const isCommunityFlag = !!(((group as any)?.isCommunity) || ((group as any)?.is_community) || ((group as any)?.isCommunityAnnounce) || ((group as any)?.is_community_announce));
if (existing) {
const updateResult = this.dbInstance.prepare(
'UPDATE groups SET name = ?, community_id = COALESCE(?, community_id), is_community = ?, active = TRUE, last_verified = CURRENT_TIMESTAMP WHERE id = ?'
).run(group.subject, group.linkedParent || null, isCommunityFlag ? 1 : 0, group.id);
console.log('Updated group:', group.id, 'result:', updateResult);
updated++;
} else {
const insertResult = this.dbInstance.prepare(
'INSERT INTO groups (id, community_id, name, active, is_community) VALUES (?, ?, ?, TRUE, ?)'
).run(group.id, (group.linkedParent ?? ''), group.subject, isCommunityFlag ? 1 : 0);
console.log('Added group:', group.id, 'result:', insertResult);
added++;
}
// Propagar subject a allowed_groups:
// - Si es grupo "comunidad/announce", bloquearlo.
// - En caso contrario, upsert pending y label.
try {
(AllowedGroups as any).dbInstance = this.dbInstance;
if (isCommunityFlag) {
AllowedGroups.setStatus(group.id, 'blocked', group.subject);
} else {
AllowedGroups.upsertPending(group.id, group.subject, null);
}
} catch {}
// Si es grupo de comunidad, limpiar residuos: revocar tokens y desactivar membresías
if (isCommunityFlag) {
try {
this.dbInstance.prepare(`
UPDATE calendar_tokens
SET revoked_at = strftime('%Y-%m-%d %H:%M:%f','now')
WHERE group_id = ? AND revoked_at IS NULL
`).run(group.id);
} catch {}
try {
this.dbInstance.prepare(`
UPDATE group_members
SET is_active = 0
WHERE group_id = ? AND is_active = 1
`).run(group.id);
} catch {}
}
}
return { added, updated };
});
try {
const result = transactionResult();
console.log(`Group sync completed: ${result.added} added, ${result.updated} updated`);
return result;
} catch (error) {
console.error('Error in upsertGroups:', error);
throw error;
}
}
/**
* Checks if a given group ID is active based on the in-memory cache.
*
* @param groupId The group ID to check (e.g., '123456789@g.us').
* @returns True if the group is active, false otherwise.
*/
static isGroupActive(groupId: string): boolean {
return this.activeGroupsCache.has(groupId);
}
// Fetch members for a single group from Evolution API. Uses a robust parser to accept multiple payload shapes.
private static async fetchGroupMembersFromAPI(groupId: string): Promise<Array<{ userId: string; isAdmin: boolean }>> {
// Cooldown global por rate limit 429 (evitar ráfagas)
try {
if (this._membersGlobalCooldownUntil && Date.now() < this._membersGlobalCooldownUntil) {
console.warn('⚠️ Skipping members fetch due to global cooldown');
return [];
}
} catch {}
// En tests se recomienda simular fetch; no retornamos temprano para permitir validar el parser
// 1) Intento preferente: endpoint de Evolution "Find Group Members"
// Documentación provista: GET /group/participants/{instance}
// Suponemos soporte de query param groupJid
try {
const url1 = `${process.env.EVOLUTION_API_URL}/group/participants/${process.env.EVOLUTION_API_INSTANCE}?groupJid=${encodeURIComponent(groupId)}`;
console.log(' Fetching members via /group/participants:', { groupId });
const r1 = await fetch(url1, {
method: 'GET',
headers: { apikey: process.env.EVOLUTION_API_KEY },
httpVersion: '2',
timeout: 320000
});
if (r1.ok) {
const raw1 = await r1.text();
let parsed1: any;
try {
parsed1 = JSON.parse(raw1);
} catch (e) {
console.error('❌ Failed to parse /group/participants JSON:', String(e));
throw e;
}
const participantsArr = Array.isArray(parsed1?.participants) ? parsed1.participants : null;
if (participantsArr) {
const result: Array<{ userId: string; isAdmin: boolean }> = [];
for (const p of participantsArr) {
let jid: string | null = null;
let isAdmin = false;
if (typeof p === 'string') {
jid = p;
} else if (p && typeof p === 'object') {
const rawId = p.id || p?.user?.id || p.user || null;
const rawJid = p.jid || null; // preferir .jid cuando exista
jid = rawJid || rawId || null;
// Aprender mapping alias→número si vienen ambos
if (rawId && rawJid) {
try { IdentityService.upsertAlias(String(rawId), String(rawJid), 'group.participants'); } catch {}
}
if (typeof p.isAdmin === 'boolean') {
isAdmin = p.isAdmin;
} else if (typeof p.admin === 'string') {
isAdmin = p.admin === 'admin' || p.admin === 'superadmin';
} else if (typeof p.role === 'string') {
isAdmin = p.role.toLowerCase().includes('admin');
}
}
let norm = normalizeWhatsAppId(jid);
if (!norm) {
const digits = (jid || '').replace(/\D+/g, '');
norm = digits || null;
}
if (!norm) continue;
result.push({ userId: norm, isAdmin });
}
let resolved: Array<{ userId: string; isAdmin: boolean }>;
try {
const map = IdentityService.resolveMany(result.map(r => r.userId));
resolved = result.map(r => ({ userId: map.get(r.userId) || r.userId, isAdmin: r.isAdmin }));
} catch {
resolved = result;
}
return resolved;
}
// Si no viene en el formato esperado, caemos al plan B
console.warn('⚠️ /group/participants responded without participants array, falling back to fetchAllGroups');
} else {
const body = await r1.text().catch(() => '');
if (r1.status === 429) {
console.warn(`⚠️ /group/participants rate-limited (429): ${body.slice(0, 200)}`);
this._membersGlobalCooldownUntil = Date.now() + 2 * 60 * 1000;
return [];
}
console.warn(`⚠️ /group/participants failed: ${r1.status} ${r1.statusText} - ${body.slice(0, 200)}. Falling back to fetchAllGroups`);
}
} catch (e) {
console.warn('⚠️ Error calling /group/participants, falling back to fetchAllGroups:', e instanceof Error ? e.message : String(e));
}
// 2) Fallback robusto: fetchAllGroups(getParticipants=true) y filtrar por groupId
const url = `${process.env.EVOLUTION_API_URL}/group/fetchAllGroups/${process.env.EVOLUTION_API_INSTANCE}?getParticipants=true`;
console.log(' Fetching members via fetchAllGroups (participants=true):', { groupId });
const response = await fetch(url, {
method: 'GET',
headers: { apikey: process.env.EVOLUTION_API_KEY },
httpVersion: '2',
timeout: 320000
});
if (!response.ok) {
const body = await response.text().catch(() => '');
if (response.status === 429) {
console.warn(`⚠️ fetchAllGroups(getParticipants=true) rate-limited (429): ${body.slice(0, 200)}`);
this._membersGlobalCooldownUntil = Date.now() + 2 * 60 * 1000;
return [];
}
throw new Error(`Failed to fetch groups with participants: ${response.status} ${response.statusText} - ${body.slice(0,200)}`);
}
const raw = await response.text();
let parsed: any;
try {
parsed = JSON.parse(raw);
} catch (e) {
console.error('❌ Failed to parse members response JSON:', String(e));
throw e;
}
let groups: any[] = [];
if (Array.isArray(parsed)) {
groups = parsed;
} else if (parsed && Array.isArray(parsed.response)) {
groups = parsed.response;
} else {
throw new Error('Invalid response format for groups with participants');
}
const g = groups.find((g: any) => g?.id === groupId);
if (!g) {
console.warn(`⚠️ Group ${groupId} not present in fetchAllGroups(getParticipants=true) response`);
return [];
}
const participants = Array.isArray(g.participants) ? g.participants : [];
const result: Array<{ userId: string; isAdmin: boolean }> = [];
for (const p of participants) {
let jid: string | null = null;
let isAdmin = false;
if (typeof p === 'string') {
jid = p;
} else if (p && typeof p === 'object') {
const rawId = p.id || p?.user?.id || p.user || null;
const rawJid = p.jid || null; // preferir .jid cuando exista
jid = rawJid || rawId || null;
// Aprender mapping alias→número si vienen ambos
if (rawId && rawJid) {
try { IdentityService.upsertAlias(String(rawId), String(rawJid), 'group.participants'); } catch {}
}
if (typeof p.isAdmin === 'boolean') {
isAdmin = p.isAdmin;
} else if (typeof p.admin === 'string') {
// common shapes: 'admin', 'superadmin' or null
isAdmin = p.admin === 'admin' || p.admin === 'superadmin';
} else if (typeof p.role === 'string') {
isAdmin = p.role.toLowerCase().includes('admin');
}
}
let norm = normalizeWhatsAppId(jid);
if (!norm) {
const digits = (jid || '').replace(/\D+/g, '');
norm = digits || null;
}
if (!norm) continue;
result.push({ userId: norm, isAdmin });
}
let resolved: Array<{ userId: string; isAdmin: boolean }>;
try {
const map = IdentityService.resolveMany(result.map(r => r.userId));
resolved = result.map(r => ({ userId: map.get(r.userId) || r.userId, isAdmin: r.isAdmin }));
} catch {
resolved = result;
}
return resolved;
}
/**
* Upsert optimista de la membresía a partir de un mensaje recibido en el grupo.
* Marca al usuario como activo y actualiza last_seen_at sin consultar Evolution API.
*/
static upsertMemberSeen(groupId: string, userId: string, nowIso?: string): void {
if (!groupId || !userId) return;
const now = nowIso || new Date().toISOString().replace('T', ' ').replace('Z', '');
this.dbInstance.prepare(`
INSERT INTO group_members (group_id, user_id, is_admin, is_active, first_seen_at, last_seen_at)
VALUES (?, ?, 0, 1, ?, ?)
ON CONFLICT(group_id, user_id) DO UPDATE SET
is_active = 1,
last_seen_at = excluded.last_seen_at
`).run(groupId, userId, now, now);
}
/**
* Reconciles current DB membership state for a group with a fresh snapshot.
* Idempotente y atómico por grupo.
*/
static reconcileGroupMembers(groupId: string, snapshot: Array<{ userId: string; isAdmin: boolean }>, nowIso?: string): { added: number; updated: number; deactivated: number } {
if (!groupId || !Array.isArray(snapshot)) {
throw new Error('Invalid arguments for reconcileGroupMembers');
}
const now = nowIso || new Date().toISOString().replace('T', ' ').replace('Z', '');
let added = 0, updated = 0, deactivated = 0;
// Build quick lookup from snapshot
const incoming = new Map<string, { isAdmin: boolean }>();
for (const m of snapshot) {
if (!m?.userId) continue;
incoming.set(m.userId, { isAdmin: !!m.isAdmin });
}
this.dbInstance.transaction(() => {
// Load existing membership for group
const existingRows = this.dbInstance.prepare(`
SELECT user_id, is_admin, is_active
FROM group_members
WHERE group_id = ?
`).all(groupId) as Array<{ user_id: string; is_admin: number; is_active: number }>;
const existing = new Map(existingRows.map(r => [r.user_id, { isAdmin: !!r.is_admin, isActive: !!r.is_active }]));
// Upsert present members
for (const [userId, { isAdmin }] of incoming.entries()) {
// Ensure user exists (FK)
ensureUserExists(userId, this.dbInstance);
const row = existing.get(userId);
if (!row) {
// insert
this.dbInstance.prepare(`
INSERT INTO group_members (group_id, user_id, is_admin, is_active, first_seen_at, last_seen_at)
VALUES (?, ?, ?, 1, ?, ?)
`).run(groupId, userId, isAdmin ? 1 : 0, now, now);
added++;
} else {
// update if needed
let roleChanged = row.isAdmin !== isAdmin;
if (!row.isActive || roleChanged) {
this.dbInstance.prepare(`
UPDATE group_members
SET is_active = 1,
is_admin = ?,
last_seen_at = ?,
last_role_change_at = CASE WHEN ? THEN ? ELSE last_role_change_at END
WHERE group_id = ? AND user_id = ?
`).run(isAdmin ? 1 : 0, now, roleChanged ? 1 : 0, roleChanged ? now : null, groupId, userId);
updated++;
} else {
// still update last_seen_at to reflect presence
this.dbInstance.prepare(`
UPDATE group_members
SET last_seen_at = ?
WHERE group_id = ? AND user_id = ?
`).run(now, groupId, userId);
}
}
}
// Deactivate absent members
for (const [userId, state] of existing.entries()) {
if (!incoming.has(userId) && state.isActive) {
this.dbInstance.prepare(`
UPDATE group_members
SET is_active = 0,
last_seen_at = ?
WHERE group_id = ? AND user_id = ?
`).run(now, groupId, userId);
deactivated++;
}
}
})();
try { this.computeAndPublishAliasCoverage(groupId); } catch {}
return { added, updated, deactivated };
}
private static computeAndPublishAliasCoverage(groupId: string): void {
try {
const rows = this.dbInstance.prepare(`
SELECT user_id
FROM group_members
WHERE group_id = ? AND is_active = 1
`).all(groupId) as Array<{ user_id: string }>;
const total = rows.length;
if (total === 0) {
try { Metrics.set('alias_coverage_ratio', 1, { group_id: groupId }); } catch {}
return;
}
let resolvable = 0;
for (const r of rows) {
const uid = String(r.user_id || '');
if (/^\d+$/.test(uid)) {
resolvable++;
continue;
}
try {
const resolved = IdentityService.resolveAliasOrNull(uid);
if (resolved && /^\d+$/.test(resolved)) {
resolvable++;
}
} catch {}
}
const ratio = Math.max(0, Math.min(1, total > 0 ? resolvable / total : 1));
try { Metrics.set('alias_coverage_ratio', ratio, { group_id: groupId }); } catch {}
// A3: publicación condicional del mensaje de onboarding (sin spam)
try {
// Flags y parámetros
const isTest = String(process.env.NODE_ENV || '').toLowerCase() === 'test';
const enabled =
isTest
? String(process.env.ONBOARDING_ENABLE_IN_TEST || '').toLowerCase() === 'true'
: (() => {
const v = process.env.ONBOARDING_PROMPTS_ENABLED;
return v == null ? true : ['true', '1', 'yes'].includes(String(v).toLowerCase());
})();
if (!enabled) {
try { Metrics.inc('onboarding_prompts_skipped_total', 1, { group_id: groupId, reason: 'disabled' }); } catch {}
return;
}
const thrRaw = Number(process.env.ONBOARDING_COVERAGE_THRESHOLD);
const threshold = Number.isFinite(thrRaw) ? Math.min(1, Math.max(0, thrRaw)) : 1;
if (ratio >= threshold) {
try { Metrics.inc('onboarding_prompts_skipped_total', 1, { group_id: groupId, reason: 'coverage_100' }); } catch {}
return;
}
// Gating en modo enforce: no publicar en grupos no allowed
try {
const mode = String(process.env.GROUP_GATING_MODE || 'off').toLowerCase();
if (mode === 'enforce') {
try { (AllowedGroups as any).dbInstance = this.dbInstance; } catch {}
if (!AllowedGroups.isAllowed(groupId)) {
try { Metrics.inc('onboarding_prompts_skipped_total', 1, { group_id: groupId, reason: 'not_allowed' }); } catch {}
return;
}
}
} catch {}
// Grace y cooldown desde tabla groups
const rowG = this.dbInstance.prepare(`
SELECT last_verified, onboarding_prompted_at
FROM groups
WHERE id = ?
`).get(groupId) as any;
const nowMs = Date.now();
const graceRaw = Number(process.env.ONBOARDING_GRACE_SECONDS);
const graceSec = Number.isFinite(graceRaw) && graceRaw >= 0 ? Math.floor(graceRaw) : 90;
const lv = rowG?.last_verified ? String(rowG.last_verified) : null;
if (lv) {
const iso = lv.includes('T') ? lv : (lv.replace(' ', 'T') + 'Z');
const ms = Date.parse(iso);
if (Number.isFinite(ms)) {
const ageSec = Math.floor((nowMs - ms) / 1000);
if (ageSec < graceSec) {
try { Metrics.inc('onboarding_prompts_skipped_total', 1, { group_id: groupId, reason: 'grace_period' }); } catch {}
return;
}
}
}
const cdRaw = Number(process.env.ONBOARDING_COOLDOWN_DAYS);
const cdDays = Number.isFinite(cdRaw) && cdRaw >= 0 ? Math.floor(cdRaw) : 7;
const promptedAt = rowG?.onboarding_prompted_at ? String(rowG.onboarding_prompted_at) : null;
if (promptedAt) {
const iso = promptedAt.includes('T') ? promptedAt : (promptedAt.replace(' ', 'T') + 'Z');
const ms = Date.parse(iso);
if (Number.isFinite(ms)) {
const diffMs = nowMs - ms;
if (diffMs < cdDays * 24 * 60 * 60 * 1000) {
try { Metrics.inc('onboarding_prompts_skipped_total', 1, { group_id: groupId, reason: 'cooldown_active' }); } catch {}
return;
}
}
}
// Número del bot para construir wa.me
const bot = String(process.env.CHATBOT_PHONE_NUMBER || '').trim();
if (!bot || !/^\d+$/.test(bot)) {
try { Metrics.inc('onboarding_prompts_skipped_total', 1, { group_id: groupId, reason: 'missing_bot_number' }); } catch {}
return;
}
// Encolar mensaje en la cola persistente y marcar timestamp en groups
const msg = `Para poder asignarte tareas y acceder a la web, envía 'activar' al bot por privado. Solo hace falta una vez. Enlace: https://wa.me/${bot}`;
this.dbInstance.transaction(() => {
this.dbInstance.prepare(`
INSERT INTO response_queue (recipient, message, status, attempts, metadata, created_at, updated_at, next_attempt_at)
VALUES (?, ?, 'queued', 0, NULL, strftime('%Y-%m-%d %H:%M:%f','now'), strftime('%Y-%m-%d %H:%M:%f','now'), strftime('%Y-%m-%d %H:%M:%f','now'))
`).run(groupId, msg);
this.dbInstance.prepare(`
UPDATE groups
SET onboarding_prompted_at = strftime('%Y-%m-%d %H:%M:%f','now')
WHERE id = ?
`).run(groupId);
})();
try { Metrics.inc('onboarding_prompts_sent_total', 1, { group_id: groupId, reason: 'coverage_below_threshold' }); } catch {}
} catch (e) {
// Evitar romper el flujo si falla el encolado
if (process.env.NODE_ENV !== 'test') {
console.warn('⚠️ Onboarding prompt skipped due to internal error for', groupId, e);
}
}
} catch (e) {
console.warn('⚠️ No se pudo calcular alias_coverage_ratio para', groupId, e);
}
}
/**
* Sync members for all active groups by calling Evolution API and reconciling.
* Devuelve contadores agregados.
*/
static async syncMembersForActiveGroups(): Promise<{ groups: number; added: number; updated: number; deactivated: number }> {
if (process.env.NODE_ENV === 'test') {
return { groups: 0, added: 0, updated: 0, deactivated: 0 };
}
// ensure cache is populated
if (this.activeGroupsCache.size === 0) {
this.cacheActiveGroups();
}
// Etapa 3: gating también en el scheduler masivo
const mode = String(process.env.GROUP_GATING_MODE || 'off').toLowerCase();
const enforce = mode === 'enforce';
if (enforce) {
try { (AllowedGroups as any).dbInstance = this.dbInstance; } catch {}
}
let groups = 0, added = 0, updated = 0, deactivated = 0;
for (const [groupId] of this.activeGroupsCache.entries()) {
try {
if (enforce) {
try {
if (!AllowedGroups.isAllowed(groupId)) {
// Saltar grupos no permitidos en modo enforce
try { Metrics.inc('sync_skipped_group_total'); } catch {}
continue;
}
} catch {
// Si falla el check, no bloquear el grupo
}
}
const snapshot = await this.fetchGroupMembersFromAPI(groupId);
const res = this.reconcileGroupMembers(groupId, snapshot);
groups++;
added += res.added;
updated += res.updated;
deactivated += res.deactivated;
} catch (e) {
console.error(`❌ Failed to sync members for group ${groupId}:`, e instanceof Error ? e.message : String(e));
}
}
console.log(' Members sync summary:', { groups, added, updated, deactivated });
return { groups, added, updated, deactivated };
}
static async syncMembersForGroups(ids: string[]): Promise<{ groups: number; added: number; updated: number; deactivated: number }> {
if (process.env.NODE_ENV === 'test') {
return { groups: 0, added: 0, updated: 0, deactivated: 0 };
}
if (!Array.isArray(ids) || ids.length === 0) {
return { groups: 0, added: 0, updated: 0, deactivated: 0 };
}
const mode = String(process.env.GROUP_GATING_MODE || 'off').toLowerCase();
const enforce = mode === 'enforce';
if (enforce) {
try { (AllowedGroups as any).dbInstance = this.dbInstance; } catch {}
}
let groups = 0, added = 0, updated = 0, deactivated = 0;
for (const groupId of ids) {
try {
if (enforce) {
try {
if (!AllowedGroups.isAllowed(groupId)) {
try { Metrics.inc('sync_skipped_group_total'); } catch {}
continue;
}
} catch {}
}
const snapshot = await this.fetchGroupMembersFromAPI(groupId);
const res = this.reconcileGroupMembers(groupId, snapshot);
groups++;
added += res.added;
updated += res.updated;
deactivated += res.deactivated;
} catch (e) {
console.error(`❌ Failed to sync members for group ${groupId}:`, e instanceof Error ? e.message : String(e));
}
}
console.log(' Targeted members sync summary:', { groups, added, updated, deactivated });
return { groups, added, updated, deactivated };
}
public static refreshActiveGroupsCache(): void {
this.cacheActiveGroups();
}
public static getLastChangedActive(): string[] {
try {
return Array.from(this._lastChangedActive || []);
} catch {
return [];
}
}
public static startGroupsScheduler(): void {
if (process.env.NODE_ENV === 'test') return;
if (this._groupsSchedulerRunning) return;
this._groupsSchedulerRunning = true;
// Intervalo de grupos configurable; mínimo 10s en desarrollo
let interval = Number(process.env.GROUP_SYNC_INTERVAL_MS);
if (!Number.isFinite(interval) || interval <= 0) {
interval = 24 * 60 * 60 * 1000; // 24h por defecto
}
if (process.env.NODE_ENV === 'development' && interval < 10000) {
interval = 10000;
}
this._groupsIntervalMs = interval;
this._groupsNextTickAt = Date.now() + interval;
this._groupsTimer = setInterval(() => {
// Programar el siguiente tick antes de ejecutar la sincronización
this._groupsNextTickAt = Date.now() + (this._groupsIntervalMs ?? interval);
this.syncGroups().catch(err => {
console.error('❌ Groups scheduler run error:', err);
});
}, interval);
}
public static stopGroupsScheduler(): void {
this._groupsSchedulerRunning = false;
if (this._groupsTimer) {
clearInterval(this._groupsTimer);
this._groupsTimer = null;
}
this._groupsIntervalMs = null;
this._groupsNextTickAt = null;
}
public static getSecondsUntilNextGroupSync(nowMs: number = Date.now()): number | null {
const next = this._groupsNextTickAt;
if (next == null) return null;
const secs = (next - nowMs) / 1000;
return secs > 0 ? secs : 0;
}
public static startMembersScheduler(): void {
if (process.env.NODE_ENV === 'test') return;
if (this._membersSchedulerRunning) return;
this._membersSchedulerRunning = true;
// Intervalo por defecto 6h; configurable por env; mínimo 10s en desarrollo
const raw = process.env.GROUP_MEMBERS_SYNC_INTERVAL_MS;
let interval = Number.isFinite(Number(raw)) && Number(raw) > 0 ? Number(raw) : 6 * 60 * 60 * 1000;
if (process.env.NODE_ENV === 'development' && interval < 10000) {
interval = 10000;
}
this._membersTimer = setInterval(() => {
this.syncMembersForActiveGroups().catch(err => {
console.error('❌ Members scheduler run error:', err);
});
}, interval);
}
public static stopMembersScheduler(): void {
this._membersSchedulerRunning = false;
if (this._membersTimer) {
clearInterval(this._membersTimer);
this._membersTimer = null;
}
}
// ===== Helpers de membresía y snapshot (Etapa 3) =====
private static get MAX_SNAPSHOT_AGE_MS(): number {
const raw = Number(process.env.MAX_MEMBERS_SNAPSHOT_AGE_MS);
return Number.isFinite(raw) && raw > 0 ? raw : 24 * 60 * 60 * 1000; // 24h por defecto
}
/**
* Devuelve true si la snapshot de un grupo es "fresca" según MAX_SNAPSHOT_AGE_MS.
* Considera no fresca si no hay registro/fecha.
*/
public static isSnapshotFresh(groupId: string, nowMs: number = Date.now()): boolean {
try {
const row = this.dbInstance.prepare(`SELECT last_verified FROM groups WHERE id = ?`).get(groupId) as any;
const lv = row?.last_verified ? String(row.last_verified) : null;
if (!lv) return false;
// Persistimos 'YYYY-MM-DD HH:MM:SS[.mmm]'. Convertimos a ISO-like para Date.parse
const iso = lv.includes('T') ? lv : (lv.replace(' ', 'T') + 'Z');
const ms = Date.parse(iso);
if (!Number.isFinite(ms)) return false;
return (nowMs - ms) <= this.MAX_SNAPSHOT_AGE_MS;
} catch {
return false;
}
}
/**
* ¿El usuario figura como miembro activo del grupo?
*/
public static isUserActiveInGroup(userId: string, groupId: string): boolean {
if (!userId || !groupId) return false;
const row = this.dbInstance.prepare(`
SELECT 1
FROM group_members
WHERE group_id = ? AND user_id = ? AND is_active = 1
LIMIT 1
`).get(groupId, userId);
return !!row;
}
/**
* Devuelve todos los group_ids activos donde el usuario figura activo.
* Filtra también por grupos activos en la tabla groups.
*/
public static getActiveGroupIdsForUser(userId: string): string[] {
if (!userId) return [];
const rows = this.dbInstance.prepare(`
SELECT gm.group_id AS id
FROM group_members gm
JOIN groups g ON g.id = gm.group_id
WHERE gm.user_id = ? AND gm.is_active = 1 AND g.active = 1
AND COALESCE(g.is_community,0) = 0
AND COALESCE(g.archived,0) = 0
`).all(userId) as any[];
const set = new Set<string>();
for (const r of rows) {
if (r?.id) set.add(String(r.id));
}
return Array.from(set);
}
/**
* Devuelve los group_ids donde el usuario es miembro activo y cuya snapshot es fresca.
*/
public static getFreshMemberGroupsForUser(userId: string): string[] {
const gids = this.getActiveGroupIdsForUser(userId);
return gids.filter(gid => this.isSnapshotFresh(gid));
}
/**
* Asegura un registro de grupo activo en la base de datos (upsert idempotente).
* Si no existe, lo crea con active=1. Si existe y estaba inactivo, lo reactiva.
* Puede actualizar el nombre si se proporciona.
*/
public static ensureGroupExists(groupId: string, name?: string | null): { created: boolean; reactivated: boolean; updatedName: boolean } {
if (!groupId) return { created: false, reactivated: false, updatedName: false };
let created = false, reactivated = false, updatedName = false;
this.dbInstance.transaction(() => {
const row = this.dbInstance.prepare(`SELECT id, active, name FROM groups WHERE id = ?`).get(groupId) as any;
if (!row) {
const community = process.env.WHATSAPP_COMMUNITY_ID || '';
this.dbInstance.prepare(`
INSERT INTO groups (id, community_id, name, active, last_verified)
VALUES (?, ?, ?, 1, CURRENT_TIMESTAMP)
`).run(groupId, community, name || null);
created = true;
} else {
// Reactivar si estaba inactivo y opcionalmente actualizar nombre
const shouldUpdateName = (typeof name === 'string' && name.trim().length > 0 && name !== row.name);
if (row.active !== 1 || shouldUpdateName) {
this.dbInstance.prepare(`
UPDATE groups
SET active = 1,
name = COALESCE(?, name),
last_verified = CURRENT_TIMESTAMP
WHERE id = ?
`).run(shouldUpdateName ? name : null, groupId);
reactivated = row.active !== 1;
updatedName = shouldUpdateName;
}
}
})();
// Actualizar caché
this.cacheActiveGroups();
Metrics.set('active_groups', this.activeGroupsCache.size);
return { created, reactivated, updatedName };
}
/**
* Asegura tener el nombre/label de un grupo (cache/DB/API) y lo persiste tanto en groups como en allowed_groups.
* Devuelve el nombre si se pudo resolver, o null en caso contrario.
*/
public static async ensureGroupLabelAndName(groupId: string): Promise<string | null> {
try {
if (!groupId) return null;
// 1) Cache en memoria
const cached = this.activeGroupsCache.get(groupId);
if (cached && cached.trim()) {
try { this.ensureGroupExists(groupId, cached); } catch {}
try { (AllowedGroups as any).dbInstance = this.dbInstance; AllowedGroups.upsertPending(groupId, cached, null); } catch {}
this.cacheActiveGroups();
return cached;
}
// 2) DB (tabla groups)
try {
const row = this.dbInstance.prepare('SELECT name FROM groups WHERE id = ?').get(groupId) as any;
const name = row?.name ? String(row.name).trim() : '';
if (name) {
try { this.ensureGroupExists(groupId, name); } catch {}
try { (AllowedGroups as any).dbInstance = this.dbInstance; AllowedGroups.upsertPending(groupId, name, null); } catch {}
this.cacheActiveGroups();
return name;
}
} catch {}
// 3) API (evitar en tests)
if (process.env.NODE_ENV !== 'test') {
const groups = await this.fetchGroupsFromAPI();
const g = groups.find((gg) => gg?.id === groupId);
const subject = g?.subject ? String(g.subject).trim() : '';
if (subject) {
try { this.ensureGroupExists(groupId, subject); } catch {}
try { (AllowedGroups as any).dbInstance = this.dbInstance; AllowedGroups.upsertPending(groupId, subject, null); } catch {}
this.cacheActiveGroups();
return subject;
}
}
return null;
} catch {
return null;
}
}
/**
* Sincroniza miembros para un grupo concreto (útil tras detectar un grupo nuevo).
*/
public static async syncMembersForGroup(groupId: string): Promise<{ added: number; updated: number; deactivated: number }> {
// Gating en modo 'enforce': solo sincronizar miembros para grupos permitidos
try {
const mode = String(process.env.GROUP_GATING_MODE || 'off').toLowerCase();
if (mode === 'enforce') {
try {
(AllowedGroups as any).dbInstance = this.dbInstance;
if (!AllowedGroups.isAllowed(groupId)) {
try { Metrics.inc('sync_skipped_group_total'); } catch {}
return { added: 0, updated: 0, deactivated: 0 };
}
} catch {
// Si el check falla, seguimos sin bloquear
}
}
} catch {}
try {
// Asegurar existencia del grupo en DB (FKs) antes de reconciliar
this.ensureGroupExists(groupId);
const snapshot = await (this as any).fetchGroupMembersFromAPI(groupId);
return this.reconcileGroupMembers(groupId, snapshot);
} catch (e) {
console.error(`❌ Failed to sync members for group ${groupId}:`, e instanceof Error ? e.message : String(e));
return { added: 0, updated: 0, deactivated: 0 };
}
}
/**
* Devuelve los IDs de usuario activos del grupo, filtrados a dígitos puros con longitud < 14.
* No devuelve duplicados.
*/
public static listActiveMemberIds(groupId: string): string[] {
if (!groupId) return [];
try {
const rows = this.dbInstance.prepare(`
SELECT user_id
FROM group_members
WHERE group_id = ? AND is_active = 1
`).all(groupId) as Array<{ user_id: string }>;
const out = new Set<string>();
for (const r of rows) {
const uid = String(r.user_id || '').trim();
if (/^\d+$/.test(uid) && uid.length < 14) {
out.add(uid);
}
}
return Array.from(out);
} catch {
return [];
}
}
}