mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 13:19:53 +05:00
Fixup mediation with rerating
This commit is contained in:
@@ -646,7 +646,7 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e
|
||||
if err != nil {
|
||||
Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err))
|
||||
}
|
||||
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source, runid, cost_time)VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %f, %f, '%s','%s','%s',now()) ON DUPLICATE KEY UPDATE direction=values(direction), tenant=values(tenant), tor=values(tor), account=values(account), subject=values(subject), destination=values(destination), connect_fee=values(connect_fee), cost=values(cost), timespans=values(timespans), source=values(source), cost_time=now()",
|
||||
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, accid, direction, tenant, tor, account, subject, destination, cost, timespans, source, runid, cost_time)VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %f, '%s','%s','%s',now()) ON DUPLICATE KEY UPDATE direction=values(direction), tenant=values(tenant), tor=values(tor), account=values(account), subject=values(subject), destination=values(destination), cost=values(cost), timespans=values(timespans), source=values(source), cost_time=now()",
|
||||
utils.TBL_COST_DETAILS,
|
||||
utils.FSCgrId(uuid),
|
||||
uuid,
|
||||
@@ -667,7 +667,7 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e
|
||||
}
|
||||
|
||||
func (self *SQLStorage) GetCallCostLog(cgrid, source, runid string) (cc *CallCost, err error) {
|
||||
row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source FROM %s WHERE cgrid='%s' AND source='%s' AND runid='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid))
|
||||
row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, cost, timespans, source FROM %s WHERE cgrid='%s' AND source='%s' AND runid='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid))
|
||||
var accid, src string
|
||||
var timespansJson string
|
||||
cc = &CallCost{Cost: -1}
|
||||
@@ -739,7 +739,9 @@ func (self *SQLStorage) SetRatedCdr(storedCdr *utils.StoredCdr, extraInfo string
|
||||
return
|
||||
}
|
||||
|
||||
// Return a slice of CDRs from storDb using optional filters.
|
||||
// Return a slice of CDRs from storDb using optional filters.a
|
||||
// ignoreErr - do not consider cdrs with rating errors
|
||||
// ignoreRated - do not consider cdrs which were already rated, including here the ones with errors
|
||||
func (self *SQLStorage) GetStoredCdrs(timeStart, timeEnd time.Time, ignoreErr, ignoreRated bool) ([]*utils.StoredCdr, error) {
|
||||
var cdrs []*utils.StoredCdr
|
||||
q := fmt.Sprintf("SELECT %s.cgrid,accid,cdrhost,cdrsource,reqtype,direction,tenant,tor,account,%s.subject,destination,answer_time,duration,extra_fields,runid,cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid", utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_RATED_CDRS, utils.TBL_CDRS_PRIMARY, utils.TBL_RATED_CDRS)
|
||||
@@ -756,17 +758,20 @@ func (self *SQLStorage) GetStoredCdrs(timeStart, timeEnd time.Time, ignoreErr, i
|
||||
}
|
||||
fltr += fmt.Sprintf(" answer_time<'%d'", timeEnd)
|
||||
}
|
||||
if ignoreErr {
|
||||
if len(fltr) != 0 {
|
||||
fltr += " AND "
|
||||
}
|
||||
fltr += "cost>-1"
|
||||
}
|
||||
if ignoreRated {
|
||||
if len(fltr) != 0 {
|
||||
fltr += " AND "
|
||||
}
|
||||
fltr += "cost<=0"
|
||||
if ignoreErr {
|
||||
fltr += "cost IS NULL"
|
||||
} else {
|
||||
fltr += "(cost=-1 OR cost IS NULL)"
|
||||
}
|
||||
} else if ignoreErr {
|
||||
if len(fltr) != 0 {
|
||||
fltr += " AND "
|
||||
}
|
||||
fltr += "(cost!=-1 OR cost IS NULL)"
|
||||
}
|
||||
if len(fltr) != 0 {
|
||||
q += fmt.Sprintf(" WHERE %s", fltr)
|
||||
|
||||
@@ -130,12 +130,10 @@ func (self *Mediator) rateCDR(cdr *utils.StoredCdr) error {
|
||||
|
||||
// Forks original CDR based on original request plus runIds for extra mediation
|
||||
func (self *Mediator) RateCdr(dbcdr utils.RawCDR) error {
|
||||
//engine.Logger.Debug(fmt.Sprintf("Mediating rawCdr: %v, duration: %d",dbcdr, dbcdr.GetDuration()))
|
||||
rtCdr, err := utils.NewStoredCdrFromRawCDR(dbcdr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
//engine.Logger.Debug(fmt.Sprintf("Have converted raw into rated: %v", rtCdr))
|
||||
cdrs := []*utils.StoredCdr{rtCdr} // Start with initial dbcdr, will add here all to be mediated
|
||||
for runIdx, runId := range self.cgrCfg.MediatorRunIds {
|
||||
forkedCdr, err := dbcdr.AsStoredCdr(self.cgrCfg.MediatorRunIds[runIdx], self.cgrCfg.MediatorReqTypeFields[runIdx], self.cgrCfg.MediatorDirectionFields[runIdx],
|
||||
|
||||
@@ -138,10 +138,10 @@ func TestPostCdrs(t *testing.T) {
|
||||
}
|
||||
httpClient := new(http.Client)
|
||||
cdrForm1 := url.Values{"accid": []string{"dsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"},
|
||||
"tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"},
|
||||
"tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"+4986517174963"},
|
||||
"answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}}
|
||||
cdrForm2 := url.Values{"accid": []string{"adsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"},
|
||||
"tenant": []string{"itsyscom.com"}, "tor": []string{"call"}, "account": []string{"dan"}, "subject": []string{"dan"}, "destination": []string{"1002"},
|
||||
"tenant": []string{"itsyscom.com"}, "tor": []string{"call"}, "account": []string{"1003"}, "subject": []string{"1003"}, "destination": []string{"+4986517174964"},
|
||||
"answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}}
|
||||
for _, cdrForm := range []url.Values{cdrForm1, cdrForm2} {
|
||||
cdrForm.Set(utils.CDRSOURCE, engine.TEST_SQL)
|
||||
@@ -151,9 +151,14 @@ func TestPostCdrs(t *testing.T) {
|
||||
}
|
||||
if storedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, false); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(storedCdrs) != 2 {
|
||||
} else if len(storedCdrs) != 2 { // Make sure CDRs made it into StorDb
|
||||
t.Error(fmt.Sprintf("Unexpected number of CDRs stored: %d", len(storedCdrs)))
|
||||
}
|
||||
if nonErrorCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, true, false); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(nonErrorCdrs) != 0 { // Just two of them should be without errors
|
||||
t.Error(fmt.Sprintf("Unexpected number of CDRs stored: %d", len(nonErrorCdrs)))
|
||||
}
|
||||
}
|
||||
|
||||
// Directly inject CDRs into storDb
|
||||
@@ -161,17 +166,27 @@ func TestInjectCdrs(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
cgrCdr1 := utils.CgrCdr{"accid": "aaaaadsafdsaf", "cdr_source": engine.TEST_SQL, "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out",
|
||||
"tenant": "cgrates.org", "tor": "call", "account": "1001", "subject": "1001", "destination": "1002",
|
||||
cgrCdr1 := utils.CgrCdr{"accid": "aaaaadsafdsaf", "cdrsource": engine.TEST_SQL, "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out",
|
||||
"tenant": "cgrates.org", "tor": "call", "account": "dan", "subject": "dan", "destination": "+4986517174963",
|
||||
"answer_time": "2013-11-07T08:42:26Z", "duration": "10"}
|
||||
if err := cdrStor.SetCdr(cgrCdr1); err != nil {
|
||||
t.Error(err)
|
||||
cgrCdr2 := utils.CgrCdr{"accid": "baaaadsafdsaf", "cdrsource": engine.TEST_SQL, "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out",
|
||||
"tenant": "cgrates.org", "tor": "call", "account": "dan", "subject": "dan", "destination": "+4986517173964",
|
||||
"answer_time": "2013-11-07T09:42:26Z", "duration": "20"}
|
||||
for _, cdr := range []utils.CgrCdr{ cgrCdr1, cgrCdr2} {
|
||||
if err := cdrStor.SetCdr(cdr); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
if storedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, false); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(storedCdrs) != 3 {
|
||||
} else if len(storedCdrs) != 4 { // Make sure CDRs made it into StorDb
|
||||
t.Error(fmt.Sprintf("Unexpected number of CDRs stored: %d", len(storedCdrs)))
|
||||
}
|
||||
if nonRatedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, true, true); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(nonRatedCdrs) != 2 { // Just two of them should be non-rated
|
||||
t.Error(fmt.Sprintf("Unexpected number of CDRs non-rated: %d", len(nonRatedCdrs)))
|
||||
}
|
||||
}
|
||||
|
||||
// Test here LoadTariffPlanFromFolder
|
||||
@@ -199,6 +214,26 @@ func TestRateCdrs(t *testing.T) {
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Unexpected reply: %s", reply)
|
||||
}
|
||||
if nonRatedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, true, true); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(nonRatedCdrs) != 0 { // Just two of them should be non-rated
|
||||
t.Error(fmt.Sprintf("Unexpected number of CDRs non-rated: %d", len(nonRatedCdrs)))
|
||||
}
|
||||
if errRatedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, true); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(errRatedCdrs) != 2 { // The first 2 with errors should be still there before rerating
|
||||
t.Error(fmt.Sprintf("Unexpected number of CDRs with errors: %d", len(errRatedCdrs)))
|
||||
}
|
||||
if err := cgrRpc.Call("MediatorV1.RateCdrs", utils.AttrRateCdrs{RerateErrors: true}, &reply); err != nil {
|
||||
t.Error(err.Error())
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Unexpected reply: %s", reply)
|
||||
}
|
||||
if errRatedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, true); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(errRatedCdrs) != 1 { // One CDR with errors should be fixed now by rerating
|
||||
t.Error(fmt.Sprintf("Unexpected number of CDRs with errors: %d", len(errRatedCdrs)))
|
||||
}
|
||||
}
|
||||
|
||||
// Simply kill the engine after we are done with tests within this file
|
||||
|
||||
Reference in New Issue
Block a user