added accounting loaders

This commit is contained in:
Radu Ioan Fericean
2015-05-28 20:48:19 +03:00
parent 99a244060e
commit 0fd480e5c1
6 changed files with 877 additions and 286 deletions

View File

@@ -2,10 +2,12 @@ package engine
import (
"fmt"
"log"
"reflect"
"regexp"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/utils"
)
@@ -240,3 +242,310 @@ func (tps TpRatingProfiles) GetRatingProfiles() (map[string]*utils.TPRatingProfi
}
return rpfs, nil
}
type TpSharedGroups []*TpSharedGroup
func (tps TpSharedGroups) GetSharedGroups() (map[string][]*utils.TPSharedGroup, error) {
sgs := make(map[string][]*utils.TPSharedGroup)
for _, tpSg := range tps {
sgs[tpSg.Tag] = append(sgs[tpSg.Tag], &utils.TPSharedGroup{
Account: tpSg.Account,
Strategy: tpSg.Strategy,
RatingSubject: tpSg.RatingSubject,
})
}
return sgs, nil
}
type TpActions []*TpAction
func (tps TpActions) GetActions() (map[string][]*utils.TPAction, error) {
as := make(map[string][]*utils.TPAction)
for _, tpAc := range tps {
a := &utils.TPAction{
Identifier: tpAc.Action,
BalanceId: tpAc.BalanceTag,
BalanceType: tpAc.BalanceType,
Direction: tpAc.Direction,
Units: tpAc.Units,
ExpiryTime: tpAc.ExpiryTime,
TimingTags: tpAc.TimingTags,
DestinationIds: tpAc.DestinationTags,
RatingSubject: tpAc.RatingSubject,
Category: tpAc.Category,
SharedGroup: tpAc.SharedGroup,
BalanceWeight: tpAc.BalanceWeight,
ExtraParameters: tpAc.ExtraParameters,
Weight: tpAc.Weight,
}
as[tpAc.Tag] = append(as[tpAc.Tag], a)
}
return as, nil
}
type TpActionPlans []*TpActionPlan
func (tps TpActionPlans) GetActionPlans() (map[string][]*utils.TPActionTiming, error) {
ats := make(map[string][]*utils.TPActionTiming)
for _, tpAp := range tps {
ats[tpAp.Tag] = append(ats[tpAp.Tag], &utils.TPActionTiming{ActionsId: tpAp.ActionsTag, TimingId: tpAp.TimingTag, Weight: tpAp.Weight})
}
return ats, nil
}
type TpActionTriggers []*TpActionTrigger
func (tps TpActionTriggers) GetActionTriggers() (map[string][]*utils.TPActionTrigger, error) {
ats := make(map[string][]*utils.TPActionTrigger)
for _, tpAt := range tps {
at := &utils.TPActionTrigger{
Id: tpAt.UniqueId,
ThresholdType: tpAt.ThresholdType,
ThresholdValue: tpAt.ThresholdValue,
Recurrent: tpAt.Recurrent,
MinSleep: tpAt.MinSleep,
BalanceId: tpAt.BalanceTag,
BalanceType: tpAt.BalanceType,
BalanceDirection: tpAt.BalanceDirection,
BalanceDestinationIds: tpAt.BalanceDestinationTags,
BalanceWeight: tpAt.BalanceWeight,
BalanceExpirationDate: tpAt.BalanceExpiryTime,
BalanceTimingTags: tpAt.BalanceTimingTags,
BalanceRatingSubject: tpAt.BalanceRatingSubject,
BalanceCategory: tpAt.BalanceCategory,
BalanceSharedGroup: tpAt.BalanceSharedGroup,
Weight: tpAt.Weight,
ActionsId: tpAt.ActionsTag,
MinQueuedItems: tpAt.MinQueuedItems,
}
ats[tpAt.Tag] = append(ats[tpAt.Tag], at)
}
return ats, nil
}
type TpAccountActions []*TpAccountAction
func (tps TpAccountActions) GetAccountActions() (map[string]*utils.TPAccountActions, error) {
aas := make(map[string]*utils.TPAccountActions)
for _, tpAa := range tps {
aacts := &utils.TPAccountActions{
TPid: tpAa.Tpid,
LoadId: tpAa.Loadid,
Tenant: tpAa.Tenant,
Account: tpAa.Account,
Direction: tpAa.Direction,
ActionPlanId: tpAa.ActionPlanTag,
ActionTriggersId: tpAa.ActionTriggersTag,
}
aas[aacts.KeyId()] = aacts
}
return aas, nil
}
type TpDerivedChargers []*TpDerivedCharger
func (tps TpDerivedChargers) GetDerivedChargers() (map[string]*utils.TPDerivedChargers, error) {
dcs := make(map[string]*utils.TPDerivedChargers)
for _, tpDcMdl := range tps {
tpDc := &utils.TPDerivedChargers{TPid: tpDcMdl.Tpid, Loadid: tpDcMdl.Loadid, Direction: tpDcMdl.Direction, Tenant: tpDcMdl.Tenant, Category: tpDcMdl.Category,
Account: tpDcMdl.Account, Subject: tpDcMdl.Subject}
tag := tpDc.GetDerivedChargesId()
if _, hasIt := dcs[tag]; !hasIt {
dcs[tag] = tpDc
}
dcs[tag].DerivedChargers = append(dcs[tag].DerivedChargers, &utils.TPDerivedCharger{
RunId: tpDcMdl.Runid,
RunFilters: tpDcMdl.RunFilters,
ReqTypeField: tpDcMdl.ReqTypeField,
DirectionField: tpDcMdl.DirectionField,
TenantField: tpDcMdl.TenantField,
CategoryField: tpDcMdl.CategoryField,
AccountField: tpDcMdl.AccountField,
SubjectField: tpDcMdl.SubjectField,
DestinationField: tpDcMdl.DestinationField,
SetupTimeField: tpDcMdl.SetupTimeField,
AnswerTimeField: tpDcMdl.AnswerTimeField,
UsageField: tpDcMdl.UsageField,
SupplierField: tpDcMdl.SupplierField,
DisconnectCauseField: tpDcMdl.DisconnectCauseField,
})
}
return dcs, nil
}
type TpCdrStats []*TpCdrStat
func (tps TpCdrStats) GetCdrStats() (map[string][]*utils.TPCdrStat, error) {
css := make(map[string][]*utils.TPCdrStat)
for _, tpCs := range tps {
css[tpCs.Tag] = append(css[tpCs.Tag], &utils.TPCdrStat{
QueueLength: strconv.Itoa(tpCs.QueueLength),
TimeWindow: tpCs.TimeWindow,
Metrics: tpCs.Metrics,
SetupInterval: tpCs.SetupInterval,
TORs: tpCs.Tors,
CdrHosts: tpCs.CdrHosts,
CdrSources: tpCs.CdrSources,
ReqTypes: tpCs.ReqTypes,
Directions: tpCs.Directions,
Tenants: tpCs.Tenants,
Categories: tpCs.Categories,
Accounts: tpCs.Accounts,
Subjects: tpCs.Subjects,
DestinationPrefixes: tpCs.DestinationPrefixes,
UsageInterval: tpCs.UsageInterval,
Suppliers: tpCs.Suppliers,
DisconnectCauses: tpCs.DisconnectCauses,
MediationRunIds: tpCs.MediationRunids,
RatedAccounts: tpCs.RatedAccounts,
RatedSubjects: tpCs.RatedSubjects,
CostInterval: tpCs.CostInterval,
ActionTriggers: tpCs.ActionTriggers,
})
}
return css, nil
}
func UpdateCdrStats(cs *CdrStats, triggers ActionTriggerPriotityList, tpCs *utils.TPCdrStat) {
if tpCs.QueueLength != "" {
if qi, err := strconv.Atoi(tpCs.QueueLength); err == nil {
cs.QueueLength = qi
} else {
log.Printf("Error parsing QueuedLength %v for cdrs stats %v", tpCs.QueueLength, cs.Id)
}
}
if tpCs.TimeWindow != "" {
if d, err := time.ParseDuration(tpCs.TimeWindow); err == nil {
cs.TimeWindow = d
} else {
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", tpCs.TimeWindow, cs.Id)
}
}
if tpCs.Metrics != "" {
cs.Metrics = append(cs.Metrics, tpCs.Metrics)
}
if tpCs.SetupInterval != "" {
times := strings.Split(tpCs.SetupInterval, utils.INFIELD_SEP)
if len(times) > 0 {
if sTime, err := utils.ParseTimeDetectLayout(times[0]); err == nil {
if len(cs.SetupInterval) < 1 {
cs.SetupInterval = append(cs.SetupInterval, sTime)
} else {
cs.SetupInterval[0] = sTime
}
} else {
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", tpCs.SetupInterval, cs.Id)
}
}
if len(times) > 1 {
if eTime, err := utils.ParseTimeDetectLayout(times[1]); err == nil {
if len(cs.SetupInterval) < 2 {
cs.SetupInterval = append(cs.SetupInterval, eTime)
} else {
cs.SetupInterval[1] = eTime
}
} else {
log.Printf("Error parsing TimeWindow %v for cdrs stats %v", tpCs.SetupInterval, cs.Id)
}
}
}
if tpCs.TORs != "" {
cs.TOR = append(cs.TOR, tpCs.TORs)
}
if tpCs.CdrHosts != "" {
cs.CdrHost = append(cs.CdrHost, tpCs.CdrHosts)
}
if tpCs.CdrSources != "" {
cs.CdrSource = append(cs.CdrSource, tpCs.CdrSources)
}
if tpCs.ReqTypes != "" {
cs.ReqType = append(cs.ReqType, tpCs.ReqTypes)
}
if tpCs.Directions != "" {
cs.Direction = append(cs.Direction, tpCs.Directions)
}
if tpCs.Tenants != "" {
cs.Tenant = append(cs.Tenant, tpCs.Tenants)
}
if tpCs.Categories != "" {
cs.Category = append(cs.Category, tpCs.Categories)
}
if tpCs.Accounts != "" {
cs.Account = append(cs.Account, tpCs.Accounts)
}
if tpCs.Subjects != "" {
cs.Subject = append(cs.Subject, tpCs.Subjects)
}
if tpCs.DestinationPrefixes != "" {
cs.DestinationPrefix = append(cs.DestinationPrefix, tpCs.DestinationPrefixes)
}
if tpCs.UsageInterval != "" {
durations := strings.Split(tpCs.UsageInterval, utils.INFIELD_SEP)
if len(durations) > 0 {
if sDuration, err := time.ParseDuration(durations[0]); err == nil {
if len(cs.UsageInterval) < 1 {
cs.UsageInterval = append(cs.UsageInterval, sDuration)
} else {
cs.UsageInterval[0] = sDuration
}
} else {
log.Printf("Error parsing UsageInterval %v for cdrs stats %v", tpCs.UsageInterval, cs.Id)
}
}
if len(durations) > 1 {
if eDuration, err := time.ParseDuration(durations[1]); err == nil {
if len(cs.UsageInterval) < 2 {
cs.UsageInterval = append(cs.UsageInterval, eDuration)
} else {
cs.UsageInterval[1] = eDuration
}
} else {
log.Printf("Error parsing UsageInterval %v for cdrs stats %v", tpCs.UsageInterval, cs.Id)
}
}
}
if tpCs.Suppliers != "" {
cs.Supplier = append(cs.Supplier, tpCs.Suppliers)
}
if tpCs.DisconnectCauses != "" {
cs.DisconnectCause = append(cs.DisconnectCause, tpCs.DisconnectCauses)
}
if tpCs.MediationRunIds != "" {
cs.MediationRunIds = append(cs.MediationRunIds, tpCs.MediationRunIds)
}
if tpCs.RatedAccounts != "" {
cs.RatedAccount = append(cs.RatedAccount, tpCs.RatedAccounts)
}
if tpCs.RatedSubjects != "" {
cs.RatedSubject = append(cs.RatedSubject, tpCs.RatedSubjects)
}
if tpCs.CostInterval != "" {
costs := strings.Split(tpCs.CostInterval, utils.INFIELD_SEP)
if len(costs) > 0 {
if sCost, err := strconv.ParseFloat(costs[0], 64); err == nil {
if len(cs.CostInterval) < 1 {
cs.CostInterval = append(cs.CostInterval, sCost)
} else {
cs.CostInterval[0] = sCost
}
} else {
log.Printf("Error parsing CostInterval %v for cdrs stats %v", tpCs.CostInterval, cs.Id)
}
}
if len(costs) > 1 {
if eCost, err := strconv.ParseFloat(costs[1], 64); err == nil {
if len(cs.CostInterval) < 2 {
cs.CostInterval = append(cs.CostInterval, eCost)
} else {
cs.CostInterval[1] = eCost
}
} else {
log.Printf("Error parsing CostInterval %v for cdrs stats %v", tpCs.CostInterval, cs.Id)
}
}
}
if triggers != nil {
cs.Triggers = append(cs.Triggers, triggers...)
}
}

View File

@@ -117,12 +117,14 @@ type TpLcrRules struct {
Tpid string
Direction string
Tenant string
Customer string
DestinationTag string
Category string
Account string
Subject string
DestinationTag string
RpCategory string
Strategy string
Suppliers string
ActivatinTime string
StrategyParams string
ActivationTime string
Weight float64
CreatedAt time.Time
}

View File

@@ -101,7 +101,6 @@ func (csvs *CSVStorage) GetTpDestinations(tpid, tag string) ([]*TpDestination, e
tp := tpDest.(TpDestination)
tpDests = append(tpDests, &tp)
}
//log.Printf("%+v\n", tpDest)
}
return tpDests, nil
}
@@ -124,7 +123,6 @@ func (csvs *CSVStorage) GetTpRates(tpid, tag string) ([]*TpRate, error) {
tp := tpRate.(TpRate)
tpRates = append(tpRates, &tp)
}
//log.Printf("%+v\n", tpRate)
}
return tpRates, nil
}
@@ -134,7 +132,7 @@ func (csvs *CSVStorage) GetTpDestinationRates(tpid, tag string, p *utils.Paginat
if err != nil {
log.Print("Could not load destination_rates file: ", err)
// allow writing of the other values
return nil
return nil, nil
}
if fp != nil {
defer fp.Close()
@@ -147,7 +145,6 @@ func (csvs *CSVStorage) GetTpDestinationRates(tpid, tag string, p *utils.Paginat
tp := tpRate.(TpDestinationRate)
tpDestinationRates = append(tpDestinationRates, &tp)
}
//log.Printf("%+v\n", tpRate)
}
return tpDestinationRates, nil
}
@@ -157,7 +154,7 @@ func (csvs *CSVStorage) GetTpRatingPlans(tpid, tag string, p *utils.Paginator) (
if err != nil {
log.Print("Could not load rate plans file: ", err)
// allow writing of the other values
return nil
return nil, nil
}
if fp != nil {
defer fp.Close()
@@ -170,7 +167,6 @@ func (csvs *CSVStorage) GetTpRatingPlans(tpid, tag string, p *utils.Paginator) (
tp := tpRate.(TpRatingPlan)
tpRatingPlans = append(tpRatingPlans, &tp)
}
//log.Printf("%+v\n", tpRate)
}
return tpRatingPlans, nil
}
@@ -180,7 +176,7 @@ func (csvs *CSVStorage) GetTpRatingProfiles(filter *utils.TPRatingProfile) ([]*T
if err != nil {
log.Print("Could not load rating profiles file: ", err)
// allow writing of the other values
return nil
return nil, nil
}
if fp != nil {
defer fp.Close()
@@ -193,39 +189,185 @@ func (csvs *CSVStorage) GetTpRatingProfiles(filter *utils.TPRatingProfile) ([]*T
tp := tpRate.(TpRatingProfile)
tpRatingProfiles = append(tpRatingProfiles, &tp)
}
//log.Printf("%+v\n", tpRate)
}
return tpRatingProfiles, nil
}
func (csvs *CSVStorage) GetTpSharedGroups(tpid, tag string) ([]*TpSharedGroup, error) {
return nil, nil
csvReader, fp, err := csvs.readerFunc(csvs.sharedgroupsFn, csvs.sep, getColumnCount(TpSharedGroup{}))
if err != nil {
log.Print("Could not load shared groups file: ", err)
// allow writing of the other values
return nil, nil
}
if fp != nil {
defer fp.Close()
}
var tpSharedGroups []*TpSharedGroup
for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() {
if tpRate, err := csvLoad(TpSharedGroup{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpSharedGroup)
tpSharedGroups = append(tpSharedGroups, &tp)
}
}
return tpSharedGroups, nil
}
func (csvs *CSVStorage) GetTpCdrStats(tpid, tag string) ([]*TpCdrStat, error) {
return nil, nil
func (csvs *CSVStorage) GetTpLCRs(tpid, tag string) ([]*TpLcrRules, error) {
csvReader, fp, err := csvs.readerFunc(csvs.lcrFn, csvs.sep, getColumnCount(TpLcrRules{}))
if err != nil {
log.Print("Could not load LCR rules file: ", err)
// allow writing of the other values
return nil, nil
}
if fp != nil {
defer fp.Close()
}
var tpLCRs []*TpLcrRules
for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() {
if tpRate, err := csvLoad(TpLcrRules{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpLcrRules)
tpLCRs = append(tpLCRs, &tp)
}
}
return tpLCRs, nil
}
func (csvs *CSVStorage) GetTpDerivedChargers(filter *utils.TPDerivedChargers) ([]*TpDerivedCharger, error) {
return nil, nil
}
func (csvs *CSVStorage) GetTpLCRs(tpid, tag string) ([]*TpLcrRules, error) { return nil, nil }
func (csvs *CSVStorage) GetTpActions(tpid, tag string) ([]*TpAction, error) {
return nil, nil
csvReader, fp, err := csvs.readerFunc(csvs.actionsFn, csvs.sep, getColumnCount(TpAction{}))
if err != nil {
log.Print("Could not load action file: ", err)
// allow writing of the other values
return nil, nil
}
if fp != nil {
defer fp.Close()
}
var tpActions []*TpAction
for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() {
if tpRate, err := csvLoad(TpAction{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpAction)
tpActions = append(tpActions, &tp)
}
}
return tpActions, nil
}
func (csvs *CSVStorage) GetTPActionTimings(tpid, tag string) ([]*TpActionPlan, error) {
return nil, nil
func (csvs *CSVStorage) GetTPActionPlans(tpid, tag string) ([]*TpActionPlan, error) {
csvReader, fp, err := csvs.readerFunc(csvs.actiontimingsFn, csvs.sep, getColumnCount(TpActionPlan{}))
if err != nil {
log.Print("Could not load action plans file: ", err)
// allow writing of the other values
return nil, nil
}
if fp != nil {
defer fp.Close()
}
var tpActionPlans []*TpActionPlan
for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() {
if tpRate, err := csvLoad(TpActionPlan{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpActionPlan)
tpActionPlans = append(tpActionPlans, &tp)
}
}
return tpActionPlans, nil
}
func (csvs *CSVStorage) GetTpActionTriggers(tpid, tag string) ([]*TpActionTrigger, error) {
return nil, nil
csvReader, fp, err := csvs.readerFunc(csvs.actiontriggersFn, csvs.sep, getColumnCount(TpActionTrigger{}))
if err != nil {
log.Print("Could not load action triggers file: ", err)
// allow writing of the other values
return nil, nil
}
if fp != nil {
defer fp.Close()
}
var tpActionTriggers []*TpActionTrigger
for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() {
if tpRate, err := csvLoad(TpActionTrigger{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpActionTrigger)
tpActionTriggers = append(tpActionTriggers, &tp)
}
}
return tpActionTriggers, nil
}
func (csvs *CSVStorage) GetTpAccountActions(filter []*TpAccountAction) ([]*TpAccountAction, error) {
return nil, nil
csvReader, fp, err := csvs.readerFunc(csvs.accountactionsFn, csvs.sep, getColumnCount(TpAccountAction{}))
if err != nil {
log.Print("Could not load account actions file: ", err)
// allow writing of the other values
return nil, nil
}
if fp != nil {
defer fp.Close()
}
var tpAccountActions []*TpAccountAction
for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() {
if tpRate, err := csvLoad(TpAccountAction{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpAccountAction)
tpAccountActions = append(tpAccountActions, &tp)
}
}
return tpAccountActions, nil
}
func (csvs *CSVStorage) GetTpDerivedChargers(filter *utils.TPDerivedChargers) ([]*TpDerivedCharger, error) {
csvReader, fp, err := csvs.readerFunc(csvs.derivedChargersFn, csvs.sep, getColumnCount(TpDerivedCharger{}))
if err != nil {
log.Print("Could not load derivedChargers file: ", err)
// allow writing of the other values
return nil, nil
}
if fp != nil {
defer fp.Close()
}
var tpDerivedChargers []*TpDerivedCharger
for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() {
if tpRate, err := csvLoad(TpDerivedCharger{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpDerivedCharger)
tpDerivedChargers = append(tpDerivedChargers, &tp)
}
}
return tpDerivedChargers, nil
}
func (csvs *CSVStorage) GetTpCdrStats(tpid, tag string) ([]*TpCdrStat, error) {
csvReader, fp, err := csvs.readerFunc(csvs.derivedChargersFn, csvs.sep, getColumnCount(TpCdrStat{}))
if err != nil {
log.Print("Could not load derivedChargers file: ", err)
// allow writing of the other values
return nil, nil
}
if fp != nil {
defer fp.Close()
}
var tpCdrStats []*TpCdrStat
for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() {
if tpRate, err := csvLoad(TpCdrStat{}, record); err != nil {
return nil, err
} else {
tp := tpRate.(TpCdrStat)
tpCdrStats = append(tpCdrStats, &tp)
}
}
return tpCdrStats, nil
}
func (csvs *CSVStorage) GetTPIds() ([]string, error) {

View File

@@ -148,7 +148,7 @@ type LoadReader interface {
GetTpDerivedChargers(*utils.TPDerivedChargers) ([]*TpDerivedCharger, error)
GetTpLCRs(string, string) ([]*TpLcrRules, error)
GetTpActions(string, string) ([]*TpAction, error)
GetTpActionTimings(string, string) ([]*TpActionPlan, error)
GetTpActionPlans(string, string) ([]*TpActionPlan, error)
GetTpActionTriggers(string, string) ([]*TpActionTrigger, error)
GetTpAccountActions(*utils.TPAccountActions) ([]*TpAccountAction, error)
}

View File

@@ -539,31 +539,6 @@ func (self *SQLStorage) SetTpActions(tpid string, acts map[string][]*utils.TPAct
return nil
}
func (self *SQLStorage) GetTpActions(tpid, actsId string) (*utils.TPActions, error) {
acts := &utils.TPActions{TPid: tpid, ActionsId: actsId}
var tpActions []*TpAction
if err := self.db.Where(&TpAction{Tpid: tpid, Tag: actsId}).Find(&tpActions).Error; err != nil {
return nil, err
}
for _, tpAct := range tpActions {
acts.Actions = append(acts.Actions, &utils.TPAction{
Identifier: tpAct.Action,
BalanceType: tpAct.BalanceType,
Direction: tpAct.Direction,
Units: tpAct.Units,
ExpiryTime: tpAct.ExpiryTime,
TimingTags: tpAct.TimingTags,
DestinationIds: tpAct.DestinationTags,
RatingSubject: tpAct.RatingSubject,
Category: tpAct.Category,
BalanceWeight: tpAct.BalanceWeight,
SharedGroup: tpAct.SharedGroup,
ExtraParameters: tpAct.ExtraParameters,
Weight: tpAct.Weight})
}
return acts, nil
}
// Sets actionTimings in sqlDB. Imput is expected in form map[actionTimingId][]rows, eg a full .csv file content
func (self *SQLStorage) SetTpActionTimings(tpid string, ats map[string][]*utils.TPActionTiming) error {
if len(ats) == 0 {
@@ -594,16 +569,13 @@ func (self *SQLStorage) SetTpActionTimings(tpid string, ats map[string][]*utils.
return r.Error
}
func (self *SQLStorage) GetTPActionTimings(tpid, tag string) (map[string][]*utils.TPActionTiming, error) {
ats := make(map[string][]*utils.TPActionTiming)
var tpActionPlans []TpActionPlan
func (self *SQLStorage) GetTPActionPlan(tpid, tag string) ([]*TpActionPlan, error) {
var tpActionPlans []*TpActionPlan
if err := self.db.Where(&TpActionPlan{Tpid: tpid, Tag: tag}).Find(&tpActionPlans).Error; err != nil {
return nil, err
}
for _, tpAp := range tpActionPlans {
ats[tpAp.Tag] = append(ats[tpAp.Tag], &utils.TPActionTiming{ActionsId: tpAp.ActionsTag, TimingId: tpAp.TimingTag, Weight: tpAp.Weight})
}
return ats, nil
return tpActionPlans, nil
}
func (self *SQLStorage) SetTpActionTriggers(tpid string, ats map[string][]*utils.TPActionTrigger) error {
@@ -1291,72 +1263,78 @@ func (self *SQLStorage) GetTpRatingProfiles(qryRpf *utils.TPRatingProfile) ([]*T
return tpRpfs, nil
}
func (self *SQLStorage) GetTpSharedGroups(tpid, tag string) (map[string][]*utils.TPSharedGroup, error) {
sgs := make(map[string][]*utils.TPSharedGroup)
var tpCdrStats []TpSharedGroup
func (self *SQLStorage) GetTpSharedGroups(tpid, tag string) ([]*TpSharedGroup, error) {
var tpShareGroups []*TpSharedGroup
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
}
if err := q.Find(&tpCdrStats).Error; err != nil {
if err := q.Find(&tpShareGroups).Error; err != nil {
return nil, err
}
return tpShareGroups, nil
for _, tpSg := range tpCdrStats {
sgs[tpSg.Tag] = append(sgs[tpSg.Tag], &utils.TPSharedGroup{
Account: tpSg.Account,
Strategy: tpSg.Strategy,
RatingSubject: tpSg.RatingSubject,
})
}
return sgs, nil
}
func (self *SQLStorage) GetTpCdrStats(tpid, tag string) (map[string][]*utils.TPCdrStat, error) {
css := make(map[string][]*utils.TPCdrStat)
var tpCdrStats []TpCdrStat
func (self *SQLStorage) GetTpLCRs(tpid, tag string) ([]*TpLcrRules, error) {
var tpLcrRules []*TpLcrRules
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
}
if err := q.Find(&tpCdrStats).Error; err != nil {
if err := q.Find(&tpLcrRules).Error; err != nil {
return nil, err
}
for _, tpCs := range tpCdrStats {
css[tpCs.Tag] = append(css[tpCs.Tag], &utils.TPCdrStat{
QueueLength: strconv.Itoa(tpCs.QueueLength),
TimeWindow: tpCs.TimeWindow,
Metrics: tpCs.Metrics,
SetupInterval: tpCs.SetupInterval,
TORs: tpCs.Tors,
CdrHosts: tpCs.CdrHosts,
CdrSources: tpCs.CdrSources,
ReqTypes: tpCs.ReqTypes,
Directions: tpCs.Directions,
Tenants: tpCs.Tenants,
Categories: tpCs.Categories,
Accounts: tpCs.Accounts,
Subjects: tpCs.Subjects,
DestinationPrefixes: tpCs.DestinationPrefixes,
UsageInterval: tpCs.UsageInterval,
Suppliers: tpCs.Suppliers,
DisconnectCauses: tpCs.DisconnectCauses,
MediationRunIds: tpCs.MediationRunids,
RatedAccounts: tpCs.RatedAccounts,
RatedSubjects: tpCs.RatedSubjects,
CostInterval: tpCs.CostInterval,
ActionTriggers: tpCs.ActionTriggers,
})
}
return css, nil
return tpLcrRules, nil
}
func (self *SQLStorage) GetTpDerivedChargers(dc *utils.TPDerivedChargers) (map[string]*utils.TPDerivedChargers, error) {
dcs := make(map[string]*utils.TPDerivedChargers)
var tpDerivedChargers []TpDerivedCharger
func (self *SQLStorage) GetTpActions(tpid, tag string) ([]*TpAction, error) {
var tpActions []*TpAction
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
}
if err := q.Find(&tpActions).Error; err != nil {
return nil, err
}
return tpActions, nil
}
func (self *SQLStorage) GetTpActionTriggers(tpid, tag string) ([]*TpActionTrigger, error) {
var tpActionTriggers []*TpActionTrigger
if err := self.db.Where(&TpActionTrigger{Tpid: tpid, Tag: tag}).Find(&tpActionTriggers).Error; err != nil {
return nil, err
}
return tpActionTriggers, nil
}
func (self *SQLStorage) GetTpAccountActions(aaFltr *utils.TPAccountActions) ([]*TpAccountAction, error) {
var tpAccActs []*TpAccountAction
q := self.db.Where("tpid = ?", aaFltr.TPid)
if len(aaFltr.Direction) != 0 {
q = q.Where("direction = ?", aaFltr.Direction)
}
if len(aaFltr.Tenant) != 0 {
q = q.Where("tenant = ?", aaFltr.Tenant)
}
if len(aaFltr.Account) != 0 {
q = q.Where("account = ?", aaFltr.Account)
}
if len(aaFltr.LoadId) != 0 {
q = q.Where("loadid = ?", aaFltr.LoadId)
}
if err := q.Find(&tpAccActs).Error; err != nil {
return nil, err
}
return tpAccActs, nil
}
func (self *SQLStorage) GetTpDerivedChargers(dc *utils.TPDerivedChargers) ([]*TpDerivedCharger, error) {
var tpDerivedChargers []*TpDerivedCharger
q := self.db.Where("tpid = ?", dc.TPid)
if len(dc.Direction) != 0 {
q = q.Where("direction = ?", dc.Direction)
@@ -1379,184 +1357,18 @@ func (self *SQLStorage) GetTpDerivedChargers(dc *utils.TPDerivedChargers) (map[s
if err := q.Find(&tpDerivedChargers).Error; err != nil {
return nil, err
}
for _, tpDcMdl := range tpDerivedChargers {
tpDc := &utils.TPDerivedChargers{TPid: tpDcMdl.Tpid, Loadid: tpDcMdl.Loadid, Direction: tpDcMdl.Direction, Tenant: tpDcMdl.Tenant, Category: tpDcMdl.Category,
Account: tpDcMdl.Account, Subject: tpDcMdl.Subject}
tag := tpDc.GetDerivedChargesId()
if _, hasIt := dcs[tag]; !hasIt {
dcs[tag] = tpDc
}
dcs[tag].DerivedChargers = append(dcs[tag].DerivedChargers, &utils.TPDerivedCharger{
RunId: tpDcMdl.Runid,
RunFilters: tpDcMdl.RunFilters,
ReqTypeField: tpDcMdl.ReqTypeField,
DirectionField: tpDcMdl.DirectionField,
TenantField: tpDcMdl.TenantField,
CategoryField: tpDcMdl.CategoryField,
AccountField: tpDcMdl.AccountField,
SubjectField: tpDcMdl.SubjectField,
DestinationField: tpDcMdl.DestinationField,
SetupTimeField: tpDcMdl.SetupTimeField,
AnswerTimeField: tpDcMdl.AnswerTimeField,
UsageField: tpDcMdl.UsageField,
SupplierField: tpDcMdl.SupplierField,
DisconnectCauseField: tpDcMdl.DisconnectCauseField,
})
}
return dcs, nil
return tpDerivedChargers, nil
}
func (self *SQLStorage) GetTpLCRs(tpid, tag string) (map[string]*LCR, error) {
lcrs := make(map[string]*LCR)
q := fmt.Sprintf("SELECT * FROM %s WHERE tpid='%s'", utils.TBL_TP_LCRS, tpid)
if tag != "" {
q += fmt.Sprintf(" AND tag='%s'", tag)
}
rows, err := self.Db.Query(q)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var id int
var tpid, direction, tenant, category, account, subject, destinationId, rpCategory, strategy, strategyParams, suppliers, activationTimeString string
var weight float64
if err := rows.Scan(&id, &tpid, &direction, &tenant, &category, &account, &subject, &destinationId, &rpCategory, &strategy, &strategyParams, &suppliers, &activationTimeString, &weight); err != nil {
return nil, err
}
tag := utils.LCRKey(direction, tenant, category, account, subject)
lcr, found := lcrs[tag]
activationTime, _ := utils.ParseTimeDetectLayout(activationTimeString)
if !found {
lcr = &LCR{
Direction: direction,
Tenant: tenant,
Category: category,
Account: account,
Subject: subject,
}
}
var act *LCRActivation
for _, existingAct := range lcr.Activations {
if existingAct.ActivationTime.Equal(activationTime) {
act = existingAct
break
}
}
if act == nil {
act = &LCRActivation{
ActivationTime: activationTime,
}
lcr.Activations = append(lcr.Activations, act)
}
act.Entries = append(act.Entries, &LCREntry{
DestinationId: destinationId,
RPCategory: category,
Strategy: strategy,
StrategyParams: strategyParams,
Weight: weight,
})
lcrs[tag] = lcr
}
return lcrs, nil
}
func (self *SQLStorage) GetTpActions(tpid, tag string) (map[string][]*utils.TPAction, error) {
as := make(map[string][]*utils.TPAction)
var tpActions []TpAction
func (self *SQLStorage) GetTpCdrStats(tpid, tag string) ([]*TpCdrStat, error) {
var tpCdrStats []*TpCdrStat
q := self.db.Where("tpid = ?", tpid)
if len(tag) != 0 {
q = q.Where("tag = ?", tag)
}
if err := q.Find(&tpActions).Error; err != nil {
if err := q.Find(&tpCdrStats).Error; err != nil {
return nil, err
}
for _, tpAc := range tpActions {
a := &utils.TPAction{
Identifier: tpAc.Action,
BalanceId: tpAc.BalanceTag,
BalanceType: tpAc.BalanceType,
Direction: tpAc.Direction,
Units: tpAc.Units,
ExpiryTime: tpAc.ExpiryTime,
TimingTags: tpAc.TimingTags,
DestinationIds: tpAc.DestinationTags,
RatingSubject: tpAc.RatingSubject,
Category: tpAc.Category,
SharedGroup: tpAc.SharedGroup,
BalanceWeight: tpAc.BalanceWeight,
ExtraParameters: tpAc.ExtraParameters,
Weight: tpAc.Weight,
}
as[tpAc.Tag] = append(as[tpAc.Tag], a)
}
return as, nil
}
func (self *SQLStorage) GetTpActionTriggers(tpid, tag string) (map[string][]*utils.TPActionTrigger, error) {
ats := make(map[string][]*utils.TPActionTrigger)
var tpActionTriggers []TpActionTrigger
if err := self.db.Where(&TpActionTrigger{Tpid: tpid, Tag: tag}).Find(&tpActionTriggers).Error; err != nil {
return nil, err
}
for _, tpAt := range tpActionTriggers {
at := &utils.TPActionTrigger{
Id: tpAt.UniqueId,
ThresholdType: tpAt.ThresholdType,
ThresholdValue: tpAt.ThresholdValue,
Recurrent: tpAt.Recurrent,
MinSleep: tpAt.MinSleep,
BalanceId: tpAt.BalanceTag,
BalanceType: tpAt.BalanceType,
BalanceDirection: tpAt.BalanceDirection,
BalanceDestinationIds: tpAt.BalanceDestinationTags,
BalanceWeight: tpAt.BalanceWeight,
BalanceExpirationDate: tpAt.BalanceExpiryTime,
BalanceTimingTags: tpAt.BalanceTimingTags,
BalanceRatingSubject: tpAt.BalanceRatingSubject,
BalanceCategory: tpAt.BalanceCategory,
BalanceSharedGroup: tpAt.BalanceSharedGroup,
Weight: tpAt.Weight,
ActionsId: tpAt.ActionsTag,
MinQueuedItems: tpAt.MinQueuedItems,
}
ats[tpAt.Tag] = append(ats[tpAt.Tag], at)
}
return ats, nil
}
func (self *SQLStorage) GetTpAccountActions(aaFltr *utils.TPAccountActions) (map[string]*utils.TPAccountActions, error) {
aas := make(map[string]*utils.TPAccountActions)
var tpAccActs []TpAccountAction
q := self.db.Where("tpid = ?", aaFltr.TPid)
if len(aaFltr.Direction) != 0 {
q = q.Where("direction = ?", aaFltr.Direction)
}
if len(aaFltr.Tenant) != 0 {
q = q.Where("tenant = ?", aaFltr.Tenant)
}
if len(aaFltr.Account) != 0 {
q = q.Where("account = ?", aaFltr.Account)
}
if len(aaFltr.LoadId) != 0 {
q = q.Where("loadid = ?", aaFltr.LoadId)
}
if err := q.Find(&tpAccActs).Error; err != nil {
return nil, err
}
for _, tpAa := range tpAccActs {
aacts := &utils.TPAccountActions{
TPid: tpAa.Tpid,
LoadId: tpAa.Loadid,
Tenant: tpAa.Tenant,
Account: tpAa.Account,
Direction: tpAa.Direction,
ActionPlanId: tpAa.ActionPlanTag,
ActionTriggersId: tpAa.ActionTriggersTag,
}
aas[aacts.KeyId()] = aacts
}
return aas, nil
return tpCdrStats, nil
}

View File

@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"log"
"strconv"
"strings"
"github.com/cgrates/cgrates/utils"
@@ -100,7 +101,7 @@ func (tpr *TpReader) LoadDestinationRates() (err error) {
for _, dr := range drs.DestinationRates {
rate, exists := tpr.rates[dr.RateId]
if !exists {
return fmt.Errorf("Could not find rate for tag %v", dr.RateId)
return fmt.Errorf("could not find rate for tag %v", dr.RateId)
}
dr.Rate = rate
destinationExists := dr.DestinationId == utils.ANY
@@ -111,7 +112,7 @@ func (tpr *TpReader) LoadDestinationRates() (err error) {
if dbExists, err := tpr.ratingStorage.HasData(DESTINATION_PREFIX, dr.DestinationId); err != nil {
return err
} else if !dbExists {
return fmt.Errorf("Could not get destination for tag %v", dr.DestinationId)
return fmt.Errorf("could not get destination for tag %v", dr.DestinationId)
}
}
}
@@ -134,12 +135,12 @@ func (tpr *TpReader) LoadRatingPlans() (err error) {
for _, rplBnd := range rplBnds {
t, exists := tpr.timings[rplBnd.TimingId]
if !exists {
return fmt.Errorf("Could not get timing for tag %v", rplBnd.TimingId)
return fmt.Errorf("could not get timing for tag %v", rplBnd.TimingId)
}
rplBnd.SetTiming(t)
drs, exists := tpr.destinationRates[rplBnd.DestinationRatesId]
if !exists {
return fmt.Errorf("Could not find destination rate for tag %v", rplBnd.DestinationRatesId)
return fmt.Errorf("could not find destination rate for tag %v", rplBnd.DestinationRatesId)
}
plan, exists := tpr.ratingPlans[tag]
if !exists {
@@ -178,14 +179,14 @@ func (tpr *TpReader) LoadRatingProfiles() (err error) {
for _, tpRa := range tpRpf.RatingPlanActivations {
at, err := utils.ParseDate(tpRa.ActivationTime)
if err != nil {
return fmt.Errorf("Cannot parse activation time from %v", tpRa.ActivationTime)
return fmt.Errorf("cannot parse activation time from %v", tpRa.ActivationTime)
}
_, exists := tpr.ratingPlans[tpRa.RatingPlanId]
if !exists {
if dbExists, err := tpr.ratingStorage.HasData(RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil {
return err
} else if !dbExists {
return fmt.Errorf("Could not load rating plans for tag: %v", tpRa.RatingPlanId)
return fmt.Errorf("could not load rating plans for tag: %v", tpRa.RatingPlanId)
}
}
rpf.RatingPlanActivations = append(rpf.RatingPlanActivations,
@@ -201,6 +202,331 @@ func (tpr *TpReader) LoadRatingProfiles() (err error) {
return nil
}
func (tpr *TpReader) LoadSharedGroups() (err error) {
tps, err := tpr.lr.GetTpSharedGroups(tpr.tpid, "")
if err != nil {
return err
}
storSgs, err := TpSharedGroups(tps).GetSharedGroups()
if err != nil {
return err
}
for tag, tpSgs := range storSgs {
sg, exists := tpr.sharedGroups[tag]
if !exists {
sg = &SharedGroup{
Id: tag,
AccountParameters: make(map[string]*SharingParameters, len(tpSgs)),
}
}
for _, tpSg := range tpSgs {
sg.AccountParameters[tpSg.Account] = &SharingParameters{
Strategy: tpSg.Strategy,
RatingSubject: tpSg.RatingSubject,
}
}
tpr.sharedGroups[tag] = sg
}
return nil
}
func (tpr *TpReader) LoadLCRs() (err error) {
tps, err := tpr.lr.GetTpLCRs(tpr.tpid, "")
if err != nil {
return err
}
for _, tpLcr := range tps {
tag := utils.LCRKey(tpLcr.Direction, tpLcr.Tenant, tpLcr.Category, tpLcr.Account, tpLcr.Subject)
activationTime, _ := utils.ParseTimeDetectLayout(tpLcr.ActivationTime)
lcr, found := tpr.lcrs[tag]
if !found {
lcr = &LCR{
Direction: tpLcr.Direction,
Tenant: tpLcr.Tenant,
Category: tpLcr.Category,
Account: tpLcr.Account,
Subject: tpLcr.Subject,
}
}
var act *LCRActivation
for _, existingAct := range lcr.Activations {
if existingAct.ActivationTime.Equal(activationTime) {
act = existingAct
break
}
}
if act == nil {
act = &LCRActivation{
ActivationTime: activationTime,
}
lcr.Activations = append(lcr.Activations, act)
}
act.Entries = append(act.Entries, &LCREntry{
DestinationId: tpLcr.DestinationTag,
RPCategory: tpLcr.Category,
Strategy: tpLcr.Strategy,
StrategyParams: tpLcr.StrategyParams,
Weight: tpLcr.Weight,
})
tpr.lcrs[tag] = lcr
}
return nil
}
func (tpr *TpReader) LoadActions() (err error) {
tps, err := tpr.lr.GetTpActions(tpr.tpid, "")
if err != nil {
return err
}
storActs, err := TpActions(tps).GetActions()
if err != nil {
return err
}
// map[string][]*Action
for tag, tpacts := range storActs {
acts := make([]*Action, len(tpacts))
for idx, tpact := range tpacts {
acts[idx] = &Action{
Id: tag + strconv.Itoa(idx),
ActionType: tpact.Identifier,
BalanceType: tpact.BalanceType,
Direction: tpact.Direction,
Weight: tpact.Weight,
ExtraParameters: tpact.ExtraParameters,
ExpirationString: tpact.ExpiryTime,
Balance: &Balance{
Uuid: utils.GenUUID(),
Id: tpact.BalanceId,
Value: tpact.Units,
Weight: tpact.BalanceWeight,
TimingIDs: tpact.TimingTags,
RatingSubject: tpact.RatingSubject,
Category: tpact.Category,
DestinationIds: tpact.DestinationIds,
},
}
// load action timings from tags
if acts[idx].Balance.TimingIDs != "" {
timingIds := strings.Split(acts[idx].Balance.TimingIDs, utils.INFIELD_SEP)
for _, timingID := range timingIds {
if timing, found := tpr.timings[timingID]; found {
acts[idx].Balance.Timings = append(acts[idx].Balance.Timings, &RITiming{
Years: timing.Years,
Months: timing.Months,
MonthDays: timing.MonthDays,
WeekDays: timing.WeekDays,
StartTime: timing.StartTime,
EndTime: timing.EndTime,
})
} else {
return fmt.Errorf("could not find timing: %v", timingID)
}
}
}
}
tpr.actions[tag] = acts
}
return nil
}
func (tpr *TpReader) LoadActionPlans() (err error) {
tps, err := tpr.lr.GetTpActionPlans(tpr.tpid, "")
if err != nil {
return err
}
storAps, err := TpActionPlans(tps).GetActionPlans()
if err != nil {
return err
}
for atId, ats := range storAps {
for _, at := range ats {
_, exists := tpr.actions[at.ActionsId]
if !exists {
return fmt.Errorf("actionTiming: Could not load the action for tag: %v", at.ActionsId)
}
t, exists := tpr.timings[at.TimingId]
if !exists {
return fmt.Errorf("actionTiming: Could not load the timing for tag: %v", at.TimingId)
}
actTmg := &ActionTiming{
Uuid: utils.GenUUID(),
Id: atId,
Weight: at.Weight,
Timing: &RateInterval{
Timing: &RITiming{
Years: t.Years,
Months: t.Months,
MonthDays: t.MonthDays,
WeekDays: t.WeekDays,
StartTime: t.StartTime,
},
},
ActionsId: at.ActionsId,
}
tpr.actionsTimings[atId] = append(tpr.actionsTimings[atId], actTmg)
}
}
return nil
}
func (tpr *TpReader) LoadActionTriggers() (err error) {
tps, err := tpr.lr.GetTpActionTriggers(tpr.tpid, "")
if err != nil {
return err
}
storAts, err := TpActionTriggers(tps).GetActionTriggers()
if err != nil {
return err
}
for key, atrsLst := range storAts {
atrs := make([]*ActionTrigger, len(atrsLst))
for idx, atr := range atrsLst {
balanceExpirationDate, _ := utils.ParseTimeDetectLayout(atr.BalanceExpirationDate)
id := atr.Id
if id == "" {
id = utils.GenUUID()
}
minSleep, err := utils.ParseDurationWithSecs(atr.MinSleep)
if err != nil {
return err
}
atrs[idx] = &ActionTrigger{
Id: id,
ThresholdType: atr.ThresholdType,
ThresholdValue: atr.ThresholdValue,
Recurrent: atr.Recurrent,
MinSleep: minSleep,
BalanceId: atr.BalanceId,
BalanceType: atr.BalanceType,
BalanceDirection: atr.BalanceDirection,
BalanceDestinationIds: atr.BalanceDestinationIds,
BalanceWeight: atr.BalanceWeight,
BalanceExpirationDate: balanceExpirationDate,
BalanceTimingTags: atr.BalanceTimingTags,
BalanceRatingSubject: atr.BalanceRatingSubject,
BalanceCategory: atr.BalanceCategory,
BalanceSharedGroup: atr.BalanceSharedGroup,
Weight: atr.Weight,
ActionsId: atr.ActionsId,
MinQueuedItems: atr.MinQueuedItems,
}
if atrs[idx].Id == "" {
atrs[idx].Id = utils.GenUUID()
}
}
tpr.actionsTriggers[key] = atrs
}
return nil
}
func (tpr *TpReader) LoadAccountActions() (err error) {
tps, err := tpr.lr.GetTpAccountActions(nil)
if err != nil {
return err
}
storAts, err := TpAccountActions(tps).GetAccountActions()
if err != nil {
return err
}
for _, aa := range storAts {
if _, alreadyDefined := tpr.accountActions[aa.KeyId()]; alreadyDefined {
return fmt.Errorf("Duplicate account action found: %s", aa.KeyId())
}
// extract aliases from subject
aliases := strings.Split(aa.Account, ";")
tpr.dirtyAccAliases = append(tpr.dirtyAccAliases, &TenantAccount{Tenant: aa.Tenant, Account: aliases[0]})
if len(aliases) > 1 {
aa.Account = aliases[0]
for _, alias := range aliases[1:] {
tpr.accAliases[utils.AccountAliasKey(aa.Tenant, alias)] = aa.Account
}
}
aTriggers, exists := tpr.actionsTriggers[aa.ActionTriggersId]
if !exists {
return fmt.Errorf("Could not get action triggers for tag %v", aa.ActionTriggersId)
}
ub := &Account{
Id: aa.KeyId(),
ActionTriggers: aTriggers,
}
tpr.accountActions[aa.KeyId()] = ub
aTimings, exists := tpr.actionsTimings[aa.ActionPlanId]
if !exists {
log.Printf("Could not get action timing for tag %v", aa.ActionPlanId)
// must not continue here
}
for _, at := range aTimings {
at.AccountIds = append(at.AccountIds, aa.KeyId())
}
}
return nil
}
func (tpr *TpReader) LoadDerivedChargers() (err error) {
tps, err := tpr.lr.GetTpDerivedChargers(nil)
if err != nil {
return err
}
storDcs, err := TpDerivedChargers(tps).GetDerivedChargers()
if err != nil {
return err
}
for _, tpDcs := range storDcs {
tag := tpDcs.GetDerivedChargersKey()
if _, hasIt := tpr.derivedChargers[tag]; !hasIt {
tpr.derivedChargers[tag] = make(utils.DerivedChargers, 0) // Load object map since we use this method also from LoadDerivedChargers
}
for _, tpDc := range tpDcs.DerivedChargers {
if dc, err := utils.NewDerivedCharger(tpDc.RunId, tpDc.RunFilters, tpDc.ReqTypeField, tpDc.DirectionField, tpDc.TenantField, tpDc.CategoryField,
tpDc.AccountField, tpDc.SubjectField, tpDc.DestinationField, tpDc.SetupTimeField, tpDc.AnswerTimeField, tpDc.UsageField, tpDc.SupplierField,
tpDc.DisconnectCauseField); err != nil {
return err
} else {
tpr.derivedChargers[tag] = append(tpr.derivedChargers[tag], dc)
}
}
}
return nil
}
func (tpr *TpReader) LoadCdrStats() (err error) {
tps, err := tpr.lr.GetTpCdrStats(tpr.tpid, "")
if err != nil {
return err
}
storStats, err := TpCdrStats(tps).GetCdrStats()
if err != nil {
return err
}
for tag, tpStats := range storStats {
for _, tpStat := range tpStats {
var cs *CdrStats
var exists bool
if cs, exists = tpr.cdrStats[tag]; !exists {
cs = &CdrStats{Id: tag}
}
triggerTag := tpStat.ActionTriggers
triggers, exists := tpr.actionsTriggers[triggerTag]
if triggerTag != "" && !exists {
// only return error if there was something there for the tag
return fmt.Errorf("Could not get action triggers for cdr stats id %s: %s", cs.Id, triggerTag)
}
UpdateCdrStats(cs, triggers, tpStat)
tpr.cdrStats[tag] = cs
}
}
return nil
}
func (tpr *TpReader) LoadAll() error {
var err error
if err = tpr.LoadDestinations(); err != nil {
@@ -230,7 +556,7 @@ func (tpr *TpReader) LoadAll() error {
if err = tpr.LoadActions(); err != nil {
return err
}
if err = tpr.LoadActionTimings(); err != nil {
if err = tpr.LoadActionPlans(); err != nil {
return err
}
if err = tpr.LoadActionTriggers(); err != nil {
@@ -269,7 +595,7 @@ func (tpr *TpReader) IsValid() bool {
func (tpr *TpReader) WriteToDatabase(dataStorage RatingStorage, accountingStorage AccountingStorage, flush, verbose bool) (err error) {
if dataStorage == nil {
return errors.New("No database connection!")
return errors.New("no database connection!")
}
if flush {
dataStorage.Flush("")