From 3d8be3c60b84bd3e98ee0f232f325cef47987481 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 17 Jun 2015 19:34:41 +0200 Subject: [PATCH] Adding async processing to the CDRs out of http interface, lock checkDuplicate in CDRS --- apier/v1/apier_local_test.go | 1 + engine/cdrs.go | 37 ++++++++++++++++++++---------------- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 881b516d6..022699b90 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -1380,6 +1380,7 @@ func TestApierCdrServer(t *testing.T) { t.Error(err.Error()) } } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) } /* diff --git a/engine/cdrs.go b/engine/cdrs.go index f5ba15edb..96208f113 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -22,6 +22,7 @@ import ( "fmt" "io/ioutil" "net/http" + "sync" "time" "github.com/cgrates/cgrates/config" @@ -38,10 +39,11 @@ func cgrCdrHandler(w http.ResponseWriter, r *http.Request) { Logger.Err(fmt.Sprintf(" Could not create CDR entry: %s", err.Error())) return } - if err := cdrServer.rateStoreStatsReplicate(cgrCdr.AsStoredCdr()); err != nil { - Logger.Err(fmt.Sprintf(" Errors when storing CDR entry: %s", err.Error())) - return - } + go func() { + if err := cdrServer.rateStoreStatsReplicate(cgrCdr.AsStoredCdr()); err != nil { + Logger.Err(fmt.Sprintf(" Errors when storing CDR entry: %s", err.Error())) + } + }() } // Handler for fs http @@ -52,14 +54,15 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { Logger.Err(fmt.Sprintf(" Could not create CDR entry: %s", err.Error())) return } - if err := cdrServer.rateStoreStatsReplicate(fsCdr.AsStoredCdr()); err != nil { - Logger.Err(fmt.Sprintf(" Errors when storing CDR entry: %s", err.Error())) - return - } + go func() { + if err := cdrServer.rateStoreStatsReplicate(fsCdr.AsStoredCdr()); err != nil { + Logger.Err(fmt.Sprintf(" Errors when storing CDR entry: %s", err.Error())) + } + }() } func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, stats StatsInterface) (*CdrServer, error) { - return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, stats: stats}, nil + return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, stats: stats, callCostMutex: new(sync.RWMutex)}, nil /* if cfg.CDRSStats != "" { if cfg.CDRSStats != utils.INTERNAL { @@ -77,10 +80,11 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, s } type CdrServer struct { - cgrCfg *config.CGRConfig - cdrDb CdrStorage - rater Connector - stats StatsInterface + cgrCfg *config.CGRConfig + cdrDb CdrStorage + rater Connector + stats StatsInterface + callCostMutex *sync.RWMutex } func (self *CdrServer) RegisterHanlersToServer(server *Server) { @@ -115,6 +119,8 @@ type CallCostLog struct { func (self *CdrServer) LogCallCost(ccl *CallCostLog) error { Logger.Debug(fmt.Sprintf(" DEBUG: LogCallCost, callCostLog: %+v, time: %v", ccl, time.Now())) if ccl.CheckDuplicate { + self.callCostMutex.Lock() // Avoid writing between checkDuplicate and logCallCost + defer self.callCostMutex.Unlock() cc, err := self.cdrDb.GetCallCostLog(ccl.CgrId, ccl.Source, ccl.RunId) Logger.Debug(fmt.Sprintf(" DEBUG: GetCallCostLog, cgrId: %s, source: %s: runId: %s - received: cc: %v, error: %v, time: %v", ccl.CgrId, ccl.Source, ccl.RunId, cc, err, time.Now())) if err != nil && err != gorm.RecordNotFound { @@ -307,15 +313,14 @@ func (self *CdrServer) rateCDR(storedCdr *StoredCdr) error { var err error if utils.IsSliceMember([]string{utils.META_PREPAID, utils.PREPAID}, storedCdr.ReqType) { // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards // Should be previously calculated and stored in DB - //delay := utils.Fib() + delay := utils.Fib() for i := 0; i < 4; i++ { qryCC, err = self.cdrDb.GetCallCostLog(storedCdr.CgrId, utils.SESSION_MANAGER_SOURCE, storedCdr.MediationRunId) Logger.Debug(fmt.Sprintf(" DEBUG: GetCallCostLog, storedCdr: %+v, callCost: %+v, error: %+v, run: %d, time: %v", storedCdr, qryCC, err, i, time.Now())) if err == nil { break } - //time.Sleep(delay()) - time.Sleep(time.Duration(i+1) * time.Second) + time.Sleep(delay()) } if err != nil && err == gorm.RecordNotFound { //calculate CDR as for pseudoprepaid Logger.Warning(fmt.Sprintf(" WARNING: Could not find CallCostLog for cgrid: %s, source: %s, runid: %s, will recalculate", storedCdr.CgrId, utils.SESSION_MANAGER_SOURCE, storedCdr.MediationRunId))