mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Fixing mediator and related for postpaid rating into storDb
This commit is contained in:
13
cdrs/cdrs.go
13
cdrs/cdrs.go
@@ -37,14 +37,13 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
|
||||
body, _ := ioutil.ReadAll(r.Body)
|
||||
if fsCdr, err := new(FSCdr).New(body); err == nil {
|
||||
storage.SetCdr(fsCdr)
|
||||
if cfg.CDRSMediator == "internal" {
|
||||
errMedi := medi.MediateDBCDR(fsCdr, storage)
|
||||
if errMedi != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", errMedi.Error()))
|
||||
go func() { //FS will not send us hangup_complete until we have send back the answer to CDR, so we need to handle mediation async
|
||||
if cfg.CDRSMediator == "internal" {
|
||||
medi.MediateDBCDR(fsCdr, storage)
|
||||
} else {
|
||||
//TODO: use the connection to mediator
|
||||
}
|
||||
} else {
|
||||
//TODO: use the connection to mediator
|
||||
}
|
||||
} ()
|
||||
} else {
|
||||
engine.Logger.Err(fmt.Sprintf("Could not create CDR entry: %v", err))
|
||||
}
|
||||
|
||||
@@ -6,8 +6,8 @@ CREATE TABLE `rated_cdrs` (
|
||||
`id` int(11) NOT NULL AUTO_INCREMENT,
|
||||
`cgrid` char(40) NOT NULL,
|
||||
`subject` varchar(64) NOT NULL,
|
||||
`cost` double(20,4) default NULL,
|
||||
`source` char(64) NOT NULL,
|
||||
`cost` double(20,4) DEFAULT NULL,
|
||||
`extra_info` text,
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `costid` (`cgrid`,`subject`)
|
||||
);
|
||||
|
||||
@@ -106,7 +106,7 @@ type DataStorage interface {
|
||||
SetActionTimings(string, ActionTimings) error
|
||||
GetAllActionTimings() (map[string]ActionTimings, error)
|
||||
SetCdr(utils.CDR) error
|
||||
SetRatedCdr(utils.CDR, *CallCost) error
|
||||
SetRatedCdr(utils.CDR, *CallCost, string) error
|
||||
GetAllRatedCdr() ([]utils.CDR, error)
|
||||
//GetAllActionTimingsLogs() (map[string]ActionsTimings, error)
|
||||
LogCallCost(uuid, source string, cc *CallCost) error
|
||||
|
||||
@@ -349,8 +349,8 @@ func (ms *MapStorage) SetCdr(utils.CDR) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MapStorage) SetRatedCdr(utils.CDR, *CallCost) error {
|
||||
return nil
|
||||
func (ms *MapStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost, extraInfo string) error {
|
||||
return errors.New(utils.ERR_NOT_IMPLEMENTED)
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetAllRatedCdr() ([]utils.CDR, error) {
|
||||
|
||||
@@ -380,8 +380,8 @@ func (ms *MongoStorage) SetCdr(utils.CDR) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetRatedCdr(utils.CDR, *CallCost) error {
|
||||
return nil
|
||||
func (ms *MongoStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost, extraInfo string) error {
|
||||
return errors.New(utils.ERR_NOT_IMPLEMENTED)
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetAllRatedCdr() ([]utils.CDR, error) {
|
||||
|
||||
@@ -388,8 +388,8 @@ func (rs *RedisStorage) SetCdr(utils.CDR) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetRatedCdr(utils.CDR, *CallCost) error {
|
||||
return nil
|
||||
func (rs *RedisStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost, extraInfo string) error {
|
||||
return errors.New(utils.ERR_NOT_IMPLEMENTED)
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetAllRatedCdr() ([]utils.CDR, error) {
|
||||
|
||||
@@ -848,13 +848,19 @@ func (self *SQLStorage) LogCallCost(uuid, source string, cc *CallCost) (err erro
|
||||
return
|
||||
}
|
||||
|
||||
func (self *SQLStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) {
|
||||
//ToDo: cgrid instead of uuid
|
||||
row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, cost, connect_fee, timespans, source FROM %s WHERE cgrid='%s' AND source='%s'", utils.TBL_COST_DETAILS, utils.FSCgrId(uuid), source))
|
||||
var cgrid, accid, src string
|
||||
func (self *SQLStorage) GetCallCostLog(cgrid, source string) (cc *CallCost, err error) {
|
||||
row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, cost, connect_fee, timespans, source FROM %s WHERE cgrid='%s' AND source='%s'", utils.TBL_COST_DETAILS, cgrid, source))
|
||||
var accid, src string
|
||||
var timespansJson string
|
||||
err = row.Scan(&cgrid, &accid, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson, &src)
|
||||
err = json.Unmarshal([]byte(timespansJson), cc.Timespans)
|
||||
cc = &CallCost{Cost:-1}
|
||||
err = row.Scan(&cgrid, &accid, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Account, &cc.Subject,
|
||||
&cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson, &src)
|
||||
fmt.Println(cgrid, accid, cc, timespansJson, src)
|
||||
err = json.Unmarshal([]byte(timespansJson), &cc.Timespans)
|
||||
fmt.Println("err", err)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -903,14 +909,14 @@ func (self *SQLStorage) SetCdr(cdr utils.CDR) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (self *SQLStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost) (err error) {
|
||||
func (self *SQLStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost, extraInfo string) (err error) {
|
||||
// ToDo: Add here source and subject
|
||||
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, subject, cost, source) VALUES ('%s', '%s', %f, '%s')",
|
||||
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, subject, cost, extra_info) VALUES ('%s', '%s', %f, '%s')",
|
||||
utils.TBL_RATED_CDRS,
|
||||
cdr.GetCgrId(),
|
||||
"subject",
|
||||
cdr.GetSubject(),
|
||||
cc.Cost+cc.ConnectFee,
|
||||
"cdrsrc"))
|
||||
extraInfo))
|
||||
if err != nil {
|
||||
Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err))
|
||||
}
|
||||
|
||||
@@ -165,7 +165,14 @@ func (self *Mediator) TrackCDRFiles() (err error) {
|
||||
|
||||
// Retrive the cost from logging database
|
||||
func (self *Mediator) getCostsFromDB(cdr utils.CDR) (cc *engine.CallCost, err error) {
|
||||
return self.storDb.GetCallCostLog(cdr.GetCgrId(), engine.SESSION_MANAGER_SOURCE)
|
||||
for i := 0; i < 3; i++ { // Mechanism to avoid concurrency between SessionManager writing the costs and mediator picking them up
|
||||
cc, err = self.storDb.GetCallCostLog(cdr.GetCgrId(), engine.SESSION_MANAGER_SOURCE) //ToDo: What are we getting when there is no log?
|
||||
if cc != nil { // There were no errors, chances are that we got what we are looking for
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Duration(i) * time.Second)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Retrive the cost from engine
|
||||
@@ -267,20 +274,24 @@ func (self *Mediator) MediateCSVCDR(cdrfn string) (err error) {
|
||||
}
|
||||
|
||||
func (self *Mediator) MediateDBCDR(cdr utils.CDR, db engine.DataStorage) error {
|
||||
var cc *engine.CallCost
|
||||
var qryCC *engine.CallCost
|
||||
cc := &engine.CallCost{Cost:-1}
|
||||
var errCost error
|
||||
if cdr.GetReqType() == utils.PREPAID || cdr.GetReqType() == utils.POSTPAID {
|
||||
// Should be previously calculated and stored in DB
|
||||
cc, errCost = self.getCostsFromDB(cdr)
|
||||
qryCC, errCost = self.getCostsFromDB(cdr)
|
||||
} else {
|
||||
cc, errCost = self.getCostsFromRater(cdr)
|
||||
qryCC, errCost = self.getCostsFromRater(cdr)
|
||||
}
|
||||
cost := "-1"
|
||||
if errCost != nil || cc == nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Mediator> Could not calculate price for cgrid: <%s>, err: <%s>, cost: <%v>", cdr.GetCgrId(), errCost.Error(), cc))
|
||||
if errCost != nil || qryCC == nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Mediator> Could not calculate price for cgrid: <%s>, err: <%s>, cost: <%v>", cdr.GetCgrId(), errCost.Error(), qryCC))
|
||||
} else {
|
||||
cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64)
|
||||
engine.Logger.Debug(fmt.Sprintf("<Mediator> Calculated for cgrid:%s, cost: %v", cdr.GetCgrId(), cost))
|
||||
cc = qryCC
|
||||
engine.Logger.Debug(fmt.Sprintf("<Mediator> Calculated for cgrid:%s, cost: %f", cdr.GetCgrId(), cc.ConnectFee+cc.Cost))
|
||||
}
|
||||
return self.storDb.SetRatedCdr(cdr, cc)
|
||||
extraInfo := ""
|
||||
if errCost != nil {
|
||||
extraInfo = errCost.Error()
|
||||
}
|
||||
return self.storDb.SetRatedCdr(cdr, cc, extraInfo)
|
||||
}
|
||||
|
||||
@@ -80,7 +80,6 @@ func (fsev FSEvent) GetName() string {
|
||||
}
|
||||
func (fsev FSEvent) GetDirection() string {
|
||||
//TODO: implement direction
|
||||
fmt.Println("Returning direction *out")
|
||||
return "*out"
|
||||
//return fsev[DIRECTION]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user