Added some fixes for ditpatcher

This commit is contained in:
Trial97
2019-02-12 16:10:44 +02:00
committed by Dan Christian Bogos
parent bf6b801372
commit aac1435234
19 changed files with 104 additions and 78 deletions

View File

@@ -40,7 +40,7 @@ func (aSv1 *AnalyzerSv1) Call(serviceMethod string,
}
// Ping return pong if the service is active
func (alSv1 *AnalyzerSv1) Ping(ign string, reply *string) error {
func (alSv1 *AnalyzerSv1) Ping(ign *utils.CGREvent, reply *string) error {
*reply = utils.Pong
return nil
}

View File

@@ -94,7 +94,7 @@ func (chSv1 *CacheSv1) RemoveGroup(args *engine.ArgsGetGroup,
return chSv1.cacheS.V1RemoveGroup(args, rply)
}
func (chSv1 *CacheSv1) Ping(ign string, reply *string) error {
func (chSv1 *CacheSv1) Ping(ign *utils.CGREvent, reply *string) error {
*reply = utils.Pong
return nil
}

View File

@@ -43,7 +43,7 @@ func (ldrSv1 *LoaderSv1) Load(args *loaders.ArgsProcessFolder,
return ldrSv1.ldrS.V1Load(args, rply)
}
func (rsv1 *LoaderSv1) Ping(ign string, reply *string) error {
func (rsv1 *LoaderSv1) Ping(ign *utils.CGREvent, reply *string) error {
*reply = utils.Pong
return nil
}

View File

@@ -106,8 +106,8 @@ func (stsv1 *StatSv1) Call(serviceMethod string, args interface{}, reply interfa
}
// GetQueueIDs returns list of queueIDs registered for a tenant
func (stsv1 *StatSv1) GetQueueIDs(tenant string, qIDs *[]string) error {
return stsv1.sS.V1GetQueueIDs(tenant, qIDs)
func (stsv1 *StatSv1) GetQueueIDs(tenant *utils.TenantArg, qIDs *[]string) error {
return stsv1.sS.V1GetQueueIDs(tenant.Tenant, qIDs)
}
// ProcessEvent returns processes a new Event

View File

@@ -142,7 +142,7 @@ func testV1STSFromFolder(t *testing.T) {
func testV1STSGetStats(t *testing.T) {
var reply []string
expectedIDs := []string{"Stats1"}
if err := stsV1Rpc.Call(utils.StatSv1GetQueueIDs, "cgrates.org", &reply); err != nil {
if err := stsV1Rpc.Call(utils.StatSv1GetQueueIDs, &utils.TenantArg{"cgrates.org"}, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expectedIDs, reply) {
t.Errorf("expecting: %+v, received reply: %s", expectedIDs, reply)

View File

@@ -39,8 +39,8 @@ func (tSv1 *ThresholdSv1) Call(serviceMethod string, args interface{}, reply int
}
// GetThresholdIDs returns list of threshold IDs registered for a tenant
func (tSv1 *ThresholdSv1) GetThresholdIDs(tenant string, tIDs *[]string) error {
return tSv1.tS.V1GetThresholdIDs(tenant, tIDs)
func (tSv1 *ThresholdSv1) GetThresholdIDs(tenant *utils.TenantArg, tIDs *[]string) error {
return tSv1.tS.V1GetThresholdIDs(tenant.Tenant, tIDs)
}
// GetThresholdsForEvent returns a list of thresholds matching an event

View File

@@ -229,7 +229,7 @@ func testV1TSFromFolder(t *testing.T) {
func testV1TSGetThresholds(t *testing.T) {
var tIDs []string
expectedIDs := []string{"THD_RES_1", "THD_STATS_2", "THD_STATS_1", "THD_ACNT_BALANCE_1", "THD_ACNT_EXPIRED", "THD_STATS_3", "THD_CDRS_1"}
if err := tSv1Rpc.Call(utils.ThresholdSv1GetThresholdIDs, "cgrates.org", &tIDs); err != nil {
if err := tSv1Rpc.Call(utils.ThresholdSv1GetThresholdIDs, &utils.TenantArg{Tenant: "cgrates.org"}, &tIDs); err != nil {
t.Error(err)
} else if len(expectedIDs) != len(tIDs) {
t.Errorf("expecting: %+v, received reply: %s", expectedIDs, tIDs)
@@ -303,7 +303,7 @@ func testV1TSProcessEvent(t *testing.T) {
func testV1TSGetThresholdsAfterProcess(t *testing.T) {
var tIDs []string
expectedIDs := []string{"THD_RES_1", "THD_STATS_2", "THD_STATS_1", "THD_ACNT_BALANCE_1", "THD_ACNT_EXPIRED"}
if err := tSv1Rpc.Call(utils.ThresholdSv1GetThresholdIDs, "cgrates.org", &tIDs); err != nil {
if err := tSv1Rpc.Call(utils.ThresholdSv1GetThresholdIDs, &utils.TenantArg{Tenant: "cgrates.org"}, &tIDs); err != nil {
t.Error(err)
} else if len(expectedIDs) != len(tIDs) { // THD_STATS_3 is not reccurent, so it was removed
t.Errorf("expecting: %+v, received reply: %s", expectedIDs, tIDs)

View File

@@ -33,7 +33,7 @@ func (dS *DispatcherService) AttributeSv1Ping(args *CGREvWithApiKey,
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaAttributes,
return dS.Dispatch(&args.CGREvent, utils.MetaAttributes, args.RouteID,
utils.AttributeSv1Ping, args.CGREvent, reply)
}
@@ -47,7 +47,7 @@ func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(args *ArgsAttrProc
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaAttributes,
return dS.Dispatch(&args.CGREvent, utils.MetaAttributes, args.RouteID,
utils.AttributeSv1GetAttributeForEvent, args.AttrArgsProcessEvent, reply)
}
@@ -61,6 +61,6 @@ func (dS *DispatcherService) AttributeSv1ProcessEvent(args *ArgsAttrProcessEvent
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaAttributes,
return dS.Dispatch(&args.CGREvent, utils.MetaAttributes, args.RouteID,
utils.AttributeSv1ProcessEvent, args.AttrArgsProcessEvent, reply)
}

View File

@@ -35,15 +35,34 @@ import (
"github.com/cgrates/cgrates/utils"
)
type testDipatcer struct {
var (
attrEngine *testDispatcher
dispEngine *testDispatcher
allEngine *testDispatcher
allEngine2 *testDispatcher
)
var sTestsDspAttr = []func(t *testing.T){
testDspAttrPingFailover,
testDspAttrGetAttrFailover,
testDspAttrPing,
testDspAttrTestMissingApiKey,
testDspAttrTestUnknownApiKey,
testDspAttrTestAuthKey,
testDspAttrTestAuthKey2,
testDspAttrTestAuthKey3,
}
type testDispatcher struct {
CfgParh string
Cfg *config.CGRConfig
RCP *rpc.Client
cmd *exec.Cmd
}
func newTestEngine(t *testing.T, cfgPath string, initDataDB, intitStoreDB bool) (d *testDipatcer) {
d = new(testDipatcer)
func newTestEngine(t *testing.T, cfgPath string, initDataDB, intitStoreDB bool) (d *testDispatcher) {
d = new(testDispatcher)
d.CfgParh = cfgPath
var err error
d.Cfg, err = config.NewCGRConfigFromFolder(d.CfgParh)
@@ -63,7 +82,7 @@ func newTestEngine(t *testing.T, cfgPath string, initDataDB, intitStoreDB bool)
return d
}
func (d *testDipatcer) startEngine(t *testing.T) {
func (d *testDispatcher) startEngine(t *testing.T) {
var err error
if d.cmd, err = engine.StartEngine(d.CfgParh, dspDelay); err != nil {
t.Fatalf("Error at engine start:%v\n", err)
@@ -74,7 +93,7 @@ func (d *testDipatcer) startEngine(t *testing.T) {
}
}
func (d *testDipatcer) stopEngine(t *testing.T) {
func (d *testDispatcher) stopEngine(t *testing.T) {
pid := strconv.Itoa(d.cmd.Process.Pid)
if err := exec.Command("kill", "-9", pid).Run(); err != nil {
t.Fatalf("Error at stop engine:%v\n", err)
@@ -84,19 +103,19 @@ func (d *testDipatcer) stopEngine(t *testing.T) {
// }
}
func (d *testDipatcer) initDataDb(t *testing.T) {
func (d *testDispatcher) initDataDb(t *testing.T) {
if err := engine.InitDataDb(d.Cfg); err != nil {
t.Fatalf("Error at DataDB init:%v\n", err)
}
}
// Wipe out the cdr database
func (d *testDipatcer) resetStorDb(t *testing.T) {
func (d *testDispatcher) resetStorDb(t *testing.T) {
if err := engine.InitStorDb(d.Cfg); err != nil {
t.Fatalf("Error at DataDB init:%v\n", err)
}
}
func (d *testDipatcer) loadData(t *testing.T, path string) {
func (d *testDispatcher) loadData(t *testing.T, path string) {
var reply string
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path}
if err := d.RCP.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
@@ -104,25 +123,6 @@ func (d *testDipatcer) loadData(t *testing.T, path string) {
}
}
var (
attrEngine *testDipatcer
dispEngine *testDipatcer
allEngine *testDipatcer
allEngine2 *testDipatcer
)
var sTestsDspAttr = []func(t *testing.T){
testDspAttrPingFailover,
testDspAttrGetAttrFailover,
testDspAttrPing,
testDspAttrTestMissingApiKey,
testDspAttrTestUnknownApiKey,
testDspAttrTestAuthKey,
testDspAttrTestAuthKey2,
testDspAttrTestAuthKey3,
}
//Test start here
func TestDspAttributeS(t *testing.T) {
allEngine = newTestEngine(t, path.Join(dspDataDir, "conf", "samples", "dispatchers", "all"), true, true)

View File

@@ -31,7 +31,7 @@ func (dS *DispatcherService) ChargerSv1Ping(args *CGREvWithApiKey, reply *string
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaChargers,
return dS.Dispatch(&args.CGREvent, utils.MetaChargers, args.RouteID,
utils.ChargerSv1Ping, args.CGREvent, reply)
}
@@ -44,7 +44,7 @@ func (dS *DispatcherService) ChargerSv1GetChargersForEvent(args *CGREvWithApiKey
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaChargers,
return dS.Dispatch(&args.CGREvent, utils.MetaChargers, args.RouteID,
utils.ChargerSv1GetChargersForEvent, args.CGREvent, reply)
}
@@ -58,6 +58,6 @@ func (dS *DispatcherService) ChargerSv1ProcessEvent(args *CGREvWithApiKey,
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaChargers,
return dS.Dispatch(&args.CGREvent, utils.MetaChargers, args.RouteID,
utils.ChargerSv1ProcessEvent, args.CGREvent, reply)
}

View File

@@ -138,18 +138,18 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
}
// Dispatch is the method forwarding the request towards the right
func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, RouteID *string,
serviceMethod string, args interface{}, reply interface{}) (err error) {
d, errDsp := dS.dispatcherForEvent(ev, subsys)
if errDsp != nil {
return utils.NewErrDispatcherS(errDsp)
}
var connID string
if ev.RouteID != nil &&
*ev.RouteID != "" {
if RouteID != nil &&
*RouteID != "" {
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*ev.RouteID); ok && x != nil {
*RouteID); ok && x != nil {
connID = x.(string)
if err = dS.conns[connID].Call(serviceMethod, args, reply); !utils.IsNetworkError(err) {
return
@@ -167,9 +167,9 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
if err = conn.Call(serviceMethod, args, reply); utils.IsNetworkError(err) {
continue
}
if ev.RouteID != nil &&
*ev.RouteID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, *ev.RouteID, connID,
if RouteID != nil &&
*RouteID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, *RouteID, connID,
nil, true, utils.EmptyString)
}
break

View File

@@ -31,7 +31,7 @@ func (dS *DispatcherService) ResourceSv1Ping(args *CGREvWithApiKey, rpl *string)
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaResources,
return dS.Dispatch(&args.CGREvent, utils.MetaResources, args.RouteID,
utils.ResourceSv1Ping, args.CGREvent, rpl)
}
@@ -45,7 +45,7 @@ func (dS *DispatcherService) ResourceSv1GetResourcesForEvent(args *ArgsV1ResUsag
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaResources,
return dS.Dispatch(&args.CGREvent, utils.MetaResources, args.RouteID,
utils.ResourceSv1GetResourcesForEvent, args.ArgRSv1ResourceUsage, reply)
}

View File

@@ -32,7 +32,7 @@ func (dS *DispatcherService) StatSv1Ping(args *CGREvWithApiKey, reply *string) (
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaStats,
return dS.Dispatch(&args.CGREvent, utils.MetaStats, args.RouteID,
utils.StatSv1Ping, args.CGREvent, reply)
}
@@ -45,7 +45,7 @@ func (dS *DispatcherService) StatSv1GetStatQueuesForEvent(args *ArgsStatProcessE
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaStats,
return dS.Dispatch(&args.CGREvent, utils.MetaStats, args.RouteID,
utils.StatSv1GetStatQueuesForEvent, args.StatsArgsProcessEvent, reply)
}
@@ -61,7 +61,7 @@ func (dS *DispatcherService) StatSv1GetQueueStringMetrics(args *TntIDWithApiKey,
return dS.Dispatch(&utils.CGREvent{
Tenant: args.Tenant,
ID: args.ID,
}, utils.MetaStats, utils.StatSv1GetQueueStringMetrics,
}, utils.MetaStats, args.RouteID, utils.StatSv1GetQueueStringMetrics,
args.TenantID, reply)
}
@@ -74,6 +74,6 @@ func (dS *DispatcherService) StatSv1ProcessEvent(args *ArgsStatProcessEventWithA
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaStats,
return dS.Dispatch(&args.CGREvent, utils.MetaStats, args.RouteID,
utils.StatSv1ProcessEvent, args.StatsArgsProcessEvent, reply)
}

View File

@@ -31,7 +31,7 @@ func (dS *DispatcherService) SupplierSv1Ping(args *CGREvWithApiKey, reply *strin
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaSuppliers,
return dS.Dispatch(&args.CGREvent, utils.MetaSuppliers, args.RouteID,
utils.SupplierSv1Ping, args.CGREvent, reply)
}
@@ -44,6 +44,6 @@ func (dS *DispatcherService) SupplierSv1GetSuppliers(args *ArgsGetSuppliersWithA
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaSuppliers,
return dS.Dispatch(&args.CGREvent, utils.MetaSuppliers, args.RouteID,
utils.SupplierSv1GetSuppliers, args.ArgsGetSuppliers, reply)
}

View File

@@ -31,6 +31,9 @@ import (
)
var sTestsDspSup = []func(t *testing.T){
testDspSupPingFailover,
testDspSupGetSupFailover,
testDspSupPing,
testDspSupTestAuthKey,
testDspSupTestAuthKey2,
@@ -108,6 +111,19 @@ func testDspSupPingFailover(t *testing.T) {
func testDspSupGetSupFailover(t *testing.T) {
var rpl *engine.SortedSuppliers
eRpl1 := &engine.SortedSuppliers{
ProfileID: "SPL_WEIGHT_2",
Sorting: utils.MetaWeight,
SortedSuppliers: []*engine.SortedSupplier{
{
SupplierID: "supplier1",
SupplierParameters: "",
SortingData: map[string]interface{}{
utils.Weight: 10.0,
},
},
},
}
eRpl := &engine.SortedSuppliers{
ProfileID: "SPL_ACNT_1002",
Sorting: utils.MetaLeastCost,
@@ -151,8 +167,10 @@ func testDspSupGetSupFailover(t *testing.T) {
},
}
if err := dispEngine.RCP.Call(utils.SupplierSv1GetSuppliers,
args, &rpl); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Errorf("Expected error NOT_FOUND but recived %v and reply %v\n", err, rpl)
args, &rpl); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eRpl1, rpl) {
t.Errorf("Expecting : %+v, received: %+v", utils.ToJSON(eRpl1), utils.ToJSON(rpl))
}
allEngine2.stopEngine(t)
if err := dispEngine.RCP.Call(utils.SupplierSv1GetSuppliers,

View File

@@ -31,7 +31,7 @@ func (dS *DispatcherService) ThresholdSv1Ping(args *CGREvWithApiKey, reply *stri
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaThresholds,
return dS.Dispatch(&args.CGREvent, utils.MetaThresholds, args.RouteID,
utils.ThresholdSv1Ping, args.CGREvent, reply)
}
@@ -44,7 +44,7 @@ func (dS *DispatcherService) ThresholdSv1GetThresholdsForEvent(args *ArgsProcess
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaThresholds,
return dS.Dispatch(&args.CGREvent, utils.MetaThresholds, args.RouteID,
utils.ThresholdSv1GetThresholdsForEvent, args.ArgsProcessEvent, t)
}
@@ -57,6 +57,6 @@ func (dS *DispatcherService) ThresholdSv1ProcessEvent(args *ArgsProcessEventWith
return
}
}
return dS.Dispatch(&args.CGREvent, utils.MetaThresholds,
return dS.Dispatch(&args.CGREvent, utils.MetaThresholds, args.RouteID,
utils.ThresholdSv1ProcessEvent, args.ArgsProcessEvent, tIDs)
}

View File

@@ -33,68 +33,73 @@ var ( //var used in all tests
nowTime = time.Now()
)
type DispatcherResource struct {
APIKey string
RouteID *string // route over previous computed path
}
type CGREvWithApiKey struct {
APIKey string
DispatcherResource
utils.CGREvent
}
type TntIDWithApiKey struct {
utils.TenantID
APIKey string
DispatcherResource
}
type TntWithApiKey struct {
Tenant string
ApiKey string
utils.TenantArg
DispatcherResource
}
type ArgsV1ResUsageWithApiKey struct {
APIKey string
DispatcherResource
utils.ArgRSv1ResourceUsage
}
type ArgsProcessEventWithApiKey struct {
APIKey string
DispatcherResource
engine.ArgsProcessEvent
}
type ArgsAttrProcessEventWithApiKey struct {
APIKey string
DispatcherResource
engine.AttrArgsProcessEvent
}
type ArgsGetSuppliersWithApiKey struct {
APIKey string
DispatcherResource
engine.ArgsGetSuppliers
}
type ArgsStatProcessEventWithApiKey struct {
APIKey string
DispatcherResource
engine.StatsArgsProcessEvent
}
type AuthorizeArgsWithApiKey struct {
APIKey string
DispatcherResource
sessions.V1AuthorizeArgs
}
type InitArgsWithApiKey struct {
APIKey string
DispatcherResource
sessions.V1InitSessionArgs
}
type ProcessEventWithApiKey struct {
APIKey string
DispatcherResource
sessions.V1ProcessEventArgs
}
type TerminateSessionWithApiKey struct {
APIKey string
DispatcherResource
sessions.V1TerminateSessionArgs
}
type UpdateSessionWithApiKey struct {
APIKey string
DispatcherResource
sessions.V1UpdateSessionArgs
}

View File

@@ -31,7 +31,6 @@ type CGREvent struct {
ID string
Context *string // attach the event to a context
Time *time.Time // event time
RouteID *string // route over previous computed path
Event map[string]interface{}
}

View File

@@ -779,6 +779,10 @@ func NewTenantID(tntID string) *TenantID {
return &TenantID{Tenant: tIDSplt[0], ID: tIDSplt[1]}
}
type TenantArg struct {
Tenant string
}
type TenantID struct {
Tenant string
ID string