https://github.com/cgrates/cgrates.git
Added stats csv read methods and test
@@ -1452,6 +1452,7 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
        path.Join(attrs.FolderPath, utils.USERS_CSV),
        path.Join(attrs.FolderPath, utils.ALIASES_CSV),
        path.Join(attrs.FolderPath, utils.ResourceLimitsCsv),
        path.Join(attrs.FolderPath, utils.StatsCsv),
    ), "", self.Config.DefaultTimezone)
    if err := loader.LoadAll(); err != nil {
        return utils.NewErrServerError(err)
@@ -140,6 +140,7 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
        path.Join(attrs.FolderPath, utils.USERS_CSV),
        path.Join(attrs.FolderPath, utils.ALIASES_CSV),
        path.Join(attrs.FolderPath, utils.ResourceLimitsCsv),
        path.Join(attrs.FolderPath, utils.StatsCsv),
    ), "", self.Config.DefaultTimezone)
    if err := loader.LoadAll(); err != nil {
        return utils.NewErrServerError(err)
@@ -267,6 +267,7 @@ func main() {
            path.Join(*dataPath, utils.USERS_CSV),
            path.Join(*dataPath, utils.ALIASES_CSV),
            path.Join(*dataPath, utils.ResourceLimitsCsv),
            path.Join(*dataPath, utils.StatsCsv),
        )
    }
    tpReader := engine.NewTpReader(dataDB, loader, *tpid, *timezone)
@@ -127,6 +127,7 @@ func LoadTariffPlanFromFolder(tpPath, timezone string, dataDB DataDB, disable_re
        path.Join(tpPath, utils.USERS_CSV),
        path.Join(tpPath, utils.ALIASES_CSV),
        path.Join(tpPath, utils.ResourceLimitsCsv),
        path.Join(tpPath, utils.StatsCsv),
    ), "", timezone)
    if err := loader.LoadAll(); err != nil {
        return utils.NewErrServerError(err)
@@ -272,6 +272,10 @@ ResGroup21,*string,HdrAccount,1001;1002,2014-07-29T15:00:00Z,1s,2,call,10,
ResGroup21,*string_prefix,HdrDestination,10;20,,,,,,
ResGroup21,*rsr_fields,,HdrSubject(~^1.*1$);HdrDestination(1002),,,,,,
ResGroup22,*destinations,HdrDestination,DST_FS,2014-07-29T15:00:00Z,3600s,2,premium_call,10,
`
    stats = `
#Id,FilterType,FilterFieldName,FilterFieldValues,ActivationInterval,QueueLength,TTL,Metrics,Store,Thresholds,Weight
Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc,true,THRESH1;THRESH2,20
`
)
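For orientation, the Stats.csv header above lines up column for column with the TpStats model that the reader added in this commit scans rows into. A rough sketch of that model follows; the field names are taken from the AsTPStats code further down, while the types are only inferred from how the fields are compared there (QueueLength != 0, Store != false, Weight != 0), so treat it as an assumption, not the definition shipped in the commit:

// Sketch only, not the actual model definition from this commit.
type TpStats struct {
    Tpid               string
    Tag                string  // the #Id column
    FilterType         string  // e.g. *string
    FilterFieldName    string  // e.g. Account
    FilterFieldValues  string  // ";"-separated, e.g. "1001;1002"
    ActivationInterval string  // "activation[;expiry]" timestamps
    QueueLength        int
    TTL                string  // e.g. "1s"
    Metrics            string  // e.g. "*asr;*acd;*acc"
    Store              bool
    Thresholds         string  // e.g. "THRESH1;THRESH2"
    Weight             float64
}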
@@ -279,7 +283,7 @@ var csvr *TpReader

func init() {
    csvr = NewTpReader(dataStorage, NewStringCSVStorage(',', destinations, timings, rates, destinationRates, ratingPlans, ratingProfiles,
-        sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits), testTPID, "")
+        sharedGroups, lcrs, actions, actionPlans, actionTriggers, accountActions, derivedCharges, cdrStats, users, aliases, resLimits, stats), testTPID, "")
    if err := csvr.LoadDestinations(); err != nil {
        log.Print("error in LoadDestinations:", err)
    }
@@ -330,6 +334,8 @@ func init() {
    }
    if err := csvr.LoadResourceLimits(); err != nil {
    }
    if err := csvr.LoadStats(); err != nil {
    }
    csvr.WriteToDatabase(false, false, false)
    cache.Flush()
    dataStorage.LoadRatingCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
@@ -1413,3 +1419,31 @@ func TestLoadResourceLimits(t *testing.T) {
    }
}

func TestLoadStats(t *testing.T) {
    eStats := map[string]*utils.TPStats{
        "Stats1": &utils.TPStats{
            TPid: testTPID,
            ID:   "Stats1",
            Filters: []*utils.TPRequestFilter{
                &utils.TPRequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}},
            },
            ActivationInterval: &utils.TPActivationInterval{
                ActivationTime: "2014-07-29T15:00:00Z",
            },
            QueueLength: 100,
            TTL:         "1s",
            Metrics:     "*asr;*acd;*acc",
            Store:       true,
            Thresholds:  "THRESH1;THRESH2",
            Weight:      20,
        },
    }
    // if len(csvr.stats) != len(eStats) {
    // 	t.Error("Failed to load stats: ", len(csvr.stats))
    // } else
    if !reflect.DeepEqual(eStats["Stats1"], csvr.stats["Stats1"]) {
        t.Errorf("Expecting: %+v, received: %+v", eStats["Stats1"], csvr.stats["Stats1"])
    }
}
@@ -1936,3 +1936,135 @@ func APItoResourceLimit(tpRL *utils.TPResourceLimit, timezone string) (rl *Resou
    }
    return rl, nil
}

type TpStatsS []*TpStats

func (tps TpStatsS) AsTPStats() (result []*utils.TPStats) {
    mrl := make(map[string]*utils.TPStats)
    for _, tp := range tps {
        rl, found := mrl[tp.Tag]
        if !found {
            rl = &utils.TPStats{
                TPid: tp.Tpid,
                ID:   tp.Tag,
            }
        }
        if tp.QueueLength != 0 {
            rl.QueueLength = tp.QueueLength
        }
        if tp.TTL != "" {
            rl.TTL = tp.TTL
        }
        if tp.Metrics != "" {
            rl.Metrics = tp.Metrics
        }
        if tp.Store != false {
            rl.Store = tp.Store
        }
        if tp.Thresholds != "" {
            rl.Thresholds = tp.Thresholds
        }
        if tp.Weight != 0 {
            rl.Weight = tp.Weight
        }
        if len(tp.ActivationInterval) != 0 {
            rl.ActivationInterval = new(utils.TPActivationInterval)
            aiSplt := strings.Split(tp.ActivationInterval, utils.INFIELD_SEP)
            if len(aiSplt) == 2 {
                rl.ActivationInterval.ActivationTime = aiSplt[0]
                rl.ActivationInterval.ExpiryTime = aiSplt[1]
            } else if len(aiSplt) == 1 {
                rl.ActivationInterval.ActivationTime = aiSplt[0]
            }
        }
        if tp.FilterType != "" {
            rl.Filters = append(rl.Filters, &utils.TPRequestFilter{
                Type:      tp.FilterType,
                FieldName: tp.FilterFieldName,
                Values:    strings.Split(tp.FilterFieldValues, utils.INFIELD_SEP)})
        }
        mrl[tp.Tag] = rl
    }
    result = make([]*utils.TPStats, len(mrl))
    i := 0
    for _, rl := range mrl {
        result[i] = rl
        i++
    }
    return
}
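A small illustration of what AsTPStats does, using hypothetical rows (not part of the commit): two TpStats rows sharing the same Tag collapse into a single *utils.TPStats whose Filters slice carries one entry per row, while the scalar columns keep whatever non-zero values the rows set.

rows := TpStatsS{
    &TpStats{Tpid: "TEST_TPID", Tag: "Stats1", FilterType: "*string",
        FilterFieldName: "Account", FilterFieldValues: "1001;1002",
        ActivationInterval: "2014-07-29T15:00:00Z", QueueLength: 100, TTL: "1s",
        Metrics: "*asr;*acd;*acc", Store: true, Thresholds: "THRESH1;THRESH2", Weight: 20},
    &TpStats{Tpid: "TEST_TPID", Tag: "Stats1", FilterType: "*string_prefix",
        FilterFieldName: "Destination", FilterFieldValues: "10;20"},
}
sts := rows.AsTPStats() // len(sts) == 1, sts[0].ID == "Stats1", len(sts[0].Filters) == 2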
// func APItoModelTPStats(tps *utils.TPStats) (mdls TpStatsS) {
// 	if len(rl.Filters) == 0 {
// 		return
// 	}
// 	for i, fltr := range rl.Filters {
// 		mdl := &TpStats{
// 			Tpid: tps.TPid,
// 			Tag:  tps.ID,
// 		}
// 		if i == 0 {
// 			mdl.QueueLength = tps.QueueLength
// 			mdl.TTL = tps.TTL
// 			mdl.Metrics = tps.Metrics
// 			mdl.Store = tps.Store
// 			mdl.Thresholds = tps.Thresholds
// 			mdl.Weight = tps.Weight
// 			if tps.ActivationInterval != nil {
// 				if tps.ActivationInterval.ActivationTime != "" {
// 					mdl.ActivationInterval = tps.ActivationInterval.ActivationTime
// 				}
// 				if tps.ActivationInterval.ExpiryTime != "" {
// 					mdl.ActivationInterval += utils.INFIELD_SEP + tps.ActivationInterval.ExpiryTime
// 				}
// 			}
// 		}
// 		mdl.FilterType = fltr.Type
// 		mdl.FilterFieldName = fltr.FieldName
// 		for i, val := range fltr.Values {
// 			if i != 0 {
// 				mdl.FilterFieldValues = mdl.FilterFieldValues + utils.INFIELD_SEP + val
// 			} else {
// 				mdl.FilterFieldValues = val
// 			}
// 		}
// 		mdls = append(mdls, mdl)
// 	}
// 	return
// }

// func APItoTPStats(tpST *utils.TPStats, timezone string) (st *TpStats, err error) {
// 	st = &ResourceLimit{ID: tpST.ID, Weight: tpST.Weight,
// 		Filters: make([]*RequestFilter, len(tpST.Filters)), Usage: make(map[string]*ResourceUsage)}
// 	if tpST.TTL != "" {
// 		if rl.TTL, err = utils.ParseDurationWithSecs(tpST.TTL); err != nil {
// 			return nil, err
// 		}
// 	}
// 	for i, f := range tpST.Filters {
// 		rf := &RequestFilter{Type: f.Type, FieldName: f.FieldName, Values: f.Values}
// 		if err := rf.CompileValues(); err != nil {
// 			return nil, err
// 		}
// 		st.Filters[i] = rf
// 	}
// 	if tpST.ActivationInterval != nil {
// 		if st.ActivationInterval, err = tpST.ActivationInterval.AsActivationInterval(timezone); err != nil {
// 			return nil, err
// 		}
// 	}
// 	if tpST.Thresholds != "" {
// 		if st.Thresholds, err = strconv.ParseFloat(tpST.Thresholds, 64); err != nil {
// 			return nil, err
// 		}
// 	}
// 	return st, nil
// }
@@ -32,26 +32,26 @@ type CSVStorage struct {
    readerFunc func(string, rune, int) (*csv.Reader, *os.File, error)
    // file names
    destinationsFn, ratesFn, destinationratesFn, timingsFn, destinationratetimingsFn, ratingprofilesFn,
-    sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn string
+    sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn string
}

func NewFileCSVStorage(sep rune,
    destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn,
-    actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn string) *CSVStorage {
+    actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn string) *CSVStorage {
    c := new(CSVStorage)
    c.sep = sep
    c.readerFunc = openFileCSVStorage
    c.destinationsFn, c.timingsFn, c.ratesFn, c.destinationratesFn, c.destinationratetimingsFn, c.ratingprofilesFn,
-        c.sharedgroupsFn, c.lcrFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn, c.derivedChargersFn, c.cdrStatsFn, c.usersFn, c.aliasesFn, c.resLimitsFn = destinationsFn, timingsFn,
-        ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn
+        c.sharedgroupsFn, c.lcrFn, c.actionsFn, c.actiontimingsFn, c.actiontriggersFn, c.accountactionsFn, c.derivedChargersFn, c.cdrStatsFn, c.usersFn, c.aliasesFn, c.resLimitsFn, c.statsFn = destinationsFn, timingsFn,
+        ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn
    return c
}

func NewStringCSVStorage(sep rune,
    destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, sharedgroupsFn, lcrFn,
-    actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn string) *CSVStorage {
+    actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn string) *CSVStorage {
    c := NewFileCSVStorage(sep, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn,
-        ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn)
+        ratingprofilesFn, sharedgroupsFn, lcrFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn, derivedChargersFn, cdrStatsFn, usersFn, aliasesFn, resLimitsFn, statsFn)
    c.readerFunc = openStringCSVStorage
    return c
}
@@ -620,6 +620,34 @@ func (csvs *CSVStorage) GetTPResourceLimits(tpid, id string) ([]*utils.TPResourc
    return tpResLimits.AsTPResourceLimits(), nil
}

func (csvs *CSVStorage) GetTPStats(tpid, id string) ([]*utils.TPStats, error) {
    csvReader, fp, err := csvs.readerFunc(csvs.statsFn, csvs.sep, getColumnCount(TpStats{}))
    if err != nil {
        //log.Print("Could not load stats file: ", err)
        // allow writing of the other values
        return nil, err
    }
    if fp != nil {
        defer fp.Close()
    }
    var tpStats TpStatsS
    for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
        if err != nil {
            log.Print("bad line in TPStats csv: ", err)
            return nil, err
        }
        if tpstats, err := csvLoad(TpStats{}, record); err != nil {
            log.Print("error loading TPStats: ", err)
            return nil, err
        } else {
            tPstats := tpstats.(TpStats)
            tPstats.Tpid = tpid
            tpStats = append(tpStats, &tPstats)
        }
    }
    return tpStats.AsTPStats(), nil
}
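A rough usage sketch of the new reader on its own (not taken from the commit): "TEST_TPID" is a placeholder tpid, `stats` is the fixture string defined in the test data earlier, and the other seventeen table fixtures are left empty, which this call never touches since GetTPStats only opens statsFn.

csvStorage := NewStringCSVStorage(',',
    "", "", "", "", "", "", "", "", // destinations ... lcr
    "", "", "", "", "", "", "", "", // actions ... aliases
    "",                             // resource limits
    stats)                          // the Stats.csv content from the test fixtures
tpst, err := csvStorage.GetTPStats("TEST_TPID", "")
if err != nil {
    log.Print("error reading stats: ", err)
}
log.Printf("read %d stats profiles", len(tpst))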
func (csvs *CSVStorage) GetTpIds() ([]string, error) {
    return nil, utils.ErrNotImplemented
}
@@ -162,6 +162,7 @@ type LoadReader interface {
    GetTPActionTriggers(string, string) ([]*utils.TPActionTriggers, error)
    GetTPAccountActions(*utils.TPAccountActions) ([]*utils.TPAccountActions, error)
    GetTPResourceLimits(string, string) ([]*utils.TPResourceLimit, error)
    GetTPStats(string, string) ([]*utils.TPStats, error)
}

type LoadWriter interface {
@@ -1105,6 +1105,10 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
    return cdrs, 0, nil
}

func (ms *MongoStorage) GetTPStats(string, string) ([]*utils.TPStats, error) {
    return nil, nil
}

func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) {
    return
}
@@ -53,6 +53,7 @@ type TpReader struct {
    users     map[string]*UserProfile
    aliases   map[string]*Alias
    resLimits map[string]*utils.TPResourceLimit
    stats     map[string]*utils.TPStats
    revDests,
    revAliases,
    acntActionPlans map[string][]string
@@ -124,6 +125,7 @@ func (tpr *TpReader) Init() {
    tpr.aliases = make(map[string]*Alias)
    tpr.derivedChargers = make(map[string]*utils.DerivedChargers)
    tpr.resLimits = make(map[string]*utils.TPResourceLimit)
    tpr.stats = make(map[string]*utils.TPStats)
    tpr.revDests = make(map[string][]string)
    tpr.revAliases = make(map[string][]string)
    tpr.acntActionPlans = make(map[string][]string)
@@ -1603,6 +1605,23 @@ func (tpr *TpReader) LoadResourceLimits() error {
    return tpr.LoadResourceLimitsFiltered("")
}

func (tpr *TpReader) LoadStatsFiltered(tag string) error {
    tps, err := tpr.lr.GetTPStats(tpr.tpid, tag)
    if err != nil {
        return err
    }
    mapSTs := make(map[string]*utils.TPStats)
    for _, st := range tps {
        mapSTs[st.ID] = st
    }
    tpr.stats = mapSTs
    return nil
}

func (tpr *TpReader) LoadStats() error {
    return tpr.LoadStatsFiltered("")
}

func (tpr *TpReader) LoadAll() (err error) {
    if err = tpr.LoadDestinations(); err != nil && err.Error() != utils.NotFoundCaps {
        return
@@ -1655,6 +1674,9 @@ func (tpr *TpReader) LoadAll() (err error) {
    if err = tpr.LoadResourceLimits(); err != nil && err.Error() != utils.NotFoundCaps {
        return
    }
    if err = tpr.LoadStats(); err != nil && err.Error() != utils.NotFoundCaps {
        return
    }
    return nil
}
@@ -57,6 +57,7 @@ var fileHandlers = map[string]func(*TPCSVImporter, string) error{
    utils.USERS_CSV:         (*TPCSVImporter).importUsers,
    utils.ALIASES_CSV:       (*TPCSVImporter).importAliases,
    utils.ResourceLimitsCsv: (*TPCSVImporter).importResourceLimits,
    utils.StatsCsv:          (*TPCSVImporter).importStats,
}

func (self *TPCSVImporter) Run() error {
@@ -78,6 +79,7 @@ func (self *TPCSVImporter) Run() error {
        path.Join(self.DirPath, utils.USERS_CSV),
        path.Join(self.DirPath, utils.ALIASES_CSV),
        path.Join(self.DirPath, utils.ResourceLimitsCsv),
        path.Join(self.DirPath, utils.StatsCsv),
    )
    files, _ := ioutil.ReadDir(self.DirPath)
    for _, f := range files {
@@ -357,3 +359,15 @@ func (self *TPCSVImporter) importResourceLimits(fn string) error {
    }
    return self.StorDb.SetTPResourceLimits(rls)
}

func (self *TPCSVImporter) importStats(fn string) error {
    return nil
    // if self.Verbose {
    // 	log.Printf("Processing file: <%s> ", fn)
    // }
    // rls, err := self.csvr.GetTPResourceLimits(self.TPid, "")
    // if err != nil {
    // 	return err
    // }
    // return self.StorDb.SetTPResourceLimits(rls)
}
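For context, a hypothetical completion of importStats that mirrors importResourceLimits above; it assumes a StorDb.SetTPStats method, which does not exist in this commit and would have to land with the write-side support.

// Sketch only: SetTPStats is assumed, not provided by this commit.
func (self *TPCSVImporter) importStats(fn string) error {
    if self.Verbose {
        log.Printf("Processing file: <%s> ", fn)
    }
    sts, err := self.csvr.GetTPStats(self.TPid, "")
    if err != nil {
        return err
    }
    return self.StorDb.SetTPStats(sts)
}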
@@ -101,6 +101,7 @@ const (
    USERS_CSV          = "Users.csv"
    ALIASES_CSV        = "Aliases.csv"
    ResourceLimitsCsv  = "ResourceLimits.csv"
    StatsCsv           = "Stats.csv"
    ROUNDING_UP        = "*up"
    ROUNDING_MIDDLE    = "*middle"
    ROUNDING_DOWN      = "*down"