From c84ae4e49b71d369fd7a1a75366743f10dbc0c05 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 5 Jun 2020 11:56:20 +0300 Subject: [PATCH] Added replication for index functions --- apier/v1/api_interfaces.go | 4 ++ apier/v1/dispatcher.go | 15 +++++ apier/v1/replicator.go | 28 +++++++++ dispatchers/replicator.go | 69 +++++++++++++++++++++ engine/datamanager.go | 123 ++++++++++++++++++------------------- utils/consts.go | 4 ++ utils/coreutils.go | 66 ++++++++++++-------- 7 files changed, 223 insertions(+), 86 deletions(-) diff --git a/apier/v1/api_interfaces.go b/apier/v1/api_interfaces.go index 23c0ed587..ad7513f41 100644 --- a/apier/v1/api_interfaces.go +++ b/apier/v1/api_interfaces.go @@ -255,4 +255,8 @@ type ReplicatorSv1Interface interface { RemoveDispatcherProfile(args *utils.TenantIDWithArgDispatcher, reply *string) error RemoveDispatcherHost(args *utils.TenantIDWithArgDispatcher, reply *string) error RemoveRateProfile(args *utils.TenantIDWithArgDispatcher, reply *string) error + + GetIndexes(args *utils.GetIndexesArg, reply *map[string]utils.StringSet) error + SetIndexes(args *utils.SetIndexesArg, reply *string) error + RemoveIndexes(args *utils.GetIndexesArg, reply *string) error } diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index ad9ea336f..16ed2bd04 100755 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -1254,3 +1254,18 @@ func (dS *DispatcherReplicatorSv1) RemoveDispatcherHost(args *utils.TenantIDWith func (dS *DispatcherReplicatorSv1) RemoveRateProfile(args *utils.TenantIDWithArgDispatcher, reply *string) error { return dS.dS.ReplicatorSv1RemoveRateProfile(args, reply) } + +// GetIndexes . +func (dS *DispatcherReplicatorSv1) GetIndexes(args *utils.GetIndexesArg, reply *map[string]utils.StringSet) error { + return dS.dS.ReplicatorSv1GetIndexes(args, reply) +} + +// SetIndexes . +func (dS *DispatcherReplicatorSv1) SetIndexes(args *utils.SetIndexesArg, reply *string) error { + return dS.dS.ReplicatorSv1SetIndexes(args, reply) +} + +// RemoveIndexes . +func (dS *DispatcherReplicatorSv1) RemoveIndexes(args *utils.GetIndexesArg, reply *string) error { + return dS.dS.ReplicatorSv1RemoveIndexes(args, reply) +} diff --git a/apier/v1/replicator.go b/apier/v1/replicator.go index 585fd11f2..8b148d455 100644 --- a/apier/v1/replicator.go +++ b/apier/v1/replicator.go @@ -750,3 +750,31 @@ func (rplSv1 *ReplicatorSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *s *reply = utils.Pong return nil } + +// GetIndexes . +func (rplSv1 *ReplicatorSv1) GetIndexes(args *utils.GetIndexesArg, reply *map[string]utils.StringSet) error { + indx, err := rplSv1.dm.DataDB().GetIndexesDrv(args.IdxItmType, args.TntCtx, args.IdxKey) + if err != nil { + return err + } + *reply = indx + return nil +} + +// SetIndexes . +func (rplSv1 *ReplicatorSv1) SetIndexes(args *utils.SetIndexesArg, reply *string) error { + if err := rplSv1.dm.DataDB().SetIndexesDrv(args.IdxItmType, args.TntCtx, args.Indexes, true, utils.NonTransactional); err != nil { + return err + } + *reply = utils.OK + return nil +} + +// RemoveIndexes . +func (rplSv1 *ReplicatorSv1) RemoveIndexes(args *utils.GetIndexesArg, reply *string) error { + if err := rplSv1.dm.DataDB().RemoveIndexesDrv(args.IdxItmType, args.TntCtx, args.IdxKey); err != nil { + return err + } + *reply = utils.OK + return nil +} diff --git a/dispatchers/replicator.go b/dispatchers/replicator.go index 581b46458..1780b5ab9 100644 --- a/dispatchers/replicator.go +++ b/dispatchers/replicator.go @@ -1727,3 +1727,72 @@ func (dS *DispatcherService) ReplicatorSv1RemoveRateProfile(args *utils.TenantID return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaReplicator, routeID, utils.ReplicatorSv1RemoveRateProfile, args, rpl) } + +// ReplicatorSv1GetIndexes . +func (dS *DispatcherService) ReplicatorSv1GetIndexes(args *utils.GetIndexesArg, reply *map[string]utils.StringSet) (err error) { + if args == nil { + args = &utils.GetIndexesArg{} + } + args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant) + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { + if args.ArgDispatcher == nil { + return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) + } + if err = dS.authorize(utils.ReplicatorSv1GetIndexes, args.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + var routeID *string + if args.ArgDispatcher != nil { + routeID = args.ArgDispatcher.RouteID + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaReplicator, routeID, + utils.ReplicatorSv1GetIndexes, args, reply) +} + +// ReplicatorSv1SetIndexes . +func (dS *DispatcherService) ReplicatorSv1SetIndexes(args *utils.SetIndexesArg, reply *string) (err error) { + if args == nil { + args = &utils.SetIndexesArg{} + } + args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant) + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { + if args.ArgDispatcher == nil { + return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) + } + if err = dS.authorize(utils.ReplicatorSv1SetIndexes, args.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + var routeID *string + if args.ArgDispatcher != nil { + routeID = args.ArgDispatcher.RouteID + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaReplicator, routeID, + utils.ReplicatorSv1SetIndexes, args, reply) +} + +// ReplicatorSv1RemoveIndexes . +func (dS *DispatcherService) ReplicatorSv1RemoveIndexes(args *utils.GetIndexesArg, reply *string) (err error) { + if args == nil { + args = &utils.GetIndexesArg{} + } + args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant) + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { + if args.ArgDispatcher == nil { + return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) + } + if err = dS.authorize(utils.ReplicatorSv1RemoveIndexes, args.Tenant, + args.APIKey, utils.TimePointer(time.Now())); err != nil { + return + } + } + var routeID *string + if args.ArgDispatcher != nil { + routeID = args.ArgDispatcher.RouteID + } + return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaReplicator, routeID, + utils.ReplicatorSv1RemoveIndexes, args, reply) +} diff --git a/engine/datamanager.go b/engine/datamanager.go index adb4e6827..b80ea718e 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -3301,35 +3301,33 @@ func (dm *DataManager) GetIndexes(idxItmType, tntCtx, idxKey string, } } if indexes, err = dm.DataDB().GetIndexesDrv(idxItmType, tntCtx, idxKey); err != nil { - // if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; err == utils.ErrNotFound && itm.Remote { - // if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil, - // utils.ReplicatorSv1GetFilterIndexes, - // &utils.GetFilterIndexesArgWithArgDispatcher{ - // GetFilterIndexesArg: &utils.GetFilterIndexesArg{ - // CacheID: cacheID, - // ItemIDPrefix: key, - // FilterType: filterType, - // FldNameVal: fldNameVal}, - // TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant}, - // ArgDispatcher: &utils.ArgDispatcher{ - // APIKey: utils.StringPointer(itm.APIKey), - // RouteID: utils.StringPointer(itm.RouteID)}, - // }, &indexes); err == nil { - // err = dm.dataDB.SetFilterIndexesDrv(cacheID, key, indexes, true, utils.NonTransactional) - // } - // } - // if err != nil { - // err = utils.CastRPCErr(err) - if err == utils.ErrNotFound { - if idxKey != utils.EmptyString { - if errCh := Cache.Set(idxItmType, cachekey, nil, nil, - true, utils.NonTransactional); errCh != nil { - return nil, errCh - } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaIndexes]; err == utils.ErrNotFound && itm.Remote { + if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil, + utils.ReplicatorSv1GetIndexes, + &utils.GetIndexesArg{ + IdxItmType: idxItmType, + TntCtx: tntCtx, + IdxKey: idxKey, + TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant}, + ArgDispatcher: &utils.ArgDispatcher{ + APIKey: utils.StringPointer(itm.APIKey), + RouteID: utils.StringPointer(itm.RouteID)}, + }, &indexes); err == nil { + err = dm.dataDB.SetIndexesDrv(idxItmType, tntCtx, indexes, true, utils.NonTransactional) } } - return nil, err - // } + if err != nil { + err = utils.CastRPCErr(err) + if err == utils.ErrNotFound { + if idxKey != utils.EmptyString { + if errCh := Cache.Set(idxItmType, cachekey, nil, nil, + true, utils.NonTransactional); errCh != nil { + return nil, errCh + } + } + } + return nil, err + } } for k, v := range indexes { @@ -3352,24 +3350,22 @@ func (dm *DataManager) SetIndexes(idxItmType, tntCtx string, indexes, commit, transactionID); err != nil { return } - // if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; itm.Replicate { - // var reply string - // if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - // utils.ReplicatorSv1SetFilterIndexes, - // &utils.SetFilterIndexesArgWithArgDispatcher{ - // SetFilterIndexesArg: &utils.SetFilterIndexesArg{ - // CacheID: cacheID, - // ItemIDPrefix: itemIDPrefix, - // Indexes: indexes}, - // TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant}, - // ArgDispatcher: &utils.ArgDispatcher{ - // APIKey: utils.StringPointer(itm.APIKey), - // RouteID: utils.StringPointer(itm.RouteID)}, - // }, &reply); err != nil { - // err = utils.CastRPCErr(err) - // return - // } - // } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaIndexes]; itm.Replicate { + var reply string + if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, + utils.ReplicatorSv1SetIndexes, + &utils.SetIndexesArg{ + IdxItmType: idxItmType, + TntCtx: tntCtx, + Indexes: indexes, + TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant}, + ArgDispatcher: &utils.ArgDispatcher{ + APIKey: utils.StringPointer(itm.APIKey), + RouteID: utils.StringPointer(itm.RouteID)}, + }, &reply); err != nil { + err = utils.CastRPCErr(err) + } + } return } @@ -3378,21 +3374,24 @@ func (dm *DataManager) RemoveIndexes(idxItmType, tntCtx, idxKey string) (err err err = utils.ErrNoDatabaseConn return } - return dm.DataDB().RemoveIndexesDrv(idxItmType, tntCtx, idxKey) - // if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; itm.Replicate { - // var reply string - // dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - // utils.ReplicatorSv1RemoveFilterIndexes, - // &utils.SetFilterIndexesArgWithArgDispatcher{ - // SetFilterIndexesArg: &utils.SetFilterIndexesArg{ - // CacheID: cacheID, - // ItemIDPrefix: itemIDPrefix, - // Indexes: indexes}, - // TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant}, - // ArgDispatcher: &utils.ArgDispatcher{ - // APIKey: utils.StringPointer(itm.APIKey), - // RouteID: utils.StringPointer(itm.RouteID)}, - // }, &reply); err != nil { - // err = utils.CastRPCErr(err) - // } + if err = dm.DataDB().RemoveIndexesDrv(idxItmType, tntCtx, idxKey); err != nil { + return + } + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaIndexes]; itm.Replicate { + var reply string + if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, + utils.ReplicatorSv1RemoveIndexes, + &utils.GetIndexesArg{ + IdxItmType: idxItmType, + TntCtx: tntCtx, + IdxKey: idxKey, + TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant}, + ArgDispatcher: &utils.ArgDispatcher{ + APIKey: utils.StringPointer(itm.APIKey), + RouteID: utils.StringPointer(itm.RouteID)}, + }, &reply); err != nil { + err = utils.CastRPCErr(err) + } + } + return } diff --git a/utils/consts.go b/utils/consts.go index ea26dd152..744ff3f05 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -786,6 +786,7 @@ const ( MetaRouteProfiles = "*route_profiles" MetaAttributeProfiles = "*attribute_profiles" MetaFilterIndexes = "*filter_indexes" + MetaIndexes = "*indexes" MetaDispatcherProfiles = "*dispatcher_profiles" MetaRateProfiles = "*rate_profiles" MetaChargerProfiles = "*charger_profiles" @@ -1079,6 +1080,9 @@ const ( ReplicatorSv1RemoveDispatcherProfile = "ReplicatorSv1.RemoveDispatcherProfile" ReplicatorSv1RemoveRateProfile = "ReplicatorSv1.RemoveRateProfile" ReplicatorSv1RemoveDispatcherHost = "ReplicatorSv1.RemoveDispatcherHost" + ReplicatorSv1GetIndexes = "ReplicatorSv1.GetIndexes" + ReplicatorSv1SetIndexes = "ReplicatorSv1.SetIndexes" + ReplicatorSv1RemoveIndexes = "ReplicatorSv1.RemoveIndexes" ) // APIerSv1 APIs diff --git a/utils/coreutils.go b/utils/coreutils.go index 7629a5316..47445d2a2 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -881,6 +881,30 @@ type SetFilterIndexesArg struct { Indexes map[string]StringMap } +type GetFilterIndexesArgWithArgDispatcher struct { + *GetFilterIndexesArg + TenantArg + *ArgDispatcher +} + +type MatchFilterIndexArgWithArgDispatcher struct { + *MatchFilterIndexArg + TenantArg + *ArgDispatcher +} + +type SetFilterIndexesArgWithArgDispatcher struct { + *SetFilterIndexesArg + TenantArg + *ArgDispatcher +} + +type StringWithApiKey struct { + *ArgDispatcher + TenantArg + Arg string +} + func CastRPCErr(err error) error { if _, has := ErrMap[err.Error()]; has { return ErrMap[err.Error()] @@ -894,30 +918,6 @@ func RandomInteger(min, max int) int { return math_rand.Intn(max-min) + min } -type GetFilterIndexesArgWithArgDispatcher struct { - *GetFilterIndexesArg - TenantArg - *ArgDispatcher -} - -type MatchFilterIndexArgWithArgDispatcher struct { - *MatchFilterIndexArg - TenantArg - *ArgDispatcher -} - -type StringWithApiKey struct { - *ArgDispatcher - TenantArg - Arg string -} - -type SetFilterIndexesArgWithArgDispatcher struct { - *SetFilterIndexesArg - TenantArg - *ArgDispatcher -} - type LoadIDsWithArgDispatcher struct { LoadIDs map[string]int64 TenantArg @@ -929,3 +929,21 @@ func IsURL(path string) bool { return strings.HasPrefix(path, "https://") || strings.HasPrefix(path, "http://") } + +// GetIndexesArg the API argumets to specify an index +type GetIndexesArg struct { + IdxItmType string + TntCtx string + IdxKey string + TenantArg + *ArgDispatcher +} + +// SetIndexesArg the API arguments needed for seting an index +type SetIndexesArg struct { + IdxItmType string + TntCtx string + Indexes map[string]StringSet + TenantArg + *ArgDispatcher +}