cmd/cgr-engine using chans instead of sync.Group to synchronize dependencies

This commit is contained in:
DanB
2015-09-04 12:07:45 +02:00
parent 7f0925c4e0
commit 84313bc87d
5 changed files with 64 additions and 150 deletions

View File

@@ -18,94 +18,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
/*
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
"errors"
type AttrGetTPAlias struct {
TPid string
Direction string
Tenant string
Category string
Account string
Subject string
Group string
}
// Creates a new alias within a tariff plan
func (self *ApierV1) SetTPAlias(attrs AttrGetTPAlias, reply *string) error {
if missing := utils.MissingStructFields(&attrs, []string{"TPid", "Direction", "Tenant", "Category", "Account", "Subject", "Group"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
tm := engine.APItoModelTiming(&attrs)
if err := self.StorDb.SetTpAliases([]engine.TpAlias{*tm}); err != nil {
return utils.NewErrServerError(err)
}
*reply = "OK"
return nil
}
type AttrGetTPAlias struct {
TPid string // Tariff plan id
AliasId string // Alias id
}
// Queries specific Alias on Tariff plan
func (self *ApierV1) GetTPAlias(attrs AttrGetTPAlias, reply *utils.ApierTPAlias) error {
if missing := utils.MissingStructFields(&attrs, []string{"TPid", "AliasId"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
if tms, err := self.StorDb.GetTpAliases(attrs.TPid, attrs.AliasId); err != nil {
return utils.NewErrServerError(err)
} else if len(tms) == 0 {
return utils.ErrNotFound
} else {
tmMap, err := engine.TpAliases(tms).GetApierAliases()
if err != nil {
return err
}
*reply = *tmMap[attrs.AliasId]
}
return nil
}
type AttrGetTPAliasIds struct {
TPid string // Tariff plan id
utils.Paginator
}
// Queries alias identities on specific tariff plan.
func (self *ApierV1) GetTPAliasIds(attrs AttrGetTPAliasIds, reply *[]string) error {
if missing := utils.MissingStructFields(&attrs, []string{"TPid"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
if ids, err := self.StorDb.GetTpTableIds(attrs.TPid, utils.TBL_TP_TIMINGS, utils.TPDistinctIds{"tag"}, nil, &attrs.Paginator); err != nil {
return utils.NewErrServerError(err)
} else if ids == nil {
return utils.ErrNotFound
} else {
*reply = ids
}
return nil
}
// Removes specific Alias on Tariff plan
func (self *ApierV1) RemTPAlias(attrs AttrGetTPAlias, reply *string) error {
if missing := utils.MissingStructFields(&attrs, []string{"TPid", "AliasId"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
if err := self.StorDb.RemTpData(utils.TBL_TP_TIMINGS, attrs.TPid, attrs.AliasId); err != nil {
return utils.NewErrServerError(err)
} else {
*reply = "OK"
}
return nil
}
*/
/*
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -122,40 +37,30 @@ type AttrAddAccountAliases struct {
// Retrieve aliases configured for a rating profile subject
func (self *ApierV1) AddRatingSubjectAliases(attrs AttrAddRatingSubjectAliases, reply *string) error {
if engine.GetAliasService() == nil {
return errors.New("ALIASES_NOT_ENABLED")
}
if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Subject", "Aliases"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
aliasesChanged := []string{}
for _, alias := range attrs.Aliases {
if err := self.RatingDb.SetRpAlias(utils.RatingSubjectAliasKey(attrs.Tenant, alias), attrs.Subject); err != nil {
var ignr string
if err := engine.GetAliasService().SetAlias(
engine.Alias{Direction: utils.META_OUT, Tenant: attrs.Tenant, Account: utils.META_ANY, Subject: attrs.Subject, Group: utils.ALIAS_GROUP_RP,
Values: engine.AliasValues{&engine.AliasValue{DestinationId: utils.META_ANY, Alias: alias, Weight: 10.0}}}, &ignr); err != nil {
return utils.NewErrServerError(err)
}
aliasesChanged = append(aliasesChanged, utils.RP_ALIAS_PREFIX+utils.RatingSubjectAliasKey(attrs.Tenant, alias))
}
if err := self.RatingDb.CachePrefixValues(map[string][]string{utils.RP_ALIAS_PREFIX: aliasesChanged}); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
return nil
}
// Retrieve aliases configured for a rating profile subject
func (self *ApierV1) GetRatingSubjectAliases(attrs engine.TenantRatingSubject, reply *[]string) error {
if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Subject"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
if aliases, err := self.RatingDb.GetRPAliases(attrs.Tenant, attrs.Subject, false); err != nil {
return utils.NewErrServerError(err)
} else if len(aliases) == 0 { // Need it since otherwise we get some unexpected errrors in the client
return utils.ErrNotFound
} else {
*reply = aliases
}
return nil
}
/*
// Retrieve aliases configured for a rating profile subject
func (self *ApierV1) RemRatingSubjectAliases(tenantRatingSubject engine.TenantRatingSubject, reply *string) error {
if engine.GetAliasService() == nil {
return errors.New("ALIASES_NOT_ENABLED")
}
if missing := utils.MissingStructFields(&tenantRatingSubject, []string{"Tenant", "Subject"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
@@ -170,42 +75,33 @@ func (self *ApierV1) RemRatingSubjectAliases(tenantRatingSubject engine.TenantRa
*reply = utils.OK
return nil
}
*/
func (self *ApierV1) AddAccountAliases(attrs AttrAddAccountAliases, reply *string) error {
if engine.GetAliasService() == nil {
return errors.New("ALIASES_NOT_ENABLED")
}
if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account", "Aliases"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
aliasesChanged := []string{}
for _, alias := range attrs.Aliases {
if err := self.RatingDb.SetAccAlias(utils.AccountAliasKey(attrs.Tenant, alias), attrs.Account); err != nil {
var ignr string
if err := engine.GetAliasService().SetAlias(
engine.Alias{Direction: utils.META_OUT, Tenant: attrs.Tenant, Account: attrs.Account, Subject: utils.META_ANY, Group: utils.ALIAS_GROUP_ACC,
Values: engine.AliasValues{&engine.AliasValue{DestinationId: utils.META_ANY, Alias: alias, Weight: 10.0}}}, &ignr); err != nil {
return utils.NewErrServerError(err)
}
aliasesChanged = append(aliasesChanged, utils.ACC_ALIAS_PREFIX+utils.AccountAliasKey(attrs.Tenant, alias))
}
if err := self.RatingDb.CachePrefixValues(map[string][]string{utils.ACC_ALIAS_PREFIX: aliasesChanged}); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
return nil
}
// Retrieve aliases configured for an account
func (self *ApierV1) GetAccountAliases(attrs engine.TenantAccount, reply *[]string) error {
if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "Account"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
if aliases, err := self.RatingDb.GetAccountAliases(attrs.Tenant, attrs.Account, false); err != nil {
return utils.NewErrServerError(err)
} else if len(aliases) == 0 {
return utils.ErrNotFound
} else {
*reply = aliases
}
return nil
}
/*
// Retrieve aliases configured for a rating profile subject
func (self *ApierV1) RemAccountAliases(tenantAccount engine.TenantAccount, reply *string) error {
if engine.GetAliasService() == nil {
return errors.New("ALIASES_NOT_ENABLED")
}
if missing := utils.MissingStructFields(&tenantAccount, []string{"Tenant", "Account"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}

View File

@@ -20,7 +20,6 @@ package main
import (
"fmt"
"sync"
"github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/apier/v2"
@@ -45,12 +44,13 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
server *engine.Server,
ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, logDb engine.LogStorage,
stopHandled *bool, exitChan chan bool) {
var wg sync.WaitGroup // Sync all external connections in a group
waitTasks := make([]chan struct{}, 0)
// Cache data
wg.Add(1)
//Cache load
cacheTaskChan := make(chan struct{})
waitTasks = append(waitTasks, cacheTaskChan)
go func() {
defer wg.Done()
defer close(cacheTaskChan)
if err := ratingDb.CacheRatingAll(); err != nil {
engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error()))
exitChan <- true
@@ -61,25 +61,29 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
exitChan <- true
return
}
}()
// Retrieve scheduler for it's API methods
var sched *scheduler.Scheduler // Need the scheduler in APIer
if cfg.SchedulerEnabled {
wg.Add(1)
schedTaskChan := make(chan struct{})
waitTasks = append(waitTasks, schedTaskChan)
go func() {
defer wg.Done()
defer close(schedTaskChan)
sched = <-internalSchedulerChan
internalSchedulerChan <- sched
}()
}
// Connection to balancer
var bal *balancer2go.Balancer
if cfg.RaterBalancer != "" {
wg.Add(1)
balTaskChan := make(chan struct{})
waitTasks = append(waitTasks, balTaskChan)
go func() {
defer wg.Done()
defer close(balTaskChan)
if cfg.RaterBalancer == utils.INTERNAL {
bal = <-internalBalancerChan
internalBalancerChan <- bal // Put it back if someone else is interested about
@@ -94,9 +98,10 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
// Connection to CDRStats
var cdrStats engine.StatsInterface
if cfg.RaterCdrStats != "" {
wg.Add(1)
cdrstatTaskChan := make(chan struct{})
waitTasks = append(waitTasks, cdrstatTaskChan)
go func() {
defer wg.Done()
defer close(cdrstatTaskChan)
if cfg.RaterCdrStats == utils.INTERNAL {
cdrStats = <-internalCdrStatSChan
internalCdrStatSChan <- cdrStats
@@ -110,9 +115,10 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
// Connection to HistoryS
if cfg.RaterHistoryServer != "" {
wg.Add(1)
histTaskChan := make(chan struct{})
waitTasks = append(waitTasks, histTaskChan)
go func() {
defer wg.Done()
defer close(histTaskChan)
var scribeServer history.Scribe
if cfg.RaterHistoryServer == utils.INTERNAL {
scribeServer = <-internalHistorySChan
@@ -128,9 +134,10 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
// Connection to pubsubs
if cfg.RaterPubSubServer != "" {
wg.Add(1)
pubsubTaskChan := make(chan struct{})
waitTasks = append(waitTasks, pubsubTaskChan)
go func() {
defer wg.Done()
defer close(pubsubTaskChan)
var pubSubServer engine.PublisherSubscriber
if cfg.RaterPubSubServer == utils.INTERNAL {
pubSubServer = <-internalPubSubSChan
@@ -146,9 +153,10 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
// Connection to AliasService
if cfg.RaterAliasesServer != "" {
wg.Add(1)
aliasesTaskChan := make(chan struct{})
waitTasks = append(waitTasks, aliasesTaskChan)
go func() {
defer wg.Done()
defer close(aliasesTaskChan)
var aliasesServer engine.AliasService
if cfg.RaterAliasesServer == utils.INTERNAL {
aliasesServer = <-internalAliaseSChan
@@ -165,9 +173,10 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
// Connection to UserService
var userServer engine.UserService
if cfg.RaterUserServer != "" {
wg.Add(1)
usersTaskChan := make(chan struct{})
waitTasks = append(waitTasks, usersTaskChan)
go func() {
defer wg.Done()
defer close(usersTaskChan)
if cfg.RaterUserServer == utils.INTERNAL {
userServer = <-internalUserSChan
internalUserSChan <- userServer
@@ -181,7 +190,9 @@ func startRater(internalRaterChan chan *engine.Responder, internalBalancerChan c
}
// Wait for all connections to complete before going further
wg.Wait()
for _, chn := range waitTasks {
<-chn
}
responder := &engine.Responder{Bal: bal, ExitChan: exitChan, Stats: cdrStats}
apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Sched: sched,

View File

@@ -13,8 +13,8 @@
"rater": {
"enabled": true, // enable Rater service: <true|false>
"cdrstats": "internal", // address where to reach the cdrstats service, empty to disable stats functionality<""|internal|x.y.z.y:1234>
"pubsubs": "internal", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234>
"users": "internal", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234>
"pubsubs": "internal", // address where to reach the pubusb service, empty to disable pubsub functionality: <""|internal|x.y.z.y:1234>
"users": "internal", // address where to reach the user service, empty to disable user profile functionality: <""|internal|x.y.z.y:1234>
},
"scheduler": {

View File

@@ -10,6 +10,11 @@ import (
"github.com/cgrates/rpcclient"
)
// Temporary export AliasService for the ApierV1 to be able to emulate old APIs
func GetAliasService() AliasService {
return aliasService
}
type Alias struct {
Direction string
Tenant string

View File

@@ -155,6 +155,8 @@ const (
NANOSECONDS = "nanoseconds"
SECONDS = "seconds"
OUT = "*out"
META_OUT = "*out"
META_ANY = "*any"
CDR_IMPORT = "cdr_import"
CDR_EXPORT = "cdr_export"
CDRFIELD = "cdrfield"