Added replication for index functions

This commit is contained in:
Trial97
2020-06-05 11:56:20 +03:00
committed by Dan Christian Bogos
parent 760b242bd2
commit c84ae4e49b
7 changed files with 223 additions and 86 deletions

View File

@@ -255,4 +255,8 @@ type ReplicatorSv1Interface interface {
RemoveDispatcherProfile(args *utils.TenantIDWithArgDispatcher, reply *string) error
RemoveDispatcherHost(args *utils.TenantIDWithArgDispatcher, reply *string) error
RemoveRateProfile(args *utils.TenantIDWithArgDispatcher, reply *string) error
GetIndexes(args *utils.GetIndexesArg, reply *map[string]utils.StringSet) error
SetIndexes(args *utils.SetIndexesArg, reply *string) error
RemoveIndexes(args *utils.GetIndexesArg, reply *string) error
}

View File

@@ -1254,3 +1254,18 @@ func (dS *DispatcherReplicatorSv1) RemoveDispatcherHost(args *utils.TenantIDWith
func (dS *DispatcherReplicatorSv1) RemoveRateProfile(args *utils.TenantIDWithArgDispatcher, reply *string) error {
return dS.dS.ReplicatorSv1RemoveRateProfile(args, reply)
}
// GetIndexes .
func (dS *DispatcherReplicatorSv1) GetIndexes(args *utils.GetIndexesArg, reply *map[string]utils.StringSet) error {
return dS.dS.ReplicatorSv1GetIndexes(args, reply)
}
// SetIndexes .
func (dS *DispatcherReplicatorSv1) SetIndexes(args *utils.SetIndexesArg, reply *string) error {
return dS.dS.ReplicatorSv1SetIndexes(args, reply)
}
// RemoveIndexes .
func (dS *DispatcherReplicatorSv1) RemoveIndexes(args *utils.GetIndexesArg, reply *string) error {
return dS.dS.ReplicatorSv1RemoveIndexes(args, reply)
}

View File

@@ -750,3 +750,31 @@ func (rplSv1 *ReplicatorSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *s
*reply = utils.Pong
return nil
}
// GetIndexes .
func (rplSv1 *ReplicatorSv1) GetIndexes(args *utils.GetIndexesArg, reply *map[string]utils.StringSet) error {
indx, err := rplSv1.dm.DataDB().GetIndexesDrv(args.IdxItmType, args.TntCtx, args.IdxKey)
if err != nil {
return err
}
*reply = indx
return nil
}
// SetIndexes .
func (rplSv1 *ReplicatorSv1) SetIndexes(args *utils.SetIndexesArg, reply *string) error {
if err := rplSv1.dm.DataDB().SetIndexesDrv(args.IdxItmType, args.TntCtx, args.Indexes, true, utils.NonTransactional); err != nil {
return err
}
*reply = utils.OK
return nil
}
// RemoveIndexes .
func (rplSv1 *ReplicatorSv1) RemoveIndexes(args *utils.GetIndexesArg, reply *string) error {
if err := rplSv1.dm.DataDB().RemoveIndexesDrv(args.IdxItmType, args.TntCtx, args.IdxKey); err != nil {
return err
}
*reply = utils.OK
return nil
}

View File

@@ -1727,3 +1727,72 @@ func (dS *DispatcherService) ReplicatorSv1RemoveRateProfile(args *utils.TenantID
return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaReplicator, routeID,
utils.ReplicatorSv1RemoveRateProfile, args, rpl)
}
// ReplicatorSv1GetIndexes .
func (dS *DispatcherService) ReplicatorSv1GetIndexes(args *utils.GetIndexesArg, reply *map[string]utils.StringSet) (err error) {
if args == nil {
args = &utils.GetIndexesArg{}
}
args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
if err = dS.authorize(utils.ReplicatorSv1GetIndexes, args.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
var routeID *string
if args.ArgDispatcher != nil {
routeID = args.ArgDispatcher.RouteID
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaReplicator, routeID,
utils.ReplicatorSv1GetIndexes, args, reply)
}
// ReplicatorSv1SetIndexes .
func (dS *DispatcherService) ReplicatorSv1SetIndexes(args *utils.SetIndexesArg, reply *string) (err error) {
if args == nil {
args = &utils.SetIndexesArg{}
}
args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
if err = dS.authorize(utils.ReplicatorSv1SetIndexes, args.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
var routeID *string
if args.ArgDispatcher != nil {
routeID = args.ArgDispatcher.RouteID
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaReplicator, routeID,
utils.ReplicatorSv1SetIndexes, args, reply)
}
// ReplicatorSv1RemoveIndexes .
func (dS *DispatcherService) ReplicatorSv1RemoveIndexes(args *utils.GetIndexesArg, reply *string) (err error) {
if args == nil {
args = &utils.GetIndexesArg{}
}
args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
if err = dS.authorize(utils.ReplicatorSv1RemoveIndexes, args.Tenant,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
var routeID *string
if args.ArgDispatcher != nil {
routeID = args.ArgDispatcher.RouteID
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaReplicator, routeID,
utils.ReplicatorSv1RemoveIndexes, args, reply)
}

View File

@@ -3301,35 +3301,33 @@ func (dm *DataManager) GetIndexes(idxItmType, tntCtx, idxKey string,
}
}
if indexes, err = dm.DataDB().GetIndexesDrv(idxItmType, tntCtx, idxKey); err != nil {
// if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; err == utils.ErrNotFound && itm.Remote {
// if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil,
// utils.ReplicatorSv1GetFilterIndexes,
// &utils.GetFilterIndexesArgWithArgDispatcher{
// GetFilterIndexesArg: &utils.GetFilterIndexesArg{
// CacheID: cacheID,
// ItemIDPrefix: key,
// FilterType: filterType,
// FldNameVal: fldNameVal},
// TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant},
// ArgDispatcher: &utils.ArgDispatcher{
// APIKey: utils.StringPointer(itm.APIKey),
// RouteID: utils.StringPointer(itm.RouteID)},
// }, &indexes); err == nil {
// err = dm.dataDB.SetFilterIndexesDrv(cacheID, key, indexes, true, utils.NonTransactional)
// }
// }
// if err != nil {
// err = utils.CastRPCErr(err)
if err == utils.ErrNotFound {
if idxKey != utils.EmptyString {
if errCh := Cache.Set(idxItmType, cachekey, nil, nil,
true, utils.NonTransactional); errCh != nil {
return nil, errCh
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaIndexes]; err == utils.ErrNotFound && itm.Remote {
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil,
utils.ReplicatorSv1GetIndexes,
&utils.GetIndexesArg{
IdxItmType: idxItmType,
TntCtx: tntCtx,
IdxKey: idxKey,
TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant},
ArgDispatcher: &utils.ArgDispatcher{
APIKey: utils.StringPointer(itm.APIKey),
RouteID: utils.StringPointer(itm.RouteID)},
}, &indexes); err == nil {
err = dm.dataDB.SetIndexesDrv(idxItmType, tntCtx, indexes, true, utils.NonTransactional)
}
}
return nil, err
// }
if err != nil {
err = utils.CastRPCErr(err)
if err == utils.ErrNotFound {
if idxKey != utils.EmptyString {
if errCh := Cache.Set(idxItmType, cachekey, nil, nil,
true, utils.NonTransactional); errCh != nil {
return nil, errCh
}
}
}
return nil, err
}
}
for k, v := range indexes {
@@ -3352,24 +3350,22 @@ func (dm *DataManager) SetIndexes(idxItmType, tntCtx string,
indexes, commit, transactionID); err != nil {
return
}
// if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; itm.Replicate {
// var reply string
// if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
// utils.ReplicatorSv1SetFilterIndexes,
// &utils.SetFilterIndexesArgWithArgDispatcher{
// SetFilterIndexesArg: &utils.SetFilterIndexesArg{
// CacheID: cacheID,
// ItemIDPrefix: itemIDPrefix,
// Indexes: indexes},
// TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant},
// ArgDispatcher: &utils.ArgDispatcher{
// APIKey: utils.StringPointer(itm.APIKey),
// RouteID: utils.StringPointer(itm.RouteID)},
// }, &reply); err != nil {
// err = utils.CastRPCErr(err)
// return
// }
// }
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaIndexes]; itm.Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1SetIndexes,
&utils.SetIndexesArg{
IdxItmType: idxItmType,
TntCtx: tntCtx,
Indexes: indexes,
TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant},
ArgDispatcher: &utils.ArgDispatcher{
APIKey: utils.StringPointer(itm.APIKey),
RouteID: utils.StringPointer(itm.RouteID)},
}, &reply); err != nil {
err = utils.CastRPCErr(err)
}
}
return
}
@@ -3378,21 +3374,24 @@ func (dm *DataManager) RemoveIndexes(idxItmType, tntCtx, idxKey string) (err err
err = utils.ErrNoDatabaseConn
return
}
return dm.DataDB().RemoveIndexesDrv(idxItmType, tntCtx, idxKey)
// if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; itm.Replicate {
// var reply string
// dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
// utils.ReplicatorSv1RemoveFilterIndexes,
// &utils.SetFilterIndexesArgWithArgDispatcher{
// SetFilterIndexesArg: &utils.SetFilterIndexesArg{
// CacheID: cacheID,
// ItemIDPrefix: itemIDPrefix,
// Indexes: indexes},
// TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant},
// ArgDispatcher: &utils.ArgDispatcher{
// APIKey: utils.StringPointer(itm.APIKey),
// RouteID: utils.StringPointer(itm.RouteID)},
// }, &reply); err != nil {
// err = utils.CastRPCErr(err)
// }
if err = dm.DataDB().RemoveIndexesDrv(idxItmType, tntCtx, idxKey); err != nil {
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaIndexes]; itm.Replicate {
var reply string
if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil,
utils.ReplicatorSv1RemoveIndexes,
&utils.GetIndexesArg{
IdxItmType: idxItmType,
TntCtx: tntCtx,
IdxKey: idxKey,
TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant},
ArgDispatcher: &utils.ArgDispatcher{
APIKey: utils.StringPointer(itm.APIKey),
RouteID: utils.StringPointer(itm.RouteID)},
}, &reply); err != nil {
err = utils.CastRPCErr(err)
}
}
return
}

View File

@@ -786,6 +786,7 @@ const (
MetaRouteProfiles = "*route_profiles"
MetaAttributeProfiles = "*attribute_profiles"
MetaFilterIndexes = "*filter_indexes"
MetaIndexes = "*indexes"
MetaDispatcherProfiles = "*dispatcher_profiles"
MetaRateProfiles = "*rate_profiles"
MetaChargerProfiles = "*charger_profiles"
@@ -1079,6 +1080,9 @@ const (
ReplicatorSv1RemoveDispatcherProfile = "ReplicatorSv1.RemoveDispatcherProfile"
ReplicatorSv1RemoveRateProfile = "ReplicatorSv1.RemoveRateProfile"
ReplicatorSv1RemoveDispatcherHost = "ReplicatorSv1.RemoveDispatcherHost"
ReplicatorSv1GetIndexes = "ReplicatorSv1.GetIndexes"
ReplicatorSv1SetIndexes = "ReplicatorSv1.SetIndexes"
ReplicatorSv1RemoveIndexes = "ReplicatorSv1.RemoveIndexes"
)
// APIerSv1 APIs

View File

@@ -881,6 +881,30 @@ type SetFilterIndexesArg struct {
Indexes map[string]StringMap
}
type GetFilterIndexesArgWithArgDispatcher struct {
*GetFilterIndexesArg
TenantArg
*ArgDispatcher
}
type MatchFilterIndexArgWithArgDispatcher struct {
*MatchFilterIndexArg
TenantArg
*ArgDispatcher
}
type SetFilterIndexesArgWithArgDispatcher struct {
*SetFilterIndexesArg
TenantArg
*ArgDispatcher
}
type StringWithApiKey struct {
*ArgDispatcher
TenantArg
Arg string
}
func CastRPCErr(err error) error {
if _, has := ErrMap[err.Error()]; has {
return ErrMap[err.Error()]
@@ -894,30 +918,6 @@ func RandomInteger(min, max int) int {
return math_rand.Intn(max-min) + min
}
type GetFilterIndexesArgWithArgDispatcher struct {
*GetFilterIndexesArg
TenantArg
*ArgDispatcher
}
type MatchFilterIndexArgWithArgDispatcher struct {
*MatchFilterIndexArg
TenantArg
*ArgDispatcher
}
type StringWithApiKey struct {
*ArgDispatcher
TenantArg
Arg string
}
type SetFilterIndexesArgWithArgDispatcher struct {
*SetFilterIndexesArg
TenantArg
*ArgDispatcher
}
type LoadIDsWithArgDispatcher struct {
LoadIDs map[string]int64
TenantArg
@@ -929,3 +929,21 @@ func IsURL(path string) bool {
return strings.HasPrefix(path, "https://") ||
strings.HasPrefix(path, "http://")
}
// GetIndexesArg the API argumets to specify an index
type GetIndexesArg struct {
IdxItmType string
TntCtx string
IdxKey string
TenantArg
*ArgDispatcher
}
// SetIndexesArg the API arguments needed for seting an index
type SetIndexesArg struct {
IdxItmType string
TntCtx string
Indexes map[string]StringSet
TenantArg
*ArgDispatcher
}