Integrate state deps across services for SERVICE_UP

This commit is contained in:
ionutboangiu
2024-12-04 17:19:38 +02:00
committed by Dan Christian Bogos
parent c29f1fdd35
commit cfdb3e80ca
37 changed files with 73 additions and 6 deletions

View File

@@ -514,7 +514,7 @@ const CGRATES_CFG_JSON = `
},
"efs": {
"enabled": false, // starts the EventReader service: <true|false>
"enabled": false, // starts the EventReader service: <true|false>
"poster_attempts": 3, // number of attempts before considering post request failed (eg: *httpPost, CDR exports)
"failed_posts_dir": "/var/spool/cgrates/failed_posts", // directory path where we store failed requests
"failed_posts_ttl": "5s" // time to wait before writing the failed posts in a single file

View File

@@ -53,6 +53,7 @@ func NewAccountService(cfg *config.CGRConfig, dm *DataDBService,
srvDep: srvDep,
rldChan: make(chan struct{}, 1),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -86,7 +87,6 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
if acts.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
acts.stateDeps = NewStateDependencies([]string{utils.StateServiceUP})
acts.cl = <-acts.clSChan
acts.clSChan <- acts.cl
if err = acts.cacheS.WaitToPrecache(ctx,
@@ -124,6 +124,7 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
acts.intRPCconn = anz.GetInternalCodec(srv, utils.AccountS)
acts.connChan <- acts.intRPCconn
close(acts.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -53,6 +53,7 @@ func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
srvDep: srvDep,
rldChan: make(chan struct{}, 1),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -124,6 +125,7 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er
acts.intRPCconn = anz.GetInternalCodec(srv, utils.ActionS)
acts.connChan <- acts.intRPCconn
close(acts.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -50,6 +50,7 @@ func NewAdminSv1Service(cfg *config.CGRConfig,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -124,7 +125,7 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
//backwards compatible
apiService.intRPCconn = anz.GetInternalCodec(srv, utils.AdminSv1)
apiService.connChan <- apiService.intRPCconn
close(apiService.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -47,6 +47,7 @@ func NewAnalyzerService(cfg *config.CGRConfig, clSChan chan *commonlisteners.Com
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -98,6 +99,7 @@ func (anz *AnalyzerService) Start(ctx *context.Context, shtDwn context.CancelFun
}(anz.anz)
anz.cl.SetAnalyzer(anz.anz)
go anz.start(ctx)
close(anz.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -42,6 +42,7 @@ func NewAsteriskAgent(cfg *config.CGRConfig,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -81,6 +82,7 @@ func (ast *AsteriskAgent) Start(_ *context.Context, shtDwn context.CancelFunc) (
ast.smas[connIdx] = agents.NewAsteriskAgent(ast.cfg, connIdx, ast.connMgr)
go listenAndServe(ast.smas[connIdx], ast.stopChan)
}
close(ast.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -49,6 +49,7 @@ func NewCacheService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.C
rpc: internalChan,
cacheCh: make(chan *engine.CacheS, 1),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -101,6 +102,7 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e
}
cS.intRPCconn = anz.GetInternalCodec(srv, utils.CacheS)
cS.rpc <- cS.intRPCconn
close(cS.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -51,6 +51,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -120,6 +121,7 @@ func (cs *CDRService) Start(ctx *context.Context, _ context.CancelFunc) (err err
cs.intRPCconn = anz.GetInternalCodec(srv, utils.CDRServer)
cs.connChan <- cs.intRPCconn // Signal that cdrS is operational
close(cs.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -49,6 +49,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -113,6 +114,7 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e
chrS.intRPCconn = anz.GetInternalCodec(srv, utils.ChargerS)
chrS.connChan <- chrS.intRPCconn
close(chrS.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -41,6 +41,7 @@ func NewCommonListenerService(cfg *config.CGRConfig, caps *engine.Caps,
clSChan: clSChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -75,6 +76,7 @@ func (cl *CommonListenerService) Start(*context.Context, context.CancelFunc) err
if cl.cfg.ConfigSCfg().Enabled {
cl.cls.RegisterHTTPFunc(cl.cfg.ConfigSCfg().URL, config.HandlerConfigS)
}
close(cl.stateDeps.StateChan(utils.StateServiceUP))
return nil
}

View File

@@ -50,6 +50,7 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, clSChan chan *comm
srvDep: srvDep,
csCh: make(chan *cores.CoreS, 1),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -106,6 +107,7 @@ func (cS *CoreService) Start(ctx *context.Context, shtDw context.CancelFunc) err
cS.intRPCconn = anz.GetInternalCodec(srv, utils.CoreS)
cS.connChan <- cS.intRPCconn
close(cS.stateDeps.StateChan(utils.StateServiceUP))
return nil
}

View File

@@ -41,6 +41,7 @@ func NewDataDBService(cfg *config.CGRConfig, connMgr *engine.ConnManager, setVer
setVersions: setVersions,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -91,6 +92,7 @@ func (db *DataDBService) Start(*context.Context, context.CancelFunc) (err error)
}
db.dbchan <- db.dm
close(db.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -43,6 +43,7 @@ func NewDiameterAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
caps: caps,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -100,6 +101,7 @@ func (da *DiameterAgent) start(filterS *engine.FilterS, shtDwn context.CancelFun
shtDwn()
}
}(da.da)
close(da.stateDeps.StateChan(utils.StateServiceUP))
return nil
}

View File

@@ -50,6 +50,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
srvDep: srvDep,
srvsReload: make(map[string]chan struct{}),
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -120,7 +121,7 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
// dspS.server.SetDispatched()
dspS.intRPCconn = anz.GetInternalCodec(srv, utils.DispatcherS)
dspS.connChan <- dspS.intRPCconn
close(dspS.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -42,6 +42,7 @@ func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -82,6 +83,7 @@ func (dns *DNSAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err
}
dns.stopChan = make(chan struct{})
go dns.listenAndServe(dns.stopChan, shtDwn)
close(dns.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -46,6 +46,7 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -140,6 +141,7 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc
es.intRPCconn = anz.GetInternalCodec(srv, utils.EEs)
es.intConnChan <- es.intRPCconn
close(es.stateDeps.StateChan(utils.StateServiceUP))
return nil
}

View File

@@ -67,6 +67,7 @@ func NewExportFailoverService(cfg *config.CGRConfig, connMgr *engine.ConnManager
intConnChan: intConnChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -84,6 +85,7 @@ func (efServ *ExportFailoverService) Start(ctx *context.Context, _ context.Cance
efServ.srv, _ = engine.NewServiceWithPing(efServ.efS, utils.EfSv1, utils.V1Prfx)
efServ.cl.RpcRegister(efServ.srv)
efServ.Unlock()
close(efServ.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -52,6 +52,7 @@ func NewEventReaderService(
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -114,6 +115,7 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel
}
erS.intRPCconn = anz.GetInternalCodec(srv, utils.ERs)
erS.intConn <- erS.intRPCconn
close(erS.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -42,6 +42,7 @@ func NewFreeswitchAgent(cfg *config.CGRConfig,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -71,6 +72,7 @@ func (fS *FreeswitchAgent) Start(_ *context.Context, shtDwn context.CancelFunc)
fS.fS = agents.NewFSsessions(fS.cfg.FsAgentCfg(), fS.cfg.GeneralCfg().DefaultTimezone, fS.connMgr)
go fS.connect(shtDwn)
close(fS.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -38,6 +38,7 @@ func NewGlobalVarS(cfg *config.CGRConfig,
cfg: cfg,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -58,6 +59,7 @@ func (gv *GlobalVarS) Start(*context.Context, context.CancelFunc) error {
utils.DecimalContext.MinScale = gv.cfg.GeneralCfg().DecimalMinScale
utils.DecimalContext.Precision = gv.cfg.GeneralCfg().DecimalPrecision
utils.DecimalContext.RoundingMode = gv.cfg.GeneralCfg().DecimalRoundingMode
close(gv.stateDeps.StateChan(utils.StateServiceUP))
return nil
}

View File

@@ -44,6 +44,7 @@ func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -92,6 +93,7 @@ func (ha *HTTPAgent) Start(ctx *context.Context, _ context.CancelFunc) (err erro
agntCfg.ReplyPayload, agntCfg.RequestProcessors))
}
ha.Unlock()
close(ha.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -45,6 +45,7 @@ func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -102,6 +103,7 @@ func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err err
ja.started = true
ja.Unlock()
close(ja.stateDeps.StateChan(utils.StateServiceUP))
utils.Logger.Info(fmt.Sprintf("<%s> successfully started.", utils.JanusAgent))
return
}

View File

@@ -43,6 +43,7 @@ func NewKamailioAgent(cfg *config.CGRConfig,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -73,6 +74,7 @@ func (kam *KamailioAgent) Start(_ *context.Context, shtDwn context.CancelFunc) (
utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone))
go kam.connect(kam.kam, shtDwn)
close(kam.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -50,6 +50,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -115,6 +116,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
}
ldrs.intRPCconn = anz.GetInternalCodec(srv, utils.LoaderS)
ldrs.connChan <- ldrs.intRPCconn
close(ldrs.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -42,6 +42,7 @@ func NewRadiusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -90,7 +91,7 @@ func (rad *RadiusAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (
rad.stopChan = make(chan struct{})
go rad.listenAndServe(rad.rad, shtDwn)
close(rad.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -50,6 +50,7 @@ func NewRankingService(cfg *config.CGRConfig, dm *DataDBService,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -121,6 +122,7 @@ func (ran *RankingService) Start(ctx *context.Context, _ context.CancelFunc) (er
}
ran.intRPCconn = anz.GetInternalCodec(srv, utils.RankingS)
ran.connChan <- ran.intRPCconn
close(ran.stateDeps.StateChan(utils.StateServiceUP))
return nil
}

View File

@@ -49,6 +49,7 @@ func NewRateService(cfg *config.CGRConfig,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -154,6 +155,7 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
rs.intRPCconn = anz.GetInternalCodec(srv, utils.RateS)
rs.intConnChan <- rs.intRPCconn
close(rs.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -39,6 +39,7 @@ func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -72,7 +73,7 @@ func (dspS *RegistrarCService) Start(*context.Context, context.CancelFunc) (err
dspS.rldChan = make(chan struct{})
dspS.dspS = registrarc.NewRegistrarCService(dspS.cfg, dspS.connMgr)
go dspS.dspS.ListenAndServe(dspS.stopChan, dspS.rldChan)
close(dspS.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -49,6 +49,7 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -116,6 +117,7 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
reS.intRPCconn = anz.GetInternalCodec(srv, utils.ResourceS)
reS.connChan <- reS.intRPCconn
close(reS.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -50,6 +50,7 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -114,6 +115,7 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
}
routeS.intRPCconn = anz.GetInternalCodec(srv, utils.RouteS)
routeS.connChan <- routeS.intRPCconn
close(routeS.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -50,6 +50,7 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, filterSChan cha
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -123,6 +124,7 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
// run this in it's own goroutine
go smg.start(shtDw)
}
close(smg.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -42,6 +42,7 @@ func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr: connMgr,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -83,6 +84,7 @@ func (sip *SIPAgent) Start(ctx *context.Context, shtDwn context.CancelFunc) (err
return
}
go sip.listenAndServe(shtDwn)
close(sip.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -49,6 +49,7 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
anzChan: anzChan,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -117,6 +118,7 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
}
sts.intRPCconn = anz.GetInternalCodec(srv, utils.StatS)
sts.connChan <- sts.intRPCconn
close(sts.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -39,6 +39,7 @@ func NewStorDBService(cfg *config.CGRConfig, setVersions bool,
setVersions: setVersions,
srvDep: srvDep,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -88,6 +89,7 @@ func (db *StorDBService) Start(*context.Context, context.CancelFunc) (err error)
}
db.sync()
close(db.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -49,6 +49,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
srvDep: srvDep,
connMgr: connMgr,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -116,6 +117,7 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
}
thrs.intRPCconn = anz.GetInternalCodec(srv, utils.ThresholdS)
thrs.connChan <- thrs.intRPCconn
close(thrs.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -43,6 +43,7 @@ func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataD
connMgr: connMgr,
clSChan: clSChan,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -81,6 +82,7 @@ func (ts *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err err
ts.stopChan = make(chan struct{})
ts.srv, _ = birpc.NewService(apis.NewTPeSv1(ts.tpes), utils.EmptyString, false)
ts.cl.RpcRegister(ts.srv)
close(ts.stateDeps.StateChan(utils.StateServiceUP))
return
}

View File

@@ -49,6 +49,7 @@ func NewTrendService(cfg *config.CGRConfig, dm *DataDBService,
srvDep: srvDep,
filterSChan: filterSChan,
srvIndexer: srvIndexer,
stateDeps: NewStateDependencies([]string{utils.StateServiceUP}),
}
}
@@ -118,6 +119,7 @@ func (trs *TrendService) Start(ctx *context.Context, _ context.CancelFunc) (err
}
trs.intRPCconn = anz.GetInternalCodec(srv, utils.Trends)
trs.connChan <- trs.intRPCconn
close(trs.stateDeps.StateChan(utils.StateServiceUP))
return nil
}