From afea97d0b50e88dcc4fe67f3ed878878c98f750f Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 9 Jun 2015 11:45:30 +0300 Subject: [PATCH] use cdrs for callcost logging --- cmd/cgr-engine/cgr-engine.go | 6 +++--- engine/cdrs.go | 27 ++++++++++++++++++++++++++- engine/responder.go | 16 ++++++++++++++++ sessionmanager/fssessionmanager.go | 14 +++++++++----- sessionmanager/kamailiosm.go | 16 ++++++++-------- sessionmanager/osipssm.go | 14 +++++++------- sessionmanager/session.go | 10 +++++++++- sessionmanager/sessionmanager.go | 2 +- 8 files changed, 79 insertions(+), 26 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 858d54a9a..e9b53c341 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -153,7 +153,7 @@ func startSmFreeSWITCH(responder *engine.Responder, cdrDb engine.CdrStorage, cac } cdrsConn = &engine.RPCClientConnector{Client: client} } - sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, cdrDb, raterConn, cdrsConn) + sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, raterConn, cdrsConn) if err = sm.Connect(); err != nil { engine.Logger.Err(fmt.Sprintf(" error: %s!", err)) } @@ -203,7 +203,7 @@ func startSmKamailio(responder *engine.Responder, cdrDb engine.CdrStorage, cache } cdrsConn = &engine.RPCClientConnector{Client: client} } - sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, raterConn, cdrsConn, cdrDb) + sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, raterConn, cdrsConn) if err = sm.Connect(); err != nil { engine.Logger.Err(fmt.Sprintf(" error: %s!", err)) } @@ -253,7 +253,7 @@ func startSmOpenSIPS(responder *engine.Responder, cdrDb engine.CdrStorage, cache } cdrsConn = &engine.RPCClientConnector{Client: client} } - sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn, cdrDb) + sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn) if err := sm.Connect(); err != nil { engine.Logger.Err(fmt.Sprintf(" error: %s!", err)) } diff --git a/engine/cdrs.go b/engine/cdrs.go index 1087d78ed..4fa47a5e4 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -98,6 +98,18 @@ func (self *CdrServer) ProcessExternalCdr(cdr *ExternalCdr) error { return self.rateStoreStatsReplicate(storedCdr) } +type CallCostLog struct { + CgrId string + Source string + RunId string + CallCost *CallCost +} + +// RPC method, used to log callcosts to db +func (self *CdrServer) LogCallCost(ccl *CallCostLog) error { + return self.cdrDb.LogCallCost(ccl.CgrId, ccl.Source, ccl.RunId, ccl.CallCost) +} + // Called by rate/re-rate API func (self *CdrServer) RateCdrs(cgrIds, runIds, tors, cdrHosts, cdrSources, reqTypes, directions, tenants, categories, accounts, subjects, destPrefixes, ratedAccounts, ratedSubjects []string, orderIdStart, orderIdEnd int64, timeStart, timeEnd time.Time, rerateErrors, rerateRated, sendToStats bool) error { @@ -290,7 +302,20 @@ func (self *CdrServer) rateCDR(storedCdr *StoredCdr) error { var errCost error if utils.IsSliceMember([]string{utils.META_PREPAID, utils.PREPAID}, storedCdr.ReqType) { // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards // Should be previously calculated and stored in DB - qryCC, errCost = self.getCostsFromDB(storedCdr.CgrId, storedCdr.MediationRunId) + delay := utils.Fib() + var err error + for i := 0; i < 4; i++ { + qryCC, errCost = self.getCostsFromDB(storedCdr.CgrId, storedCdr.MediationRunId) + + if err == nil { //Connected so no need to reiterate + break + } + time.Sleep(delay()) + } + if err != nil { //calculate CDR as for pseudoprepaid + qryCC, errCost = self.getCostFromRater(storedCdr) + } + } else { qryCC, errCost = self.getCostFromRater(storedCdr) } diff --git a/engine/responder.go b/engine/responder.go index 2c011ec04..603f23134 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -236,6 +236,17 @@ func (rs *Responder) ProcessCdr(cdr *StoredCdr, reply *string) error { return nil } +func (rs *Responder) LogCallCost(ccl *CallCostLog, reply *int) error { + if rs.CdrSrv == nil { + return errors.New("CDR_SERVER_NOT_RUNNING") + } + if err := rs.CdrSrv.LogCallCost(ccl); err != nil { + return err + } + *reply = 0 + return nil +} + func (rs *Responder) GetLCR(cd *CallDescriptor, reply *LCRCost) error { lcrCost, err := cd.GetLCR(rs.Stats) if err != nil { @@ -403,6 +414,7 @@ type Connector interface { GetDerivedMaxSessionTime(StoredCdr, *float64) error GetSessionRuns(StoredCdr, *[]*SessionRun) error ProcessCdr(*StoredCdr, *string) error + LogCallCost(*CallCostLog, *int) error GetLCR(*CallDescriptor, *LCRCost) error } @@ -446,6 +458,10 @@ func (rcc *RPCClientConnector) ProcessCdr(cdr *StoredCdr, reply *string) error { return rcc.Client.Call("CDRSV1.ProcessCdr", cdr, reply) } +func (rcc *RPCClientConnector) LogCallCost(ccl *CallCostLog, reply *int) error { + return rcc.Client.Call("CDRSV1.LogCallCost", ccl, reply) +} + func (rcc *RPCClientConnector) GetLCR(cd *CallDescriptor, reply *LCRCost) error { return rcc.Client.Call("Responder.GetLCR", cd, reply) } diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 641988d35..78f9b85a6 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -38,11 +38,15 @@ type FSSessionManager struct { sessions []*Session rater engine.Connector cdrs engine.Connector - cdrDb engine.CdrStorage } -func NewFSSessionManager(smFsConfig *config.SmFsConfig, storage engine.CdrStorage, rater, cdrs engine.Connector) *FSSessionManager { - return &FSSessionManager{cfg: smFsConfig, conns: make(map[string]*fsock.FSock), cdrDb: storage, rater: rater, cdrs: cdrs} +func NewFSSessionManager(smFsConfig *config.SmFsConfig, rater, cdrs engine.Connector) *FSSessionManager { + return &FSSessionManager{ + cfg: smFsConfig, + conns: make(map[string]*fsock.FSock), + rater: rater, + cdrs: cdrs, + } } // Connects to the freeswitch mod_event_socket server and starts @@ -268,8 +272,8 @@ func (sm *FSSessionManager) ProcessCdr(storedCdr *engine.StoredCdr) error { func (sm *FSSessionManager) DebitInterval() time.Duration { return sm.cfg.DebitInterval } -func (sm *FSSessionManager) CdrDb() engine.CdrStorage { - return sm.cdrDb +func (sm *FSSessionManager) CdrSrv() engine.Connector { + return sm.cdrs } func (sm *FSSessionManager) Rater() engine.Connector { diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index 4afab5904..97dcdc02f 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -21,17 +21,18 @@ package sessionmanager import ( "errors" "fmt" + "log/syslog" + "regexp" + "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/kamevapi" - "log/syslog" - "regexp" - "time" ) -func NewKamailioSessionManager(smKamCfg *config.SmKamConfig, rater, cdrsrv engine.Connector, cdrDb engine.CdrStorage) (*KamailioSessionManager, error) { - ksm := &KamailioSessionManager{cfg: smKamCfg, rater: rater, cdrsrv: cdrsrv, cdrDb: cdrDb, conns: make(map[string]*kamevapi.KamEvapi)} +func NewKamailioSessionManager(smKamCfg *config.SmKamConfig, rater, cdrsrv engine.Connector) (*KamailioSessionManager, error) { + ksm := &KamailioSessionManager{cfg: smKamCfg, rater: rater, cdrsrv: cdrsrv, conns: make(map[string]*kamevapi.KamEvapi)} return ksm, nil } @@ -39,7 +40,6 @@ type KamailioSessionManager struct { cfg *config.SmKamConfig rater engine.Connector cdrsrv engine.Connector - cdrDb engine.CdrStorage conns map[string]*kamevapi.KamEvapi sessions []*Session } @@ -214,8 +214,8 @@ func (self *KamailioSessionManager) GetSession(uuid string) *Session { func (self *KamailioSessionManager) DebitInterval() time.Duration { return self.cfg.DebitInterval } -func (self *KamailioSessionManager) CdrDb() engine.CdrStorage { - return self.cdrDb +func (self *KamailioSessionManager) CdrSrv() engine.Connector { + return self.cdrsrv } func (self *KamailioSessionManager) Rater() engine.Connector { return self.rater diff --git a/sessionmanager/osipssm.go b/sessionmanager/osipssm.go index b69121f1c..d2f7cfead 100644 --- a/sessionmanager/osipssm.go +++ b/sessionmanager/osipssm.go @@ -22,12 +22,13 @@ import ( "bytes" "errors" "fmt" + "strings" + "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/osipsdagram" - "strings" - "time" ) /* @@ -79,8 +80,8 @@ duration:: */ -func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, rater, cdrsrv engine.Connector, cdrDb engine.CdrStorage) (*OsipsSessionManager, error) { - osm := &OsipsSessionManager{cfg: smOsipsCfg, rater: rater, cdrsrv: cdrsrv, cdrDb: cdrDb, cdrStartEvents: make(map[string]*OsipsEvent)} +func NewOSipsSessionManager(smOsipsCfg *config.SmOsipsConfig, rater, cdrsrv engine.Connector) (*OsipsSessionManager, error) { + osm := &OsipsSessionManager{cfg: smOsipsCfg, rater: rater, cdrsrv: cdrsrv, cdrStartEvents: make(map[string]*OsipsEvent)} osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){ "E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.onOpensipsStart}, // Raised when OpenSIPS starts so we can register our event handlers "E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.onCdr}, // Raised if cdr_flag is configured @@ -94,7 +95,6 @@ type OsipsSessionManager struct { cfg *config.SmOsipsConfig rater engine.Connector cdrsrv engine.Connector - cdrDb engine.CdrStorage eventHandlers map[string][]func(*osipsdagram.OsipsEvent) evSubscribeStop chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it stopServing chan struct{} // Stop serving datagrams @@ -138,8 +138,8 @@ func (osm *OsipsSessionManager) DebitInterval() time.Duration { } // Returns the connection to local cdr database, used by session to log it's final costs -func (osm *OsipsSessionManager) CdrDb() engine.CdrStorage { - return osm.cdrDb +func (osm *OsipsSessionManager) CdrSrv() engine.Connector { + return osm.cdrsrv } // Returns connection to rater/controller diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 0a7b2cf1a..90d65b6f4 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -204,6 +204,14 @@ func (s *Session) SaveOperations() { for _, cc := range sr.CallCosts[1:] { firstCC.Merge(cc) } - s.sessionManager.CdrDb().LogCallCost(s.eventStart.GetCgrId(), engine.SESSION_MANAGER_SOURCE, sr.DerivedCharger.RunId, firstCC) + var existingDuration int + s.sessionManager.CdrSrv().LogCallCost(&engine.CallCostLog{ + CgrId: s.eventStart.GetCgrId(), + Source: engine.SESSION_MANAGER_SOURCE, + RunId: sr.DerivedCharger.RunId, + CallCost: firstCC, + }, &existingDuration) + // on duplicate error refound extra from existing database + } } diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index 3673b361e..086313fcb 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -25,8 +25,8 @@ import ( ) type SessionManager interface { - CdrDb() engine.CdrStorage Rater() engine.Connector + CdrSrv() engine.Connector DebitInterval() time.Duration Connect() error DisconnectSession(engine.Event, string, string) error