diff --git a/apis/dispatchers.go b/apis/dispatchers.go new file mode 100644 index 000000000..7c66f332e --- /dev/null +++ b/apis/dispatchers.go @@ -0,0 +1,226 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package apis + +import ( + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/dispatchers" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +/* +// GetDispatcherProfile returns a Dispatcher Profile +func (apierSv1 *APIerSv1) GetDispatcherProfile(arg *utils.TenantID, reply *engine.DispatcherProfile) error { + if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + tnt := arg.Tenant + if tnt == utils.EmptyString { + tnt = apierSv1.Config.GeneralCfg().DefaultTenant + } + dpp, err := apierSv1.DataManager.GetDispatcherProfile(tnt, arg.ID, true, true, utils.NonTransactional) + if err != nil { + return utils.APIErrorHandler(err) + } + *reply = *dpp + return nil +} + +// GetDispatcherProfileIDs returns list of dispatcherProfile IDs registered for a tenant +func (apierSv1 *APIerSv1) GetDispatcherProfileIDs(tenantArg *utils.PaginatorWithTenant, dPrfIDs *[]string) error { + tenant := tenantArg.Tenant + if tenant == utils.EmptyString { + tenant = apierSv1.Config.GeneralCfg().DefaultTenant + } + prfx := utils.DispatcherProfilePrefix + tenant + utils.ConcatenatedKeySep + keys, err := apierSv1.DataManager.DataDB().GetKeysForPrefix(prfx) + if err != nil { + return err + } + if len(keys) == 0 { + return utils.ErrNotFound + } + retIDs := make([]string, len(keys)) + for i, key := range keys { + retIDs[i] = key[len(prfx):] + } + *dPrfIDs = tenantArg.PaginateStringSlice(retIDs) + return nil +} + +type DispatcherWithAPIOpts struct { + *engine.DispatcherProfile + APIOpts map[string]interface{} +} + +//SetDispatcherProfile add/update a new Dispatcher Profile +func (apierSv1 *APIerSv1) SetDispatcherProfile(args *DispatcherWithAPIOpts, reply *string) error { + if missing := utils.MissingStructFields(args.DispatcherProfile, []string{utils.ID, utils.Subsystems}); len(missing) != 0 { + return utils.NewErrMandatoryIeMissing(missing...) + } + if args.Tenant == utils.EmptyString { + args.Tenant = apierSv1.Config.GeneralCfg().DefaultTenant + } + if err := apierSv1.DataManager.SetDispatcherProfile(args.DispatcherProfile, true); err != nil { + return utils.APIErrorHandler(err) + } + //generate a loadID for CacheDispatcherProfiles and store it in database + if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheDispatcherProfiles: time.Now().UnixNano()}); err != nil { + return utils.APIErrorHandler(err) + } + //handle caching for DispatcherProfile + if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, utils.CacheDispatcherProfiles, + args.TenantID(), &args.FilterIDs, args.Subsystems, args.APIOpts); err != nil { + return utils.APIErrorHandler(err) + } + *reply = utils.OK + return nil +} + +//RemoveDispatcherProfile remove a specific Dispatcher Profile +func (apierSv1 *APIerSv1) RemoveDispatcherProfile(arg *utils.TenantIDWithAPIOpts, reply *string) error { + if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + tnt := arg.Tenant + if tnt == utils.EmptyString { + tnt = apierSv1.Config.GeneralCfg().DefaultTenant + } + if err := apierSv1.DataManager.RemoveDispatcherProfile(tnt, + arg.ID, true); err != nil { + return utils.APIErrorHandler(err) + } + //generate a loadID for CacheDispatcherProfiles and store it in database + if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheDispatcherProfiles: time.Now().UnixNano()}); err != nil { + return utils.APIErrorHandler(err) + } + //handle caching for DispatcherProfile + if err := apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), tnt, utils.CacheDispatcherProfiles, + utils.ConcatenatedKey(tnt, arg.ID), nil, nil, arg.APIOpts); err != nil { + return utils.APIErrorHandler(err) + } + *reply = utils.OK + return nil +} + +// GetDispatcherHost returns a Dispatcher Host +func (apierSv1 *APIerSv1) GetDispatcherHost(arg *utils.TenantID, reply *engine.DispatcherHost) error { + if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + tnt := arg.Tenant + if tnt == utils.EmptyString { + tnt = apierSv1.Config.GeneralCfg().DefaultTenant + } + dpp, err := apierSv1.DataManager.GetDispatcherHost(tnt, arg.ID, true, false, utils.NonTransactional) + if err != nil { + return utils.APIErrorHandler(err) + } + *reply = *dpp + return nil +} + +// GetDispatcherHostIDs returns list of dispatcherHost IDs registered for a tenant +func (apierSv1 *APIerSv1) GetDispatcherHostIDs(tenantArg *utils.PaginatorWithTenant, dPrfIDs *[]string) error { + tenant := tenantArg.Tenant + if tenant == utils.EmptyString { + tenant = apierSv1.Config.GeneralCfg().DefaultTenant + } + prfx := utils.DispatcherHostPrefix + tenant + utils.ConcatenatedKeySep + keys, err := apierSv1.DataManager.DataDB().GetKeysForPrefix(prfx) + if err != nil { + return err + } + if len(keys) == 0 { + return utils.ErrNotFound + } + retIDs := make([]string, len(keys)) + for i, key := range keys { + retIDs[i] = key[len(prfx):] + } + *dPrfIDs = tenantArg.PaginateStringSlice(retIDs) + return nil +} + +//SetDispatcherHost add/update a new Dispatcher Host +func (apierSv1 *APIerSv1) SetDispatcherHost(args *engine.DispatcherHostWithAPIOpts, reply *string) error { + if missing := utils.MissingStructFields(args.DispatcherHost, []string{utils.ID}); len(missing) != 0 { + return utils.NewErrMandatoryIeMissing(missing...) + } + if args.Tenant == utils.EmptyString { + args.Tenant = apierSv1.Config.GeneralCfg().DefaultTenant + } + if err := apierSv1.DataManager.SetDispatcherHost(args.DispatcherHost); err != nil { + return utils.APIErrorHandler(err) + } + //generate a loadID for CacheDispatcherHosts and store it in database + if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheDispatcherHosts: time.Now().UnixNano()}); err != nil { + return utils.APIErrorHandler(err) + } + //handle caching for DispatcherProfile + if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, utils.CacheDispatcherHosts, + args.TenantID(), nil, nil, args.APIOpts); err != nil { + return utils.APIErrorHandler(err) + } + *reply = utils.OK + return nil +} + +//RemoveDispatcherHost remove a specific Dispatcher Host +func (apierSv1 *APIerSv1) RemoveDispatcherHost(arg *utils.TenantIDWithAPIOpts, reply *string) error { + if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + tnt := arg.Tenant + if tnt == utils.EmptyString { + tnt = apierSv1.Config.GeneralCfg().DefaultTenant + } + if err := apierSv1.DataManager.RemoveDispatcherHost(tnt, + arg.ID); err != nil { + return utils.APIErrorHandler(err) + } + //generate a loadID for CacheDispatcherHosts and store it in database + if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheDispatcherHosts: time.Now().UnixNano()}); err != nil { + return utils.APIErrorHandler(err) + } + //handle caching for DispatcherProfile + if err := apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), tnt, utils.CacheDispatcherHosts, + utils.ConcatenatedKey(tnt, arg.ID), nil, nil, arg.APIOpts); err != nil { + return utils.APIErrorHandler(err) + } + *reply = utils.OK + return nil +} +*/ + +func NewDispatcherSv1(dS *dispatchers.DispatcherService) *DispatcherSv1 { + return &DispatcherSv1{dS: dS} +} + +type DispatcherSv1 struct { + dS *dispatchers.DispatcherService + ping +} + +// GetProfileForEvent returns the matching dispatcher profile for the provided event +func (dSv1 DispatcherSv1) GetProfilesForEvent(ctx *context.Context, ev *utils.CGREvent, + dPrfl *engine.DispatcherProfiles) error { + return dSv1.dS.V1GetProfilesForEvent(ctx, ev, dPrfl) +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 67d09a8c6..96a374683 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -632,8 +632,8 @@ func main() { // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, shdChan, shdWg, connManager) - attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, anz, srvDep) dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager, anz, srvDep) + attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, anz, dspS, srvDep) dspH := services.NewRegistrarCService(cfg, server, connManager, anz, srvDep) chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server, internalChargerSChan, connManager, anz, srvDep) diff --git a/dispatchers/attributes.go b/dispatchers/attributes.go index 3a01bbcc7..e9f897752 100644 --- a/dispatchers/attributes.go +++ b/dispatchers/attributes.go @@ -19,31 +19,19 @@ along with this program. If not, see package dispatchers import ( + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) // AttributeSv1Ping interrogates AttributeS server responsible to process the event -func (dS *DispatcherService) AttributeSv1Ping(args *utils.CGREvent, +func (dS *DispatcherService) AttributeSv1Ping(ctx *context.Context, args *utils.CGREvent, reply *string) (err error) { - if args == nil { - args = new(utils.CGREvent) - } - tnt := dS.cfg.GeneralCfg().DefaultTenant - if args.Tenant != utils.EmptyString { - tnt = args.Tenant - } - if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { - if err = dS.authorize(utils.AttributeSv1Ping, tnt, - utils.IfaceAsString(args.APIOpts[utils.OptsAPIKey])); err != nil { - return - } - } - return dS.Dispatch(args, utils.MetaAttributes, utils.AttributeSv1Ping, args, reply) + return dS.ping(ctx, utils.MetaAttributes, utils.AttributeSv1Ping, args, reply) } // AttributeSv1GetAttributeForEvent is the dispatcher method for AttributeSv1.GetAttributeForEvent -func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(args *engine.AttrArgsProcessEvent, +func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(ctx *context.Context, args *engine.AttrArgsProcessEvent, reply *engine.AttributeProfile) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString { @@ -59,7 +47,7 @@ func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(args *engine.AttrA } // AttributeSv1ProcessEvent . -func (dS *DispatcherService) AttributeSv1ProcessEvent(args *engine.AttrArgsProcessEvent, +func (dS *DispatcherService) AttributeSv1ProcessEvent(ctx *context.Context, args *engine.AttrArgsProcessEvent, reply *engine.AttrSProcessEventReply) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString { diff --git a/dispatchers/attributes_test.go b/dispatchers/attributes_test.go index a3df439f2..5557aa524 100644 --- a/dispatchers/attributes_test.go +++ b/dispatchers/attributes_test.go @@ -21,6 +21,7 @@ package dispatchers import ( "testing" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -31,7 +32,7 @@ func TestDspAttributeSv1PingError(t *testing.T) { dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) cgrEvent := &utils.CGREvent{} var reply *string - err := dspSrv.AttributeSv1Ping(cgrEvent, reply) + err := dspSrv.AttributeSv1Ping(context.Background(), cgrEvent, reply) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -45,7 +46,7 @@ func TestDspAttributeSv1PingErrorTenant(t *testing.T) { Tenant: "tenant", } var reply *string - err := dspSrv.AttributeSv1Ping(cgrEvent, reply) + err := dspSrv.AttributeSv1Ping(context.Background(), cgrEvent, reply) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -56,7 +57,7 @@ func TestDspAttributeSv1PingErrorNil(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) var reply *string - err := dspSrv.AttributeSv1Ping(nil, reply) + err := dspSrv.AttributeSv1Ping(context.Background(), nil, reply) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -72,7 +73,7 @@ func TestDspAttributeSv1PingErrorAttributeSConns(t *testing.T) { ID: "ID", } var reply *string - err := dspSrv.AttributeSv1Ping(cgrEvent, reply) + err := dspSrv.AttributeSv1Ping(context.Background(), cgrEvent, reply) expected := "MANDATORY_IE_MISSING: [ApiKey]" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -87,7 +88,7 @@ func TestDspAttributeSv1GetAttributeForEventError(t *testing.T) { CGREvent: &utils.CGREvent{}, } var reply *engine.AttributeProfile - err := dspSrv.AttributeSv1GetAttributeForEvent(processEvent, reply) + err := dspSrv.AttributeSv1GetAttributeForEvent(context.Background(), processEvent, reply) expected := "MANDATORY_IE_MISSING: [ApiKey]" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -104,7 +105,7 @@ func TestDspAttributeSv1GetAttributeForEventErrorTenant(t *testing.T) { }, } var reply *engine.AttributeProfile - err := dspSrv.AttributeSv1GetAttributeForEvent(processEvent, reply) + err := dspSrv.AttributeSv1GetAttributeForEvent(context.Background(), processEvent, reply) expected := "MANDATORY_IE_MISSING: [ApiKey]" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -121,7 +122,7 @@ func TestDspAttributeSv1GetAttributeForEventErrorAttributeS(t *testing.T) { } var reply *engine.AttributeProfile - err := dspSrv.AttributeSv1GetAttributeForEvent(processEvent, reply) + err := dspSrv.AttributeSv1GetAttributeForEvent(context.Background(), processEvent, reply) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -138,7 +139,7 @@ func TestDspAttributeSv1ProcessEventError(t *testing.T) { } var reply *engine.AttrSProcessEventReply - err := dspSrv.AttributeSv1ProcessEvent(processEvent, reply) + err := dspSrv.AttributeSv1ProcessEvent(context.Background(), processEvent, reply) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -156,7 +157,7 @@ func TestDspAttributeSv1ProcessEventErrorAttributeSConns(t *testing.T) { } var reply *engine.AttrSProcessEventReply - err := dspSrv.AttributeSv1ProcessEvent(processEvent, reply) + err := dspSrv.AttributeSv1ProcessEvent(context.Background(), processEvent, reply) expected := "MANDATORY_IE_MISSING: [ApiKey]" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) diff --git a/dispatchers/cdrs_it_test.go b/dispatchers/cdrs_it_test.go index f4d0bca14..3432b90ca 100644 --- a/dispatchers/cdrs_it_test.go +++ b/dispatchers/cdrs_it_test.go @@ -31,29 +31,29 @@ import ( var ( sTestsDspCDRs = []func(t *testing.T){ testDspCDRsPing, - // testDspCDRsProcessEvent, - // testDspCDRsCountCDR, - // testDspCDRsGetCDR, - // testDspCDRsGetCDRWithoutTenant, - // testDspCDRsProcessCDR, - // testDspCDRsGetCDR2, - // testDspCDRsProcessExternalCDR, - // testDspCDRsGetCDR3, - // testDspCDRsV2ProcessEvent, + testDspCDRsProcessEvent, + testDspCDRsCountCDR, + testDspCDRsGetCDR, + testDspCDRsGetCDRWithoutTenant, + testDspCDRsProcessCDR, + testDspCDRsGetCDR2, + testDspCDRsProcessExternalCDR, + testDspCDRsGetCDR3, + testDspCDRsV2ProcessEvent, // testDspCDRsV2StoreSessionCost, } sTestsDspCDRsWithoutAuth = []func(t *testing.T){ testDspCDRsPingNoAuth, - // testDspCDRsProcessEventNoAuth, - // testDspCDRsCountCDRNoAuth, - // testDspCDRsGetCDRNoAuth, - // testDspCDRsGetCDRNoAuthWithoutTenant, - // testDspCDRsProcessCDRNoAuth, - // testDspCDRsGetCDR2NoAuth, - // testDspCDRsProcessExternalCDRNoAuth, - // testDspCDRsGetCDR3NoAuth, - // testDspCDRsV2ProcessEventNoAuth, + testDspCDRsProcessEventNoAuth, + testDspCDRsCountCDRNoAuth, + testDspCDRsGetCDRNoAuth, + testDspCDRsGetCDRNoAuthWithoutTenant, + testDspCDRsProcessCDRNoAuth, + testDspCDRsGetCDR2NoAuth, + testDspCDRsProcessExternalCDRNoAuth, + testDspCDRsGetCDR3NoAuth, + testDspCDRsV2ProcessEventNoAuth, // testDspCDRsV2StoreSessionCostNoAuth, } ) diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 0c7dff941..85bba6991 100644 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -101,11 +101,11 @@ func (dS *DispatcherService) authorize(method, tenant string, apiKey string) (er // dispatcherForEvent returns a dispatcher instance configured for specific event // or utils.ErrNotFound if none present -func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CGREvent, +func (dS *DispatcherService) dispatcherProfilesForEvent(ctx *context.Context, tnt string, ev *utils.CGREvent, evNm utils.MapStorage) (dPrlfs engine.DispatcherProfiles, err error) { // find out the matching profiles var prflIDs utils.StringSet - if prflIDs, err = engine.MatchingItemIDsForEvent(context.TODO(), evNm, + if prflIDs, err = engine.MatchingItemIDsForEvent(ctx, evNm, dS.cfg.DispatcherSCfg().StringIndexedFields, dS.cfg.DispatcherSCfg().PrefixIndexedFields, dS.cfg.DispatcherSCfg().SuffixIndexedFields, @@ -116,7 +116,7 @@ func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CG return } for prflID := range prflIDs { - prfl, err := dS.dm.GetDispatcherProfile(context.TODO(), tnt, prflID, true, true, utils.NonTransactional) + prfl, err := dS.dm.GetDispatcherProfile(ctx, tnt, prflID, true, true, utils.NonTransactional) if err != nil { if err != utils.ErrNotFound { return nil, err @@ -124,7 +124,7 @@ func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CG continue } - if pass, err := dS.fltrS.Pass(context.TODO(), tnt, prfl.FilterIDs, + if pass, err := dS.fltrS.Pass(ctx, tnt, prfl.FilterIDs, evNm); err != nil { return nil, err } else if !pass { @@ -165,7 +165,7 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, }, } var dPrfls engine.DispatcherProfiles - if dPrfls, err = dS.dispatcherProfilesForEvent(tnt, ev, evNm); err != nil { + if dPrfls, err = dS.dispatcherProfilesForEvent(context.TODO(), tnt, ev, evNm); err != nil { return utils.NewErrDispatcherS(err) } for _, dPrfl := range dPrfls { @@ -188,13 +188,13 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, return // return the last error } -func (dS *DispatcherService) V1GetProfilesForEvent(ev *utils.CGREvent, +func (dS *DispatcherService) V1GetProfilesForEvent(ctx *context.Context, ev *utils.CGREvent, dPfl *engine.DispatcherProfiles) (err error) { tnt := ev.Tenant if tnt == utils.EmptyString { tnt = dS.cfg.GeneralCfg().DefaultTenant } - retDPfl, errDpfl := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + retDPfl, errDpfl := dS.dispatcherProfilesForEvent(ctx, tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -208,3 +208,21 @@ func (dS *DispatcherService) V1GetProfilesForEvent(ev *utils.CGREvent, *dPfl = retDPfl return } + +func (dS *DispatcherService) ping(ctx *context.Context, subsys, method string, args *utils.CGREvent, + reply *string) (err error) { + if args == nil { + args = new(utils.CGREvent) + } + tnt := dS.cfg.GeneralCfg().DefaultTenant + if args.Tenant != utils.EmptyString { + tnt = args.Tenant + } + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { + if err = dS.authorize(method, tnt, + utils.IfaceAsString(args.APIOpts[utils.OptsAPIKey])); err != nil { + return + } + } + return dS.Dispatch(args, subsys, method, args, reply) +} diff --git a/dispatchers/dispatchers_test.go b/dispatchers/dispatchers_test.go index 98b3945e8..80367ca8c 100644 --- a/dispatchers/dispatchers_test.go +++ b/dispatchers/dispatchers_test.go @@ -79,7 +79,7 @@ func TestDispatcherServiceDispatcherProfileForEventGetDispatcherProfileNF(t *tes } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + _, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -103,7 +103,7 @@ func TestDispatcherServiceDispatcherProfileForEventMIIDENotFound(t *testing.T) { ev := &utils.CGREvent{} tnt := "" subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err := dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + _, err := dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -156,7 +156,7 @@ func TestDispatcherV1GetProfileForEventErr(t *testing.T) { dsp := NewDispatcherService(nil, cfg, nil, nil) ev := &utils.CGREvent{} dPfl := &engine.DispatcherProfiles{} - err := dsp.V1GetProfilesForEvent(ev, dPfl) + err := dsp.V1GetProfilesForEvent(context.Background(), ev, dPfl) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -169,7 +169,7 @@ func TestDispatcherV1GetProfileForEvent(t *testing.T) { dsp := NewDispatcherService(nil, cfg, nil, nil) ev := &utils.CGREvent{} dPfl := &engine.DispatcherProfiles{} - err := dsp.V1GetProfilesForEvent(ev, dPfl) + err := dsp.V1GetProfilesForEvent(context.Background(), ev, dPfl) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -527,7 +527,7 @@ func TestDispatcherServiceDispatcherProfileForEventErrNil(t *testing.T) { } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + _, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -574,7 +574,7 @@ func TestDispatcherV1GetProfileForEventReturn(t *testing.T) { } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + _, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -585,7 +585,7 @@ func TestDispatcherV1GetProfileForEventReturn(t *testing.T) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } dPfl := &engine.DispatcherProfiles{} - err = dss.V1GetProfilesForEvent(ev, dPfl) + err = dss.V1GetProfilesForEvent(context.Background(), ev, dPfl) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) @@ -627,7 +627,7 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFound(t *testing.T) { } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + _, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -674,7 +674,7 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFound2(t *testing.T) { } tnt := "" subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + _, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -722,7 +722,7 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFoundFilter(t *testing. } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + _, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -864,7 +864,7 @@ func TestDispatcherServiceDispatcherProfileForEventFoundFilter(t *testing.T) { } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + _, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -888,7 +888,7 @@ func TestDispatcherServiceDispatcherProfileForEventNotNotFound(t *testing.T) { if cnt == 0 { cnt++ return map[string]utils.StringSet{ - idxKey: utils.StringSet{"cgrates.org:dsp1": {}}, + idxKey: {"cgrates.org:dsp1": {}}, }, nil } return nil, utils.ErrNotImplemented @@ -910,7 +910,7 @@ func TestDispatcherServiceDispatcherProfileForEventNotNotFound(t *testing.T) { } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err := dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + _, err := dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -959,7 +959,7 @@ func TestDispatcherServiceDispatcherProfileForEventGetDispatcherError(t *testing } tnt := ev.Tenant subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys]) - _, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + _, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -1083,11 +1083,6 @@ func TestDispatcherServiceDispatchDspErrHostNotFound3(t *testing.T) { engine.Cache = cacheInit } -func (dS *DispatcherService) DispatcherServiceTest(ev *utils.CGREvent, reply *string) (error, interface{}) { - *reply = utils.Pong - return nil, nil -} - func TestDispatchersdispatcherProfileForEventAnySSfalseFirstNotFound(t *testing.T) { tmp := engine.Cache defer func() { @@ -1141,7 +1136,7 @@ func TestDispatchersdispatcherProfileForEventAnySSfalseFirstNotFound(t *testing. } subsys := utils.MetaSessionS - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + if rcv, err := dS.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -1209,7 +1204,7 @@ func TestDispatchersdispatcherProfileForEventAnySSfalseFound(t *testing.T) { } subsys := utils.MetaSessionS - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + if rcv, err := dS.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -1277,7 +1272,7 @@ func TestDispatchersdispatcherProfileForEventAnySSfalseNotFound(t *testing.T) { } subsys := utils.MetaSessionS - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + if rcv, err := dS.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -1343,7 +1338,7 @@ func TestDispatchersdispatcherProfileForEventAnySStrueNotFound(t *testing.T) { } subsys := utils.MetaSessionS - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + if rcv, err := dS.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -1409,7 +1404,7 @@ func TestDispatchersdispatcherProfileForEventAnySStrueBothFound(t *testing.T) { } subsys := utils.MetaSessionS - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + if rcv, err := dS.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ @@ -1429,7 +1424,7 @@ func TestDispatchersdispatcherProfileForEventAnySStrueBothFound(t *testing.T) { t.Error(err) } - if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{ + if rcv, err := dS.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, utils.MetaVars: utils.MapStorage{ diff --git a/dispatchers/routes_it_test.go b/dispatchers/routes_it_test.go index 99e710917..68916d642 100644 --- a/dispatchers/routes_it_test.go +++ b/dispatchers/routes_it_test.go @@ -33,15 +33,14 @@ import ( var ( sTestsDspSup = []func(t *testing.T){ testDspSupPingFailover, - // testDspSupGetSupFailover, - // testDspSupGetSupRoundRobin, + testDspSupGetSupFailover, + testDspSupGetSupRoundRobin, testDspSupPing, testDspSupTestAuthKey, - // testDspSupTestAuthKey2, + testDspSupTestAuthKey2, testDspSupGetSupplierForEvent, } - nowTime = time.Now() ) //Test start here diff --git a/engine/libengine.go b/engine/libengine.go index 99b2ec368..86edfbeeb 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -134,11 +134,3 @@ func (s RPCClientSet) Call(ctx *context.Context, method string, args interface{} } return conn.Call(ctx, method, args, reply) } - -// func (s RPCClientSet) ReconnectInternals(subsystems ...string) (err error) { -// for _, subsystem := range subsystems { -// if err = s[subsystem].Reconnect(); err != nil { -// return -// } -// } -// } diff --git a/services/attributes.go b/services/attributes.go index efae1e962..7d67cea49 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -35,7 +35,7 @@ import ( func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *cores.Server, internalChan chan birpc.ClientConnector, - anz *AnalyzerService, + anz *AnalyzerService, dspS *DispatcherService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &AttributeService{ connChan: internalChan, @@ -46,6 +46,7 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, server: server, anz: anz, srvDep: srvDep, + dspS: dspS, } } @@ -62,6 +63,7 @@ type AttributeService struct { rpc *apis.AttributeSv1 // useful on restart connChan chan birpc.ClientConnector // publish the internal Subsystem when available anz *AnalyzerService + dspS *DispatcherService srvDep map[string]*sync.WaitGroup } @@ -89,6 +91,16 @@ func (attrS *AttributeService) Start() (err error) { if !attrS.cfg.DispatcherSCfg().Enabled { attrS.server.RpcRegister(srv) } + dspShtdChan := attrS.dspS.RegisterShutdownChan(attrS.ServiceName()) + go func() { + for { + if _, closed := <-dspShtdChan; closed { + return + } + attrS.server.RpcRegister(srv) + + } + }() attrS.connChan <- attrS.anz.GetInternalCodec(srv, utils.AttributeS) return } @@ -106,6 +118,7 @@ func (attrS *AttributeService) Shutdown() (err error) { attrS.rpc = nil <-attrS.connChan attrS.server.RpcUnregisterName(utils.AttributeSv1) + attrS.dspS.UnregisterShutdownChan(attrS.ServiceName()) attrS.Unlock() return } diff --git a/services/dispatchers.go b/services/dispatchers.go index 760717000..ceec10301 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -19,14 +19,15 @@ along with this program. If not, see package services import ( + "strings" "sync" "github.com/cgrates/birpc" + "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/dispatchers" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" ) @@ -35,7 +36,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *cores.Server, internalChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + srvDep map[string]*sync.WaitGroup) *DispatcherService { return &DispatcherService{ connChan: internalChan, cfg: cfg, @@ -46,6 +47,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService, connMgr: connMgr, anz: anz, srvDep: srvDep, + srvsReload: make(map[string]chan struct{}), } } @@ -59,11 +61,12 @@ type DispatcherService struct { server *cores.Server connMgr *engine.ConnManager - dspS *dispatchers.DispatcherService - // rpc *v1.DispatcherSv1 + dspS *dispatchers.DispatcherService connChan chan birpc.ClientConnector anz *AnalyzerService srvDep map[string]*sync.WaitGroup + + srvsReload map[string]chan struct{} } // Start should handle the sercive start @@ -86,11 +89,22 @@ func (dspS *DispatcherService) Start() (err error) { dspS.dspS = dispatchers.NewDispatcherService(datadb, dspS.cfg, fltrS, dspS.connMgr) + dspS.unregisterAllDispatchedSubsystems() // unregister all rpc services that can be dispatched + + srv, _ := birpc.NewService(apis.NewDispatcherSv1(dspS.dspS), "", false) + dspS.server.RpcRegister(srv) + + attrsv1, _ := birpc.NewServiceWithMethodsRename(dspS.dspS, utils.AttributeSv1, true, func(oldFn string) (newFn string) { + if strings.HasPrefix(oldFn, utils.AttributeSv1) { + return strings.TrimPrefix(oldFn, utils.AttributeSv1) + } + return + }) + dspS.server.RpcRegisterName(utils.AttributeSv1, attrsv1) // for the moment we dispable Apier through dispatcher // until we figured out a better sollution in case of gob server // dspS.server.SetDispatched() /* - dspS.server.RpcRegister(v1.NewDispatcherSv1(dspS.dspS)) dspS.server.RpcRegisterName(utils.ThresholdSv1, v1.NewDispatcherThresholdSv1(dspS.dspS)) @@ -143,7 +157,7 @@ func (dspS *DispatcherService) Start() (err error) { dspS.server.RpcRegisterName(utils.AccountSv1, v1.NewDispatcherAccountSv1(dspS.dspS)) */ - // dspS.connChan <- dspS.anz.GetInternalCodec(dspS.dspS, utils.DispatcherS) + dspS.connChan <- dspS.anz.GetInternalCodec(srv, utils.DispatcherS) return } @@ -159,8 +173,9 @@ func (dspS *DispatcherService) Shutdown() (err error) { defer dspS.Unlock() dspS.dspS.Shutdown() dspS.dspS = nil - // dspS.rpc = nil - //<-dspS.connChan + <-dspS.connChan + dspS.unregisterAllDispatchedSubsystems() + dspS.sync() return } @@ -180,3 +195,28 @@ func (dspS *DispatcherService) ServiceName() string { func (dspS *DispatcherService) ShouldRun() bool { return dspS.cfg.DispatcherSCfg().Enabled } + +func (dspS *DispatcherService) unregisterAllDispatchedSubsystems() { + dspS.server.RpcUnregisterName(utils.AttributeSv1) +} + +func (dspS *DispatcherService) RegisterShutdownChan(subsys string) (c chan struct{}) { + c = make(chan struct{}) + dspS.Lock() + dspS.srvsReload[subsys] = c + dspS.Unlock() + return +} + +func (dspS *DispatcherService) UnregisterShutdownChan(subsys string) { + dspS.Lock() + close(dspS.srvsReload[subsys]) + delete(dspS.srvsReload, subsys) + dspS.Unlock() +} + +func (dspS *DispatcherService) sync() { + for _, c := range dspS.srvsReload { + c <- struct{}{} + } +}