Added StorDB config reload

This commit is contained in:
Trial97
2019-11-08 15:51:30 +02:00
committed by Dan Christian Bogos
parent 8e32e73e0f
commit b110537b54
13 changed files with 341 additions and 117 deletions

View File

@@ -42,9 +42,9 @@ type SchedulerGeter interface {
}
type ApierV1 struct {
StorDb engine.LoadStorage
DataManager *engine.DataManager
StorDb engine.LoadStorage // we should consider keeping only one of StorDB type
CdrDb engine.CdrStorage
DataManager *engine.DataManager
Config *config.CGRConfig
Responder *engine.Responder
CDRs rpcclient.RpcClientConnection // FixMe: populate it from cgr-engine
@@ -96,12 +96,12 @@ func (apiv1 *ApierV1) RemoveDestination(attr AttrRemoveDestination, reply *strin
}
// GetReverseDestination retrieves revese destination list for a prefix
func (apierv1 *ApierV1) GetReverseDestination(prefix string, reply *[]string) (err error) {
func (apiv1 *ApierV1) GetReverseDestination(prefix string, reply *[]string) (err error) {
if prefix == "" {
return utils.NewErrMandatoryIeMissing("prefix")
}
var revLst []string
if revLst, err = apierv1.DataManager.DataDB().GetReverseDestination(prefix, false, utils.NonTransactional); err != nil {
if revLst, err = apiv1.DataManager.DataDB().GetReverseDestination(prefix, false, utils.NonTransactional); err != nil {
return
}
*reply = revLst
@@ -109,8 +109,8 @@ func (apierv1 *ApierV1) GetReverseDestination(prefix string, reply *[]string) (e
}
// ComputeReverseDestinations will rebuild complete reverse destinations data
func (apierv1 *ApierV1) ComputeReverseDestinations(ignr string, reply *string) (err error) {
if err = apierv1.DataManager.DataDB().RebuildReverseForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil {
func (apiv1 *ApierV1) ComputeReverseDestinations(ignr string, reply *string) (err error) {
if err = apiv1.DataManager.DataDB().RebuildReverseForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil {
return
}
*reply = utils.OK
@@ -118,16 +118,16 @@ func (apierv1 *ApierV1) ComputeReverseDestinations(ignr string, reply *string) (
}
// ComputeAccountActionPlans will rebuild complete reverse accountActions data
func (apierv1 *ApierV1) ComputeAccountActionPlans(ignr string, reply *string) (err error) {
if err = apierv1.DataManager.DataDB().RebuildReverseForPrefix(utils.AccountActionPlansPrefix); err != nil {
func (apiv1 *ApierV1) ComputeAccountActionPlans(ignr string, reply *string) (err error) {
if err = apiv1.DataManager.DataDB().RebuildReverseForPrefix(utils.AccountActionPlansPrefix); err != nil {
return
}
*reply = utils.OK
return
}
func (apierv1 *ApierV1) GetSharedGroup(sgId string, reply *engine.SharedGroup) error {
if sg, err := apierv1.DataManager.GetSharedGroup(sgId, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { // Not found is not an error here
func (apiv1 *ApierV1) GetSharedGroup(sgId string, reply *engine.SharedGroup) error {
if sg, err := apiv1.DataManager.GetSharedGroup(sgId, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { // Not found is not an error here
return err
} else {
if sg != nil {
@@ -436,12 +436,12 @@ func (apiv1 *ApierV1) SetRatingProfile(attrs utils.AttrSetRatingProfile, reply *
}
// GetRatingProfileIDs returns list of resourceProfile IDs registered for a tenant
func (apierV1 *ApierV1) GetRatingProfileIDs(args utils.TenantArgWithPaginator, rsPrfIDs *[]string) error {
func (apiv1 *ApierV1) GetRatingProfileIDs(args utils.TenantArgWithPaginator, rsPrfIDs *[]string) error {
if missing := utils.MissingStructFields(&args, []string{utils.Tenant}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
prfx := utils.RATING_PROFILE_PREFIX + "*out:" + args.Tenant + ":"
keys, err := apierV1.DataManager.DataDB().GetKeysForPrefix(prfx)
keys, err := apiv1.DataManager.DataDB().GetKeysForPrefix(prfx)
if err != nil {
return err
}
@@ -1132,8 +1132,8 @@ type ArgsReplyFailedPosts struct {
}
// ReplayFailedPosts will repost failed requests found in the FailedRequestsInDir
func (apierv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (err error) {
failedReqsInDir := apierv1.Config.GeneralCfg().FailedPostsDir
func (apiv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (err error) {
failedReqsInDir := apiv1.Config.GeneralCfg().FailedPostsDir
if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != "" {
failedReqsInDir = *args.FailedRequestsInDir
}
@@ -1172,7 +1172,7 @@ func (apierv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *stri
return 0, err
}
return 0, os.Remove(filePath)
}, apierv1.Config.GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath)
}, apiv1.Config.GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath)
if err != nil {
return utils.NewErrServerError(err)
}
@@ -1182,26 +1182,26 @@ func (apierv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *stri
}
switch ffn.Transport {
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
_, err = engine.NewHTTPPoster(apierv1.Config.GeneralCfg().HttpSkipTlsVerify,
apierv1.Config.GeneralCfg().ReplyTimeout).Post(ffn.Address,
_, err = engine.NewHTTPPoster(apiv1.Config.GeneralCfg().HttpSkipTlsVerify,
apiv1.Config.GeneralCfg().ReplyTimeout).Post(ffn.Address,
utils.PosterTransportContentTypes[ffn.Transport], fileContent,
apierv1.Config.GeneralCfg().PosterAttempts, failoverPath)
apiv1.Config.GeneralCfg().PosterAttempts, failoverPath)
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
err = engine.PostersCache.PostAMQP(ffn.Address,
apierv1.Config.GeneralCfg().PosterAttempts, fileContent,
apiv1.Config.GeneralCfg().PosterAttempts, fileContent,
utils.PosterTransportContentTypes[ffn.Transport],
failedReqsOutDir, file.Name())
case utils.MetaAMQPV1jsonMap:
err = engine.PostersCache.PostAMQPv1(ffn.Address, apierv1.Config.GeneralCfg().PosterAttempts,
err = engine.PostersCache.PostAMQPv1(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts,
fileContent, failedReqsOutDir, file.Name())
case utils.MetaSQSjsonMap:
err = engine.PostersCache.PostSQS(ffn.Address, apierv1.Config.GeneralCfg().PosterAttempts,
err = engine.PostersCache.PostSQS(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts,
fileContent, failedReqsOutDir, file.Name())
case utils.MetaKafkajsonMap:
err = engine.PostersCache.PostKafka(ffn.Address, apierv1.Config.GeneralCfg().PosterAttempts,
err = engine.PostersCache.PostKafka(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts,
fileContent, failedReqsOutDir, file.Name(), utils.UUIDSha1Prefix())
case utils.MetaS3jsonMap:
err = engine.PostersCache.PostS3(ffn.Address, apierv1.Config.GeneralCfg().PosterAttempts,
err = engine.PostersCache.PostS3(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts,
fileContent, failedReqsOutDir, file.Name(), utils.UUIDSha1Prefix())
default:
err = fmt.Errorf("unsupported replication transport: %s", ffn.Transport)
@@ -1218,7 +1218,7 @@ func (apierv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *stri
_, err = fileOut.Write(fileContent)
fileOut.Close()
return 0, err
}, apierv1.Config.GeneralCfg().LockingTimeout, utils.FileLockPrefix+failoverPath)
}, apiv1.Config.GeneralCfg().LockingTimeout, utils.FileLockPrefix+failoverPath)
if err != nil {
return utils.NewErrServerError(err)
}
@@ -1230,28 +1230,28 @@ func (apierv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *stri
// CallCache caching the item based on cacheopt
// visible in ApierV2
func (apierv1 *ApierV1) CallCache(cacheOpt string, args utils.ArgsGetCacheItem) (err error) {
func (apiv1 *ApierV1) CallCache(cacheOpt string, args utils.ArgsGetCacheItem) (err error) {
var reply string
switch cacheOpt {
case utils.META_NONE:
return
case utils.MetaReload:
if err = apierv1.CacheS.Call(utils.CacheSv1ReloadCache, utils.AttrReloadCacheWithArgDispatcher{
if err = apiv1.CacheS.Call(utils.CacheSv1ReloadCache, utils.AttrReloadCacheWithArgDispatcher{
AttrReloadCache: composeArgsReload(args)}, &reply); err != nil {
return err
}
case utils.MetaLoad:
if err = apierv1.CacheS.Call(utils.CacheSv1LoadCache, utils.AttrReloadCacheWithArgDispatcher{
if err = apiv1.CacheS.Call(utils.CacheSv1LoadCache, utils.AttrReloadCacheWithArgDispatcher{
AttrReloadCache: composeArgsReload(args)}, &reply); err != nil {
return err
}
case utils.MetaRemove:
if err = apierv1.CacheS.Call(utils.CacheSv1RemoveItem,
if err = apiv1.CacheS.Call(utils.CacheSv1RemoveItem,
&utils.ArgsGetCacheItemWithArgDispatcher{ArgsGetCacheItem: args}, &reply); err != nil {
return err
}
case utils.MetaClear:
if err = apierv1.CacheS.Call(utils.CacheSv1FlushCache, utils.AttrReloadCacheWithArgDispatcher{
if err = apiv1.CacheS.Call(utils.CacheSv1FlushCache, utils.AttrReloadCacheWithArgDispatcher{
AttrReloadCache: composeArgsReload(args)}, &reply); err != nil {
return err
}
@@ -1259,8 +1259,8 @@ func (apierv1 *ApierV1) CallCache(cacheOpt string, args utils.ArgsGetCacheItem)
return
}
func (apierv1 *ApierV1) GetLoadIDs(args string, reply *map[string]int64) (err error) {
if loadIDs, err := apierv1.DataManager.GetItemLoadIDs(args, false); err != nil {
func (apiv1 *ApierV1) GetLoadIDs(args string, reply *map[string]int64) (err error) {
if loadIDs, err := apiv1.DataManager.GetItemLoadIDs(args, false); err != nil {
return err
} else {
*reply = loadIDs
@@ -1273,8 +1273,8 @@ type LoadTimeArgs struct {
Item string
}
func (apierv1 *ApierV1) GetLoadTimes(args LoadTimeArgs, reply *map[string]string) (err error) {
if loadIDs, err := apierv1.DataManager.GetItemLoadIDs(args.Item, false); err != nil {
func (apiv1 *ApierV1) GetLoadTimes(args LoadTimeArgs, reply *map[string]string) (err error) {
if loadIDs, err := apiv1.DataManager.GetItemLoadIDs(args.Item, false); err != nil {
return err
} else {
provMp := make(map[string]string)
@@ -1306,9 +1306,9 @@ func (apiv1 *ApierV1) ComputeActionPlanIndexes(_ string, reply *string) (err err
}
// GetActionPlanIDs returns list of ActionPlan IDs registered for a tenant
func (apierV1 *ApierV1) GetActionPlanIDs(args utils.TenantArgWithPaginator, attrPrfIDs *[]string) error {
func (apiv1 *ApierV1) GetActionPlanIDs(args utils.TenantArgWithPaginator, attrPrfIDs *[]string) error {
prfx := utils.ACTION_PLAN_PREFIX
keys, err := apierV1.DataManager.DataDB().GetKeysForPrefix(utils.ACTION_PLAN_PREFIX)
keys, err := apiv1.DataManager.DataDB().GetKeysForPrefix(utils.ACTION_PLAN_PREFIX)
if err != nil {
return err
}
@@ -1324,9 +1324,9 @@ func (apierV1 *ApierV1) GetActionPlanIDs(args utils.TenantArgWithPaginator, attr
}
// GetRatingPlanIDs returns list of RatingPlan IDs registered for a tenant
func (apierV1 *ApierV1) GetRatingPlanIDs(args utils.TenantArgWithPaginator, attrPrfIDs *[]string) error {
func (apiv1 *ApierV1) GetRatingPlanIDs(args utils.TenantArgWithPaginator, attrPrfIDs *[]string) error {
prfx := utils.RATING_PLAN_PREFIX
keys, err := apierV1.DataManager.DataDB().GetKeysForPrefix(utils.RATING_PLAN_PREFIX)
keys, err := apiv1.DataManager.DataDB().GetKeysForPrefix(utils.RATING_PLAN_PREFIX)
if err != nil {
return err
}
@@ -1343,18 +1343,25 @@ func (apierV1 *ApierV1) GetRatingPlanIDs(args utils.TenantArgWithPaginator, attr
// SetAttributeSConnection sets the new connection to the attribute service
// only used on reload
func (apierv1 *ApierV1) SetAttributeSConnection(attrS rpcclient.RpcClientConnection) {
apierv1.AttributeS = attrS
func (apiv1 *ApierV1) SetAttributeSConnection(attrS rpcclient.RpcClientConnection) {
apiv1.AttributeS = attrS
}
// SetCacheSConnection sets the new connection to the cache service
// only used on reload
func (apierv1 *ApierV1) SetCacheSConnection(chS rpcclient.RpcClientConnection) {
apierv1.CacheS = chS
func (apiv1 *ApierV1) SetCacheSConnection(chS rpcclient.RpcClientConnection) {
apiv1.CacheS = chS
}
// SetSchedulerSConnection sets the new connection to the scheduler service
// only used on reload
func (apierv1 *ApierV1) SetSchedulerSConnection(schS rpcclient.RpcClientConnection) {
apierv1.SchedulerS = schS
func (apiv1 *ApierV1) SetSchedulerSConnection(schS rpcclient.RpcClientConnection) {
apiv1.SchedulerS = schS
}
// SetStorDB sets the new connection for StorDB
// only used on reload
func (apiv1 *ApierV1) SetStorDB(storDB engine.StorDB) {
apiv1.CdrDb = storDB
apiv1.StorDb = storDB
}

View File

@@ -454,31 +454,15 @@ func main() {
cfg.LazySanityCheck()
// we should use only one database for StorDB
var storDb engine.StorDB
// var cdrDb engine.CdrStorage
dmService := services.NewDataDBService(cfg)
storDBService := services.NewStorDBService(cfg)
if dmService.ShouldRun() { // Some services can run without db, ie: CDRC
if err = dmService.Start(); err != nil {
return
}
}
if cfg.RalsCfg().Enabled || cfg.CdrsCfg().Enabled {
storDb, err := engine.NewStorDBConn(cfg.StorDbCfg().Type,
cfg.StorDbCfg().Host, cfg.StorDbCfg().Port,
cfg.StorDbCfg().Name, cfg.StorDbCfg().User,
cfg.StorDbCfg().Password, cfg.StorDbCfg().SSLMode,
cfg.StorDbCfg().MaxOpenConns, cfg.StorDbCfg().MaxIdleConns,
cfg.StorDbCfg().ConnMaxLifetime, cfg.StorDbCfg().StringIndexedFields,
cfg.StorDbCfg().PrefixIndexedFields)
if err != nil { // Cannot configure logger database, show stopper
utils.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err))
return
}
defer storDb.Close()
engine.SetCdrStorage(storDb)
if err := engine.CheckVersions(storDb); err != nil {
fmt.Println(err.Error())
if storDBService.ShouldRun() {
if err = storDBService.Start(); err != nil {
return
}
}
@@ -529,11 +513,11 @@ func main() {
attrS.GetIntenternalChan(), stS.GetIntenternalChan(),
reS.GetIntenternalChan(), dspS.GetIntenternalChan())
schS := services.NewSchedulerService(cfg, dmService, cacheS, filterSChan, server, internalCDRServerChan, dspS.GetIntenternalChan())
rals := services.NewRalService(cfg, dmService, storDb, storDb, cacheS, filterSChan, server,
rals := services.NewRalService(cfg, dmService, storDBService, cacheS, filterSChan, server,
tS.GetIntenternalChan(), stS.GetIntenternalChan(), internalCacheSChan,
schS.GetIntenternalChan(), attrS.GetIntenternalChan(), dspS.GetIntenternalChan(),
schS, exitChan)
cdrS := services.NewCDRServer(cfg, dmService, storDb, filterSChan, server, internalCDRServerChan,
cdrS := services.NewCDRServer(cfg, dmService, storDBService, filterSChan, server, internalCDRServerChan,
chrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(),
attrS.GetIntenternalChan(), tS.GetIntenternalChan(),
stS.GetIntenternalChan(), dspS.GetIntenternalChan())

View File

@@ -1575,7 +1575,11 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
}
fallthrough
case STORDB_JSN:
cfg.rldChans[STORDB_JSN] <- struct{}{}
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
if !fall {
cfg.rldChans[CDRS_JSN] <- struct{}{}
cfg.rldChans[Apier] <- struct{}{}
break
}
fallthrough
@@ -1607,7 +1611,8 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
case RALS_JSN:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
cfg.rldChans[STORDB_JSN] <- struct{}{}
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[RALS_JSN] <- struct{}{}
if !fall {
@@ -1617,7 +1622,8 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
case CDRS_JSN:
if !fall {
cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
cfg.rldChans[STORDB_JSN] <- struct{}{}
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[CDRS_JSN] <- struct{}{}
if !fall {
@@ -1773,6 +1779,10 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
}
fallthrough
case Apier:
if !fall {
cfg.rldChans[STORDB_JSN] <- struct{}{}
time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it)
}
cfg.rldChans[Apier] <- struct{}{}
if !fall {
break

View File

@@ -95,3 +95,22 @@ func (dbcfg *StorDbCfg) loadFromJsonCfg(jsnDbCfg *DbJsonCfg) (err error) {
}
return nil
}
// Clone returns the cloned object
func (dbcfg *StorDbCfg) Clone() *StorDbCfg {
return &StorDbCfg{
Type: dbcfg.Type,
Host: dbcfg.Host,
Port: dbcfg.Port,
Name: dbcfg.Name,
User: dbcfg.User,
Password: dbcfg.Password,
MaxOpenConns: dbcfg.MaxOpenConns,
MaxIdleConns: dbcfg.MaxIdleConns,
ConnMaxLifetime: dbcfg.ConnMaxLifetime,
StringIndexedFields: dbcfg.StringIndexedFields,
PrefixIndexedFields: dbcfg.PrefixIndexedFields,
QueryTimeout: dbcfg.QueryTimeout,
SSLMode: dbcfg.SSLMode,
}
}

View File

@@ -85,10 +85,16 @@ func NewCDRServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, r
if chargerS != nil && reflect.ValueOf(chargerS).IsNil() {
chargerS = nil
}
return &CDRServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm,
rals: rater, attrS: attrS,
statS: statS, thdS: thdS,
chargerS: chargerS, guard: guardian.Guardian,
return &CDRServer{
cgrCfg: cgrCfg,
cdrDb: cdrDb,
dm: dm,
rals: rater,
attrS: attrS,
statS: statS,
thdS: thdS,
chargerS: chargerS,
guard: guardian.Guardian,
httpPoster: NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify,
cgrCfg.GeneralCfg().ReplyTimeout), filterS: filterS}
}
@@ -859,3 +865,9 @@ func (cdrS *CDRServer) SetChargerSConnection(chS rpcclient.RpcClientConnection)
func (cdrS *CDRServer) SetRALsConnection(rls rpcclient.RpcClientConnection) {
cdrS.rals = rls
}
// SetStorDB sets the new StorDB
// only used on reload
func (cdrS *CDRServer) SetStorDB(cdrDb CdrStorage) {
cdrS.cdrDb = cdrDb
}

View File

@@ -17,7 +17,6 @@ package engine
import (
"fmt"
"strconv"
"strings"
"github.com/cgrates/cgrates/config"
@@ -2051,33 +2050,8 @@ func (dm *DataManager) SetLoadIDs(loadIDs map[string]int64) (err error) {
// Reconnect recconnects to the DB when the config was changes
func (dm *DataManager) Reconnect(marshaler string, newcfg *config.DataDbCfg) (err error) {
var d DataDB
switch newcfg.DataDbType {
case utils.REDIS:
var dbNb int
dbNb, err = strconv.Atoi(newcfg.DataDbName)
if err != nil {
utils.Logger.Crit("Redis db name must be an integer!")
return
}
host := newcfg.DataDbHost
if newcfg.DataDbPort != "" && strings.Index(host, ":") == -1 {
host += ":" + newcfg.DataDbPort
}
d, err = NewRedisStorage(host, dbNb, newcfg.DataDbPass, marshaler, utils.REDIS_MAX_CONNS, newcfg.DataDbSentinelName)
case utils.MONGO:
d, err = NewMongoStorage(newcfg.DataDbHost, newcfg.DataDbPort, newcfg.DataDbName,
newcfg.DataDbUser, newcfg.DataDbPass, utils.DataDB, nil, true)
case utils.INTERNAL:
if marshaler == utils.JSON {
d = NewInternalDBJson(nil, nil)
} else {
d = NewInternalDB(nil, nil)
}
default:
err = fmt.Errorf("unknown db '%s' valid options are '%s' or '%s or '%s'",
newcfg.DataDbType, utils.REDIS, utils.MONGO, utils.INTERNAL)
}
d, err := NewDataDBConn(newcfg.DataDbType, newcfg.DataDbHost, newcfg.DataDbPort, newcfg.DataDbName,
newcfg.DataDbUser, newcfg.DataDbPass, marshaler, newcfg.DataDbSentinelName)
if err != nil {
return
}

View File

@@ -41,9 +41,13 @@ type InternalDB struct {
func NewInternalDB(stringIndexedFields, prefixIndexedFields []string) *InternalDB {
dfltCfg, _ := config.NewDefaultCGRConfig()
return &InternalDB{db: ltcache.NewTransCache(dfltCfg.CacheCfg().AsTransCacheConfig()),
ms: NewCodecMsgpackMarshaler(), stringIndexedFields: stringIndexedFields,
prefixIndexedFields: prefixIndexedFields, cnter: utils.NewCounter(time.Now().UnixNano(), 0)}
return &InternalDB{
db: ltcache.NewTransCache(dfltCfg.CacheCfg().AsTransCacheConfig()),
ms: NewCodecMsgpackMarshaler(),
stringIndexedFields: stringIndexedFields,
prefixIndexedFields: prefixIndexedFields,
cnter: utils.NewCounter(time.Now().UnixNano(), 0),
}
}
func NewInternalDBJson(stringIndexedFields, prefixIndexedFields []string) (InternalDB *InternalDB) {
@@ -52,6 +56,16 @@ func NewInternalDBJson(stringIndexedFields, prefixIndexedFields []string) (Inter
return
}
// SetStringIndexedFields set the stringIndexedFields, used at StorDB reload
func (iDB *InternalDB) SetStringIndexedFields(stringIndexedFields []string) {
iDB.stringIndexedFields = stringIndexedFields
}
// SetPrefixIndexedFields set the prefixIndexedFields, used at StorDB reload
func (iDB *InternalDB) SetPrefixIndexedFields(prefixIndexedFields []string) {
iDB.prefixIndexedFields = prefixIndexedFields
}
func (iDB *InternalDB) Close() {}
func (iDB *InternalDB) Flush(_ string) error {

View File

@@ -31,8 +31,7 @@ import (
// NewApierV1Service returns the ApierV1 Service
func NewApierV1Service(cfg *config.CGRConfig, dm *DataDBService,
cdrStorage engine.CdrStorage, loadStorage engine.LoadStorage,
filterSChan chan *engine.FilterS,
storDB *StorDBService, filterSChan chan *engine.FilterS,
server *utils.Server, cacheSChan, schedChan, attrsChan,
dispatcherChan chan rpcclient.RpcClientConnection,
schedService *SchedulerService,
@@ -41,8 +40,7 @@ func NewApierV1Service(cfg *config.CGRConfig, dm *DataDBService,
connChan: make(chan rpcclient.RpcClientConnection, 1),
cfg: cfg,
dm: dm,
cdrStorage: cdrStorage,
loadStorage: loadStorage,
storDB: storDB,
filterSChan: filterSChan,
server: server,
cacheSChan: cacheSChan,
@@ -59,8 +57,7 @@ type ApierV1Service struct {
sync.RWMutex
cfg *config.CGRConfig
dm *DataDBService
cdrStorage engine.CdrStorage
loadStorage engine.LoadStorage
storDB *StorDBService
filterSChan chan *engine.FilterS
server *utils.Server
cacheSChan chan rpcclient.RpcClientConnection
@@ -110,9 +107,9 @@ func (api *ApierV1Service) Start() (err error) {
}
api.api = &v1.ApierV1{
StorDb: api.loadStorage,
DataManager: api.dm.GetDM(),
CdrDb: api.cdrStorage,
CdrDb: api.storDB.GetDM(),
StorDb: api.storDB.GetDM(),
Config: api.cfg,
Responder: api.responderService.GetResponder(),
SchedulerService: api.schedService,
@@ -164,6 +161,9 @@ func (api *ApierV1Service) Reload() (err error) {
return
}
api.Lock()
if api.storDB.WasReconnected() { // rewrite the connection if was changed
api.api.SetStorDB(api.storDB.GetDM())
}
api.api.SetAttributeSConnection(attributeSrpc)
api.api.SetCacheSConnection(cacheSrpc)
api.api.SetSchedulerSConnection(schedulerSrpc)

View File

@@ -33,14 +33,14 @@ import (
// NewCDRServer returns the CDR Server
func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
cdrStorage engine.CdrStorage, filterSChan chan *engine.FilterS,
storDB *StorDBService, filterSChan chan *engine.FilterS,
server *utils.Server, internalCDRServerChan, chrsChan, respChan, attrsChan, thsChan, stsChan,
dispatcherChan chan rpcclient.RpcClientConnection) servmanager.Service {
return &CDRServer{
connChan: internalCDRServerChan,
cfg: cfg,
dm: dm,
cdrStorage: cdrStorage,
storDB: storDB,
filterSChan: filterSChan,
server: server,
chrsChan: chrsChan,
@@ -57,7 +57,7 @@ type CDRServer struct {
sync.RWMutex
cfg *config.CGRConfig
dm *DataDBService
cdrStorage engine.CdrStorage
storDB *StorDBService
filterSChan chan *engine.FilterS
server *utils.Server
chrsChan chan rpcclient.RpcClientConnection
@@ -118,7 +118,7 @@ func (cdrS *CDRServer) Start() (err error) {
}
cdrS.Lock()
defer cdrS.Unlock()
cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, cdrS.cdrStorage, cdrS.dm.GetDM(),
cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, cdrS.storDB.GetDM(), cdrS.dm.GetDM(),
ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn, filterS)
utils.Logger.Info("Registering CDRS HTTP Handlers.")
cdrS.cdrS.RegisterHandlersToServer(cdrS.server)
@@ -173,6 +173,9 @@ func (cdrS *CDRServer) Reload() (err error) {
return
}
cdrS.Lock()
if cdrS.storDB.WasReconnected() { // rewrite the connection if was changed
cdrS.cdrS.SetStorDB(cdrS.storDB.GetDM())
}
cdrS.cdrS.SetRALsConnection(ralConn)
cdrS.cdrS.SetAttributeSConnection(attrSConn)
cdrS.cdrS.SetThresholSConnection(thresholdSConn)

View File

@@ -117,7 +117,7 @@ func (db *DataDBService) Reload() (err error) {
db.Lock()
defer db.Unlock()
if db.needsConnectionReload() {
db.db.DataDB().Close()
// db.db.DataDB().Close() // already closed in reconnect
if err = db.db.Reconnect(db.cfg.GeneralCfg().DBDataEncoding, db.cfg.DataDbCfg()); err != nil {
return
}

View File

@@ -32,12 +32,11 @@ import (
// NewRalService returns the Ral Service
func NewRalService(cfg *config.CGRConfig, dm *DataDBService,
cdrStorage engine.CdrStorage, loadStorage engine.LoadStorage,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server,
storDB *StorDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server,
thsChan, stsChan, cacheSChan, schedChan, attrsChan, dispatcherChan chan rpcclient.RpcClientConnection,
schedulerService *SchedulerService, exitChan chan bool) *RalService {
resp := NewResponderService(cfg, server, thsChan, stsChan, dispatcherChan, exitChan)
apiv1 := NewApierV1Service(cfg, dm, cdrStorage, loadStorage, filterSChan, server, cacheSChan, schedChan, attrsChan, dispatcherChan, schedulerService, resp)
apiv1 := NewApierV1Service(cfg, dm, storDB, filterSChan, server, cacheSChan, schedChan, attrsChan, dispatcherChan, schedulerService, resp)
apiv2 := NewApierV2Service(apiv1, cfg, server)
return &RalService{
connChan: make(chan rpcclient.RpcClientConnection, 1),

195
services/stordb.go Normal file
View File

@@ -0,0 +1,195 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"fmt"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewStorDBService returns the StorDB Service
func NewStorDBService(cfg *config.CGRConfig) *StorDBService {
return &StorDBService{
cfg: cfg,
dbchan: make(chan engine.StorDB, 1),
// db: engine.NewInternalDB([]string{}, []string{}), // to be removed
}
}
// StorDBService implements Service interface
type StorDBService struct {
sync.RWMutex
cfg *config.CGRConfig
oldDBCfg *config.StorDbCfg
db engine.StorDB
dbchan chan engine.StorDB
reconnected bool
}
// Start should handle the sercive start
func (db *StorDBService) Start() (err error) {
if db.IsRunning() {
return fmt.Errorf("service aleady running")
}
db.Lock()
defer db.Unlock()
db.oldDBCfg = db.cfg.StorDbCfg().Clone()
d, err := engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host,
db.cfg.StorDbCfg().Port, db.cfg.StorDbCfg().Name, db.cfg.StorDbCfg().User,
db.cfg.StorDbCfg().Password, db.cfg.StorDbCfg().SSLMode, db.cfg.StorDbCfg().MaxOpenConns,
db.cfg.StorDbCfg().MaxIdleConns, db.cfg.StorDbCfg().ConnMaxLifetime,
db.cfg.StorDbCfg().StringIndexedFields, db.cfg.StorDbCfg().PrefixIndexedFields)
if err != nil { // Cannot configure getter database, show stopper
utils.Logger.Crit(fmt.Sprintf("Could not configure storDB: %s exiting!", err))
return
}
db.db = d
engine.SetCdrStorage(db.db)
if err = engine.CheckVersions(db.db); err != nil {
fmt.Println(err)
return
}
db.dbchan <- db.db
return
}
// GetIntenternalChan returns the internal connection chanel
func (db *StorDBService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
return nil
}
// Reload handles the change of config
func (db *StorDBService) Reload() (err error) {
db.Lock()
defer db.Unlock()
if db.reconnected = db.needsConnectionReload(); db.reconnected {
var d engine.StorDB
if d, err = engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host,
db.cfg.StorDbCfg().Port, db.cfg.StorDbCfg().Name, db.cfg.StorDbCfg().User,
db.cfg.StorDbCfg().Password, db.cfg.StorDbCfg().SSLMode, db.cfg.StorDbCfg().MaxOpenConns,
db.cfg.StorDbCfg().MaxIdleConns, db.cfg.StorDbCfg().ConnMaxLifetime,
db.cfg.StorDbCfg().StringIndexedFields, db.cfg.StorDbCfg().PrefixIndexedFields); err != nil {
return
}
db.db.Close()
db.db = d
db.oldDBCfg = db.cfg.StorDbCfg().Clone()
return
}
if db.cfg.StorDbCfg().Type == utils.MONGO {
mgo, canCast := db.db.(*engine.MongoStorage)
if !canCast {
return fmt.Errorf("can't conver StorDB of type %s to MongoStorage",
db.cfg.StorDbCfg().Type)
}
mgo.SetTTL(db.cfg.StorDbCfg().QueryTimeout)
} else if db.cfg.StorDbCfg().Type == utils.POSTGRES ||
db.cfg.StorDbCfg().Type == utils.MYSQL {
msql, canCast := db.db.(*engine.SQLStorage)
if !canCast {
return fmt.Errorf("can't conver StorDB of type %s to SQLStorage",
db.cfg.StorDbCfg().Type)
}
msql.Db.SetMaxOpenConns(db.cfg.StorDbCfg().MaxOpenConns)
msql.Db.SetMaxIdleConns(db.cfg.StorDbCfg().MaxIdleConns)
msql.Db.SetConnMaxLifetime(time.Duration(db.cfg.StorDbCfg().ConnMaxLifetime) * time.Second)
} else if db.cfg.StorDbCfg().Type == utils.INTERNAL {
idb, canCast := db.db.(*engine.InternalDB)
if !canCast {
return fmt.Errorf("can't conver StorDB of type %s to InternalDB",
db.cfg.StorDbCfg().Type)
}
idb.SetStringIndexedFields(db.cfg.StorDbCfg().StringIndexedFields)
idb.SetPrefixIndexedFields(db.cfg.StorDbCfg().PrefixIndexedFields)
}
return
}
// Shutdown stops the service
func (db *StorDBService) Shutdown() (err error) {
db.Lock()
db.db.Close()
db.db = nil
db.reconnected = false
db.Unlock()
return
}
// IsRunning returns if the service is running
func (db *StorDBService) IsRunning() bool {
db.RLock()
defer db.RUnlock()
return db != nil && db.db != nil
}
// ServiceName returns the service name
func (db *StorDBService) ServiceName() string {
return utils.StorDB
}
// ShouldRun returns if the service should be running
func (db *StorDBService) ShouldRun() bool {
return db.cfg.RalsCfg().Enabled || db.cfg.CdrsCfg().Enabled
}
// GetDM returns the StorDB
func (db *StorDBService) GetDM() engine.StorDB {
db.RLock()
defer db.RUnlock()
return db.db
}
// needsConnectionReload returns if the DB connection needs to reloaded
func (db *StorDBService) needsConnectionReload() bool {
if db.oldDBCfg.Type != db.cfg.StorDbCfg().Type ||
db.oldDBCfg.Host != db.cfg.StorDbCfg().Host ||
db.oldDBCfg.Name != db.cfg.StorDbCfg().Name ||
db.oldDBCfg.Port != db.cfg.StorDbCfg().Port ||
db.oldDBCfg.User != db.cfg.StorDbCfg().User ||
db.oldDBCfg.Password != db.cfg.StorDbCfg().Password {
return true
}
if db.cfg.StorDbCfg().Type == utils.POSTGRES &&
db.oldDBCfg.SSLMode != db.cfg.StorDbCfg().SSLMode {
return true
}
return false
}
// GetStorDBchan returns the StorDB chanel
func (db *StorDBService) GetStorDBchan() chan engine.StorDB {
db.RLock()
defer db.RUnlock()
return db.dbchan
}
// WasReconnected returns if after reload the DB was recreated
func (db *StorDBService) WasReconnected() bool {
db.RLock()
defer db.RUnlock()
return db.reconnected
}

View File

@@ -241,6 +241,9 @@ func (srvMngr *ServiceManager) handleReload() {
if err = srvMngr.reloadService(utils.ApierV1); err != nil {
return
}
if err = srvMngr.reloadService(utils.ApierV2); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.CDRS_JSN):
if err = srvMngr.reloadService(utils.CDRServer); err != nil {
return
@@ -297,6 +300,10 @@ func (srvMngr *ServiceManager) handleReload() {
if err = srvMngr.reloadService(utils.DataDB); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.STORDB_JSN):
if err = srvMngr.reloadService(utils.StorDB); err != nil {
return
}
}
// handle RPC server
}