From 9042c984928f5a4ee2b45f1b8a6039ce75b01f0f Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 11 Feb 2014 12:57:00 +0100 Subject: [PATCH] Fixup mediation with rerating --- engine/storage_sql.go | 25 +++++++++------- mediator/mediator.go | 2 -- mediator/mediator_local_test.go | 51 +++++++++++++++++++++++++++------ 3 files changed, 58 insertions(+), 20 deletions(-) diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 52a8e9a29..b837c2ecf 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -646,7 +646,7 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e if err != nil { Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err)) } - _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source, runid, cost_time)VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %f, %f, '%s','%s','%s',now()) ON DUPLICATE KEY UPDATE direction=values(direction), tenant=values(tenant), tor=values(tor), account=values(account), subject=values(subject), destination=values(destination), connect_fee=values(connect_fee), cost=values(cost), timespans=values(timespans), source=values(source), cost_time=now()", + _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, accid, direction, tenant, tor, account, subject, destination, cost, timespans, source, runid, cost_time)VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %f, '%s','%s','%s',now()) ON DUPLICATE KEY UPDATE direction=values(direction), tenant=values(tenant), tor=values(tor), account=values(account), subject=values(subject), destination=values(destination), cost=values(cost), timespans=values(timespans), source=values(source), cost_time=now()", utils.TBL_COST_DETAILS, utils.FSCgrId(uuid), uuid, @@ -667,7 +667,7 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e } func (self *SQLStorage) GetCallCostLog(cgrid, source, runid string) (cc *CallCost, err error) { - row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source FROM %s WHERE cgrid='%s' AND source='%s' AND runid='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid)) + row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, cost, timespans, source FROM %s WHERE cgrid='%s' AND source='%s' AND runid='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid)) var accid, src string var timespansJson string cc = &CallCost{Cost: -1} @@ -739,7 +739,9 @@ func (self *SQLStorage) SetRatedCdr(storedCdr *utils.StoredCdr, extraInfo string return } -// Return a slice of CDRs from storDb using optional filters. +// Return a slice of CDRs from storDb using optional filters.a +// ignoreErr - do not consider cdrs with rating errors +// ignoreRated - do not consider cdrs which were already rated, including here the ones with errors func (self *SQLStorage) GetStoredCdrs(timeStart, timeEnd time.Time, ignoreErr, ignoreRated bool) ([]*utils.StoredCdr, error) { var cdrs []*utils.StoredCdr q := fmt.Sprintf("SELECT %s.cgrid,accid,cdrhost,cdrsource,reqtype,direction,tenant,tor,account,%s.subject,destination,answer_time,duration,extra_fields,runid,cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid", utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_RATED_CDRS, utils.TBL_CDRS_PRIMARY, utils.TBL_RATED_CDRS) @@ -756,17 +758,20 @@ func (self *SQLStorage) GetStoredCdrs(timeStart, timeEnd time.Time, ignoreErr, i } fltr += fmt.Sprintf(" answer_time<'%d'", timeEnd) } - if ignoreErr { - if len(fltr) != 0 { - fltr += " AND " - } - fltr += "cost>-1" - } if ignoreRated { if len(fltr) != 0 { fltr += " AND " } - fltr += "cost<=0" + if ignoreErr { + fltr += "cost IS NULL" + } else { + fltr += "(cost=-1 OR cost IS NULL)" + } + } else if ignoreErr { + if len(fltr) != 0 { + fltr += " AND " + } + fltr += "(cost!=-1 OR cost IS NULL)" } if len(fltr) != 0 { q += fmt.Sprintf(" WHERE %s", fltr) diff --git a/mediator/mediator.go b/mediator/mediator.go index 12ec519ed..cc34b6127 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -130,12 +130,10 @@ func (self *Mediator) rateCDR(cdr *utils.StoredCdr) error { // Forks original CDR based on original request plus runIds for extra mediation func (self *Mediator) RateCdr(dbcdr utils.RawCDR) error { - //engine.Logger.Debug(fmt.Sprintf("Mediating rawCdr: %v, duration: %d",dbcdr, dbcdr.GetDuration())) rtCdr, err := utils.NewStoredCdrFromRawCDR(dbcdr) if err != nil { return err } - //engine.Logger.Debug(fmt.Sprintf("Have converted raw into rated: %v", rtCdr)) cdrs := []*utils.StoredCdr{rtCdr} // Start with initial dbcdr, will add here all to be mediated for runIdx, runId := range self.cgrCfg.MediatorRunIds { forkedCdr, err := dbcdr.AsStoredCdr(self.cgrCfg.MediatorRunIds[runIdx], self.cgrCfg.MediatorReqTypeFields[runIdx], self.cgrCfg.MediatorDirectionFields[runIdx], diff --git a/mediator/mediator_local_test.go b/mediator/mediator_local_test.go index eca4ddb90..f0d76aeca 100644 --- a/mediator/mediator_local_test.go +++ b/mediator/mediator_local_test.go @@ -138,10 +138,10 @@ func TestPostCdrs(t *testing.T) { } httpClient := new(http.Client) cdrForm1 := url.Values{"accid": []string{"dsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, - "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"}, + "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"+4986517174963"}, "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} cdrForm2 := url.Values{"accid": []string{"adsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, - "tenant": []string{"itsyscom.com"}, "tor": []string{"call"}, "account": []string{"dan"}, "subject": []string{"dan"}, "destination": []string{"1002"}, + "tenant": []string{"itsyscom.com"}, "tor": []string{"call"}, "account": []string{"1003"}, "subject": []string{"1003"}, "destination": []string{"+4986517174964"}, "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} for _, cdrForm := range []url.Values{cdrForm1, cdrForm2} { cdrForm.Set(utils.CDRSOURCE, engine.TEST_SQL) @@ -151,9 +151,14 @@ func TestPostCdrs(t *testing.T) { } if storedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, false); err != nil { t.Error(err) - } else if len(storedCdrs) != 2 { + } else if len(storedCdrs) != 2 { // Make sure CDRs made it into StorDb t.Error(fmt.Sprintf("Unexpected number of CDRs stored: %d", len(storedCdrs))) } + if nonErrorCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, true, false); err != nil { + t.Error(err) + } else if len(nonErrorCdrs) != 0 { // Just two of them should be without errors + t.Error(fmt.Sprintf("Unexpected number of CDRs stored: %d", len(nonErrorCdrs))) + } } // Directly inject CDRs into storDb @@ -161,17 +166,27 @@ func TestInjectCdrs(t *testing.T) { if !*testLocal { return } - cgrCdr1 := utils.CgrCdr{"accid": "aaaaadsafdsaf", "cdr_source": engine.TEST_SQL, "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out", - "tenant": "cgrates.org", "tor": "call", "account": "1001", "subject": "1001", "destination": "1002", + cgrCdr1 := utils.CgrCdr{"accid": "aaaaadsafdsaf", "cdrsource": engine.TEST_SQL, "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out", + "tenant": "cgrates.org", "tor": "call", "account": "dan", "subject": "dan", "destination": "+4986517174963", "answer_time": "2013-11-07T08:42:26Z", "duration": "10"} - if err := cdrStor.SetCdr(cgrCdr1); err != nil { - t.Error(err) + cgrCdr2 := utils.CgrCdr{"accid": "baaaadsafdsaf", "cdrsource": engine.TEST_SQL, "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out", + "tenant": "cgrates.org", "tor": "call", "account": "dan", "subject": "dan", "destination": "+4986517173964", + "answer_time": "2013-11-07T09:42:26Z", "duration": "20"} + for _, cdr := range []utils.CgrCdr{ cgrCdr1, cgrCdr2} { + if err := cdrStor.SetCdr(cdr); err != nil { + t.Error(err) + } } if storedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, false); err != nil { t.Error(err) - } else if len(storedCdrs) != 3 { + } else if len(storedCdrs) != 4 { // Make sure CDRs made it into StorDb t.Error(fmt.Sprintf("Unexpected number of CDRs stored: %d", len(storedCdrs))) } + if nonRatedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, true, true); err != nil { + t.Error(err) + } else if len(nonRatedCdrs) != 2 { // Just two of them should be non-rated + t.Error(fmt.Sprintf("Unexpected number of CDRs non-rated: %d", len(nonRatedCdrs))) + } } // Test here LoadTariffPlanFromFolder @@ -199,6 +214,26 @@ func TestRateCdrs(t *testing.T) { } else if reply != utils.OK { t.Errorf("Unexpected reply: %s", reply) } + if nonRatedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, true, true); err != nil { + t.Error(err) + } else if len(nonRatedCdrs) != 0 { // Just two of them should be non-rated + t.Error(fmt.Sprintf("Unexpected number of CDRs non-rated: %d", len(nonRatedCdrs))) + } + if errRatedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, true); err != nil { + t.Error(err) + } else if len(errRatedCdrs) != 2 { // The first 2 with errors should be still there before rerating + t.Error(fmt.Sprintf("Unexpected number of CDRs with errors: %d", len(errRatedCdrs))) + } + if err := cgrRpc.Call("MediatorV1.RateCdrs", utils.AttrRateCdrs{RerateErrors: true}, &reply); err != nil { + t.Error(err.Error()) + } else if reply != utils.OK { + t.Errorf("Unexpected reply: %s", reply) + } + if errRatedCdrs, err := cdrStor.GetStoredCdrs(time.Time{}, time.Time{}, false, true); err != nil { + t.Error(err) + } else if len(errRatedCdrs) != 1 { // One CDR with errors should be fixed now by rerating + t.Error(fmt.Sprintf("Unexpected number of CDRs with errors: %d", len(errRatedCdrs))) + } } // Simply kill the engine after we are done with tests within this file