added service for trends

This commit is contained in:
gezimblliku
2024-08-14 14:56:44 +02:00
committed by Dan Christian Bogos
parent 4e6e1e8159
commit b1218c4ae3
3 changed files with 39 additions and 16 deletions

View File

@@ -95,11 +95,13 @@ func (apierSv1 *APIerSv1) RemoveTrendProfile(ctx *context.Context, args *utils.T
}
// NewTrendSv1 initializes TrendSV1
func NewTrendSv1() *TrendSv1 {
func NewTrendSv1(trs *engine.TrendService) *TrendSv1 {
return &TrendSv1{}
}
type TrendSv1 struct{}
type TrendSv1 struct {
trS *engine.TrendService
}
func (sa *TrendSv1) Ping(ctx *context.Context, ign *utils.CGREvent, reply *string) error {
*reply = utils.Pong

View File

@@ -21,6 +21,7 @@ package engine
import (
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -62,3 +63,18 @@ type Trend struct {
func (tr *Trend) TenantID() string {
return utils.ConcatenatedKey(tr.Tenant, tr.ID)
}
// NewTrendService the constructor for TrendS service
func NewTrendService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) *TrendService {
return &TrendService{
dm: dm,
cgrcfg: cgrcfg,
filterS: filterS,
}
}
type TrendService struct {
dm *DataManager
cgrcfg *config.CGRConfig
filterS *FilterS
}

View File

@@ -58,37 +58,42 @@ type TrendService struct {
filterSChan chan *engine.FilterS
server *cores.Server
connMgr *engine.ConnManager
connChan chan birpc.ClientConnector
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
trs *engine.TrendService
connChan chan birpc.ClientConnector
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
}
// Start should handle the sercive start
func (tr *TrendService) Start() error {
if tr.IsRunning() {
func (trs *TrendService) Start() error {
if trs.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
tr.srvDep[utils.DataDB].Add(1)
<-tr.cacheS.GetPrecacheChannel(utils.CacheTrendProfiles)
trs.srvDep[utils.DataDB].Add(1)
<-trs.cacheS.GetPrecacheChannel(utils.CacheTrendProfiles)
filterS := <-tr.filterSChan
tr.filterSChan <- filterS
dbchan := tr.dm.GetDMChan()
filterS := <-trs.filterSChan
trs.filterSChan <- filterS
dbchan := trs.dm.GetDMChan()
datadb := <-dbchan
dbchan <- datadb
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem",
utils.CoreS, utils.TrendS))
srv, err := engine.NewService(v1.NewTrendSv1())
trs.Lock()
defer trs.Unlock()
trs.trs = engine.NewTrendService(datadb, trs.cfg, filterS)
srv, err := engine.NewService(v1.NewTrendSv1(trs.trs))
if err != nil {
return err
}
if !tr.cfg.DispatcherSCfg().Enabled {
if !trs.cfg.DispatcherSCfg().Enabled {
for _, s := range srv {
tr.server.RpcRegister(s)
trs.server.RpcRegister(s)
}
}
tr.connChan <- tr.anz.GetInternalCodec(srv, utils.StatS)
trs.connChan <- trs.anz.GetInternalCodec(srv, utils.StatS)
return nil
}