diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 810e33b9d..44652e4b6 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -926,7 +926,20 @@ func (self *ApierV1) ReloadCache(attrs utils.AttrReloadCache, reply *string) (er if err = self.DataDB.CacheDataFromDB(utils.REVERSE_ALIASES_PREFIX, dataIDs, true); err != nil { return } - // Resources + // ResourceConfig + dataIDs = make([]string, 0) + if attrs.ResourceConfigIDs == nil { + dataIDs = nil // Reload all + } else if len(*attrs.ResourceConfigIDs) > 0 { + dataIDs = make([]string, len(*attrs.ResourceConfigIDs)) + for idx, dId := range *attrs.ResourceConfigIDs { + dataIDs[idx] = dId + } + } + if err = self.DataDB.CacheDataFromDB(utils.ResourceConfigsPrefix, dataIDs, true); err != nil { + return + } + // Resource dataIDs = make([]string, 0) if attrs.ResourceIDs == nil { dataIDs = nil // Reload all @@ -936,7 +949,7 @@ func (self *ApierV1) ReloadCache(attrs utils.AttrReloadCache, reply *string) (er dataIDs[idx] = dId } } - if err = self.DataDB.CacheDataFromDB(utils.ResourceConfigsPrefix, dataIDs, true); err != nil { + if err = self.DataDB.CacheDataFromDB(utils.ResourcesPrefix, dataIDs, true); err != nil { return } *reply = utils.OK @@ -947,7 +960,7 @@ func (self *ApierV1) LoadCache(args utils.AttrReloadCache, reply *string) (err e if args.FlushAll { cache.Flush() } - var dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rlIDs []string + var dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rlIDs, resIDs []string if args.DestinationIDs == nil { dstIDs = nil } else { @@ -1013,15 +1026,21 @@ func (self *ApierV1) LoadCache(args utils.AttrReloadCache, reply *string) (err e } else { rvAlsIDs = *args.ReverseAliasIDs } - if args.ResourceIDs == nil { + if args.ResourceConfigIDs == nil { rlIDs = nil } else { - rlIDs = *args.ResourceIDs + rlIDs = *args.ResourceConfigIDs } + if args.ResourceIDs == nil { + resIDs = nil + } else { + resIDs = *args.ResourceIDs + } + if err := self.DataDB.LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs); err != nil { return utils.NewErrServerError(err) } - if err := self.DataDB.LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs); err != nil { + if err := self.DataDB.LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs, resIDs); err != nil { return utils.NewErrServerError(err) } *reply = utils.OK diff --git a/apier/v1/resourcesv1_it_test.go b/apier/v1/resourcesv1_it_test.go index bba1a02ef..a59a8c52c 100644 --- a/apier/v1/resourcesv1_it_test.go +++ b/apier/v1/resourcesv1_it_test.go @@ -43,42 +43,42 @@ var ( ) var sTestsRLSV1 = []func(t *testing.T){ - testV1RLSLoadConfig, - testV1RLSInitDataDb, - testV1RLSResetStorDb, - testV1RLSStartEngine, - testV1RLSRpcConn, - testV1RLSFromFolder, - testV1RLSGetResourcesFromEvent, - testV1RLSAllocateResource, - testV1RLSAllowUsage, - testV1RLSReleaseResource, - testV1RLSGetResourceConfigBeforeSet, - testV1RLSSetResourceConfig, - testV1RLSGetResourceConfigAfterSet, - testV1RLSUpdateResourceConfig, - testV1RLSGetResourceConfigAfterUpdate, - testV1RLSRemResourceCOnfig, - testV1RLSGetResourceConfigAfterDelete, - testV1RLSStopEngine, + testV1RsLoadConfig, + testV1RsInitDataDb, + testV1RsResetStorDb, + testV1RsStartEngine, + testV1RsRpcConn, + testV1RsFromFolder, + testV1RsGetResourcesFromEvent, + testV1RsAllocateResource, + testV1RsAllowUsage, + testV1RsReleaseResource, + testV1RsGetResourceConfigBeforeSet, + testV1RsSetResourceConfig, + testV1RsGetResourceConfigAfterSet, + testV1RsUpdateResourceConfig, + testV1RsGetResourceConfigAfterUpdate, + testV1RsRemResourceCOnfig, + testV1RsGetResourceConfigAfterDelete, + testV1RsStopEngine, } //Test start here -func TestRLSV1ITMySQL(t *testing.T) { +func TestRsV1ITMySQL(t *testing.T) { rlsV1ConfDIR = "tutmysql" for _, stest := range sTestsRLSV1 { t.Run(rlsV1ConfDIR, stest) } } -func TestRLSV1ITMongo(t *testing.T) { +func TestRsV1ITMongo(t *testing.T) { rlsV1ConfDIR = "tutmongo" for _, stest := range sTestsRLSV1 { t.Run(rlsV1ConfDIR, stest) } } -func testV1RLSLoadConfig(t *testing.T) { +func testV1RsLoadConfig(t *testing.T) { var err error rlsV1CfgPath = path.Join(*dataDir, "conf", "samples", rlsV1ConfDIR) if rlsV1Cfg, err = config.NewCGRConfigFromFolder(rlsV1CfgPath); err != nil { @@ -92,26 +92,26 @@ func testV1RLSLoadConfig(t *testing.T) { } } -func testV1RLSInitDataDb(t *testing.T) { +func testV1RsInitDataDb(t *testing.T) { if err := engine.InitDataDb(rlsV1Cfg); err != nil { t.Fatal(err) } } // Wipe out the cdr database -func testV1RLSResetStorDb(t *testing.T) { +func testV1RsResetStorDb(t *testing.T) { if err := engine.InitStorDb(rlsV1Cfg); err != nil { t.Fatal(err) } } -func testV1RLSStartEngine(t *testing.T) { +func testV1RsStartEngine(t *testing.T) { if _, err := engine.StopStartEngine(rlsV1CfgPath, resDelay); err != nil { t.Fatal(err) } } -func testV1RLSRpcConn(t *testing.T) { +func testV1RsRpcConn(t *testing.T) { var err error rlsV1Rpc, err = jsonrpc.Dial("tcp", rlsV1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed if err != nil { @@ -119,7 +119,7 @@ func testV1RLSRpcConn(t *testing.T) { } } -func testV1RLSFromFolder(t *testing.T) { +func testV1RsFromFolder(t *testing.T) { var reply string time.Sleep(time.Duration(2000) * time.Millisecond) attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} @@ -129,7 +129,7 @@ func testV1RLSFromFolder(t *testing.T) { time.Sleep(time.Duration(1000) * time.Millisecond) } -func testV1RLSGetResourcesFromEvent(t *testing.T) { +func testV1RsGetResourcesFromEvent(t *testing.T) { var reply *[]*engine.ResourceCfg ev := map[string]interface{}{"Unknown": "unknown"} if err := rlsV1Rpc.Call("ResourceSV1.GetResourcesForEvent", ev, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { @@ -171,7 +171,7 @@ func testV1RLSGetResourcesFromEvent(t *testing.T) { } } -func testV1RLSAllocateResource(t *testing.T) { +func testV1RsAllocateResource(t *testing.T) { var reply string attrRU := utils.AttrRLsResourceUsage{ @@ -216,17 +216,17 @@ func testV1RLSAllocateResource(t *testing.T) { } -func testV1RLSAllowUsage(t *testing.T) { - var reply bool +func testV1RsAllowUsage(t *testing.T) { + var allowed bool attrRU := utils.AttrRLsResourceUsage{ UsageID: "651a8db2-4f67-4cf8-b622-169e8a482e51", Event: map[string]interface{}{"Account": "1002", "Subject": "1001", "Destination": "1002"}, Units: 1, } - if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &reply); err != nil { + if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &allowed); err != nil { t.Error(err) - } else if reply != true { - t.Errorf("Expecting: %+v, received: %+v", true, reply) + } else if !allowed { + t.Errorf("Expecting: %+v, received: %+v", true, allowed) } attrRU = utils.AttrRLsResourceUsage{ @@ -234,14 +234,15 @@ func testV1RLSAllowUsage(t *testing.T) { Event: map[string]interface{}{"Account": "1002", "Subject": "1001", "Destination": "1002"}, Units: 2, } - if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &reply); err != nil { + if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &allowed); err != nil { // already t.Error(err) + } else if allowed { // already 3 usages active before allow call, we should have now more than allowed + t.Error("Resource allowed") } } -func testV1RLSReleaseResource(t *testing.T) { - var reply interface{} - +func testV1RsReleaseResource(t *testing.T) { + var reply string attrRU := utils.AttrRLsResourceUsage{ UsageID: "651a8db2-4f67-4cf8-b622-169e8a482e52", Event: map[string]interface{}{"Destination": "100"}, @@ -250,29 +251,28 @@ func testV1RLSReleaseResource(t *testing.T) { if err := rlsV1Rpc.Call("ResourceSV1.ReleaseResource", attrRU, &reply); err != nil { t.Error(err) } - if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &reply); err != nil { + var allowed bool + if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &allowed); err != nil { t.Error(err) - } else { - if reply != true { - t.Errorf("Expecting: %+v, received: %+v", true, reply) - } + } else if !allowed { + t.Error("not allowed") } - attrRU.Units += 7 - if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &reply); err == nil { - t.Errorf("Expecting: %+v, received: %+v", false, reply) + if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &allowed); err != nil { + t.Error(err) + } else if allowed { + t.Error("Resource should not be allowed") } - } -func testV1RLSGetResourceConfigBeforeSet(t *testing.T) { +func testV1RsGetResourceConfigBeforeSet(t *testing.T) { var reply *string if err := rlsV1Rpc.Call("ApierV1.GetResourceConfig", &AttrGetResCfg{ID: "RCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } -func testV1RLSSetResourceConfig(t *testing.T) { +func testV1RsSetResourceConfig(t *testing.T) { rlsConfig = &engine.ResourceCfg{ ID: "RCFG1", Filters: []*engine.RequestFilter{ @@ -302,7 +302,7 @@ func testV1RLSSetResourceConfig(t *testing.T) { } } -func testV1RLSGetResourceConfigAfterSet(t *testing.T) { +func testV1RsGetResourceConfigAfterSet(t *testing.T) { var reply *engine.ResourceCfg if err := rlsV1Rpc.Call("ApierV1.GetResourceConfig", &AttrGetResCfg{ID: rlsConfig.ID}, &reply); err != nil { t.Error(err) @@ -311,7 +311,7 @@ func testV1RLSGetResourceConfigAfterSet(t *testing.T) { } } -func testV1RLSUpdateResourceConfig(t *testing.T) { +func testV1RsUpdateResourceConfig(t *testing.T) { var result string rlsConfig.Filters = []*engine.RequestFilter{ &engine.RequestFilter{ @@ -332,7 +332,7 @@ func testV1RLSUpdateResourceConfig(t *testing.T) { } } -func testV1RLSGetResourceConfigAfterUpdate(t *testing.T) { +func testV1RsGetResourceConfigAfterUpdate(t *testing.T) { var reply *engine.ResourceCfg if err := rlsV1Rpc.Call("ApierV1.GetResourceConfig", &AttrGetResCfg{ID: rlsConfig.ID}, &reply); err != nil { t.Error(err) @@ -341,7 +341,7 @@ func testV1RLSGetResourceConfigAfterUpdate(t *testing.T) { } } -func testV1RLSRemResourceCOnfig(t *testing.T) { +func testV1RsRemResourceCOnfig(t *testing.T) { var resp string if err := rlsV1Rpc.Call("ApierV1.RemResourceConfig", &AttrGetResCfg{ID: rlsConfig.ID}, &resp); err != nil { t.Error(err) @@ -350,14 +350,14 @@ func testV1RLSRemResourceCOnfig(t *testing.T) { } } -func testV1RLSGetResourceConfigAfterDelete(t *testing.T) { +func testV1RsGetResourceConfigAfterDelete(t *testing.T) { var reply *string if err := rlsV1Rpc.Call("ApierV1.GetResourceConfig", &AttrGetResCfg{ID: "RCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } -func testV1RLSStopEngine(t *testing.T) { +func testV1RsStopEngine(t *testing.T) { if err := engine.KillEngine(100); err != nil { t.Error(err) } diff --git a/apier/v1/tpresources.go b/apier/v1/tpresources.go index 788806861..4be74cd37 100644 --- a/apier/v1/tpresources.go +++ b/apier/v1/tpresources.go @@ -44,9 +44,10 @@ func (self *ApierV1) GetTPResource(attr AttrGetTPResource, reply *utils.TPResour return utils.NewErrMandatoryIeMissing(missing...) } if rls, err := self.StorDb.GetTPResources(attr.TPid, attr.ID); err != nil { - return utils.NewErrServerError(err) - } else if len(rls) == 0 { - return utils.ErrNotFound + if err.Error() != utils.ErrNotFound.Error() { + err = utils.NewErrServerError(err) + } + return err } else { *reply = *rls[0] } diff --git a/apier/v1/tpresources_it_test.go b/apier/v1/tpresources_it_test.go new file mode 100644 index 000000000..d257cc8d9 --- /dev/null +++ b/apier/v1/tpresources_it_test.go @@ -0,0 +1,217 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package v1 + +import ( + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "net/rpc" + "net/rpc/jsonrpc" + "path" + "reflect" + "testing" +) + +var ( + tpResCfgPath string + tpResCfg *config.CGRConfig + tpResRPC *rpc.Client + tpResDataDir = "/usr/share/cgrates" + tpRes *utils.TPResource + tpResDelay int + tpResConfigDIR string //run tests for specific configuration +) + +var sTestsTPResources = []func(t *testing.T){ + testTPResInitCfg, + testTPResResetStorDb, + testTPResStartEngine, + testTPResRpcConn, + testTPResGetTPResourceBeforeSet, + testTPResSetTPResource, + testTPResGetTPResourceAfterSet, + testTPResUpdateTPResource, + testTPResGetTPResourceAfterUpdate, + testTPResRemTPResource, + testTPResGetTPResourceAfterRemove, + testTPResKillEngine, +} + +//Test start here +func TestTPResITMySql(t *testing.T) { + tpResConfigDIR = "tutmysql" + for _, stest := range sTestsTPResources { + t.Run(tpResConfigDIR, stest) + } +} + +func TestTPResITMongo(t *testing.T) { + tpResConfigDIR = "tutmongo" + for _, stest := range sTestsTPResources { + t.Run(tpResConfigDIR, stest) + } +} + +func TestTPResITPG(t *testing.T) { + tpResConfigDIR = "tutpostgres" + for _, stest := range sTestsTPResources { + t.Run(tpResConfigDIR, stest) + } +} + +func testTPResInitCfg(t *testing.T) { + var err error + tpResCfgPath = path.Join(tpResDataDir, "conf", "samples", tpResConfigDIR) + tpResCfg, err = config.NewCGRConfigFromFolder(tpResCfgPath) + if err != nil { + t.Error(err) + } + tpResCfg.DataFolderPath = tpResDataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(tpResCfg) + switch tpResConfigDIR { + case "tutmongo": // Mongo needs more time to reset db, need to investigate + tpResDelay = 4000 + default: + tpResDelay = 1000 + } +} + +// Wipe out the cdr database +func testTPResResetStorDb(t *testing.T) { + if err := engine.InitStorDb(tpResCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func testTPResStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(tpResCfgPath, tpResDelay); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func testTPResRpcConn(t *testing.T) { + var err error + tpResRPC, err = jsonrpc.Dial("tcp", tpResCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +func testTPResGetTPResourceBeforeSet(t *testing.T) { + var reply *utils.TPResource + if err := tpResRPC.Call("ApierV1.GetTPResource", AttrGetTPResource{TPid: "TPR1", ID: "Res"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + +func testTPResSetTPResource(t *testing.T) { + tpRes = &utils.TPResource{ + TPid: "TPR1", + ID: "Res", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + Type: "*string", + FieldName: "Account", + Values: []string{"1001", "1002"}, + }, + }, + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: "2014-07-29T15:00:00Z", + ExpiryTime: "", + }, + UsageTTL: "1", + Limit: "1", + AllocationMessage: "Message", + Blocker: false, + Stored: false, + Weight: 20, + Thresholds: []string{"ValOne", "ValTwo"}, + } + var result string + if err := tpResRPC.Call("ApierV1.SetTPResource", tpRes, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + +func testTPResGetTPResourceAfterSet(t *testing.T) { + var respond *utils.TPResource + if err := tpResRPC.Call("ApierV1.GetTPResource", &AttrGetTPResource{TPid: tpRes.TPid, ID: tpRes.ID}, &respond); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tpRes, respond) { + t.Errorf("Expecting : %+v, received: %+v", tpRes, respond) + } +} + +func testTPResUpdateTPResource(t *testing.T) { + var result string + tpRes.Filters = []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + Type: "*string", + FieldName: "Account", + Values: []string{"1001", "1002"}, + }, + &utils.TPRequestFilter{ + Type: "*string_prefix", + FieldName: "Destination", + Values: []string{"10", "20"}, + }, + } + if err := tpResRPC.Call("ApierV1.SetTPResource", tpRes, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + +func testTPResGetTPResourceAfterUpdate(t *testing.T) { + var expectedTPR *utils.TPResource + if err := tpResRPC.Call("ApierV1.GetTPResource", &AttrGetTPResource{TPid: tpRes.TPid, ID: tpRes.ID}, &expectedTPR); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tpRes, expectedTPR) { + t.Errorf("Expecting: %+v, received: %+v", tpRes, expectedTPR) + } +} + +func testTPResRemTPResource(t *testing.T) { + var resp string + if err := tpResRPC.Call("ApierV1.RemTPResource", &AttrGetTPResource{TPid: tpRes.TPid, ID: tpRes.ID}, &resp); err != nil { + t.Error(err) + } else if resp != utils.OK { + t.Error("Unexpected reply returned", resp) + } +} + +func testTPResGetTPResourceAfterRemove(t *testing.T) { + var respond *utils.TPResource + if err := tpResRPC.Call("ApierV1.GetTPResource", &AttrGetTPStat{TPid: "TPS1", ID: "Stat1"}, &respond); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + +func testTPResKillEngine(t *testing.T) { + if err := engine.KillEngine(tpResDelay); err != nil { + t.Error(err) + } +} diff --git a/apier/v1/tpstats_it_test.go b/apier/v1/tpstats_it_test.go index c27b25bdf..8c39b9539 100644 --- a/apier/v1/tpstats_it_test.go +++ b/apier/v1/tpstats_it_test.go @@ -30,99 +30,140 @@ import ( "testing" ) -var tpCfgPath string -var tpCfg *config.CGRConfig -var tpRPC *rpc.Client -var tpDataDir = "/usr/share/cgrates" +var ( + tpStatCfgPath string + tpStatCfg *config.CGRConfig + tpStatRPC *rpc.Client + tpStatDataDir = "/usr/share/cgrates" + tpStat *utils.TPStats + tpStatDelay int + tpStatConfigDIR string //run tests for specific configuration +) -func TestTPStatInitCfg(t *testing.T) { +var sTestsTPStats = []func(t *testing.T){ + testTPStatsInitCfg, + testTPStatsResetStorDb, + testTPStatsStartEngine, + testTPStatsRpcConn, + testTPStatsGetTPStatBeforeSet, + testTPStatsSetTPStat, + testTPStatsGetTPStatAfterSet, + testTPStatsUpdateTPStat, + testTPStatsGetTPStatAfterUpdate, + testTPStatsRemTPStat, + testTPStatsGetTPStatAfterRemove, + testTPStatsKillEngine, +} + +//Test start here +func TestTPStatITMySql(t *testing.T) { + tpStatConfigDIR = "tutmysql" + for _, stest := range sTestsTPStats { + t.Run(tpStatConfigDIR, stest) + } +} + +func TestTPStatITMongo(t *testing.T) { + tpStatConfigDIR = "tutmongo" + for _, stest := range sTestsTPStats { + t.Run(tpStatConfigDIR, stest) + } +} + +func TestTPStatITPG(t *testing.T) { + tpStatConfigDIR = "tutpostgres" + for _, stest := range sTestsTPStats { + t.Run(tpStatConfigDIR, stest) + } +} + +func testTPStatsInitCfg(t *testing.T) { var err error - tpCfgPath = path.Join(tpDataDir, "conf", "samples", "tutmysql") - tpCfg, err = config.NewCGRConfigFromFolder(tpCfgPath) + tpStatCfgPath = path.Join(tpStatDataDir, "conf", "samples", tpStatConfigDIR) + tpStatCfg, err = config.NewCGRConfigFromFolder(tpStatCfgPath) if err != nil { t.Error(err) } - tpCfg.DataFolderPath = tpDataDir // Share DataFolderPath through config towards StoreDb for Flush() - config.SetCgrConfig(tpCfg) + tpStatCfg.DataFolderPath = tpStatDataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(tpStatCfg) + switch tpStatConfigDIR { + case "tutmongo": // Mongo needs more time to reset db, need to investigate + tpStatDelay = 4000 + default: + tpStatDelay = 1000 + } } // Wipe out the cdr database -func TestTPStatResetStorDb(t *testing.T) { - if err := engine.InitStorDb(tpCfg); err != nil { +func testTPStatsResetStorDb(t *testing.T) { + if err := engine.InitStorDb(tpStatCfg); err != nil { t.Fatal(err) } } // Start CGR Engine - -func TestTPStatStartEngine(t *testing.T) { - if _, err := engine.StopStartEngine(tpCfgPath, 1000); err != nil { +func testTPStatsStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(tpStatCfgPath, tpStatDelay); err != nil { t.Fatal(err) } } // Connect rpc client to rater -func TestTPStatRpcConn(t *testing.T) { +func testTPStatsRpcConn(t *testing.T) { var err error - tpRPC, err = jsonrpc.Dial("tcp", tpCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + tpStatRPC, err = jsonrpc.Dial("tcp", tpStatCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal(err) } } -var tpStat = &utils.TPStats{ - TPid: "TPS1", - ID: "Stat1", - Filters: []*utils.TPRequestFilter{ - &utils.TPRequestFilter{ - Type: "*string", - FieldName: "Account", - Values: []string{"1001", "1002"}, - }, - &utils.TPRequestFilter{ - Type: "*string_prefix", - FieldName: "Destination", - Values: []string{"10", "20"}, - }, - }, - ActivationInterval: &utils.TPActivationInterval{ - ActivationTime: "2014-07-29T15:00:00Z", - ExpiryTime: "", - }, - TTL: "1", - Metrics: []string{"MetricValue", "MetricValueTwo"}, - Blocker: true, - Stored: true, - Weight: 20, - Thresholds: nil, -} - -func TestTPStatGetTPStatIDs(t *testing.T) { - var reply []string - if err := tpRPC.Call("ApierV1.GetTPStatIDs", AttrGetTPStatIds{TPid: "TPS1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { +func testTPStatsGetTPStatBeforeSet(t *testing.T) { + var reply *utils.TPStats + if err := tpStatRPC.Call("ApierV1.GetTPStat", AttrGetTPStat{TPid: "TPS1", ID: "Stat1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } -func TestTPStatSetTPStat(t *testing.T) { +func testTPStatsSetTPStat(t *testing.T) { + tpStat = &utils.TPStats{ + TPid: "TPS1", + ID: "Stat1", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + Type: "*string", + FieldName: "Account", + Values: []string{"1001", "1002"}, + }, + }, + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: "2014-07-29T15:00:00Z", + ExpiryTime: "", + }, + TTL: "1", + Metrics: []string{"MetricValue", "MetricValueTwo"}, + Blocker: false, + Stored: false, + Weight: 20, + Thresholds: []string{"ThreshValue", "ThreshValueTwo"}, + } var result string - if err := tpRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil { + if err := tpStatRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil { t.Error(err) } else if result != utils.OK { t.Error("Unexpected reply returned", result) } } -func TestTPStatGetTPStat(t *testing.T) { +func testTPStatsGetTPStatAfterSet(t *testing.T) { var respond *utils.TPStats - if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &respond); err != nil { + if err := tpStatRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &respond); err != nil { t.Error(err) } else if !reflect.DeepEqual(tpStat, respond) { - t.Errorf("Expecting: %+v, received: %+v", tpStat.TPid, respond.TPid) + t.Errorf("Expecting: %+v, received: %+v", tpStat, respond) } } -func TestTPStatUpdateTPStat(t *testing.T) { +func testTPStatsUpdateTPStat(t *testing.T) { var result string tpStat.Weight = 21 tpStat.Filters = []*utils.TPRequestFilter{ @@ -136,43 +177,41 @@ func TestTPStatUpdateTPStat(t *testing.T) { FieldName: "Destination", Values: []string{"10", "20"}, }, - &utils.TPRequestFilter{ - Type: "*rsr_fields", - FieldName: "", - Values: []string{"Subject(~^1.*1$)", "Destination(1002)"}, - }, } - if err := tpRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil { + if err := tpStatRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil { t.Error(err) } else if result != utils.OK { t.Error("Unexpected reply returned", result) } +} + +func testTPStatsGetTPStatAfterUpdate(t *testing.T) { var expectedTPS *utils.TPStats - if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &expectedTPS); err != nil { + if err := tpStatRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &expectedTPS); err != nil { t.Error(err) } else if !reflect.DeepEqual(tpStat, expectedTPS) { t.Errorf("Expecting: %+v, received: %+v", tpStat, expectedTPS) } } -func TestTPStatRemTPStat(t *testing.T) { +func testTPStatsRemTPStat(t *testing.T) { var resp string - if err := tpRPC.Call("ApierV1.RemTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &resp); err != nil { + if err := tpStatRPC.Call("ApierV1.RemTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &resp); err != nil { t.Error(err) } else if resp != utils.OK { t.Error("Unexpected reply returned", resp) } } -func TestTPStatCheckDelete(t *testing.T) { +func testTPStatsGetTPStatAfterRemove(t *testing.T) { var respond *utils.TPStats - if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: "TPS1", ID: "Stat1"}, &respond); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := tpStatRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: "TPS1", ID: "Stat1"}, &respond); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } -func TestTPStatKillEngine(t *testing.T) { - if err := engine.KillEngine(100); err != nil { +func testTPStatsKillEngine(t *testing.T) { + if err := engine.KillEngine(tpStatDelay); err != nil { t.Error(err) } } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 3c3c646ed..46a11931c 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -530,27 +530,31 @@ func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, data func startResourceService(internalRsChan, internalStatSConn chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dataDB engine.DataDB, server *utils.Server, exitChan chan bool) { var statsConn *rpcclient.RpcClientPool - if len(cfg.ResourceLimiterCfg().StatSConns) != 0 { // Stats connection init + if len(cfg.ResourceSCfg().StatSConns) != 0 { // Stats connection init statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.ResourceLimiterCfg().StatSConns, internalStatSConn, cfg.InternalTtl) + cfg.ResourceSCfg().StatSConns, internalStatSConn, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS: %s", err.Error())) exitChan <- true return } } - rS, err := engine.NewResourceService(cfg, dataDB, statsConn) + rS, err := engine.NewResourceService(dataDB, cfg.ResourceSCfg().ShortCache, cfg.ResourceSCfg().StoreInterval, statsConn) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) exitChan <- true return } utils.Logger.Info(fmt.Sprintf("Starting Resource Service")) - if err := rS.ListenAndServe(exitChan); err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + go func() { + if err := rS.ListenAndServe(exitChan); err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + + } + rS.Shutdown() exitChan <- true return - } + }() rsV1 := v1.NewResourceSV1(rS) server.RpcRegister(rsV1) internalRsChan <- rsV1 @@ -848,7 +852,7 @@ func main() { } // Start RL service - if cfg.ResourceLimiterCfg().Enabled { + if cfg.ResourceSCfg().Enabled { go startResourceService(internalRsChan, internalStatSChan, cfg, dataDB, server, exitChan) } diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index ebceaae46..0046efec6 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -42,7 +42,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, cacheTaskChan) go func() { defer close(cacheTaskChan) - var dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rlIDs []string + var dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rlIDs, resIDs []string if cCfg, has := cfg.CacheConfig[utils.CacheDestinations]; !has || !cCfg.Precache { dstIDs = make([]string, 0) // Don't cache any } @@ -85,13 +85,17 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC if cCfg, has := cfg.CacheConfig[utils.CacheResourceConfigs]; !has || !cCfg.Precache { rlIDs = make([]string, 0) } + if cCfg, has := cfg.CacheConfig[utils.CacheResources]; !has || !cCfg.Precache { + resIDs = make([]string, 0) + } + // ToDo: Add here timings if err := dataDB.LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs); err != nil { utils.Logger.Crit(fmt.Sprintf(" Cache rating error: %s", err.Error())) exitChan <- true return } - if err := dataDB.LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs); err != nil { + if err := dataDB.LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs, resIDs); err != nil { utils.Logger.Crit(fmt.Sprintf(" Cache accounting error: %s", err.Error())) exitChan <- true return diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index 11aaef819..9ab67df3b 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -68,7 +68,7 @@ func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) { if err := dataDb.LoadRatingCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil); err != nil { return nilDuration, fmt.Errorf("Cache rating error: %s", err.Error()) } - if err := dataDb.LoadAccountingCache(nil, nil, nil); err != nil { + if err := dataDb.LoadAccountingCache(nil, nil, nil, nil); err != nil { return nilDuration, fmt.Errorf("Cache accounting error: %s", err.Error()) } log.Printf("Runnning %d cycles...", *runs) diff --git a/config/config.go b/config/config.go index 7678bb991..97aa589c8 100755 --- a/config/config.go +++ b/config/config.go @@ -265,7 +265,7 @@ type CGRConfig struct { AliasesServerEnabled bool // Starts PubSub as server: . UserServerEnabled bool // Starts User as server: UserServerIndexes []string // List of user profile field indexes - resourceLimiterCfg *ResourceLimiterConfig // Configuration for resource limiter + resourceSCfg *ResourceSConfig // Configuration for resource limiter statsCfg *StatSCfg // Configuration for StatS MailerServer string // The server to use when sending emails out MailerAuthUser string // Authenticate to email server using this user @@ -417,7 +417,7 @@ func (self *CGRConfig) checkConfigSanity() error { } } for _, smFSRLsConn := range self.SmFsConfig.RLsConns { - if smFSRLsConn.Address == utils.MetaInternal && !self.resourceLimiterCfg.Enabled { + if smFSRLsConn.Address == utils.MetaInternal && !self.resourceSCfg.Enabled { return errors.New("RLs not enabled but referenced by SMFreeSWITCH component") } } @@ -441,7 +441,7 @@ func (self *CGRConfig) checkConfigSanity() error { } } for _, smKamRLsConn := range self.SmKamConfig.RLsConns { - if smKamRLsConn.Address == utils.MetaInternal && !self.resourceLimiterCfg.Enabled { + if smKamRLsConn.Address == utils.MetaInternal && !self.resourceSCfg.Enabled { return errors.New("RLs not enabled but requested by SM-Kamailio component") } } @@ -502,8 +502,8 @@ func (self *CGRConfig) checkConfigSanity() error { } } // ResourceLimiter checks - if self.resourceLimiterCfg != nil && self.resourceLimiterCfg.Enabled { - for _, connCfg := range self.resourceLimiterCfg.StatSConns { + if self.resourceSCfg != nil && self.resourceSCfg.Enabled { + for _, connCfg := range self.resourceSCfg.StatSConns { if connCfg.Address == utils.MetaInternal && !self.statsCfg.Enabled { return errors.New("StatS not enabled but requested by ResourceLimiter component.") } @@ -632,7 +632,7 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { return err } - jsnRLSCfg, err := jsnCfg.ResourceLimiterJsonCfg() + jsnRLSCfg, err := jsnCfg.ResourceSJsonCfg() if err != nil { return err } @@ -1075,10 +1075,10 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { } if jsnRLSCfg != nil { - if self.resourceLimiterCfg == nil { - self.resourceLimiterCfg = new(ResourceLimiterConfig) + if self.resourceSCfg == nil { + self.resourceSCfg = new(ResourceSConfig) } - if self.resourceLimiterCfg.loadFromJsonCfg(jsnRLSCfg); err != nil { + if self.resourceSCfg.loadFromJsonCfg(jsnRLSCfg); err != nil { return err } } @@ -1146,8 +1146,8 @@ func (self *CGRConfig) RadiusAgentCfg() *RadiusAgentCfg { } // ToDo: fix locking here -func (self *CGRConfig) ResourceLimiterCfg() *ResourceLimiterConfig { - return self.resourceLimiterCfg +func (self *CGRConfig) ResourceSCfg() *ResourceSConfig { + return self.resourceSCfg } // ToDo: fix locking diff --git a/config/config_defaults.go b/config/config_defaults.go index 27261f56a..3d6498d94 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -405,9 +405,10 @@ const CGRATES_CFG_JSON = ` "resources": { - "enabled": false, // starts ResourceLimiter service: . - "stats_conns": [], // address where to reach the stats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> - "store_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur> + "enabled": false, // starts ResourceLimiter service: . + "stats_conns": [], // address where to reach the stats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> + "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur> + "short_cache": {"limit": -1, "ttl": "1m", "static_ttl": false}, // short cache for data like resources for events in case of allow queries }, diff --git a/config/config_json.go b/config/config_json.go index 5159ecc0b..0fb7d2402 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -27,37 +27,37 @@ import ( ) const ( - GENERAL_JSN = "general" - CACHE_JSN = "cache" - LISTEN_JSN = "listen" - HTTP_JSN = "http" - DATADB_JSN = "data_db" - STORDB_JSN = "stor_db" - RALS_JSN = "rals" - SCHEDULER_JSN = "scheduler" - CDRS_JSN = "cdrs" - MEDIATOR_JSN = "mediator" - CDRSTATS_JSN = "cdrstats" - CDRE_JSN = "cdre" - CDRC_JSN = "cdrc" - SMGENERIC_JSON = "sm_generic" - SMFS_JSN = "sm_freeswitch" - SMKAM_JSN = "sm_kamailio" - SMOSIPS_JSN = "sm_opensips" - SMAsteriskJSN = "sm_asterisk" - SM_JSN = "session_manager" - FS_JSN = "freeswitch" - KAMAILIO_JSN = "kamailio" - OSIPS_JSN = "opensips" - DA_JSN = "diameter_agent" - RA_JSN = "radius_agent" - HISTSERV_JSN = "historys" - PUBSUBSERV_JSN = "pubsubs" - ALIASESSERV_JSN = "aliases" - USERSERV_JSN = "users" - RESOURCELIMITER_JSON = "resources" - MAILER_JSN = "mailer" - SURETAX_JSON = "suretax" + GENERAL_JSN = "general" + CACHE_JSN = "cache" + LISTEN_JSN = "listen" + HTTP_JSN = "http" + DATADB_JSN = "data_db" + STORDB_JSN = "stor_db" + RALS_JSN = "rals" + SCHEDULER_JSN = "scheduler" + CDRS_JSN = "cdrs" + MEDIATOR_JSN = "mediator" + CDRSTATS_JSN = "cdrstats" + CDRE_JSN = "cdre" + CDRC_JSN = "cdrc" + SMGENERIC_JSON = "sm_generic" + SMFS_JSN = "sm_freeswitch" + SMKAM_JSN = "sm_kamailio" + SMOSIPS_JSN = "sm_opensips" + SMAsteriskJSN = "sm_asterisk" + SM_JSN = "session_manager" + FS_JSN = "freeswitch" + KAMAILIO_JSN = "kamailio" + OSIPS_JSN = "opensips" + DA_JSN = "diameter_agent" + RA_JSN = "radius_agent" + HISTSERV_JSN = "historys" + PUBSUBSERV_JSN = "pubsubs" + ALIASESSERV_JSN = "aliases" + USERSERV_JSN = "users" + RESOURCES_JSON = "resources" + MAILER_JSN = "mailer" + SURETAX_JSON = "suretax" ) // Loads the json config out of io.Reader, eg other sources than file, maybe over http @@ -347,12 +347,12 @@ func (self CgrJsonCfg) UserServJsonCfg() (*UserServJsonCfg, error) { return cfg, nil } -func (self CgrJsonCfg) ResourceLimiterJsonCfg() (*ResourceLimiterServJsonCfg, error) { - rawCfg, hasKey := self[RESOURCELIMITER_JSON] +func (self CgrJsonCfg) ResourceSJsonCfg() (*ResourceSJsonCfg, error) { + rawCfg, hasKey := self[RESOURCES_JSON] if !hasKey { return nil, nil } - cfg := new(ResourceLimiterServJsonCfg) + cfg := new(ResourceSJsonCfg) if err := json.Unmarshal(*rawCfg, cfg); err != nil { return nil, err } diff --git a/config/config_json_test.go b/config/config_json_test.go index 924ceccf2..d66132ec1 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -666,15 +666,19 @@ func TestDfUserServJsonCfg(t *testing.T) { } func TestDfResourceLimiterSJsonCfg(t *testing.T) { - eCfg := &ResourceLimiterServJsonCfg{ + eCfg := &ResourceSJsonCfg{ Enabled: utils.BoolPointer(false), Stats_conns: &[]*HaPoolJsonCfg{}, - Store_interval: utils.StringPointer("0s"), + Store_interval: utils.StringPointer(""), + Short_cache: &CacheParamJsonCfg{ + Limit: utils.IntPointer(-1), + Ttl: utils.StringPointer("1m"), + Static_ttl: utils.BoolPointer(false)}, } - if cfg, err := dfCgrJsonCfg.ResourceLimiterJsonCfg(); err != nil { + if cfg, err := dfCgrJsonCfg.ResourceSJsonCfg(); err != nil { t.Error(err) } else if !reflect.DeepEqual(eCfg, cfg) { - t.Error("Received: ", cfg) + t.Errorf("Received: %s", utils.ToIJSON(cfg)) } } diff --git a/config/config_test.go b/config/config_test.go index 75b4ac6e4..6c8169e11 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -555,14 +555,16 @@ func TestCgrCfgJSONDefaultsUserS(t *testing.T) { } func TestCgrCfgJSONDefaultsResLimCfg(t *testing.T) { - eResLiCfg := &ResourceLimiterConfig{ + eResLiCfg := &ResourceSConfig{ Enabled: false, StatSConns: []*HaPoolConfig{}, StoreInterval: 0, + ShortCache: &CacheParamConfig{Limit: -1, + TTL: time.Duration(1 * time.Minute), StaticTTL: false}, } - if !reflect.DeepEqual(cgrCfg.resourceLimiterCfg, eResLiCfg) { - t.Errorf("received: %+v, expecting: %+v", cgrCfg.resourceLimiterCfg, eResLiCfg) + if !reflect.DeepEqual(cgrCfg.resourceSCfg, eResLiCfg) { + t.Errorf("expecting: %s, received: %s", utils.ToIJSON(eResLiCfg), utils.ToIJSON(cgrCfg.resourceSCfg)) } } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index b908bf4c8..91aed962e 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -379,10 +379,11 @@ type UserServJsonCfg struct { } // ResourceLimiter service config section -type ResourceLimiterServJsonCfg struct { +type ResourceSJsonCfg struct { Enabled *bool Stats_conns *[]*HaPoolJsonCfg Store_interval *string + Short_cache *CacheParamJsonCfg } // Stat service config section diff --git a/config/reslimitercfg.go b/config/reslimitercfg.go index fe77fedf2..aede8f589 100644 --- a/config/reslimitercfg.go +++ b/config/reslimitercfg.go @@ -23,13 +23,14 @@ import ( "github.com/cgrates/cgrates/utils" ) -type ResourceLimiterConfig struct { +type ResourceSConfig struct { Enabled bool StatSConns []*HaPoolConfig // Connections towards StatS StoreInterval time.Duration // Dump regularly from cache into dataDB + ShortCache *CacheParamConfig } -func (rlcfg *ResourceLimiterConfig) loadFromJsonCfg(jsnCfg *ResourceLimiterServJsonCfg) (err error) { +func (rlcfg *ResourceSConfig) loadFromJsonCfg(jsnCfg *ResourceSJsonCfg) (err error) { if jsnCfg == nil { return nil } @@ -45,7 +46,13 @@ func (rlcfg *ResourceLimiterConfig) loadFromJsonCfg(jsnCfg *ResourceLimiterServJ } if jsnCfg.Store_interval != nil { if rlcfg.StoreInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Store_interval); err != nil { - return err + return + } + } + if jsnCfg.Short_cache != nil { + rlcfg.ShortCache = new(CacheParamConfig) + if err = rlcfg.ShortCache.loadFromJsonCfg(jsnCfg.Short_cache); err != nil { + return } } return nil diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index 6286b22e0..5bb324d5c 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -7,23 +7,23 @@ "reply_timeout": "30s", }, + "listen": { "rpc_json": ":2012", "rpc_gob": ":2013", "http": ":2080", }, + "data_db": { "db_type": "mongo", - "db_host": "127.0.0.1", "db_port": 27017, }, + "stor_db": { "db_type": "mongo", - "db_host": "127.0.0.1", "db_port": 27017, - "db_password":"", }, @@ -43,10 +43,12 @@ ], }, + "scheduler": { "enabled": true, }, + "cdrs": { "enabled": true, "cdrstats_conns": [ @@ -54,37 +56,39 @@ ], }, + "cdrstats": { "enabled": true, }, + "pubsubs": { "enabled": true, }, + "users": { "enabled": true, "indexes": ["Uuid"], }, + "aliases": { "enabled": true, }, + "resources": { "enabled": true, - "stats_conns": [ - //{"address": "*internal"} - ], - "cache_dump_interval": "0s", - "usage_ttl": "3h", }, + "stats": { "enabled": true, "store_interval": "0s", }, + "sm_generic": { "enabled": true, }, diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index eaf4b8c19..e8b0f8b49 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -7,34 +7,17 @@ "log_level": 7, }, + "listen": { "rpc_json": ":2012", "rpc_gob": ":2013", "http": ":2080", }, -"data_db": { // database used to store runtime data (eg: accounts, cdr stats) - "db_type": "redis", // data_db type: - "db_host": "192.168.100.40", // data_db host address - "db_port": 6379, // data_db port to reach the database - "db_name": "10", // data_db database name to connect to - "db_user": "cgrates", // username to use when connecting to data_db - "db_password": "", // password to use when connecting to data_db - "load_history_size": 10, // Number of records in the load history -}, -"stor_db": { // database used to store offline tariff plans and CDRs - "db_type": "mysql", // stor database type to use: - "db_host": "192.168.100.40", // the host to connect to - "db_port": 3306, // the port to reach the stordb - "db_name": "cgrates", // stor database name - "db_user": "cgrates", // username to use when connecting to stordb - "db_password": "CGRateS.org", // password to use when connecting to stordb - "max_open_conns": 100, // maximum database connections opened, not applying for mongo - "max_idle_conns": 10, // maximum database connections idle, not applying for mongo - "conn_max_lifetime": 0, // maximum amount of time in seconds a connection may be reused (0 for unlimited), not applying for mongo - "cdrs_indexes": [], // indexes on cdrs table to speed up queries, used only in case of mongo -}, +"stor_db": { + "db_password": "CGRateS.org", +}, "cache":{ @@ -72,6 +55,7 @@ ], }, + "scheduler": { "enabled": true, }, @@ -87,7 +71,7 @@ "cdre": { "TestTutITExportCDR": { - "content_fields": [ // template of the exported content fields + "content_fields": [ {"tag": "CGRID", "type": "*composed", "value": "CGRID"}, {"tag": "RunID", "type": "*composed", "value": "RunID"}, {"tag":"OriginID", "type": "*composed", "value": "OriginID"}, @@ -106,51 +90,45 @@ "cdrstats": { - "enabled": true, // starts the cdrstats service: + "enabled": true, }, "pubsubs": { - "enabled": true, // starts PubSub service: . + "enabled": true, }, "users": { - "enabled": true, // starts User service: . - "indexes": ["Uuid"], // user profile field indexes + "enabled": true, + "indexes": ["Uuid"], }, -"rls": { - "enabled": true, // starts ResourceLimiter service: . - "cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> - "cache_dump_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|dur> - "usage_ttl": "3h", // expire usage records if older than this duration <""|*never|dur> -}, "resources": { "enabled": true, - "stats_conns": [ - //{"address": "*internal"} - ], - "cache_dump_interval": "0s", - "usage_ttl": "3h", }, + "stats": { "enabled": true, "store_interval": "0s", }, + "historys": { "enabled": true, }, + "aliases": { - "enabled": true, // starts Aliases service: . + "enabled": true, }, + "sm_generic": { "enabled": true, }, -} + +} \ No newline at end of file diff --git a/data/conf/samples/tutpostgres/cgrates.json b/data/conf/samples/tutpostgres/cgrates.json index f40477ccb..77b78db03 100644 --- a/data/conf/samples/tutpostgres/cgrates.json +++ b/data/conf/samples/tutpostgres/cgrates.json @@ -1,21 +1,24 @@ - { +{ // CGRateS Configuration file // // Used for cgradmin // Starts rater, scheduler + "listen": { "rpc_json": ":2012", // RPC JSON listening address "rpc_gob": ":2013", // RPC GOB listening address "http": ":2080", // HTTP listening address }, + "stor_db": { "db_type": "postgres", // stor database type to use: "db_port": 5432, // the port to reach the stordb "db_password": "CGRateS.org", }, + "rals": { "enabled": true, // enable Rater service: "cdrstats_conns": [ @@ -32,10 +35,12 @@ ], }, + "scheduler": { "enabled": true, // start Scheduler service: }, + "cdrs": { "enabled": true, // start the CDR Server service: "cdrstats_conns": [ @@ -43,10 +48,12 @@ ], }, + "cdrstats": { "enabled": true, // starts the cdrstats service: }, + "pubsubs": { "enabled": true, // starts PubSub service: . }, @@ -57,14 +64,20 @@ "indexes": ["Uuid"], // user profile field indexes }, + "aliases": { "enabled": true, // starts Aliases service: . }, +"resources": { + "enabled": true, +}, + + "sm_generic": { "enabled": true, }, -} +} \ No newline at end of file diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index ec604c377..f8b6cf374 100755 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -265,7 +265,7 @@ cgrates.org,mas,true,another,value,10 *out,cgrates.org,call,remo,remo,*any,*rating,Subject,remo,minu,10 *out,cgrates.org,call,remo,remo,*any,*rating,Account,remo,minu,10 ` - resLimits = ` + resCfgs = ` #Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],TTL[5],Limit[6],AllocationMessage[7],Weight[8],Thresholds[9] ResGroup21,*string,HdrAccount,1001;1002,2014-07-29T15:00:00Z,1s,2,call,true,true,10, ResGroup21,*string_prefix,HdrDestination,10;20,,,,,,,, @@ -286,7 +286,7 @@ var csvr *TpReader func init() { csvr = NewTpReader(dataStorage, NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats, thresholds), testTPID, "") + sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resCfgs, stats, thresholds), testTPID, "") if err := csvr.LoadDestinations(); err != nil { log.Print("error in LoadDestinations:", err) } @@ -348,7 +348,7 @@ func init() { csvr.WriteToDatabase(false, false, false) cache.Flush() dataStorage.LoadRatingCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - dataStorage.LoadAccountingCache(nil, nil, nil) + dataStorage.LoadAccountingCache(nil, nil, nil, nil) } func TestLoadDestinations(t *testing.T) { @@ -1389,7 +1389,7 @@ func TestLoadReverseAliases(t *testing.T) { } func TestLoadResources(t *testing.T) { - eResLimits := map[string]*utils.TPResource{ + eResCfgs := map[string]*utils.TPResource{ "ResGroup21": &utils.TPResource{ TPid: testTPID, ID: "ResGroup21", @@ -1423,10 +1423,10 @@ func TestLoadResources(t *testing.T) { Limit: "2", }, } - if len(csvr.resLimits) != len(eResLimits) { - t.Error("Failed to load resourcelimits: ", len(csvr.resLimits)) - } else if !reflect.DeepEqual(eResLimits["ResGroup22"], csvr.resLimits["ResGroup22"]) { - t.Errorf("Expecting: %+v, received: %+v", eResLimits["ResGroup22"], csvr.resLimits["ResGroup22"]) + if len(csvr.resCfgs) != len(eResCfgs) { + t.Error("Failed to load resourcelimits: ", len(csvr.resCfgs)) + } else if !reflect.DeepEqual(eResCfgs["ResGroup22"], csvr.resCfgs["ResGroup22"]) { + t.Errorf("Expecting: %+v, received: %+v", eResCfgs["ResGroup22"], csvr.resCfgs["ResGroup22"]) } } diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index c9f6e7d37..212bfd30f 100755 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -309,7 +309,7 @@ func TestLoaderITWriteToDatabase(t *testing.T) { } } - for k, rl := range loader.resLimits { + for k, rl := range loader.resCfgs { rcv, err := loader.dataStorage.GetResourceCfg(k, true, utils.NonTransactional) if err != nil { t.Error("Failed GetResourceLimit: ", err.Error()) diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 45e156624..509b943a5 100755 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -1837,6 +1837,8 @@ func (tps TpResources) AsTPResources() (result []*utils.TPResource) { if tp.AllocationMessage != "" { rl.AllocationMessage = tp.AllocationMessage } + rl.Blocker = tp.Blocker + rl.Stored = tp.Stored if len(tp.ActivationInterval) != 0 { rl.ActivationInterval = new(utils.TPActivationInterval) aiSplt := strings.Split(tp.ActivationInterval, utils.INFIELD_SEP) @@ -1884,6 +1886,8 @@ func APItoModelResource(rl *utils.TPResource) (mdls TpResources) { mdl.Weight = rl.Weight mdl.Limit = rl.Limit mdl.AllocationMessage = rl.AllocationMessage + mdl.Blocker = rl.Blocker + mdl.Stored = rl.Stored if rl.ActivationInterval != nil { if rl.ActivationInterval.ActivationTime != "" { mdl.ActivationInterval = rl.ActivationInterval.ActivationTime @@ -1894,20 +1898,20 @@ func APItoModelResource(rl *utils.TPResource) (mdls TpResources) { } for i, val := range rl.Thresholds { if i != 0 { - mdl.Thresholds = mdl.Thresholds + utils.INFIELD_SEP + val - } else { - mdl.Thresholds = val + mdl.Thresholds += utils.INFIELD_SEP } + mdl.Thresholds += val + } } mdl.FilterType = fltr.Type mdl.FilterFieldName = fltr.FieldName for i, val := range fltr.Values { if i != 0 { - mdl.FilterFieldValues = mdl.FilterFieldValues + utils.INFIELD_SEP + val - } else { - mdl.FilterFieldValues = val + mdl.FilterFieldValues += utils.INFIELD_SEP } + mdl.FilterFieldValues += val + } mdls = append(mdls, mdl) } @@ -1961,6 +1965,13 @@ func (tps TpStatsS) AsTPStats() (result []*utils.TPStats) { Stored: tp.Stored, } } + if tp.Blocker == false || tp.Blocker == true { + st.Blocker = tp.Blocker + } + if tp.Stored == false || tp.Stored == true { + st.Stored = tp.Stored + } + if tp.QueueLength != 0 { st.QueueLength = tp.QueueLength } @@ -2030,8 +2041,11 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) { } mdl.Metrics += val } - for _, val := range st.Thresholds { - mdl.Thresholds = mdl.Thresholds + utils.INFIELD_SEP + val + for i, val := range st.Thresholds { + if i != 0 { + mdl.Thresholds += utils.INFIELD_SEP + } + mdl.Thresholds += val } if st.ActivationInterval != nil { if st.ActivationInterval.ActivationTime != "" { @@ -2046,10 +2060,9 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) { mdl.FilterFieldName = fltr.FieldName for i, val := range fltr.Values { if i != 0 { - mdl.FilterFieldValues = mdl.FilterFieldValues + utils.INFIELD_SEP + val - } else { - mdl.FilterFieldValues = val + mdl.FilterFieldValues += utils.INFIELD_SEP } + mdl.FilterFieldValues += val } mdls = append(mdls, mdl) } diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 70abcbb92..930cd9f63 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -61,6 +61,7 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCacheAlias, testOnStorITCacheReverseAlias, testOnStorITCacheResource, + testOnStorITCacheResourceCfg, testOnStorITCacheTiming, // ToDo: test cache flush for a prefix // ToDo: testOnStorITLoadAccountingCache @@ -84,6 +85,7 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCRUDAlias, testOnStorITCRUDReverseAlias, testOnStorITCRUDResource, + testOnStorITCRUDResourceCfg, testOnStorITCRUDTiming, testOnStorITCRUDHistory, testOnStorITCRUDStructVersion, @@ -778,7 +780,7 @@ func testOnStorITCacheReverseAlias(t *testing.T) { } } -func testOnStorITCacheResource(t *testing.T) { +func testOnStorITCacheResourceCfg(t *testing.T) { rCfg := &ResourceCfg{ ID: "RL_TEST", Weight: 10, @@ -849,6 +851,42 @@ func testOnStorITCacheTiming(t *testing.T) { } } +func testOnStorITCacheResource(t *testing.T) { + res := &Resource{ + ID: "RL1", + Usages: map[string]*ResourceUsage{ + "RU1": &ResourceUsage{ + ID: "RU1", + ExpiryTime: time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC).Local(), + Units: 2, + }, + }, + TTLIdx: []string{"RU1"}, + } + if err := onStor.SetResource(res); err != nil { + t.Error(err) + } + expectedT := []string{"res_RL1"} + if itm, err := onStor.GetKeysForPrefix(utils.ResourcesPrefix); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expectedT, itm) { + t.Errorf("Expected : %+v, but received %+v", expectedT, itm) + } + + if _, hasIt := cache.Get(utils.ResourcesPrefix + res.ID); hasIt { + t.Error("Already in cache") + } + if err := onStor.CacheDataFromDB(utils.ResourcesPrefix, []string{res.ID}, false); err != nil { + t.Error(err) + } + if itm, hasIt := cache.Get(utils.ResourcesPrefix + res.ID); !hasIt { + t.Error("Did not cache") + } else if rcv := itm.(*Resource); !reflect.DeepEqual(res, rcv) { + t.Errorf("Expecting: %+v, received: %+v", res, rcv) + } + +} + func testOnStorITHasData(t *testing.T) { rp := &RatingPlan{ Id: "HasData", @@ -1744,7 +1782,7 @@ func testOnStorITCRUDReverseAlias(t *testing.T) { // } } -func testOnStorITCRUDResource(t *testing.T) { +func testOnStorITCRUDResourceCfg(t *testing.T) { rL := &ResourceCfg{ ID: "RL_TEST2", Weight: 10, @@ -1853,6 +1891,42 @@ func testOnStorITCRUDHistory(t *testing.T) { } } +func testOnStorITCRUDResource(t *testing.T) { + res := &Resource{ + ID: "RL1", + Usages: map[string]*ResourceUsage{ + "RU1": &ResourceUsage{ + ID: "RU1", + ExpiryTime: time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC).Local(), + Units: 2, + }, + }, + TTLIdx: []string{"RU1"}, + } + if _, rcvErr := onStor.GetResource("RL1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } + if err := onStor.SetResource(res); err != nil { + t.Error(err) + } + if rcv, err := onStor.GetResource("RL1", true, utils.NonTransactional); err != nil { + t.Error(err) + } else if !(reflect.DeepEqual(res, rcv)) { + t.Errorf("Expecting: %v, received: %v", res, rcv) + } + if rcv, err := onStor.GetResource("RL1", false, utils.NonTransactional); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(res, rcv) { + t.Errorf("Expecting: %v, received: %v", res, rcv) + } + if err := onStor.RemoveResource(res.ID, utils.NonTransactional); err != nil { + t.Error(err) + } + if _, rcvErr := onStor.GetResource(res.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } +} + func testOnStorITCRUDStructVersion(t *testing.T) { cv := &StructVersion{ Destinations: "1", diff --git a/engine/reqfilterhelpers.go b/engine/reqfilterhelpers.go index 463b557ab..bee0cb0c4 100644 --- a/engine/reqfilterhelpers.go +++ b/engine/reqfilterhelpers.go @@ -25,6 +25,7 @@ import ( // matchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event // helper on top of dataDB.MatchReqFilterIndex, adding utils.NOT_AVAILABLE to list of fields queried +// executes a number of $(len(fields) + 1) queries to dataDB so the size of event influences the speed of return func matchingItemIDsForEvent(ev map[string]interface{}, dataDB DataDB, dbIdxKey string) (itemIDs utils.StringMap, err error) { itemIDs = make(utils.StringMap) for fldName, fieldValIf := range ev { diff --git a/engine/resources.go b/engine/resources.go index 8815bf772..823d38fb4 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -193,7 +193,7 @@ func (rs Resources) ids() (ids []string) { // returns utils.ErrResourceUnavailable if allocation is not possible func (rs Resources) AllocateResource(ru *ResourceUsage, dryRun bool) (alcMessage string, err error) { if len(rs) == 0 { - return utils.META_NONE, nil + return "", utils.ErrResourceUnavailable } lockIDs := utils.PrefixSliceItems(rs.ids(), utils.ResourcesPrefix) guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...) @@ -220,26 +220,29 @@ func (rs Resources) AllocateResource(ru *ResourceUsage, dryRun bool) (alcMessage } // Pas the config as a whole so we can ask access concurrently -func NewResourceService(cfg *config.CGRConfig, dataDB DataDB, statS rpcclient.RpcClientConnection) (*ResourceService, error) { +func NewResourceService(dataDB DataDB, shortCache *config.CacheParamConfig, storeInterval time.Duration, + statS rpcclient.RpcClientConnection) (*ResourceService, error) { if statS != nil && reflect.ValueOf(statS).IsNil() { statS = nil } return &ResourceService{dataDB: dataDB, statS: statS, - scEventResources: ltcache.New(ltcache.UnlimitedCaching, time.Duration(1)*time.Minute, false, nil), - lcEventResources: ltcache.New(ltcache.UnlimitedCaching, ltcache.UnlimitedCaching, false, nil)}, nil + scEventResources: ltcache.New(shortCache.Limit, shortCache.TTL, shortCache.StaticTTL, nil), + lcEventResources: make(map[string][]string), + storedResources: make(utils.StringMap), + storeInterval: storeInterval, stopBackup: make(chan struct{})}, nil } // ResourceService is the service handling resources type ResourceService struct { - cfg *config.CGRConfig dataDB DataDB // So we can load the data in cache and index it statS rpcclient.RpcClientConnection // allows applying filters based on stats scEventResources *ltcache.Cache // short cache map[ruID], used to keep references to matched resources for events in allow queries - lcEventResources *ltcache.Cache // cache recording resources for events in alocation phase + lcEventResources map[string][]string // cache recording resources for events in alocation phase + lcERMux sync.RWMutex // protects the lcEventResources storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool - srMux sync.RWMutex - stopBackup chan struct{} // control storing process - backupInterval time.Duration + srMux sync.RWMutex // protects storedResources + storeInterval time.Duration // interval to dump data on + stopBackup chan struct{} // control storing process } // Called to start the service @@ -251,7 +254,7 @@ func (rS *ResourceService) ListenAndServe(exitChan chan bool) error { } // Called to shutdown the service -func (rS *ResourceService) ServiceShutdown() error { +func (rS *ResourceService) Shutdown() error { utils.Logger.Info(" service shutdown initialized") close(rS.stopBackup) rS.storeResources() @@ -306,7 +309,7 @@ func (rS *ResourceService) storeResources() { // backup will regularly store resources changed to dataDB func (rS *ResourceService) runBackup() { - if rS.backupInterval <= 0 { + if rS.storeInterval <= 0 { return } for { @@ -316,7 +319,7 @@ func (rS *ResourceService) runBackup() { } rS.storeResources() } - time.Sleep(rS.backupInterval) + time.Sleep(rS.storeInterval) } // cachedResourcesForEvent attempts to retrieve cached resources for an event @@ -324,24 +327,24 @@ func (rS *ResourceService) runBackup() { // returns []Resource if negative reply was cached func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) { var shortCached bool - rIDsIf, has := rS.lcEventResources.Get(evUUID) + rS.lcERMux.RLock() + rIDs, has := rS.lcEventResources[evUUID] + rS.lcERMux.RUnlock() if !has { - if rIDsIf, has = rS.scEventResources.Get(evUUID); !has { + if rIDsIf, has := rS.scEventResources.Get(evUUID); !has { return nil + } else if rIDsIf != nil { + rIDs = rIDsIf.([]string) } shortCached = true } - var rIDs []string - if rIDsIf != nil { - rIDs = rIDsIf.([]string) - } rs = make(Resources, len(rIDs)) if len(rIDs) == 0 { return } lockIDs := utils.PrefixSliceItems(rIDs, utils.ResourcesPrefix) - guardian.Guardian.GuardIDs(rS.cfg.LockingTimeout, lockIDs...) + guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...) defer guardian.Guardian.UnguardIDs(lockIDs...) for i, rID := range rIDs { if r, err := rS.dataDB.GetResource(rID, false, ""); err != nil { @@ -352,7 +355,9 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) if shortCached { rS.scEventResources.Remove(evUUID) } else { - rS.lcEventResources.Remove(evUUID) + rS.lcERMux.Lock() + delete(rS.lcEventResources, evUUID) + rS.lcERMux.Unlock() } return nil } else { @@ -370,7 +375,7 @@ func (rS *ResourceService) matchingResourcesForEvent(ev map[string]interface{}) return nil, err } lockIDs := utils.PrefixSliceItems(rIDs.Slice(), utils.ResourcesPrefix) - guardian.Guardian.GuardIDs(rS.cfg.LockingTimeout, lockIDs...) + guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...) defer guardian.Guardian.UnguardIDs(lockIDs...) for resName := range rIDs { rCfg, err := rS.dataDB.GetResourceCfg(resName, false, utils.NonTransactional) @@ -452,6 +457,7 @@ func (rS *ResourceService) V1AllowUsage(args utils.AttrRLsResourceUsage, allow * Units: args.Units}, true); err != nil { if err == utils.ErrResourceUnavailable { rS.scEventResources.Set(args.UsageID, nil) + err = nil return // not error but still not allowed } return err @@ -486,13 +492,14 @@ func (rS *ResourceService) V1AllocateResource(args utils.AttrRLsResourceUsage, r } } if wasShortCached || !wasCached { - rS.lcEventResources.Set(args.UsageID, mtcRLs.ids()) + rS.lcERMux.Lock() + rS.lcEventResources[args.UsageID] = mtcRLs.ids() + rS.lcERMux.Unlock() } - // index it for storing rS.srMux.Lock() for _, r := range mtcRLs { - if rS.backupInterval == -1 { + if rS.storeInterval == -1 { rS.StoreResource(r) } else if r.dirty != nil { *r.dirty = true // mark it to be saved @@ -513,13 +520,15 @@ func (rS *ResourceService) V1ReleaseResource(args utils.AttrRLsResourceUsage, re } } mtcRLs.clearUsage(args.UsageID) - rS.lcEventResources.Remove(args.UsageID) - if rS.backupInterval != -1 { + rS.lcERMux.Lock() + delete(rS.lcEventResources, args.UsageID) + rS.lcERMux.Unlock() + if rS.storeInterval != -1 { rS.srMux.Lock() } for _, r := range mtcRLs { if r.dirty != nil { - if rS.backupInterval == -1 { + if rS.storeInterval == -1 { rS.StoreResource(r) } else { *r.dirty = true // mark it to be saved @@ -527,7 +536,7 @@ func (rS *ResourceService) V1ReleaseResource(args utils.AttrRLsResourceUsage, re } } } - if rS.backupInterval != -1 { + if rS.storeInterval != -1 { rS.srMux.Unlock() } *reply = utils.OK diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 27ed86c41..9bf78850d 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -80,7 +80,7 @@ type DataDB interface { RemAccountActionPlans(acntID string, apIDs []string) (err error) PushTask(*Task) error PopTask() (*Task, error) - LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs []string) error + LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs, resIDs []string) error GetAccount(string) (*Account, error) SetAccount(*Account) error RemoveAccount(string) error diff --git a/engine/storage_map.go b/engine/storage_map.go index dcae9ea8d..76aab54ee 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -165,7 +165,7 @@ func (ms *MapStorage) LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, return } -func (ms *MapStorage) LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs []string) error { +func (ms *MapStorage) LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs, resIDs []string) error { if ms.cacheCfg == nil { return nil } @@ -317,7 +317,7 @@ func (ms *MapStorage) HasData(categ, subject string) (bool, error) { ms.mu.RLock() defer ms.mu.RUnlock() switch categ { - case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX: + case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ResourcesPrefix: _, exists := ms.dict[categ+subject] return exists, nil } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 151e2f902..0452c1257 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -60,6 +60,7 @@ const ( colSts = "stats" colRFI = "request_filter_indexes" colTmg = "timings" + colRes = "resources" ) var ( @@ -149,7 +150,7 @@ func (ms *MongoStorage) EnsureIndexes() (err error) { } var colectNames []string // collection names containing this index if ms.storageType == utils.DataDB { - colectNames = []string{colAct, colApl, colAAp, colAtr, colDcs, colRCfgs, colRpl, colLcr, colDst, colRds, colAls, colUsr, colLht} + colectNames = []string{colAct, colApl, colAAp, colAtr, colDcs, colRCfgs, colRpl, colLcr, colDst, colRds, colAls, colUsr, colLht, colRes} } for _, col := range colectNames { if err = db.C(col).EnsureIndex(idx); err != nil { @@ -180,7 +181,7 @@ func (ms *MongoStorage) EnsureIndexes() (err error) { Sparse: false, } for _, col := range []string{utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPDestinationRates, utils.TBLTPRatingPlans, - utils.TBLTPSharedGroups, utils.TBLTPCdrStats, utils.TBLTPActions, utils.TBLTPActionPlans, utils.TBLTPActionTriggers, utils.TBLTPStats} { + utils.TBLTPSharedGroups, utils.TBLTPCdrStats, utils.TBLTPActions, utils.TBLTPActionPlans, utils.TBLTPActionTriggers, utils.TBLTPStats, utils.TBLTPResources} { if err = db.C(col).EnsureIndex(idx); err != nil { return } @@ -326,6 +327,7 @@ func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool utils.ResourceConfigsPrefix: colRCfg, utils.StatsPrefix: colSts, utils.TimingsPrefix: colTmg, + utils.ResourcesPrefix: colRes, } name, ok = colMap[prefix] return @@ -436,11 +438,12 @@ func (ms *MongoStorage) LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs return } -func (ms *MongoStorage) LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs []string) (err error) { +func (ms *MongoStorage) LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs, resIDs []string) (err error) { for key, ids := range map[string][]string{ utils.ALIASES_PREFIX: alsIDs, utils.REVERSE_ALIASES_PREFIX: rvAlsIDs, utils.ResourceConfigsPrefix: rlIDs, + utils.ResourcesPrefix: resIDs, } { if err = ms.CacheDataFromDB(key, ids, false); err != nil { return @@ -467,7 +470,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached utils.ALIASES_PREFIX, utils.REVERSE_ALIASES_PREFIX, utils.ResourceConfigsPrefix, - utils.TimingsPrefix}, prfx) { + utils.TimingsPrefix, + utils.ResourcesPrefix}, prfx) { return utils.NewCGRError(utils.MONGO, utils.MandatoryIEMissingCaps, utils.UnsupportedCachePrefix, @@ -534,6 +538,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached _, err = ms.GetResourceCfg(dataID, true, utils.NonTransactional) case utils.TimingsPrefix: _, err = ms.GetTiming(dataID, true, utils.NonTransactional) + case utils.ResourcesPrefix: + _, err = ms.GetResource(dataID, true, utils.NonTransactional) } if err != nil { return utils.NewCGRError(utils.MONGO, @@ -649,6 +655,11 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er for iter.Next(&idResult) { result = append(result, utils.TimingsPrefix+idResult.Id) } + case utils.ResourcesPrefix: + iter := db.C(colRes).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter() + for iter.Next(&idResult) { + result = append(result, utils.ResourcesPrefix+idResult.Id) + } default: err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix) } @@ -678,6 +689,9 @@ func (ms *MongoStorage) HasData(category, subject string) (bool, error) { case utils.ACCOUNT_PREFIX: count, err := db.C(colAcc).Find(bson.M{"id": subject}).Count() return count > 0, err + case utils.ResourcesPrefix: + count, err := db.C(colRes).Find(bson.M{"id": subject}).Count() + return count > 0, err } return false, errors.New("unsupported category in HasData") } @@ -1893,15 +1907,44 @@ func (ms *MongoStorage) RemoveResourceCfg(id string, transactionID string) (err } func (ms *MongoStorage) GetResource(id string, skipCache bool, transactionID string) (r *Resource, err error) { + key := utils.ResourcesPrefix + id + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*Resource), nil + } + } + session, col := ms.conn(colRes) + defer session.Close() + r = new(Resource) + if err = col.Find(bson.M{"id": id}).One(r); err != nil { + if err == mgo.ErrNotFound { + err = utils.ErrNotFound + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + } + return nil, err + } + cache.Set(key, r, cacheCommit(transactionID), transactionID) return } func (ms *MongoStorage) SetResource(r *Resource) (err error) { + session, col := ms.conn(colRes) + defer session.Close() + _, err = col.Upsert(bson.M{"id": r.ID}, r) return } func (ms *MongoStorage) RemoveResource(id string, transactionID string) (err error) { - return + session, col := ms.conn(colRes) + defer session.Close() + if err = col.Remove(bson.M{"id": id}); err != nil { + return + } + cache.RemKey(utils.ResourcesPrefix+id, cacheCommit(transactionID), transactionID) + return nil } func (ms *MongoStorage) GetTiming(id string, skipCache bool, transactionID string) (t *utils.TPTiming, err error) { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 1c3ad8ecf..7b3649563 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1,4 +1,4 @@ -/* +/* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH @@ -140,11 +140,12 @@ func (rs *RedisStorage) LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs return } -func (rs *RedisStorage) LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs []string) (err error) { +func (rs *RedisStorage) LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs, resIDs []string) (err error) { for key, ids := range map[string][]string{ utils.ALIASES_PREFIX: alsIDs, utils.REVERSE_ALIASES_PREFIX: rvAlsIDs, utils.ResourceConfigsPrefix: rlIDs, + utils.ResourcesPrefix: resIDs, } { if err = rs.CacheDataFromDB(key, ids, false); err != nil { return @@ -231,7 +232,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached utils.ALIASES_PREFIX, utils.REVERSE_ALIASES_PREFIX, utils.ResourceConfigsPrefix, - utils.TimingsPrefix}, prfx) { + utils.TimingsPrefix, + utils.ResourcesPrefix}, prfx) { return utils.NewCGRError(utils.REDIS, utils.MandatoryIEMissingCaps, utils.UnsupportedCachePrefix, @@ -298,6 +300,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached _, err = rs.GetResourceCfg(dataID, true, utils.NonTransactional) case utils.TimingsPrefix: _, err = rs.GetTiming(dataID, true, utils.NonTransactional) + case utils.ResourcesPrefix: + _, err = rs.GetResource(dataID, true, utils.NonTransactional) } if err != nil { return utils.NewCGRError(utils.REDIS, @@ -324,7 +328,7 @@ func (rs *RedisStorage) GetKeysForPrefix(prefix string) ([]string, error) { // Used to check if specific subject is stored using prefix key attached to entity func (rs *RedisStorage) HasData(category, subject string) (bool, error) { switch category { - case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX: + case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ResourcesPrefix: i, err := rs.Cmd("EXISTS", category+subject).Int() return i == 1, err } @@ -1375,8 +1379,7 @@ func (rs *RedisStorage) GetAllCdrStats() (css []*CdrStats, err error) { // return // } -func (rs *RedisStorage) GetResourceCfg(id string, - skipCache bool, transactionID string) (rl *ResourceCfg, err error) { +func (rs *RedisStorage) GetResourceCfg(id string, skipCache bool, transactionID string) (rl *ResourceCfg, err error) { key := utils.ResourceConfigsPrefix + id if !skipCache { if x, ok := cache.Get(key); ok { @@ -1424,14 +1427,44 @@ func (rs *RedisStorage) RemoveResourceCfg(id string, transactionID string) (err } func (rs *RedisStorage) GetResource(id string, skipCache bool, transactionID string) (r *Resource, err error) { + key := utils.ResourcesPrefix + id + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*Resource), nil + } + } + var values []byte + if values, err = rs.Cmd("GET", key).Bytes(); err != nil { + if err == redis.ErrRespNil { // did not find the destination + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + err = utils.ErrNotFound + } + return + } + if err = rs.ms.Unmarshal(values, &r); err != nil { + return + } + cache.Set(key, r, cacheCommit(transactionID), transactionID) return } func (rs *RedisStorage) SetResource(r *Resource) (err error) { - return + result, err := rs.ms.Marshal(r) + if err != nil { + return err + } + return rs.Cmd("SET", utils.ResourcesPrefix+r.ID, result).Err } func (rs *RedisStorage) RemoveResource(id string, transactionID string) (err error) { + key := utils.ResourcesPrefix + id + if err = rs.Cmd("DEL", key).Err; err != nil { + return + } + cache.RemKey(key, cacheCommit(transactionID), transactionID) return } diff --git a/engine/stordb_it_test.go b/engine/stordb_it_test.go index 47c785177..c3dbdd0cb 100755 --- a/engine/stordb_it_test.go +++ b/engine/stordb_it_test.go @@ -1440,6 +1440,8 @@ func testStorDBitCRUDTpResources(t *testing.T) { Values: []string{"test1", "test2"}, }, }, + Blocker: true, + Stored: true, }, &utils.TPResource{ TPid: "testTPid", @@ -1455,6 +1457,8 @@ func testStorDBitCRUDTpResources(t *testing.T) { Values: []string{"test1", "test2"}, }, }, + Blocker: true, + Stored: false, }, } if err := storDB.SetTPResources(snd); err != nil { diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 052219dc6..c2196e012 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -52,7 +52,8 @@ type TpReader struct { cdrStats map[string]*CdrStats users map[string]*UserProfile aliases map[string]*Alias - resLimits map[string]*utils.TPResource + resCfgs map[string]*utils.TPResource + res []string // IDs of resources which need creation based on resourceConfigs stats map[string]*utils.TPStats thresholds map[string]*utils.TPThreshold @@ -126,7 +127,7 @@ func (tpr *TpReader) Init() { tpr.users = make(map[string]*UserProfile) tpr.aliases = make(map[string]*Alias) tpr.derivedChargers = make(map[string]*utils.DerivedChargers) - tpr.resLimits = make(map[string]*utils.TPResource) + tpr.resCfgs = make(map[string]*utils.TPResource) tpr.stats = make(map[string]*utils.TPStats) tpr.thresholds = make(map[string]*utils.TPThreshold) tpr.revDests = make(map[string][]string) @@ -1600,7 +1601,14 @@ func (tpr *TpReader) LoadResourcesFiltered(tag string) error { for _, rl := range rls { mapRLs[rl.ID] = rl } - tpr.resLimits = mapRLs + tpr.resCfgs = mapRLs + for rID := range mapRLs { + if has, err := tpr.dataStorage.HasData(utils.ResourcesPrefix, rID); err != nil { + return err + } else if !has { + tpr.res = append(tpr.res, rID) + } + } return nil } @@ -1932,10 +1940,30 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Printf("\t %s : %+v", id, vals) } } + if verbose { + log.Print("ResourceConfigs:") + } + for _, tpRL := range tpr.resCfgs { + rl, err := APItoResource(tpRL, tpr.timezone) + if err != nil { + return err + } + if err = tpr.dataStorage.SetResourceCfg(rl, utils.NonTransactional); err != nil { + return err + } + if verbose { + log.Print("\t", rl.ID) + } + } if verbose { log.Print("Resources:") } - for _, tpRL := range tpr.resLimits { + for _, rID := range tpr.res { + if err = tpr.dataStorage.SetResource(&Resource{ID: rID, Usages: make(map[string]*ResourceUsage)}); err != nil { + return + } + } + for _, tpRL := range tpr.resCfgs { rl, err := APItoResource(tpRL, tpr.timezone) if err != nil { return err @@ -2013,7 +2041,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err return err } } - if len(tpr.resLimits) > 0 { + if len(tpr.resCfgs) > 0 { if verbose { log.Print("Indexing resource limits") } @@ -2021,7 +2049,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } - for _, tpRL := range tpr.resLimits { + for _, tpRL := range tpr.resCfgs { if rl, err := APItoResource(tpRL, tpr.timezone); err != nil { return err } else { @@ -2141,7 +2169,7 @@ func (tpr *TpReader) ShowStatistics() { // cdr stats log.Print("CDR stats: ", len(tpr.cdrStats)) // resource limits - log.Print("ResourceLimits: ", len(tpr.resLimits)) + log.Print("ResourceLimits: ", len(tpr.resCfgs)) // stats log.Print("Stats: ", len(tpr.stats)) } @@ -2254,9 +2282,9 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { } return keys, nil case utils.ResourceConfigsPrefix: - keys := make([]string, len(tpr.resLimits)) + keys := make([]string, len(tpr.resCfgs)) i := 0 - for k := range tpr.resLimits { + for k := range tpr.resCfgs { keys[i] = k i++ } diff --git a/general_tests/acntacts_test.go b/general_tests/acntacts_test.go index 1ddeddbae..1c2bc6dfb 100644 --- a/general_tests/acntacts_test.go +++ b/general_tests/acntacts_test.go @@ -64,7 +64,7 @@ ENABLE_ACNT,*enable_account,,,,,,,,,,,,,,false,false,10` cache.Flush() dbAcntActs.LoadRatingCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - dbAcntActs.LoadAccountingCache(nil, nil, nil) + dbAcntActs.LoadAccountingCache(nil, nil, nil, nil) expectAcnt := &engine.Account{ID: "cgrates.org:1"} if acnt, err := dbAcntActs.GetAccount("cgrates.org:1"); err != nil { diff --git a/general_tests/auth_test.go b/general_tests/auth_test.go index ea25082f9..009e06513 100644 --- a/general_tests/auth_test.go +++ b/general_tests/auth_test.go @@ -76,7 +76,7 @@ RP_ANY,DR_ANY_1CNT,*any,10` cache.Flush() dbAuth.LoadRatingCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - dbAuth.LoadAccountingCache(nil, nil, nil) + dbAuth.LoadAccountingCache(nil, nil, nil, nil) if cachedDests := cache.CountEntries(utils.DESTINATION_PREFIX); cachedDests != 0 { t.Error("Wrong number of cached destinations found", cachedDests) diff --git a/general_tests/costs1_test.go b/general_tests/costs1_test.go index 630dd3caa..92ef56747 100644 --- a/general_tests/costs1_test.go +++ b/general_tests/costs1_test.go @@ -74,7 +74,7 @@ RP_SMS1,DR_SMS_1,ALWAYS,10` csvr.WriteToDatabase(false, false, false) cache.Flush() dataDB.LoadRatingCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - dataDB.LoadAccountingCache(nil, nil, nil) + dataDB.LoadAccountingCache(nil, nil, nil, nil) if cachedRPlans := cache.CountEntries(utils.RATING_PLAN_PREFIX); cachedRPlans != 3 { t.Error("Wrong number of cached rating plans found", cachedRPlans) diff --git a/general_tests/datachrg1_test.go b/general_tests/datachrg1_test.go index 32ed70afd..a7384e9e0 100644 --- a/general_tests/datachrg1_test.go +++ b/general_tests/datachrg1_test.go @@ -61,7 +61,7 @@ RP_DATA1,DR_DATA_2,TM2,10` csvr.WriteToDatabase(false, false, false) cache.Flush() dataDB.LoadRatingCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - dataDB.LoadAccountingCache(nil, nil, nil) + dataDB.LoadAccountingCache(nil, nil, nil, nil) if cachedRPlans := cache.CountEntries(utils.RATING_PLAN_PREFIX); cachedRPlans != 1 { t.Error("Wrong number of cached rating plans found", cachedRPlans) diff --git a/general_tests/ddazmbl1_test.go b/general_tests/ddazmbl1_test.go index 486770a64..7bd6d307e 100644 --- a/general_tests/ddazmbl1_test.go +++ b/general_tests/ddazmbl1_test.go @@ -111,7 +111,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` } cache.Flush() dataDB.LoadRatingCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - dataDB.LoadAccountingCache(nil, nil, nil) + dataDB.LoadAccountingCache(nil, nil, nil, nil) if cachedDests := cache.CountEntries(utils.DESTINATION_PREFIX); cachedDests != 0 { t.Error("Wrong number of cached destinations found", cachedDests) diff --git a/general_tests/ddazmbl2_test.go b/general_tests/ddazmbl2_test.go index 7f7b49965..831761d4a 100644 --- a/general_tests/ddazmbl2_test.go +++ b/general_tests/ddazmbl2_test.go @@ -111,7 +111,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` } cache.Flush() dataDB2.LoadRatingCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - dataDB2.LoadAccountingCache(nil, nil, nil) + dataDB2.LoadAccountingCache(nil, nil, nil, nil) if cachedDests := cache.CountEntries(utils.DESTINATION_PREFIX); cachedDests != 0 { t.Error("Wrong number of cached destinations found", cachedDests) diff --git a/general_tests/ddazmbl3_test.go b/general_tests/ddazmbl3_test.go index fa7f5953e..19537e324 100644 --- a/general_tests/ddazmbl3_test.go +++ b/general_tests/ddazmbl3_test.go @@ -109,7 +109,7 @@ RP_UK,DR_UK_Mobile_BIG5,ALWAYS,10` } cache.Flush() dataDB3.LoadRatingCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - dataDB3.LoadAccountingCache(nil, nil, nil) + dataDB3.LoadAccountingCache(nil, nil, nil, nil) if cachedDests := cache.CountEntries(utils.DESTINATION_PREFIX); cachedDests != 0 { t.Error("Wrong number of cached destinations found", cachedDests) diff --git a/general_tests/smschrg1_test.go b/general_tests/smschrg1_test.go index cef967b2b..4f11b2787 100644 --- a/general_tests/smschrg1_test.go +++ b/general_tests/smschrg1_test.go @@ -59,7 +59,7 @@ func TestSMSLoadCsvTpSmsChrg1(t *testing.T) { csvr.WriteToDatabase(false, false, false) cache.Flush() dataDB.LoadRatingCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - dataDB.LoadAccountingCache(nil, nil, nil) + dataDB.LoadAccountingCache(nil, nil, nil, nil) if cachedRPlans := cache.CountEntries(utils.RATING_PLAN_PREFIX); cachedRPlans != 1 { t.Error("Wrong number of cached rating plans found", cachedRPlans) diff --git a/general_tests/tutorial_it_test.go b/general_tests/tutorial_it_test.go index 4b5eda0a1..e0cbde196 100644 --- a/general_tests/tutorial_it_test.go +++ b/general_tests/tutorial_it_test.go @@ -104,7 +104,7 @@ func TestTutITCacheStats(t *testing.T) { var rcvStats *utils.CacheStats expectedStats := &utils.CacheStats{Destinations: 5, ReverseDestinations: 7, RatingPlans: 4, RatingProfiles: 9, Actions: 8, ActionPlans: 4, AccountActionPlans: 5, SharedGroups: 1, DerivedChargers: 1, LcrProfiles: 5, - CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, Resources: 3} + CdrStats: 6, Users: 3, Aliases: 1, ReverseAliases: 2, ResourcesConfig: 0, Resources: 3} var args utils.AttrCacheStats if err := tutLocalRpc.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil { t.Error("Got error on ApierV1.GetCacheStats: ", err.Error()) diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 88f6070f0..3fe7cddff 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -283,7 +283,7 @@ func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) { Units: 1, } if sm.rls != nil { - if err := sm.rls.Call("RLsV1.ReleaseResource", attrRU, &reply); err != nil { + if err := sm.rls.Call("ResourceSV1.ReleaseResource", attrRU, &reply); err != nil { utils.Logger.Err(fmt.Sprintf(" RLs API error: %s", err.Error())) } } diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index f7a1aa2c0..274d07c3e 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -74,7 +74,6 @@ func (self *KamailioSessionManager) allocateResources(kev KamEvent) (err error) if self.rlS == nil { return errors.New("no RLs connection") } - fmt.Printf("In allocateResources, rls: %+v", self.rlS) var ev map[string]interface{} if ev, err = kev.AsMapStringIface(); err != nil { return @@ -85,7 +84,7 @@ func (self *KamailioSessionManager) allocateResources(kev KamEvent) (err error) Units: 1, // One channel reserved } var reply string - return self.rlS.Call("RLsV1.AllocateResource", attrRU, &reply) + return self.rlS.Call("ResourceSV1.AllocateResource", attrRU, &reply) } func (self *KamailioSessionManager) onCgrAuth(evData []byte, connId string) { @@ -217,7 +216,7 @@ func (self *KamailioSessionManager) onCallEnd(evData []byte, connId string) { Event: ev, Units: 1, } - if err := self.rlS.Call("RLsV1.ReleaseResource", attrRU, &reply); err != nil { + if err := self.rlS.Call("ResourceSV1.ReleaseResource", attrRU, &reply); err != nil { utils.Logger.Err(fmt.Sprintf(" RLs API error: %s", err.Error())) } }() diff --git a/utils/apitpdata.go b/utils/apitpdata.go index c263a0c8c..eafebb408 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -667,6 +667,7 @@ type ArgsCache struct { DerivedChargerIDs *[]string AliasIDs *[]string ReverseAliasIDs *[]string + ResourceConfigIDs *[]string ResourceIDs *[]string StatsIDs *[]string ThresholdsIDs *[]string @@ -704,6 +705,7 @@ type CacheStats struct { Users int Aliases int ReverseAliases int + ResourcesConfig int Resources int //Stats int //thresholds int