From 10054d353a2f6a66216aa04560c946ec0f8c4e26 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 29 Mar 2016 22:38:03 +0300 Subject: [PATCH 1/8] attempt to fix concurrent map panic --- cdrc/cdrc_test.go | 4 +- cdrc/csv.go | 10 +++- cdrc/fwv.go | 15 ++++-- cdre/cdrexporter.go | 6 ++- cmd/cgr-tester/cgr-tester.go | 1 - engine/cdr_local_test.go | 7 ++- engine/cdrs.go | 2 +- engine/guardian.go | 4 -- engine/pubsub.go | 9 +++- engine/pubsub_test.go | 40 +------------- engine/storage_map.go | 102 +++++++++++++++++++++++++++++++++++ sessionmanager/sessions.go | 2 +- sessionmanager/smgeneric.go | 2 +- utils/httpclient.go | 9 +--- 14 files changed, 147 insertions(+), 66 deletions(-) diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go index ac515b468..e79af9382 100644 --- a/cdrc/cdrc_test.go +++ b/cdrc/cdrc_test.go @@ -156,7 +156,7 @@ BYE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|14 }} cdrc := &Cdrc{CdrFormat: utils.OSIPS_FLATSTORE, cdrSourceIds: []string{"TEST_CDRC"}, failedCallsPrefix: "missed_calls", cdrFields: cdrFields, partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), - guard: engine.NewGuardianLock()} + guard: engine.Guardian} cdrsContent := bytes.NewReader([]byte(flatstoreCdrs)) csvReader := csv.NewReader(cdrsContent) csvReader.Comma = '|' @@ -283,7 +283,7 @@ INVITE|324cb497|d4af7023|8deaadf2ae9a17809a391f05af31afb0@0:0:0:0:0:0:0:0|486|Bu }} cdrc := &Cdrc{CdrFormat: utils.OSIPS_FLATSTORE, cdrSourceIds: []string{"TEST_CDRC"}, failedCallsPrefix: "missed_calls", cdrFields: cdrFields, partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), - guard: engine.NewGuardianLock()} + guard: engine.Guardian} cdrsContent := bytes.NewReader([]byte(flatstoreCdrs)) csvReader := csv.NewReader(cdrsContent) csvReader.Comma = '|' diff --git a/cdrc/csv.go b/cdrc/csv.go index c6daaaee0..740ff14ac 100644 --- a/cdrc/csv.go +++ b/cdrc/csv.go @@ -20,6 +20,7 @@ package cdrc import ( "encoding/csv" + "encoding/json" "errors" "fmt" "os" @@ -93,7 +94,7 @@ func pairToRecord(part1, part2 *PartialFlatstoreRecord) ([]string, error) { func NewPartialRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune) (*PartialRecordsCache, error) { return &PartialRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep, - partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.NewGuardianLock()}, nil + partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.Guardian}, nil } type PartialRecordsCache struct { @@ -323,7 +324,12 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcId strin for _, rsrFld := range httpFieldCfg.Value { httpAddr += rsrFld.ParseValue("") } - if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, storedCdr); err != nil && httpFieldCfg.Mandatory { + var jsn []byte + jsn, err = json.Marshal(storedCdr) + if err != nil { + return nil, err + } + if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory { return nil, err } else { fieldVal = string(outValByte) diff --git a/cdrc/fwv.go b/cdrc/fwv.go index b1fffe60d..7940d268d 100644 --- a/cdrc/fwv.go +++ b/cdrc/fwv.go @@ -20,16 +20,18 @@ package cdrc import ( "bufio" + "encoding/json" "fmt" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" "io" "net/http" "os" "strconv" "strings" "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) func fwvValue(cdrLine string, indexStart, width int, padding string) string { @@ -214,7 +216,12 @@ func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cfgKey string) for _, rsrFld := range httpFieldCfg.Value { httpAddr += rsrFld.ParseValue("") } - if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, storedCdr); err != nil && httpFieldCfg.Mandatory { + var jsn []byte + jsn, err = json.Marshal(storedCdr) + if err != nil { + return nil, err + } + if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory { return nil, err } else { fieldVal = string(outValByte) diff --git a/cdre/cdrexporter.go b/cdre/cdrexporter.go index e90b88e03..c269385e1 100644 --- a/cdre/cdrexporter.go +++ b/cdre/cdrexporter.go @@ -352,9 +352,13 @@ func (cdre *CdrExporter) processCdr(cdr *engine.CDR) error { case utils.META_HTTP_POST: var outValByte []byte httpAddr := cfgFld.Value.Id() + jsn, err := json.Marshal(cdr) + if err != nil { + return err + } if len(httpAddr) == 0 { err = fmt.Errorf("Empty http address for field %s type %s", cfgFld.Tag, cfgFld.Type) - } else if outValByte, err = utils.HttpJsonPost(httpAddr, cdre.httpSkipTlsCheck, cdr); err == nil { + } else if outValByte, err = utils.HttpJsonPost(httpAddr, cdre.httpSkipTlsCheck, jsn); err == nil { outVal = string(outValByte) if len(outVal) == 0 && cfgFld.Mandatory { err = fmt.Errorf("Empty result for http_post field: %s", cfgFld.Tag) diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index 1fbdc9c66..e4597655d 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -153,7 +153,6 @@ func durRemoteRater(cd *engine.CallDescriptor) (time.Duration, error) { func main() { flag.Parse() - runtime.GOMAXPROCS(runtime.NumCPU() - 1) if *cpuprofile != "" { f, err := os.Create(*cpuprofile) diff --git a/engine/cdr_local_test.go b/engine/cdr_local_test.go index 823e8f6d5..edf7c8880 100644 --- a/engine/cdr_local_test.go +++ b/engine/cdr_local_test.go @@ -19,10 +19,12 @@ along with this program. If not, see package engine import ( + "encoding/json" "flag" - "github.com/cgrates/cgrates/utils" "testing" "time" + + "github.com/cgrates/cgrates/utils" ) // Arguments received via test command @@ -42,7 +44,8 @@ func TestHttpJsonPost(t *testing.T) { RunID: utils.DEFAULT_RUNID, Usage: "0.00000001", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, } - if _, err := utils.HttpJsonPost("http://localhost:8000", false, cdrOut); err == nil { + jsn, _ := json.Marshal(cdrOut) + if _, err := utils.HttpJsonPost("http://localhost:8000", false, jsn); err == nil { t.Error(err) } } diff --git a/engine/cdrs.go b/engine/cdrs.go index 122bd5fbe..dd164fe03 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -70,7 +70,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { } func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, pubsub rpcclient.RpcClientConnection, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) { - return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{locksMap: make(map[string]chan bool)}}, nil + return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian}, nil } type CdrServer struct { diff --git a/engine/guardian.go b/engine/guardian.go index dc2975aff..6d6664e39 100644 --- a/engine/guardian.go +++ b/engine/guardian.go @@ -26,10 +26,6 @@ import ( // global package variable var Guardian = &GuardianLock{locksMap: make(map[string]chan bool)} -func NewGuardianLock() *GuardianLock { - return &GuardianLock{locksMap: make(map[string]chan bool)} -} - type GuardianLock struct { locksMap map[string]chan bool mu sync.RWMutex diff --git a/engine/pubsub.go b/engine/pubsub.go index 0d4dd17c1..6d9bcd1ea 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -1,6 +1,7 @@ package engine import ( + "encoding/json" "errors" "fmt" "sync" @@ -43,7 +44,7 @@ type SubscriberData struct { type PubSub struct { subscribers map[string]*SubscriberData ttlVerify bool - pubFunc func(string, bool, interface{}) ([]byte, error) + pubFunc func(string, bool, []byte) ([]byte, error) mux *sync.Mutex accountDb AccountingStorage } @@ -140,12 +141,16 @@ func (ps *PubSub) Publish(evt CgrEvent, reply *string) error { transport := split[0] address := split[1] ttlVerify := ps.ttlVerify + jsn, err := json.Marshal(evt) + if err != nil { + return err + } switch transport { case utils.META_HTTP_POST: go func() { delay := utils.Fib() for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort - if _, err := ps.pubFunc(address, ttlVerify, evt); err == nil { + if _, err := ps.pubFunc(address, ttlVerify, jsn); err == nil { break // Success, no need to reinterate } else if i == 4 { // Last iteration, syslog the warning utils.Logger.Warning(fmt.Sprintf(" Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), evt["EventName"])) diff --git a/engine/pubsub_test.go b/engine/pubsub_test.go index 5345a06c0..ac28837a8 100644 --- a/engine/pubsub_test.go +++ b/engine/pubsub_test.go @@ -116,43 +116,9 @@ func TestUnsubscribeSave(t *testing.T) { } } -func TestPublish(t *testing.T) { - ps := NewPubSub(accountingStorage, true) - ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { - obj.(CgrEvent)["called"] = url - return nil, nil - } - var r string - if err := ps.Subscribe(SubscribeInfo{ - EventFilter: "EventName/test", - Transport: utils.META_HTTP_POST, - Address: "url", - LifeSpan: time.Second, - }, &r); err != nil { - t.Error("Error subscribing: ", err) - } - m := make(map[string]string) - m["EventFilter"] = "test" - if err := ps.Publish(m, &r); err != nil { - t.Error("Error publishing: ", err) - } - for i := 0; i < 1000; i++ { // wait for the theread to populate map - if len(m) == 2 { - time.Sleep(time.Microsecond) - } else { - break - } - } - if r, exists := m["called"]; !exists || r != "url" { - t.Error("Error calling publish function: ", m) - } -} - func TestPublishExpired(t *testing.T) { ps := NewPubSub(accountingStorage, true) - ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { - m := obj.(map[string]string) - m["called"] = "yes" + ps.pubFunc = func(url string, ttl bool, obj []byte) ([]byte, error) { return nil, nil } var r string @@ -174,9 +140,7 @@ func TestPublishExpired(t *testing.T) { func TestPublishExpiredSave(t *testing.T) { ps := NewPubSub(accountingStorage, true) - ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) { - m := obj.(map[string]string) - m["called"] = "yes" + ps.pubFunc = func(url string, ttl bool, obj []byte) ([]byte, error) { return nil, nil } var r string diff --git a/engine/storage_map.go b/engine/storage_map.go index d87fdcddd..f7e21306a 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "io/ioutil" + "sync" "strings" "time" @@ -36,6 +37,7 @@ type MapStorage struct { dict map[string][]byte tasks [][]byte ms Marshaler + mu sync.RWMutex } func NewMapStorage() (*MapStorage, error) { @@ -49,11 +51,15 @@ func NewMapStorageJson() (*MapStorage, error) { func (ms *MapStorage) Close() {} func (ms *MapStorage) Flush(ignore string) error { + ms.mu.Lock() + defer ms.mu.Unlock() ms.dict = make(map[string][]byte) return nil } func (ms *MapStorage) GetKeysForPrefix(prefix string, skipCache bool) ([]string, error) { + ms.mu.RLock() + defer ms.mu.RUnlock() if skipCache { keysForPrefix := make([]string, 0) for key := range ms.dict { @@ -256,6 +262,8 @@ func (ms *MapStorage) cacheAccounting(alsKeys []string) error { // Used to check if specific subject is stored using prefix key attached to entity func (ms *MapStorage) HasData(categ, subject string) (bool, error) { + ms.mu.RLock() + defer ms.mu.RUnlock() switch categ { case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX: _, exists := ms.dict[categ+subject] @@ -265,6 +273,8 @@ func (ms *MapStorage) HasData(categ, subject string) (bool, error) { } func (ms *MapStorage) GetRatingPlan(key string, skipCache bool) (rp *RatingPlan, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() key = utils.RATING_PLAN_PREFIX + key if !skipCache { if x, err := cache2go.Get(key); err == nil { @@ -294,6 +304,8 @@ func (ms *MapStorage) GetRatingPlan(key string, skipCache bool) (rp *RatingPlan, } func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(rp) var b bytes.Buffer w := zlib.NewWriter(&b) @@ -308,6 +320,8 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) { } func (ms *MapStorage) GetRatingProfile(key string, skipCache bool) (rpf *RatingProfile, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() key = utils.RATING_PROFILE_PREFIX + key if !skipCache { if x, err := cache2go.Get(key); err == nil { @@ -328,6 +342,8 @@ func (ms *MapStorage) GetRatingProfile(key string, skipCache bool) (rpf *RatingP } func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(rpf) ms.dict[utils.RATING_PROFILE_PREFIX+rpf.Id] = result response := 0 @@ -338,6 +354,8 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) { } func (ms *MapStorage) RemoveRatingProfile(key string) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() for k := range ms.dict { if strings.HasPrefix(k, key) { delete(ms.dict, key) @@ -353,6 +371,8 @@ func (ms *MapStorage) RemoveRatingProfile(key string) (err error) { } func (ms *MapStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() key = utils.LCR_PREFIX + key if !skipCache { if x, err := cache2go.Get(key); err == nil { @@ -371,12 +391,16 @@ func (ms *MapStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error) { } func (ms *MapStorage) SetLCR(lcr *LCR) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(lcr) ms.dict[utils.LCR_PREFIX+lcr.GetId()] = result return } func (ms *MapStorage) GetDestination(key string) (dest *Destination, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() key = utils.DESTINATION_PREFIX + key if values, ok := ms.dict[key]; ok { b := bytes.NewBuffer(values) @@ -402,6 +426,8 @@ func (ms *MapStorage) GetDestination(key string) (dest *Destination, err error) } func (ms *MapStorage) SetDestination(dest *Destination) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(dest) var b bytes.Buffer w := zlib.NewWriter(&b) @@ -416,6 +442,8 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) { } func (ms *MapStorage) GetActions(key string, skipCache bool) (as Actions, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() key = utils.ACTION_PREFIX + key if !skipCache { if x, err := cache2go.Get(key); err == nil { @@ -434,12 +462,16 @@ func (ms *MapStorage) GetActions(key string, skipCache bool) (as Actions, err er } func (ms *MapStorage) SetActions(key string, as Actions) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(&as) ms.dict[utils.ACTION_PREFIX+key] = result return } func (ms *MapStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGroup, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() key = utils.SHARED_GROUP_PREFIX + key if !skipCache { if x, err := cache2go.Get(key); err == nil { @@ -460,12 +492,16 @@ func (ms *MapStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGrou } func (ms *MapStorage) SetSharedGroup(sg *SharedGroup) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(sg) ms.dict[utils.SHARED_GROUP_PREFIX+sg.Id] = result return } func (ms *MapStorage) GetAccount(key string) (ub *Account, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() if values, ok := ms.dict[utils.ACCOUNT_PREFIX+key]; ok { ub = &Account{ID: key} err = ms.ms.Unmarshal(values, ub) @@ -488,17 +524,23 @@ func (ms *MapStorage) SetAccount(ub *Account) (err error) { ub = ac } } + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(ub) ms.dict[utils.ACCOUNT_PREFIX+ub.ID] = result return } func (ms *MapStorage) RemoveAccount(key string) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() delete(ms.dict, utils.ACCOUNT_PREFIX+key) return } func (ms *MapStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() if values, ok := ms.dict[utils.CDR_STATS_QUEUE_PREFIX+key]; ok { sq = &StatsQueue{} err = ms.ms.Unmarshal(values, sq) @@ -509,12 +551,16 @@ func (ms *MapStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) { } func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(sq) ms.dict[utils.CDR_STATS_QUEUE_PREFIX+sq.GetId()] = result return } func (ms *MapStorage) GetSubscribers() (result map[string]*SubscriberData, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() result = make(map[string]*SubscriberData) for key, value := range ms.dict { if strings.HasPrefix(key, utils.PUBSUB_SUBSCRIBERS_PREFIX) { @@ -527,17 +573,23 @@ func (ms *MapStorage) GetSubscribers() (result map[string]*SubscriberData, err e return } func (ms *MapStorage) SetSubscriber(key string, sub *SubscriberData) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(sub) ms.dict[utils.PUBSUB_SUBSCRIBERS_PREFIX+key] = result return } func (ms *MapStorage) RemoveSubscriber(key string) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() delete(ms.dict, utils.PUBSUB_SUBSCRIBERS_PREFIX+key) return } func (ms *MapStorage) SetUser(up *UserProfile) error { + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(up) if err != nil { return err @@ -546,6 +598,8 @@ func (ms *MapStorage) SetUser(up *UserProfile) error { return nil } func (ms *MapStorage) GetUser(key string) (up *UserProfile, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() up = &UserProfile{} if values, ok := ms.dict[utils.USERS_PREFIX+key]; ok { err = ms.ms.Unmarshal(values, &up) @@ -556,6 +610,8 @@ func (ms *MapStorage) GetUser(key string) (up *UserProfile, err error) { } func (ms *MapStorage) GetUsers() (result []*UserProfile, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() for key, value := range ms.dict { if strings.HasPrefix(key, utils.USERS_PREFIX) { up := &UserProfile{} @@ -568,11 +624,15 @@ func (ms *MapStorage) GetUsers() (result []*UserProfile, err error) { } func (ms *MapStorage) RemoveUser(key string) error { + ms.mu.Lock() + defer ms.mu.Unlock() delete(ms.dict, utils.USERS_PREFIX+key) return nil } func (ms *MapStorage) SetAlias(al *Alias) error { + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(al.Values) if err != nil { return err @@ -582,6 +642,8 @@ func (ms *MapStorage) SetAlias(al *Alias) error { } func (ms *MapStorage) GetAlias(key string, skipCache bool) (al *Alias, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() key = utils.ALIASES_PREFIX + key if !skipCache { if x, err := cache2go.Get(key); err == nil { @@ -607,6 +669,8 @@ func (ms *MapStorage) GetAlias(key string, skipCache bool) (al *Alias, err error } func (ms *MapStorage) RemoveAlias(key string) error { + ms.mu.Lock() + defer ms.mu.Unlock() al := &Alias{} al.SetId(key) key = utils.ALIASES_PREFIX + key @@ -622,14 +686,20 @@ func (ms *MapStorage) RemoveAlias(key string) error { } func (ms *MapStorage) GetLoadHistory(limitItems int, skipCache bool) ([]*LoadInstance, error) { + ms.mu.RLock() + defer ms.mu.RUnlock() return nil, nil } func (ms *MapStorage) AddLoadHistory(*LoadInstance, int) error { + ms.mu.Lock() + defer ms.mu.Unlock() return nil } func (ms *MapStorage) GetActionTriggers(key string) (atrs ActionTriggers, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() if values, ok := ms.dict[utils.ACTION_TRIGGER_PREFIX+key]; ok { err = ms.ms.Unmarshal(values, &atrs) } else { @@ -639,6 +709,8 @@ func (ms *MapStorage) GetActionTriggers(key string) (atrs ActionTriggers, err er } func (ms *MapStorage) SetActionTriggers(key string, atrs ActionTriggers) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() if len(atrs) == 0 { // delete the key delete(ms.dict, utils.ACTION_TRIGGER_PREFIX+key) @@ -650,6 +722,8 @@ func (ms *MapStorage) SetActionTriggers(key string, atrs ActionTriggers) (err er } func (ms *MapStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPlan, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() key = utils.ACTION_PLAN_PREFIX + key if !skipCache { if x, err := cache2go.Get(key); err == nil { @@ -669,6 +743,8 @@ func (ms *MapStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPlan func (ms *MapStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool) (err error) { if len(ats.ActionTimings) == 0 { + ms.mu.Lock() + defer ms.mu.Unlock() // delete the key delete(ms.dict, utils.ACTION_PLAN_PREFIX+key) cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key) @@ -685,12 +761,16 @@ func (ms *MapStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool) } } } + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(&ats) ms.dict[utils.ACTION_PLAN_PREFIX+key] = result return } func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() apls, err := cache2go.GetAllEntries(utils.ACTION_PLAN_PREFIX) if err != nil { return nil, err @@ -706,6 +786,8 @@ func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error } func (ms *MapStorage) PushTask(t *Task) error { + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(t) if err != nil { return err @@ -715,6 +797,8 @@ func (ms *MapStorage) PushTask(t *Task) error { } func (ms *MapStorage) PopTask() (t *Task, err error) { + ms.mu.Lock() + defer ms.mu.Unlock() if len(ms.tasks) > 0 { var values []byte values, ms.tasks = ms.tasks[0], ms.tasks[1:] @@ -727,6 +811,8 @@ func (ms *MapStorage) PopTask() (t *Task, err error) { } func (ms *MapStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() key = utils.DERIVEDCHARGERS_PREFIX + key if !skipCache { if x, err := cache2go.Get(key); err == nil { @@ -745,6 +831,8 @@ func (ms *MapStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils } func (ms *MapStorage) SetDerivedChargers(key string, dcs *utils.DerivedChargers) error { + ms.mu.Lock() + defer ms.mu.Unlock() if dcs == nil || len(dcs.Chargers) == 0 { delete(ms.dict, utils.DERIVEDCHARGERS_PREFIX+key) cache2go.RemKey(utils.DERIVEDCHARGERS_PREFIX + key) @@ -756,12 +844,16 @@ func (ms *MapStorage) SetDerivedChargers(key string, dcs *utils.DerivedChargers) } func (ms *MapStorage) SetCdrStats(cs *CdrStats) error { + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(cs) ms.dict[utils.CDR_STATS_PREFIX+cs.Id] = result return err } func (ms *MapStorage) GetCdrStats(key string) (cs *CdrStats, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() if values, ok := ms.dict[utils.CDR_STATS_PREFIX+key]; ok { err = ms.ms.Unmarshal(values, &cs) } else { @@ -771,6 +863,8 @@ func (ms *MapStorage) GetCdrStats(key string) (cs *CdrStats, err error) { } func (ms *MapStorage) GetAllCdrStats() (css []*CdrStats, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() for key, value := range ms.dict { if !strings.HasPrefix(key, utils.CDR_STATS_PREFIX) { continue @@ -783,12 +877,16 @@ func (ms *MapStorage) GetAllCdrStats() (css []*CdrStats, err error) { } func (ms *MapStorage) SetSMCost(smCost *SMCost) error { + ms.mu.Lock() + defer ms.mu.Unlock() result, err := ms.ms.Marshal(smCost) ms.dict[utils.LOG_CALL_COST_PREFIX+smCost.CostSource+smCost.RunID+"_"+smCost.CGRID] = result return err } func (ms *MapStorage) GetSMCost(cgrid, source, runid, originHost, originID string) (smCost *SMCost, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() if values, ok := ms.dict[utils.LOG_CALL_COST_PREFIX+source+runid+"_"+cgrid]; ok { err = ms.ms.Unmarshal(values, &smCost) } else { @@ -798,6 +896,8 @@ func (ms *MapStorage) GetSMCost(cgrid, source, runid, originHost, originID strin } func (ms *MapStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() mat, err := ms.ms.Marshal(at) if err != nil { return @@ -811,6 +911,8 @@ func (ms *MapStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, a } func (ms *MapStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() mat, err := ms.ms.Marshal(at) if err != nil { return diff --git a/sessionmanager/sessions.go b/sessionmanager/sessions.go index 65e4a25c1..690a9127a 100644 --- a/sessionmanager/sessions.go +++ b/sessionmanager/sessions.go @@ -28,7 +28,7 @@ import ( func NewSessions() *Sessions { return &Sessions{ sessionsMux: new(sync.Mutex), - guard: engine.NewGuardianLock(), + guard: engine.Guardian, } } diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 5f7359b1a..d8f1bd25f 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -35,7 +35,7 @@ var ErrPartiallyExecuted = errors.New("Partially executed") func NewSMGeneric(cgrCfg *config.CGRConfig, rater engine.Connector, cdrsrv engine.Connector, timezone string, extconns *SMGExternalConnections) *SMGeneric { gsm := &SMGeneric{cgrCfg: cgrCfg, rater: rater, cdrsrv: cdrsrv, extconns: extconns, timezone: timezone, - sessions: make(map[string][]*SMGSession), sessionsMux: new(sync.Mutex), guard: engine.NewGuardianLock()} + sessions: make(map[string][]*SMGSession), sessionsMux: new(sync.Mutex), guard: engine.Guardian} return gsm } diff --git a/utils/httpclient.go b/utils/httpclient.go index 0fad55afb..169067914 100644 --- a/utils/httpclient.go +++ b/utils/httpclient.go @@ -22,7 +22,6 @@ import ( "bytes" "crypto/tls" "encoding/gob" - "encoding/json" "fmt" "io/ioutil" "net/http" @@ -49,16 +48,12 @@ func GetBytes(content interface{}) ([]byte, error) { } // Post without automatic failover -func HttpJsonPost(url string, skipTlsVerify bool, content interface{}) ([]byte, error) { - body, err := json.Marshal(content) - if err != nil { - return nil, err - } +func HttpJsonPost(url string, skipTlsVerify bool, content []byte) ([]byte, error) { tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTlsVerify}, } client := &http.Client{Transport: tr} - resp, err := client.Post(url, "application/json", bytes.NewBuffer(body)) + resp, err := client.Post(url, "application/json", bytes.NewBuffer(content)) if err != nil { return nil, err } From 97c96dd46d23fa60d72c3fe0a87f87f01bf33dc1 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 29 Mar 2016 23:34:48 +0300 Subject: [PATCH 2/8] better account disabled error reporting --- engine/calldesc.go | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/engine/calldesc.go b/engine/calldesc.go index d5f3c155f..9ab8a9373 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -188,7 +188,11 @@ func (cd *CallDescriptor) getAccount() (ub *Account, err error) { cd.account, err = accountingStorage.GetAccount(cd.GetAccountKey()) } if cd.account != nil && cd.account.Disabled { - return nil, fmt.Errorf("User %s is disabled", cd.account.ID) + return nil, utils.ErrAccountDisabled + } + if err != nil || cd.account == nil { + utils.Logger.Warning(fmt.Sprintf("Account: %s, not found (%v)", cd.GetAccountKey(), err)) + return nil, utils.ErrAccountNotFound } return cd.account, err } @@ -635,13 +639,9 @@ func (origCD *CallDescriptor) getMaxSessionDuration(origAcc *Account) (time.Dura func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err error) { cd.account = nil // make sure it's not cached - if account, err := cd.getAccount(); err != nil || account == nil { - utils.Logger.Err(fmt.Sprintf("Account: %s, not found", cd.GetAccountKey())) - return 0, utils.ErrAccountNotFound + if account, err := cd.getAccount(); err != nil { + return 0, err } else { - if account.Disabled { - return 0, utils.ErrAccountDisabled - } if memberIds, err := account.GetUniqueSharedGroupMembers(cd); err == nil { if _, err := Guardian.Guard(func() (interface{}, error) { duration, err = cd.getMaxSessionDuration(account) @@ -705,13 +705,9 @@ func (cd *CallDescriptor) debit(account *Account, dryRun bool, goNegative bool) func (cd *CallDescriptor) Debit() (cc *CallCost, err error) { cd.account = nil // make sure it's not cached // lock all group members - if account, err := cd.getAccount(); err != nil || account == nil { - utils.Logger.Err(fmt.Sprintf("Account: %s, not found", cd.GetAccountKey())) - return nil, utils.ErrAccountNotFound + if account, err := cd.getAccount(); err != nil { + return nil, err } else { - if account.Disabled { - return nil, utils.ErrAccountDisabled - } if memberIds, sgerr := account.GetUniqueSharedGroupMembers(cd); sgerr == nil { _, err = Guardian.Guard(func() (interface{}, error) { cc, err = cd.debit(account, cd.DryRun, true) @@ -730,13 +726,9 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) { // by the GetMaxSessionDuration method. The amount filed has to be filled in call descriptor. func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) { cd.account = nil // make sure it's not cached - if account, err := cd.getAccount(); err != nil || account == nil { - utils.Logger.Err(fmt.Sprintf("Account: %s, not found", cd.GetAccountKey())) - return nil, utils.ErrAccountNotFound + if account, err := cd.getAccount(); err != nil { + return nil, err } else { - if account.Disabled { - return nil, utils.ErrAccountDisabled - } //log.Printf("ACC: %+v", account) if memberIDs, err := account.GetUniqueSharedGroupMembers(cd); err == nil { _, err = Guardian.Guard(func() (interface{}, error) { From 2a7087ba5432cb7e5626b49d618da90517ea372e Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 30 Mar 2016 00:05:43 +0300 Subject: [PATCH 3/8] empty blocker balance considered --- engine/account.go | 2 +- engine/calldesc_test.go | 53 ++++++++++++++++++++++++++++++++++++++- engine/loader_csv_test.go | 12 ++++++--- engine/storage_test.go | 4 +-- 4 files changed, 63 insertions(+), 8 deletions(-) diff --git a/engine/account.go b/engine/account.go index e231875a2..45497e32f 100644 --- a/engine/account.go +++ b/engine/account.go @@ -287,7 +287,7 @@ func (ub *Account) getBalancesForPrefix(prefix, category, direction, tor string, if b.Disabled { continue } - if b.IsExpired() || (len(b.SharedGroups) == 0 && b.GetValue() <= 0) { + if b.IsExpired() || (len(b.SharedGroups) == 0 && b.GetValue() <= 0 && !b.Blocker) { continue } if sharedGroup != "" && b.SharedGroups[sharedGroup] == false { diff --git a/engine/calldesc_test.go b/engine/calldesc_test.go index 540bc8dad..ddde0471f 100644 --- a/engine/calldesc_test.go +++ b/engine/calldesc_test.go @@ -566,7 +566,7 @@ func TestGetMaxSessiontWithBlocker(t *testing.T) { MaxCostSoFar: 0, } result, err := cd.GetMaxSessionDuration() - expected := 30 * time.Minute + expected := 17 * time.Minute if result != expected || err != nil { t.Errorf("Expected %v was %v (%v)", expected, result, err) } @@ -588,6 +588,57 @@ func TestGetMaxSessiontWithBlocker(t *testing.T) { } } +func TestGetMaxSessiontWithBlockerEmpty(t *testing.T) { + ap, _ := ratingStorage.GetActionPlan("BLOCK_EMPTY_AT", false) + for _, at := range ap.ActionTimings { + at.accountIDs = ap.AccountIDs + at.Execute() + } + acc, err := accountingStorage.GetAccount("cgrates.org:block_empty") + if err != nil { + t.Error("error getting account: ", err) + } + if len(acc.BalanceMap[utils.MONETARY]) != 2 || + acc.BalanceMap[utils.MONETARY][0].Blocker != true { + for _, b := range acc.BalanceMap[utils.MONETARY] { + t.Logf("B: %+v", b) + } + t.Error("Error executing action plan on account: ", acc.BalanceMap[utils.MONETARY]) + } + cd := &CallDescriptor{ + Direction: "*out", + Category: "call", + Tenant: "cgrates.org", + Subject: "block", + Account: "block_empty", + Destination: "0723", + TimeStart: time.Date(2016, 1, 13, 14, 0, 0, 0, time.UTC), + TimeEnd: time.Date(2016, 1, 13, 14, 30, 0, 0, time.UTC), + MaxCostSoFar: 0, + } + result, err := cd.GetMaxSessionDuration() + expected := 0 * time.Minute + if result != expected || err != nil { + t.Errorf("Expected %v was %v (%v)", expected, result, err) + } + cd = &CallDescriptor{ + Direction: "*out", + Category: "call", + Tenant: "cgrates.org", + Subject: "block", + Account: "block_empty", + Destination: "444", + TimeStart: time.Date(2016, 1, 13, 14, 0, 0, 0, time.UTC), + TimeEnd: time.Date(2016, 1, 13, 14, 30, 0, 0, time.UTC), + MaxCostSoFar: 0, + } + result, err = cd.GetMaxSessionDuration() + expected = 30 * time.Minute + if result != expected || err != nil { + t.Errorf("Expected %v was %v (%v)", expected, result, err) + } +} + func TestGetCostWithMaxCost(t *testing.T) { ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false) for _, at := range ap.ActionTimings { diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index 7c17f02fe..357f56fb8 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -172,8 +172,10 @@ EE0,*topup_reset,,,,*monetary,*out,,,,SG3,*unlimited,,0,10,false,false,10 EE0,*allow_negative,,,,*monetary,*out,,,,,*unlimited,,0,10,false,false,10 DEFEE,*cdrlog,"{""Category"":""^ddi"",""MediationRunId"":""^did_run""}",,,,,,,,,,,,,false,false,10 NEG,*allow_negative,,,,*monetary,*out,,,,,*unlimited,,0,10,false,false,10 -BLOCK,*topup,,,bblocker,*monetary,*out,,NAT,,,*unlimited,,10,20,true,false,20 +BLOCK,*topup,,,bblocker,*monetary,*out,,NAT,,,*unlimited,,1,20,true,false,20 BLOCK,*topup,,,bfree,*monetary,*out,,,,,*unlimited,,20,10,false,false,10 +BLOCK_EMPTY,*topup,,,bblocker,*monetary,*out,,NAT,,,*unlimited,,0,20,true,false,20 +BLOCK_EMPTY,*topup,,,bfree,*monetary,*out,,,,,*unlimited,,20,10,false,false,10 FILTER,*topup,,"{""*and"":[{""Value"":{""*lt"":0}},{""Id"":{""*eq"":""*default""}}]}",bfree,*monetary,*out,,,,,*unlimited,,20,10,false,false,10 EXP,*topup,,,,*voice,*out,,,,,*monthly,*any,300,10,false,false,10 NOEXP,*topup,,,,*voice,*out,,,,,*unlimited,*any,50,10,false,false,10 @@ -188,6 +190,7 @@ TOPUP_SHARED10_AT,SE10,*asap,10 TOPUP_EMPTY_AT,EE0,*asap,10 POST_AT,NEG,*asap,10 BLOCK_AT,BLOCK,*asap,10 +BLOCK_EMPTY_AT,BLOCK_EMPTY,*asap,10 EXP_AT,EXP,*asap,10 ` @@ -216,6 +219,7 @@ vdf,emptyY,TOPUP_EMPTY_AT,,, vdf,post,POST_AT,,, cgrates.org,alodis,TOPUP_EMPTY_AT,,true,true cgrates.org,block,BLOCK_AT,,false,false +cgrates.org,block_empty,BLOCK_EMPTY_AT,,false,false cgrates.org,expo,EXP_AT,,false,false cgrates.org,expnoexp,,,false,false ` @@ -820,7 +824,7 @@ func TestLoadRatingProfiles(t *testing.T) { } func TestLoadActions(t *testing.T) { - if len(csvr.actions) != 13 { + if len(csvr.actions) != 14 { t.Error("Failed to load actions: ", len(csvr.actions)) } as1 := csvr.actions["MINI"] @@ -1006,7 +1010,7 @@ func TestLoadLCRs(t *testing.T) { } func TestLoadActionTimings(t *testing.T) { - if len(csvr.actionPlans) != 8 { + if len(csvr.actionPlans) != 9 { t.Error("Failed to load action timings: ", len(csvr.actionPlans)) } atm := csvr.actionPlans["MORE_MINUTES"] @@ -1101,7 +1105,7 @@ func TestLoadActionTriggers(t *testing.T) { } func TestLoadAccountActions(t *testing.T) { - if len(csvr.accountActions) != 14 { + if len(csvr.accountActions) != 15 { t.Error("Failed to load account actions: ", len(csvr.accountActions)) } aa := csvr.accountActions["vdf:minitsboy"] diff --git a/engine/storage_test.go b/engine/storage_test.go index cdb9d6802..eb007243b 100644 --- a/engine/storage_test.go +++ b/engine/storage_test.go @@ -274,7 +274,7 @@ func TestDifferentUuid(t *testing.T) { func TestStorageTask(t *testing.T) { // clean previous unused tasks - for i := 0; i < 18; i++ { + for i := 0; i < 19; i++ { ratingStorage.PopTask() } @@ -303,7 +303,7 @@ func TestStorageTask(t *testing.T) { t.Error("Error poping task: ", task, err) } if task, err := ratingStorage.PopTask(); err == nil && task != nil { - t.Errorf("Error poping task %+v, %v: ", task, err) + t.Errorf("Error poping task %+v, %v ", task, err) } } From 666de5469f48064ef8727a138a655da02b5811e1 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 30 Mar 2016 00:16:46 +0300 Subject: [PATCH 4/8] better generic make negative in actions --- engine/action.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/action.go b/engine/action.go index 6c57e8aaa..07152a58d 100644 --- a/engine/action.go +++ b/engine/action.go @@ -370,7 +370,7 @@ func resetCountersAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Ac } func genericMakeNegative(a *Action) { - if a.Balance != nil && a.Balance.GetValue() >= 0 { // only apply if not allready negative + if a.Balance != nil && a.Balance.GetValue() > 0 { // only apply if not allready negative a.Balance.SetValue(-a.Balance.GetValue()) } } From 39d40732ede0b3c6237e83f83430c0a5c24c2b79 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 30 Mar 2016 14:24:22 +0300 Subject: [PATCH 5/8] call sessionEnd explicitly on init error --- sessionmanager/smgeneric.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index d8f1bd25f..1089304a8 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -241,6 +241,7 @@ func (self *SMGeneric) SessionUpdate(gev SMGenericEvent, clnt *rpc2.Client) (tim // Called on session start func (self *SMGeneric) SessionStart(gev SMGenericEvent, clnt *rpc2.Client) (time.Duration, error) { if err := self.sessionStart(gev, getClientConnId(clnt)); err != nil { + self.sessionEnd(gev.GetUUID(), 0) return nilDuration, err } return self.SessionUpdate(gev, clnt) From 8f6d56eb13b855273e9f90d7db797e8e7e3b3ad5 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 30 Mar 2016 18:18:03 +0300 Subject: [PATCH 6/8] second try at closing session after init error --- sessionmanager/smgeneric.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 1089304a8..7f3adbc15 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -244,7 +244,11 @@ func (self *SMGeneric) SessionStart(gev SMGenericEvent, clnt *rpc2.Client) (time self.sessionEnd(gev.GetUUID(), 0) return nilDuration, err } - return self.SessionUpdate(gev, clnt) + d, err := self.SessionUpdate(gev, clnt) + if err != nil { + self.sessionEnd(gev.GetUUID(), 0) + } + return d, err } // Called on session end, should stop debit loop From a8220e9d9ea9f9e2eabd7d45d2d161a7d2ac112d Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 31 Mar 2016 00:44:04 +0300 Subject: [PATCH 7/8] test for #399 --- data/tariffplans/testtp/ActionTriggers.csv | 1 + general_tests/tp_it_test.go | 119 +++++++++++++++++++++ 2 files changed, 120 insertions(+) create mode 100644 general_tests/tp_it_test.go diff --git a/data/tariffplans/testtp/ActionTriggers.csv b/data/tariffplans/testtp/ActionTriggers.csv index 795ef6ab8..8a1c88cac 100644 --- a/data/tariffplans/testtp/ActionTriggers.csv +++ b/data/tariffplans/testtp/ActionTriggers.csv @@ -2,6 +2,7 @@ STANDARD_TRIGGERS,,*min_balance,2,false,0,,,,*monetary,*out,,,,,,,,,,,LOG_BALANCE,10 STANDARD_TRIGGERS,,*max_balance,20,false,0,,,,*monetary,*out,,,,,,,,,,,LOG_BALANCE,10 STANDARD_TRIGGERS,,*max_event_counter,15,false,0,,,,*monetary,*out,,FS_USERS,,,,,,,,,LOG_BALANCE,10 +STANDARD_TRIGGERS,,*max_balance_counter,1,false,0,,,,*monetary,*out,,,,,,,,,,,LOG_BALANCE,10 CDRST1_WARN_ASR,,*min_asr,45,true,1h,,,,,,,,,,,,,,,3,CDRST_WARN_HTTP,10 CDRST1_WARN_ACD,,*min_acd,10,true,1h,,,,,,,,,,,,,,,5,CDRST_WARN_HTTP,10 CDRST1_WARN_ACC,,*max_acc,10,true,10m,,,,,,,,,,,,,,,5,CDRST_WARN_HTTP,10 diff --git a/general_tests/tp_it_test.go b/general_tests/tp_it_test.go new file mode 100644 index 000000000..3eef405e1 --- /dev/null +++ b/general_tests/tp_it_test.go @@ -0,0 +1,119 @@ +package general_tests + +import ( + "net/rpc" + "net/rpc/jsonrpc" + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var tpCfgPath string +var tpCfg *config.CGRConfig +var tpRPC *rpc.Client +var tpLoadInst engine.LoadInstance // Share load information between tests + +func TestTpInitCfg(t *testing.T) { + if !*testIntegration { + return + } + tpCfgPath = path.Join(*dataDir, "conf", "samples", "tutlocal") + // Init config first + var err error + tpCfg, err = config.NewCGRConfigFromFolder(tpCfgPath) + if err != nil { + t.Error(err) + } + tpCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(tpCfg) +} + +// Remove data in both rating and accounting db +func TestTpResetDataDb(t *testing.T) { + if !*testIntegration { + return + } + if err := engine.InitDataDb(tpCfg); err != nil { + t.Fatal(err) + } +} + +// Wipe out the cdr database +func TestTpResetStorDb(t *testing.T) { + if !*testIntegration { + return + } + if err := engine.InitStorDb(tpCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func TestTpStartEngine(t *testing.T) { + if !*testIntegration { + return + } + if _, err := engine.StopStartEngine(tpCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestTpRpcConn(t *testing.T) { + if !*testIntegration { + return + } + var err error + tpRPC, err = jsonrpc.Dial("tcp", tpCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +// Load the tariff plan, creating accounts and their balances +func TestTpLoadTariffPlanFromFolder(t *testing.T) { + if !*testIntegration { + return + } + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "testtp")} + if err := tpRPC.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &tpLoadInst); err != nil { + t.Error(err) + } else if tpLoadInst.LoadId == "" { + t.Error("Empty loadId received, loadInstance: ", tpLoadInst) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups +} + +func TestTpBalanceCounter(t *testing.T) { + if !*testIntegration { + return + } + tStart := time.Date(2016, 3, 31, 0, 0, 0, 0, time.UTC) + cd := engine.CallDescriptor{ + Direction: "*out", + Category: "call", + Tenant: "cgrates.org", + Subject: "1001", + Destination: "+49", + DurationIndex: 0, + TimeStart: tStart, + TimeEnd: tStart.Add(time.Duration(20) * time.Second), + } + var cc engine.CallCost + if err := tpRPC.Call("Responder.Debit", cd, &cc); err != nil { + t.Error("Got error on Responder.GetCost: ", err.Error()) + } else if cc.GetDuration() == 20 { + t.Errorf("Calling Responder.MaxDebit got callcost: %v", cc.GetDuration()) + } + var acnt *engine.Account + attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} + if err := tpRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error("Got error on ApierV2.GetAccount: ", err.Error()) + } else if acnt.UnitCounters[utils.MONETARY][1].Counters[0].Value != 20.0 { + t.Errorf("Calling ApierV2.GetBalance received: %s", utils.ToIJSON(acnt)) + } +} From ac1aeb9db47270908274e656dfbc771050dbdf76 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 31 Mar 2016 14:29:38 +0300 Subject: [PATCH 8/8] test for derived charges no credit --- data/tariffplans/testtp/AccountActions.csv | 1 + data/tariffplans/testtp/ActionPlans.csv | 3 +- data/tariffplans/testtp/Actions.csv | 3 +- data/tariffplans/testtp/DerivedChargers.csv | 1 + sessionmanager/data_it_test.go | 42 +++++++++++++++++++++ 5 files changed, 48 insertions(+), 2 deletions(-) diff --git a/data/tariffplans/testtp/AccountActions.csv b/data/tariffplans/testtp/AccountActions.csv index 4cf06f896..d04a5bde4 100644 --- a/data/tariffplans/testtp/AccountActions.csv +++ b/data/tariffplans/testtp/AccountActions.csv @@ -6,3 +6,4 @@ cgrates.org,1004,PREPAID_10,STANDARD_TRIGGERS,, cgrates.org,1005,PREPAID_10,STANDARD_TRIGGERS,, cgrates.org,1009,TEST_EXE,,, cgrates.org,1010,TEST_DATA_r,,true, +cgrates.org,1011,TEST_VOICE,,, diff --git a/data/tariffplans/testtp/ActionPlans.csv b/data/tariffplans/testtp/ActionPlans.csv index f41668f9b..45e11a89d 100644 --- a/data/tariffplans/testtp/ActionPlans.csv +++ b/data/tariffplans/testtp/ActionPlans.csv @@ -2,4 +2,5 @@ PREPAID_10,PREPAID_10,ASAP,10 PREPAID_10,BONUS_1,ASAP,10 TEST_EXE,TOPUP_EXE,ALWAYS,10 -TEST_DATA_r,TOPUP_DATA_r,ASAP,10 \ No newline at end of file +TEST_DATA_r,TOPUP_DATA_r,ASAP,10 +TEST_VOICE,TOPUP_VOICE,ASAP,10 diff --git a/data/tariffplans/testtp/Actions.csv b/data/tariffplans/testtp/Actions.csv index 666038a38..e85a4781e 100644 --- a/data/tariffplans/testtp/Actions.csv +++ b/data/tariffplans/testtp/Actions.csv @@ -6,4 +6,5 @@ CDRST_WARN_HTTP,*call_url,http://localhost:8080,,,,,,,,,,,,,false,false,10 CDRST_LOG,*log,,,,,,,,,,,,,,false,false,10 TOPUP_EXE,*topup,,,,*monetary,*out,,*any,,,*unlimited,,5,10,false,false,10 TOPUP_DATA_r,*topup,,,,*monetary,*out,,DATA_DEST,,,*unlimited,,5000000,10,false,false,10 -TOPUP_DATA_r,*topup,,,,*data,*out,,DATA_DEST,datar,,*unlimited,,50000000000,10,false,false,10 \ No newline at end of file +TOPUP_DATA_r,*topup,,,,*data,*out,,DATA_DEST,datar,,*unlimited,,50000000000,10,false,false,10 +TOPUP_VOICE,*topup,,,,*voice,*out,,GERMANY_MOBILE,,,*unlimited,,50000,10,false,false,10 \ No newline at end of file diff --git a/data/tariffplans/testtp/DerivedChargers.csv b/data/tariffplans/testtp/DerivedChargers.csv index b50586df9..ff24c376d 100644 --- a/data/tariffplans/testtp/DerivedChargers.csv +++ b/data/tariffplans/testtp/DerivedChargers.csv @@ -3,3 +3,4 @@ *out,cgrates.org,call,dan,dan,,extra2,,,,,,^ivo,^ivo,,,,,,*default,*default,*default,*default *out,cgrates.org,call,dan,dan,,extra3,~filterhdr1:s/(.+)/special_run3/,,,,,^runusr3,^runusr3,,,,,,*default,*default,*default,*default *out,cgrates.org,call,dan,*any,,extra1,,,,,,^rif2,^rif2,,,,,,*default,*default,*default,*default +*out,cgrates.org,call,1011,1011,GERMANY,extra1,,,+4915,,,,,,,,,,*default,*default,*default,*default \ No newline at end of file diff --git a/sessionmanager/data_it_test.go b/sessionmanager/data_it_test.go index b281c2985..a31f1048d 100644 --- a/sessionmanager/data_it_test.go +++ b/sessionmanager/data_it_test.go @@ -342,3 +342,45 @@ func TestSMGDataLastUsedMultipleData(t *testing.T) { t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue()) } } + +func TestSMGDataDerivedChargingNoCredit(t *testing.T) { + if !*testIntegration { + return + } + var acnt *engine.Account + attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1011"} + eAcntVal := 50000.000000 + if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } + smgEv := SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: utils.VOICE, + utils.ACCID: "12349", + utils.DIRECTION: utils.OUT, + utils.ACCOUNT: "1011", + utils.SUBJECT: "1011", + utils.DESTINATION: "+49", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: utils.META_PREPAID, + utils.SETUP_TIME: "2016-01-05 18:30:49", + utils.ANSWER_TIME: "2016-01-05 18:31:05", + utils.USAGE: "100", + } + var maxUsage float64 + if err := smgRPC.Call("SMGenericV1.SessionStart", smgEv, &maxUsage); err != nil { + t.Error(err) + } + if maxUsage != 100 { + t.Error("Bad max usage: ", maxUsage) + } + eAcntVal = 50000.000000 + if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal { + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } +}