Adding async processing to the CDRs out of http interface, lock checkDuplicate in CDRS

This commit is contained in:
DanB
2015-06-17 19:34:41 +02:00
parent 07f43532ed
commit 3d8be3c60b
2 changed files with 22 additions and 16 deletions

View File

@@ -1380,6 +1380,7 @@ func TestApierCdrServer(t *testing.T) {
t.Error(err.Error())
}
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
}
/*

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
"github.com/cgrates/cgrates/config"
@@ -38,10 +39,11 @@ func cgrCdrHandler(w http.ResponseWriter, r *http.Request) {
Logger.Err(fmt.Sprintf("<CDRS> Could not create CDR entry: %s", err.Error()))
return
}
if err := cdrServer.rateStoreStatsReplicate(cgrCdr.AsStoredCdr()); err != nil {
Logger.Err(fmt.Sprintf("<CDRS> Errors when storing CDR entry: %s", err.Error()))
return
}
go func() {
if err := cdrServer.rateStoreStatsReplicate(cgrCdr.AsStoredCdr()); err != nil {
Logger.Err(fmt.Sprintf("<CDRS> Errors when storing CDR entry: %s", err.Error()))
}
}()
}
// Handler for fs http
@@ -52,14 +54,15 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
Logger.Err(fmt.Sprintf("<CDRS> Could not create CDR entry: %s", err.Error()))
return
}
if err := cdrServer.rateStoreStatsReplicate(fsCdr.AsStoredCdr()); err != nil {
Logger.Err(fmt.Sprintf("<CDRS> Errors when storing CDR entry: %s", err.Error()))
return
}
go func() {
if err := cdrServer.rateStoreStatsReplicate(fsCdr.AsStoredCdr()); err != nil {
Logger.Err(fmt.Sprintf("<CDRS> Errors when storing CDR entry: %s", err.Error()))
}
}()
}
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, stats StatsInterface) (*CdrServer, error) {
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, stats: stats}, nil
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, stats: stats, callCostMutex: new(sync.RWMutex)}, nil
/*
if cfg.CDRSStats != "" {
if cfg.CDRSStats != utils.INTERNAL {
@@ -77,10 +80,11 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, s
}
type CdrServer struct {
cgrCfg *config.CGRConfig
cdrDb CdrStorage
rater Connector
stats StatsInterface
cgrCfg *config.CGRConfig
cdrDb CdrStorage
rater Connector
stats StatsInterface
callCostMutex *sync.RWMutex
}
func (self *CdrServer) RegisterHanlersToServer(server *Server) {
@@ -115,6 +119,8 @@ type CallCostLog struct {
func (self *CdrServer) LogCallCost(ccl *CallCostLog) error {
Logger.Debug(fmt.Sprintf("<Cdrs> DEBUG: LogCallCost, callCostLog: %+v, time: %v", ccl, time.Now()))
if ccl.CheckDuplicate {
self.callCostMutex.Lock() // Avoid writing between checkDuplicate and logCallCost
defer self.callCostMutex.Unlock()
cc, err := self.cdrDb.GetCallCostLog(ccl.CgrId, ccl.Source, ccl.RunId)
Logger.Debug(fmt.Sprintf("<Cdrs> DEBUG: GetCallCostLog, cgrId: %s, source: %s: runId: %s - received: cc: %v, error: %v, time: %v", ccl.CgrId, ccl.Source, ccl.RunId, cc, err, time.Now()))
if err != nil && err != gorm.RecordNotFound {
@@ -307,15 +313,14 @@ func (self *CdrServer) rateCDR(storedCdr *StoredCdr) error {
var err error
if utils.IsSliceMember([]string{utils.META_PREPAID, utils.PREPAID}, storedCdr.ReqType) { // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards
// Should be previously calculated and stored in DB
//delay := utils.Fib()
delay := utils.Fib()
for i := 0; i < 4; i++ {
qryCC, err = self.cdrDb.GetCallCostLog(storedCdr.CgrId, utils.SESSION_MANAGER_SOURCE, storedCdr.MediationRunId)
Logger.Debug(fmt.Sprintf("<Cdrs> DEBUG: GetCallCostLog, storedCdr: %+v, callCost: %+v, error: %+v, run: %d, time: %v", storedCdr, qryCC, err, i, time.Now()))
if err == nil {
break
}
//time.Sleep(delay())
time.Sleep(time.Duration(i+1) * time.Second)
time.Sleep(delay())
}
if err != nil && err == gorm.RecordNotFound { //calculate CDR as for pseudoprepaid
Logger.Warning(fmt.Sprintf("<Cdrs> WARNING: Could not find CallCostLog for cgrid: %s, source: %s, runid: %s, will recalculate", storedCdr.CgrId, utils.SESSION_MANAGER_SOURCE, storedCdr.MediationRunId))