Added AnalyzerSv1.StringQuery API to search over the recorded RPC calls

This commit is contained in:
Trial97
2020-10-29 17:19:27 +02:00
committed by Dan Christian Bogos
parent b2db71f303
commit f26a26296d
46 changed files with 246 additions and 161 deletions

View File

@@ -118,8 +118,9 @@ func (aS *AnalyzerService) logTrafic(id uint64, method string,
NewInfoRPC(id, method, params, result, err, enc, from, to, sTime, eTime))
}
// V1Search returns a list of API that match the query
func (aS *AnalyzerService) V1Search(searchstr string, reply *[]map[string]interface{}) error {
// V1StringQuery returns a list of API that match the query
func (aS *AnalyzerService) V1StringQuery(searchstr string, reply *[]map[string]interface{}) error {
s := bleve.NewSearchRequest(bleve.NewQueryStringQuery(searchstr))
s.Fields = []string{utils.Meta} // return all fields
searchResults, err := aS.db.Search(s)

View File

@@ -200,7 +200,7 @@ func testAnalyzerSChargerSv1ProcessEvent(t *testing.T) {
func testAnalyzerSV1Search(t *testing.T) {
var result []map[string]interface{}
if err := anzRPC.Call(utils.AnalyzerSv1Search, `+RequestEncoding:\*internal +RequestMethod:AttributeSv1\.ProcessEvent`, &result); err != nil {
if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, `+RequestEncoding:\*internal +RequestMethod:AttributeSv1\.ProcessEvent`, &result); err != nil {
t.Error(err)
} else if len(result) != 1 {
t.Errorf("Unexpected result: %s", utils.ToJSON(result))
@@ -209,7 +209,7 @@ func testAnalyzerSV1Search(t *testing.T) {
func testAnalyzerSV1Search2(t *testing.T) {
var result []map[string]interface{}
if err := anzRPC.Call(utils.AnalyzerSv1Search, `+RequestEncoding:\*json +RequestMethod:ChargerSv1\.ProcessEvent`, &result); err != nil {
if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, `+RequestEncoding:\*json +RequestMethod:ChargerSv1\.ProcessEvent`, &result); err != nil {
t.Error(err)
} else if len(result) != 1 {
t.Errorf("Unexpected result: %s", utils.ToJSON(result))

View File

@@ -240,13 +240,13 @@ func TestAnalyzersV1Search(t *testing.T) {
t.Fatal(err)
}
reply := []map[string]interface{}{}
if err = anz.V1Search(utils.CoreSv1Ping, &reply); err != nil {
if err = anz.V1StringQuery(utils.CoreSv1Ping, &reply); err != nil {
t.Fatal(err)
} else if len(reply) != 4 {
t.Errorf("Expected 4 hits received: %v", len(reply))
}
reply = []map[string]interface{}{}
if err = anz.V1Search("RequestMethod:"+utils.CoreSv1Ping, &reply); err != nil {
if err = anz.V1StringQuery("RequestMethod:"+utils.CoreSv1Ping, &reply); err != nil {
t.Fatal(err)
} else if len(reply) != 4 {
t.Errorf("Expected 4 hits received: %v", len(reply))
@@ -264,20 +264,20 @@ func TestAnalyzersV1Search(t *testing.T) {
"RequestStartTime": t1.Add(-24 * time.Hour).UTC().Format(time.RFC3339),
}}
reply = []map[string]interface{}{}
if err = anz.V1Search(utils.RequestDuration+":>="+strconv.FormatInt(int64(time.Hour), 10), &reply); err != nil {
if err = anz.V1StringQuery(utils.RequestDuration+":>="+strconv.FormatInt(int64(time.Hour), 10), &reply); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(expRply, reply) {
t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply))
}
reply = []map[string]interface{}{}
if err = anz.V1Search(utils.RequestStartTime+":<=\""+t1.Add(-23*time.Hour).UTC().Format(time.RFC3339)+"\"", &reply); err != nil {
if err = anz.V1StringQuery(utils.RequestStartTime+":<=\""+t1.Add(-23*time.Hour).UTC().Format(time.RFC3339)+"\"", &reply); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(expRply, reply) {
t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply))
}
reply = []map[string]interface{}{}
if err = anz.V1Search("RequestEncoding:*gob", &reply); err != nil {
if err = anz.V1StringQuery("RequestEncoding:*gob", &reply); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(expRply, reply) {
t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply))
@@ -286,7 +286,7 @@ func TestAnalyzersV1Search(t *testing.T) {
if err = anz.db.Close(); err != nil {
t.Fatal(err)
}
if err = anz.V1Search("RequestEncoding:*gob", &reply); err != bleve.ErrorIndexClosed {
if err = anz.V1StringQuery("RequestEncoding:*gob", &reply); err != bleve.ErrorIndexClosed {
t.Errorf("Expected error: %v,received: %+v", bleve.ErrorIndexClosed, err)
}
}

View File

@@ -45,6 +45,7 @@ func (aSv1 *AnalyzerSv1) Ping(ign *utils.CGREvent, reply *string) error {
return nil
}
func (aSv1 *AnalyzerSv1) Search(search string, reply *[]map[string]interface{}) error {
return aSv1.aS.V1Search(search, reply)
// StringQuery returns a list of API that match the query
func (aSv1 *AnalyzerSv1) StringQuery(search string, reply *[]map[string]interface{}) error {
return aSv1.aS.V1StringQuery(search, reply)
}

View File

@@ -567,38 +567,38 @@ func main() {
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, exitChan)
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan)
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager)
dspH := services.NewDispatcherHostsService(cfg, server, internalDispatcherSChan, connManager, exitChan)
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, anz)
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager, anz)
dspH := services.NewDispatcherHostsService(cfg, server, internalDispatcherSChan, connManager, exitChan, anz)
chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server,
internalChargerSChan, connManager)
tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan)
internalChargerSChan, connManager, anz)
tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan, anz)
stS := services.NewStatService(cfg, dmService, cacheS, filterSChan, server,
internalStatSChan, connManager)
internalStatSChan, connManager, anz)
reS := services.NewResourceService(cfg, dmService, cacheS, filterSChan, server,
internalResourceSChan, connManager)
internalResourceSChan, connManager, anz)
routeS := services.NewRouteService(cfg, dmService, cacheS, filterSChan, server,
internalRouteSChan, connManager)
internalRouteSChan, connManager, anz)
schS := services.NewSchedulerService(cfg, dmService, cacheS, filterSChan,
server, internalSchedulerSChan, connManager)
server, internalSchedulerSChan, connManager, anz)
rals := services.NewRalService(cfg, cacheS, server,
internalRALsChan, internalResponderChan,
exitChan, connManager)
exitChan, connManager, anz)
apiSv1 := services.NewAPIerSv1Service(cfg, dmService, storDBService, filterSChan, server, schS, rals.GetResponderService(),
internalAPIerSv1Chan, connManager)
internalAPIerSv1Chan, connManager, anz)
apiSv2 := services.NewAPIerSv2Service(apiSv1, cfg, server, internalAPIerSv2Chan)
apiSv2 := services.NewAPIerSv2Service(apiSv1, cfg, server, internalAPIerSv2Chan, anz)
cdrS := services.NewCDRServer(cfg, dmService, storDBService, filterSChan, server, internalCDRServerChan,
connManager)
connManager, anz)
smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, exitChan, connManager)
smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, exitChan, connManager, anz)
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, exitChan,
internalLoaderSChan, connManager)
internalLoaderSChan, connManager, anz)
srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, schS, rals,
rals.GetResponder(), apiSv1, apiSv2, cdrS, smg,
@@ -612,9 +612,9 @@ func main() {
services.NewHTTPAgent(cfg, filterSChan, server, connManager), // no reload
ldrs, anz, dspS, dspH, dmService, storDBService,
services.NewEventExporterService(cfg, filterSChan,
connManager, server, exitChan, internalEEsChan),
connManager, server, exitChan, internalEEsChan, anz),
services.NewRateService(cfg, cacheS, filterSChan, dmService,
server, exitChan, internalRateSChan),
server, exitChan, internalRateSChan, anz),
services.NewSIPAgent(cfg, filterSChan, exitChan, connManager),
)
srvManager.StartServices()

View File

@@ -122,6 +122,7 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium
* [StatS] Added support for nested fields in custom metrics
* [AccountS] Add Initial in AccountSummary as initail value before debit operation
* [General] For only *asap actions don't save AccountIDs withing ActionPlans
* [AnalyzerS] Added AnalyzerSv1.StringQuery API to search over the recorded RPC calls
-- DanB <danb@cgrates.org> Wed, 19 Feb 2020 13:25:52 +0200

View File

@@ -31,11 +31,6 @@ import (
"github.com/cgrates/rpcclient"
)
var (
// used to build the connector for analyzers
intAnzConn = func(c rpcclient.ClientConnector, to string) rpcclient.ClientConnector { return c }
)
// NewAnalyzerService returns the Analyzer Service
func NewAnalyzerService(cfg *config.CGRConfig, server *utils.Server, exitChan chan bool,
internalAnalyzerSChan chan rpcclient.ClientConnector) *AnalyzerService {
@@ -76,10 +71,7 @@ func (anz *AnalyzerService) Start() (err error) {
anz.exitChan <- true
return
}()
intAnzConn = func(c rpcclient.ClientConnector, to string) rpcclient.ClientConnector {
return anz.anz.NewAnalyzerConnector(c, utils.MetaInternal, utils.EmptyString, to)
}
utils.AnalizerWraperFunc = func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec {
anz.server.SetAnzWrapperFunc(func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec {
fromstr := ""
if from != nil {
fromstr = from.String()
@@ -89,7 +81,7 @@ func (anz *AnalyzerService) Start() (err error) {
tostr = to.String()
}
return anz.anz.NewServerCodec(conn, enc, fromstr, tostr)
}
})
anz.rpc = v1.NewAnalyzerSv1(anz.anz)
if !anz.cfg.DispatcherSCfg().Enabled {
anz.server.RpcRegister(anz.rpc)
@@ -132,6 +124,15 @@ func (anz *AnalyzerService) ShouldRun() bool {
return anz.cfg.AnalyzerSCfg().Enabled
}
// GetAnalyzerS returns the analyzer object
func (anz *AnalyzerService) GetAnalyzerS() *analyzers.AnalyzerService {
return anz.anz
}
// GetInternalCodec returns the connection wrapped in analyzer connector
func (anz *AnalyzerService) GetInternalCodec(c rpcclient.ClientConnector, to string) rpcclient.ClientConnector {
if !anz.IsRunning() {
return c
}
return anz.anz.NewAnalyzerConnector(c, utils.MetaInternal, utils.EmptyString, to)
}

View File

@@ -54,15 +54,15 @@ func TestApiersReload(t *testing.T) {
db := NewDataDBService(cfg, nil)
cfg.StorDbCfg().Type = utils.INTERNAL
stordb := NewStorDBService(cfg)
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1))
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz)
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz)
apiSv1 := NewAPIerSv1Service(cfg, db, stordb, filterSChan, server, schS, new(ResponderService),
make(chan rpcclient.ClientConnector, 1), nil)
make(chan rpcclient.ClientConnector, 1), nil, anz)
apiSv2 := NewAPIerSv2Service(apiSv1, cfg, server, make(chan rpcclient.ClientConnector, 1))
apiSv2 := NewAPIerSv2Service(apiSv1, cfg, server, make(chan rpcclient.ClientConnector, 1), anz)
srvMngr.AddServices(apiSv1, apiSv2, schS, tS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db, stordb)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -37,7 +37,8 @@ func NewAPIerSv1Service(cfg *config.CGRConfig, dm *DataDBService,
schedService *SchedulerService,
responderService *ResponderService,
internalAPIerSv1Chan chan rpcclient.ClientConnector,
connMgr *engine.ConnManager) *APIerSv1Service {
connMgr *engine.ConnManager,
anz *AnalyzerService) *APIerSv1Service {
return &APIerSv1Service{
connChan: internalAPIerSv1Chan,
cfg: cfg,
@@ -49,6 +50,7 @@ func NewAPIerSv1Service(cfg *config.CGRConfig, dm *DataDBService,
responderService: responderService,
connMgr: connMgr,
APIerSv1Chan: make(chan *v1.APIerSv1, 1),
anz: anz,
}
}
@@ -70,6 +72,7 @@ type APIerSv1Service struct {
syncStop chan struct{}
APIerSv1Chan chan *v1.APIerSv1
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -119,7 +122,7 @@ func (apiService *APIerSv1Service) Start() (err error) {
}
//backwards compatible
apiService.connChan <- intAnzConn(apiService.api, utils.APIerSv1)
apiService.connChan <- apiService.anz.GetInternalCodec(apiService.api, utils.APIerSv1)
apiService.APIerSv1Chan <- apiService.api
return

View File

@@ -30,12 +30,14 @@ import (
// NewAPIerSv2Service returns the APIerSv2 Service
func NewAPIerSv2Service(apiv1 *APIerSv1Service, cfg *config.CGRConfig,
server *utils.Server,
internalAPIerSv2Chan chan rpcclient.ClientConnector) *APIerSv2Service {
internalAPIerSv2Chan chan rpcclient.ClientConnector,
anz *AnalyzerService) *APIerSv2Service {
return &APIerSv2Service{
apiv1: apiv1,
connChan: internalAPIerSv2Chan,
cfg: cfg,
server: server,
anz: anz,
}
}
@@ -48,6 +50,7 @@ type APIerSv2Service struct {
apiv1 *APIerSv1Service
api *v2.APIerSv2
connChan chan rpcclient.ClientConnector
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -73,7 +76,7 @@ func (api *APIerSv2Service) Start() (err error) {
api.server.RpcRegisterName(utils.ApierV2, api.api)
}
api.connChan <- intAnzConn(api.api, utils.APIerSv2)
api.connChan <- api.anz.GetInternalCodec(api.api, utils.APIerSv2)
return
}

View File

@@ -33,7 +33,8 @@ import (
// NewAttributeService returns the Attribute Service
func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server, internalChan chan rpcclient.ClientConnector) servmanager.Service {
server *utils.Server, internalChan chan rpcclient.ClientConnector,
anz *AnalyzerService) servmanager.Service {
return &AttributeService{
connChan: internalChan,
cfg: cfg,
@@ -41,6 +42,7 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
anz: anz,
}
}
@@ -56,6 +58,7 @@ type AttributeService struct {
attrS *engine.AttributeService
rpc *v1.AttributeSv1 // useful on restart
connChan chan rpcclient.ClientConnector // publish the internal Subsystem when available
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -87,7 +90,7 @@ func (attrS *AttributeService) Start() (err error) {
if !attrS.cfg.DispatcherSCfg().Enabled {
attrS.server.RpcRegister(attrS.rpc)
}
attrS.connChan <- intAnzConn(attrS.rpc, utils.AttributeS)
attrS.connChan <- attrS.anz.GetInternalCodec(attrS.rpc, utils.AttributeS)
return
}

View File

@@ -50,12 +50,13 @@ func TestAttributeSReload(t *testing.T) {
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg, nil)
attrRPC := make(chan rpcclient.ClientConnector, 1)
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
attrS := NewAttributeService(cfg, db,
chS, filterSChan, server, attrRPC,
)
anz)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(attrS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -36,7 +36,8 @@ import (
func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
storDB *StorDBService, filterSChan chan *engine.FilterS,
server *utils.Server, internalCDRServerChan chan rpcclient.ClientConnector,
connMgr *engine.ConnManager) servmanager.Service {
connMgr *engine.ConnManager,
anz *AnalyzerService) servmanager.Service {
return &CDRServer{
connChan: internalCDRServerChan,
cfg: cfg,
@@ -45,6 +46,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
filterSChan: filterSChan,
server: server,
connMgr: connMgr,
anz: anz,
}
}
@@ -64,6 +66,7 @@ type CDRServer struct {
connMgr *engine.ConnManager
syncStop chan struct{}
anz *AnalyzerService
// storDBChan chan engine.StorDB
}
@@ -107,7 +110,7 @@ func (cdrService *CDRServer) Start() (err error) {
// Make the cdr server available for internal communication
cdrService.server.RpcRegister(cdrService.cdrS) // register CdrServer for internal usage (TODO: refactor this)
}
cdrService.connChan <- intAnzConn(cdrService.cdrS, utils.CDRServer) // Signal that cdrS is operational
cdrService.connChan <- cdrService.anz.GetInternalCodec(cdrService.cdrS, utils.CDRServer) // Signal that cdrS is operational
return
}

View File

@@ -63,19 +63,19 @@ func TestCdrsReload(t *testing.T) {
db := NewDataDBService(cfg, nil)
cfg.StorDbCfg().Type = utils.INTERNAL
stordb := NewStorDBService(cfg)
chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz)
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz)
ralS := NewRalService(cfg, chS, server,
make(chan rpcclient.ClientConnector, 1),
make(chan rpcclient.ClientConnector, 1),
engineShutdown, nil)
engineShutdown, nil, anz)
cdrsRPC := make(chan rpcclient.ClientConnector, 1)
cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server,
cdrsRPC,
nil)
cdrsRPC, nil, anz)
srvMngr.AddServices(cdrS, ralS, schS, chrS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown,
make(chan rpcclient.ClientConnector, 1), nil), db, stordb)
make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -33,7 +33,8 @@ import (
// NewChargerService returns the Charger Service
func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server,
internalChargerSChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager) servmanager.Service {
internalChargerSChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager,
anz *AnalyzerService) servmanager.Service {
return &ChargerService{
connChan: internalChargerSChan,
cfg: cfg,
@@ -42,6 +43,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan: filterSChan,
server: server,
connMgr: connMgr,
anz: anz,
}
}
@@ -58,6 +60,7 @@ type ChargerService struct {
chrS *engine.ChargerService
rpc *v1.ChargerSv1
connChan chan rpcclient.ClientConnector
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -88,7 +91,7 @@ func (chrS *ChargerService) Start() (err error) {
if !chrS.cfg.DispatcherSCfg().Enabled {
chrS.server.RpcRegister(cSv1)
}
chrS.connChan <- intAnzConn(cSv1, utils.ChargerS)
chrS.connChan <- chrS.anz.GetInternalCodec(cSv1, utils.ChargerS)
return
}

View File

@@ -51,12 +51,13 @@ func TestChargerSReload(t *testing.T) {
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg, nil)
attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1))
chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz)
chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(attrS, chrS,
NewLoaderService(cfg, db, filterSChan, server,
engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -51,9 +51,10 @@ func TestDataDBReload(t *testing.T) {
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
cM := engine.NewConnManager(cfg, nil)
db := NewDataDBService(cfg, cM)
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
srvMngr.AddServices(NewAttributeService(cfg, db,
chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1)),
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz),
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -32,13 +32,15 @@ import (
// NewDispatcherHostsService returns the Dispatcher Service
func NewDispatcherHostsService(cfg *config.CGRConfig, server *utils.Server,
internalChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager,
exitChan chan bool) servmanager.Service {
exitChan chan bool,
anz *AnalyzerService) servmanager.Service {
return &DispatcherHostsService{
connChan: internalChan,
cfg: cfg,
server: server,
connMgr: connMgr,
exitChan: exitChan,
anz: anz,
}
}
@@ -53,6 +55,7 @@ type DispatcherHostsService struct {
dspS *dispatcherh.DispatcherHostsService
// rpc *v1.DispatcherHSv1
connChan chan rpcclient.ClientConnector
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -66,7 +69,7 @@ func (dspS *DispatcherHostsService) Start() (err error) {
dspS.dspS = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr)
go dspS.dspS.ListenAndServe()
dspS.connChan <- intAnzConn(dspS.dspS, utils.DispatcherH)
dspS.connChan <- dspS.anz.GetInternalCodec(dspS.dspS, utils.DispatcherH)
return
}

View File

@@ -35,7 +35,9 @@ import (
// NewDispatcherService returns the Dispatcher Service
func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server, internalChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager) servmanager.Service {
server *utils.Server, internalChan chan rpcclient.ClientConnector,
connMgr *engine.ConnManager,
anz *AnalyzerService) servmanager.Service {
return &DispatcherService{
connChan: internalChan,
cfg: cfg,
@@ -44,6 +46,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan: filterSChan,
server: server,
connMgr: connMgr,
anz: anz,
}
}
@@ -60,6 +63,7 @@ type DispatcherService struct {
dspS *dispatchers.DispatcherService
rpc *v1.DispatcherSv1
connChan chan rpcclient.ClientConnector
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -145,7 +149,7 @@ func (dspS *DispatcherService) Start() (err error) {
dspS.server.RpcRegisterName(utils.RateSv1,
v1.NewDispatcherRateSv1(dspS.dspS))
dspS.connChan <- intAnzConn(dspS.dspS, utils.DispatcherS)
dspS.connChan <- dspS.anz.GetInternalCodec(dspS.dspS, utils.DispatcherS)
return
}

View File

@@ -52,13 +52,14 @@ func TestDispatcherSReload(t *testing.T) {
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg, nil)
attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1))
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz)
srv := NewDispatcherService(cfg, db, chS, filterSChan, server,
make(chan rpcclient.ClientConnector, 1), nil)
make(chan rpcclient.ClientConnector, 1), nil, anz)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(attrS, srv,
NewLoaderService(cfg, db, filterSChan, server,
engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -50,12 +50,13 @@ func TestDNSAgentReload(t *testing.T) {
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg, nil)
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
engineShutdown, nil)
engineShutdown, nil, anz)
srv := NewDNSAgent(cfg, filterSChan, engineShutdown, nil)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Fatal(err)
}

View File

@@ -34,7 +34,8 @@ import (
// NewEventExporterService constructs EventExporterService
func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager, server *utils.Server, exitChan chan bool,
intConnChan chan rpcclient.ClientConnector) servmanager.Service {
intConnChan chan rpcclient.ClientConnector,
anz *AnalyzerService) servmanager.Service {
return &EventExporterService{
cfg: cfg,
filterSChan: filterSChan,
@@ -43,6 +44,7 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil
exitChan: exitChan,
intConnChan: intConnChan,
rldChan: make(chan struct{}),
anz: anz,
}
}
@@ -60,6 +62,7 @@ type EventExporterService struct {
eeS *ees.EventExporterS
rpc *v1.EventExporterSv1
anz *AnalyzerService
}
// ServiceName returns the service name
@@ -129,6 +132,6 @@ func (es *EventExporterService) Start() (err error) {
if !es.cfg.DispatcherSCfg().Enabled {
es.server.RpcRegister(es.rpc)
}
es.intConnChan <- intAnzConn(es.eeS, utils.EventExporterS)
es.intConnChan <- es.anz.GetInternalCodec(es.eeS, utils.EventExporterS)
return
}

View File

@@ -59,12 +59,13 @@ func TestEventExporterSReload(t *testing.T) {
chS := engine.NewCacheS(cfg, nil)
close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles))
close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes))
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
attrS := NewAttributeService(cfg, db,
chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1),
)
ees := NewEventExporterService(cfg, filterSChan, engine.NewConnManager(cfg, nil), server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
anz)
ees := NewEventExporterService(cfg, filterSChan, engine.NewConnManager(cfg, nil), server, engineShutdown, make(chan rpcclient.ClientConnector, 1), anz)
srvMngr.AddServices(ees, attrS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -54,12 +54,13 @@ func TestEventReaderSReload(t *testing.T) {
engineShutdown := make(chan bool, 1)
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
db := NewDataDBService(cfg, nil)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil)
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, anz)
attrS := NewEventReaderService(cfg, filterSChan, engineShutdown, nil)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(attrS, sS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -33,7 +33,8 @@ import (
func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan chan *engine.FilterS, server *utils.Server,
exitChan chan bool, internalLoaderSChan chan rpcclient.ClientConnector,
connMgr *engine.ConnManager) *LoaderService {
connMgr *engine.ConnManager,
anz *AnalyzerService) *LoaderService {
return &LoaderService{
connChan: internalLoaderSChan,
cfg: cfg,
@@ -42,6 +43,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
server: server,
exitChan: exitChan,
connMgr: connMgr,
anz: anz,
}
}
@@ -58,6 +60,7 @@ type LoaderService struct {
rpc *v1.LoaderSv1
connChan chan rpcclient.ClientConnector
connMgr *engine.ConnManager
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -85,7 +88,7 @@ func (ldrs *LoaderService) Start() (err error) {
if !ldrs.cfg.DispatcherSCfg().Enabled {
ldrs.server.RpcRegister(ldrs.rpc)
}
ldrs.connChan <- intAnzConn(ldrs.rpc, utils.LoaderS)
ldrs.connChan <- ldrs.anz.GetInternalCodec(ldrs.rpc, utils.LoaderS)
return
}

View File

@@ -32,8 +32,9 @@ import (
// NewRalService returns the Ral Service
func NewRalService(cfg *config.CGRConfig, cacheS *engine.CacheS, server *utils.Server,
internalRALsChan, internalResponderChan chan rpcclient.ClientConnector, exitChan chan bool,
connMgr *engine.ConnManager) *RalService {
resp := NewResponderService(cfg, server, internalResponderChan, exitChan)
connMgr *engine.ConnManager,
anz *AnalyzerService) *RalService {
resp := NewResponderService(cfg, server, internalResponderChan, exitChan, anz)
return &RalService{
connChan: internalRALsChan,
@@ -42,6 +43,7 @@ func NewRalService(cfg *config.CGRConfig, cacheS *engine.CacheS, server *utils.S
server: server,
responder: resp,
connMgr: connMgr,
anz: anz,
}
}
@@ -55,6 +57,7 @@ type RalService struct {
responder *ResponderService
connChan chan rpcclient.ClientConnector
connMgr *engine.ConnManager
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -91,7 +94,7 @@ func (rals *RalService) Start() (err error) {
rals.server.RpcRegister(rals.rals)
}
rals.connChan <- intAnzConn(rals.rals, utils.RALService)
rals.connChan <- rals.anz.GetInternalCodec(rals.rals, utils.RALService)
return
}

View File

@@ -62,15 +62,16 @@ func TestRalsReload(t *testing.T) {
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg, nil)
cfg.StorDbCfg().Type = utils.INTERNAL
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
stordb := NewStorDBService(cfg)
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1))
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz)
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz)
ralS := NewRalService(cfg, chS, server,
make(chan rpcclient.ClientConnector, 1),
make(chan rpcclient.ClientConnector, 1),
engineShutdown, nil)
engineShutdown, nil, anz)
srvMngr.AddServices(ralS, schS, tS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db, stordb)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -40,7 +40,8 @@ func NewRateService(
filterSChan chan *engine.FilterS,
dmS *DataDBService,
server *utils.Server, exitChan chan bool,
intConnChan chan rpcclient.ClientConnector) servmanager.Service {
intConnChan chan rpcclient.ClientConnector,
anz *AnalyzerService) servmanager.Service {
return &RateService{
cfg: cfg,
cacheS: cacheS,
@@ -50,6 +51,7 @@ func NewRateService(
exitChan: exitChan,
intConnChan: intConnChan,
rldChan: make(chan struct{}),
anz: anz,
}
}
@@ -69,6 +71,7 @@ type RateService struct {
rateS *rates.RateS
rpc *v1.RateSv1
intConnChan chan rpcclient.ClientConnector
anz *AnalyzerService
}
// ServiceName returns the service name
@@ -138,6 +141,6 @@ func (rs *RateService) Start() (err error) {
rs.server.RpcRegister(rs.rpc)
}
rs.intConnChan <- intAnzConn(rs.rpc, utils.RateS)
rs.intConnChan <- rs.anz.GetInternalCodec(rs.rpc, utils.RateS)
return
}

View File

@@ -49,9 +49,10 @@ func TestRateSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheRateProfiles))
close(chS.GetPrecacheChannel(utils.CacheRateProfilesFilterIndexes))
close(chS.GetPrecacheChannel(utils.CacheRateFilterIndexes))
rS := NewRateService(cfg, chS, filterSChan, db, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
rS := NewRateService(cfg, chS, filterSChan, db, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), anz)
srvMngr.AddServices(rS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -34,7 +34,8 @@ import (
func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server, internalResourceSChan chan rpcclient.ClientConnector,
connMgr *engine.ConnManager) servmanager.Service {
connMgr *engine.ConnManager,
anz *AnalyzerService) servmanager.Service {
return &ResourceService{
connChan: internalResourceSChan,
cfg: cfg,
@@ -43,6 +44,7 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan: filterSChan,
server: server,
connMgr: connMgr,
anz: anz,
}
}
@@ -59,6 +61,7 @@ type ResourceService struct {
rpc *v1.ResourceSv1
connChan chan rpcclient.ClientConnector
connMgr *engine.ConnManager
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -90,7 +93,7 @@ func (reS *ResourceService) Start() (err error) {
if !reS.cfg.DispatcherSCfg().Enabled {
reS.server.RpcRegister(reS.rpc)
}
reS.connChan <- intAnzConn(reS.rpc, utils.ResourceS)
reS.connChan <- reS.anz.GetInternalCodec(reS.rpc, utils.ResourceS)
return
}

View File

@@ -53,12 +53,13 @@ func TestResourceSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheResourceFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
db := NewDataDBService(cfg, nil)
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1))
reS := NewResourceService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz)
reS := NewResourceService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(tS, reS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -30,12 +30,13 @@ import (
// NewResponderService returns the Resonder Service
func NewResponderService(cfg *config.CGRConfig, server *utils.Server,
internalRALsChan chan rpcclient.ClientConnector,
exitChan chan bool) *ResponderService {
exitChan chan bool, anz *AnalyzerService) *ResponderService {
return &ResponderService{
connChan: internalRALsChan,
cfg: cfg,
server: server,
exitChan: exitChan,
anz: anz,
}
}
@@ -48,6 +49,7 @@ type ResponderService struct {
resp *engine.Responder
connChan chan rpcclient.ClientConnector
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -68,7 +70,7 @@ func (resp *ResponderService) Start() (err error) {
resp.server.RpcRegister(resp.resp)
}
resp.connChan <- intAnzConn(resp.resp, utils.ResponderS) // Rater done
resp.connChan <- resp.anz.GetInternalCodec(resp.resp, utils.ResponderS) // Rater done
return
}

View File

@@ -34,7 +34,8 @@ import (
func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server, internalRouteSChan chan rpcclient.ClientConnector,
connMgr *engine.ConnManager) servmanager.Service {
connMgr *engine.ConnManager,
anz *AnalyzerService) servmanager.Service {
return &RouteService{
connChan: internalRouteSChan,
cfg: cfg,
@@ -43,6 +44,7 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan: filterSChan,
server: server,
connMgr: connMgr,
anz: anz,
}
}
@@ -59,6 +61,7 @@ type RouteService struct {
routeS *engine.RouteService
rpc *v1.RouteSv1
connChan chan rpcclient.ClientConnector
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -91,7 +94,7 @@ func (routeS *RouteService) Start() (err error) {
if !routeS.cfg.DispatcherSCfg().Enabled {
routeS.server.RpcRegister(routeS.rpc)
}
routeS.connChan <- intAnzConn(routeS.rpc, utils.RouteS)
routeS.connChan <- routeS.anz.GetInternalCodec(routeS.rpc, utils.RouteS)
return
}

View File

@@ -52,11 +52,12 @@ func TestSupplierSReload(t *testing.T) {
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg, nil)
sts := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
supS := NewRouteService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
sts := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz)
supS := NewRouteService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(supS, sts,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -33,7 +33,8 @@ import (
func NewSchedulerService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, fltrSChan chan *engine.FilterS,
server *utils.Server, internalSchedulerrSChan chan rpcclient.ClientConnector,
connMgr *engine.ConnManager) *SchedulerService {
connMgr *engine.ConnManager,
anz *AnalyzerService) *SchedulerService {
return &SchedulerService{
connChan: internalSchedulerrSChan,
cfg: cfg,
@@ -42,6 +43,7 @@ func NewSchedulerService(cfg *config.CGRConfig, dm *DataDBService,
fltrSChan: fltrSChan,
server: server,
connMgr: connMgr,
anz: anz,
}
}
@@ -58,6 +60,7 @@ type SchedulerService struct {
rpc *v1.SchedulerSv1
connChan chan rpcclient.ClientConnector
connMgr *engine.ConnManager
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -84,7 +87,7 @@ func (schS *SchedulerService) Start() (err error) {
if !schS.cfg.DispatcherSCfg().Enabled {
schS.server.RpcRegister(schS.rpc)
}
schS.connChan <- intAnzConn(schS.rpc, utils.SchedulerS)
schS.connChan <- schS.anz.GetInternalCodec(schS.rpc, utils.SchedulerS)
return
}

View File

@@ -46,10 +46,11 @@ func TestSchedulerSReload(t *testing.T) {
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg, nil)
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(schS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -35,7 +35,8 @@ import (
// NewSessionService returns the Session Service
func NewSessionService(cfg *config.CGRConfig, dm *DataDBService,
server *utils.Server, internalChan chan rpcclient.ClientConnector,
exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service {
exitChan chan bool, connMgr *engine.ConnManager,
anz *AnalyzerService) servmanager.Service {
return &SessionService{
connChan: internalChan,
cfg: cfg,
@@ -43,6 +44,7 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService,
server: server,
exitChan: exitChan,
connMgr: connMgr,
anz: anz,
}
}
@@ -62,6 +64,7 @@ type SessionService struct {
// in order to stop the bircp server if necesary
bircpEnabled bool
connMgr *engine.ConnManager
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -82,7 +85,7 @@ func (smg *SessionService) Start() (err error) {
//start sync session in a separate gorutine
go smg.sm.ListenAndServe(smg.exitChan)
// Pass internal connection via BiRPCClient
smg.connChan <- intAnzConn(smg.sm, utils.SessionS)
smg.connChan <- smg.anz.GetInternalCodec(smg.sm, utils.SessionS)
// Register RPC handler
smg.rpc = v1.NewSMGenericV1(smg.sm)

View File

@@ -69,19 +69,20 @@ func TestSessionSReload(t *testing.T) {
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg, nil)
cfg.StorDbCfg().Type = utils.INTERNAL
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
stordb := NewStorDBService(cfg)
chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz)
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz)
ralS := NewRalService(cfg, chS, server,
make(chan rpcclient.ClientConnector, 1), make(chan rpcclient.ClientConnector, 1),
engineShutdown, nil)
engineShutdown, nil, anz)
cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server,
make(chan rpcclient.ClientConnector, 1),
nil)
srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil)
nil, anz)
srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, anz)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, chrS, schS, ralS, cdrS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db, stordb)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -50,12 +50,13 @@ func TestSIPAgentReload(t *testing.T) {
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg, nil)
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1),
engineShutdown, nil)
engineShutdown, nil, anz)
srv := NewSIPAgent(cfg, filterSChan, engineShutdown, nil)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(srv, sS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Fatal(err)
}

View File

@@ -33,7 +33,9 @@ import (
// NewStatService returns the Stat Service
func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server, internalStatSChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager) servmanager.Service {
server *utils.Server, internalStatSChan chan rpcclient.ClientConnector,
connMgr *engine.ConnManager,
anz *AnalyzerService) servmanager.Service {
return &StatService{
connChan: internalStatSChan,
cfg: cfg,
@@ -42,6 +44,7 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
filterSChan: filterSChan,
server: server,
connMgr: connMgr,
anz: anz,
}
}
@@ -58,6 +61,7 @@ type StatService struct {
sts *engine.StatService
rpc *v1.StatSv1
connChan chan rpcclient.ClientConnector
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -92,7 +96,7 @@ func (sts *StatService) Start() (err error) {
if !sts.cfg.DispatcherSCfg().Enabled {
sts.server.RpcRegister(sts.rpc)
}
sts.connChan <- intAnzConn(sts.rpc, utils.StatS)
sts.connChan <- sts.anz.GetInternalCodec(sts.rpc, utils.StatS)
return
}

View File

@@ -53,12 +53,13 @@ func TestStatSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
db := NewDataDBService(cfg, nil)
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1))
sS := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz)
sS := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(tS, sS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -33,7 +33,8 @@ import (
// NewThresholdService returns the Threshold Service
func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server, internalThresholdSChan chan rpcclient.ClientConnector) servmanager.Service {
server *utils.Server, internalThresholdSChan chan rpcclient.ClientConnector,
anz *AnalyzerService) servmanager.Service {
return &ThresholdService{
connChan: internalThresholdSChan,
cfg: cfg,
@@ -41,6 +42,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
anz: anz,
}
}
@@ -56,6 +58,7 @@ type ThresholdService struct {
thrs *engine.ThresholdService
rpc *v1.ThresholdSv1
connChan chan rpcclient.ClientConnector
anz *AnalyzerService
}
// Start should handle the sercive start
@@ -87,7 +90,7 @@ func (thrs *ThresholdService) Start() (err error) {
if !thrs.cfg.DispatcherSCfg().Enabled {
thrs.server.RpcRegister(thrs.rpc)
}
thrs.connChan <- intAnzConn(thrs.rpc, utils.ThresholdS)
thrs.connChan <- thrs.anz.GetInternalCodec(thrs.rpc, utils.ThresholdS)
return
}

View File

@@ -48,11 +48,12 @@ func TestThresholdSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
db := NewDataDBService(cfg, nil)
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1))
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(tS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -25,9 +25,6 @@ import (
)
var ConReqs *ConcReqs
var AnalizerWraperFunc = func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec {
return conn
}
type ConcReqs struct {
limit int
@@ -80,12 +77,12 @@ type conn interface {
RemoteAddr() net.Addr
}
func newConcReqsGOBCodec(conn conn) rpc.ServerCodec {
return AnalizerWraperFunc(newConcReqsServerCodec(newGobServerCodec(conn)), MetaGOB, conn.RemoteAddr(), conn.LocalAddr())
func newConcReqsGOBCodec(conn conn, anz anzWrapFunc) rpc.ServerCodec {
return anz(newConcReqsServerCodec(newGobServerCodec(conn)), MetaGOB, conn.RemoteAddr(), conn.LocalAddr())
}
func newConcReqsJSONCodec(conn conn) rpc.ServerCodec {
return AnalizerWraperFunc(newConcReqsServerCodec(jsonrpc.NewServerCodec(conn)), MetaJSON, conn.RemoteAddr(), conn.LocalAddr())
func newConcReqsJSONCodec(conn conn, anz anzWrapFunc) rpc.ServerCodec {
return anz(newConcReqsServerCodec(jsonrpc.NewServerCodec(conn)), MetaJSON, conn.RemoteAddr(), conn.LocalAddr())
}
func newConcReqsServerCodec(sc rpc.ServerCodec) rpc.ServerCodec {

View File

@@ -1600,9 +1600,9 @@ const (
// AnalyzerS APIs
const (
AnalyzerSv1 = "AnalyzerSv1"
AnalyzerSv1Ping = "AnalyzerSv1.Ping"
AnalyzerSv1Search = "AnalyzerSv1.Search"
AnalyzerSv1 = "AnalyzerSv1"
AnalyzerSv1Ping = "AnalyzerSv1.Ping"
AnalyzerSv1StringQuery = "AnalyzerSv1.StringQuery"
)
// LoaderS APIs

View File

@@ -50,12 +50,18 @@ func init() {
gob.Register(time.Time{})
gob.Register(url.Values{})
}
type anzWrapFunc func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec
func NewServer() (s *Server) {
s = new(Server)
s.httpMux = http.NewServeMux()
s.httpsMux = http.NewServeMux()
s.stopbiRPCServer = make(chan struct{}, 1)
return s
return &Server{
httpMux: http.NewServeMux(),
httpsMux: http.NewServeMux(),
stopbiRPCServer: make(chan struct{}, 1),
anzWrapper: func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec {
return conn
},
}
}
type Server struct {
@@ -66,6 +72,11 @@ type Server struct {
stopbiRPCServer chan struct{} // used in order to fully stop the biRPC
httpsMux *http.ServeMux
httpMux *http.ServeMux
anzWrapper anzWrapFunc
}
func (s *Server) SetAnzWrapperFunc(anz anzWrapFunc) {
s.anzWrapper = anz
}
func (s *Server) RpcRegister(rcvr interface{}) {
@@ -171,7 +182,7 @@ func (s *Server) ServeJSON(addr string, exitChan chan bool) {
}
continue
}
go rpc.ServeCodec(newConcReqsJSONCodec(conn))
go rpc.ServeCodec(newConcReqsJSONCodec(conn, s.anzWrapper))
}
}
@@ -207,16 +218,16 @@ func (s *Server) ServeGOB(addr string, exitChan chan bool) {
}
continue
}
go rpc.ServeCodec(newConcReqsGOBCodec(conn))
go rpc.ServeCodec(newConcReqsGOBCodec(conn, s.anzWrapper))
}
}
func handleRequest(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.Header().Set("Content-Type", "application/json")
rmtIP, _ := GetRemoteIP(r)
rmtAddr, _ := net.ResolveIPAddr(EmptyString, rmtIP)
res := newRPCRequest(r.Body, rmtAddr).Call()
res := newRPCRequest(r.Body, rmtAddr, s.anzWrapper).Call()
io.Copy(w, res)
}
@@ -258,9 +269,9 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
Logger.Info("<HTTP> enabling handler for JSON-RPC")
if useBasicAuth {
s.httpMux.HandleFunc(jsonRPCURL, use(handleRequest, basicAuth(userList)))
s.httpMux.HandleFunc(jsonRPCURL, use(s.handleRequest, basicAuth(userList)))
} else {
s.httpMux.HandleFunc(jsonRPCURL, handleRequest)
s.httpMux.HandleFunc(jsonRPCURL, s.handleRequest)
}
}
if enabled && wsRPCURL != "" {
@@ -269,7 +280,7 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string,
s.Unlock()
Logger.Info("<HTTP> enabling handler for WebSocket connections")
wsHandler := websocket.Handler(func(ws *websocket.Conn) {
rpc.ServeCodec(newConcReqsJSONCodec(ws))
rpc.ServeCodec(newConcReqsJSONCodec(ws, s.anzWrapper))
})
if useBasicAuth {
s.httpMux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) {
@@ -340,10 +351,11 @@ type rpcRequest struct {
rw io.ReadWriter // holds the JSON formated RPC response
done chan bool // signals then end of the RPC request
remoteAddr net.Addr
anzWarpper anzWrapFunc
}
// newRPCRequest returns a new rpcRequest.
func newRPCRequest(r io.Reader, remoteAddr net.Addr) *rpcRequest {
func newRPCRequest(r io.Reader, remoteAddr net.Addr, anz anzWrapFunc) *rpcRequest {
var buf bytes.Buffer
done := make(chan bool)
return &rpcRequest{
@@ -351,6 +363,7 @@ func newRPCRequest(r io.Reader, remoteAddr net.Addr) *rpcRequest {
rw: &buf,
done: done,
remoteAddr: remoteAddr,
anzWarpper: anz,
}
}
@@ -378,7 +391,7 @@ func (r *rpcRequest) Close() error {
// Call invokes the RPC request, waits for it to complete, and returns the results.
func (r *rpcRequest) Call() io.Reader {
go rpc.ServeCodec(newConcReqsJSONCodec(r))
go rpc.ServeCodec(newConcReqsJSONCodec(r, r.anzWarpper))
<-r.done
return r.rw
}
@@ -460,7 +473,7 @@ func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string,
}
continue
}
go rpc.ServeCodec(newConcReqsGOBCodec(conn))
go rpc.ServeCodec(newConcReqsGOBCodec(conn, s.anzWrapper))
}
}
@@ -501,7 +514,7 @@ func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string,
}
continue
}
go rpc.ServeCodec(newConcReqsJSONCodec(conn))
go rpc.ServeCodec(newConcReqsJSONCodec(conn, s.anzWrapper))
}
}
@@ -521,9 +534,9 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP
s.Unlock()
Logger.Info("<HTTPS> enabling handler for JSON-RPC")
if useBasicAuth {
s.httpsMux.HandleFunc(jsonRPCURL, use(handleRequest, basicAuth(userList)))
s.httpsMux.HandleFunc(jsonRPCURL, use(s.handleRequest, basicAuth(userList)))
} else {
s.httpsMux.HandleFunc(jsonRPCURL, handleRequest)
s.httpsMux.HandleFunc(jsonRPCURL, s.handleRequest)
}
}
if enabled && wsRPCURL != "" {
@@ -532,7 +545,7 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP
s.Unlock()
Logger.Info("<HTTPS> enabling handler for WebSocket connections")
wsHandler := websocket.Handler(func(ws *websocket.Conn) {
rpc.ServeCodec(newConcReqsJSONCodec(ws))
rpc.ServeCodec(newConcReqsJSONCodec(ws, s.anzWrapper))
})
if useBasicAuth {
s.httpsMux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) {