Action *cdrlog to StorDb, moved GetCallCost and LogCallCost in CdrStorage, local tests for *cdrlog

This commit is contained in:
DanB
2015-04-29 19:50:03 +02:00
parent 3ce339a592
commit 5ce11a28a3
28 changed files with 138 additions and 401 deletions

View File

@@ -129,11 +129,10 @@ func logAction(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (er
}
func cdrLogAction(acc *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
tor, _ := utils.NewRSRField("^tor_test")
cdrhost, _ := utils.NewRSRField("^127.0.0.1")
defaultTemplate := map[string]*utils.RSRField{
"TOR": tor,
"CdrHost": cdrhost,
"CdrHost": utils.ParseRSRFieldsMustCompile("^127.0.0.1", utils.INFIELD_SEP)[0],
"ReqType": utils.ParseRSRFieldsMustCompile("^"+utils.META_PREPAID, utils.INFIELD_SEP)[0],
"MediationRunId": utils.ParseRSRFieldsMustCompile("^"+utils.META_DEFAULT, utils.INFIELD_SEP)[0],
}
template := make(map[string]string)
@@ -143,38 +142,61 @@ func cdrLogAction(acc *Account, sq *StatsQueueTriggered, a *Action, acs Actions)
return
}
for field, rsr := range template {
if rsrField, err := utils.NewRSRField(rsr); err == nil {
defaultTemplate[field] = rsrField
} else {
defaultTemplate[field], err = utils.NewRSRField(rsr)
if err != nil {
return err
}
}
}
// set sored cdr values
// set stored cdr values
var cdrs []*StoredCdr
for _, action := range acs {
if action.ActionType == DEBIT || action.ActionType == DEBIT_RESET {
cdr := &StoredCdr{CdrSource: CDRLOG, SetupTime: time.Now(), AnswerTime: time.Now(), AccId: utils.GenUUID()}
cdr.CgrId = utils.Sha1(cdr.AccId, cdr.SetupTime.String())
elem := reflect.ValueOf(cdr).Elem()
for key, rsr := range defaultTemplate {
field := elem.FieldByName(key)
if field.IsValid() && field.CanSet() {
switch field.Kind() {
case reflect.Float64:
value, err := strconv.ParseFloat(rsr.ParseValue(""), 64)
if err != nil {
continue
}
field.SetFloat(value)
case reflect.String:
field.SetString(rsr.ParseValue(""))
if !utils.IsSliceMember([]string{DEBIT, DEBIT_RESET}, action.ActionType) {
continue // Only log specific actions
}
cdr := &StoredCdr{CdrSource: CDRLOG, SetupTime: time.Now(), AnswerTime: time.Now(), AccId: utils.GenUUID()}
cdr.CgrId = utils.Sha1(cdr.AccId, cdr.SetupTime.String())
cdr.Usage = time.Duration(1) * time.Second
elem := reflect.ValueOf(cdr).Elem()
for key, rsr := range defaultTemplate {
field := elem.FieldByName(key)
if field.IsValid() && field.CanSet() {
switch field.Kind() {
case reflect.Float64:
value, err := strconv.ParseFloat(rsr.ParseValue(""), 64)
if err != nil {
continue
}
field.SetFloat(value)
case reflect.String:
field.SetString(rsr.ParseValue(""))
}
}
cdrs = append(cdrs, cdr)
}
// Hardcode the data for now, expand it with templates later
cdr.TOR = action.BalanceType
cdr.Direction = action.Direction
if action.Balance != nil {
cdr.Cost = action.Balance.Value
}
Logger.Debug(fmt.Sprintf("action: %+v, balance: %+v", action, action.Balance))
cdrs = append(cdrs, cdr)
if cdrStorage == nil { // Only save if the cdrStorage is defined
continue
}
Logger.Debug(fmt.Sprintf("SetCdr: %+v\n", cdr))
if err := cdrStorage.SetCdr(cdr); err != nil {
return err
}
if err := cdrStorage.SetRatedCdr(cdr); err != nil {
return err
}
// FixMe
//if err := cdrStorage.LogCallCost(); err != nil {
// return err
//}
}
b, _ := json.Marshal(cdrs)

View File

@@ -1,5 +1,5 @@
/*
Rating system designed to be used in VoIP Carriers World
Real-time Charging System for Telecom & ISP environments
Copyright (C) 2012-2015 ITsysCOM
This program is free software: you can redistribute it and/or modify
@@ -1126,7 +1126,7 @@ func TestActionCdrlogEmpty(t *testing.T) {
}
cdrs := make([]*StoredCdr, 0)
json.Unmarshal([]byte(cdrlog.ExpirationString), &cdrs)
if len(cdrs) != 1 || cdrs[0].TOR != "tor_test" {
if len(cdrs) != 1 || cdrs[0].CdrSource != CDRLOG {
t.Errorf("Wrong cdrlogs: %+v", cdrs[0])
}
}
@@ -1150,7 +1150,6 @@ func TestActionCdrlogWithParams(t *testing.T) {
cdrs := make([]*StoredCdr, 0)
json.Unmarshal([]byte(cdrlog.ExpirationString), &cdrs)
if len(cdrs) != 2 ||
cdrs[0].TOR != "tor_test" ||
cdrs[0].Subject != "rif" {
t.Errorf("Wrong cdrlogs: %+v", cdrs[0])
}

View File

@@ -64,6 +64,7 @@ var (
dataStorage RatingStorage
accountingStorage AccountingStorage
storageLogger LogStorage
cdrStorage CdrStorage
debitPeriod = 10 * time.Second
globalRoundingDecimals = 10
historyScribe history.Scribe
@@ -91,6 +92,13 @@ func SetStorageLogger(sg LogStorage) {
storageLogger = sg
}
/*
Sets the database for CDR storing, used by *cdrlog in first place
*/
func SetCdrStorage(cStorage CdrStorage) {
cdrStorage = cStorage
}
// Exported method to set the history scribe.
func SetHistoryScribe(scribe history.Scribe) {
historyScribe = scribe

View File

@@ -53,8 +53,8 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
}
}
func NewCdrServer(cgrCfg *config.CGRConfig, logDb LogStorage, cdrDb CdrStorage, rater Connector, stats StatsInterface) (*CdrServer, error) {
return &CdrServer{cgrCfg: cgrCfg, logDb: logDb, cdrDb: cdrDb, rater: rater, stats: stats}, nil
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, stats StatsInterface) (*CdrServer, error) {
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, stats: stats}, nil
/*
if cfg.CDRSStats != "" {
if cfg.CDRSStats != utils.INTERNAL {
@@ -73,7 +73,6 @@ func NewCdrServer(cgrCfg *config.CGRConfig, logDb LogStorage, cdrDb CdrStorage,
type CdrServer struct {
cgrCfg *config.CGRConfig
logDb LogStorage
cdrDb CdrStorage
rater Connector
stats StatsInterface
@@ -150,7 +149,7 @@ func (self *CdrServer) rateStoreStatsReplicate(storedCdr *StoredCdr) (err error)
}
// Store CostDetails
if cdr.Rated || utils.IsSliceMember([]string{utils.RATED, utils.META_RATED}, cdr.ReqType) { // Account related CDRs are saved automatically, so save the others here if requested
if err := self.logDb.LogCallCost(cdr.CgrId, utils.CDRS_SOURCE, cdr.MediationRunId, storedCdr.CostDetails); err != nil {
if err := self.cdrDb.LogCallCost(cdr.CgrId, utils.CDRS_SOURCE, cdr.MediationRunId, storedCdr.CostDetails); err != nil {
Logger.Err(fmt.Sprintf("<CDRS> Storing costs for CDR %+v, costDetails: %+v, got error: %s", cdr, cdr.CostDetails, err.Error()))
}
}
@@ -191,7 +190,7 @@ func (self *CdrServer) deriveAndRateCdr(storedCdr *StoredCdr) ([]*StoredCdr, err
// Retrive the cost from logging database, nil in case of no log
func (self *CdrServer) getCostsFromDB(cgrid, runId string) (cc *CallCost, err error) {
for i := 0; i < 3; i++ { // Mechanism to avoid concurrency between SessionManager writing the costs and mediator picking them up
cc, err = self.logDb.GetCallCostLog(cgrid, SESSION_MANAGER_SOURCE, runId)
cc, err = self.cdrDb.GetCallCostLog(cgrid, SESSION_MANAGER_SOURCE, runId)
if cc != nil {
break
}
@@ -221,7 +220,7 @@ func (self *CdrServer) getCostFromRater(storedCdr *StoredCdr) (*CallCost, error)
}
if utils.IsSliceMember([]string{utils.META_PSEUDOPREPAID, utils.META_POSTPAID, utils.PSEUDOPREPAID, utils.POSTPAID}, storedCdr.ReqType) {
if err = self.rater.Debit(cd, cc); err == nil { // Debit has occured, we are forced to write the log, even if CDR store is disabled
self.logDb.LogCallCost(storedCdr.CgrId, MEDIATOR_SOURCE, storedCdr.MediationRunId, cc)
self.cdrDb.LogCallCost(storedCdr.CgrId, MEDIATOR_SOURCE, storedCdr.MediationRunId, cc)
}
} else {
err = self.rater.GetCost(cd, cc)

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"fmt"
"path"
"reflect"
"testing"
@@ -112,7 +111,6 @@ func TestCdrsHttpJsonRpcCdrReplication(t *testing.T) {
} else {
rcvSetupTime, _ := utils.ParseTimeDetectLayout(rcvedCdrs[0].SetupTime)
rcvAnswerTime, _ := utils.ParseTimeDetectLayout(rcvedCdrs[0].AnswerTime)
fmt.Printf("rcv: %+v, answer: %+v", rcvSetupTime, rcvAnswerTime)
rcvUsage, _ := utils.ParseDurationWithSecs(rcvedCdrs[0].Usage)
if rcvedCdrs[0].CgrId != testCdr1.CgrId ||
rcvedCdrs[0].TOR != testCdr1.TOR ||

View File

@@ -1,44 +0,0 @@
/*
Real-time Charging System for Telecom & ISP environments
Copyright (C) 2012-2015 ITsysCOM GmbH
This program is free software: you can Storagetribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITH*out ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
/*
func NewMediator(connector Connector, logDb LogStorage, cdrDb CdrStorage, st StatsInterface, cfg *config.CGRConfig) (m *Mediator, err error) {
m = &Mediator{
connector: connector,
logDb: logDb,
cdrDb: cdrDb,
stats: st,
cgrCfg: cfg,
}
if m.cgrCfg.MediatorStats != "" {
if m.cgrCfg.MediatorStats != utils.INTERNAL {
if s, err := NewProxyStats(m.cgrCfg.MediatorStats); err == nil {
m.stats = s
} else {
Logger.Err(fmt.Sprintf("Errors connecting to CDRS stats service (mediator): %s", err.Error()))
}
}
} else {
// disable stats for mediator
m.stats = nil
}
return m, nil
}
*/

View File

@@ -114,6 +114,8 @@ type CdrStorage interface {
Storage
SetCdr(*StoredCdr) error
SetRatedCdr(*StoredCdr) error
LogCallCost(cgrid, source, runid string, cc *CallCost) error
GetCallCostLog(cgrid, source, runid string) (*CallCost, error)
GetStoredCdrs(*utils.CdrsFilter) ([]*StoredCdr, int64, error)
RemStoredCdrs([]string) error
}
@@ -121,11 +123,9 @@ type CdrStorage interface {
type LogStorage interface {
Storage
//GetAllActionTimingsLogs() (map[string]ActionsTimings, error)
LogCallCost(cgrid, source, runid string, cc *CallCost) error
LogError(uuid, source, runid, errstr string) error
LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) error
LogActionTiming(source string, at *ActionTiming, as Actions) error
GetCallCostLog(cgrid, source, runid string) (*CallCost, error)
}
type LoadStorage interface {