diff --git a/apier/v2/cdrs.go b/apier/v2/cdrs.go index 1fee6d42f..125be49b5 100644 --- a/apier/v2/cdrs.go +++ b/apier/v2/cdrs.go @@ -66,3 +66,7 @@ func (apier *ApierV2) CountCdrs(attrs utils.RPCCDRsFilter, reply *int64) error { type CdrsV2 struct { v1.CdrsV1 } + +func (self *CdrsV2) StoreSMCost(args engine.ArgsV2CDRSStoreSMCost, reply *string) error { + return self.CdrSrv.V2StoreSMCost(args, reply) +} diff --git a/engine/account.go b/engine/account.go index e3361e747..25b75ce80 100644 --- a/engine/account.go +++ b/engine/account.go @@ -1084,3 +1084,19 @@ type AccountSummary struct { AllowNegative bool Disabled bool } + +func (as *AccountSummary) Clone() (cln *AccountSummary) { + cln = new(AccountSummary) + cln.Tenant = as.Tenant + cln.ID = as.ID + cln.AllowNegative = as.AllowNegative + cln.Disabled = as.Disabled + if as.BalanceSummaries != nil { + cln.BalanceSummaries = make([]*BalanceSummary, len(as.BalanceSummaries)) + for i, bs := range as.BalanceSummaries { + cln.BalanceSummaries[i] = new(BalanceSummary) + *cln.BalanceSummaries[i] = *bs + } + } + return +} diff --git a/engine/cdrs.go b/engine/cdrs.go index 31a6092fe..65cde01d8 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -526,6 +526,50 @@ func (self *CdrServer) V1StoreSMCost(attr AttrCDRSStoreSMCost, reply *string) er return nil } +func (cdrs *CdrServer) V2StoreSMCost(args ArgsV2CDRSStoreSMCost, reply *string) error { + if args.Cost.CGRID == "" { + return utils.NewCGRError(utils.CDRSCtx, + utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing), + "SMCost: %+v with empty CGRID") + } + cacheKey := "V2StoreSMCost" + args.Cost.CGRID + args.Cost.RunID + args.Cost.OriginID + if item, err := cdrs.getCache().Get(cacheKey); err == nil && item != nil { + if item.Value != nil { + *reply = item.Value.(string) + } + return item.Err + } + cc := args.Cost.CostDetails.AsCallCost() + cc.Round() + roundIncrements := cc.GetRoundIncrements() + if len(roundIncrements) != 0 { + cd := cc.CreateCallDescriptor() + cd.CgrID = args.Cost.CGRID + cd.RunID = args.Cost.RunID + cd.Increments = roundIncrements + var response float64 + if err := cdrs.rals.Call("Responder.RefundRounding", cd, &response); err != nil { + utils.Logger.Err(fmt.Sprintf(" RefundRounding for cc: %+v, got error: %s", cc, err.Error())) + } + } + if err := cdrs.storeSMCost(&SMCost{ + CGRID: args.Cost.CGRID, + RunID: args.Cost.RunID, + OriginHost: args.Cost.OriginHost, + OriginID: args.Cost.OriginID, + CostSource: args.Cost.CostSource, + Usage: args.Cost.Usage, + CostDetails: cc, + }, args.CheckDuplicate); err != nil { + cdrs.getCache().Cache(cacheKey, &cache.CacheItem{Err: err}) + return utils.NewErrServerError(err) + } + *reply = utils.OK + cdrs.getCache().Cache(cacheKey, &cache.CacheItem{Value: *reply}) + return nil + +} + // Called by rate/re-rate API, RPC method func (self *CdrServer) V1RateCDRs(attrs utils.AttrRateCDRs, reply *string) error { cdrFltr, err := attrs.RPCCDRsFilter.AsCDRsFilter(self.cgrCfg.DefaultTimezone) diff --git a/engine/eventcost.go b/engine/eventcost.go index d4c63c1e3..d30482dc8 100644 --- a/engine/eventcost.go +++ b/engine/eventcost.go @@ -41,6 +41,14 @@ func (rfs RatingFilters) GetUUIDWithSet(rmf RatingMatchedFilters) string { return uuid } +func (rfs RatingFilters) Clone() (cln RatingFilters) { + cln = make(RatingFilters, len(rfs)) + for k, v := range rfs { + cln[k] = v.Clone() + } + return +} + type Rating map[string]*RatingUnit // GetUUIDWithSet attempts to retrieve the UUID of a matching data or create a new one @@ -59,6 +67,14 @@ func (crus Rating) GetUUIDWithSet(cru *RatingUnit) string { return uuid } +func (crus Rating) Clone() (cln Rating) { + cln = make(Rating, len(crus)) + for k, v := range crus { + cln[k] = v.Clone() + } + return +} + type ChargedRates map[string]RateGroups // GetUUIDWithSet attempts to retrieve the UUID of a matching data or create a new one @@ -77,6 +93,14 @@ func (crs ChargedRates) GetUUIDWithSet(rg RateGroups) string { return uuid } +func (crs ChargedRates) Clone() (cln ChargedRates) { + cln = make(ChargedRates, len(crs)) + for k, v := range crs { + cln[k] = v.Clone() + } + return +} + type ChargedTimings map[string]*ChargedTiming // GetUUIDWithSet attempts to retrieve the UUID of a matching data or create a new one @@ -95,6 +119,14 @@ func (cts ChargedTimings) GetUUIDWithSet(ct *ChargedTiming) string { return uuid } +func (cts ChargedTimings) Clone() (cln ChargedTimings) { + cln = make(ChargedTimings, len(cts)) + for k, v := range cts { + cln[k] = v.Clone() + } + return +} + type Accounting map[string]*BalanceCharge // GetUUIDWithSet attempts to retrieve the UUID of a matching data or create a new one @@ -113,15 +145,29 @@ func (cbs Accounting) GetUUIDWithSet(cb *BalanceCharge) string { return uuid } -func NewEventCostFromCallCost(cc *CallCost, cgrID, runID string) (ec *EventCost) { - ec = &EventCost{CGRID: cgrID, RunID: runID, - AccountSummary: cc.AccountSummary, - RatingFilters: make(RatingFilters), - Rating: make(Rating), - Rates: make(ChargedRates), - Timings: make(ChargedTimings), - Accounting: make(Accounting), +func (cbs Accounting) Clone() (cln Accounting) { + cln = make(Accounting, len(cbs)) + for k, v := range cbs { + cln[k] = v.Clone() } + return +} + +func NewBareEventCost() *EventCost { + return &EventCost{ + Rating: make(Rating), + Accounting: make(Accounting), + RatingFilters: make(RatingFilters), + Rates: make(ChargedRates), + Timings: make(ChargedTimings), + } +} + +func NewEventCostFromCallCost(cc *CallCost, cgrID, runID string) (ec *EventCost) { + ec = NewBareEventCost() + ec.CGRID = cgrID + ec.RunID = runID + ec.AccountSummary = cc.AccountSummary if len(cc.Timespans) != 0 { ec.Charges = make([]*ChargingInterval, len(cc.Timespans)) ec.StartTime = cc.Timespans[0].TimeStart @@ -183,9 +229,9 @@ func NewEventCostFromCallCost(cc *CallCost, cgrID, runID string) (ec *EventCost) type EventCost struct { CGRID string RunID string - Cost *float64 // pointer so we can nil it when dirty StartTime time.Time Usage *time.Duration + Cost *float64 // pointer so we can nil it when dirty Charges []*ChargingInterval AccountSummary *AccountSummary // Account summary at the end of the event calculation Rating Rating @@ -250,10 +296,38 @@ func (ec *EventCost) rateIntervalForRatingUUID(ratingUUID string) (ri *RateInter return } +func (ec *EventCost) Clone() (cln *EventCost) { + cln = new(EventCost) + cln.CGRID = ec.CGRID + cln.RunID = ec.RunID + cln.StartTime = ec.StartTime + if ec.Usage != nil { + cln.Usage = utils.DurationPointer(*ec.Usage) + } + if ec.Cost != nil { + cln.Cost = utils.Float64Pointer(*ec.Cost) + } + if ec.Charges != nil { + cln.Charges = make([]*ChargingInterval, len(ec.Charges)) + for i, cIl := range ec.Charges { + cln.Charges[i] = cIl.Clone() + } + } + if ec.AccountSummary != nil { + cln.AccountSummary = ec.AccountSummary.Clone() + } + cln.Rating = ec.Rating.Clone() + cln.Accounting = ec.Accounting.Clone() + cln.RatingFilters = ec.RatingFilters.Clone() + cln.Rates = ec.Rates.Clone() + cln.Timings = ec.Timings.Clone() + return +} + // Compute aggregates all the compute methods on EventCost func (ec *EventCost) Compute() { ec.ComputeUsage() - ec.ComputeUsageIndexes() + ec.ComputeEventCostUsageIndexes() ec.ComputeCost() } @@ -264,7 +338,7 @@ func (ec *EventCost) ResetCounters() { for _, cIl := range ec.Charges { cIl.cost = nil cIl.usage = nil - cIl.totalUsageIndex = nil + cIl.ecUsageIdx = nil } } @@ -293,12 +367,12 @@ func (ec *EventCost) ComputeUsage() time.Duration { return *ec.Usage } -// ComputeUsageIndexes will iterate through Chargers and populate their totalUsageIndex -func (ec *EventCost) ComputeUsageIndexes() { +// ComputeEventCostUsageIndexes will iterate through Chargers and populate their ecUsageIdx +func (ec *EventCost) ComputeEventCostUsageIndexes() { var totalUsage time.Duration for _, cIl := range ec.Charges { - if cIl.totalUsageIndex == nil { - cIl.totalUsageIndex = utils.DurationPointer(totalUsage) + if cIl.ecUsageIdx == nil { + cIl.ecUsageIdx = utils.DurationPointer(totalUsage) } totalUsage += time.Duration(cIl.Usage().Nanoseconds() * int64(cIl.CompressFactor)) } @@ -312,10 +386,10 @@ func (ec *EventCost) AsCallCost() *CallCost { for i, cIl := range ec.Charges { ts := &TimeSpan{Cost: cIl.Cost(), DurationIndex: *cIl.Usage(), CompressFactor: cIl.CompressFactor} - if cIl.totalUsageIndex == nil { // index was not populated yet - ec.ComputeUsageIndexes() + if cIl.ecUsageIdx == nil { // index was not populated yet + ec.ComputeEventCostUsageIndexes() } - ts.TimeStart = ec.StartTime.Add(*cIl.totalUsageIndex) + ts.TimeStart = ec.StartTime.Add(*cIl.ecUsageIdx) ts.TimeEnd = ts.TimeStart.Add( time.Duration(cIl.Usage().Nanoseconds() * int64(cIl.CompressFactor))) if cIl.RatingUUID != "" { @@ -408,25 +482,113 @@ func (ec *EventCost) Merge(ecs ...*EventCost) { ec.Cost = nil } -/* -// Cut will cut the EventCost on specifiedTime at ChargingIncrement level, returning the surplus -func (ec *EventCost) Trim(atTime time.Time) (surplus *EventCost) { - var limitIndex int - for i, cIl := range ec.Charges { - if cIl.StartTime > +// Trim will cut the EventCost at specific duration +// returns the srplusEC as separate EventCost +func (ec *EventCost) Trim(atUsage time.Duration) (srplusEC *EventCost, err error) { + if ec.Usage == nil { + ec.ComputeUsage() } + if atUsage >= *ec.Usage { + return // no trim + } + if atUsage == 0 { + srplusEC = ec + ec = NewBareEventCost() + ec.CGRID = srplusEC.CGRID + ec.RunID = srplusEC.RunID + ec.StartTime = srplusEC.StartTime + ec.AccountSummary = srplusEC.AccountSummary.Clone() + return // trim all, fresh EC with 0 usage + } + /* + var lastActiveCIlIdx *int // marks last index which should stay with ec + for i, cIl := range ec.Charges { + if cIl.ecUsageIdx == nil { + ec.ComputeEventCostUsageIndexes() + } + if *cIl.ecUsageIdx >= atUsage { + lastActiveCIlIdx = utils.IntPointer(i - 1) + break + } + } + if lastActiveCIlIdx == nil { + return + } + if *lastActiveCIlIdx == -1 { // trim full usage + srplusEC = NewBareEventCost() + *srplusEC = *ec // no need of cloning since we will not keep info in ec + ec = NewBareEventCost() + ec.CGRID = srplusEC.CGRID + ec.RunID = srplusEC.RunID + return + } + /* + lastActiveCIl := ec.Charges[lastActiveCIlIdx] + if lastActiveCI.ecUsageIdx >= atUsage { + return nil, errors.New("failed detecting last active ChargingInterval") + } else if lastActiveCI.CompressFactor == 0 { + return nil, errors.New("ChargingInterval with 0 compressFactor") + } + + srplsCIl := new(ChargingInterval) + srplsCIl.RatingUUID = lastActiveCIl.RatingUUID + if lastActiveCI.CompressFactor != 1 { + var laCF int + for ciCnt := 1; ciCnt <= lastActiveCI.CompressFactor; ciCnt++ { + if *lastActiveCI.ecUsageIdx.Add( + time.Duration(lastActiveCI.Usage.Nanoseconds() * int64(ciCnt))) > atUsage { + laCF = ciCnt + break + } + } + if laCF == 0 { + return nil, errors.New("cannot detect last active CompressFactor in ChargingInterval") + } + lastActiveCIl.CompressFactor = laCF // this factor will stay uncompressed + + } + + var lastActiveCItIdx *int + cIlUIdx := ec.Charges[lastActiveCIlIdx].ecUsageIdx + for i, cIt := range ec.Charges[lastActiveCIlIdx].Increments { + if cIlUIdx.Add(cIt.Usage) > atUsage { + lastActiveCItIdx = utils.IntPointer(i) + break + } + } + if lastActiveCItIdx == nil { // bug in increments + return nil, errors.New("no active increment found") + } + ec.ResetCounters() // avoid stale counters + var ciUncompressed bool // marks whether we needed to uncomrpess the last ChargingInterval + + + if ec.Charges[lastActiveCIlIdx]. + srplusEC = NewBareEventCost() + srplusEC.CGRID = ec.CGRID + srplusEC.RunID = ec.RunID + var laCIlIncrmts []*ChargingIncrement // surplus increments in the last active ChargingInterval + for _, cIl := range ec.Charges[lastActiveCIlIdx].Increments[*lastActiveCItIdx+1:] { + laCIlIncrmts = append(laCIlIncrmts, cIl) + } + srplusEC.Charges = []*ChargingInterval{} + ec.Charges[lastActiveCIlIdx].Increments = ec.Charges[lastActiveCIlIdx].Increments[:*lastActiveCItIdx+1] // remove srplusEC increments out of last active CIl + for _, cIl := range ec.Charges[lastActiveCIlIdx+1:] { + + } + */ + return } -*/ // ChargingInterval represents one interval out of Usage providing charging info // eg: PEAK vs OFFPEAK type ChargingInterval struct { - RatingUUID string // reference to RatingUnit - Increments []*ChargingIncrement // specific increments applied to this interval - CompressFactor int - usage *time.Duration // cache usage computation for this interval - totalUsageIndex *time.Duration // computed value of totalUsage at the starting of the interval - cost *float64 // cache cost calculation on this interval + RatingUUID string // reference to RatingUnit + Increments []*ChargingIncrement // specific increments applied to this interval + CompressFactor int + usage *time.Duration // cache usage computation for this interval + ecUsageIdx *time.Duration // computed value of totalUsage at the starting of the interval + cost *float64 // cache cost calculation on this interval } @@ -457,15 +619,26 @@ func (cIl *ChargingInterval) Usage() *time.Duration { return cIl.usage } -// TotalUsageIndex publishes the value of totalUsageIndex -func (cIl *ChargingInterval) TotalUsageIndex() *time.Duration { - return cIl.totalUsageIndex +// TotalUsage returns the total usage of this interval, considering compress factor +func (cIl *ChargingInterval) TotalUsage() (tu *time.Duration) { + usage := cIl.Usage() + if usage == nil { + return + } + tu = new(time.Duration) + *tu = time.Duration(usage.Nanoseconds() * int64(cIl.CompressFactor)) + return } -// StartTime computes a StartTime based on EventCost.Start time and totalUsageIndex +// EventCostUsageIndex publishes the value of ecUsageIdx +func (cIl *ChargingInterval) EventCostUsageIndex() *time.Duration { + return cIl.ecUsageIdx +} + +// StartTime computes a StartTime based on EventCost.Start time and ecUsageIdx func (cIl *ChargingInterval) StartTime(ecST time.Time) (st time.Time) { - if cIl.totalUsageIndex != nil { - st = ecST.Add(*cIl.totalUsageIndex) + if cIl.ecUsageIdx != nil { + st = ecST.Add(*cIl.ecUsageIdx) } return } @@ -488,6 +661,18 @@ func (cIl *ChargingInterval) Cost() float64 { return *cIl.cost } +// Clone returns a new instance of ChargingInterval with independent data +func (cIl *ChargingInterval) Clone() (cln *ChargingInterval) { + cln = new(ChargingInterval) + cln.RatingUUID = cIl.RatingUUID + cln.CompressFactor = cIl.CompressFactor + cln.Increments = make([]*ChargingIncrement, len(cIl.Increments)) + for i, cIt := range cIl.Increments { + cln.Increments[i] = cIt.Clone() + } + return +} + // ChargingIncrement represents one unit charged inside an interval type ChargingIncrement struct { Usage time.Duration @@ -503,6 +688,17 @@ func (cIt *ChargingIncrement) Equals(oCIt *ChargingIncrement) bool { cIt.CompressFactor == oCIt.CompressFactor } +func (cIt *ChargingIncrement) Clone() (cln *ChargingIncrement) { + cln = new(ChargingIncrement) + *cln = *cIt + return +} + +// TotalUsage returns the total usage of the increment, considering compress factor +func (cIt *ChargingIncrement) TotalUsage() time.Duration { + return time.Duration(cIt.Usage.Nanoseconds() * int64(cIt.CompressFactor)) +} + // BalanceCharge represents one unit charged to a balance type BalanceCharge struct { AccountID string // keep reference for shared balances @@ -539,6 +735,12 @@ func (rf RatingMatchedFilters) Equals(oRF RatingMatchedFilters) (equals bool) { return } +func (rf RatingMatchedFilters) Clone() (cln map[string]interface{}) { + cln = make(map[string]interface{}) + utils.Clone(rf, &cln) + return +} + // ChargedTiming represents one timing attached to a charge type ChargedTiming struct { Years utils.Years @@ -556,6 +758,12 @@ func (ct *ChargedTiming) Equals(oCT *ChargedTiming) bool { ct.StartTime == oCT.StartTime } +func (ct *ChargedTiming) Clone() (cln *ChargedTiming) { + cln = new(ChargedTiming) + *cln = *ct + return +} + // RatingUnit represents one unit out of RatingPlan matching for an event type RatingUnit struct { ConnectFee float64 @@ -579,8 +787,8 @@ func (ru *RatingUnit) Equals(oRU *RatingUnit) bool { ru.RatingFiltersUUID == oRU.RatingFiltersUUID } -func (ru *RatingUnit) Clone() *RatingUnit { - clnRU := new(RatingUnit) - *clnRU = *ru - return clnRU +func (ru *RatingUnit) Clone() (cln *RatingUnit) { + cln = new(RatingUnit) + *cln = *ru + return } diff --git a/engine/eventcost_test.go b/engine/eventcost_test.go index 1d3c14121..334544532 100644 --- a/engine/eventcost_test.go +++ b/engine/eventcost_test.go @@ -234,10 +234,10 @@ func TestNewEventCostFromCallCost(t *testing.T) { CompressFactor: 30, }, }, - CompressFactor: 1, - usage: utils.DurationPointer(time.Duration(60 * time.Second)), - cost: utils.Float64Pointer(0.25), - totalUsageIndex: utils.DurationPointer(time.Duration(0)), + CompressFactor: 1, + usage: utils.DurationPointer(time.Duration(60 * time.Second)), + cost: utils.Float64Pointer(0.25), + ecUsageIdx: utils.DurationPointer(time.Duration(0)), }, &ChargingInterval{ RatingUUID: "f2518464-68b8-42f4-acec-aef23d714314", @@ -249,10 +249,10 @@ func TestNewEventCostFromCallCost(t *testing.T) { CompressFactor: 60, }, }, - CompressFactor: 1, - usage: utils.DurationPointer(time.Duration(60 * time.Second)), - cost: utils.Float64Pointer(0.6), - totalUsageIndex: utils.DurationPointer(time.Duration(60 * time.Second)), + CompressFactor: 1, + usage: utils.DurationPointer(time.Duration(60 * time.Second)), + cost: utils.Float64Pointer(0.6), + ecUsageIdx: utils.DurationPointer(time.Duration(60 * time.Second)), }, }, Rating: Rating{ @@ -349,7 +349,7 @@ func TestNewEventCostFromCallCost(t *testing.T) { if len(ec.Charges) != len(eEC.Charges) { t.Errorf("Expecting: %+v, received: %+v", eEC, ec) } - ec.ComputeUsageIndexes() + ec.ComputeEventCostUsageIndexes() for i := range ec.Charges { // Make sure main rating is correct if cc.Timespans[i].RateInterval.Rating != nil && @@ -378,9 +378,9 @@ func TestNewEventCostFromCallCost(t *testing.T) { t.Errorf("Expecting: %f, received: %f", eEC.Charges[i].Cost(), ec.Charges[i].Cost()) } - if !reflect.DeepEqual(eEC.Charges[i].totalUsageIndex, ec.Charges[i].totalUsageIndex) { + if !reflect.DeepEqual(eEC.Charges[i].ecUsageIdx, ec.Charges[i].ecUsageIdx) { t.Errorf("Expecting: %v, received: %v", - eEC.Charges[i].totalUsageIndex, ec.Charges[i].totalUsageIndex) + eEC.Charges[i].ecUsageIdx, ec.Charges[i].ecUsageIdx) } cIlStartTime := ec.Charges[i].StartTime(ec.StartTime) if !cc.Timespans[i].TimeStart.Equal(cIlStartTime) { @@ -717,3 +717,166 @@ func TestEventCostAsCallCost(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eCC), utils.ToJSON(cc)) } } + +func TestEventCostTrim(t *testing.T) { + acntSummary := &AccountSummary{ + Tenant: "cgrates.org", + ID: "dan", + BalanceSummaries: []*BalanceSummary{ + &BalanceSummary{ + Type: "*monetary", + Value: 50, + Disabled: false}, + &BalanceSummary{ + ID: "4b8b53d7-c1a1-4159-b845-4623a00a0165", + Type: "*monetary", + Value: 25, + Disabled: false}, + &BalanceSummary{ + Type: "*voice", + Value: 200, + Disabled: false, + }, + }, + AllowNegative: false, + Disabled: false, + } + ec := &EventCost{ + CGRID: "164b0422fdc6a5117031b427439482c6a4f90e41", + RunID: utils.META_DEFAULT, + StartTime: time.Date(2017, 1, 9, 16, 18, 21, 0, time.UTC), + Cost: utils.Float64Pointer(2.05), + Usage: utils.DurationPointer(time.Duration(4 * time.Minute)), + Charges: []*ChargingInterval{ + &ChargingInterval{ + RatingUUID: "f2518464-68b8-42f4-acec-aef23d714314", + Increments: []*ChargingIncrement{ + &ChargingIncrement{ + Usage: time.Duration(0), + Cost: 0.1, + BalanceChargeUUID: "44e97dec-8a7e-43d0-8b0a-736d46b5613e", + CompressFactor: 1, + }, + &ChargingIncrement{ + Usage: time.Duration(1 * time.Second), + Cost: 0, + BalanceChargeUUID: "a555cde8-4bd0-408a-afbc-c3ba64888927", + CompressFactor: 30, + }, + &ChargingIncrement{ + Usage: time.Duration(1 * time.Second), + Cost: 0.005, + BalanceChargeUUID: "906bfd0f-035c-40a3-93a8-46f71627983e", + CompressFactor: 30, + }, + }, + CompressFactor: 1, + }, + &ChargingInterval{ + RatingUUID: "f2518464-68b8-42f4-acec-aef23d714314", + Increments: []*ChargingIncrement{ + &ChargingIncrement{ + Usage: time.Duration(1 * time.Second), + Cost: 0.01, + BalanceChargeUUID: "c890a899-df43-497a-9979-38492713f57b", + CompressFactor: 60, + }, + }, + CompressFactor: 3, + }, + }, + AccountSummary: acntSummary, + Rating: Rating{ + "4607d907-02c3-4f2b-bc08-95a0dcc7222c": &RatingUnit{ + RoundingMethod: "*up", + RoundingDecimals: 5, + TimingUUID: "27f1e5f8-05bb-4f1c-a596-bf1010ad296c", + RatesUUID: "e5eb0f1c-3612-4e8c-b749-7f8f41dd90d4", + RatingFiltersUUID: "7e73a00d-be53-4083-a1ee-8ee0b546c62a", + }, + "f2518464-68b8-42f4-acec-aef23d714314": &RatingUnit{ + ConnectFee: 0.1, + RoundingMethod: "*up", + RoundingDecimals: 5, + TimingUUID: "27f1e5f8-05bb-4f1c-a596-bf1010ad296c", + RatesUUID: "6504fb84-6b27-47a8-a1c6-c0d843959f89", + RatingFiltersUUID: "7e73a00d-be53-4083-a1ee-8ee0b546c62a", + }, + }, + Accounting: Accounting{ + "c890a899-df43-497a-9979-38492713f57b": &BalanceCharge{ + AccountID: "cgrates.org:dan", + BalanceUUID: "8c54a9e9-d610-4c82-bcb5-a315b9a65010", + Units: 0.01, + }, + "a894f8f1-206a-4457-99ce-df21a0c7fedc": &BalanceCharge{ + AccountID: "cgrates.org:dan", + BalanceUUID: "8c54a9e9-d610-4c82-bcb5-a315b9a65010", + Units: 0.005, + }, + "44e97dec-8a7e-43d0-8b0a-736d46b5613e": &BalanceCharge{ + AccountID: "cgrates.org:dan", + BalanceUUID: "8c54a9e9-d610-4c82-bcb5-a315b9a65010", + Units: 0.1, + }, + "906bfd0f-035c-40a3-93a8-46f71627983e": &BalanceCharge{ + AccountID: "cgrates.org:dan", + BalanceUUID: "7a54a9e9-d610-4c82-bcb5-a315b9a65010", + RatingUUID: "4607d907-02c3-4f2b-bc08-95a0dcc7222c", + Units: 1, + ExtraChargeUUID: "a894f8f1-206a-4457-99ce-df21a0c7fedc", + }, + "a555cde8-4bd0-408a-afbc-c3ba64888927": &BalanceCharge{ + AccountID: "cgrates.org:dan", + BalanceUUID: "9d54a9e9-d610-4c82-bcb5-a315b9a65089", + Units: 1, + ExtraChargeUUID: "*none", + }, + }, + RatingFilters: RatingFilters{ + "7e73a00d-be53-4083-a1ee-8ee0b546c62a": RatingMatchedFilters{ + "DestinationID": "GERMANY", + "DestinationPrefix": "+49", + "RatingPlanID": "RPL_RETAIL1", + "Subject": "*out:cgrates.org:call:*any", + }, + }, + Rates: ChargedRates{ + "6504fb84-6b27-47a8-a1c6-c0d843959f89": RateGroups{ + &Rate{ + GroupIntervalStart: time.Duration(0), + Value: 0.01, + RateIncrement: time.Duration(1 * time.Minute), + RateUnit: time.Duration(1 * time.Second)}, + }, + "e5eb0f1c-3612-4e8c-b749-7f8f41dd90d4": RateGroups{ + &Rate{ + GroupIntervalStart: time.Duration(0), + Value: 0.005, + RateIncrement: time.Duration(1 * time.Second), + RateUnit: time.Duration(1 * time.Second)}, + &Rate{ + GroupIntervalStart: time.Duration(60 * time.Second), + Value: 0.005, + RateIncrement: time.Duration(1 * time.Second), + RateUnit: time.Duration(1 * time.Second)}, + }, + }, + Timings: ChargedTimings{ + "27f1e5f8-05bb-4f1c-a596-bf1010ad296c": &ChargedTiming{ + StartTime: "00:00:00", + }, + }, + } + origEC := ec.Clone() + if srplsEC, err := ec.Trim(time.Duration(4 * time.Minute)); err != nil { + t.Error(err) + } else if srplsEC != nil { + t.Errorf("Expecting nil, got: %+v", srplsEC) + } + if srplsEC, err := ec.Trim(time.Duration(0)); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(origEC, srplsEC) { + t.Errorf("Expecting: %s,\n received: %s", utils.ToJSON(origEC), utils.ToJSON(srplsEC)) + } +} diff --git a/engine/rateinterval.go b/engine/rateinterval.go index 0823b0876..91bebe5c4 100644 --- a/engine/rateinterval.go +++ b/engine/rateinterval.go @@ -295,6 +295,15 @@ func (pg RateGroups) Equals(oRG RateGroups) bool { return true } +func (pg RateGroups) Clone() (cln RateGroups) { + cln = make(RateGroups, len(pg)) + for i, rt := range pg { + cln[i] = new(Rate) + *cln[i] = *rt + } + return +} + /* Returns true if the received time result inside the interval */ diff --git a/engine/storage_utils.go b/engine/storage_utils.go index 5839d2179..7ee9b9b87 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -141,7 +141,7 @@ type AttrCDRSStoreSMCost struct { CheckDuplicate bool } -type ArgsCDRSStoreSMCost struct { +type ArgsV2CDRSStoreSMCost struct { Cost *V2SMCost CheckDuplicate bool } diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index c00ab9465..0e6b38cd6 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -44,7 +44,7 @@ type SMGSession struct { EventStart SMGenericEvent // Event which started the session CD *engine.CallDescriptor // initial CD used for debits, updated on each debit - CallCosts []*engine.CallCost + EventCost *engine.EventCost ExtraDuration time.Duration // keeps the current duration debited on top of what heas been asked LastUsage time.Duration // last requested Duration LastDebit time.Duration // last real debited duration @@ -136,15 +136,88 @@ func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time. self.CD.DurationIndex += ccDuration self.CD.MaxCostSoFar += cc.Cost self.CD.LoopIndex += 1 - self.CallCosts = append(self.CallCosts, cc) self.LastDebit = initialExtraDuration + ccDuration self.TotalUsage += self.LastUsage + ec := engine.NewEventCostFromCallCost(self.CGRID, self.RunID) + if self.EventCost == nil { + self.EventCost = ec + } else { + self.EventCost.Merge(ec) + } if ccDuration < dur { return initialExtraDuration + ccDuration, nil } return requestedDuration, nil } +// Send disconnect order to remote connection +func (self *SMGSession) disconnectSession(reason string) error { + self.EventStart[utils.USAGE] = strconv.FormatFloat(self.TotalUsage.Seconds(), 'f', -1, 64) // Set the usage to total one debitted + if self.clntConn == nil || reflect.ValueOf(self.clntConn).IsNil() { + return errors.New("Calling SMGClientV1.DisconnectSession requires bidirectional JSON connection") + } + var reply string + if err := self.clntConn.Call("SMGClientV1.DisconnectSession", utils.AttrDisconnectSession{EventStart: self.EventStart, Reason: reason}, &reply); err != nil { + return err + } else if reply != utils.OK { + return errors.New(fmt.Sprintf("Unexpected disconnect reply: %s", reply)) + } + return nil +} + +// Session has ended, check debits and refund the extra charged duration +func (self *SMGSession) close(endTime time.Time) (err error) { + self.mux.Lock() + defer self.mux.Unlock() + if self.EventCost != nil { // We have had at least one cost calculation + chargedEndTime := self.EventCost.Charges[len(self.EventCost.Charges)-1].GetEndTime() + if endTime.After(chargedEndTime) { // we did not charge enough, make a manual debit here + extraDur := endTime.Sub(chargedEndTime) + if self.CD.LoopIndex > 0 { + self.CD.TimeStart = self.CD.TimeEnd + } + self.CD.TimeEnd = self.CD.TimeStart.Add(extraDur) + self.CD.DurationIndex += extraDur + cc := &engine.CallCost{} + if err = self.rals.Call("Responder.Debit", self.CD, cc); err == nil { + self.EventCost.Merge( + engine.NewEventCostFromCallCost(self.CGRID, self.RunID)) + } + } else { + err = self.refund(chargedEndTime.Sub(endTime)) + } + } + return +} + +// storeSMCost will send the SMCost to CDRs for storing +func (self *SMGSession) storeSMCost() error { + if self.EventCost == nil { + return nil // There are no costs to save, ignore the operation + } + self.mux.Lock() + self.mux.Unlock() + smCost := &engine.V2SMCost{ + CGRID: self.CGRID, + CostSource: utils.SESSION_MANAGER_SOURCE, + RunID: self.RunID, + OriginHost: self.EventStart.GetOriginatorIP(utils.META_DEFAULT), + OriginID: self.EventStart.GetOriginID(utils.META_DEFAULT), + Usage: self.TotalUsage.Seconds(), + CostDetails: self.EventCost, + } + var reply string + if err := self.cdrsrv.Call("CdrsV2.StoreSMCost", engine.ArgsV2CDRSStoreSMCost{Cost: smCost, + CheckDuplicate: true}, &reply); err != nil { + if err == utils.ErrExists { + self.refund(self.CD.GetDuration()) // Refund entire duration + } else { + return err + } + } + return nil +} + // Attempts to refund a duration, error on failure func (self *SMGSession) refund(refundDuration time.Duration) error { if refundDuration == 0 { // Nothing to refund @@ -210,120 +283,6 @@ func (self *SMGSession) refund(refundDuration time.Duration) error { return nil } -// mergeCCs will merge the CallCosts recorded for this session -func (self *SMGSession) mergeCCs() { - if len(self.CallCosts) != 0 { // We have had at least one cost calculation - firstCC := self.CallCosts[0] - for _, cc := range self.CallCosts[1:] { - firstCC.Merge(cc) - } - } -} - -// Session has ended, check debits and refund the extra charged duration -func (self *SMGSession) close(endTime time.Time) (err error) { - self.mux.Lock() - defer self.mux.Unlock() - if len(self.CallCosts) != 0 { // We have had at least one cost calculation - chargedEndTime := self.CallCosts[len(self.CallCosts)-1].GetEndTime() - if endTime.After(chargedEndTime) { // we did not charge enough, make a manual debit here - extraDur := endTime.Sub(chargedEndTime) - if self.CD.LoopIndex > 0 { - self.CD.TimeStart = self.CD.TimeEnd - } - self.CD.TimeEnd = self.CD.TimeStart.Add(extraDur) - self.CD.DurationIndex += extraDur - cc := &engine.CallCost{} - if err = self.rals.Call("Responder.Debit", self.CD, cc); err == nil { - self.CallCosts = append(self.CallCosts, cc) - self.mergeCCs() // merge again so we can store the right value in db - } - } else { - self.mergeCCs() - err = self.refund(chargedEndTime.Sub(endTime)) - } - } - return -} - -// Send disconnect order to remote connection -func (self *SMGSession) disconnectSession(reason string) error { - self.EventStart[utils.USAGE] = strconv.FormatFloat(self.TotalUsage.Seconds(), 'f', -1, 64) // Set the usage to total one debitted - if self.clntConn == nil || reflect.ValueOf(self.clntConn).IsNil() { - return errors.New("Calling SMGClientV1.DisconnectSession requires bidirectional JSON connection") - } - var reply string - if err := self.clntConn.Call("SMGClientV1.DisconnectSession", utils.AttrDisconnectSession{EventStart: self.EventStart, Reason: reason}, &reply); err != nil { - return err - } else if reply != utils.OK { - return errors.New(fmt.Sprintf("Unexpected disconnect reply: %s", reply)) - } - return nil -} - -// Merge the sum of costs and sends it to CDRS for storage -// originID could have been changed from original event, hence passing as argument here -// pass cc as the clone of original to avoid concurrency issues -func (self *SMGSession) saveOperations(cgrID string) error { - if len(self.CallCosts) == 0 { - return nil // There are no costs to save, ignore the operation - } - self.mux.Lock() - self.mux.Unlock() - cc := self.CallCosts[0] // was merged in close method - cc.Round() - roundIncrements := cc.GetRoundIncrements() - if len(roundIncrements) != 0 { - cd := cc.CreateCallDescriptor() - cd.CgrID = self.CD.CgrID - cd.RunID = self.CD.RunID - cd.Increments = roundIncrements - var response float64 - if err := self.rals.Call("Responder.RefundRounding", cd, &response); err != nil { - return err - } - } - smCost := &engine.SMCost{ - CGRID: self.CGRID, - CostSource: utils.SESSION_MANAGER_SOURCE, - RunID: self.RunID, - OriginHost: self.EventStart.GetOriginatorIP(utils.META_DEFAULT), - OriginID: self.EventStart.GetOriginID(utils.META_DEFAULT), - Usage: self.TotalUsage.Seconds(), - CostDetails: cc, - } - if len(smCost.CostDetails.Timespans) > MaxTimespansInCost { // Merge since we will get a callCost too big - if err := utils.Clone(cc, &smCost.CostDetails); err != nil { // Avoid concurrency on CC - utils.Logger.Err(fmt.Sprintf(" Could not clone callcost for sessionID: %s, RunID: %s, error: %s", cgrID, self.RunID, err.Error())) - } - go func(smCost *engine.SMCost) { // could take longer than the locked stage - if err := self.storeSMCost(smCost); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not store callcost for sessionID: %s, RunID: %s, error: %s", cgrID, self.RunID, err.Error())) - } - }(smCost) - } else { - return self.storeSMCost(smCost) - } - return nil -} - -func (self *SMGSession) storeSMCost(smCost *engine.SMCost) error { - if len(smCost.CostDetails.Timespans) > MaxTimespansInCost { // Merge so we can compress the CostDetails - smCost.CostDetails.Timespans.Decompress() - smCost.CostDetails.Timespans.Merge() - smCost.CostDetails.Timespans.Compress() - } - var reply string - if err := self.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, CheckDuplicate: true}, &reply); err != nil { - if err == utils.ErrExists { - self.refund(self.CD.GetDuration()) // Refund entire duration - } else { - return err - } - } - return nil -} - func (self *SMGSession) AsActiveSession(timezone string) *ActiveSession { self.mux.RLock() defer self.mux.RUnlock()