diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go
index ac515b468..e79af9382 100644
--- a/cdrc/cdrc_test.go
+++ b/cdrc/cdrc_test.go
@@ -156,7 +156,7 @@ BYE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|14
}}
cdrc := &Cdrc{CdrFormat: utils.OSIPS_FLATSTORE, cdrSourceIds: []string{"TEST_CDRC"}, failedCallsPrefix: "missed_calls",
cdrFields: cdrFields, partialRecords: make(map[string]map[string]*PartialFlatstoreRecord),
- guard: engine.NewGuardianLock()}
+ guard: engine.Guardian}
cdrsContent := bytes.NewReader([]byte(flatstoreCdrs))
csvReader := csv.NewReader(cdrsContent)
csvReader.Comma = '|'
@@ -283,7 +283,7 @@ INVITE|324cb497|d4af7023|8deaadf2ae9a17809a391f05af31afb0@0:0:0:0:0:0:0:0|486|Bu
}}
cdrc := &Cdrc{CdrFormat: utils.OSIPS_FLATSTORE, cdrSourceIds: []string{"TEST_CDRC"}, failedCallsPrefix: "missed_calls",
cdrFields: cdrFields, partialRecords: make(map[string]map[string]*PartialFlatstoreRecord),
- guard: engine.NewGuardianLock()}
+ guard: engine.Guardian}
cdrsContent := bytes.NewReader([]byte(flatstoreCdrs))
csvReader := csv.NewReader(cdrsContent)
csvReader.Comma = '|'
diff --git a/cdrc/csv.go b/cdrc/csv.go
index c6daaaee0..740ff14ac 100644
--- a/cdrc/csv.go
+++ b/cdrc/csv.go
@@ -20,6 +20,7 @@ package cdrc
import (
"encoding/csv"
+ "encoding/json"
"errors"
"fmt"
"os"
@@ -93,7 +94,7 @@ func pairToRecord(part1, part2 *PartialFlatstoreRecord) ([]string, error) {
func NewPartialRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune) (*PartialRecordsCache, error) {
return &PartialRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep,
- partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.NewGuardianLock()}, nil
+ partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.Guardian}, nil
}
type PartialRecordsCache struct {
@@ -323,7 +324,12 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcId strin
for _, rsrFld := range httpFieldCfg.Value {
httpAddr += rsrFld.ParseValue("")
}
- if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, storedCdr); err != nil && httpFieldCfg.Mandatory {
+ var jsn []byte
+ jsn, err = json.Marshal(storedCdr)
+ if err != nil {
+ return nil, err
+ }
+ if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
return nil, err
} else {
fieldVal = string(outValByte)
diff --git a/cdrc/fwv.go b/cdrc/fwv.go
index b1fffe60d..7940d268d 100644
--- a/cdrc/fwv.go
+++ b/cdrc/fwv.go
@@ -20,16 +20,18 @@ package cdrc
import (
"bufio"
+ "encoding/json"
"fmt"
- "github.com/cgrates/cgrates/config"
- "github.com/cgrates/cgrates/engine"
- "github.com/cgrates/cgrates/utils"
"io"
"net/http"
"os"
"strconv"
"strings"
"time"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
)
func fwvValue(cdrLine string, indexStart, width int, padding string) string {
@@ -214,7 +216,12 @@ func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cfgKey string)
for _, rsrFld := range httpFieldCfg.Value {
httpAddr += rsrFld.ParseValue("")
}
- if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, storedCdr); err != nil && httpFieldCfg.Mandatory {
+ var jsn []byte
+ jsn, err = json.Marshal(storedCdr)
+ if err != nil {
+ return nil, err
+ }
+ if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
return nil, err
} else {
fieldVal = string(outValByte)
diff --git a/cdre/cdrexporter.go b/cdre/cdrexporter.go
index e90b88e03..c269385e1 100644
--- a/cdre/cdrexporter.go
+++ b/cdre/cdrexporter.go
@@ -352,9 +352,13 @@ func (cdre *CdrExporter) processCdr(cdr *engine.CDR) error {
case utils.META_HTTP_POST:
var outValByte []byte
httpAddr := cfgFld.Value.Id()
+ jsn, err := json.Marshal(cdr)
+ if err != nil {
+ return err
+ }
if len(httpAddr) == 0 {
err = fmt.Errorf("Empty http address for field %s type %s", cfgFld.Tag, cfgFld.Type)
- } else if outValByte, err = utils.HttpJsonPost(httpAddr, cdre.httpSkipTlsCheck, cdr); err == nil {
+ } else if outValByte, err = utils.HttpJsonPost(httpAddr, cdre.httpSkipTlsCheck, jsn); err == nil {
outVal = string(outValByte)
if len(outVal) == 0 && cfgFld.Mandatory {
err = fmt.Errorf("Empty result for http_post field: %s", cfgFld.Tag)
diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go
index 1fbdc9c66..e4597655d 100644
--- a/cmd/cgr-tester/cgr-tester.go
+++ b/cmd/cgr-tester/cgr-tester.go
@@ -153,7 +153,6 @@ func durRemoteRater(cd *engine.CallDescriptor) (time.Duration, error) {
func main() {
flag.Parse()
- runtime.GOMAXPROCS(runtime.NumCPU() - 1)
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
diff --git a/data/tariffplans/testtp/AccountActions.csv b/data/tariffplans/testtp/AccountActions.csv
index 4cf06f896..d04a5bde4 100644
--- a/data/tariffplans/testtp/AccountActions.csv
+++ b/data/tariffplans/testtp/AccountActions.csv
@@ -6,3 +6,4 @@ cgrates.org,1004,PREPAID_10,STANDARD_TRIGGERS,,
cgrates.org,1005,PREPAID_10,STANDARD_TRIGGERS,,
cgrates.org,1009,TEST_EXE,,,
cgrates.org,1010,TEST_DATA_r,,true,
+cgrates.org,1011,TEST_VOICE,,,
diff --git a/data/tariffplans/testtp/ActionPlans.csv b/data/tariffplans/testtp/ActionPlans.csv
index f41668f9b..45e11a89d 100644
--- a/data/tariffplans/testtp/ActionPlans.csv
+++ b/data/tariffplans/testtp/ActionPlans.csv
@@ -2,4 +2,5 @@
PREPAID_10,PREPAID_10,ASAP,10
PREPAID_10,BONUS_1,ASAP,10
TEST_EXE,TOPUP_EXE,ALWAYS,10
-TEST_DATA_r,TOPUP_DATA_r,ASAP,10
\ No newline at end of file
+TEST_DATA_r,TOPUP_DATA_r,ASAP,10
+TEST_VOICE,TOPUP_VOICE,ASAP,10
diff --git a/data/tariffplans/testtp/ActionTriggers.csv b/data/tariffplans/testtp/ActionTriggers.csv
index 795ef6ab8..8a1c88cac 100644
--- a/data/tariffplans/testtp/ActionTriggers.csv
+++ b/data/tariffplans/testtp/ActionTriggers.csv
@@ -2,6 +2,7 @@
STANDARD_TRIGGERS,,*min_balance,2,false,0,,,,*monetary,*out,,,,,,,,,,,LOG_BALANCE,10
STANDARD_TRIGGERS,,*max_balance,20,false,0,,,,*monetary,*out,,,,,,,,,,,LOG_BALANCE,10
STANDARD_TRIGGERS,,*max_event_counter,15,false,0,,,,*monetary,*out,,FS_USERS,,,,,,,,,LOG_BALANCE,10
+STANDARD_TRIGGERS,,*max_balance_counter,1,false,0,,,,*monetary,*out,,,,,,,,,,,LOG_BALANCE,10
CDRST1_WARN_ASR,,*min_asr,45,true,1h,,,,,,,,,,,,,,,3,CDRST_WARN_HTTP,10
CDRST1_WARN_ACD,,*min_acd,10,true,1h,,,,,,,,,,,,,,,5,CDRST_WARN_HTTP,10
CDRST1_WARN_ACC,,*max_acc,10,true,10m,,,,,,,,,,,,,,,5,CDRST_WARN_HTTP,10
diff --git a/data/tariffplans/testtp/Actions.csv b/data/tariffplans/testtp/Actions.csv
index 666038a38..e85a4781e 100644
--- a/data/tariffplans/testtp/Actions.csv
+++ b/data/tariffplans/testtp/Actions.csv
@@ -6,4 +6,5 @@ CDRST_WARN_HTTP,*call_url,http://localhost:8080,,,,,,,,,,,,,false,false,10
CDRST_LOG,*log,,,,,,,,,,,,,,false,false,10
TOPUP_EXE,*topup,,,,*monetary,*out,,*any,,,*unlimited,,5,10,false,false,10
TOPUP_DATA_r,*topup,,,,*monetary,*out,,DATA_DEST,,,*unlimited,,5000000,10,false,false,10
-TOPUP_DATA_r,*topup,,,,*data,*out,,DATA_DEST,datar,,*unlimited,,50000000000,10,false,false,10
\ No newline at end of file
+TOPUP_DATA_r,*topup,,,,*data,*out,,DATA_DEST,datar,,*unlimited,,50000000000,10,false,false,10
+TOPUP_VOICE,*topup,,,,*voice,*out,,GERMANY_MOBILE,,,*unlimited,,50000,10,false,false,10
\ No newline at end of file
diff --git a/data/tariffplans/testtp/DerivedChargers.csv b/data/tariffplans/testtp/DerivedChargers.csv
index b50586df9..ff24c376d 100644
--- a/data/tariffplans/testtp/DerivedChargers.csv
+++ b/data/tariffplans/testtp/DerivedChargers.csv
@@ -3,3 +3,4 @@
*out,cgrates.org,call,dan,dan,,extra2,,,,,,^ivo,^ivo,,,,,,*default,*default,*default,*default
*out,cgrates.org,call,dan,dan,,extra3,~filterhdr1:s/(.+)/special_run3/,,,,,^runusr3,^runusr3,,,,,,*default,*default,*default,*default
*out,cgrates.org,call,dan,*any,,extra1,,,,,,^rif2,^rif2,,,,,,*default,*default,*default,*default
+*out,cgrates.org,call,1011,1011,GERMANY,extra1,,,+4915,,,,,,,,,,*default,*default,*default,*default
\ No newline at end of file
diff --git a/engine/account.go b/engine/account.go
index e231875a2..45497e32f 100644
--- a/engine/account.go
+++ b/engine/account.go
@@ -287,7 +287,7 @@ func (ub *Account) getBalancesForPrefix(prefix, category, direction, tor string,
if b.Disabled {
continue
}
- if b.IsExpired() || (len(b.SharedGroups) == 0 && b.GetValue() <= 0) {
+ if b.IsExpired() || (len(b.SharedGroups) == 0 && b.GetValue() <= 0 && !b.Blocker) {
continue
}
if sharedGroup != "" && b.SharedGroups[sharedGroup] == false {
diff --git a/engine/action.go b/engine/action.go
index 6c57e8aaa..07152a58d 100644
--- a/engine/action.go
+++ b/engine/action.go
@@ -370,7 +370,7 @@ func resetCountersAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Ac
}
func genericMakeNegative(a *Action) {
- if a.Balance != nil && a.Balance.GetValue() >= 0 { // only apply if not allready negative
+ if a.Balance != nil && a.Balance.GetValue() > 0 { // only apply if not allready negative
a.Balance.SetValue(-a.Balance.GetValue())
}
}
diff --git a/engine/calldesc.go b/engine/calldesc.go
index d5f3c155f..9ab8a9373 100644
--- a/engine/calldesc.go
+++ b/engine/calldesc.go
@@ -188,7 +188,11 @@ func (cd *CallDescriptor) getAccount() (ub *Account, err error) {
cd.account, err = accountingStorage.GetAccount(cd.GetAccountKey())
}
if cd.account != nil && cd.account.Disabled {
- return nil, fmt.Errorf("User %s is disabled", cd.account.ID)
+ return nil, utils.ErrAccountDisabled
+ }
+ if err != nil || cd.account == nil {
+ utils.Logger.Warning(fmt.Sprintf("Account: %s, not found (%v)", cd.GetAccountKey(), err))
+ return nil, utils.ErrAccountNotFound
}
return cd.account, err
}
@@ -635,13 +639,9 @@ func (origCD *CallDescriptor) getMaxSessionDuration(origAcc *Account) (time.Dura
func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err error) {
cd.account = nil // make sure it's not cached
- if account, err := cd.getAccount(); err != nil || account == nil {
- utils.Logger.Err(fmt.Sprintf("Account: %s, not found", cd.GetAccountKey()))
- return 0, utils.ErrAccountNotFound
+ if account, err := cd.getAccount(); err != nil {
+ return 0, err
} else {
- if account.Disabled {
- return 0, utils.ErrAccountDisabled
- }
if memberIds, err := account.GetUniqueSharedGroupMembers(cd); err == nil {
if _, err := Guardian.Guard(func() (interface{}, error) {
duration, err = cd.getMaxSessionDuration(account)
@@ -705,13 +705,9 @@ func (cd *CallDescriptor) debit(account *Account, dryRun bool, goNegative bool)
func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
cd.account = nil // make sure it's not cached
// lock all group members
- if account, err := cd.getAccount(); err != nil || account == nil {
- utils.Logger.Err(fmt.Sprintf("Account: %s, not found", cd.GetAccountKey()))
- return nil, utils.ErrAccountNotFound
+ if account, err := cd.getAccount(); err != nil {
+ return nil, err
} else {
- if account.Disabled {
- return nil, utils.ErrAccountDisabled
- }
if memberIds, sgerr := account.GetUniqueSharedGroupMembers(cd); sgerr == nil {
_, err = Guardian.Guard(func() (interface{}, error) {
cc, err = cd.debit(account, cd.DryRun, true)
@@ -730,13 +726,9 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
// by the GetMaxSessionDuration method. The amount filed has to be filled in call descriptor.
func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
cd.account = nil // make sure it's not cached
- if account, err := cd.getAccount(); err != nil || account == nil {
- utils.Logger.Err(fmt.Sprintf("Account: %s, not found", cd.GetAccountKey()))
- return nil, utils.ErrAccountNotFound
+ if account, err := cd.getAccount(); err != nil {
+ return nil, err
} else {
- if account.Disabled {
- return nil, utils.ErrAccountDisabled
- }
//log.Printf("ACC: %+v", account)
if memberIDs, err := account.GetUniqueSharedGroupMembers(cd); err == nil {
_, err = Guardian.Guard(func() (interface{}, error) {
diff --git a/engine/calldesc_test.go b/engine/calldesc_test.go
index 540bc8dad..ddde0471f 100644
--- a/engine/calldesc_test.go
+++ b/engine/calldesc_test.go
@@ -566,7 +566,7 @@ func TestGetMaxSessiontWithBlocker(t *testing.T) {
MaxCostSoFar: 0,
}
result, err := cd.GetMaxSessionDuration()
- expected := 30 * time.Minute
+ expected := 17 * time.Minute
if result != expected || err != nil {
t.Errorf("Expected %v was %v (%v)", expected, result, err)
}
@@ -588,6 +588,57 @@ func TestGetMaxSessiontWithBlocker(t *testing.T) {
}
}
+func TestGetMaxSessiontWithBlockerEmpty(t *testing.T) {
+ ap, _ := ratingStorage.GetActionPlan("BLOCK_EMPTY_AT", false)
+ for _, at := range ap.ActionTimings {
+ at.accountIDs = ap.AccountIDs
+ at.Execute()
+ }
+ acc, err := accountingStorage.GetAccount("cgrates.org:block_empty")
+ if err != nil {
+ t.Error("error getting account: ", err)
+ }
+ if len(acc.BalanceMap[utils.MONETARY]) != 2 ||
+ acc.BalanceMap[utils.MONETARY][0].Blocker != true {
+ for _, b := range acc.BalanceMap[utils.MONETARY] {
+ t.Logf("B: %+v", b)
+ }
+ t.Error("Error executing action plan on account: ", acc.BalanceMap[utils.MONETARY])
+ }
+ cd := &CallDescriptor{
+ Direction: "*out",
+ Category: "call",
+ Tenant: "cgrates.org",
+ Subject: "block",
+ Account: "block_empty",
+ Destination: "0723",
+ TimeStart: time.Date(2016, 1, 13, 14, 0, 0, 0, time.UTC),
+ TimeEnd: time.Date(2016, 1, 13, 14, 30, 0, 0, time.UTC),
+ MaxCostSoFar: 0,
+ }
+ result, err := cd.GetMaxSessionDuration()
+ expected := 0 * time.Minute
+ if result != expected || err != nil {
+ t.Errorf("Expected %v was %v (%v)", expected, result, err)
+ }
+ cd = &CallDescriptor{
+ Direction: "*out",
+ Category: "call",
+ Tenant: "cgrates.org",
+ Subject: "block",
+ Account: "block_empty",
+ Destination: "444",
+ TimeStart: time.Date(2016, 1, 13, 14, 0, 0, 0, time.UTC),
+ TimeEnd: time.Date(2016, 1, 13, 14, 30, 0, 0, time.UTC),
+ MaxCostSoFar: 0,
+ }
+ result, err = cd.GetMaxSessionDuration()
+ expected = 30 * time.Minute
+ if result != expected || err != nil {
+ t.Errorf("Expected %v was %v (%v)", expected, result, err)
+ }
+}
+
func TestGetCostWithMaxCost(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {
diff --git a/engine/cdr_local_test.go b/engine/cdr_local_test.go
index 823e8f6d5..edf7c8880 100644
--- a/engine/cdr_local_test.go
+++ b/engine/cdr_local_test.go
@@ -19,10 +19,12 @@ along with this program. If not, see
package engine
import (
+ "encoding/json"
"flag"
- "github.com/cgrates/cgrates/utils"
"testing"
"time"
+
+ "github.com/cgrates/cgrates/utils"
)
// Arguments received via test command
@@ -42,7 +44,8 @@ func TestHttpJsonPost(t *testing.T) {
RunID: utils.DEFAULT_RUNID,
Usage: "0.00000001", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
}
- if _, err := utils.HttpJsonPost("http://localhost:8000", false, cdrOut); err == nil {
+ jsn, _ := json.Marshal(cdrOut)
+ if _, err := utils.HttpJsonPost("http://localhost:8000", false, jsn); err == nil {
t.Error(err)
}
}
diff --git a/engine/cdrs.go b/engine/cdrs.go
index 122bd5fbe..dd164fe03 100644
--- a/engine/cdrs.go
+++ b/engine/cdrs.go
@@ -70,7 +70,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
}
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, pubsub rpcclient.RpcClientConnection, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) {
- return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{locksMap: make(map[string]chan bool)}}, nil
+ return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian}, nil
}
type CdrServer struct {
diff --git a/engine/guardian.go b/engine/guardian.go
index dc2975aff..6d6664e39 100644
--- a/engine/guardian.go
+++ b/engine/guardian.go
@@ -26,10 +26,6 @@ import (
// global package variable
var Guardian = &GuardianLock{locksMap: make(map[string]chan bool)}
-func NewGuardianLock() *GuardianLock {
- return &GuardianLock{locksMap: make(map[string]chan bool)}
-}
-
type GuardianLock struct {
locksMap map[string]chan bool
mu sync.RWMutex
diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go
index 7c17f02fe..357f56fb8 100644
--- a/engine/loader_csv_test.go
+++ b/engine/loader_csv_test.go
@@ -172,8 +172,10 @@ EE0,*topup_reset,,,,*monetary,*out,,,,SG3,*unlimited,,0,10,false,false,10
EE0,*allow_negative,,,,*monetary,*out,,,,,*unlimited,,0,10,false,false,10
DEFEE,*cdrlog,"{""Category"":""^ddi"",""MediationRunId"":""^did_run""}",,,,,,,,,,,,,false,false,10
NEG,*allow_negative,,,,*monetary,*out,,,,,*unlimited,,0,10,false,false,10
-BLOCK,*topup,,,bblocker,*monetary,*out,,NAT,,,*unlimited,,10,20,true,false,20
+BLOCK,*topup,,,bblocker,*monetary,*out,,NAT,,,*unlimited,,1,20,true,false,20
BLOCK,*topup,,,bfree,*monetary,*out,,,,,*unlimited,,20,10,false,false,10
+BLOCK_EMPTY,*topup,,,bblocker,*monetary,*out,,NAT,,,*unlimited,,0,20,true,false,20
+BLOCK_EMPTY,*topup,,,bfree,*monetary,*out,,,,,*unlimited,,20,10,false,false,10
FILTER,*topup,,"{""*and"":[{""Value"":{""*lt"":0}},{""Id"":{""*eq"":""*default""}}]}",bfree,*monetary,*out,,,,,*unlimited,,20,10,false,false,10
EXP,*topup,,,,*voice,*out,,,,,*monthly,*any,300,10,false,false,10
NOEXP,*topup,,,,*voice,*out,,,,,*unlimited,*any,50,10,false,false,10
@@ -188,6 +190,7 @@ TOPUP_SHARED10_AT,SE10,*asap,10
TOPUP_EMPTY_AT,EE0,*asap,10
POST_AT,NEG,*asap,10
BLOCK_AT,BLOCK,*asap,10
+BLOCK_EMPTY_AT,BLOCK_EMPTY,*asap,10
EXP_AT,EXP,*asap,10
`
@@ -216,6 +219,7 @@ vdf,emptyY,TOPUP_EMPTY_AT,,,
vdf,post,POST_AT,,,
cgrates.org,alodis,TOPUP_EMPTY_AT,,true,true
cgrates.org,block,BLOCK_AT,,false,false
+cgrates.org,block_empty,BLOCK_EMPTY_AT,,false,false
cgrates.org,expo,EXP_AT,,false,false
cgrates.org,expnoexp,,,false,false
`
@@ -820,7 +824,7 @@ func TestLoadRatingProfiles(t *testing.T) {
}
func TestLoadActions(t *testing.T) {
- if len(csvr.actions) != 13 {
+ if len(csvr.actions) != 14 {
t.Error("Failed to load actions: ", len(csvr.actions))
}
as1 := csvr.actions["MINI"]
@@ -1006,7 +1010,7 @@ func TestLoadLCRs(t *testing.T) {
}
func TestLoadActionTimings(t *testing.T) {
- if len(csvr.actionPlans) != 8 {
+ if len(csvr.actionPlans) != 9 {
t.Error("Failed to load action timings: ", len(csvr.actionPlans))
}
atm := csvr.actionPlans["MORE_MINUTES"]
@@ -1101,7 +1105,7 @@ func TestLoadActionTriggers(t *testing.T) {
}
func TestLoadAccountActions(t *testing.T) {
- if len(csvr.accountActions) != 14 {
+ if len(csvr.accountActions) != 15 {
t.Error("Failed to load account actions: ", len(csvr.accountActions))
}
aa := csvr.accountActions["vdf:minitsboy"]
diff --git a/engine/pubsub.go b/engine/pubsub.go
index 0d4dd17c1..6d9bcd1ea 100644
--- a/engine/pubsub.go
+++ b/engine/pubsub.go
@@ -1,6 +1,7 @@
package engine
import (
+ "encoding/json"
"errors"
"fmt"
"sync"
@@ -43,7 +44,7 @@ type SubscriberData struct {
type PubSub struct {
subscribers map[string]*SubscriberData
ttlVerify bool
- pubFunc func(string, bool, interface{}) ([]byte, error)
+ pubFunc func(string, bool, []byte) ([]byte, error)
mux *sync.Mutex
accountDb AccountingStorage
}
@@ -140,12 +141,16 @@ func (ps *PubSub) Publish(evt CgrEvent, reply *string) error {
transport := split[0]
address := split[1]
ttlVerify := ps.ttlVerify
+ jsn, err := json.Marshal(evt)
+ if err != nil {
+ return err
+ }
switch transport {
case utils.META_HTTP_POST:
go func() {
delay := utils.Fib()
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
- if _, err := ps.pubFunc(address, ttlVerify, evt); err == nil {
+ if _, err := ps.pubFunc(address, ttlVerify, jsn); err == nil {
break // Success, no need to reinterate
} else if i == 4 { // Last iteration, syslog the warning
utils.Logger.Warning(fmt.Sprintf(" Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), evt["EventName"]))
diff --git a/engine/pubsub_test.go b/engine/pubsub_test.go
index 5345a06c0..ac28837a8 100644
--- a/engine/pubsub_test.go
+++ b/engine/pubsub_test.go
@@ -116,43 +116,9 @@ func TestUnsubscribeSave(t *testing.T) {
}
}
-func TestPublish(t *testing.T) {
- ps := NewPubSub(accountingStorage, true)
- ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
- obj.(CgrEvent)["called"] = url
- return nil, nil
- }
- var r string
- if err := ps.Subscribe(SubscribeInfo{
- EventFilter: "EventName/test",
- Transport: utils.META_HTTP_POST,
- Address: "url",
- LifeSpan: time.Second,
- }, &r); err != nil {
- t.Error("Error subscribing: ", err)
- }
- m := make(map[string]string)
- m["EventFilter"] = "test"
- if err := ps.Publish(m, &r); err != nil {
- t.Error("Error publishing: ", err)
- }
- for i := 0; i < 1000; i++ { // wait for the theread to populate map
- if len(m) == 2 {
- time.Sleep(time.Microsecond)
- } else {
- break
- }
- }
- if r, exists := m["called"]; !exists || r != "url" {
- t.Error("Error calling publish function: ", m)
- }
-}
-
func TestPublishExpired(t *testing.T) {
ps := NewPubSub(accountingStorage, true)
- ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
- m := obj.(map[string]string)
- m["called"] = "yes"
+ ps.pubFunc = func(url string, ttl bool, obj []byte) ([]byte, error) {
return nil, nil
}
var r string
@@ -174,9 +140,7 @@ func TestPublishExpired(t *testing.T) {
func TestPublishExpiredSave(t *testing.T) {
ps := NewPubSub(accountingStorage, true)
- ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
- m := obj.(map[string]string)
- m["called"] = "yes"
+ ps.pubFunc = func(url string, ttl bool, obj []byte) ([]byte, error) {
return nil, nil
}
var r string
diff --git a/engine/storage_map.go b/engine/storage_map.go
index d87fdcddd..f7e21306a 100644
--- a/engine/storage_map.go
+++ b/engine/storage_map.go
@@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"io/ioutil"
+ "sync"
"strings"
"time"
@@ -36,6 +37,7 @@ type MapStorage struct {
dict map[string][]byte
tasks [][]byte
ms Marshaler
+ mu sync.RWMutex
}
func NewMapStorage() (*MapStorage, error) {
@@ -49,11 +51,15 @@ func NewMapStorageJson() (*MapStorage, error) {
func (ms *MapStorage) Close() {}
func (ms *MapStorage) Flush(ignore string) error {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
ms.dict = make(map[string][]byte)
return nil
}
func (ms *MapStorage) GetKeysForPrefix(prefix string, skipCache bool) ([]string, error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
if skipCache {
keysForPrefix := make([]string, 0)
for key := range ms.dict {
@@ -256,6 +262,8 @@ func (ms *MapStorage) cacheAccounting(alsKeys []string) error {
// Used to check if specific subject is stored using prefix key attached to entity
func (ms *MapStorage) HasData(categ, subject string) (bool, error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
switch categ {
case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX:
_, exists := ms.dict[categ+subject]
@@ -265,6 +273,8 @@ func (ms *MapStorage) HasData(categ, subject string) (bool, error) {
}
func (ms *MapStorage) GetRatingPlan(key string, skipCache bool) (rp *RatingPlan, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
key = utils.RATING_PLAN_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -294,6 +304,8 @@ func (ms *MapStorage) GetRatingPlan(key string, skipCache bool) (rp *RatingPlan,
}
func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(rp)
var b bytes.Buffer
w := zlib.NewWriter(&b)
@@ -308,6 +320,8 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) {
}
func (ms *MapStorage) GetRatingProfile(key string, skipCache bool) (rpf *RatingProfile, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
key = utils.RATING_PROFILE_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -328,6 +342,8 @@ func (ms *MapStorage) GetRatingProfile(key string, skipCache bool) (rpf *RatingP
}
func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(rpf)
ms.dict[utils.RATING_PROFILE_PREFIX+rpf.Id] = result
response := 0
@@ -338,6 +354,8 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
}
func (ms *MapStorage) RemoveRatingProfile(key string) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
for k := range ms.dict {
if strings.HasPrefix(k, key) {
delete(ms.dict, key)
@@ -353,6 +371,8 @@ func (ms *MapStorage) RemoveRatingProfile(key string) (err error) {
}
func (ms *MapStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
key = utils.LCR_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -371,12 +391,16 @@ func (ms *MapStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error) {
}
func (ms *MapStorage) SetLCR(lcr *LCR) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(lcr)
ms.dict[utils.LCR_PREFIX+lcr.GetId()] = result
return
}
func (ms *MapStorage) GetDestination(key string) (dest *Destination, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
key = utils.DESTINATION_PREFIX + key
if values, ok := ms.dict[key]; ok {
b := bytes.NewBuffer(values)
@@ -402,6 +426,8 @@ func (ms *MapStorage) GetDestination(key string) (dest *Destination, err error)
}
func (ms *MapStorage) SetDestination(dest *Destination) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(dest)
var b bytes.Buffer
w := zlib.NewWriter(&b)
@@ -416,6 +442,8 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) {
}
func (ms *MapStorage) GetActions(key string, skipCache bool) (as Actions, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
key = utils.ACTION_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -434,12 +462,16 @@ func (ms *MapStorage) GetActions(key string, skipCache bool) (as Actions, err er
}
func (ms *MapStorage) SetActions(key string, as Actions) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(&as)
ms.dict[utils.ACTION_PREFIX+key] = result
return
}
func (ms *MapStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGroup, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
key = utils.SHARED_GROUP_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -460,12 +492,16 @@ func (ms *MapStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGrou
}
func (ms *MapStorage) SetSharedGroup(sg *SharedGroup) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(sg)
ms.dict[utils.SHARED_GROUP_PREFIX+sg.Id] = result
return
}
func (ms *MapStorage) GetAccount(key string) (ub *Account, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
if values, ok := ms.dict[utils.ACCOUNT_PREFIX+key]; ok {
ub = &Account{ID: key}
err = ms.ms.Unmarshal(values, ub)
@@ -488,17 +524,23 @@ func (ms *MapStorage) SetAccount(ub *Account) (err error) {
ub = ac
}
}
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(ub)
ms.dict[utils.ACCOUNT_PREFIX+ub.ID] = result
return
}
func (ms *MapStorage) RemoveAccount(key string) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
delete(ms.dict, utils.ACCOUNT_PREFIX+key)
return
}
func (ms *MapStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
if values, ok := ms.dict[utils.CDR_STATS_QUEUE_PREFIX+key]; ok {
sq = &StatsQueue{}
err = ms.ms.Unmarshal(values, sq)
@@ -509,12 +551,16 @@ func (ms *MapStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) {
}
func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(sq)
ms.dict[utils.CDR_STATS_QUEUE_PREFIX+sq.GetId()] = result
return
}
func (ms *MapStorage) GetSubscribers() (result map[string]*SubscriberData, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
result = make(map[string]*SubscriberData)
for key, value := range ms.dict {
if strings.HasPrefix(key, utils.PUBSUB_SUBSCRIBERS_PREFIX) {
@@ -527,17 +573,23 @@ func (ms *MapStorage) GetSubscribers() (result map[string]*SubscriberData, err e
return
}
func (ms *MapStorage) SetSubscriber(key string, sub *SubscriberData) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(sub)
ms.dict[utils.PUBSUB_SUBSCRIBERS_PREFIX+key] = result
return
}
func (ms *MapStorage) RemoveSubscriber(key string) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
delete(ms.dict, utils.PUBSUB_SUBSCRIBERS_PREFIX+key)
return
}
func (ms *MapStorage) SetUser(up *UserProfile) error {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(up)
if err != nil {
return err
@@ -546,6 +598,8 @@ func (ms *MapStorage) SetUser(up *UserProfile) error {
return nil
}
func (ms *MapStorage) GetUser(key string) (up *UserProfile, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
up = &UserProfile{}
if values, ok := ms.dict[utils.USERS_PREFIX+key]; ok {
err = ms.ms.Unmarshal(values, &up)
@@ -556,6 +610,8 @@ func (ms *MapStorage) GetUser(key string) (up *UserProfile, err error) {
}
func (ms *MapStorage) GetUsers() (result []*UserProfile, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
for key, value := range ms.dict {
if strings.HasPrefix(key, utils.USERS_PREFIX) {
up := &UserProfile{}
@@ -568,11 +624,15 @@ func (ms *MapStorage) GetUsers() (result []*UserProfile, err error) {
}
func (ms *MapStorage) RemoveUser(key string) error {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
delete(ms.dict, utils.USERS_PREFIX+key)
return nil
}
func (ms *MapStorage) SetAlias(al *Alias) error {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(al.Values)
if err != nil {
return err
@@ -582,6 +642,8 @@ func (ms *MapStorage) SetAlias(al *Alias) error {
}
func (ms *MapStorage) GetAlias(key string, skipCache bool) (al *Alias, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
key = utils.ALIASES_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -607,6 +669,8 @@ func (ms *MapStorage) GetAlias(key string, skipCache bool) (al *Alias, err error
}
func (ms *MapStorage) RemoveAlias(key string) error {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
al := &Alias{}
al.SetId(key)
key = utils.ALIASES_PREFIX + key
@@ -622,14 +686,20 @@ func (ms *MapStorage) RemoveAlias(key string) error {
}
func (ms *MapStorage) GetLoadHistory(limitItems int, skipCache bool) ([]*LoadInstance, error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
return nil, nil
}
func (ms *MapStorage) AddLoadHistory(*LoadInstance, int) error {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
return nil
}
func (ms *MapStorage) GetActionTriggers(key string) (atrs ActionTriggers, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
if values, ok := ms.dict[utils.ACTION_TRIGGER_PREFIX+key]; ok {
err = ms.ms.Unmarshal(values, &atrs)
} else {
@@ -639,6 +709,8 @@ func (ms *MapStorage) GetActionTriggers(key string) (atrs ActionTriggers, err er
}
func (ms *MapStorage) SetActionTriggers(key string, atrs ActionTriggers) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
if len(atrs) == 0 {
// delete the key
delete(ms.dict, utils.ACTION_TRIGGER_PREFIX+key)
@@ -650,6 +722,8 @@ func (ms *MapStorage) SetActionTriggers(key string, atrs ActionTriggers) (err er
}
func (ms *MapStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPlan, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
key = utils.ACTION_PLAN_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -669,6 +743,8 @@ func (ms *MapStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPlan
func (ms *MapStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool) (err error) {
if len(ats.ActionTimings) == 0 {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
// delete the key
delete(ms.dict, utils.ACTION_PLAN_PREFIX+key)
cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key)
@@ -685,12 +761,16 @@ func (ms *MapStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool)
}
}
}
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(&ats)
ms.dict[utils.ACTION_PLAN_PREFIX+key] = result
return
}
func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
apls, err := cache2go.GetAllEntries(utils.ACTION_PLAN_PREFIX)
if err != nil {
return nil, err
@@ -706,6 +786,8 @@ func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error
}
func (ms *MapStorage) PushTask(t *Task) error {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(t)
if err != nil {
return err
@@ -715,6 +797,8 @@ func (ms *MapStorage) PushTask(t *Task) error {
}
func (ms *MapStorage) PopTask() (t *Task, err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
if len(ms.tasks) > 0 {
var values []byte
values, ms.tasks = ms.tasks[0], ms.tasks[1:]
@@ -727,6 +811,8 @@ func (ms *MapStorage) PopTask() (t *Task, err error) {
}
func (ms *MapStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
key = utils.DERIVEDCHARGERS_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -745,6 +831,8 @@ func (ms *MapStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils
}
func (ms *MapStorage) SetDerivedChargers(key string, dcs *utils.DerivedChargers) error {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
if dcs == nil || len(dcs.Chargers) == 0 {
delete(ms.dict, utils.DERIVEDCHARGERS_PREFIX+key)
cache2go.RemKey(utils.DERIVEDCHARGERS_PREFIX + key)
@@ -756,12 +844,16 @@ func (ms *MapStorage) SetDerivedChargers(key string, dcs *utils.DerivedChargers)
}
func (ms *MapStorage) SetCdrStats(cs *CdrStats) error {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(cs)
ms.dict[utils.CDR_STATS_PREFIX+cs.Id] = result
return err
}
func (ms *MapStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
if values, ok := ms.dict[utils.CDR_STATS_PREFIX+key]; ok {
err = ms.ms.Unmarshal(values, &cs)
} else {
@@ -771,6 +863,8 @@ func (ms *MapStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
}
func (ms *MapStorage) GetAllCdrStats() (css []*CdrStats, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
for key, value := range ms.dict {
if !strings.HasPrefix(key, utils.CDR_STATS_PREFIX) {
continue
@@ -783,12 +877,16 @@ func (ms *MapStorage) GetAllCdrStats() (css []*CdrStats, err error) {
}
func (ms *MapStorage) SetSMCost(smCost *SMCost) error {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
result, err := ms.ms.Marshal(smCost)
ms.dict[utils.LOG_CALL_COST_PREFIX+smCost.CostSource+smCost.RunID+"_"+smCost.CGRID] = result
return err
}
func (ms *MapStorage) GetSMCost(cgrid, source, runid, originHost, originID string) (smCost *SMCost, err error) {
+ ms.mu.RLock()
+ defer ms.mu.RUnlock()
if values, ok := ms.dict[utils.LOG_CALL_COST_PREFIX+source+runid+"_"+cgrid]; ok {
err = ms.ms.Unmarshal(values, &smCost)
} else {
@@ -798,6 +896,8 @@ func (ms *MapStorage) GetSMCost(cgrid, source, runid, originHost, originID strin
}
func (ms *MapStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
mat, err := ms.ms.Marshal(at)
if err != nil {
return
@@ -811,6 +911,8 @@ func (ms *MapStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, a
}
func (ms *MapStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) {
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
mat, err := ms.ms.Marshal(at)
if err != nil {
return
diff --git a/engine/storage_test.go b/engine/storage_test.go
index cdb9d6802..eb007243b 100644
--- a/engine/storage_test.go
+++ b/engine/storage_test.go
@@ -274,7 +274,7 @@ func TestDifferentUuid(t *testing.T) {
func TestStorageTask(t *testing.T) {
// clean previous unused tasks
- for i := 0; i < 18; i++ {
+ for i := 0; i < 19; i++ {
ratingStorage.PopTask()
}
@@ -303,7 +303,7 @@ func TestStorageTask(t *testing.T) {
t.Error("Error poping task: ", task, err)
}
if task, err := ratingStorage.PopTask(); err == nil && task != nil {
- t.Errorf("Error poping task %+v, %v: ", task, err)
+ t.Errorf("Error poping task %+v, %v ", task, err)
}
}
diff --git a/general_tests/tp_it_test.go b/general_tests/tp_it_test.go
new file mode 100644
index 000000000..3eef405e1
--- /dev/null
+++ b/general_tests/tp_it_test.go
@@ -0,0 +1,119 @@
+package general_tests
+
+import (
+ "net/rpc"
+ "net/rpc/jsonrpc"
+ "path"
+ "testing"
+ "time"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+var tpCfgPath string
+var tpCfg *config.CGRConfig
+var tpRPC *rpc.Client
+var tpLoadInst engine.LoadInstance // Share load information between tests
+
+func TestTpInitCfg(t *testing.T) {
+ if !*testIntegration {
+ return
+ }
+ tpCfgPath = path.Join(*dataDir, "conf", "samples", "tutlocal")
+ // Init config first
+ var err error
+ tpCfg, err = config.NewCGRConfigFromFolder(tpCfgPath)
+ if err != nil {
+ t.Error(err)
+ }
+ tpCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush()
+ config.SetCgrConfig(tpCfg)
+}
+
+// Remove data in both rating and accounting db
+func TestTpResetDataDb(t *testing.T) {
+ if !*testIntegration {
+ return
+ }
+ if err := engine.InitDataDb(tpCfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Wipe out the cdr database
+func TestTpResetStorDb(t *testing.T) {
+ if !*testIntegration {
+ return
+ }
+ if err := engine.InitStorDb(tpCfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Start CGR Engine
+func TestTpStartEngine(t *testing.T) {
+ if !*testIntegration {
+ return
+ }
+ if _, err := engine.StopStartEngine(tpCfgPath, *waitRater); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Connect rpc client to rater
+func TestTpRpcConn(t *testing.T) {
+ if !*testIntegration {
+ return
+ }
+ var err error
+ tpRPC, err = jsonrpc.Dial("tcp", tpCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Load the tariff plan, creating accounts and their balances
+func TestTpLoadTariffPlanFromFolder(t *testing.T) {
+ if !*testIntegration {
+ return
+ }
+ attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "testtp")}
+ if err := tpRPC.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &tpLoadInst); err != nil {
+ t.Error(err)
+ } else if tpLoadInst.LoadId == "" {
+ t.Error("Empty loadId received, loadInstance: ", tpLoadInst)
+ }
+ time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
+}
+
+func TestTpBalanceCounter(t *testing.T) {
+ if !*testIntegration {
+ return
+ }
+ tStart := time.Date(2016, 3, 31, 0, 0, 0, 0, time.UTC)
+ cd := engine.CallDescriptor{
+ Direction: "*out",
+ Category: "call",
+ Tenant: "cgrates.org",
+ Subject: "1001",
+ Destination: "+49",
+ DurationIndex: 0,
+ TimeStart: tStart,
+ TimeEnd: tStart.Add(time.Duration(20) * time.Second),
+ }
+ var cc engine.CallCost
+ if err := tpRPC.Call("Responder.Debit", cd, &cc); err != nil {
+ t.Error("Got error on Responder.GetCost: ", err.Error())
+ } else if cc.GetDuration() == 20 {
+ t.Errorf("Calling Responder.MaxDebit got callcost: %v", cc.GetDuration())
+ }
+ var acnt *engine.Account
+ attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"}
+ if err := tpRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
+ t.Error("Got error on ApierV2.GetAccount: ", err.Error())
+ } else if acnt.UnitCounters[utils.MONETARY][1].Counters[0].Value != 20.0 {
+ t.Errorf("Calling ApierV2.GetBalance received: %s", utils.ToIJSON(acnt))
+ }
+}
diff --git a/sessionmanager/data_it_test.go b/sessionmanager/data_it_test.go
index b281c2985..a31f1048d 100644
--- a/sessionmanager/data_it_test.go
+++ b/sessionmanager/data_it_test.go
@@ -342,3 +342,45 @@ func TestSMGDataLastUsedMultipleData(t *testing.T) {
t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue())
}
}
+
+func TestSMGDataDerivedChargingNoCredit(t *testing.T) {
+ if !*testIntegration {
+ return
+ }
+ var acnt *engine.Account
+ attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1011"}
+ eAcntVal := 50000.000000
+ if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
+ t.Error(err)
+ } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal {
+ t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue())
+ }
+ smgEv := SMGenericEvent{
+ utils.EVENT_NAME: "TEST_EVENT",
+ utils.TOR: utils.VOICE,
+ utils.ACCID: "12349",
+ utils.DIRECTION: utils.OUT,
+ utils.ACCOUNT: "1011",
+ utils.SUBJECT: "1011",
+ utils.DESTINATION: "+49",
+ utils.CATEGORY: "call",
+ utils.TENANT: "cgrates.org",
+ utils.REQTYPE: utils.META_PREPAID,
+ utils.SETUP_TIME: "2016-01-05 18:30:49",
+ utils.ANSWER_TIME: "2016-01-05 18:31:05",
+ utils.USAGE: "100",
+ }
+ var maxUsage float64
+ if err := smgRPC.Call("SMGenericV1.SessionStart", smgEv, &maxUsage); err != nil {
+ t.Error(err)
+ }
+ if maxUsage != 100 {
+ t.Error("Bad max usage: ", maxUsage)
+ }
+ eAcntVal = 50000.000000
+ if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
+ t.Error(err)
+ } else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal {
+ t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue())
+ }
+}
diff --git a/sessionmanager/sessions.go b/sessionmanager/sessions.go
index 65e4a25c1..690a9127a 100644
--- a/sessionmanager/sessions.go
+++ b/sessionmanager/sessions.go
@@ -28,7 +28,7 @@ import (
func NewSessions() *Sessions {
return &Sessions{
sessionsMux: new(sync.Mutex),
- guard: engine.NewGuardianLock(),
+ guard: engine.Guardian,
}
}
diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go
index ee8b6e797..3ebf05e63 100644
--- a/sessionmanager/smgeneric.go
+++ b/sessionmanager/smgeneric.go
@@ -35,7 +35,7 @@ var ErrPartiallyExecuted = errors.New("Partially executed")
func NewSMGeneric(cgrCfg *config.CGRConfig, rater engine.Connector, cdrsrv engine.Connector, timezone string, extconns *SMGExternalConnections) *SMGeneric {
gsm := &SMGeneric{cgrCfg: cgrCfg, rater: rater, cdrsrv: cdrsrv, extconns: extconns, timezone: timezone,
- sessions: make(map[string][]*SMGSession), sessionsMux: new(sync.Mutex), guard: engine.NewGuardianLock()}
+ sessions: make(map[string][]*SMGSession), sessionsMux: new(sync.Mutex), guard: engine.Guardian}
return gsm
}
@@ -206,14 +206,6 @@ func (self *SMGeneric) GetLcrSuppliers(gev SMGenericEvent, clnt *rpc2.Client) ([
return lcr.SuppliersSlice()
}
-// Called on session start
-func (self *SMGeneric) SessionStart(gev SMGenericEvent, clnt *rpc2.Client) (time.Duration, error) {
- if err := self.sessionStart(gev, getClientConnId(clnt)); err != nil {
- return nilDuration, err
- }
- return self.SessionUpdate(gev, clnt)
-}
-
// Execute debits for usage/maxUsage
func (self *SMGeneric) SessionUpdate(gev SMGenericEvent, clnt *rpc2.Client) (time.Duration, error) {
if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil {
@@ -246,6 +238,19 @@ func (self *SMGeneric) SessionUpdate(gev SMGenericEvent, clnt *rpc2.Client) (tim
return evMaxUsage, nil
}
+// Called on session start
+func (self *SMGeneric) SessionStart(gev SMGenericEvent, clnt *rpc2.Client) (time.Duration, error) {
+ if err := self.sessionStart(gev, getClientConnId(clnt)); err != nil {
+ self.sessionEnd(gev.GetUUID(), 0)
+ return nilDuration, err
+ }
+ d, err := self.SessionUpdate(gev, clnt)
+ if err != nil {
+ self.sessionEnd(gev.GetUUID(), 0)
+ }
+ return d, err
+}
+
// Called on session end, should stop debit loop
func (self *SMGeneric) SessionEnd(gev SMGenericEvent, clnt *rpc2.Client) error {
if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil {
diff --git a/utils/httpclient.go b/utils/httpclient.go
index 0fad55afb..169067914 100644
--- a/utils/httpclient.go
+++ b/utils/httpclient.go
@@ -22,7 +22,6 @@ import (
"bytes"
"crypto/tls"
"encoding/gob"
- "encoding/json"
"fmt"
"io/ioutil"
"net/http"
@@ -49,16 +48,12 @@ func GetBytes(content interface{}) ([]byte, error) {
}
// Post without automatic failover
-func HttpJsonPost(url string, skipTlsVerify bool, content interface{}) ([]byte, error) {
- body, err := json.Marshal(content)
- if err != nil {
- return nil, err
- }
+func HttpJsonPost(url string, skipTlsVerify bool, content []byte) ([]byte, error) {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTlsVerify},
}
client := &http.Client{Transport: tr}
- resp, err := client.Post(url, "application/json", bytes.NewBuffer(body))
+ resp, err := client.Post(url, "application/json", bytes.NewBuffer(content))
if err != nil {
return nil, err
}