From e02cc76d52c4524c59b9a0bbb82f75a082c160ac Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 6 Dec 2017 16:26:58 +0100 Subject: [PATCH] Engine starting Alias service, AliasSv1 in apier --- apier/v1/{aliasprofile.go => alias.go} | 29 +++++++++++ cmd/cgr-engine/cgr-engine.go | 29 +++++++++++ config/config.go | 2 +- engine/alias.go | 69 ++++++++++++++++++++++++-- utils/cgrevent.go | 1 + 5 files changed, 126 insertions(+), 4 deletions(-) rename apier/v1/{aliasprofile.go => alias.go} (74%) diff --git a/apier/v1/aliasprofile.go b/apier/v1/alias.go similarity index 74% rename from apier/v1/aliasprofile.go rename to apier/v1/alias.go index d750ebd0d..9b1a638e1 100644 --- a/apier/v1/aliasprofile.go +++ b/apier/v1/alias.go @@ -66,3 +66,32 @@ func (apierV1 *ApierV1) RemAliasProfile(arg utils.TenantID, reply *string) error *reply = utils.OK return nil } + +func NewAliasSv1(alS *engine.AliasService) *AliasSv1 { + return &AliasSv1{alS: alS} +} + +// Exports RPC from RLs +type AliasSv1 struct { + alS *engine.AliasService +} + +// Call implements rpcclient.RpcClientConnection interface for internal RPC +func (alSv1 *AliasSv1) Call(serviceMethod string, + args interface{}, reply interface{}) error { + return utils.APIerRPCCall(alSv1, serviceMethod, args, reply) +} + +/* +// GetAliasProfileForEvent returns matching AliasProfile for Event +func (alSv1 *AliasSv1) GetAliasProfileForEvent(ev *utils.CGREvent, + reply *engine.ApierAliasProfile) error { + return alSv1.alS.V1GetSuppliers(args, reply) +} +*/ + +// ProcessEvent will replace event fields with the ones in maching AliasProfile +func (alSv1 *AliasSv1) ProcessEvent(ev *utils.CGREvent, + reply *string) error { + return alSv1.alS.V1ProcessEvent(ev, reply) +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 99822a1e4..e528b23a1 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -538,6 +538,30 @@ func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, dm * internalUserSChan <- userServer } +// startAliasService fires up the AliasS +func startAliasService(internalAliasSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, + dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { + filterS := <-filterSChan + filterSChan <- filterS + aS, err := engine.NewAliasService(dm, filterS, cfg.AliasSCfg().IndexedFields) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AliasS, err.Error())) + exitChan <- true + return + } + go func() { + if err := aS.ListenAndServe(exitChan); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.AliasS, err.Error())) + } + aS.Shutdown() + exitChan <- true + return + }() + aSv1 := v1.NewAliasSv1(aS) + server.RpcRegister(aSv1) + internalAliasSChan <- aSv1 +} + func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { var thdSConn *rpcclient.RpcClientPool @@ -867,6 +891,7 @@ func main() { internalUserSChan := make(chan rpcclient.RpcClientConnection, 1) internalAliaseSChan := make(chan rpcclient.RpcClientConnection, 1) internalSMGChan := make(chan *sessionmanager.SMGeneric, 1) + internalAliasSChan := make(chan rpcclient.RpcClientConnection, 1) internalRsChan := make(chan rpcclient.RpcClientConnection, 1) internalStatSChan := make(chan rpcclient.RpcClientConnection, 1) internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1) @@ -965,6 +990,10 @@ func main() { // Start FilterS go startFilterService(filterSChan, internalStatSChan, cfg, dm, exitChan) + if cfg.AliasSCfg().Enabled { + go startAliasService(internalAliasSChan, cfg, dm, server, exitChan, filterSChan) + } + // Start RL service if cfg.ResourceSCfg().Enabled { go startResourceService(internalRsChan, diff --git a/config/config.go b/config/config.go index 76b815c2c..d0a8d674c 100755 --- a/config/config.go +++ b/config/config.go @@ -1271,7 +1271,7 @@ func (self *CGRConfig) RadiusAgentCfg() *RadiusAgentCfg { return self.radiusAgentCfg } -func (cfg *CGRConfig) AiasSCfg() *AliasSCfg { +func (cfg *CGRConfig) AliasSCfg() *AliasSCfg { return cfg.aliasSCfg } diff --git a/engine/alias.go b/engine/alias.go index e01871bae..ccdbcbcc4 100644 --- a/engine/alias.go +++ b/engine/alias.go @@ -20,7 +20,11 @@ package engine import ( "fmt" + "sort" + "time" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" ) @@ -43,6 +47,14 @@ func (als *AliasProfile) TenantID() string { return utils.ConcatenatedKey(als.Tenant, als.ID) } +// AliasProfiles is a sortable list of Alias profiles +type AliasProfiles []*AliasProfile + +// Sort is part of sort interface, sort based on Weight +func (aps AliasProfiles) Sort() { + sort.Slice(aps, func(i, j int) bool { return aps[i].Weight > aps[j].Weight }) +} + func NewAliasService(dm *DataManager, filterS *FilterS, indexedFields []string) (*AliasService, error) { return &AliasService{dm: dm, filterS: filterS, indexedFields: indexedFields}, nil } @@ -55,7 +67,7 @@ type AliasService struct { // ListenAndServe will initialize the service func (alS *AliasService) ListenAndServe(exitChan chan bool) (err error) { - utils.Logger.Info("Starting Alias Service") + utils.Logger.Info("Starting Alias service") e := <-exitChan exitChan <- e // put back for the others listening for shutdown request return @@ -63,8 +75,59 @@ func (alS *AliasService) ListenAndServe(exitChan chan bool) (err error) { // Shutdown is called to shutdown the service func (alS *AliasService) Shutdown() (err error) { - utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.AliasS)) - utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.AliasS)) + utils.Logger.Info(fmt.Sprintf("<%s> shutdown initialized", utils.AliasS)) + utils.Logger.Info(fmt.Sprintf("<%s> shutdown complete", utils.AliasS)) + return +} + +// matchingSupplierProfilesForEvent returns ordered list of matching resources which are active by the time of the call +func (alS *AliasService) matchingAliasProfilesForEvent(ev *utils.CGREvent) (aPrfls AliasProfiles, err error) { + matchingAPs := make(map[string]*AliasProfile) + aPrflIDs, err := matchingItemIDsForEvent(ev.Event, alS.indexedFields, + alS.dm, utils.AliasProfilesStringIndex+ev.Tenant) + if err != nil { + return nil, err + } + lockIDs := utils.PrefixSliceItems(aPrflIDs.Slice(), utils.AliasProfilesStringIndex) + guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...) + defer guardian.Guardian.UnguardIDs(lockIDs...) + for apID := range aPrflIDs { + aPrfl, err := alS.dm.GetAliasProfile(ev.Tenant, apID, false, utils.NonTransactional) + if err != nil { + if err == utils.ErrNotFound { + continue + } + return nil, err + } + evTime := time.Now() + if ev.Time != nil { + evTime = *ev.Time + } + if aPrfl.ActivationInterval != nil && + !aPrfl.ActivationInterval.IsActiveAtTime(evTime) { // not active + continue + } + if pass, err := alS.filterS.PassFiltersForEvent(ev.Tenant, + ev.Event, aPrfl.FilterIDs); err != nil { + return nil, err + } else if !pass { + continue + } + matchingAPs[apID] = aPrfl + } + // All good, convert from Map to Slice so we can sort + aPrfls = make(AliasProfiles, len(matchingAPs)) + i := 0 + for _, aPrfl := range matchingAPs { + aPrfls[i] = aPrfl + i++ + } + aPrfls.Sort() + return +} + +func (alS *AliasService) V1ProcessEvent(ev *utils.CGREvent, + reply *string) (err error) { return } diff --git a/utils/cgrevent.go b/utils/cgrevent.go index eee4b05d7..a82ae6f1d 100644 --- a/utils/cgrevent.go +++ b/utils/cgrevent.go @@ -29,6 +29,7 @@ import ( type CGREvent struct { Tenant string ID string + Time *time.Time // event time Event map[string]interface{} }