From 63143098757cce3a7365141d3e953e4b4f8c0e13 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 5 Sep 2017 18:34:26 +0200 Subject: [PATCH 1/4] TpReader - create Resources when not in dataDB together with ResourceConfigs --- apier/v1/resourcesv1_it_test.go | 32 +++++++++++------------ engine/reqfilterhelpers.go | 1 + engine/resources.go | 4 ++- engine/storage_map.go | 2 +- engine/storage_mongo_datadb.go | 5 +++- engine/storage_redis.go | 4 +-- engine/tp_reader.go | 46 ++++++++++++++++++++++++++------- 7 files changed, 64 insertions(+), 30 deletions(-) diff --git a/apier/v1/resourcesv1_it_test.go b/apier/v1/resourcesv1_it_test.go index 84de43413..7f4e8be7e 100644 --- a/apier/v1/resourcesv1_it_test.go +++ b/apier/v1/resourcesv1_it_test.go @@ -216,16 +216,16 @@ func testV1RsAllocateResource(t *testing.T) { } func testV1RsAllowUsage(t *testing.T) { - var reply bool + var allowed bool attrRU := utils.AttrRLsResourceUsage{ UsageID: "651a8db2-4f67-4cf8-b622-169e8a482e51", Event: map[string]interface{}{"Account": "1002", "Subject": "1001", "Destination": "1002"}, Units: 1, } - if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &reply); err != nil { + if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &allowed); err != nil { t.Error(err) - } else if reply != true { - t.Errorf("Expecting: %+v, received: %+v", true, reply) + } else if !allowed { + t.Errorf("Expecting: %+v, received: %+v", true, allowed) } attrRU = utils.AttrRLsResourceUsage{ @@ -233,14 +233,15 @@ func testV1RsAllowUsage(t *testing.T) { Event: map[string]interface{}{"Account": "1002", "Subject": "1001", "Destination": "1002"}, Units: 2, } - if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &reply); err != nil { + if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &allowed); err != nil { // already t.Error(err) + } else if allowed { // already 3 usages active before allow call, we should have now more than allowed + t.Error("Resource allowed") } } func testV1RsReleaseResource(t *testing.T) { - var reply interface{} - + var reply string attrRU := utils.AttrRLsResourceUsage{ UsageID: "651a8db2-4f67-4cf8-b622-169e8a482e52", Event: map[string]interface{}{"Destination": "100"}, @@ -249,19 +250,18 @@ func testV1RsReleaseResource(t *testing.T) { if err := rlsV1Rpc.Call("ResourceSV1.ReleaseResource", attrRU, &reply); err != nil { t.Error(err) } - if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &reply); err != nil { + var allowed bool + if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &allowed); err != nil { t.Error(err) - } else { - if reply != true { - t.Errorf("Expecting: %+v, received: %+v", true, reply) - } + } else if !allowed { + t.Error("not allowed") } - attrRU.Units += 7 - if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &reply); err == nil { - t.Errorf("Expecting: %+v, received: %+v", false, reply) + if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &allowed); err != nil { + t.Error(err) + } else if allowed { + t.Error("Resource should not be allowed") } - } func testV1RsGetResourceConfigBeforeSet(t *testing.T) { diff --git a/engine/reqfilterhelpers.go b/engine/reqfilterhelpers.go index 463b557ab..bee0cb0c4 100644 --- a/engine/reqfilterhelpers.go +++ b/engine/reqfilterhelpers.go @@ -25,6 +25,7 @@ import ( // matchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event // helper on top of dataDB.MatchReqFilterIndex, adding utils.NOT_AVAILABLE to list of fields queried +// executes a number of $(len(fields) + 1) queries to dataDB so the size of event influences the speed of return func matchingItemIDsForEvent(ev map[string]interface{}, dataDB DataDB, dbIdxKey string) (itemIDs utils.StringMap, err error) { itemIDs = make(utils.StringMap) for fldName, fieldValIf := range ev { diff --git a/engine/resources.go b/engine/resources.go index 90491093c..50a273359 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -228,7 +228,8 @@ func NewResourceService(dataDB DataDB, shortCache *config.CacheParamConfig, stor return &ResourceService{dataDB: dataDB, statS: statS, scEventResources: ltcache.New(shortCache.Limit, shortCache.TTL, shortCache.StaticTTL, nil), lcEventResources: ltcache.New(ltcache.UnlimitedCaching, ltcache.UnlimitedCaching, false, nil), - storeInterval: storeInterval}, nil + storedResources: make(utils.StringMap), + storeInterval: storeInterval, stopBackup: make(chan struct{})}, nil } // ResourceService is the service handling resources @@ -453,6 +454,7 @@ func (rS *ResourceService) V1AllowUsage(args utils.AttrRLsResourceUsage, allow * Units: args.Units}, true); err != nil { if err == utils.ErrResourceUnavailable { rS.scEventResources.Set(args.UsageID, nil) + err = nil return // not error but still not allowed } return err diff --git a/engine/storage_map.go b/engine/storage_map.go index 9c6413ced..46c6286e0 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -317,7 +317,7 @@ func (ms *MapStorage) HasData(categ, subject string) (bool, error) { ms.mu.RLock() defer ms.mu.RUnlock() switch categ { - case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX: + case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ResourcesPrefix: _, exists := ms.dict[categ+subject] return exists, nil } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index ba65069a4..209f2c112 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1,4 +1,4 @@ -/* +/* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH @@ -688,6 +688,9 @@ func (ms *MongoStorage) HasData(category, subject string) (bool, error) { case utils.ACCOUNT_PREFIX: count, err := db.C(colAcc).Find(bson.M{"id": subject}).Count() return count > 0, err + case utils.ResourcesPrefix: + count, err := db.C(colRes).Find(bson.M{"id": subject}).Count() + return count > 0, err } return false, errors.New("unsupported category in HasData") } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index c208798cd..d9ba227a3 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1,4 +1,4 @@ -/* +/* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH @@ -327,7 +327,7 @@ func (rs *RedisStorage) GetKeysForPrefix(prefix string) ([]string, error) { // Used to check if specific subject is stored using prefix key attached to entity func (rs *RedisStorage) HasData(category, subject string) (bool, error) { switch category { - case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX: + case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ResourcesPrefix: i, err := rs.Cmd("EXISTS", category+subject).Int() return i == 1, err } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 052219dc6..c2196e012 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -52,7 +52,8 @@ type TpReader struct { cdrStats map[string]*CdrStats users map[string]*UserProfile aliases map[string]*Alias - resLimits map[string]*utils.TPResource + resCfgs map[string]*utils.TPResource + res []string // IDs of resources which need creation based on resourceConfigs stats map[string]*utils.TPStats thresholds map[string]*utils.TPThreshold @@ -126,7 +127,7 @@ func (tpr *TpReader) Init() { tpr.users = make(map[string]*UserProfile) tpr.aliases = make(map[string]*Alias) tpr.derivedChargers = make(map[string]*utils.DerivedChargers) - tpr.resLimits = make(map[string]*utils.TPResource) + tpr.resCfgs = make(map[string]*utils.TPResource) tpr.stats = make(map[string]*utils.TPStats) tpr.thresholds = make(map[string]*utils.TPThreshold) tpr.revDests = make(map[string][]string) @@ -1600,7 +1601,14 @@ func (tpr *TpReader) LoadResourcesFiltered(tag string) error { for _, rl := range rls { mapRLs[rl.ID] = rl } - tpr.resLimits = mapRLs + tpr.resCfgs = mapRLs + for rID := range mapRLs { + if has, err := tpr.dataStorage.HasData(utils.ResourcesPrefix, rID); err != nil { + return err + } else if !has { + tpr.res = append(tpr.res, rID) + } + } return nil } @@ -1932,10 +1940,30 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Printf("\t %s : %+v", id, vals) } } + if verbose { + log.Print("ResourceConfigs:") + } + for _, tpRL := range tpr.resCfgs { + rl, err := APItoResource(tpRL, tpr.timezone) + if err != nil { + return err + } + if err = tpr.dataStorage.SetResourceCfg(rl, utils.NonTransactional); err != nil { + return err + } + if verbose { + log.Print("\t", rl.ID) + } + } if verbose { log.Print("Resources:") } - for _, tpRL := range tpr.resLimits { + for _, rID := range tpr.res { + if err = tpr.dataStorage.SetResource(&Resource{ID: rID, Usages: make(map[string]*ResourceUsage)}); err != nil { + return + } + } + for _, tpRL := range tpr.resCfgs { rl, err := APItoResource(tpRL, tpr.timezone) if err != nil { return err @@ -2013,7 +2041,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err return err } } - if len(tpr.resLimits) > 0 { + if len(tpr.resCfgs) > 0 { if verbose { log.Print("Indexing resource limits") } @@ -2021,7 +2049,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } - for _, tpRL := range tpr.resLimits { + for _, tpRL := range tpr.resCfgs { if rl, err := APItoResource(tpRL, tpr.timezone); err != nil { return err } else { @@ -2141,7 +2169,7 @@ func (tpr *TpReader) ShowStatistics() { // cdr stats log.Print("CDR stats: ", len(tpr.cdrStats)) // resource limits - log.Print("ResourceLimits: ", len(tpr.resLimits)) + log.Print("ResourceLimits: ", len(tpr.resCfgs)) // stats log.Print("Stats: ", len(tpr.stats)) } @@ -2254,9 +2282,9 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { } return keys, nil case utils.ResourceConfigsPrefix: - keys := make([]string, len(tpr.resLimits)) + keys := make([]string, len(tpr.resCfgs)) i := 0 - for k := range tpr.resLimits { + for k := range tpr.resCfgs { keys[i] = k i++ } From 411c0a56fc3f1dd58547a91edf616fc61db97907 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 5 Sep 2017 18:40:58 +0200 Subject: [PATCH 2/4] Fix test tpreader --- engine/loader_csv_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index ec604c377..7bf0f51a3 100755 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -265,7 +265,7 @@ cgrates.org,mas,true,another,value,10 *out,cgrates.org,call,remo,remo,*any,*rating,Subject,remo,minu,10 *out,cgrates.org,call,remo,remo,*any,*rating,Account,remo,minu,10 ` - resLimits = ` + resCfgs = ` #Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],TTL[5],Limit[6],AllocationMessage[7],Weight[8],Thresholds[9] ResGroup21,*string,HdrAccount,1001;1002,2014-07-29T15:00:00Z,1s,2,call,true,true,10, ResGroup21,*string_prefix,HdrDestination,10;20,,,,,,,, @@ -286,7 +286,7 @@ var csvr *TpReader func init() { csvr = NewTpReader(dataStorage, NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats, thresholds), testTPID, "") + sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resCfgs, stats, thresholds), testTPID, "") if err := csvr.LoadDestinations(); err != nil { log.Print("error in LoadDestinations:", err) } @@ -1389,7 +1389,7 @@ func TestLoadReverseAliases(t *testing.T) { } func TestLoadResources(t *testing.T) { - eResLimits := map[string]*utils.TPResource{ + eResCfgs := map[string]*utils.TPResource{ "ResGroup21": &utils.TPResource{ TPid: testTPID, ID: "ResGroup21", @@ -1423,10 +1423,10 @@ func TestLoadResources(t *testing.T) { Limit: "2", }, } - if len(csvr.resLimits) != len(eResLimits) { - t.Error("Failed to load resourcelimits: ", len(csvr.resLimits)) - } else if !reflect.DeepEqual(eResLimits["ResGroup22"], csvr.resLimits["ResGroup22"]) { - t.Errorf("Expecting: %+v, received: %+v", eResLimits["ResGroup22"], csvr.resLimits["ResGroup22"]) + if len(csvr.resCfgs) != len(eResCfgs) { + t.Error("Failed to load resourcelimits: ", len(csvr.resCfgs)) + } else if !reflect.DeepEqual(eResCfgs["ResGroup22"], csvr.resCfgs["ResGroup22"]) { + t.Errorf("Expecting: %+v, received: %+v", eResCfgs["ResGroup22"], csvr.resCfgs["ResGroup22"]) } } From c11d56ca376b1d7933d326a80e2a32d619732dd9 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 5 Sep 2017 18:50:42 +0200 Subject: [PATCH 3/4] Loader tests resLimits -> resCfgs --- engine/loader_it_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index c9f6e7d37..212bfd30f 100755 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -309,7 +309,7 @@ func TestLoaderITWriteToDatabase(t *testing.T) { } } - for k, rl := range loader.resLimits { + for k, rl := range loader.resCfgs { rcv, err := loader.dataStorage.GetResourceCfg(k, true, utils.NonTransactional) if err != nil { t.Error("Failed GetResourceLimit: ", err.Error()) From dd8afa24867e3d532c7a2b81fc8070aceec07dad Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 5 Sep 2017 19:09:42 +0200 Subject: [PATCH 4/4] ResourceService with map as longCache --- engine/resources.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/engine/resources.go b/engine/resources.go index 50a273359..823d38fb4 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -227,7 +227,7 @@ func NewResourceService(dataDB DataDB, shortCache *config.CacheParamConfig, stor } return &ResourceService{dataDB: dataDB, statS: statS, scEventResources: ltcache.New(shortCache.Limit, shortCache.TTL, shortCache.StaticTTL, nil), - lcEventResources: ltcache.New(ltcache.UnlimitedCaching, ltcache.UnlimitedCaching, false, nil), + lcEventResources: make(map[string][]string), storedResources: make(utils.StringMap), storeInterval: storeInterval, stopBackup: make(chan struct{})}, nil } @@ -237,7 +237,8 @@ type ResourceService struct { dataDB DataDB // So we can load the data in cache and index it statS rpcclient.RpcClientConnection // allows applying filters based on stats scEventResources *ltcache.Cache // short cache map[ruID], used to keep references to matched resources for events in allow queries - lcEventResources *ltcache.Cache // cache recording resources for events in alocation phase + lcEventResources map[string][]string // cache recording resources for events in alocation phase + lcERMux sync.RWMutex // protects the lcEventResources storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool srMux sync.RWMutex // protects storedResources storeInterval time.Duration // interval to dump data on @@ -326,17 +327,17 @@ func (rS *ResourceService) runBackup() { // returns []Resource if negative reply was cached func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) { var shortCached bool - rIDsIf, has := rS.lcEventResources.Get(evUUID) + rS.lcERMux.RLock() + rIDs, has := rS.lcEventResources[evUUID] + rS.lcERMux.RUnlock() if !has { - if rIDsIf, has = rS.scEventResources.Get(evUUID); !has { + if rIDsIf, has := rS.scEventResources.Get(evUUID); !has { return nil + } else if rIDsIf != nil { + rIDs = rIDsIf.([]string) } shortCached = true } - var rIDs []string - if rIDsIf != nil { - rIDs = rIDsIf.([]string) - } rs = make(Resources, len(rIDs)) if len(rIDs) == 0 { return @@ -354,7 +355,9 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) if shortCached { rS.scEventResources.Remove(evUUID) } else { - rS.lcEventResources.Remove(evUUID) + rS.lcERMux.Lock() + delete(rS.lcEventResources, evUUID) + rS.lcERMux.Unlock() } return nil } else { @@ -489,9 +492,10 @@ func (rS *ResourceService) V1AllocateResource(args utils.AttrRLsResourceUsage, r } } if wasShortCached || !wasCached { - rS.lcEventResources.Set(args.UsageID, mtcRLs.ids()) + rS.lcERMux.Lock() + rS.lcEventResources[args.UsageID] = mtcRLs.ids() + rS.lcERMux.Unlock() } - // index it for storing rS.srMux.Lock() for _, r := range mtcRLs { @@ -516,7 +520,9 @@ func (rS *ResourceService) V1ReleaseResource(args utils.AttrRLsResourceUsage, re } } mtcRLs.clearUsage(args.UsageID) - rS.lcEventResources.Remove(args.UsageID) + rS.lcERMux.Lock() + delete(rS.lcEventResources, args.UsageID) + rS.lcERMux.Unlock() if rS.storeInterval != -1 { rS.srMux.Lock() }