diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 1acc1077c..e2a55c9e5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -483,18 +483,12 @@ func main() { server.RpcRegisterName("ScribeV1", scribeServer) } if cfg.UserServerEnabled { - userServer, err = engine.NewUserMap(accountDb) + userServer, err = engine.NewUserMap(accountDb, cfg.UserServerIndexes) if err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) exitChan <- true } server.RpcRegisterName("UsersV1", userServer) - if len(cfg.UserServerIndexes) != 0 { - var s string - if err := userServer.AddIndex(cfg.UserServerIndexes, &s); err != nil { - engine.Logger.Err(fmt.Sprintf("Error adding %v indexes to user profile service: %v", cfg.UserServerIndexes, err)) - } - } } // Register session manager service // FixMe: make sure this is thread safe diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 268bdbbf9..0aa4f8cd6 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -70,6 +70,7 @@ var ( historyServer = flag.String("history_server", cgrConfig.RPCGOBListen, "The history server address:port, empty to disable automaticautomatic history archiving") raterAddress = flag.String("rater_address", cgrConfig.RPCGOBListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") cdrstatsAddress = flag.String("cdrstats_address", cgrConfig.RPCGOBListen, "CDRStats service to contact for data reloads, empty to disable automatic data reloads") + usersAddress = flag.String("users_address", cgrConfig.RPCGOBListen, "Users service to contact for data reloads, empty to disable automatic data reloads") runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields") ) @@ -83,7 +84,7 @@ func main() { var ratingDb engine.RatingStorage var accountDb engine.AccountingStorage var storDb engine.LoadStorage - var rater, cdrstats *rpc.Client + var rater, cdrstats, users *rpc.Client var loader engine.LoadReader // Init necessary db connections, only if not already if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb @@ -206,6 +207,19 @@ func main() { } else { log.Print("WARNING: CDRStats automatic data reload is disabled!") } + if *usersAddress != "" { // Init connection to rater so we can reload it's data + if *usersAddress == *raterAddress { + users = rater + } else { + users, err = rpc.Dial("tcp", *usersAddress) + if err != nil { + log.Fatalf("Could not connect to Users API: %s", err.Error()) + return + } + } + } else { + log.Print("WARNING: Users automatic data reload is disabled!") + } // write maps to database if err := tpReader.WriteToDatabase(*flush, *verbose); err != nil { @@ -272,4 +286,14 @@ func main() { } } } + + if users != nil { + if *verbose { + log.Print("Reloading Users data") + } + var reply string + if err := cdrstats.Call("UsersV1.ReloadUsers", "", &reply); err != nil { + log.Printf("WARNING: Failed reloading users data, error: %s\n", err.Error()) + } + } } diff --git a/engine/users.go b/engine/users.go index 4c06d3824..3c3cc22ec 100644 --- a/engine/users.go +++ b/engine/users.go @@ -1,6 +1,7 @@ package engine import ( + "fmt" "sort" "strings" "sync" @@ -55,6 +56,7 @@ type UserService interface { GetUsers(UserProfile, *UserProfiles) error AddIndex([]string, *string) error GetIndexes(string, *map[string][]string) error + ReloadUsers(string, *string) error } type UserMap struct { @@ -65,25 +67,45 @@ type UserMap struct { mu sync.RWMutex } -func NewUserMap(accountingDb AccountingStorage) (*UserMap, error) { - um := newUserMap(accountingDb) +func NewUserMap(accountingDb AccountingStorage, indexes []string) (*UserMap, error) { + um := newUserMap(accountingDb, indexes) + var reply string + if err := um.ReloadUsers("", &reply); err != nil { + return nil, err + } + return um, nil +} + +func newUserMap(accountingDb AccountingStorage, indexes []string) *UserMap { + return &UserMap{ + table: make(map[string]map[string]string), + index: make(map[string]map[string]bool), + indexKeys: indexes, + accountingDb: accountingDb, + } +} + +func (um *UserMap) ReloadUsers(in string, reply *string) error { + um.mu.Lock() + defer um.mu.Unlock() // load from rating db if ups, err := um.accountingDb.GetUsers(); err == nil { for _, up := range ups { um.table[up.GetId()] = up.Profile } } else { - return nil, err + *reply = err.Error() + return err } - return um, nil -} -func newUserMap(accountingDb AccountingStorage) *UserMap { - return &UserMap{ - table: make(map[string]map[string]string), - index: make(map[string]map[string]bool), - accountingDb: accountingDb, + if len(um.indexKeys) != 0 { + var s string + if err := um.AddIndex(um.indexKeys, &s); err != nil { + Logger.Err(fmt.Sprintf("Error adding %v indexes to user profile service: %v", um.indexKeys, err)) + } } + *reply = utils.OK + return nil } func (um *UserMap) SetUser(up UserProfile, reply *string) error { @@ -94,7 +116,7 @@ func (um *UserMap) SetUser(up UserProfile, reply *string) error { return err } um.table[up.GetId()] = up.Profile - um.addIndex(&up) + um.addIndex(&up, um.indexKeys) *reply = utils.OK return nil } @@ -146,7 +168,7 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error { } um.table[up.GetId()] = m um.deleteIndex(oldUp) - um.addIndex(finalUp) + um.addIndex(finalUp, um.indexKeys) *reply = utils.OK return nil } @@ -235,19 +257,19 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error { func (um *UserMap) AddIndex(indexes []string, reply *string) error { um.mu.Lock() defer um.mu.Unlock() - um.indexKeys = indexes + um.indexKeys = append(um.indexKeys, indexes...) for key, values := range um.table { up := &UserProfile{Profile: values} up.SetId(key) - um.addIndex(up) + um.addIndex(up, indexes) } *reply = utils.OK return nil } -func (um *UserMap) addIndex(up *UserProfile) { +func (um *UserMap) addIndex(up *UserProfile, indexes []string) { key := up.GetId() - for _, index := range um.indexKeys { + for _, index := range indexes { if index == "Tenant" { if up.Tenant != "" { indexKey := utils.ConcatenatedKey(index, up.Tenant) @@ -369,6 +391,10 @@ func (ps *ProxyUserService) GetIndexes(in string, reply *map[string][]string) er return ps.Client.Call("UsersV1.AddIndex", in, reply) } +func (ps *ProxyUserService) ReloadUsers(in string, reply *string) error { + return ps.Client.Call("UsersV1.ReloadUsers", in, reply) +} + // extraFields - Field name in the interface containing extraFields information func LoadUserProfile(in interface{}, extraFields string) (interface{}, error) { if userService == nil { // no user service => no fun diff --git a/engine/users_test.go b/engine/users_test.go index 2cfcc4f11..42fd3d549 100644 --- a/engine/users_test.go +++ b/engine/users_test.go @@ -19,7 +19,7 @@ var testMap = UserMap{ } func TestUsersAdd(t *testing.T) { - tm := newUserMap(accountingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -40,7 +40,7 @@ func TestUsersAdd(t *testing.T) { } func TestUsersUpdate(t *testing.T) { - tm := newUserMap(accountingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -71,7 +71,7 @@ func TestUsersUpdate(t *testing.T) { } func TestUsersUpdateNotFound(t *testing.T) { - tm := newUserMap(accountingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -89,7 +89,7 @@ func TestUsersUpdateNotFound(t *testing.T) { } func TestUsersUpdateInit(t *testing.T) { - tm := newUserMap(accountingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -115,7 +115,7 @@ func TestUsersUpdateInit(t *testing.T) { } func TestUsersRemove(t *testing.T) { - tm := newUserMap(accountingStorage) + tm := newUserMap(accountingStorage, nil) var r string up := UserProfile{ Tenant: "test", @@ -445,7 +445,7 @@ func TestUsersGetMissingIdTwoINdex(t *testing.T) { } func TestUsersAddUpdateRemoveIndexes(t *testing.T) { - tm := newUserMap(accountingStorage) + tm := newUserMap(accountingStorage, nil) var r string tm.AddIndex([]string{"t"}, &r) if len(tm.index) != 0 {