RateS.matchingRateProfileForEvent

This commit is contained in:
DanB
2020-06-12 12:47:04 +02:00
parent a6258b08e9
commit 373b4bb233
4 changed files with 44 additions and 50 deletions

View File

@@ -530,7 +530,7 @@ func main() {
ldrs, anz, dspS, dmService, storDBService,
services.NewEventExporterService(cfg, filterSChan,
connManager, server, exitChan, internalEEsChan),
services.NewRateService(cfg, filterSChan,
services.NewRateService(cfg, filterSChan, dmService,
server, exitChan, internalRateSChan),
services.NewSIPAgent(cfg, filterSChan, exitChan, connManager),
)

View File

@@ -20,6 +20,7 @@ package rates
import (
"fmt"
"sort"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -27,10 +28,11 @@ import (
)
// NewRateS instantiates the RateS
func NewRateS(cfg *config.CGRConfig, filterS *engine.FilterS) *RateS {
func NewRateS(cfg *config.CGRConfig, filterS *engine.FilterS, dm *engine.DataManager) *RateS {
return &RateS{
cfg: cfg,
filterS: filterS,
dm: dm,
}
}
@@ -68,61 +70,47 @@ func (rS *RateS) Call(serviceMethod string, args interface{}, reply interface{})
return utils.RPCCall(rS, serviceMethod, args, reply)
}
/*
// matchingRateProfileForEvent returns the matched RateProfile for the given event
func (rS *RateS) matchingRateProfileForEvent(cgrEv *utils.CGREvent) (rtPfl *RateProfile, err error) {
var rPrfIDs []string
if rPrfIDs, err = engine.MatchingItemIDsForEvent(
func (rS *RateS) matchingRateProfileForEvent(cgrEv *utils.CGREvent) (rtPfl *engine.RateProfile, err error) {
var rPfIDs utils.StringMap
if rPfIDs, err = engine.MatchingItemIDsForEvent(
cgrEv.Event,
rS.cfg.RateSCfg().StringIndexedFields,
rS.cfg.RateSCfg().PrefixIndexedFields,
rS.dm, utils.CacheRateProfilesFilterIndexes,
cgrEv.Tenant,
rpS.cgrcfg.RouteSCfg().IndexedSelects,
rpS.cgrcfg.RouteSCfg().NestedFields,
rS.cfg.RouteSCfg().IndexedSelects,
rS.cfg.RouteSCfg().NestedFields,
); err != nil {
return
}
evNm := utils.MapStorage{utils.MetaReq: ev.Event}
for lpID := range rPrfIDs {
rPrf, err := rpS.dm.GetRouteProfile(ev.Tenant, lpID, true, true, utils.NonTransactional)
if err != nil {
var matchingRPfs []*engine.RateProfile
evNm := utils.MapStorage{utils.MetaReq: cgrEv.Event}
for rPfID := range rPfIDs {
var rPf *engine.RateProfile
if rPf, err = rS.dm.GetRateProfile(cgrEv.Tenant, rPfID, true, true, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = nil
continue
}
return nil, err
return
}
if rPrf.ActivationInterval != nil && ev.Time != nil &&
!rPrf.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active
if rPf.ActivationInterval != nil && cgrEv.Time != nil &&
!rPf.ActivationInterval.IsActiveAtTime(*cgrEv.Time) { // not active
continue
}
if pass, err := rpS.filterS.Pass(ev.Tenant, rPrf.FilterIDs,
evNm); err != nil {
return nil, err
var pass bool
if pass, err = rS.filterS.Pass(cgrEv.Tenant, rPf.FilterIDs, evNm); err != nil {
return
} else if !pass {
continue
}
if singleResult {
if matchingRPrf[0] == nil || matchingRPrf[0].Weight < rPrf.Weight {
matchingRPrf[0] = rPrf
}
} else {
matchingRPrf = append(matchingRPrf, rPrf)
}
}
if singleResult {
if matchingRPrf[0] == nil {
return nil, utils.ErrNotFound
}
} else {
if len(matchingRPrf) == 0 {
return nil, utils.ErrNotFound
}
sort.Slice(matchingRPrf, func(i, j int) bool { return matchingRPrf[i].Weight > matchingRPrf[j].Weight })
matchingRPfs = append(matchingRPfs, rPf)
}
sort.Slice(matchingRPfs, func(i, j int) bool { return matchingRPfs[i].Weight > matchingRPfs[j].Weight })
return
}
*/
// V1CostForEvent will be called to calculate the cost for an event
func (rS *RateS) V1CostForEvent(cgrEv *utils.CGREventWithOpts, cC *utils.ChargedCost) (err error) {

View File

@@ -43,7 +43,7 @@ type DataDBService struct {
oldDBCfg *config.DataDbCfg
connMgr *engine.ConnManager
db *engine.DataManager
dm *engine.DataManager
dbchan chan *engine.DataManager
}
@@ -69,13 +69,13 @@ func (db *DataDBService) Start() (err error) {
return
}
db.db = engine.NewDataManager(d, db.cfg.CacheCfg(), db.connMgr)
engine.SetDataStorage(db.db)
if err = engine.CheckVersions(db.db.DataDB()); err != nil {
db.dm = engine.NewDataManager(d, db.cfg.CacheCfg(), db.connMgr)
engine.SetDataStorage(db.dm)
if err = engine.CheckVersions(db.dm.DataDB()); err != nil {
fmt.Println(err)
return
}
db.dbchan <- db.db
db.dbchan <- db.dm
return
}
@@ -84,14 +84,14 @@ func (db *DataDBService) Reload() (err error) {
db.Lock()
defer db.Unlock()
if db.needsConnectionReload() {
if err = db.db.Reconnect(db.cfg.GeneralCfg().DBDataEncoding, db.cfg.DataDbCfg()); err != nil {
if err = db.dm.Reconnect(db.cfg.GeneralCfg().DBDataEncoding, db.cfg.DataDbCfg()); err != nil {
return
}
db.oldDBCfg = db.cfg.DataDbCfg().Clone()
return
}
if db.cfg.DataDbCfg().DataDbType == utils.MONGO {
mgo, canCast := db.db.DataDB().(*engine.MongoStorage)
mgo, canCast := db.dm.DataDB().(*engine.MongoStorage)
if !canCast {
return fmt.Errorf("can't conver DataDB of type %s to MongoStorage",
db.cfg.DataDbCfg().DataDbType)
@@ -104,8 +104,8 @@ func (db *DataDBService) Reload() (err error) {
// Shutdown stops the service
func (db *DataDBService) Shutdown() (err error) {
db.Lock()
db.db.DataDB().Close()
db.db = nil
db.dm.DataDB().Close()
db.dm = nil
db.Unlock()
return
}
@@ -114,7 +114,7 @@ func (db *DataDBService) Shutdown() (err error) {
func (db *DataDBService) IsRunning() bool {
db.RLock()
defer db.RUnlock()
return db != nil && db.db != nil && db.db.DataDB() != nil
return db != nil && db.dm != nil && db.dm.DataDB() != nil
}
// ServiceName returns the service name
@@ -139,7 +139,7 @@ func (db *DataDBService) mandatoryDB() bool {
func (db *DataDBService) GetDM() *engine.DataManager {
db.RLock()
defer db.RUnlock()
return db.db
return db.dm
}
// needsConnectionReload returns if the DB connection needs to reloaded

View File

@@ -33,12 +33,15 @@ import (
)
// NewRateService constructs RateService
func NewRateService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
func NewRateService(
cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
dmS *DataDBService,
server *utils.Server, exitChan chan bool,
intConnChan chan rpcclient.ClientConnector) servmanager.Service {
return &RateService{
cfg: cfg,
filterSChan: filterSChan,
dmS: dmS,
server: server,
exitChan: exitChan,
intConnChan: intConnChan,
@@ -52,6 +55,7 @@ type RateService struct {
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
dmS *DataDBService
server *utils.Server
exitChan chan bool
intConnChan chan rpcclient.ClientConnector
@@ -104,9 +108,11 @@ func (rs *RateService) Start() (err error) {
fltrS := <-rs.filterSChan
rs.filterSChan <- fltrS
dbchan := rs.dmS.GetDMChan()
dm := <-dbchan
dbchan <- dm
rs.Lock()
rs.rateS = rates.NewRateS(rs.cfg, fltrS)
rs.rateS = rates.NewRateS(rs.cfg, fltrS, dm)
rs.Unlock()
/*rs.rpc = v1.NewEventExporterSv1(es.eeS)
if !rs.cfg.DispatcherSCfg().Enabled {