Merge branch 'master' into hapool

This commit is contained in:
Radu Ioan Fericean
2015-08-03 21:05:39 +03:00
33 changed files with 512 additions and 420 deletions

View File

@@ -109,7 +109,7 @@ func (b *Balance) IsActiveAt(t time.Time) bool {
return true
}
for _, tim := range b.Timings {
if tim.IsActiveAt(t, false) {
if tim.IsActiveAt(t) {
return true
}
}

View File

@@ -55,6 +55,7 @@ func TestMultipleResultMerge(t *testing.T) {
cd := &CallDescriptor{Direction: OUTBOUND, Category: "0", Tenant: "vdf", Subject: "rif", Destination: "0256", TimeStart: t1, TimeEnd: t2}
cc1, _ := cd.getCost()
if cc1.Cost != 61 {
//ils.LogFull(cc1)
t.Errorf("expected 61 was %v", cc1.Cost)
for _, ts := range cc1.Timespans {
t.Log(ts.RateInterval)

View File

@@ -339,6 +339,7 @@ func (cd *CallDescriptor) splitInTimeSpans() (timespans []*TimeSpan) {
for i := 0; i < len(timespans); i++ {
newTs := timespans[i].SplitByRatingPlan(rp)
if newTs != nil {
//log.Print("NEW TS", newTs.TimeStart)
timespans = append(timespans, newTs)
} else {
afterEnd = true
@@ -347,8 +348,23 @@ func (cd *CallDescriptor) splitInTimeSpans() (timespans []*TimeSpan) {
}
}
}
}
// split on days
/*for i := 0; i < len(timespans); i++ {
if timespans[i].TimeStart.Day() != timespans[i].TimeEnd.Day() {
//log.Print("TS: ", timespans[i].TimeStart, timespans[i].TimeEnd)
start := timespans[i].TimeStart
newTs := timespans[i].SplitByTime(time.Date(start.Year(), start.Month(), start.Day(), 0, 0, 0, 0, start.Location()).Add(24 * time.Hour))
if newTs != nil {
//log.Print("NEW TS: ", newTs.TimeStart, newTs.TimeEnd)
// insert the new timespan
index := i + 1
timespans = append(timespans, nil)
copy(timespans[index+1:], timespans[index:])
timespans[index] = newTs
}
}
}*/
// Logger.Debug(fmt.Sprintf("After SplitByRatingPlan: %+v", timespans))
// split on rate intervals
for i := 0; i < len(timespans); i++ {
@@ -358,16 +374,20 @@ func (cd *CallDescriptor) splitInTimeSpans() (timespans []*TimeSpan) {
// Logger.Debug(fmt.Sprintf("rp: %+v", rp))
//timespans[i].RatingPlan = nil
rp.RateIntervals.Sort()
/*for _, interval := range rp.RateIntervals {
if !timespans[i].hasBetterRateIntervalThan(interval) {
timespans[i].SetRateInterval(interval)
}
}*/
//log.Print("ORIG TS: ", timespans[i].TimeStart, timespans[i].TimeEnd)
//log.Print(timespans[i].RateInterval)
for _, interval := range rp.RateIntervals {
//log.Printf("\tINTERVAL: %+v", interval.Timing)
if timespans[i].hasBetterRateIntervalThan(interval) {
//log.Print("continue")
continue // if the timespan has an interval than it already has a heigher weight
}
newTs := timespans[i].SplitByRateInterval(interval, cd.TOR != utils.VOICE)
//utils.PrintFull(timespans[i])
//utils.PrintFull(newTs)
if newTs != nil {
//log.Print("NEW TS: ", newTs.TimeStart, newTs.TimeEnd)
newTs.setRatingInfo(rp)
// insert the new timespan
index := i + 1
@@ -379,6 +399,8 @@ func (cd *CallDescriptor) splitInTimeSpans() (timespans []*TimeSpan) {
}
}
}
//log.Print("TS: ", timespans[i].TimeStart, timespans[i].TimeEnd)
//log.Print(timespans[i].RateInterval.Timing)
}
//Logger.Debug(fmt.Sprintf("After SplitByRateInterval: %+v", timespans))
@@ -422,6 +444,7 @@ func (cd *CallDescriptor) GetDuration() time.Duration {
Creates a CallCost structure with the cost information calculated for the received CallDescriptor.
*/
func (cd *CallDescriptor) GetCost() (*CallCost, error) {
cd.account = nil // make sure it's not cached
cc, err := cd.getCost()
if err != nil {
return nil, err
@@ -586,6 +609,7 @@ func (origCD *CallDescriptor) getMaxSessionDuration(origAcc *Account) (time.Dura
}
func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err error) {
cd.account = nil // make sure it's not cached
if account, err := cd.getAccount(); err != nil || account == nil {
Logger.Err(fmt.Sprintf("Could not get user balance for <%s>: %s.", cd.GetAccountKey(), err.Error()))
return 0, err
@@ -639,6 +663,7 @@ func (cd *CallDescriptor) debit(account *Account, dryRun bool, goNegative bool)
}
func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
cd.account = nil // make sure it's not cached
// lock all group members
if account, err := cd.getAccount(); err != nil || account == nil {
Logger.Err(fmt.Sprintf("Could not get user balance for <%s>: %s.", cd.GetAccountKey(), err.Error()))
@@ -661,6 +686,7 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
// This methods combines the Debit and GetMaxSessionDuration and will debit the max available time as returned
// by the GetMaxSessionDuration method. The amount filed has to be filled in call descriptor.
func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
cd.account = nil // make sure it's not cached
if account, err := cd.getAccount(); err != nil || account == nil {
Logger.Err(fmt.Sprintf("Could not get user balance for <%s>: %s.", cd.GetAccountKey(), err.Error()))
return nil, err
@@ -692,6 +718,7 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
}
func (cd *CallDescriptor) RefundIncrements() (left float64, err error) {
cd.account = nil // make sure it's not cached
accountsCache := make(map[string]*Account)
for _, increment := range cd.Increments {
account, found := accountsCache[increment.BalanceInfo.AccountId]
@@ -770,6 +797,7 @@ func (cd *CallDescriptor) GetLCRFromStorage() (*LCR, error) {
}
func (cd *CallDescriptor) GetLCR(stats StatsInterface) (*LCRCost, error) {
cd.account = nil // make sure it's not cached
lcr, err := cd.GetLCRFromStorage()
if err != nil {
return nil, err

View File

@@ -195,7 +195,6 @@ func TestSplitSpansWeekend(t *testing.T) {
},
}
//log.Print("=============================")
timespans := cd.splitInTimeSpans()
if len(timespans) != 2 {
t.Log(cd.RatingInfos)
@@ -404,9 +403,10 @@ func TestSpansMultipleRatingPlans(t *testing.T) {
t1 := time.Date(2012, time.February, 7, 23, 50, 0, 0, time.UTC)
t2 := time.Date(2012, time.February, 8, 0, 30, 0, 0, time.UTC)
cd := &CallDescriptor{Direction: "*out", Category: "0", Tenant: "vdf", Subject: "rif", Destination: "0257308200", TimeStart: t1, TimeEnd: t2}
result, _ := cd.GetCost()
if result.Cost != 1200 || result.GetConnectFee() != 0 {
t.Errorf("Expected %v was %v", 1200, result)
cc, _ := cd.GetCost()
if cc.Cost != 2100 || cc.GetConnectFee() != 0 {
utils.LogFull(cc)
t.Errorf("Expected %v was %v (%v)", 2100, cc, cc.GetConnectFee())
}
}
@@ -977,7 +977,9 @@ func TestDebitNegatve(t *testing.T) {
t.Errorf("Error debiting from empty share: %+v", balanceMap[0].GetValue())
}
cc, err = cd.MaxDebit()
//utils.PrintFull(cc)
acc, _ = cd.getAccount()
balanceMap = acc.BalanceMap[utils.MONETARY+OUTBOUND]
//utils.LogFull(balanceMap)
if err != nil || cc.Cost != 2.5 {
t.Errorf("Debit from empty share error: %+v, %v", cc, err)
}

View File

@@ -324,7 +324,6 @@ func (self *CdrServer) rateCDR(storedCdr *StoredCdr) error {
func (self *CdrServer) replicateCdr(cdr *StoredCdr) error {
Logger.Debug(fmt.Sprintf("replicateCdr cdr: %+v, configuration: %+v", cdr, self.cgrCfg.CDRSCdrReplication))
for _, rplCfg := range self.cgrCfg.CDRSCdrReplication {
Logger.Debug(fmt.Sprintf("Replicating CDR with configuration: %+v", rplCfg))
passesFilters := true
for _, cdfFltr := range rplCfg.CdrFilter {
if fltrPass, _ := cdr.PassesFieldFilter(cdfFltr); !fltrPass {

View File

@@ -337,25 +337,25 @@ func (lc *LCRCost) SortLoadDistribution() {
/*for supplier, sq := range supplierQueues {
log.Printf("Useful supplier qeues: %s %v", supplier, sq.conf.TimeWindow)
}*/
// if all have less than ponder return random order
// if some have a cdr count not divisible by ponder return them first and all ordered by cdr times, oldest first
// if all have a multiple of ponder return in the order of cdr times, oldest first
// if all have less than ratio return random order
// if some have a cdr count not divisible by ratio return them first and all ordered by cdr times, oldest first
// if all have a multiple of ratio return in the order of cdr times, oldest first
// first put them in one of the above categories
havePonderlessSuppliers := false
haveRatiolessSuppliers := false
for supCost, sq := range supplierQueues {
ponder := lc.GetSupplierPonder(supCost.Supplier)
if ponder == -1 {
ratio := lc.GetSupplierRatio(supCost.Supplier)
if ratio == -1 {
supCost.Cost = -1
havePonderlessSuppliers = true
haveRatiolessSuppliers = true
continue
}
cdrCount := len(sq.Cdrs)
if cdrCount < ponder {
if cdrCount < ratio {
supCost.Cost = float64(LOW_PRIORITY_LIMIT + rand.Intn(RAND_LIMIT))
continue
}
if cdrCount%ponder == 0 {
if cdrCount%ratio == 0 {
supCost.Cost = float64(MED_PRIORITY_LIMIT+rand.Intn(RAND_LIMIT)) + (time.Now().Sub(sq.Cdrs[len(sq.Cdrs)-1].SetupTime).Seconds() / RAND_LIMIT)
continue
} else {
@@ -363,7 +363,7 @@ func (lc *LCRCost) SortLoadDistribution() {
continue
}
}
if havePonderlessSuppliers {
if haveRatiolessSuppliers {
var filteredSupplierCost []*LCRSupplierCost
for _, supCost := range lc.SupplierCosts {
if supCost.Cost != -1 {
@@ -375,35 +375,35 @@ func (lc *LCRCost) SortLoadDistribution() {
}
// used in load distribution strategy only
// receives a long supplier id and will return the ponder found in strategy params
func (lc *LCRCost) GetSupplierPonder(supplier string) int {
// receives a long supplier id and will return the ratio found in strategy params
func (lc *LCRCost) GetSupplierRatio(supplier string) int {
// parse strategy params
ponders := make(map[string]int)
ratios := make(map[string]int)
params := strings.Split(lc.Entry.StrategyParams, utils.INFIELD_SEP)
for _, param := range params {
ponderSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP)
if len(ponderSlice) != 2 {
ratioSlice := strings.Split(param, utils.CONCATENATED_KEY_SEP)
if len(ratioSlice) != 2 {
Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams))
continue
}
p, err := strconv.Atoi(ponderSlice[1])
p, err := strconv.Atoi(ratioSlice[1])
if err != nil {
Logger.Warning(fmt.Sprintf("bad format in load distribution strategy param: %s", lc.Entry.StrategyParams))
continue
}
ponders[ponderSlice[0]] = p
ratios[ratioSlice[0]] = p
}
parts := strings.Split(supplier, utils.CONCATENATED_KEY_SEP)
if len(parts) > 0 {
supplierSubject := parts[len(parts)-1]
if ponder, found := ponders[supplierSubject]; found {
return ponder
if ratio, found := ratios[supplierSubject]; found {
return ratio
}
if ponder, found := ponders[utils.META_DEFAULT]; found {
return ponder
if ratio, found := ratios[utils.META_DEFAULT]; found {
return ratio
}
}
if len(ponders) == 0 {
if len(ratios) == 0 {
return 1 // use random/last cdr date sorting
}
return -1 // exclude missing suppliers

View File

@@ -215,11 +215,11 @@ func TestLcrGet(t *testing.T) {
func TestLcrRequestAsCallDescriptor(t *testing.T) {
sTime := time.Date(2015, 04, 06, 17, 40, 0, 0, time.UTC)
callDur := time.Duration(1) * time.Minute
lcrReq := &LcrRequest{Account: "1001", StartTime: sTime.String()}
lcrReq := &LcrRequest{Account: "2001", StartTime: sTime.String()}
if _, err := lcrReq.AsCallDescriptor(); err == nil || err != utils.ErrMandatoryIeMissing {
t.Error("Unexpected error received: %v", err)
}
lcrReq = &LcrRequest{Account: "1001", Destination: "1002", StartTime: sTime.String()}
lcrReq = &LcrRequest{Account: "2001", Destination: "2002", StartTime: sTime.String()}
eCd := &CallDescriptor{
Direction: utils.OUT,
Tenant: config.CgrConfig().DefaultTenant,
@@ -412,7 +412,7 @@ func TestLCRCostSuppliersLoadAllRounded(t *testing.T) {
Supplier: "*out:tenant12:call:dan12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
@@ -452,7 +452,7 @@ func TestLCRCostSuppliersLoadAllRounded(t *testing.T) {
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(300 * time.Minute)}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
@@ -512,7 +512,7 @@ func TestLCRCostSuppliersLoadAllOver(t *testing.T) {
Supplier: "*out:tenant12:call:dan12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
@@ -552,7 +552,7 @@ func TestLCRCostSuppliersLoadAllOver(t *testing.T) {
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(300 * time.Minute)}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
@@ -612,7 +612,7 @@ func TestLCRCostSuppliersLoadAllOverMisingDefault(t *testing.T) {
Supplier: "*out:tenant12:call:dan12",
supplierQueues: []*StatsQueue{
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(100 * time.Minute)}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(200 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,
@@ -652,7 +652,7 @@ func TestLCRCostSuppliersLoadAllOverMisingDefault(t *testing.T) {
},
},
&StatsQueue{
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(300 * time.Minute)}},
Cdrs: []*QCdr{&QCdr{}, &QCdr{}, &QCdr{SetupTime: setupTime.Add(400 * time.Minute)}},
conf: &CdrStats{
QueueLength: 0,
TimeWindow: 10 * time.Minute,

View File

@@ -116,13 +116,44 @@ func (rit *RITiming) CronString() string {
return rit.cronString
}
// Returns wheter the Timing is active at the specified time
func (rit *RITiming) IsActiveAt(t time.Time, endTime bool) bool {
// if the received time represents an endtime consider it 24 instead of 0
hour := t.Hour()
if endTime && hour == 0 {
hour = 24
/*
Returns a time object that represents the end of the interval realtive to the received time
*/
func (rit *RITiming) getRightMargin(t time.Time) (rigthtTime time.Time) {
year, month, day := t.Year(), t.Month(), t.Day()
hour, min, sec, nsec := 23, 59, 59, 0
loc := t.Location()
if rit.EndTime != "" {
split := strings.Split(rit.EndTime, ":")
hour, _ = strconv.Atoi(split[0])
min, _ = strconv.Atoi(split[1])
sec, _ = strconv.Atoi(split[2])
//log.Print("RIGHT1: ", time.Date(year, month, day, hour, min, sec, nsec, loc))
return time.Date(year, month, day, hour, min, sec, nsec, loc)
}
//log.Print("RIGHT2: ", time.Date(year, month, day, hour, min, sec, nsec, loc).Add(time.Second))
return time.Date(year, month, day, hour, min, sec, nsec, loc).Add(time.Second)
}
/*
Returns a time object that represents the start of the interval realtive to the received time
*/
func (rit *RITiming) getLeftMargin(t time.Time) (rigthtTime time.Time) {
year, month, day := t.Year(), t.Month(), t.Day()
hour, min, sec, nsec := 0, 0, 0, 0
loc := t.Location()
if rit.StartTime != "" {
split := strings.Split(rit.StartTime, ":")
hour, _ = strconv.Atoi(split[0])
min, _ = strconv.Atoi(split[1])
sec, _ = strconv.Atoi(split[2])
}
//log.Print("LEFT: ", time.Date(year, month, day, hour, min, sec, nsec, loc))
return time.Date(year, month, day, hour, min, sec, nsec, loc)
}
// Returns wheter the Timing is active at the specified time
func (rit *RITiming) IsActiveAt(t time.Time) bool {
// check for years
if len(rit.Years) > 0 && !rit.Years.Contains(t.Year()) {
return false
@@ -139,38 +170,25 @@ func (rit *RITiming) IsActiveAt(t time.Time, endTime bool) bool {
if len(rit.WeekDays) > 0 && !rit.WeekDays.Contains(t.Weekday()) {
return false
}
//log.Print("Time: ", t)
//log.Print("Left Margin: ", rit.getLeftMargin(t))
// check for start hour
if rit.StartTime != "" {
split := strings.Split(rit.StartTime, ":")
sh, _ := strconv.Atoi(split[0])
sm, _ := strconv.Atoi(split[1])
ss, _ := strconv.Atoi(split[2])
// if the hour result before or result the same hour but the minute result before
if hour < sh ||
(hour == sh && t.Minute() < sm) ||
(hour == sh && t.Minute() == sm && t.Second() < ss) {
return false
}
if t.Before(rit.getLeftMargin(t)) {
return false
}
//log.Print("Right Margin: ", rit.getRightMargin(t))
// check for end hour
if rit.EndTime != "" {
split := strings.Split(rit.EndTime, ":")
eh, _ := strconv.Atoi(split[0])
em, _ := strconv.Atoi(split[1])
es, _ := strconv.Atoi(split[2])
// if the hour result after or result the same hour but the minute result after
if hour > eh ||
(hour == eh && t.Minute() > em) ||
(hour == eh && t.Minute() == em && t.Second() > es) {
return false
}
if t.After(rit.getRightMargin(t)) {
return false
}
return true
}
// IsActive returns wheter the Timing is active now
func (rit *RITiming) IsActive() bool {
return rit.IsActiveAt(time.Now(), false)
return rit.IsActiveAt(time.Now())
}
func (rit *RITiming) IsBlank() bool {
@@ -271,43 +289,12 @@ func (pg *RateGroups) AddRate(ps ...*Rate) {
Returns true if the received time result inside the interval
*/
func (i *RateInterval) Contains(t time.Time, endTime bool) bool {
return i.Timing.IsActiveAt(t, endTime)
}
/*
Returns a time object that represents the end of the interval realtive to the received time
*/
func (i *RateInterval) getRightMargin(t time.Time) (rigthtTime time.Time) {
year, month, day := t.Year(), t.Month(), t.Day()
hour, min, sec, nsec := 23, 59, 59, 0
loc := t.Location()
if i.Timing.EndTime != "" {
split := strings.Split(i.Timing.EndTime, ":")
hour, _ = strconv.Atoi(split[0])
min, _ = strconv.Atoi(split[1])
sec, _ = strconv.Atoi(split[2])
//log.Print("RIGHT1: ", time.Date(year, month, day, hour, min, sec, nsec, loc))
return time.Date(year, month, day, hour, min, sec, nsec, loc)
if endTime {
if t.Hour() == 0 && t.Minute() == 0 && t.Second() == 0 { // back one second to 23:59:59
t = t.Add(-1 * time.Second)
}
}
//log.Print("RIGHT2: ", time.Date(year, month, day, hour, min, sec, nsec, loc).Add(time.Second))
return time.Date(year, month, day, hour, min, sec, nsec, loc).Add(time.Second)
}
/*
Returns a time object that represents the start of the interval realtive to the received time
*/
func (i *RateInterval) getLeftMargin(t time.Time) (rigthtTime time.Time) {
year, month, day := t.Year(), t.Month(), t.Day()
hour, min, sec, nsec := 0, 0, 0, 0
loc := t.Location()
if i.Timing.StartTime != "" {
split := strings.Split(i.Timing.StartTime, ":")
hour, _ = strconv.Atoi(split[0])
min, _ = strconv.Atoi(split[1])
sec, _ = strconv.Atoi(split[2])
}
//log.Print("LEFT: ", time.Date(year, month, day, hour, min, sec, nsec, loc))
return time.Date(year, month, day, hour, min, sec, nsec, loc)
return i.Timing.IsActiveAt(t)
}
func (i *RateInterval) String_DISABLED() string {
@@ -376,7 +363,7 @@ func (il RateIntervalList) Swap(i, j int) {
// we need higher weights earlyer in the list
func (il RateIntervalList) Less(j, i int) bool {
return il[i].Weight < il[j].Weight
return il[i].Weight < il[j].Weight //|| il[i].Timing.StartTime > il[j].Timing.StartTime
}
func (il RateIntervalList) Sort() {

View File

@@ -46,7 +46,7 @@ type Responder struct {
ExitChan chan bool
CdrSrv *CdrServer
Stats StatsInterface
Timeout time.Duration
cnt int64
responseCache *cache2go.ResponseCache
}
@@ -70,6 +70,7 @@ func (rs *Responder) getCache() *cache2go.ResponseCache {
RPC method thet provides the external RPC interface for getting the rating information.
*/
func (rs *Responder) GetCost(arg *CallDescriptor, reply *CallCost) (err error) {
rs.cnt += 1
if arg.Subject == "" {
arg.Subject = arg.Account
}

View File

@@ -71,10 +71,6 @@ type RatingStorage interface {
SetAccAlias(string, string) error
RemoveAccAliases([]*TenantAccount, bool) error
GetAccountAliases(string, string, bool) ([]string, error)
SetUser(*UserProfile) error
GetUser(string) (*UserProfile, error)
GetUsers() ([]*UserProfile, error)
RemoveUser(string) error
}
type AccountingStorage interface {
@@ -87,6 +83,10 @@ type AccountingStorage interface {
GetSubscribers() (map[string]*SubscriberData, error)
SetSubscriber(string, *SubscriberData) error
RemoveSubscriber(string) error
SetUser(*UserProfile) error
GetUser(string) (*UserProfile, error)
GetUsers() ([]*UserProfile, error)
RemoveUser(string) error
}
type CdrStorage interface {

View File

@@ -242,24 +242,13 @@ func (ts *TimeSpan) Contains(t time.Time) bool {
return t.After(ts.TimeStart) && t.Before(ts.TimeEnd)
}
/*
Will set the interval as spans's interval if new Weight is lower then span's interval Weight
or if the Weights are equal and new price is lower then spans's interval price
*/
func (ts *TimeSpan) SetRateInterval(i *RateInterval) {
//log.Printf("SETRATEINTERVAL: %+v", i.Timing)
// higher weights are better
if ts.RateInterval == nil || ts.RateInterval.Weight < i.Weight {
ts.RateInterval = i
//log.Printf("RET TS: %+v", ts.RateInterval.Timing)
func (ts *TimeSpan) SetRateInterval(interval *RateInterval) {
if interval == nil {
return
}
iPrice, _, _ := i.GetRateParameters(ts.GetGroupStart())
tsPrice, _, _ := ts.RateInterval.GetRateParameters(ts.GetGroupStart())
if ts.RateInterval.Weight == i.Weight && iPrice <= tsPrice {
ts.RateInterval = i
if !ts.hasBetterRateIntervalThan(interval) {
ts.RateInterval = interval
}
//log.Printf("END TS: %+v", ts.RateInterval.Timing)
}
// Returns the cost of the timespan according to the relevant cost interval.
@@ -354,7 +343,7 @@ func (ts *TimeSpan) SplitByRateInterval(i *RateInterval, data bool) (nts *TimeSp
for _, rate := range i.Rating.Rates {
//Logger.Debug(fmt.Sprintf("Rate: %+v", rate))
if ts.GetGroupStart() < rate.GroupIntervalStart && ts.GetGroupEnd() > rate.GroupIntervalStart {
// Logger.Debug(fmt.Sprintf("Splitting"))
//log.Print("Splitting")
ts.SetRateInterval(i)
splitTime := ts.TimeStart.Add(rate.GroupIntervalStart - ts.GetGroupStart())
nts = &TimeSpan{
@@ -387,8 +376,7 @@ func (ts *TimeSpan) SplitByRateInterval(i *RateInterval, data bool) (nts *TimeSp
// if only the start time is in the interval split the interval to the right
if i.Contains(ts.TimeStart, false) {
//log.Print("Start in interval")
splitTime := i.getRightMargin(ts.TimeStart)
splitTime := i.Timing.getRightMargin(ts.TimeStart)
ts.SetRateInterval(i)
if splitTime == ts.TimeStart || splitTime.Equal(ts.TimeEnd) {
return
@@ -408,7 +396,7 @@ func (ts *TimeSpan) SplitByRateInterval(i *RateInterval, data bool) (nts *TimeSp
if i.Contains(ts.TimeEnd, true) {
//log.Print("End in interval")
//tmpTime := time.Date(ts.TimeStart.)
splitTime := i.getLeftMargin(ts.TimeEnd)
splitTime := i.Timing.getLeftMargin(ts.TimeEnd)
splitTime = utils.CopyHour(splitTime, ts.TimeStart)
if splitTime.Equal(ts.TimeEnd) {
return
@@ -419,7 +407,6 @@ func (ts *TimeSpan) SplitByRateInterval(i *RateInterval, data bool) (nts *TimeSp
}
nts.copyRatingInfo(ts)
ts.TimeEnd = splitTime
nts.SetRateInterval(i)
nts.DurationIndex = ts.DurationIndex
ts.SetNewDurationIndex(nts)
@@ -429,6 +416,22 @@ func (ts *TimeSpan) SplitByRateInterval(i *RateInterval, data bool) (nts *TimeSp
return
}
/*func (ts *TimeSpan) SplitByTime(splitTime time.Time) (nts *TimeSpan) {
if splitTime.Equal(ts.TimeEnd) {
return
}
nts = &TimeSpan{
TimeStart: splitTime,
TimeEnd: ts.TimeEnd,
}
nts.copyRatingInfo(ts)
ts.TimeEnd = splitTime
nts.SetRateInterval(ts.RateInterval)
nts.DurationIndex = ts.DurationIndex
ts.SetNewDurationIndex(nts)
return
}*/
// Split the timespan at the given increment start
func (ts *TimeSpan) SplitByIncrement(index int) *TimeSpan {
if index <= 0 || index >= len(ts.Increments) {
@@ -552,30 +555,46 @@ func (ts *TimeSpan) AddIncrement(inc *Increment) {
}
func (ts *TimeSpan) hasBetterRateIntervalThan(interval *RateInterval) bool {
if interval.Timing == nil {
return false
}
otherLeftMargin := interval.Timing.getLeftMargin(ts.TimeStart)
otherDistance := ts.TimeStart.Sub(otherLeftMargin)
//log.Print("OTHER LEFT: ", otherLeftMargin)
//log.Print("OTHER DISTANCE: ", otherDistance)
// if the distance is negative it's not usable
if otherDistance < 0 {
return true
}
//log.Print("RI: ", ts.RateInterval)
if ts.RateInterval == nil {
return false
}
//log.Print("StartTime: ", ts.TimeStart)
//log.Printf("OWN: %+v", ts.RateInterval)
//log.Printf("OTHER: %+v", interval)
// the higher the weight the better
if ts.RateInterval != nil &&
ts.RateInterval.Weight > interval.Weight {
return true
ts.RateInterval.Weight < interval.Weight {
return false
}
// check interval is closer than the new one
ownLeftMargin := ts.RateInterval.getLeftMargin(ts.TimeStart)
otherLeftMargin := interval.getLeftMargin(ts.TimeStart)
ownLeftMargin := ts.RateInterval.Timing.getLeftMargin(ts.TimeStart)
ownDistance := ts.TimeStart.Sub(ownLeftMargin)
otherDistance := ts.TimeStart.Sub(otherLeftMargin)
endOtherDistance := ts.TimeEnd.Sub(otherLeftMargin)
// if thr distance is negative relative to both ends it's not usable
if otherDistance < 0 && endOtherDistance < 0 {
return true
}
//log.Print("OWN LEFT: ", otherLeftMargin)
//log.Print("OWN DISTANCE: ", otherDistance)
//endOtherDistance := ts.TimeEnd.Sub(otherLeftMargin)
// if own interval is closer than its better
if ownDistance <= otherDistance {
//log.Print(ownDistance)
if ownDistance > otherDistance {
return false
}
ownPrice, _, _ := ts.RateInterval.GetRateParameters(ts.GetGroupStart())
otherPrice, _, _ := interval.GetRateParameters(ts.GetGroupStart())
// if own price is smaller than it's better
//log.Print(ownPrice, otherPrice)
if ownPrice < otherPrice {
return true
}
return false
return true
}

View File

@@ -217,7 +217,12 @@ func TestTimespanGetCost(t *testing.T) {
if ts1.getCost() != 0 {
t.Error("No interval and still kicking")
}
ts1.SetRateInterval(&RateInterval{Rating: &RIRate{Rates: RateGroups{&Rate{0, 1.0, 1 * time.Second, 1 * time.Second}}}})
ts1.SetRateInterval(
&RateInterval{
Timing: &RITiming{},
Rating: &RIRate{Rates: RateGroups{&Rate{0, 1.0, 1 * time.Second, 1 * time.Second}}},
},
)
if ts1.getCost() != 600 {
t.Error("Expected 10 got ", ts1.Cost)
}
@@ -240,10 +245,18 @@ func TestTimespanGetCostIntervals(t *testing.T) {
}
func TestSetRateInterval(t *testing.T) {
i1 := &RateInterval{Rating: &RIRate{Rates: RateGroups{&Rate{0, 1.0, 1 * time.Second, 1 * time.Second}}}}
i1 := &RateInterval{
Timing: &RITiming{},
Rating: &RIRate{Rates: RateGroups{&Rate{0, 1.0, 1 * time.Second, 1 * time.Second}}},
}
ts1 := TimeSpan{RateInterval: i1}
i2 := &RateInterval{Rating: &RIRate{Rates: RateGroups{&Rate{0, 2.0, 1 * time.Second, 1 * time.Second}}}}
ts1.SetRateInterval(i2)
i2 := &RateInterval{
Timing: &RITiming{},
Rating: &RIRate{Rates: RateGroups{&Rate{0, 2.0, 1 * time.Second, 1 * time.Second}}},
}
if !ts1.hasBetterRateIntervalThan(i2) {
ts1.SetRateInterval(i2)
}
if ts1.RateInterval != i1 {
t.Error("Smaller price interval should win")
}

View File

@@ -1063,7 +1063,7 @@ func (tpr *TpReader) LoadUsersFiltered(filter *TpUser) (bool, error) {
for _, tpUser := range tpUsers {
user.Profile[tpUser.AttributeName] = tpUser.AttributeValue
}
tpr.ratingStorage.SetUser(user)
tpr.accountingStorage.SetUser(user)
return len(tpUsers) > 0, err
}
@@ -1306,7 +1306,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) {
log.Print("Users:")
}
for _, u := range tpr.users {
err = tpr.ratingStorage.SetUser(u)
err = tpr.accountingStorage.SetUser(u)
if err != nil {
return err
}
@@ -1459,6 +1459,14 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) {
i++
}
return keys, nil
case utils.USERS_PREFIX:
keys := make([]string, len(tpr.users))
i := 0
for k := range tpr.users {
keys[i] = k
i++
}
return keys, nil
}
return nil, errors.New("Unsupported category")
}

View File

@@ -1,6 +1,7 @@
package engine
import (
"fmt"
"sort"
"strings"
"sync"
@@ -55,46 +56,83 @@ type UserService interface {
GetUsers(UserProfile, *UserProfiles) error
AddIndex([]string, *string) error
GetIndexes(string, *map[string][]string) error
ReloadUsers(string, *string) error
}
type UserMap struct {
table map[string]map[string]string
index map[string]map[string]bool
indexKeys []string
ratingDb RatingStorage
mu sync.RWMutex
table map[string]map[string]string
index map[string]map[string]bool
indexKeys []string
accountingDb AccountingStorage
mu sync.RWMutex
}
func NewUserMap(ratingDb RatingStorage) (*UserMap, error) {
um := newUserMap(ratingDb)
// load from rating db
if ups, err := um.ratingDb.GetUsers(); err == nil {
for _, up := range ups {
um.table[up.GetId()] = up.Profile
}
} else {
func NewUserMap(accountingDb AccountingStorage, indexes []string) (*UserMap, error) {
um := newUserMap(accountingDb, indexes)
var reply string
if err := um.ReloadUsers("", &reply); err != nil {
return nil, err
}
return um, nil
}
func newUserMap(ratingDb RatingStorage) *UserMap {
func newUserMap(accountingDb AccountingStorage, indexes []string) *UserMap {
return &UserMap{
table: make(map[string]map[string]string),
index: make(map[string]map[string]bool),
ratingDb: ratingDb,
table: make(map[string]map[string]string),
index: make(map[string]map[string]bool),
indexKeys: indexes,
accountingDb: accountingDb,
}
}
func (um *UserMap) ReloadUsers(in string, reply *string) error {
um.mu.Lock()
// backup old data
oldTable := um.table
oldIndex := um.index
um.table = make(map[string]map[string]string)
um.index = make(map[string]map[string]bool)
// load from rating db
if ups, err := um.accountingDb.GetUsers(); err == nil {
for _, up := range ups {
um.table[up.GetId()] = up.Profile
}
} else {
// restore old data before return
um.table = oldTable
um.index = oldIndex
*reply = err.Error()
return err
}
um.mu.Unlock()
if len(um.indexKeys) != 0 {
var s string
if err := um.AddIndex(um.indexKeys, &s); err != nil {
Logger.Err(fmt.Sprintf("Error adding %v indexes to user profile service: %v", um.indexKeys, err))
um.table = oldTable
um.index = oldIndex
*reply = err.Error()
return err
}
}
*reply = utils.OK
return nil
}
func (um *UserMap) SetUser(up UserProfile, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
if err := um.ratingDb.SetUser(&up); err != nil {
if err := um.accountingDb.SetUser(&up); err != nil {
*reply = err.Error()
return err
}
um.table[up.GetId()] = up.Profile
um.addIndex(&up)
um.addIndex(&up, um.indexKeys)
*reply = utils.OK
return nil
}
@@ -102,7 +140,7 @@ func (um *UserMap) SetUser(up UserProfile, reply *string) error {
func (um *UserMap) RemoveUser(up UserProfile, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
if err := um.ratingDb.RemoveUser(up.GetId()); err != nil {
if err := um.accountingDb.RemoveUser(up.GetId()); err != nil {
*reply = err.Error()
return err
}
@@ -140,13 +178,13 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error {
UserName: up.UserName,
Profile: m,
}
if err := um.ratingDb.SetUser(finalUp); err != nil {
if err := um.accountingDb.SetUser(finalUp); err != nil {
*reply = err.Error()
return err
}
um.table[up.GetId()] = m
um.deleteIndex(oldUp)
um.addIndex(finalUp)
um.addIndex(finalUp, um.indexKeys)
*reply = utils.OK
return nil
}
@@ -186,7 +224,7 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error {
}
}
var candidates UserProfiles
candidates := make(UserProfiles, 0) // It should not return nil in case of no users but []
for key, values := range table {
ponder := 0
tableUP := &UserProfile{
@@ -235,19 +273,19 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error {
func (um *UserMap) AddIndex(indexes []string, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
um.indexKeys = indexes
um.indexKeys = append(um.indexKeys, indexes...)
for key, values := range um.table {
up := &UserProfile{Profile: values}
up.SetId(key)
um.addIndex(up)
um.addIndex(up, indexes)
}
*reply = utils.OK
return nil
}
func (um *UserMap) addIndex(up *UserProfile) {
func (um *UserMap) addIndex(up *UserProfile, indexes []string) {
key := up.GetId()
for _, index := range um.indexKeys {
for _, index := range indexes {
if index == "Tenant" {
if up.Tenant != "" {
indexKey := utils.ConcatenatedKey(index, up.Tenant)
@@ -369,6 +407,10 @@ func (ps *ProxyUserService) GetIndexes(in string, reply *map[string][]string) er
return ps.Client.Call("UsersV1.AddIndex", in, reply)
}
func (ps *ProxyUserService) ReloadUsers(in string, reply *string) error {
return ps.Client.Call("UsersV1.ReloadUsers", in, reply)
}
// extraFields - Field name in the interface containing extraFields information
func LoadUserProfile(in interface{}, extraFields string) (interface{}, error) {
if userService == nil { // no user service => no fun

View File

@@ -19,7 +19,7 @@ var testMap = UserMap{
}
func TestUsersAdd(t *testing.T) {
tm := newUserMap(ratingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -40,7 +40,7 @@ func TestUsersAdd(t *testing.T) {
}
func TestUsersUpdate(t *testing.T) {
tm := newUserMap(ratingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -71,7 +71,7 @@ func TestUsersUpdate(t *testing.T) {
}
func TestUsersUpdateNotFound(t *testing.T) {
tm := newUserMap(ratingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -89,7 +89,7 @@ func TestUsersUpdateNotFound(t *testing.T) {
}
func TestUsersUpdateInit(t *testing.T) {
tm := newUserMap(ratingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -115,7 +115,7 @@ func TestUsersUpdateInit(t *testing.T) {
}
func TestUsersRemove(t *testing.T) {
tm := newUserMap(ratingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -445,7 +445,7 @@ func TestUsersGetMissingIdTwoINdex(t *testing.T) {
}
func TestUsersAddUpdateRemoveIndexes(t *testing.T) {
tm := newUserMap(ratingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
tm.AddIndex([]string{"t"}, &r)
if len(tm.index) != 0 {