Added some dispatcherS APIs

This commit is contained in:
Trial97
2021-06-14 16:55:22 +03:00
committed by Dan Christian Bogos
parent 6b0ae4859d
commit e5a1e7553a
11 changed files with 370 additions and 98 deletions

226
apis/dispatchers.go Normal file
View File

@@ -0,0 +1,226 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package apis
import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
/*
// GetDispatcherProfile returns a Dispatcher Profile
func (apierSv1 *APIerSv1) GetDispatcherProfile(arg *utils.TenantID, reply *engine.DispatcherProfile) error {
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
tnt := arg.Tenant
if tnt == utils.EmptyString {
tnt = apierSv1.Config.GeneralCfg().DefaultTenant
}
dpp, err := apierSv1.DataManager.GetDispatcherProfile(tnt, arg.ID, true, true, utils.NonTransactional)
if err != nil {
return utils.APIErrorHandler(err)
}
*reply = *dpp
return nil
}
// GetDispatcherProfileIDs returns list of dispatcherProfile IDs registered for a tenant
func (apierSv1 *APIerSv1) GetDispatcherProfileIDs(tenantArg *utils.PaginatorWithTenant, dPrfIDs *[]string) error {
tenant := tenantArg.Tenant
if tenant == utils.EmptyString {
tenant = apierSv1.Config.GeneralCfg().DefaultTenant
}
prfx := utils.DispatcherProfilePrefix + tenant + utils.ConcatenatedKeySep
keys, err := apierSv1.DataManager.DataDB().GetKeysForPrefix(prfx)
if err != nil {
return err
}
if len(keys) == 0 {
return utils.ErrNotFound
}
retIDs := make([]string, len(keys))
for i, key := range keys {
retIDs[i] = key[len(prfx):]
}
*dPrfIDs = tenantArg.PaginateStringSlice(retIDs)
return nil
}
type DispatcherWithAPIOpts struct {
*engine.DispatcherProfile
APIOpts map[string]interface{}
}
//SetDispatcherProfile add/update a new Dispatcher Profile
func (apierSv1 *APIerSv1) SetDispatcherProfile(args *DispatcherWithAPIOpts, reply *string) error {
if missing := utils.MissingStructFields(args.DispatcherProfile, []string{utils.ID, utils.Subsystems}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
if args.Tenant == utils.EmptyString {
args.Tenant = apierSv1.Config.GeneralCfg().DefaultTenant
}
if err := apierSv1.DataManager.SetDispatcherProfile(args.DispatcherProfile, true); err != nil {
return utils.APIErrorHandler(err)
}
//generate a loadID for CacheDispatcherProfiles and store it in database
if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheDispatcherProfiles: time.Now().UnixNano()}); err != nil {
return utils.APIErrorHandler(err)
}
//handle caching for DispatcherProfile
if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, utils.CacheDispatcherProfiles,
args.TenantID(), &args.FilterIDs, args.Subsystems, args.APIOpts); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return nil
}
//RemoveDispatcherProfile remove a specific Dispatcher Profile
func (apierSv1 *APIerSv1) RemoveDispatcherProfile(arg *utils.TenantIDWithAPIOpts, reply *string) error {
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
tnt := arg.Tenant
if tnt == utils.EmptyString {
tnt = apierSv1.Config.GeneralCfg().DefaultTenant
}
if err := apierSv1.DataManager.RemoveDispatcherProfile(tnt,
arg.ID, true); err != nil {
return utils.APIErrorHandler(err)
}
//generate a loadID for CacheDispatcherProfiles and store it in database
if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheDispatcherProfiles: time.Now().UnixNano()}); err != nil {
return utils.APIErrorHandler(err)
}
//handle caching for DispatcherProfile
if err := apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), tnt, utils.CacheDispatcherProfiles,
utils.ConcatenatedKey(tnt, arg.ID), nil, nil, arg.APIOpts); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return nil
}
// GetDispatcherHost returns a Dispatcher Host
func (apierSv1 *APIerSv1) GetDispatcherHost(arg *utils.TenantID, reply *engine.DispatcherHost) error {
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
tnt := arg.Tenant
if tnt == utils.EmptyString {
tnt = apierSv1.Config.GeneralCfg().DefaultTenant
}
dpp, err := apierSv1.DataManager.GetDispatcherHost(tnt, arg.ID, true, false, utils.NonTransactional)
if err != nil {
return utils.APIErrorHandler(err)
}
*reply = *dpp
return nil
}
// GetDispatcherHostIDs returns list of dispatcherHost IDs registered for a tenant
func (apierSv1 *APIerSv1) GetDispatcherHostIDs(tenantArg *utils.PaginatorWithTenant, dPrfIDs *[]string) error {
tenant := tenantArg.Tenant
if tenant == utils.EmptyString {
tenant = apierSv1.Config.GeneralCfg().DefaultTenant
}
prfx := utils.DispatcherHostPrefix + tenant + utils.ConcatenatedKeySep
keys, err := apierSv1.DataManager.DataDB().GetKeysForPrefix(prfx)
if err != nil {
return err
}
if len(keys) == 0 {
return utils.ErrNotFound
}
retIDs := make([]string, len(keys))
for i, key := range keys {
retIDs[i] = key[len(prfx):]
}
*dPrfIDs = tenantArg.PaginateStringSlice(retIDs)
return nil
}
//SetDispatcherHost add/update a new Dispatcher Host
func (apierSv1 *APIerSv1) SetDispatcherHost(args *engine.DispatcherHostWithAPIOpts, reply *string) error {
if missing := utils.MissingStructFields(args.DispatcherHost, []string{utils.ID}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
if args.Tenant == utils.EmptyString {
args.Tenant = apierSv1.Config.GeneralCfg().DefaultTenant
}
if err := apierSv1.DataManager.SetDispatcherHost(args.DispatcherHost); err != nil {
return utils.APIErrorHandler(err)
}
//generate a loadID for CacheDispatcherHosts and store it in database
if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheDispatcherHosts: time.Now().UnixNano()}); err != nil {
return utils.APIErrorHandler(err)
}
//handle caching for DispatcherProfile
if err := apierSv1.CallCache(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), args.Tenant, utils.CacheDispatcherHosts,
args.TenantID(), nil, nil, args.APIOpts); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return nil
}
//RemoveDispatcherHost remove a specific Dispatcher Host
func (apierSv1 *APIerSv1) RemoveDispatcherHost(arg *utils.TenantIDWithAPIOpts, reply *string) error {
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
tnt := arg.Tenant
if tnt == utils.EmptyString {
tnt = apierSv1.Config.GeneralCfg().DefaultTenant
}
if err := apierSv1.DataManager.RemoveDispatcherHost(tnt,
arg.ID); err != nil {
return utils.APIErrorHandler(err)
}
//generate a loadID for CacheDispatcherHosts and store it in database
if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{utils.CacheDispatcherHosts: time.Now().UnixNano()}); err != nil {
return utils.APIErrorHandler(err)
}
//handle caching for DispatcherProfile
if err := apierSv1.CallCache(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), tnt, utils.CacheDispatcherHosts,
utils.ConcatenatedKey(tnt, arg.ID), nil, nil, arg.APIOpts); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return nil
}
*/
func NewDispatcherSv1(dS *dispatchers.DispatcherService) *DispatcherSv1 {
return &DispatcherSv1{dS: dS}
}
type DispatcherSv1 struct {
dS *dispatchers.DispatcherService
ping
}
// GetProfileForEvent returns the matching dispatcher profile for the provided event
func (dSv1 DispatcherSv1) GetProfilesForEvent(ctx *context.Context, ev *utils.CGREvent,
dPrfl *engine.DispatcherProfiles) error {
return dSv1.dS.V1GetProfilesForEvent(ctx, ev, dPrfl)
}

View File

@@ -632,8 +632,8 @@ func main() {
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, shdChan, shdWg, connManager)
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, anz, srvDep)
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager, anz, srvDep)
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, anz, dspS, srvDep)
dspH := services.NewRegistrarCService(cfg, server, connManager, anz, srvDep)
chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server,
internalChargerSChan, connManager, anz, srvDep)

View File

@@ -19,31 +19,19 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package dispatchers
import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// AttributeSv1Ping interrogates AttributeS server responsible to process the event
func (dS *DispatcherService) AttributeSv1Ping(args *utils.CGREvent,
func (dS *DispatcherService) AttributeSv1Ping(ctx *context.Context, args *utils.CGREvent,
reply *string) (err error) {
if args == nil {
args = new(utils.CGREvent)
}
tnt := dS.cfg.GeneralCfg().DefaultTenant
if args.Tenant != utils.EmptyString {
tnt = args.Tenant
}
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if err = dS.authorize(utils.AttributeSv1Ping, tnt,
utils.IfaceAsString(args.APIOpts[utils.OptsAPIKey])); err != nil {
return
}
}
return dS.Dispatch(args, utils.MetaAttributes, utils.AttributeSv1Ping, args, reply)
return dS.ping(ctx, utils.MetaAttributes, utils.AttributeSv1Ping, args, reply)
}
// AttributeSv1GetAttributeForEvent is the dispatcher method for AttributeSv1.GetAttributeForEvent
func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(args *engine.AttrArgsProcessEvent,
func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(ctx *context.Context, args *engine.AttrArgsProcessEvent,
reply *engine.AttributeProfile) (err error) {
tnt := dS.cfg.GeneralCfg().DefaultTenant
if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString {
@@ -59,7 +47,7 @@ func (dS *DispatcherService) AttributeSv1GetAttributeForEvent(args *engine.AttrA
}
// AttributeSv1ProcessEvent .
func (dS *DispatcherService) AttributeSv1ProcessEvent(args *engine.AttrArgsProcessEvent,
func (dS *DispatcherService) AttributeSv1ProcessEvent(ctx *context.Context, args *engine.AttrArgsProcessEvent,
reply *engine.AttrSProcessEventReply) (err error) {
tnt := dS.cfg.GeneralCfg().DefaultTenant
if args.CGREvent != nil && args.CGREvent.Tenant != utils.EmptyString {

View File

@@ -21,6 +21,7 @@ package dispatchers
import (
"testing"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -31,7 +32,7 @@ func TestDspAttributeSv1PingError(t *testing.T) {
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
cgrEvent := &utils.CGREvent{}
var reply *string
err := dspSrv.AttributeSv1Ping(cgrEvent, reply)
err := dspSrv.AttributeSv1Ping(context.Background(), cgrEvent, reply)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -45,7 +46,7 @@ func TestDspAttributeSv1PingErrorTenant(t *testing.T) {
Tenant: "tenant",
}
var reply *string
err := dspSrv.AttributeSv1Ping(cgrEvent, reply)
err := dspSrv.AttributeSv1Ping(context.Background(), cgrEvent, reply)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -56,7 +57,7 @@ func TestDspAttributeSv1PingErrorNil(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
var reply *string
err := dspSrv.AttributeSv1Ping(nil, reply)
err := dspSrv.AttributeSv1Ping(context.Background(), nil, reply)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -72,7 +73,7 @@ func TestDspAttributeSv1PingErrorAttributeSConns(t *testing.T) {
ID: "ID",
}
var reply *string
err := dspSrv.AttributeSv1Ping(cgrEvent, reply)
err := dspSrv.AttributeSv1Ping(context.Background(), cgrEvent, reply)
expected := "MANDATORY_IE_MISSING: [ApiKey]"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -87,7 +88,7 @@ func TestDspAttributeSv1GetAttributeForEventError(t *testing.T) {
CGREvent: &utils.CGREvent{},
}
var reply *engine.AttributeProfile
err := dspSrv.AttributeSv1GetAttributeForEvent(processEvent, reply)
err := dspSrv.AttributeSv1GetAttributeForEvent(context.Background(), processEvent, reply)
expected := "MANDATORY_IE_MISSING: [ApiKey]"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -104,7 +105,7 @@ func TestDspAttributeSv1GetAttributeForEventErrorTenant(t *testing.T) {
},
}
var reply *engine.AttributeProfile
err := dspSrv.AttributeSv1GetAttributeForEvent(processEvent, reply)
err := dspSrv.AttributeSv1GetAttributeForEvent(context.Background(), processEvent, reply)
expected := "MANDATORY_IE_MISSING: [ApiKey]"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -121,7 +122,7 @@ func TestDspAttributeSv1GetAttributeForEventErrorAttributeS(t *testing.T) {
}
var reply *engine.AttributeProfile
err := dspSrv.AttributeSv1GetAttributeForEvent(processEvent, reply)
err := dspSrv.AttributeSv1GetAttributeForEvent(context.Background(), processEvent, reply)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -138,7 +139,7 @@ func TestDspAttributeSv1ProcessEventError(t *testing.T) {
}
var reply *engine.AttrSProcessEventReply
err := dspSrv.AttributeSv1ProcessEvent(processEvent, reply)
err := dspSrv.AttributeSv1ProcessEvent(context.Background(), processEvent, reply)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -156,7 +157,7 @@ func TestDspAttributeSv1ProcessEventErrorAttributeSConns(t *testing.T) {
}
var reply *engine.AttrSProcessEventReply
err := dspSrv.AttributeSv1ProcessEvent(processEvent, reply)
err := dspSrv.AttributeSv1ProcessEvent(context.Background(), processEvent, reply)
expected := "MANDATORY_IE_MISSING: [ApiKey]"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)

View File

@@ -31,29 +31,29 @@ import (
var (
sTestsDspCDRs = []func(t *testing.T){
testDspCDRsPing,
// testDspCDRsProcessEvent,
// testDspCDRsCountCDR,
// testDspCDRsGetCDR,
// testDspCDRsGetCDRWithoutTenant,
// testDspCDRsProcessCDR,
// testDspCDRsGetCDR2,
// testDspCDRsProcessExternalCDR,
// testDspCDRsGetCDR3,
// testDspCDRsV2ProcessEvent,
testDspCDRsProcessEvent,
testDspCDRsCountCDR,
testDspCDRsGetCDR,
testDspCDRsGetCDRWithoutTenant,
testDspCDRsProcessCDR,
testDspCDRsGetCDR2,
testDspCDRsProcessExternalCDR,
testDspCDRsGetCDR3,
testDspCDRsV2ProcessEvent,
// testDspCDRsV2StoreSessionCost,
}
sTestsDspCDRsWithoutAuth = []func(t *testing.T){
testDspCDRsPingNoAuth,
// testDspCDRsProcessEventNoAuth,
// testDspCDRsCountCDRNoAuth,
// testDspCDRsGetCDRNoAuth,
// testDspCDRsGetCDRNoAuthWithoutTenant,
// testDspCDRsProcessCDRNoAuth,
// testDspCDRsGetCDR2NoAuth,
// testDspCDRsProcessExternalCDRNoAuth,
// testDspCDRsGetCDR3NoAuth,
// testDspCDRsV2ProcessEventNoAuth,
testDspCDRsProcessEventNoAuth,
testDspCDRsCountCDRNoAuth,
testDspCDRsGetCDRNoAuth,
testDspCDRsGetCDRNoAuthWithoutTenant,
testDspCDRsProcessCDRNoAuth,
testDspCDRsGetCDR2NoAuth,
testDspCDRsProcessExternalCDRNoAuth,
testDspCDRsGetCDR3NoAuth,
testDspCDRsV2ProcessEventNoAuth,
// testDspCDRsV2StoreSessionCostNoAuth,
}
)

View File

@@ -101,11 +101,11 @@ func (dS *DispatcherService) authorize(method, tenant string, apiKey string) (er
// dispatcherForEvent returns a dispatcher instance configured for specific event
// or utils.ErrNotFound if none present
func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CGREvent,
func (dS *DispatcherService) dispatcherProfilesForEvent(ctx *context.Context, tnt string, ev *utils.CGREvent,
evNm utils.MapStorage) (dPrlfs engine.DispatcherProfiles, err error) {
// find out the matching profiles
var prflIDs utils.StringSet
if prflIDs, err = engine.MatchingItemIDsForEvent(context.TODO(), evNm,
if prflIDs, err = engine.MatchingItemIDsForEvent(ctx, evNm,
dS.cfg.DispatcherSCfg().StringIndexedFields,
dS.cfg.DispatcherSCfg().PrefixIndexedFields,
dS.cfg.DispatcherSCfg().SuffixIndexedFields,
@@ -116,7 +116,7 @@ func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CG
return
}
for prflID := range prflIDs {
prfl, err := dS.dm.GetDispatcherProfile(context.TODO(), tnt, prflID, true, true, utils.NonTransactional)
prfl, err := dS.dm.GetDispatcherProfile(ctx, tnt, prflID, true, true, utils.NonTransactional)
if err != nil {
if err != utils.ErrNotFound {
return nil, err
@@ -124,7 +124,7 @@ func (dS *DispatcherService) dispatcherProfilesForEvent(tnt string, ev *utils.CG
continue
}
if pass, err := dS.fltrS.Pass(context.TODO(), tnt, prfl.FilterIDs,
if pass, err := dS.fltrS.Pass(ctx, tnt, prfl.FilterIDs,
evNm); err != nil {
return nil, err
} else if !pass {
@@ -165,7 +165,7 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
},
}
var dPrfls engine.DispatcherProfiles
if dPrfls, err = dS.dispatcherProfilesForEvent(tnt, ev, evNm); err != nil {
if dPrfls, err = dS.dispatcherProfilesForEvent(context.TODO(), tnt, ev, evNm); err != nil {
return utils.NewErrDispatcherS(err)
}
for _, dPrfl := range dPrfls {
@@ -188,13 +188,13 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
return // return the last error
}
func (dS *DispatcherService) V1GetProfilesForEvent(ev *utils.CGREvent,
func (dS *DispatcherService) V1GetProfilesForEvent(ctx *context.Context, ev *utils.CGREvent,
dPfl *engine.DispatcherProfiles) (err error) {
tnt := ev.Tenant
if tnt == utils.EmptyString {
tnt = dS.cfg.GeneralCfg().DefaultTenant
}
retDPfl, errDpfl := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
retDPfl, errDpfl := dS.dispatcherProfilesForEvent(ctx, tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -208,3 +208,21 @@ func (dS *DispatcherService) V1GetProfilesForEvent(ev *utils.CGREvent,
*dPfl = retDPfl
return
}
func (dS *DispatcherService) ping(ctx *context.Context, subsys, method string, args *utils.CGREvent,
reply *string) (err error) {
if args == nil {
args = new(utils.CGREvent)
}
tnt := dS.cfg.GeneralCfg().DefaultTenant
if args.Tenant != utils.EmptyString {
tnt = args.Tenant
}
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if err = dS.authorize(method, tnt,
utils.IfaceAsString(args.APIOpts[utils.OptsAPIKey])); err != nil {
return
}
}
return dS.Dispatch(args, subsys, method, args, reply)
}

View File

@@ -79,7 +79,7 @@ func TestDispatcherServiceDispatcherProfileForEventGetDispatcherProfileNF(t *tes
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
_, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -103,7 +103,7 @@ func TestDispatcherServiceDispatcherProfileForEventMIIDENotFound(t *testing.T) {
ev := &utils.CGREvent{}
tnt := ""
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err := dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
_, err := dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -156,7 +156,7 @@ func TestDispatcherV1GetProfileForEventErr(t *testing.T) {
dsp := NewDispatcherService(nil, cfg, nil, nil)
ev := &utils.CGREvent{}
dPfl := &engine.DispatcherProfiles{}
err := dsp.V1GetProfilesForEvent(ev, dPfl)
err := dsp.V1GetProfilesForEvent(context.Background(), ev, dPfl)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -169,7 +169,7 @@ func TestDispatcherV1GetProfileForEvent(t *testing.T) {
dsp := NewDispatcherService(nil, cfg, nil, nil)
ev := &utils.CGREvent{}
dPfl := &engine.DispatcherProfiles{}
err := dsp.V1GetProfilesForEvent(ev, dPfl)
err := dsp.V1GetProfilesForEvent(context.Background(), ev, dPfl)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if err == nil || err.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -527,7 +527,7 @@ func TestDispatcherServiceDispatcherProfileForEventErrNil(t *testing.T) {
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
_, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -574,7 +574,7 @@ func TestDispatcherV1GetProfileForEventReturn(t *testing.T) {
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
_, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -585,7 +585,7 @@ func TestDispatcherV1GetProfileForEventReturn(t *testing.T) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err)
}
dPfl := &engine.DispatcherProfiles{}
err = dss.V1GetProfilesForEvent(ev, dPfl)
err = dss.V1GetProfilesForEvent(context.Background(), ev, dPfl)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if err != nil {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err)
@@ -627,7 +627,7 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFound(t *testing.T) {
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
_, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -674,7 +674,7 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFound2(t *testing.T) {
}
tnt := ""
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
_, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -722,7 +722,7 @@ func TestDispatcherServiceDispatcherProfileForEventErrNotFoundFilter(t *testing.
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
_, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -864,7 +864,7 @@ func TestDispatcherServiceDispatcherProfileForEventFoundFilter(t *testing.T) {
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
_, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -888,7 +888,7 @@ func TestDispatcherServiceDispatcherProfileForEventNotNotFound(t *testing.T) {
if cnt == 0 {
cnt++
return map[string]utils.StringSet{
idxKey: utils.StringSet{"cgrates.org:dsp1": {}},
idxKey: {"cgrates.org:dsp1": {}},
}, nil
}
return nil, utils.ErrNotImplemented
@@ -910,7 +910,7 @@ func TestDispatcherServiceDispatcherProfileForEventNotNotFound(t *testing.T) {
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err := dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
_, err := dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -959,7 +959,7 @@ func TestDispatcherServiceDispatcherProfileForEventGetDispatcherError(t *testing
}
tnt := ev.Tenant
subsys := utils.IfaceAsString(ev.APIOpts[utils.Subsys])
_, err = dss.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
_, err = dss.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -1083,11 +1083,6 @@ func TestDispatcherServiceDispatchDspErrHostNotFound3(t *testing.T) {
engine.Cache = cacheInit
}
func (dS *DispatcherService) DispatcherServiceTest(ev *utils.CGREvent, reply *string) (error, interface{}) {
*reply = utils.Pong
return nil, nil
}
func TestDispatchersdispatcherProfileForEventAnySSfalseFirstNotFound(t *testing.T) {
tmp := engine.Cache
defer func() {
@@ -1141,7 +1136,7 @@ func TestDispatchersdispatcherProfileForEventAnySSfalseFirstNotFound(t *testing.
}
subsys := utils.MetaSessionS
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
if rcv, err := dS.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -1209,7 +1204,7 @@ func TestDispatchersdispatcherProfileForEventAnySSfalseFound(t *testing.T) {
}
subsys := utils.MetaSessionS
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
if rcv, err := dS.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -1277,7 +1272,7 @@ func TestDispatchersdispatcherProfileForEventAnySSfalseNotFound(t *testing.T) {
}
subsys := utils.MetaSessionS
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
if rcv, err := dS.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -1343,7 +1338,7 @@ func TestDispatchersdispatcherProfileForEventAnySStrueNotFound(t *testing.T) {
}
subsys := utils.MetaSessionS
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
if rcv, err := dS.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -1409,7 +1404,7 @@ func TestDispatchersdispatcherProfileForEventAnySStrueBothFound(t *testing.T) {
}
subsys := utils.MetaSessionS
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
if rcv, err := dS.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{
@@ -1429,7 +1424,7 @@ func TestDispatchersdispatcherProfileForEventAnySStrueBothFound(t *testing.T) {
t.Error(err)
}
if rcv, err := dS.dispatcherProfilesForEvent(tnt, ev, utils.MapStorage{
if rcv, err := dS.dispatcherProfilesForEvent(context.Background(), tnt, ev, utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
utils.MetaVars: utils.MapStorage{

View File

@@ -33,15 +33,14 @@ import (
var (
sTestsDspSup = []func(t *testing.T){
testDspSupPingFailover,
// testDspSupGetSupFailover,
// testDspSupGetSupRoundRobin,
testDspSupGetSupFailover,
testDspSupGetSupRoundRobin,
testDspSupPing,
testDspSupTestAuthKey,
// testDspSupTestAuthKey2,
testDspSupTestAuthKey2,
testDspSupGetSupplierForEvent,
}
nowTime = time.Now()
)
//Test start here

View File

@@ -134,11 +134,3 @@ func (s RPCClientSet) Call(ctx *context.Context, method string, args interface{}
}
return conn.Call(ctx, method, args, reply)
}
// func (s RPCClientSet) ReconnectInternals(subsystems ...string) (err error) {
// for _, subsystem := range subsystems {
// if err = s[subsystem].Reconnect(); err != nil {
// return
// }
// }
// }

View File

@@ -35,7 +35,7 @@ import (
func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *cores.Server, internalChan chan birpc.ClientConnector,
anz *AnalyzerService,
anz *AnalyzerService, dspS *DispatcherService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &AttributeService{
connChan: internalChan,
@@ -46,6 +46,7 @@ func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService,
server: server,
anz: anz,
srvDep: srvDep,
dspS: dspS,
}
}
@@ -62,6 +63,7 @@ type AttributeService struct {
rpc *apis.AttributeSv1 // useful on restart
connChan chan birpc.ClientConnector // publish the internal Subsystem when available
anz *AnalyzerService
dspS *DispatcherService
srvDep map[string]*sync.WaitGroup
}
@@ -89,6 +91,16 @@ func (attrS *AttributeService) Start() (err error) {
if !attrS.cfg.DispatcherSCfg().Enabled {
attrS.server.RpcRegister(srv)
}
dspShtdChan := attrS.dspS.RegisterShutdownChan(attrS.ServiceName())
go func() {
for {
if _, closed := <-dspShtdChan; closed {
return
}
attrS.server.RpcRegister(srv)
}
}()
attrS.connChan <- attrS.anz.GetInternalCodec(srv, utils.AttributeS)
return
}
@@ -106,6 +118,7 @@ func (attrS *AttributeService) Shutdown() (err error) {
attrS.rpc = nil
<-attrS.connChan
attrS.server.RpcUnregisterName(utils.AttributeSv1)
attrS.dspS.UnregisterShutdownChan(attrS.ServiceName())
attrS.Unlock()
return
}

View File

@@ -19,14 +19,15 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"strings"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
@@ -35,7 +36,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *cores.Server, internalChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
srvDep map[string]*sync.WaitGroup) *DispatcherService {
return &DispatcherService{
connChan: internalChan,
cfg: cfg,
@@ -46,6 +47,7 @@ func NewDispatcherService(cfg *config.CGRConfig, dm *DataDBService,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
srvsReload: make(map[string]chan struct{}),
}
}
@@ -59,11 +61,12 @@ type DispatcherService struct {
server *cores.Server
connMgr *engine.ConnManager
dspS *dispatchers.DispatcherService
// rpc *v1.DispatcherSv1
dspS *dispatchers.DispatcherService
connChan chan birpc.ClientConnector
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
srvsReload map[string]chan struct{}
}
// Start should handle the sercive start
@@ -86,11 +89,22 @@ func (dspS *DispatcherService) Start() (err error) {
dspS.dspS = dispatchers.NewDispatcherService(datadb, dspS.cfg, fltrS, dspS.connMgr)
dspS.unregisterAllDispatchedSubsystems() // unregister all rpc services that can be dispatched
srv, _ := birpc.NewService(apis.NewDispatcherSv1(dspS.dspS), "", false)
dspS.server.RpcRegister(srv)
attrsv1, _ := birpc.NewServiceWithMethodsRename(dspS.dspS, utils.AttributeSv1, true, func(oldFn string) (newFn string) {
if strings.HasPrefix(oldFn, utils.AttributeSv1) {
return strings.TrimPrefix(oldFn, utils.AttributeSv1)
}
return
})
dspS.server.RpcRegisterName(utils.AttributeSv1, attrsv1)
// for the moment we dispable Apier through dispatcher
// until we figured out a better sollution in case of gob server
// dspS.server.SetDispatched()
/*
dspS.server.RpcRegister(v1.NewDispatcherSv1(dspS.dspS))
dspS.server.RpcRegisterName(utils.ThresholdSv1,
v1.NewDispatcherThresholdSv1(dspS.dspS))
@@ -143,7 +157,7 @@ func (dspS *DispatcherService) Start() (err error) {
dspS.server.RpcRegisterName(utils.AccountSv1,
v1.NewDispatcherAccountSv1(dspS.dspS))
*/
// dspS.connChan <- dspS.anz.GetInternalCodec(dspS.dspS, utils.DispatcherS)
dspS.connChan <- dspS.anz.GetInternalCodec(srv, utils.DispatcherS)
return
}
@@ -159,8 +173,9 @@ func (dspS *DispatcherService) Shutdown() (err error) {
defer dspS.Unlock()
dspS.dspS.Shutdown()
dspS.dspS = nil
// dspS.rpc = nil
//<-dspS.connChan
<-dspS.connChan
dspS.unregisterAllDispatchedSubsystems()
dspS.sync()
return
}
@@ -180,3 +195,28 @@ func (dspS *DispatcherService) ServiceName() string {
func (dspS *DispatcherService) ShouldRun() bool {
return dspS.cfg.DispatcherSCfg().Enabled
}
func (dspS *DispatcherService) unregisterAllDispatchedSubsystems() {
dspS.server.RpcUnregisterName(utils.AttributeSv1)
}
func (dspS *DispatcherService) RegisterShutdownChan(subsys string) (c chan struct{}) {
c = make(chan struct{})
dspS.Lock()
dspS.srvsReload[subsys] = c
dspS.Unlock()
return
}
func (dspS *DispatcherService) UnregisterShutdownChan(subsys string) {
dspS.Lock()
close(dspS.srvsReload[subsys])
delete(dspS.srvsReload, subsys)
dspS.Unlock()
}
func (dspS *DispatcherService) sync() {
for _, c := range dspS.srvsReload {
c <- struct{}{}
}
}