diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 8b776f07b..a8dc4ef6a 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -21,12 +21,13 @@ package apier import ( "errors" "fmt" + "path" + "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/utils" - "path" ) const ( @@ -204,7 +205,7 @@ func (self *ApierV1) SetRatingProfile(attrs AttrSetRatingProfile, reply *string) tpRpf := utils.TPRatingProfile{Tenant: attrs.Tenant, TOR: attrs.TOR, Direction: attrs.Direction, Subject: attrs.Subject} keyId := tpRpf.KeyId() if !attrs.Overwrite { - if exists, err := self.RatingDb.ExistsData(engine.RATING_PROFILE_PREFIX, keyId); err != nil { + if exists, err := self.RatingDb.DataExists(engine.RATING_PROFILE_PREFIX, keyId); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } else if exists { return errors.New(utils.ERR_EXISTS) @@ -216,7 +217,7 @@ func (self *ApierV1) SetRatingProfile(attrs AttrSetRatingProfile, reply *string) if err != nil { return fmt.Errorf(fmt.Sprintf("%s:Cannot parse activation time from %v", utils.ERR_SERVER_ERROR, ra.ActivationTime)) } - if exists, err := self.RatingDb.ExistsData(engine.RATING_PLAN_PREFIX, ra.RatingPlanId); err != nil { + if exists, err := self.RatingDb.DataExists(engine.RATING_PLAN_PREFIX, ra.RatingPlanId); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } else if !exists { return fmt.Errorf(fmt.Sprintf("%s:RatingPlanId:%s", utils.ERR_NOT_FOUND, ra.RatingPlanId)) @@ -251,7 +252,7 @@ func (self *ApierV1) SetActions(attrs AttrSetActions, reply *string) error { } } if !attrs.Overwrite { - if exists, err := self.AccountDb.ExistsData(engine.ACTION_PREFIX, attrs.ActionsId); err != nil { + if exists, err := self.AccountDb.DataExists(engine.ACTION_PREFIX, attrs.ActionsId); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } else if exists { return errors.New(utils.ERR_EXISTS) @@ -311,7 +312,7 @@ func (self *ApierV1) SetActionTimings(attrs AttrSetActionTimings, reply *string) } } if !attrs.Overwrite { - if exists, err := self.AccountDb.ExistsData(engine.ACTION_TIMING_PREFIX, attrs.ActionTimingsId); err != nil { + if exists, err := self.AccountDb.DataExists(engine.ACTION_TIMING_PREFIX, attrs.ActionTimingsId); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } else if exists { return errors.New(utils.ERR_EXISTS) @@ -319,7 +320,7 @@ func (self *ApierV1) SetActionTimings(attrs AttrSetActionTimings, reply *string) } storeAtms := make(engine.ActionTimings, len(attrs.ActionTimings)) for idx, apiAtm := range attrs.ActionTimings { - if exists, err := self.AccountDb.ExistsData(engine.ACTION_PREFIX, apiAtm.ActionsId); err != nil { + if exists, err := self.AccountDb.DataExists(engine.ACTION_PREFIX, apiAtm.ActionsId); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } else if !exists { return fmt.Errorf("%s:%s", utils.ERR_BROKEN_REFERENCE, err.Error()) @@ -475,7 +476,7 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error { } func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) error { - var dstKeys, rpKeys, rpfKeys, actKeys []string + var dstKeys, rpKeys, rpfKeys, actKeys, shgKeys []string if len(attrs.DestinationIds) > 0 { dstKeys = make([]string, len(attrs.DestinationIds)) for idx, dId := range attrs.DestinationIds { @@ -500,10 +501,16 @@ func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) erro actKeys[idx] = engine.ACTION_PREFIX + actId } } + if len(attrs.SharedGroupIds) > 0 { + shgKeys = make([]string, len(attrs.SharedGroupIds)) + for idx, shgId := range attrs.SharedGroupIds { + shgKeys[idx] = engine.SHARED_GROUP_PREFIX + shgId + } + } if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys); err != nil { return err } - if err := self.AccountDb.CacheAccounting(actKeys); err != nil { + if err := self.AccountDb.CacheAccounting(actKeys, shgKeys); err != nil { return err } *reply = "OK" @@ -526,8 +533,8 @@ func (self *ApierV1) GetCachedItemAge(itemId string, reply *utils.CachedItemAge) } cachedItemAge := new(utils.CachedItemAge) var found bool - for idx, cacheKey := range []string{ engine.DESTINATION_PREFIX+itemId, engine.RATING_PLAN_PREFIX+itemId, engine.RATING_PROFILE_PREFIX+itemId, - engine.ACTION_PREFIX+itemId} { + for idx, cacheKey := range []string{engine.DESTINATION_PREFIX + itemId, engine.RATING_PLAN_PREFIX + itemId, engine.RATING_PROFILE_PREFIX + itemId, + engine.ACTION_PREFIX + itemId} { if age, err := cache2go.GetKeyAge(cacheKey); err == nil { found = true switch idx { diff --git a/engine/accountlock.go b/engine/accountlock.go index a781447ed..27349135b 100644 --- a/engine/accountlock.go +++ b/engine/accountlock.go @@ -30,7 +30,7 @@ func init() { type AccountLock struct { queue map[string]chan bool - sync.Mutex + sync.RWMutex } func NewAccountLock() *AccountLock { @@ -38,13 +38,15 @@ func NewAccountLock() *AccountLock { } func (cm *AccountLock) GuardGetCost(name string, handler func() (*CallCost, error)) (reply *CallCost, err error) { - cm.Lock() + cm.RLock() lock, exists := AccLock.queue[name] + cm.RUnlock() if !exists { + cm.Lock() lock = make(chan bool, 1) AccLock.queue[name] = lock + cm.Unlock() } - cm.Unlock() lock <- true reply, err = handler() <-lock @@ -52,10 +54,14 @@ func (cm *AccountLock) GuardGetCost(name string, handler func() (*CallCost, erro } func (cm *AccountLock) Guard(name string, handler func() (float64, error)) (reply float64, err error) { + cm.RLock() lock, exists := AccLock.queue[name] + cm.RUnlock() if !exists { + cm.Lock() lock = make(chan bool, 1) AccLock.queue[name] = lock + cm.Unlock() } lock <- true reply, err = handler() diff --git a/engine/balances.go b/engine/balances.go index 439b7caac..c72d2f41b 100644 --- a/engine/balances.go +++ b/engine/balances.go @@ -35,6 +35,7 @@ type Balance struct { GroupIds []string DestinationId string RateSubject string + SharedGroup string precision int } diff --git a/engine/callcost.go b/engine/callcost.go index cbce826ad..7966644ad 100644 --- a/engine/callcost.go +++ b/engine/callcost.go @@ -85,3 +85,12 @@ func (cc *CallCost) CreateCallDescriptor() *CallDescriptor { Destination: cc.Destination, } } + +func (cc *CallCost) IsPaid() bool { + for _, ts := range cc.Timespans { + if paid, _ := ts.IsPaid(); !paid { + return false + } + } + return true +} diff --git a/engine/calldesc.go b/engine/calldesc.go index ceca357bc..147e07e60 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -588,7 +588,7 @@ func (cd *CallDescriptor) FlushCache() (err error) { cache2go.XFlush() cache2go.Flush() dataStorage.CacheRating(nil, nil, nil) - accountingStorage.CacheAccounting(nil) + accountingStorage.CacheAccounting(nil, nil) return nil } diff --git a/engine/loader_csv.go b/engine/loader_csv.go index 1752c724a..61e782e4e 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -335,7 +335,7 @@ func (csvr *CSVReader) LoadDestinationRates() (err error) { } } if !destinationExists { - if dbExists, err := csvr.dataStorage.ExistsData(DESTINATION_PREFIX, record[1]); err != nil { + if dbExists, err := csvr.dataStorage.DataExists(DESTINATION_PREFIX, record[1]); err != nil { return err } else if !dbExists { return fmt.Errorf("Could not get destination for tag %v", record[1]) @@ -418,7 +418,7 @@ func (csvr *CSVReader) LoadRatingProfiles() (err error) { } _, exists := csvr.ratingPlans[record[5]] if !exists { - if dbExists, err := csvr.dataStorage.ExistsData(RATING_PLAN_PREFIX, record[5]); err != nil { + if dbExists, err := csvr.dataStorage.DataExists(RATING_PLAN_PREFIX, record[5]); err != nil { return err } else if !dbExists { return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", record[5])) diff --git a/engine/loader_db.go b/engine/loader_db.go index bac74b9b5..d82d0f3c6 100644 --- a/engine/loader_db.go +++ b/engine/loader_db.go @@ -224,7 +224,7 @@ func (dbr *DbReader) LoadDestinationRates() (err error) { } } if !destinationExists { - if dbExists, err := dbr.dataDb.ExistsData(DESTINATION_PREFIX, dr.DestinationId); err != nil { + if dbExists, err := dbr.dataDb.DataExists(DESTINATION_PREFIX, dr.DestinationId); err != nil { return err } else if !dbExists { return errors.New(fmt.Sprintf("Could not get destination for tag %v", dr.DestinationId)) @@ -278,7 +278,7 @@ func (dbr *DbReader) LoadRatingProfiles() error { } _, exists := dbr.ratingPlans[tpRa.RatingPlanId] if !exists { - if dbExists, err := dbr.dataDb.ExistsData(RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil { + if dbExists, err := dbr.dataDb.DataExists(RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil { return err } else if !dbExists { return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", tpRa.RatingPlanId)) @@ -332,7 +332,7 @@ func (dbr *DbReader) LoadRatingPlanByTag(tag string) (bool, error) { if err != nil { return false, err } else if len(dms) == 0 { - if dbExists, err := dbr.dataDb.ExistsData(DESTINATION_PREFIX, drate.DestinationId); err != nil { + if dbExists, err := dbr.dataDb.DataExists(DESTINATION_PREFIX, drate.DestinationId); err != nil { return false, err } else if !dbExists { return false, fmt.Errorf("Could not get destination for tag %v", drate.DestinationId) @@ -369,7 +369,7 @@ func (dbr *DbReader) LoadRatingProfileFiltered(qriedRpf *utils.TPRatingProfile) } _, exists := dbr.ratingPlans[tpRa.RatingPlanId] if !exists { - if dbExists, err := dbr.dataDb.ExistsData(RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil { + if dbExists, err := dbr.dataDb.DataExists(RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil { return err } else if !dbExists { return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", tpRa.RatingPlanId)) diff --git a/engine/sharedgroup.go b/engine/sharedgroup.go new file mode 100644 index 000000000..632544e1c --- /dev/null +++ b/engine/sharedgroup.go @@ -0,0 +1,28 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2013 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +type SharedGroup struct { + Id string + Strategy string + Account string + RateSubject string + Weight float64 + Members []string +} diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 4ba43e621..2485c314b 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -36,6 +36,7 @@ const ( RATING_PLAN_PREFIX = "rpl_" RATING_PROFILE_PREFIX = "rpf_" ACTION_PREFIX = "act_" + SHARED_GROUP_PREFIX = "shg_" USER_BALANCE_PREFIX = "ubl_" DESTINATION_PREFIX = "dst_" TEMP_DESTINATION_PREFIX = "tmp_" @@ -69,7 +70,7 @@ Interface for storage providers. type RatingStorage interface { Storage CacheRating([]string, []string, []string) error - ExistsData(string, string) (bool, error) + DataExists(string, string) (bool, error) GetRatingPlan(string, bool) (*RatingPlan, error) SetRatingPlan(*RatingPlan) error GetRatingProfile(string, bool) (*RatingProfile, error) @@ -80,10 +81,12 @@ type RatingStorage interface { type AccountingStorage interface { Storage - ExistsData(string, string) (bool, error) - CacheAccounting([]string) error + DataExists(string, string) (bool, error) + CacheAccounting([]string, []string) error GetActions(string, bool) (Actions, error) SetActions(string, Actions) error + GetSharedGroup(string, bool) (*SharedGroup, error) + SetSharedGroup(string, *SharedGroup) error GetUserBalance(string) (*UserBalance, error) SetUserBalance(*UserBalance) error GetActionTimings(string) (ActionTimings, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index e70e547d8..62a0288d5 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -22,11 +22,11 @@ import ( "errors" "fmt" + "strings" + "time" "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/utils" - "strings" - "time" ) type MapStorage struct { @@ -72,7 +72,7 @@ func (ms *MapStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) error { return nil } -func (ms *MapStorage) CacheAccounting(actKeys []string) error { +func (ms *MapStorage) CacheAccounting(actKeys, shgKeys []string) error { if actKeys == nil { cache2go.RemPrefixKey(ACTION_PREFIX) // Forced until we can fine tune it } @@ -84,11 +84,22 @@ func (ms *MapStorage) CacheAccounting(actKeys []string) error { } } } + if shgKeys == nil { + cache2go.RemPrefixKey(SHARED_GROUP_PREFIX) // Forced until we can fine tune it + } + for k, _ := range ms.dict { + if strings.HasPrefix(k, SHARED_GROUP_PREFIX) { + cache2go.RemKey(k) + if _, err := ms.GetSharedGroup(k[len(SHARED_GROUP_PREFIX):], true); err != nil { + return err + } + } + } return nil } // Used to check if specific subject is stored using prefix key attached to entity -func (ms *MapStorage) ExistsData(categ, subject string) (bool, error) { +func (ms *MapStorage) DataExists(categ, subject string) (bool, error) { switch categ { case DESTINATION_PREFIX: _, exists := ms.dict[DESTINATION_PREFIX+subject] @@ -216,6 +227,37 @@ func (ms *MapStorage) SetActions(key string, as Actions) (err error) { return } +func (ms *MapStorage) GetSharedGroup(key string, checkDb bool) (sg *SharedGroup, err error) { + if values, ok := ms.dict[SHARED_GROUP_PREFIX+key]; ok { + err = ms.ms.Unmarshal(values, &sg) + } else { + return nil, errors.New("not found") + } + return + + key = SHARED_GROUP_PREFIX + key + if x, err := cache2go.GetCached(key); err == nil { + return x.(*SharedGroup), nil + } + if !checkDb { + return nil, errors.New(utils.ERR_NOT_FOUND) + } + if values, ok := ms.dict[key]; ok { + err = ms.ms.Unmarshal(values, &sg) + cache2go.Cache(key, sg) + } else { + return nil, errors.New("not found") + } + return +} + +func (ms *MapStorage) SetSharedGroup(key string, sg *SharedGroup) (err error) { + result, err := ms.ms.Marshal(sg) + ms.dict[SHARED_GROUP_PREFIX+key] = result + cache2go.Cache(ACTION_PREFIX+key, sg) + return +} + func (ms *MapStorage) GetUserBalance(key string) (ub *UserBalance, err error) { if values, ok := ms.dict[USER_BALANCE_PREFIX+key]; ok { ub = &UserBalance{Id: key} diff --git a/engine/storage_redis.go b/engine/storage_redis.go index c9860c2fb..513f7f0a9 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -126,7 +126,7 @@ func (rs *RedisStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) (err error) return } -func (rs *RedisStorage) CacheAccounting(actKeys []string) (err error) { +func (rs *RedisStorage) CacheAccounting(actKeys, shgKeys []string) (err error) { if actKeys == nil { cache2go.RemPrefixKey(ACTION_PREFIX) } @@ -147,11 +147,31 @@ func (rs *RedisStorage) CacheAccounting(actKeys []string) (err error) { if len(actKeys) != 0 { Logger.Info("Finished actions caching.") } + if shgKeys == nil { + cache2go.RemPrefixKey(SHARED_GROUP_PREFIX) + } + if shgKeys == nil { + Logger.Info("Caching all shared groups") + if shgKeys, err = rs.db.Keys(SHARED_GROUP_PREFIX + "*"); err != nil { + return + } + } else if len(shgKeys) != 0 { + Logger.Info(fmt.Sprintf("Caching shared groups: %v", shgKeys)) + } + for _, key := range shgKeys { + cache2go.RemKey(key) + if _, err = rs.GetSharedGroup(key[len(SHARED_GROUP_PREFIX):], true); err != nil { + return err + } + } + if len(shgKeys) != 0 { + Logger.Info("Finished shared groups caching.") + } return nil } // Used to check if specific subject is stored using prefix key attached to entity -func (rs *RedisStorage) ExistsData(category, subject string) (bool, error) { +func (rs *RedisStorage) DataExists(category, subject string) (bool, error) { switch category { case DESTINATION_PREFIX, RATING_PLAN_PREFIX, RATING_PROFILE_PREFIX, ACTION_PREFIX, ACTION_TIMING_PREFIX, USER_BALANCE_PREFIX: return rs.db.Exists(category + subject) @@ -301,6 +321,29 @@ func (rs *RedisStorage) SetActions(key string, as Actions) (err error) { return } +func (rs *RedisStorage) GetSharedGroup(key string, checkDb bool) (sg *SharedGroup, err error) { + key = SHARED_GROUP_PREFIX + key + if x, err := cache2go.GetCached(key); err == nil { + return x.(*SharedGroup), nil + } + if !checkDb { + return nil, errors.New(utils.ERR_NOT_FOUND) + } + var values []byte + if values, err = rs.db.Get(key); err == nil { + err = rs.ms.Unmarshal(values, &sg) + cache2go.Cache(key, sg) + } + return +} + +func (rs *RedisStorage) SetSharedGroup(key string, sg *SharedGroup) (err error) { + result, err := rs.ms.Marshal(sg) + err = rs.db.Set(ACTION_PREFIX+key, result) + cache2go.Cache(ACTION_PREFIX+key, sg) + return +} + func (rs *RedisStorage) GetUserBalance(key string) (ub *UserBalance, err error) { var values []byte if values, err = rs.db.Get(USER_BALANCE_PREFIX + key); err == nil { diff --git a/engine/userbalance.go b/engine/userbalance.go index 3dc6433db..323d41900 100644 --- a/engine/userbalance.go +++ b/engine/userbalance.go @@ -20,6 +20,7 @@ package engine import ( "errors" + "fmt" "time" "github.com/cgrates/cgrates/cache2go" @@ -180,12 +181,31 @@ func (ub *UserBalance) debitCreditBalance(cc *CallCost, count bool) error { // debit minutes for _, balance := range usefulMinuteBalances { balance.DebitMinutes(cc, count, ub, usefulMoneyBalances) + if cc.IsPaid() { + return nil + } else { + // chack if it's shared + if balance.SharedGroup != "" { + sharedGroup, err := accountingStorage.GetSharedGroup(balance.SharedGroup, false) + if err != nil { + Logger.Warning(fmt.Sprintf("Could not get shared group: %s", balance.SharedGroup)) + continue + } + for _, ubId := range sharedGroup.Members { + AccLock.Guard(ubId, func() (float64, error) { + _, err := accountingStorage.GetUserBalance(ubId) + if err != nil { + Logger.Warning(fmt.Sprintf("Could not get user balance: %s", ubId)) + } + return 0, nil + }) + } + } + } } - allPaidWithMinutes := true for tsIndex := 0; tsIndex < len(cc.Timespans); tsIndex++ { ts := cc.Timespans[tsIndex] if paid, incrementIndex := ts.IsPaid(); !paid { - allPaidWithMinutes = false newTs := ts.SplitByIncrement(incrementIndex) if newTs != nil { idx := tsIndex + 1 @@ -195,9 +215,6 @@ func (ub *UserBalance) debitCreditBalance(cc *CallCost, count bool) error { } } } - if allPaidWithMinutes { - return nil - } // debit money for _, balance := range usefulMoneyBalances { balance.DebitMoney(cc, count, ub) @@ -406,8 +423,7 @@ func (ub *UserBalance) initCounters() { } func (ub *UserBalance) CleanExpiredBalancesAndBuckets() { - for key, _ := range ub.BalanceMap { - bm := ub.BalanceMap[key] + for key, bm := range ub.BalanceMap { for i := 0; i < len(bm); i++ { if bm[i].IsExpired() { // delete it @@ -416,9 +432,16 @@ func (ub *UserBalance) CleanExpiredBalancesAndBuckets() { } ub.BalanceMap[key] = bm } - for i := 0; i < len(ub.BalanceMap[MINUTES+OUTBOUND]); i++ { - if ub.BalanceMap[MINUTES+OUTBOUND][i].IsExpired() { - ub.BalanceMap[MINUTES+OUTBOUND] = append(ub.BalanceMap[MINUTES+OUTBOUND][:i], ub.BalanceMap[MINUTES+OUTBOUND][i+1:]...) +} + +// returns the shared groups that this user balance belnongs to +func (ub *UserBalance) GetSharedGroups() (groups []string) { + for _, balanceChain := range ub.BalanceMap { + for _, b := range balanceChain { + if b.SharedGroup != "" { + groups = append(groups, b.SharedGroup) + } } } + return } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 0d021870a..44c8eab97 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -281,6 +281,7 @@ type ApiReloadCache struct { RatingPlanIds []string RatingProfileIds []string ActionIds []string + SharedGroupIds []string } type AttrCacheStats struct { // Add in the future filters here maybe so we avoid counting complete cache @@ -299,8 +300,8 @@ type AttrCachedItemAge struct { } type CachedItemAge struct { - Destination time.Duration - RatingPlan time.Duration - RatingProfile time.Duration - Action time.Duration + Destination time.Duration + RatingPlan time.Duration + RatingProfile time.Duration + Action time.Duration }