diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index a0d72d6a7..127fac4f3 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -203,5 +203,21 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC server.RpcRegister(responder) server.RpcRegister(apierRpcV1) server.RpcRegister(apierRpcV2) + + utils.RegisterRpcParams("", &engine.Stats{}) + utils.RegisterRpcParams("", &v1.CDRStatsV1{}) + utils.RegisterRpcParams("ScribeV1", &history.FileScribe{}) + utils.RegisterRpcParams("PubSubV1", &engine.PubSub{}) + utils.RegisterRpcParams("AliasesV1", &engine.AliasHandler{}) + utils.RegisterRpcParams("UsersV1", &engine.UserMap{}) + utils.RegisterRpcParams("UsersV1", &engine.UserMap{}) + utils.RegisterRpcParams("", &v1.CdrsV1{}) + utils.RegisterRpcParams("", &v2.CdrsV2{}) + utils.RegisterRpcParams("", &v1.SessionManagerV1{}) + utils.RegisterRpcParams("", &v1.SMGenericV1{}) + utils.RegisterRpcParams("", responder) + utils.RegisterRpcParams("", apierRpcV1) + utils.RegisterRpcParams("", apierRpcV2) + internalRaterChan <- responder // Rater done } diff --git a/data/tariffplans/testtp/AccountActions.csv b/data/tariffplans/testtp/AccountActions.csv index 917d8d0b9..850d03f77 100644 --- a/data/tariffplans/testtp/AccountActions.csv +++ b/data/tariffplans/testtp/AccountActions.csv @@ -8,4 +8,5 @@ cgrates.org,1009,TEST_EXE,,, cgrates.org,1010,TEST_DATA_r,,true, cgrates.org,1011,TEST_VOICE,,, cgrates.org,1012,PREPAID_10,,, -cgrates.org,1013,TEST_NEG,,, \ No newline at end of file +cgrates.org,1013,TEST_NEG,,, +cgrates.org,1014,TEST_RPC,,, \ No newline at end of file diff --git a/data/tariffplans/testtp/ActionPlans.csv b/data/tariffplans/testtp/ActionPlans.csv index 54731428a..897278d0e 100644 --- a/data/tariffplans/testtp/ActionPlans.csv +++ b/data/tariffplans/testtp/ActionPlans.csv @@ -5,3 +5,4 @@ TEST_EXE,TOPUP_EXE,ALWAYS,10 TEST_DATA_r,TOPUP_DATA_r,ASAP,10 TEST_VOICE,TOPUP_VOICE,ASAP,10 TEST_NEG,TOPUP_NEG,ASAP,10 +TEST_RPC,RPC,ALWAYS,10 \ No newline at end of file diff --git a/data/tariffplans/testtp/Actions.csv b/data/tariffplans/testtp/Actions.csv index 1525522f2..680866202 100644 --- a/data/tariffplans/testtp/Actions.csv +++ b/data/tariffplans/testtp/Actions.csv @@ -9,4 +9,4 @@ TOPUP_DATA_r,*topup,,,,*monetary,*out,,DATA_DEST,,,*unlimited,,5000000,10,false, TOPUP_DATA_r,*topup,,,,*data,*out,,DATA_DEST,datar,,*unlimited,,50000000000,10,false,false,10 TOPUP_VOICE,*topup,,,,*voice,*out,,GERMANY_MOBILE,,,*unlimited,,50000,10,false,false,10 TOPUP_NEG,*topup,,,,*voice,*out,,GERMANY;!GERMANY_MOBILE,*zero1m,,*unlimited,,100,10,false,false,10 -RPC,*cgr_rpc,"{""Address"": ""localhost:2013"",""Transport"":""*gob"",""Method"":""ApierV2.SetAccount"",""Attempts"":1,""Async"" :false,""Param"":{""Account"":""rpc"",""Tenant"":""cgrates.org""}}",,,,,,,,,,,,,,, +RPC,*cgr_rpc,"{""Address"": ""localhost:2013"",""Transport"":""*gob"",""Method"":""ApierV2.SetAccount"",""Attempts"":1,""Async"" :false,""Params"":{""Account"":""rpc"",""Tenant"":""cgrates.org""}}",,,,,,,,,,,,,,, diff --git a/engine/action.go b/engine/action.go index 358f87dc0..903ce7d90 100644 --- a/engine/action.go +++ b/engine/action.go @@ -657,7 +657,7 @@ type RPCRequest struct { Method string Attempts int Async bool - Param map[string]interface{} + Params map[string]interface{} } func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error { @@ -671,29 +671,25 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti } var client rpcclient.RpcClientConnection if req.Address != utils.MetaInternal { - if client, err = rpcclient.NewRpcClient(req.Method, req.Address, req.Attempts, 0, req.Transport, nil); err != nil { + if client, err = rpcclient.NewRpcClient("tcp", req.Address, req.Attempts, 0, req.Transport, nil); err != nil { return err } } else { - client = params.Object + client = params.Object.(rpcclient.RpcClientConnection) } - if client == nil { - return utils.ErrServerError - } - in, out := params.InParam, params.OutParam - p, err := utils.FromMapStringInterfaceValue(req.Param, in) + p, err := utils.FromMapStringInterfaceValue(req.Params, in) if err != nil { return err } if !req.Async { err = client.Call(req.Method, p, out) - utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %+v err: %v", out, err)) + utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %s err: %v", utils.ToJSON(out), err)) return err } go func() { err := client.Call(req.Method, p, out) - utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %+v err: %v", out, err)) + utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %s err: %v", utils.ToJSON(out), err)) }() return nil } diff --git a/engine/actions_test.go b/engine/actions_test.go index 14c3dd104..fb1481ef4 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -2212,7 +2212,7 @@ func TestCgrRpcAction(t *testing.T) { "Method": "TestRPCParameters.Hopa", "Attempts":1, "Async" :false, - "Param": {"Name":"n", "Surname":"s", "Age":10.2}}`, + "Params": {"Name":"n", "Surname":"s", "Age":10.2}}`, } if err := cgrRPCAction(nil, nil, a, nil); err != nil { t.Error("error executing cgr action: ", err) diff --git a/engine/storage_interface.go b/engine/storage_interface.go index c2123970e..96a4b32c6 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -92,6 +92,8 @@ type AccountingStorage interface { RemoveAlias(string) error GetLoadHistory(int, bool) ([]*LoadInstance, error) AddLoadHistory(*LoadInstance, int) error + GetStructVersion() (*StructVersion, error) + SetStructVersion(*StructVersion) error } type CdrStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index b6057619a..afab1a06d 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -924,3 +924,27 @@ func (ms *MapStorage) LogActionTiming(source string, at *ActionTiming, as Action ms.dict[utils.LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s", string(mat), string(mas))) return } + +func (ms *MapStorage) SetStructVersion(v *StructVersion) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + var result []byte + result, err = ms.ms.Marshal(v) + if err != nil { + return + } + ms.dict[utils.VERSION_PREFIX+"struct"] = result + return +} + +func (ms *MapStorage) GetStructVersion() (rsv *StructVersion, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() + rsv = &StructVersion{} + if values, ok := ms.dict[utils.VERSION_PREFIX+"struct"]; ok { + err = ms.ms.Unmarshal(values, &rsv) + } else { + return nil, utils.ErrNotFound + } + return +} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 130845ec9..68ba06277 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -53,6 +53,7 @@ const ( colLogAtr = "action_trigger_logs" colLogApl = "action_plan_logs" colLogErr = "error_logs" + colVer = "version" ) var ( @@ -1363,3 +1364,20 @@ func (ms *MongoStorage) GetAllCdrStats() (css []*CdrStats, err error) { err = iter.Close() return } + +func (ms *MongoStorage) SetStructVersion(v *StructVersion) (err error) { + _, err = ms.db.C(colVer).Upsert(bson.M{"key": utils.VERSION_PREFIX + "struct"}, &struct { + Key string + Value *StructVersion + }{utils.VERSION_PREFIX + "struct", v}) + return +} + +func (ms *MongoStorage) GetStructVersion() (rsv *StructVersion, err error) { + rsv = new(StructVersion) + err = ms.db.C(colVer).Find(bson.M{"key": utils.VERSION_PREFIX + "struct"}).One(rsv) + if err == mgo.ErrNotFound { + rsv = nil + } + return +} diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 757960f4c..896181922 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1093,3 +1093,21 @@ func (rs *RedisStorage) LogActionTiming(source string, at *ActionTiming, as Acti } return rs.db.Cmd("SET", utils.LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v", string(mat), string(mas)))).Err } + +func (rs *RedisStorage) SetStructVersion(v *StructVersion) (err error) { + var result []byte + result, err = rs.ms.Marshal(v) + if err != nil { + return + } + return rs.db.Cmd("SET", utils.VERSION_PREFIX+"struct", result).Err +} + +func (rs *RedisStorage) GetStructVersion() (rsv *StructVersion, err error) { + var values []byte + rsv = &StructVersion{} + if values, err = rs.db.Cmd("GET", utils.VERSION_PREFIX+"struct").Bytes(); err == nil { + err = rs.ms.Unmarshal(values, &rsv) + } + return +} diff --git a/engine/version.go b/engine/version.go new file mode 100644 index 000000000..047036261 --- /dev/null +++ b/engine/version.go @@ -0,0 +1,133 @@ +package engine + +import ( + "fmt" + + "github.com/cgrates/cgrates/utils" +) + +func init() { + // get current db version + dbVersion, err := accountingStorage.GetStructVersion() + if err != nil { + utils.Logger.Warning(fmt.Sprintf("Could not retrive current version from db: %v", err)) + return + } + // comparing versions + if currentVersion.CompareAndMigrate(dbVersion) { + // write the new values + if err := accountingStorage.SetStructVersion(currentVersion); err != nil { + utils.Logger.Warning(fmt.Sprintf("Could not write current version to db: %v", err)) + } + } +} + +var ( + currentVersion = &StructVersion{ + Destinations: "1", + RatingPlans: "1", + RatingProfiles: "1", + Lcrs: "1", + DerivedChargers: "1", + Actions: "1", + ActionPlans: "1", + ActionTriggers: "1", + SharedGroups: "1", + Accounts: "1", + CdrStats: "1", + Users: "1", + Alias: "1", + PubSubs: "1", + LoadHistory: "1", + Cdrs: "1", + SMCosts: "1", + } +) + +type StructVersion struct { + // rating + Destinations string + RatingPlans string + RatingProfiles string + Lcrs string + DerivedChargers string + Actions string + ActionPlans string + ActionTriggers string + SharedGroups string + // accounting + Accounts string + CdrStats string + Users string + Alias string + PubSubs string + LoadHistory string + // cdr + Cdrs string + SMCosts string +} + +func (sv *StructVersion) CompareAndMigrate(dbVer *StructVersion) bool { + migrationPerformed := false + if sv.Destinations != dbVer.Destinations { + migrationPerformed = true + + } + if sv.RatingPlans != dbVer.RatingPlans { + migrationPerformed = true + + } + if sv.RatingProfiles != dbVer.RatingPlans { + migrationPerformed = true + + } + if sv.Lcrs != dbVer.Lcrs { + migrationPerformed = true + + } + if sv.DerivedChargers != dbVer.DerivedChargers { + migrationPerformed = true + + } + if sv.Actions != dbVer.Actions { + migrationPerformed = true + + } + if sv.ActionPlans != dbVer.ActionPlans { + migrationPerformed = true + + } + if sv.ActionTriggers != dbVer.ActionTriggers { + migrationPerformed = true + + } + if sv.SharedGroups != dbVer.SharedGroups { + migrationPerformed = true + + } + if sv.Accounts != dbVer.Accounts { + migrationPerformed = true + } + if sv.CdrStats != dbVer.CdrStats { + migrationPerformed = true + } + if sv.Users != dbVer.Users { + migrationPerformed = true + } + if sv.Alias != dbVer.Alias { + migrationPerformed = true + } + if sv.PubSubs != dbVer.PubSubs { + migrationPerformed = true + } + if sv.LoadHistory != dbVer.LoadHistory { + migrationPerformed = true + } + if sv.Cdrs != dbVer.Cdrs { + migrationPerformed = true + } + if sv.SMCosts != dbVer.SMCosts { + migrationPerformed = true + } + return migrationPerformed +} diff --git a/utils/consts.go b/utils/consts.go index 15889b7d8..b5a720173 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -197,6 +197,7 @@ const ( LOG_CALL_COST_PREFIX = "cco_" LOG_ACTION_TIMMING_PREFIX = "ltm_" LOG_ACTION_TRIGGER_PREFIX = "ltr_" + VERSION_PREFIX = "ver_" LOG_ERR = "ler_" LOG_CDR = "cdr_" LOG_MEDIATED_CDR = "mcd_" diff --git a/utils/rpc_params.go b/utils/rpc_params.go index 69020fdec..c48e84085 100644 --- a/utils/rpc_params.go +++ b/utils/rpc_params.go @@ -2,14 +2,12 @@ package utils import ( "reflect" - - "github.com/cgrates/rpcclient" ) var rpcParamsMap map[string]*RpcParams type RpcParams struct { - Object rpcclient.RpcClientConnection + Object interface{} InParam reflect.Value OutParam interface{} } @@ -18,7 +16,7 @@ func init() { rpcParamsMap = make(map[string]*RpcParams) } -func RegisterRpcParams(name string, obj rpcclient.RpcClientConnection) { +func RegisterRpcParams(name string, obj interface{}) { objType := reflect.TypeOf(obj) if name == "" { val := reflect.ValueOf(obj) @@ -31,10 +29,14 @@ func RegisterRpcParams(name string, obj rpcclient.RpcClientConnection) { method := objType.Method(i) methodType := method.Type if methodType.NumIn() == 3 { // if it has three parameters (one is self and two are rpc params) + out := methodType.In(2) + if out.Kind() == reflect.Ptr { + out = out.Elem() + } rpcParamsMap[name+"."+method.Name] = &RpcParams{ Object: obj, InParam: reflect.New(methodType.In(1)), - OutParam: reflect.New(methodType.In(2).Elem()).Interface(), + OutParam: reflect.New(out).Interface(), } } @@ -42,6 +44,7 @@ func RegisterRpcParams(name string, obj rpcclient.RpcClientConnection) { } func GetRpcParams(method string) (*RpcParams, error) { + Logger.Info("REGISTERED: " + ToJSON(rpcParamsMap)) x, found := rpcParamsMap[method] if !found { return nil, ErrNotFound