Engine starting Alias service, AliasSv1 in apier

This commit is contained in:
DanB
2017-12-06 16:26:58 +01:00
parent 40ac3d9100
commit e02cc76d52
5 changed files with 126 additions and 4 deletions

View File

@@ -66,3 +66,32 @@ func (apierV1 *ApierV1) RemAliasProfile(arg utils.TenantID, reply *string) error
*reply = utils.OK
return nil
}
func NewAliasSv1(alS *engine.AliasService) *AliasSv1 {
return &AliasSv1{alS: alS}
}
// Exports RPC from RLs
type AliasSv1 struct {
alS *engine.AliasService
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
func (alSv1 *AliasSv1) Call(serviceMethod string,
args interface{}, reply interface{}) error {
return utils.APIerRPCCall(alSv1, serviceMethod, args, reply)
}
/*
// GetAliasProfileForEvent returns matching AliasProfile for Event
func (alSv1 *AliasSv1) GetAliasProfileForEvent(ev *utils.CGREvent,
reply *engine.ApierAliasProfile) error {
return alSv1.alS.V1GetSuppliers(args, reply)
}
*/
// ProcessEvent will replace event fields with the ones in maching AliasProfile
func (alSv1 *AliasSv1) ProcessEvent(ev *utils.CGREvent,
reply *string) error {
return alSv1.alS.V1ProcessEvent(ev, reply)
}

View File

@@ -538,6 +538,30 @@ func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, dm *
internalUserSChan <- userServer
}
// startAliasService fires up the AliasS
func startAliasService(internalAliasSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
filterS := <-filterSChan
filterSChan <- filterS
aS, err := engine.NewAliasService(dm, filterS, cfg.AliasSCfg().IndexedFields)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AliasS, err.Error()))
exitChan <- true
return
}
go func() {
if err := aS.ListenAndServe(exitChan); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.AliasS, err.Error()))
}
aS.Shutdown()
exitChan <- true
return
}()
aSv1 := v1.NewAliasSv1(aS)
server.RpcRegister(aSv1)
internalAliasSChan <- aSv1
}
func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
var thdSConn *rpcclient.RpcClientPool
@@ -867,6 +891,7 @@ func main() {
internalUserSChan := make(chan rpcclient.RpcClientConnection, 1)
internalAliaseSChan := make(chan rpcclient.RpcClientConnection, 1)
internalSMGChan := make(chan *sessionmanager.SMGeneric, 1)
internalAliasSChan := make(chan rpcclient.RpcClientConnection, 1)
internalRsChan := make(chan rpcclient.RpcClientConnection, 1)
internalStatSChan := make(chan rpcclient.RpcClientConnection, 1)
internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1)
@@ -965,6 +990,10 @@ func main() {
// Start FilterS
go startFilterService(filterSChan, internalStatSChan, cfg, dm, exitChan)
if cfg.AliasSCfg().Enabled {
go startAliasService(internalAliasSChan, cfg, dm, server, exitChan, filterSChan)
}
// Start RL service
if cfg.ResourceSCfg().Enabled {
go startResourceService(internalRsChan,

View File

@@ -1271,7 +1271,7 @@ func (self *CGRConfig) RadiusAgentCfg() *RadiusAgentCfg {
return self.radiusAgentCfg
}
func (cfg *CGRConfig) AiasSCfg() *AliasSCfg {
func (cfg *CGRConfig) AliasSCfg() *AliasSCfg {
return cfg.aliasSCfg
}

View File

@@ -20,7 +20,11 @@ package engine
import (
"fmt"
"sort"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
@@ -43,6 +47,14 @@ func (als *AliasProfile) TenantID() string {
return utils.ConcatenatedKey(als.Tenant, als.ID)
}
// AliasProfiles is a sortable list of Alias profiles
type AliasProfiles []*AliasProfile
// Sort is part of sort interface, sort based on Weight
func (aps AliasProfiles) Sort() {
sort.Slice(aps, func(i, j int) bool { return aps[i].Weight > aps[j].Weight })
}
func NewAliasService(dm *DataManager, filterS *FilterS, indexedFields []string) (*AliasService, error) {
return &AliasService{dm: dm, filterS: filterS, indexedFields: indexedFields}, nil
}
@@ -55,7 +67,7 @@ type AliasService struct {
// ListenAndServe will initialize the service
func (alS *AliasService) ListenAndServe(exitChan chan bool) (err error) {
utils.Logger.Info("Starting Alias Service")
utils.Logger.Info("Starting Alias service")
e := <-exitChan
exitChan <- e // put back for the others listening for shutdown request
return
@@ -63,8 +75,59 @@ func (alS *AliasService) ListenAndServe(exitChan chan bool) (err error) {
// Shutdown is called to shutdown the service
func (alS *AliasService) Shutdown() (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.AliasS))
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.AliasS))
utils.Logger.Info(fmt.Sprintf("<%s> shutdown initialized", utils.AliasS))
utils.Logger.Info(fmt.Sprintf("<%s> shutdown complete", utils.AliasS))
return
}
// matchingSupplierProfilesForEvent returns ordered list of matching resources which are active by the time of the call
func (alS *AliasService) matchingAliasProfilesForEvent(ev *utils.CGREvent) (aPrfls AliasProfiles, err error) {
matchingAPs := make(map[string]*AliasProfile)
aPrflIDs, err := matchingItemIDsForEvent(ev.Event, alS.indexedFields,
alS.dm, utils.AliasProfilesStringIndex+ev.Tenant)
if err != nil {
return nil, err
}
lockIDs := utils.PrefixSliceItems(aPrflIDs.Slice(), utils.AliasProfilesStringIndex)
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
for apID := range aPrflIDs {
aPrfl, err := alS.dm.GetAliasProfile(ev.Tenant, apID, false, utils.NonTransactional)
if err != nil {
if err == utils.ErrNotFound {
continue
}
return nil, err
}
evTime := time.Now()
if ev.Time != nil {
evTime = *ev.Time
}
if aPrfl.ActivationInterval != nil &&
!aPrfl.ActivationInterval.IsActiveAtTime(evTime) { // not active
continue
}
if pass, err := alS.filterS.PassFiltersForEvent(ev.Tenant,
ev.Event, aPrfl.FilterIDs); err != nil {
return nil, err
} else if !pass {
continue
}
matchingAPs[apID] = aPrfl
}
// All good, convert from Map to Slice so we can sort
aPrfls = make(AliasProfiles, len(matchingAPs))
i := 0
for _, aPrfl := range matchingAPs {
aPrfls[i] = aPrfl
i++
}
aPrfls.Sort()
return
}
func (alS *AliasService) V1ProcessEvent(ev *utils.CGREvent,
reply *string) (err error) {
return
}

View File

@@ -29,6 +29,7 @@ import (
type CGREvent struct {
Tenant string
ID string
Time *time.Time // event time
Event map[string]interface{}
}