use cdrs for callcost logging

This commit is contained in:
Radu Ioan Fericean
2015-06-09 11:45:30 +03:00
parent cfc766677a
commit afea97d0b5
8 changed files with 79 additions and 26 deletions

View File

@@ -153,7 +153,7 @@ func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cac
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, cdrDb, raterConn, cdrsConn)
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, raterConn, cdrsConn)
if err = sm.Connect(); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> error: %s!", err))
}
@@ -203,7 +203,7 @@ func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cache
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, raterConn, cdrsConn, cdrDb)
sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, raterConn, cdrsConn)
if err = sm.Connect(); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> error: %s!", err))
}
@@ -253,7 +253,7 @@ func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cache
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn, cdrDb)
sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn)
if err := sm.Connect(); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> error: %s!", err))
}

View File

@@ -98,6 +98,18 @@ func (self *CdrServer) ProcessExternalCdr(cdr *ExternalCdr) error {
return self.rateStoreStatsReplicate(storedCdr)
}
type CallCostLog struct {
CgrId string
Source string
RunId string
CallCost *CallCost
}
// RPC method, used to log callcosts to db
func (self *CdrServer) LogCallCost(ccl *CallCostLog) error {
return self.cdrDb.LogCallCost(ccl.CgrId, ccl.Source, ccl.RunId, ccl.CallCost)
}
// Called by rate/re-rate API
func (self *CdrServer) RateCdrs(cgrIds, runIds, tors, cdrHosts, cdrSources, reqTypes, directions, tenants, categories, accounts, subjects, destPrefixes, ratedAccounts, ratedSubjects []string,
orderIdStart, orderIdEnd int64, timeStart, timeEnd time.Time, rerateErrors, rerateRated, sendToStats bool) error {
@@ -290,7 +302,20 @@ func (self *CdrServer) rateCDR(storedCdr *StoredCdr) error {
var errCost error
if utils.IsSliceMember([]string{utils.META_PREPAID, utils.PREPAID}, storedCdr.ReqType) { // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards
// Should be previously calculated and stored in DB
qryCC, errCost = self.getCostsFromDB(storedCdr.CgrId, storedCdr.MediationRunId)
delay := utils.Fib()
var err error
for i := 0; i < 4; i++ {
qryCC, errCost = self.getCostsFromDB(storedCdr.CgrId, storedCdr.MediationRunId)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(delay())
}
if err != nil { //calculate CDR as for pseudoprepaid
qryCC, errCost = self.getCostFromRater(storedCdr)
}
} else {
qryCC, errCost = self.getCostFromRater(storedCdr)
}

View File

@@ -236,6 +236,17 @@ func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error {
return nil
}
func (rs *Responder) LogCallCost(ccl *CallCostLog, reply *int) error {
if rs.CdrSrv == nil {
return errors.New("CDR_SERVER_NOT_RUNNING")
}
if err := rs.CdrSrv.LogCallCost(ccl); err != nil {
return err
}
*reply = 0
return nil
}
func (rs *Responder) GetLCR(cd *CallDescriptor, reply *LCRCost) error {
lcrCost, err := cd.GetLCR(rs.Stats)
if err != nil {
@@ -403,6 +414,7 @@ type Connector interface {
GetDerivedMaxSessionTime(StoredCdr, *float64) error
GetSessionRuns(StoredCdr, *[]*SessionRun) error
ProcessCdr(*StoredCdr, *string) error
LogCallCost(*CallCostLog, *int) error
GetLCR(*CallDescriptor, *LCRCost) error
}
@@ -446,6 +458,10 @@ func (rcc *RPCClientConnector) ProcessCdr(cdr *StoredCdr, reply *string) error {
return rcc.Client.Call("CDRSV1.ProcessCdr", cdr, reply)
}
func (rcc *RPCClientConnector) LogCallCost(ccl *CallCostLog, reply *int) error {
return rcc.Client.Call("CDRSV1.LogCallCost", ccl, reply)
}
func (rcc *RPCClientConnector) GetLCR(cd *CallDescriptor, reply *LCRCost) error {
return rcc.Client.Call("Responder.GetLCR", cd, reply)
}

View File

@@ -38,11 +38,15 @@ type FSSessionManager struct {
sessions []*Session
rater engine.Connector
cdrs engine.Connector
cdrDb engine.CdrStorage
}
func NewFSSessionManager(smFsConfig *config.SmFsConfig, storage engine.CdrStorage, rater, cdrs engine.Connector) *FSSessionManager {
return &FSSessionManager{cfg: smFsConfig, conns: make(map[string]*fsock.FSock), cdrDb: storage, rater: rater, cdrs: cdrs}
func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs engine.Connector) *FSSessionManager {
return &FSSessionManager{
cfg: smFsConfig,
conns: make(map[string]*fsock.FSock),
rater: rater,
cdrs: cdrs,
}
}
// Connects to the freeswitch mod_event_socket server and starts
@@ -268,8 +272,8 @@ func (sm *FSSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error {
func (sm *FSSessionManager) DebitInterval() time.Duration {
return sm.cfg.DebitInterval
}
func (sm *FSSessionManager) CdrDb() engine.CdrStorage {
return sm.cdrDb
func (sm *FSSessionManager) CdrSrv() engine.Connector {
return sm.cdrs
}
func (sm *FSSessionManager) Rater() engine.Connector {

View File

@@ -21,17 +21,18 @@ package sessionmanager
import (
"errors"
"fmt"
"log/syslog"
"regexp"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/kamevapi"
"log/syslog"
"regexp"
"time"
)
func NewKamailioSessionManager(smKamCfg *config.SmKamConfig, rater, cdrsrv engine.Connector, cdrDb engine.CdrStorage) (*KamailioSessionManager, error) {
ksm := &KamailioSessionManager{cfg: smKamCfg, rater: rater, cdrsrv: cdrsrv, cdrDb: cdrDb, conns: make(map[string]*kamevapi.KamEvapi)}
func NewKamailioSessionManager(smKamCfg *config.SmKamConfig, rater, cdrsrv engine.Connector) (*KamailioSessionManager, error) {
ksm := &KamailioSessionManager{cfg: smKamCfg, rater: rater, cdrsrv: cdrsrv, conns: make(map[string]*kamevapi.KamEvapi)}
return ksm, nil
}
@@ -39,7 +40,6 @@ type KamailioSessionManager struct {
cfg *config.SmKamConfig
rater engine.Connector
cdrsrv engine.Connector
cdrDb engine.CdrStorage
conns map[string]*kamevapi.KamEvapi
sessions []*Session
}
@@ -214,8 +214,8 @@ func (self *KamailioSessionManager) GetSession(uuid string) *Session {
func (self *KamailioSessionManager) DebitInterval() time.Duration {
return self.cfg.DebitInterval
}
func (self *KamailioSessionManager) CdrDb() engine.CdrStorage {
return self.cdrDb
func (self *KamailioSessionManager) CdrSrv() engine.Connector {
return self.cdrsrv
}
func (self *KamailioSessionManager) Rater() engine.Connector {
return self.rater

View File

@@ -22,12 +22,13 @@ import (
"bytes"
"errors"
"fmt"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/osipsdagram"
"strings"
"time"
)
/*
@@ -79,8 +80,8 @@ duration::
*/
func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, rater, cdrsrv engine.Connector, cdrDb engine.CdrStorage) (*OsipsSessionManager, error) {
osm := &OsipsSessionManager{cfg: smOsipsCfg, rater: rater, cdrsrv: cdrsrv, cdrDb: cdrDb, cdrStartEvents: make(map[string]*OsipsEvent)}
func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, rater, cdrsrv engine.Connector) (*OsipsSessionManager, error) {
osm := &OsipsSessionManager{cfg: smOsipsCfg, rater: rater, cdrsrv: cdrsrv, cdrStartEvents: make(map[string]*OsipsEvent)}
osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){
"E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart}, // Raised when OpenSIPS starts so we can register our event handlers
"E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr}, // Raised if cdr_flag is configured
@@ -94,7 +95,6 @@ type OsipsSessionManager struct {
cfg *config.SmOsipsConfig
rater engine.Connector
cdrsrv engine.Connector
cdrDb engine.CdrStorage
eventHandlers map[string][]func(*osipsdagram.OsipsEvent)
evSubscribeStop chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it
stopServing chan struct{} // Stop serving datagrams
@@ -138,8 +138,8 @@ func (osm *OsipsSessionManager) DebitInterval() time.Duration {
}
// Returns the connection to local cdr database, used by session to log it's final costs
func (osm *OsipsSessionManager) CdrDb() engine.CdrStorage {
return osm.cdrDb
func (osm *OsipsSessionManager) CdrSrv() engine.Connector {
return osm.cdrsrv
}
// Returns connection to rater/controller

View File

@@ -204,6 +204,14 @@ func (s *Session) SaveOperations() {
for _, cc := range sr.CallCosts[1:] {
firstCC.Merge(cc)
}
s.sessionManager.CdrDb().LogCallCost(s.eventStart.GetCgrId(), engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC)
var existingDuration int
s.sessionManager.CdrSrv().LogCallCost(&engine.CallCostLog{
CgrId: s.eventStart.GetCgrId(),
Source: engine.SESSION_MANAGER_SOURCE,
RunId: sr.DerivedCharger.RunId,
CallCost: firstCC,
}, &existingDuration)
// on duplicate error refound extra from existing database
}
}

View File

@@ -25,8 +25,8 @@ import (
)
type SessionManager interface {
CdrDb() engine.CdrStorage
Rater() engine.Connector
CdrSrv() engine.Connector
DebitInterval() time.Duration
Connect() error
DisconnectSession(engine.Event, string, string) error