refactored LoadWriter, tpexporter and tpimporter

This commit is contained in:
Radu Ioan Fericean
2015-06-01 23:21:00 +03:00
parent 3ce3084962
commit 7330061c1f
11 changed files with 508 additions and 1414 deletions

View File

@@ -52,7 +52,7 @@ func csvLoad(s interface{}, values []string) (interface{}, error) {
return elem.Interface(), nil
}
func csvDump(s interface{}, sep string) (string, error) {
func csvDump(s interface{}) ([]string, error) {
fieldIndexMap := make(map[string]int)
st := reflect.TypeOf(s)
numFields := st.NumField()
@@ -61,7 +61,7 @@ func csvDump(s interface{}, sep string) (string, error) {
index := field.Tag.Get("index")
if index != "" {
if idx, err := strconv.Atoi(index); err != nil {
return "", fmt.Errorf("invalid %v.%v index %v", st.Name(), field.Name, index)
return nil, fmt.Errorf("invalid %v.%v index %v", st.Name(), field.Name, index)
} else {
fieldIndexMap[field.Name] = idx
}
@@ -75,7 +75,7 @@ func csvDump(s interface{}, sep string) (string, error) {
result[fieldIndex] = field.String()
}
}
return strings.Join(result, sep), nil
return result, nil
}
func getColumnCount(s interface{}) int {
@@ -92,7 +92,7 @@ func getColumnCount(s interface{}) int {
return count
}
type TpDestinations []*TpDestination
type TpDestinations []TpDestination
func (tps TpDestinations) GetDestinations() (map[string]*Destination, error) {
destinations := make(map[string]*Destination)
@@ -108,7 +108,7 @@ func (tps TpDestinations) GetDestinations() (map[string]*Destination, error) {
return destinations, nil
}
type TpTimings []*TpTiming
type TpTimings []TpTiming
func (tps TpTimings) GetTimings() (map[string]*utils.TPTiming, error) {
timings := make(map[string]*utils.TPTiming)
@@ -133,7 +133,7 @@ func (tps TpTimings) GetTimings() (map[string]*utils.TPTiming, error) {
return timings, nil
}
type TpRates []*TpRate
type TpRates []TpRate
func (tps TpRates) GetRates() (map[string]*utils.TPRate, error) {
rates := make(map[string]*utils.TPRate)
@@ -159,7 +159,7 @@ func (tps TpRates) GetRates() (map[string]*utils.TPRate, error) {
return rates, nil
}
type TpDestinationRates []*TpDestinationRate
type TpDestinationRates []TpDestinationRate
func (tps TpDestinationRates) GetDestinationRates() (map[string]*utils.TPDestinationRate, error) {
rts := make(map[string]*utils.TPDestinationRate)
@@ -189,7 +189,7 @@ func (tps TpDestinationRates) GetDestinationRates() (map[string]*utils.TPDestina
return rts, nil
}
type TpRatingPlans []*TpRatingPlan
type TpRatingPlans []TpRatingPlan
func (tps TpRatingPlans) GetRatingPlans() (map[string][]*utils.TPRatingPlanBinding, error) {
rpbns := make(map[string][]*utils.TPRatingPlanBinding)
@@ -238,7 +238,7 @@ func GetRateInterval(rpl *utils.TPRatingPlanBinding, dr *utils.DestinationRate)
return
}
type TpRatingProfiles []*TpRatingProfile
type TpRatingProfiles []TpRatingProfile
func (tps TpRatingProfiles) GetRatingProfiles() (map[string]*utils.TPRatingProfile, error) {
rpfs := make(map[string]*utils.TPRatingProfile)
@@ -269,7 +269,7 @@ func (tps TpRatingProfiles) GetRatingProfiles() (map[string]*utils.TPRatingProfi
return rpfs, nil
}
type TpSharedGroups []*TpSharedGroup
type TpSharedGroups []TpSharedGroup
func (tps TpSharedGroups) GetSharedGroups() (map[string][]*utils.TPSharedGroup, error) {
sgs := make(map[string][]*utils.TPSharedGroup)
@@ -283,7 +283,7 @@ func (tps TpSharedGroups) GetSharedGroups() (map[string][]*utils.TPSharedGroup,
return sgs, nil
}
type TpActions []*TpAction
type TpActions []TpAction
func (tps TpActions) GetActions() (map[string][]*utils.TPAction, error) {
as := make(map[string][]*utils.TPAction)
@@ -310,7 +310,7 @@ func (tps TpActions) GetActions() (map[string][]*utils.TPAction, error) {
return as, nil
}
type TpActionPlans []*TpActionPlan
type TpActionPlans []TpActionPlan
func (tps TpActionPlans) GetActionPlans() (map[string][]*utils.TPActionTiming, error) {
ats := make(map[string][]*utils.TPActionTiming)
@@ -320,7 +320,7 @@ func (tps TpActionPlans) GetActionPlans() (map[string][]*utils.TPActionTiming, e
return ats, nil
}
type TpActionTriggers []*TpActionTrigger
type TpActionTriggers []TpActionTrigger
func (tps TpActionTriggers) GetActionTriggers() (map[string][]*utils.TPActionTrigger, error) {
ats := make(map[string][]*utils.TPActionTrigger)
@@ -350,7 +350,7 @@ func (tps TpActionTriggers) GetActionTriggers() (map[string][]*utils.TPActionTri
return ats, nil
}
type TpAccountActions []*TpAccountAction
type TpAccountActions []TpAccountAction
func (tps TpAccountActions) GetAccountActions() (map[string]*utils.TPAccountActions, error) {
aas := make(map[string]*utils.TPAccountActions)
@@ -369,7 +369,7 @@ func (tps TpAccountActions) GetAccountActions() (map[string]*utils.TPAccountActi
return aas, nil
}
type TpDerivedChargers []*TpDerivedCharger
type TpDerivedChargers []TpDerivedCharger
func (tps TpDerivedChargers) GetDerivedChargers() (map[string]*utils.TPDerivedChargers, error) {
dcs := make(map[string]*utils.TPDerivedChargers)
@@ -400,7 +400,7 @@ func (tps TpDerivedChargers) GetDerivedChargers() (map[string]*utils.TPDerivedCh
return dcs, nil
}
type TpCdrStats []*TpCdrStat
type TpCdrStats []TpCdrStat
func (tps TpCdrStats) GetCdrStats() (map[string][]*utils.TPCdrStat, error) {
css := make(map[string][]*utils.TPCdrStat)

View File

@@ -112,6 +112,10 @@ func (rpf *TpRatingProfile) SetRatingProfileId(id string) error {
return nil
}
func (rpf *TpRatingProfile) GetRatingProfileId() string {
return utils.ConcatenatedKey(rpf.Loadid, rpf.Direction, rpf.Tenant, rpf.Category, rpf.Subject)
}
type TpLcrRules struct {
Id int64
Tpid string
@@ -129,6 +133,23 @@ type TpLcrRules struct {
CreatedAt time.Time
}
func (lcr *TpLcrRules) SetLcrRulesId(id string) error {
ids := strings.Split(id, utils.CONCATENATED_KEY_SEP)
if len(ids) != 5 {
return fmt.Errorf("wrong LcrRules Id: %s", id)
}
lcr.Direction = ids[0]
lcr.Tenant = ids[2]
lcr.Category = ids[3]
lcr.Account = ids[3]
lcr.Subject = ids[5]
return nil
}
func (lcr *TpLcrRules) GetLcrRulesId() string {
return utils.LCRKey(lcr.Direction, lcr.Tenant, lcr.Category, lcr.Account, lcr.Subject)
}
type TpAction struct {
Id int64
Tpid string
@@ -209,6 +230,10 @@ func (aa *TpAccountAction) SetAccountActionId(id string) error {
return nil
}
func (aa *TpAccountAction) GetAccountActionId() string {
return utils.AccountKey(aa.Tenant, aa.Account, aa.Direction)
}
type TpSharedGroup struct {
Id int64
Tpid string
@@ -259,6 +284,10 @@ func (tpdc *TpDerivedCharger) SetDerivedChargersId(id string) error {
return nil
}
func (tpdc *TpDerivedCharger) GetDerivedChargersId() string {
return utils.ConcatenatedKey(tpdc.Loadid, tpdc.Direction, tpdc.Tenant, tpdc.Category, tpdc.Account, tpdc.Subject)
}
type TpCdrStat struct {
Id int64
Tpid string

View File

@@ -62,7 +62,7 @@ func openStringCSVStorage(data string, comma rune, nrFields int) (csvReader *csv
return
}
func (csvs *CSVStorage) GetTpTimings(string, string) ([]*TpTiming, error) {
func (csvs *CSVStorage) GetTpTimings(string, string) ([]TpTiming, error) {
csvReader, fp, err := csvs.readerFunc(csvs.timingsFn, csvs.sep, getColumnCount(TpTiming{}))
if err != nil {
log.Print("Could not load timings file: ", err)
@@ -72,7 +72,7 @@ func (csvs *CSVStorage) GetTpTimings(string, string) ([]*TpTiming, error) {
if fp != nil {
defer fp.Close()
}
var tpTimings []*TpTiming
var tpTimings []TpTiming
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in timings csv: ", err)
@@ -81,14 +81,13 @@ func (csvs *CSVStorage) GetTpTimings(string, string) ([]*TpTiming, error) {
if tpTiming, err := csvLoad(TpTiming{}, record); err != nil {
return nil, err
} else {
tp := tpTiming.(TpTiming)
tpTimings = append(tpTimings, &tp)
tpTimings = append(tpTimings, tpTiming.(TpTiming))
}
}
return nil, nil
}
func (csvs *CSVStorage) GetTpDestinations(tpid, tag string) ([]*TpDestination, error) {
func (csvs *CSVStorage) GetTpDestinations(tpid, tag string) ([]TpDestination, error) {
csvReader, fp, err := csvs.readerFunc(csvs.destinationsFn, csvs.sep, getColumnCount(TpDestination{}))
if err != nil {
log.Print("Could not load destinations file: ", err)
@@ -98,7 +97,7 @@ func (csvs *CSVStorage) GetTpDestinations(tpid, tag string) ([]*TpDestination, e
if fp != nil {
defer fp.Close()
}
var tpDests []*TpDestination
var tpDests []TpDestination
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in destinations csv: ", err)
@@ -107,14 +106,13 @@ func (csvs *CSVStorage) GetTpDestinations(tpid, tag string) ([]*TpDestination, e
if tpDest, err := csvLoad(TpDestination{}, record); err != nil {
return nil, err
} else {
tp := tpDest.(TpDestination)
tpDests = append(tpDests, &tp)
tpDests = append(tpDests, tpDest.(TpDestination))
}
}
return tpDests, nil
}
func (csvs *CSVStorage) GetTpRates(tpid, tag string) ([]*TpRate, error) {
func (csvs *CSVStorage) GetTpRates(tpid, tag string) ([]TpRate, error) {
csvReader, fp, err := csvs.readerFunc(csvs.ratesFn, csvs.sep, getColumnCount(TpRate{}))
if err != nil {
log.Print("Could not load rates file: ", err)
@@ -124,7 +122,7 @@ func (csvs *CSVStorage) GetTpRates(tpid, tag string) ([]*TpRate, error) {
if fp != nil {
defer fp.Close()
}
var tpRates []*TpRate
var tpRates []TpRate
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in rates csv: ", err)
@@ -133,14 +131,13 @@ func (csvs *CSVStorage) GetTpRates(tpid, tag string) ([]*TpRate, error) {
if tpRate, err := csvLoad(TpRate{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpRate)
tpRates = append(tpRates, &tp)
tpRates = append(tpRates, tpRate.(TpRate))
}
}
return tpRates, nil
}
func (csvs *CSVStorage) GetTpDestinationRates(tpid, tag string, p *utils.Paginator) ([]*TpDestinationRate, error) {
func (csvs *CSVStorage) GetTpDestinationRates(tpid, tag string, p *utils.Paginator) ([]TpDestinationRate, error) {
csvReader, fp, err := csvs.readerFunc(csvs.destinationratesFn, csvs.sep, getColumnCount(TpDestinationRate{}))
if err != nil {
log.Print("Could not load destination_rates file: ", err)
@@ -150,7 +147,7 @@ func (csvs *CSVStorage) GetTpDestinationRates(tpid, tag string, p *utils.Paginat
if fp != nil {
defer fp.Close()
}
var tpDestinationRates []*TpDestinationRate
var tpDestinationRates []TpDestinationRate
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in destinationrates csv: ", err)
@@ -159,14 +156,13 @@ func (csvs *CSVStorage) GetTpDestinationRates(tpid, tag string, p *utils.Paginat
if tpRate, err := csvLoad(TpDestinationRate{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpDestinationRate)
tpDestinationRates = append(tpDestinationRates, &tp)
tpDestinationRates = append(tpDestinationRates, tpRate.(TpDestinationRate))
}
}
return tpDestinationRates, nil
}
func (csvs *CSVStorage) GetTpRatingPlans(tpid, tag string, p *utils.Paginator) ([]*TpRatingPlan, error) {
func (csvs *CSVStorage) GetTpRatingPlans(tpid, tag string, p *utils.Paginator) ([]TpRatingPlan, error) {
csvReader, fp, err := csvs.readerFunc(csvs.destinationratetimingsFn, csvs.sep, getColumnCount(TpRatingPlan{}))
if err != nil {
log.Print("Could not load rate plans file: ", err)
@@ -176,7 +172,7 @@ func (csvs *CSVStorage) GetTpRatingPlans(tpid, tag string, p *utils.Paginator) (
if fp != nil {
defer fp.Close()
}
var tpRatingPlans []*TpRatingPlan
var tpRatingPlans []TpRatingPlan
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in rating plans csv: ", err)
@@ -185,14 +181,13 @@ func (csvs *CSVStorage) GetTpRatingPlans(tpid, tag string, p *utils.Paginator) (
if tpRate, err := csvLoad(TpRatingPlan{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpRatingPlan)
tpRatingPlans = append(tpRatingPlans, &tp)
tpRatingPlans = append(tpRatingPlans, tpRate.(TpRatingPlan))
}
}
return tpRatingPlans, nil
}
func (csvs *CSVStorage) GetTpRatingProfiles(filter *utils.TPRatingProfile) ([]*TpRatingProfile, error) {
func (csvs *CSVStorage) GetTpRatingProfiles(filter *utils.TPRatingProfile) ([]TpRatingProfile, error) {
csvReader, fp, err := csvs.readerFunc(csvs.ratingprofilesFn, csvs.sep, getColumnCount(TpRatingProfile{}))
if err != nil {
log.Print("Could not load rating profiles file: ", err)
@@ -202,7 +197,7 @@ func (csvs *CSVStorage) GetTpRatingProfiles(filter *utils.TPRatingProfile) ([]*T
if fp != nil {
defer fp.Close()
}
var tpRatingProfiles []*TpRatingProfile
var tpRatingProfiles []TpRatingProfile
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line rating profiles csv: ", err)
@@ -211,14 +206,13 @@ func (csvs *CSVStorage) GetTpRatingProfiles(filter *utils.TPRatingProfile) ([]*T
if tpRate, err := csvLoad(TpRatingProfile{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpRatingProfile)
tpRatingProfiles = append(tpRatingProfiles, &tp)
tpRatingProfiles = append(tpRatingProfiles, tpRate.(TpRatingProfile))
}
}
return tpRatingProfiles, nil
}
func (csvs *CSVStorage) GetTpSharedGroups(tpid, tag string) ([]*TpSharedGroup, error) {
func (csvs *CSVStorage) GetTpSharedGroups(tpid, tag string) ([]TpSharedGroup, error) {
csvReader, fp, err := csvs.readerFunc(csvs.sharedgroupsFn, csvs.sep, getColumnCount(TpSharedGroup{}))
if err != nil {
log.Print("Could not load shared groups file: ", err)
@@ -229,7 +223,7 @@ func (csvs *CSVStorage) GetTpSharedGroups(tpid, tag string) ([]*TpSharedGroup, e
defer fp.Close()
}
var tpSharedGroups []*TpSharedGroup
var tpSharedGroups []TpSharedGroup
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in shared groups csv: ", err)
@@ -238,14 +232,13 @@ func (csvs *CSVStorage) GetTpSharedGroups(tpid, tag string) ([]*TpSharedGroup, e
if tpRate, err := csvLoad(TpSharedGroup{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpSharedGroup)
tpSharedGroups = append(tpSharedGroups, &tp)
tpSharedGroups = append(tpSharedGroups, tpRate.(TpSharedGroup))
}
}
return tpSharedGroups, nil
}
func (csvs *CSVStorage) GetTpLCRs(tpid, tag string) ([]*TpLcrRules, error) {
func (csvs *CSVStorage) GetTpLCRs(tpid, tag string) ([]TpLcrRules, error) {
csvReader, fp, err := csvs.readerFunc(csvs.lcrFn, csvs.sep, getColumnCount(TpLcrRules{}))
if err != nil {
log.Print("Could not load LCR rules file: ", err)
@@ -255,7 +248,7 @@ func (csvs *CSVStorage) GetTpLCRs(tpid, tag string) ([]*TpLcrRules, error) {
if fp != nil {
defer fp.Close()
}
var tpLCRs []*TpLcrRules
var tpLCRs []TpLcrRules
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if tpRate, err := csvLoad(TpLcrRules{}, record); err != nil {
if err != nil {
@@ -264,14 +257,13 @@ func (csvs *CSVStorage) GetTpLCRs(tpid, tag string) ([]*TpLcrRules, error) {
}
return nil, err
} else {
tp := tpRate.(TpLcrRules)
tpLCRs = append(tpLCRs, &tp)
tpLCRs = append(tpLCRs, tpRate.(TpLcrRules))
}
}
return tpLCRs, nil
}
func (csvs *CSVStorage) GetTpActions(tpid, tag string) ([]*TpAction, error) {
func (csvs *CSVStorage) GetTpActions(tpid, tag string) ([]TpAction, error) {
csvReader, fp, err := csvs.readerFunc(csvs.actionsFn, csvs.sep, getColumnCount(TpAction{}))
if err != nil {
log.Print("Could not load action file: ", err)
@@ -281,7 +273,7 @@ func (csvs *CSVStorage) GetTpActions(tpid, tag string) ([]*TpAction, error) {
if fp != nil {
defer fp.Close()
}
var tpActions []*TpAction
var tpActions []TpAction
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in actions csv: ", err)
@@ -290,14 +282,13 @@ func (csvs *CSVStorage) GetTpActions(tpid, tag string) ([]*TpAction, error) {
if tpRate, err := csvLoad(TpAction{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpAction)
tpActions = append(tpActions, &tp)
tpActions = append(tpActions, tpRate.(TpAction))
}
}
return tpActions, nil
}
func (csvs *CSVStorage) GetTPActionPlans(tpid, tag string) ([]*TpActionPlan, error) {
func (csvs *CSVStorage) GetTPActionPlans(tpid, tag string) ([]TpActionPlan, error) {
csvReader, fp, err := csvs.readerFunc(csvs.actiontimingsFn, csvs.sep, getColumnCount(TpActionPlan{}))
if err != nil {
log.Print("Could not load action plans file: ", err)
@@ -307,19 +298,18 @@ func (csvs *CSVStorage) GetTPActionPlans(tpid, tag string) ([]*TpActionPlan, err
if fp != nil {
defer fp.Close()
}
var tpActionPlans []*TpActionPlan
var tpActionPlans []TpActionPlan
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if tpRate, err := csvLoad(TpActionPlan{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpActionPlan)
tpActionPlans = append(tpActionPlans, &tp)
tpActionPlans = append(tpActionPlans, tpRate.(TpActionPlan))
}
}
return tpActionPlans, nil
}
func (csvs *CSVStorage) GetTpActionTriggers(tpid, tag string) ([]*TpActionTrigger, error) {
func (csvs *CSVStorage) GetTpActionTriggers(tpid, tag string) ([]TpActionTrigger, error) {
csvReader, fp, err := csvs.readerFunc(csvs.actiontriggersFn, csvs.sep, getColumnCount(TpActionTrigger{}))
if err != nil {
log.Print("Could not load action triggers file: ", err)
@@ -329,7 +319,7 @@ func (csvs *CSVStorage) GetTpActionTriggers(tpid, tag string) ([]*TpActionTrigge
if fp != nil {
defer fp.Close()
}
var tpActionTriggers []*TpActionTrigger
var tpActionTriggers []TpActionTrigger
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in action triggers csv: ", err)
@@ -338,14 +328,13 @@ func (csvs *CSVStorage) GetTpActionTriggers(tpid, tag string) ([]*TpActionTrigge
if tpRate, err := csvLoad(TpActionTrigger{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpActionTrigger)
tpActionTriggers = append(tpActionTriggers, &tp)
tpActionTriggers = append(tpActionTriggers, tpRate.(TpActionTrigger))
}
}
return tpActionTriggers, nil
}
func (csvs *CSVStorage) GetTpAccountActions(filter []*TpAccountAction) ([]*TpAccountAction, error) {
func (csvs *CSVStorage) GetTpAccountActions(filter []TpAccountAction) ([]*TpAccountAction, error) {
csvReader, fp, err := csvs.readerFunc(csvs.accountactionsFn, csvs.sep, getColumnCount(TpAccountAction{}))
if err != nil {
log.Print("Could not load account actions file: ", err)
@@ -355,7 +344,7 @@ func (csvs *CSVStorage) GetTpAccountActions(filter []*TpAccountAction) ([]*TpAcc
if fp != nil {
defer fp.Close()
}
var tpAccountActions []*TpAccountAction
var tpAccountActions []TpAccountAction
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in account actions csv: ", err)
@@ -364,14 +353,13 @@ func (csvs *CSVStorage) GetTpAccountActions(filter []*TpAccountAction) ([]*TpAcc
if tpRate, err := csvLoad(TpAccountAction{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpAccountAction)
tpAccountActions = append(tpAccountActions, &tp)
tpAccountActions = append(tpAccountActions, tpRate.(TpAccountAction))
}
}
return tpAccountActions, nil
}
func (csvs *CSVStorage) GetTpDerivedChargers(filter *utils.TPDerivedChargers) ([]*TpDerivedCharger, error) {
func (csvs *CSVStorage) GetTpDerivedChargers(filter *utils.TPDerivedChargers) ([]TpDerivedCharger, error) {
csvReader, fp, err := csvs.readerFunc(csvs.derivedChargersFn, csvs.sep, getColumnCount(TpDerivedCharger{}))
if err != nil {
log.Print("Could not load derivedChargers file: ", err)
@@ -381,7 +369,7 @@ func (csvs *CSVStorage) GetTpDerivedChargers(filter *utils.TPDerivedChargers) ([
if fp != nil {
defer fp.Close()
}
var tpDerivedChargers []*TpDerivedCharger
var tpDerivedChargers []TpDerivedCharger
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in derived chargers csv: ", err)
@@ -390,14 +378,13 @@ func (csvs *CSVStorage) GetTpDerivedChargers(filter *utils.TPDerivedChargers) ([
if tpRate, err := csvLoad(TpDerivedCharger{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpDerivedCharger)
tpDerivedChargers = append(tpDerivedChargers, &tp)
tpDerivedChargers = append(tpDerivedChargers, tpRate.(TpDerivedCharger))
}
}
return tpDerivedChargers, nil
}
func (csvs *CSVStorage) GetTpCdrStats(tpid, tag string) ([]*TpCdrStat, error) {
func (csvs *CSVStorage) GetTpCdrStats(tpid, tag string) ([]TpCdrStat, error) {
csvReader, fp, err := csvs.readerFunc(csvs.derivedChargersFn, csvs.sep, getColumnCount(TpCdrStat{}))
if err != nil {
log.Print("Could not load derivedChargers file: ", err)
@@ -407,7 +394,7 @@ func (csvs *CSVStorage) GetTpCdrStats(tpid, tag string) ([]*TpCdrStat, error) {
if fp != nil {
defer fp.Close()
}
var tpCdrStats []*TpCdrStat
var tpCdrStats []TpCdrStat
for record, err := csvReader.Read(); err != io.EOF; record, err = csvReader.Read() {
if err != nil {
log.Print("bad line in cdr stats csv: ", err)
@@ -416,8 +403,7 @@ func (csvs *CSVStorage) GetTpCdrStats(tpid, tag string) ([]*TpCdrStat, error) {
if tpRate, err := csvLoad(TpCdrStat{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpCdrStat)
tpCdrStats = append(tpCdrStats, &tp)
tpCdrStats = append(tpCdrStats, tpRate.(TpCdrStat))
}
}
return tpCdrStats, nil

View File

@@ -137,38 +137,38 @@ type LoadStorage interface {
type LoadReader interface {
GetTpIds() ([]string, error)
GetTpTableIds(string, string, utils.TPDistinctIds, map[string]string, *utils.Paginator) ([]string, error)
GetTpTimings(string, string) ([]*TpTiming, error)
GetTpDestinations(string, string) ([]*TpDestination, error)
GetTpRates(string, string) ([]*TpRate, error)
GetTpDestinationRates(string, string, *utils.Paginator) ([]*TpDestinationRate, error)
GetTpRatingPlans(string, string, *utils.Paginator) ([]*TpRatingPlan, error)
GetTpRatingProfiles(*utils.TPRatingProfile) ([]*TpRatingProfile, error)
GetTpSharedGroups(string, string) ([]*TpSharedGroup, error)
GetTpCdrStats(string, string) ([]*TpCdrStat, error)
GetTpDerivedChargers(*utils.TPDerivedChargers) ([]*TpDerivedCharger, error)
GetTpLCRs(string, string) ([]*TpLcrRules, error)
GetTpActions(string, string) ([]*TpAction, error)
GetTpActionPlans(string, string) ([]*TpActionPlan, error)
GetTpActionTriggers(string, string) ([]*TpActionTrigger, error)
GetTpAccountActions(*utils.TPAccountActions) ([]*TpAccountAction, error)
GetTpTimings(string, string) ([]TpTiming, error)
GetTpDestinations(string, string) ([]TpDestination, error)
GetTpRates(string, string) ([]TpRate, error)
GetTpDestinationRates(string, string, *utils.Paginator) ([]TpDestinationRate, error)
GetTpRatingPlans(string, string, *utils.Paginator) ([]TpRatingPlan, error)
GetTpRatingProfiles(*utils.TPRatingProfile) ([]TpRatingProfile, error)
GetTpSharedGroups(string, string) ([]TpSharedGroup, error)
GetTpCdrStats(string, string) ([]TpCdrStat, error)
GetTpDerivedChargers(*utils.TPDerivedChargers) ([]TpDerivedCharger, error)
GetTpLCRs(string, string) ([]TpLcrRules, error)
GetTpActions(string, string) ([]TpAction, error)
GetTpActionPlans(string, string) ([]TpActionPlan, error)
GetTpActionTriggers(string, string) ([]TpActionTrigger, error)
GetTpAccountActions(*utils.TPAccountActions) ([]TpAccountAction, error)
}
type LoadWriter interface {
RemTpData(string, string, ...string) error
SetTpTiming(*utils.ApierTPTiming) error
SetTpDestination(string, *Destination) error
SetTpRates(string, map[string][]*utils.RateSlot) error
SetTpDestinationRates(string, map[string][]*utils.DestinationRate) error
SetTpRatingPlans(string, map[string][]*utils.TPRatingPlanBinding) error
SetTpRatingProfiles(string, map[string]*utils.TPRatingProfile) error
SetTpSharedGroups(string, map[string][]*utils.TPSharedGroup) error
SetTpCdrStats(string, map[string][]*utils.TPCdrStat) error
SetTpDerivedChargers(string, map[string][]*utils.TPDerivedCharger) error
SetTpLCRs(string, map[string]*LCR) error
SetTpActions(string, map[string][]*utils.TPAction) error
SetTpActionTimings(string, map[string][]*utils.TPActionTiming) error
SetTpActionTriggers(string, map[string][]*utils.TPActionTrigger) error
SetTpAccountActions(string, map[string]*utils.TPAccountActions) error
SetTpTimings([]TpTiming) error
SetTpDestinations([]TpDestination) error
SetTpRates([]TpRate) error
SetTpDestinationRates([]TpDestinationRate) error
SetTpRatingPlans([]TpRatingPlan) error
SetTpRatingProfiles([]TpRatingProfile) error
SetTpSharedGroups([]TpSharedGroup) error
SetTpCdrStats([]TpCdrStat) error
SetTpDerivedChargers([]TpDerivedCharger) error
SetTpLCRs([]TpLcrRules) error
SetTpActions([]TpAction) error
SetTpActionTimings([]TpActionPlan) error
SetTpActionTriggers([]TpActionTrigger) error
SetTpAccountActions([]TpAccountActions) error
}
type Marshaler interface {

View File

@@ -62,17 +62,6 @@ func (self *MySQLStorage) Flush(scriptsPath string) (err error) {
return nil
}
func (self *MySQLStorage) SetTPTiming(tm *utils.ApierTPTiming) error {
if tm == nil {
return nil //Nothing to set
}
if _, err := self.Db.Exec(fmt.Sprintf("INSERT INTO %s (tpid, tag, years, months, month_days, week_days, time, created_at) VALUES('%s','%s','%s','%s','%s','%s','%s', %d) ON DUPLICATE KEY UPDATE years=values(years), months=values(months), month_days=values(month_days), week_days=values(week_days), time=values(time)",
utils.TBL_TP_TIMINGS, tm.TPid, tm.TimingId, tm.Years, tm.Months, tm.MonthDays, tm.WeekDays, tm.Time, time.Now().Unix())); err != nil {
return err
}
return nil
}
func (self *MySQLStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) (err error) {
if cc == nil {
return nil

View File

@@ -65,24 +65,6 @@ func (self *PostgresStorage) Flush(scriptsPath string) (err error) {
return nil
}
func (self *PostgresStorage) SetTPTiming(tm *utils.ApierTPTiming) error {
if tm == nil {
return nil //Nothing to set
}
tx := self.db.Begin()
if err := tx.Save(&TpTiming{Tpid: tm.TPid, Tag: tm.TimingId, Years: tm.Years, Months: tm.Months, MonthDays: tm.MonthDays, WeekDays: tm.WeekDays, Time: tm.Time, CreatedAt: time.Now()}).Error; err != nil {
tx.Rollback()
tx = self.db.Begin()
updated := tx.Model(TpTiming{}).Where(&TpTiming{Tpid: tm.TPid, Tag: tm.TimingId}).Updates(&TpTiming{Years: tm.Years, Months: tm.Months, MonthDays: tm.MonthDays, WeekDays: tm.WeekDays, Time: tm.Time})
if updated.Error != nil {
tx.Rollback()
return updated.Error
}
}
tx.Commit()
return nil
}
func (self *PostgresStorage) LogCallCost(cgrid, source, runid string, cc *CallCost) (err error) {
if cc == nil {
return nil

View File

@@ -160,10 +160,6 @@ func (self *SQLStorage) GetTpTableIds(tpid, table string, distinct utils.TPDisti
return ids, nil
}
func (self *SQLStorage) SetTpTiming(tm *utils.ApierTPTiming) error {
return errors.New(utils.ERR_NOT_IMPLEMENTED)
}
func (self *SQLStorage) RemTpData(table, tpid string, args ...string) error {
tx := self.db.Begin()
if len(table) == 0 { // Remove tpid out of all tables
@@ -197,22 +193,22 @@ func (self *SQLStorage) RemTpData(table, tpid string, args ...string) error {
return nil
}
func (self *SQLStorage) SetTpDestination(tpid string, dest *Destination) error {
if len(dest.Prefixes) == 0 {
func (self *PostgresStorage) SetTPTiming(timings []TpTiming) error {
if len(timings) == 0 {
return nil
}
m := make(map[string]bool)
tx := self.db.Begin()
if err := tx.Where(&TpDestination{Tpid: tpid, Tag: dest.Id}).Delete(TpDestination{}).Error; err != nil {
tx.Rollback()
return err
}
for _, prefix := range dest.Prefixes {
save := tx.Save(&TpDestination{
Tpid: tpid,
Tag: dest.Id,
Prefix: prefix,
CreatedAt: time.Now(),
})
for _, timing := range timings {
if found, _ := m[timing.Tag]; !found {
m[timing.Tag] = true
if err := tx.Where(&TpTiming{Tpid: timing.Tpid, Tag: timing.Tag}).Delete(TpDestination{}).Error; err != nil {
tx.Rollback()
return err
}
}
save := tx.Save(timing)
if save.Error != nil {
tx.Rollback()
return save.Error
@@ -222,431 +218,339 @@ func (self *SQLStorage) SetTpDestination(tpid string, dest *Destination) error {
return nil
}
func (self *SQLStorage) SetTpRates(tpid string, rts map[string][]*utils.RateSlot) error {
if len(rts) == 0 {
return nil //Nothing to set
func (self *SQLStorage) SetTpDestination(dests []TpDestination) error {
if len(dests) == 0 {
return nil
}
tx := self.db.Begin()
for rtId, rSlots := range rts {
if err := tx.Where(&TpRate{Tpid: tpid, Tag: rtId}).Delete(TpRate{}).Error; err != nil {
tx.Rollback()
return err
}
for _, rs := range rSlots {
save := tx.Save(&TpRate{
Tpid: tpid,
Tag: rtId,
ConnectFee: rs.ConnectFee,
Rate: rs.Rate,
RateUnit: rs.RateUnit,
RateIncrement: rs.RateIncrement,
GroupIntervalStart: rs.GroupIntervalStart,
CreatedAt: time.Now(),
})
if save.Error != nil {
tx.Rollback()
return save.Error
}
m := make(map[string]bool)
tx := self.db.Begin()
for _, dest := range dests {
if found, _ := m[dest.Tag]; !found {
m[dest.Tag] = true
if err := tx.Where(&TpDestination{Tpid: dest.Tpid, Tag: dest.Tag}).Delete(TpDestination{}).Error; err != nil {
tx.Rollback()
return err
}
}
save := tx.Save(dest)
if save.Error != nil {
tx.Rollback()
return save.Error
}
}
tx.Commit()
return nil
}
func (self *SQLStorage) SetTpDestinationRates(tpid string, drs map[string][]*utils.DestinationRate) error {
func (self *SQLStorage) SetTpRates(rs []TpRate) error {
if len(rs) == 0 {
return nil //Nothing to set
}
m := make(map[string]bool)
tx := self.db.Begin()
for _, rate := range rs {
if found, _ := m[rate.Tag]; !found {
m[rate.Tag] = true
if err := tx.Where(&TpRate{Tpid: rate.Tpid, Tag: rate.Tag}).Delete(TpRate{}).Error; err != nil {
tx.Rollback()
return err
}
}
save := tx.Save(rPlan)
if save.Error != nil {
tx.Rollback()
return save.Error
}
}
tx.Commit()
return nil
}
func (self *SQLStorage) SetTpDestinationRates(drs []TpDestinationRate) error {
if len(drs) == 0 {
return nil //Nothing to set
}
m := make(map[string]bool)
tx := self.db.Begin()
for drId, dRates := range drs {
if err := tx.Where(&TpDestinationRate{Tpid: tpid, Tag: drId}).Delete(TpDestinationRate{}).Error; err != nil {
tx.Rollback()
return err
}
for _, dr := range dRates {
saved := tx.Save(&TpDestinationRate{
Tpid: tpid,
Tag: drId,
DestinationsTag: dr.DestinationId,
RatesTag: dr.RateId,
RoundingMethod: dr.RoundingMethod,
RoundingDecimals: dr.RoundingDecimals,
CreatedAt: time.Now(),
})
if saved.Error != nil {
for _, dRate := range drs {
if found, _ := m[dRate.Tag]; !found {
m[dRate.Tag] = true
if err := tx.Where(&TpDestinationRate{Tpid: dRate.Tpid, Tag: dRate.Tag}).Delete(TpDestinationRate{}).Error; err != nil {
tx.Rollback()
return saved.Error
return err
}
}
saved := tx.Save(dRate)
if saved.Error != nil {
tx.Rollback()
return saved.Error
}
}
tx.Commit()
return nil
}
func (self *SQLStorage) SetTpRatingPlans(tpid string, drts map[string][]*utils.TPRatingPlanBinding) error {
func (self *SQLStorage) SetTpRatingPlans(drts []TpRatingPlan) error {
if len(drts) == 0 {
return nil //Nothing to set
}
m := make(map[string]bool)
tx := self.db.Begin()
for rpId, rPlans := range drts {
if err := tx.Where(&TpRatingPlan{Tpid: tpid, Tag: rpId}).Delete(TpRatingPlan{}).Error; err != nil {
tx.Rollback()
return err
}
for _, rp := range rPlans {
saved := tx.Save(&TpRatingPlan{
Tpid: tpid,
Tag: rpId,
DestratesTag: rp.DestinationRatesId,
TimingTag: rp.TimingId,
Weight: rp.Weight,
CreatedAt: time.Now(),
})
if saved.Error != nil {
for _, rPlan := range drts {
if found, _ := m[rPlan.Tag]; !found {
m[rPlan.Tag] = true
if err := tx.Where(&TpRatingPlan{Tpid: rPlan.Tpid, Tag: rPlan.Tag}).Delete(TpRatingPlan{}).Error; err != nil {
tx.Rollback()
return saved.Error
return err
}
}
saved := tx.Save(rPlan)
if saved.Error != nil {
tx.Rollback()
return saved.Error
}
}
tx.Commit()
return nil
}
func (self *SQLStorage) SetTpRatingProfiles(tpid string, rpfs map[string]*utils.TPRatingProfile) error {
func (self *SQLStorage) SetTpRatingProfiles(rpfs []TpRatingProfile) error {
if len(rpfs) == 0 {
return nil //Nothing to set
}
m := make(map[string]bool)
tx := self.db.Begin()
for _, rpf := range rpfs {
if err := tx.Where(&TpRatingProfile{Tpid: tpid, Loadid: rpf.LoadId, Direction: rpf.Direction, Tenant: rpf.Tenant, Category: rpf.Category, Subject: rpf.Subject}).Delete(TpRatingProfile{}).Error; err != nil {
tx.Rollback()
return err
}
for _, ra := range rpf.RatingPlanActivations {
saved := tx.Save(&TpRatingProfile{
Tpid: rpf.TPid,
Loadid: rpf.LoadId,
Tenant: rpf.Tenant,
Category: rpf.Category,
Subject: rpf.Subject,
Direction: rpf.Direction,
ActivationTime: ra.ActivationTime,
RatingPlanTag: ra.RatingPlanId,
FallbackSubjects: ra.FallbackSubjects,
CdrStatQueueIds: ra.CdrStatQueueIds,
CreatedAt: time.Now(),
})
if saved.Error != nil {
if found, _ := m[rpf.GetRatingProfileId()]; !found {
m[rpf.GetRatingProfileId()] = true
if err := tx.Where(&TpRatingProfile{Tpid: rpf.Tpid, Loadid: rpf.Loadid, Direction: rpf.Direction, Tenant: rpf.Tenant, Category: rpf.Category, Subject: rpf.Subject}).Delete(TpRatingProfile{}).Error; err != nil {
tx.Rollback()
return saved.Error
return err
}
}
saved := tx.Save(rpf)
if saved.Error != nil {
tx.Rollback()
return saved.Error
}
}
tx.Commit()
return nil
}
func (self *SQLStorage) SetTpSharedGroups(tpid string, sgs map[string][]*utils.TPSharedGroup) error {
func (self *SQLStorage) SetTpSharedGroups(sgs []TpSharedGroup) error {
if len(sgs) == 0 {
return nil //Nothing to set
}
m := make(map[string]bool)
tx := self.db.Begin()
for sgId, sGroups := range sgs {
if err := tx.Where(&TpSharedGroup{Tpid: tpid, Tag: sgId}).Delete(TpSharedGroup{}).Error; err != nil {
tx.Rollback()
return err
}
for _, sg := range sGroups {
saved := tx.Save(&TpSharedGroup{
Tpid: tpid,
Tag: sgId,
Account: sg.Account,
Strategy: sg.Strategy,
RatingSubject: sg.RatingSubject,
CreatedAt: time.Now(),
})
if saved.Error != nil {
for _, sGroup := range sgs {
if found, _ := m[sGroup.Tag]; !found {
m[sGroup.Tag] = true
if err := tx.Where(&TpSharedGroup{Tpid: sGroup.Tpid, Tag: sGroup.Tag}).Delete(TpSharedGroup{}).Error; err != nil {
tx.Rollback()
return saved.Error
return err
}
}
saved := tx.Save(sGroup)
if saved.Error != nil {
tx.Rollback()
return saved.Error
}
}
tx.Commit()
return nil
}
func (self *SQLStorage) SetTpCdrStats(tpid string, css map[string][]*utils.TPCdrStat) error {
func (self *SQLStorage) SetTpCdrStats(css []TpCdrStat) error {
if len(css) == 0 {
return nil //Nothing to set
}
m := make(map[string]bool)
tx := self.db.Begin()
for csId, cStats := range css {
if err := tx.Where(&TpCdrStat{Tpid: tpid, Tag: csId}).Delete(TpCdrStat{}).Error; err != nil {
tx.Rollback()
return err
}
for _, cs := range cStats {
ql, _ := strconv.Atoi(cs.QueueLength)
saved := tx.Save(&TpCdrStat{
Tpid: tpid,
Tag: csId,
QueueLength: ql,
TimeWindow: cs.TimeWindow,
Metrics: cs.Metrics,
SetupInterval: cs.SetupInterval,
Tors: cs.TORs,
CdrHosts: cs.CdrHosts,
CdrSources: cs.CdrSources,
ReqTypes: cs.ReqTypes,
Directions: cs.Directions,
Tenants: cs.Tenants,
Categories: cs.Categories,
Accounts: cs.Accounts,
Subjects: cs.Subjects,
DestinationPrefixes: cs.DestinationPrefixes,
UsageInterval: cs.UsageInterval,
Suppliers: cs.Suppliers,
DisconnectCauses: cs.DisconnectCauses,
MediationRunids: cs.MediationRunIds,
RatedAccounts: cs.RatedAccounts,
RatedSubjects: cs.RatedSubjects,
CostInterval: cs.CostInterval,
ActionTriggers: cs.ActionTriggers,
CreatedAt: time.Now(),
})
if saved.Error != nil {
for _, cStat := range css {
if found, _ := m[cStat.Tag]; !found {
m[cStat.Tag] = true
if err := tx.Where(&TpCdrStat{Tpid: cStat.Tpid, Tag: cStat.Tag}).Delete(TpCdrStat{}).Error; err != nil {
tx.Rollback()
return saved.Error
return err
}
}
saved := tx.Save(cStat)
if saved.Error != nil {
tx.Rollback()
return saved.Error
}
}
tx.Commit()
return nil
}
func (self *SQLStorage) SetTpDerivedChargers(tpid string, sgs map[string][]*utils.TPDerivedCharger) error {
func (self *SQLStorage) SetTpDerivedChargers(sgs []TpDerivedCharger) error {
if len(sgs) == 0 {
return nil //Nothing to set
}
m := make(map[string]bool)
tx := self.db.Begin()
for dcId, dChargers := range sgs {
tmpDc := &TpDerivedCharger{}
if err := tmpDc.SetDerivedChargersId(dcId); err != nil {
tx.Rollback()
return err
}
if err := tx.Where(tmpDc).Delete(TpDerivedCharger{}).Error; err != nil {
tx.Rollback()
return err
}
for _, dc := range dChargers {
newDc := &TpDerivedCharger{
Tpid: tpid,
Runid: dc.RunId,
RunFilters: dc.RunFilters,
ReqTypeField: dc.ReqTypeField,
DirectionField: dc.DirectionField,
TenantField: dc.TenantField,
CategoryField: dc.CategoryField,
AccountField: dc.AccountField,
SubjectField: dc.SubjectField,
DestinationField: dc.DestinationField,
SetupTimeField: dc.SetupTimeField,
AnswerTimeField: dc.AnswerTimeField,
UsageField: dc.UsageField,
SupplierField: dc.SupplierField,
CreatedAt: time.Now(),
}
if err := newDc.SetDerivedChargersId(dcId); err != nil {
for _, dCharger := range sgs {
if found, _ := m[dCharger.GetDerivedChargersId()]; !found {
m[dCharger.GetDerivedChargersId()] = true
tmpDc := &TpDerivedCharger{}
if err := tmpDc.SetDerivedChargersId(dCharger.GetDerivedChargersId()); err != nil {
tx.Rollback()
return err
}
if err := tx.Save(newDc).Error; err != nil {
if err := tx.Where(tmpDc).Delete(TpDerivedCharger{}).Error; err != nil {
tx.Rollback()
return err
}
}
if err := tx.Save(dCharger).Error; err != nil {
tx.Rollback()
return err
}
}
tx.Commit()
return nil
}
func (self *SQLStorage) SetTpLCRs(tpid string, lcrs map[string]*LCR) error {
if len(lcrs) == 0 {
func (self *SQLStorage) SetTpLcrRules(sgs []TpLcrRules) error {
if len(sgs) == 0 {
return nil //Nothing to set
}
var buffer bytes.Buffer
buffer.WriteString(fmt.Sprintf("INSERT INTO %s (tpid,direction,tenant,customer,destination_tag,category,strategy,suppliers,activation_time,weight) VALUES ", utils.TBL_TP_LCRS))
i := 0
for _, lcr := range lcrs {
for _, act := range lcr.Activations {
for _, entry := range act.Entries {
if i != 0 { //Consecutive values after the first will be prefixed with "," as separator
buffer.WriteRune(',')
}
buffer.WriteString(fmt.Sprintf("('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%v', '%v')",
tpid, lcr.Tenant, lcr.Category, lcr.Direction, lcr.Account, lcr.Subject, entry.DestinationId, entry.RPCategory, entry.Strategy, entry.RPCategory, act.ActivationTime, entry.Weight))
i++
m := make(map[string]bool)
tx := self.db.Begin()
for _, lcr := range sgs {
if found, _ := m[lcr.GetLcrRulesId()]; !found {
m[lcr.GetLcrRulesId()] = true
tmpDc := &TpLcrRules{}
if err := tmpDc.SetLcrRulesId(lcr.GetLcrRulesId()); err != nil {
tx.Rollback()
return err
}
if err := tx.Where(tmpDc).Delete(TpLcrRules{}).Error; err != nil {
tx.Rollback()
return err
}
}
if err := tx.Save(lcr).Error; err != nil {
tx.Rollback()
return err
}
}
if _, err := self.Db.Exec(buffer.String()); err != nil {
return err
}
tx.Commit()
return nil
}
func (self *SQLStorage) SetTpActions(tpid string, acts map[string][]*utils.TPAction) error {
func (self *SQLStorage) SetTpActions(acts []TpAction) error {
if len(acts) == 0 {
return nil //Nothing to set
}
m := make(map[string]bool)
tx := self.db.Begin()
for acId, acs := range acts {
if err := tx.Where(&TpAction{Tpid: tpid, Tag: acId}).Delete(TpAction{}).Error; err != nil {
tx.Rollback()
return err
}
for _, ac := range acs {
saved := tx.Save(&TpAction{
Tpid: tpid,
Tag: acId,
Action: ac.Identifier,
BalanceTag: ac.BalanceId,
BalanceType: ac.BalanceType,
Direction: ac.Direction,
Units: ac.Units,
ExpiryTime: ac.ExpiryTime,
TimingTags: ac.TimingTags,
DestinationTags: ac.DestinationIds,
RatingSubject: ac.RatingSubject,
Category: ac.Category,
SharedGroup: ac.SharedGroup,
BalanceWeight: ac.BalanceWeight,
ExtraParameters: ac.ExtraParameters,
Weight: ac.Weight,
CreatedAt: time.Now(),
})
if saved.Error != nil {
for _, a := range acts {
if found, _ := m[a.Tag]; !found {
m[a.Tag] = true
if err := tx.Where(&TpAction{Tpid: a.Tpid, Tag: a.Tag}).Delete(TpAction{}).Error; err != nil {
tx.Rollback()
return saved.Error
return err
}
}
saved := tx.Save(a)
if saved.Error != nil {
tx.Rollback()
return saved.Error
}
}
tx.Commit()
return nil
}
// Sets actionTimings in sqlDB. Imput is expected in form map[actionTimingId][]rows, eg a full .csv file content
func (self *SQLStorage) SetTpActionTimings(tpid string, ats map[string][]*utils.TPActionTiming) error {
func (self *SQLStorage) SetTpActionPlan(ats []TpActionPlan) error {
if len(ats) == 0 {
return nil //Nothing to set
}
m := make(map[string]bool)
tx := self.db.Begin()
for apId, aPlans := range ats {
if err := tx.Where(&TpActionPlan{Tpid: tpid, Tag: apId}).Delete(TpActionPlan{}).Error; err != nil {
tx.Rollback()
return err
}
for _, ap := range aPlans {
saved := tx.Save(&TpActionPlan{
Tpid: tpid,
Tag: apId,
ActionsTag: ap.ActionsId,
TimingTag: ap.TimingId,
Weight: ap.Weight,
CreatedAt: time.Now(),
})
if saved.Error != nil {
for _, aPlan := range ats {
if found, _ := m[aPlan.Tag]; !found {
m[aPlan.Tag] = true
if err := tx.Where(&TpActionPlan{Tpid: aPlan.Tpid, Tag: aPlan.Tag}).Delete(TpActionPlan{}).Error; err != nil {
tx.Rollback()
return saved.Error
return err
}
}
saved := tx.Save(aPlan)
if saved.Error != nil {
tx.Rollback()
return saved.Error
}
}
r := tx.Commit()
return r.Error
}
func (self *SQLStorage) GetTPActionPlan(tpid, tag string) ([]*TpActionPlan, error) {
var tpActionPlans []*TpActionPlan
if err := self.db.Where(&TpActionPlan{Tpid: tpid, Tag: tag}).Find(&tpActionPlans).Error; err != nil {
return nil, err
}
return tpActionPlans, nil
}
func (self *SQLStorage) SetTpActionTriggers(tpid string, ats map[string][]*utils.TPActionTrigger) error {
func (self *SQLStorage) SetTpActionTriggers(ats []TpActionTrigger) error {
if len(ats) == 0 {
return nil //Nothing to set
}
m := make(map[string]bool)
tx := self.db.Begin()
for atId, aTriggers := range ats {
if err := tx.Where(&TpActionTrigger{Tpid: tpid, Tag: atId}).Delete(TpActionTrigger{}).Error; err != nil {
tx.Rollback()
return err
}
for _, at := range aTriggers {
id := at.Id
if id == "" {
id = utils.GenUUID()
}
saved := tx.Save(&TpActionTrigger{
Tpid: tpid,
UniqueId: id,
Tag: atId,
ThresholdType: at.ThresholdType,
ThresholdValue: at.ThresholdValue,
Recurrent: at.Recurrent,
MinSleep: at.MinSleep,
BalanceTag: at.BalanceId,
BalanceType: at.BalanceType,
BalanceDirection: at.BalanceDirection,
BalanceDestinationTags: at.BalanceDestinationIds,
BalanceWeight: at.BalanceWeight,
BalanceExpiryTime: at.BalanceExpirationDate,
BalanceTimingTags: at.BalanceTimingTags,
BalanceRatingSubject: at.BalanceRatingSubject,
BalanceCategory: at.BalanceCategory,
BalanceSharedGroup: at.BalanceSharedGroup,
MinQueuedItems: at.MinQueuedItems,
ActionsTag: at.ActionsId,
Weight: at.Weight,
CreatedAt: time.Now(),
})
if saved.Error != nil {
for _, aTrigger := range ats {
if found, _ := m[aTrigger.Tag]; !found {
m[aTrigger.Tag] = true
if err := tx.Where(&TpActionTrigger{Tpid: aTrigger.Tpid, Tag: aTrigger.Tag}).Delete(TpActionTrigger{}).Error; err != nil {
tx.Rollback()
return saved.Error
return err
}
}
saved := tx.Save(aTrigger)
if saved.Error != nil {
tx.Rollback()
return saved.Error
}
}
tx.Commit()
return nil
}
// Sets a group of account actions. Map key has the role of grouping within a tpid
func (self *SQLStorage) SetTpAccountActions(tpid string, aas map[string]*utils.TPAccountActions) error {
func (self *SQLStorage) SetTpAccountActions(aas []TpAccountAction) error {
if len(aas) == 0 {
return nil //Nothing to set
}
m := make(map[string]bool)
tx := self.db.Begin()
for _, aa := range aas {
if err := tx.Where(&TpAccountAction{Tpid: tpid, Loadid: aa.LoadId, Direction: aa.Direction, Tenant: aa.Tenant, Account: aa.Account}).Delete(TpAccountAction{}).Error; err != nil {
tx.Rollback()
return err
if found, _ := m[aa.GetAccountActionId()]; !found {
m[aa.GetAccountActionId()] = true
if err := tx.Where(&TpAccountAction{Tpid: aa.Tpid, Loadid: aa.Loadid, Direction: aa.Direction, Tenant: aa.Tenant, Account: aa.Account}).Delete(TpAccountAction{}).Error; err != nil {
tx.Rollback()
return err
}
}
saved := tx.Save(&TpAccountAction{
Tpid: aa.TPid,
Loadid: aa.LoadId,
Tenant: aa.Tenant,
Account: aa.Account,
Direction: aa.Direction,
ActionPlanTag: aa.ActionPlanId,
ActionTriggersTag: aa.ActionTriggersId,
CreatedAt: time.Now(),
})
saved := tx.Save(aa)
if saved.Error != nil {
tx.Rollback()
return saved.Error
@@ -1158,8 +1062,8 @@ func (self *SQLStorage) RemStoredCdrs(cgrIds []string) error {
return nil
}
func (self *SQLStorage) GetTpDestinations(tpid, tag string) ([]*TpDestination, error) {
var tpDests []*TpDestination
func (self *SQLStorage) GetTpDestinations(tpid, tag string) ([]TpDestination, error) {
var tpDests []TpDestination
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
@@ -1171,9 +1075,9 @@ func (self *SQLStorage) GetTpDestinations(tpid, tag string) ([]*TpDestination, e
return tpDests, nil
}
func (self *SQLStorage) GetTpRates(tpid, tag string) ([]*TpRate, error) {
func (self *SQLStorage) GetTpRates(tpid, tag string) ([]TpRate, error) {
rts := make(map[string]*utils.TPRate)
var tpRates []*TpRate
var tpRates []TpRate
q := self.db.Where("tpid = ?", tpid).Order("id")
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
@@ -1184,8 +1088,8 @@ func (self *SQLStorage) GetTpRates(tpid, tag string) ([]*TpRate, error) {
return tpRates, nil
}
func (self *SQLStorage) GetTpDestinationRates(tpid, tag string, pagination *utils.Paginator) ([]*TpDestinationRate, error) {
var tpDestinationRates []*TpDestinationRate
func (self *SQLStorage) GetTpDestinationRates(tpid, tag string, pagination *utils.Paginator) ([]TpDestinationRate, error) {
var tpDestinationRates []TpDestinationRate
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
@@ -1205,8 +1109,8 @@ func (self *SQLStorage) GetTpDestinationRates(tpid, tag string, pagination *util
return tpDestinationRates, nil
}
func (self *SQLStorage) GetTpTimings(tpid, tag string) ([]*TpTiming, error) {
var tpTimings []*TpTiming
func (self *SQLStorage) GetTpTimings(tpid, tag string) ([]TpTiming, error) {
var tpTimings []TpTiming
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
@@ -1217,8 +1121,8 @@ func (self *SQLStorage) GetTpTimings(tpid, tag string) ([]*TpTiming, error) {
return tpTimings, nil
}
func (self *SQLStorage) GetTpRatingPlans(tpid, tag string, pagination *utils.Paginator) ([]*TpRatingPlan, error) {
var tpRatingPlans []*TpRatingPlan
func (self *SQLStorage) GetTpRatingPlans(tpid, tag string, pagination *utils.Paginator) ([]TpRatingPlan, error) {
var tpRatingPlans []TpRatingPlan
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
@@ -1238,8 +1142,8 @@ func (self *SQLStorage) GetTpRatingPlans(tpid, tag string, pagination *utils.Pag
return tpRatingPlans, nil
}
func (self *SQLStorage) GetTpRatingProfiles(qryRpf *utils.TPRatingProfile) ([]*TpRatingProfile, error) {
var tpRpfs []*TpRatingProfile
func (self *SQLStorage) GetTpRatingProfiles(qryRpf *utils.TPRatingProfile) ([]TpRatingProfile, error) {
var tpRpfs []TpRatingProfile
q := self.db.Where("tpid = ?", qryRpf.TPid)
if len(qryRpf.Direction) != 0 {
q = q.Where("direction = ?", qryRpf.Direction)
@@ -1263,8 +1167,8 @@ func (self *SQLStorage) GetTpRatingProfiles(qryRpf *utils.TPRatingProfile) ([]*T
return tpRpfs, nil
}
func (self *SQLStorage) GetTpSharedGroups(tpid, tag string) ([]*TpSharedGroup, error) {
var tpShareGroups []*TpSharedGroup
func (self *SQLStorage) GetTpSharedGroups(tpid, tag string) ([]TpSharedGroup, error) {
var tpShareGroups []TpSharedGroup
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
@@ -1276,8 +1180,8 @@ func (self *SQLStorage) GetTpSharedGroups(tpid, tag string) ([]*TpSharedGroup, e
}
func (self *SQLStorage) GetTpLCRs(tpid, tag string) ([]*TpLcrRules, error) {
var tpLcrRules []*TpLcrRules
func (self *SQLStorage) GetTpLCRs(tpid, tag string) ([]TpLcrRules, error) {
var tpLcrRules []TpLcrRules
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
@@ -1289,8 +1193,8 @@ func (self *SQLStorage) GetTpLCRs(tpid, tag string) ([]*TpLcrRules, error) {
return tpLcrRules, nil
}
func (self *SQLStorage) GetTpActions(tpid, tag string) ([]*TpAction, error) {
var tpActions []*TpAction
func (self *SQLStorage) GetTpActions(tpid, tag string) ([]TpAction, error) {
var tpActions []TpAction
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
@@ -1302,18 +1206,35 @@ func (self *SQLStorage) GetTpActions(tpid, tag string) ([]*TpAction, error) {
return tpActions, nil
}
func (self *SQLStorage) GetTpActionTriggers(tpid, tag string) ([]*TpActionTrigger, error) {
var tpActionTriggers []*TpActionTrigger
if err := self.db.Where(&TpActionTrigger{Tpid: tpid, Tag: tag}).Find(&tpActionTriggers).Error; err != nil {
func (self *SQLStorage) GetTpActionTriggers(tpid, tag string) ([]TpActionTrigger, error) {
var tpActionTriggers []TpActionTrigger
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
}
if err := q.Find(&tpActionTriggers).Error; err != nil {
return nil, err
}
return tpActionTriggers, nil
}
func (self *SQLStorage) GetTpAccountActions(aaFltr *utils.TPAccountActions) ([]*TpAccountAction, error) {
func (self *SQLStorage) GetTpActionPlan(tpid, tag string) ([]*TpActionPlan, error) {
var tpActionPlans []*TpActionPlan
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
}
if err := q.Find(&tpActionPlans).Error; err != nil {
return nil, err
}
var tpAccActs []*TpAccountAction
return tpActionPlans, nil
}
func (self *SQLStorage) GetTpAccountActions(aaFltr *utils.TPAccountActions) ([]TpAccountAction, error) {
var tpAccActs []TpAccountAction
q := self.db.Where("tpid = ?", aaFltr.TPid)
if len(aaFltr.Direction) != 0 {
q = q.Where("direction = ?", aaFltr.Direction)
@@ -1333,8 +1254,8 @@ func (self *SQLStorage) GetTpAccountActions(aaFltr *utils.TPAccountActions) ([]*
return tpAccActs, nil
}
func (self *SQLStorage) GetTpDerivedChargers(dc *utils.TPDerivedChargers) ([]*TpDerivedCharger, error) {
var tpDerivedChargers []*TpDerivedCharger
func (self *SQLStorage) GetTpDerivedChargers(dc *utils.TPDerivedChargers) ([]TpDerivedCharger, error) {
var tpDerivedChargers []TpDerivedCharger
q := self.db.Where("tpid = ?", dc.TPid)
if len(dc.Direction) != 0 {
q = q.Where("direction = ?", dc.Direction)
@@ -1360,8 +1281,8 @@ func (self *SQLStorage) GetTpDerivedChargers(dc *utils.TPDerivedChargers) ([]*Tp
return tpDerivedChargers, nil
}
func (self *SQLStorage) GetTpCdrStats(tpid, tag string) ([]*TpCdrStat, error) {
var tpCdrStats []*TpCdrStat
func (self *SQLStorage) GetTpCdrStats(tpid, tag string) ([]TpCdrStat, error) {
var tpCdrStats []TpCdrStat
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)

View File

@@ -34,8 +34,6 @@ import (
var (
TPExportFormats = []string{utils.CSV}
exportedFiles = []string{utils.TIMINGS_CSV, utils.DESTINATIONS_CSV, utils.RATES_CSV, utils.DESTINATION_RATES_CSV, utils.RATING_PLANS_CSV, utils.RATING_PROFILES_CSV,
utils.SHARED_GROUPS_CSV, utils.ACTIONS_CSV, utils.ACTION_PLANS_CSV, utils.ACTION_TRIGGERS_CSV, utils.ACCOUNT_ACTIONS_CSV, utils.DERIVED_CHARGERS_CSV, utils.CDR_STATS_CSV}
)
func NewTPExporter(storDb LoadStorage, tpID, expPath, fileFormat, sep string, compress bool) (*TPExporter, error) {
@@ -88,26 +86,103 @@ type TPExporter struct {
func (self *TPExporter) Run() error {
self.removeFiles() // Make sure we clean the folder before starting with new one
for _, fHandler := range []func() error{
self.exportTimings,
self.exportDestinations,
self.exportRates,
self.exportDestinationRates,
self.exportRatingPlans,
self.exportRatingProfiles,
self.exportSharedGroups,
self.exportActions,
self.exportActionPlans,
self.exportActionTriggers,
self.exportAccountActions,
self.exportDerivedChargers,
self.exportCdrStats,
} {
if err := fHandler(); err != nil {
toExportMap := make(map[string]interface{})
storData, err := self.storDb.GetTpTimings(self.tpID, "")
if err != nil {
return err
}
toExportMap[utils.TIMINGS_CSV] = storData
storData, err := self.storDb.GetTpDestinations(self.tpID, "")
if err != nil {
return err
}
toExportMap[utils.DESTINATIONS_CSV] = storData
storData, err := self.storDb.GetTpRates(self.tpID, "")
if err != nil {
return err
}
toExportMap[utils.RATES_CSV] = storData
storData, err = self.storDb.GetTpRates(self.tpID, "")
if err != nil {
return err
}
toExportMap[utils.RATES_CSV] = storData
storData, err = self.storDb.GetTpDestinationRates(self.tpID, "", nil)
if err != nil {
return err
}
toExportMap[utils.DESTINATION_RATES_CSV] = storData
storData, err = self.storDb.GetTpRatingPlans(self.tpID, "", nil)
if err != nil {
return err
}
toExportMap[utils.RATING_PLANS_CSV] = storData
storData, err = self.storDb.GetTpRatingProfiles(&utils.TPRatingProfile{TPid: self.tpID})
if err != nil {
return err
}
toExportMap[utils.RATING_PROFILE_CSV] = storData
storData, err = self.storDb.GetTpSharedGroups(self.tpID, "")
if err != nil {
return err
}
toExportMap[utils.SHARED_GROUPS_CSV] = storData
storData, err = self.storDb.GetTpActions(self.tpID, "")
if err != nil {
return err
}
toExportMap[utils.ACTIONS_CSV] = storData
storData, err = self.storDb.GetTpActionPlans(self.tpID, "")
if err != nil {
return err
}
toExportMap[utils.ACTION_PLANS_CSV] = storData
storData, err = self.storDb.GetTpActionTriggers(self.tpID, "")
if err != nil {
return err
}
toExportMap[utils.ACTIONS_TRIGGERS_CSV] = storData
storData, err = self.storDb.GetTpAccountActions(&utils.TPAccountActions{TPid: self.tpID})
if err != nil {
return err
}
toExportMap[utils.ACCOUNT_ACTIONS_CSV] = storData
storData, err = self.storDb.GetTpDerivedChargers(&utils.TPDerivedChargers{TPid: self.tpID})
if err != nil {
return err
}
toExportMap[utils.DERIVED_CHARGERS_CSV] = storData
storData, err = self.storDb.GetTpCdrStats(self.tpID, "")
if err != nil {
return err
}
toExportMap[utils.CDR_STATS_CSV] = storData
for fileName, storData := range toExportMap {
for _, tpItem := range storData {
exportedData = append(exportedData, tpItem)
}
if err := self.writeOut(fileName, exportedData); err != nil {
self.removeFiles()
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
}
if self.compress {
if err := self.zipWritter.Close(); err != nil {
return err
@@ -128,7 +203,7 @@ func (self *TPExporter) removeFiles() error {
}
// General method to write the content out to a file on path or zip archive
func (self *TPExporter) writeOut(fileName string, tpData []utils.ExportedData) error {
func (self *TPExporter) writeOut(fileName string, tpData []interface{}) error {
if len(tpData) == 0 {
return nil
}
@@ -162,264 +237,18 @@ func (self *TPExporter) writeOut(fileName string, tpData []utils.ExportedData) e
}
for _, tpItem := range tpData {
for _, record := range tpItem.AsExportSlice() {
if err := writerOut.Write(record); err != nil {
return err
}
record, err := csvDump(tpItem)
if err != nil {
return err
}
if err := writerOut.Write(record); err != nil {
return err
}
}
writerOut.Flush() // In case of .csv will dump data on hdd
return nil
}
func (self *TPExporter) exportTimings() error {
fileName := exportedFiles[0] // Define it out of group so we make sure it is cleaned up by removeFiles
storData, err := self.storDb.GetTpTimings(self.tpID, "")
if err != nil {
return nil
}
exportedData := make([]utils.ExportedData, len(storData))
idx := 0
for _, tpItem := range storData {
exportedData[idx] = tpItem
idx += 1
}
if err := self.writeOut(fileName, exportedData); err != nil {
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
return nil
}
func (self *TPExporter) exportDestinations() error {
fileName := exportedFiles[1]
storData, err := self.storDb.GetTpDestinations(self.tpID, "")
dsts := TpDestinations(storData).GetDestinations()
if err != nil {
return nil
}
exportedData := make([]utils.ExportedData, len(storData))
idx := 0
for _, dst := range dsts {
exportedData[idx] = &utils.TPDestination{TPid: self.tpID, DestinationId: dst.Id, Prefixes: dst.Prefixes}
idx += 1
}
if err := self.writeOut(fileName, exportedData); err != nil {
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
return nil
}
func (self *TPExporter) exportRates() error {
fileName := exportedFiles[2]
storData, err := self.storDb.GetTpRates(self.tpID, "")
if err != nil {
return nil
}
exportedData := make([]utils.ExportedData, len(storData))
idx := 0
for _, tpItem := range storData {
exportedData[idx] = tpItem
idx += 1
}
if err := self.writeOut(fileName, exportedData); err != nil {
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
return nil
}
func (self *TPExporter) exportDestinationRates() error {
fileName := exportedFiles[3]
storData, err := self.storDb.GetTpDestinationRates(self.tpID, "", nil)
if err != nil {
return nil
}
exportedData := make([]utils.ExportedData, len(storData))
idx := 0
for _, tpItem := range storData {
exportedData[idx] = tpItem
idx += 1
}
if err := self.writeOut(fileName, exportedData); err != nil {
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
return nil
}
func (self *TPExporter) exportRatingPlans() error {
fileName := exportedFiles[4]
storData, err := self.storDb.GetTpRatingPlans(self.tpID, "", nil)
if err != nil {
return nil
}
exportedData := make([]utils.ExportedData, len(storData))
idx := 0
for rpId, rpBinding := range storData {
exportedData[idx] = &utils.TPRatingPlan{TPid: self.tpID, RatingPlanId: rpId, RatingPlanBindings: rpBinding}
idx += 1
}
if err := self.writeOut(fileName, exportedData); err != nil {
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
return nil
}
func (self *TPExporter) exportRatingProfiles() error {
fileName := exportedFiles[5]
storData, err := self.storDb.GetTpRatingProfiles(&utils.TPRatingProfile{TPid: self.tpID})
if err != nil {
return nil
}
exportedData := make([]utils.ExportedData, len(storData))
idx := 0
for _, tpItem := range storData {
exportedData[idx] = tpItem
idx += 1
}
if err := self.writeOut(fileName, exportedData); err != nil {
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
return nil
}
func (self *TPExporter) exportSharedGroups() error {
fileName := exportedFiles[6]
storData, err := self.storDb.GetTpSharedGroups(self.tpID, "")
if err != nil {
return nil
}
exportedData := make([]utils.ExportedData, len(storData))
idx := 0
for sgId, sg := range storData {
exportedData[idx] = &utils.TPSharedGroups{TPid: self.tpID, SharedGroupsId: sgId, SharedGroups: sg}
idx += 1
}
if err := self.writeOut(fileName, exportedData); err != nil {
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
return nil
}
func (self *TPExporter) exportActions() error {
fileName := exportedFiles[7]
storData, err := self.storDb.GetTpActions(self.tpID, "")
if err != nil {
return nil
}
exportedData := make([]utils.ExportedData, len(storData))
idx := 0
for actsId, acts := range storData {
exportedData[idx] = &utils.TPActions{TPid: self.tpID, ActionsId: actsId, Actions: acts}
idx += 1
}
if err := self.writeOut(fileName, exportedData); err != nil {
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
return nil
}
func (self *TPExporter) exportActionPlans() error {
fileName := exportedFiles[8]
storData, err := self.storDb.GetTPActionTimings(self.tpID, "")
if err != nil {
return nil
}
exportedData := make([]utils.ExportedData, len(storData))
idx := 0
for apId, ats := range storData {
exportedData[idx] = &utils.TPActionPlan{TPid: self.tpID, Id: apId, ActionPlan: ats}
idx += 1
}
if err := self.writeOut(fileName, exportedData); err != nil {
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
return nil
}
func (self *TPExporter) exportActionTriggers() error {
fileName := exportedFiles[9]
storData, err := self.storDb.GetTpActionTriggers(self.tpID, "")
if err != nil {
return nil
}
exportedData := make([]utils.ExportedData, len(storData))
idx := 0
for atId, ats := range storData {
exportedData[idx] = &utils.TPActionTriggers{TPid: self.tpID, ActionTriggersId: atId, ActionTriggers: ats}
idx += 1
}
if err := self.writeOut(fileName, exportedData); err != nil {
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
return nil
}
func (self *TPExporter) exportAccountActions() error {
fileName := exportedFiles[10]
storData, err := self.storDb.GetTpAccountActions(&utils.TPAccountActions{TPid: self.tpID})
if err != nil {
return nil
}
exportedData := make([]utils.ExportedData, len(storData))
idx := 0
for _, tpItem := range storData {
exportedData[idx] = tpItem
idx += 1
}
if err := self.writeOut(fileName, exportedData); err != nil {
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
return nil
}
func (self *TPExporter) exportDerivedChargers() error {
fileName := exportedFiles[11]
storData, err := self.storDb.GetTpDerivedChargers(&utils.TPDerivedChargers{TPid: self.tpID})
if err != nil {
return nil
}
exportedData := make([]utils.ExportedData, len(storData))
idx := 0
for _, tpItem := range storData {
exportedData[idx] = tpItem
idx += 1
}
if err := self.writeOut(fileName, exportedData); err != nil {
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
return nil
}
func (self *TPExporter) exportCdrStats() error {
fileName := exportedFiles[12]
storData, err := self.storDb.GetTpCdrStats(self.tpID, "")
if err != nil {
return nil
}
exportedData := make([]utils.ExportedData, len(storData))
idx := 0
for cdrstId, cdrsts := range storData {
exportedData[idx] = &utils.TPCdrStats{TPid: self.tpID, CdrStatsId: cdrstId, CdrStats: cdrsts}
idx += 1
}
if err := self.writeOut(fileName, exportedData); err != nil {
return err
}
self.exportedFiles = append(self.exportedFiles, fileName)
return nil
}
func (self *TPExporter) ExportStats() *utils.ExportedTPStats {
return &utils.ExportedTPStats{ExportPath: self.exportPath, ExportedFiles: self.exportedFiles, Compressed: self.compress}
}

View File

@@ -20,22 +20,21 @@ package engine
import (
"fmt"
"io"
"io/ioutil"
"log"
"strconv"
"github.com/cgrates/cgrates/utils"
)
// Import tariff plan from csv into storDb
type TPCSVImporter struct {
TPid string // Load data on this tpid
StorDb LoadStorage // StorDb connection handle
DirPath string // Directory path to import from
Sep rune // Separator in the csv file
Verbose bool // If true will print a detailed information instead of silently discarding it
ImportId string // Use this to differentiate between imports (eg: when autogenerating fields like RatingProfileId
TPid string // Load data on this tpid
StorDb LoadWriter // StorDb connection handle
csvr LoadReader
DirPath string // Directory path to import from
Sep rune // Separator in the csv file
Verbose bool // If true will print a detailed information instead of silently discarding it
ImportId string // Use this to differentiate between imports (eg: when autogenerating fields like RatingProfileId
}
// Maps csv file to handler which should process it. Defined like this since tests on 1.0.3 were failing on Travis.
@@ -57,6 +56,7 @@ var fileHandlers = map[string]func(*TPCSVImporter, string) error{
}
func (self *TPCSVImporter) Run() error {
self.csvr = NewFileCSVStorage(self.sep, utils.DESTINATIONS_CSV, utils.TIMINGS_CSV, utils.RATES_CSV, utils.DESTINATION_RATES_CSV, utils.RATING_PLANS_CSV, utils.RATING_PROFILES_CSV, utils.SHARED_GROUPS_CSV, utils.LCRS_CSV, utils.ACTIONS_CSV, utils.ACTION_PLANS_CSV, utils.ACTION_TRIGGERS_CSV, utils.ACCOUNT_ACTIONS_CSV, utils.DERIVED_CHARGERS_CSV, utils.CDR_STATS_CSV)
files, _ := ioutil.ReadDir(self.DirPath)
for _, f := range files {
fHandler, hasName := fileHandlers[f.Name()]
@@ -75,658 +75,154 @@ func (self *TPCSVImporter) importTimings(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
tps, err := self.csvr.GetTpTimings(self.TPid, "")
if err != nil {
return err
}
lineNr := 0
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
tm := &utils.ApierTPTiming{TPid: self.TPid, TimingId: record[0], Years: record[1], Months: record[2], MonthDays: record[3], WeekDays: record[4], Time: record[5]}
if err := self.StorDb.SetTPTiming(tm); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
}
return nil
return self.StorDb.SetTPTiming(tps)
}
func (self *TPCSVImporter) importDestinations(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
tps, err := self.csvr.GetTpDestinations(self.TPid, "")
if err != nil {
return err
}
lineNr := 0
dests := make(map[string]*Destination) // Key:destId, value: listOfPrefixes
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
} else {
if dst, hasIt := dests[record[0]]; hasIt {
dst.Prefixes = append(dst.Prefixes, record[1])
} else {
dests[record[0]] = &Destination{record[0], []string{record[1]}}
}
}
}
for _, dst := range dests {
if err := self.StorDb.SetTPDestination(self.TPid, dst); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
}
return nil
return self.StorDb.SetTpDestinations(tps)
}
func (self *TPCSVImporter) importRates(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
tps, err := self.csvr.GetTpRates(self.TPid, "")
if err != nil {
return err
}
lineNr := 0
rates := make(map[string][]*utils.RateSlot)
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
newRt, err := NewLoadRate(record[0], record[1], record[2], record[3], record[4], record[5])
if err != nil {
return err
}
if _, hasIt := rates[record[0]]; !hasIt {
rates[record[0]] = make([]*utils.RateSlot, 0)
}
rates[record[0]] = append(rates[record[0]], newRt.RateSlots...)
}
if err := self.StorDb.SetTPRates(self.TPid, rates); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
return self.StorDb.SetTpRates(tps)
}
func (self *TPCSVImporter) importDestinationRates(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
tps, err := self.csvr.GetTpDestinationRates(self.TPid, "", nil)
if err != nil {
return err
}
lineNr := 0
drs := make(map[string][]*utils.DestinationRate)
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
roundingDecimals, err := strconv.Atoi(record[4])
if err != nil {
log.Printf("Error parsing rounding decimals: %s", record[4])
return err
}
maxCost, err := strconv.ParseFloat(record[5], 64)
if err != nil {
log.Printf("Error parsing max cost from: %v", record[5])
return err
}
if _, hasIt := drs[record[0]]; !hasIt {
drs[record[0]] = make([]*utils.DestinationRate, 0)
}
drs[record[0]] = append(drs[record[0]], &utils.DestinationRate{
DestinationId: record[1],
RateId: record[2],
RoundingMethod: record[3],
RoundingDecimals: roundingDecimals,
MaxCost: maxCost,
MaxCostStrategy: record[6],
})
}
if err := self.StorDb.SetTPDestinationRates(self.TPid, drs); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
return self.StorDb.SetTpDestinationRates(tps)
}
func (self *TPCSVImporter) importRatingPlans(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
tps, err := self.csvr.GetTpRatingPlans(self.TPid, "", nil)
if err != nil {
return err
}
lineNr := 0
rpls := make(map[string][]*utils.TPRatingPlanBinding)
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
weight, err := strconv.ParseFloat(record[3], 64)
if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
if _, hasIt := rpls[record[0]]; !hasIt {
rpls[record[0]] = make([]*utils.TPRatingPlanBinding, 0)
}
rpls[record[0]] = append(rpls[record[0]], &utils.TPRatingPlanBinding{
DestinationRatesId: record[1],
Weight: weight,
TimingId: record[2],
})
}
if err := self.StorDb.SetTPRatingPlans(self.TPid, rpls); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
return self.StorDb.SetTpRatingPlans(tps)
}
func (self *TPCSVImporter) importRatingProfiles(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
tps, err := self.csvr.GetTpRatingProfiles(self.TPid, "", nil)
if err != nil {
return err
}
lineNr := 0
rpfs := make(map[string]*utils.TPRatingProfile)
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
direction, tenant, tor, subject, ratingPlanTag, fallbacksubject := record[0], record[1], record[2], record[3], record[5], record[6]
_, err = utils.ParseDate(record[4])
if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
loadId := utils.CSV_LOAD //Autogenerate rating profile id
if self.ImportId != "" {
loadId += "_" + self.ImportId
}
newRp := &utils.TPRatingProfile{
TPid: self.TPid,
LoadId: loadId,
Tenant: tenant,
Category: tor,
Direction: direction,
Subject: subject,
RatingPlanActivations: []*utils.TPRatingActivation{
&utils.TPRatingActivation{ActivationTime: record[4], RatingPlanId: ratingPlanTag, FallbackSubjects: fallbacksubject}},
}
if rp, hasIt := rpfs[newRp.KeyId()]; hasIt {
rp.RatingPlanActivations = append(rp.RatingPlanActivations, newRp.RatingPlanActivations...)
} else {
rpfs[newRp.KeyId()] = newRp
}
}
if err := self.StorDb.SetTPRatingProfiles(self.TPid, rpfs); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
return self.StorDb.SetTpRatingProfiles(tps)
}
func (self *TPCSVImporter) importSharedGroups(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
tps, err := self.csvr.GetTpSharedGroups(self.TPid, "")
if err != nil {
return err
}
shgs := make(map[string][]*utils.TPSharedGroup)
lineNr := 0
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
if _, hasIt := shgs[record[0]]; !hasIt {
shgs[record[0]] = make([]*utils.TPSharedGroup, 0)
}
shgs[record[0]] = append(shgs[record[0]], &utils.TPSharedGroup{Account: record[1], Strategy: record[2], RatingSubject: record[3]})
}
if err := self.StorDb.SetTPSharedGroups(self.TPid, shgs); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
return self.StorDb.SetTpSharedGroups(tps)
}
func (self *TPCSVImporter) importActions(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
tps, err := self.csvr.GetTpActions(self.TPid, "")
if err != nil {
return err
}
acts := make(map[string][]*utils.TPAction)
lineNr := 0
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
actId, actionType, balanceTag, balanceType, direction, destTags, rateSubject, category, sharedGroup := record[ACTSCSVIDX_TAG], record[ACTSCSVIDX_ACTION],
record[ACTSCSVIDX_BALANCE_TAG], record[ACTSCSVIDX_BALANCE_TYPE], record[ACTSCSVIDX_DIRECTION], record[ACTSCSVIDX_DESTINATION_TAG], record[ACTSCSVIDX_RATING_SUBJECT],
record[ACTSCSVIDX_CATEGORY], record[ACTSCSVIDX_SHARED_GROUP]
units, err := strconv.ParseFloat(record[ACTSCSVIDX_UNITS], 64)
if err != nil && record[ACTSCSVIDX_UNITS] != "" {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
balanceWeight, _ := strconv.ParseFloat(record[ACTSCSVIDX_BALANCE_WEIGHT], 64)
weight, err := strconv.ParseFloat(record[ACTSCSVIDX_WEIGHT], 64)
if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
if _, hasIt := acts[actId]; !hasIt {
acts[actId] = make([]*utils.TPAction, 0)
}
acts[actId] = append(acts[actId], &utils.TPAction{
Identifier: actionType,
BalanceId: balanceTag,
BalanceType: balanceType,
Direction: direction,
Units: units,
ExpiryTime: record[ACTSCSVIDX_EXPIRY_TIME],
DestinationIds: destTags,
RatingSubject: rateSubject,
Category: category,
SharedGroup: sharedGroup,
BalanceWeight: balanceWeight,
ExtraParameters: record[ACTSCSVIDX_EXTRA_PARAMS],
Weight: weight,
})
}
if err := self.StorDb.SetTPActions(self.TPid, acts); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
return self.StorDb.SetTpActions(tps)
}
func (self *TPCSVImporter) importActionTimings(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
tps, err := self.csvr.GetTpActionPlans(self.TPid, "")
if err != nil {
return err
}
lineNr := 0
aplns := make(map[string][]*utils.TPActionTiming)
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
tag, actionsTag, timingTag := record[0], record[1], record[2]
weight, err := strconv.ParseFloat(record[3], 64)
if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
if _, hasIt := aplns[tag]; !hasIt {
aplns[tag] = make([]*utils.TPActionTiming, 0)
}
aplns[tag] = append(aplns[tag], &utils.TPActionTiming{
ActionsId: actionsTag,
TimingId: timingTag,
Weight: weight,
})
}
if err := self.StorDb.SetTPActionTimings(self.TPid, aplns); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
return self.StorDb.SetTpActionPlans(tps)
}
func (self *TPCSVImporter) importActionTriggers(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
tps, err := self.csvr.GetTpActionTriggers(self.TPid, "")
if err != nil {
return err
}
lineNr := 0
atrs := make(map[string][]*utils.TPActionTrigger)
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
tag, balanceId, balanceType, direction, thresholdType, destinationTags, balanceExpirationDate, balanceRatingSubject, balanceCategory, balanceSharedGroup, actionsTag := record[ATRIGCSVIDX_TAG], record[ATRIGCSVIDX_BAL_TAG], record[ATRIGCSVIDX_BAL_TYPE],
record[ATRIGCSVIDX_BAL_DIRECTION], record[ATRIGCSVIDX_THRESHOLD_TYPE], record[ATRIGCSVIDX_BAL_DESTINATION_TAG], record[ATRIGCSVIDX_BAL_EXPIRY_TIME], record[ATRIGCSVIDX_BAL_RATING_SUBJECT],
record[ATRIGCSVIDX_BAL_CATEGORY], record[ATRIGCSVIDX_BAL_SHARED_GROUP], record[ATRIGCSVIDX_ACTIONS_TAG]
threshold, err := strconv.ParseFloat(record[ATRIGCSVIDX_THRESHOLD_VALUE], 64)
if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
recurrent, err := strconv.ParseBool(record[ATRIGCSVIDX_RECURRENT])
if err != nil {
log.Printf("Ignoring line %d, warning: <%s>", lineNr, err.Error())
continue
}
balanceWeight, err := strconv.ParseFloat(record[ATRIGCSVIDX_BAL_WEIGHT], 64)
if err != nil && record[ATRIGCSVIDX_BAL_WEIGHT] != "" {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
minQueuedItems, err := strconv.Atoi(record[ATRIGCSVIDX_STATS_MIN_QUEUED_ITEMS])
if err != nil && record[ATRIGCSVIDX_STATS_MIN_QUEUED_ITEMS] != "" {
log.Printf("Ignoring line %d, warning: <%s>", lineNr, err.Error())
continue
}
weight, err := strconv.ParseFloat(record[ATRIGCSVIDX_WEIGHT], 64)
if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
if _, hasIt := atrs[tag]; !hasIt {
atrs[tag] = make([]*utils.TPActionTrigger, 0)
}
atrs[tag] = append(atrs[tag], &utils.TPActionTrigger{
ThresholdType: thresholdType,
ThresholdValue: threshold,
Recurrent: recurrent,
MinSleep: record[ATRIGCSVIDX_MIN_SLEEP],
BalanceId: balanceId,
BalanceType: balanceType,
BalanceDirection: direction,
BalanceDestinationIds: destinationTags,
BalanceWeight: balanceWeight,
BalanceExpirationDate: balanceExpirationDate,
BalanceRatingSubject: balanceRatingSubject,
BalanceCategory: balanceCategory,
BalanceSharedGroup: balanceSharedGroup,
MinQueuedItems: minQueuedItems,
Weight: weight,
ActionsId: actionsTag,
})
}
if err := self.StorDb.SetTPActionTriggers(self.TPid, atrs); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
return self.StorDb.SetTpActionTriggers(tps)
}
func (self *TPCSVImporter) importAccountActions(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
tps, err := self.csvr.GetTpAccountActions(&utils.TPAccountActions{TPid: self.TPid})
if err != nil {
return err
}
loadId := utils.CSV_LOAD //Autogenerate account actions profile id
if self.ImportId != "" {
loadId += "_" + self.ImportId
}
lineNr := 0
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
tenant, account, direction, actionTimingsTag, actionTriggersTag := record[0], record[1], record[2], record[3], record[4]
tpaa := &utils.TPAccountActions{TPid: self.TPid, LoadId: loadId, Tenant: tenant, Account: account, Direction: direction,
ActionPlanId: actionTimingsTag, ActionTriggersId: actionTriggersTag}
aa := map[string]*utils.TPAccountActions{tpaa.KeyId(): tpaa}
if err := self.StorDb.SetTPAccountActions(self.TPid, aa); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
}
return nil
return self.StorDb.SetTpAccountActions(tps)
}
func (self *TPCSVImporter) importDerivedChargers(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
tps, err := self.csvr.GetTpDerivedChargers(self.TPid, "")
if err != nil {
return err
}
loadId := utils.CSV_LOAD //Autogenerate account actions profile id
if self.ImportId != "" {
loadId += "_" + self.ImportId
}
dcs := make(map[string][]*utils.TPDerivedCharger)
lineNr := 0
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
newDcs := utils.TPDerivedChargers{TPid: self.TPid,
Loadid: loadId,
Direction: record[0],
Tenant: record[1],
Category: record[2],
Account: record[3],
Subject: record[4]}
dcsId := newDcs.GetDerivedChargesId()
if _, hasIt := dcs[dcsId]; !hasIt {
dcs[dcsId] = make([]*utils.TPDerivedCharger, 0)
}
dcs[dcsId] = append(dcs[dcsId], &utils.TPDerivedCharger{
RunId: ValueOrDefault(record[5], "*default"),
RunFilters: record[6],
ReqTypeField: ValueOrDefault(record[7], "*default"),
DirectionField: ValueOrDefault(record[8], "*default"),
TenantField: ValueOrDefault(record[9], "*default"),
CategoryField: ValueOrDefault(record[10], "*default"),
AccountField: ValueOrDefault(record[11], "*default"),
SubjectField: ValueOrDefault(record[12], "*default"),
DestinationField: ValueOrDefault(record[13], "*default"),
SetupTimeField: ValueOrDefault(record[14], "*default"),
AnswerTimeField: ValueOrDefault(record[15], "*default"),
UsageField: ValueOrDefault(record[16], "*default"),
SupplierField: ValueOrDefault(record[17], "*default"),
})
}
if err := self.StorDb.SetTPDerivedChargers(self.TPid, dcs); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
return self.StorDb.SetTpDerivedChargers(tps)
}
func (self *TPCSVImporter) importCdrStats(fn string) error {
if self.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
fParser, err := NewTPCSVFileParser(self.DirPath, fn)
tps, err := self.csvr.GetTpDerivedChargers(self.TPid, "")
if err != nil {
return err
}
css := make(map[string][]*utils.TPCdrStat)
lineNr := 0
for {
lineNr++
record, err := fParser.ParseNextLine()
if err == io.EOF { // Reached end of file
break
} else if err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, warning: <%s> ", lineNr, err.Error())
}
continue
}
if len(record[CDRSTATIDX_QLENGHT]) == 0 {
record[CDRSTATIDX_QLENGHT] = "0"
}
if _, err = strconv.Atoi(record[CDRSTATIDX_QLENGHT]); err != nil {
log.Printf("Ignoring line %d, warning: <%s>", lineNr, err.Error())
continue
}
if _, hasIt := css[record[CDRSTATIDX_TAG]]; !hasIt {
css[record[CDRSTATIDX_TAG]] = make([]*utils.TPCdrStat, 0)
}
css[record[0]] = append(css[record[0]], &utils.TPCdrStat{
QueueLength: record[CDRSTATIDX_QLENGHT],
TimeWindow: ValueOrDefault(record[CDRSTATIDX_TIMEWINDOW], "0"),
Metrics: record[CDRSTATIDX_METRICS],
SetupInterval: record[CDRSTATIDX_SETUPTIME],
TORs: record[CDRSTATIDX_TOR],
CdrHosts: record[CDRSTATIDX_CDRHOST],
CdrSources: record[CDRSTATIDX_CDRSRC],
ReqTypes: record[CDRSTATIDX_REQTYPE],
Directions: record[CDRSTATIDX_DIRECTION],
Tenants: record[CDRSTATIDX_TENANT],
Categories: record[CDRSTATIDX_CATEGORY],
Accounts: record[CDRSTATIDX_ACCOUNT],
Subjects: record[CDRSTATIDX_SUBJECT],
DestinationPrefixes: record[CDRSTATIDX_DSTPREFIX],
UsageInterval: record[CDRSTATIDX_USAGE],
Suppliers: record[CDRSTATIDX_SUPPLIER],
DisconnectCauses: record[CDRSTATIDX_DISCONNECT_CAUSE],
MediationRunIds: record[CDRSTATIDX_MEDRUN],
RatedAccounts: record[CDRSTATIDX_RTACCOUNT],
RatedSubjects: record[CDRSTATIDX_RTSUBJECT],
CostInterval: record[CDRSTATIDX_COST],
ActionTriggers: record[CDRSTATIDX_ATRIGGER],
})
}
if err := self.StorDb.SetTPCdrStats(self.TPid, css); err != nil {
if self.Verbose {
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
}
}
return nil
return self.StorDb.SetTpDerivedChargers(tps)
}

View File

@@ -21,7 +21,6 @@ package utils
import (
"fmt"
"sort"
"strconv"
"strings"
"time"
)
@@ -48,26 +47,12 @@ type Paginator struct {
}
*/
// Used on exports (eg: TPExport)
type ExportedData interface {
AsExportSlice() [][]string
}
type TPDestination struct {
TPid string // Tariff plan id
DestinationId string // Destination id
Prefixes []string // Prefixes attached to this destination
}
// Convert as slice so we can use it in exports (eg: csv)
func (self *TPDestination) AsExportSlice() [][]string {
retSlice := make([][]string, len(self.Prefixes))
for idx, prefix := range self.Prefixes {
retSlice[idx] = []string{self.DestinationId, prefix}
}
return retSlice
}
// This file deals with tp_* data definition
type TPRate struct {
@@ -76,15 +61,6 @@ type TPRate struct {
RateSlots []*RateSlot // One or more RateSlots
}
//#TPid,Tag,ConnectFee,Rate,RateUnit,RateIncrement,GroupIntervalStart
func (self *TPRate) AsExportSlice() [][]string {
retSlice := make([][]string, len(self.RateSlots))
for idx, rtSlot := range self.RateSlots {
retSlice[idx] = []string{self.RateId, strconv.FormatFloat(rtSlot.ConnectFee, 'f', -1, 64), strconv.FormatFloat(rtSlot.Rate, 'f', -1, 64), rtSlot.RateUnit, rtSlot.RateIncrement, rtSlot.GroupIntervalStart}
}
return retSlice
}
// Needed so we make sure we always use SetDurations() on a newly created value
func NewRateSlot(connectFee, rate float64, rateUnit, rateIncrement, grpInterval string) (*RateSlot, error) {
rs := &RateSlot{ConnectFee: connectFee, Rate: rate, RateUnit: rateUnit, RateIncrement: rateIncrement,
@@ -136,15 +112,6 @@ type TPDestinationRate struct {
DestinationRates []*DestinationRate // Set of destinationid-rateid bindings
}
//#TPid,Tag,DestinationsTag,RatesTag,RoundingMethod,RoundingDecimals
func (self *TPDestinationRate) AsExportSlice() [][]string {
retSlice := make([][]string, len(self.DestinationRates))
for idx, dstRate := range self.DestinationRates {
retSlice[idx] = []string{self.DestinationRateId, dstRate.DestinationId, dstRate.RateId, dstRate.RoundingMethod, strconv.Itoa(dstRate.RoundingDecimals)}
}
return retSlice
}
type DestinationRate struct {
DestinationId string // The destination identity
RateId string // The rate identity
@@ -165,13 +132,6 @@ type ApierTPTiming struct {
Time string // String representing the time this timing starts on
}
// Keep the ExportSlice interface, although we only need a single slice to be generated
func (self *ApierTPTiming) AsExportSlice() [][]string {
return [][]string{
[]string{self.TimingId, self.Years, self.Months, self.MonthDays, self.WeekDays, self.Time},
}
}
type TPTiming struct {
Id string
Years Years
@@ -188,14 +148,6 @@ type TPRatingPlan struct {
RatingPlanBindings []*TPRatingPlanBinding // Set of destinationid-rateid bindings
}
func (self *TPRatingPlan) AsExportSlice() [][]string {
retSlice := make([][]string, len(self.RatingPlanBindings))
for idx, rp := range self.RatingPlanBindings {
retSlice[idx] = []string{self.RatingPlanId, rp.DestinationRatesId, rp.TimingId, strconv.FormatFloat(rp.Weight, 'f', -1, 64)}
}
return retSlice
}
type TPRatingPlanBinding struct {
DestinationRatesId string // The DestinationRate identity
TimingId string // The timing identity
@@ -232,15 +184,6 @@ type TPRatingProfile struct {
RatingPlanActivations []*TPRatingActivation // Activate rate profiles at specific time
}
//TPid,LoadId,Direction,Tenant,Category,Subject,ActivationTime,RatingPlanId,RatesFallbackSubject
func (self *TPRatingProfile) AsExportSlice() [][]string {
retSlice := make([][]string, len(self.RatingPlanActivations))
for idx, rpln := range self.RatingPlanActivations {
retSlice[idx] = []string{self.Direction, self.Tenant, self.Category, self.Subject, rpln.ActivationTime, rpln.RatingPlanId, rpln.FallbackSubjects}
}
return retSlice
}
// Used as key in nosql db (eg: redis)
func (self *TPRatingProfile) KeyId() string {
return fmt.Sprintf("%s:%s:%s:%s", self.Direction, self.Tenant, self.Category, self.Subject)
@@ -307,16 +250,6 @@ type TPActions struct {
Actions []*TPAction // Set of actions this Actions profile will perform
}
//TPid,ActionsTag[0],Action[1],ExtraParameters[2],BalanceType[3],Direction[4],Category[5],DestinationTag[6],RatingSubject[7],SharedGroup[8],ExpiryTime[9],Units[10],BalanceWeight[11],Weight[12]
func (self *TPActions) AsExportSlice() [][]string {
retSlice := make([][]string, len(self.Actions))
for idx, act := range self.Actions {
retSlice[idx] = []string{self.ActionsId, act.Identifier, act.ExtraParameters, act.BalanceType, act.Direction, act.Category, act.DestinationIds, act.RatingSubject,
act.SharedGroup, act.ExpiryTime, strconv.FormatFloat(act.Units, 'f', -1, 64), strconv.FormatFloat(act.BalanceWeight, 'f', -1, 64), strconv.FormatFloat(act.Weight, 'f', -1, 64)}
}
return retSlice
}
type TPAction struct {
Identifier string // Identifier mapped in the code
BalanceId string // Balance identification string (account scope)
@@ -340,15 +273,6 @@ type TPSharedGroups struct {
SharedGroups []*TPSharedGroup
}
// TPid,Id,Account,Strategy,RatingSubject
func (self *TPSharedGroups) AsExportSlice() [][]string {
retSlice := make([][]string, len(self.SharedGroups))
for idx, sg := range self.SharedGroups {
retSlice[idx] = []string{self.SharedGroupsId, sg.Account, sg.Strategy, sg.RatingSubject}
}
return retSlice
}
type TPSharedGroup struct {
Account string
Strategy string
@@ -361,16 +285,6 @@ type TPLcrRules struct {
LcrRules []*TPLcrRule
}
//*in,cgrates.org,*any,EU_LANDLINE,LCR_STANDARD,*static,ivo;dan;rif,2012-01-01T00:00:00Z,10
func (self *TPLcrRules) AsExportSlice() [][]string {
retSlice := make([][]string, len(self.LcrRules))
for idx, rl := range self.LcrRules {
retSlice[idx] = []string{self.LcrRulesId, rl.Direction, rl.Tenant, rl.Customer, rl.DestinationId, rl.Category, rl.Strategy,
rl.Suppliers, rl.ActivatinTime, strconv.FormatFloat(rl.Weight, 'f', -1, 64)}
}
return retSlice
}
type TPLcrRule struct {
Direction string
Tenant string
@@ -389,20 +303,6 @@ type TPCdrStats struct {
CdrStats []*TPCdrStat
}
//TPid,Id,QueueLength,TimeWindow,Metric,SetupInterval,TOR,CdrHost,CdrSource,ReqType,Direction,Tenant,Category,Account,Subject,
//DestinationPrefix,UsageInterval,MediationRunIds,RatedAccount,RatedSubject,CostInterval,Triggers
func (self *TPCdrStats) AsExportSlice() [][]string {
retSlice := make([][]string, len(self.CdrStats))
for idx, cdrStat := range self.CdrStats {
retSlice[idx] = []string{self.CdrStatsId, cdrStat.QueueLength, cdrStat.TimeWindow, cdrStat.Metrics, cdrStat.SetupInterval, cdrStat.TORs, cdrStat.CdrHosts,
cdrStat.CdrSources, cdrStat.ReqTypes, cdrStat.Directions, cdrStat.Tenants, cdrStat.Categories, cdrStat.Accounts, cdrStat.Subjects, cdrStat.DestinationPrefixes,
cdrStat.UsageInterval, cdrStat.Suppliers, cdrStat.DisconnectCauses, cdrStat.MediationRunIds, cdrStat.RatedAccounts, cdrStat.RatedSubjects, cdrStat.CostInterval,
cdrStat.ActionTriggers}
}
return retSlice
}
type TPCdrStat struct {
QueueLength string
TimeWindow string
@@ -439,17 +339,6 @@ type TPDerivedChargers struct {
DerivedChargers []*TPDerivedCharger
}
//#Direction,Tenant,Category,Account,Subject,RunId,RunFilter,ReqTypeField,DirectionField,TenantField,CategoryField,AccountField,SubjectField,DestinationField,SetupTimeField,AnswerTimeField,UsageField,SupplierField
func (self *TPDerivedChargers) AsExportSlice() [][]string {
retSlice := make([][]string, len(self.DerivedChargers))
for idx, dc := range self.DerivedChargers {
retSlice[idx] = []string{self.Direction, self.Tenant, self.Category, self.Account, self.Subject, dc.RunId, dc.RunFilters, dc.ReqTypeField,
dc.DirectionField, dc.TenantField, dc.CategoryField, dc.AccountField, dc.SubjectField, dc.DestinationField, dc.SetupTimeField, dc.AnswerTimeField,
dc.UsageField, dc.SupplierField, dc.DisconnectCauseField}
}
return retSlice
}
// Key used in dataDb to identify DerivedChargers set
func (tpdc *TPDerivedChargers) GetDerivedChargersKey() string {
return DerivedChargersKey(tpdc.Direction, tpdc.Tenant, tpdc.Category, tpdc.Account, tpdc.Subject)
@@ -507,15 +396,6 @@ type TPActionPlan struct {
ActionPlan []*TPActionTiming // Set of ActionTiming bindings this profile will group
}
//TPid,Tag,ActionsTag,TimingTag,Weight
func (self *TPActionPlan) AsExportSlice() [][]string {
retSlice := make([][]string, len(self.ActionPlan))
for idx, ap := range self.ActionPlan {
retSlice[idx] = []string{self.Id, ap.ActionsId, ap.TimingId, strconv.FormatFloat(ap.Weight, 'f', -1, 64)}
}
return retSlice
}
type TPActionTiming struct {
ActionsId string // Actions id
TimingId string // Timing profile id
@@ -528,18 +408,6 @@ type TPActionTriggers struct {
ActionTriggers []*TPActionTrigger // Set of triggers grouped in this profile
}
// TPid,Tag[0],ThresholdType[1],ThresholdValue[2],Recurrent[3],MinSleep[4],BalanceId[5],BalanceType[6],BalanceDirection[7],BalanceCategory[8],BalanceDestinationTag[9],
// BalanceRatingSubject[10],BalanceSharedGroup[11],BalanceExpiryTime[12],BalanceWeight[13],StatsMinQueuedItems[14],ActionsTag[15],Weight[16]
func (self *TPActionTriggers) AsExportSlice() [][]string {
retSlice := make([][]string, len(self.ActionTriggers))
for idx, at := range self.ActionTriggers {
retSlice[idx] = []string{self.ActionTriggersId, at.ThresholdType, strconv.FormatFloat(at.ThresholdValue, 'f', -1, 64), strconv.FormatBool(at.Recurrent), at.MinSleep,
at.BalanceId, at.BalanceType, at.BalanceDirection, at.BalanceCategory, at.BalanceDestinationIds, at.BalanceRatingSubject, at.BalanceSharedGroup, at.BalanceExpirationDate, at.BalanceTimingTags,
strconv.FormatFloat(at.BalanceWeight, 'f', -1, 64), strconv.Itoa(at.MinQueuedItems), at.ActionsId, strconv.FormatFloat(at.Weight, 'f', -1, 64)}
}
return retSlice
}
type TPActionTrigger struct {
Id string
ThresholdType string // This threshold type
@@ -583,13 +451,6 @@ type TPAccountActions struct {
ActionTriggersId string // Id of ActionTriggers profile to use
}
//TPid,Tenant,Account,Direction,ActionPlanTag,ActionTriggersTag
func (self *TPAccountActions) AsExportSlice() [][]string {
return [][]string{
[]string{self.Tenant, self.Account, self.Direction, self.ActionPlanId, self.ActionTriggersId},
}
}
// Returns the id used in some nosql dbs (eg: redis)
func (self *TPAccountActions) KeyId() string {
return fmt.Sprintf("%s:%s:%s", self.Direction, self.Tenant, self.Account)

View File

@@ -263,6 +263,7 @@ func ConcatenatedKey(keyVals ...string) string {
func LCRKey(direction, tenant, category, account, subject string) string {
return ConcatenatedKey(direction, tenant, category, account, subject)
}
func RatingSubjectAliasKey(tenant, subject string) string {