diff --git a/apier/v1/resourcesv1_it_test.go b/apier/v1/resourcesv1_it_test.go index f0bbbbaad..a59a8c52c 100644 --- a/apier/v1/resourcesv1_it_test.go +++ b/apier/v1/resourcesv1_it_test.go @@ -217,16 +217,16 @@ func testV1RsAllocateResource(t *testing.T) { } func testV1RsAllowUsage(t *testing.T) { - var reply bool + var allowed bool attrRU := utils.AttrRLsResourceUsage{ UsageID: "651a8db2-4f67-4cf8-b622-169e8a482e51", Event: map[string]interface{}{"Account": "1002", "Subject": "1001", "Destination": "1002"}, Units: 1, } - if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &reply); err != nil { + if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &allowed); err != nil { t.Error(err) - } else if reply != true { - t.Errorf("Expecting: %+v, received: %+v", true, reply) + } else if !allowed { + t.Errorf("Expecting: %+v, received: %+v", true, allowed) } attrRU = utils.AttrRLsResourceUsage{ @@ -234,14 +234,15 @@ func testV1RsAllowUsage(t *testing.T) { Event: map[string]interface{}{"Account": "1002", "Subject": "1001", "Destination": "1002"}, Units: 2, } - if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &reply); err != nil { + if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &allowed); err != nil { // already t.Error(err) + } else if allowed { // already 3 usages active before allow call, we should have now more than allowed + t.Error("Resource allowed") } } func testV1RsReleaseResource(t *testing.T) { - var reply interface{} - + var reply string attrRU := utils.AttrRLsResourceUsage{ UsageID: "651a8db2-4f67-4cf8-b622-169e8a482e52", Event: map[string]interface{}{"Destination": "100"}, @@ -250,19 +251,18 @@ func testV1RsReleaseResource(t *testing.T) { if err := rlsV1Rpc.Call("ResourceSV1.ReleaseResource", attrRU, &reply); err != nil { t.Error(err) } - if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &reply); err != nil { + var allowed bool + if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &allowed); err != nil { t.Error(err) - } else { - if reply != true { - t.Errorf("Expecting: %+v, received: %+v", true, reply) - } + } else if !allowed { + t.Error("not allowed") } - attrRU.Units += 7 - if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &reply); err == nil { - t.Errorf("Expecting: %+v, received: %+v", false, reply) + if err := rlsV1Rpc.Call("ResourceSV1.AllowUsage", attrRU, &allowed); err != nil { + t.Error(err) + } else if allowed { + t.Error("Resource should not be allowed") } - } func testV1RsGetResourceConfigBeforeSet(t *testing.T) { diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index c2751f3d6..10749f23a 100755 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -265,7 +265,11 @@ cgrates.org,mas,true,another,value,10 *out,cgrates.org,call,remo,remo,*any,*rating,Subject,remo,minu,10 *out,cgrates.org,call,remo,remo,*any,*rating,Account,remo,minu,10 ` +<<<<<<< HEAD resProfiles = ` +======= + resCfgs = ` +>>>>>>> dd8afa24867e3d532c7a2b81fc8070aceec07dad #Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],TTL[5],Limit[6],AllocationMessage[7],Weight[8],Thresholds[9] ResGroup21,*string,HdrAccount,1001;1002,2014-07-29T15:00:00Z,1s,2,call,true,true,10, ResGroup21,*string_prefix,HdrDestination,10;20,,,,,,,, @@ -286,7 +290,11 @@ var csvr *TpReader func init() { csvr = NewTpReader(dataStorage, NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, +<<<<<<< HEAD sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resProfiles, stats, thresholds), testTPID, "") +======= + sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resCfgs, stats, thresholds), testTPID, "") +>>>>>>> dd8afa24867e3d532c7a2b81fc8070aceec07dad if err := csvr.LoadDestinations(); err != nil { log.Print("error in LoadDestinations:", err) } @@ -1388,8 +1396,13 @@ func TestLoadReverseAliases(t *testing.T) { } } +<<<<<<< HEAD func TestLoadResourceProfiles(t *testing.T) { eResProfiles := map[string]*utils.TPResource{ +======= +func TestLoadResources(t *testing.T) { + eResCfgs := map[string]*utils.TPResource{ +>>>>>>> dd8afa24867e3d532c7a2b81fc8070aceec07dad "ResGroup21": &utils.TPResource{ TPid: testTPID, ID: "ResGroup21", @@ -1423,10 +1436,17 @@ func TestLoadResourceProfiles(t *testing.T) { Limit: "2", }, } +<<<<<<< HEAD if len(csvr.resProfiles) != len(eResProfiles) { t.Error("Failed to load resourceProfiles: ", len(csvr.resProfiles)) } else if !reflect.DeepEqual(eResProfiles["ResGroup22"], csvr.resProfiles["ResGroup22"]) { t.Errorf("Expecting: %+v, received: %+v", eResProfiles["ResGroup22"], csvr.resProfiles["ResGroup22"]) +======= + if len(csvr.resCfgs) != len(eResCfgs) { + t.Error("Failed to load resourcelimits: ", len(csvr.resCfgs)) + } else if !reflect.DeepEqual(eResCfgs["ResGroup22"], csvr.resCfgs["ResGroup22"]) { + t.Errorf("Expecting: %+v, received: %+v", eResCfgs["ResGroup22"], csvr.resCfgs["ResGroup22"]) +>>>>>>> dd8afa24867e3d532c7a2b81fc8070aceec07dad } } diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index a5a5cb61d..94d271686 100755 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -309,8 +309,13 @@ func TestLoaderITWriteToDatabase(t *testing.T) { } } +<<<<<<< HEAD for k, rl := range loader.resProfiles { rcv, err := loader.dataStorage.GetResourceProfile(k, true, utils.NonTransactional) +======= + for k, rl := range loader.resCfgs { + rcv, err := loader.dataStorage.GetResourceCfg(k, true, utils.NonTransactional) +>>>>>>> dd8afa24867e3d532c7a2b81fc8070aceec07dad if err != nil { t.Error("Failed GetResourceProfile: ", err.Error()) } diff --git a/engine/reqfilterhelpers.go b/engine/reqfilterhelpers.go index 463b557ab..bee0cb0c4 100644 --- a/engine/reqfilterhelpers.go +++ b/engine/reqfilterhelpers.go @@ -25,6 +25,7 @@ import ( // matchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event // helper on top of dataDB.MatchReqFilterIndex, adding utils.NOT_AVAILABLE to list of fields queried +// executes a number of $(len(fields) + 1) queries to dataDB so the size of event influences the speed of return func matchingItemIDsForEvent(ev map[string]interface{}, dataDB DataDB, dbIdxKey string) (itemIDs utils.StringMap, err error) { itemIDs = make(utils.StringMap) for fldName, fieldValIf := range ev { diff --git a/engine/resources.go b/engine/resources.go index f2e70d28a..fa99f4e41 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -227,8 +227,9 @@ func NewResourceService(dataDB DataDB, shortCache *config.CacheParamConfig, stor } return &ResourceService{dataDB: dataDB, statS: statS, scEventResources: ltcache.New(shortCache.Limit, shortCache.TTL, shortCache.StaticTTL, nil), - lcEventResources: ltcache.New(ltcache.UnlimitedCaching, ltcache.UnlimitedCaching, false, nil), - storeInterval: storeInterval}, nil + lcEventResources: make(map[string][]string), + storedResources: make(utils.StringMap), + storeInterval: storeInterval, stopBackup: make(chan struct{})}, nil } // ResourceService is the service handling resources @@ -236,7 +237,8 @@ type ResourceService struct { dataDB DataDB // So we can load the data in cache and index it statS rpcclient.RpcClientConnection // allows applying filters based on stats scEventResources *ltcache.Cache // short cache map[ruID], used to keep references to matched resources for events in allow queries - lcEventResources *ltcache.Cache // cache recording resources for events in alocation phase + lcEventResources map[string][]string // cache recording resources for events in alocation phase + lcERMux sync.RWMutex // protects the lcEventResources storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool srMux sync.RWMutex // protects storedResources storeInterval time.Duration // interval to dump data on @@ -325,17 +327,17 @@ func (rS *ResourceService) runBackup() { // returns []Resource if negative reply was cached func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) { var shortCached bool - rIDsIf, has := rS.lcEventResources.Get(evUUID) + rS.lcERMux.RLock() + rIDs, has := rS.lcEventResources[evUUID] + rS.lcERMux.RUnlock() if !has { - if rIDsIf, has = rS.scEventResources.Get(evUUID); !has { + if rIDsIf, has := rS.scEventResources.Get(evUUID); !has { return nil + } else if rIDsIf != nil { + rIDs = rIDsIf.([]string) } shortCached = true } - var rIDs []string - if rIDsIf != nil { - rIDs = rIDsIf.([]string) - } rs = make(Resources, len(rIDs)) if len(rIDs) == 0 { return @@ -353,7 +355,9 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) if shortCached { rS.scEventResources.Remove(evUUID) } else { - rS.lcEventResources.Remove(evUUID) + rS.lcERMux.Lock() + delete(rS.lcEventResources, evUUID) + rS.lcERMux.Unlock() } return nil } else { @@ -453,6 +457,7 @@ func (rS *ResourceService) V1AllowUsage(args utils.AttrRLsResourceUsage, allow * Units: args.Units}, true); err != nil { if err == utils.ErrResourceUnavailable { rS.scEventResources.Set(args.UsageID, nil) + err = nil return // not error but still not allowed } return err @@ -487,9 +492,10 @@ func (rS *ResourceService) V1AllocateResource(args utils.AttrRLsResourceUsage, r } } if wasShortCached || !wasCached { - rS.lcEventResources.Set(args.UsageID, mtcRLs.ids()) + rS.lcERMux.Lock() + rS.lcEventResources[args.UsageID] = mtcRLs.ids() + rS.lcERMux.Unlock() } - // index it for storing rS.srMux.Lock() for _, r := range mtcRLs { @@ -514,7 +520,9 @@ func (rS *ResourceService) V1ReleaseResource(args utils.AttrRLsResourceUsage, re } } mtcRLs.clearUsage(args.UsageID) - rS.lcEventResources.Remove(args.UsageID) + rS.lcERMux.Lock() + delete(rS.lcEventResources, args.UsageID) + rS.lcERMux.Unlock() if rS.storeInterval != -1 { rS.srMux.Lock() } diff --git a/engine/storage_map.go b/engine/storage_map.go index d8fbf8e70..d79793221 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -320,7 +320,7 @@ func (ms *MapStorage) HasData(categ, subject string) (bool, error) { ms.mu.RLock() defer ms.mu.RUnlock() switch categ { - case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX: + case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ResourcesPrefix: _, exists := ms.dict[categ+subject] return exists, nil } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 9b7eb31a1..4f965fb8e 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -689,6 +689,9 @@ func (ms *MongoStorage) HasData(category, subject string) (bool, error) { case utils.ACCOUNT_PREFIX: count, err := db.C(colAcc).Find(bson.M{"id": subject}).Count() return count > 0, err + case utils.ResourcesPrefix: + count, err := db.C(colRes).Find(bson.M{"id": subject}).Count() + return count > 0, err } return false, errors.New("unsupported category in HasData") } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 5307fd7a5..9b335582d 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -328,7 +328,7 @@ func (rs *RedisStorage) GetKeysForPrefix(prefix string) ([]string, error) { // Used to check if specific subject is stored using prefix key attached to entity func (rs *RedisStorage) HasData(category, subject string) (bool, error) { switch category { - case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX: + case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, utils.ResourcesPrefix: i, err := rs.Cmd("EXISTS", category+subject).Int() return i == 1, err } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 12e5f77ae..fb7cde21e 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -52,7 +52,12 @@ type TpReader struct { cdrStats map[string]*CdrStats users map[string]*UserProfile aliases map[string]*Alias +<<<<<<< HEAD resProfiles map[string]*utils.TPResource +======= + resCfgs map[string]*utils.TPResource + res []string // IDs of resources which need creation based on resourceConfigs +>>>>>>> dd8afa24867e3d532c7a2b81fc8070aceec07dad stats map[string]*utils.TPStats thresholds map[string]*utils.TPThreshold @@ -126,7 +131,11 @@ func (tpr *TpReader) Init() { tpr.users = make(map[string]*UserProfile) tpr.aliases = make(map[string]*Alias) tpr.derivedChargers = make(map[string]*utils.DerivedChargers) +<<<<<<< HEAD tpr.resProfiles = make(map[string]*utils.TPResource) +======= + tpr.resCfgs = make(map[string]*utils.TPResource) +>>>>>>> dd8afa24867e3d532c7a2b81fc8070aceec07dad tpr.stats = make(map[string]*utils.TPStats) tpr.thresholds = make(map[string]*utils.TPThreshold) tpr.revDests = make(map[string][]string) @@ -1600,7 +1609,18 @@ func (tpr *TpReader) LoadResourceProfilesFiltered(tag string) error { for _, rl := range rls { mapRsPs[rl.ID] = rl } +<<<<<<< HEAD tpr.resProfiles = mapRsPs +======= + tpr.resCfgs = mapRLs + for rID := range mapRLs { + if has, err := tpr.dataStorage.HasData(utils.ResourcesPrefix, rID); err != nil { + return err + } else if !has { + tpr.res = append(tpr.res, rID) + } + } +>>>>>>> dd8afa24867e3d532c7a2b81fc8070aceec07dad return nil } @@ -1933,10 +1953,37 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } if verbose { +<<<<<<< HEAD log.Print("ResourceProfiles:") } for _, tpRsp := range tpr.resProfiles { rsp, err := APItoResource(tpRsp, tpr.timezone) +======= + log.Print("ResourceConfigs:") + } + for _, tpRL := range tpr.resCfgs { + rl, err := APItoResource(tpRL, tpr.timezone) + if err != nil { + return err + } + if err = tpr.dataStorage.SetResourceCfg(rl, utils.NonTransactional); err != nil { + return err + } + if verbose { + log.Print("\t", rl.ID) + } + } + if verbose { + log.Print("Resources:") + } + for _, rID := range tpr.res { + if err = tpr.dataStorage.SetResource(&Resource{ID: rID, Usages: make(map[string]*ResourceUsage)}); err != nil { + return + } + } + for _, tpRL := range tpr.resCfgs { + rl, err := APItoResource(tpRL, tpr.timezone) +>>>>>>> dd8afa24867e3d532c7a2b81fc8070aceec07dad if err != nil { return err } @@ -2013,7 +2060,11 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err return err } } +<<<<<<< HEAD if len(tpr.resProfiles) > 0 { +======= + if len(tpr.resCfgs) > 0 { +>>>>>>> dd8afa24867e3d532c7a2b81fc8070aceec07dad if verbose { log.Print("Indexing resource profiles") } @@ -2021,7 +2072,11 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } +<<<<<<< HEAD for _, tpRL := range tpr.resProfiles { +======= + for _, tpRL := range tpr.resCfgs { +>>>>>>> dd8afa24867e3d532c7a2b81fc8070aceec07dad if rl, err := APItoResource(tpRL, tpr.timezone); err != nil { return err } else { @@ -2141,7 +2196,11 @@ func (tpr *TpReader) ShowStatistics() { // cdr stats log.Print("CDR stats: ", len(tpr.cdrStats)) // resource limits +<<<<<<< HEAD log.Print("ResourceProfiles: ", len(tpr.resProfiles)) +======= + log.Print("ResourceLimits: ", len(tpr.resCfgs)) +>>>>>>> dd8afa24867e3d532c7a2b81fc8070aceec07dad // stats log.Print("Stats: ", len(tpr.stats)) } @@ -2253,10 +2312,17 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { i++ } return keys, nil +<<<<<<< HEAD case utils.ResourceProfilesPrefix: keys := make([]string, len(tpr.resProfiles)) i := 0 for k := range tpr.resProfiles { +======= + case utils.ResourceConfigsPrefix: + keys := make([]string, len(tpr.resCfgs)) + i := 0 + for k := range tpr.resCfgs { +>>>>>>> dd8afa24867e3d532c7a2b81fc8070aceec07dad keys[i] = k i++ }