From 3b195dcf1d02d287127f9b0acde1bbcd4eb3bb96 Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 23 Nov 2024 20:13:57 +0100 Subject: [PATCH] Adding StateIndexer, ServiceIndexer, StateDeps --- services/libstatedeps.go | 49 +++++++++++++++++++++++++++ services/serviceindexer.go | 69 ++++++++++++++++++++++++++++++++++++++ services/stateindexer.go | 69 ++++++++++++++++++++++++++++++++++++++ utils/coreutils.go | 10 ++++++ 4 files changed, 197 insertions(+) create mode 100644 services/libstatedeps.go create mode 100644 services/serviceindexer.go create mode 100644 services/stateindexer.go diff --git a/services/libstatedeps.go b/services/libstatedeps.go new file mode 100644 index 000000000..dd9df8275 --- /dev/null +++ b/services/libstatedeps.go @@ -0,0 +1,49 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "sync" +) + +// newStateDependencies constructs a stateDependencies struct +func newStateDependencies() *stateDependencies { + return &stateDependencies{stateDeps: make(map[string]chan struct{})} +} + +// stateDependencies enhances a service with state dependencies management +type stateDependencies struct { + stateDeps map[string]chan struct{} // listeners for various states of the service + stateDepsMux sync.RWMutex // protects stateDeps +} + +// RegisterStateDependency will be called by a service interested by specific stateID of the service +func (sDs *stateDependencies) RegisterStateDependency(stateID string) (retChan chan struct{}) { + sDs.stateDepsMux.RLock() + retChan = sDs.stateDeps[stateID] + sDs.stateDepsMux.RUnlock() + if retChan != nil { + return + } + sDs.stateDepsMux.Lock() + defer sDs.stateDepsMux.Unlock() + retChan = make(chan struct{}) + sDs.stateDeps[stateID] = retChan + return +} diff --git a/services/serviceindexer.go b/services/serviceindexer.go new file mode 100644 index 000000000..86c2377bc --- /dev/null +++ b/services/serviceindexer.go @@ -0,0 +1,69 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "sync" + + "github.com/cgrates/cgrates/servmanager" +) + +// NewServiceIndexer constructs a ServiceIndexer +func NewServiceIndexer() *ServiceIndexer { + return &ServiceIndexer{srvS: make(map[string]servmanager.Service)} +} + +// ServiceIndexer implements servmanager.Service indexing in a thread safe way +type ServiceIndexer struct { + mux sync.RWMutex + + srvS map[string]servmanager.Service // servmanager.Services indexed by ID +} + +// Getservmanager.Service returns one servmanager.Service or nil +func (sI *ServiceIndexer) GetService(srvID string) servmanager.Service { + sI.mux.RLock() + defer sI.mux.RUnlock() + return sI.srvS[srvID] +} + +// Addservmanager.Service adds a servmanager.Service based on it's id to the index +func (sI *ServiceIndexer) AddService(srvID string, srv servmanager.Service) { + sI.mux.Lock() + sI.srvS[srvID] = srv + sI.mux.Unlock() +} + +// Remservmanager.Service will remove a servmanager.Service based on it's ID +func (sI *ServiceIndexer) RemService(srvID string) { + sI.mux.Lock() + defer sI.mux.Unlock() + delete(sI.srvS, srvID) +} + +// Getservmanager.Services returns the list of servmanager.Services indexed +func (sI *ServiceIndexer) GetServices() []servmanager.Service { + sI.mux.RLock() + defer sI.mux.RUnlock() + srvs := make([]servmanager.Service, 0, len(sI.srvS)) + for _, s := range sI.srvS { + srvs = append(srvs, s) + } + return srvs +} diff --git a/services/stateindexer.go b/services/stateindexer.go new file mode 100644 index 000000000..63810f2cc --- /dev/null +++ b/services/stateindexer.go @@ -0,0 +1,69 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "sync" + + "github.com/cgrates/cgrates/servmanager" +) + +// NewStateIndexer constructs a StateIndexer +func NewStateIndexer() *StateIndexer { + return &StateIndexer{srvS: make(map[string]servmanager.Service)} +} + +// StateIndexer implements service indexing in a thread safe way +type StateIndexer struct { + mux sync.RWMutex + + srvS map[string]servmanager.Service // services indexed by ID +} + +// GetService returns one service or nil +func (sI *StateIndexer) GetService(srvID string) servmanager.Service { + sI.mux.RLock() + defer sI.mux.RUnlock() + return sI.srvS[srvID] +} + +// AddService adds a service based on it's id to the index +func (sI *StateIndexer) AddService(srvID string, srv servmanager.Service) { + sI.mux.Lock() + sI.srvS[srvID] = srv + sI.mux.Unlock() +} + +// RemService will remove a service based on it's ID +func (sI *StateIndexer) RemService(srvID string) { + sI.mux.Lock() + defer sI.mux.Unlock() + delete(sI.srvS, srvID) +} + +// GetServices returns the list of services indexed +func (sI *StateIndexer) GetServices() []servmanager.Service { + sI.mux.RLock() + defer sI.mux.RUnlock() + srvs := make([]servmanager.Service, 0, len(sI.srvS)) + for _, s := range sI.srvS { + srvs = append(srvs, s) + } + return srvs +} diff --git a/utils/coreutils.go b/utils/coreutils.go index 3fa406a75..359de4024 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -981,3 +981,13 @@ func SplitPath(rule string, sep byte, n int) []string { splt = append(splt, rule[pos:]) // add last element return splt } + +// StructChanTimeout will return true if timeout occurs before struct is received +func StructChanTimeout(chn chan struct{}, timeout time.Duration) bool { + select { + case <-chn: + return false + case <-time.After(timeout): + return true + } +}