diff --git a/apier/v1/trends.go b/apier/v1/trends.go index aaac7fec6..755e94298 100644 --- a/apier/v1/trends.go +++ b/apier/v1/trends.go @@ -95,11 +95,13 @@ func (apierSv1 *APIerSv1) RemoveTrendProfile(ctx *context.Context, args *utils.T } // NewTrendSv1 initializes TrendSV1 -func NewTrendSv1() *TrendSv1 { +func NewTrendSv1(trs *engine.TrendService) *TrendSv1 { return &TrendSv1{} } -type TrendSv1 struct{} +type TrendSv1 struct { + trS *engine.TrendService +} func (sa *TrendSv1) Ping(ctx *context.Context, ign *utils.CGREvent, reply *string) error { *reply = utils.Pong diff --git a/engine/trends.go b/engine/trends.go index aaf8477a6..768d1bda9 100644 --- a/engine/trends.go +++ b/engine/trends.go @@ -21,6 +21,7 @@ package engine import ( "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -62,3 +63,18 @@ type Trend struct { func (tr *Trend) TenantID() string { return utils.ConcatenatedKey(tr.Tenant, tr.ID) } + +// NewTrendService the constructor for TrendS service +func NewTrendService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) *TrendService { + return &TrendService{ + dm: dm, + cgrcfg: cgrcfg, + filterS: filterS, + } +} + +type TrendService struct { + dm *DataManager + cgrcfg *config.CGRConfig + filterS *FilterS +} diff --git a/services/trends.go b/services/trends.go index e90f7a73e..17a7506a1 100644 --- a/services/trends.go +++ b/services/trends.go @@ -58,37 +58,42 @@ type TrendService struct { filterSChan chan *engine.FilterS server *cores.Server connMgr *engine.ConnManager - connChan chan birpc.ClientConnector - anz *AnalyzerService - srvDep map[string]*sync.WaitGroup + + trs *engine.TrendService + connChan chan birpc.ClientConnector + anz *AnalyzerService + srvDep map[string]*sync.WaitGroup } // Start should handle the sercive start -func (tr *TrendService) Start() error { - if tr.IsRunning() { +func (trs *TrendService) Start() error { + if trs.IsRunning() { return utils.ErrServiceAlreadyRunning } - tr.srvDep[utils.DataDB].Add(1) - <-tr.cacheS.GetPrecacheChannel(utils.CacheTrendProfiles) + trs.srvDep[utils.DataDB].Add(1) + <-trs.cacheS.GetPrecacheChannel(utils.CacheTrendProfiles) - filterS := <-tr.filterSChan - tr.filterSChan <- filterS - dbchan := tr.dm.GetDMChan() + filterS := <-trs.filterSChan + trs.filterSChan <- filterS + dbchan := trs.dm.GetDMChan() datadb := <-dbchan dbchan <- datadb utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TrendS)) - srv, err := engine.NewService(v1.NewTrendSv1()) + trs.Lock() + defer trs.Unlock() + trs.trs = engine.NewTrendService(datadb, trs.cfg, filterS) + srv, err := engine.NewService(v1.NewTrendSv1(trs.trs)) if err != nil { return err } - if !tr.cfg.DispatcherSCfg().Enabled { + if !trs.cfg.DispatcherSCfg().Enabled { for _, s := range srv { - tr.server.RpcRegister(s) + trs.server.RpcRegister(s) } } - tr.connChan <- tr.anz.GetInternalCodec(srv, utils.StatS) + trs.connChan <- trs.anz.GetInternalCodec(srv, utils.StatS) return nil }