mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 12:49:54 +05:00
Updated services
This commit is contained in:
committed by
Dan Christian Bogos
parent
eac50e21f1
commit
a4ebbfe67b
@@ -143,25 +143,6 @@ func TestConfigSanityLoaders(t *testing.T) {
|
||||
t.Errorf("Expecting: %+q received: %+q", expected, err)
|
||||
}
|
||||
|
||||
cfg.loaderCfg = LoaderSCfgs{
|
||||
&LoaderSCfg{
|
||||
Enabled: true,
|
||||
TpInDir: "/",
|
||||
TpOutDir: "/",
|
||||
Data: []*LoaderDataType{{
|
||||
Type: utils.MetaStats,
|
||||
Fields: []*FCTemplate{{
|
||||
Type: utils.MetaStats,
|
||||
Tag: "test1",
|
||||
}},
|
||||
}},
|
||||
},
|
||||
}
|
||||
expected = "<LoaderS> invalid field type *stats for *stats at test1"
|
||||
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {
|
||||
t.Errorf("Expecting: %+q received: %+q", expected, err)
|
||||
}
|
||||
|
||||
cfg.loaderCfg = LoaderSCfgs{
|
||||
&LoaderSCfg{
|
||||
Enabled: true,
|
||||
|
||||
@@ -37,7 +37,7 @@ func (dS *DispatcherService) AccountSv1Ping(args *utils.CGREvent, rpl *string) (
|
||||
return dS.Dispatch(context.TODO(), args, utils.MetaAccounts, utils.AccountSv1Ping, args, rpl)
|
||||
}
|
||||
|
||||
func (dS *DispatcherService) AccountsForEvent(args *utils.CGREvent, reply *[]*utils.Account) (err error) {
|
||||
func (dS *DispatcherService) AccountSv1AccountsForEvent(args *utils.CGREvent, reply *[]*utils.Account) (err error) {
|
||||
tnt := dS.cfg.GeneralCfg().DefaultTenant
|
||||
if args != nil && args.Tenant != utils.EmptyString {
|
||||
tnt = args.Tenant
|
||||
@@ -51,7 +51,7 @@ func (dS *DispatcherService) AccountsForEvent(args *utils.CGREvent, reply *[]*ut
|
||||
return dS.Dispatch(context.TODO(), args, utils.MetaAccounts, utils.AccountSv1AccountsForEvent, args, reply)
|
||||
}
|
||||
|
||||
func (dS *DispatcherService) MaxAbstracts(args *utils.CGREvent, reply *utils.EventCharges) (err error) {
|
||||
func (dS *DispatcherService) AccountSv1MaxAbstracts(args *utils.CGREvent, reply *utils.EventCharges) (err error) {
|
||||
tnt := dS.cfg.GeneralCfg().DefaultTenant
|
||||
if args != nil && args.Tenant != utils.EmptyString {
|
||||
tnt = args.Tenant
|
||||
@@ -65,7 +65,7 @@ func (dS *DispatcherService) MaxAbstracts(args *utils.CGREvent, reply *utils.Eve
|
||||
return dS.Dispatch(context.TODO(), args, utils.MetaAccounts, utils.AccountSv1MaxAbstracts, args, reply)
|
||||
}
|
||||
|
||||
func (dS *DispatcherService) DebitAbstracts(args *utils.CGREvent, reply *utils.EventCharges) (err error) {
|
||||
func (dS *DispatcherService) AccountSv1DebitAbstracts(args *utils.CGREvent, reply *utils.EventCharges) (err error) {
|
||||
tnt := dS.cfg.GeneralCfg().DefaultTenant
|
||||
if args != nil && args.Tenant != utils.EmptyString {
|
||||
tnt = args.Tenant
|
||||
@@ -79,7 +79,7 @@ func (dS *DispatcherService) DebitAbstracts(args *utils.CGREvent, reply *utils.E
|
||||
return dS.Dispatch(context.TODO(), args, utils.MetaAccounts, utils.AccountSv1DebitAbstracts, args, reply)
|
||||
}
|
||||
|
||||
func (dS *DispatcherService) MaxConcretes(args *utils.CGREvent, reply *utils.EventCharges) (err error) {
|
||||
func (dS *DispatcherService) AccountSv1MaxConcretes(args *utils.CGREvent, reply *utils.EventCharges) (err error) {
|
||||
tnt := dS.cfg.GeneralCfg().DefaultTenant
|
||||
if args != nil && args.Tenant != utils.EmptyString {
|
||||
tnt = args.Tenant
|
||||
@@ -93,7 +93,7 @@ func (dS *DispatcherService) MaxConcretes(args *utils.CGREvent, reply *utils.Eve
|
||||
return dS.Dispatch(context.TODO(), args, utils.MetaAccounts, utils.AccountSv1MaxConcretes, args, reply)
|
||||
}
|
||||
|
||||
func (dS *DispatcherService) DebitConcretes(args *utils.CGREvent, reply *utils.EventCharges) (err error) {
|
||||
func (dS *DispatcherService) AccountSv1DebitConcretes(args *utils.CGREvent, reply *utils.EventCharges) (err error) {
|
||||
tnt := dS.cfg.GeneralCfg().DefaultTenant
|
||||
if args != nil && args.Tenant != utils.EmptyString {
|
||||
tnt = args.Tenant
|
||||
|
||||
@@ -72,7 +72,7 @@ func TestDspAAccountsForEventNil(t *testing.T) {
|
||||
Tenant: "tenant",
|
||||
}
|
||||
var reply *[]*utils.Account
|
||||
result := dspSrv.AccountsForEvent(CGREvent, reply)
|
||||
result := dspSrv.AccountSv1AccountsForEvent(CGREvent, reply)
|
||||
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
@@ -87,7 +87,7 @@ func TestDspAccountsForEventErrorNil(t *testing.T) {
|
||||
Tenant: "tenant",
|
||||
}
|
||||
var reply *[]*utils.Account
|
||||
result := dspSrv.AccountsForEvent(CGREvent, reply)
|
||||
result := dspSrv.AccountSv1AccountsForEvent(CGREvent, reply)
|
||||
expected := "MANDATORY_IE_MISSING: [ApiKey]"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
@@ -101,7 +101,7 @@ func TestDspMaxAbstractsNil(t *testing.T) {
|
||||
Tenant: "tenant",
|
||||
}
|
||||
var reply *utils.EventCharges
|
||||
result := dspSrv.MaxAbstracts(CGREvent, reply)
|
||||
result := dspSrv.AccountSv1MaxAbstracts(CGREvent, reply)
|
||||
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
@@ -116,7 +116,7 @@ func TestDspMaxAbstractsErrorNil(t *testing.T) {
|
||||
Tenant: "tenant",
|
||||
}
|
||||
var reply *utils.EventCharges
|
||||
result := dspSrv.MaxAbstracts(CGREvent, reply)
|
||||
result := dspSrv.AccountSv1MaxAbstracts(CGREvent, reply)
|
||||
expected := "MANDATORY_IE_MISSING: [ApiKey]"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
@@ -130,7 +130,7 @@ func TestDspDebitAbstractsNil(t *testing.T) {
|
||||
Tenant: "tenant",
|
||||
}
|
||||
var reply *utils.EventCharges
|
||||
result := dspSrv.DebitAbstracts(CGREvent, reply)
|
||||
result := dspSrv.AccountSv1DebitAbstracts(CGREvent, reply)
|
||||
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
@@ -145,7 +145,7 @@ func TestDspDebitAbstractsErrorNil(t *testing.T) {
|
||||
Tenant: "tenant",
|
||||
}
|
||||
var reply *utils.EventCharges
|
||||
result := dspSrv.DebitAbstracts(CGREvent, reply)
|
||||
result := dspSrv.AccountSv1DebitAbstracts(CGREvent, reply)
|
||||
expected := "MANDATORY_IE_MISSING: [ApiKey]"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
@@ -159,7 +159,7 @@ func TestDspMaxConcretesNil(t *testing.T) {
|
||||
Tenant: "tenant",
|
||||
}
|
||||
var reply *utils.EventCharges
|
||||
result := dspSrv.MaxConcretes(CGREvent, reply)
|
||||
result := dspSrv.AccountSv1MaxConcretes(CGREvent, reply)
|
||||
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
@@ -174,7 +174,7 @@ func TestDspMaxConcretesErrorNil(t *testing.T) {
|
||||
Tenant: "tenant",
|
||||
}
|
||||
var reply *utils.EventCharges
|
||||
result := dspSrv.MaxConcretes(CGREvent, reply)
|
||||
result := dspSrv.AccountSv1MaxConcretes(CGREvent, reply)
|
||||
expected := "MANDATORY_IE_MISSING: [ApiKey]"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
@@ -188,7 +188,7 @@ func TestDspDebitConcretesNil(t *testing.T) {
|
||||
Tenant: "tenant",
|
||||
}
|
||||
var reply *utils.EventCharges
|
||||
result := dspSrv.DebitConcretes(CGREvent, reply)
|
||||
result := dspSrv.AccountSv1DebitConcretes(CGREvent, reply)
|
||||
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
@@ -203,7 +203,7 @@ func TestDspDebitConcretesErrorNil(t *testing.T) {
|
||||
Tenant: "tenant",
|
||||
}
|
||||
var reply *utils.EventCharges
|
||||
result := dspSrv.DebitConcretes(CGREvent, reply)
|
||||
result := dspSrv.AccountSv1DebitConcretes(CGREvent, reply)
|
||||
expected := "MANDATORY_IE_MISSING: [ApiKey]"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
|
||||
@@ -125,3 +125,108 @@ func (s RPCClientSet) Call(ctx *context.Context, method string, args interface{}
|
||||
}
|
||||
return conn.Call(ctx, method, args, reply)
|
||||
}
|
||||
|
||||
func NewService(val interface{}) (_ IntService, err error) {
|
||||
var srv *birpc.Service
|
||||
if srv, err = birpc.NewService(val, utils.EmptyString, false); err != nil {
|
||||
return
|
||||
}
|
||||
s := IntService{srv.Name: srv}
|
||||
for m, v := range srv.Methods {
|
||||
if len(m) < 2 || m[0] != 'V' {
|
||||
continue
|
||||
}
|
||||
key := srv.Name + "v" + string(m[1])
|
||||
srv2, has := s[key]
|
||||
if !has {
|
||||
srv2 = new(birpc.Service)
|
||||
*srv2 = *srv
|
||||
srv2.Methods = make(map[string]*birpc.MethodType)
|
||||
s[key] = srv2
|
||||
}
|
||||
srv2.Methods[m[2:]] = v
|
||||
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func NewDispatcherService(val interface{}) (_ IntService, err error) {
|
||||
var srv *birpc.Service
|
||||
if srv, err = birpc.NewService(val, utils.EmptyString, false); err != nil {
|
||||
return
|
||||
}
|
||||
s := IntService{srv.Name: srv}
|
||||
for m, v := range srv.Methods {
|
||||
key := srv.Name
|
||||
switch {
|
||||
case strings.HasPrefix(m, utils.AccountS):
|
||||
m = strings.TrimRight(m, utils.AccountS)
|
||||
key = utils.AccountS
|
||||
case strings.HasPrefix(m, utils.ActionS):
|
||||
m = strings.TrimRight(m, utils.ActionS)
|
||||
key = utils.ActionS
|
||||
case strings.HasPrefix(m, utils.AttributeS):
|
||||
m = strings.TrimRight(m, utils.AttributeS)
|
||||
key = utils.AttributeS
|
||||
case strings.HasPrefix(m, utils.CacheS):
|
||||
m = strings.TrimRight(m, utils.CacheS)
|
||||
key = utils.CacheS
|
||||
case strings.HasPrefix(m, utils.ChargerS):
|
||||
m = strings.TrimRight(m, utils.ChargerS)
|
||||
key = utils.ChargerS
|
||||
case strings.HasPrefix(m, utils.ConfigS):
|
||||
m = strings.TrimRight(m, utils.ConfigS)
|
||||
key = utils.ConfigS
|
||||
case strings.HasPrefix(m, utils.DispatcherS):
|
||||
m = strings.TrimRight(m, utils.DispatcherS)
|
||||
key = utils.DispatcherS
|
||||
case strings.HasPrefix(m, utils.GuardianS):
|
||||
m = strings.TrimRight(m, utils.GuardianS)
|
||||
key = utils.GuardianS
|
||||
case strings.HasPrefix(m, utils.RateS):
|
||||
m = strings.TrimRight(m, utils.RateS)
|
||||
key = utils.RateS
|
||||
// case strings.HasPrefix(m, utils.ReplicatorS):
|
||||
// m = strings.TrimRight(m, utils.ReplicatorS)
|
||||
// key = utils.ReplicatorS
|
||||
case strings.HasPrefix(m, utils.ResourceS):
|
||||
m = strings.TrimRight(m, utils.ResourceS)
|
||||
key = utils.ResourceS
|
||||
case strings.HasPrefix(m, utils.RouteS):
|
||||
m = strings.TrimRight(m, utils.RouteS)
|
||||
key = utils.RouteS
|
||||
case strings.HasPrefix(m, utils.SessionS):
|
||||
m = strings.TrimRight(m, utils.SessionS)
|
||||
key = utils.SessionS
|
||||
case strings.HasPrefix(m, utils.StatS):
|
||||
m = strings.TrimRight(m, utils.StatS)
|
||||
key = utils.StatS
|
||||
case strings.HasPrefix(m, utils.ThresholdS):
|
||||
m = strings.TrimRight(m, utils.ThresholdS)
|
||||
key = utils.ThresholdS
|
||||
case strings.HasPrefix(m, utils.CDRs):
|
||||
m = strings.TrimRight(m, utils.CDRs)
|
||||
key = utils.CDRs
|
||||
|
||||
case len(m) < 2 || m[0] != 'V':
|
||||
continue
|
||||
}
|
||||
key += "v" + string(m[1])
|
||||
srv2, has := s[key]
|
||||
if !has {
|
||||
srv2 = new(birpc.Service)
|
||||
*srv2 = *srv
|
||||
srv2.Methods = make(map[string]*birpc.MethodType)
|
||||
s[key] = srv2
|
||||
}
|
||||
srv2.Methods[m[2:]] = v
|
||||
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
type IntService map[string]*birpc.Service
|
||||
|
||||
func (s IntService) Call(ctx *context.Context, method string, args, reply interface{}) error {
|
||||
return s[strings.Split(method, utils.NestingSep)[0]].Call(ctx, method, args, reply)
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/cgrates/accounts"
|
||||
@@ -103,9 +102,12 @@ func (acts *AccountService) Start(ctx *context.Context, _ context.CancelFunc) (e
|
||||
go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan)
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AccountS))
|
||||
srv, _ := birpc.NewService(apis.NewAccountSv1(acts.acts), "", false)
|
||||
srv, _ := engine.NewService(acts.acts)
|
||||
// srv, _ := birpc.NewService(apis.NewAccountSv1(acts.acts), "", false)
|
||||
if !acts.cfg.DispatcherSCfg().Enabled {
|
||||
acts.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
acts.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
acts.connChan <- acts.anz.GetInternalCodec(srv, utils.AccountS)
|
||||
return
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/cgrates/actions"
|
||||
@@ -102,9 +101,12 @@ func (acts *ActionService) Start(ctx *context.Context, _ context.CancelFunc) (er
|
||||
go acts.acts.ListenAndServe(acts.stopChan, acts.rldChan)
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ActionS))
|
||||
srv, _ := birpc.NewService(apis.NewActionSv1(acts.acts), "", false)
|
||||
srv, _ := engine.NewService(acts.acts)
|
||||
// srv, _ := birpc.NewService(apis.NewActionSv1(acts.acts), "", false)
|
||||
if !acts.cfg.DispatcherSCfg().Enabled {
|
||||
acts.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
acts.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
acts.connChan <- acts.anz.GetInternalCodec(srv, utils.ActionS)
|
||||
return
|
||||
|
||||
@@ -96,14 +96,17 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
|
||||
|
||||
// go apiService.api.ListenAndServe(apiService.stopChan)
|
||||
// runtime.Gosched()
|
||||
srv, _ := birpc.NewService(apiService.api, "", false)
|
||||
srv, _ := engine.NewService(apiService.api)
|
||||
// srv, _ := birpc.NewService(apiService.api, "", false)
|
||||
|
||||
if !apiService.cfg.DispatcherSCfg().Enabled {
|
||||
apiService.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
apiService.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
|
||||
//backwards compatible
|
||||
apiService.connChan <- apiService.anz.GetInternalCodec(srv, srv.Name)
|
||||
apiService.connChan <- apiService.anz.GetInternalCodec(srv, utils.AdminSv1)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/analyzers"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -97,9 +96,12 @@ func (anz *AnalyzerService) start(ctx *context.Context) {
|
||||
anz.Lock()
|
||||
anz.anz.SetFilterS(fS)
|
||||
|
||||
srv, _ := birpc.NewService(apis.NewAnalyzerSv1(anz.anz), "", false)
|
||||
srv, _ := engine.NewService(anz.anz)
|
||||
// srv, _ := birpc.NewService(apis.NewAnalyzerSv1(anz.anz), "", false)
|
||||
if !anz.cfg.DispatcherSCfg().Enabled {
|
||||
anz.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
anz.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
anz.Unlock()
|
||||
anz.connChan <- anz.GetInternalCodec(srv, utils.AnalyzerS)
|
||||
|
||||
@@ -95,9 +95,12 @@ func (attrS *AttributeService) Start(ctx *context.Context, _ context.CancelFunc)
|
||||
attrS.attrS = engine.NewAttributeService(datadb, filterS, attrS.cfg)
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AttributeS))
|
||||
attrS.rpc = apis.NewAttributeSv1(attrS.attrS)
|
||||
srv, _ := birpc.NewService(attrS.rpc, "", false)
|
||||
srv, _ := engine.NewService(attrS.rpc)
|
||||
// srv, _ := birpc.NewService(attrS.rpc, "", false)
|
||||
if !attrS.cfg.DispatcherSCfg().Enabled {
|
||||
attrS.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
attrS.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
dspShtdChan := attrS.dspS.RegisterShutdownChan(attrS.ServiceName())
|
||||
go func() {
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -76,11 +75,14 @@ func (cS *CacheService) Start(ctx *context.Context, shtDw context.CancelFunc) (e
|
||||
|
||||
cS.cacheCh <- engine.Cache
|
||||
|
||||
chSv1, _ := birpc.NewService(apis.NewCacheSv1(engine.Cache), "", false)
|
||||
srv, _ := engine.NewService(engine.Cache)
|
||||
// srv, _ := birpc.NewService(apis.NewCacheSv1(engine.Cache), "", false)
|
||||
if !cS.cfg.DispatcherSCfg().Enabled {
|
||||
cS.server.RpcRegister(chSv1)
|
||||
for _, s := range srv {
|
||||
cS.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
cS.rpc <- cS.anz.GetInternalCodec(chSv1, utils.CacheS)
|
||||
cS.rpc <- cS.anz.GetInternalCodec(srv, utils.CacheS)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -99,9 +98,12 @@ func (cdrService *CDRServer) Start(ctx *context.Context, _ context.CancelFunc) (
|
||||
go cdrService.cdrS.ListenAndServe(cdrService.stopChan)
|
||||
runtime.Gosched()
|
||||
utils.Logger.Info("Registering CDRS RPC service.")
|
||||
srv, _ := birpc.NewService(apis.NewCDRsV1(cdrService.cdrS), "", false)
|
||||
srv, _ := engine.NewService(cdrService.cdrS)
|
||||
// srv, _ := birpc.NewService(apis.NewCDRsV1(cdrService.cdrS), "", false)
|
||||
if !cdrService.cfg.DispatcherSCfg().Enabled {
|
||||
cdrService.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
cdrService.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
cdrService.connChan <- cdrService.anz.GetInternalCodec(srv, utils.CDRServer) // Signal that cdrS is operational
|
||||
return
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
@@ -93,9 +92,12 @@ func (chrS *ChargerService) Start(ctx *context.Context, _ context.CancelFunc) (e
|
||||
defer chrS.Unlock()
|
||||
chrS.chrS = engine.NewChargerService(datadb, filterS, chrS.cfg, chrS.connMgr)
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ChargerS))
|
||||
srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false)
|
||||
srv, _ := engine.NewService(chrS.chrS)
|
||||
// srv, _ := birpc.NewService(apis.NewChargerSv1(chrS.chrS), "", false)
|
||||
if !chrS.cfg.DispatcherSCfg().Enabled {
|
||||
chrS.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
chrS.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
chrS.connChan <- chrS.anz.GetInternalCodec(srv, utils.ChargerS)
|
||||
return
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -82,9 +81,12 @@ func (cS *CoreService) Start(_ *context.Context, shtDw context.CancelFunc) (_ er
|
||||
cS.stopChan = make(chan struct{})
|
||||
cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.fileMem, cS.stopChan, cS.stopMemPrf, cS.shdWg, shtDw)
|
||||
cS.csCh <- cS.cS
|
||||
srv, _ := birpc.NewService(apis.NewCoreSv1(cS.cS), utils.EmptyString, false)
|
||||
srv, _ := engine.NewService(cS.cS)
|
||||
// srv, _ := birpc.NewService(apis.NewCoreSv1(cS.cS), utils.EmptyString, false)
|
||||
if !cS.cfg.DispatcherSCfg().Enabled {
|
||||
cS.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
cS.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
cS.connChan <- cS.anz.GetInternalCodec(srv, utils.CoreS)
|
||||
return
|
||||
|
||||
@@ -19,12 +19,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package services
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/dispatchers"
|
||||
@@ -100,72 +98,14 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
|
||||
|
||||
dspS.unregisterAllDispatchedSubsystems() // unregister all rpc services that can be dispatched
|
||||
|
||||
srv, _ := birpc.NewService(apis.NewDispatcherSv1(dspS.dspS), "", false)
|
||||
dspS.server.RpcRegister(srv)
|
||||
|
||||
attrsv1, _ := birpc.NewServiceWithMethodsRename(dspS.dspS, utils.AttributeSv1, true, func(oldFn string) (newFn string) {
|
||||
if strings.HasPrefix(oldFn, utils.AttributeSv1) {
|
||||
return strings.TrimPrefix(oldFn, utils.AttributeSv1)
|
||||
}
|
||||
return
|
||||
})
|
||||
dspS.server.RpcRegisterName(utils.AttributeSv1, attrsv1)
|
||||
srv, _ := engine.NewDispatcherService(dspS.dspS)
|
||||
// srv, _ := birpc.NewService(apis.NewDispatcherSv1(dspS.dspS), "", false)
|
||||
for _, s := range srv {
|
||||
dspS.server.RpcRegister(s)
|
||||
}
|
||||
// for the moment we dispable Apier through dispatcher
|
||||
// until we figured out a better sollution in case of gob server
|
||||
// dspS.server.SetDispatched()
|
||||
/*
|
||||
|
||||
dspS.server.RpcRegisterName(utils.ThresholdSv1,
|
||||
v1.NewDispatcherThresholdSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.StatSv1,
|
||||
v1.NewDispatcherStatSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.ResourceSv1,
|
||||
v1.NewDispatcherResourceSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.RouteSv1,
|
||||
v1.NewDispatcherRouteSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.AttributeSv1,
|
||||
v1.NewDispatcherAttributeSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.SessionSv1,
|
||||
v1.NewDispatcherSessionSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.ChargerSv1,
|
||||
v1.NewDispatcherChargerSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.CacheSv1,
|
||||
v1.NewDispatcherCacheSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.GuardianSv1,
|
||||
v1.NewDispatcherGuardianSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.CDRsV1,
|
||||
v1.NewDispatcherSCDRsV1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.ConfigSv1,
|
||||
v1.NewDispatcherConfigSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.CoreSv1,
|
||||
v1.NewDispatcherCoreSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.ReplicatorSv1,
|
||||
v1.NewDispatcherReplicatorSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.CDRsV2,
|
||||
v2.NewDispatcherSCDRsV2(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.RateSv1,
|
||||
v1.NewDispatcherRateSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.ActionSv1,
|
||||
v1.NewDispatcherActionSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.AccountSv1,
|
||||
v1.NewDispatcherAccountSv1(dspS.dspS))
|
||||
*/
|
||||
dspS.connChan <- dspS.anz.GetInternalCodec(srv, utils.DispatcherS)
|
||||
|
||||
return
|
||||
|
||||
@@ -24,7 +24,6 @@ import (
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/ees"
|
||||
@@ -63,7 +62,6 @@ type EventExporterService struct {
|
||||
stopChan chan struct{}
|
||||
|
||||
eeS *ees.EventExporterS
|
||||
rpc *apis.EeSv1
|
||||
anz *AnalyzerService
|
||||
srvDep map[string]*sync.WaitGroup
|
||||
}
|
||||
@@ -123,10 +121,12 @@ func (es *EventExporterService) Start(ctx *context.Context, _ context.CancelFunc
|
||||
es.stopChan = make(chan struct{})
|
||||
go es.eeS.ListenAndServe(es.stopChan, es.rldChan)
|
||||
|
||||
es.rpc = apis.NewEeSv1(es.eeS)
|
||||
srv, _ := birpc.NewService(es.rpc, "", false)
|
||||
srv, _ := engine.NewService(es.eeS)
|
||||
// srv, _ := birpc.NewService(es.rpc, "", false)
|
||||
if !es.cfg.DispatcherSCfg().Enabled {
|
||||
es.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
es.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
es.intConnChan <- es.anz.GetInternalCodec(srv, utils.EEs)
|
||||
return
|
||||
|
||||
@@ -31,7 +31,6 @@ import (
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -201,11 +200,14 @@ func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector,
|
||||
|
||||
func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector,
|
||||
cfg *config.CGRConfig, server *cores.Server, anz *AnalyzerService) {
|
||||
cfgSv1, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false)
|
||||
srv, _ := engine.NewService(cfg)
|
||||
// srv, _ := birpc.NewService(apis.NewConfigSv1(cfg), "", false)
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(cfgSv1)
|
||||
for _, s := range srv {
|
||||
server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
iConfigCh <- anz.GetInternalCodec(cfgSv1, utils.ConfigSv1)
|
||||
iConfigCh <- anz.GetInternalCodec(srv, utils.ConfigSv1)
|
||||
}
|
||||
|
||||
func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc,
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
@@ -93,9 +92,12 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
|
||||
if err = ldrs.ldrs.ListenAndServe(ldrs.stopChan); err != nil {
|
||||
return
|
||||
}
|
||||
srv, _ := birpc.NewService(apis.NewLoaderSv1(ldrs.ldrs), "", false)
|
||||
srv, _ := engine.NewService(ldrs.ldrs)
|
||||
// srv, _ := birpc.NewService(apis.NewLoaderSv1(ldrs.ldrs), "", false)
|
||||
if !ldrs.cfg.DispatcherSCfg().Enabled {
|
||||
ldrs.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
ldrs.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
ldrs.connChan <- ldrs.anz.GetInternalCodec(srv, utils.LoaderS)
|
||||
return
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -135,9 +134,12 @@ func (rs *RateService) Start(ctx *context.Context, _ context.CancelFunc) (err er
|
||||
rs.stopChan = make(chan struct{})
|
||||
go rs.rateS.ListenAndServe(rs.stopChan, rs.rldChan)
|
||||
|
||||
srv, _ := birpc.NewService(apis.NewRateSv1(rs.rateS), "", false)
|
||||
srv, _ := engine.NewService(rs.rateS)
|
||||
// srv, _ := birpc.NewService(apis.NewRateSv1(rs.rateS), "", false)
|
||||
if !rs.cfg.DispatcherSCfg().Enabled {
|
||||
rs.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
rs.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
rs.intConnChan <- rs.anz.GetInternalCodec(srv, utils.RateS)
|
||||
return
|
||||
|
||||
@@ -24,7 +24,6 @@ import (
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -96,9 +95,12 @@ func (reS *ResourceService) Start(ctx *context.Context, _ context.CancelFunc) (e
|
||||
reS.reS = engine.NewResourceService(datadb, reS.cfg, filterS, reS.connMgr)
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ResourceS))
|
||||
reS.reS.StartLoop(ctx)
|
||||
srv, _ := birpc.NewService(apis.NewResourceSv1(reS.reS), "", false)
|
||||
srv, _ := engine.NewService(reS.reS)
|
||||
// srv, _ := birpc.NewService(apis.NewResourceSv1(reS.reS), "", false)
|
||||
if !reS.cfg.DispatcherSCfg().Enabled {
|
||||
reS.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
reS.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
reS.connChan <- reS.anz.GetInternalCodec(srv, utils.ResourceS)
|
||||
return
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
@@ -94,9 +93,12 @@ func (routeS *RouteService) Start(ctx *context.Context, _ context.CancelFunc) (e
|
||||
routeS.routeS = engine.NewRouteService(datadb, filterS, routeS.cfg, routeS.connMgr)
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.RouteS))
|
||||
srv, _ := birpc.NewService(apis.NewRouteSv1(routeS.routeS), "", false)
|
||||
srv, _ := engine.NewService(routeS.routeS)
|
||||
// srv, _ := birpc.NewService(apis.NewRouteSv1(routeS.routeS), "", false)
|
||||
if !routeS.cfg.DispatcherSCfg().Enabled {
|
||||
routeS.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
routeS.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
routeS.connChan <- routeS.anz.GetInternalCodec(srv, utils.RouteS)
|
||||
return
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
@@ -96,9 +95,12 @@ func (smg *SessionService) Start(ctx *context.Context, shtDw context.CancelFunc)
|
||||
// Pass internal connection via BiRPCClient
|
||||
|
||||
// Register RPC handler
|
||||
srv, _ := birpc.NewService(apis.NewSessionSv1(smg.sm), utils.EmptyString, false) // methods with multiple options
|
||||
srv, _ := engine.NewService(smg.sm) // methods with multiple options
|
||||
// srv, _ := birpc.NewService(apis.NewSessionSv1(smg.sm), utils.EmptyString, false) // methods with multiple options
|
||||
if !smg.cfg.DispatcherSCfg().Enabled {
|
||||
smg.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
smg.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
smg.connChan <- smg.anz.GetInternalCodec(srv, utils.SessionS)
|
||||
// Register BiRpc handlers
|
||||
|
||||
@@ -24,7 +24,6 @@ import (
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -97,9 +96,12 @@ func (sts *StatService) Start(ctx *context.Context, _ context.CancelFunc) (err e
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem",
|
||||
utils.CoreS, utils.StatS))
|
||||
sts.sts.StartLoop(ctx)
|
||||
srv, _ := birpc.NewService(apis.NewStatSv1(sts.sts), "", false)
|
||||
srv, _ := engine.NewService(sts.sts)
|
||||
// srv, _ := birpc.NewService(apis.NewStatSv1(sts.sts), "", false)
|
||||
if !sts.cfg.DispatcherSCfg().Enabled {
|
||||
sts.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
sts.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
sts.connChan <- sts.anz.GetInternalCodec(srv, utils.StatS)
|
||||
return
|
||||
|
||||
@@ -24,7 +24,6 @@ import (
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -96,9 +95,12 @@ func (thrs *ThresholdService) Start(ctx *context.Context, _ context.CancelFunc)
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS))
|
||||
thrs.thrs.StartLoop(ctx)
|
||||
srv, _ := birpc.NewService(apis.NewThresholdSv1(thrs.thrs), "", false)
|
||||
srv, _ := engine.NewService(thrs.thrs)
|
||||
// srv, _ := birpc.NewService(apis.NewThresholdSv1(thrs.thrs), "", false)
|
||||
if !thrs.cfg.DispatcherSCfg().Enabled {
|
||||
thrs.server.RpcRegister(srv)
|
||||
for _, s := range srv {
|
||||
thrs.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
thrs.connChan <- thrs.anz.GetInternalCodec(srv, utils.ThresholdS)
|
||||
return
|
||||
|
||||
@@ -1247,6 +1247,7 @@ const (
|
||||
|
||||
// ConfigSv1 APIs
|
||||
const (
|
||||
ConfigS = "ConfigS"
|
||||
ConfigSv1 = "ConfigSv1"
|
||||
ConfigSv1ReloadConfig = "ConfigSv1.ReloadConfig"
|
||||
ConfigSv1GetConfig = "ConfigSv1.GetConfig"
|
||||
|
||||
Reference in New Issue
Block a user