From a4ebbfe67b0a0ac0a601d25bb50a7c0846f2e4f3 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Mon, 15 Nov 2021 16:13:36 +0200 Subject: [PATCH] Updated services --- config/configsanity_test.go | 19 ------- dispatchers/accounts.go | 10 ++-- dispatchers/accounts_test.go | 20 +++---- engine/libengine.go | 105 +++++++++++++++++++++++++++++++++++ services/accounts.go | 8 ++- services/actions.go | 8 ++- services/adminsv1.go | 9 ++- services/analyzers.go | 8 ++- services/attributes.go | 7 ++- services/caches.go | 10 ++-- services/cdrs.go | 8 ++- services/chargers.go | 8 ++- services/cores.go | 8 ++- services/dispatchers.go | 70 ++--------------------- services/ees.go | 10 ++-- services/libcgr-engine.go | 10 ++-- services/loaders.go | 8 ++- services/rates.go | 8 ++- services/resources.go | 8 ++- services/routes.go | 8 ++- services/sessions.go | 8 ++- services/stats.go | 8 ++- services/thresholds.go | 8 ++- utils/consts.go | 1 + 24 files changed, 219 insertions(+), 156 deletions(-) diff --git a/config/configsanity_test.go b/config/configsanity_test.go index 22093f918..cb4e63546 100644 --- a/config/configsanity_test.go +++ b/config/configsanity_test.go @@ -143,25 +143,6 @@ func TestConfigSanityLoaders(t *testing.T) { t.Errorf("Expecting: %+q received: %+q", expected, err) } - cfg.loaderCfg = LoaderSCfgs{ - &LoaderSCfg{ - Enabled: true, - TpInDir: "/", - TpOutDir: "/", - Data: []*LoaderDataType{{ - Type: utils.MetaStats, - Fields: []*FCTemplate{{ - Type: utils.MetaStats, - Tag: "test1", - }}, - }}, - }, - } - expected = " invalid field type *stats for *stats at test1" - if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { - t.Errorf("Expecting: %+q received: %+q", expected, err) - } - cfg.loaderCfg = LoaderSCfgs{ &LoaderSCfg{ Enabled: true, diff --git a/dispatchers/accounts.go b/dispatchers/accounts.go index 8b0825cc3..d69f04281 100644 --- a/dispatchers/accounts.go +++ b/dispatchers/accounts.go @@ -37,7 +37,7 @@ func (dS *DispatcherService) AccountSv1Ping(args *utils.CGREvent, rpl *string) ( return dS.Dispatch(context.TODO(), args, utils.MetaAccounts, utils.AccountSv1Ping, args, rpl) } -func (dS *DispatcherService) AccountsForEvent(args *utils.CGREvent, reply *[]*utils.Account) (err error) { +func (dS *DispatcherService) AccountSv1AccountsForEvent(args *utils.CGREvent, reply *[]*utils.Account) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant if args != nil && args.Tenant != utils.EmptyString { tnt = args.Tenant @@ -51,7 +51,7 @@ func (dS *DispatcherService) AccountsForEvent(args *utils.CGREvent, reply *[]*ut return dS.Dispatch(context.TODO(), args, utils.MetaAccounts, utils.AccountSv1AccountsForEvent, args, reply) } -func (dS *DispatcherService) MaxAbstracts(args *utils.CGREvent, reply *utils.EventCharges) (err error) { +func (dS *DispatcherService) AccountSv1MaxAbstracts(args *utils.CGREvent, reply *utils.EventCharges) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant if args != nil && args.Tenant != utils.EmptyString { tnt = args.Tenant @@ -65,7 +65,7 @@ func (dS *DispatcherService) MaxAbstracts(args *utils.CGREvent, reply *utils.Eve return dS.Dispatch(context.TODO(), args, utils.MetaAccounts, utils.AccountSv1MaxAbstracts, args, reply) } -func (dS *DispatcherService) DebitAbstracts(args *utils.CGREvent, reply *utils.EventCharges) (err error) { +func (dS *DispatcherService) AccountSv1DebitAbstracts(args *utils.CGREvent, reply *utils.EventCharges) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant if args != nil && args.Tenant != utils.EmptyString { tnt = args.Tenant @@ -79,7 +79,7 @@ func (dS *DispatcherService) DebitAbstracts(args *utils.CGREvent, reply *utils.E return dS.Dispatch(context.TODO(), args, utils.MetaAccounts, utils.AccountSv1DebitAbstracts, args, reply) } -func (dS *DispatcherService) MaxConcretes(args *utils.CGREvent, reply *utils.EventCharges) (err error) { +func (dS *DispatcherService) AccountSv1MaxConcretes(args *utils.CGREvent, reply *utils.EventCharges) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant if args != nil && args.Tenant != utils.EmptyString { tnt = args.Tenant @@ -93,7 +93,7 @@ func (dS *DispatcherService) MaxConcretes(args *utils.CGREvent, reply *utils.Eve return dS.Dispatch(context.TODO(), args, utils.MetaAccounts, utils.AccountSv1MaxConcretes, args, reply) } -func (dS *DispatcherService) DebitConcretes(args *utils.CGREvent, reply *utils.EventCharges) (err error) { +func (dS *DispatcherService) AccountSv1DebitConcretes(args *utils.CGREvent, reply *utils.EventCharges) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant if args != nil && args.Tenant != utils.EmptyString { tnt = args.Tenant diff --git a/dispatchers/accounts_test.go b/dispatchers/accounts_test.go index f51052828..401b6d7da 100644 --- a/dispatchers/accounts_test.go +++ b/dispatchers/accounts_test.go @@ -72,7 +72,7 @@ func TestDspAAccountsForEventNil(t *testing.T) { Tenant: "tenant", } var reply *[]*utils.Account - result := dspSrv.AccountsForEvent(CGREvent, reply) + result := dspSrv.AccountSv1AccountsForEvent(CGREvent, reply) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if result == nil || result.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) @@ -87,7 +87,7 @@ func TestDspAccountsForEventErrorNil(t *testing.T) { Tenant: "tenant", } var reply *[]*utils.Account - result := dspSrv.AccountsForEvent(CGREvent, reply) + result := dspSrv.AccountSv1AccountsForEvent(CGREvent, reply) expected := "MANDATORY_IE_MISSING: [ApiKey]" if result == nil || result.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) @@ -101,7 +101,7 @@ func TestDspMaxAbstractsNil(t *testing.T) { Tenant: "tenant", } var reply *utils.EventCharges - result := dspSrv.MaxAbstracts(CGREvent, reply) + result := dspSrv.AccountSv1MaxAbstracts(CGREvent, reply) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if result == nil || result.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) @@ -116,7 +116,7 @@ func TestDspMaxAbstractsErrorNil(t *testing.T) { Tenant: "tenant", } var reply *utils.EventCharges - result := dspSrv.MaxAbstracts(CGREvent, reply) + result := dspSrv.AccountSv1MaxAbstracts(CGREvent, reply) expected := "MANDATORY_IE_MISSING: [ApiKey]" if result == nil || result.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) @@ -130,7 +130,7 @@ func TestDspDebitAbstractsNil(t *testing.T) { Tenant: "tenant", } var reply *utils.EventCharges - result := dspSrv.DebitAbstracts(CGREvent, reply) + result := dspSrv.AccountSv1DebitAbstracts(CGREvent, reply) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if result == nil || result.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) @@ -145,7 +145,7 @@ func TestDspDebitAbstractsErrorNil(t *testing.T) { Tenant: "tenant", } var reply *utils.EventCharges - result := dspSrv.DebitAbstracts(CGREvent, reply) + result := dspSrv.AccountSv1DebitAbstracts(CGREvent, reply) expected := "MANDATORY_IE_MISSING: [ApiKey]" if result == nil || result.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) @@ -159,7 +159,7 @@ func TestDspMaxConcretesNil(t *testing.T) { Tenant: "tenant", } var reply *utils.EventCharges - result := dspSrv.MaxConcretes(CGREvent, reply) + result := dspSrv.AccountSv1MaxConcretes(CGREvent, reply) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if result == nil || result.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) @@ -174,7 +174,7 @@ func TestDspMaxConcretesErrorNil(t *testing.T) { Tenant: "tenant", } var reply *utils.EventCharges - result := dspSrv.MaxConcretes(CGREvent, reply) + result := dspSrv.AccountSv1MaxConcretes(CGREvent, reply) expected := "MANDATORY_IE_MISSING: [ApiKey]" if result == nil || result.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) @@ -188,7 +188,7 @@ func TestDspDebitConcretesNil(t *testing.T) { Tenant: "tenant", } var reply *utils.EventCharges - result := dspSrv.DebitConcretes(CGREvent, reply) + result := dspSrv.AccountSv1DebitConcretes(CGREvent, reply) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if result == nil || result.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) @@ -203,7 +203,7 @@ func TestDspDebitConcretesErrorNil(t *testing.T) { Tenant: "tenant", } var reply *utils.EventCharges - result := dspSrv.DebitConcretes(CGREvent, reply) + result := dspSrv.AccountSv1DebitConcretes(CGREvent, reply) expected := "MANDATORY_IE_MISSING: [ApiKey]" if result == nil || result.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) diff --git a/engine/libengine.go b/engine/libengine.go index 878f8d827..32b236742 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -125,3 +125,108 @@ func (s RPCClientSet) Call(ctx *context.Context, method string, args interface{} } return conn.Call(ctx, method, args, reply) } + +func NewService(val interface{}) (_ IntService, err error) { + var srv *birpc.Service + if srv, err = birpc.NewService(val, utils.EmptyString, false); err != nil { + return + } + s := IntService{srv.Name: srv} + for m, v := range srv.Methods { + if len(m) < 2 || m[0] != 'V' { + continue + } + key := srv.Name + "v" + string(m[1]) + srv2, has := s[key] + if !has { + srv2 = new(birpc.Service) + *srv2 = *srv + srv2.Methods = make(map[string]*birpc.MethodType) + s[key] = srv2 + } + srv2.Methods[m[2:]] = v + + } + return s, nil +} + +func NewDispatcherService(val interface{}) (_ IntService, err error) { + var srv *birpc.Service + if srv, err = birpc.NewService(val, utils.EmptyString, false); err != nil { + return + } + s := IntService{srv.Name: srv} + for m, v := range srv.Methods { + key := srv.Name + switch { + case strings.HasPrefix(m, utils.AccountS): + m = strings.TrimRight(m, utils.AccountS) + key = utils.AccountS + case strings.HasPrefix(m, utils.ActionS): + m = strings.TrimRight(m, utils.ActionS) + key = utils.ActionS + case strings.HasPrefix(m, utils.AttributeS): + m = strings.TrimRight(m, utils.AttributeS) + key = utils.AttributeS + case strings.HasPrefix(m, utils.CacheS): + m = strings.TrimRight(m, utils.CacheS) + key = utils.CacheS + case strings.HasPrefix(m, utils.ChargerS): + m = strings.TrimRight(m, utils.ChargerS) + key = utils.ChargerS + case strings.HasPrefix(m, utils.ConfigS): + m = strings.TrimRight(m, utils.ConfigS) + key = utils.ConfigS + case strings.HasPrefix(m, utils.DispatcherS): + m = strings.TrimRight(m, utils.DispatcherS) + key = utils.DispatcherS + case strings.HasPrefix(m, utils.GuardianS): + m = strings.TrimRight(m, utils.GuardianS) + key = utils.GuardianS + case strings.HasPrefix(m, utils.RateS): + m = strings.TrimRight(m, utils.RateS) + key = utils.RateS + // case strings.HasPrefix(m, utils.ReplicatorS): + // m = strings.TrimRight(m, utils.ReplicatorS) + // key = utils.ReplicatorS + case strings.HasPrefix(m, utils.ResourceS): + m = strings.TrimRight(m, utils.ResourceS) + key = utils.ResourceS + case strings.HasPrefix(m, utils.RouteS): + m = strings.TrimRight(m, utils.RouteS) + key = utils.RouteS + case strings.HasPrefix(m, utils.SessionS): + m = strings.TrimRight(m, utils.SessionS) + key = utils.SessionS + case strings.HasPrefix(m, utils.StatS): + m = strings.TrimRight(m, utils.StatS) + key = utils.StatS + case strings.HasPrefix(m, utils.ThresholdS): + m = strings.TrimRight(m, utils.ThresholdS) + key = utils.ThresholdS + case strings.HasPrefix(m, utils.CDRs): + m = strings.TrimRight(m, utils.CDRs) + key = utils.CDRs + + case len(m) < 2 || m[0] != 'V': + continue + } + key += "v" + string(m[1]) + srv2, has := s[key] + if !has { + srv2 = new(birpc.Service) + *srv2 = *srv + srv2.Methods = make(map[string]*birpc.MethodType) + s[key] = srv2 + } + srv2.Methods[m[2:]] = v + + } + return s, nil +} + +type IntService map[string]*birpc.Service + +func (s IntService) Call(ctx *context.Context, method string, args, reply interface{}) error { + return s[strings.Split(method, utils.NestingSep)[0]].Call(ctx, method, args, reply) +} diff --git a/services/accounts.go b/services/accounts.go index c266e41b3..7ea30122d 100644 --- a/services/accounts.go +++ b/services/accounts.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/birpc" "github.com/cgrates/cgrates/accounts" @@ -103,9 +102,12 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AccountS)) - srv, _ := birpc.NewService(apis.NewAccountSv1(acts.acts), "", false) + srv, _ := engine.NewService(acts.acts) + // srv, _ := birpc.NewService(apis.NewAccountSv1(acts.acts), "", false) if !acts.cfg.DispatcherSCfg().Enabled { - acts.server.RpcRegister(srv) + for _, s := range srv { + acts.server.RpcRegister(s) + } } acts.connChan <- acts.anz.GetInternalCodec(srv, utils.AccountS) return diff --git a/services/actions.go b/services/actions.go index f10ba11ec..31a700749 100644 --- a/services/actions.go +++ b/services/actions.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/birpc" "github.com/cgrates/cgrates/actions" @@ -102,9 +101,12 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ActionS)) - srv, _ := birpc.NewService(apis.NewActionSv1(acts.acts), "", false) + srv, _ := engine.NewService(acts.acts) + // srv, _ := birpc.NewService(apis.NewActionSv1(acts.acts), "", false) if !acts.cfg.DispatcherSCfg().Enabled { - acts.server.RpcRegister(srv) + for _, s := range srv { + acts.server.RpcRegister(s) + } } acts.connChan <- acts.anz.GetInternalCodec(srv, utils.ActionS) return diff --git a/services/adminsv1.go b/services/adminsv1.go index fcf049ae5..0c19d47f0 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -96,14 +96,17 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF // go apiService.api.ListenAndServe(apiService.stopChan) // runtime.Gosched() - srv, _ := birpc.NewService(apiService.api, "", false) + srv, _ := engine.NewService(apiService.api) + // srv, _ := birpc.NewService(apiService.api, "", false) if !apiService.cfg.DispatcherSCfg().Enabled { - apiService.server.RpcRegister(srv) + for _, s := range srv { + apiService.server.RpcRegister(s) + } } //backwards compatible - apiService.connChan <- apiService.anz.GetInternalCodec(srv, srv.Name) + apiService.connChan <- apiService.anz.GetInternalCodec(srv, utils.AdminSv1) return } diff --git a/services/analyzers.go b/services/analyzers.go index 300e4e36e..c9b9413d1 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -25,7 +25,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/analyzers" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -97,9 +96,12 @@ func (anz *AnalyzerService) start(ctx *context.Context) { anz.Lock() anz.anz.SetFilterS(fS) - srv, _ := birpc.NewService(apis.NewAnalyzerSv1(anz.anz), "", false) + srv, _ := engine.NewService(anz.anz) + // srv, _ := birpc.NewService(apis.NewAnalyzerSv1(anz.anz), "", false) if !anz.cfg.DispatcherSCfg().Enabled { - anz.server.RpcRegister(srv) + for _, s := range srv { + anz.server.RpcRegister(s) + } } anz.Unlock() anz.connChan <- anz.GetInternalCodec(srv, utils.AnalyzerS) diff --git a/services/attributes.go b/services/attributes.go index 6e4ca092a..f27d652df 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -95,9 +95,12 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc) attrS.attrS = engine.NewAttributeService(datadb, filterS, attrS.cfg) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AttributeS)) attrS.rpc = apis.NewAttributeSv1(attrS.attrS) - srv, _ := birpc.NewService(attrS.rpc, "", false) + srv, _ := engine.NewService(attrS.rpc) + // srv, _ := birpc.NewService(attrS.rpc, "", false) if !attrS.cfg.DispatcherSCfg().Enabled { - attrS.server.RpcRegister(srv) + for _, s := range srv { + attrS.server.RpcRegister(s) + } } dspShtdChan := attrS.dspS.RegisterShutdownChan(attrS.ServiceName()) go func() { diff --git a/services/caches.go b/services/caches.go index 723af8341..bdb37daca 100644 --- a/services/caches.go +++ b/services/caches.go @@ -23,7 +23,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -76,11 +75,14 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e cS.cacheCh <- engine.Cache - chSv1, _ := birpc.NewService(apis.NewCacheSv1(engine.Cache), "", false) + srv, _ := engine.NewService(engine.Cache) + // srv, _ := birpc.NewService(apis.NewCacheSv1(engine.Cache), "", false) if !cS.cfg.DispatcherSCfg().Enabled { - cS.server.RpcRegister(chSv1) + for _, s := range srv { + cS.server.RpcRegister(s) + } } - cS.rpc <- cS.anz.GetInternalCodec(chSv1, utils.CacheS) + cS.rpc <- cS.anz.GetInternalCodec(srv, utils.CacheS) return } diff --git a/services/cdrs.go b/services/cdrs.go index 1072bfac5..dd0c57277 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -25,7 +25,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -99,9 +98,12 @@ func (cdrService *CDRServer) Start(ctx *context.Context, _ context.CancelFunc) ( go cdrService.cdrS.ListenAndServe(cdrService.stopChan) runtime.Gosched() utils.Logger.Info("Registering CDRS RPC service.") - srv, _ := birpc.NewService(apis.NewCDRsV1(cdrService.cdrS), "", false) + srv, _ := engine.NewService(cdrService.cdrS) + // srv, _ := birpc.NewService(apis.NewCDRsV1(cdrService.cdrS), "", false) if !cdrService.cfg.DispatcherSCfg().Enabled { - cdrService.server.RpcRegister(srv) + for _, s := range srv { + cdrService.server.RpcRegister(s) + } } cdrService.connChan <- cdrService.anz.GetInternalCodec(srv, utils.CDRServer) // Signal that cdrS is operational return diff --git a/services/chargers.go b/services/chargers.go index 95a7ab13c..a44575800 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/birpc" "github.com/cgrates/cgrates/config" @@ -93,9 +92,12 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e defer chrS.Unlock() chrS.chrS = engine.NewChargerService(datadb, filterS, chrS.cfg, chrS.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ChargerS)) - srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false) + srv, _ := engine.NewService(chrS.chrS) + // srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false) if !chrS.cfg.DispatcherSCfg().Enabled { - chrS.server.RpcRegister(srv) + for _, s := range srv { + chrS.server.RpcRegister(s) + } } chrS.connChan <- chrS.anz.GetInternalCodec(srv, utils.ChargerS) return diff --git a/services/cores.go b/services/cores.go index ad01c08cb..ba76e7acd 100644 --- a/services/cores.go +++ b/services/cores.go @@ -25,7 +25,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -82,9 +81,12 @@ func (cS *CoreService) Start(_ *context.Context, shtDw context.CancelFunc) (_ er cS.stopChan = make(chan struct{}) cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.fileMem, cS.stopChan, cS.stopMemPrf, cS.shdWg, shtDw) cS.csCh <- cS.cS - srv, _ := birpc.NewService(apis.NewCoreSv1(cS.cS), utils.EmptyString, false) + srv, _ := engine.NewService(cS.cS) + // srv, _ := birpc.NewService(apis.NewCoreSv1(cS.cS), utils.EmptyString, false) if !cS.cfg.DispatcherSCfg().Enabled { - cS.server.RpcRegister(srv) + for _, s := range srv { + cS.server.RpcRegister(s) + } } cS.connChan <- cS.anz.GetInternalCodec(srv, utils.CoreS) return diff --git a/services/dispatchers.go b/services/dispatchers.go index 30148d0b1..dadc4776e 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -19,12 +19,10 @@ along with this program. If not, see package services import ( - "strings" "sync" "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/dispatchers" @@ -100,72 +98,14 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) dspS.unregisterAllDispatchedSubsystems() // unregister all rpc services that can be dispatched - srv, _ := birpc.NewService(apis.NewDispatcherSv1(dspS.dspS), "", false) - dspS.server.RpcRegister(srv) - - attrsv1, _ := birpc.NewServiceWithMethodsRename(dspS.dspS, utils.AttributeSv1, true, func(oldFn string) (newFn string) { - if strings.HasPrefix(oldFn, utils.AttributeSv1) { - return strings.TrimPrefix(oldFn, utils.AttributeSv1) - } - return - }) - dspS.server.RpcRegisterName(utils.AttributeSv1, attrsv1) + srv, _ := engine.NewDispatcherService(dspS.dspS) + // srv, _ := birpc.NewService(apis.NewDispatcherSv1(dspS.dspS), "", false) + for _, s := range srv { + dspS.server.RpcRegister(s) + } // for the moment we dispable Apier through dispatcher // until we figured out a better sollution in case of gob server // dspS.server.SetDispatched() - /* - - dspS.server.RpcRegisterName(utils.ThresholdSv1, - v1.NewDispatcherThresholdSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.StatSv1, - v1.NewDispatcherStatSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.ResourceSv1, - v1.NewDispatcherResourceSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.RouteSv1, - v1.NewDispatcherRouteSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.AttributeSv1, - v1.NewDispatcherAttributeSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.SessionSv1, - v1.NewDispatcherSessionSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.ChargerSv1, - v1.NewDispatcherChargerSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.CacheSv1, - v1.NewDispatcherCacheSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.GuardianSv1, - v1.NewDispatcherGuardianSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.CDRsV1, - v1.NewDispatcherSCDRsV1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.ConfigSv1, - v1.NewDispatcherConfigSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.CoreSv1, - v1.NewDispatcherCoreSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.ReplicatorSv1, - v1.NewDispatcherReplicatorSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.CDRsV2, - v2.NewDispatcherSCDRsV2(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.RateSv1, - v1.NewDispatcherRateSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.ActionSv1, - v1.NewDispatcherActionSv1(dspS.dspS)) - - dspS.server.RpcRegisterName(utils.AccountSv1, - v1.NewDispatcherAccountSv1(dspS.dspS)) - */ dspS.connChan <- dspS.anz.GetInternalCodec(srv, utils.DispatcherS) return diff --git a/services/ees.go b/services/ees.go index 5cd48a201..39a2e1f0c 100644 --- a/services/ees.go +++ b/services/ees.go @@ -24,7 +24,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/ees" @@ -63,7 +62,6 @@ type EventExporterService struct { stopChan chan struct{} eeS *ees.EventExporterS - rpc *apis.EeSv1 anz *AnalyzerService srvDep map[string]*sync.WaitGroup } @@ -123,10 +121,12 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc es.stopChan = make(chan struct{}) go es.eeS.ListenAndServe(es.stopChan, es.rldChan) - es.rpc = apis.NewEeSv1(es.eeS) - srv, _ := birpc.NewService(es.rpc, "", false) + srv, _ := engine.NewService(es.eeS) + // srv, _ := birpc.NewService(es.rpc, "", false) if !es.cfg.DispatcherSCfg().Enabled { - es.server.RpcRegister(srv) + for _, s := range srv { + es.server.RpcRegister(s) + } } es.intConnChan <- es.anz.GetInternalCodec(srv, utils.EEs) return diff --git a/services/libcgr-engine.go b/services/libcgr-engine.go index 8752574c3..051d87122 100644 --- a/services/libcgr-engine.go +++ b/services/libcgr-engine.go @@ -31,7 +31,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -201,11 +200,14 @@ func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector, cfg *config.CGRConfig, server *cores.Server, anz *AnalyzerService) { - cfgSv1, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false) + srv, _ := engine.NewService(cfg) + // srv, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false) if !cfg.DispatcherSCfg().Enabled { - server.RpcRegister(cfgSv1) + for _, s := range srv { + server.RpcRegister(s) + } } - iConfigCh <- anz.GetInternalCodec(cfgSv1, utils.ConfigSv1) + iConfigCh <- anz.GetInternalCodec(srv, utils.ConfigSv1) } func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, diff --git a/services/loaders.go b/services/loaders.go index f13f3140a..203de71e4 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -22,7 +22,6 @@ import ( "sync" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/birpc" "github.com/cgrates/cgrates/config" @@ -93,9 +92,12 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er if err = ldrs.ldrs.ListenAndServe(ldrs.stopChan); err != nil { return } - srv, _ := birpc.NewService(apis.NewLoaderSv1(ldrs.ldrs), "", false) + srv, _ := engine.NewService(ldrs.ldrs) + // srv, _ := birpc.NewService(apis.NewLoaderSv1(ldrs.ldrs), "", false) if !ldrs.cfg.DispatcherSCfg().Enabled { - ldrs.server.RpcRegister(srv) + for _, s := range srv { + ldrs.server.RpcRegister(s) + } } ldrs.connChan <- ldrs.anz.GetInternalCodec(srv, utils.LoaderS) return diff --git a/services/rates.go b/services/rates.go index 9e97b7caa..a25142e28 100644 --- a/services/rates.go +++ b/services/rates.go @@ -23,7 +23,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -135,9 +134,12 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er rs.stopChan = make(chan struct{}) go rs.rateS.ListenAndServe(rs.stopChan, rs.rldChan) - srv, _ := birpc.NewService(apis.NewRateSv1(rs.rateS), "", false) + srv, _ := engine.NewService(rs.rateS) + // srv, _ := birpc.NewService(apis.NewRateSv1(rs.rateS), "", false) if !rs.cfg.DispatcherSCfg().Enabled { - rs.server.RpcRegister(srv) + for _, s := range srv { + rs.server.RpcRegister(s) + } } rs.intConnChan <- rs.anz.GetInternalCodec(srv, utils.RateS) return diff --git a/services/resources.go b/services/resources.go index ba5a66526..ca51e9de4 100644 --- a/services/resources.go +++ b/services/resources.go @@ -24,7 +24,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -96,9 +95,12 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e reS.reS = engine.NewResourceService(datadb, reS.cfg, filterS, reS.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ResourceS)) reS.reS.StartLoop(ctx) - srv, _ := birpc.NewService(apis.NewResourceSv1(reS.reS), "", false) + srv, _ := engine.NewService(reS.reS) + // srv, _ := birpc.NewService(apis.NewResourceSv1(reS.reS), "", false) if !reS.cfg.DispatcherSCfg().Enabled { - reS.server.RpcRegister(srv) + for _, s := range srv { + reS.server.RpcRegister(s) + } } reS.connChan <- reS.anz.GetInternalCodec(srv, utils.ResourceS) return diff --git a/services/routes.go b/services/routes.go index dbdef630c..a14682225 100644 --- a/services/routes.go +++ b/services/routes.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/birpc" "github.com/cgrates/cgrates/config" @@ -94,9 +93,12 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e routeS.routeS = engine.NewRouteService(datadb, filterS, routeS.cfg, routeS.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.RouteS)) - srv, _ := birpc.NewService(apis.NewRouteSv1(routeS.routeS), "", false) + srv, _ := engine.NewService(routeS.routeS) + // srv, _ := birpc.NewService(apis.NewRouteSv1(routeS.routeS), "", false) if !routeS.cfg.DispatcherSCfg().Enabled { - routeS.server.RpcRegister(srv) + for _, s := range srv { + routeS.server.RpcRegister(s) + } } routeS.connChan <- routeS.anz.GetInternalCodec(srv, utils.RouteS) return diff --git a/services/sessions.go b/services/sessions.go index cb0ea1d01..1ceea9023 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/birpc" "github.com/cgrates/cgrates/cores" @@ -96,9 +95,12 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc) // Pass internal connection via BiRPCClient // Register RPC handler - srv, _ := birpc.NewService(apis.NewSessionSv1(smg.sm), utils.EmptyString, false) // methods with multiple options + srv, _ := engine.NewService(smg.sm) // methods with multiple options + // srv, _ := birpc.NewService(apis.NewSessionSv1(smg.sm), utils.EmptyString, false) // methods with multiple options if !smg.cfg.DispatcherSCfg().Enabled { - smg.server.RpcRegister(srv) + for _, s := range srv { + smg.server.RpcRegister(s) + } } smg.connChan <- smg.anz.GetInternalCodec(srv, utils.SessionS) // Register BiRpc handlers diff --git a/services/stats.go b/services/stats.go index c453719f7..2b796ee87 100644 --- a/services/stats.go +++ b/services/stats.go @@ -24,7 +24,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -97,9 +96,12 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS)) sts.sts.StartLoop(ctx) - srv, _ := birpc.NewService(apis.NewStatSv1(sts.sts), "", false) + srv, _ := engine.NewService(sts.sts) + // srv, _ := birpc.NewService(apis.NewStatSv1(sts.sts), "", false) if !sts.cfg.DispatcherSCfg().Enabled { - sts.server.RpcRegister(srv) + for _, s := range srv { + sts.server.RpcRegister(s) + } } sts.connChan <- sts.anz.GetInternalCodec(srv, utils.StatS) return diff --git a/services/thresholds.go b/services/thresholds.go index e2d844b0c..0fee60a6c 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -24,7 +24,6 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -96,9 +95,12 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS)) thrs.thrs.StartLoop(ctx) - srv, _ := birpc.NewService(apis.NewThresholdSv1(thrs.thrs), "", false) + srv, _ := engine.NewService(thrs.thrs) + // srv, _ := birpc.NewService(apis.NewThresholdSv1(thrs.thrs), "", false) if !thrs.cfg.DispatcherSCfg().Enabled { - thrs.server.RpcRegister(srv) + for _, s := range srv { + thrs.server.RpcRegister(s) + } } thrs.connChan <- thrs.anz.GetInternalCodec(srv, utils.ThresholdS) return diff --git a/utils/consts.go b/utils/consts.go index f3b7257a7..457c72881 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1247,6 +1247,7 @@ const ( // ConfigSv1 APIs const ( + ConfigS = "ConfigS" ConfigSv1 = "ConfigSv1" ConfigSv1ReloadConfig = "ConfigSv1.ReloadConfig" ConfigSv1GetConfig = "ConfigSv1.GetConfig"