Merge branch 'hapool' of https://github.com/cgrates/cgrates into hapool

This commit is contained in:
DanB
2016-01-17 18:52:37 +01:00
11 changed files with 261 additions and 43 deletions

View File

@@ -36,7 +36,7 @@ import (
var (
//separator = flag.String("separator", ",", "Default field separator")
cgrConfig, _ = config.NewDefaultCGRConfig()
migrateRC8 = flag.String("migrate_rc8", "", "Migrate Accounts, Actions, ActionTriggers and DerivedChargers to RC8 structures, possible values: *all,acc,atr,act,dcs,apl")
migrateRC8 = flag.String("migrate_rc8", "", "Migrate Accounts, Actions, ActionTriggers, DerivedChargers, ActionPlans and SharedGroups to RC8 structures, possible values: *all,acc,atr,act,dcs,apl,shg")
tpdb_type = flag.String("tpdb_type", cgrConfig.TpDbType, "The type of the TariffPlan database <redis>")
tpdb_host = flag.String("tpdb_host", cgrConfig.TpDbHost, "The TariffPlan host to connect to.")
tpdb_port = flag.String("tpdb_port", cgrConfig.TpDbPort, "The TariffPlan port to bind to.")
@@ -147,6 +147,11 @@ func main() {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "shg") || strings.Contains(*migrateRC8, "*all") {
if err := migratorRC8rat.migrateSharedGroups(); err != nil {
log.Print(err.Error())
}
}
log.Print("Done!")
return
}

View File

@@ -145,6 +145,12 @@ func (at *ActionPlan) IsASAP() bool {
return at.Timing.Timing.StartTime == utils.ASAP
}
type SharedGroup struct {
Id string
AccountParameters map[string]*engine.SharingParameters
MemberIds []string
}
type ActionPlans []*ActionPlan
func (mig MigratorRC8) migrateAccounts() error {
@@ -519,3 +525,41 @@ func (mig MigratorRC8) migrateActionPlans() error {
}
return nil
}
func (mig MigratorRC8) migrateSharedGroups() error {
keys, err := mig.db.Cmd("KEYS", utils.SHARED_GROUP_PREFIX+"*").List()
if err != nil {
return err
}
newShgMap := make(map[string]*engine.SharedGroup, len(keys))
for _, key := range keys {
log.Printf("Migrating shared groups: %s...", key)
oldShg := SharedGroup{}
var values []byte
if values, err = mig.db.Cmd("GET", key).Bytes(); err == nil {
if err := mig.ms.Unmarshal(values, &oldShg); err != nil {
return err
}
}
newShg := &engine.SharedGroup{
Id: oldShg.Id,
AccountParameters: oldShg.AccountParameters,
MemberIds: make(utils.StringMap),
}
for _, accID := range oldShg.MemberIds {
newShg.MemberIds[accID] = true
}
newShgMap[key] = newShg
}
// write data back
for key, shg := range newShgMap {
result, err := mig.ms.Marshal(&shg)
if err != nil {
return err
}
if err = mig.db.Cmd("SET", key, result).Err; err != nil {
return err
}
}
return nil
}

87
console/smg_event.go Normal file
View File

@@ -0,0 +1,87 @@
/*
Real-time Charging System for Telecom & ISP environments
Copyright (C) 2012-2015 ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package console
import (
"strings"
"github.com/cgrates/cgrates/sessionmanager"
)
type AttrSmgEvent struct {
Method string // shoul be ignored after RPC call
sessionmanager.SMGenericEvent
}
func init() {
c := &CmdSmgEvent{
name: "smg_event",
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
}
// Commander implementation
type CmdSmgEvent struct {
name string
rpcMethod string
rpcParams interface{}
*CommandExecuter
}
func (self *CmdSmgEvent) Name() string {
return self.name
}
func (self *CmdSmgEvent) RpcMethod() string {
return self.rpcMethod
}
func (self *CmdSmgEvent) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &AttrSmgEvent{}
}
return self.rpcParams
}
func (self *CmdSmgEvent) PostprocessRpcParams() error {
param := self.rpcParams.(*AttrSmgEvent)
self.rpcMethod = "SMGenericV1." + param.Method
self.rpcParams = param.SMGenericEvent
return nil
}
func (self *CmdSmgEvent) RpcResult() interface{} {
methodElems := strings.Split(self.rpcMethod, ".")
if len(methodElems) != 2 {
return nil
}
switch methodElems[1] {
case "SessionEnd", "ChargeEvent", "ProcessCdr":
var s string
return &s
case "SessionStart", "SessionUpdate", "GetMaxUsage":
var f float64
return &f
case "GetLcrSuppliers":
ss := make([]string, 0)
return ss
}
return nil
}

View File

@@ -149,12 +149,15 @@ func (ub *Account) debitBalanceAction(a *Action, reset bool) error {
// add shared group member
sg, err := ratingStorage.GetSharedGroup(sgId, false)
if err != nil || sg == nil {
//than problem
//than is problem
utils.Logger.Warning(fmt.Sprintf("Could not get shared group: %v", sgId))
} else {
if !utils.IsSliceMember(sg.MemberIds, ub.Id) {
if _, found := sg.MemberIds[ub.Id]; !found {
// add member and save
sg.MemberIds = append(sg.MemberIds, ub.Id)
if sg.MemberIds == nil {
sg.MemberIds = make(utils.StringMap)
}
sg.MemberIds[ub.Id] = true
ratingStorage.SetSharedGroup(sg)
}
}
@@ -610,7 +613,7 @@ func (acc *Account) GetSharedGroups() (groups []string) {
return
}
func (account *Account) GetUniqueSharedGroupMembers(cd *CallDescriptor) ([]string, error) {
func (account *Account) GetUniqueSharedGroupMembers(cd *CallDescriptor) (utils.StringMap, error) {
var balances []*Balance
balances = append(balances, account.getBalancesForPrefix(cd.Destination, cd.Category, cd.Direction, utils.MONETARY, "")...)
balances = append(balances, account.getBalancesForPrefix(cd.Destination, cd.Category, cd.Direction, cd.TOR, "")...)
@@ -621,17 +624,15 @@ func (account *Account) GetUniqueSharedGroupMembers(cd *CallDescriptor) ([]strin
sharedGroupIds = append(sharedGroupIds, sg)
}
}
var memberIds []string
memberIds := make(utils.StringMap)
for _, sgID := range sharedGroupIds {
sharedGroup, err := ratingStorage.GetSharedGroup(sgID, false)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("Could not get shared group: %v", sgID))
return nil, err
}
for _, memberId := range sharedGroup.MemberIds {
if !utils.IsSliceMember(memberIds, memberId) {
memberIds = append(memberIds, memberId)
}
for memberID := range sharedGroup.MemberIds {
memberIds[memberID] = true
}
}
return memberIds, nil

View File

@@ -1111,7 +1111,7 @@ func TestDebitShared(t *testing.T) {
utils.MONETARY: BalanceChain{&Balance{Uuid: "moneyc", Value: 130, SharedGroups: utils.NewStringMap("SG_TEST")}},
}}
sg := &SharedGroup{Id: "SG_TEST", MemberIds: []string{rif.Id, groupie.Id}, AccountParameters: map[string]*SharingParameters{"*any": &SharingParameters{Strategy: STRATEGY_MINE_RANDOM}}}
sg := &SharedGroup{Id: "SG_TEST", MemberIds: utils.NewStringMap(rif.Id, groupie.Id), AccountParameters: map[string]*SharingParameters{"*any": &SharingParameters{Strategy: STRATEGY_MINE_RANDOM}}}
accountingStorage.SetAccount(groupie)
ratingStorage.SetSharedGroup(sg)
@@ -1181,7 +1181,7 @@ func TestMaxDurationShared(t *testing.T) {
utils.MONETARY: BalanceChain{&Balance{Uuid: "moneyc", Value: 130, SharedGroups: utils.NewStringMap("SG_TEST")}},
}}
sg := &SharedGroup{Id: "SG_TEST", MemberIds: []string{rif.Id, groupie.Id}, AccountParameters: map[string]*SharingParameters{"*any": &SharingParameters{Strategy: STRATEGY_MINE_RANDOM}}}
sg := &SharedGroup{Id: "SG_TEST", MemberIds: utils.NewStringMap(rif.Id, groupie.Id), AccountParameters: map[string]*SharingParameters{"*any": &SharingParameters{Strategy: STRATEGY_MINE_RANDOM}}}
accountingStorage.SetAccount(groupie)
ratingStorage.SetSharedGroup(sg)

View File

@@ -48,29 +48,30 @@ type Action struct {
}
const (
LOG = "*log"
RESET_TRIGGERS = "*reset_triggers"
SET_RECURRENT = "*set_recurrent"
UNSET_RECURRENT = "*unset_recurrent"
ALLOW_NEGATIVE = "*allow_negative"
DENY_NEGATIVE = "*deny_negative"
RESET_ACCOUNT = "*reset_account"
REMOVE_ACCOUNT = "*remove_account"
REMOVE_BALANCE = "*remove_balance"
TOPUP_RESET = "*topup_reset"
TOPUP = "*topup"
DEBIT_RESET = "*debit_reset"
DEBIT = "*debit"
RESET_COUNTERS = "*reset_counters"
ENABLE_ACCOUNT = "*enable_account"
DISABLE_ACCOUNT = "*disable_account"
ENABLE_DISABLE_BALANCE = "*enable_disable_balance"
CALL_URL = "*call_url"
CALL_URL_ASYNC = "*call_url_async"
MAIL_ASYNC = "*mail_async"
UNLIMITED = "*unlimited"
CDRLOG = "*cdrlog"
SET_DDESTINATIONS = "*set_ddestinations"
LOG = "*log"
RESET_TRIGGERS = "*reset_triggers"
SET_RECURRENT = "*set_recurrent"
UNSET_RECURRENT = "*unset_recurrent"
ALLOW_NEGATIVE = "*allow_negative"
DENY_NEGATIVE = "*deny_negative"
RESET_ACCOUNT = "*reset_account"
REMOVE_ACCOUNT = "*remove_account"
REMOVE_BALANCE = "*remove_balance"
TOPUP_RESET = "*topup_reset"
TOPUP = "*topup"
DEBIT_RESET = "*debit_reset"
DEBIT = "*debit"
RESET_COUNTERS = "*reset_counters"
ENABLE_ACCOUNT = "*enable_account"
DISABLE_ACCOUNT = "*disable_account"
ENABLE_DISABLE_BALANCE = "*enable_disable_balance"
CALL_URL = "*call_url"
CALL_URL_ASYNC = "*call_url_async"
MAIL_ASYNC = "*mail_async"
UNLIMITED = "*unlimited"
CDRLOG = "*cdrlog"
SET_DDESTINATIONS = "*set_ddestinations"
TRANSFER_MONETARY_DEFAULT = "*transfer_monetary_default"
)
func (a *Action) Clone() *Action {
@@ -133,6 +134,8 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
return removeAccount, true
case REMOVE_BALANCE:
return removeBalance, true
case TRANSFER_MONETARY_DEFAULT:
return transferMonetaryDefault, true
}
return nil, false
}
@@ -588,6 +591,25 @@ func removeBalance(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions)
return accountingStorage.SetAccount(ub)
}
func transferMonetaryDefault(acc *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error {
if _, exists := acc.BalanceMap[utils.MONETARY]; !exists {
return utils.ErrNotFound
}
defaultBalance := acc.GetDefaultMoneyBalance()
bChain := acc.BalanceMap[utils.MONETARY]
for _, balance := range bChain {
if balance.Uuid != defaultBalance.Uuid &&
balance.Id != defaultBalance.Id { // extra caution
if balance.Value > 0 {
defaultBalance.Value += balance.Value
balance.Value = 0
}
}
}
// update account in storage
return accountingStorage.SetAccount(acc)
}
// Structure to store actions according to weight
type Actions []*Action

View File

@@ -1451,6 +1451,62 @@ func TestActionRemoveBalance(t *testing.T) {
}
}
func TestActionTransferMonetaryDefault(t *testing.T) {
err := accountingStorage.SetAccount(
&Account{
Id: "cgrates.org:trans",
BalanceMap: map[string]BalanceChain{
utils.MONETARY: BalanceChain{
&Balance{
Uuid: utils.GenUUID(),
Id: utils.META_DEFAULT,
Value: 10,
},
&Balance{
Uuid: utils.GenUUID(),
Value: 3,
},
&Balance{
Uuid: utils.GenUUID(),
Value: 6,
},
&Balance{
Uuid: utils.GenUUID(),
Value: -2,
},
},
},
})
if err != nil {
t.Errorf("error setting account: %v", err)
}
a := &Action{
ActionType: TRANSFER_MONETARY_DEFAULT,
}
at := &ActionTiming{
accountIDs: map[string]struct{}{"cgrates.org:trans": struct{}{}},
actions: Actions{a},
}
at.Execute()
afterUb, err := accountingStorage.GetAccount("cgrates.org:trans")
if err != nil {
t.Error("account not found: ", err, afterUb)
}
if afterUb.BalanceMap[utils.MONETARY].GetTotalValue() != 17 ||
afterUb.BalanceMap[utils.MONETARY][0].Value != 19 ||
afterUb.BalanceMap[utils.MONETARY][1].Value != 0 ||
afterUb.BalanceMap[utils.MONETARY][2].Value != 0 ||
afterUb.BalanceMap[utils.MONETARY][3].Value != -2 {
for _, b := range afterUb.BalanceMap[utils.MONETARY] {
t.Logf("B: %+v", b)
}
t.Error("ransfer balance value: ", afterUb.BalanceMap[utils.MONETARY].GetTotalValue())
}
}
/**************** Benchmarks ********************************/
func BenchmarkUUID(b *testing.B) {

View File

@@ -624,7 +624,7 @@ func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err e
if _, err := Guardian.Guard(func() (interface{}, error) {
duration, err = cd.getMaxSessionDuration(account)
return 0, err
}, 0, memberIds...); err != nil {
}, 0, memberIds.Slice()...); err != nil {
return 0, err
}
} else {
@@ -681,7 +681,7 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
_, err = Guardian.Guard(func() (interface{}, error) {
cc, err = cd.debit(account, false, true)
return 0, err
}, 0, memberIds...)
}, 0, memberIds.Slice()...)
} else {
return nil, sgerr
}
@@ -700,7 +700,7 @@ func (cd *CallDescriptor) FakeDebit() (cc *CallCost, err error) {
_, err = Guardian.Guard(func() (interface{}, error) {
cc, err = cd.debit(account, true, true)
return 0, err
}, 0, memberIds...)
}, 0, memberIds.Slice()...)
} else {
return nil, sgerr
}
@@ -749,7 +749,7 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
cc, err = cd.debit(account, false, true)
//log.Print(balanceMap[0].Value, balanceMap[1].Value)
return 0, err
}, 0, memberIds...)
}, 0, memberIds.Slice()...)
if err != nil {
return cc, err
}

View File

@@ -40,7 +40,7 @@ const (
type SharedGroup struct {
Id string
AccountParameters map[string]*SharingParameters
MemberIds []string
MemberIds utils.StringMap
//members []*Account // accounts caching
}
@@ -92,7 +92,7 @@ func (sg *SharedGroup) SortBalancesByStrategy(myBalance *Balance, bc BalanceChai
// Returns all shared group's balances collected from user accounts'
func (sg *SharedGroup) GetBalances(destination, category, direction, balanceType string, ub *Account) (bc BalanceChain) {
// if len(sg.members) == 0 {
for _, ubId := range sg.MemberIds {
for ubId := range sg.MemberIds {
var nUb *Account
if ubId == ub.Id { // skip the initiating user
nUb = ub

View File

@@ -21,6 +21,8 @@ package engine
import (
"reflect"
"testing"
"github.com/cgrates/cgrates/utils"
)
func TestSharedSetGet(t *testing.T) {
@@ -30,7 +32,7 @@ func TestSharedSetGet(t *testing.T) {
AccountParameters: map[string]*SharingParameters{
"test": &SharingParameters{Strategy: STRATEGY_HIGHEST},
},
MemberIds: []string{"1", "2", "3"},
MemberIds: utils.NewStringMap("1", "2", "3"),
}
err := ratingStorage.SetSharedGroup(sg)
if err != nil {

View File

@@ -108,6 +108,7 @@ func (self *SMGSession) debit(dur time.Duration) (time.Duration, error) {
// Attempts to refund a duration, error on failure
func (self *SMGSession) refund(refundDuration time.Duration) error {
initialRefundDuration := refundDuration
lastCC := self.callCosts[len(self.callCosts)-1]
lastCC.Timespans.Decompress()
var refundIncrements engine.Increments
@@ -157,7 +158,7 @@ func (self *SMGSession) refund(refundDuration time.Duration) error {
Increments: refundIncrements,
}
cd.Increments.Compress()
utils.Logger.Info(fmt.Sprintf("Refunding duration %v with cd: %+v", refundDuration, cd))
utils.Logger.Info(fmt.Sprintf("Refunding duration %v with cd: %+v", initialRefundDuration, utils.ToJSON(cd)))
var response float64
err := self.rater.Call("Responder.RefundIncrements", cd, &response)
if err != nil {