From fbacb250b63b6a9ffaa4813ea14f473b2d3aa6c8 Mon Sep 17 00:00:00 2001 From: Edwardro22 Date: Mon, 24 Jul 2017 02:27:03 +0300 Subject: [PATCH] Added stats csv read methods and test --- apier/v1/apier.go | 1 + apier/v2/apier.go | 1 + cmd/cgr-loader/cgr-loader.go | 1 + engine/libtest.go | 1 + engine/loader_csv_test.go | 36 ++++++++- engine/model_helpers.go | 132 +++++++++++++++++++++++++++++++++ engine/storage_csv.go | 40 ++++++++-- engine/storage_interface.go | 1 + engine/storage_mongo_stordb.go | 4 + engine/tp_reader.go | 22 ++++++ engine/tpimporter_csv.go | 14 ++++ utils/consts.go | 1 + 12 files changed, 247 insertions(+), 7 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index a3c3581f7..2e8db02ef 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1452,6 +1452,7 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, path.Join(attrs.FolderPath, utils.USERS_CSV), path.Join(attrs.FolderPath, utils.ALIASES_CSV), path.Join(attrs.FolderPath, utils.ResourceLimitsCsv), + path.Join(attrs.FolderPath, utils.StatsCsv), ), "", self.Config.DefaultTimezone) if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) diff --git a/apier/v2/apier.go b/apier/v2/apier.go index 5e8126516..d5f62801f 100644 --- a/apier/v2/apier.go +++ b/apier/v2/apier.go @@ -140,6 +140,7 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, path.Join(attrs.FolderPath, utils.USERS_CSV), path.Join(attrs.FolderPath, utils.ALIASES_CSV), path.Join(attrs.FolderPath, utils.ResourceLimitsCsv), + path.Join(attrs.FolderPath, utils.StatsCsv), ), "", self.Config.DefaultTimezone) if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index efa019d87..2fa25eb19 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -267,6 +267,7 @@ func main() { path.Join(*dataPath, utils.USERS_CSV), path.Join(*dataPath, utils.ALIASES_CSV), path.Join(*dataPath, utils.ResourceLimitsCsv), + path.Join(*dataPath, utils.StatsCsv), ) } tpReader := engine.NewTpReader(dataDB, loader, *tpid, *timezone) diff --git a/engine/libtest.go b/engine/libtest.go index 1962a0ba4..a06b36525 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -127,6 +127,7 @@ func LoadTariffPlanFromFolder(tpPath, timezone string, dataDB DataDB, disable_re path.Join(tpPath, utils.USERS_CSV), path.Join(tpPath, utils.ALIASES_CSV), path.Join(tpPath, utils.ResourceLimitsCsv), + path.Join(tpPath, utils.StatsCsv), ), "", timezone) if err := loader.LoadAll(); err != nil { return utils.NewErrServerError(err) diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index ed7f9c826..bf7c73433 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -272,6 +272,10 @@ ResGroup21,*string,HdrAccount,1001;1002,2014-07-29T15:00:00Z,1s,2,call,10, ResGroup21,*string_prefix,HdrDestination,10;20,,,,,, ResGroup21,*rsr_fields,,HdrSubject(~^1.*1$);HdrDestination(1002),,,,,, ResGroup22,*destinations,HdrDestination,DST_FS,2014-07-29T15:00:00Z,3600s,2,premium_call,10, +` + stats = ` +#Id,FilterType,FilterFieldName,FilterFieldValues,ActivationInterval,QueueLength,TTL,Metrics,Store,Thresholds,Weight +Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc,true,THRESH1;THRESH2,20 ` ) @@ -279,7 +283,7 @@ var csvr *TpReader func init() { csvr = NewTpReader(dataStorage, NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles, - sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits), testTPID, "") + sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats), testTPID, "") if err := csvr.LoadDestinations(); err != nil { log.Print("error in LoadDestinations:", err) } @@ -330,6 +334,8 @@ func init() { } if err := csvr.LoadResourceLimits(); err != nil { } + if err := csvr.LoadStats(); err != nil { + } csvr.WriteToDatabase(false, false, false) cache.Flush() dataStorage.LoadRatingCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) @@ -1413,3 +1419,31 @@ func TestLoadResourceLimits(t *testing.T) { } } + +func TestLoadStats(t *testing.T) { + eStats := map[string]*utils.TPStats{ + "Stats1": &utils.TPStats{ + TPid: testTPID, + ID: "Stats1", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}}, + }, + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: "2014-07-29T15:00:00Z", + }, + QueueLength: 100, + TTL: "1s", + Metrics: "*asr;*acd;*acc", + Store: true, + Thresholds: "THRESH1;THRESH2", + Weight: 20, + }, + } + + // if len(csvr.stats) != len(eStats) { + // t.Error("Failed to load stats: ", len(csvr.stats)) + // } else + if !reflect.DeepEqual(eStats["Stats1"], csvr.stats["Stats1"]) { + t.Errorf("Expecting: %+v, received: %+v", eStats["Stats1"], csvr.stats["Stats1"]) + } +} diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 4b832488e..cb7e8b2be 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -1936,3 +1936,135 @@ func APItoResourceLimit(tpRL *utils.TPResourceLimit, timezone string) (rl *Resou } return rl, nil } + +type TpStatsS []*TpStats + +func (tps TpStatsS) AsTPStats() (result []*utils.TPStats) { + mrl := make(map[string]*utils.TPStats) + for _, tp := range tps { + rl, found := mrl[tp.Tag] + if !found { + rl = &utils.TPStats{ + TPid: tp.Tpid, + ID: tp.Tag, + } + } + if tp.QueueLength != 0 { + rl.QueueLength = tp.QueueLength + } + if tp.TTL != "" { + rl.TTL = tp.TTL + } + if tp.Metrics != "" { + rl.Metrics = tp.Metrics + } + if tp.Store != false { + rl.Store = tp.Store + } + if tp.Thresholds != "" { + rl.Thresholds = tp.Thresholds + } + if tp.Weight != 0 { + rl.Weight = tp.Weight + } + if len(tp.ActivationInterval) != 0 { + rl.ActivationInterval = new(utils.TPActivationInterval) + aiSplt := strings.Split(tp.ActivationInterval, utils.INFIELD_SEP) + if len(aiSplt) == 2 { + rl.ActivationInterval.ActivationTime = aiSplt[0] + rl.ActivationInterval.ExpiryTime = aiSplt[1] + } else if len(aiSplt) == 1 { + rl.ActivationInterval.ActivationTime = aiSplt[0] + } + } + if tp.FilterType != "" { + rl.Filters = append(rl.Filters, &utils.TPRequestFilter{ + Type: tp.FilterType, + FieldName: tp.FilterFieldName, + Values: strings.Split(tp.FilterFieldValues, utils.INFIELD_SEP)}) + } + mrl[tp.Tag] = rl + } + result = make([]*utils.TPStats, len(mrl)) + i := 0 + for _, rl := range mrl { + result[i] = rl + i++ + } + return +} + +// func APItoModelTPStats(tps *utils.TPStats) (mdls TpStatsS) { +// if len(rl.Filters) == 0 { +// return +// } +// for i, fltr := range rl.Filters { +// mdl := &TpStats{ +// Tpid: tps.TPid, +// Tag: tps.ID, +// } +// if i == 0 { +// mdl.QueueLength = tps.QueueLength +// mdl.TTL = tps.TTL +// mdl.Metrics = tps.Metrics +// mdl.Store = tps.Store +// mdl.Thresholds = tps.Thresholds +// mdl.Weight = tps.Weight +// if tps.ActivationInterval != nil { +// if tps.ActivationInterval.ActivationTime != "" { +// mdl.ActivationInterval = tps.ActivationInterval.ActivationTime +// } +// if tps.ActivationInterval.ExpiryTime != "" { +// mdl.ActivationInterval += utils.INFIELD_SEP + tps.ActivationInterval.ExpiryTime +// } +// } +// } +// mdl.FilterType = fltr.Type +// mdl.FilterFieldName = fltr.FieldName +// for i, val := range fltr.Values { +// if i != 0 { +// mdl.FilterFieldValues = mdl.FilterFieldValues + utils.INFIELD_SEP + val +// } else { +// mdl.FilterFieldValues = val +// } +// } +// mdls = append(mdls, mdl) +// } +// return +// }FilterFieldValues = mdl.FilterFieldValues + utils.INFIELD_SEP + val +// } else { +// mdl.FilterFieldValues = val +// } +// } +// mdls = append(mdls, mdl) +// } +// return +// } + +// func APItoTPStats(tpST *utils.TPStats, timezone string) (st *TpStats, err error) { +// st = &ResourceLimit{ID: tpST.ID, Weight: tpST.Weight, +// Filters: make([]*RequestFilter, len(tpST.Filters)), Usage: make(map[string]*ResourceUsage)} +// if tpST.TTL != "" { +// if rl.TTL, err = utils.ParseDurationWithSecs(tpST.TTL); err != nil { +// return nil, err +// } +// } +// for i, f := range tpST.Filters { +// rf := &RequestFilter{Type: f.Type, FieldName: f.FieldName, Values: f.Values} +// if err := rf.CompileValues(); err != nil { +// return nil, err +// } +// st.Filters[i] = rf +// } +// if tpST.ActivationInterval != nil { +// if st.ActivationInterval, err = tpST.ActivationInterval.AsActivationInterval(timezone); err != nil { +// return nil, err +// } +// } +// if tpST.Thresholds != "" { +// if st.Thresholds, err = strconv.ParseFloat(tpST.Thresholds, 64); err != nil { +// return nil, err +// } +// } +// return st, nil +// } diff --git a/engine/storage_csv.go b/engine/storage_csv.go index 112831eb9..fe680b2f3 100644 --- a/engine/storage_csv.go +++ b/engine/storage_csv.go @@ -32,26 +32,26 @@ type CSVStorage struct { readerFunc func(string, rune, int) (*csv.Reader, *os.File, error) // file names destinationsFn, ratesFn, destinationratesFn, timingsFn, destinationratetimingsFn, ratingprofilesFn, - sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn string + sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn string } func NewFileCSVStorage(sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, - actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn string) *CSVStorage { + actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn string) *CSVStorage { c := new(CSVStorage) c.sep = sep c.readerFunc = openFileCSVStorage c.destinationsFn, c.timingsFn, c.ratesFn, c.destinationratesFn, c.destinationratetimingsFn, c.ratingprofilesFn, - c.sharedgroupsFn, c.lcrFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn, c.derivedChargersFn, c.cdrStatsFn, c.usersFn, c.aliasesFn, c.resLimitsFn = destinationsFn, timingsFn, - ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn + c.sharedgroupsFn, c.lcrFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn, c.derivedChargersFn, c.cdrStatsFn, c.usersFn, c.aliasesFn, c.resLimitsFn, c.statsFn = destinationsFn, timingsFn, + ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn return c } func NewStringCSVStorage(sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, - actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn string) *CSVStorage { + actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn string) *CSVStorage { c := NewFileCSVStorage(sep, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, - ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn) + ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn) c.readerFunc = openStringCSVStorage return c } @@ -620,6 +620,34 @@ func (csvs *CSVStorage) GetTPResourceLimits(tpid, id string) ([]*utils.TPResourc return tpResLimits.AsTPResourceLimits(), nil } +func (csvs *CSVStorage) GetTPStats(tpid, id string) ([]*utils.TPStats, error) { + csvReader, fp, err := csvs.readerFunc(csvs.statsFn, csvs.sep, getColumnCount(TpStats{})) + if err != nil { + //log.Print("Could not load resource limits file: ", err) + // allow writing of the other values + return nil, err + } + if fp != nil { + defer fp.Close() + } + var tpStats TpStatsS + for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() { + if err != nil { + log.Print("bad line in TPStats csv: ", err) + return nil, err + } + if tpstats, err := csvLoad(TpStats{}, record); err != nil { + log.Print("error loading TPStats: ", err) + return nil, err + } else { + tPstats := tpstats.(TpStats) + tPstats.Tpid = tpid + tpStats = append(tpStats, &tPstats) + } + } + return tpStats.AsTPStats(), nil +} + func (csvs *CSVStorage) GetTpIds() ([]string, error) { return nil, utils.ErrNotImplemented } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 6df1ab266..005d9a2dc 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -162,6 +162,7 @@ type LoadReader interface { GetTPActionTriggers(string, string) ([]*utils.TPActionTriggers, error) GetTPAccountActions(*utils.TPAccountActions) ([]*utils.TPAccountActions, error) GetTPResourceLimits(string, string) ([]*utils.TPResourceLimit, error) + GetTPStats(string, string) ([]*utils.TPStats, error) } type LoadWriter interface { diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index ffe58547a..9000e2a44 100644 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -1105,6 +1105,10 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, return cdrs, 0, nil } +func (ms *MongoStorage) GetTPStats(string, string) ([]*utils.TPStats, error) { + return nil, nil +} + func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) { return } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index df88093ad..15ae35433 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -53,6 +53,7 @@ type TpReader struct { users map[string]*UserProfile aliases map[string]*Alias resLimits map[string]*utils.TPResourceLimit + stats map[string]*utils.TPStats revDests, revAliases, acntActionPlans map[string][]string @@ -124,6 +125,7 @@ func (tpr *TpReader) Init() { tpr.aliases = make(map[string]*Alias) tpr.derivedChargers = make(map[string]*utils.DerivedChargers) tpr.resLimits = make(map[string]*utils.TPResourceLimit) + tpr.stats = make(map[string]*utils.TPStats) tpr.revDests = make(map[string][]string) tpr.revAliases = make(map[string][]string) tpr.acntActionPlans = make(map[string][]string) @@ -1603,6 +1605,23 @@ func (tpr *TpReader) LoadResourceLimits() error { return tpr.LoadResourceLimitsFiltered("") } +func (tpr *TpReader) LoadStatsFiltered(tag string) error { + tps, err := tpr.lr.GetTPStats(tpr.tpid, tag) + if err != nil { + return err + } + mapSTs := make(map[string]*utils.TPStats) + for _, st := range tps { + mapSTs[st.ID] = st + } + tpr.stats = mapSTs + return nil +} + +func (tpr *TpReader) LoadStats() error { + return tpr.LoadStatsFiltered("") +} + func (tpr *TpReader) LoadAll() (err error) { if err = tpr.LoadDestinations(); err != nil && err.Error() != utils.NotFoundCaps { return @@ -1655,6 +1674,9 @@ func (tpr *TpReader) LoadAll() (err error) { if err = tpr.LoadResourceLimits(); err != nil && err.Error() != utils.NotFoundCaps { return } + if err = tpr.LoadStats(); err != nil && err.Error() != utils.NotFoundCaps { + return + } return nil } diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go index 702f822dc..7dff78896 100644 --- a/engine/tpimporter_csv.go +++ b/engine/tpimporter_csv.go @@ -57,6 +57,7 @@ var fileHandlers = map[string]func(*TPCSVImporter, string) error{ utils.USERS_CSV: (*TPCSVImporter).importUsers, utils.ALIASES_CSV: (*TPCSVImporter).importAliases, utils.ResourceLimitsCsv: (*TPCSVImporter).importResourceLimits, + utils.StatsCsv: (*TPCSVImporter).importStats, } func (self *TPCSVImporter) Run() error { @@ -78,6 +79,7 @@ func (self *TPCSVImporter) Run() error { path.Join(self.DirPath, utils.USERS_CSV), path.Join(self.DirPath, utils.ALIASES_CSV), path.Join(self.DirPath, utils.ResourceLimitsCsv), + path.Join(self.DirPath, utils.StatsCsv), ) files, _ := ioutil.ReadDir(self.DirPath) for _, f := range files { @@ -357,3 +359,15 @@ func (self *TPCSVImporter) importResourceLimits(fn string) error { } return self.StorDb.SetTPResourceLimits(rls) } + +func (self *TPCSVImporter) importStats(fn string) error { + return nil + // if self.Verbose { + // log.Printf("Processing file: <%s> ", fn) + // } + // rls, err := self.csvr.GetTPResourceLimits(self.TPid, "") + // if err != nil { + // return err + // } + // return self.StorDb.SetTPResourceLimits(rls) +} diff --git a/utils/consts.go b/utils/consts.go index 37f0a1d8e..92a04ccab 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -101,6 +101,7 @@ const ( USERS_CSV = "Users.csv" ALIASES_CSV = "Aliases.csv" ResourceLimitsCsv = "ResourceLimits.csv" + StatsCsv = "Stats.csv" ROUNDING_UP = "*up" ROUNDING_MIDDLE = "*middle" ROUNDING_DOWN = "*down"