diff --git a/apier/v1/resourcesv1_it_test.go b/apier/v1/resourcesv1_it_test.go index bba1a02ef..84de43413 100644 --- a/apier/v1/resourcesv1_it_test.go +++ b/apier/v1/resourcesv1_it_test.go @@ -19,7 +19,6 @@ along with this program. If not, see */ package v1 -/* import ( "net/rpc" "net/rpc/jsonrpc" @@ -43,42 +42,42 @@ var ( ) var sTestsRLSV1 = []func(t *testing.T){ - testV1RLSLoadConfig, - testV1RLSInitDataDb, - testV1RLSResetStorDb, - testV1RLSStartEngine, - testV1RLSRpcConn, - testV1RLSFromFolder, - testV1RLSGetResourcesFromEvent, - testV1RLSAllocateResource, - testV1RLSAllowUsage, - testV1RLSReleaseResource, - testV1RLSGetResourceConfigBeforeSet, - testV1RLSSetResourceConfig, - testV1RLSGetResourceConfigAfterSet, - testV1RLSUpdateResourceConfig, - testV1RLSGetResourceConfigAfterUpdate, - testV1RLSRemResourceCOnfig, - testV1RLSGetResourceConfigAfterDelete, - testV1RLSStopEngine, + testV1RsLoadConfig, + testV1RsInitDataDb, + testV1RsResetStorDb, + testV1RsStartEngine, + testV1RsRpcConn, + testV1RsFromFolder, + testV1RsGetResourcesFromEvent, + testV1RsAllocateResource, + testV1RsAllowUsage, + testV1RsReleaseResource, + testV1RsGetResourceConfigBeforeSet, + testV1RsSetResourceConfig, + testV1RsGetResourceConfigAfterSet, + testV1RsUpdateResourceConfig, + testV1RsGetResourceConfigAfterUpdate, + testV1RsRemResourceCOnfig, + testV1RsGetResourceConfigAfterDelete, + testV1RsStopEngine, } //Test start here -func TestRLSV1ITMySQL(t *testing.T) { +func TestRsV1ITMySQL(t *testing.T) { rlsV1ConfDIR = "tutmysql" for _, stest := range sTestsRLSV1 { t.Run(rlsV1ConfDIR, stest) } } -func TestRLSV1ITMongo(t *testing.T) { +func TestRsV1ITMongo(t *testing.T) { rlsV1ConfDIR = "tutmongo" for _, stest := range sTestsRLSV1 { t.Run(rlsV1ConfDIR, stest) } } -func testV1RLSLoadConfig(t *testing.T) { +func testV1RsLoadConfig(t *testing.T) { var err error rlsV1CfgPath = path.Join(*dataDir, "conf", "samples", rlsV1ConfDIR) if rlsV1Cfg, err = config.NewCGRConfigFromFolder(rlsV1CfgPath); err != nil { @@ -92,26 +91,26 @@ func testV1RLSLoadConfig(t *testing.T) { } } -func testV1RLSInitDataDb(t *testing.T) { +func testV1RsInitDataDb(t *testing.T) { if err := engine.InitDataDb(rlsV1Cfg); err != nil { t.Fatal(err) } } // Wipe out the cdr database -func testV1RLSResetStorDb(t *testing.T) { +func testV1RsResetStorDb(t *testing.T) { if err := engine.InitStorDb(rlsV1Cfg); err != nil { t.Fatal(err) } } -func testV1RLSStartEngine(t *testing.T) { +func testV1RsStartEngine(t *testing.T) { if _, err := engine.StopStartEngine(rlsV1CfgPath, resDelay); err != nil { t.Fatal(err) } } -func testV1RLSRpcConn(t *testing.T) { +func testV1RsRpcConn(t *testing.T) { var err error rlsV1Rpc, err = jsonrpc.Dial("tcp", rlsV1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed if err != nil { @@ -119,7 +118,7 @@ func testV1RLSRpcConn(t *testing.T) { } } -func testV1RLSFromFolder(t *testing.T) { +func testV1RsFromFolder(t *testing.T) { var reply string time.Sleep(time.Duration(2000) * time.Millisecond) attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} @@ -129,7 +128,7 @@ func testV1RLSFromFolder(t *testing.T) { time.Sleep(time.Duration(1000) * time.Millisecond) } -func testV1RLSGetResourcesFromEvent(t *testing.T) { +func testV1RsGetResourcesFromEvent(t *testing.T) { var reply *[]*engine.ResourceCfg ev := map[string]interface{}{"Unknown": "unknown"} if err := rlsV1Rpc.Call("ResourceSV1.GetResourcesForEvent", ev, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { @@ -171,7 +170,7 @@ func testV1RLSGetResourcesFromEvent(t *testing.T) { } } -func testV1RLSAllocateResource(t *testing.T) { +func testV1RsAllocateResource(t *testing.T) { var reply string attrRU := utils.AttrRLsResourceUsage{ @@ -216,7 +215,7 @@ func testV1RLSAllocateResource(t *testing.T) { } -func testV1RLSAllowUsage(t *testing.T) { +func testV1RsAllowUsage(t *testing.T) { var reply bool attrRU := utils.AttrRLsResourceUsage{ UsageID: "651a8db2-4f67-4cf8-b622-169e8a482e51", @@ -239,7 +238,7 @@ func testV1RLSAllowUsage(t *testing.T) { } } -func testV1RLSReleaseResource(t *testing.T) { +func testV1RsReleaseResource(t *testing.T) { var reply interface{} attrRU := utils.AttrRLsResourceUsage{ @@ -265,14 +264,14 @@ func testV1RLSReleaseResource(t *testing.T) { } -func testV1RLSGetResourceConfigBeforeSet(t *testing.T) { +func testV1RsGetResourceConfigBeforeSet(t *testing.T) { var reply *string if err := rlsV1Rpc.Call("ApierV1.GetResourceConfig", &AttrGetResCfg{ID: "RCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } -func testV1RLSSetResourceConfig(t *testing.T) { +func testV1RsSetResourceConfig(t *testing.T) { rlsConfig = &engine.ResourceCfg{ ID: "RCFG1", Filters: []*engine.RequestFilter{ @@ -302,7 +301,7 @@ func testV1RLSSetResourceConfig(t *testing.T) { } } -func testV1RLSGetResourceConfigAfterSet(t *testing.T) { +func testV1RsGetResourceConfigAfterSet(t *testing.T) { var reply *engine.ResourceCfg if err := rlsV1Rpc.Call("ApierV1.GetResourceConfig", &AttrGetResCfg{ID: rlsConfig.ID}, &reply); err != nil { t.Error(err) @@ -311,7 +310,7 @@ func testV1RLSGetResourceConfigAfterSet(t *testing.T) { } } -func testV1RLSUpdateResourceConfig(t *testing.T) { +func testV1RsUpdateResourceConfig(t *testing.T) { var result string rlsConfig.Filters = []*engine.RequestFilter{ &engine.RequestFilter{ @@ -332,7 +331,7 @@ func testV1RLSUpdateResourceConfig(t *testing.T) { } } -func testV1RLSGetResourceConfigAfterUpdate(t *testing.T) { +func testV1RsGetResourceConfigAfterUpdate(t *testing.T) { var reply *engine.ResourceCfg if err := rlsV1Rpc.Call("ApierV1.GetResourceConfig", &AttrGetResCfg{ID: rlsConfig.ID}, &reply); err != nil { t.Error(err) @@ -341,7 +340,7 @@ func testV1RLSGetResourceConfigAfterUpdate(t *testing.T) { } } -func testV1RLSRemResourceCOnfig(t *testing.T) { +func testV1RsRemResourceCOnfig(t *testing.T) { var resp string if err := rlsV1Rpc.Call("ApierV1.RemResourceConfig", &AttrGetResCfg{ID: rlsConfig.ID}, &resp); err != nil { t.Error(err) @@ -350,16 +349,15 @@ func testV1RLSRemResourceCOnfig(t *testing.T) { } } -func testV1RLSGetResourceConfigAfterDelete(t *testing.T) { +func testV1RsGetResourceConfigAfterDelete(t *testing.T) { var reply *string if err := rlsV1Rpc.Call("ApierV1.GetResourceConfig", &AttrGetResCfg{ID: "RCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } -func testV1RLSStopEngine(t *testing.T) { +func testV1RsStopEngine(t *testing.T) { if err := engine.KillEngine(100); err != nil { t.Error(err) } } -*/ diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 3c3c646ed..556cbae4b 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -530,27 +530,31 @@ func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, data func startResourceService(internalRsChan, internalStatSConn chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dataDB engine.DataDB, server *utils.Server, exitChan chan bool) { var statsConn *rpcclient.RpcClientPool - if len(cfg.ResourceLimiterCfg().StatSConns) != 0 { // Stats connection init + if len(cfg.ResourceSCfg().StatSConns) != 0 { // Stats connection init statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.ResourceLimiterCfg().StatSConns, internalStatSConn, cfg.InternalTtl) + cfg.ResourceSCfg().StatSConns, internalStatSConn, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS: %s", err.Error())) exitChan <- true return } } - rS, err := engine.NewResourceService(cfg, dataDB, statsConn) + rS, err := engine.NewResourceService(dataDB, cfg.ResourceSCfg().StoreInterval, statsConn) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) exitChan <- true return } utils.Logger.Info(fmt.Sprintf("Starting Resource Service")) - if err := rS.ListenAndServe(exitChan); err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + go func() { + if err := rS.ListenAndServe(exitChan); err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + + } + rS.Shutdown() exitChan <- true return - } + }() rsV1 := v1.NewResourceSV1(rS) server.RpcRegister(rsV1) internalRsChan <- rsV1 @@ -848,7 +852,7 @@ func main() { } // Start RL service - if cfg.ResourceLimiterCfg().Enabled { + if cfg.ResourceSCfg().Enabled { go startResourceService(internalRsChan, internalStatSChan, cfg, dataDB, server, exitChan) } diff --git a/config/config.go b/config/config.go index 7678bb991..fae1035dc 100755 --- a/config/config.go +++ b/config/config.go @@ -265,7 +265,7 @@ type CGRConfig struct { AliasesServerEnabled bool // Starts PubSub as server: . UserServerEnabled bool // Starts User as server: UserServerIndexes []string // List of user profile field indexes - resourceLimiterCfg *ResourceLimiterConfig // Configuration for resource limiter + resourceSCfg *ResourceLimiterConfig // Configuration for resource limiter statsCfg *StatSCfg // Configuration for StatS MailerServer string // The server to use when sending emails out MailerAuthUser string // Authenticate to email server using this user @@ -417,7 +417,7 @@ func (self *CGRConfig) checkConfigSanity() error { } } for _, smFSRLsConn := range self.SmFsConfig.RLsConns { - if smFSRLsConn.Address == utils.MetaInternal && !self.resourceLimiterCfg.Enabled { + if smFSRLsConn.Address == utils.MetaInternal && !self.resourceSCfg.Enabled { return errors.New("RLs not enabled but referenced by SMFreeSWITCH component") } } @@ -441,7 +441,7 @@ func (self *CGRConfig) checkConfigSanity() error { } } for _, smKamRLsConn := range self.SmKamConfig.RLsConns { - if smKamRLsConn.Address == utils.MetaInternal && !self.resourceLimiterCfg.Enabled { + if smKamRLsConn.Address == utils.MetaInternal && !self.resourceSCfg.Enabled { return errors.New("RLs not enabled but requested by SM-Kamailio component") } } @@ -502,8 +502,8 @@ func (self *CGRConfig) checkConfigSanity() error { } } // ResourceLimiter checks - if self.resourceLimiterCfg != nil && self.resourceLimiterCfg.Enabled { - for _, connCfg := range self.resourceLimiterCfg.StatSConns { + if self.resourceSCfg != nil && self.resourceSCfg.Enabled { + for _, connCfg := range self.resourceSCfg.StatSConns { if connCfg.Address == utils.MetaInternal && !self.statsCfg.Enabled { return errors.New("StatS not enabled but requested by ResourceLimiter component.") } @@ -1075,10 +1075,10 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { } if jsnRLSCfg != nil { - if self.resourceLimiterCfg == nil { - self.resourceLimiterCfg = new(ResourceLimiterConfig) + if self.resourceSCfg == nil { + self.resourceSCfg = new(ResourceLimiterConfig) } - if self.resourceLimiterCfg.loadFromJsonCfg(jsnRLSCfg); err != nil { + if self.resourceSCfg.loadFromJsonCfg(jsnRLSCfg); err != nil { return err } } @@ -1146,8 +1146,8 @@ func (self *CGRConfig) RadiusAgentCfg() *RadiusAgentCfg { } // ToDo: fix locking here -func (self *CGRConfig) ResourceLimiterCfg() *ResourceLimiterConfig { - return self.resourceLimiterCfg +func (self *CGRConfig) ResourceSCfg() *ResourceLimiterConfig { + return self.resourceSCfg } // ToDo: fix locking diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index da7ff9edf..793278d1a 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -101,20 +101,8 @@ "indexes": ["Uuid"], // user profile field indexes }, -"rls": { - "enabled": true, // starts ResourceLimiter service: . - "cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> - "cache_dump_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|dur> - "usage_ttl": "3h", // expire usage records if older than this duration <""|*never|dur> -}, - "resources": { "enabled": true, - "stats_conns": [ - //{"address": "*internal"} - ], - "cache_dump_interval": "0s", - "usage_ttl": "3h", }, "stats": { diff --git a/engine/resources.go b/engine/resources.go index 8815bf772..de4b8328d 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -220,18 +220,18 @@ func (rs Resources) AllocateResource(ru *ResourceUsage, dryRun bool) (alcMessage } // Pas the config as a whole so we can ask access concurrently -func NewResourceService(cfg *config.CGRConfig, dataDB DataDB, statS rpcclient.RpcClientConnection) (*ResourceService, error) { +func NewResourceService(dataDB DataDB, storeInterval time.Duration, statS rpcclient.RpcClientConnection) (*ResourceService, error) { if statS != nil && reflect.ValueOf(statS).IsNil() { statS = nil } return &ResourceService{dataDB: dataDB, statS: statS, scEventResources: ltcache.New(ltcache.UnlimitedCaching, time.Duration(1)*time.Minute, false, nil), - lcEventResources: ltcache.New(ltcache.UnlimitedCaching, ltcache.UnlimitedCaching, false, nil)}, nil + lcEventResources: ltcache.New(ltcache.UnlimitedCaching, ltcache.UnlimitedCaching, false, nil), + storeInterval: storeInterval}, nil } // ResourceService is the service handling resources type ResourceService struct { - cfg *config.CGRConfig dataDB DataDB // So we can load the data in cache and index it statS rpcclient.RpcClientConnection // allows applying filters based on stats scEventResources *ltcache.Cache // short cache map[ruID], used to keep references to matched resources for events in allow queries @@ -239,7 +239,7 @@ type ResourceService struct { storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool srMux sync.RWMutex stopBackup chan struct{} // control storing process - backupInterval time.Duration + storeInterval time.Duration } // Called to start the service @@ -251,7 +251,7 @@ func (rS *ResourceService) ListenAndServe(exitChan chan bool) error { } // Called to shutdown the service -func (rS *ResourceService) ServiceShutdown() error { +func (rS *ResourceService) Shutdown() error { utils.Logger.Info(" service shutdown initialized") close(rS.stopBackup) rS.storeResources() @@ -306,7 +306,7 @@ func (rS *ResourceService) storeResources() { // backup will regularly store resources changed to dataDB func (rS *ResourceService) runBackup() { - if rS.backupInterval <= 0 { + if rS.storeInterval <= 0 { return } for { @@ -316,7 +316,7 @@ func (rS *ResourceService) runBackup() { } rS.storeResources() } - time.Sleep(rS.backupInterval) + time.Sleep(rS.storeInterval) } // cachedResourcesForEvent attempts to retrieve cached resources for an event @@ -341,7 +341,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) } lockIDs := utils.PrefixSliceItems(rIDs, utils.ResourcesPrefix) - guardian.Guardian.GuardIDs(rS.cfg.LockingTimeout, lockIDs...) + guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...) defer guardian.Guardian.UnguardIDs(lockIDs...) for i, rID := range rIDs { if r, err := rS.dataDB.GetResource(rID, false, ""); err != nil { @@ -370,7 +370,7 @@ func (rS *ResourceService) matchingResourcesForEvent(ev map[string]interface{}) return nil, err } lockIDs := utils.PrefixSliceItems(rIDs.Slice(), utils.ResourcesPrefix) - guardian.Guardian.GuardIDs(rS.cfg.LockingTimeout, lockIDs...) + guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...) defer guardian.Guardian.UnguardIDs(lockIDs...) for resName := range rIDs { rCfg, err := rS.dataDB.GetResourceCfg(resName, false, utils.NonTransactional) @@ -492,7 +492,7 @@ func (rS *ResourceService) V1AllocateResource(args utils.AttrRLsResourceUsage, r // index it for storing rS.srMux.Lock() for _, r := range mtcRLs { - if rS.backupInterval == -1 { + if rS.storeInterval == -1 { rS.StoreResource(r) } else if r.dirty != nil { *r.dirty = true // mark it to be saved @@ -514,12 +514,12 @@ func (rS *ResourceService) V1ReleaseResource(args utils.AttrRLsResourceUsage, re } mtcRLs.clearUsage(args.UsageID) rS.lcEventResources.Remove(args.UsageID) - if rS.backupInterval != -1 { + if rS.storeInterval != -1 { rS.srMux.Lock() } for _, r := range mtcRLs { if r.dirty != nil { - if rS.backupInterval == -1 { + if rS.storeInterval == -1 { rS.StoreResource(r) } else { *r.dirty = true // mark it to be saved @@ -527,7 +527,7 @@ func (rS *ResourceService) V1ReleaseResource(args utils.AttrRLsResourceUsage, re } } } - if rS.backupInterval != -1 { + if rS.storeInterval != -1 { rS.srMux.Unlock() } *reply = utils.OK