diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 659c9420e..56cb3e399 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -42,9 +42,9 @@ type SchedulerGeter interface { } type ApierV1 struct { - StorDb engine.LoadStorage - DataManager *engine.DataManager + StorDb engine.LoadStorage // we should consider keeping only one of StorDB type CdrDb engine.CdrStorage + DataManager *engine.DataManager Config *config.CGRConfig Responder *engine.Responder CDRs rpcclient.RpcClientConnection // FixMe: populate it from cgr-engine @@ -96,12 +96,12 @@ func (apiv1 *ApierV1) RemoveDestination(attr AttrRemoveDestination, reply *strin } // GetReverseDestination retrieves revese destination list for a prefix -func (apierv1 *ApierV1) GetReverseDestination(prefix string, reply *[]string) (err error) { +func (apiv1 *ApierV1) GetReverseDestination(prefix string, reply *[]string) (err error) { if prefix == "" { return utils.NewErrMandatoryIeMissing("prefix") } var revLst []string - if revLst, err = apierv1.DataManager.DataDB().GetReverseDestination(prefix, false, utils.NonTransactional); err != nil { + if revLst, err = apiv1.DataManager.DataDB().GetReverseDestination(prefix, false, utils.NonTransactional); err != nil { return } *reply = revLst @@ -109,8 +109,8 @@ func (apierv1 *ApierV1) GetReverseDestination(prefix string, reply *[]string) (e } // ComputeReverseDestinations will rebuild complete reverse destinations data -func (apierv1 *ApierV1) ComputeReverseDestinations(ignr string, reply *string) (err error) { - if err = apierv1.DataManager.DataDB().RebuildReverseForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil { +func (apiv1 *ApierV1) ComputeReverseDestinations(ignr string, reply *string) (err error) { + if err = apiv1.DataManager.DataDB().RebuildReverseForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil { return } *reply = utils.OK @@ -118,16 +118,16 @@ func (apierv1 *ApierV1) ComputeReverseDestinations(ignr string, reply *string) ( } // ComputeAccountActionPlans will rebuild complete reverse accountActions data -func (apierv1 *ApierV1) ComputeAccountActionPlans(ignr string, reply *string) (err error) { - if err = apierv1.DataManager.DataDB().RebuildReverseForPrefix(utils.AccountActionPlansPrefix); err != nil { +func (apiv1 *ApierV1) ComputeAccountActionPlans(ignr string, reply *string) (err error) { + if err = apiv1.DataManager.DataDB().RebuildReverseForPrefix(utils.AccountActionPlansPrefix); err != nil { return } *reply = utils.OK return } -func (apierv1 *ApierV1) GetSharedGroup(sgId string, reply *engine.SharedGroup) error { - if sg, err := apierv1.DataManager.GetSharedGroup(sgId, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { // Not found is not an error here +func (apiv1 *ApierV1) GetSharedGroup(sgId string, reply *engine.SharedGroup) error { + if sg, err := apiv1.DataManager.GetSharedGroup(sgId, false, utils.NonTransactional); err != nil && err != utils.ErrNotFound { // Not found is not an error here return err } else { if sg != nil { @@ -436,12 +436,12 @@ func (apiv1 *ApierV1) SetRatingProfile(attrs utils.AttrSetRatingProfile, reply * } // GetRatingProfileIDs returns list of resourceProfile IDs registered for a tenant -func (apierV1 *ApierV1) GetRatingProfileIDs(args utils.TenantArgWithPaginator, rsPrfIDs *[]string) error { +func (apiv1 *ApierV1) GetRatingProfileIDs(args utils.TenantArgWithPaginator, rsPrfIDs *[]string) error { if missing := utils.MissingStructFields(&args, []string{utils.Tenant}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } prfx := utils.RATING_PROFILE_PREFIX + "*out:" + args.Tenant + ":" - keys, err := apierV1.DataManager.DataDB().GetKeysForPrefix(prfx) + keys, err := apiv1.DataManager.DataDB().GetKeysForPrefix(prfx) if err != nil { return err } @@ -1132,8 +1132,8 @@ type ArgsReplyFailedPosts struct { } // ReplayFailedPosts will repost failed requests found in the FailedRequestsInDir -func (apierv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (err error) { - failedReqsInDir := apierv1.Config.GeneralCfg().FailedPostsDir +func (apiv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *string) (err error) { + failedReqsInDir := apiv1.Config.GeneralCfg().FailedPostsDir if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != "" { failedReqsInDir = *args.FailedRequestsInDir } @@ -1172,7 +1172,7 @@ func (apierv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *stri return 0, err } return 0, os.Remove(filePath) - }, apierv1.Config.GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath) + }, apiv1.Config.GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath) if err != nil { return utils.NewErrServerError(err) } @@ -1182,26 +1182,26 @@ func (apierv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *stri } switch ffn.Transport { case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST: - _, err = engine.NewHTTPPoster(apierv1.Config.GeneralCfg().HttpSkipTlsVerify, - apierv1.Config.GeneralCfg().ReplyTimeout).Post(ffn.Address, + _, err = engine.NewHTTPPoster(apiv1.Config.GeneralCfg().HttpSkipTlsVerify, + apiv1.Config.GeneralCfg().ReplyTimeout).Post(ffn.Address, utils.PosterTransportContentTypes[ffn.Transport], fileContent, - apierv1.Config.GeneralCfg().PosterAttempts, failoverPath) + apiv1.Config.GeneralCfg().PosterAttempts, failoverPath) case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap: err = engine.PostersCache.PostAMQP(ffn.Address, - apierv1.Config.GeneralCfg().PosterAttempts, fileContent, + apiv1.Config.GeneralCfg().PosterAttempts, fileContent, utils.PosterTransportContentTypes[ffn.Transport], failedReqsOutDir, file.Name()) case utils.MetaAMQPV1jsonMap: - err = engine.PostersCache.PostAMQPv1(ffn.Address, apierv1.Config.GeneralCfg().PosterAttempts, + err = engine.PostersCache.PostAMQPv1(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts, fileContent, failedReqsOutDir, file.Name()) case utils.MetaSQSjsonMap: - err = engine.PostersCache.PostSQS(ffn.Address, apierv1.Config.GeneralCfg().PosterAttempts, + err = engine.PostersCache.PostSQS(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts, fileContent, failedReqsOutDir, file.Name()) case utils.MetaKafkajsonMap: - err = engine.PostersCache.PostKafka(ffn.Address, apierv1.Config.GeneralCfg().PosterAttempts, + err = engine.PostersCache.PostKafka(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts, fileContent, failedReqsOutDir, file.Name(), utils.UUIDSha1Prefix()) case utils.MetaS3jsonMap: - err = engine.PostersCache.PostS3(ffn.Address, apierv1.Config.GeneralCfg().PosterAttempts, + err = engine.PostersCache.PostS3(ffn.Address, apiv1.Config.GeneralCfg().PosterAttempts, fileContent, failedReqsOutDir, file.Name(), utils.UUIDSha1Prefix()) default: err = fmt.Errorf("unsupported replication transport: %s", ffn.Transport) @@ -1218,7 +1218,7 @@ func (apierv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *stri _, err = fileOut.Write(fileContent) fileOut.Close() return 0, err - }, apierv1.Config.GeneralCfg().LockingTimeout, utils.FileLockPrefix+failoverPath) + }, apiv1.Config.GeneralCfg().LockingTimeout, utils.FileLockPrefix+failoverPath) if err != nil { return utils.NewErrServerError(err) } @@ -1230,28 +1230,28 @@ func (apierv1 *ApierV1) ReplayFailedPosts(args ArgsReplyFailedPosts, reply *stri // CallCache caching the item based on cacheopt // visible in ApierV2 -func (apierv1 *ApierV1) CallCache(cacheOpt string, args utils.ArgsGetCacheItem) (err error) { +func (apiv1 *ApierV1) CallCache(cacheOpt string, args utils.ArgsGetCacheItem) (err error) { var reply string switch cacheOpt { case utils.META_NONE: return case utils.MetaReload: - if err = apierv1.CacheS.Call(utils.CacheSv1ReloadCache, utils.AttrReloadCacheWithArgDispatcher{ + if err = apiv1.CacheS.Call(utils.CacheSv1ReloadCache, utils.AttrReloadCacheWithArgDispatcher{ AttrReloadCache: composeArgsReload(args)}, &reply); err != nil { return err } case utils.MetaLoad: - if err = apierv1.CacheS.Call(utils.CacheSv1LoadCache, utils.AttrReloadCacheWithArgDispatcher{ + if err = apiv1.CacheS.Call(utils.CacheSv1LoadCache, utils.AttrReloadCacheWithArgDispatcher{ AttrReloadCache: composeArgsReload(args)}, &reply); err != nil { return err } case utils.MetaRemove: - if err = apierv1.CacheS.Call(utils.CacheSv1RemoveItem, + if err = apiv1.CacheS.Call(utils.CacheSv1RemoveItem, &utils.ArgsGetCacheItemWithArgDispatcher{ArgsGetCacheItem: args}, &reply); err != nil { return err } case utils.MetaClear: - if err = apierv1.CacheS.Call(utils.CacheSv1FlushCache, utils.AttrReloadCacheWithArgDispatcher{ + if err = apiv1.CacheS.Call(utils.CacheSv1FlushCache, utils.AttrReloadCacheWithArgDispatcher{ AttrReloadCache: composeArgsReload(args)}, &reply); err != nil { return err } @@ -1259,8 +1259,8 @@ func (apierv1 *ApierV1) CallCache(cacheOpt string, args utils.ArgsGetCacheItem) return } -func (apierv1 *ApierV1) GetLoadIDs(args string, reply *map[string]int64) (err error) { - if loadIDs, err := apierv1.DataManager.GetItemLoadIDs(args, false); err != nil { +func (apiv1 *ApierV1) GetLoadIDs(args string, reply *map[string]int64) (err error) { + if loadIDs, err := apiv1.DataManager.GetItemLoadIDs(args, false); err != nil { return err } else { *reply = loadIDs @@ -1273,8 +1273,8 @@ type LoadTimeArgs struct { Item string } -func (apierv1 *ApierV1) GetLoadTimes(args LoadTimeArgs, reply *map[string]string) (err error) { - if loadIDs, err := apierv1.DataManager.GetItemLoadIDs(args.Item, false); err != nil { +func (apiv1 *ApierV1) GetLoadTimes(args LoadTimeArgs, reply *map[string]string) (err error) { + if loadIDs, err := apiv1.DataManager.GetItemLoadIDs(args.Item, false); err != nil { return err } else { provMp := make(map[string]string) @@ -1306,9 +1306,9 @@ func (apiv1 *ApierV1) ComputeActionPlanIndexes(_ string, reply *string) (err err } // GetActionPlanIDs returns list of ActionPlan IDs registered for a tenant -func (apierV1 *ApierV1) GetActionPlanIDs(args utils.TenantArgWithPaginator, attrPrfIDs *[]string) error { +func (apiv1 *ApierV1) GetActionPlanIDs(args utils.TenantArgWithPaginator, attrPrfIDs *[]string) error { prfx := utils.ACTION_PLAN_PREFIX - keys, err := apierV1.DataManager.DataDB().GetKeysForPrefix(utils.ACTION_PLAN_PREFIX) + keys, err := apiv1.DataManager.DataDB().GetKeysForPrefix(utils.ACTION_PLAN_PREFIX) if err != nil { return err } @@ -1324,9 +1324,9 @@ func (apierV1 *ApierV1) GetActionPlanIDs(args utils.TenantArgWithPaginator, attr } // GetRatingPlanIDs returns list of RatingPlan IDs registered for a tenant -func (apierV1 *ApierV1) GetRatingPlanIDs(args utils.TenantArgWithPaginator, attrPrfIDs *[]string) error { +func (apiv1 *ApierV1) GetRatingPlanIDs(args utils.TenantArgWithPaginator, attrPrfIDs *[]string) error { prfx := utils.RATING_PLAN_PREFIX - keys, err := apierV1.DataManager.DataDB().GetKeysForPrefix(utils.RATING_PLAN_PREFIX) + keys, err := apiv1.DataManager.DataDB().GetKeysForPrefix(utils.RATING_PLAN_PREFIX) if err != nil { return err } @@ -1343,18 +1343,25 @@ func (apierV1 *ApierV1) GetRatingPlanIDs(args utils.TenantArgWithPaginator, attr // SetAttributeSConnection sets the new connection to the attribute service // only used on reload -func (apierv1 *ApierV1) SetAttributeSConnection(attrS rpcclient.RpcClientConnection) { - apierv1.AttributeS = attrS +func (apiv1 *ApierV1) SetAttributeSConnection(attrS rpcclient.RpcClientConnection) { + apiv1.AttributeS = attrS } // SetCacheSConnection sets the new connection to the cache service // only used on reload -func (apierv1 *ApierV1) SetCacheSConnection(chS rpcclient.RpcClientConnection) { - apierv1.CacheS = chS +func (apiv1 *ApierV1) SetCacheSConnection(chS rpcclient.RpcClientConnection) { + apiv1.CacheS = chS } // SetSchedulerSConnection sets the new connection to the scheduler service // only used on reload -func (apierv1 *ApierV1) SetSchedulerSConnection(schS rpcclient.RpcClientConnection) { - apierv1.SchedulerS = schS +func (apiv1 *ApierV1) SetSchedulerSConnection(schS rpcclient.RpcClientConnection) { + apiv1.SchedulerS = schS +} + +// SetStorDB sets the new connection for StorDB +// only used on reload +func (apiv1 *ApierV1) SetStorDB(storDB engine.StorDB) { + apiv1.CdrDb = storDB + apiv1.StorDb = storDB } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 3e915692c..126229bbe 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -454,31 +454,15 @@ func main() { cfg.LazySanityCheck() - // we should use only one database for StorDB - var storDb engine.StorDB - // var cdrDb engine.CdrStorage dmService := services.NewDataDBService(cfg) + storDBService := services.NewStorDBService(cfg) if dmService.ShouldRun() { // Some services can run without db, ie: CDRC if err = dmService.Start(); err != nil { return } } - if cfg.RalsCfg().Enabled || cfg.CdrsCfg().Enabled { - storDb, err := engine.NewStorDBConn(cfg.StorDbCfg().Type, - cfg.StorDbCfg().Host, cfg.StorDbCfg().Port, - cfg.StorDbCfg().Name, cfg.StorDbCfg().User, - cfg.StorDbCfg().Password, cfg.StorDbCfg().SSLMode, - cfg.StorDbCfg().MaxOpenConns, cfg.StorDbCfg().MaxIdleConns, - cfg.StorDbCfg().ConnMaxLifetime, cfg.StorDbCfg().StringIndexedFields, - cfg.StorDbCfg().PrefixIndexedFields) - if err != nil { // Cannot configure logger database, show stopper - utils.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err)) - return - } - defer storDb.Close() - engine.SetCdrStorage(storDb) - if err := engine.CheckVersions(storDb); err != nil { - fmt.Println(err.Error()) + if storDBService.ShouldRun() { + if err = storDBService.Start(); err != nil { return } } @@ -529,11 +513,11 @@ func main() { attrS.GetIntenternalChan(), stS.GetIntenternalChan(), reS.GetIntenternalChan(), dspS.GetIntenternalChan()) schS := services.NewSchedulerService(cfg, dmService, cacheS, filterSChan, server, internalCDRServerChan, dspS.GetIntenternalChan()) - rals := services.NewRalService(cfg, dmService, storDb, storDb, cacheS, filterSChan, server, + rals := services.NewRalService(cfg, dmService, storDBService, cacheS, filterSChan, server, tS.GetIntenternalChan(), stS.GetIntenternalChan(), internalCacheSChan, schS.GetIntenternalChan(), attrS.GetIntenternalChan(), dspS.GetIntenternalChan(), schS, exitChan) - cdrS := services.NewCDRServer(cfg, dmService, storDb, filterSChan, server, internalCDRServerChan, + cdrS := services.NewCDRServer(cfg, dmService, storDBService, filterSChan, server, internalCDRServerChan, chrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(), attrS.GetIntenternalChan(), tS.GetIntenternalChan(), stS.GetIntenternalChan(), dspS.GetIntenternalChan()) diff --git a/config/config.go b/config/config.go index 65f62cb79..0594b1191 100755 --- a/config/config.go +++ b/config/config.go @@ -1575,7 +1575,11 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case STORDB_JSN: + cfg.rldChans[STORDB_JSN] <- struct{}{} + time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it) if !fall { + cfg.rldChans[CDRS_JSN] <- struct{}{} + cfg.rldChans[Apier] <- struct{}{} break } fallthrough @@ -1607,7 +1611,8 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { case RALS_JSN: if !fall { cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before - time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it) + cfg.rldChans[STORDB_JSN] <- struct{}{} + time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it) } cfg.rldChans[RALS_JSN] <- struct{}{} if !fall { @@ -1617,7 +1622,8 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { case CDRS_JSN: if !fall { cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before - time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it) + cfg.rldChans[STORDB_JSN] <- struct{}{} + time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it) } cfg.rldChans[CDRS_JSN] <- struct{}{} if !fall { @@ -1773,6 +1779,10 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case Apier: + if !fall { + cfg.rldChans[STORDB_JSN] <- struct{}{} + time.Sleep(1) // to force the context switch( to be sure we start the DB before a service that needs it) + } cfg.rldChans[Apier] <- struct{}{} if !fall { break diff --git a/config/storedbcfg.go b/config/storedbcfg.go index d6657f10d..b8d17a182 100644 --- a/config/storedbcfg.go +++ b/config/storedbcfg.go @@ -95,3 +95,22 @@ func (dbcfg *StorDbCfg) loadFromJsonCfg(jsnDbCfg *DbJsonCfg) (err error) { } return nil } + +// Clone returns the cloned object +func (dbcfg *StorDbCfg) Clone() *StorDbCfg { + return &StorDbCfg{ + Type: dbcfg.Type, + Host: dbcfg.Host, + Port: dbcfg.Port, + Name: dbcfg.Name, + User: dbcfg.User, + Password: dbcfg.Password, + MaxOpenConns: dbcfg.MaxOpenConns, + MaxIdleConns: dbcfg.MaxIdleConns, + ConnMaxLifetime: dbcfg.ConnMaxLifetime, + StringIndexedFields: dbcfg.StringIndexedFields, + PrefixIndexedFields: dbcfg.PrefixIndexedFields, + QueryTimeout: dbcfg.QueryTimeout, + SSLMode: dbcfg.SSLMode, + } +} diff --git a/engine/cdrs.go b/engine/cdrs.go index 96464658b..64f1e736b 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -85,10 +85,16 @@ func NewCDRServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, r if chargerS != nil && reflect.ValueOf(chargerS).IsNil() { chargerS = nil } - return &CDRServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm, - rals: rater, attrS: attrS, - statS: statS, thdS: thdS, - chargerS: chargerS, guard: guardian.Guardian, + return &CDRServer{ + cgrCfg: cgrCfg, + cdrDb: cdrDb, + dm: dm, + rals: rater, + attrS: attrS, + statS: statS, + thdS: thdS, + chargerS: chargerS, + guard: guardian.Guardian, httpPoster: NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify, cgrCfg.GeneralCfg().ReplyTimeout), filterS: filterS} } @@ -859,3 +865,9 @@ func (cdrS *CDRServer) SetChargerSConnection(chS rpcclient.RpcClientConnection) func (cdrS *CDRServer) SetRALsConnection(rls rpcclient.RpcClientConnection) { cdrS.rals = rls } + +// SetStorDB sets the new StorDB +// only used on reload +func (cdrS *CDRServer) SetStorDB(cdrDb CdrStorage) { + cdrS.cdrDb = cdrDb +} diff --git a/engine/datamanager.go b/engine/datamanager.go index b3ec1b072..36301d0df 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -17,7 +17,6 @@ package engine import ( "fmt" - "strconv" "strings" "github.com/cgrates/cgrates/config" @@ -2051,33 +2050,8 @@ func (dm *DataManager) SetLoadIDs(loadIDs map[string]int64) (err error) { // Reconnect recconnects to the DB when the config was changes func (dm *DataManager) Reconnect(marshaler string, newcfg *config.DataDbCfg) (err error) { - var d DataDB - switch newcfg.DataDbType { - case utils.REDIS: - var dbNb int - dbNb, err = strconv.Atoi(newcfg.DataDbName) - if err != nil { - utils.Logger.Crit("Redis db name must be an integer!") - return - } - host := newcfg.DataDbHost - if newcfg.DataDbPort != "" && strings.Index(host, ":") == -1 { - host += ":" + newcfg.DataDbPort - } - d, err = NewRedisStorage(host, dbNb, newcfg.DataDbPass, marshaler, utils.REDIS_MAX_CONNS, newcfg.DataDbSentinelName) - case utils.MONGO: - d, err = NewMongoStorage(newcfg.DataDbHost, newcfg.DataDbPort, newcfg.DataDbName, - newcfg.DataDbUser, newcfg.DataDbPass, utils.DataDB, nil, true) - case utils.INTERNAL: - if marshaler == utils.JSON { - d = NewInternalDBJson(nil, nil) - } else { - d = NewInternalDB(nil, nil) - } - default: - err = fmt.Errorf("unknown db '%s' valid options are '%s' or '%s or '%s'", - newcfg.DataDbType, utils.REDIS, utils.MONGO, utils.INTERNAL) - } + d, err := NewDataDBConn(newcfg.DataDbType, newcfg.DataDbHost, newcfg.DataDbPort, newcfg.DataDbName, + newcfg.DataDbUser, newcfg.DataDbPass, marshaler, newcfg.DataDbSentinelName) if err != nil { return } diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index c2793f418..464ed3a71 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -41,9 +41,13 @@ type InternalDB struct { func NewInternalDB(stringIndexedFields, prefixIndexedFields []string) *InternalDB { dfltCfg, _ := config.NewDefaultCGRConfig() - return &InternalDB{db: ltcache.NewTransCache(dfltCfg.CacheCfg().AsTransCacheConfig()), - ms: NewCodecMsgpackMarshaler(), stringIndexedFields: stringIndexedFields, - prefixIndexedFields: prefixIndexedFields, cnter: utils.NewCounter(time.Now().UnixNano(), 0)} + return &InternalDB{ + db: ltcache.NewTransCache(dfltCfg.CacheCfg().AsTransCacheConfig()), + ms: NewCodecMsgpackMarshaler(), + stringIndexedFields: stringIndexedFields, + prefixIndexedFields: prefixIndexedFields, + cnter: utils.NewCounter(time.Now().UnixNano(), 0), + } } func NewInternalDBJson(stringIndexedFields, prefixIndexedFields []string) (InternalDB *InternalDB) { @@ -52,6 +56,16 @@ func NewInternalDBJson(stringIndexedFields, prefixIndexedFields []string) (Inter return } +// SetStringIndexedFields set the stringIndexedFields, used at StorDB reload +func (iDB *InternalDB) SetStringIndexedFields(stringIndexedFields []string) { + iDB.stringIndexedFields = stringIndexedFields +} + +// SetPrefixIndexedFields set the prefixIndexedFields, used at StorDB reload +func (iDB *InternalDB) SetPrefixIndexedFields(prefixIndexedFields []string) { + iDB.prefixIndexedFields = prefixIndexedFields +} + func (iDB *InternalDB) Close() {} func (iDB *InternalDB) Flush(_ string) error { diff --git a/services/apierv1.go b/services/apierv1.go index 381573b3a..19213adc3 100644 --- a/services/apierv1.go +++ b/services/apierv1.go @@ -31,8 +31,7 @@ import ( // NewApierV1Service returns the ApierV1 Service func NewApierV1Service(cfg *config.CGRConfig, dm *DataDBService, - cdrStorage engine.CdrStorage, loadStorage engine.LoadStorage, - filterSChan chan *engine.FilterS, + storDB *StorDBService, filterSChan chan *engine.FilterS, server *utils.Server, cacheSChan, schedChan, attrsChan, dispatcherChan chan rpcclient.RpcClientConnection, schedService *SchedulerService, @@ -41,8 +40,7 @@ func NewApierV1Service(cfg *config.CGRConfig, dm *DataDBService, connChan: make(chan rpcclient.RpcClientConnection, 1), cfg: cfg, dm: dm, - cdrStorage: cdrStorage, - loadStorage: loadStorage, + storDB: storDB, filterSChan: filterSChan, server: server, cacheSChan: cacheSChan, @@ -59,8 +57,7 @@ type ApierV1Service struct { sync.RWMutex cfg *config.CGRConfig dm *DataDBService - cdrStorage engine.CdrStorage - loadStorage engine.LoadStorage + storDB *StorDBService filterSChan chan *engine.FilterS server *utils.Server cacheSChan chan rpcclient.RpcClientConnection @@ -110,9 +107,9 @@ func (api *ApierV1Service) Start() (err error) { } api.api = &v1.ApierV1{ - StorDb: api.loadStorage, DataManager: api.dm.GetDM(), - CdrDb: api.cdrStorage, + CdrDb: api.storDB.GetDM(), + StorDb: api.storDB.GetDM(), Config: api.cfg, Responder: api.responderService.GetResponder(), SchedulerService: api.schedService, @@ -164,6 +161,9 @@ func (api *ApierV1Service) Reload() (err error) { return } api.Lock() + if api.storDB.WasReconnected() { // rewrite the connection if was changed + api.api.SetStorDB(api.storDB.GetDM()) + } api.api.SetAttributeSConnection(attributeSrpc) api.api.SetCacheSConnection(cacheSrpc) api.api.SetSchedulerSConnection(schedulerSrpc) diff --git a/services/cdrs.go b/services/cdrs.go index 83d2e8a7f..9c1f5548e 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -33,14 +33,14 @@ import ( // NewCDRServer returns the CDR Server func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, - cdrStorage engine.CdrStorage, filterSChan chan *engine.FilterS, + storDB *StorDBService, filterSChan chan *engine.FilterS, server *utils.Server, internalCDRServerChan, chrsChan, respChan, attrsChan, thsChan, stsChan, dispatcherChan chan rpcclient.RpcClientConnection) servmanager.Service { return &CDRServer{ connChan: internalCDRServerChan, cfg: cfg, dm: dm, - cdrStorage: cdrStorage, + storDB: storDB, filterSChan: filterSChan, server: server, chrsChan: chrsChan, @@ -57,7 +57,7 @@ type CDRServer struct { sync.RWMutex cfg *config.CGRConfig dm *DataDBService - cdrStorage engine.CdrStorage + storDB *StorDBService filterSChan chan *engine.FilterS server *utils.Server chrsChan chan rpcclient.RpcClientConnection @@ -118,7 +118,7 @@ func (cdrS *CDRServer) Start() (err error) { } cdrS.Lock() defer cdrS.Unlock() - cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, cdrS.cdrStorage, cdrS.dm.GetDM(), + cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, cdrS.storDB.GetDM(), cdrS.dm.GetDM(), ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn, filterS) utils.Logger.Info("Registering CDRS HTTP Handlers.") cdrS.cdrS.RegisterHandlersToServer(cdrS.server) @@ -173,6 +173,9 @@ func (cdrS *CDRServer) Reload() (err error) { return } cdrS.Lock() + if cdrS.storDB.WasReconnected() { // rewrite the connection if was changed + cdrS.cdrS.SetStorDB(cdrS.storDB.GetDM()) + } cdrS.cdrS.SetRALsConnection(ralConn) cdrS.cdrS.SetAttributeSConnection(attrSConn) cdrS.cdrS.SetThresholSConnection(thresholdSConn) diff --git a/services/datadb.go b/services/datadb.go index 00e9f17b1..b6e602cc3 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -117,7 +117,7 @@ func (db *DataDBService) Reload() (err error) { db.Lock() defer db.Unlock() if db.needsConnectionReload() { - db.db.DataDB().Close() + // db.db.DataDB().Close() // already closed in reconnect if err = db.db.Reconnect(db.cfg.GeneralCfg().DBDataEncoding, db.cfg.DataDbCfg()); err != nil { return } diff --git a/services/rals.go b/services/rals.go index fb0b2043d..49838110f 100644 --- a/services/rals.go +++ b/services/rals.go @@ -32,12 +32,11 @@ import ( // NewRalService returns the Ral Service func NewRalService(cfg *config.CGRConfig, dm *DataDBService, - cdrStorage engine.CdrStorage, loadStorage engine.LoadStorage, - cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server, + storDB *StorDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server, thsChan, stsChan, cacheSChan, schedChan, attrsChan, dispatcherChan chan rpcclient.RpcClientConnection, schedulerService *SchedulerService, exitChan chan bool) *RalService { resp := NewResponderService(cfg, server, thsChan, stsChan, dispatcherChan, exitChan) - apiv1 := NewApierV1Service(cfg, dm, cdrStorage, loadStorage, filterSChan, server, cacheSChan, schedChan, attrsChan, dispatcherChan, schedulerService, resp) + apiv1 := NewApierV1Service(cfg, dm, storDB, filterSChan, server, cacheSChan, schedChan, attrsChan, dispatcherChan, schedulerService, resp) apiv2 := NewApierV2Service(apiv1, cfg, server) return &RalService{ connChan: make(chan rpcclient.RpcClientConnection, 1), diff --git a/services/stordb.go b/services/stordb.go new file mode 100644 index 000000000..43640917b --- /dev/null +++ b/services/stordb.go @@ -0,0 +1,195 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewStorDBService returns the StorDB Service +func NewStorDBService(cfg *config.CGRConfig) *StorDBService { + return &StorDBService{ + cfg: cfg, + dbchan: make(chan engine.StorDB, 1), + // db: engine.NewInternalDB([]string{}, []string{}), // to be removed + } +} + +// StorDBService implements Service interface +type StorDBService struct { + sync.RWMutex + cfg *config.CGRConfig + oldDBCfg *config.StorDbCfg + + db engine.StorDB + dbchan chan engine.StorDB + + reconnected bool +} + +// Start should handle the sercive start +func (db *StorDBService) Start() (err error) { + if db.IsRunning() { + return fmt.Errorf("service aleady running") + } + db.Lock() + defer db.Unlock() + db.oldDBCfg = db.cfg.StorDbCfg().Clone() + d, err := engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host, + db.cfg.StorDbCfg().Port, db.cfg.StorDbCfg().Name, db.cfg.StorDbCfg().User, + db.cfg.StorDbCfg().Password, db.cfg.StorDbCfg().SSLMode, db.cfg.StorDbCfg().MaxOpenConns, + db.cfg.StorDbCfg().MaxIdleConns, db.cfg.StorDbCfg().ConnMaxLifetime, + db.cfg.StorDbCfg().StringIndexedFields, db.cfg.StorDbCfg().PrefixIndexedFields) + if err != nil { // Cannot configure getter database, show stopper + utils.Logger.Crit(fmt.Sprintf("Could not configure storDB: %s exiting!", err)) + return + } + db.db = d + engine.SetCdrStorage(db.db) + if err = engine.CheckVersions(db.db); err != nil { + fmt.Println(err) + return + } + db.dbchan <- db.db + return +} + +// GetIntenternalChan returns the internal connection chanel +func (db *StorDBService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return nil +} + +// Reload handles the change of config +func (db *StorDBService) Reload() (err error) { + db.Lock() + defer db.Unlock() + if db.reconnected = db.needsConnectionReload(); db.reconnected { + var d engine.StorDB + if d, err = engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host, + db.cfg.StorDbCfg().Port, db.cfg.StorDbCfg().Name, db.cfg.StorDbCfg().User, + db.cfg.StorDbCfg().Password, db.cfg.StorDbCfg().SSLMode, db.cfg.StorDbCfg().MaxOpenConns, + db.cfg.StorDbCfg().MaxIdleConns, db.cfg.StorDbCfg().ConnMaxLifetime, + db.cfg.StorDbCfg().StringIndexedFields, db.cfg.StorDbCfg().PrefixIndexedFields); err != nil { + return + } + db.db.Close() + db.db = d + db.oldDBCfg = db.cfg.StorDbCfg().Clone() + return + } + if db.cfg.StorDbCfg().Type == utils.MONGO { + mgo, canCast := db.db.(*engine.MongoStorage) + if !canCast { + return fmt.Errorf("can't conver StorDB of type %s to MongoStorage", + db.cfg.StorDbCfg().Type) + } + mgo.SetTTL(db.cfg.StorDbCfg().QueryTimeout) + } else if db.cfg.StorDbCfg().Type == utils.POSTGRES || + db.cfg.StorDbCfg().Type == utils.MYSQL { + msql, canCast := db.db.(*engine.SQLStorage) + if !canCast { + return fmt.Errorf("can't conver StorDB of type %s to SQLStorage", + db.cfg.StorDbCfg().Type) + } + msql.Db.SetMaxOpenConns(db.cfg.StorDbCfg().MaxOpenConns) + msql.Db.SetMaxIdleConns(db.cfg.StorDbCfg().MaxIdleConns) + msql.Db.SetConnMaxLifetime(time.Duration(db.cfg.StorDbCfg().ConnMaxLifetime) * time.Second) + } else if db.cfg.StorDbCfg().Type == utils.INTERNAL { + idb, canCast := db.db.(*engine.InternalDB) + if !canCast { + return fmt.Errorf("can't conver StorDB of type %s to InternalDB", + db.cfg.StorDbCfg().Type) + } + idb.SetStringIndexedFields(db.cfg.StorDbCfg().StringIndexedFields) + idb.SetPrefixIndexedFields(db.cfg.StorDbCfg().PrefixIndexedFields) + } + return +} + +// Shutdown stops the service +func (db *StorDBService) Shutdown() (err error) { + db.Lock() + db.db.Close() + db.db = nil + db.reconnected = false + db.Unlock() + return +} + +// IsRunning returns if the service is running +func (db *StorDBService) IsRunning() bool { + db.RLock() + defer db.RUnlock() + return db != nil && db.db != nil +} + +// ServiceName returns the service name +func (db *StorDBService) ServiceName() string { + return utils.StorDB +} + +// ShouldRun returns if the service should be running +func (db *StorDBService) ShouldRun() bool { + return db.cfg.RalsCfg().Enabled || db.cfg.CdrsCfg().Enabled +} + +// GetDM returns the StorDB +func (db *StorDBService) GetDM() engine.StorDB { + db.RLock() + defer db.RUnlock() + return db.db +} + +// needsConnectionReload returns if the DB connection needs to reloaded +func (db *StorDBService) needsConnectionReload() bool { + if db.oldDBCfg.Type != db.cfg.StorDbCfg().Type || + db.oldDBCfg.Host != db.cfg.StorDbCfg().Host || + db.oldDBCfg.Name != db.cfg.StorDbCfg().Name || + db.oldDBCfg.Port != db.cfg.StorDbCfg().Port || + db.oldDBCfg.User != db.cfg.StorDbCfg().User || + db.oldDBCfg.Password != db.cfg.StorDbCfg().Password { + return true + } + if db.cfg.StorDbCfg().Type == utils.POSTGRES && + db.oldDBCfg.SSLMode != db.cfg.StorDbCfg().SSLMode { + return true + } + return false +} + +// GetStorDBchan returns the StorDB chanel +func (db *StorDBService) GetStorDBchan() chan engine.StorDB { + db.RLock() + defer db.RUnlock() + return db.dbchan +} + +// WasReconnected returns if after reload the DB was recreated +func (db *StorDBService) WasReconnected() bool { + db.RLock() + defer db.RUnlock() + return db.reconnected +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 1f7b2a42f..3e498b7db 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -241,6 +241,9 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.ApierV1); err != nil { return } + if err = srvMngr.reloadService(utils.ApierV2); err != nil { + return + } case <-srvMngr.GetConfig().GetReloadChan(config.CDRS_JSN): if err = srvMngr.reloadService(utils.CDRServer); err != nil { return @@ -297,6 +300,10 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.DataDB); err != nil { return } + case <-srvMngr.GetConfig().GetReloadChan(config.STORDB_JSN): + if err = srvMngr.reloadService(utils.StorDB); err != nil { + return + } } // handle RPC server }