diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 7fd06bcdb..38f6df448 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -530,7 +530,7 @@ func main() { ldrs, anz, dspS, dmService, storDBService, services.NewEventExporterService(cfg, filterSChan, connManager, server, exitChan, internalEEsChan), - services.NewRateService(cfg, filterSChan, + services.NewRateService(cfg, filterSChan, dmService, server, exitChan, internalRateSChan), services.NewSIPAgent(cfg, filterSChan, exitChan, connManager), ) diff --git a/rates/rates.go b/rates/rates.go index 9a6e059bf..dac66c21a 100644 --- a/rates/rates.go +++ b/rates/rates.go @@ -20,6 +20,7 @@ package rates import ( "fmt" + "sort" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -27,10 +28,11 @@ import ( ) // NewRateS instantiates the RateS -func NewRateS(cfg *config.CGRConfig, filterS *engine.FilterS) *RateS { +func NewRateS(cfg *config.CGRConfig, filterS *engine.FilterS, dm *engine.DataManager) *RateS { return &RateS{ cfg: cfg, filterS: filterS, + dm: dm, } } @@ -68,61 +70,47 @@ func (rS *RateS) Call(serviceMethod string, args interface{}, reply interface{}) return utils.RPCCall(rS, serviceMethod, args, reply) } -/* // matchingRateProfileForEvent returns the matched RateProfile for the given event -func (rS *RateS) matchingRateProfileForEvent(cgrEv *utils.CGREvent) (rtPfl *RateProfile, err error) { - var rPrfIDs []string - if rPrfIDs, err = engine.MatchingItemIDsForEvent( +func (rS *RateS) matchingRateProfileForEvent(cgrEv *utils.CGREvent) (rtPfl *engine.RateProfile, err error) { + var rPfIDs utils.StringMap + if rPfIDs, err = engine.MatchingItemIDsForEvent( cgrEv.Event, rS.cfg.RateSCfg().StringIndexedFields, rS.cfg.RateSCfg().PrefixIndexedFields, rS.dm, utils.CacheRateProfilesFilterIndexes, cgrEv.Tenant, - rpS.cgrcfg.RouteSCfg().IndexedSelects, - rpS.cgrcfg.RouteSCfg().NestedFields, + rS.cfg.RouteSCfg().IndexedSelects, + rS.cfg.RouteSCfg().NestedFields, ); err != nil { return } - evNm := utils.MapStorage{utils.MetaReq: ev.Event} - for lpID := range rPrfIDs { - rPrf, err := rpS.dm.GetRouteProfile(ev.Tenant, lpID, true, true, utils.NonTransactional) - if err != nil { + var matchingRPfs []*engine.RateProfile + evNm := utils.MapStorage{utils.MetaReq: cgrEv.Event} + for rPfID := range rPfIDs { + var rPf *engine.RateProfile + if rPf, err = rS.dm.GetRateProfile(cgrEv.Tenant, rPfID, true, true, utils.NonTransactional); err != nil { if err == utils.ErrNotFound { + err = nil continue } - return nil, err + return } - if rPrf.ActivationInterval != nil && ev.Time != nil && - !rPrf.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active + if rPf.ActivationInterval != nil && cgrEv.Time != nil && + !rPf.ActivationInterval.IsActiveAtTime(*cgrEv.Time) { // not active continue } - if pass, err := rpS.filterS.Pass(ev.Tenant, rPrf.FilterIDs, - evNm); err != nil { - return nil, err + var pass bool + if pass, err = rS.filterS.Pass(cgrEv.Tenant, rPf.FilterIDs, evNm); err != nil { + return } else if !pass { continue } - if singleResult { - if matchingRPrf[0] == nil || matchingRPrf[0].Weight < rPrf.Weight { - matchingRPrf[0] = rPrf - } - } else { - matchingRPrf = append(matchingRPrf, rPrf) - } - } - if singleResult { - if matchingRPrf[0] == nil { - return nil, utils.ErrNotFound - } - } else { - if len(matchingRPrf) == 0 { - return nil, utils.ErrNotFound - } - sort.Slice(matchingRPrf, func(i, j int) bool { return matchingRPrf[i].Weight > matchingRPrf[j].Weight }) + + matchingRPfs = append(matchingRPfs, rPf) } + sort.Slice(matchingRPfs, func(i, j int) bool { return matchingRPfs[i].Weight > matchingRPfs[j].Weight }) return } -*/ // V1CostForEvent will be called to calculate the cost for an event func (rS *RateS) V1CostForEvent(cgrEv *utils.CGREventWithOpts, cC *utils.ChargedCost) (err error) { diff --git a/services/datadb.go b/services/datadb.go index c0871e9a1..e03c8adbe 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -43,7 +43,7 @@ type DataDBService struct { oldDBCfg *config.DataDbCfg connMgr *engine.ConnManager - db *engine.DataManager + dm *engine.DataManager dbchan chan *engine.DataManager } @@ -69,13 +69,13 @@ func (db *DataDBService) Start() (err error) { return } - db.db = engine.NewDataManager(d, db.cfg.CacheCfg(), db.connMgr) - engine.SetDataStorage(db.db) - if err = engine.CheckVersions(db.db.DataDB()); err != nil { + db.dm = engine.NewDataManager(d, db.cfg.CacheCfg(), db.connMgr) + engine.SetDataStorage(db.dm) + if err = engine.CheckVersions(db.dm.DataDB()); err != nil { fmt.Println(err) return } - db.dbchan <- db.db + db.dbchan <- db.dm return } @@ -84,14 +84,14 @@ func (db *DataDBService) Reload() (err error) { db.Lock() defer db.Unlock() if db.needsConnectionReload() { - if err = db.db.Reconnect(db.cfg.GeneralCfg().DBDataEncoding, db.cfg.DataDbCfg()); err != nil { + if err = db.dm.Reconnect(db.cfg.GeneralCfg().DBDataEncoding, db.cfg.DataDbCfg()); err != nil { return } db.oldDBCfg = db.cfg.DataDbCfg().Clone() return } if db.cfg.DataDbCfg().DataDbType == utils.MONGO { - mgo, canCast := db.db.DataDB().(*engine.MongoStorage) + mgo, canCast := db.dm.DataDB().(*engine.MongoStorage) if !canCast { return fmt.Errorf("can't conver DataDB of type %s to MongoStorage", db.cfg.DataDbCfg().DataDbType) @@ -104,8 +104,8 @@ func (db *DataDBService) Reload() (err error) { // Shutdown stops the service func (db *DataDBService) Shutdown() (err error) { db.Lock() - db.db.DataDB().Close() - db.db = nil + db.dm.DataDB().Close() + db.dm = nil db.Unlock() return } @@ -114,7 +114,7 @@ func (db *DataDBService) Shutdown() (err error) { func (db *DataDBService) IsRunning() bool { db.RLock() defer db.RUnlock() - return db != nil && db.db != nil && db.db.DataDB() != nil + return db != nil && db.dm != nil && db.dm.DataDB() != nil } // ServiceName returns the service name @@ -139,7 +139,7 @@ func (db *DataDBService) mandatoryDB() bool { func (db *DataDBService) GetDM() *engine.DataManager { db.RLock() defer db.RUnlock() - return db.db + return db.dm } // needsConnectionReload returns if the DB connection needs to reloaded diff --git a/services/rates.go b/services/rates.go index faed0de0b..8ec872d04 100644 --- a/services/rates.go +++ b/services/rates.go @@ -33,12 +33,15 @@ import ( ) // NewRateService constructs RateService -func NewRateService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, +func NewRateService( + cfg *config.CGRConfig, filterSChan chan *engine.FilterS, + dmS *DataDBService, server *utils.Server, exitChan chan bool, intConnChan chan rpcclient.ClientConnector) servmanager.Service { return &RateService{ cfg: cfg, filterSChan: filterSChan, + dmS: dmS, server: server, exitChan: exitChan, intConnChan: intConnChan, @@ -52,6 +55,7 @@ type RateService struct { cfg *config.CGRConfig filterSChan chan *engine.FilterS + dmS *DataDBService server *utils.Server exitChan chan bool intConnChan chan rpcclient.ClientConnector @@ -104,9 +108,11 @@ func (rs *RateService) Start() (err error) { fltrS := <-rs.filterSChan rs.filterSChan <- fltrS - + dbchan := rs.dmS.GetDMChan() + dm := <-dbchan + dbchan <- dm rs.Lock() - rs.rateS = rates.NewRateS(rs.cfg, fltrS) + rs.rateS = rates.NewRateS(rs.cfg, fltrS, dm) rs.Unlock() /*rs.rpc = v1.NewEventExporterSv1(es.eeS) if !rs.cfg.DispatcherSCfg().Enabled {