From f26a26296de0d2fd74b7a86984e2189f65d9be18 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 29 Oct 2020 17:19:27 +0200 Subject: [PATCH] Added AnalyzerSv1.StringQuery API to search over the recorded RPC calls --- analyzers/analyzers.go | 5 ++-- analyzers/analyzers_it_test.go | 4 +-- analyzers/analyzers_test.go | 12 ++++---- apier/v1/analyzer.go | 5 ++-- cmd/cgr-engine/cgr-engine.go | 34 +++++++++++----------- packages/debian/changelog | 1 + services/analyzers.go | 21 +++++++------- services/apiers_it_test.go | 12 ++++---- services/apierv1.go | 7 +++-- services/apierv2.go | 7 +++-- services/attributes.go | 7 +++-- services/attributes_it_test.go | 5 ++-- services/cdrs.go | 7 +++-- services/cdrs_it_test.go | 12 ++++---- services/chargers.go | 7 +++-- services/chargers_it_test.go | 7 +++-- services/datadb_it_test.go | 5 ++-- services/dispatcherh.go | 7 +++-- services/dispatchers.go | 8 ++++-- services/dispatchers_it_test.go | 7 +++-- services/dnsagent_it_test.go | 5 ++-- services/ees.go | 7 +++-- services/ees_it_test.go | 7 +++-- services/ers_it_test.go | 5 ++-- services/loaders.go | 7 +++-- services/rals.go | 9 ++++-- services/rals_it_test.go | 9 +++--- services/rates.go | 7 +++-- services/rates_it_test.go | 5 ++-- services/resources.go | 7 +++-- services/resources_it_test.go | 7 +++-- services/responders.go | 6 ++-- services/routes.go | 7 +++-- services/routes_it_test.go | 7 +++-- services/schedulers.go | 7 +++-- services/schedulers_it_test.go | 5 ++-- services/sessions.go | 7 +++-- services/sessions_it_test.go | 13 +++++---- services/sipagent_it_test.go | 5 ++-- services/stats.go | 8 ++++-- services/stats_it_test.go | 7 +++-- services/thresholds.go | 7 +++-- services/thresholds_it_test.go | 5 ++-- utils/concureqs.go | 11 +++---- utils/consts.go | 6 ++-- utils/server.go | 51 +++++++++++++++++++++------------ 46 files changed, 246 insertions(+), 161 deletions(-) diff --git a/analyzers/analyzers.go b/analyzers/analyzers.go index 298390112..647c1cf8b 100755 --- a/analyzers/analyzers.go +++ b/analyzers/analyzers.go @@ -118,8 +118,9 @@ func (aS *AnalyzerService) logTrafic(id uint64, method string, NewInfoRPC(id, method, params, result, err, enc, from, to, sTime, eTime)) } -// V1Search returns a list of API that match the query -func (aS *AnalyzerService) V1Search(searchstr string, reply *[]map[string]interface{}) error { +// V1StringQuery returns a list of API that match the query +func (aS *AnalyzerService) V1StringQuery(searchstr string, reply *[]map[string]interface{}) error { + s := bleve.NewSearchRequest(bleve.NewQueryStringQuery(searchstr)) s.Fields = []string{utils.Meta} // return all fields searchResults, err := aS.db.Search(s) diff --git a/analyzers/analyzers_it_test.go b/analyzers/analyzers_it_test.go index c03cc5187..a75dcefb9 100644 --- a/analyzers/analyzers_it_test.go +++ b/analyzers/analyzers_it_test.go @@ -200,7 +200,7 @@ func testAnalyzerSChargerSv1ProcessEvent(t *testing.T) { func testAnalyzerSV1Search(t *testing.T) { var result []map[string]interface{} - if err := anzRPC.Call(utils.AnalyzerSv1Search, `+RequestEncoding:\*internal +RequestMethod:AttributeSv1\.ProcessEvent`, &result); err != nil { + if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, `+RequestEncoding:\*internal +RequestMethod:AttributeSv1\.ProcessEvent`, &result); err != nil { t.Error(err) } else if len(result) != 1 { t.Errorf("Unexpected result: %s", utils.ToJSON(result)) @@ -209,7 +209,7 @@ func testAnalyzerSV1Search(t *testing.T) { func testAnalyzerSV1Search2(t *testing.T) { var result []map[string]interface{} - if err := anzRPC.Call(utils.AnalyzerSv1Search, `+RequestEncoding:\*json +RequestMethod:ChargerSv1\.ProcessEvent`, &result); err != nil { + if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, `+RequestEncoding:\*json +RequestMethod:ChargerSv1\.ProcessEvent`, &result); err != nil { t.Error(err) } else if len(result) != 1 { t.Errorf("Unexpected result: %s", utils.ToJSON(result)) diff --git a/analyzers/analyzers_test.go b/analyzers/analyzers_test.go index c243b5267..6a1f120a5 100644 --- a/analyzers/analyzers_test.go +++ b/analyzers/analyzers_test.go @@ -240,13 +240,13 @@ func TestAnalyzersV1Search(t *testing.T) { t.Fatal(err) } reply := []map[string]interface{}{} - if err = anz.V1Search(utils.CoreSv1Ping, &reply); err != nil { + if err = anz.V1StringQuery(utils.CoreSv1Ping, &reply); err != nil { t.Fatal(err) } else if len(reply) != 4 { t.Errorf("Expected 4 hits received: %v", len(reply)) } reply = []map[string]interface{}{} - if err = anz.V1Search("RequestMethod:"+utils.CoreSv1Ping, &reply); err != nil { + if err = anz.V1StringQuery("RequestMethod:"+utils.CoreSv1Ping, &reply); err != nil { t.Fatal(err) } else if len(reply) != 4 { t.Errorf("Expected 4 hits received: %v", len(reply)) @@ -264,20 +264,20 @@ func TestAnalyzersV1Search(t *testing.T) { "RequestStartTime": t1.Add(-24 * time.Hour).UTC().Format(time.RFC3339), }} reply = []map[string]interface{}{} - if err = anz.V1Search(utils.RequestDuration+":>="+strconv.FormatInt(int64(time.Hour), 10), &reply); err != nil { + if err = anz.V1StringQuery(utils.RequestDuration+":>="+strconv.FormatInt(int64(time.Hour), 10), &reply); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(expRply, reply) { t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply)) } reply = []map[string]interface{}{} - if err = anz.V1Search(utils.RequestStartTime+":<=\""+t1.Add(-23*time.Hour).UTC().Format(time.RFC3339)+"\"", &reply); err != nil { + if err = anz.V1StringQuery(utils.RequestStartTime+":<=\""+t1.Add(-23*time.Hour).UTC().Format(time.RFC3339)+"\"", &reply); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(expRply, reply) { t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply)) } reply = []map[string]interface{}{} - if err = anz.V1Search("RequestEncoding:*gob", &reply); err != nil { + if err = anz.V1StringQuery("RequestEncoding:*gob", &reply); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(expRply, reply) { t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply)) @@ -286,7 +286,7 @@ func TestAnalyzersV1Search(t *testing.T) { if err = anz.db.Close(); err != nil { t.Fatal(err) } - if err = anz.V1Search("RequestEncoding:*gob", &reply); err != bleve.ErrorIndexClosed { + if err = anz.V1StringQuery("RequestEncoding:*gob", &reply); err != bleve.ErrorIndexClosed { t.Errorf("Expected error: %v,received: %+v", bleve.ErrorIndexClosed, err) } } diff --git a/apier/v1/analyzer.go b/apier/v1/analyzer.go index fd0e6a468..8abca5a37 100755 --- a/apier/v1/analyzer.go +++ b/apier/v1/analyzer.go @@ -45,6 +45,7 @@ func (aSv1 *AnalyzerSv1) Ping(ign *utils.CGREvent, reply *string) error { return nil } -func (aSv1 *AnalyzerSv1) Search(search string, reply *[]map[string]interface{}) error { - return aSv1.aS.V1Search(search, reply) +// StringQuery returns a list of API that match the query +func (aSv1 *AnalyzerSv1) StringQuery(search string, reply *[]map[string]interface{}) error { + return aSv1.aS.V1StringQuery(search, reply) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 331b3c7d0..1c6feb449 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -567,38 +567,38 @@ func main() { // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, exitChan) - attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan) - dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager) - dspH := services.NewDispatcherHostsService(cfg, server, internalDispatcherSChan, connManager, exitChan) + attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, anz) + dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager, anz) + dspH := services.NewDispatcherHostsService(cfg, server, internalDispatcherSChan, connManager, exitChan, anz) chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server, - internalChargerSChan, connManager) - tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan) + internalChargerSChan, connManager, anz) + tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan, anz) stS := services.NewStatService(cfg, dmService, cacheS, filterSChan, server, - internalStatSChan, connManager) + internalStatSChan, connManager, anz) reS := services.NewResourceService(cfg, dmService, cacheS, filterSChan, server, - internalResourceSChan, connManager) + internalResourceSChan, connManager, anz) routeS := services.NewRouteService(cfg, dmService, cacheS, filterSChan, server, - internalRouteSChan, connManager) + internalRouteSChan, connManager, anz) schS := services.NewSchedulerService(cfg, dmService, cacheS, filterSChan, - server, internalSchedulerSChan, connManager) + server, internalSchedulerSChan, connManager, anz) rals := services.NewRalService(cfg, cacheS, server, internalRALsChan, internalResponderChan, - exitChan, connManager) + exitChan, connManager, anz) apiSv1 := services.NewAPIerSv1Service(cfg, dmService, storDBService, filterSChan, server, schS, rals.GetResponderService(), - internalAPIerSv1Chan, connManager) + internalAPIerSv1Chan, connManager, anz) - apiSv2 := services.NewAPIerSv2Service(apiSv1, cfg, server, internalAPIerSv2Chan) + apiSv2 := services.NewAPIerSv2Service(apiSv1, cfg, server, internalAPIerSv2Chan, anz) cdrS := services.NewCDRServer(cfg, dmService, storDBService, filterSChan, server, internalCDRServerChan, - connManager) + connManager, anz) - smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, exitChan, connManager) + smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, exitChan, connManager, anz) ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, exitChan, - internalLoaderSChan, connManager) + internalLoaderSChan, connManager, anz) srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, schS, rals, rals.GetResponder(), apiSv1, apiSv2, cdrS, smg, @@ -612,9 +612,9 @@ func main() { services.NewHTTPAgent(cfg, filterSChan, server, connManager), // no reload ldrs, anz, dspS, dspH, dmService, storDBService, services.NewEventExporterService(cfg, filterSChan, - connManager, server, exitChan, internalEEsChan), + connManager, server, exitChan, internalEEsChan, anz), services.NewRateService(cfg, cacheS, filterSChan, dmService, - server, exitChan, internalRateSChan), + server, exitChan, internalRateSChan, anz), services.NewSIPAgent(cfg, filterSChan, exitChan, connManager), ) srvManager.StartServices() diff --git a/packages/debian/changelog b/packages/debian/changelog index 5813022e5..b7f79001d 100644 --- a/packages/debian/changelog +++ b/packages/debian/changelog @@ -122,6 +122,7 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium * [StatS] Added support for nested fields in custom metrics * [AccountS] Add Initial in AccountSummary as initail value before debit operation * [General] For only *asap actions don't save AccountIDs withing ActionPlans + * [AnalyzerS] Added AnalyzerSv1.StringQuery API to search over the recorded RPC calls -- DanB Wed, 19 Feb 2020 13:25:52 +0200 diff --git a/services/analyzers.go b/services/analyzers.go index dd308e053..75f06bffc 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -31,11 +31,6 @@ import ( "github.com/cgrates/rpcclient" ) -var ( - // used to build the connector for analyzers - intAnzConn = func(c rpcclient.ClientConnector, to string) rpcclient.ClientConnector { return c } -) - // NewAnalyzerService returns the Analyzer Service func NewAnalyzerService(cfg *config.CGRConfig, server *utils.Server, exitChan chan bool, internalAnalyzerSChan chan rpcclient.ClientConnector) *AnalyzerService { @@ -76,10 +71,7 @@ func (anz *AnalyzerService) Start() (err error) { anz.exitChan <- true return }() - intAnzConn = func(c rpcclient.ClientConnector, to string) rpcclient.ClientConnector { - return anz.anz.NewAnalyzerConnector(c, utils.MetaInternal, utils.EmptyString, to) - } - utils.AnalizerWraperFunc = func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec { + anz.server.SetAnzWrapperFunc(func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec { fromstr := "" if from != nil { fromstr = from.String() @@ -89,7 +81,7 @@ func (anz *AnalyzerService) Start() (err error) { tostr = to.String() } return anz.anz.NewServerCodec(conn, enc, fromstr, tostr) - } + }) anz.rpc = v1.NewAnalyzerSv1(anz.anz) if !anz.cfg.DispatcherSCfg().Enabled { anz.server.RpcRegister(anz.rpc) @@ -132,6 +124,15 @@ func (anz *AnalyzerService) ShouldRun() bool { return anz.cfg.AnalyzerSCfg().Enabled } +// GetAnalyzerS returns the analyzer object func (anz *AnalyzerService) GetAnalyzerS() *analyzers.AnalyzerService { return anz.anz } + +// GetInternalCodec returns the connection wrapped in analyzer connector +func (anz *AnalyzerService) GetInternalCodec(c rpcclient.ClientConnector, to string) rpcclient.ClientConnector { + if !anz.IsRunning() { + return c + } + return anz.anz.NewAnalyzerConnector(c, utils.MetaInternal, utils.EmptyString, to) +} diff --git a/services/apiers_it_test.go b/services/apiers_it_test.go index e21940d1f..c23ce82b4 100644 --- a/services/apiers_it_test.go +++ b/services/apiers_it_test.go @@ -54,15 +54,15 @@ func TestApiersReload(t *testing.T) { db := NewDataDBService(cfg, nil) cfg.StorDbCfg().Type = utils.INTERNAL stordb := NewStorDBService(cfg) - schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) - tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1)) - + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) + tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) apiSv1 := NewAPIerSv1Service(cfg, db, stordb, filterSChan, server, schS, new(ResponderService), - make(chan rpcclient.ClientConnector, 1), nil) + make(chan rpcclient.ClientConnector, 1), nil, anz) - apiSv2 := NewAPIerSv2Service(apiSv1, cfg, server, make(chan rpcclient.ClientConnector, 1)) + apiSv2 := NewAPIerSv2Service(apiSv1, cfg, server, make(chan rpcclient.ClientConnector, 1), anz) srvMngr.AddServices(apiSv1, apiSv2, schS, tS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db, stordb) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/apierv1.go b/services/apierv1.go index 6fcffdf85..412dd1e1f 100644 --- a/services/apierv1.go +++ b/services/apierv1.go @@ -37,7 +37,8 @@ func NewAPIerSv1Service(cfg *config.CGRConfig, dm *DataDBService, schedService *SchedulerService, responderService *ResponderService, internalAPIerSv1Chan chan rpcclient.ClientConnector, - connMgr *engine.ConnManager) *APIerSv1Service { + connMgr *engine.ConnManager, + anz *AnalyzerService) *APIerSv1Service { return &APIerSv1Service{ connChan: internalAPIerSv1Chan, cfg: cfg, @@ -49,6 +50,7 @@ func NewAPIerSv1Service(cfg *config.CGRConfig, dm *DataDBService, responderService: responderService, connMgr: connMgr, APIerSv1Chan: make(chan *v1.APIerSv1, 1), + anz: anz, } } @@ -70,6 +72,7 @@ type APIerSv1Service struct { syncStop chan struct{} APIerSv1Chan chan *v1.APIerSv1 + anz *AnalyzerService } // Start should handle the sercive start @@ -119,7 +122,7 @@ func (apiService *APIerSv1Service) Start() (err error) { } //backwards compatible - apiService.connChan <- intAnzConn(apiService.api, utils.APIerSv1) + apiService.connChan <- apiService.anz.GetInternalCodec(apiService.api, utils.APIerSv1) apiService.APIerSv1Chan <- apiService.api return diff --git a/services/apierv2.go b/services/apierv2.go index f286584bb..12d73e607 100644 --- a/services/apierv2.go +++ b/services/apierv2.go @@ -30,12 +30,14 @@ import ( // NewAPIerSv2Service returns the APIerSv2 Service func NewAPIerSv2Service(apiv1 *APIerSv1Service, cfg *config.CGRConfig, server *utils.Server, - internalAPIerSv2Chan chan rpcclient.ClientConnector) *APIerSv2Service { + internalAPIerSv2Chan chan rpcclient.ClientConnector, + anz *AnalyzerService) *APIerSv2Service { return &APIerSv2Service{ apiv1: apiv1, connChan: internalAPIerSv2Chan, cfg: cfg, server: server, + anz: anz, } } @@ -48,6 +50,7 @@ type APIerSv2Service struct { apiv1 *APIerSv1Service api *v2.APIerSv2 connChan chan rpcclient.ClientConnector + anz *AnalyzerService } // Start should handle the sercive start @@ -73,7 +76,7 @@ func (api *APIerSv2Service) Start() (err error) { api.server.RpcRegisterName(utils.ApierV2, api.api) } - api.connChan <- intAnzConn(api.api, utils.APIerSv2) + api.connChan <- api.anz.GetInternalCodec(api.api, utils.APIerSv2) return } diff --git a/services/attributes.go b/services/attributes.go index b583ba841..589160237 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -33,7 +33,8 @@ import ( // NewAttributeService returns the Attribute Service func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, - server *utils.Server, internalChan chan rpcclient.ClientConnector) servmanager.Service { + server *utils.Server, internalChan chan rpcclient.ClientConnector, + anz *AnalyzerService) servmanager.Service { return &AttributeService{ connChan: internalChan, cfg: cfg, @@ -41,6 +42,7 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, cacheS: cacheS, filterSChan: filterSChan, server: server, + anz: anz, } } @@ -56,6 +58,7 @@ type AttributeService struct { attrS *engine.AttributeService rpc *v1.AttributeSv1 // useful on restart connChan chan rpcclient.ClientConnector // publish the internal Subsystem when available + anz *AnalyzerService } // Start should handle the sercive start @@ -87,7 +90,7 @@ func (attrS *AttributeService) Start() (err error) { if !attrS.cfg.DispatcherSCfg().Enabled { attrS.server.RpcRegister(attrS.rpc) } - attrS.connChan <- intAnzConn(attrS.rpc, utils.AttributeS) + attrS.connChan <- attrS.anz.GetInternalCodec(attrS.rpc, utils.AttributeS) return } diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go index 8a16b9987..0f9b56f73 100644 --- a/services/attributes_it_test.go +++ b/services/attributes_it_test.go @@ -50,12 +50,13 @@ func TestAttributeSReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) attrRPC := make(chan rpcclient.ClientConnector, 1) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) attrS := NewAttributeService(cfg, db, chS, filterSChan, server, attrRPC, - ) + anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(attrS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/cdrs.go b/services/cdrs.go index 7a9e04aee..b37b74cff 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -36,7 +36,8 @@ import ( func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, storDB *StorDBService, filterSChan chan *engine.FilterS, server *utils.Server, internalCDRServerChan chan rpcclient.ClientConnector, - connMgr *engine.ConnManager) servmanager.Service { + connMgr *engine.ConnManager, + anz *AnalyzerService) servmanager.Service { return &CDRServer{ connChan: internalCDRServerChan, cfg: cfg, @@ -45,6 +46,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, server: server, connMgr: connMgr, + anz: anz, } } @@ -64,6 +66,7 @@ type CDRServer struct { connMgr *engine.ConnManager syncStop chan struct{} + anz *AnalyzerService // storDBChan chan engine.StorDB } @@ -107,7 +110,7 @@ func (cdrService *CDRServer) Start() (err error) { // Make the cdr server available for internal communication cdrService.server.RpcRegister(cdrService.cdrS) // register CdrServer for internal usage (TODO: refactor this) } - cdrService.connChan <- intAnzConn(cdrService.cdrS, utils.CDRServer) // Signal that cdrS is operational + cdrService.connChan <- cdrService.anz.GetInternalCodec(cdrService.cdrS, utils.CDRServer) // Signal that cdrS is operational return } diff --git a/services/cdrs_it_test.go b/services/cdrs_it_test.go index 1d1472811..bf4be1a1a 100644 --- a/services/cdrs_it_test.go +++ b/services/cdrs_it_test.go @@ -63,19 +63,19 @@ func TestCdrsReload(t *testing.T) { db := NewDataDBService(cfg, nil) cfg.StorDbCfg().Type = utils.INTERNAL stordb := NewStorDBService(cfg) - chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) - schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) + schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) ralS := NewRalService(cfg, chS, server, make(chan rpcclient.ClientConnector, 1), make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil) + engineShutdown, nil, anz) cdrsRPC := make(chan rpcclient.ClientConnector, 1) cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server, - cdrsRPC, - nil) + cdrsRPC, nil, anz) srvMngr.AddServices(cdrS, ralS, schS, chrS, NewLoaderService(cfg, db, filterSChan, server, engineShutdown, - make(chan rpcclient.ClientConnector, 1), nil), db, stordb) + make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/chargers.go b/services/chargers.go index 09ce1838a..3f2728b9e 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -33,7 +33,8 @@ import ( // NewChargerService returns the Charger Service func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server, - internalChargerSChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager) servmanager.Service { + internalChargerSChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager, + anz *AnalyzerService) servmanager.Service { return &ChargerService{ connChan: internalChargerSChan, cfg: cfg, @@ -42,6 +43,7 @@ func NewChargerService(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, server: server, connMgr: connMgr, + anz: anz, } } @@ -58,6 +60,7 @@ type ChargerService struct { chrS *engine.ChargerService rpc *v1.ChargerSv1 connChan chan rpcclient.ClientConnector + anz *AnalyzerService } // Start should handle the sercive start @@ -88,7 +91,7 @@ func (chrS *ChargerService) Start() (err error) { if !chrS.cfg.DispatcherSCfg().Enabled { chrS.server.RpcRegister(cSv1) } - chrS.connChan <- intAnzConn(cSv1, utils.ChargerS) + chrS.connChan <- chrS.anz.GetInternalCodec(cSv1, utils.ChargerS) return } diff --git a/services/chargers_it_test.go b/services/chargers_it_test.go index 126dd96bd..e3f29a78b 100644 --- a/services/chargers_it_test.go +++ b/services/chargers_it_test.go @@ -51,12 +51,13 @@ func TestChargerSReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1)) - chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) + chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(attrS, chrS, NewLoaderService(cfg, db, filterSChan, server, - engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/datadb_it_test.go b/services/datadb_it_test.go index db43f775f..04f5abf81 100644 --- a/services/datadb_it_test.go +++ b/services/datadb_it_test.go @@ -51,9 +51,10 @@ func TestDataDBReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) cM := engine.NewConnManager(cfg, nil) db := NewDataDBService(cfg, cM) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) srvMngr.AddServices(NewAttributeService(cfg, db, - chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1)), - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz), + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/dispatcherh.go b/services/dispatcherh.go index 587d76f7b..e37a15079 100644 --- a/services/dispatcherh.go +++ b/services/dispatcherh.go @@ -32,13 +32,15 @@ import ( // NewDispatcherHostsService returns the Dispatcher Service func NewDispatcherHostsService(cfg *config.CGRConfig, server *utils.Server, internalChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager, - exitChan chan bool) servmanager.Service { + exitChan chan bool, + anz *AnalyzerService) servmanager.Service { return &DispatcherHostsService{ connChan: internalChan, cfg: cfg, server: server, connMgr: connMgr, exitChan: exitChan, + anz: anz, } } @@ -53,6 +55,7 @@ type DispatcherHostsService struct { dspS *dispatcherh.DispatcherHostsService // rpc *v1.DispatcherHSv1 connChan chan rpcclient.ClientConnector + anz *AnalyzerService } // Start should handle the sercive start @@ -66,7 +69,7 @@ func (dspS *DispatcherHostsService) Start() (err error) { dspS.dspS = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr) go dspS.dspS.ListenAndServe() - dspS.connChan <- intAnzConn(dspS.dspS, utils.DispatcherH) + dspS.connChan <- dspS.anz.GetInternalCodec(dspS.dspS, utils.DispatcherH) return } diff --git a/services/dispatchers.go b/services/dispatchers.go index 0896f1e1e..9f080a4d6 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -35,7 +35,9 @@ import ( // NewDispatcherService returns the Dispatcher Service func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, - server *utils.Server, internalChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager) servmanager.Service { + server *utils.Server, internalChan chan rpcclient.ClientConnector, + connMgr *engine.ConnManager, + anz *AnalyzerService) servmanager.Service { return &DispatcherService{ connChan: internalChan, cfg: cfg, @@ -44,6 +46,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, server: server, connMgr: connMgr, + anz: anz, } } @@ -60,6 +63,7 @@ type DispatcherService struct { dspS *dispatchers.DispatcherService rpc *v1.DispatcherSv1 connChan chan rpcclient.ClientConnector + anz *AnalyzerService } // Start should handle the sercive start @@ -145,7 +149,7 @@ func (dspS *DispatcherService) Start() (err error) { dspS.server.RpcRegisterName(utils.RateSv1, v1.NewDispatcherRateSv1(dspS.dspS)) - dspS.connChan <- intAnzConn(dspS.dspS, utils.DispatcherS) + dspS.connChan <- dspS.anz.GetInternalCodec(dspS.dspS, utils.DispatcherS) return } diff --git a/services/dispatchers_it_test.go b/services/dispatchers_it_test.go index 063db3ac6..2903b1132 100644 --- a/services/dispatchers_it_test.go +++ b/services/dispatchers_it_test.go @@ -52,13 +52,14 @@ func TestDispatcherSReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) srv := NewDispatcherService(cfg, db, chS, filterSChan, server, - make(chan rpcclient.ClientConnector, 1), nil) + make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(attrS, srv, NewLoaderService(cfg, db, filterSChan, server, - engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index 15449bbf8..8d2ddb4c1 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -50,12 +50,13 @@ func TestDNSAgentReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil) + engineShutdown, nil, anz) srv := NewDNSAgent(cfg, filterSChan, engineShutdown, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Fatal(err) } diff --git a/services/ees.go b/services/ees.go index cffc10ff6..04d7247cf 100644 --- a/services/ees.go +++ b/services/ees.go @@ -34,7 +34,8 @@ import ( // NewEventExporterService constructs EventExporterService func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, server *utils.Server, exitChan chan bool, - intConnChan chan rpcclient.ClientConnector) servmanager.Service { + intConnChan chan rpcclient.ClientConnector, + anz *AnalyzerService) servmanager.Service { return &EventExporterService{ cfg: cfg, filterSChan: filterSChan, @@ -43,6 +44,7 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil exitChan: exitChan, intConnChan: intConnChan, rldChan: make(chan struct{}), + anz: anz, } } @@ -60,6 +62,7 @@ type EventExporterService struct { eeS *ees.EventExporterS rpc *v1.EventExporterSv1 + anz *AnalyzerService } // ServiceName returns the service name @@ -129,6 +132,6 @@ func (es *EventExporterService) Start() (err error) { if !es.cfg.DispatcherSCfg().Enabled { es.server.RpcRegister(es.rpc) } - es.intConnChan <- intAnzConn(es.eeS, utils.EventExporterS) + es.intConnChan <- es.anz.GetInternalCodec(es.eeS, utils.EventExporterS) return } diff --git a/services/ees_it_test.go b/services/ees_it_test.go index db7346aa8..b59a4d02c 100644 --- a/services/ees_it_test.go +++ b/services/ees_it_test.go @@ -59,12 +59,13 @@ func TestEventExporterSReload(t *testing.T) { chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), - ) - ees := NewEventExporterService(cfg, filterSChan, engine.NewConnManager(cfg, nil), server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz) + ees := NewEventExporterService(cfg, filterSChan, engine.NewConnManager(cfg, nil), server, engineShutdown, make(chan rpcclient.ClientConnector, 1), anz) srvMngr.AddServices(ees, attrS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/ers_it_test.go b/services/ers_it_test.go index 62cbaed89..2a96b012b 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -54,12 +54,13 @@ func TestEventReaderSReload(t *testing.T) { engineShutdown := make(chan bool, 1) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) db := NewDataDBService(cfg, nil) - sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil) + sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, anz) attrS := NewEventReaderService(cfg, filterSChan, engineShutdown, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(attrS, sS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/loaders.go b/services/loaders.go index 319747096..b42959860 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -33,7 +33,8 @@ import ( func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, filterSChan chan *engine.FilterS, server *utils.Server, exitChan chan bool, internalLoaderSChan chan rpcclient.ClientConnector, - connMgr *engine.ConnManager) *LoaderService { + connMgr *engine.ConnManager, + anz *AnalyzerService) *LoaderService { return &LoaderService{ connChan: internalLoaderSChan, cfg: cfg, @@ -42,6 +43,7 @@ func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService, server: server, exitChan: exitChan, connMgr: connMgr, + anz: anz, } } @@ -58,6 +60,7 @@ type LoaderService struct { rpc *v1.LoaderSv1 connChan chan rpcclient.ClientConnector connMgr *engine.ConnManager + anz *AnalyzerService } // Start should handle the sercive start @@ -85,7 +88,7 @@ func (ldrs *LoaderService) Start() (err error) { if !ldrs.cfg.DispatcherSCfg().Enabled { ldrs.server.RpcRegister(ldrs.rpc) } - ldrs.connChan <- intAnzConn(ldrs.rpc, utils.LoaderS) + ldrs.connChan <- ldrs.anz.GetInternalCodec(ldrs.rpc, utils.LoaderS) return } diff --git a/services/rals.go b/services/rals.go index d6cfcc5b6..19600e82a 100644 --- a/services/rals.go +++ b/services/rals.go @@ -32,8 +32,9 @@ import ( // NewRalService returns the Ral Service func NewRalService(cfg *config.CGRConfig, cacheS *engine.CacheS, server *utils.Server, internalRALsChan, internalResponderChan chan rpcclient.ClientConnector, exitChan chan bool, - connMgr *engine.ConnManager) *RalService { - resp := NewResponderService(cfg, server, internalResponderChan, exitChan) + connMgr *engine.ConnManager, + anz *AnalyzerService) *RalService { + resp := NewResponderService(cfg, server, internalResponderChan, exitChan, anz) return &RalService{ connChan: internalRALsChan, @@ -42,6 +43,7 @@ func NewRalService(cfg *config.CGRConfig, cacheS *engine.CacheS, server *utils.S server: server, responder: resp, connMgr: connMgr, + anz: anz, } } @@ -55,6 +57,7 @@ type RalService struct { responder *ResponderService connChan chan rpcclient.ClientConnector connMgr *engine.ConnManager + anz *AnalyzerService } // Start should handle the sercive start @@ -91,7 +94,7 @@ func (rals *RalService) Start() (err error) { rals.server.RpcRegister(rals.rals) } - rals.connChan <- intAnzConn(rals.rals, utils.RALService) + rals.connChan <- rals.anz.GetInternalCodec(rals.rals, utils.RALService) return } diff --git a/services/rals_it_test.go b/services/rals_it_test.go index 57ea22777..9e5da2347 100644 --- a/services/rals_it_test.go +++ b/services/rals_it_test.go @@ -62,15 +62,16 @@ func TestRalsReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) cfg.StorDbCfg().Type = utils.INTERNAL + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) stordb := NewStorDBService(cfg) - schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) - tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1)) + schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) + tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) ralS := NewRalService(cfg, chS, server, make(chan rpcclient.ClientConnector, 1), make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil) + engineShutdown, nil, anz) srvMngr.AddServices(ralS, schS, tS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db, stordb) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/rates.go b/services/rates.go index 91ce26224..2e62aeed6 100644 --- a/services/rates.go +++ b/services/rates.go @@ -40,7 +40,8 @@ func NewRateService( filterSChan chan *engine.FilterS, dmS *DataDBService, server *utils.Server, exitChan chan bool, - intConnChan chan rpcclient.ClientConnector) servmanager.Service { + intConnChan chan rpcclient.ClientConnector, + anz *AnalyzerService) servmanager.Service { return &RateService{ cfg: cfg, cacheS: cacheS, @@ -50,6 +51,7 @@ func NewRateService( exitChan: exitChan, intConnChan: intConnChan, rldChan: make(chan struct{}), + anz: anz, } } @@ -69,6 +71,7 @@ type RateService struct { rateS *rates.RateS rpc *v1.RateSv1 intConnChan chan rpcclient.ClientConnector + anz *AnalyzerService } // ServiceName returns the service name @@ -138,6 +141,6 @@ func (rs *RateService) Start() (err error) { rs.server.RpcRegister(rs.rpc) } - rs.intConnChan <- intAnzConn(rs.rpc, utils.RateS) + rs.intConnChan <- rs.anz.GetInternalCodec(rs.rpc, utils.RateS) return } diff --git a/services/rates_it_test.go b/services/rates_it_test.go index 4f93818d4..cf671fae1 100644 --- a/services/rates_it_test.go +++ b/services/rates_it_test.go @@ -49,9 +49,10 @@ func TestRateSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheRateProfiles)) close(chS.GetPrecacheChannel(utils.CacheRateProfilesFilterIndexes)) close(chS.GetPrecacheChannel(utils.CacheRateFilterIndexes)) - rS := NewRateService(cfg, chS, filterSChan, db, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + rS := NewRateService(cfg, chS, filterSChan, db, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), anz) srvMngr.AddServices(rS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/resources.go b/services/resources.go index 0f8c86012..8222f1a45 100644 --- a/services/resources.go +++ b/services/resources.go @@ -34,7 +34,8 @@ import ( func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server, internalResourceSChan chan rpcclient.ClientConnector, - connMgr *engine.ConnManager) servmanager.Service { + connMgr *engine.ConnManager, + anz *AnalyzerService) servmanager.Service { return &ResourceService{ connChan: internalResourceSChan, cfg: cfg, @@ -43,6 +44,7 @@ func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, server: server, connMgr: connMgr, + anz: anz, } } @@ -59,6 +61,7 @@ type ResourceService struct { rpc *v1.ResourceSv1 connChan chan rpcclient.ClientConnector connMgr *engine.ConnManager + anz *AnalyzerService } // Start should handle the sercive start @@ -90,7 +93,7 @@ func (reS *ResourceService) Start() (err error) { if !reS.cfg.DispatcherSCfg().Enabled { reS.server.RpcRegister(reS.rpc) } - reS.connChan <- intAnzConn(reS.rpc, utils.ResourceS) + reS.connChan <- reS.anz.GetInternalCodec(reS.rpc, utils.ResourceS) return } diff --git a/services/resources_it_test.go b/services/resources_it_test.go index 3aae0b215..de958f82f 100644 --- a/services/resources_it_test.go +++ b/services/resources_it_test.go @@ -53,12 +53,13 @@ func TestResourceSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheResourceFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) db := NewDataDBService(cfg, nil) - tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1)) - reS := NewResourceService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) + tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) + reS := NewResourceService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(tS, reS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/responders.go b/services/responders.go index ccebad82e..6f952b293 100644 --- a/services/responders.go +++ b/services/responders.go @@ -30,12 +30,13 @@ import ( // NewResponderService returns the Resonder Service func NewResponderService(cfg *config.CGRConfig, server *utils.Server, internalRALsChan chan rpcclient.ClientConnector, - exitChan chan bool) *ResponderService { + exitChan chan bool, anz *AnalyzerService) *ResponderService { return &ResponderService{ connChan: internalRALsChan, cfg: cfg, server: server, exitChan: exitChan, + anz: anz, } } @@ -48,6 +49,7 @@ type ResponderService struct { resp *engine.Responder connChan chan rpcclient.ClientConnector + anz *AnalyzerService } // Start should handle the sercive start @@ -68,7 +70,7 @@ func (resp *ResponderService) Start() (err error) { resp.server.RpcRegister(resp.resp) } - resp.connChan <- intAnzConn(resp.resp, utils.ResponderS) // Rater done + resp.connChan <- resp.anz.GetInternalCodec(resp.resp, utils.ResponderS) // Rater done return } diff --git a/services/routes.go b/services/routes.go index 56235a074..54fd98941 100644 --- a/services/routes.go +++ b/services/routes.go @@ -34,7 +34,8 @@ import ( func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server, internalRouteSChan chan rpcclient.ClientConnector, - connMgr *engine.ConnManager) servmanager.Service { + connMgr *engine.ConnManager, + anz *AnalyzerService) servmanager.Service { return &RouteService{ connChan: internalRouteSChan, cfg: cfg, @@ -43,6 +44,7 @@ func NewRouteService(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, server: server, connMgr: connMgr, + anz: anz, } } @@ -59,6 +61,7 @@ type RouteService struct { routeS *engine.RouteService rpc *v1.RouteSv1 connChan chan rpcclient.ClientConnector + anz *AnalyzerService } // Start should handle the sercive start @@ -91,7 +94,7 @@ func (routeS *RouteService) Start() (err error) { if !routeS.cfg.DispatcherSCfg().Enabled { routeS.server.RpcRegister(routeS.rpc) } - routeS.connChan <- intAnzConn(routeS.rpc, utils.RouteS) + routeS.connChan <- routeS.anz.GetInternalCodec(routeS.rpc, utils.RouteS) return } diff --git a/services/routes_it_test.go b/services/routes_it_test.go index 336322729..ae0ecf440 100644 --- a/services/routes_it_test.go +++ b/services/routes_it_test.go @@ -52,11 +52,12 @@ func TestSupplierSReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - sts := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) - supS := NewRouteService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + sts := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) + supS := NewRouteService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(supS, sts, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/schedulers.go b/services/schedulers.go index 706a731d1..34c188f82 100644 --- a/services/schedulers.go +++ b/services/schedulers.go @@ -33,7 +33,8 @@ import ( func NewSchedulerService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, fltrSChan chan *engine.FilterS, server *utils.Server, internalSchedulerrSChan chan rpcclient.ClientConnector, - connMgr *engine.ConnManager) *SchedulerService { + connMgr *engine.ConnManager, + anz *AnalyzerService) *SchedulerService { return &SchedulerService{ connChan: internalSchedulerrSChan, cfg: cfg, @@ -42,6 +43,7 @@ func NewSchedulerService(cfg *config.CGRConfig, dm *DataDBService, fltrSChan: fltrSChan, server: server, connMgr: connMgr, + anz: anz, } } @@ -58,6 +60,7 @@ type SchedulerService struct { rpc *v1.SchedulerSv1 connChan chan rpcclient.ClientConnector connMgr *engine.ConnManager + anz *AnalyzerService } // Start should handle the sercive start @@ -84,7 +87,7 @@ func (schS *SchedulerService) Start() (err error) { if !schS.cfg.DispatcherSCfg().Enabled { schS.server.RpcRegister(schS.rpc) } - schS.connChan <- intAnzConn(schS.rpc, utils.SchedulerS) + schS.connChan <- schS.anz.GetInternalCodec(schS.rpc, utils.SchedulerS) return } diff --git a/services/schedulers_it_test.go b/services/schedulers_it_test.go index 4ebead80c..2b3295ff7 100644 --- a/services/schedulers_it_test.go +++ b/services/schedulers_it_test.go @@ -46,10 +46,11 @@ func TestSchedulerSReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(schS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/sessions.go b/services/sessions.go index 7465510ee..22bf13c6f 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -35,7 +35,8 @@ import ( // NewSessionService returns the Session Service func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, server *utils.Server, internalChan chan rpcclient.ClientConnector, - exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service { + exitChan chan bool, connMgr *engine.ConnManager, + anz *AnalyzerService) servmanager.Service { return &SessionService{ connChan: internalChan, cfg: cfg, @@ -43,6 +44,7 @@ func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, server: server, exitChan: exitChan, connMgr: connMgr, + anz: anz, } } @@ -62,6 +64,7 @@ type SessionService struct { // in order to stop the bircp server if necesary bircpEnabled bool connMgr *engine.ConnManager + anz *AnalyzerService } // Start should handle the sercive start @@ -82,7 +85,7 @@ func (smg *SessionService) Start() (err error) { //start sync session in a separate gorutine go smg.sm.ListenAndServe(smg.exitChan) // Pass internal connection via BiRPCClient - smg.connChan <- intAnzConn(smg.sm, utils.SessionS) + smg.connChan <- smg.anz.GetInternalCodec(smg.sm, utils.SessionS) // Register RPC handler smg.rpc = v1.NewSMGenericV1(smg.sm) diff --git a/services/sessions_it_test.go b/services/sessions_it_test.go index bbff80a0f..e4dfd4764 100644 --- a/services/sessions_it_test.go +++ b/services/sessions_it_test.go @@ -69,19 +69,20 @@ func TestSessionSReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) cfg.StorDbCfg().Type = utils.INTERNAL + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) stordb := NewStorDBService(cfg) - chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) - schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) + chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) + schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) ralS := NewRalService(cfg, chS, server, make(chan rpcclient.ClientConnector, 1), make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil) + engineShutdown, nil, anz) cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server, make(chan rpcclient.ClientConnector, 1), - nil) - srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil) + nil, anz) + srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, chrS, schS, ralS, cdrS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db, stordb) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db, stordb) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/sipagent_it_test.go b/services/sipagent_it_test.go index 016014276..d3218e201 100644 --- a/services/sipagent_it_test.go +++ b/services/sipagent_it_test.go @@ -50,12 +50,13 @@ func TestSIPAgentReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), - engineShutdown, nil) + engineShutdown, nil, anz) srv := NewSIPAgent(cfg, filterSChan, engineShutdown, nil) engine.NewConnManager(cfg, nil) srvMngr.AddServices(srv, sS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Fatal(err) } diff --git a/services/stats.go b/services/stats.go index 31790bcc8..2e8c54f97 100644 --- a/services/stats.go +++ b/services/stats.go @@ -33,7 +33,9 @@ import ( // NewStatService returns the Stat Service func NewStatService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, - server *utils.Server, internalStatSChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager) servmanager.Service { + server *utils.Server, internalStatSChan chan rpcclient.ClientConnector, + connMgr *engine.ConnManager, + anz *AnalyzerService) servmanager.Service { return &StatService{ connChan: internalStatSChan, cfg: cfg, @@ -42,6 +44,7 @@ func NewStatService(cfg *config.CGRConfig, dm *DataDBService, filterSChan: filterSChan, server: server, connMgr: connMgr, + anz: anz, } } @@ -58,6 +61,7 @@ type StatService struct { sts *engine.StatService rpc *v1.StatSv1 connChan chan rpcclient.ClientConnector + anz *AnalyzerService } // Start should handle the sercive start @@ -92,7 +96,7 @@ func (sts *StatService) Start() (err error) { if !sts.cfg.DispatcherSCfg().Enabled { sts.server.RpcRegister(sts.rpc) } - sts.connChan <- intAnzConn(sts.rpc, utils.StatS) + sts.connChan <- sts.anz.GetInternalCodec(sts.rpc, utils.StatS) return } diff --git a/services/stats_it_test.go b/services/stats_it_test.go index 7919d6e4f..f97957046 100644 --- a/services/stats_it_test.go +++ b/services/stats_it_test.go @@ -53,12 +53,13 @@ func TestStatSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) db := NewDataDBService(cfg, nil) - tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1)) - sS := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) + tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) + sS := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(tS, sS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/thresholds.go b/services/thresholds.go index cebb0bc1b..548560f99 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -33,7 +33,8 @@ import ( // NewThresholdService returns the Threshold Service func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, - server *utils.Server, internalThresholdSChan chan rpcclient.ClientConnector) servmanager.Service { + server *utils.Server, internalThresholdSChan chan rpcclient.ClientConnector, + anz *AnalyzerService) servmanager.Service { return &ThresholdService{ connChan: internalThresholdSChan, cfg: cfg, @@ -41,6 +42,7 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, cacheS: cacheS, filterSChan: filterSChan, server: server, + anz: anz, } } @@ -56,6 +58,7 @@ type ThresholdService struct { thrs *engine.ThresholdService rpc *v1.ThresholdSv1 connChan chan rpcclient.ClientConnector + anz *AnalyzerService } // Start should handle the sercive start @@ -87,7 +90,7 @@ func (thrs *ThresholdService) Start() (err error) { if !thrs.cfg.DispatcherSCfg().Enabled { thrs.server.RpcRegister(thrs.rpc) } - thrs.connChan <- intAnzConn(thrs.rpc, utils.ThresholdS) + thrs.connChan <- thrs.anz.GetInternalCodec(thrs.rpc, utils.ThresholdS) return } diff --git a/services/thresholds_it_test.go b/services/thresholds_it_test.go index 330f0e72e..07fdd325d 100644 --- a/services/thresholds_it_test.go +++ b/services/thresholds_it_test.go @@ -48,11 +48,12 @@ func TestThresholdSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) db := NewDataDBService(cfg, nil) - tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1)) + tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(tS, - NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil, anz), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/utils/concureqs.go b/utils/concureqs.go index 168439c67..2a53372af 100644 --- a/utils/concureqs.go +++ b/utils/concureqs.go @@ -25,9 +25,6 @@ import ( ) var ConReqs *ConcReqs -var AnalizerWraperFunc = func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec { - return conn -} type ConcReqs struct { limit int @@ -80,12 +77,12 @@ type conn interface { RemoteAddr() net.Addr } -func newConcReqsGOBCodec(conn conn) rpc.ServerCodec { - return AnalizerWraperFunc(newConcReqsServerCodec(newGobServerCodec(conn)), MetaGOB, conn.RemoteAddr(), conn.LocalAddr()) +func newConcReqsGOBCodec(conn conn, anz anzWrapFunc) rpc.ServerCodec { + return anz(newConcReqsServerCodec(newGobServerCodec(conn)), MetaGOB, conn.RemoteAddr(), conn.LocalAddr()) } -func newConcReqsJSONCodec(conn conn) rpc.ServerCodec { - return AnalizerWraperFunc(newConcReqsServerCodec(jsonrpc.NewServerCodec(conn)), MetaJSON, conn.RemoteAddr(), conn.LocalAddr()) +func newConcReqsJSONCodec(conn conn, anz anzWrapFunc) rpc.ServerCodec { + return anz(newConcReqsServerCodec(jsonrpc.NewServerCodec(conn)), MetaJSON, conn.RemoteAddr(), conn.LocalAddr()) } func newConcReqsServerCodec(sc rpc.ServerCodec) rpc.ServerCodec { diff --git a/utils/consts.go b/utils/consts.go index 17a03c289..fdfa0c90c 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1600,9 +1600,9 @@ const ( // AnalyzerS APIs const ( - AnalyzerSv1 = "AnalyzerSv1" - AnalyzerSv1Ping = "AnalyzerSv1.Ping" - AnalyzerSv1Search = "AnalyzerSv1.Search" + AnalyzerSv1 = "AnalyzerSv1" + AnalyzerSv1Ping = "AnalyzerSv1.Ping" + AnalyzerSv1StringQuery = "AnalyzerSv1.StringQuery" ) // LoaderS APIs diff --git a/utils/server.go b/utils/server.go index ee3b36d23..c07267352 100644 --- a/utils/server.go +++ b/utils/server.go @@ -50,12 +50,18 @@ func init() { gob.Register(time.Time{}) gob.Register(url.Values{}) } + +type anzWrapFunc func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec + func NewServer() (s *Server) { - s = new(Server) - s.httpMux = http.NewServeMux() - s.httpsMux = http.NewServeMux() - s.stopbiRPCServer = make(chan struct{}, 1) - return s + return &Server{ + httpMux: http.NewServeMux(), + httpsMux: http.NewServeMux(), + stopbiRPCServer: make(chan struct{}, 1), + anzWrapper: func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec { + return conn + }, + } } type Server struct { @@ -66,6 +72,11 @@ type Server struct { stopbiRPCServer chan struct{} // used in order to fully stop the biRPC httpsMux *http.ServeMux httpMux *http.ServeMux + anzWrapper anzWrapFunc +} + +func (s *Server) SetAnzWrapperFunc(anz anzWrapFunc) { + s.anzWrapper = anz } func (s *Server) RpcRegister(rcvr interface{}) { @@ -171,7 +182,7 @@ func (s *Server) ServeJSON(addr string, exitChan chan bool) { } continue } - go rpc.ServeCodec(newConcReqsJSONCodec(conn)) + go rpc.ServeCodec(newConcReqsJSONCodec(conn, s.anzWrapper)) } } @@ -207,16 +218,16 @@ func (s *Server) ServeGOB(addr string, exitChan chan bool) { } continue } - go rpc.ServeCodec(newConcReqsGOBCodec(conn)) + go rpc.ServeCodec(newConcReqsGOBCodec(conn, s.anzWrapper)) } } -func handleRequest(w http.ResponseWriter, r *http.Request) { +func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() w.Header().Set("Content-Type", "application/json") rmtIP, _ := GetRemoteIP(r) rmtAddr, _ := net.ResolveIPAddr(EmptyString, rmtIP) - res := newRPCRequest(r.Body, rmtAddr).Call() + res := newRPCRequest(r.Body, rmtAddr, s.anzWrapper).Call() io.Copy(w, res) } @@ -258,9 +269,9 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, Logger.Info(" enabling handler for JSON-RPC") if useBasicAuth { - s.httpMux.HandleFunc(jsonRPCURL, use(handleRequest, basicAuth(userList))) + s.httpMux.HandleFunc(jsonRPCURL, use(s.handleRequest, basicAuth(userList))) } else { - s.httpMux.HandleFunc(jsonRPCURL, handleRequest) + s.httpMux.HandleFunc(jsonRPCURL, s.handleRequest) } } if enabled && wsRPCURL != "" { @@ -269,7 +280,7 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, s.Unlock() Logger.Info(" enabling handler for WebSocket connections") wsHandler := websocket.Handler(func(ws *websocket.Conn) { - rpc.ServeCodec(newConcReqsJSONCodec(ws)) + rpc.ServeCodec(newConcReqsJSONCodec(ws, s.anzWrapper)) }) if useBasicAuth { s.httpMux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) { @@ -340,10 +351,11 @@ type rpcRequest struct { rw io.ReadWriter // holds the JSON formated RPC response done chan bool // signals then end of the RPC request remoteAddr net.Addr + anzWarpper anzWrapFunc } // newRPCRequest returns a new rpcRequest. -func newRPCRequest(r io.Reader, remoteAddr net.Addr) *rpcRequest { +func newRPCRequest(r io.Reader, remoteAddr net.Addr, anz anzWrapFunc) *rpcRequest { var buf bytes.Buffer done := make(chan bool) return &rpcRequest{ @@ -351,6 +363,7 @@ func newRPCRequest(r io.Reader, remoteAddr net.Addr) *rpcRequest { rw: &buf, done: done, remoteAddr: remoteAddr, + anzWarpper: anz, } } @@ -378,7 +391,7 @@ func (r *rpcRequest) Close() error { // Call invokes the RPC request, waits for it to complete, and returns the results. func (r *rpcRequest) Call() io.Reader { - go rpc.ServeCodec(newConcReqsJSONCodec(r)) + go rpc.ServeCodec(newConcReqsJSONCodec(r, r.anzWarpper)) <-r.done return r.rw } @@ -460,7 +473,7 @@ func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, } continue } - go rpc.ServeCodec(newConcReqsGOBCodec(conn)) + go rpc.ServeCodec(newConcReqsGOBCodec(conn, s.anzWrapper)) } } @@ -501,7 +514,7 @@ func (s *Server) ServeJSONTLS(addr, serverCrt, serverKey, caCert string, } continue } - go rpc.ServeCodec(newConcReqsJSONCodec(conn)) + go rpc.ServeCodec(newConcReqsJSONCodec(conn, s.anzWrapper)) } } @@ -521,9 +534,9 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP s.Unlock() Logger.Info(" enabling handler for JSON-RPC") if useBasicAuth { - s.httpsMux.HandleFunc(jsonRPCURL, use(handleRequest, basicAuth(userList))) + s.httpsMux.HandleFunc(jsonRPCURL, use(s.handleRequest, basicAuth(userList))) } else { - s.httpsMux.HandleFunc(jsonRPCURL, handleRequest) + s.httpsMux.HandleFunc(jsonRPCURL, s.handleRequest) } } if enabled && wsRPCURL != "" { @@ -532,7 +545,7 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP s.Unlock() Logger.Info(" enabling handler for WebSocket connections") wsHandler := websocket.Handler(func(ws *websocket.Conn) { - rpc.ServeCodec(newConcReqsJSONCodec(ws)) + rpc.ServeCodec(newConcReqsJSONCodec(ws, s.anzWrapper)) }) if useBasicAuth { s.httpsMux.HandleFunc(wsRPCURL, use(func(w http.ResponseWriter, r *http.Request) {