CDRs - cache ProcessCdr API

This commit is contained in:
DanB
2016-06-15 18:13:32 +02:00
parent df4b2cb3c9
commit c8a86788ae
8 changed files with 33 additions and 27 deletions

View File

@@ -1472,7 +1472,7 @@ func TestApierLocalProcessCdr(t *testing.T) {
SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), RunID: utils.DEFAULT_RUNID,
Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
}
if err := rater.Call("CdrsV1.ProcessCdr", cdr, &reply); err != nil {
if err := rater.Call("CdrsV1.ProcessCDR", cdr, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)

View File

@@ -29,16 +29,24 @@ type CdrsV1 struct {
}
// Designed for CGR internal usage
// Deprecated
func (self *CdrsV1) ProcessCdr(cdr *engine.CDR, reply *string) error {
if err := self.CdrSrv.LocalProcessCdr(cdr); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
return nil
return self.ProcessCDR(cdr, reply)
}
// Designed for CGR internal usage
func (self *CdrsV1) ProcessCDR(cdr *engine.CDR, reply *string) error {
return self.CdrSrv.V1ProcessCDR(cdr, reply)
}
// Designed for external programs feeding CDRs to CGRateS
// Deprecated
func (self *CdrsV1) ProcessExternalCdr(cdr *engine.ExternalCDR, reply *string) error {
return self.ProcessExternalCDR(cdr, reply)
}
// Designed for external programs feeding CDRs to CGRateS
func (self *CdrsV1) ProcessExternalCDR(cdr *engine.ExternalCDR, reply *string) error {
if err := self.CdrSrv.ProcessExternalCdr(cdr); err != nil {
return utils.NewErrServerError(err)
}
@@ -46,8 +54,14 @@ func (self *CdrsV1) ProcessExternalCdr(cdr *engine.ExternalCDR, reply *string) e
return nil
}
// Remotely (re)rating, deprecated
// Remotely (re)rating
// Deprecated
func (self *CdrsV1) RateCdrs(attrs utils.AttrRateCdrs, reply *string) error {
return self.RateCDRs(attrs, reply)
}
// Remotely (re)rating
func (self *CdrsV1) RateCDRs(attrs utils.AttrRateCdrs, reply *string) error {
cdrsFltr, err := attrs.AsCDRsFilter(self.CdrSrv.Timezone())
if err != nil {
return utils.NewErrServerError(err)

View File

@@ -122,7 +122,7 @@ func TestV2CDRsPSQLProcessCdrRated(t *testing.T) {
Cost: 1.01, CostSource: "TestV2CDRsPSQLProcessCdrRated", Rated: true,
}
var reply string
if err := cdrsPsqlRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil {
if err := cdrsPsqlRpc.Call("CdrsV2.ProcessCDR", cdr, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
@@ -142,7 +142,7 @@ func TestV2CDRsPSQLProcessCdrRaw(t *testing.T) {
Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
}
var reply string
if err := cdrsPsqlRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil {
if err := cdrsPsqlRpc.Call("CdrsV2.ProcessCDR", cdr, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
@@ -229,7 +229,7 @@ func TestV2CDRsPSQLProcessPrepaidCdr(t *testing.T) {
}
tStart := time.Now()
for _, cdr := range cdrs {
if err := cdrsPsqlRpc.Call("CdrsV2.ProcessCdr", cdr, &reply); err != nil {
if err := cdrsPsqlRpc.Call("CdrsV2.ProcessCDR", cdr, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)

View File

@@ -122,11 +122,6 @@ func (self *CdrServer) RegisterHandlersToServer(server *utils.Server) {
server.RegisterHttpFunc("/freeswitch_json", fsCdrHandler)
}
// Used to internally process CDR
func (self *CdrServer) LocalProcessCdr(cdr *CDR) error {
return self.processCdr(cdr)
}
// Used to process external CDRs
func (self *CdrServer) ProcessExternalCdr(eCDR *ExternalCDR) error {
cdr, err := NewCDRFromExternalCDR(eCDR, self.cgrCfg.DefaultTimezone)
@@ -494,15 +489,17 @@ func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) err
return nil
}
// Internally used and called from CDRSv1
// Cached requests for HA setups
func (self *CdrServer) V1ProcessCDR(cdr *CDR, reply *string) error {
cacheKey := "ProcessCdr" + cdr.CGRID
cacheKey := "V1ProcessCDR" + cdr.CGRID + cdr.RunID
if item, err := self.getCache().Get(cacheKey); err == nil && item != nil {
if item.Value != nil {
*reply = item.Value.(string)
}
return item.Err
}
if err := self.LocalProcessCdr(cdr); err != nil {
if err := self.processCdr(cdr); err != nil {
self.getCache().Cache(cacheKey, &cache2go.CacheItem{Err: err})
return utils.NewErrServerError(err)
}
@@ -511,11 +508,6 @@ func (self *CdrServer) V1ProcessCDR(cdr *CDR, reply *string) error {
return nil
}
// Alias, deprecated after removing CdrServerV1.ProcessCdr
func (self *CdrServer) V1ProcessCdr(cdr *CDR, reply *string) error {
return self.V1ProcessCDR(cdr, reply)
}
// RPC method, differs from storeSMCost through it's signature
func (self *CdrServer) V1StoreSMCost(attr AttrCDRSStoreSMCost, reply *string) error {
if err := self.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil {

View File

@@ -298,7 +298,7 @@ func (sm *FSSessionManager) DisconnectSession(ev engine.Event, connId, notify st
func (sm *FSSessionManager) ProcessCdr(storedCdr *engine.CDR) error {
var reply string
if err := sm.cdrsrv.Call("CdrsV1.ProcessCdr", storedCdr, &reply); err != nil {
if err := sm.cdrsrv.Call("CdrsV1.ProcessCDR", storedCdr, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CGRID, storedCdr.OriginID, err.Error()))
}
return nil

View File

@@ -210,7 +210,7 @@ func (self *KamailioSessionManager) ProcessCdr(cdr *engine.CDR) error {
return nil
}
var reply string
if err := self.cdrsrv.Call("CdrsV1.ProcessCdr", cdr, &reply); err != nil {
if err := self.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", cdr.CGRID, cdr.OriginID, err.Error()))
}
return nil

View File

@@ -153,7 +153,7 @@ func (osm *OsipsSessionManager) Shutdown() error {
// Process the CDR with CDRS component
func (osm *OsipsSessionManager) ProcessCdr(storedCdr *engine.CDR) error {
var reply string
return osm.cdrsrv.Call("CdrsV1.ProcessCdr", storedCdr, &reply)
return osm.cdrsrv.Call("CdrsV1.ProcessCDR", storedCdr, &reply)
}
// Disconnects the session

View File

@@ -91,7 +91,7 @@ func (self *SMGeneric) ttlTerminate(s *SMGSession, tmtr *smgSessionTerminator) {
cdr := s.eventStart.AsStoredCdr(self.cgrCfg, self.timezone)
cdr.Usage = s.TotalUsage()
var reply string
self.cdrsrv.Call("CdrsV1.ProcessCdr", cdr, &reply)
self.cdrsrv.Call("CdrsV1.ProcessCDR", cdr, &reply)
}
func (self *SMGeneric) indexSession(uuid string, s *SMGSession) {
@@ -491,7 +491,7 @@ func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDu
func (self *SMGeneric) ProcessCDR(gev SMGenericEvent) error {
var reply string
if err := self.cdrsrv.Call("CdrsV1.ProcessCdr", gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil {
if err := self.cdrsrv.Call("CdrsV1.ProcessCDR", gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil {
return err
}
return nil