import { string } from 'alga-js' import type { H3Event } from 'h3' import { sendNewOrderNotification } from '../../../utils/pushNotifier' // ============================================================================ // CONNECTION REGISTRY // ============================================================================ interface ClientConnection { res: any // Response stream roleId: number // User's role ID from JWT filters: { orgId?: string documentNo?: string externalOrderId?: string openOnly?: boolean } } // Global registry of all active SSE connections const activeConnections = new Map() // ============================================================================ // GLOBAL POLLING STATE // ============================================================================ let globalPollingInterval: NodeJS.Timeout | null = null let globalLastOrderId = 0 const POLLING_INTERVAL = 30000 // 30 seconds const HEARTBEAT_INTERVAL = 15000 // 15 seconds // ============================================================================ // SSE HELPER FUNCTIONS // ============================================================================ /** * Send SSE message to a specific client */ function sendSSEToClient(res: any, data: any) { try { res.write(`data: ${JSON.stringify(data)}\n\n`) } catch (err) { console.error('[SSE] Error writing to stream:', err) } } /** * Broadcast SSE message to all connected clients */ function broadcastSSE(data: any) { activeConnections.forEach((connection, clientId) => { sendSSEToClient(connection.res, data) }) } // ============================================================================ // FILTERING LOGIC // ============================================================================ /** * Filter orders for a specific client based on: * 1. Role-based organization access (using AD_Role_OrgAccess) * 2. Client-specific filters (documentNo, externalOrderId, openOnly) */ async function filterOrdersForClient( orders: any[], connection: ClientConnection, orgRoleAccessMap: Map> ): Promise { return orders.filter(order => { const orderOrgId = order.AD_Org_ID?.id if (orderOrgId === undefined) { return false // Skip orders without org ID } // 1. Check role-based organization access if (!hasRoleAccessToOrg(orgRoleAccessMap, connection.roleId, orderOrgId)) { return false // User's role doesn't have access to this org } // 2. Apply client-specific filters const filters = connection.filters // Organization filter (if client requested specific org) if (filters.orgId && orderOrgId != parseInt(filters.orgId)) { return false } // Document number filter if (filters.documentNo) { const docNo = order.DocumentNo || order.documentNo || '' if (!docNo.includes(filters.documentNo)) { return false } } // External order ID filter if (filters.externalOrderId) { const extId = order.ExternalOrderId || order.externalOrderId || '' if (!extId.includes(filters.externalOrderId)) { return false } } // Note: openOnly filter is handled at query level, not here return true }) } // ============================================================================ // GLOBAL POLLING LOOP // ============================================================================ /** * Single global polling function that queries ALL new orders * and broadcasts them to connected clients with appropriate filtering */ async function pollForNewOrders() { if (activeConnections.size === 0) { return // No clients connected, skip polling } try { // Get any connection's event to use for token/fetch const firstConnection = activeConnections.values().next().value if (!firstConnection) return // Get a dummy event from the first connection to fetch token // Note: We need a proper token, ideally a system-level token or the first user's token const token = await getTokenHelper(firstConnection.event) // Fetch org-role access mapping (cached) const orgRoleAccessMap = await getOrgRoleAccessMap(firstConnection.event) // Build filter for querying ALL new orders (no org-specific filters) let filterParts = [`c_order_id gt ${globalLastOrderId}`] const filter = filterParts.join(' AND ') // Check if any client has openOnly filter enabled const hasOpenOnlyClients = Array.from(activeConnections.values()).some( conn => conn.filters.openOnly === true ) const hasRegularClients = Array.from(activeConnections.values()).some( conn => conn.filters.openOnly !== true ) // If we have mixed clients (some with openOnly, some without), we need to fetch both // For simplicity, we'll fetch regular orders and filter fulfilled ones client-side const model = 'models/c_order' const filterString = string.urlEncode('isSOTrx eq true AND (isActive eq true OR isActive eq false) AND ') + string.urlEncode(filter) const res: any = await firstConnection.event.context.fetch( `${model}?$filter=${filterString}&$expand=c_ordersource_id,c_bpartner_location_id,bill_location_id,c_orderline,m_inout($expand=M_Shipper_ID),M_Shipper_ID&$orderby=${string.urlEncode('c_order_id desc')}&$top=100`, 'GET', token, null ) if (res?.records && res.records.length > 0) { const allNewOrders = res.records // Update global last order ID const maxId = Math.max(...allNewOrders.map((order: any) => order.id || 0)) if (maxId > globalLastOrderId) { globalLastOrderId = maxId console.log(`[SSE Poll] Found ${allNewOrders.length} new orders, broadcasting to ${activeConnections.size} clients`) // Collect all unique role IDs that should receive push notifications const notifiedRoles = new Set() // Broadcast filtered orders to each connected client for (const [clientId, connection] of activeConnections.entries()) { const filteredOrders = await filterOrdersForClient( allNewOrders, connection, orgRoleAccessMap ) if (filteredOrders.length > 0) { sendSSEToClient(connection.res, { type: 'newOrders', orders: filteredOrders, lastOrderId: maxId, count: filteredOrders.length }) console.log(`[SSE] Sent ${filteredOrders.length} orders to client ${clientId} (role ${connection.roleId})`) // Track roles for push notifications notifiedRoles.add(connection.roleId) } } // Send push notifications to all roles that have access to these orders if (notifiedRoles.size > 0) { const roleIds = Array.from(notifiedRoles) console.log(`[SSE] Sending push notifications to roles:`, roleIds) // Send push notification in background (don't wait) sendNewOrderNotification(roleIds, allNewOrders.length, { maxOrderId: maxId }).catch((error) => { console.error('[SSE] Error sending push notifications:', error) }) } } } } catch (error: any) { console.error('[SSE Poll] Error polling for orders:', error.message) // Notify all clients of the error broadcastSSE({ type: 'error', message: 'Error fetching orders', error: error.message }) } } /** * Start the global polling loop (only if not already running) */ function startGlobalPolling() { if (globalPollingInterval) { return // Already running } console.log('[SSE] Starting global polling loop') // Initial poll pollForNewOrders() // Set up recurring poll globalPollingInterval = setInterval(pollForNewOrders, POLLING_INTERVAL) } /** * Stop the global polling loop (when no clients are connected) */ function stopGlobalPolling() { if (globalPollingInterval) { console.log('[SSE] Stopping global polling loop') clearInterval(globalPollingInterval) globalPollingInterval = null } } // ============================================================================ // HEARTBEAT // ============================================================================ let globalHeartbeatInterval: NodeJS.Timeout | null = null function startGlobalHeartbeat() { if (globalHeartbeatInterval) { return // Already running } globalHeartbeatInterval = setInterval(() => { broadcastSSE({ type: 'heartbeat', timestamp: Date.now(), connectedClients: activeConnections.size }) }, HEARTBEAT_INTERVAL) } function stopGlobalHeartbeat() { if (globalHeartbeatInterval) { clearInterval(globalHeartbeatInterval) globalHeartbeatInterval = null } } // ============================================================================ // EVENT HANDLER // ============================================================================ export default defineEventHandler(async (event) => { const query = getQuery(event) const clientId = query.clientId as string || `client_${Date.now()}_${Math.random().toString(36).substr(2, 9)}` // Set SSE headers setResponseHeaders(event, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no' }) // Extract user's role ID from JWT token const token = await getTokenHelper(event) let roleId: number try { // Decode JWT to get role ID const tokenPayload = JSON.parse(Buffer.from(token.split('.')[1], 'base64').toString()) roleId = tokenPayload.AD_Role_ID if (!roleId) { throw new Error('Role ID not found in token') } } catch (error: any) { console.error('[SSE] Error extracting role from token:', error.message) sendSSEToClient(event.node.res, { type: 'error', message: 'Authentication error: Invalid token' }) return } // Register this connection activeConnections.set(clientId, { res: event.node.res, roleId, filters: { orgId: query.orgId as string, documentNo: query.documentNo as string, externalOrderId: query.externalOrderId as string, openOnly: query.openOnly === 'true' }, event // Store event for later use in polling } as any) // Initialize globalLastOrderId from client's lastOrderId if this is the first connection if (activeConnections.size === 1 && query.lastOrderId) { const clientLastOrderId = parseInt(query.lastOrderId as string) || 0 if (clientLastOrderId > globalLastOrderId) { globalLastOrderId = clientLastOrderId console.log(`[SSE] Initialized global lastOrderId from client: ${globalLastOrderId}`) } } console.log(`[SSE] Client ${clientId} connected (role: ${roleId}, lastOrderId: ${query.lastOrderId}, global: ${globalLastOrderId}, total clients: ${activeConnections.size})`) // Send initial connection message sendSSEToClient(event.node.res, { type: 'connected', clientId, roleId, lastOrderId: globalLastOrderId, message: 'Connected to order stream' }) // Start global polling if this is the first client if (activeConnections.size === 1) { startGlobalPolling() startGlobalHeartbeat() } // Clean up on disconnect event.node.req.on('close', () => { activeConnections.delete(clientId) console.log(`[SSE] Client ${clientId} disconnected (remaining clients: ${activeConnections.size})`) // Stop polling if no clients remain if (activeConnections.size === 0) { stopGlobalPolling() stopGlobalHeartbeat() } }) // Keep connection alive return new Promise(() => {}) })