diff --git a/apier/v1/stats.go b/apier/v1/stats.go index 072421f63..7d7535f19 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -183,6 +183,11 @@ func (stsv1 *StatSv1) GetStatQueuesForEvent(args *engine.StatsArgsProcessEvent, return stsv1.sS.V1GetStatQueuesForEvent(args, reply) } +// GetStatQueue returns a StatQueue object +func (stsv1 *StatSv1) GetStatQueue(args *utils.TenantIDWithArgDispatcher, reply *engine.StatQueue) (err error) { + return stsv1.sS.V1GetStatQueue(args, reply) +} + // GetStringMetrics returns the string metrics for a Queue func (stsv1 *StatSv1) GetQueueStringMetrics(args *utils.TenantIDWithArgDispatcher, reply *map[string]string) (err error) { return stsv1.sS.V1GetQueueStringMetrics(args.TenantID, reply) diff --git a/data/conf/samples/actions_internal/cgradmin.json b/data/conf/samples/actions_internal/cgradmin.json index 09d950cd6..585f7e34f 100644 --- a/data/conf/samples/actions_internal/cgradmin.json +++ b/data/conf/samples/actions_internal/cgradmin.json @@ -25,6 +25,7 @@ "rals": { "enabled": true, // enable Rater service: + "stats_conns": ["*internal"], }, "schedulers": { @@ -41,15 +42,19 @@ "enabled": true, }, -"users": { - "enabled": true, // starts users service: . -}, "thresholds": { "enabled": true, "store_interval": "-1", }, + +"stats": { + "enabled": true, + "store_interval": "-1", +}, + + "apiers": { "enabled": true, "scheduler_conns": ["*internal"], diff --git a/data/conf/samples/actions_internal_gob/cgradmin.json b/data/conf/samples/actions_internal_gob/cgradmin.json index 565293191..e7104d16f 100644 --- a/data/conf/samples/actions_internal_gob/cgradmin.json +++ b/data/conf/samples/actions_internal_gob/cgradmin.json @@ -32,16 +32,17 @@ }, "rals": { - "enabled": true, // enable Rater service: + "enabled": true, + "stats_conns": ["*internal"], }, "schedulers": { - "enabled": true, // start Scheduler service: + "enabled": true, "cdrs_conns": ["conn1"], }, "cdrs": { - "enabled": true, // start the CDR Server service: + "enabled": true, "chargers_conns":["conn1"], }, @@ -49,15 +50,20 @@ "enabled": true, }, -"users": { - "enabled": true, // starts users service: . -}, "thresholds": { "enabled": true, "store_interval": "-1", }, + + +"stats": { + "enabled": true, + "store_interval": "-1", +}, + + "apiers": { "enabled": true, "scheduler_conns": ["*internal"], diff --git a/data/conf/samples/actions_mongo/cgradmin.json b/data/conf/samples/actions_mongo/cgradmin.json index 5f77eb2c1..32076943d 100644 --- a/data/conf/samples/actions_mongo/cgradmin.json +++ b/data/conf/samples/actions_mongo/cgradmin.json @@ -29,7 +29,8 @@ }, "rals": { - "enabled": true, // enable Rater service: + "enabled": true, + "stats_conns": ["*internal"], }, "schedulers": { @@ -46,15 +47,19 @@ "enabled": true, }, -"users": { - "enabled": true, // starts users service: . -}, "thresholds": { "enabled": true, "store_interval": "1s", }, + +"stats": { + "enabled": true, + "store_interval": "1s", +}, + + "apiers": { "enabled": true, "scheduler_conns": ["*internal"], diff --git a/data/conf/samples/actions_mongo_gob/cgradmin.json b/data/conf/samples/actions_mongo_gob/cgradmin.json index 6addcb5d4..ba6dc1377 100644 --- a/data/conf/samples/actions_mongo_gob/cgradmin.json +++ b/data/conf/samples/actions_mongo_gob/cgradmin.json @@ -28,12 +28,10 @@ "db_port": 27017, }, -"stor_db": { - "db_password": "CGRateS.org", -}, "rals": { - "enabled": true, // enable Rater service: + "enabled": true, + "stats_conns": ["*internal"], }, "schedulers": { @@ -50,15 +48,19 @@ "enabled": true, }, -"users": { - "enabled": true, // starts users service: . -}, "thresholds": { "enabled": true, "store_interval": "1s", }, + +"stats": { + "enabled": true, + "store_interval": "1s", +}, + + "apiers": { "enabled": true, "scheduler_conns": ["*internal"], diff --git a/data/conf/samples/actions_mysql/cgradmin.json b/data/conf/samples/actions_mysql/cgradmin.json index 48bba4b17..2bb5c9988 100644 --- a/data/conf/samples/actions_mysql/cgradmin.json +++ b/data/conf/samples/actions_mysql/cgradmin.json @@ -26,7 +26,8 @@ }, "rals": { - "enabled": true, // enable Rater service: + "enabled": true, + "stats_conns": ["*internal"], }, "schedulers": { @@ -43,15 +44,19 @@ "enabled": true, }, -"users": { - "enabled": true, // starts users service: . -}, "thresholds": { "enabled": true, "store_interval": "1s", }, + +"stats": { + "enabled": true, + "store_interval": "1s", +}, + + "apiers": { "enabled": true, "scheduler_conns": ["*internal"], diff --git a/data/conf/samples/actions_mysql_gob/cgradmin.json b/data/conf/samples/actions_mysql_gob/cgradmin.json index 521a90b56..940f3e3a2 100644 --- a/data/conf/samples/actions_mysql_gob/cgradmin.json +++ b/data/conf/samples/actions_mysql_gob/cgradmin.json @@ -34,6 +34,7 @@ "rals": { "enabled": true, // enable Rater service: + "stats_conns": ["conn1"], }, "schedulers": { @@ -50,15 +51,18 @@ "enabled": true, }, -"users": { - "enabled": true, // starts users service: . -}, "thresholds": { "enabled": true, "store_interval": "1s", }, + +"stats": { + "enabled": true, + "store_interval": "1s" +}, + "apiers": { "enabled": true, "scheduler_conns": ["*internal"], diff --git a/engine/action.go b/engine/action.go index e52ae594f..6341a0b88 100644 --- a/engine/action.go +++ b/engine/action.go @@ -543,9 +543,14 @@ func setddestinations(ub *Account, a *Action, acs Actions, extraData interface{} if statID == utils.EmptyString { continue } - var sts *StatQueue - if sts, err = dm.GetStatQueue(config.CgrConfig().GeneralCfg().DefaultTenant, statID, - true, false, utils.NonTransactional); err != nil { + var sts StatQueue + if err = connMgr.Call(config.CgrConfig().RalsCfg().StatSConns, nil, utils.StatSv1GetStatQueue, + &utils.TenantIDWithArgDispatcher{ + TenantID: &utils.TenantID{ + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + ID: statID, + }, + }, &sts); err != nil { return } ddcIface, has := sts.SQMetrics[utils.MetaDDC] diff --git a/engine/actions_it_test.go b/engine/actions_it_test.go index 2433f4c4b..905bd0a65 100644 --- a/engine/actions_it_test.go +++ b/engine/actions_it_test.go @@ -50,6 +50,7 @@ var ( testActionsitCDRAccount, testActionsitThresholdCgrRpcAction, testActionsitThresholdPostEvent, + testActionsitSetSDestinations, testActionsitStopCgrEngine, } ) @@ -639,6 +640,139 @@ func testActionsitThresholdPostEvent(t *testing.T) { } +func testActionsitSetSDestinations(t *testing.T) { + var reply string + attrsSetAccount := &utils.AttrSetAccount{ + Tenant: "cgrates.org", + Account: "testAccSetDDestination", + } + if err := actsLclRpc.Call(utils.APIerSv1SetAccount, attrsSetAccount, &reply); err != nil { + t.Error("Got error on APIerSv1.SetAccount: ", err.Error()) + } else if reply != utils.OK { + t.Errorf("Calling APIerSv1.SetAccount received: %s", reply) + } + attrsAA := &utils.AttrSetActions{ActionsId: "ACT_AddBalance", Actions: []*utils.TPAction{ + {Identifier: utils.TOPUP, BalanceType: utils.MONETARY, DestinationIds: "*ddc_test", + Units: "5", ExpiryTime: utils.UNLIMITED, Weight: 20.0}, + }} + if err := actsLclRpc.Call(utils.APIerSv2SetActions, attrsAA, &reply); err != nil && err.Error() != utils.ErrExists.Error() { + t.Error("Got error on APIerSv2.SetActions: ", err.Error()) + } else if reply != utils.OK { + t.Errorf("Calling APIerSv2.SetActions received: %s", reply) + } + + attrsEA := &utils.AttrExecuteAction{Tenant: attrsSetAccount.Tenant, Account: attrsSetAccount.Account, ActionsId: attrsAA.ActionsId} + if err := actsLclRpc.Call(utils.APIerSv1ExecuteAction, attrsEA, &reply); err != nil { + t.Error("Got error on APIerSv1.ExecuteAction: ", err.Error()) + } else if reply != utils.OK { + t.Errorf("Calling APIerSv1.ExecuteAction received: %s", reply) + } + + var acc Account + attrs2 := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "testAccSetDDestination"} + if err := actsLclRpc.Call(utils.APIerSv2GetAccount, attrs2, &acc); err != nil { + t.Error(err.Error()) + } else if _, has := acc.BalanceMap[utils.MONETARY][0].DestinationIDs["*ddc_test"]; !has { + t.Errorf("Unexpected destinationIDs: %+v", acc.BalanceMap[utils.MONETARY][0].DestinationIDs) + } + + if err := actsLclRpc.Call(utils.APIerSv1SetDestination, + &utils.AttrSetDestination{Id: "*ddc_test", Prefixes: []string{"111", "222"}}, &reply); err != nil { + t.Error("Got error on APIerSv1.ExecuteAction: ", err.Error()) + } else if reply != utils.OK { + t.Errorf("Calling APIerSv1.ExecuteAction received: %s", reply) + } + //verify destinations + var dest Destination + if err := actsLclRpc.Call(utils.APIerSv1GetDestination, + "*ddc_test", &dest); err != nil { + t.Error(err.Error()) + } else { + if len(dest.Prefixes) != 2 || !utils.IsSliceMember(dest.Prefixes, "111") || !utils.IsSliceMember(dest.Prefixes, "222") { + t.Errorf("Unexpected destination : %+v", dest) + } + } + + // set a StatQueueProfile and simulate process event + statConfig := &StatQueueProfile{ + Tenant: "cgrates.org", + ID: "DistinctMetricProfile", + QueueLength: 10, + TTL: time.Duration(10) * time.Second, + Metrics: []*MetricWithFilters{ + &MetricWithFilters{ + MetricID: utils.MetaDDC, + }, + }, + ThresholdIDs: []string{utils.META_NONE}, + Stored: true, + Weight: 20, + } + + if err := actsLclRpc.Call(utils.APIerSv1SetStatQueueProfile, statConfig, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Error("Unexpected reply returned", reply) + } + + var reply2 []string + expected := []string{"DistinctMetricProfile"} + args := StatsArgsProcessEvent{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.Destination: "333", + utils.Usage: time.Duration(6 * time.Second)}}} + if err := actsLclRpc.Call(utils.StatSv1ProcessEvent, &args, &reply2); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(reply2, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply2) + } + + args = StatsArgsProcessEvent{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event2", + Event: map[string]interface{}{ + utils.Destination: "777", + utils.Usage: time.Duration(6 * time.Second)}}} + if err := actsLclRpc.Call(utils.StatSv1ProcessEvent, &args, &reply2); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(reply2, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply2) + } + + //Execute setDDestinations + attrSetDDest := &utils.AttrSetActions{ActionsId: "ACT_setDDestination", Actions: []*utils.TPAction{ + {Identifier: utils.SET_DDESTINATIONS, ExtraParameters: "DistinctMetricProfile"}, + }} + if err := actsLclRpc.Call(utils.APIerSv2SetActions, attrSetDDest, &reply); err != nil && err.Error() != utils.ErrExists.Error() { + t.Error("Got error on APIerSv2.SetActions: ", err.Error()) + } else if reply != utils.OK { + t.Errorf("Calling APIerSv2.SetActions received: %s", reply) + } + + attrsetDDest := &utils.AttrExecuteAction{Tenant: attrsSetAccount.Tenant, + Account: attrsSetAccount.Account, ActionsId: attrSetDDest.ActionsId} + if err := actsLclRpc.Call(utils.APIerSv1ExecuteAction, attrsetDDest, &reply); err != nil { + t.Error("Got error on APIerSv1.ExecuteAction: ", err.Error()) + } else if reply != utils.OK { + t.Errorf("Calling APIerSv1.ExecuteAction received: %s", reply) + } + + //verify destinations + if err := actsLclRpc.Call(utils.APIerSv1GetDestination, + "*ddc_test", &dest); err != nil { + t.Error(err.Error()) + } else { + if len(dest.Prefixes) != 2 || !utils.IsSliceMember(dest.Prefixes, "333") || !utils.IsSliceMember(dest.Prefixes, "777") { + t.Errorf("Unexpected destination : %+v", dest) + } + } + +} + func testActionsitStopCgrEngine(t *testing.T) { if err := KillEngine(*waitRater); err != nil { t.Error(err) diff --git a/engine/actions_test.go b/engine/actions_test.go index 7b676b041..343c8993a 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -1478,70 +1478,6 @@ func TestTopupActionLoaded(t *testing.T) { } } -func TestActionSetDDestination(t *testing.T) { - acc := &Account{BalanceMap: map[string]Balances{ - utils.MONETARY: Balances{&Balance{DestinationIDs: utils.NewStringMap("*ddc_test")}}}} - origD := &Destination{Id: "*ddc_test", Prefixes: []string{"111", "222"}} - dm.SetDestination(origD, utils.NonTransactional) - dm.SetReverseDestination(origD, utils.NonTransactional) - // check redis and cache - if d, err := dm.GetDestination("*ddc_test", false, utils.NonTransactional); err != nil || !reflect.DeepEqual(d, origD) { - t.Error("Error storing destination: ", d, err) - } - dm.GetReverseDestination("111", false, utils.NonTransactional) - x1, found := Cache.Get(utils.CacheReverseDestinations, "111") - if !found || len(x1.([]string)) != 1 { - t.Error("Error cacheing destination: ", x1) - } - dm.GetReverseDestination("222", false, utils.NonTransactional) - x1, found = Cache.Get(utils.CacheReverseDestinations, "222") - if !found || len(x1.([]string)) != 1 { - t.Error("Error cacheing destination: ", x1) - } - - if err := dm.SetStatQueue(&StatQueue{Tenant: "cgrates.org", ID: "StatDestination", - SQMetrics: map[string]StatMetric{ - utils.MetaDDC: &StatDDC{FieldValues: map[string]map[string]struct{}{ - "333": {"Ev1": struct{}{}}, - "666": {"Ev2": struct{}{}}, - }}, - }}); err != nil { - t.Error(err) - } - - if err := setddestinations(acc, &Action{ExtraParameters: "StatDestination"}, nil, nil); err != nil { - t.Error(err) - } - d, err := dm.GetDestination("*ddc_test", false, utils.NonTransactional) - if err != nil || - d.Id != origD.Id || - len(d.Prefixes) != 2 || - !utils.IsSliceMember(d.Prefixes, "333") || - !utils.IsSliceMember(d.Prefixes, "666") { - t.Error("Error storing destination: ", d, err) - } - - var ok bool - x1, ok = Cache.Get(utils.CacheReverseDestinations, "111") - if ok { - t.Error("Error cacheing destination: ", x1) - } - x1, ok = Cache.Get(utils.CacheReverseDestinations, "222") - if ok { - t.Error("Error cacheing destination: ", x1) - } - dm.GetReverseDestination("333", false, utils.NonTransactional) - x1, found = Cache.Get(utils.CacheReverseDestinations, "333") - if !found || len(x1.([]string)) != 1 { - t.Error("Error cacheing destination: ", x1) - } - dm.GetReverseDestination("666", false, utils.NonTransactional) - x1, found = Cache.Get(utils.CacheReverseDestinations, "666") - if !found || len(x1.([]string)) != 1 { - t.Error("Error cacheing destination: ", x1) - } -} - func TestActionTransactionFuncType(t *testing.T) { err := dm.SetAccount(&Account{ ID: "cgrates.org:trans", diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index f6c2d0f79..844a3ab98 100644 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -365,7 +365,7 @@ func testLoaderITWriteToDatabase(t *testing.T) { } for tenantid, fltr := range loader.filters { - rcv, err := GetFilter(loader.dm, tenantid.Tenant, tenantid.ID, false, false, utils.NonTransactional) + rcv, err := loader.dm.GetFilter(tenantid.Tenant, tenantid.ID, false, false, utils.NonTransactional) if err != nil { t.Error("Failed GetFilter: ", err.Error()) } diff --git a/engine/stats.go b/engine/stats.go index 65d0b8573..b4d855f56 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -356,6 +356,16 @@ func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, repl return } +// V1GetStatQueue returns a StatQueue object +func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithArgDispatcher, reply *StatQueue) (err error) { + if sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, true, true, ""); err != nil { + return err + } else { + *reply = *sq + } + return +} + // V1GetQueueStringMetrics returns the metrics of a Queue as string values func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[string]string) (err error) { if missing := utils.MissingStructFields(args, []string{utils.Tenant, utils.ID}); len(missing) != 0 { //Params missing diff --git a/utils/consts.go b/utils/consts.go index da054e67b..c6242ba35 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1256,6 +1256,7 @@ const ( StatSv1GetQueueFloatMetrics = "StatSv1.GetQueueFloatMetrics" StatSv1Ping = "StatSv1.Ping" StatSv1GetStatQueuesForEvent = "StatSv1.GetStatQueuesForEvent" + StatSv1GetStatQueue = "StatSv1.GetStatQueue" APIerSv1GetStatQueueProfile = "APIerSv1.GetStatQueueProfile" APIerSv1RemoveStatQueueProfile = "APIerSv1.RemoveStatQueueProfile" APIerSv1SetStatQueueProfile = "APIerSv1.SetStatQueueProfile"