From f07a0cc8858154e4fff87098671b582dedc59b10 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 7 Aug 2013 06:20:56 +0200 Subject: [PATCH] Fixing mediator and related for postpaid rating into storDb --- cdrs/cdrs.go | 13 ++++---- data/storage/mysql/create_mediator_tables.sql | 4 +-- engine/storage_interface.go | 2 +- engine/storage_map.go | 4 +-- engine/storage_mongo.go | 4 +-- engine/storage_redis.go | 4 +-- engine/storage_sql.go | 26 ++++++++++------ mediator/mediator.go | 31 +++++++++++++------ sessionmanager/fsevent.go | 1 - 9 files changed, 52 insertions(+), 37 deletions(-) diff --git a/cdrs/cdrs.go b/cdrs/cdrs.go index 4ef805615..6da450eb4 100644 --- a/cdrs/cdrs.go +++ b/cdrs/cdrs.go @@ -37,14 +37,13 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { body, _ := ioutil.ReadAll(r.Body) if fsCdr, err := new(FSCdr).New(body); err == nil { storage.SetCdr(fsCdr) - if cfg.CDRSMediator == "internal" { - errMedi := medi.MediateDBCDR(fsCdr, storage) - if errMedi != nil { - engine.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", errMedi.Error())) + go func() { //FS will not send us hangup_complete until we have send back the answer to CDR, so we need to handle mediation async + if cfg.CDRSMediator == "internal" { + medi.MediateDBCDR(fsCdr, storage) + } else { + //TODO: use the connection to mediator } - } else { - //TODO: use the connection to mediator - } + } () } else { engine.Logger.Err(fmt.Sprintf("Could not create CDR entry: %v", err)) } diff --git a/data/storage/mysql/create_mediator_tables.sql b/data/storage/mysql/create_mediator_tables.sql index aa8df80af..51f31a799 100644 --- a/data/storage/mysql/create_mediator_tables.sql +++ b/data/storage/mysql/create_mediator_tables.sql @@ -6,8 +6,8 @@ CREATE TABLE `rated_cdrs` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cgrid` char(40) NOT NULL, `subject` varchar(64) NOT NULL, - `cost` double(20,4) default NULL, - `source` char(64) NOT NULL, + `cost` double(20,4) DEFAULT NULL, + `extra_info` text, PRIMARY KEY (`id`), UNIQUE KEY `costid` (`cgrid`,`subject`) ); diff --git a/engine/storage_interface.go b/engine/storage_interface.go index e8e5aedb1..865aa5be1 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -106,7 +106,7 @@ type DataStorage interface { SetActionTimings(string, ActionTimings) error GetAllActionTimings() (map[string]ActionTimings, error) SetCdr(utils.CDR) error - SetRatedCdr(utils.CDR, *CallCost) error + SetRatedCdr(utils.CDR, *CallCost, string) error GetAllRatedCdr() ([]utils.CDR, error) //GetAllActionTimingsLogs() (map[string]ActionsTimings, error) LogCallCost(uuid, source string, cc *CallCost) error diff --git a/engine/storage_map.go b/engine/storage_map.go index 81392e4fe..1f055862f 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -349,8 +349,8 @@ func (ms *MapStorage) SetCdr(utils.CDR) error { return nil } -func (ms *MapStorage) SetRatedCdr(utils.CDR, *CallCost) error { - return nil +func (ms *MapStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost, extraInfo string) error { + return errors.New(utils.ERR_NOT_IMPLEMENTED) } func (ms *MapStorage) GetAllRatedCdr() ([]utils.CDR, error) { diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index f75b6a971..313093139 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -380,8 +380,8 @@ func (ms *MongoStorage) SetCdr(utils.CDR) error { return nil } -func (ms *MongoStorage) SetRatedCdr(utils.CDR, *CallCost) error { - return nil +func (ms *MongoStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost, extraInfo string) error { + return errors.New(utils.ERR_NOT_IMPLEMENTED) } func (ms *MongoStorage) GetAllRatedCdr() ([]utils.CDR, error) { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index a1c41cf10..db131e70d 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -388,8 +388,8 @@ func (rs *RedisStorage) SetCdr(utils.CDR) error { return nil } -func (rs *RedisStorage) SetRatedCdr(utils.CDR, *CallCost) error { - return nil +func (rs *RedisStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost, extraInfo string) error { + return errors.New(utils.ERR_NOT_IMPLEMENTED) } func (rs *RedisStorage) GetAllRatedCdr() ([]utils.CDR, error) { diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 6257a54a5..fabcd4e00 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -848,13 +848,19 @@ func (self *SQLStorage) LogCallCost(uuid, source string, cc *CallCost) (err erro return } -func (self *SQLStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) { - //ToDo: cgrid instead of uuid - row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, cost, connect_fee, timespans, source FROM %s WHERE cgrid='%s' AND source='%s'", utils.TBL_COST_DETAILS, utils.FSCgrId(uuid), source)) - var cgrid, accid, src string +func (self *SQLStorage) GetCallCostLog(cgrid, source string) (cc *CallCost, err error) { + row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, cost, connect_fee, timespans, source FROM %s WHERE cgrid='%s' AND source='%s'", utils.TBL_COST_DETAILS, cgrid, source)) + var accid, src string var timespansJson string - err = row.Scan(&cgrid, &accid, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson, &src) - err = json.Unmarshal([]byte(timespansJson), cc.Timespans) + cc = &CallCost{Cost:-1} + err = row.Scan(&cgrid, &accid, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Account, &cc.Subject, + &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson, &src) + fmt.Println(cgrid, accid, cc, timespansJson, src) + err = json.Unmarshal([]byte(timespansJson), &cc.Timespans) + fmt.Println("err", err) + if err != nil { + return nil, err + } return } @@ -903,14 +909,14 @@ func (self *SQLStorage) SetCdr(cdr utils.CDR) (err error) { return } -func (self *SQLStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost) (err error) { +func (self *SQLStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost, extraInfo string) (err error) { // ToDo: Add here source and subject - _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, subject, cost, source) VALUES ('%s', '%s', %f, '%s')", + _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, subject, cost, extra_info) VALUES ('%s', '%s', %f, '%s')", utils.TBL_RATED_CDRS, cdr.GetCgrId(), - "subject", + cdr.GetSubject(), cc.Cost+cc.ConnectFee, - "cdrsrc")) + extraInfo)) if err != nil { Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) } diff --git a/mediator/mediator.go b/mediator/mediator.go index 79f0f00f9..b1685cbe9 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -165,7 +165,14 @@ func (self *Mediator) TrackCDRFiles() (err error) { // Retrive the cost from logging database func (self *Mediator) getCostsFromDB(cdr utils.CDR) (cc *engine.CallCost, err error) { - return self.storDb.GetCallCostLog(cdr.GetCgrId(), engine.SESSION_MANAGER_SOURCE) + for i := 0; i < 3; i++ { // Mechanism to avoid concurrency between SessionManager writing the costs and mediator picking them up + cc, err = self.storDb.GetCallCostLog(cdr.GetCgrId(), engine.SESSION_MANAGER_SOURCE) //ToDo: What are we getting when there is no log? + if cc != nil { // There were no errors, chances are that we got what we are looking for + break + } + time.Sleep(time.Duration(i) * time.Second) + } + return } // Retrive the cost from engine @@ -267,20 +274,24 @@ func (self *Mediator) MediateCSVCDR(cdrfn string) (err error) { } func (self *Mediator) MediateDBCDR(cdr utils.CDR, db engine.DataStorage) error { - var cc *engine.CallCost + var qryCC *engine.CallCost + cc := &engine.CallCost{Cost:-1} var errCost error if cdr.GetReqType() == utils.PREPAID || cdr.GetReqType() == utils.POSTPAID { // Should be previously calculated and stored in DB - cc, errCost = self.getCostsFromDB(cdr) + qryCC, errCost = self.getCostsFromDB(cdr) } else { - cc, errCost = self.getCostsFromRater(cdr) + qryCC, errCost = self.getCostsFromRater(cdr) } - cost := "-1" - if errCost != nil || cc == nil { - engine.Logger.Err(fmt.Sprintf(" Could not calculate price for cgrid: <%s>, err: <%s>, cost: <%v>", cdr.GetCgrId(), errCost.Error(), cc)) + if errCost != nil || qryCC == nil { + engine.Logger.Err(fmt.Sprintf(" Could not calculate price for cgrid: <%s>, err: <%s>, cost: <%v>", cdr.GetCgrId(), errCost.Error(), qryCC)) } else { - cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64) - engine.Logger.Debug(fmt.Sprintf(" Calculated for cgrid:%s, cost: %v", cdr.GetCgrId(), cost)) + cc = qryCC + engine.Logger.Debug(fmt.Sprintf(" Calculated for cgrid:%s, cost: %f", cdr.GetCgrId(), cc.ConnectFee+cc.Cost)) } - return self.storDb.SetRatedCdr(cdr, cc) + extraInfo := "" + if errCost != nil { + extraInfo = errCost.Error() + } + return self.storDb.SetRatedCdr(cdr, cc, extraInfo) } diff --git a/sessionmanager/fsevent.go b/sessionmanager/fsevent.go index fd143d98e..91f669445 100644 --- a/sessionmanager/fsevent.go +++ b/sessionmanager/fsevent.go @@ -80,7 +80,6 @@ func (fsev FSEvent) GetName() string { } func (fsev FSEvent) GetDirection() string { //TODO: implement direction - fmt.Println("Returning direction *out") return "*out" //return fsev[DIRECTION] }