removed all proxy classes

This commit is contained in:
Radu Ioan Fericean
2015-11-26 23:02:03 +02:00
parent f68dc3fcfd
commit 16e0eba5bd
24 changed files with 340 additions and 393 deletions

View File

@@ -20,6 +20,7 @@ package v1
import (
"errors"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -48,8 +49,8 @@ func (self *ApierV1) AddRatingSubjectAliases(attrs AttrAddRatingSubjectAliases,
}
var ignr string
for _, alias := range attrs.Aliases {
if err := aliases.SetAlias(
engine.Alias{Direction: utils.META_OUT, Tenant: attrs.Tenant, Category: attrs.Category, Account: alias, Subject: alias, Context: utils.ALIAS_CONTEXT_RATING,
if err := aliases.Call("AliasesV1.SetAlias",
&engine.Alias{Direction: utils.META_OUT, Tenant: attrs.Tenant, Category: attrs.Category, Account: alias, Subject: alias, Context: utils.ALIAS_CONTEXT_RATING,
Values: engine.AliasValues{&engine.AliasValue{DestinationId: utils.META_ANY,
Pairs: engine.AliasPairs{"Subject": map[string]string{alias: attrs.Subject}}, Weight: 10.0}}}, &ignr); err != nil {
return utils.NewErrServerError(err)
@@ -69,7 +70,7 @@ func (self *ApierV1) RemRatingSubjectAliases(tenantRatingSubject engine.TenantRa
return errors.New("ALIASES_NOT_ENABLED")
}
var reverseAliases map[string][]*engine.Alias
if err := aliases.GetReverseAlias(engine.AttrReverseAlias{Target: "Subject", Alias: tenantRatingSubject.Subject, Context: utils.ALIAS_CONTEXT_RATING}, &reverseAliases); err != nil {
if err := aliases.Call("AliasesV1.GetReverseAlias", &engine.AttrReverseAlias{Target: "Subject", Alias: tenantRatingSubject.Subject, Context: utils.ALIAS_CONTEXT_RATING}, &reverseAliases); err != nil {
return utils.NewErrServerError(err)
}
var ignr string
@@ -78,7 +79,7 @@ func (self *ApierV1) RemRatingSubjectAliases(tenantRatingSubject engine.TenantRa
if alias.Tenant != tenantRatingSubject.Tenant {
continue // From another tenant
}
if err := aliases.RemoveAlias(*alias, &ignr); err != nil {
if err := aliases.Call("AliasesV1.RemoveAlias", alias, &ignr); err != nil {
return utils.NewErrServerError(err)
}
}
@@ -100,8 +101,8 @@ func (self *ApierV1) AddAccountAliases(attrs AttrAddAccountAliases, reply *strin
}
var ignr string
for _, alias := range attrs.Aliases {
if err := aliases.SetAlias(
engine.Alias{Direction: utils.META_OUT, Tenant: attrs.Tenant, Category: attrs.Category, Account: alias, Subject: alias, Context: utils.ALIAS_CONTEXT_RATING,
if err := aliases.Call("AliasesV1.SetAlias",
&engine.Alias{Direction: utils.META_OUT, Tenant: attrs.Tenant, Category: attrs.Category, Account: alias, Subject: alias, Context: utils.ALIAS_CONTEXT_RATING,
Values: engine.AliasValues{&engine.AliasValue{DestinationId: utils.META_ANY,
Pairs: engine.AliasPairs{"Account": map[string]string{alias: attrs.Account}}, Weight: 10.0}}}, &ignr); err != nil {
return utils.NewErrServerError(err)
@@ -121,7 +122,7 @@ func (self *ApierV1) RemAccountAliases(tenantAccount engine.TenantAccount, reply
return errors.New("ALIASES_NOT_ENABLED")
}
var reverseAliases map[string][]*engine.Alias
if err := aliases.GetReverseAlias(engine.AttrReverseAlias{Target: "Account", Alias: tenantAccount.Account, Context: utils.ALIAS_CONTEXT_RATING}, &reverseAliases); err != nil {
if err := aliases.Call("AliasesV1.GetReverseAlias", &engine.AttrReverseAlias{Target: "Account", Alias: tenantAccount.Account, Context: utils.ALIAS_CONTEXT_RATING}, &reverseAliases); err != nil {
return utils.NewErrServerError(err)
}
var ignr string
@@ -130,7 +131,7 @@ func (self *ApierV1) RemAccountAliases(tenantAccount engine.TenantAccount, reply
if alias.Tenant != tenantAccount.Tenant {
continue // From another tenant
}
if err := aliases.RemoveAlias(*alias, &ignr); err != nil {
if err := aliases.Call("AliasesV1.RemoveAlias", alias, &ignr); err != nil {
return utils.NewErrServerError(err)
}
}

View File

@@ -33,6 +33,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
const (
@@ -48,8 +49,8 @@ type ApierV1 struct {
Sched *scheduler.Scheduler
Config *config.CGRConfig
Responder *engine.Responder
CdrStatsSrv engine.StatsInterface
Users engine.UserService
CdrStatsSrv rpcclient.RpcClientConnection
Users rpcclient.RpcClientConnection
}
func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error {
@@ -524,13 +525,13 @@ func (self *ApierV1) LoadTariffPlanFromStorDb(attrs AttrLoadTpFromStorDb, reply
}
if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
if err := self.CdrStatsSrv.ReloadQueues(cstKeys, nil); err != nil {
if err := self.CdrStatsSrv.Call("CDRStatsV1.ReloadQueues", cstKeys, nil); err != nil {
return err
}
}
if len(userKeys) != 0 && self.Users != nil {
var r string
if err := self.Users.ReloadUsers("", &r); err != nil {
if err := self.Users.Call("AliasV1.ReloadUsers", "", &r); err != nil {
return err
}
}
@@ -1036,14 +1037,14 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach
cs.Aliases = cache2go.CountEntries(utils.ALIASES_PREFIX)
if self.CdrStatsSrv != nil && self.Config.CDRStatsEnabled {
var queueIds []string
if err := self.CdrStatsSrv.GetQueueIds(0, &queueIds); err != nil {
if err := self.CdrStatsSrv.Call("CDRStatsV1.GetQueueIds", 0, &queueIds); err != nil {
return utils.NewErrServerError(err)
}
cs.CdrStats = len(queueIds)
}
if self.Config.RaterUserServer == utils.INTERNAL {
var ups engine.UserProfiles
if err := self.Users.GetUsers(engine.UserProfile{}, &ups); err != nil {
if err := self.Users.Call("UsersV1.GetUsers", &engine.UserProfile{}, &ups); err != nil {
return utils.NewErrServerError(err)
}
cs.Users = len(ups)
@@ -1187,13 +1188,13 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
self.Sched.Restart()
}
if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
if err := self.CdrStatsSrv.ReloadQueues(cstKeys, nil); err != nil {
if err := self.CdrStatsSrv.Call("CDRStatsV1.ReloadQueues", cstKeys, nil); err != nil {
return err
}
}
if len(userKeys) != 0 && self.Users != nil {
var r string
if err := self.Users.ReloadUsers("", &r); err != nil {
if err := self.Users.Call("UsersV1.ReloadUsers", "", &r); err != nil {
return err
}
}

View File

@@ -23,11 +23,12 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// Interact with Stats server
type CDRStatsV1 struct {
CdrStats engine.StatsInterface
CdrStats rpcclient.RpcClientConnection
}
type AttrGetMetrics struct {
@@ -38,23 +39,23 @@ func (sts *CDRStatsV1) GetMetrics(attr AttrGetMetrics, reply *map[string]float64
if len(attr.StatsQueueId) == 0 {
return fmt.Errorf("%s:StatsQueueId", utils.ErrMandatoryIeMissing.Error())
}
return sts.CdrStats.GetValues(attr.StatsQueueId, reply)
return sts.CdrStats.Call("CDRStatsV1.GetValues", attr.StatsQueueId, reply)
}
func (sts *CDRStatsV1) GetQueueIds(empty string, reply *[]string) error {
return sts.CdrStats.GetQueueIds(0, reply)
return sts.CdrStats.Call("CDRStatsV1.GetQueueIds", 0, reply)
}
func (sts *CDRStatsV1) GetQueue(id string, sq *engine.StatsQueue) error {
return sts.CdrStats.GetQueue(id, sq)
return sts.CdrStats.Call("CDRStatsV1.GetQueue", id, sq)
}
func (sts *CDRStatsV1) GetQueueTriggers(id string, ats *engine.ActionTriggers) error {
return sts.CdrStats.GetQueueTriggers(id, ats)
return sts.CdrStats.Call("CDRStatsV1.GetQueueTriggers", id, ats)
}
func (sts *CDRStatsV1) ReloadQueues(attr utils.AttrCDRStatsReloadQueues, reply *string) error {
if err := sts.CdrStats.ReloadQueues(attr.StatsQueueIds, nil); err != nil {
if err := sts.CdrStats.Call("CDRStatsV1.ReloadQueues", attr.StatsQueueIds, nil); err != nil {
return err
}
*reply = utils.OK
@@ -62,7 +63,7 @@ func (sts *CDRStatsV1) ReloadQueues(attr utils.AttrCDRStatsReloadQueues, reply *
}
func (sts *CDRStatsV1) ResetQueues(attr utils.AttrCDRStatsReloadQueues, reply *string) error {
if err := sts.CdrStats.ResetQueues(attr.StatsQueueIds, nil); err != nil {
if err := sts.CdrStats.Call("CDRStatsV1.ResetQueues", attr.StatsQueueIds, nil); err != nil {
return err
}
*reply = utils.OK

View File

@@ -253,14 +253,14 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
self.Sched.Restart()
}
if len(cstKeys) != 0 && self.CdrStatsSrv != nil {
if err := self.CdrStatsSrv.ReloadQueues(cstKeys, nil); err != nil {
if err := self.CdrStatsSrv.Call("CDRStatsV1.ReloadQueues", cstKeys, nil); err != nil {
return err
}
}
if len(userKeys) != 0 && self.Users != nil {
var r string
if err := self.Users.ReloadUsers("", &r); err != nil {
if err := self.Users.Call("UsersV1.ReloadUsers", "", &r); err != nil {
return err
}
}

View File

@@ -372,9 +372,9 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
}
func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, cdrDb engine.CdrStorage,
internalRaterChan chan *engine.Responder, internalPubSubSChan chan engine.PublisherSubscriber,
internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService,
internalCdrStatSChan chan engine.StatsInterface, server *utils.Server, exitChan chan bool) {
internalRaterChan chan *engine.Responder, internalPubSubSChan chan rpcclient.RpcClientConnection,
internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection,
internalCdrStatSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS CDRS service.")
var err error
var client *rpcclient.RpcClient
@@ -394,14 +394,14 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
raterConn = client
}
// Pubsub connection init
var pubSubConn engine.PublisherSubscriber
var pubSubConn rpcclient.RpcClientConnection
if cfg.CDRSPubSub == utils.INTERNAL {
pubSubs := <-internalPubSubSChan
pubSubConn = pubSubs
internalPubSubSChan <- pubSubs
} else if len(cfg.CDRSPubSub) != 0 {
if cfg.CDRSRater == cfg.CDRSPubSub {
pubSubConn = &engine.ProxyPubSub{Client: client}
pubSubConn = client
} else {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSPubSub, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
@@ -409,18 +409,18 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
exitChan <- true
return
}
pubSubConn = &engine.ProxyPubSub{Client: client}
pubSubConn = client
}
}
// Users connection init
var usersConn engine.UserService
var usersConn rpcclient.RpcClientConnection
if cfg.CDRSUsers == utils.INTERNAL {
userS := <-internalUserSChan
usersConn = userS
internalUserSChan <- userS
} else if len(cfg.CDRSUsers) != 0 {
if cfg.CDRSRater == cfg.CDRSUsers {
usersConn = &engine.ProxyUserService{Client: client}
usersConn = client
} else {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSUsers, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
@@ -428,18 +428,18 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
exitChan <- true
return
}
usersConn = &engine.ProxyUserService{Client: client}
usersConn = client
}
}
// Aliases connection init
var aliasesConn engine.AliasService
var aliasesConn rpcclient.RpcClientConnection
if cfg.CDRSAliases == utils.INTERNAL {
aliaseS := <-internalAliaseSChan
aliasesConn = aliaseS
internalAliaseSChan <- aliaseS
} else if len(cfg.CDRSAliases) != 0 {
if cfg.CDRSRater == cfg.CDRSAliases {
aliasesConn = &engine.ProxyAliasService{Client: client}
aliasesConn = client
} else {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSAliases, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
@@ -447,18 +447,18 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
exitChan <- true
return
}
aliasesConn = &engine.ProxyAliasService{Client: client}
aliasesConn = client
}
}
// Stats connection init
var statsConn engine.StatsInterface
var statsConn rpcclient.RpcClientConnection
if cfg.CDRSStats == utils.INTERNAL {
statS := <-internalCdrStatSChan
statsConn = statS
internalCdrStatSChan <- statS
} else if len(cfg.CDRSStats) != 0 {
if cfg.CDRSRater == cfg.CDRSStats {
statsConn = &engine.ProxyStats{Client: client}
statsConn = client
} else {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSStats, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
@@ -466,7 +466,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
exitChan <- true
return
}
statsConn = &engine.ProxyStats{Client: client}
statsConn = client
}
}
@@ -495,31 +495,31 @@ func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, ratingDb en
exitChan <- true // Should not get out of loop though
}
func startCdrStats(internalCdrStatSChan chan engine.StatsInterface, ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, server *utils.Server) {
func startCdrStats(internalCdrStatSChan chan rpcclient.RpcClientConnection, ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, server *utils.Server) {
cdrStats := engine.NewStats(ratingDb, accountDb, cfg.CDRStatsSaveInterval)
server.RpcRegister(cdrStats)
server.RpcRegister(&v1.CDRStatsV1{CdrStats: cdrStats}) // Public APIs
internalCdrStatSChan <- cdrStats
}
func startHistoryServer(internalHistorySChan chan history.Scribe, server *utils.Server, exitChan chan bool) {
func startHistoryServer(internalHistorySChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
scribeServer, err := history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<HistoryServer> Could not start, error: %s", err.Error()))
exitChan <- true
}
server.RpcRegisterName("ScribeV1", scribeServer)
server.RpcRegisterName("HistoryV1", scribeServer)
internalHistorySChan <- scribeServer
}
func startPubSubServer(internalPubSubSChan chan engine.PublisherSubscriber, accountDb engine.AccountingStorage, server *utils.Server) {
func startPubSubServer(internalPubSubSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server) {
pubSubServer := engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify)
server.RpcRegisterName("PubSubV1", pubSubServer)
internalPubSubSChan <- pubSubServer
}
// ToDo: Make sure we are caching before starting this one
func startAliasesServer(internalAliaseSChan chan engine.AliasService, accountDb engine.AccountingStorage, server *utils.Server, exitChan chan bool) {
func startAliasesServer(internalAliaseSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server, exitChan chan bool) {
aliasesServer := engine.NewAliasHandler(accountDb)
server.RpcRegisterName("AliasesV1", aliasesServer)
if err := accountDb.CacheAccountingPrefixes(utils.ALIASES_PREFIX); err != nil {
@@ -530,7 +530,7 @@ func startAliasesServer(internalAliaseSChan chan engine.AliasService, accountDb
internalAliaseSChan <- aliasesServer
}
func startUsersServer(internalUserSChan chan engine.UserService, accountDb engine.AccountingStorage, server *utils.Server, exitChan chan bool) {
func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server, exitChan chan bool) {
userServer, err := engine.NewUserMap(accountDb, cfg.UserServerIndexes)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<UsersService> Could not start, error: %s", err.Error()))
@@ -543,11 +543,11 @@ func startUsersServer(internalUserSChan chan engine.UserService, accountDb engin
func startRpc(server *utils.Server, internalRaterChan chan *engine.Responder,
internalCdrSChan chan *engine.CdrServer,
internalCdrStatSChan chan engine.StatsInterface,
internalHistorySChan chan history.Scribe,
internalPubSubSChan chan engine.PublisherSubscriber,
internalUserSChan chan engine.UserService,
internalAliaseSChan chan engine.AliasService) {
internalCdrStatSChan chan rpcclient.RpcClientConnection,
internalHistorySChan chan rpcclient.RpcClientConnection,
internalPubSubSChan chan rpcclient.RpcClientConnection,
internalUserSChan chan rpcclient.RpcClientConnection,
internalAliaseSChan chan rpcclient.RpcClientConnection) {
select { // Any of the rpc methods will unlock listening to rpc requests
case resp := <-internalRaterChan:
internalRaterChan <- resp
@@ -668,11 +668,11 @@ func main() {
internalRaterChan := make(chan *engine.Responder, 1)
internalSchedulerChan := make(chan *scheduler.Scheduler, 1)
internalCdrSChan := make(chan *engine.CdrServer, 1)
internalCdrStatSChan := make(chan engine.StatsInterface, 1)
internalHistorySChan := make(chan history.Scribe, 1)
internalPubSubSChan := make(chan engine.PublisherSubscriber, 1)
internalUserSChan := make(chan engine.UserService, 1)
internalAliaseSChan := make(chan engine.AliasService, 1)
internalCdrStatSChan := make(chan rpcclient.RpcClientConnection, 1)
internalHistorySChan := make(chan rpcclient.RpcClientConnection, 1)
internalPubSubSChan := make(chan rpcclient.RpcClientConnection, 1)
internalUserSChan := make(chan rpcclient.RpcClientConnection, 1)
internalAliaseSChan := make(chan rpcclient.RpcClientConnection, 1)
internalSMGChan := make(chan rpcclient.RpcClientConnection, 1)
// Start balancer service
if cfg.BalancerEnabled {

View File

@@ -26,9 +26,9 @@ import (
"github.com/cgrates/cgrates/apier/v2"
"github.com/cgrates/cgrates/balancer2go"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/history"
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled *bool, exitChan chan bool) {
@@ -40,8 +40,8 @@ func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled
// Starts rater and reports on chan
func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler,
internalCdrStatSChan chan engine.StatsInterface, internalHistorySChan chan history.Scribe,
internalPubSubSChan chan engine.PublisherSubscriber, internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService,
internalCdrStatSChan chan rpcclient.RpcClientConnection, internalHistorySChan chan rpcclient.RpcClientConnection,
internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection,
server *utils.Server,
ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, logDb engine.LogStorage,
stopHandled *bool, exitChan chan bool) {
@@ -109,7 +109,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
}
// Connection to CDRStats
var cdrStats engine.StatsInterface
var cdrStats rpcclient.RpcClientConnection
if cfg.RaterCdrStats != "" {
cdrstatTaskChan := make(chan struct{})
waitTasks = append(waitTasks, cdrstatTaskChan)
@@ -124,7 +124,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
exitChan <- true
return
}
} else if cdrStats, err = engine.NewProxyStats(cfg.RaterCdrStats, cfg.ConnectAttempts, -1); err != nil {
} else if cdrStats, err = rpcclient.NewRpcClient("tcp", cfg.RaterCdrStats, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect to cdrstats, error: %s", err.Error()))
exitChan <- true
return
@@ -138,7 +138,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
waitTasks = append(waitTasks, histTaskChan)
go func() {
defer close(histTaskChan)
var scribeServer history.Scribe
var scribeServer rpcclient.RpcClientConnection
if cfg.RaterHistoryServer == utils.INTERNAL {
select {
case scribeServer = <-internalHistorySChan:
@@ -148,7 +148,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
exitChan <- true
return
}
} else if scribeServer, err = history.NewProxyScribe(cfg.RaterHistoryServer, cfg.ConnectAttempts, -1); err != nil {
} else if scribeServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterHistoryServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect historys, error: %s", err.Error()))
exitChan <- true
return
@@ -163,7 +163,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
waitTasks = append(waitTasks, pubsubTaskChan)
go func() {
defer close(pubsubTaskChan)
var pubSubServer engine.PublisherSubscriber
var pubSubServer rpcclient.RpcClientConnection
if cfg.RaterPubSubServer == utils.INTERNAL {
select {
case pubSubServer = <-internalPubSubSChan:
@@ -173,7 +173,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
exitChan <- true
return
}
} else if pubSubServer, err = engine.NewProxyPubSub(cfg.RaterPubSubServer, cfg.ConnectAttempts, -1); err != nil {
} else if pubSubServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterPubSubServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect to pubsubs: %s", err.Error()))
exitChan <- true
return
@@ -188,7 +188,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
waitTasks = append(waitTasks, aliasesTaskChan)
go func() {
defer close(aliasesTaskChan)
var aliasesServer engine.AliasService
var aliasesServer rpcclient.RpcClientConnection
if cfg.RaterAliasesServer == utils.INTERNAL {
select {
case aliasesServer = <-internalAliaseSChan:
@@ -198,7 +198,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
exitChan <- true
return
}
} else if aliasesServer, err = engine.NewProxyAliasService(cfg.RaterAliasesServer, cfg.ConnectAttempts, -1); err != nil {
} else if aliasesServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterAliasesServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect to aliases, error: %s", err.Error()))
exitChan <- true
return
@@ -208,7 +208,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
}
// Connection to UserService
var userServer engine.UserService
var userServer rpcclient.RpcClientConnection
if cfg.RaterUserServer != "" {
usersTaskChan := make(chan struct{})
waitTasks = append(waitTasks, usersTaskChan)
@@ -223,7 +223,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
exitChan <- true
return
}
} else if userServer, err = engine.NewProxyUserService(cfg.RaterUserServer, cfg.ConnectAttempts, -1); err != nil {
} else if userServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterUserServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil {
utils.Logger.Crit(fmt.Sprintf("<Rater> Could not connect users, error: %s", err.Error()))
exitChan <- true
return

View File

@@ -29,6 +29,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
/*
@@ -43,7 +44,7 @@ func stopBalancerSignalHandler(bal *balancer2go.Balancer, exitChan chan bool) {
exitChan <- true
}
func generalSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exitChan chan bool) {
func generalSignalHandler(internalCdrStatSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
@@ -52,7 +53,7 @@ func generalSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exitC
var dummyInt int
select {
case cdrStats := <-internalCdrStatSChan:
cdrStats.Stop(dummyInt, &dummyInt)
cdrStats.Call("CDRStatsV1.Stop", dummyInt, &dummyInt)
default:
}
@@ -62,7 +63,7 @@ func generalSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exitC
/*
Listens for the SIGTERM, SIGINT, SIGQUIT system signals and gracefuly unregister from balancer and closes the storage before exiting.
*/
func stopRaterSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exitChan chan bool) {
func stopRaterSignalHandler(internalCdrStatSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
sig := <-c
@@ -72,7 +73,7 @@ func stopRaterSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exi
var dummyInt int
select {
case cdrStats := <-internalCdrStatSChan:
cdrStats.Stop(dummyInt, &dummyInt)
cdrStats.Call("CDRStatsV1.Stop", dummyInt, &dummyInt)
default:
}
exitChan <- true

View File

@@ -12,7 +12,7 @@ import (
)
// Temporary export AliasService for the ApierV1 to be able to emulate old APIs
func GetAliasService() AliasService {
func GetAliasService() rpcclient.RpcClientConnection {
return aliasService
}
@@ -156,16 +156,6 @@ type AttrReverseAlias struct {
Context string
}
type AliasService interface {
SetAlias(Alias, *string) error
UpdateAlias(Alias, *string) error
RemoveAlias(Alias, *string) error
GetAlias(Alias, *Alias) error
GetMatchingAlias(AttrMatchingAlias, *string) error
GetReverseAlias(AttrReverseAlias, *map[string][]*Alias) error
RemoveReverseAlias(AttrReverseAlias, *string) error
}
type AliasHandler struct {
accountingDb AccountingStorage
mu sync.RWMutex
@@ -177,11 +167,11 @@ func NewAliasHandler(accountingDb AccountingStorage) *AliasHandler {
}
}
func (am *AliasHandler) SetAlias(al Alias, reply *string) error {
func (am *AliasHandler) SetAlias(al *Alias, reply *string) error {
am.mu.Lock()
defer am.mu.Unlock()
if err := am.accountingDb.SetAlias(&al); err != nil {
if err := am.accountingDb.SetAlias(al); err != nil {
*reply = err.Error()
return err
} //add to cache
@@ -194,7 +184,7 @@ func (am *AliasHandler) SetAlias(al Alias, reply *string) error {
return nil
}
func (am *AliasHandler) UpdateAlias(al Alias, reply *string) error {
func (am *AliasHandler) UpdateAlias(al *Alias, reply *string) error {
am.mu.Lock()
defer am.mu.Unlock()
// get previous value
@@ -216,7 +206,7 @@ func (am *AliasHandler) UpdateAlias(al Alias, reply *string) error {
}
}
if err := am.accountingDb.SetAlias(&al); err != nil {
if err := am.accountingDb.SetAlias(al); err != nil {
*reply = err.Error()
return err
} //add to cache
@@ -229,7 +219,7 @@ func (am *AliasHandler) UpdateAlias(al Alias, reply *string) error {
return nil
}
func (am *AliasHandler) RemoveAlias(al Alias, reply *string) error {
func (am *AliasHandler) RemoveAlias(al *Alias, reply *string) error {
am.mu.Lock()
defer am.mu.Unlock()
if err := am.accountingDb.RemoveAlias(al.GetId()); err != nil {
@@ -240,7 +230,7 @@ func (am *AliasHandler) RemoveAlias(al Alias, reply *string) error {
return nil
}
func (am *AliasHandler) RemoveReverseAlias(attr AttrReverseAlias, reply *string) error {
func (am *AliasHandler) RemoveReverseAlias(attr *AttrReverseAlias, reply *string) error {
am.mu.Lock()
defer am.mu.Unlock()
rKey := utils.REVERSE_ALIASES_PREFIX + attr.Alias + attr.Target + attr.Context
@@ -263,7 +253,7 @@ func (am *AliasHandler) RemoveReverseAlias(attr AttrReverseAlias, reply *string)
return nil
}
func (am *AliasHandler) GetAlias(al Alias, result *Alias) error {
func (am *AliasHandler) GetAlias(al *Alias, result *Alias) error {
am.mu.RLock()
defer am.mu.RUnlock()
variants := al.GenerateIds()
@@ -276,7 +266,7 @@ func (am *AliasHandler) GetAlias(al Alias, result *Alias) error {
return utils.ErrNotFound
}
func (am *AliasHandler) GetReverseAlias(attr AttrReverseAlias, result *map[string][]*Alias) error {
func (am *AliasHandler) GetReverseAlias(attr *AttrReverseAlias, result *map[string][]*Alias) error {
am.mu.Lock()
defer am.mu.Unlock()
aliases := make(map[string][]*Alias)
@@ -303,9 +293,9 @@ func (am *AliasHandler) GetReverseAlias(attr AttrReverseAlias, result *map[strin
return nil
}
func (am *AliasHandler) GetMatchingAlias(attr AttrMatchingAlias, result *string) error {
func (am *AliasHandler) GetMatchingAlias(attr *AttrMatchingAlias, result *string) error {
response := Alias{}
if err := am.GetAlias(Alias{
if err := am.GetAlias(&Alias{
Direction: attr.Direction,
Tenant: attr.Tenant,
Category: attr.Category,
@@ -357,48 +347,32 @@ func (am *AliasHandler) GetMatchingAlias(attr AttrMatchingAlias, result *string)
return utils.ErrNotFound
}
type ProxyAliasService struct {
Client *rpcclient.RpcClient
}
func NewProxyAliasService(addr string, attempts, reconnects int) (*ProxyAliasService, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil)
if err != nil {
return nil, err
func (am *AliasHandler) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return utils.ErrNotImplemented
}
// get method
method := reflect.ValueOf(am).MethodByName(parts[1])
if !method.IsValid() {
return utils.ErrNotImplemented
}
return &ProxyAliasService{Client: client}, nil
}
func (ps *ProxyAliasService) SetAlias(al Alias, reply *string) error {
return ps.Client.Call("AliasesV1.SetAlias", al, reply)
}
// construct the params
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
func (ps *ProxyAliasService) UpdateAlias(al Alias, reply *string) error {
return ps.Client.Call("AliasesV1.UpdateAlias", al, reply)
}
func (ps *ProxyAliasService) RemoveAlias(al Alias, reply *string) error {
return ps.Client.Call("AliasesV1.RemoveAlias", al, reply)
}
func (ps *ProxyAliasService) GetAlias(al Alias, alias *Alias) error {
return ps.Client.Call("AliasesV1.GetAlias", al, alias)
}
func (ps *ProxyAliasService) GetMatchingAlias(attr AttrMatchingAlias, alias *string) error {
return ps.Client.Call("AliasesV1.GetMatchingAlias", attr, alias)
}
func (ps *ProxyAliasService) GetReverseAlias(attr AttrReverseAlias, alias *map[string][]*Alias) error {
return ps.Client.Call("AliasesV1.GetReverseAlias", attr, alias)
}
func (ps *ProxyAliasService) RemoveReverseAlias(attr AttrReverseAlias, reply *string) error {
return ps.Client.Call("AliasesV1.RemoveReverseAlias", attr, reply)
}
func (ps *ProxyAliasService) ReloadAliases(in string, reply *string) error {
return ps.Client.Call("AliasesV1.ReloadAliases", in, reply)
ret := method.Call(params)
if len(ret) != 1 {
return utils.ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError
}
return err
}
func LoadAlias(attr *AttrMatchingAlias, in interface{}, extraFields string) error {
@@ -406,7 +380,7 @@ func LoadAlias(attr *AttrMatchingAlias, in interface{}, extraFields string) erro
return nil
}
response := Alias{}
if err := aliasService.GetAlias(Alias{
if err := aliasService.Call("AliasesV1.GetAlias", &Alias{
Direction: attr.Direction,
Tenant: attr.Tenant,
Category: attr.Category,

View File

@@ -12,7 +12,7 @@ func init() {
}
func TestAliasesGetAlias(t *testing.T) {
alias := Alias{}
err := aliasService.GetAlias(Alias{
err := aliasService.Call("AliasesV1.GetAlias", &Alias{
Direction: "*out",
Tenant: "cgrates.org",
Category: "call",
@@ -23,13 +23,13 @@ func TestAliasesGetAlias(t *testing.T) {
if err != nil ||
len(alias.Values) != 2 ||
len(alias.Values[0].Pairs) != 2 {
t.Error("Error getting alias: ", err, alias)
t.Error("Error getting alias: ", err, alias, alias.Values[0])
}
}
func TestAliasesGetMatchingAlias(t *testing.T) {
var response string
err := aliasService.GetMatchingAlias(AttrMatchingAlias{
err := aliasService.Call("AliasesV1.GetMatchingAlias", &AttrMatchingAlias{
Direction: "*out",
Tenant: "cgrates.org",
Category: "call",

View File

@@ -28,8 +28,8 @@ import (
"time"
"github.com/cgrates/cgrates/cache2go"
"github.com/cgrates/cgrates/history"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
const (
@@ -75,10 +75,10 @@ var (
cdrStorage CdrStorage
debitPeriod = 10 * time.Second
globalRoundingDecimals = 5
historyScribe history.Scribe
pubSubServer PublisherSubscriber
userService UserService
aliasService AliasService
historyScribe rpcclient.RpcClientConnection
pubSubServer rpcclient.RpcClientConnection
userService rpcclient.RpcClientConnection
aliasService rpcclient.RpcClientConnection
)
// Exported method to set the storage getter.
@@ -110,26 +110,26 @@ func SetCdrStorage(cStorage CdrStorage) {
}
// Exported method to set the history scribe.
func SetHistoryScribe(scribe history.Scribe) {
func SetHistoryScribe(scribe rpcclient.RpcClientConnection) {
historyScribe = scribe
}
func SetPubSub(ps PublisherSubscriber) {
func SetPubSub(ps rpcclient.RpcClientConnection) {
pubSubServer = ps
}
func SetUserService(us UserService) {
func SetUserService(us rpcclient.RpcClientConnection) {
userService = us
}
func SetAliasService(as AliasService) {
func SetAliasService(as rpcclient.RpcClientConnection) {
aliasService = as
}
func Publish(event CgrEvent) {
if pubSubServer != nil {
var s string
pubSubServer.Publish(event, &s)
pubSubServer.Call("PubSubV1.Publish", event, &s)
}
}
@@ -820,7 +820,7 @@ func (cd *CallDescriptor) GetLCRFromStorage() (*LCR, error) {
return nil, utils.ErrNotFound
}
func (cd *CallDescriptor) GetLCR(stats StatsInterface, p *utils.Paginator) (*LCRCost, error) {
func (cd *CallDescriptor) GetLCR(stats rpcclient.RpcClientConnection, p *utils.Paginator) (*LCRCost, error) {
cd.account = nil // make sure it's not cached
lcr, err := cd.GetLCRFromStorage()
if err != nil {
@@ -951,7 +951,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface, p *utils.Paginator) (*LCR
if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD {
for _, qId := range cdrStatsQueueIds {
sq := &StatsQueue{}
if err := stats.GetQueue(qId, sq); err == nil {
if err := stats.Call("CDRStatsV1.GetQueue", qId, sq); err == nil {
if sq.conf.QueueLength == 0 { //only add qeues that don't have fixed length
supplierQueues = append(supplierQueues, sq)
}
@@ -959,7 +959,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface, p *utils.Paginator) (*LCR
}
} else {
statValues := make(map[string]float64)
if err := stats.GetValues(qId, &statValues); err != nil {
if err := stats.Call("CDRStatsV1.GetValues", qId, &statValues); err != nil {
lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{
Supplier: fullSupplier,
Error: fmt.Sprintf("Get stats values for queue id %s, error %s", qId, err.Error()),

View File

@@ -66,7 +66,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
}
}
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, client rpcclient.RpcClientConnection, pubsub PublisherSubscriber, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) {
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, client rpcclient.RpcClientConnection, pubsub rpcclient.RpcClientConnection, users rpcclient.RpcClientConnection, aliases rpcclient.RpcClientConnection, stats rpcclient.RpcClientConnection) (*CdrServer, error) {
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, client: client, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{queue: make(map[string]chan bool)}}, nil
}
@@ -74,10 +74,10 @@ type CdrServer struct {
cgrCfg *config.CGRConfig
cdrDb CdrStorage
client rpcclient.RpcClientConnection
pubsub PublisherSubscriber
users UserService
aliases AliasService
stats StatsInterface
pubsub rpcclient.RpcClientConnection
users rpcclient.RpcClientConnection
aliases rpcclient.RpcClientConnection
stats rpcclient.RpcClientConnection
guard *GuardianLock
}
@@ -255,7 +255,7 @@ func (self *CdrServer) rateStoreStatsReplicate(cdr *StoredCdr) error {
}
// Attach CDR to stats
if self.stats != nil { // Send CDR to stats
if err := self.stats.AppendCDR(cdr, nil); err != nil {
if err := self.stats.Call("Stats.AppendCDR", cdr, nil); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Could not append cdr to stats: %s", err.Error()))
}
}

View File

@@ -3,11 +3,12 @@ package engine
import (
"errors"
"fmt"
"reflect"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
type SubscribeInfo struct {
@@ -28,13 +29,6 @@ func (ce CgrEvent) PassFilters(rsrFields utils.RSRFields) bool {
return true
}
type PublisherSubscriber interface {
Subscribe(SubscribeInfo, *string) error
Unsubscribe(SubscribeInfo, *string) error
Publish(CgrEvent, *string) error
ShowSubscribers(string, *map[string]*SubscriberData) error
}
type SubscriberData struct {
ExpTime time.Time
Filters utils.RSRFields
@@ -165,28 +159,30 @@ func (ps *PubSub) ShowSubscribers(in string, out *map[string]*SubscriberData) er
return nil
}
type ProxyPubSub struct {
Client *rpcclient.RpcClient
}
func NewProxyPubSub(addr string, attempts, reconnects int) (*ProxyPubSub, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil)
if err != nil {
return nil, err
func (ps *PubSub) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return utils.ErrNotImplemented
}
// get method
method := reflect.ValueOf(ps).MethodByName(parts[1])
if !method.IsValid() {
return utils.ErrNotImplemented
}
return &ProxyPubSub{Client: client}, nil
}
func (ps *ProxyPubSub) Subscribe(si SubscribeInfo, reply *string) error {
return ps.Client.Call("PubSubV1.Subscribe", si, reply)
}
func (ps *ProxyPubSub) Unsubscribe(si SubscribeInfo, reply *string) error {
return ps.Client.Call("PubSubV1.Unsubscribe", si, reply)
}
func (ps *ProxyPubSub) Publish(evt CgrEvent, reply *string) error {
return ps.Client.Call("PubSubV1.Publish", evt, reply)
}
// construct the params
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]*SubscriberData) error {
return ps.Client.Call("PubSubV1.ShowSubscribers", in, reply)
ret := method.Call(params)
if len(ret) != 1 {
return utils.ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError
}
return err
}

View File

@@ -31,6 +31,7 @@ import (
"github.com/cgrates/cgrates/cache2go"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// Individual session run
@@ -49,13 +50,13 @@ type Responder struct {
Bal *balancer2go.Balancer
ExitChan chan bool
CdrSrv *CdrServer
Stats StatsInterface
Stats rpcclient.RpcClientConnection
Timezone string
cnt int64
responseCache *cache2go.ResponseCache
}
func NewResponder(exitChan chan bool, cdrSrv *CdrServer, stats StatsInterface, timeToLive time.Duration) *Responder {
func NewResponder(exitChan chan bool, cdrSrv *CdrServer, stats rpcclient.RpcClientConnection, timeToLive time.Duration) *Responder {
return &Responder{
ExitChan: exitChan,
Stats: stats,
@@ -615,12 +616,12 @@ func (rs *Responder) UnRegisterRater(clientAddress string, replay *int) error {
}
func (rs *Responder) Call(serviceMethod string, args interface{}, reply interface{}) error {
if !strings.HasPrefix(serviceMethod, "Responder.") {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return utils.ErrNotImplemented
}
methodName := strings.TrimLeft(serviceMethod, "Responder.")
// get method
method := reflect.ValueOf(rs).MethodByName(methodName)
method := reflect.ValueOf(rs).MethodByName(parts[1])
if !method.IsValid() {
return utils.ErrNotImplemented
}
@@ -632,6 +633,9 @@ func (rs *Responder) Call(serviceMethod string, args interface{}, reply interfac
if len(ret) != 1 {
return utils.ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError

View File

@@ -282,7 +282,8 @@ func TestResponderGetLCR(t *testing.T) {
}
}
danStatsId := "dan12_stats"
rsponder.Stats.AddQueue(&CdrStats{Id: danStatsId, Supplier: []string{"dan12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, nil)
var r int
rsponder.Stats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: danStatsId, Supplier: []string{"dan12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r)
danRpfl := &RatingProfile{Id: "*out:tenant12:call:dan12",
RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{
ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC),
@@ -292,7 +293,7 @@ func TestResponderGetLCR(t *testing.T) {
}},
}
rifStatsId := "rif12_stats"
rsponder.Stats.AddQueue(&CdrStats{Id: rifStatsId, Supplier: []string{"rif12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, nil)
rsponder.Stats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: rifStatsId, Supplier: []string{"rif12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r)
rifRpfl := &RatingProfile{Id: "*out:tenant12:call:rif12",
RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{
ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC),
@@ -302,7 +303,7 @@ func TestResponderGetLCR(t *testing.T) {
}},
}
ivoStatsId := "ivo12_stats"
rsponder.Stats.AddQueue(&CdrStats{Id: ivoStatsId, Supplier: []string{"ivo12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, nil)
rsponder.Stats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: ivoStatsId, Supplier: []string{"ivo12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r)
ivoRpfl := &RatingProfile{Id: "*out:tenant12:call:ivo12",
RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{
ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC),
@@ -483,9 +484,10 @@ func TestResponderGetLCR(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", eQTLcr.SupplierCosts, lcrQT.SupplierCosts)
}
cdr := &StoredCdr{Supplier: "rif12", AnswerTime: time.Now(), Usage: 3 * time.Minute, Cost: 1}
rsponder.Stats.AppendCDR(cdr, nil)
rsponder.Stats.Call("CDRStatsV1.AppendCDR", cdr, &r)
cdr = &StoredCdr{Supplier: "dan12", AnswerTime: time.Now(), Usage: 5 * time.Minute, Cost: 2}
rsponder.Stats.AppendCDR(cdr, nil)
rsponder.Stats.Call("CDRStatsV1.AppendCDR", cdr, &r)
eQTLcr = &LCRCost{
Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_QOS_THRESHOLD, StrategyParams: "35;;;;4m;;;;;;;;;", Weight: 10.0},
SupplierCosts: []*LCRSupplierCost{

View File

@@ -20,25 +20,14 @@ package engine
import (
"fmt"
"reflect"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
type StatsInterface interface {
GetValues(string, *map[string]float64) error
GetQueueIds(int, *[]string) error
GetQueue(string, *StatsQueue) error
GetQueueTriggers(string, *ActionTriggers) error
AppendCDR(*StoredCdr, *int) error
AddQueue(*CdrStats, *int) error
ReloadQueues([]string, *int) error
ResetQueues([]string, *int) error
Stop(int, *int) error
}
type Stats struct {
queues map[string]*StatsQueue
queueSavers map[string]*queueSaver
@@ -286,50 +275,30 @@ func (s *Stats) Stop(int, *int) error {
return nil
}
type ProxyStats struct {
Client *rpcclient.RpcClient
}
func NewProxyStats(addr string, attempts, reconnects int) (*ProxyStats, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil)
if err != nil {
return nil, err
func (s *Stats) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return utils.ErrNotImplemented
}
// get method
method := reflect.ValueOf(s).MethodByName(parts[1])
if !method.IsValid() {
return utils.ErrNotImplemented
}
return &ProxyStats{Client: client}, nil
}
func (ps *ProxyStats) GetValues(sqID string, values *map[string]float64) error {
return ps.Client.Call("Stats.GetValues", sqID, values)
}
// construct the params
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
func (ps *ProxyStats) AppendCDR(cdr *StoredCdr, out *int) error {
return ps.Client.Call("Stats.AppendCDR", cdr, out)
}
func (ps *ProxyStats) GetQueueIds(in int, ids *[]string) error {
return ps.Client.Call("Stats.GetQueueIds", in, ids)
}
func (ps *ProxyStats) GetQueue(id string, sq *StatsQueue) error {
return ps.Client.Call("Stats.GetQueue", id, sq)
}
func (ps *ProxyStats) GetQueueTriggers(id string, ats *ActionTriggers) error {
return ps.Client.Call("Stats.GetQueueTriggers", id, ats)
}
func (ps *ProxyStats) AddQueue(cs *CdrStats, out *int) error {
return ps.Client.Call("Stats.AddQueue", cs, out)
}
func (ps *ProxyStats) ReloadQueues(ids []string, out *int) error {
return ps.Client.Call("Stats.ReloadQueues", ids, out)
}
func (ps *ProxyStats) ResetQueues(ids []string, out *int) error {
return ps.Client.Call("Stats.ResetQueues", ids, out)
}
func (ps *ProxyStats) Stop(i int, r *int) error {
return ps.Client.Call("Stats.Stop", 0, i)
ret := method.Call(params)
if len(ret) != 1 {
return utils.ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError
}
return err
}

View File

@@ -298,7 +298,7 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) {
ms.dict[utils.RATING_PLAN_PREFIX+rp.Id] = b.Bytes()
response := 0
if historyScribe != nil {
go historyScribe.Record(rp.GetHistoryRecord(), &response)
go historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(), &response)
}
return
}
@@ -328,7 +328,7 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
ms.dict[utils.RATING_PROFILE_PREFIX+rpf.Id] = result
response := 0
if historyScribe != nil {
go historyScribe.Record(rpf.GetHistoryRecord(false), &response)
go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(false), &response)
}
return
}
@@ -341,7 +341,7 @@ func (ms *MapStorage) RemoveRatingProfile(key string) (err error) {
response := 0
rpf := &RatingProfile{Id: key}
if historyScribe != nil {
go historyScribe.Record(rpf.GetHistoryRecord(true), &response)
go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(true), &response)
}
}
}
@@ -406,7 +406,7 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) {
ms.dict[utils.DESTINATION_PREFIX+dest.Id] = b.Bytes()
response := 0
if historyScribe != nil {
go historyScribe.Record(dest.GetHistoryRecord(), &response)
go historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(), &response)
}
return
}

View File

@@ -612,7 +612,7 @@ func (ms *MongoStorage) SetRatingPlan(rp *RatingPlan) error {
_, err := ms.db.C(colRpl).Upsert(bson.M{"id": rp.Id}, rp)
if err == nil && historyScribe != nil {
var response int
historyScribe.Record(rp.GetHistoryRecord(), &response)
historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(), &response)
}
return err
}
@@ -637,7 +637,7 @@ func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error {
_, err := ms.db.C(colRpf).Upsert(bson.M{"id": rp.Id}, rp)
if err == nil && historyScribe != nil {
var response int
historyScribe.Record(rp.GetHistoryRecord(false), &response)
historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(false), &response)
}
return err
}
@@ -653,7 +653,7 @@ func (ms *MongoStorage) RemoveRatingProfile(key string) error {
rpf := &RatingProfile{Id: result.Id}
if historyScribe != nil {
var response int
go historyScribe.Record(rpf.GetHistoryRecord(true), &response)
go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(true), &response)
}
}
return iter.Close()
@@ -705,7 +705,7 @@ func (ms *MongoStorage) SetDestination(dest *Destination) (err error) {
_, err = ms.db.C(colDst).Upsert(bson.M{"id": dest.Id}, dest)
if err == nil && historyScribe != nil {
var response int
historyScribe.Record(dest.GetHistoryRecord(), &response)
historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(), &response)
}
return
}

View File

@@ -436,7 +436,7 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) {
err = rs.db.Cmd("SET", utils.RATING_PLAN_PREFIX+rp.Id, b.Bytes()).Err
if err == nil && historyScribe != nil {
response := 0
go historyScribe.Record(rp.GetHistoryRecord(), &response)
go historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(), &response)
}
return
}
@@ -465,7 +465,7 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
err = rs.db.Cmd("SET", utils.RATING_PROFILE_PREFIX+rpf.Id, result).Err
if err == nil && historyScribe != nil {
response := 0
go historyScribe.Record(rpf.GetHistoryRecord(false), &response)
go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(false), &response)
}
return
}
@@ -488,7 +488,7 @@ func (rs *RedisStorage) RemoveRatingProfile(key string) error {
rpf := &RatingProfile{Id: key}
if historyScribe != nil {
response := 0
go historyScribe.Record(rpf.GetHistoryRecord(true), &response)
go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(true), &response)
}
}
return nil
@@ -556,7 +556,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) {
err = rs.db.Cmd("SET", utils.DESTINATION_PREFIX+dest.Id, b.Bytes()).Err
if err == nil && historyScribe != nil {
response := 0
go historyScribe.Record(dest.GetHistoryRecord(), &response)
go historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(), &response)
}
return
}

View File

@@ -2,12 +2,12 @@ package engine
import (
"fmt"
"reflect"
"sort"
"strings"
"sync"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
type UserProfile struct {
@@ -52,16 +52,6 @@ func (ud *UserProfile) SetId(id string) error {
return nil
}
type UserService interface {
SetUser(UserProfile, *string) error
RemoveUser(UserProfile, *string) error
UpdateUser(UserProfile, *string) error
GetUsers(UserProfile, *UserProfiles) error
AddIndex([]string, *string) error
GetIndexes(string, *map[string][]string) error
ReloadUsers(string, *string) error
}
type prop struct {
masked bool
weight float64
@@ -139,21 +129,21 @@ func (um *UserMap) ReloadUsers(in string, reply *string) error {
return nil
}
func (um *UserMap) SetUser(up UserProfile, reply *string) error {
func (um *UserMap) SetUser(up *UserProfile, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
if err := um.accountingDb.SetUser(&up); err != nil {
if err := um.accountingDb.SetUser(up); err != nil {
*reply = err.Error()
return err
}
um.table[up.GetId()] = up.Profile
um.properties[up.GetId()] = &prop{weight: up.Weight, masked: up.Masked}
um.addIndex(&up, um.indexKeys)
um.addIndex(up, um.indexKeys)
*reply = utils.OK
return nil
}
func (um *UserMap) RemoveUser(up UserProfile, reply *string) error {
func (um *UserMap) RemoveUser(up *UserProfile, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
if err := um.accountingDb.RemoveUser(up.GetId()); err != nil {
@@ -162,12 +152,12 @@ func (um *UserMap) RemoveUser(up UserProfile, reply *string) error {
}
delete(um.table, up.GetId())
delete(um.properties, up.GetId())
um.deleteIndex(&up)
um.deleteIndex(up)
*reply = utils.OK
return nil
}
func (um *UserMap) UpdateUser(up UserProfile, reply *string) error {
func (um *UserMap) UpdateUser(up *UserProfile, reply *string) error {
um.mu.Lock()
defer um.mu.Unlock()
m, found := um.table[up.GetId()]
@@ -212,7 +202,7 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error {
return nil
}
func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error {
func (um *UserMap) GetUsers(up *UserProfile, results *UserProfiles) error {
um.mu.RLock()
defer um.mu.RUnlock()
table := um.table // no index
@@ -402,44 +392,32 @@ func (um *UserMap) GetIndexes(in string, reply *map[string][]string) error {
return nil
}
type ProxyUserService struct {
Client *rpcclient.RpcClient
}
func NewProxyUserService(addr string, attempts, reconnects int) (*ProxyUserService, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil)
if err != nil {
return nil, err
func (um *UserMap) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return utils.ErrNotImplemented
}
// get method
method := reflect.ValueOf(um).MethodByName(parts[1])
if !method.IsValid() {
return utils.ErrNotImplemented
}
return &ProxyUserService{Client: client}, nil
}
func (ps *ProxyUserService) SetUser(ud UserProfile, reply *string) error {
return ps.Client.Call("UsersV1.SetUser", ud, reply)
}
// construct the params
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
func (ps *ProxyUserService) RemoveUser(ud UserProfile, reply *string) error {
return ps.Client.Call("UsersV1.RemoveUser", ud, reply)
}
func (ps *ProxyUserService) UpdateUser(ud UserProfile, reply *string) error {
return ps.Client.Call("UsersV1.UpdateUser", ud, reply)
}
func (ps *ProxyUserService) GetUsers(ud UserProfile, users *UserProfiles) error {
return ps.Client.Call("UsersV1.GetUsers", ud, users)
}
func (ps *ProxyUserService) AddIndex(indexes []string, reply *string) error {
return ps.Client.Call("UsersV1.AddIndex", indexes, reply)
}
func (ps *ProxyUserService) GetIndexes(in string, reply *map[string][]string) error {
return ps.Client.Call("UsersV1.AddIndex", in, reply)
}
func (ps *ProxyUserService) ReloadUsers(in string, reply *string) error {
return ps.Client.Call("UsersV1.ReloadUsers", in, reply)
ret := method.Call(params)
if len(ret) != 1 {
return utils.ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError
}
return err
}
// extraFields - Field name in the interface containing extraFields information
@@ -484,7 +462,7 @@ func LoadUserProfile(in interface{}, extraFields string) error {
}
}
ups := UserProfiles{}
if err := userService.GetUsers(*up, &ups); err != nil {
if err := userService.Call("UsersV1.GetUsers", up, &ups); err != nil {
return err
}
if len(ups) > 0 {

View File

@@ -36,7 +36,7 @@ var testMap2 = UserMap{
func TestUsersAdd(t *testing.T) {
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
up := &UserProfile{
Tenant: "test",
UserName: "user",
Profile: map[string]string{
@@ -57,7 +57,7 @@ func TestUsersAdd(t *testing.T) {
func TestUsersUpdate(t *testing.T) {
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
up := &UserProfile{
Tenant: "test",
UserName: "user",
Profile: map[string]string{
@@ -88,7 +88,7 @@ func TestUsersUpdate(t *testing.T) {
func TestUsersUpdateNotFound(t *testing.T) {
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
up := &UserProfile{
Tenant: "test",
UserName: "user",
Profile: map[string]string{
@@ -106,12 +106,12 @@ func TestUsersUpdateNotFound(t *testing.T) {
func TestUsersUpdateInit(t *testing.T) {
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
up := &UserProfile{
Tenant: "test",
UserName: "user",
}
tm.SetUser(up, &r)
up = UserProfile{
up = &UserProfile{
Tenant: "test",
UserName: "user",
Profile: map[string]string{
@@ -132,7 +132,7 @@ func TestUsersUpdateInit(t *testing.T) {
func TestUsersRemove(t *testing.T) {
tm := newUserMap(accountingStorage, nil)
var r string
up := UserProfile{
up := &UserProfile{
Tenant: "test",
UserName: "user",
Profile: map[string]string{
@@ -158,7 +158,7 @@ func TestUsersRemove(t *testing.T) {
}
func TestUsersGetFull(t *testing.T) {
up := UserProfile{
up := &UserProfile{
Tenant: "test",
UserName: "user",
Profile: map[string]string{
@@ -173,7 +173,7 @@ func TestUsersGetFull(t *testing.T) {
}
func TestUsersGetFullMasked(t *testing.T) {
up := UserProfile{
up := &UserProfile{
Tenant: "test",
}
results := UserProfiles{}
@@ -184,7 +184,7 @@ func TestUsersGetFullMasked(t *testing.T) {
}
func TestUsersGetFullUnMasked(t *testing.T) {
up := UserProfile{
up := &UserProfile{
Tenant: "test",
Masked: true,
}
@@ -199,7 +199,7 @@ func TestUsersGetFullUnMasked(t *testing.T) {
}
func TestUsersGetTenant(t *testing.T) {
up := UserProfile{
up := &UserProfile{
Tenant: "testX",
UserName: "user",
Profile: map[string]string{
@@ -214,7 +214,7 @@ func TestUsersGetTenant(t *testing.T) {
}
func TestUsersGetUserName(t *testing.T) {
up := UserProfile{
up := &UserProfile{
Tenant: "test",
UserName: "userX",
Profile: map[string]string{
@@ -229,7 +229,7 @@ func TestUsersGetUserName(t *testing.T) {
}
func TestUsersGetNotFoundProfile(t *testing.T) {
up := UserProfile{
up := &UserProfile{
Tenant: "test",
UserName: "user",
Profile: map[string]string{
@@ -244,7 +244,7 @@ func TestUsersGetNotFoundProfile(t *testing.T) {
}
func TestUsersGetMissingTenant(t *testing.T) {
up := UserProfile{
up := &UserProfile{
UserName: "user",
Profile: map[string]string{
"t": "v",
@@ -258,7 +258,7 @@ func TestUsersGetMissingTenant(t *testing.T) {
}
func TestUsersGetMissingUserName(t *testing.T) {
up := UserProfile{
up := &UserProfile{
Tenant: "test",
Profile: map[string]string{
"t": "v",
@@ -272,7 +272,7 @@ func TestUsersGetMissingUserName(t *testing.T) {
}
func TestUsersGetMissingId(t *testing.T) {
up := UserProfile{
up := &UserProfile{
Profile: map[string]string{
"t": "v",
},
@@ -285,7 +285,7 @@ func TestUsersGetMissingId(t *testing.T) {
}
func TestUsersGetMissingIdTwo(t *testing.T) {
up := UserProfile{
up := &UserProfile{
Profile: map[string]string{
"t": "v",
"x": "y",
@@ -299,7 +299,7 @@ func TestUsersGetMissingIdTwo(t *testing.T) {
}
func TestUsersGetMissingIdTwoSort(t *testing.T) {
up := UserProfile{
up := &UserProfile{
Profile: map[string]string{
"t": "v",
"x": "y",
@@ -316,7 +316,7 @@ func TestUsersGetMissingIdTwoSort(t *testing.T) {
}
func TestUsersGetMissingIdTwoSortWeight(t *testing.T) {
up := UserProfile{
up := &UserProfile{
Profile: map[string]string{
"a": "b",
"c": "d",
@@ -367,7 +367,7 @@ func TestUsersGetFullindex(t *testing.T) {
var r string
testMap.index = make(map[string]map[string]bool) // reset index
testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r)
up := UserProfile{
up := &UserProfile{
Tenant: "test",
UserName: "user",
Profile: map[string]string{
@@ -385,7 +385,7 @@ func TestUsersGetTenantindex(t *testing.T) {
var r string
testMap.index = make(map[string]map[string]bool) // reset index
testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r)
up := UserProfile{
up := &UserProfile{
Tenant: "testX",
UserName: "user",
Profile: map[string]string{
@@ -403,7 +403,7 @@ func TestUsersGetUserNameindex(t *testing.T) {
var r string
testMap.index = make(map[string]map[string]bool) // reset index
testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r)
up := UserProfile{
up := &UserProfile{
Tenant: "test",
UserName: "userX",
Profile: map[string]string{
@@ -421,7 +421,7 @@ func TestUsersGetNotFoundProfileindex(t *testing.T) {
var r string
testMap.index = make(map[string]map[string]bool) // reset index
testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r)
up := UserProfile{
up := &UserProfile{
Tenant: "test",
UserName: "user",
Profile: map[string]string{
@@ -439,7 +439,7 @@ func TestUsersGetMissingTenantindex(t *testing.T) {
var r string
testMap.index = make(map[string]map[string]bool) // reset index
testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r)
up := UserProfile{
up := &UserProfile{
UserName: "user",
Profile: map[string]string{
"t": "v",
@@ -456,7 +456,7 @@ func TestUsersGetMissingUserNameindex(t *testing.T) {
var r string
testMap.index = make(map[string]map[string]bool) // reset index
testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r)
up := UserProfile{
up := &UserProfile{
Tenant: "test",
Profile: map[string]string{
"t": "v",
@@ -473,7 +473,7 @@ func TestUsersGetMissingIdindex(t *testing.T) {
var r string
testMap.index = make(map[string]map[string]bool) // reset index
testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r)
up := UserProfile{
up := &UserProfile{
Profile: map[string]string{
"t": "v",
},
@@ -489,7 +489,7 @@ func TestUsersGetMissingIdTwoINdex(t *testing.T) {
var r string
testMap.index = make(map[string]map[string]bool) // reset index
testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r)
up := UserProfile{
up := &UserProfile{
Profile: map[string]string{
"t": "v",
"x": "y",
@@ -509,7 +509,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) {
if len(tm.index) != 0 {
t.Error("error adding indexes: ", tm.index)
}
tm.SetUser(UserProfile{
tm.SetUser(&UserProfile{
Tenant: "test",
UserName: "user",
Profile: map[string]string{
@@ -519,7 +519,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) {
if len(tm.index) != 1 || !tm.index["t:v"]["test:user"] {
t.Error("error adding indexes: ", tm.index)
}
tm.SetUser(UserProfile{
tm.SetUser(&UserProfile{
Tenant: "test",
UserName: "best",
Profile: map[string]string{
@@ -531,7 +531,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) {
!tm.index["t:v"]["test:best"] {
t.Error("error adding indexes: ", tm.index)
}
tm.UpdateUser(UserProfile{
tm.UpdateUser(&UserProfile{
Tenant: "test",
UserName: "best",
Profile: map[string]string{
@@ -543,7 +543,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) {
!tm.index["t:v1"]["test:best"] {
t.Error("error adding indexes: ", tm.index)
}
tm.UpdateUser(UserProfile{
tm.UpdateUser(&UserProfile{
Tenant: "test",
UserName: "best",
Profile: map[string]string{
@@ -555,7 +555,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) {
!tm.index["t:v"]["test:best"] {
t.Error("error adding indexes: ", tm.index)
}
tm.RemoveUser(UserProfile{
tm.RemoveUser(&UserProfile{
Tenant: "test",
UserName: "best",
Profile: map[string]string{
@@ -567,7 +567,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) {
tm.index["t:v"]["test:best"] {
t.Error("error adding indexes: ", tm.index)
}
tm.RemoveUser(UserProfile{
tm.RemoveUser(&UserProfile{
Tenant: "test",
UserName: "user",
Profile: map[string]string{

View File

@@ -28,8 +28,12 @@ import (
"os"
"os/exec"
"path/filepath"
"reflect"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/utils"
)
type FileScribe struct {
@@ -170,3 +174,31 @@ func (s *FileScribe) save(filename string) error {
f.Close()
return s.gitCommit()
}
func (s *FileScribe) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return utils.ErrNotImplemented
}
// get method
method := reflect.ValueOf(s).MethodByName(parts[1])
if !method.IsValid() {
return utils.ErrNotImplemented
}
// construct the params
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
ret := method.Call(params)
if len(ret) != 1 {
return utils.ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError
}
return err
}

View File

@@ -21,7 +21,11 @@ package history
import (
"bufio"
"bytes"
"reflect"
"strings"
"sync"
"github.com/cgrates/cgrates/utils"
)
type MockScribe struct {
@@ -64,3 +68,31 @@ func (s *MockScribe) GetBuffer(fn string) *bytes.Buffer {
defer s.mu.Unlock()
return s.BufMap[fn]
}
func (s *MockScribe) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return utils.ErrNotImplemented
}
// get method
method := reflect.ValueOf(s).MethodByName(parts[1])
if !method.IsValid() {
return utils.ErrNotImplemented
}
// construct the params
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
ret := method.Call(params)
if len(ret) != 1 {
return utils.ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError
}
return err
}

View File

@@ -1,40 +0,0 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2012-2015 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package history
import (
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
type ProxyScribe struct {
Client *rpcclient.RpcClient
}
func NewProxyScribe(addr string, attempts, reconnects int) (*ProxyScribe, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil)
if err != nil {
return nil, err
}
return &ProxyScribe{Client: client}, nil
}
func (ps *ProxyScribe) Record(rec Record, out *int) error {
return ps.Client.Call("Scribe.Record", rec, out)
}

View File

@@ -30,10 +30,6 @@ const (
RATING_PROFILES_FN = "rating_profiles.json"
)
type Scribe interface {
Record(Record, *int) error
}
type Record struct {
Id string
Filename string