Merge branch 'hapool' of https://github.com/cgrates/cgrates into hapool

This commit is contained in:
DanB
2016-04-26 12:04:17 +02:00
13 changed files with 231 additions and 18 deletions

View File

@@ -203,5 +203,21 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
server.RpcRegister(responder)
server.RpcRegister(apierRpcV1)
server.RpcRegister(apierRpcV2)
utils.RegisterRpcParams("", &engine.Stats{})
utils.RegisterRpcParams("", &v1.CDRStatsV1{})
utils.RegisterRpcParams("ScribeV1", &history.FileScribe{})
utils.RegisterRpcParams("PubSubV1", &engine.PubSub{})
utils.RegisterRpcParams("AliasesV1", &engine.AliasHandler{})
utils.RegisterRpcParams("UsersV1", &engine.UserMap{})
utils.RegisterRpcParams("UsersV1", &engine.UserMap{})
utils.RegisterRpcParams("", &v1.CdrsV1{})
utils.RegisterRpcParams("", &v2.CdrsV2{})
utils.RegisterRpcParams("", &v1.SessionManagerV1{})
utils.RegisterRpcParams("", &v1.SMGenericV1{})
utils.RegisterRpcParams("", responder)
utils.RegisterRpcParams("", apierRpcV1)
utils.RegisterRpcParams("", apierRpcV2)
internalRaterChan <- responder // Rater done
}

View File

@@ -8,4 +8,5 @@ cgrates.org,1009,TEST_EXE,,,
cgrates.org,1010,TEST_DATA_r,,true,
cgrates.org,1011,TEST_VOICE,,,
cgrates.org,1012,PREPAID_10,,,
cgrates.org,1013,TEST_NEG,,,
cgrates.org,1013,TEST_NEG,,,
cgrates.org,1014,TEST_RPC,,,
1 #Tenant Account ActionPlanId ActionTriggersId AllowNegative Disabled
8 cgrates.org 1010 TEST_DATA_r true
9 cgrates.org 1011 TEST_VOICE
10 cgrates.org 1012 PREPAID_10
11 cgrates.org 1013 TEST_NEG
12 cgrates.org 1014 TEST_RPC

View File

@@ -5,3 +5,4 @@ TEST_EXE,TOPUP_EXE,ALWAYS,10
TEST_DATA_r,TOPUP_DATA_r,ASAP,10
TEST_VOICE,TOPUP_VOICE,ASAP,10
TEST_NEG,TOPUP_NEG,ASAP,10
TEST_RPC,RPC,ALWAYS,10
1 #Tag ActionsTag TimingTag Weight
5 TEST_DATA_r TOPUP_DATA_r ASAP 10
6 TEST_VOICE TOPUP_VOICE ASAP 10
7 TEST_NEG TOPUP_NEG ASAP 10
8 TEST_RPC RPC ALWAYS 10

View File

@@ -9,4 +9,4 @@ TOPUP_DATA_r,*topup,,,,*monetary,*out,,DATA_DEST,,,*unlimited,,5000000,10,false,
TOPUP_DATA_r,*topup,,,,*data,*out,,DATA_DEST,datar,,*unlimited,,50000000000,10,false,false,10
TOPUP_VOICE,*topup,,,,*voice,*out,,GERMANY_MOBILE,,,*unlimited,,50000,10,false,false,10
TOPUP_NEG,*topup,,,,*voice,*out,,GERMANY;!GERMANY_MOBILE,*zero1m,,*unlimited,,100,10,false,false,10
RPC,*cgr_rpc,"{""Address"": ""localhost:2013"",""Transport"":""*gob"",""Method"":""ApierV2.SetAccount"",""Attempts"":1,""Async"" :false,""Param"":{""Account"":""rpc"",""Tenant"":""cgrates.org""}}",,,,,,,,,,,,,,,
RPC,*cgr_rpc,"{""Address"": ""localhost:2013"",""Transport"":""*gob"",""Method"":""ApierV2.SetAccount"",""Attempts"":1,""Async"" :false,""Params"":{""Account"":""rpc"",""Tenant"":""cgrates.org""}}",,,,,,,,,,,,,,,
1 #ActionsTag[0] Action[1] ActionExtraParameters[2] Filter[3] BalanceTag[4] BalanceType[5] Directions[6] Categories[7] DestinationIds[8] RatingSubject[9] SharedGroup[10] ExpiryTime[11] TimingTags[12] Units[13] BalanceWeight[14] BalanceBlocker[15] BalanceDisabled[16] Weight[17]
9 TOPUP_DATA_r *topup *data *out DATA_DEST datar *unlimited 50000000000 10 false false 10
10 TOPUP_VOICE *topup *voice *out GERMANY_MOBILE *unlimited 50000 10 false false 10
11 TOPUP_NEG *topup *voice *out GERMANY;!GERMANY_MOBILE *zero1m *unlimited 100 10 false false 10
12 RPC *cgr_rpc {"Address": "localhost:2013","Transport":"*gob","Method":"ApierV2.SetAccount","Attempts":1,"Async" :false,"Param":{"Account":"rpc","Tenant":"cgrates.org"}} {"Address": "localhost:2013","Transport":"*gob","Method":"ApierV2.SetAccount","Attempts":1,"Async" :false,"Params":{"Account":"rpc","Tenant":"cgrates.org"}}

View File

@@ -657,7 +657,7 @@ type RPCRequest struct {
Method string
Attempts int
Async bool
Param map[string]interface{}
Params map[string]interface{}
}
func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error {
@@ -671,29 +671,25 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti
}
var client rpcclient.RpcClientConnection
if req.Address != utils.MetaInternal {
if client, err = rpcclient.NewRpcClient(req.Method, req.Address, req.Attempts, 0, req.Transport, nil); err != nil {
if client, err = rpcclient.NewRpcClient("tcp", req.Address, req.Attempts, 0, req.Transport, nil); err != nil {
return err
}
} else {
client = params.Object
client = params.Object.(rpcclient.RpcClientConnection)
}
if client == nil {
return utils.ErrServerError
}
in, out := params.InParam, params.OutParam
p, err := utils.FromMapStringInterfaceValue(req.Param, in)
p, err := utils.FromMapStringInterfaceValue(req.Params, in)
if err != nil {
return err
}
if !req.Async {
err = client.Call(req.Method, p, out)
utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %+v err: %v", out, err))
utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %s err: %v", utils.ToJSON(out), err))
return err
}
go func() {
err := client.Call(req.Method, p, out)
utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %+v err: %v", out, err))
utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %s err: %v", utils.ToJSON(out), err))
}()
return nil
}

View File

@@ -2212,7 +2212,7 @@ func TestCgrRpcAction(t *testing.T) {
"Method": "TestRPCParameters.Hopa",
"Attempts":1,
"Async" :false,
"Param": {"Name":"n", "Surname":"s", "Age":10.2}}`,
"Params": {"Name":"n", "Surname":"s", "Age":10.2}}`,
}
if err := cgrRPCAction(nil, nil, a, nil); err != nil {
t.Error("error executing cgr action: ", err)

View File

@@ -92,6 +92,8 @@ type AccountingStorage interface {
RemoveAlias(string) error
GetLoadHistory(int, bool) ([]*LoadInstance, error)
AddLoadHistory(*LoadInstance, int) error
GetStructVersion() (*StructVersion, error)
SetStructVersion(*StructVersion) error
}
type CdrStorage interface {

View File

@@ -924,3 +924,27 @@ func (ms *MapStorage) LogActionTiming(source string, at *ActionTiming, as Action
ms.dict[utils.LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s", string(mat), string(mas)))
return
}
func (ms *MapStorage) SetStructVersion(v *StructVersion) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
var result []byte
result, err = ms.ms.Marshal(v)
if err != nil {
return
}
ms.dict[utils.VERSION_PREFIX+"struct"] = result
return
}
func (ms *MapStorage) GetStructVersion() (rsv *StructVersion, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
rsv = &StructVersion{}
if values, ok := ms.dict[utils.VERSION_PREFIX+"struct"]; ok {
err = ms.ms.Unmarshal(values, &rsv)
} else {
return nil, utils.ErrNotFound
}
return
}

View File

@@ -53,6 +53,7 @@ const (
colLogAtr = "action_trigger_logs"
colLogApl = "action_plan_logs"
colLogErr = "error_logs"
colVer = "version"
)
var (
@@ -1363,3 +1364,20 @@ func (ms *MongoStorage) GetAllCdrStats() (css []*CdrStats, err error) {
err = iter.Close()
return
}
func (ms *MongoStorage) SetStructVersion(v *StructVersion) (err error) {
_, err = ms.db.C(colVer).Upsert(bson.M{"key": utils.VERSION_PREFIX + "struct"}, &struct {
Key string
Value *StructVersion
}{utils.VERSION_PREFIX + "struct", v})
return
}
func (ms *MongoStorage) GetStructVersion() (rsv *StructVersion, err error) {
rsv = new(StructVersion)
err = ms.db.C(colVer).Find(bson.M{"key": utils.VERSION_PREFIX + "struct"}).One(rsv)
if err == mgo.ErrNotFound {
rsv = nil
}
return
}

View File

@@ -1093,3 +1093,21 @@ func (rs *RedisStorage) LogActionTiming(source string, at *ActionTiming, as Acti
}
return rs.db.Cmd("SET", utils.LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v", string(mat), string(mas)))).Err
}
func (rs *RedisStorage) SetStructVersion(v *StructVersion) (err error) {
var result []byte
result, err = rs.ms.Marshal(v)
if err != nil {
return
}
return rs.db.Cmd("SET", utils.VERSION_PREFIX+"struct", result).Err
}
func (rs *RedisStorage) GetStructVersion() (rsv *StructVersion, err error) {
var values []byte
rsv = &StructVersion{}
if values, err = rs.db.Cmd("GET", utils.VERSION_PREFIX+"struct").Bytes(); err == nil {
err = rs.ms.Unmarshal(values, &rsv)
}
return
}

133
engine/version.go Normal file
View File

@@ -0,0 +1,133 @@
package engine
import (
"fmt"
"github.com/cgrates/cgrates/utils"
)
func init() {
// get current db version
dbVersion, err := accountingStorage.GetStructVersion()
if err != nil {
utils.Logger.Warning(fmt.Sprintf("Could not retrive current version from db: %v", err))
return
}
// comparing versions
if currentVersion.CompareAndMigrate(dbVersion) {
// write the new values
if err := accountingStorage.SetStructVersion(currentVersion); err != nil {
utils.Logger.Warning(fmt.Sprintf("Could not write current version to db: %v", err))
}
}
}
var (
currentVersion = &StructVersion{
Destinations: "1",
RatingPlans: "1",
RatingProfiles: "1",
Lcrs: "1",
DerivedChargers: "1",
Actions: "1",
ActionPlans: "1",
ActionTriggers: "1",
SharedGroups: "1",
Accounts: "1",
CdrStats: "1",
Users: "1",
Alias: "1",
PubSubs: "1",
LoadHistory: "1",
Cdrs: "1",
SMCosts: "1",
}
)
type StructVersion struct {
// rating
Destinations string
RatingPlans string
RatingProfiles string
Lcrs string
DerivedChargers string
Actions string
ActionPlans string
ActionTriggers string
SharedGroups string
// accounting
Accounts string
CdrStats string
Users string
Alias string
PubSubs string
LoadHistory string
// cdr
Cdrs string
SMCosts string
}
func (sv *StructVersion) CompareAndMigrate(dbVer *StructVersion) bool {
migrationPerformed := false
if sv.Destinations != dbVer.Destinations {
migrationPerformed = true
}
if sv.RatingPlans != dbVer.RatingPlans {
migrationPerformed = true
}
if sv.RatingProfiles != dbVer.RatingPlans {
migrationPerformed = true
}
if sv.Lcrs != dbVer.Lcrs {
migrationPerformed = true
}
if sv.DerivedChargers != dbVer.DerivedChargers {
migrationPerformed = true
}
if sv.Actions != dbVer.Actions {
migrationPerformed = true
}
if sv.ActionPlans != dbVer.ActionPlans {
migrationPerformed = true
}
if sv.ActionTriggers != dbVer.ActionTriggers {
migrationPerformed = true
}
if sv.SharedGroups != dbVer.SharedGroups {
migrationPerformed = true
}
if sv.Accounts != dbVer.Accounts {
migrationPerformed = true
}
if sv.CdrStats != dbVer.CdrStats {
migrationPerformed = true
}
if sv.Users != dbVer.Users {
migrationPerformed = true
}
if sv.Alias != dbVer.Alias {
migrationPerformed = true
}
if sv.PubSubs != dbVer.PubSubs {
migrationPerformed = true
}
if sv.LoadHistory != dbVer.LoadHistory {
migrationPerformed = true
}
if sv.Cdrs != dbVer.Cdrs {
migrationPerformed = true
}
if sv.SMCosts != dbVer.SMCosts {
migrationPerformed = true
}
return migrationPerformed
}

View File

@@ -197,6 +197,7 @@ const (
LOG_CALL_COST_PREFIX = "cco_"
LOG_ACTION_TIMMING_PREFIX = "ltm_"
LOG_ACTION_TRIGGER_PREFIX = "ltr_"
VERSION_PREFIX = "ver_"
LOG_ERR = "ler_"
LOG_CDR = "cdr_"
LOG_MEDIATED_CDR = "mcd_"

View File

@@ -2,14 +2,12 @@ package utils
import (
"reflect"
"github.com/cgrates/rpcclient"
)
var rpcParamsMap map[string]*RpcParams
type RpcParams struct {
Object rpcclient.RpcClientConnection
Object interface{}
InParam reflect.Value
OutParam interface{}
}
@@ -18,7 +16,7 @@ func init() {
rpcParamsMap = make(map[string]*RpcParams)
}
func RegisterRpcParams(name string, obj rpcclient.RpcClientConnection) {
func RegisterRpcParams(name string, obj interface{}) {
objType := reflect.TypeOf(obj)
if name == "" {
val := reflect.ValueOf(obj)
@@ -31,10 +29,14 @@ func RegisterRpcParams(name string, obj rpcclient.RpcClientConnection) {
method := objType.Method(i)
methodType := method.Type
if methodType.NumIn() == 3 { // if it has three parameters (one is self and two are rpc params)
out := methodType.In(2)
if out.Kind() == reflect.Ptr {
out = out.Elem()
}
rpcParamsMap[name+"."+method.Name] = &RpcParams{
Object: obj,
InParam: reflect.New(methodType.In(1)),
OutParam: reflect.New(methodType.In(2).Elem()).Interface(),
OutParam: reflect.New(out).Interface(),
}
}
@@ -42,6 +44,7 @@ func RegisterRpcParams(name string, obj rpcclient.RpcClientConnection) {
}
func GetRpcParams(method string) (*RpcParams, error) {
Logger.Info("REGISTERED: " + ToJSON(rpcParamsMap))
x, found := rpcParamsMap[method]
if !found {
return nil, ErrNotFound