diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 54afcb49e..faf3be155 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -36,7 +36,7 @@ import ( var ( //separator = flag.String("separator", ",", "Default field separator") cgrConfig, _ = config.NewDefaultCGRConfig() - migrateRC8 = flag.String("migrate_rc8", "", "Migrate Accounts, Actions, ActionTriggers and DerivedChargers to RC8 structures, possible values: *all,acc,atr,act,dcs,apl") + migrateRC8 = flag.String("migrate_rc8", "", "Migrate Accounts, Actions, ActionTriggers, DerivedChargers, ActionPlans and SharedGroups to RC8 structures, possible values: *all,acc,atr,act,dcs,apl,shg") tpdb_type = flag.String("tpdb_type", cgrConfig.TpDbType, "The type of the TariffPlan database ") tpdb_host = flag.String("tpdb_host", cgrConfig.TpDbHost, "The TariffPlan host to connect to.") tpdb_port = flag.String("tpdb_port", cgrConfig.TpDbPort, "The TariffPlan port to bind to.") @@ -147,6 +147,11 @@ func main() { log.Print(err.Error()) } } + if strings.Contains(*migrateRC8, "shg") || strings.Contains(*migrateRC8, "*all") { + if err := migratorRC8rat.migrateSharedGroups(); err != nil { + log.Print(err.Error()) + } + } log.Print("Done!") return } diff --git a/cmd/cgr-loader/migrator_rc8.go b/cmd/cgr-loader/migrator_rc8.go index 421da2e40..08eca3650 100644 --- a/cmd/cgr-loader/migrator_rc8.go +++ b/cmd/cgr-loader/migrator_rc8.go @@ -145,6 +145,12 @@ func (at *ActionPlan) IsASAP() bool { return at.Timing.Timing.StartTime == utils.ASAP } +type SharedGroup struct { + Id string + AccountParameters map[string]*engine.SharingParameters + MemberIds []string +} + type ActionPlans []*ActionPlan func (mig MigratorRC8) migrateAccounts() error { @@ -519,3 +525,41 @@ func (mig MigratorRC8) migrateActionPlans() error { } return nil } + +func (mig MigratorRC8) migrateSharedGroups() error { + keys, err := mig.db.Cmd("KEYS", utils.SHARED_GROUP_PREFIX+"*").List() + if err != nil { + return err + } + newShgMap := make(map[string]*engine.SharedGroup, len(keys)) + for _, key := range keys { + log.Printf("Migrating shared groups: %s...", key) + oldShg := SharedGroup{} + var values []byte + if values, err = mig.db.Cmd("GET", key).Bytes(); err == nil { + if err := mig.ms.Unmarshal(values, &oldShg); err != nil { + return err + } + } + newShg := &engine.SharedGroup{ + Id: oldShg.Id, + AccountParameters: oldShg.AccountParameters, + MemberIds: make(utils.StringMap), + } + for _, accID := range oldShg.MemberIds { + newShg.MemberIds[accID] = true + } + newShgMap[key] = newShg + } + // write data back + for key, shg := range newShgMap { + result, err := mig.ms.Marshal(&shg) + if err != nil { + return err + } + if err = mig.db.Cmd("SET", key, result).Err; err != nil { + return err + } + } + return nil +} diff --git a/console/smg_event.go b/console/smg_event.go new file mode 100644 index 000000000..0ce6cfacd --- /dev/null +++ b/console/smg_event.go @@ -0,0 +1,87 @@ +/* +Real-time Charging System for Telecom & ISP environments +Copyright (C) 2012-2015 ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package console + +import ( + "strings" + + "github.com/cgrates/cgrates/sessionmanager" +) + +type AttrSmgEvent struct { + Method string // shoul be ignored after RPC call + sessionmanager.SMGenericEvent +} + +func init() { + c := &CmdSmgEvent{ + name: "smg_event", + } + commands[c.Name()] = c + c.CommandExecuter = &CommandExecuter{c} +} + +// Commander implementation +type CmdSmgEvent struct { + name string + rpcMethod string + rpcParams interface{} + *CommandExecuter +} + +func (self *CmdSmgEvent) Name() string { + return self.name +} + +func (self *CmdSmgEvent) RpcMethod() string { + return self.rpcMethod +} + +func (self *CmdSmgEvent) RpcParams(reset bool) interface{} { + if reset || self.rpcParams == nil { + self.rpcParams = &AttrSmgEvent{} + } + return self.rpcParams +} + +func (self *CmdSmgEvent) PostprocessRpcParams() error { + param := self.rpcParams.(*AttrSmgEvent) + self.rpcMethod = "SMGenericV1." + param.Method + self.rpcParams = param.SMGenericEvent + return nil +} + +func (self *CmdSmgEvent) RpcResult() interface{} { + methodElems := strings.Split(self.rpcMethod, ".") + if len(methodElems) != 2 { + return nil + } + switch methodElems[1] { + case "SessionEnd", "ChargeEvent", "ProcessCdr": + var s string + return &s + case "SessionStart", "SessionUpdate", "GetMaxUsage": + var f float64 + return &f + case "GetLcrSuppliers": + ss := make([]string, 0) + return ss + } + return nil +} diff --git a/engine/account.go b/engine/account.go index 747722756..b3dcb777b 100644 --- a/engine/account.go +++ b/engine/account.go @@ -149,12 +149,15 @@ func (ub *Account) debitBalanceAction(a *Action, reset bool) error { // add shared group member sg, err := ratingStorage.GetSharedGroup(sgId, false) if err != nil || sg == nil { - //than problem + //than is problem utils.Logger.Warning(fmt.Sprintf("Could not get shared group: %v", sgId)) } else { - if !utils.IsSliceMember(sg.MemberIds, ub.Id) { + if _, found := sg.MemberIds[ub.Id]; !found { // add member and save - sg.MemberIds = append(sg.MemberIds, ub.Id) + if sg.MemberIds == nil { + sg.MemberIds = make(utils.StringMap) + } + sg.MemberIds[ub.Id] = true ratingStorage.SetSharedGroup(sg) } } @@ -610,7 +613,7 @@ func (acc *Account) GetSharedGroups() (groups []string) { return } -func (account *Account) GetUniqueSharedGroupMembers(cd *CallDescriptor) ([]string, error) { +func (account *Account) GetUniqueSharedGroupMembers(cd *CallDescriptor) (utils.StringMap, error) { var balances []*Balance balances = append(balances, account.getBalancesForPrefix(cd.Destination, cd.Category, cd.Direction, utils.MONETARY, "")...) balances = append(balances, account.getBalancesForPrefix(cd.Destination, cd.Category, cd.Direction, cd.TOR, "")...) @@ -621,17 +624,15 @@ func (account *Account) GetUniqueSharedGroupMembers(cd *CallDescriptor) ([]strin sharedGroupIds = append(sharedGroupIds, sg) } } - var memberIds []string + memberIds := make(utils.StringMap) for _, sgID := range sharedGroupIds { sharedGroup, err := ratingStorage.GetSharedGroup(sgID, false) if err != nil { utils.Logger.Warning(fmt.Sprintf("Could not get shared group: %v", sgID)) return nil, err } - for _, memberId := range sharedGroup.MemberIds { - if !utils.IsSliceMember(memberIds, memberId) { - memberIds = append(memberIds, memberId) - } + for memberID := range sharedGroup.MemberIds { + memberIds[memberID] = true } } return memberIds, nil diff --git a/engine/account_test.go b/engine/account_test.go index cc3be8e1d..334836999 100644 --- a/engine/account_test.go +++ b/engine/account_test.go @@ -1111,7 +1111,7 @@ func TestDebitShared(t *testing.T) { utils.MONETARY: BalanceChain{&Balance{Uuid: "moneyc", Value: 130, SharedGroups: utils.NewStringMap("SG_TEST")}}, }} - sg := &SharedGroup{Id: "SG_TEST", MemberIds: []string{rif.Id, groupie.Id}, AccountParameters: map[string]*SharingParameters{"*any": &SharingParameters{Strategy: STRATEGY_MINE_RANDOM}}} + sg := &SharedGroup{Id: "SG_TEST", MemberIds: utils.NewStringMap(rif.Id, groupie.Id), AccountParameters: map[string]*SharingParameters{"*any": &SharingParameters{Strategy: STRATEGY_MINE_RANDOM}}} accountingStorage.SetAccount(groupie) ratingStorage.SetSharedGroup(sg) @@ -1181,7 +1181,7 @@ func TestMaxDurationShared(t *testing.T) { utils.MONETARY: BalanceChain{&Balance{Uuid: "moneyc", Value: 130, SharedGroups: utils.NewStringMap("SG_TEST")}}, }} - sg := &SharedGroup{Id: "SG_TEST", MemberIds: []string{rif.Id, groupie.Id}, AccountParameters: map[string]*SharingParameters{"*any": &SharingParameters{Strategy: STRATEGY_MINE_RANDOM}}} + sg := &SharedGroup{Id: "SG_TEST", MemberIds: utils.NewStringMap(rif.Id, groupie.Id), AccountParameters: map[string]*SharingParameters{"*any": &SharingParameters{Strategy: STRATEGY_MINE_RANDOM}}} accountingStorage.SetAccount(groupie) ratingStorage.SetSharedGroup(sg) diff --git a/engine/action.go b/engine/action.go index abd90f72c..734b86c64 100644 --- a/engine/action.go +++ b/engine/action.go @@ -48,29 +48,30 @@ type Action struct { } const ( - LOG = "*log" - RESET_TRIGGERS = "*reset_triggers" - SET_RECURRENT = "*set_recurrent" - UNSET_RECURRENT = "*unset_recurrent" - ALLOW_NEGATIVE = "*allow_negative" - DENY_NEGATIVE = "*deny_negative" - RESET_ACCOUNT = "*reset_account" - REMOVE_ACCOUNT = "*remove_account" - REMOVE_BALANCE = "*remove_balance" - TOPUP_RESET = "*topup_reset" - TOPUP = "*topup" - DEBIT_RESET = "*debit_reset" - DEBIT = "*debit" - RESET_COUNTERS = "*reset_counters" - ENABLE_ACCOUNT = "*enable_account" - DISABLE_ACCOUNT = "*disable_account" - ENABLE_DISABLE_BALANCE = "*enable_disable_balance" - CALL_URL = "*call_url" - CALL_URL_ASYNC = "*call_url_async" - MAIL_ASYNC = "*mail_async" - UNLIMITED = "*unlimited" - CDRLOG = "*cdrlog" - SET_DDESTINATIONS = "*set_ddestinations" + LOG = "*log" + RESET_TRIGGERS = "*reset_triggers" + SET_RECURRENT = "*set_recurrent" + UNSET_RECURRENT = "*unset_recurrent" + ALLOW_NEGATIVE = "*allow_negative" + DENY_NEGATIVE = "*deny_negative" + RESET_ACCOUNT = "*reset_account" + REMOVE_ACCOUNT = "*remove_account" + REMOVE_BALANCE = "*remove_balance" + TOPUP_RESET = "*topup_reset" + TOPUP = "*topup" + DEBIT_RESET = "*debit_reset" + DEBIT = "*debit" + RESET_COUNTERS = "*reset_counters" + ENABLE_ACCOUNT = "*enable_account" + DISABLE_ACCOUNT = "*disable_account" + ENABLE_DISABLE_BALANCE = "*enable_disable_balance" + CALL_URL = "*call_url" + CALL_URL_ASYNC = "*call_url_async" + MAIL_ASYNC = "*mail_async" + UNLIMITED = "*unlimited" + CDRLOG = "*cdrlog" + SET_DDESTINATIONS = "*set_ddestinations" + TRANSFER_MONETARY_DEFAULT = "*transfer_monetary_default" ) func (a *Action) Clone() *Action { @@ -133,6 +134,8 @@ func getActionFunc(typ string) (actionTypeFunc, bool) { return removeAccount, true case REMOVE_BALANCE: return removeBalance, true + case TRANSFER_MONETARY_DEFAULT: + return transferMonetaryDefault, true } return nil, false } @@ -588,6 +591,25 @@ func removeBalance(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) return accountingStorage.SetAccount(ub) } +func transferMonetaryDefault(acc *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error { + if _, exists := acc.BalanceMap[utils.MONETARY]; !exists { + return utils.ErrNotFound + } + defaultBalance := acc.GetDefaultMoneyBalance() + bChain := acc.BalanceMap[utils.MONETARY] + for _, balance := range bChain { + if balance.Uuid != defaultBalance.Uuid && + balance.Id != defaultBalance.Id { // extra caution + if balance.Value > 0 { + defaultBalance.Value += balance.Value + balance.Value = 0 + } + } + } + // update account in storage + return accountingStorage.SetAccount(acc) +} + // Structure to store actions according to weight type Actions []*Action diff --git a/engine/actions_test.go b/engine/actions_test.go index a3904b150..bccf168c6 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -1451,6 +1451,62 @@ func TestActionRemoveBalance(t *testing.T) { } } +func TestActionTransferMonetaryDefault(t *testing.T) { + err := accountingStorage.SetAccount( + &Account{ + Id: "cgrates.org:trans", + BalanceMap: map[string]BalanceChain{ + utils.MONETARY: BalanceChain{ + &Balance{ + Uuid: utils.GenUUID(), + Id: utils.META_DEFAULT, + Value: 10, + }, + &Balance{ + Uuid: utils.GenUUID(), + Value: 3, + }, + &Balance{ + Uuid: utils.GenUUID(), + Value: 6, + }, + &Balance{ + Uuid: utils.GenUUID(), + Value: -2, + }, + }, + }, + }) + if err != nil { + t.Errorf("error setting account: %v", err) + } + + a := &Action{ + ActionType: TRANSFER_MONETARY_DEFAULT, + } + + at := &ActionTiming{ + accountIDs: map[string]struct{}{"cgrates.org:trans": struct{}{}}, + actions: Actions{a}, + } + at.Execute() + + afterUb, err := accountingStorage.GetAccount("cgrates.org:trans") + if err != nil { + t.Error("account not found: ", err, afterUb) + } + if afterUb.BalanceMap[utils.MONETARY].GetTotalValue() != 17 || + afterUb.BalanceMap[utils.MONETARY][0].Value != 19 || + afterUb.BalanceMap[utils.MONETARY][1].Value != 0 || + afterUb.BalanceMap[utils.MONETARY][2].Value != 0 || + afterUb.BalanceMap[utils.MONETARY][3].Value != -2 { + for _, b := range afterUb.BalanceMap[utils.MONETARY] { + t.Logf("B: %+v", b) + } + t.Error("ransfer balance value: ", afterUb.BalanceMap[utils.MONETARY].GetTotalValue()) + } +} + /**************** Benchmarks ********************************/ func BenchmarkUUID(b *testing.B) { diff --git a/engine/calldesc.go b/engine/calldesc.go index 144c20153..00d7c801e 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -624,7 +624,7 @@ func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err e if _, err := Guardian.Guard(func() (interface{}, error) { duration, err = cd.getMaxSessionDuration(account) return 0, err - }, 0, memberIds...); err != nil { + }, 0, memberIds.Slice()...); err != nil { return 0, err } } else { @@ -681,7 +681,7 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) { _, err = Guardian.Guard(func() (interface{}, error) { cc, err = cd.debit(account, false, true) return 0, err - }, 0, memberIds...) + }, 0, memberIds.Slice()...) } else { return nil, sgerr } @@ -700,7 +700,7 @@ func (cd *CallDescriptor) FakeDebit() (cc *CallCost, err error) { _, err = Guardian.Guard(func() (interface{}, error) { cc, err = cd.debit(account, true, true) return 0, err - }, 0, memberIds...) + }, 0, memberIds.Slice()...) } else { return nil, sgerr } @@ -749,7 +749,7 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) { cc, err = cd.debit(account, false, true) //log.Print(balanceMap[0].Value, balanceMap[1].Value) return 0, err - }, 0, memberIds...) + }, 0, memberIds.Slice()...) if err != nil { return cc, err } diff --git a/engine/sharedgroup.go b/engine/sharedgroup.go index 4ec1bde23..ab8e4750b 100644 --- a/engine/sharedgroup.go +++ b/engine/sharedgroup.go @@ -40,7 +40,7 @@ const ( type SharedGroup struct { Id string AccountParameters map[string]*SharingParameters - MemberIds []string + MemberIds utils.StringMap //members []*Account // accounts caching } @@ -92,7 +92,7 @@ func (sg *SharedGroup) SortBalancesByStrategy(myBalance *Balance, bc BalanceChai // Returns all shared group's balances collected from user accounts' func (sg *SharedGroup) GetBalances(destination, category, direction, balanceType string, ub *Account) (bc BalanceChain) { // if len(sg.members) == 0 { - for _, ubId := range sg.MemberIds { + for ubId := range sg.MemberIds { var nUb *Account if ubId == ub.Id { // skip the initiating user nUb = ub diff --git a/engine/sharedgroup_test.go b/engine/sharedgroup_test.go index 52d3c3a06..b74df0846 100644 --- a/engine/sharedgroup_test.go +++ b/engine/sharedgroup_test.go @@ -21,6 +21,8 @@ package engine import ( "reflect" "testing" + + "github.com/cgrates/cgrates/utils" ) func TestSharedSetGet(t *testing.T) { @@ -30,7 +32,7 @@ func TestSharedSetGet(t *testing.T) { AccountParameters: map[string]*SharingParameters{ "test": &SharingParameters{Strategy: STRATEGY_HIGHEST}, }, - MemberIds: []string{"1", "2", "3"}, + MemberIds: utils.NewStringMap("1", "2", "3"), } err := ratingStorage.SetSharedGroup(sg) if err != nil { diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index 7c760fd24..e2b5289e9 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -108,6 +108,7 @@ func (self *SMGSession) debit(dur time.Duration) (time.Duration, error) { // Attempts to refund a duration, error on failure func (self *SMGSession) refund(refundDuration time.Duration) error { + initialRefundDuration := refundDuration lastCC := self.callCosts[len(self.callCosts)-1] lastCC.Timespans.Decompress() var refundIncrements engine.Increments @@ -157,7 +158,7 @@ func (self *SMGSession) refund(refundDuration time.Duration) error { Increments: refundIncrements, } cd.Increments.Compress() - utils.Logger.Info(fmt.Sprintf("Refunding duration %v with cd: %+v", refundDuration, cd)) + utils.Logger.Info(fmt.Sprintf("Refunding duration %v with cd: %+v", initialRefundDuration, utils.ToJSON(cd))) var response float64 err := self.rater.Call("Responder.RefundIncrements", cd, &response) if err != nil {