Merge fix

This commit is contained in:
DanB
2016-03-31 13:57:06 +02:00
27 changed files with 403 additions and 104 deletions

View File

@@ -156,7 +156,7 @@ BYE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|14
}}
cdrc := &Cdrc{CdrFormat: utils.OSIPS_FLATSTORE, cdrSourceIds: []string{"TEST_CDRC"}, failedCallsPrefix: "missed_calls",
cdrFields: cdrFields, partialRecords: make(map[string]map[string]*PartialFlatstoreRecord),
guard: engine.NewGuardianLock()}
guard: engine.Guardian}
cdrsContent := bytes.NewReader([]byte(flatstoreCdrs))
csvReader := csv.NewReader(cdrsContent)
csvReader.Comma = '|'
@@ -283,7 +283,7 @@ INVITE|324cb497|d4af7023|8deaadf2ae9a17809a391f05af31afb0@0:0:0:0:0:0:0:0|486|Bu
}}
cdrc := &Cdrc{CdrFormat: utils.OSIPS_FLATSTORE, cdrSourceIds: []string{"TEST_CDRC"}, failedCallsPrefix: "missed_calls",
cdrFields: cdrFields, partialRecords: make(map[string]map[string]*PartialFlatstoreRecord),
guard: engine.NewGuardianLock()}
guard: engine.Guardian}
cdrsContent := bytes.NewReader([]byte(flatstoreCdrs))
csvReader := csv.NewReader(cdrsContent)
csvReader.Comma = '|'

View File

@@ -20,6 +20,7 @@ package cdrc
import (
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"os"
@@ -93,7 +94,7 @@ func pairToRecord(part1, part2 *PartialFlatstoreRecord) ([]string, error) {
func NewPartialRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune) (*PartialRecordsCache, error) {
return &PartialRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep,
partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.NewGuardianLock()}, nil
partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.Guardian}, nil
}
type PartialRecordsCache struct {
@@ -323,7 +324,12 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcId strin
for _, rsrFld := range httpFieldCfg.Value {
httpAddr += rsrFld.ParseValue("")
}
if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, storedCdr); err != nil && httpFieldCfg.Mandatory {
var jsn []byte
jsn, err = json.Marshal(storedCdr)
if err != nil {
return nil, err
}
if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
return nil, err
} else {
fieldVal = string(outValByte)

View File

@@ -20,16 +20,18 @@ package cdrc
import (
"bufio"
"encoding/json"
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"io"
"net/http"
"os"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func fwvValue(cdrLine string, indexStart, width int, padding string) string {
@@ -214,7 +216,12 @@ func (self *FwvRecordsProcessor) recordToStoredCdr(record string, cfgKey string)
for _, rsrFld := range httpFieldCfg.Value {
httpAddr += rsrFld.ParseValue("")
}
if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, storedCdr); err != nil && httpFieldCfg.Mandatory {
var jsn []byte
jsn, err = json.Marshal(storedCdr)
if err != nil {
return nil, err
}
if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, jsn); err != nil && httpFieldCfg.Mandatory {
return nil, err
} else {
fieldVal = string(outValByte)

View File

@@ -352,9 +352,13 @@ func (cdre *CdrExporter) processCdr(cdr *engine.CDR) error {
case utils.META_HTTP_POST:
var outValByte []byte
httpAddr := cfgFld.Value.Id()
jsn, err := json.Marshal(cdr)
if err != nil {
return err
}
if len(httpAddr) == 0 {
err = fmt.Errorf("Empty http address for field %s type %s", cfgFld.Tag, cfgFld.Type)
} else if outValByte, err = utils.HttpJsonPost(httpAddr, cdre.httpSkipTlsCheck, cdr); err == nil {
} else if outValByte, err = utils.HttpJsonPost(httpAddr, cdre.httpSkipTlsCheck, jsn); err == nil {
outVal = string(outValByte)
if len(outVal) == 0 && cfgFld.Mandatory {
err = fmt.Errorf("Empty result for http_post field: %s", cfgFld.Tag)

View File

@@ -153,7 +153,6 @@ func durRemoteRater(cd *engine.CallDescriptor) (time.Duration, error) {
func main() {
flag.Parse()
runtime.GOMAXPROCS(runtime.NumCPU() - 1)
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)

View File

@@ -6,3 +6,4 @@ cgrates.org,1004,PREPAID_10,STANDARD_TRIGGERS,,
cgrates.org,1005,PREPAID_10,STANDARD_TRIGGERS,,
cgrates.org,1009,TEST_EXE,,,
cgrates.org,1010,TEST_DATA_r,,true,
cgrates.org,1011,TEST_VOICE,,,
1 #Tenant Account ActionPlanId ActionTriggersId AllowNegative Disabled
6 cgrates.org 1005 PREPAID_10 STANDARD_TRIGGERS
7 cgrates.org 1009 TEST_EXE
8 cgrates.org 1010 TEST_DATA_r true
9 cgrates.org 1011 TEST_VOICE

View File

@@ -2,4 +2,5 @@
PREPAID_10,PREPAID_10,ASAP,10
PREPAID_10,BONUS_1,ASAP,10
TEST_EXE,TOPUP_EXE,ALWAYS,10
TEST_DATA_r,TOPUP_DATA_r,ASAP,10
TEST_DATA_r,TOPUP_DATA_r,ASAP,10
TEST_VOICE,TOPUP_VOICE,ASAP,10
1 #Tag ActionsTag TimingTag Weight
2 PREPAID_10 PREPAID_10 ASAP 10
3 PREPAID_10 BONUS_1 ASAP 10
4 TEST_EXE TOPUP_EXE ALWAYS 10
5 TEST_DATA_r TOPUP_DATA_r ASAP 10
6 TEST_VOICE TOPUP_VOICE ASAP 10

View File

@@ -2,6 +2,7 @@
STANDARD_TRIGGERS,,*min_balance,2,false,0,,,,*monetary,*out,,,,,,,,,,,LOG_BALANCE,10
STANDARD_TRIGGERS,,*max_balance,20,false,0,,,,*monetary,*out,,,,,,,,,,,LOG_BALANCE,10
STANDARD_TRIGGERS,,*max_event_counter,15,false,0,,,,*monetary,*out,,FS_USERS,,,,,,,,,LOG_BALANCE,10
STANDARD_TRIGGERS,,*max_balance_counter,1,false,0,,,,*monetary,*out,,,,,,,,,,,LOG_BALANCE,10
CDRST1_WARN_ASR,,*min_asr,45,true,1h,,,,,,,,,,,,,,,3,CDRST_WARN_HTTP,10
CDRST1_WARN_ACD,,*min_acd,10,true,1h,,,,,,,,,,,,,,,5,CDRST_WARN_HTTP,10
CDRST1_WARN_ACC,,*max_acc,10,true,10m,,,,,,,,,,,,,,,5,CDRST_WARN_HTTP,10
1 #Tag[0] UniqueId[1] ThresholdType[2] ThresholdValue[3] Recurrent[4] MinSleep[5] ExpiryTime[6] ActivationTime[7] BalanceTag[8] BalanceType[9] BalanceDirections[10] BalanceCategories[11] BalanceDestinationIds[12] BalanceRatingSubject[13] BalanceSharedGroup[14] BalanceExpiryTime[15] BalanceTimingIds[16] BalanceWeight[17] BalanceBlocker[18] BalanceDisabled[19] StatsMinQueuedItems[20] ActionsId[21] Weight[22]
2 STANDARD_TRIGGERS *min_balance 2 false 0 *monetary *out LOG_BALANCE 10
3 STANDARD_TRIGGERS *max_balance 20 false 0 *monetary *out LOG_BALANCE 10
4 STANDARD_TRIGGERS *max_event_counter 15 false 0 *monetary *out FS_USERS LOG_BALANCE 10
5 STANDARD_TRIGGERS *max_balance_counter 1 false 0 *monetary *out LOG_BALANCE 10
6 CDRST1_WARN_ASR *min_asr 45 true 1h 3 CDRST_WARN_HTTP 10
7 CDRST1_WARN_ACD *min_acd 10 true 1h 5 CDRST_WARN_HTTP 10
8 CDRST1_WARN_ACC *max_acc 10 true 10m 5 CDRST_WARN_HTTP 10

View File

@@ -6,4 +6,5 @@ CDRST_WARN_HTTP,*call_url,http://localhost:8080,,,,,,,,,,,,,false,false,10
CDRST_LOG,*log,,,,,,,,,,,,,,false,false,10
TOPUP_EXE,*topup,,,,*monetary,*out,,*any,,,*unlimited,,5,10,false,false,10
TOPUP_DATA_r,*topup,,,,*monetary,*out,,DATA_DEST,,,*unlimited,,5000000,10,false,false,10
TOPUP_DATA_r,*topup,,,,*data,*out,,DATA_DEST,datar,,*unlimited,,50000000000,10,false,false,10
TOPUP_DATA_r,*topup,,,,*data,*out,,DATA_DEST,datar,,*unlimited,,50000000000,10,false,false,10
TOPUP_VOICE,*topup,,,,*voice,*out,,GERMANY_MOBILE,,,*unlimited,,50000,10,false,false,10
1 #ActionsTag[0] Action[1] ActionExtraParameters[2] Filter[3] BalanceTag[4] BalanceType[5] Directions[6] Categories[7] DestinationIds[8] RatingSubject[9] SharedGroup[10] ExpiryTime[11] TimingTags[12] Units[13] BalanceWeight[14] BalanceBlocker[15] BalanceDisabled[16] Weight[17]
6 CDRST_LOG *log false false 10
7 TOPUP_EXE *topup *monetary *out *any *unlimited 5 10 false false 10
8 TOPUP_DATA_r *topup *monetary *out DATA_DEST *unlimited 5000000 10 false false 10
9 TOPUP_DATA_r *topup *data *out DATA_DEST datar *unlimited 50000000000 10 false false 10
10 TOPUP_VOICE *topup *voice *out GERMANY_MOBILE *unlimited 50000 10 false false 10

View File

@@ -3,3 +3,4 @@
*out,cgrates.org,call,dan,dan,,extra2,,,,,,^ivo,^ivo,,,,,,*default,*default,*default,*default
*out,cgrates.org,call,dan,dan,,extra3,~filterhdr1:s/(.+)/special_run3/,,,,,^runusr3,^runusr3,,,,,,*default,*default,*default,*default
*out,cgrates.org,call,dan,*any,,extra1,,,,,,^rif2,^rif2,,,,,,*default,*default,*default,*default
*out,cgrates.org,call,1011,1011,GERMANY,extra1,,,+4915,,,,,,,,,,*default,*default,*default,*default
1 #Direction[0] Tenant[1] Category[2] Account[3] Subject[4] DestinationIds[5] RunId[6] RunFilter[7] ReqTypeField[8] DirectionField[9] TenantField[10] CategoryField[11] AccountField[12] SubjectField[13] DestinationField[14] SetupTimeField[15] PddField[16] AnswerTimeField[17] UsageField[18] SupplierField[19] DisconnectCause[20] RatedField[21] CostField[22]
3 *out cgrates.org call dan dan extra2 ^ivo ^ivo *default *default *default *default
4 *out cgrates.org call dan dan extra3 ~filterhdr1:s/(.+)/special_run3/ ^runusr3 ^runusr3 *default *default *default *default
5 *out cgrates.org call dan *any extra1 ^rif2 ^rif2 *default *default *default *default
6 *out cgrates.org call 1011 1011 GERMANY extra1 +4915 *default *default *default *default

View File

@@ -287,7 +287,7 @@ func (ub *Account) getBalancesForPrefix(prefix, category, direction, tor string,
if b.Disabled {
continue
}
if b.IsExpired() || (len(b.SharedGroups) == 0 && b.GetValue() <= 0) {
if b.IsExpired() || (len(b.SharedGroups) == 0 && b.GetValue() <= 0 && !b.Blocker) {
continue
}
if sharedGroup != "" && b.SharedGroups[sharedGroup] == false {

View File

@@ -370,7 +370,7 @@ func resetCountersAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Ac
}
func genericMakeNegative(a *Action) {
if a.Balance != nil && a.Balance.GetValue() >= 0 { // only apply if not allready negative
if a.Balance != nil && a.Balance.GetValue() > 0 { // only apply if not allready negative
a.Balance.SetValue(-a.Balance.GetValue())
}
}

View File

@@ -188,7 +188,11 @@ func (cd *CallDescriptor) getAccount() (ub *Account, err error) {
cd.account, err = accountingStorage.GetAccount(cd.GetAccountKey())
}
if cd.account != nil && cd.account.Disabled {
return nil, fmt.Errorf("User %s is disabled", cd.account.ID)
return nil, utils.ErrAccountDisabled
}
if err != nil || cd.account == nil {
utils.Logger.Warning(fmt.Sprintf("Account: %s, not found (%v)", cd.GetAccountKey(), err))
return nil, utils.ErrAccountNotFound
}
return cd.account, err
}
@@ -635,13 +639,9 @@ func (origCD *CallDescriptor) getMaxSessionDuration(origAcc *Account) (time.Dura
func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err error) {
cd.account = nil // make sure it's not cached
if account, err := cd.getAccount(); err != nil || account == nil {
utils.Logger.Err(fmt.Sprintf("Account: %s, not found", cd.GetAccountKey()))
return 0, utils.ErrAccountNotFound
if account, err := cd.getAccount(); err != nil {
return 0, err
} else {
if account.Disabled {
return 0, utils.ErrAccountDisabled
}
if memberIds, err := account.GetUniqueSharedGroupMembers(cd); err == nil {
if _, err := Guardian.Guard(func() (interface{}, error) {
duration, err = cd.getMaxSessionDuration(account)
@@ -705,13 +705,9 @@ func (cd *CallDescriptor) debit(account *Account, dryRun bool, goNegative bool)
func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
cd.account = nil // make sure it's not cached
// lock all group members
if account, err := cd.getAccount(); err != nil || account == nil {
utils.Logger.Err(fmt.Sprintf("Account: %s, not found", cd.GetAccountKey()))
return nil, utils.ErrAccountNotFound
if account, err := cd.getAccount(); err != nil {
return nil, err
} else {
if account.Disabled {
return nil, utils.ErrAccountDisabled
}
if memberIds, sgerr := account.GetUniqueSharedGroupMembers(cd); sgerr == nil {
_, err = Guardian.Guard(func() (interface{}, error) {
cc, err = cd.debit(account, cd.DryRun, true)
@@ -730,13 +726,9 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
// by the GetMaxSessionDuration method. The amount filed has to be filled in call descriptor.
func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
cd.account = nil // make sure it's not cached
if account, err := cd.getAccount(); err != nil || account == nil {
utils.Logger.Err(fmt.Sprintf("Account: %s, not found", cd.GetAccountKey()))
return nil, utils.ErrAccountNotFound
if account, err := cd.getAccount(); err != nil {
return nil, err
} else {
if account.Disabled {
return nil, utils.ErrAccountDisabled
}
//log.Printf("ACC: %+v", account)
if memberIDs, err := account.GetUniqueSharedGroupMembers(cd); err == nil {
_, err = Guardian.Guard(func() (interface{}, error) {

View File

@@ -566,7 +566,7 @@ func TestGetMaxSessiontWithBlocker(t *testing.T) {
MaxCostSoFar: 0,
}
result, err := cd.GetMaxSessionDuration()
expected := 30 * time.Minute
expected := 17 * time.Minute
if result != expected || err != nil {
t.Errorf("Expected %v was %v (%v)", expected, result, err)
}
@@ -588,6 +588,57 @@ func TestGetMaxSessiontWithBlocker(t *testing.T) {
}
}
func TestGetMaxSessiontWithBlockerEmpty(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("BLOCK_EMPTY_AT", false)
for _, at := range ap.ActionTimings {
at.accountIDs = ap.AccountIDs
at.Execute()
}
acc, err := accountingStorage.GetAccount("cgrates.org:block_empty")
if err != nil {
t.Error("error getting account: ", err)
}
if len(acc.BalanceMap[utils.MONETARY]) != 2 ||
acc.BalanceMap[utils.MONETARY][0].Blocker != true {
for _, b := range acc.BalanceMap[utils.MONETARY] {
t.Logf("B: %+v", b)
}
t.Error("Error executing action plan on account: ", acc.BalanceMap[utils.MONETARY])
}
cd := &CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "block",
Account: "block_empty",
Destination: "0723",
TimeStart: time.Date(2016, 1, 13, 14, 0, 0, 0, time.UTC),
TimeEnd: time.Date(2016, 1, 13, 14, 30, 0, 0, time.UTC),
MaxCostSoFar: 0,
}
result, err := cd.GetMaxSessionDuration()
expected := 0 * time.Minute
if result != expected || err != nil {
t.Errorf("Expected %v was %v (%v)", expected, result, err)
}
cd = &CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "block",
Account: "block_empty",
Destination: "444",
TimeStart: time.Date(2016, 1, 13, 14, 0, 0, 0, time.UTC),
TimeEnd: time.Date(2016, 1, 13, 14, 30, 0, 0, time.UTC),
MaxCostSoFar: 0,
}
result, err = cd.GetMaxSessionDuration()
expected = 30 * time.Minute
if result != expected || err != nil {
t.Errorf("Expected %v was %v (%v)", expected, result, err)
}
}
func TestGetCostWithMaxCost(t *testing.T) {
ap, _ := ratingStorage.GetActionPlan("TOPUP10_AT", false)
for _, at := range ap.ActionTimings {

View File

@@ -19,10 +19,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"encoding/json"
"flag"
"github.com/cgrates/cgrates/utils"
"testing"
"time"
"github.com/cgrates/cgrates/utils"
)
// Arguments received via test command
@@ -42,7 +44,8 @@ func TestHttpJsonPost(t *testing.T) {
RunID: utils.DEFAULT_RUNID,
Usage: "0.00000001", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
}
if _, err := utils.HttpJsonPost("http://localhost:8000", false, cdrOut); err == nil {
jsn, _ := json.Marshal(cdrOut)
if _, err := utils.HttpJsonPost("http://localhost:8000", false, jsn); err == nil {
t.Error(err)
}
}

View File

@@ -70,7 +70,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
}
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, pubsub rpcclient.RpcClientConnection, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) {
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{locksMap: make(map[string]chan bool)}}, nil
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: Guardian}, nil
}
type CdrServer struct {

View File

@@ -26,10 +26,6 @@ import (
// global package variable
var Guardian = &GuardianLock{locksMap: make(map[string]chan bool)}
func NewGuardianLock() *GuardianLock {
return &GuardianLock{locksMap: make(map[string]chan bool)}
}
type GuardianLock struct {
locksMap map[string]chan bool
mu sync.RWMutex

View File

@@ -172,8 +172,10 @@ EE0,*topup_reset,,,,*monetary,*out,,,,SG3,*unlimited,,0,10,false,false,10
EE0,*allow_negative,,,,*monetary,*out,,,,,*unlimited,,0,10,false,false,10
DEFEE,*cdrlog,"{""Category"":""^ddi"",""MediationRunId"":""^did_run""}",,,,,,,,,,,,,false,false,10
NEG,*allow_negative,,,,*monetary,*out,,,,,*unlimited,,0,10,false,false,10
BLOCK,*topup,,,bblocker,*monetary,*out,,NAT,,,*unlimited,,10,20,true,false,20
BLOCK,*topup,,,bblocker,*monetary,*out,,NAT,,,*unlimited,,1,20,true,false,20
BLOCK,*topup,,,bfree,*monetary,*out,,,,,*unlimited,,20,10,false,false,10
BLOCK_EMPTY,*topup,,,bblocker,*monetary,*out,,NAT,,,*unlimited,,0,20,true,false,20
BLOCK_EMPTY,*topup,,,bfree,*monetary,*out,,,,,*unlimited,,20,10,false,false,10
FILTER,*topup,,"{""*and"":[{""Value"":{""*lt"":0}},{""Id"":{""*eq"":""*default""}}]}",bfree,*monetary,*out,,,,,*unlimited,,20,10,false,false,10
EXP,*topup,,,,*voice,*out,,,,,*monthly,*any,300,10,false,false,10
NOEXP,*topup,,,,*voice,*out,,,,,*unlimited,*any,50,10,false,false,10
@@ -188,6 +190,7 @@ TOPUP_SHARED10_AT,SE10,*asap,10
TOPUP_EMPTY_AT,EE0,*asap,10
POST_AT,NEG,*asap,10
BLOCK_AT,BLOCK,*asap,10
BLOCK_EMPTY_AT,BLOCK_EMPTY,*asap,10
EXP_AT,EXP,*asap,10
`
@@ -216,6 +219,7 @@ vdf,emptyY,TOPUP_EMPTY_AT,,,
vdf,post,POST_AT,,,
cgrates.org,alodis,TOPUP_EMPTY_AT,,true,true
cgrates.org,block,BLOCK_AT,,false,false
cgrates.org,block_empty,BLOCK_EMPTY_AT,,false,false
cgrates.org,expo,EXP_AT,,false,false
cgrates.org,expnoexp,,,false,false
`
@@ -820,7 +824,7 @@ func TestLoadRatingProfiles(t *testing.T) {
}
func TestLoadActions(t *testing.T) {
if len(csvr.actions) != 13 {
if len(csvr.actions) != 14 {
t.Error("Failed to load actions: ", len(csvr.actions))
}
as1 := csvr.actions["MINI"]
@@ -1006,7 +1010,7 @@ func TestLoadLCRs(t *testing.T) {
}
func TestLoadActionTimings(t *testing.T) {
if len(csvr.actionPlans) != 8 {
if len(csvr.actionPlans) != 9 {
t.Error("Failed to load action timings: ", len(csvr.actionPlans))
}
atm := csvr.actionPlans["MORE_MINUTES"]
@@ -1101,7 +1105,7 @@ func TestLoadActionTriggers(t *testing.T) {
}
func TestLoadAccountActions(t *testing.T) {
if len(csvr.accountActions) != 14 {
if len(csvr.accountActions) != 15 {
t.Error("Failed to load account actions: ", len(csvr.accountActions))
}
aa := csvr.accountActions["vdf:minitsboy"]

View File

@@ -1,6 +1,7 @@
package engine
import (
"encoding/json"
"errors"
"fmt"
"sync"
@@ -43,7 +44,7 @@ type SubscriberData struct {
type PubSub struct {
subscribers map[string]*SubscriberData
ttlVerify bool
pubFunc func(string, bool, interface{}) ([]byte, error)
pubFunc func(string, bool, []byte) ([]byte, error)
mux *sync.Mutex
accountDb AccountingStorage
}
@@ -140,12 +141,16 @@ func (ps *PubSub) Publish(evt CgrEvent, reply *string) error {
transport := split[0]
address := split[1]
ttlVerify := ps.ttlVerify
jsn, err := json.Marshal(evt)
if err != nil {
return err
}
switch transport {
case utils.META_HTTP_POST:
go func() {
delay := utils.Fib()
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
if _, err := ps.pubFunc(address, ttlVerify, evt); err == nil {
if _, err := ps.pubFunc(address, ttlVerify, jsn); err == nil {
break // Success, no need to reinterate
} else if i == 4 { // Last iteration, syslog the warning
utils.Logger.Warning(fmt.Sprintf("<PubSub> Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), evt["EventName"]))

View File

@@ -116,43 +116,9 @@ func TestUnsubscribeSave(t *testing.T) {
}
}
func TestPublish(t *testing.T) {
ps := NewPubSub(accountingStorage, true)
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
obj.(CgrEvent)["called"] = url
return nil, nil
}
var r string
if err := ps.Subscribe(SubscribeInfo{
EventFilter: "EventName/test",
Transport: utils.META_HTTP_POST,
Address: "url",
LifeSpan: time.Second,
}, &r); err != nil {
t.Error("Error subscribing: ", err)
}
m := make(map[string]string)
m["EventFilter"] = "test"
if err := ps.Publish(m, &r); err != nil {
t.Error("Error publishing: ", err)
}
for i := 0; i < 1000; i++ { // wait for the theread to populate map
if len(m) == 2 {
time.Sleep(time.Microsecond)
} else {
break
}
}
if r, exists := m["called"]; !exists || r != "url" {
t.Error("Error calling publish function: ", m)
}
}
func TestPublishExpired(t *testing.T) {
ps := NewPubSub(accountingStorage, true)
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
m := obj.(map[string]string)
m["called"] = "yes"
ps.pubFunc = func(url string, ttl bool, obj []byte) ([]byte, error) {
return nil, nil
}
var r string
@@ -174,9 +140,7 @@ func TestPublishExpired(t *testing.T) {
func TestPublishExpiredSave(t *testing.T) {
ps := NewPubSub(accountingStorage, true)
ps.pubFunc = func(url string, ttl bool, obj interface{}) ([]byte, error) {
m := obj.(map[string]string)
m["called"] = "yes"
ps.pubFunc = func(url string, ttl bool, obj []byte) ([]byte, error) {
return nil, nil
}
var r string

View File

@@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"sync"
"strings"
"time"
@@ -36,6 +37,7 @@ type MapStorage struct {
dict map[string][]byte
tasks [][]byte
ms Marshaler
mu sync.RWMutex
}
func NewMapStorage() (*MapStorage, error) {
@@ -49,11 +51,15 @@ func NewMapStorageJson() (*MapStorage, error) {
func (ms *MapStorage) Close() {}
func (ms *MapStorage) Flush(ignore string) error {
ms.mu.Lock()
defer ms.mu.Unlock()
ms.dict = make(map[string][]byte)
return nil
}
func (ms *MapStorage) GetKeysForPrefix(prefix string, skipCache bool) ([]string, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if skipCache {
keysForPrefix := make([]string, 0)
for key := range ms.dict {
@@ -256,6 +262,8 @@ func (ms *MapStorage) cacheAccounting(alsKeys []string) error {
// Used to check if specific subject is stored using prefix key attached to entity
func (ms *MapStorage) HasData(categ, subject string) (bool, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
switch categ {
case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX:
_, exists := ms.dict[categ+subject]
@@ -265,6 +273,8 @@ func (ms *MapStorage) HasData(categ, subject string) (bool, error) {
}
func (ms *MapStorage) GetRatingPlan(key string, skipCache bool) (rp *RatingPlan, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key = utils.RATING_PLAN_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -294,6 +304,8 @@ func (ms *MapStorage) GetRatingPlan(key string, skipCache bool) (rp *RatingPlan,
}
func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(rp)
var b bytes.Buffer
w := zlib.NewWriter(&b)
@@ -308,6 +320,8 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) {
}
func (ms *MapStorage) GetRatingProfile(key string, skipCache bool) (rpf *RatingProfile, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key = utils.RATING_PROFILE_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -328,6 +342,8 @@ func (ms *MapStorage) GetRatingProfile(key string, skipCache bool) (rpf *RatingP
}
func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(rpf)
ms.dict[utils.RATING_PROFILE_PREFIX+rpf.Id] = result
response := 0
@@ -338,6 +354,8 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
}
func (ms *MapStorage) RemoveRatingProfile(key string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
for k := range ms.dict {
if strings.HasPrefix(k, key) {
delete(ms.dict, key)
@@ -353,6 +371,8 @@ func (ms *MapStorage) RemoveRatingProfile(key string) (err error) {
}
func (ms *MapStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key = utils.LCR_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -371,12 +391,16 @@ func (ms *MapStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error) {
}
func (ms *MapStorage) SetLCR(lcr *LCR) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(lcr)
ms.dict[utils.LCR_PREFIX+lcr.GetId()] = result
return
}
func (ms *MapStorage) GetDestination(key string) (dest *Destination, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key = utils.DESTINATION_PREFIX + key
if values, ok := ms.dict[key]; ok {
b := bytes.NewBuffer(values)
@@ -402,6 +426,8 @@ func (ms *MapStorage) GetDestination(key string) (dest *Destination, err error)
}
func (ms *MapStorage) SetDestination(dest *Destination) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(dest)
var b bytes.Buffer
w := zlib.NewWriter(&b)
@@ -416,6 +442,8 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) {
}
func (ms *MapStorage) GetActions(key string, skipCache bool) (as Actions, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key = utils.ACTION_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -434,12 +462,16 @@ func (ms *MapStorage) GetActions(key string, skipCache bool) (as Actions, err er
}
func (ms *MapStorage) SetActions(key string, as Actions) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(&as)
ms.dict[utils.ACTION_PREFIX+key] = result
return
}
func (ms *MapStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGroup, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key = utils.SHARED_GROUP_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -460,12 +492,16 @@ func (ms *MapStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGrou
}
func (ms *MapStorage) SetSharedGroup(sg *SharedGroup) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(sg)
ms.dict[utils.SHARED_GROUP_PREFIX+sg.Id] = result
return
}
func (ms *MapStorage) GetAccount(key string) (ub *Account, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if values, ok := ms.dict[utils.ACCOUNT_PREFIX+key]; ok {
ub = &Account{ID: key}
err = ms.ms.Unmarshal(values, ub)
@@ -488,17 +524,23 @@ func (ms *MapStorage) SetAccount(ub *Account) (err error) {
ub = ac
}
}
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(ub)
ms.dict[utils.ACCOUNT_PREFIX+ub.ID] = result
return
}
func (ms *MapStorage) RemoveAccount(key string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
delete(ms.dict, utils.ACCOUNT_PREFIX+key)
return
}
func (ms *MapStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if values, ok := ms.dict[utils.CDR_STATS_QUEUE_PREFIX+key]; ok {
sq = &StatsQueue{}
err = ms.ms.Unmarshal(values, sq)
@@ -509,12 +551,16 @@ func (ms *MapStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) {
}
func (ms *MapStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(sq)
ms.dict[utils.CDR_STATS_QUEUE_PREFIX+sq.GetId()] = result
return
}
func (ms *MapStorage) GetSubscribers() (result map[string]*SubscriberData, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
result = make(map[string]*SubscriberData)
for key, value := range ms.dict {
if strings.HasPrefix(key, utils.PUBSUB_SUBSCRIBERS_PREFIX) {
@@ -527,17 +573,23 @@ func (ms *MapStorage) GetSubscribers() (result map[string]*SubscriberData, err e
return
}
func (ms *MapStorage) SetSubscriber(key string, sub *SubscriberData) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(sub)
ms.dict[utils.PUBSUB_SUBSCRIBERS_PREFIX+key] = result
return
}
func (ms *MapStorage) RemoveSubscriber(key string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
delete(ms.dict, utils.PUBSUB_SUBSCRIBERS_PREFIX+key)
return
}
func (ms *MapStorage) SetUser(up *UserProfile) error {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(up)
if err != nil {
return err
@@ -546,6 +598,8 @@ func (ms *MapStorage) SetUser(up *UserProfile) error {
return nil
}
func (ms *MapStorage) GetUser(key string) (up *UserProfile, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
up = &UserProfile{}
if values, ok := ms.dict[utils.USERS_PREFIX+key]; ok {
err = ms.ms.Unmarshal(values, &up)
@@ -556,6 +610,8 @@ func (ms *MapStorage) GetUser(key string) (up *UserProfile, err error) {
}
func (ms *MapStorage) GetUsers() (result []*UserProfile, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
for key, value := range ms.dict {
if strings.HasPrefix(key, utils.USERS_PREFIX) {
up := &UserProfile{}
@@ -568,11 +624,15 @@ func (ms *MapStorage) GetUsers() (result []*UserProfile, err error) {
}
func (ms *MapStorage) RemoveUser(key string) error {
ms.mu.Lock()
defer ms.mu.Unlock()
delete(ms.dict, utils.USERS_PREFIX+key)
return nil
}
func (ms *MapStorage) SetAlias(al *Alias) error {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(al.Values)
if err != nil {
return err
@@ -582,6 +642,8 @@ func (ms *MapStorage) SetAlias(al *Alias) error {
}
func (ms *MapStorage) GetAlias(key string, skipCache bool) (al *Alias, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key = utils.ALIASES_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -607,6 +669,8 @@ func (ms *MapStorage) GetAlias(key string, skipCache bool) (al *Alias, err error
}
func (ms *MapStorage) RemoveAlias(key string) error {
ms.mu.Lock()
defer ms.mu.Unlock()
al := &Alias{}
al.SetId(key)
key = utils.ALIASES_PREFIX + key
@@ -622,14 +686,20 @@ func (ms *MapStorage) RemoveAlias(key string) error {
}
func (ms *MapStorage) GetLoadHistory(limitItems int, skipCache bool) ([]*LoadInstance, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
return nil, nil
}
func (ms *MapStorage) AddLoadHistory(*LoadInstance, int) error {
ms.mu.Lock()
defer ms.mu.Unlock()
return nil
}
func (ms *MapStorage) GetActionTriggers(key string) (atrs ActionTriggers, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if values, ok := ms.dict[utils.ACTION_TRIGGER_PREFIX+key]; ok {
err = ms.ms.Unmarshal(values, &atrs)
} else {
@@ -639,6 +709,8 @@ func (ms *MapStorage) GetActionTriggers(key string) (atrs ActionTriggers, err er
}
func (ms *MapStorage) SetActionTriggers(key string, atrs ActionTriggers) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
if len(atrs) == 0 {
// delete the key
delete(ms.dict, utils.ACTION_TRIGGER_PREFIX+key)
@@ -650,6 +722,8 @@ func (ms *MapStorage) SetActionTriggers(key string, atrs ActionTriggers) (err er
}
func (ms *MapStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPlan, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key = utils.ACTION_PLAN_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -669,6 +743,8 @@ func (ms *MapStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPlan
func (ms *MapStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool) (err error) {
if len(ats.ActionTimings) == 0 {
ms.mu.Lock()
defer ms.mu.Unlock()
// delete the key
delete(ms.dict, utils.ACTION_PLAN_PREFIX+key)
cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key)
@@ -685,12 +761,16 @@ func (ms *MapStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool)
}
}
}
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(&ats)
ms.dict[utils.ACTION_PLAN_PREFIX+key] = result
return
}
func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
apls, err := cache2go.GetAllEntries(utils.ACTION_PLAN_PREFIX)
if err != nil {
return nil, err
@@ -706,6 +786,8 @@ func (ms *MapStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err error
}
func (ms *MapStorage) PushTask(t *Task) error {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(t)
if err != nil {
return err
@@ -715,6 +797,8 @@ func (ms *MapStorage) PushTask(t *Task) error {
}
func (ms *MapStorage) PopTask() (t *Task, err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
if len(ms.tasks) > 0 {
var values []byte
values, ms.tasks = ms.tasks[0], ms.tasks[1:]
@@ -727,6 +811,8 @@ func (ms *MapStorage) PopTask() (t *Task, err error) {
}
func (ms *MapStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils.DerivedChargers, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key = utils.DERIVEDCHARGERS_PREFIX + key
if !skipCache {
if x, err := cache2go.Get(key); err == nil {
@@ -745,6 +831,8 @@ func (ms *MapStorage) GetDerivedChargers(key string, skipCache bool) (dcs *utils
}
func (ms *MapStorage) SetDerivedChargers(key string, dcs *utils.DerivedChargers) error {
ms.mu.Lock()
defer ms.mu.Unlock()
if dcs == nil || len(dcs.Chargers) == 0 {
delete(ms.dict, utils.DERIVEDCHARGERS_PREFIX+key)
cache2go.RemKey(utils.DERIVEDCHARGERS_PREFIX + key)
@@ -756,12 +844,16 @@ func (ms *MapStorage) SetDerivedChargers(key string, dcs *utils.DerivedChargers)
}
func (ms *MapStorage) SetCdrStats(cs *CdrStats) error {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(cs)
ms.dict[utils.CDR_STATS_PREFIX+cs.Id] = result
return err
}
func (ms *MapStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if values, ok := ms.dict[utils.CDR_STATS_PREFIX+key]; ok {
err = ms.ms.Unmarshal(values, &cs)
} else {
@@ -771,6 +863,8 @@ func (ms *MapStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
}
func (ms *MapStorage) GetAllCdrStats() (css []*CdrStats, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
for key, value := range ms.dict {
if !strings.HasPrefix(key, utils.CDR_STATS_PREFIX) {
continue
@@ -783,12 +877,16 @@ func (ms *MapStorage) GetAllCdrStats() (css []*CdrStats, err error) {
}
func (ms *MapStorage) SetSMCost(smCost *SMCost) error {
ms.mu.Lock()
defer ms.mu.Unlock()
result, err := ms.ms.Marshal(smCost)
ms.dict[utils.LOG_CALL_COST_PREFIX+smCost.CostSource+smCost.RunID+"_"+smCost.CGRID] = result
return err
}
func (ms *MapStorage) GetSMCost(cgrid, source, runid, originHost, originID string) (smCost *SMCost, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if values, ok := ms.dict[utils.LOG_CALL_COST_PREFIX+source+runid+"_"+cgrid]; ok {
err = ms.ms.Unmarshal(values, &smCost)
} else {
@@ -798,6 +896,8 @@ func (ms *MapStorage) GetSMCost(cgrid, source, runid, originHost, originID strin
}
func (ms *MapStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
mat, err := ms.ms.Marshal(at)
if err != nil {
return
@@ -811,6 +911,8 @@ func (ms *MapStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, a
}
func (ms *MapStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
mat, err := ms.ms.Marshal(at)
if err != nil {
return

View File

@@ -274,7 +274,7 @@ func TestDifferentUuid(t *testing.T) {
func TestStorageTask(t *testing.T) {
// clean previous unused tasks
for i := 0; i < 18; i++ {
for i := 0; i < 19; i++ {
ratingStorage.PopTask()
}
@@ -303,7 +303,7 @@ func TestStorageTask(t *testing.T) {
t.Error("Error poping task: ", task, err)
}
if task, err := ratingStorage.PopTask(); err == nil && task != nil {
t.Errorf("Error poping task %+v, %v: ", task, err)
t.Errorf("Error poping task %+v, %v ", task, err)
}
}

119
general_tests/tp_it_test.go Normal file
View File

@@ -0,0 +1,119 @@
package general_tests
import (
"net/rpc"
"net/rpc/jsonrpc"
"path"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var tpCfgPath string
var tpCfg *config.CGRConfig
var tpRPC *rpc.Client
var tpLoadInst engine.LoadInstance // Share load information between tests
func TestTpInitCfg(t *testing.T) {
if !*testIntegration {
return
}
tpCfgPath = path.Join(*dataDir, "conf", "samples", "tutlocal")
// Init config first
var err error
tpCfg, err = config.NewCGRConfigFromFolder(tpCfgPath)
if err != nil {
t.Error(err)
}
tpCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush()
config.SetCgrConfig(tpCfg)
}
// Remove data in both rating and accounting db
func TestTpResetDataDb(t *testing.T) {
if !*testIntegration {
return
}
if err := engine.InitDataDb(tpCfg); err != nil {
t.Fatal(err)
}
}
// Wipe out the cdr database
func TestTpResetStorDb(t *testing.T) {
if !*testIntegration {
return
}
if err := engine.InitStorDb(tpCfg); err != nil {
t.Fatal(err)
}
}
// Start CGR Engine
func TestTpStartEngine(t *testing.T) {
if !*testIntegration {
return
}
if _, err := engine.StopStartEngine(tpCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func TestTpRpcConn(t *testing.T) {
if !*testIntegration {
return
}
var err error
tpRPC, err = jsonrpc.Dial("tcp", tpCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
}
// Load the tariff plan, creating accounts and their balances
func TestTpLoadTariffPlanFromFolder(t *testing.T) {
if !*testIntegration {
return
}
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "testtp")}
if err := tpRPC.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &tpLoadInst); err != nil {
t.Error(err)
} else if tpLoadInst.LoadId == "" {
t.Error("Empty loadId received, loadInstance: ", tpLoadInst)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
}
func TestTpBalanceCounter(t *testing.T) {
if !*testIntegration {
return
}
tStart := time.Date(2016, 3, 31, 0, 0, 0, 0, time.UTC)
cd := engine.CallDescriptor{
Direction: "*out",
Category: "call",
Tenant: "cgrates.org",
Subject: "1001",
Destination: "+49",
DurationIndex: 0,
TimeStart: tStart,
TimeEnd: tStart.Add(time.Duration(20) * time.Second),
}
var cc engine.CallCost
if err := tpRPC.Call("Responder.Debit", cd, &cc); err != nil {
t.Error("Got error on Responder.GetCost: ", err.Error())
} else if cc.GetDuration() == 20 {
t.Errorf("Calling Responder.MaxDebit got callcost: %v", cc.GetDuration())
}
var acnt *engine.Account
attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"}
if err := tpRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error("Got error on ApierV2.GetAccount: ", err.Error())
} else if acnt.UnitCounters[utils.MONETARY][1].Counters[0].Value != 20.0 {
t.Errorf("Calling ApierV2.GetBalance received: %s", utils.ToIJSON(acnt))
}
}

View File

@@ -342,3 +342,45 @@ func TestSMGDataLastUsedMultipleData(t *testing.T) {
t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.DATA].GetTotalValue())
}
}
func TestSMGDataDerivedChargingNoCredit(t *testing.T) {
if !*testIntegration {
return
}
var acnt *engine.Account
attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1011"}
eAcntVal := 50000.000000
if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error(err)
} else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal {
t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue())
}
smgEv := SMGenericEvent{
utils.EVENT_NAME: "TEST_EVENT",
utils.TOR: utils.VOICE,
utils.ACCID: "12349",
utils.DIRECTION: utils.OUT,
utils.ACCOUNT: "1011",
utils.SUBJECT: "1011",
utils.DESTINATION: "+49",
utils.CATEGORY: "call",
utils.TENANT: "cgrates.org",
utils.REQTYPE: utils.META_PREPAID,
utils.SETUP_TIME: "2016-01-05 18:30:49",
utils.ANSWER_TIME: "2016-01-05 18:31:05",
utils.USAGE: "100",
}
var maxUsage float64
if err := smgRPC.Call("SMGenericV1.SessionStart", smgEv, &maxUsage); err != nil {
t.Error(err)
}
if maxUsage != 100 {
t.Error("Bad max usage: ", maxUsage)
}
eAcntVal = 50000.000000
if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil {
t.Error(err)
} else if acnt.BalanceMap[utils.DATA].GetTotalValue() != eAcntVal {
t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue())
}
}

View File

@@ -28,7 +28,7 @@ import (
func NewSessions() *Sessions {
return &Sessions{
sessionsMux: new(sync.Mutex),
guard: engine.NewGuardianLock(),
guard: engine.Guardian,
}
}

View File

@@ -35,7 +35,7 @@ var ErrPartiallyExecuted = errors.New("Partially executed")
func NewSMGeneric(cgrCfg *config.CGRConfig, rater engine.Connector, cdrsrv engine.Connector, timezone string, extconns *SMGExternalConnections) *SMGeneric {
gsm := &SMGeneric{cgrCfg: cgrCfg, rater: rater, cdrsrv: cdrsrv, extconns: extconns, timezone: timezone,
sessions: make(map[string][]*SMGSession), sessionsMux: new(sync.Mutex), guard: engine.NewGuardianLock()}
sessions: make(map[string][]*SMGSession), sessionsMux: new(sync.Mutex), guard: engine.Guardian}
return gsm
}
@@ -206,14 +206,6 @@ func (self *SMGeneric) GetLcrSuppliers(gev SMGenericEvent, clnt *rpc2.Client) ([
return lcr.SuppliersSlice()
}
// Called on session start
func (self *SMGeneric) SessionStart(gev SMGenericEvent, clnt *rpc2.Client) (time.Duration, error) {
if err := self.sessionStart(gev, getClientConnId(clnt)); err != nil {
return nilDuration, err
}
return self.SessionUpdate(gev, clnt)
}
// Execute debits for usage/maxUsage
func (self *SMGeneric) SessionUpdate(gev SMGenericEvent, clnt *rpc2.Client) (time.Duration, error) {
if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil {
@@ -246,6 +238,19 @@ func (self *SMGeneric) SessionUpdate(gev SMGenericEvent, clnt *rpc2.Client) (tim
return evMaxUsage, nil
}
// Called on session start
func (self *SMGeneric) SessionStart(gev SMGenericEvent, clnt *rpc2.Client) (time.Duration, error) {
if err := self.sessionStart(gev, getClientConnId(clnt)); err != nil {
self.sessionEnd(gev.GetUUID(), 0)
return nilDuration, err
}
d, err := self.SessionUpdate(gev, clnt)
if err != nil {
self.sessionEnd(gev.GetUUID(), 0)
}
return d, err
}
// Called on session end, should stop debit loop
func (self *SMGeneric) SessionEnd(gev SMGenericEvent, clnt *rpc2.Client) error {
if initialID, err := gev.GetFieldAsString(utils.InitialOriginID); err == nil {

View File

@@ -22,7 +22,6 @@ import (
"bytes"
"crypto/tls"
"encoding/gob"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
@@ -49,16 +48,12 @@ func GetBytes(content interface{}) ([]byte, error) {
}
// Post without automatic failover
func HttpJsonPost(url string, skipTlsVerify bool, content interface{}) ([]byte, error) {
body, err := json.Marshal(content)
if err != nil {
return nil, err
}
func HttpJsonPost(url string, skipTlsVerify bool, content []byte) ([]byte, error) {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTlsVerify},
}
client := &http.Client{Transport: tr}
resp, err := client.Post(url, "application/json", bytes.NewBuffer(body))
resp, err := client.Post(url, "application/json", bytes.NewBuffer(content))
if err != nil {
return nil, err
}