fix for users csv load

This commit is contained in:
Radu Ioan Fericean
2015-07-31 16:44:25 +03:00
parent aa3730d0cf
commit a09e24d5da
4 changed files with 74 additions and 30 deletions

View File

@@ -483,18 +483,12 @@ func main() {
server.RpcRegisterName("ScribeV1", scribeServer)
}
if cfg.UserServerEnabled {
userServer, err = engine.NewUserMap(accountDb)
userServer, err = engine.NewUserMap(accountDb, cfg.UserServerIndexes)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<UsersService> Could not start, error: %s", err.Error()))
exitChan <- true
}
server.RpcRegisterName("UsersV1", userServer)
if len(cfg.UserServerIndexes) != 0 {
var s string
if err := userServer.AddIndex(cfg.UserServerIndexes, &s); err != nil {
engine.Logger.Err(fmt.Sprintf("Error adding %v indexes to user profile service: %v", cfg.UserServerIndexes, err))
}
}
}
// Register session manager service // FixMe: make sure this is thread safe

View File

@@ -70,6 +70,7 @@ var (
historyServer = flag.String("history_server", cgrConfig.RPCGOBListen, "The history server address:port, empty to disable automaticautomatic history archiving")
raterAddress = flag.String("rater_address", cgrConfig.RPCGOBListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads")
cdrstatsAddress = flag.String("cdrstats_address", cgrConfig.RPCGOBListen, "CDRStats service to contact for data reloads, empty to disable automatic data reloads")
usersAddress = flag.String("users_address", cgrConfig.RPCGOBListen, "Users service to contact for data reloads, empty to disable automatic data reloads")
runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields")
)
@@ -83,7 +84,7 @@ func main() {
var ratingDb engine.RatingStorage
var accountDb engine.AccountingStorage
var storDb engine.LoadStorage
var rater, cdrstats *rpc.Client
var rater, cdrstats, users *rpc.Client
var loader engine.LoadReader
// Init necessary db connections, only if not already
if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb
@@ -206,6 +207,19 @@ func main() {
} else {
log.Print("WARNING: CDRStats automatic data reload is disabled!")
}
if *usersAddress != "" { // Init connection to rater so we can reload it's data
if *usersAddress == *raterAddress {
users = rater
} else {
users, err = rpc.Dial("tcp", *usersAddress)
if err != nil {
log.Fatalf("Could not connect to Users API: %s", err.Error())
return
}
}
} else {
log.Print("WARNING: Users automatic data reload is disabled!")
}
// write maps to database
if err := tpReader.WriteToDatabase(*flush, *verbose); err != nil {
@@ -272,4 +286,14 @@ func main() {
}
}
}
if users != nil {
if *verbose {
log.Print("Reloading Users data")
}
var reply string
if err := cdrstats.Call("UsersV1.ReloadUsers", "", &reply); err != nil {
log.Printf("WARNING: Failed reloading users data, error: %s\n", err.Error())
}
}
}

View File

@@ -1,6 +1,7 @@
package engine
import (
"fmt"
"sort"
"strings"
"sync"
@@ -55,6 +56,7 @@ type UserService interface {
GetUsers(UserProfile, *UserProfiles) error
AddIndex([]string, *string) error
GetIndexes(string, *map[string][]string) error
ReloadUsers(string, *string) error
}
type UserMap struct {
@@ -65,25 +67,45 @@ type UserMap struct {
mu sync.RWMutex
}
func NewUserMap(accountingDb AccountingStorage) (*UserMap, error) {
um := newUserMap(accountingDb)
func NewUserMap(accountingDb AccountingStorage, indexes []string) (*UserMap, error) {
um := newUserMap(accountingDb, indexes)
var reply string
if err := um.ReloadUsers("", &reply); err != nil {
return nil, err
}
return um, nil
}
func newUserMap(accountingDb AccountingStorage, indexes []string) *UserMap {
return &UserMap{
table: make(map[string]map[string]string),
index: make(map[string]map[string]bool),
indexKeys: indexes,
accountingDb: accountingDb,
}
}
func (um *UserMap) ReloadUsers(in string, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
// load from rating db
if ups, err := um.accountingDb.GetUsers(); err == nil {
for _, up := range ups {
um.table[up.GetId()] = up.Profile
}
} else {
return nil, err
*reply = err.Error()
return err
}
return um, nil
}
func newUserMap(accountingDb AccountingStorage) *UserMap {
return &UserMap{
table: make(map[string]map[string]string),
index: make(map[string]map[string]bool),
accountingDb: accountingDb,
if len(um.indexKeys) != 0 {
var s string
if err := um.AddIndex(um.indexKeys, &s); err != nil {
Logger.Err(fmt.Sprintf("Error adding %v indexes to user profile service: %v", um.indexKeys, err))
}
}
*reply = utils.OK
return nil
}
func (um *UserMap) SetUser(up UserProfile, reply *string) error {
@@ -94,7 +116,7 @@ func (um *UserMap) SetUser(up UserProfile, reply *string) error {
return err
}
um.table[up.GetId()] = up.Profile
um.addIndex(&up)
um.addIndex(&up, um.indexKeys)
*reply = utils.OK
return nil
}
@@ -146,7 +168,7 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error {
}
um.table[up.GetId()] = m
um.deleteIndex(oldUp)
um.addIndex(finalUp)
um.addIndex(finalUp, um.indexKeys)
*reply = utils.OK
return nil
}
@@ -235,19 +257,19 @@ func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error {
func (um *UserMap) AddIndex(indexes []string, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
um.indexKeys = indexes
um.indexKeys = append(um.indexKeys, indexes...)
for key, values := range um.table {
up := &UserProfile{Profile: values}
up.SetId(key)
um.addIndex(up)
um.addIndex(up, indexes)
}
*reply = utils.OK
return nil
}
func (um *UserMap) addIndex(up *UserProfile) {
func (um *UserMap) addIndex(up *UserProfile, indexes []string) {
key := up.GetId()
for _, index := range um.indexKeys {
for _, index := range indexes {
if index == "Tenant" {
if up.Tenant != "" {
indexKey := utils.ConcatenatedKey(index, up.Tenant)
@@ -369,6 +391,10 @@ func (ps *ProxyUserService) GetIndexes(in string, reply *map[string][]string) er
return ps.Client.Call("UsersV1.AddIndex", in, reply)
}
func (ps *ProxyUserService) ReloadUsers(in string, reply *string) error {
return ps.Client.Call("UsersV1.ReloadUsers", in, reply)
}
// extraFields - Field name in the interface containing extraFields information
func LoadUserProfile(in interface{}, extraFields string) (interface{}, error) {
if userService == nil { // no user service => no fun

View File

@@ -19,7 +19,7 @@ var testMap = UserMap{
}
func TestUsersAdd(t *testing.T) {
tm := newUserMap(accountingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -40,7 +40,7 @@ func TestUsersAdd(t *testing.T) {
}
func TestUsersUpdate(t *testing.T) {
tm := newUserMap(accountingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -71,7 +71,7 @@ func TestUsersUpdate(t *testing.T) {
}
func TestUsersUpdateNotFound(t *testing.T) {
tm := newUserMap(accountingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -89,7 +89,7 @@ func TestUsersUpdateNotFound(t *testing.T) {
}
func TestUsersUpdateInit(t *testing.T) {
tm := newUserMap(accountingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -115,7 +115,7 @@ func TestUsersUpdateInit(t *testing.T) {
}
func TestUsersRemove(t *testing.T) {
tm := newUserMap(accountingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
Tenant: "test",
@@ -445,7 +445,7 @@ func TestUsersGetMissingIdTwoINdex(t *testing.T) {
}
func TestUsersAddUpdateRemoveIndexes(t *testing.T) {
tm := newUserMap(accountingStorage)
tm := newUserMap(accountingStorage, nil)
var r string
tm.AddIndex([]string{"t"}, &r)
if len(tm.index) != 0 {