mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 05:09:54 +05:00
DerivedCharging run filters are now chained
This commit is contained in:
@@ -197,12 +197,14 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
csvReader := csv.NewReader(bufio.NewReader(file))
|
||||
csvReader.Comma = self.csvSep
|
||||
procRowNr := 0
|
||||
timeStart := time.Now()
|
||||
for {
|
||||
procRowNr += 1
|
||||
record, err := csvReader.Read()
|
||||
if err != nil && err == io.EOF {
|
||||
break // End of file
|
||||
} else if err != nil {
|
||||
}
|
||||
procRowNr += 1 // Only increase if not end of file
|
||||
if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Error in csv file: %s", err.Error()))
|
||||
continue // Other csv related errors, ignore
|
||||
}
|
||||
@@ -229,6 +231,7 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
engine.Logger.Err(err.Error())
|
||||
return err
|
||||
}
|
||||
engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s", fn, newPath))
|
||||
engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s. Total records processed: %d, run duration: %s",
|
||||
fn, newPath, procRowNr, time.Now().Sub(timeStart)))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -29,11 +29,8 @@ import (
|
||||
|
||||
// Adds support for slice values in config
|
||||
func ConfigSlice(cfgVal string) ([]string, error) {
|
||||
cfgValStrs := strings.Split(cfgVal, ",") // If need arrises, we can make the separator configurable
|
||||
cfgValStrs := strings.Split(cfgVal, utils.FIELDS_SEP) // If need arrises, we can make the separator configurable
|
||||
for idx, elm := range cfgValStrs {
|
||||
//if elm == "" { //One empty element is presented when splitting empty string
|
||||
// return nil, errors.New("Empty values in config slice")
|
||||
//}
|
||||
cfgValStrs[idx] = strings.TrimSpace(elm) // By default spaces are not removed so we do it here to avoid unpredicted results in config
|
||||
}
|
||||
return cfgValStrs, nil
|
||||
|
||||
@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
@@ -61,6 +62,43 @@ usage_fields = test1, test2
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseCfgDerivedChargingDn1(t *testing.T) {
|
||||
eFieldsCfg := []byte(`[derived_charging]
|
||||
run_ids = run1, run2
|
||||
run_filters =~account:s/^\w+[mpls]\d{6}$//,~account:s/^0\d{9}$//;^account/value/
|
||||
reqtype_fields = test1, test2
|
||||
direction_fields = test1, test2
|
||||
tenant_fields = test1, test2
|
||||
category_fields = test1, test2
|
||||
account_fields = test1, test2
|
||||
subject_fields = test1, test2
|
||||
destination_fields = test1, test2
|
||||
setup_time_fields = test1, test2
|
||||
answer_time_fields = test1, test2
|
||||
usage_fields = test1, test2
|
||||
`)
|
||||
eDcs := make(utils.DerivedChargers, 2)
|
||||
if dc, err := utils.NewDerivedCharger("run1", `~account:s/^\w+[mpls]\d{6}$//`, "test1", "test1", "test1",
|
||||
"test1", "test1", "test1", "test1", "test1", "test1", "test1"); err != nil {
|
||||
t.Error("Unexpected error: ", err)
|
||||
} else {
|
||||
eDcs[0] = dc
|
||||
}
|
||||
if dc, err := utils.NewDerivedCharger("run2", `~account:s/^0\d{9}$//;^account/value/`, "test2", "test2", "test2",
|
||||
"test2", "test2", "test2", "test2", "test2", "test2", "test2"); err != nil {
|
||||
t.Error("Unexpected error: ", err)
|
||||
} else {
|
||||
eDcs[1] = dc
|
||||
}
|
||||
|
||||
if cfg, err := NewCGRConfigFromBytes(eFieldsCfg); err != nil {
|
||||
t.Error("Could not parse the config", err.Error())
|
||||
} else if !reflect.DeepEqual(cfg.DerivedChargers, eDcs) {
|
||||
dcsJson, _ := json.Marshal(cfg.DerivedChargers)
|
||||
t.Errorf("Received: %s", string(dcsJson))
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseCdrcCdrFields(t *testing.T) {
|
||||
eFieldsCfg := []byte(`[cdrc]
|
||||
cdr_type = test
|
||||
|
||||
@@ -121,11 +121,16 @@ func (self *Mediator) RateCdr(storedCdr *utils.StoredCdr) error {
|
||||
}
|
||||
for _, dc := range dcs {
|
||||
runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP)
|
||||
matchingAllFilters := true
|
||||
for _, dcRunFilter := range runFilters {
|
||||
if fltrPass, _ := storedCdr.PassesFieldFilter(dcRunFilter); !fltrPass {
|
||||
continue
|
||||
matchingAllFilters = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched
|
||||
continue
|
||||
}
|
||||
dcReqTypeFld, _ := utils.NewRSRField(dc.ReqTypeField)
|
||||
dcDirFld, _ := utils.NewRSRField(dc.DirectionField)
|
||||
dcTenantFld, _ := utils.NewRSRField(dc.TenantField)
|
||||
@@ -143,6 +148,7 @@ func (self *Mediator) RateCdr(storedCdr *utils.StoredCdr) error {
|
||||
err.Error()) // Cannot fork CDR, important just runid and error
|
||||
continue
|
||||
}
|
||||
engine.Logger.Debug(fmt.Sprintf("Appending CdrRun: %+v\n", forkedCdr))
|
||||
cdrRuns = append(cdrRuns, forkedCdr)
|
||||
}
|
||||
for _, cdr := range cdrRuns {
|
||||
|
||||
@@ -162,11 +162,16 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) {
|
||||
dcs, _ = dcs.AppendDefaultRun()
|
||||
for _, dc := range dcs {
|
||||
runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP)
|
||||
matchingAllFilters := true
|
||||
for _, dcRunFilter := range runFilters {
|
||||
if fltrPass, _ := ev.PassesFieldFilter(dcRunFilter); !fltrPass {
|
||||
continue
|
||||
matchingAllFilters = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched
|
||||
continue
|
||||
}
|
||||
startTime, err := ev.GetAnswerTime(PARK_TIME)
|
||||
if err != nil {
|
||||
engine.Logger.Err("Error parsing answer event start time, using time.Now!")
|
||||
|
||||
@@ -74,6 +74,7 @@ const (
|
||||
FIELDS_SEP = ","
|
||||
REGEXP_PREFIX = "~"
|
||||
JSON = "json"
|
||||
GOB = "gob"
|
||||
MSGPACK = "msgpack"
|
||||
CSV_LOAD = "CSVLOAD"
|
||||
CGRID = "cgrid"
|
||||
|
||||
@@ -165,3 +165,13 @@ func TestParseRSRFields(t *testing.T) {
|
||||
t.Errorf("Unexpected value of parsed fields")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseCdrcDn1(t *testing.T) {
|
||||
if rl, err := NewRSRField(`~1:s/^00(\d+)(?:[a-zA-Z].{3})*0*([1-9]\d+)$/+$1$2/:s/^\+49(18\d{2})$/+491400$1/`); err != nil {
|
||||
t.Error("Unexpected error: ", err)
|
||||
} else if parsed := rl.ParseValue("0049ABOC0630415354"); parsed != "+49630415354" {
|
||||
t.Errorf("Expecting: +49630415354, received: %s", parsed)
|
||||
} else if parsed2 := rl.ParseValue("00491888"); parsed2 != "+4914001888" {
|
||||
t.Errorf("Expecting: +4914001888, received: %s", parsed2)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -130,6 +130,7 @@ func TestPassesFieldFilterDn1(t *testing.T) {
|
||||
if pass, _ := cdr.PassesFieldFilter(acntPrefxFltr); !pass {
|
||||
t.Error("Not passing valid filter")
|
||||
}
|
||||
|
||||
cdr = &StoredCdr{CgrId: Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), Account: "futurem00005",
|
||||
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
|
||||
}
|
||||
@@ -153,6 +154,23 @@ func TestPassesFieldFilterDn1(t *testing.T) {
|
||||
if pass, _ := cdr.PassesFieldFilter(acntPrefxFltr); pass {
|
||||
t.Error("Should not pass filter")
|
||||
}
|
||||
cdr = &StoredCdr{CgrId: Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), Account: "0162447222",
|
||||
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
|
||||
}
|
||||
if acntPrefxFltr, err := NewRSRField(`~account:s/^0\d{9}$//`); err != nil {
|
||||
t.Error("Unexpected parse error", err)
|
||||
} else if acntPrefxFltr == nil {
|
||||
t.Error("Failed parsing rule")
|
||||
} else if pass, _ := cdr.PassesFieldFilter(acntPrefxFltr); !pass {
|
||||
t.Error("Not passing valid filter")
|
||||
}
|
||||
if acntPrefxFltr, err := NewRSRField(`~account:s/^\w+[shmp]\d{4}$//`); err != nil {
|
||||
t.Error("Unexpected parse error", err)
|
||||
} else if acntPrefxFltr == nil {
|
||||
t.Error("Failed parsing rule")
|
||||
} else if pass, _ := cdr.PassesFieldFilter(acntPrefxFltr); pass {
|
||||
t.Error("Should not pass filter")
|
||||
}
|
||||
}
|
||||
|
||||
func TestUsageMultiply(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user