From 5c9f48e8974e93cd25bb815e64f8b21faab8cb03 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 20 Apr 2016 14:30:47 +0300 Subject: [PATCH 1/6] integration tests passing --- cmd/cgr-engine/rater.go | 10 ++++++++++ engine/action.go | 12 ++++-------- utils/rpc_params.go | 13 ++++++++----- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 41535636d..22d5570d0 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -247,5 +247,15 @@ func startRater(internalRaterChan chan *engine.Responder, cacheDoneChan chan str server.RpcRegister(responder) server.RpcRegister(apierRpcV1) server.RpcRegister(apierRpcV2) + + utils.RegisterRpcParams("", &engine.Stats{}) + utils.RegisterRpcParams("", &history.FileScribe{}) + utils.RegisterRpcParams("", &engine.PubSub{}) + utils.RegisterRpcParams("", &engine.AliasHandler{}) + utils.RegisterRpcParams("", &engine.UserMap{}) + utils.RegisterRpcParams("", responder) + utils.RegisterRpcParams("", apierRpcV1) + utils.RegisterRpcParams("", apierRpcV2) + internalRaterChan <- responder // Rater done } diff --git a/engine/action.go b/engine/action.go index 84669ff1f..71a0fdf05 100644 --- a/engine/action.go +++ b/engine/action.go @@ -671,16 +671,12 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti } var client rpcclient.RpcClientConnection if req.Address != utils.INTERNAL { - if client, err = rpcclient.NewRpcClient(req.Method, req.Address, req.Attempts, 0, req.Transport, nil); err != nil { + if client, err = rpcclient.NewRpcClient("tcp", req.Address, req.Attempts, 0, req.Transport, nil); err != nil { return err } } else { - client = params.Object + client = params.Object.(rpcclient.RpcClientConnection) } - if client == nil { - return utils.ErrServerError - } - in, out := params.InParam, params.OutParam p, err := utils.FromMapStringInterfaceValue(req.Param, in) if err != nil { @@ -688,12 +684,12 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti } if !req.Async { err = client.Call(req.Method, p, out) - utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %+v err: %v", out, err)) + utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %s err: %v", utils.ToJSON(out), err)) return err } go func() { err := client.Call(req.Method, p, out) - utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %+v err: %v", out, err)) + utils.Logger.Info(fmt.Sprintf("<*cgr_rpc> result: %s err: %v", utils.ToJSON(out), err)) }() return nil } diff --git a/utils/rpc_params.go b/utils/rpc_params.go index 69020fdec..c48e84085 100644 --- a/utils/rpc_params.go +++ b/utils/rpc_params.go @@ -2,14 +2,12 @@ package utils import ( "reflect" - - "github.com/cgrates/rpcclient" ) var rpcParamsMap map[string]*RpcParams type RpcParams struct { - Object rpcclient.RpcClientConnection + Object interface{} InParam reflect.Value OutParam interface{} } @@ -18,7 +16,7 @@ func init() { rpcParamsMap = make(map[string]*RpcParams) } -func RegisterRpcParams(name string, obj rpcclient.RpcClientConnection) { +func RegisterRpcParams(name string, obj interface{}) { objType := reflect.TypeOf(obj) if name == "" { val := reflect.ValueOf(obj) @@ -31,10 +29,14 @@ func RegisterRpcParams(name string, obj rpcclient.RpcClientConnection) { method := objType.Method(i) methodType := method.Type if methodType.NumIn() == 3 { // if it has three parameters (one is self and two are rpc params) + out := methodType.In(2) + if out.Kind() == reflect.Ptr { + out = out.Elem() + } rpcParamsMap[name+"."+method.Name] = &RpcParams{ Object: obj, InParam: reflect.New(methodType.In(1)), - OutParam: reflect.New(methodType.In(2).Elem()).Interface(), + OutParam: reflect.New(out).Interface(), } } @@ -42,6 +44,7 @@ func RegisterRpcParams(name string, obj rpcclient.RpcClientConnection) { } func GetRpcParams(method string) (*RpcParams, error) { + Logger.Info("REGISTERED: " + ToJSON(rpcParamsMap)) x, found := rpcParamsMap[method] if !found { return nil, ErrNotFound From 31ea08df333796c01d38311274e40570e84c95be Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 20 Apr 2016 14:38:28 +0300 Subject: [PATCH 2/6] correct names for registered objects --- cmd/cgr-engine/rater.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 22d5570d0..946c3b61f 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -249,10 +249,16 @@ func startRater(internalRaterChan chan *engine.Responder, cacheDoneChan chan str server.RpcRegister(apierRpcV2) utils.RegisterRpcParams("", &engine.Stats{}) - utils.RegisterRpcParams("", &history.FileScribe{}) - utils.RegisterRpcParams("", &engine.PubSub{}) - utils.RegisterRpcParams("", &engine.AliasHandler{}) - utils.RegisterRpcParams("", &engine.UserMap{}) + utils.RegisterRpcParams("", &v1.CDRStatsV1{}) + utils.RegisterRpcParams("ScribeV1", &history.FileScribe{}) + utils.RegisterRpcParams("PubSubV1", &engine.PubSub{}) + utils.RegisterRpcParams("AliasesV1", &engine.AliasHandler{}) + utils.RegisterRpcParams("UsersV1", &engine.UserMap{}) + utils.RegisterRpcParams("UsersV1", &engine.UserMap{}) + utils.RegisterRpcParams("", &v1.CdrsV1{}) + utils.RegisterRpcParams("", &v2.CdrsV2{}) + utils.RegisterRpcParams("", &v1.SessionManagerV1{}) + utils.RegisterRpcParams("", &v1.SMGenericV1{}) utils.RegisterRpcParams("", responder) utils.RegisterRpcParams("", apierRpcV1) utils.RegisterRpcParams("", apierRpcV2) From aa28ef31ce8d7e12c5458455f439a22bbade7773 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 20 Apr 2016 15:18:55 +0300 Subject: [PATCH 3/6] fix cross loading integration tests --- data/tariffplans/testtp/AccountActions.csv | 3 ++- data/tariffplans/testtp/ActionPlans.csv | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/data/tariffplans/testtp/AccountActions.csv b/data/tariffplans/testtp/AccountActions.csv index 917d8d0b9..850d03f77 100644 --- a/data/tariffplans/testtp/AccountActions.csv +++ b/data/tariffplans/testtp/AccountActions.csv @@ -8,4 +8,5 @@ cgrates.org,1009,TEST_EXE,,, cgrates.org,1010,TEST_DATA_r,,true, cgrates.org,1011,TEST_VOICE,,, cgrates.org,1012,PREPAID_10,,, -cgrates.org,1013,TEST_NEG,,, \ No newline at end of file +cgrates.org,1013,TEST_NEG,,, +cgrates.org,1014,TEST_RPC,,, \ No newline at end of file diff --git a/data/tariffplans/testtp/ActionPlans.csv b/data/tariffplans/testtp/ActionPlans.csv index 54731428a..897278d0e 100644 --- a/data/tariffplans/testtp/ActionPlans.csv +++ b/data/tariffplans/testtp/ActionPlans.csv @@ -5,3 +5,4 @@ TEST_EXE,TOPUP_EXE,ALWAYS,10 TEST_DATA_r,TOPUP_DATA_r,ASAP,10 TEST_VOICE,TOPUP_VOICE,ASAP,10 TEST_NEG,TOPUP_NEG,ASAP,10 +TEST_RPC,RPC,ALWAYS,10 \ No newline at end of file From 4f399e7683fb5cb37c92a1f06dd1ac515b34c76e Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 21 Apr 2016 15:16:13 +0300 Subject: [PATCH 4/6] renamed Param to params --- data/tariffplans/testtp/Actions.csv | 2 +- engine/action.go | 4 ++-- engine/actions_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/data/tariffplans/testtp/Actions.csv b/data/tariffplans/testtp/Actions.csv index 1525522f2..680866202 100644 --- a/data/tariffplans/testtp/Actions.csv +++ b/data/tariffplans/testtp/Actions.csv @@ -9,4 +9,4 @@ TOPUP_DATA_r,*topup,,,,*monetary,*out,,DATA_DEST,,,*unlimited,,5000000,10,false, TOPUP_DATA_r,*topup,,,,*data,*out,,DATA_DEST,datar,,*unlimited,,50000000000,10,false,false,10 TOPUP_VOICE,*topup,,,,*voice,*out,,GERMANY_MOBILE,,,*unlimited,,50000,10,false,false,10 TOPUP_NEG,*topup,,,,*voice,*out,,GERMANY;!GERMANY_MOBILE,*zero1m,,*unlimited,,100,10,false,false,10 -RPC,*cgr_rpc,"{""Address"": ""localhost:2013"",""Transport"":""*gob"",""Method"":""ApierV2.SetAccount"",""Attempts"":1,""Async"" :false,""Param"":{""Account"":""rpc"",""Tenant"":""cgrates.org""}}",,,,,,,,,,,,,,, +RPC,*cgr_rpc,"{""Address"": ""localhost:2013"",""Transport"":""*gob"",""Method"":""ApierV2.SetAccount"",""Attempts"":1,""Async"" :false,""Params"":{""Account"":""rpc"",""Tenant"":""cgrates.org""}}",,,,,,,,,,,,,,, diff --git a/engine/action.go b/engine/action.go index 71a0fdf05..de95875c4 100644 --- a/engine/action.go +++ b/engine/action.go @@ -657,7 +657,7 @@ type RPCRequest struct { Method string Attempts int Async bool - Param map[string]interface{} + Params map[string]interface{} } func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error { @@ -678,7 +678,7 @@ func cgrRPCAction(account *Account, sq *StatsQueueTriggered, a *Action, acs Acti client = params.Object.(rpcclient.RpcClientConnection) } in, out := params.InParam, params.OutParam - p, err := utils.FromMapStringInterfaceValue(req.Param, in) + p, err := utils.FromMapStringInterfaceValue(req.Params, in) if err != nil { return err } diff --git a/engine/actions_test.go b/engine/actions_test.go index f9297d25a..33d683ca5 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -2212,7 +2212,7 @@ func TestCgrRpcAction(t *testing.T) { "Method": "TestRPCParameters.Hopa", "Attempts":1, "Async" :false, - "Param": {"Name":"n", "Surname":"s", "Age":10.2}}`, + "Params": {"Name":"n", "Surname":"s", "Age":10.2}}`, } if err := cgrRPCAction(nil, nil, a, nil); err != nil { t.Error("error executing cgr action: ", err) From 1125c2425ce470eb1134a7ccb939e87b1e11c844 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sat, 23 Apr 2016 15:00:03 +0300 Subject: [PATCH 5/6] startd db structures version management --- engine/storage_interface.go | 6 ++ engine/storage_map.go | 71 +++++++++++++ engine/storage_mongo_datadb.go | 52 ++++++++++ engine/storage_redis.go | 53 ++++++++++ engine/storage_sql.go | 8 ++ engine/version.go | 176 +++++++++++++++++++++++++++++++++ utils/consts.go | 3 + 7 files changed, 369 insertions(+) create mode 100644 engine/version.go diff --git a/engine/storage_interface.go b/engine/storage_interface.go index c2123970e..35b1236cc 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -68,6 +68,8 @@ type RatingStorage interface { GetAllActionPlans() (map[string]*ActionPlan, error) PushTask(*Task) error PopTask() (*Task, error) + GetRatingStructuresVersion() (*RatingStructuresVersion, error) + SetRatingStructuresVersion(*RatingStructuresVersion) error } type AccountingStorage interface { @@ -92,6 +94,8 @@ type AccountingStorage interface { RemoveAlias(string) error GetLoadHistory(int, bool) ([]*LoadInstance, error) AddLoadHistory(*LoadInstance, int) error + GetAccountingStructuresVersion() (*AccountingStructuresVersion, error) + SetAccountingStructuresVersion(*AccountingStructuresVersion) error } type CdrStorage interface { @@ -100,6 +104,8 @@ type CdrStorage interface { SetSMCost(smc *SMCost) error GetSMCosts(cgrid, runid, originHost, originIDPrfx string) ([]*SMCost, error) GetCDRs(*utils.CDRsFilter, bool) ([]*CDR, int64, error) + GetCdrStructuresVersion() (*CdrStructuresVersion, error) + SetCdrStructuresVersion(*CdrStructuresVersion) error } type LogStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index f7e21306a..72e436497 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -924,3 +924,74 @@ func (ms *MapStorage) LogActionTiming(source string, at *ActionTiming, as Action ms.dict[utils.LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s", string(mat), string(mas))) return } + +func (ms *MapStorage) SetRatingStructuresVersion(v *RatingStructuresVersion) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + var result []byte + result, err = ms.ms.Marshal(v) + if err != nil { + return + } + ms.dict[utils.RATING_VERSION_PREFIX+"version"] = result + return +} + +func (ms *MapStorage) GetRatingStructuresVersion() (rsv *RatingStructuresVersion, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() + rsv = &RatingStructuresVersion{} + if values, ok := ms.dict[utils.RATING_VERSION_PREFIX+"version"]; ok { + err = ms.ms.Unmarshal(values, &rsv) + } else { + return nil, utils.ErrNotFound + } + return +} + +func (ms *MapStorage) SetAccountingStructuresVersion(v *AccountingStructuresVersion) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + var result []byte + result, err = ms.ms.Marshal(v) + if err != nil { + return + } + ms.dict[utils.ACCOUNTING_VERSION_PREFIX+"version"] = result + return +} + +func (ms *MapStorage) GetAccountingStructuresVersion() (asv *AccountingStructuresVersion, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() + asv = &AccountingStructuresVersion{} + if values, ok := ms.dict[utils.ACCOUNTING_VERSION_PREFIX+"version"]; ok { + err = ms.ms.Unmarshal(values, &asv) + } else { + return nil, utils.ErrNotFound + } + return +} +func (ms *MapStorage) SetCdrStructuresVersion(v *CdrStructuresVersion) (err error) { + ms.mu.Lock() + defer ms.mu.Unlock() + var result []byte + result, err = ms.ms.Marshal(v) + if err != nil { + return + } + ms.dict[utils.CDR_VERSION_PREFIX+"version"] = result + return +} + +func (ms *MapStorage) GetCdrStructuresVersion() (csv *CdrStructuresVersion, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() + csv = &CdrStructuresVersion{} + if values, ok := ms.dict[utils.CDR_VERSION_PREFIX+"version"]; ok { + err = ms.ms.Unmarshal(values, &csv) + } else { + return nil, utils.ErrNotFound + } + return +} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 51abc1989..2201588e9 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -53,6 +53,7 @@ const ( colLogAtr = "action_trigger_logs" colLogApl = "action_plan_logs" colLogErr = "error_logs" + colVer = "versions" ) var ( @@ -1363,3 +1364,54 @@ func (ms *MongoStorage) GetAllCdrStats() (css []*CdrStats, err error) { err = iter.Close() return } + +func (ms *MongoStorage) SetRatingStructuresVersion(v *RatingStructuresVersion) (err error) { + _, err = ms.db.C(colVer).Upsert(bson.M{"key": utils.RATING_VERSION_PREFIX + "version"}, &struct { + Key string + Value *RatingStructuresVersion + }{utils.RATING_VERSION_PREFIX + "version", v}) + return +} + +func (ms *MongoStorage) GetRatingStructuresVersion() (rsv *RatingStructuresVersion, err error) { + rsv = new(RatingStructuresVersion) + err = ms.db.C(colVer).Find(bson.M{"key": utils.RATING_VERSION_PREFIX + "version"}).One(rsv) + if err == mgo.ErrNotFound { + rsv = nil + } + return +} + +func (ms *MongoStorage) SetAccountingStructuresVersion(v *AccountingStructuresVersion) (err error) { + _, err = ms.db.C(colVer).Upsert(bson.M{"key": utils.ACCOUNTING_VERSION_PREFIX + "version"}, &struct { + Key string + Value *AccountingStructuresVersion + }{utils.ACCOUNTING_VERSION_PREFIX + "version", v}) + return +} + +func (ms *MongoStorage) GetAccountingStructuresVersion() (asv *AccountingStructuresVersion, err error) { + asv = new(AccountingStructuresVersion) + err = ms.db.C(colVer).Find(bson.M{"key": utils.ACCOUNTING_VERSION_PREFIX + "version"}).One(asv) + if err == mgo.ErrNotFound { + asv = nil + } + return +} + +func (ms *MongoStorage) SetCdrStructuresVersion(v *CdrStructuresVersion) (err error) { + _, err = ms.db.C(colVer).Upsert(bson.M{"key": utils.CDR_VERSION_PREFIX + "version"}, &struct { + Key string + Value *CdrStructuresVersion + }{utils.CDR_VERSION_PREFIX + "version", v}) + return +} + +func (ms *MongoStorage) GetCdrStructuresVersion() (csv *CdrStructuresVersion, err error) { + csv = new(CdrStructuresVersion) + err = ms.db.C(colVer).Find(bson.M{"key": utils.CDR_VERSION_PREFIX + "version"}).One(csv) + if err == mgo.ErrNotFound { + csv = nil + } + return +} diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 8d8b1660a..5c69570df 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1093,3 +1093,56 @@ func (rs *RedisStorage) LogActionTiming(source string, at *ActionTiming, as Acti } return rs.db.Cmd("SET", utils.LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v", string(mat), string(mas)))).Err } + +func (rs *RedisStorage) SetRatingStructuresVersion(v *RatingStructuresVersion) (err error) { + var result []byte + result, err = rs.ms.Marshal(v) + if err != nil { + return + } + return rs.db.Cmd("SET", utils.RATING_VERSION_PREFIX+"version", result).Err +} + +func (rs *RedisStorage) GetRatingStructuresVersion() (rsv *RatingStructuresVersion, err error) { + var values []byte + rsv = &RatingStructuresVersion{} + if values, err = rs.db.Cmd("GET", utils.RATING_VERSION_PREFIX+"version").Bytes(); err == nil { + err = rs.ms.Unmarshal(values, &rsv) + } + return +} + +func (rs *RedisStorage) SetAccountingStructuresVersion(v *AccountingStructuresVersion) (err error) { + var result []byte + result, err = rs.ms.Marshal(v) + if err != nil { + return + } + return rs.db.Cmd("SET", utils.ACCOUNTING_VERSION_PREFIX+"version", result).Err +} + +func (rs *RedisStorage) GetAccountingStructuresVersion() (asv *AccountingStructuresVersion, err error) { + var values []byte + asv = &AccountingStructuresVersion{} + if values, err = rs.db.Cmd("GET", utils.ACCOUNTING_VERSION_PREFIX+"version").Bytes(); err == nil { + err = rs.ms.Unmarshal(values, &asv) + } + return +} +func (rs *RedisStorage) SetCdrStructuresVersion(v *CdrStructuresVersion) (err error) { + var result []byte + result, err = rs.ms.Marshal(v) + if err != nil { + return + } + return rs.db.Cmd("SET", utils.CDR_VERSION_PREFIX+"version", result).Err +} + +func (rs *RedisStorage) GetCdrStructuresVersion() (csv *CdrStructuresVersion, err error) { + var values []byte + csv = &CdrStructuresVersion{} + if values, err = rs.db.Cmd("GET", utils.CDR_VERSION_PREFIX+"version").Bytes(); err == nil { + err = rs.ms.Unmarshal(values, &csv) + } + return +} diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 5c8ab5528..7eeb8f166 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -1343,3 +1343,11 @@ func (self *SQLStorage) GetTpAliases(filter *TpAlias) ([]TpAlias, error) { return tpAliases, nil } + +func (self *SQLStorage) SetCdrStructuresVersion(v *CdrStructuresVersion) (err error) { + return utils.ErrNotImplemented +} + +func (self *SQLStorage) GetCdrStructuresVersion() (csv *CdrStructuresVersion, err error) { + return nil, utils.ErrNotImplemented +} diff --git a/engine/version.go b/engine/version.go new file mode 100644 index 000000000..9dc14c93c --- /dev/null +++ b/engine/version.go @@ -0,0 +1,176 @@ +package engine + +import ( + "fmt" + + "github.com/cgrates/cgrates/utils" +) + +func init() { + // get current db version + dbRsv, err := ratingStorage.GetRatingStructuresVersion() + if err != nil { + utils.Logger.Warning(fmt.Sprintf("Could not retrive current version from db: %v", err)) + return + } + // comparing versions + if currentRsv.CompareAndMigrate(dbRsv) { + // write the new values + if err := ratingStorage.SetRatingStructuresVersion(currentRsv); err != nil { + utils.Logger.Warning(fmt.Sprintf("Could not write current version to db: %v", err)) + } + } + dbAsv, err := accountingStorage.GetAccountingStructuresVersion() + if err != nil { + utils.Logger.Warning(fmt.Sprintf("Could not retrive current version from db: %v", err)) + return + } + // comparing versions + if currentAsv.CompareAndMigrate(dbAsv) { + // write the new values + if err := accountingStorage.SetAccountingStructuresVersion(currentAsv); err != nil { + utils.Logger.Warning(fmt.Sprintf("Could not write current version to db: %v", err)) + } + } + dbCsv, err := cdrStorage.GetCdrStructuresVersion() + if err != nil { + utils.Logger.Warning(fmt.Sprintf("Could not retrive current version from db: %v", err)) + return + } + // comparing versions + if currentCsv.CompareAndMigrate(dbCsv) { + // write the new values + if err := cdrStorage.SetCdrStructuresVersion(currentCsv); err != nil { + utils.Logger.Warning(fmt.Sprintf("Could not write current version to db: %v", err)) + } + } +} + +var ( + currentRsv = &RatingStructuresVersion{ + Destinations: "1", + RatingPlans: "1", + RatingProfiles: "1", + Lcrs: "1", + DerivedChargers: "1", + Actions: "1", + ActionPlans: "1", + ActionTriggers: "1", + SharedGroups: "1", + } + + currentAsv = &AccountingStructuresVersion{ + Accounts: "1", + CdrStats: "1", + Users: "1", + Alias: "1", + PubSubs: "1", + LoadHistory: "1", + } + + currentCsv = &CdrStructuresVersion{ + Cdrs: "1", + SMCosts: "1", + } +) + +type RatingStructuresVersion struct { + Destinations string + RatingPlans string + RatingProfiles string + Lcrs string + DerivedChargers string + Actions string + ActionPlans string + ActionTriggers string + SharedGroups string +} + +func (rsv *RatingStructuresVersion) CompareAndMigrate(dbRsv *RatingStructuresVersion) bool { + migrationPerformed := false + if rsv.Destinations != dbRsv.Destinations { + migrationPerformed = true + + } + if rsv.RatingPlans != dbRsv.RatingPlans { + migrationPerformed = true + + } + if rsv.RatingProfiles != dbRsv.RatingPlans { + migrationPerformed = true + + } + if rsv.Lcrs != dbRsv.Lcrs { + migrationPerformed = true + + } + if rsv.DerivedChargers != dbRsv.DerivedChargers { + migrationPerformed = true + + } + if rsv.Actions != dbRsv.Actions { + migrationPerformed = true + + } + if rsv.ActionPlans != dbRsv.ActionPlans { + migrationPerformed = true + + } + if rsv.ActionTriggers != dbRsv.ActionTriggers { + migrationPerformed = true + + } + if rsv.SharedGroups != dbRsv.SharedGroups { + migrationPerformed = true + + } + return migrationPerformed +} + +type AccountingStructuresVersion struct { + Accounts string + CdrStats string + Users string + Alias string + PubSubs string + LoadHistory string +} + +func (asv *AccountingStructuresVersion) CompareAndMigrate(dbAsv *AccountingStructuresVersion) bool { + migrationPerformed := false + if asv.Accounts != dbAsv.Accounts { + migrationPerformed = true + } + if asv.CdrStats != dbAsv.CdrStats { + migrationPerformed = true + } + if asv.Users != dbAsv.Users { + migrationPerformed = true + } + if asv.Alias != dbAsv.Alias { + migrationPerformed = true + } + if asv.PubSubs != dbAsv.PubSubs { + migrationPerformed = true + } + if asv.LoadHistory != dbAsv.LoadHistory { + migrationPerformed = true + } + return migrationPerformed +} + +type CdrStructuresVersion struct { + Cdrs string + SMCosts string +} + +func (csv *CdrStructuresVersion) CompareAndMigrate(dbCsv *CdrStructuresVersion) bool { + migrationPerformed := false + if csv.Cdrs != dbCsv.Cdrs { + migrationPerformed = true + } + if csv.SMCosts != dbCsv.SMCosts { + migrationPerformed = true + } + return migrationPerformed +} diff --git a/utils/consts.go b/utils/consts.go index b83e10808..cc39ffc95 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -197,6 +197,9 @@ const ( LOG_CALL_COST_PREFIX = "cco_" LOG_ACTION_TIMMING_PREFIX = "ltm_" LOG_ACTION_TRIGGER_PREFIX = "ltr_" + RATING_VERSION_PREFIX = "rve_" + ACCOUNTING_VERSION_PREFIX = "ave_" + CDR_VERSION_PREFIX = "cve_" LOG_ERR = "ler_" LOG_CDR = "cdr_" LOG_MEDIATED_CDR = "mcd_" From e6ce7a3bed37a7407fed9f37d92bdcc83d901124 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 25 Apr 2016 18:45:20 +0300 Subject: [PATCH 6/6] only one version structure in the db --- engine/storage_interface.go | 8 +- engine/storage_map.go | 57 +--------- engine/storage_mongo_datadb.go | 50 ++------- engine/storage_redis.go | 45 +------- engine/storage_sql.go | 8 -- engine/version.go | 189 +++++++++++++-------------------- utils/consts.go | 4 +- 7 files changed, 94 insertions(+), 267 deletions(-) diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 35b1236cc..96a4b32c6 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -68,8 +68,6 @@ type RatingStorage interface { GetAllActionPlans() (map[string]*ActionPlan, error) PushTask(*Task) error PopTask() (*Task, error) - GetRatingStructuresVersion() (*RatingStructuresVersion, error) - SetRatingStructuresVersion(*RatingStructuresVersion) error } type AccountingStorage interface { @@ -94,8 +92,8 @@ type AccountingStorage interface { RemoveAlias(string) error GetLoadHistory(int, bool) ([]*LoadInstance, error) AddLoadHistory(*LoadInstance, int) error - GetAccountingStructuresVersion() (*AccountingStructuresVersion, error) - SetAccountingStructuresVersion(*AccountingStructuresVersion) error + GetStructVersion() (*StructVersion, error) + SetStructVersion(*StructVersion) error } type CdrStorage interface { @@ -104,8 +102,6 @@ type CdrStorage interface { SetSMCost(smc *SMCost) error GetSMCosts(cgrid, runid, originHost, originIDPrfx string) ([]*SMCost, error) GetCDRs(*utils.CDRsFilter, bool) ([]*CDR, int64, error) - GetCdrStructuresVersion() (*CdrStructuresVersion, error) - SetCdrStructuresVersion(*CdrStructuresVersion) error } type LogStorage interface { diff --git a/engine/storage_map.go b/engine/storage_map.go index 72e436497..11afce2c7 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -925,7 +925,7 @@ func (ms *MapStorage) LogActionTiming(source string, at *ActionTiming, as Action return } -func (ms *MapStorage) SetRatingStructuresVersion(v *RatingStructuresVersion) (err error) { +func (ms *MapStorage) SetStructVersion(v *StructVersion) (err error) { ms.mu.Lock() defer ms.mu.Unlock() var result []byte @@ -933,65 +933,18 @@ func (ms *MapStorage) SetRatingStructuresVersion(v *RatingStructuresVersion) (er if err != nil { return } - ms.dict[utils.RATING_VERSION_PREFIX+"version"] = result + ms.dict[utils.VERSION_PREFIX+"struct"] = result return } -func (ms *MapStorage) GetRatingStructuresVersion() (rsv *RatingStructuresVersion, err error) { +func (ms *MapStorage) GetStructVersion() (rsv *StructVersion, err error) { ms.mu.RLock() defer ms.mu.RUnlock() - rsv = &RatingStructuresVersion{} - if values, ok := ms.dict[utils.RATING_VERSION_PREFIX+"version"]; ok { + rsv = &StructVersion{} + if values, ok := ms.dict[utils.VERSION_PREFIX+"struct"]; ok { err = ms.ms.Unmarshal(values, &rsv) } else { return nil, utils.ErrNotFound } return } - -func (ms *MapStorage) SetAccountingStructuresVersion(v *AccountingStructuresVersion) (err error) { - ms.mu.Lock() - defer ms.mu.Unlock() - var result []byte - result, err = ms.ms.Marshal(v) - if err != nil { - return - } - ms.dict[utils.ACCOUNTING_VERSION_PREFIX+"version"] = result - return -} - -func (ms *MapStorage) GetAccountingStructuresVersion() (asv *AccountingStructuresVersion, err error) { - ms.mu.RLock() - defer ms.mu.RUnlock() - asv = &AccountingStructuresVersion{} - if values, ok := ms.dict[utils.ACCOUNTING_VERSION_PREFIX+"version"]; ok { - err = ms.ms.Unmarshal(values, &asv) - } else { - return nil, utils.ErrNotFound - } - return -} -func (ms *MapStorage) SetCdrStructuresVersion(v *CdrStructuresVersion) (err error) { - ms.mu.Lock() - defer ms.mu.Unlock() - var result []byte - result, err = ms.ms.Marshal(v) - if err != nil { - return - } - ms.dict[utils.CDR_VERSION_PREFIX+"version"] = result - return -} - -func (ms *MapStorage) GetCdrStructuresVersion() (csv *CdrStructuresVersion, err error) { - ms.mu.RLock() - defer ms.mu.RUnlock() - csv = &CdrStructuresVersion{} - if values, ok := ms.dict[utils.CDR_VERSION_PREFIX+"version"]; ok { - err = ms.ms.Unmarshal(values, &csv) - } else { - return nil, utils.ErrNotFound - } - return -} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 2201588e9..36656fd70 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -53,7 +53,7 @@ const ( colLogAtr = "action_trigger_logs" colLogApl = "action_plan_logs" colLogErr = "error_logs" - colVer = "versions" + colVer = "version" ) var ( @@ -1365,53 +1365,19 @@ func (ms *MongoStorage) GetAllCdrStats() (css []*CdrStats, err error) { return } -func (ms *MongoStorage) SetRatingStructuresVersion(v *RatingStructuresVersion) (err error) { - _, err = ms.db.C(colVer).Upsert(bson.M{"key": utils.RATING_VERSION_PREFIX + "version"}, &struct { +func (ms *MongoStorage) SetStructVersion(v *StructVersion) (err error) { + _, err = ms.db.C(colVer).Upsert(bson.M{"key": utils.VERSION_PREFIX + "struct"}, &struct { Key string - Value *RatingStructuresVersion - }{utils.RATING_VERSION_PREFIX + "version", v}) + Value *StructVersion + }{utils.VERSION_PREFIX + "struct", v}) return } -func (ms *MongoStorage) GetRatingStructuresVersion() (rsv *RatingStructuresVersion, err error) { - rsv = new(RatingStructuresVersion) - err = ms.db.C(colVer).Find(bson.M{"key": utils.RATING_VERSION_PREFIX + "version"}).One(rsv) +func (ms *MongoStorage) GetStructVersion() (rsv *StructVersion, err error) { + rsv = new(StructVersion) + err = ms.db.C(colVer).Find(bson.M{"key": utils.VERSION_PREFIX + "struct"}).One(rsv) if err == mgo.ErrNotFound { rsv = nil } return } - -func (ms *MongoStorage) SetAccountingStructuresVersion(v *AccountingStructuresVersion) (err error) { - _, err = ms.db.C(colVer).Upsert(bson.M{"key": utils.ACCOUNTING_VERSION_PREFIX + "version"}, &struct { - Key string - Value *AccountingStructuresVersion - }{utils.ACCOUNTING_VERSION_PREFIX + "version", v}) - return -} - -func (ms *MongoStorage) GetAccountingStructuresVersion() (asv *AccountingStructuresVersion, err error) { - asv = new(AccountingStructuresVersion) - err = ms.db.C(colVer).Find(bson.M{"key": utils.ACCOUNTING_VERSION_PREFIX + "version"}).One(asv) - if err == mgo.ErrNotFound { - asv = nil - } - return -} - -func (ms *MongoStorage) SetCdrStructuresVersion(v *CdrStructuresVersion) (err error) { - _, err = ms.db.C(colVer).Upsert(bson.M{"key": utils.CDR_VERSION_PREFIX + "version"}, &struct { - Key string - Value *CdrStructuresVersion - }{utils.CDR_VERSION_PREFIX + "version", v}) - return -} - -func (ms *MongoStorage) GetCdrStructuresVersion() (csv *CdrStructuresVersion, err error) { - csv = new(CdrStructuresVersion) - err = ms.db.C(colVer).Find(bson.M{"key": utils.CDR_VERSION_PREFIX + "version"}).One(csv) - if err == mgo.ErrNotFound { - csv = nil - } - return -} diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 5c69570df..b4aef0fb7 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1094,55 +1094,20 @@ func (rs *RedisStorage) LogActionTiming(source string, at *ActionTiming, as Acti return rs.db.Cmd("SET", utils.LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v", string(mat), string(mas)))).Err } -func (rs *RedisStorage) SetRatingStructuresVersion(v *RatingStructuresVersion) (err error) { +func (rs *RedisStorage) SetStructVersion(v *StructVersion) (err error) { var result []byte result, err = rs.ms.Marshal(v) if err != nil { return } - return rs.db.Cmd("SET", utils.RATING_VERSION_PREFIX+"version", result).Err + return rs.db.Cmd("SET", utils.VERSION_PREFIX+"struct", result).Err } -func (rs *RedisStorage) GetRatingStructuresVersion() (rsv *RatingStructuresVersion, err error) { +func (rs *RedisStorage) GetStructVersion() (rsv *StructVersion, err error) { var values []byte - rsv = &RatingStructuresVersion{} - if values, err = rs.db.Cmd("GET", utils.RATING_VERSION_PREFIX+"version").Bytes(); err == nil { + rsv = &StructVersion{} + if values, err = rs.db.Cmd("GET", utils.VERSION_PREFIX+"struct").Bytes(); err == nil { err = rs.ms.Unmarshal(values, &rsv) } return } - -func (rs *RedisStorage) SetAccountingStructuresVersion(v *AccountingStructuresVersion) (err error) { - var result []byte - result, err = rs.ms.Marshal(v) - if err != nil { - return - } - return rs.db.Cmd("SET", utils.ACCOUNTING_VERSION_PREFIX+"version", result).Err -} - -func (rs *RedisStorage) GetAccountingStructuresVersion() (asv *AccountingStructuresVersion, err error) { - var values []byte - asv = &AccountingStructuresVersion{} - if values, err = rs.db.Cmd("GET", utils.ACCOUNTING_VERSION_PREFIX+"version").Bytes(); err == nil { - err = rs.ms.Unmarshal(values, &asv) - } - return -} -func (rs *RedisStorage) SetCdrStructuresVersion(v *CdrStructuresVersion) (err error) { - var result []byte - result, err = rs.ms.Marshal(v) - if err != nil { - return - } - return rs.db.Cmd("SET", utils.CDR_VERSION_PREFIX+"version", result).Err -} - -func (rs *RedisStorage) GetCdrStructuresVersion() (csv *CdrStructuresVersion, err error) { - var values []byte - csv = &CdrStructuresVersion{} - if values, err = rs.db.Cmd("GET", utils.CDR_VERSION_PREFIX+"version").Bytes(); err == nil { - err = rs.ms.Unmarshal(values, &csv) - } - return -} diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 7eeb8f166..5c8ab5528 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -1343,11 +1343,3 @@ func (self *SQLStorage) GetTpAliases(filter *TpAlias) ([]TpAlias, error) { return tpAliases, nil } - -func (self *SQLStorage) SetCdrStructuresVersion(v *CdrStructuresVersion) (err error) { - return utils.ErrNotImplemented -} - -func (self *SQLStorage) GetCdrStructuresVersion() (csv *CdrStructuresVersion, err error) { - return nil, utils.ErrNotImplemented -} diff --git a/engine/version.go b/engine/version.go index 9dc14c93c..047036261 100644 --- a/engine/version.go +++ b/engine/version.go @@ -8,46 +8,22 @@ import ( func init() { // get current db version - dbRsv, err := ratingStorage.GetRatingStructuresVersion() + dbVersion, err := accountingStorage.GetStructVersion() if err != nil { utils.Logger.Warning(fmt.Sprintf("Could not retrive current version from db: %v", err)) return } // comparing versions - if currentRsv.CompareAndMigrate(dbRsv) { + if currentVersion.CompareAndMigrate(dbVersion) { // write the new values - if err := ratingStorage.SetRatingStructuresVersion(currentRsv); err != nil { - utils.Logger.Warning(fmt.Sprintf("Could not write current version to db: %v", err)) - } - } - dbAsv, err := accountingStorage.GetAccountingStructuresVersion() - if err != nil { - utils.Logger.Warning(fmt.Sprintf("Could not retrive current version from db: %v", err)) - return - } - // comparing versions - if currentAsv.CompareAndMigrate(dbAsv) { - // write the new values - if err := accountingStorage.SetAccountingStructuresVersion(currentAsv); err != nil { - utils.Logger.Warning(fmt.Sprintf("Could not write current version to db: %v", err)) - } - } - dbCsv, err := cdrStorage.GetCdrStructuresVersion() - if err != nil { - utils.Logger.Warning(fmt.Sprintf("Could not retrive current version from db: %v", err)) - return - } - // comparing versions - if currentCsv.CompareAndMigrate(dbCsv) { - // write the new values - if err := cdrStorage.SetCdrStructuresVersion(currentCsv); err != nil { + if err := accountingStorage.SetStructVersion(currentVersion); err != nil { utils.Logger.Warning(fmt.Sprintf("Could not write current version to db: %v", err)) } } } var ( - currentRsv = &RatingStructuresVersion{ + currentVersion = &StructVersion{ Destinations: "1", RatingPlans: "1", RatingProfiles: "1", @@ -57,24 +33,19 @@ var ( ActionPlans: "1", ActionTriggers: "1", SharedGroups: "1", - } - - currentAsv = &AccountingStructuresVersion{ - Accounts: "1", - CdrStats: "1", - Users: "1", - Alias: "1", - PubSubs: "1", - LoadHistory: "1", - } - - currentCsv = &CdrStructuresVersion{ - Cdrs: "1", - SMCosts: "1", + Accounts: "1", + CdrStats: "1", + Users: "1", + Alias: "1", + PubSubs: "1", + LoadHistory: "1", + Cdrs: "1", + SMCosts: "1", } ) -type RatingStructuresVersion struct { +type StructVersion struct { + // rating Destinations string RatingPlans string RatingProfiles string @@ -84,92 +55,78 @@ type RatingStructuresVersion struct { ActionPlans string ActionTriggers string SharedGroups string -} - -func (rsv *RatingStructuresVersion) CompareAndMigrate(dbRsv *RatingStructuresVersion) bool { - migrationPerformed := false - if rsv.Destinations != dbRsv.Destinations { - migrationPerformed = true - - } - if rsv.RatingPlans != dbRsv.RatingPlans { - migrationPerformed = true - - } - if rsv.RatingProfiles != dbRsv.RatingPlans { - migrationPerformed = true - - } - if rsv.Lcrs != dbRsv.Lcrs { - migrationPerformed = true - - } - if rsv.DerivedChargers != dbRsv.DerivedChargers { - migrationPerformed = true - - } - if rsv.Actions != dbRsv.Actions { - migrationPerformed = true - - } - if rsv.ActionPlans != dbRsv.ActionPlans { - migrationPerformed = true - - } - if rsv.ActionTriggers != dbRsv.ActionTriggers { - migrationPerformed = true - - } - if rsv.SharedGroups != dbRsv.SharedGroups { - migrationPerformed = true - - } - return migrationPerformed -} - -type AccountingStructuresVersion struct { + // accounting Accounts string CdrStats string Users string Alias string PubSubs string LoadHistory string -} - -func (asv *AccountingStructuresVersion) CompareAndMigrate(dbAsv *AccountingStructuresVersion) bool { - migrationPerformed := false - if asv.Accounts != dbAsv.Accounts { - migrationPerformed = true - } - if asv.CdrStats != dbAsv.CdrStats { - migrationPerformed = true - } - if asv.Users != dbAsv.Users { - migrationPerformed = true - } - if asv.Alias != dbAsv.Alias { - migrationPerformed = true - } - if asv.PubSubs != dbAsv.PubSubs { - migrationPerformed = true - } - if asv.LoadHistory != dbAsv.LoadHistory { - migrationPerformed = true - } - return migrationPerformed -} - -type CdrStructuresVersion struct { + // cdr Cdrs string SMCosts string } -func (csv *CdrStructuresVersion) CompareAndMigrate(dbCsv *CdrStructuresVersion) bool { +func (sv *StructVersion) CompareAndMigrate(dbVer *StructVersion) bool { migrationPerformed := false - if csv.Cdrs != dbCsv.Cdrs { + if sv.Destinations != dbVer.Destinations { + migrationPerformed = true + + } + if sv.RatingPlans != dbVer.RatingPlans { + migrationPerformed = true + + } + if sv.RatingProfiles != dbVer.RatingPlans { + migrationPerformed = true + + } + if sv.Lcrs != dbVer.Lcrs { + migrationPerformed = true + + } + if sv.DerivedChargers != dbVer.DerivedChargers { + migrationPerformed = true + + } + if sv.Actions != dbVer.Actions { + migrationPerformed = true + + } + if sv.ActionPlans != dbVer.ActionPlans { + migrationPerformed = true + + } + if sv.ActionTriggers != dbVer.ActionTriggers { + migrationPerformed = true + + } + if sv.SharedGroups != dbVer.SharedGroups { + migrationPerformed = true + + } + if sv.Accounts != dbVer.Accounts { migrationPerformed = true } - if csv.SMCosts != dbCsv.SMCosts { + if sv.CdrStats != dbVer.CdrStats { + migrationPerformed = true + } + if sv.Users != dbVer.Users { + migrationPerformed = true + } + if sv.Alias != dbVer.Alias { + migrationPerformed = true + } + if sv.PubSubs != dbVer.PubSubs { + migrationPerformed = true + } + if sv.LoadHistory != dbVer.LoadHistory { + migrationPerformed = true + } + if sv.Cdrs != dbVer.Cdrs { + migrationPerformed = true + } + if sv.SMCosts != dbVer.SMCosts { migrationPerformed = true } return migrationPerformed diff --git a/utils/consts.go b/utils/consts.go index cc39ffc95..290824d53 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -197,9 +197,7 @@ const ( LOG_CALL_COST_PREFIX = "cco_" LOG_ACTION_TIMMING_PREFIX = "ltm_" LOG_ACTION_TRIGGER_PREFIX = "ltr_" - RATING_VERSION_PREFIX = "rve_" - ACCOUNTING_VERSION_PREFIX = "ave_" - CDR_VERSION_PREFIX = "cve_" + VERSION_PREFIX = "ver_" LOG_ERR = "ler_" LOG_CDR = "cdr_" LOG_MEDIATED_CDR = "mcd_"