diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index c9b641092..e3a9ead15 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -197,12 +197,14 @@ func (self *Cdrc) processFile(filePath string) error { csvReader := csv.NewReader(bufio.NewReader(file)) csvReader.Comma = self.csvSep procRowNr := 0 + timeStart := time.Now() for { - procRowNr += 1 record, err := csvReader.Read() if err != nil && err == io.EOF { break // End of file - } else if err != nil { + } + procRowNr += 1 // Only increase if not end of file + if err != nil { engine.Logger.Err(fmt.Sprintf(" Error in csv file: %s", err.Error())) continue // Other csv related errors, ignore } @@ -229,6 +231,7 @@ func (self *Cdrc) processFile(filePath string) error { engine.Logger.Err(err.Error()) return err } - engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s", fn, newPath)) + engine.Logger.Info(fmt.Sprintf("Finished processing %s, moved to %s. Total records processed: %d, run duration: %s", + fn, newPath, procRowNr, time.Now().Sub(timeStart))) return nil } diff --git a/config/helpers.go b/config/helpers.go index b4c061f97..53522e241 100644 --- a/config/helpers.go +++ b/config/helpers.go @@ -29,11 +29,8 @@ import ( // Adds support for slice values in config func ConfigSlice(cfgVal string) ([]string, error) { - cfgValStrs := strings.Split(cfgVal, ",") // If need arrises, we can make the separator configurable + cfgValStrs := strings.Split(cfgVal, utils.FIELDS_SEP) // If need arrises, we can make the separator configurable for idx, elm := range cfgValStrs { - //if elm == "" { //One empty element is presented when splitting empty string - // return nil, errors.New("Empty values in config slice") - //} cfgValStrs[idx] = strings.TrimSpace(elm) // By default spaces are not removed so we do it here to avoid unpredicted results in config } return cfgValStrs, nil diff --git a/config/helpers_test.go b/config/helpers_test.go index a64d02f52..11f0177bc 100644 --- a/config/helpers_test.go +++ b/config/helpers_test.go @@ -19,6 +19,7 @@ along with this program. If not, see package config import ( + "encoding/json" "reflect" "testing" @@ -61,6 +62,43 @@ usage_fields = test1, test2 } } +func TestParseCfgDerivedChargingDn1(t *testing.T) { + eFieldsCfg := []byte(`[derived_charging] +run_ids = run1, run2 +run_filters =~account:s/^\w+[mpls]\d{6}$//,~account:s/^0\d{9}$//;^account/value/ +reqtype_fields = test1, test2 +direction_fields = test1, test2 +tenant_fields = test1, test2 +category_fields = test1, test2 +account_fields = test1, test2 +subject_fields = test1, test2 +destination_fields = test1, test2 +setup_time_fields = test1, test2 +answer_time_fields = test1, test2 +usage_fields = test1, test2 +`) + eDcs := make(utils.DerivedChargers, 2) + if dc, err := utils.NewDerivedCharger("run1", `~account:s/^\w+[mpls]\d{6}$//`, "test1", "test1", "test1", + "test1", "test1", "test1", "test1", "test1", "test1", "test1"); err != nil { + t.Error("Unexpected error: ", err) + } else { + eDcs[0] = dc + } + if dc, err := utils.NewDerivedCharger("run2", `~account:s/^0\d{9}$//;^account/value/`, "test2", "test2", "test2", + "test2", "test2", "test2", "test2", "test2", "test2", "test2"); err != nil { + t.Error("Unexpected error: ", err) + } else { + eDcs[1] = dc + } + + if cfg, err := NewCGRConfigFromBytes(eFieldsCfg); err != nil { + t.Error("Could not parse the config", err.Error()) + } else if !reflect.DeepEqual(cfg.DerivedChargers, eDcs) { + dcsJson, _ := json.Marshal(cfg.DerivedChargers) + t.Errorf("Received: %s", string(dcsJson)) + } +} + func TestParseCdrcCdrFields(t *testing.T) { eFieldsCfg := []byte(`[cdrc] cdr_type = test diff --git a/mediator/mediator.go b/mediator/mediator.go index d2005dfcb..9dbbe88d9 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -121,11 +121,16 @@ func (self *Mediator) RateCdr(storedCdr *utils.StoredCdr) error { } for _, dc := range dcs { runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP) + matchingAllFilters := true for _, dcRunFilter := range runFilters { if fltrPass, _ := storedCdr.PassesFieldFilter(dcRunFilter); !fltrPass { - continue + matchingAllFilters = false + break } } + if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched + continue + } dcReqTypeFld, _ := utils.NewRSRField(dc.ReqTypeField) dcDirFld, _ := utils.NewRSRField(dc.DirectionField) dcTenantFld, _ := utils.NewRSRField(dc.TenantField) @@ -143,6 +148,7 @@ func (self *Mediator) RateCdr(storedCdr *utils.StoredCdr) error { err.Error()) // Cannot fork CDR, important just runid and error continue } + engine.Logger.Debug(fmt.Sprintf("Appending CdrRun: %+v\n", forkedCdr)) cdrRuns = append(cdrRuns, forkedCdr) } for _, cdr := range cdrRuns { diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 341791f2f..29fe30bf6 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -162,11 +162,16 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { dcs, _ = dcs.AppendDefaultRun() for _, dc := range dcs { runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP) + matchingAllFilters := true for _, dcRunFilter := range runFilters { if fltrPass, _ := ev.PassesFieldFilter(dcRunFilter); !fltrPass { - continue + matchingAllFilters = false + break } } + if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched + continue + } startTime, err := ev.GetAnswerTime(PARK_TIME) if err != nil { engine.Logger.Err("Error parsing answer event start time, using time.Now!") diff --git a/utils/consts.go b/utils/consts.go index 014fba60a..f410e311a 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -74,6 +74,7 @@ const ( FIELDS_SEP = "," REGEXP_PREFIX = "~" JSON = "json" + GOB = "gob" MSGPACK = "msgpack" CSV_LOAD = "CSVLOAD" CGRID = "cgrid" diff --git a/utils/rsrfield_test.go b/utils/rsrfield_test.go index 9296da4dc..2eca3667d 100644 --- a/utils/rsrfield_test.go +++ b/utils/rsrfield_test.go @@ -165,3 +165,13 @@ func TestParseRSRFields(t *testing.T) { t.Errorf("Unexpected value of parsed fields") } } + +func TestParseCdrcDn1(t *testing.T) { + if rl, err := NewRSRField(`~1:s/^00(\d+)(?:[a-zA-Z].{3})*0*([1-9]\d+)$/+$1$2/:s/^\+49(18\d{2})$/+491400$1/`); err != nil { + t.Error("Unexpected error: ", err) + } else if parsed := rl.ParseValue("0049ABOC0630415354"); parsed != "+49630415354" { + t.Errorf("Expecting: +49630415354, received: %s", parsed) + } else if parsed2 := rl.ParseValue("00491888"); parsed2 != "+4914001888" { + t.Errorf("Expecting: +4914001888, received: %s", parsed2) + } +} diff --git a/utils/storedcdr_test.go b/utils/storedcdr_test.go index bfc02bfd9..62a3bf97c 100644 --- a/utils/storedcdr_test.go +++ b/utils/storedcdr_test.go @@ -130,6 +130,7 @@ func TestPassesFieldFilterDn1(t *testing.T) { if pass, _ := cdr.PassesFieldFilter(acntPrefxFltr); !pass { t.Error("Not passing valid filter") } + cdr = &StoredCdr{CgrId: Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), Account: "futurem00005", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, } @@ -153,6 +154,23 @@ func TestPassesFieldFilterDn1(t *testing.T) { if pass, _ := cdr.PassesFieldFilter(acntPrefxFltr); pass { t.Error("Should not pass filter") } + cdr = &StoredCdr{CgrId: Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), Account: "0162447222", + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, + } + if acntPrefxFltr, err := NewRSRField(`~account:s/^0\d{9}$//`); err != nil { + t.Error("Unexpected parse error", err) + } else if acntPrefxFltr == nil { + t.Error("Failed parsing rule") + } else if pass, _ := cdr.PassesFieldFilter(acntPrefxFltr); !pass { + t.Error("Not passing valid filter") + } + if acntPrefxFltr, err := NewRSRField(`~account:s/^\w+[shmp]\d{4}$//`); err != nil { + t.Error("Unexpected parse error", err) + } else if acntPrefxFltr == nil { + t.Error("Failed parsing rule") + } else if pass, _ := cdr.PassesFieldFilter(acntPrefxFltr); pass { + t.Error("Should not pass filter") + } } func TestUsageMultiply(t *testing.T) {