From c8a86788aef04e00956245b10b1b390ef03e0df8 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 15 Jun 2016 18:13:32 +0200 Subject: [PATCH] CDRs - cache ProcessCdr API --- apier/v1/apier_local_test.go | 2 +- apier/v1/cdrsv1.go | 26 ++++++++++++++++++++------ apier/v2/cdrs_psql_local_test.go | 6 +++--- engine/cdrs.go | 16 ++++------------ sessionmanager/fssessionmanager.go | 2 +- sessionmanager/kamailiosm.go | 2 +- sessionmanager/osipssm.go | 2 +- sessionmanager/smgeneric.go | 4 ++-- 8 files changed, 33 insertions(+), 27 deletions(-) diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 34028cfd0..4019a5b6d 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -1472,7 +1472,7 @@ func TestApierLocalProcessCdr(t *testing.T) { SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), RunID: utils.DEFAULT_RUNID, Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, } - if err := rater.Call("CdrsV1.ProcessCdr", cdr, &reply); err != nil { + if err := rater.Call("CdrsV1.ProcessCDR", cdr, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) diff --git a/apier/v1/cdrsv1.go b/apier/v1/cdrsv1.go index c6b93d504..2433f4730 100644 --- a/apier/v1/cdrsv1.go +++ b/apier/v1/cdrsv1.go @@ -29,16 +29,24 @@ type CdrsV1 struct { } // Designed for CGR internal usage +// Deprecated func (self *CdrsV1) ProcessCdr(cdr *engine.CDR, reply *string) error { - if err := self.CdrSrv.LocalProcessCdr(cdr); err != nil { - return utils.NewErrServerError(err) - } - *reply = utils.OK - return nil + return self.ProcessCDR(cdr, reply) +} + +// Designed for CGR internal usage +func (self *CdrsV1) ProcessCDR(cdr *engine.CDR, reply *string) error { + return self.CdrSrv.V1ProcessCDR(cdr, reply) } // Designed for external programs feeding CDRs to CGRateS +// Deprecated func (self *CdrsV1) ProcessExternalCdr(cdr *engine.ExternalCDR, reply *string) error { + return self.ProcessExternalCDR(cdr, reply) +} + +// Designed for external programs feeding CDRs to CGRateS +func (self *CdrsV1) ProcessExternalCDR(cdr *engine.ExternalCDR, reply *string) error { if err := self.CdrSrv.ProcessExternalCdr(cdr); err != nil { return utils.NewErrServerError(err) } @@ -46,8 +54,14 @@ func (self *CdrsV1) ProcessExternalCdr(cdr *engine.ExternalCDR, reply *string) e return nil } -// Remotely (re)rating, deprecated +// Remotely (re)rating +// Deprecated func (self *CdrsV1) RateCdrs(attrs utils.AttrRateCdrs, reply *string) error { + return self.RateCDRs(attrs, reply) +} + +// Remotely (re)rating +func (self *CdrsV1) RateCDRs(attrs utils.AttrRateCdrs, reply *string) error { cdrsFltr, err := attrs.AsCDRsFilter(self.CdrSrv.Timezone()) if err != nil { return utils.NewErrServerError(err) diff --git a/apier/v2/cdrs_psql_local_test.go b/apier/v2/cdrs_psql_local_test.go index 03d387c01..50ff80dd5 100644 --- a/apier/v2/cdrs_psql_local_test.go +++ b/apier/v2/cdrs_psql_local_test.go @@ -122,7 +122,7 @@ func TestV2CDRsPSQLProcessCdrRated(t *testing.T) { Cost: 1.01, CostSource: "TestV2CDRsPSQLProcessCdrRated", Rated: true, } var reply string - if err := cdrsPsqlRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil { + if err := cdrsPsqlRpc.Call("CdrsV2.ProcessCDR", cdr, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) @@ -142,7 +142,7 @@ func TestV2CDRsPSQLProcessCdrRaw(t *testing.T) { Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, } var reply string - if err := cdrsPsqlRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil { + if err := cdrsPsqlRpc.Call("CdrsV2.ProcessCDR", cdr, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) @@ -229,7 +229,7 @@ func TestV2CDRsPSQLProcessPrepaidCdr(t *testing.T) { } tStart := time.Now() for _, cdr := range cdrs { - if err := cdrsPsqlRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil { + if err := cdrsPsqlRpc.Call("CdrsV2.ProcessCDR", cdr, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) diff --git a/engine/cdrs.go b/engine/cdrs.go index 5dc77035f..82c5ec30c 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -122,11 +122,6 @@ func (self *CdrServer) RegisterHandlersToServer(server *utils.Server) { server.RegisterHttpFunc("/freeswitch_json", fsCdrHandler) } -// Used to internally process CDR -func (self *CdrServer) LocalProcessCdr(cdr *CDR) error { - return self.processCdr(cdr) -} - // Used to process external CDRs func (self *CdrServer) ProcessExternalCdr(eCDR *ExternalCDR) error { cdr, err := NewCDRFromExternalCDR(eCDR, self.cgrCfg.DefaultTimezone) @@ -494,15 +489,17 @@ func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) err return nil } +// Internally used and called from CDRSv1 +// Cached requests for HA setups func (self *CdrServer) V1ProcessCDR(cdr *CDR, reply *string) error { - cacheKey := "ProcessCdr" + cdr.CGRID + cacheKey := "V1ProcessCDR" + cdr.CGRID + cdr.RunID if item, err := self.getCache().Get(cacheKey); err == nil && item != nil { if item.Value != nil { *reply = item.Value.(string) } return item.Err } - if err := self.LocalProcessCdr(cdr); err != nil { + if err := self.processCdr(cdr); err != nil { self.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err}) return utils.NewErrServerError(err) } @@ -511,11 +508,6 @@ func (self *CdrServer) V1ProcessCDR(cdr *CDR, reply *string) error { return nil } -// Alias, deprecated after removing CdrServerV1.ProcessCdr -func (self *CdrServer) V1ProcessCdr(cdr *CDR, reply *string) error { - return self.V1ProcessCDR(cdr, reply) -} - // RPC method, differs from storeSMCost through it's signature func (self *CdrServer) V1StoreSMCost(attr AttrCDRSStoreSMCost, reply *string) error { if err := self.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil { diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 8fa3faa54..37e4a86a3 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -298,7 +298,7 @@ func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify st func (sm *FSSessionManager) ProcessCdr(storedCdr *engine.CDR) error { var reply string - if err := sm.cdrsrv.Call("CdrsV1.ProcessCdr", storedCdr, &reply); err != nil { + if err := sm.cdrsrv.Call("CdrsV1.ProcessCDR", storedCdr, &reply); err != nil { utils.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CGRID, storedCdr.OriginID, err.Error())) } return nil diff --git a/sessionmanager/kamailiosm.go b/sessionmanager/kamailiosm.go index be0b3eacd..86b0352cd 100644 --- a/sessionmanager/kamailiosm.go +++ b/sessionmanager/kamailiosm.go @@ -210,7 +210,7 @@ func (self *KamailioSessionManager) ProcessCdr(cdr *engine.CDR) error { return nil } var reply string - if err := self.cdrsrv.Call("CdrsV1.ProcessCdr", cdr, &reply); err != nil { + if err := self.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply); err != nil { utils.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", cdr.CGRID, cdr.OriginID, err.Error())) } return nil diff --git a/sessionmanager/osipssm.go b/sessionmanager/osipssm.go index 0a27c1458..ae021afbe 100644 --- a/sessionmanager/osipssm.go +++ b/sessionmanager/osipssm.go @@ -153,7 +153,7 @@ func (osm *OsipsSessionManager) Shutdown() error { // Process the CDR with CDRS component func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.CDR) error { var reply string - return osm.cdrsrv.Call("CdrsV1.ProcessCdr", storedCdr, &reply) + return osm.cdrsrv.Call("CdrsV1.ProcessCDR", storedCdr, &reply) } // Disconnects the session diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index de1135168..31869cff4 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -91,7 +91,7 @@ func (self *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) { cdr := s.eventStart.AsStoredCdr(self.cgrCfg, self.timezone) cdr.Usage = s.TotalUsage() var reply string - self.cdrsrv.Call("CdrsV1.ProcessCdr", cdr, &reply) + self.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply) } func (self *SMGeneric) indexSession(uuid string, s *SMGSession) { @@ -491,7 +491,7 @@ func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDu func (self *SMGeneric) ProcessCDR(gev SMGenericEvent) error { var reply string - if err := self.cdrsrv.Call("CdrsV1.ProcessCdr", gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil { + if err := self.cdrsrv.Call("CdrsV1.ProcessCDR", gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil { return err } return nil