Started adding tests for attributeS with dispatcheS

This commit is contained in:
Trial97
2019-02-08 18:04:52 +02:00
committed by Dan Christian Bogos
parent 7a82689226
commit 571247a522
8 changed files with 123 additions and 88 deletions

View File

@@ -126,7 +126,7 @@ func (alSv1 *AttributeSv1) ProcessEvent(args *engine.AttrArgsProcessEvent,
return alSv1.attrS.V1ProcessEvent(args, reply)
}
func (alSv1 *AttributeSv1) Ping(ign string, reply *string) error {
func (alSv1 *AttributeSv1) Ping(ign *utils.CGREvent, reply *string) error {
*reply = utils.Pong
return nil
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -178,7 +179,7 @@ func (dSup *DispatcherSupplierSv1) Ping(ign string, reply *string) error {
func (dSup *DispatcherSupplierSv1) GetSuppliers(args *dispatchers.ArgsGetSuppliersWithApiKey,
reply *engine.SortedSuppliers) error {
return dSup.dSup.SupplierSv1GetSuppliers(args, reply)
}
}*/
func NewDispatcherAttributeSv1(dps *dispatchers.DispatcherService) *DispatcherAttributeSv1 {
return &DispatcherAttributeSv1{dA: dps}
@@ -189,9 +190,15 @@ type DispatcherAttributeSv1 struct {
dA *dispatchers.DispatcherService
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
// func (alSv1 *DispatcherAttributeSv1) Call(serviceMethod string,
// args interface{}, reply interface{}) error {
// return utils.APIerRPCCall(alSv1, serviceMethod, args, reply)
// }
// Ping implements SupplierSv1Ping
func (dA *DispatcherAttributeSv1) Ping(ign string, reply *string) error {
return dA.dA.AttributeSv1Ping(ign, reply)
func (dA *DispatcherAttributeSv1) Ping(args *dispatchers.CGREvWithApiKey, reply *string) error {
return dA.dA.AttributeSv1Ping(args, reply)
}
// GetAttributeForEvent implements AttributeSv1GetAttributeForEvent
@@ -206,6 +213,7 @@ func (dA *DispatcherAttributeSv1) ProcessEvent(args *dispatchers.ArgsAttrProcess
return dA.dA.AttributeSv1ProcessEvent(args, reply)
}
/*
func NewDispatcherSessionSv1(dps *dispatchers.DispatcherService) *DispatcherSessionSv1 {
return &DispatcherSessionSv1{dS: dps}
}

View File

@@ -965,7 +965,7 @@ func loaderService(cacheS *engine.CacheS, cfg *config.CGRConfig,
}
// startDispatcherService fires up the DispatcherS
func startDispatcherService(internalDispatcherSChan,
func startDispatcherService(internalDispatcherSChan chan *dispatchers.DispatcherService,
intAttrSChan chan rpcclient.RpcClientConnection,
cfg *config.CGRConfig,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
@@ -1041,10 +1041,12 @@ func startDispatcherService(internalDispatcherSChan,
server.RpcRegisterName(utils.SupplierSv1,
v1.NewDispatcherSupplierSv1(dspS))
}
if !cfg.AttributeSCfg().Enabled && len(cfg.DispatcherSCfg().AttrSConns) != 0 {
server.RpcRegisterName(utils.AttributeSv1,
v1.NewDispatcherAttributeSv1(dspS))
}
*/
// if !cfg.AttributeSCfg().Enabled { //dispatcer enable all methos
attrv1 := v1.NewDispatcherAttributeSv1(dspS)
server.RpcRegisterName(utils.AttributeSv1, attrv1)
// }
/*
if !cfg.SessionSCfg().Enabled && len(cfg.DispatcherSCfg().SessionSConns) != 0 {
server.RpcRegisterName(utils.SessionSv1,
v1.NewDispatcherSessionSv1(dspS))
@@ -1054,6 +1056,7 @@ func startDispatcherService(internalDispatcherSChan,
v1.NewDispatcherChargerSv1(dspS))
}
*/
internalDispatcherSChan <- dspS
}
// startAnalyzerService fires up the AnalyzerS
@@ -1084,8 +1087,8 @@ func startRpc(server *utils.Server, internalRaterChan,
internalCdrSChan, internalPubSubSChan, internalUserSChan,
internalAliaseSChan, internalRsChan, internalStatSChan,
internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan,
internalSMGChan, internalDispatcherSChan, internalAnalyzerSChan chan rpcclient.RpcClientConnection,
exitChan chan bool) {
internalSMGChan, internalAnalyzerSChan chan rpcclient.RpcClientConnection,
internalDispatcherSChan chan *dispatchers.DispatcherService, exitChan chan bool) {
select { // Any of the rpc methods will unlock listening to rpc requests
case resp := <-internalRaterChan:
internalRaterChan <- resp
@@ -1336,7 +1339,7 @@ func main() {
if cfg.RalsCfg().RALsEnabled || cfg.PubSubServerEnabled ||
cfg.AliasesServerEnabled || cfg.UserServerEnabled || cfg.SchedulerCfg().Enabled ||
cfg.AttributeSCfg().Enabled || cfg.ResourceSCfg().Enabled || cfg.StatSCfg().Enabled ||
cfg.ThresholdSCfg().Enabled || cfg.SupplierSCfg().Enabled { // Some services can run without db, ie: SessionS or CDRC
cfg.ThresholdSCfg().Enabled || cfg.SupplierSCfg().Enabled || cfg.DispatcherSCfg().Enabled { // Some services can run without db, ie: SessionS or CDRC
dm, err = engine.ConfigureDataStorage(cfg.DataDbCfg().DataDbType,
cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort,
cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser,
@@ -1412,7 +1415,7 @@ func main() {
internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1)
internalSupplierSChan := make(chan rpcclient.RpcClientConnection, 1)
filterSChan := make(chan *engine.FilterS, 1)
internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1)
internalDispatcherSChan := make(chan *dispatchers.DispatcherService, 1)
internalAnalyzerSChan := make(chan rpcclient.RpcClientConnection, 1)
// Start ServiceManager
@@ -1546,7 +1549,7 @@ func main() {
internalStatSChan,
internalAttributeSChan, internalChargerSChan, internalThresholdSChan,
internalSupplierSChan,
internalSMGChan, internalDispatcherSChan, internalAnalyzerSChan, exitChan)
internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan, exitChan)
<-exitChan
if *cpuProfDir != "" { // wait to end cpuProfiling

View File

@@ -701,6 +701,9 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
if self.dispatcherSCfg.Enabled {
if self.attributeSCfg.Enabled {
return fmt.Errorf("<%s> cannot start in tandem with <%s>", utils.DispatcherS, utils.AttributeS)
}
if len(self.dispatcherSCfg.Conns) == 0 {
return fmt.Errorf("<%s> no connections defined", utils.DispatcherS)
}

View File

@@ -24,17 +24,17 @@ import (
)
// AttributeSv1Ping interogates AttributeS server responsible to process the event
func (dS *DispatcherService) AttributeSv1Ping(args *ArgsAttrProcessEventWithApiKey,
func (dS *DispatcherService) AttributeSv1Ping(args *CGREvWithApiKey,
reply *string) (err error) {
if dS.attrS != nil {
if err = dS.authorize(utils.AttributeSv1Ping,
args.AttrArgsProcessEvent.CGREvent.Tenant,
args.APIKey, args.AttrArgsProcessEvent.CGREvent.Time); err != nil {
args.CGREvent.Tenant,
args.APIKey, args.CGREvent.Time); err != nil {
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaAttributes,
utils.AttributeSv1Ping, args.AttrArgsProcessEvent, reply)
utils.AttributeSv1Ping, args.CGREvent, reply)
}
// AttributeSv1GetAttributeForEvent is the dispatcher method for AttributeSv1.GetAttributeForEvent
@@ -52,17 +52,16 @@ func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(args *ArgsAttrProc
}
/*
func (dS *DispatcherService) AttributeSv1ProcessEvent(args *ArgsAttrProcessEventWithApiKey,
reply *engine.AttrSProcessEventReply) (err error) {
if dS.attrS == nil {
return utils.NewErrNotConnected(utils.AttributeS)
}
if err = dS.authorize(utils.AttributeSv1ProcessEvent, args.AttrArgsProcessEvent.CGREvent.Tenant,
args.APIKey, args.AttrArgsProcessEvent.CGREvent.Time); err != nil {
return
}
return dS.attrS.Call(utils.AttributeSv1ProcessEvent, args.AttrArgsProcessEvent, reply)
if dS.attrS != nil {
if err = dS.authorize(utils.AttributeSv1ProcessEvent,
args.AttrArgsProcessEvent.CGREvent.Tenant,
args.APIKey, args.AttrArgsProcessEvent.CGREvent.Time); err != nil {
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaAttributes,
utils.AttributeSv1ProcessEvent, args.AttrArgsProcessEvent, reply)
}
*/

View File

@@ -46,17 +46,17 @@ var sTestsDspAttr = []func(t *testing.T){
testDspAttrInitCfg,
testDspAttrInitDataDb,
testDspAttrResetStorDb,
testDspAttrStartEngine,
// testDspAttrStartEngine,
testDspAttrRPCConn,
testDspAttrPing,
testDspAttrLoadData,
testDspAttrPing,
testDspAttrAddAttributesWithPermision,
testDspAttrTestMissingApiKey,
testDspAttrTestUnknownApiKey,
testDspAttrTestAuthKey,
testDspAttrAddAttributesWithPermision2,
testDspAttrTestAuthKey2,
testDspAttrKillEngine,
// testDspAttrKillEngine,
}
//Test start here
@@ -68,7 +68,7 @@ func TestDspAttributeS(t *testing.T) {
func testDspAttrInitCfg(t *testing.T) {
var err error
dspAttrCfgPath = path.Join(dspDataDir, "conf", "samples", "dispatcher")
dspAttrCfgPath = path.Join(dspDataDir, "conf", "samples", "dispatchers")
dspAttrCfg, err = config.NewCGRConfigFromFolder(dspAttrCfgPath)
if err != nil {
t.Error(err)
@@ -118,33 +118,40 @@ func testDspAttrRPCConn(t *testing.T) {
if err != nil {
t.Fatal(err)
}
}
func testDspAttrPing(t *testing.T) {
var reply string
if err := instAttrRPC.Call(utils.AttributeSv1Ping, "", &reply); err != nil {
t.Error(err)
} else if reply != utils.Pong {
t.Errorf("Received: %s", reply)
}
if err := dspAttrRPC.Call(utils.AttributeSv1Ping, "", &reply); err != nil {
t.Error(err)
} else if reply != utils.Pong {
t.Errorf("Received: %s", reply)
}
}
func testDspAttrLoadData(t *testing.T) {
var reply string
attrs := &utils.AttrLoadTpFromFolder{
FolderPath: path.Join(dspDataDir, "tariffplans", "tutorial")}
FolderPath: path.Join(dspDataDir, "tariffplans", "dispatchers")}
if err := instAttrRPC.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
t.Error(err)
}
time.Sleep(500 * time.Millisecond)
}
func testDspAttrPing(t *testing.T) {
var reply string
if err := instAttrRPC.Call(utils.AttributeSv1Ping, &utils.CGREvent{}, &reply); err != nil {
t.Error(err)
} else if reply != utils.Pong {
t.Errorf("Received: %s", reply)
}
if dspAttrRPC == nil {
t.Fatal(dspAttrRPC)
}
if err := dspAttrRPC.Call(utils.AttributeSv1Ping, &CGREvWithApiKey{
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
},
APIKey: "attr12345",
}, &reply); err != nil {
t.Error(err)
} else if reply != utils.Pong {
t.Errorf("Received: %s", reply)
}
}
func testDspAttrAddAttributesWithPermision(t *testing.T) {
alsPrf := &engine.AttributeProfile{
Tenant: "cgrates.org",
@@ -197,7 +204,7 @@ func testDspAttrTestMissingApiKey(t *testing.T) {
var attrReply *engine.AttributeProfile
if err := dspAttrRPC.Call(utils.AttributeSv1GetAttributeForEvent,
args, &attrReply); err == nil || err.Error() != utils.NewErrMandatoryIeMissing(utils.APIKey).Error() {
t.Error(err)
t.Errorf("Error:%v rply=%s", err, utils.ToJSON(attrReply))
}
}
@@ -271,7 +278,9 @@ func testDspAttrAddAttributesWithPermision2(t *testing.T) {
&utils.TenantID{Tenant: "cgrates.org", ID: "AuthKey"}, &reply); err != nil {
t.Error(err)
}
reply.Compile()
if reply != nil {
reply.Compile()
}
if !reflect.DeepEqual(alsPrf, reply) {
t.Errorf("Expecting : %+v, received: %+v", alsPrf, reply)
}
@@ -310,7 +319,9 @@ func testDspAttrTestAuthKey2(t *testing.T) {
args, &attrReply); err != nil {
t.Error(err)
}
attrReply.Compile()
if attrReply != nil {
attrReply.Compile()
}
if !reflect.DeepEqual(eAttrPrf, attrReply) {
t.Errorf("Expecting: %s, received: %s", utils.ToJSON(eAttrPrf), utils.ToJSON(attrReply))
}

View File

@@ -76,14 +76,25 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
if subsys != "" {
idxKeyPrfx = utils.ConcatenatedKey(ev.Tenant, subsys)
}
matchingPrfls := make(map[string]*engine.DispatcherProfile)
var matchedPrlf *engine.DispatcherProfile
prflIDs, err := engine.MatchingItemIDsForEvent(ev.Event,
dS.cfg.DispatcherSCfg().StringIndexedFields,
dS.cfg.DispatcherSCfg().PrefixIndexedFields,
dS.dm, utils.CacheDispatcherFilterIndexes,
idxKeyPrfx, dS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
return nil, err
// return nil, err
if err != utils.ErrNotFound {
return nil, err
}
prflIDs, err = engine.MatchingItemIDsForEvent(ev.Event,
dS.cfg.DispatcherSCfg().StringIndexedFields,
dS.cfg.DispatcherSCfg().PrefixIndexedFields,
dS.dm, utils.CacheDispatcherFilterIndexes,
anyIdxPrfx, dS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
return nil, err
}
}
for prflID := range prflIDs {
prfl, err := dS.dm.GetDispatcherProfile(ev.Tenant, prflID, true, true, utils.NonTransactional)
@@ -91,17 +102,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
if err != utils.ErrNotFound {
return nil, err
}
if idxKeyPrfx == anyIdxPrfx {
continue // already checked *any
}
// check *any as subsystem
if prfl, err = dS.dm.GetDispatcherProfile(ev.Tenant, prflID, true, true, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
continue
}
return nil, err
}
continue
}
if prfl.ActivationInterval != nil && ev.Time != nil &&
!prfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active
@@ -113,20 +114,13 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
} else if !pass {
continue
}
matchingPrfls[prflID] = prfl
if matchedPrlf == nil || prfl.Weight > matchedPrlf.Weight {
matchedPrlf = prfl
}
}
if len(matchingPrfls) == 0 {
if matchedPrlf == nil {
return nil, utils.ErrNotFound
}
// All good, convert from Map to Slice so we can sort
prfls := make(engine.DispatcherProfiles, len(matchingPrfls))
i := 0
for _, prfl := range matchingPrfls {
prfls[i] = prfl
i++
}
prfls.Sort()
matchedPrlf := prfls[0] // only use the first profile
tntID := matchedPrlf.TenantID()
// get or build the Dispatcher for the config
if x, ok := engine.Cache.Get(utils.CacheDispatchers,
@@ -166,17 +160,19 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
connID := d.NextConnID()
conn, has := dS.conns[connID]
if !has {
utils.NewErrDispatcherS(
err = utils.NewErrDispatcherS(
fmt.Errorf("no connection with id: <%s>", connID))
continue
}
if err = conn.Call(serviceMethod, args, reply); !utils.IsNetworkError(err) {
break
if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
continue
}
if ev.RouteID != nil &&
*ev.RouteID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, *ev.RouteID, connID,
nil, true, utils.EmptyString)
}
break
}
return
}

View File

@@ -1319,20 +1319,30 @@ func (dm *DataManager) SetDispatcherProfile(dpp *DispatcherProfile, withIndex bo
}
if withIndex {
if oldDpp != nil {
var needsRemove bool
for _, fltrID := range oldDpp.FilterIDs {
if !utils.IsSliceMember(dpp.FilterIDs, fltrID) {
for _, ctx := range oldDpp.Subsystems {
var needsRemove bool
if !utils.IsSliceMember(dpp.Subsystems, ctx) {
needsRemove = true
} else {
for _, fltrID := range oldDpp.FilterIDs {
if !utils.IsSliceMember(dpp.FilterIDs, fltrID) {
needsRemove = true
}
}
}
}
if needsRemove {
if err = NewFilterIndexer(dm, utils.DispatcherProfilePrefix,
dpp.Tenant).RemoveItemFromIndex(dpp.Tenant, dpp.ID, oldDpp.FilterIDs); err != nil {
return
if needsRemove {
if err = NewFilterIndexer(dm, utils.DispatcherProfilePrefix,
utils.ConcatenatedKey(dpp.Tenant, ctx)).RemoveItemFromIndex(dpp.Tenant, dpp.ID, oldDpp.FilterIDs); err != nil {
return
}
}
}
}
return createAndIndex(utils.DispatcherProfilePrefix, dpp.Tenant, utils.EmptyString, dpp.ID, dpp.FilterIDs, dm)
for _, ctx := range dpp.Subsystems {
if err = createAndIndex(utils.DispatcherProfilePrefix, dpp.Tenant, ctx, dpp.ID, dpp.FilterIDs, dm); err != nil {
return
}
}
}
return
}
@@ -1352,7 +1362,12 @@ func (dm *DataManager) RemoveDispatcherProfile(tenant, id string,
return utils.ErrNotFound
}
if withIndex {
return NewFilterIndexer(dm, utils.DispatcherProfilePrefix, tenant).RemoveItemFromIndex(tenant, id, oldDpp.FilterIDs)
for _, ctx := range oldDpp.Subsystems {
if err = NewFilterIndexer(dm, utils.DispatcherProfilePrefix,
utils.ConcatenatedKey(tenant, ctx)).RemoveItemFromIndex(tenant, id, oldDpp.FilterIDs); err != nil {
return
}
}
}
return
}