diff --git a/apier/v1/aliases.go b/apier/v1/aliases.go index e17798433..72753b357 100644 --- a/apier/v1/aliases.go +++ b/apier/v1/aliases.go @@ -20,6 +20,7 @@ package v1 import ( "errors" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -48,8 +49,8 @@ func (self *ApierV1) AddRatingSubjectAliases(attrs AttrAddRatingSubjectAliases, } var ignr string for _, alias := range attrs.Aliases { - if err := aliases.SetAlias( - engine.Alias{Direction: utils.META_OUT, Tenant: attrs.Tenant, Category: attrs.Category, Account: alias, Subject: alias, Context: utils.ALIAS_CONTEXT_RATING, + if err := aliases.Call("AliasesV1.SetAlias", + &engine.Alias{Direction: utils.META_OUT, Tenant: attrs.Tenant, Category: attrs.Category, Account: alias, Subject: alias, Context: utils.ALIAS_CONTEXT_RATING, Values: engine.AliasValues{&engine.AliasValue{DestinationId: utils.META_ANY, Pairs: engine.AliasPairs{"Subject": map[string]string{alias: attrs.Subject}}, Weight: 10.0}}}, &ignr); err != nil { return utils.NewErrServerError(err) @@ -69,7 +70,7 @@ func (self *ApierV1) RemRatingSubjectAliases(tenantRatingSubject engine.TenantRa return errors.New("ALIASES_NOT_ENABLED") } var reverseAliases map[string][]*engine.Alias - if err := aliases.GetReverseAlias(engine.AttrReverseAlias{Target: "Subject", Alias: tenantRatingSubject.Subject, Context: utils.ALIAS_CONTEXT_RATING}, &reverseAliases); err != nil { + if err := aliases.Call("AliasesV1.GetReverseAlias", &engine.AttrReverseAlias{Target: "Subject", Alias: tenantRatingSubject.Subject, Context: utils.ALIAS_CONTEXT_RATING}, &reverseAliases); err != nil { return utils.NewErrServerError(err) } var ignr string @@ -78,7 +79,7 @@ func (self *ApierV1) RemRatingSubjectAliases(tenantRatingSubject engine.TenantRa if alias.Tenant != tenantRatingSubject.Tenant { continue // From another tenant } - if err := aliases.RemoveAlias(*alias, &ignr); err != nil { + if err := aliases.Call("AliasesV1.RemoveAlias", alias, &ignr); err != nil { return utils.NewErrServerError(err) } } @@ -100,8 +101,8 @@ func (self *ApierV1) AddAccountAliases(attrs AttrAddAccountAliases, reply *strin } var ignr string for _, alias := range attrs.Aliases { - if err := aliases.SetAlias( - engine.Alias{Direction: utils.META_OUT, Tenant: attrs.Tenant, Category: attrs.Category, Account: alias, Subject: alias, Context: utils.ALIAS_CONTEXT_RATING, + if err := aliases.Call("AliasesV1.SetAlias", + &engine.Alias{Direction: utils.META_OUT, Tenant: attrs.Tenant, Category: attrs.Category, Account: alias, Subject: alias, Context: utils.ALIAS_CONTEXT_RATING, Values: engine.AliasValues{&engine.AliasValue{DestinationId: utils.META_ANY, Pairs: engine.AliasPairs{"Account": map[string]string{alias: attrs.Account}}, Weight: 10.0}}}, &ignr); err != nil { return utils.NewErrServerError(err) @@ -121,7 +122,7 @@ func (self *ApierV1) RemAccountAliases(tenantAccount engine.TenantAccount, reply return errors.New("ALIASES_NOT_ENABLED") } var reverseAliases map[string][]*engine.Alias - if err := aliases.GetReverseAlias(engine.AttrReverseAlias{Target: "Account", Alias: tenantAccount.Account, Context: utils.ALIAS_CONTEXT_RATING}, &reverseAliases); err != nil { + if err := aliases.Call("AliasesV1.GetReverseAlias", &engine.AttrReverseAlias{Target: "Account", Alias: tenantAccount.Account, Context: utils.ALIAS_CONTEXT_RATING}, &reverseAliases); err != nil { return utils.NewErrServerError(err) } var ignr string @@ -130,7 +131,7 @@ func (self *ApierV1) RemAccountAliases(tenantAccount engine.TenantAccount, reply if alias.Tenant != tenantAccount.Tenant { continue // From another tenant } - if err := aliases.RemoveAlias(*alias, &ignr); err != nil { + if err := aliases.Call("AliasesV1.RemoveAlias", alias, &ignr); err != nil { return utils.NewErrServerError(err) } } diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 38157f5a8..53b56fb46 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -33,6 +33,7 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) const ( @@ -48,8 +49,8 @@ type ApierV1 struct { Sched *scheduler.Scheduler Config *config.CGRConfig Responder *engine.Responder - CdrStatsSrv engine.StatsInterface - Users engine.UserService + CdrStatsSrv rpcclient.RpcClientConnection + Users rpcclient.RpcClientConnection } func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error { @@ -524,13 +525,13 @@ func (self *ApierV1) LoadTariffPlanFromStorDb(attrs AttrLoadTpFromStorDb, reply } if len(cstKeys) != 0 && self.CdrStatsSrv != nil { - if err := self.CdrStatsSrv.ReloadQueues(cstKeys, nil); err != nil { + if err := self.CdrStatsSrv.Call("CDRStatsV1.ReloadQueues", cstKeys, nil); err != nil { return err } } if len(userKeys) != 0 && self.Users != nil { var r string - if err := self.Users.ReloadUsers("", &r); err != nil { + if err := self.Users.Call("AliasV1.ReloadUsers", "", &r); err != nil { return err } } @@ -1036,14 +1037,14 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach cs.Aliases = cache2go.CountEntries(utils.ALIASES_PREFIX) if self.CdrStatsSrv != nil && self.Config.CDRStatsEnabled { var queueIds []string - if err := self.CdrStatsSrv.GetQueueIds(0, &queueIds); err != nil { + if err := self.CdrStatsSrv.Call("CDRStatsV1.GetQueueIds", 0, &queueIds); err != nil { return utils.NewErrServerError(err) } cs.CdrStats = len(queueIds) } if self.Config.RaterUserServer == utils.INTERNAL { var ups engine.UserProfiles - if err := self.Users.GetUsers(engine.UserProfile{}, &ups); err != nil { + if err := self.Users.Call("UsersV1.GetUsers", &engine.UserProfile{}, &ups); err != nil { return utils.NewErrServerError(err) } cs.Users = len(ups) @@ -1187,13 +1188,13 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, self.Sched.Restart() } if len(cstKeys) != 0 && self.CdrStatsSrv != nil { - if err := self.CdrStatsSrv.ReloadQueues(cstKeys, nil); err != nil { + if err := self.CdrStatsSrv.Call("CDRStatsV1.ReloadQueues", cstKeys, nil); err != nil { return err } } if len(userKeys) != 0 && self.Users != nil { var r string - if err := self.Users.ReloadUsers("", &r); err != nil { + if err := self.Users.Call("UsersV1.ReloadUsers", "", &r); err != nil { return err } } diff --git a/apier/v1/cdrstatsv1.go b/apier/v1/cdrstatsv1.go index 1677021b0..a9bef703f 100644 --- a/apier/v1/cdrstatsv1.go +++ b/apier/v1/cdrstatsv1.go @@ -23,11 +23,12 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) // Interact with Stats server type CDRStatsV1 struct { - CdrStats engine.StatsInterface + CdrStats rpcclient.RpcClientConnection } type AttrGetMetrics struct { @@ -38,23 +39,23 @@ func (sts *CDRStatsV1) GetMetrics(attr AttrGetMetrics, reply *map[string]float64 if len(attr.StatsQueueId) == 0 { return fmt.Errorf("%s:StatsQueueId", utils.ErrMandatoryIeMissing.Error()) } - return sts.CdrStats.GetValues(attr.StatsQueueId, reply) + return sts.CdrStats.Call("CDRStatsV1.GetValues", attr.StatsQueueId, reply) } func (sts *CDRStatsV1) GetQueueIds(empty string, reply *[]string) error { - return sts.CdrStats.GetQueueIds(0, reply) + return sts.CdrStats.Call("CDRStatsV1.GetQueueIds", 0, reply) } func (sts *CDRStatsV1) GetQueue(id string, sq *engine.StatsQueue) error { - return sts.CdrStats.GetQueue(id, sq) + return sts.CdrStats.Call("CDRStatsV1.GetQueue", id, sq) } func (sts *CDRStatsV1) GetQueueTriggers(id string, ats *engine.ActionTriggers) error { - return sts.CdrStats.GetQueueTriggers(id, ats) + return sts.CdrStats.Call("CDRStatsV1.GetQueueTriggers", id, ats) } func (sts *CDRStatsV1) ReloadQueues(attr utils.AttrCDRStatsReloadQueues, reply *string) error { - if err := sts.CdrStats.ReloadQueues(attr.StatsQueueIds, nil); err != nil { + if err := sts.CdrStats.Call("CDRStatsV1.ReloadQueues", attr.StatsQueueIds, nil); err != nil { return err } *reply = utils.OK @@ -62,7 +63,7 @@ func (sts *CDRStatsV1) ReloadQueues(attr utils.AttrCDRStatsReloadQueues, reply * } func (sts *CDRStatsV1) ResetQueues(attr utils.AttrCDRStatsReloadQueues, reply *string) error { - if err := sts.CdrStats.ResetQueues(attr.StatsQueueIds, nil); err != nil { + if err := sts.CdrStats.Call("CDRStatsV1.ResetQueues", attr.StatsQueueIds, nil); err != nil { return err } *reply = utils.OK diff --git a/apier/v2/apier.go b/apier/v2/apier.go index e35c48a7d..bb773ece8 100644 --- a/apier/v2/apier.go +++ b/apier/v2/apier.go @@ -253,14 +253,14 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, self.Sched.Restart() } if len(cstKeys) != 0 && self.CdrStatsSrv != nil { - if err := self.CdrStatsSrv.ReloadQueues(cstKeys, nil); err != nil { + if err := self.CdrStatsSrv.Call("CDRStatsV1.ReloadQueues", cstKeys, nil); err != nil { return err } } if len(userKeys) != 0 && self.Users != nil { var r string - if err := self.Users.ReloadUsers("", &r); err != nil { + if err := self.Users.Call("UsersV1.ReloadUsers", "", &r); err != nil { return err } } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index cc6a85f2f..13c28408d 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -372,9 +372,9 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS } func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, cdrDb engine.CdrStorage, - internalRaterChan chan *engine.Responder, internalPubSubSChan chan engine.PublisherSubscriber, - internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService, - internalCdrStatSChan chan engine.StatsInterface, server *utils.Server, exitChan chan bool) { + internalRaterChan chan *engine.Responder, internalPubSubSChan chan rpcclient.RpcClientConnection, + internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection, + internalCdrStatSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS CDRS service.") var err error var client *rpcclient.RpcClient @@ -394,14 +394,14 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, raterConn = client } // Pubsub connection init - var pubSubConn engine.PublisherSubscriber + var pubSubConn rpcclient.RpcClientConnection if cfg.CDRSPubSub == utils.INTERNAL { pubSubs := <-internalPubSubSChan pubSubConn = pubSubs internalPubSubSChan <- pubSubs } else if len(cfg.CDRSPubSub) != 0 { if cfg.CDRSRater == cfg.CDRSPubSub { - pubSubConn = &engine.ProxyPubSub{Client: client} + pubSubConn = client } else { client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSPubSub, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { @@ -409,18 +409,18 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, exitChan <- true return } - pubSubConn = &engine.ProxyPubSub{Client: client} + pubSubConn = client } } // Users connection init - var usersConn engine.UserService + var usersConn rpcclient.RpcClientConnection if cfg.CDRSUsers == utils.INTERNAL { userS := <-internalUserSChan usersConn = userS internalUserSChan <- userS } else if len(cfg.CDRSUsers) != 0 { if cfg.CDRSRater == cfg.CDRSUsers { - usersConn = &engine.ProxyUserService{Client: client} + usersConn = client } else { client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSUsers, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { @@ -428,18 +428,18 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, exitChan <- true return } - usersConn = &engine.ProxyUserService{Client: client} + usersConn = client } } // Aliases connection init - var aliasesConn engine.AliasService + var aliasesConn rpcclient.RpcClientConnection if cfg.CDRSAliases == utils.INTERNAL { aliaseS := <-internalAliaseSChan aliasesConn = aliaseS internalAliaseSChan <- aliaseS } else if len(cfg.CDRSAliases) != 0 { if cfg.CDRSRater == cfg.CDRSAliases { - aliasesConn = &engine.ProxyAliasService{Client: client} + aliasesConn = client } else { client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSAliases, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { @@ -447,18 +447,18 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, exitChan <- true return } - aliasesConn = &engine.ProxyAliasService{Client: client} + aliasesConn = client } } // Stats connection init - var statsConn engine.StatsInterface + var statsConn rpcclient.RpcClientConnection if cfg.CDRSStats == utils.INTERNAL { statS := <-internalCdrStatSChan statsConn = statS internalCdrStatSChan <- statS } else if len(cfg.CDRSStats) != 0 { if cfg.CDRSRater == cfg.CDRSStats { - statsConn = &engine.ProxyStats{Client: client} + statsConn = client } else { client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSStats, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { @@ -466,7 +466,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, exitChan <- true return } - statsConn = &engine.ProxyStats{Client: client} + statsConn = client } } @@ -495,31 +495,31 @@ func startScheduler(internalSchedulerChan chan *scheduler.Scheduler, ratingDb en exitChan <- true // Should not get out of loop though } -func startCdrStats(internalCdrStatSChan chan engine.StatsInterface, ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, server *utils.Server) { +func startCdrStats(internalCdrStatSChan chan rpcclient.RpcClientConnection, ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, server *utils.Server) { cdrStats := engine.NewStats(ratingDb, accountDb, cfg.CDRStatsSaveInterval) server.RpcRegister(cdrStats) server.RpcRegister(&v1.CDRStatsV1{CdrStats: cdrStats}) // Public APIs internalCdrStatSChan <- cdrStats } -func startHistoryServer(internalHistorySChan chan history.Scribe, server *utils.Server, exitChan chan bool) { +func startHistoryServer(internalHistorySChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { scribeServer, err := history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) exitChan <- true } - server.RpcRegisterName("ScribeV1", scribeServer) + server.RpcRegisterName("HistoryV1", scribeServer) internalHistorySChan <- scribeServer } -func startPubSubServer(internalPubSubSChan chan engine.PublisherSubscriber, accountDb engine.AccountingStorage, server *utils.Server) { +func startPubSubServer(internalPubSubSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server) { pubSubServer := engine.NewPubSub(accountDb, cfg.HttpSkipTlsVerify) server.RpcRegisterName("PubSubV1", pubSubServer) internalPubSubSChan <- pubSubServer } // ToDo: Make sure we are caching before starting this one -func startAliasesServer(internalAliaseSChan chan engine.AliasService, accountDb engine.AccountingStorage, server *utils.Server, exitChan chan bool) { +func startAliasesServer(internalAliaseSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server, exitChan chan bool) { aliasesServer := engine.NewAliasHandler(accountDb) server.RpcRegisterName("AliasesV1", aliasesServer) if err := accountDb.CacheAccountingPrefixes(utils.ALIASES_PREFIX); err != nil { @@ -530,7 +530,7 @@ func startAliasesServer(internalAliaseSChan chan engine.AliasService, accountDb internalAliaseSChan <- aliasesServer } -func startUsersServer(internalUserSChan chan engine.UserService, accountDb engine.AccountingStorage, server *utils.Server, exitChan chan bool) { +func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, accountDb engine.AccountingStorage, server *utils.Server, exitChan chan bool) { userServer, err := engine.NewUserMap(accountDb, cfg.UserServerIndexes) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) @@ -543,11 +543,11 @@ func startUsersServer(internalUserSChan chan engine.UserService, accountDb engin func startRpc(server *utils.Server, internalRaterChan chan *engine.Responder, internalCdrSChan chan *engine.CdrServer, - internalCdrStatSChan chan engine.StatsInterface, - internalHistorySChan chan history.Scribe, - internalPubSubSChan chan engine.PublisherSubscriber, - internalUserSChan chan engine.UserService, - internalAliaseSChan chan engine.AliasService) { + internalCdrStatSChan chan rpcclient.RpcClientConnection, + internalHistorySChan chan rpcclient.RpcClientConnection, + internalPubSubSChan chan rpcclient.RpcClientConnection, + internalUserSChan chan rpcclient.RpcClientConnection, + internalAliaseSChan chan rpcclient.RpcClientConnection) { select { // Any of the rpc methods will unlock listening to rpc requests case resp := <-internalRaterChan: internalRaterChan <- resp @@ -668,11 +668,11 @@ func main() { internalRaterChan := make(chan *engine.Responder, 1) internalSchedulerChan := make(chan *scheduler.Scheduler, 1) internalCdrSChan := make(chan *engine.CdrServer, 1) - internalCdrStatSChan := make(chan engine.StatsInterface, 1) - internalHistorySChan := make(chan history.Scribe, 1) - internalPubSubSChan := make(chan engine.PublisherSubscriber, 1) - internalUserSChan := make(chan engine.UserService, 1) - internalAliaseSChan := make(chan engine.AliasService, 1) + internalCdrStatSChan := make(chan rpcclient.RpcClientConnection, 1) + internalHistorySChan := make(chan rpcclient.RpcClientConnection, 1) + internalPubSubSChan := make(chan rpcclient.RpcClientConnection, 1) + internalUserSChan := make(chan rpcclient.RpcClientConnection, 1) + internalAliaseSChan := make(chan rpcclient.RpcClientConnection, 1) internalSMGChan := make(chan rpcclient.RpcClientConnection, 1) // Start balancer service if cfg.BalancerEnabled { diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index f5a5a655f..edef08c05 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -26,9 +26,9 @@ import ( "github.com/cgrates/cgrates/apier/v2" "github.com/cgrates/cgrates/balancer2go" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled *bool, exitChan chan bool) { @@ -40,8 +40,8 @@ func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled // Starts rater and reports on chan func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler, - internalCdrStatSChan chan engine.StatsInterface, internalHistorySChan chan history.Scribe, - internalPubSubSChan chan engine.PublisherSubscriber, internalUserSChan chan engine.UserService, internalAliaseSChan chan engine.AliasService, + internalCdrStatSChan chan rpcclient.RpcClientConnection, internalHistorySChan chan rpcclient.RpcClientConnection, + internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection, server *utils.Server, ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, logDb engine.LogStorage, stopHandled *bool, exitChan chan bool) { @@ -109,7 +109,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c } // Connection to CDRStats - var cdrStats engine.StatsInterface + var cdrStats rpcclient.RpcClientConnection if cfg.RaterCdrStats != "" { cdrstatTaskChan := make(chan struct{}) waitTasks = append(waitTasks, cdrstatTaskChan) @@ -124,7 +124,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c exitChan <- true return } - } else if cdrStats, err = engine.NewProxyStats(cfg.RaterCdrStats, cfg.ConnectAttempts, -1); err != nil { + } else if cdrStats, err = rpcclient.NewRpcClient("tcp", cfg.RaterCdrStats, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to cdrstats, error: %s", err.Error())) exitChan <- true return @@ -138,7 +138,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c waitTasks = append(waitTasks, histTaskChan) go func() { defer close(histTaskChan) - var scribeServer history.Scribe + var scribeServer rpcclient.RpcClientConnection if cfg.RaterHistoryServer == utils.INTERNAL { select { case scribeServer = <-internalHistorySChan: @@ -148,7 +148,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c exitChan <- true return } - } else if scribeServer, err = history.NewProxyScribe(cfg.RaterHistoryServer, cfg.ConnectAttempts, -1); err != nil { + } else if scribeServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterHistoryServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect historys, error: %s", err.Error())) exitChan <- true return @@ -163,7 +163,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c waitTasks = append(waitTasks, pubsubTaskChan) go func() { defer close(pubsubTaskChan) - var pubSubServer engine.PublisherSubscriber + var pubSubServer rpcclient.RpcClientConnection if cfg.RaterPubSubServer == utils.INTERNAL { select { case pubSubServer = <-internalPubSubSChan: @@ -173,7 +173,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c exitChan <- true return } - } else if pubSubServer, err = engine.NewProxyPubSub(cfg.RaterPubSubServer, cfg.ConnectAttempts, -1); err != nil { + } else if pubSubServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterPubSubServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to pubsubs: %s", err.Error())) exitChan <- true return @@ -188,7 +188,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c waitTasks = append(waitTasks, aliasesTaskChan) go func() { defer close(aliasesTaskChan) - var aliasesServer engine.AliasService + var aliasesServer rpcclient.RpcClientConnection if cfg.RaterAliasesServer == utils.INTERNAL { select { case aliasesServer = <-internalAliaseSChan: @@ -198,7 +198,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c exitChan <- true return } - } else if aliasesServer, err = engine.NewProxyAliasService(cfg.RaterAliasesServer, cfg.ConnectAttempts, -1); err != nil { + } else if aliasesServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterAliasesServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to aliases, error: %s", err.Error())) exitChan <- true return @@ -208,7 +208,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c } // Connection to UserService - var userServer engine.UserService + var userServer rpcclient.RpcClientConnection if cfg.RaterUserServer != "" { usersTaskChan := make(chan struct{}) waitTasks = append(waitTasks, usersTaskChan) @@ -223,7 +223,7 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c exitChan <- true return } - } else if userServer, err = engine.NewProxyUserService(cfg.RaterUserServer, cfg.ConnectAttempts, -1); err != nil { + } else if userServer, err = rpcclient.NewRpcClient("tcp", cfg.RaterUserServer, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect users, error: %s", err.Error())) exitChan <- true return diff --git a/cmd/cgr-engine/registration.go b/cmd/cgr-engine/registration.go index ec856d6f5..0b3a45ea0 100644 --- a/cmd/cgr-engine/registration.go +++ b/cmd/cgr-engine/registration.go @@ -29,6 +29,7 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) /* @@ -43,7 +44,7 @@ func stopBalancerSignalHandler(bal *balancer2go.Balancer, exitChan chan bool) { exitChan <- true } -func generalSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exitChan chan bool) { +func generalSignalHandler(internalCdrStatSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { c := make(chan os.Signal) signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) @@ -52,7 +53,7 @@ func generalSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exitC var dummyInt int select { case cdrStats := <-internalCdrStatSChan: - cdrStats.Stop(dummyInt, &dummyInt) + cdrStats.Call("CDRStatsV1.Stop", dummyInt, &dummyInt) default: } @@ -62,7 +63,7 @@ func generalSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exitC /* Listens for the SIGTERM, SIGINT, SIGQUIT system signals and gracefuly unregister from balancer and closes the storage before exiting. */ -func stopRaterSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exitChan chan bool) { +func stopRaterSignalHandler(internalCdrStatSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { c := make(chan os.Signal) signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) sig := <-c @@ -72,7 +73,7 @@ func stopRaterSignalHandler(internalCdrStatSChan chan engine.StatsInterface, exi var dummyInt int select { case cdrStats := <-internalCdrStatSChan: - cdrStats.Stop(dummyInt, &dummyInt) + cdrStats.Call("CDRStatsV1.Stop", dummyInt, &dummyInt) default: } exitChan <- true diff --git a/engine/aliases.go b/engine/aliases.go index eda219baa..701e8d6b3 100644 --- a/engine/aliases.go +++ b/engine/aliases.go @@ -12,7 +12,7 @@ import ( ) // Temporary export AliasService for the ApierV1 to be able to emulate old APIs -func GetAliasService() AliasService { +func GetAliasService() rpcclient.RpcClientConnection { return aliasService } @@ -156,16 +156,6 @@ type AttrReverseAlias struct { Context string } -type AliasService interface { - SetAlias(Alias, *string) error - UpdateAlias(Alias, *string) error - RemoveAlias(Alias, *string) error - GetAlias(Alias, *Alias) error - GetMatchingAlias(AttrMatchingAlias, *string) error - GetReverseAlias(AttrReverseAlias, *map[string][]*Alias) error - RemoveReverseAlias(AttrReverseAlias, *string) error -} - type AliasHandler struct { accountingDb AccountingStorage mu sync.RWMutex @@ -177,11 +167,11 @@ func NewAliasHandler(accountingDb AccountingStorage) *AliasHandler { } } -func (am *AliasHandler) SetAlias(al Alias, reply *string) error { +func (am *AliasHandler) SetAlias(al *Alias, reply *string) error { am.mu.Lock() defer am.mu.Unlock() - if err := am.accountingDb.SetAlias(&al); err != nil { + if err := am.accountingDb.SetAlias(al); err != nil { *reply = err.Error() return err } //add to cache @@ -194,7 +184,7 @@ func (am *AliasHandler) SetAlias(al Alias, reply *string) error { return nil } -func (am *AliasHandler) UpdateAlias(al Alias, reply *string) error { +func (am *AliasHandler) UpdateAlias(al *Alias, reply *string) error { am.mu.Lock() defer am.mu.Unlock() // get previous value @@ -216,7 +206,7 @@ func (am *AliasHandler) UpdateAlias(al Alias, reply *string) error { } } - if err := am.accountingDb.SetAlias(&al); err != nil { + if err := am.accountingDb.SetAlias(al); err != nil { *reply = err.Error() return err } //add to cache @@ -229,7 +219,7 @@ func (am *AliasHandler) UpdateAlias(al Alias, reply *string) error { return nil } -func (am *AliasHandler) RemoveAlias(al Alias, reply *string) error { +func (am *AliasHandler) RemoveAlias(al *Alias, reply *string) error { am.mu.Lock() defer am.mu.Unlock() if err := am.accountingDb.RemoveAlias(al.GetId()); err != nil { @@ -240,7 +230,7 @@ func (am *AliasHandler) RemoveAlias(al Alias, reply *string) error { return nil } -func (am *AliasHandler) RemoveReverseAlias(attr AttrReverseAlias, reply *string) error { +func (am *AliasHandler) RemoveReverseAlias(attr *AttrReverseAlias, reply *string) error { am.mu.Lock() defer am.mu.Unlock() rKey := utils.REVERSE_ALIASES_PREFIX + attr.Alias + attr.Target + attr.Context @@ -263,7 +253,7 @@ func (am *AliasHandler) RemoveReverseAlias(attr AttrReverseAlias, reply *string) return nil } -func (am *AliasHandler) GetAlias(al Alias, result *Alias) error { +func (am *AliasHandler) GetAlias(al *Alias, result *Alias) error { am.mu.RLock() defer am.mu.RUnlock() variants := al.GenerateIds() @@ -276,7 +266,7 @@ func (am *AliasHandler) GetAlias(al Alias, result *Alias) error { return utils.ErrNotFound } -func (am *AliasHandler) GetReverseAlias(attr AttrReverseAlias, result *map[string][]*Alias) error { +func (am *AliasHandler) GetReverseAlias(attr *AttrReverseAlias, result *map[string][]*Alias) error { am.mu.Lock() defer am.mu.Unlock() aliases := make(map[string][]*Alias) @@ -303,9 +293,9 @@ func (am *AliasHandler) GetReverseAlias(attr AttrReverseAlias, result *map[strin return nil } -func (am *AliasHandler) GetMatchingAlias(attr AttrMatchingAlias, result *string) error { +func (am *AliasHandler) GetMatchingAlias(attr *AttrMatchingAlias, result *string) error { response := Alias{} - if err := am.GetAlias(Alias{ + if err := am.GetAlias(&Alias{ Direction: attr.Direction, Tenant: attr.Tenant, Category: attr.Category, @@ -357,48 +347,32 @@ func (am *AliasHandler) GetMatchingAlias(attr AttrMatchingAlias, result *string) return utils.ErrNotFound } -type ProxyAliasService struct { - Client *rpcclient.RpcClient -} - -func NewProxyAliasService(addr string, attempts, reconnects int) (*ProxyAliasService, error) { - client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil) - if err != nil { - return nil, err +func (am *AliasHandler) Call(serviceMethod string, args interface{}, reply interface{}) error { + parts := strings.Split(serviceMethod, ".") + if len(parts) != 2 { + return utils.ErrNotImplemented + } + // get method + method := reflect.ValueOf(am).MethodByName(parts[1]) + if !method.IsValid() { + return utils.ErrNotImplemented } - return &ProxyAliasService{Client: client}, nil -} -func (ps *ProxyAliasService) SetAlias(al Alias, reply *string) error { - return ps.Client.Call("AliasesV1.SetAlias", al, reply) -} + // construct the params + params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} -func (ps *ProxyAliasService) UpdateAlias(al Alias, reply *string) error { - return ps.Client.Call("AliasesV1.UpdateAlias", al, reply) -} - -func (ps *ProxyAliasService) RemoveAlias(al Alias, reply *string) error { - return ps.Client.Call("AliasesV1.RemoveAlias", al, reply) -} - -func (ps *ProxyAliasService) GetAlias(al Alias, alias *Alias) error { - return ps.Client.Call("AliasesV1.GetAlias", al, alias) -} - -func (ps *ProxyAliasService) GetMatchingAlias(attr AttrMatchingAlias, alias *string) error { - return ps.Client.Call("AliasesV1.GetMatchingAlias", attr, alias) -} - -func (ps *ProxyAliasService) GetReverseAlias(attr AttrReverseAlias, alias *map[string][]*Alias) error { - return ps.Client.Call("AliasesV1.GetReverseAlias", attr, alias) -} - -func (ps *ProxyAliasService) RemoveReverseAlias(attr AttrReverseAlias, reply *string) error { - return ps.Client.Call("AliasesV1.RemoveReverseAlias", attr, reply) -} - -func (ps *ProxyAliasService) ReloadAliases(in string, reply *string) error { - return ps.Client.Call("AliasesV1.ReloadAliases", in, reply) + ret := method.Call(params) + if len(ret) != 1 { + return utils.ErrServerError + } + if ret[0].Interface() == nil { + return nil + } + err, ok := ret[0].Interface().(error) + if !ok { + return utils.ErrServerError + } + return err } func LoadAlias(attr *AttrMatchingAlias, in interface{}, extraFields string) error { @@ -406,7 +380,7 @@ func LoadAlias(attr *AttrMatchingAlias, in interface{}, extraFields string) erro return nil } response := Alias{} - if err := aliasService.GetAlias(Alias{ + if err := aliasService.Call("AliasesV1.GetAlias", &Alias{ Direction: attr.Direction, Tenant: attr.Tenant, Category: attr.Category, diff --git a/engine/aliases_test.go b/engine/aliases_test.go index 67d89dd9a..2e87db3d0 100644 --- a/engine/aliases_test.go +++ b/engine/aliases_test.go @@ -12,7 +12,7 @@ func init() { } func TestAliasesGetAlias(t *testing.T) { alias := Alias{} - err := aliasService.GetAlias(Alias{ + err := aliasService.Call("AliasesV1.GetAlias", &Alias{ Direction: "*out", Tenant: "cgrates.org", Category: "call", @@ -23,13 +23,13 @@ func TestAliasesGetAlias(t *testing.T) { if err != nil || len(alias.Values) != 2 || len(alias.Values[0].Pairs) != 2 { - t.Error("Error getting alias: ", err, alias) + t.Error("Error getting alias: ", err, alias, alias.Values[0]) } } func TestAliasesGetMatchingAlias(t *testing.T) { var response string - err := aliasService.GetMatchingAlias(AttrMatchingAlias{ + err := aliasService.Call("AliasesV1.GetMatchingAlias", &AttrMatchingAlias{ Direction: "*out", Tenant: "cgrates.org", Category: "call", diff --git a/engine/calldesc.go b/engine/calldesc.go index a83cb997d..ab12be895 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -28,8 +28,8 @@ import ( "time" "github.com/cgrates/cgrates/cache2go" - "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) const ( @@ -75,10 +75,10 @@ var ( cdrStorage CdrStorage debitPeriod = 10 * time.Second globalRoundingDecimals = 5 - historyScribe history.Scribe - pubSubServer PublisherSubscriber - userService UserService - aliasService AliasService + historyScribe rpcclient.RpcClientConnection + pubSubServer rpcclient.RpcClientConnection + userService rpcclient.RpcClientConnection + aliasService rpcclient.RpcClientConnection ) // Exported method to set the storage getter. @@ -110,26 +110,26 @@ func SetCdrStorage(cStorage CdrStorage) { } // Exported method to set the history scribe. -func SetHistoryScribe(scribe history.Scribe) { +func SetHistoryScribe(scribe rpcclient.RpcClientConnection) { historyScribe = scribe } -func SetPubSub(ps PublisherSubscriber) { +func SetPubSub(ps rpcclient.RpcClientConnection) { pubSubServer = ps } -func SetUserService(us UserService) { +func SetUserService(us rpcclient.RpcClientConnection) { userService = us } -func SetAliasService(as AliasService) { +func SetAliasService(as rpcclient.RpcClientConnection) { aliasService = as } func Publish(event CgrEvent) { if pubSubServer != nil { var s string - pubSubServer.Publish(event, &s) + pubSubServer.Call("PubSubV1.Publish", event, &s) } } @@ -820,7 +820,7 @@ func (cd *CallDescriptor) GetLCRFromStorage() (*LCR, error) { return nil, utils.ErrNotFound } -func (cd *CallDescriptor) GetLCR(stats StatsInterface, p *utils.Paginator) (*LCRCost, error) { +func (cd *CallDescriptor) GetLCR(stats rpcclient.RpcClientConnection, p *utils.Paginator) (*LCRCost, error) { cd.account = nil // make sure it's not cached lcr, err := cd.GetLCRFromStorage() if err != nil { @@ -951,7 +951,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface, p *utils.Paginator) (*LCR if lcrCost.Entry.Strategy == LCR_STRATEGY_LOAD { for _, qId := range cdrStatsQueueIds { sq := &StatsQueue{} - if err := stats.GetQueue(qId, sq); err == nil { + if err := stats.Call("CDRStatsV1.GetQueue", qId, sq); err == nil { if sq.conf.QueueLength == 0 { //only add qeues that don't have fixed length supplierQueues = append(supplierQueues, sq) } @@ -959,7 +959,7 @@ func (cd *CallDescriptor) GetLCR(stats StatsInterface, p *utils.Paginator) (*LCR } } else { statValues := make(map[string]float64) - if err := stats.GetValues(qId, &statValues); err != nil { + if err := stats.Call("CDRStatsV1.GetValues", qId, &statValues); err != nil { lcrCost.SupplierCosts = append(lcrCost.SupplierCosts, &LCRSupplierCost{ Supplier: fullSupplier, Error: fmt.Sprintf("Get stats values for queue id %s, error %s", qId, err.Error()), diff --git a/engine/cdrs.go b/engine/cdrs.go index 34b78110d..2d02a7fd1 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -66,7 +66,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { } } -func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, client rpcclient.RpcClientConnection, pubsub PublisherSubscriber, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) { +func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, client rpcclient.RpcClientConnection, pubsub rpcclient.RpcClientConnection, users rpcclient.RpcClientConnection, aliases rpcclient.RpcClientConnection, stats rpcclient.RpcClientConnection) (*CdrServer, error) { return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, client: client, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{queue: make(map[string]chan bool)}}, nil } @@ -74,10 +74,10 @@ type CdrServer struct { cgrCfg *config.CGRConfig cdrDb CdrStorage client rpcclient.RpcClientConnection - pubsub PublisherSubscriber - users UserService - aliases AliasService - stats StatsInterface + pubsub rpcclient.RpcClientConnection + users rpcclient.RpcClientConnection + aliases rpcclient.RpcClientConnection + stats rpcclient.RpcClientConnection guard *GuardianLock } @@ -255,7 +255,7 @@ func (self *CdrServer) rateStoreStatsReplicate(cdr *StoredCdr) error { } // Attach CDR to stats if self.stats != nil { // Send CDR to stats - if err := self.stats.AppendCDR(cdr, nil); err != nil { + if err := self.stats.Call("Stats.AppendCDR", cdr, nil); err != nil { utils.Logger.Err(fmt.Sprintf(" Could not append cdr to stats: %s", err.Error())) } } diff --git a/engine/pubsub.go b/engine/pubsub.go index 61e609020..59678d472 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -3,11 +3,12 @@ package engine import ( "errors" "fmt" + "reflect" + "strings" "sync" "time" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) type SubscribeInfo struct { @@ -28,13 +29,6 @@ func (ce CgrEvent) PassFilters(rsrFields utils.RSRFields) bool { return true } -type PublisherSubscriber interface { - Subscribe(SubscribeInfo, *string) error - Unsubscribe(SubscribeInfo, *string) error - Publish(CgrEvent, *string) error - ShowSubscribers(string, *map[string]*SubscriberData) error -} - type SubscriberData struct { ExpTime time.Time Filters utils.RSRFields @@ -165,28 +159,30 @@ func (ps *PubSub) ShowSubscribers(in string, out *map[string]*SubscriberData) er return nil } -type ProxyPubSub struct { - Client *rpcclient.RpcClient -} - -func NewProxyPubSub(addr string, attempts, reconnects int) (*ProxyPubSub, error) { - client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil) - if err != nil { - return nil, err +func (ps *PubSub) Call(serviceMethod string, args interface{}, reply interface{}) error { + parts := strings.Split(serviceMethod, ".") + if len(parts) != 2 { + return utils.ErrNotImplemented + } + // get method + method := reflect.ValueOf(ps).MethodByName(parts[1]) + if !method.IsValid() { + return utils.ErrNotImplemented } - return &ProxyPubSub{Client: client}, nil -} -func (ps *ProxyPubSub) Subscribe(si SubscribeInfo, reply *string) error { - return ps.Client.Call("PubSubV1.Subscribe", si, reply) -} -func (ps *ProxyPubSub) Unsubscribe(si SubscribeInfo, reply *string) error { - return ps.Client.Call("PubSubV1.Unsubscribe", si, reply) -} -func (ps *ProxyPubSub) Publish(evt CgrEvent, reply *string) error { - return ps.Client.Call("PubSubV1.Publish", evt, reply) -} + // construct the params + params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} -func (ps *ProxyPubSub) ShowSubscribers(in string, reply *map[string]*SubscriberData) error { - return ps.Client.Call("PubSubV1.ShowSubscribers", in, reply) + ret := method.Call(params) + if len(ret) != 1 { + return utils.ErrServerError + } + if ret[0].Interface() == nil { + return nil + } + err, ok := ret[0].Interface().(error) + if !ok { + return utils.ErrServerError + } + return err } diff --git a/engine/responder.go b/engine/responder.go index a54756683..76b9a9b9d 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -31,6 +31,7 @@ import ( "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) // Individual session run @@ -49,13 +50,13 @@ type Responder struct { Bal *balancer2go.Balancer ExitChan chan bool CdrSrv *CdrServer - Stats StatsInterface + Stats rpcclient.RpcClientConnection Timezone string cnt int64 responseCache *cache2go.ResponseCache } -func NewResponder(exitChan chan bool, cdrSrv *CdrServer, stats StatsInterface, timeToLive time.Duration) *Responder { +func NewResponder(exitChan chan bool, cdrSrv *CdrServer, stats rpcclient.RpcClientConnection, timeToLive time.Duration) *Responder { return &Responder{ ExitChan: exitChan, Stats: stats, @@ -615,12 +616,12 @@ func (rs *Responder) UnRegisterRater(clientAddress string, replay *int) error { } func (rs *Responder) Call(serviceMethod string, args interface{}, reply interface{}) error { - if !strings.HasPrefix(serviceMethod, "Responder.") { + parts := strings.Split(serviceMethod, ".") + if len(parts) != 2 { return utils.ErrNotImplemented } - methodName := strings.TrimLeft(serviceMethod, "Responder.") // get method - method := reflect.ValueOf(rs).MethodByName(methodName) + method := reflect.ValueOf(rs).MethodByName(parts[1]) if !method.IsValid() { return utils.ErrNotImplemented } @@ -632,6 +633,9 @@ func (rs *Responder) Call(serviceMethod string, args interface{}, reply interfac if len(ret) != 1 { return utils.ErrServerError } + if ret[0].Interface() == nil { + return nil + } err, ok := ret[0].Interface().(error) if !ok { return utils.ErrServerError diff --git a/engine/responder_test.go b/engine/responder_test.go index 80801bfad..74e87ccdd 100644 --- a/engine/responder_test.go +++ b/engine/responder_test.go @@ -282,7 +282,8 @@ func TestResponderGetLCR(t *testing.T) { } } danStatsId := "dan12_stats" - rsponder.Stats.AddQueue(&CdrStats{Id: danStatsId, Supplier: []string{"dan12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, nil) + var r int + rsponder.Stats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: danStatsId, Supplier: []string{"dan12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r) danRpfl := &RatingProfile{Id: "*out:tenant12:call:dan12", RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{ ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC), @@ -292,7 +293,7 @@ func TestResponderGetLCR(t *testing.T) { }}, } rifStatsId := "rif12_stats" - rsponder.Stats.AddQueue(&CdrStats{Id: rifStatsId, Supplier: []string{"rif12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, nil) + rsponder.Stats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: rifStatsId, Supplier: []string{"rif12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r) rifRpfl := &RatingProfile{Id: "*out:tenant12:call:rif12", RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{ ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC), @@ -302,7 +303,7 @@ func TestResponderGetLCR(t *testing.T) { }}, } ivoStatsId := "ivo12_stats" - rsponder.Stats.AddQueue(&CdrStats{Id: ivoStatsId, Supplier: []string{"ivo12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, nil) + rsponder.Stats.Call("CDRStatsV1.AddQueue", &CdrStats{Id: ivoStatsId, Supplier: []string{"ivo12"}, Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}, &r) ivoRpfl := &RatingProfile{Id: "*out:tenant12:call:ivo12", RatingPlanActivations: RatingPlanActivations{&RatingPlanActivation{ ActivationTime: time.Date(2015, 01, 01, 8, 0, 0, 0, time.UTC), @@ -483,9 +484,10 @@ func TestResponderGetLCR(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", eQTLcr.SupplierCosts, lcrQT.SupplierCosts) } cdr := &StoredCdr{Supplier: "rif12", AnswerTime: time.Now(), Usage: 3 * time.Minute, Cost: 1} - rsponder.Stats.AppendCDR(cdr, nil) + rsponder.Stats.Call("CDRStatsV1.AppendCDR", cdr, &r) cdr = &StoredCdr{Supplier: "dan12", AnswerTime: time.Now(), Usage: 5 * time.Minute, Cost: 2} - rsponder.Stats.AppendCDR(cdr, nil) + rsponder.Stats.Call("CDRStatsV1.AppendCDR", cdr, &r) + eQTLcr = &LCRCost{ Entry: &LCREntry{DestinationId: utils.ANY, RPCategory: "call", Strategy: LCR_STRATEGY_QOS_THRESHOLD, StrategyParams: "35;;;;4m;;;;;;;;;", Weight: 10.0}, SupplierCosts: []*LCRSupplierCost{ diff --git a/engine/stats.go b/engine/stats.go index 4c9599463..62ff95c48 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -20,25 +20,14 @@ package engine import ( "fmt" + "reflect" + "strings" "sync" "time" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) -type StatsInterface interface { - GetValues(string, *map[string]float64) error - GetQueueIds(int, *[]string) error - GetQueue(string, *StatsQueue) error - GetQueueTriggers(string, *ActionTriggers) error - AppendCDR(*StoredCdr, *int) error - AddQueue(*CdrStats, *int) error - ReloadQueues([]string, *int) error - ResetQueues([]string, *int) error - Stop(int, *int) error -} - type Stats struct { queues map[string]*StatsQueue queueSavers map[string]*queueSaver @@ -286,50 +275,30 @@ func (s *Stats) Stop(int, *int) error { return nil } -type ProxyStats struct { - Client *rpcclient.RpcClient -} - -func NewProxyStats(addr string, attempts, reconnects int) (*ProxyStats, error) { - client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil) - if err != nil { - return nil, err +func (s *Stats) Call(serviceMethod string, args interface{}, reply interface{}) error { + parts := strings.Split(serviceMethod, ".") + if len(parts) != 2 { + return utils.ErrNotImplemented + } + // get method + method := reflect.ValueOf(s).MethodByName(parts[1]) + if !method.IsValid() { + return utils.ErrNotImplemented } - return &ProxyStats{Client: client}, nil -} -func (ps *ProxyStats) GetValues(sqID string, values *map[string]float64) error { - return ps.Client.Call("Stats.GetValues", sqID, values) -} + // construct the params + params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} -func (ps *ProxyStats) AppendCDR(cdr *StoredCdr, out *int) error { - return ps.Client.Call("Stats.AppendCDR", cdr, out) -} - -func (ps *ProxyStats) GetQueueIds(in int, ids *[]string) error { - return ps.Client.Call("Stats.GetQueueIds", in, ids) -} - -func (ps *ProxyStats) GetQueue(id string, sq *StatsQueue) error { - return ps.Client.Call("Stats.GetQueue", id, sq) -} - -func (ps *ProxyStats) GetQueueTriggers(id string, ats *ActionTriggers) error { - return ps.Client.Call("Stats.GetQueueTriggers", id, ats) -} - -func (ps *ProxyStats) AddQueue(cs *CdrStats, out *int) error { - return ps.Client.Call("Stats.AddQueue", cs, out) -} - -func (ps *ProxyStats) ReloadQueues(ids []string, out *int) error { - return ps.Client.Call("Stats.ReloadQueues", ids, out) -} - -func (ps *ProxyStats) ResetQueues(ids []string, out *int) error { - return ps.Client.Call("Stats.ResetQueues", ids, out) -} - -func (ps *ProxyStats) Stop(i int, r *int) error { - return ps.Client.Call("Stats.Stop", 0, i) + ret := method.Call(params) + if len(ret) != 1 { + return utils.ErrServerError + } + if ret[0].Interface() == nil { + return nil + } + err, ok := ret[0].Interface().(error) + if !ok { + return utils.ErrServerError + } + return err } diff --git a/engine/storage_map.go b/engine/storage_map.go index adf785a69..f2f3dc133 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -298,7 +298,7 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) { ms.dict[utils.RATING_PLAN_PREFIX+rp.Id] = b.Bytes() response := 0 if historyScribe != nil { - go historyScribe.Record(rp.GetHistoryRecord(), &response) + go historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(), &response) } return } @@ -328,7 +328,7 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) { ms.dict[utils.RATING_PROFILE_PREFIX+rpf.Id] = result response := 0 if historyScribe != nil { - go historyScribe.Record(rpf.GetHistoryRecord(false), &response) + go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(false), &response) } return } @@ -341,7 +341,7 @@ func (ms *MapStorage) RemoveRatingProfile(key string) (err error) { response := 0 rpf := &RatingProfile{Id: key} if historyScribe != nil { - go historyScribe.Record(rpf.GetHistoryRecord(true), &response) + go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(true), &response) } } } @@ -406,7 +406,7 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) { ms.dict[utils.DESTINATION_PREFIX+dest.Id] = b.Bytes() response := 0 if historyScribe != nil { - go historyScribe.Record(dest.GetHistoryRecord(), &response) + go historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(), &response) } return } diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index aa171baa3..4f01bb942 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -612,7 +612,7 @@ func (ms *MongoStorage) SetRatingPlan(rp *RatingPlan) error { _, err := ms.db.C(colRpl).Upsert(bson.M{"id": rp.Id}, rp) if err == nil && historyScribe != nil { var response int - historyScribe.Record(rp.GetHistoryRecord(), &response) + historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(), &response) } return err } @@ -637,7 +637,7 @@ func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error { _, err := ms.db.C(colRpf).Upsert(bson.M{"id": rp.Id}, rp) if err == nil && historyScribe != nil { var response int - historyScribe.Record(rp.GetHistoryRecord(false), &response) + historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(false), &response) } return err } @@ -653,7 +653,7 @@ func (ms *MongoStorage) RemoveRatingProfile(key string) error { rpf := &RatingProfile{Id: result.Id} if historyScribe != nil { var response int - go historyScribe.Record(rpf.GetHistoryRecord(true), &response) + go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(true), &response) } } return iter.Close() @@ -705,7 +705,7 @@ func (ms *MongoStorage) SetDestination(dest *Destination) (err error) { _, err = ms.db.C(colDst).Upsert(bson.M{"id": dest.Id}, dest) if err == nil && historyScribe != nil { var response int - historyScribe.Record(dest.GetHistoryRecord(), &response) + historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(), &response) } return } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index fe05e1c86..fbeac4032 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -436,7 +436,7 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) { err = rs.db.Cmd("SET", utils.RATING_PLAN_PREFIX+rp.Id, b.Bytes()).Err if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(rp.GetHistoryRecord(), &response) + go historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(), &response) } return } @@ -465,7 +465,7 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) { err = rs.db.Cmd("SET", utils.RATING_PROFILE_PREFIX+rpf.Id, result).Err if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(rpf.GetHistoryRecord(false), &response) + go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(false), &response) } return } @@ -488,7 +488,7 @@ func (rs *RedisStorage) RemoveRatingProfile(key string) error { rpf := &RatingProfile{Id: key} if historyScribe != nil { response := 0 - go historyScribe.Record(rpf.GetHistoryRecord(true), &response) + go historyScribe.Call("HistoryV1.Record", rpf.GetHistoryRecord(true), &response) } } return nil @@ -556,7 +556,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { err = rs.db.Cmd("SET", utils.DESTINATION_PREFIX+dest.Id, b.Bytes()).Err if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(dest.GetHistoryRecord(), &response) + go historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(), &response) } return } diff --git a/engine/users.go b/engine/users.go index 69dcc2ef4..5149b7c64 100644 --- a/engine/users.go +++ b/engine/users.go @@ -2,12 +2,12 @@ package engine import ( "fmt" + "reflect" "sort" "strings" "sync" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) type UserProfile struct { @@ -52,16 +52,6 @@ func (ud *UserProfile) SetId(id string) error { return nil } -type UserService interface { - SetUser(UserProfile, *string) error - RemoveUser(UserProfile, *string) error - UpdateUser(UserProfile, *string) error - GetUsers(UserProfile, *UserProfiles) error - AddIndex([]string, *string) error - GetIndexes(string, *map[string][]string) error - ReloadUsers(string, *string) error -} - type prop struct { masked bool weight float64 @@ -139,21 +129,21 @@ func (um *UserMap) ReloadUsers(in string, reply *string) error { return nil } -func (um *UserMap) SetUser(up UserProfile, reply *string) error { +func (um *UserMap) SetUser(up *UserProfile, reply *string) error { um.mu.Lock() defer um.mu.Unlock() - if err := um.accountingDb.SetUser(&up); err != nil { + if err := um.accountingDb.SetUser(up); err != nil { *reply = err.Error() return err } um.table[up.GetId()] = up.Profile um.properties[up.GetId()] = &prop{weight: up.Weight, masked: up.Masked} - um.addIndex(&up, um.indexKeys) + um.addIndex(up, um.indexKeys) *reply = utils.OK return nil } -func (um *UserMap) RemoveUser(up UserProfile, reply *string) error { +func (um *UserMap) RemoveUser(up *UserProfile, reply *string) error { um.mu.Lock() defer um.mu.Unlock() if err := um.accountingDb.RemoveUser(up.GetId()); err != nil { @@ -162,12 +152,12 @@ func (um *UserMap) RemoveUser(up UserProfile, reply *string) error { } delete(um.table, up.GetId()) delete(um.properties, up.GetId()) - um.deleteIndex(&up) + um.deleteIndex(up) *reply = utils.OK return nil } -func (um *UserMap) UpdateUser(up UserProfile, reply *string) error { +func (um *UserMap) UpdateUser(up *UserProfile, reply *string) error { um.mu.Lock() defer um.mu.Unlock() m, found := um.table[up.GetId()] @@ -212,7 +202,7 @@ func (um *UserMap) UpdateUser(up UserProfile, reply *string) error { return nil } -func (um *UserMap) GetUsers(up UserProfile, results *UserProfiles) error { +func (um *UserMap) GetUsers(up *UserProfile, results *UserProfiles) error { um.mu.RLock() defer um.mu.RUnlock() table := um.table // no index @@ -402,44 +392,32 @@ func (um *UserMap) GetIndexes(in string, reply *map[string][]string) error { return nil } -type ProxyUserService struct { - Client *rpcclient.RpcClient -} - -func NewProxyUserService(addr string, attempts, reconnects int) (*ProxyUserService, error) { - client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil) - if err != nil { - return nil, err +func (um *UserMap) Call(serviceMethod string, args interface{}, reply interface{}) error { + parts := strings.Split(serviceMethod, ".") + if len(parts) != 2 { + return utils.ErrNotImplemented + } + // get method + method := reflect.ValueOf(um).MethodByName(parts[1]) + if !method.IsValid() { + return utils.ErrNotImplemented } - return &ProxyUserService{Client: client}, nil -} -func (ps *ProxyUserService) SetUser(ud UserProfile, reply *string) error { - return ps.Client.Call("UsersV1.SetUser", ud, reply) -} + // construct the params + params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} -func (ps *ProxyUserService) RemoveUser(ud UserProfile, reply *string) error { - return ps.Client.Call("UsersV1.RemoveUser", ud, reply) -} - -func (ps *ProxyUserService) UpdateUser(ud UserProfile, reply *string) error { - return ps.Client.Call("UsersV1.UpdateUser", ud, reply) -} - -func (ps *ProxyUserService) GetUsers(ud UserProfile, users *UserProfiles) error { - return ps.Client.Call("UsersV1.GetUsers", ud, users) -} - -func (ps *ProxyUserService) AddIndex(indexes []string, reply *string) error { - return ps.Client.Call("UsersV1.AddIndex", indexes, reply) -} - -func (ps *ProxyUserService) GetIndexes(in string, reply *map[string][]string) error { - return ps.Client.Call("UsersV1.AddIndex", in, reply) -} - -func (ps *ProxyUserService) ReloadUsers(in string, reply *string) error { - return ps.Client.Call("UsersV1.ReloadUsers", in, reply) + ret := method.Call(params) + if len(ret) != 1 { + return utils.ErrServerError + } + if ret[0].Interface() == nil { + return nil + } + err, ok := ret[0].Interface().(error) + if !ok { + return utils.ErrServerError + } + return err } // extraFields - Field name in the interface containing extraFields information @@ -484,7 +462,7 @@ func LoadUserProfile(in interface{}, extraFields string) error { } } ups := UserProfiles{} - if err := userService.GetUsers(*up, &ups); err != nil { + if err := userService.Call("UsersV1.GetUsers", up, &ups); err != nil { return err } if len(ups) > 0 { diff --git a/engine/users_test.go b/engine/users_test.go index 0862d96fe..a9e640351 100644 --- a/engine/users_test.go +++ b/engine/users_test.go @@ -36,7 +36,7 @@ var testMap2 = UserMap{ func TestUsersAdd(t *testing.T) { tm := newUserMap(accountingStorage, nil) var r string - up := UserProfile{ + up := &UserProfile{ Tenant: "test", UserName: "user", Profile: map[string]string{ @@ -57,7 +57,7 @@ func TestUsersAdd(t *testing.T) { func TestUsersUpdate(t *testing.T) { tm := newUserMap(accountingStorage, nil) var r string - up := UserProfile{ + up := &UserProfile{ Tenant: "test", UserName: "user", Profile: map[string]string{ @@ -88,7 +88,7 @@ func TestUsersUpdate(t *testing.T) { func TestUsersUpdateNotFound(t *testing.T) { tm := newUserMap(accountingStorage, nil) var r string - up := UserProfile{ + up := &UserProfile{ Tenant: "test", UserName: "user", Profile: map[string]string{ @@ -106,12 +106,12 @@ func TestUsersUpdateNotFound(t *testing.T) { func TestUsersUpdateInit(t *testing.T) { tm := newUserMap(accountingStorage, nil) var r string - up := UserProfile{ + up := &UserProfile{ Tenant: "test", UserName: "user", } tm.SetUser(up, &r) - up = UserProfile{ + up = &UserProfile{ Tenant: "test", UserName: "user", Profile: map[string]string{ @@ -132,7 +132,7 @@ func TestUsersUpdateInit(t *testing.T) { func TestUsersRemove(t *testing.T) { tm := newUserMap(accountingStorage, nil) var r string - up := UserProfile{ + up := &UserProfile{ Tenant: "test", UserName: "user", Profile: map[string]string{ @@ -158,7 +158,7 @@ func TestUsersRemove(t *testing.T) { } func TestUsersGetFull(t *testing.T) { - up := UserProfile{ + up := &UserProfile{ Tenant: "test", UserName: "user", Profile: map[string]string{ @@ -173,7 +173,7 @@ func TestUsersGetFull(t *testing.T) { } func TestUsersGetFullMasked(t *testing.T) { - up := UserProfile{ + up := &UserProfile{ Tenant: "test", } results := UserProfiles{} @@ -184,7 +184,7 @@ func TestUsersGetFullMasked(t *testing.T) { } func TestUsersGetFullUnMasked(t *testing.T) { - up := UserProfile{ + up := &UserProfile{ Tenant: "test", Masked: true, } @@ -199,7 +199,7 @@ func TestUsersGetFullUnMasked(t *testing.T) { } func TestUsersGetTenant(t *testing.T) { - up := UserProfile{ + up := &UserProfile{ Tenant: "testX", UserName: "user", Profile: map[string]string{ @@ -214,7 +214,7 @@ func TestUsersGetTenant(t *testing.T) { } func TestUsersGetUserName(t *testing.T) { - up := UserProfile{ + up := &UserProfile{ Tenant: "test", UserName: "userX", Profile: map[string]string{ @@ -229,7 +229,7 @@ func TestUsersGetUserName(t *testing.T) { } func TestUsersGetNotFoundProfile(t *testing.T) { - up := UserProfile{ + up := &UserProfile{ Tenant: "test", UserName: "user", Profile: map[string]string{ @@ -244,7 +244,7 @@ func TestUsersGetNotFoundProfile(t *testing.T) { } func TestUsersGetMissingTenant(t *testing.T) { - up := UserProfile{ + up := &UserProfile{ UserName: "user", Profile: map[string]string{ "t": "v", @@ -258,7 +258,7 @@ func TestUsersGetMissingTenant(t *testing.T) { } func TestUsersGetMissingUserName(t *testing.T) { - up := UserProfile{ + up := &UserProfile{ Tenant: "test", Profile: map[string]string{ "t": "v", @@ -272,7 +272,7 @@ func TestUsersGetMissingUserName(t *testing.T) { } func TestUsersGetMissingId(t *testing.T) { - up := UserProfile{ + up := &UserProfile{ Profile: map[string]string{ "t": "v", }, @@ -285,7 +285,7 @@ func TestUsersGetMissingId(t *testing.T) { } func TestUsersGetMissingIdTwo(t *testing.T) { - up := UserProfile{ + up := &UserProfile{ Profile: map[string]string{ "t": "v", "x": "y", @@ -299,7 +299,7 @@ func TestUsersGetMissingIdTwo(t *testing.T) { } func TestUsersGetMissingIdTwoSort(t *testing.T) { - up := UserProfile{ + up := &UserProfile{ Profile: map[string]string{ "t": "v", "x": "y", @@ -316,7 +316,7 @@ func TestUsersGetMissingIdTwoSort(t *testing.T) { } func TestUsersGetMissingIdTwoSortWeight(t *testing.T) { - up := UserProfile{ + up := &UserProfile{ Profile: map[string]string{ "a": "b", "c": "d", @@ -367,7 +367,7 @@ func TestUsersGetFullindex(t *testing.T) { var r string testMap.index = make(map[string]map[string]bool) // reset index testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r) - up := UserProfile{ + up := &UserProfile{ Tenant: "test", UserName: "user", Profile: map[string]string{ @@ -385,7 +385,7 @@ func TestUsersGetTenantindex(t *testing.T) { var r string testMap.index = make(map[string]map[string]bool) // reset index testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r) - up := UserProfile{ + up := &UserProfile{ Tenant: "testX", UserName: "user", Profile: map[string]string{ @@ -403,7 +403,7 @@ func TestUsersGetUserNameindex(t *testing.T) { var r string testMap.index = make(map[string]map[string]bool) // reset index testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r) - up := UserProfile{ + up := &UserProfile{ Tenant: "test", UserName: "userX", Profile: map[string]string{ @@ -421,7 +421,7 @@ func TestUsersGetNotFoundProfileindex(t *testing.T) { var r string testMap.index = make(map[string]map[string]bool) // reset index testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r) - up := UserProfile{ + up := &UserProfile{ Tenant: "test", UserName: "user", Profile: map[string]string{ @@ -439,7 +439,7 @@ func TestUsersGetMissingTenantindex(t *testing.T) { var r string testMap.index = make(map[string]map[string]bool) // reset index testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r) - up := UserProfile{ + up := &UserProfile{ UserName: "user", Profile: map[string]string{ "t": "v", @@ -456,7 +456,7 @@ func TestUsersGetMissingUserNameindex(t *testing.T) { var r string testMap.index = make(map[string]map[string]bool) // reset index testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r) - up := UserProfile{ + up := &UserProfile{ Tenant: "test", Profile: map[string]string{ "t": "v", @@ -473,7 +473,7 @@ func TestUsersGetMissingIdindex(t *testing.T) { var r string testMap.index = make(map[string]map[string]bool) // reset index testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r) - up := UserProfile{ + up := &UserProfile{ Profile: map[string]string{ "t": "v", }, @@ -489,7 +489,7 @@ func TestUsersGetMissingIdTwoINdex(t *testing.T) { var r string testMap.index = make(map[string]map[string]bool) // reset index testMap.AddIndex([]string{"t", "x", "UserName", "Tenant"}, &r) - up := UserProfile{ + up := &UserProfile{ Profile: map[string]string{ "t": "v", "x": "y", @@ -509,7 +509,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) { if len(tm.index) != 0 { t.Error("error adding indexes: ", tm.index) } - tm.SetUser(UserProfile{ + tm.SetUser(&UserProfile{ Tenant: "test", UserName: "user", Profile: map[string]string{ @@ -519,7 +519,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) { if len(tm.index) != 1 || !tm.index["t:v"]["test:user"] { t.Error("error adding indexes: ", tm.index) } - tm.SetUser(UserProfile{ + tm.SetUser(&UserProfile{ Tenant: "test", UserName: "best", Profile: map[string]string{ @@ -531,7 +531,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) { !tm.index["t:v"]["test:best"] { t.Error("error adding indexes: ", tm.index) } - tm.UpdateUser(UserProfile{ + tm.UpdateUser(&UserProfile{ Tenant: "test", UserName: "best", Profile: map[string]string{ @@ -543,7 +543,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) { !tm.index["t:v1"]["test:best"] { t.Error("error adding indexes: ", tm.index) } - tm.UpdateUser(UserProfile{ + tm.UpdateUser(&UserProfile{ Tenant: "test", UserName: "best", Profile: map[string]string{ @@ -555,7 +555,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) { !tm.index["t:v"]["test:best"] { t.Error("error adding indexes: ", tm.index) } - tm.RemoveUser(UserProfile{ + tm.RemoveUser(&UserProfile{ Tenant: "test", UserName: "best", Profile: map[string]string{ @@ -567,7 +567,7 @@ func TestUsersAddUpdateRemoveIndexes(t *testing.T) { tm.index["t:v"]["test:best"] { t.Error("error adding indexes: ", tm.index) } - tm.RemoveUser(UserProfile{ + tm.RemoveUser(&UserProfile{ Tenant: "test", UserName: "user", Profile: map[string]string{ diff --git a/history/file_scribe.go b/history/file_scribe.go index 8d023ba09..e6b3d31c7 100644 --- a/history/file_scribe.go +++ b/history/file_scribe.go @@ -28,8 +28,12 @@ import ( "os" "os/exec" "path/filepath" + "reflect" + "strings" "sync" "time" + + "github.com/cgrates/cgrates/utils" ) type FileScribe struct { @@ -170,3 +174,31 @@ func (s *FileScribe) save(filename string) error { f.Close() return s.gitCommit() } + +func (s *FileScribe) Call(serviceMethod string, args interface{}, reply interface{}) error { + parts := strings.Split(serviceMethod, ".") + if len(parts) != 2 { + return utils.ErrNotImplemented + } + // get method + method := reflect.ValueOf(s).MethodByName(parts[1]) + if !method.IsValid() { + return utils.ErrNotImplemented + } + + // construct the params + params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} + + ret := method.Call(params) + if len(ret) != 1 { + return utils.ErrServerError + } + if ret[0].Interface() == nil { + return nil + } + err, ok := ret[0].Interface().(error) + if !ok { + return utils.ErrServerError + } + return err +} diff --git a/history/mock_scribe.go b/history/mock_scribe.go index 1192f9cb4..a529bef51 100644 --- a/history/mock_scribe.go +++ b/history/mock_scribe.go @@ -21,7 +21,11 @@ package history import ( "bufio" "bytes" + "reflect" + "strings" "sync" + + "github.com/cgrates/cgrates/utils" ) type MockScribe struct { @@ -64,3 +68,31 @@ func (s *MockScribe) GetBuffer(fn string) *bytes.Buffer { defer s.mu.Unlock() return s.BufMap[fn] } + +func (s *MockScribe) Call(serviceMethod string, args interface{}, reply interface{}) error { + parts := strings.Split(serviceMethod, ".") + if len(parts) != 2 { + return utils.ErrNotImplemented + } + // get method + method := reflect.ValueOf(s).MethodByName(parts[1]) + if !method.IsValid() { + return utils.ErrNotImplemented + } + + // construct the params + params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} + + ret := method.Call(params) + if len(ret) != 1 { + return utils.ErrServerError + } + if ret[0].Interface() == nil { + return nil + } + err, ok := ret[0].Interface().(error) + if !ok { + return utils.ErrServerError + } + return err +} diff --git a/history/proxy_scribe.go b/history/proxy_scribe.go deleted file mode 100644 index 3b00ebc0a..000000000 --- a/history/proxy_scribe.go +++ /dev/null @@ -1,40 +0,0 @@ -/* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012-2015 ITsysCOM - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package history - -import ( - "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" -) - -type ProxyScribe struct { - Client *rpcclient.RpcClient -} - -func NewProxyScribe(addr string, attempts, reconnects int) (*ProxyScribe, error) { - client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil) - if err != nil { - return nil, err - } - return &ProxyScribe{Client: client}, nil -} - -func (ps *ProxyScribe) Record(rec Record, out *int) error { - return ps.Client.Call("Scribe.Record", rec, out) -} diff --git a/history/scribe.go b/history/scribe.go index 819bada44..2a6a1f3ba 100644 --- a/history/scribe.go +++ b/history/scribe.go @@ -30,10 +30,6 @@ const ( RATING_PROFILES_FN = "rating_profiles.json" ) -type Scribe interface { - Record(Record, *int) error -} - type Record struct { Id string Filename string