diff --git a/apis/libadmin.go b/apis/libadmin.go index 0298b938a..ef6e49807 100644 --- a/apis/libadmin.go +++ b/apis/libadmin.go @@ -193,15 +193,14 @@ func (admS *AdminSv1) callCacheForComputeIndexes(ctx *context.Context, cacheopt, method, args, &reply) } -/* // callCacheRevDestinations used for reverse destination, loadIDs and indexes replication -func (apierSv1 *AdminS) callCacheMultiple(cacheopt, tnt, cacheID string, itemIDs []string, opts map[string]interface{}) (err error) { +func (admS *AdminSv1) callCacheMultiple(ctx *context.Context, cacheopt, tnt, cacheID string, itemIDs []string, opts map[string]interface{}) (err error) { if len(itemIDs) == 0 { return } var reply, method string var args interface{} - switch utils.FirstNonEmpty(cacheopt, apierSv1.cfg.GeneralCfg().DefaultCaching) { + switch utils.FirstNonEmpty(cacheopt, admS.cfg.GeneralCfg().DefaultCaching) { case utils.MetaNone: return case utils.MetaReload: @@ -221,10 +220,9 @@ func (apierSv1 *AdminS) callCacheMultiple(cacheopt, tnt, cacheID string, itemIDs APIOpts: opts, } } - return apierSv1.ConnMgr.Call(context.TODO(), apierSv1.cfg.ApierCfg().CachesConns, + return admS.connMgr.Call(ctx, admS.cfg.AdminSCfg().CachesConns, method, args, &reply) } -*/ func composeCacheArgsForFilter(dm *engine.DataManager, ctx *context.Context, fltr *engine.Filter, tnt, tntID string, args map[string][]string) (_ map[string][]string, err error) { indxIDs := make([]string, 0, len(fltr.Rules)) diff --git a/apis/replicator.go b/apis/replicator.go new file mode 100644 index 000000000..01af6d8c0 --- /dev/null +++ b/apis/replicator.go @@ -0,0 +1,650 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package apis + +import ( + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +// NewReplicatorSv1 constructs the ReplicatorSv1 object +func NewReplicatorSv1(dm *engine.DataManager, v1 *AdminSv1) *ReplicatorSv1 { + return &ReplicatorSv1{ + dm: dm, + v1: v1, + } +} + +// ReplicatorSv1 exports the DataDB methods to RPC +type ReplicatorSv1 struct { + ping + dm *engine.DataManager + v1 *AdminSv1 // needed for CallCache only +} + +// GetAccount is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetAccount(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *utils.Account) error { + engine.UpdateReplicationFilters(utils.AccountPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetAccountDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +// GetStatQueue is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetStatQueue(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.StatQueue) error { + engine.UpdateReplicationFilters(utils.StatQueuePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetStatQueueDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +// GetFilter is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetFilter(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.Filter) error { + engine.UpdateReplicationFilters(utils.FilterPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetFilterDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +// GetThreshold is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetThreshold(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.Threshold) error { + engine.UpdateReplicationFilters(utils.ThresholdPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetThresholdDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +// GetThresholdProfile is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetThresholdProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.ThresholdProfile) error { + engine.UpdateReplicationFilters(utils.ThresholdProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetThresholdProfileDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +// GetStatQueueProfile is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetStatQueueProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.StatQueueProfile) error { + engine.UpdateReplicationFilters(utils.StatQueueProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetStatQueueProfileDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +// GetResource is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetResource(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.Resource) error { + engine.UpdateReplicationFilters(utils.ResourcesPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetResourceDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +// GetResourceProfile is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetResourceProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.ResourceProfile) error { + engine.UpdateReplicationFilters(utils.ResourceProfilesPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetResourceProfileDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +// GetRouteProfile is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetRouteProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.RouteProfile) error { + engine.UpdateReplicationFilters(utils.RouteProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetRouteProfileDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +// GetAttributeProfile is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetAttributeProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.AttributeProfile) error { + engine.UpdateReplicationFilters(utils.AttributeProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetAttributeProfileDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +// GetChargerProfile is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetChargerProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.ChargerProfile) error { + engine.UpdateReplicationFilters(utils.ChargerProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetChargerProfileDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +// GetDispatcherProfile is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetDispatcherProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.DispatcherProfile) error { + engine.UpdateReplicationFilters(utils.DispatcherProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetDispatcherProfileDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +// GetDispatcherHost is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetDispatcherHost(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.DispatcherHost) error { + engine.UpdateReplicationFilters(utils.DispatcherHostPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetDispatcherHostDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +// GetItemLoadIDs is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetItemLoadIDs(ctx *context.Context, itemID *utils.StringWithAPIOpts, reply *map[string]int64) error { + engine.UpdateReplicationFilters(utils.LoadIDPrefix, itemID.Arg, utils.IfaceAsString(itemID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetItemLoadIDsDrv(ctx, itemID.Arg) + if err != nil { + return err + } + *reply = rcv + return nil +} + +// GetIndexes is the remote method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) GetIndexes(ctx *context.Context, args *utils.GetIndexesArg, reply *map[string]utils.StringSet) error { + engine.UpdateReplicationFilters(utils.CacheInstanceToPrefix[args.IdxItmType], args.TntCtx, utils.IfaceAsString(args.APIOpts[utils.RemoteHostOpt])) + indx, err := rplSv1.dm.DataDB().GetIndexesDrv(ctx, args.IdxItmType, args.TntCtx, args.IdxKey, utils.NonTransactional) + if err != nil { + return err + } + *reply = indx + return nil +} + +// SetAccount is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetAccount(ctx *context.Context, acc *utils.AccountWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetAccountDrv(ctx, acc.Account); err != nil { + return + } + // the account doesn't have cache + *reply = utils.OK + return +} + +// SetThresholdProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetThresholdProfile(ctx *context.Context, th *engine.ThresholdProfileWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetThresholdProfileDrv(ctx, th.ThresholdProfile); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(th.APIOpts[utils.MetaCache]), + th.Tenant, utils.CacheThresholdProfiles, th.TenantID(), &th.FilterIDs, th.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// SetThreshold is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetThreshold(ctx *context.Context, th *engine.ThresholdWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetThresholdDrv(ctx, th.Threshold); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(th.APIOpts[utils.MetaCache]), + th.Tenant, utils.CacheThresholds, th.TenantID(), nil, th.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// SetStatQueueProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetStatQueueProfile(ctx *context.Context, sq *engine.StatQueueProfileWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetStatQueueProfileDrv(ctx, sq.StatQueueProfile); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(sq.APIOpts[utils.MetaCache]), + sq.Tenant, utils.CacheStatQueueProfiles, sq.TenantID(), &sq.FilterIDs, sq.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// SetStatQueue is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetStatQueue(ctx *context.Context, sq *engine.StatQueueWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetStatQueueDrv(ctx, nil, sq.StatQueue); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(sq.APIOpts[utils.MetaCache]), + sq.StatQueue.Tenant, utils.CacheStatQueues, sq.StatQueue.TenantID(), nil, sq.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// SetFilter is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetFilter(ctx *context.Context, fltr *engine.FilterWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetFilterDrv(ctx, fltr.Filter); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(fltr.APIOpts[utils.MetaCache]), + fltr.Tenant, utils.CacheFilters, fltr.TenantID(), nil, fltr.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// SetResourceProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetResourceProfile(ctx *context.Context, rs *engine.ResourceProfileWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetResourceProfileDrv(ctx, rs.ResourceProfile); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(rs.APIOpts[utils.MetaCache]), + rs.Tenant, utils.CacheResourceProfiles, rs.TenantID(), &rs.FilterIDs, rs.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// SetResource is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetResource(ctx *context.Context, rs *engine.ResourceWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetResourceDrv(ctx, rs.Resource); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(rs.APIOpts[utils.MetaCache]), + rs.Tenant, utils.CacheResources, rs.TenantID(), nil, rs.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// SetRouteProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetRouteProfile(ctx *context.Context, sp *engine.RouteProfileWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetRouteProfileDrv(ctx, sp.RouteProfile); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(sp.APIOpts[utils.MetaCache]), + sp.Tenant, utils.CacheRouteProfiles, sp.TenantID(), &sp.FilterIDs, sp.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// SetAttributeProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetAttributeProfile(ctx *context.Context, ap *engine.AttributeProfileWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetAttributeProfileDrv(ctx, ap.AttributeProfile); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(ap.APIOpts[utils.MetaCache]), + ap.Tenant, utils.CacheAttributeProfiles, ap.TenantID(), &ap.FilterIDs, ap.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// SetChargerProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetChargerProfile(ctx *context.Context, cp *engine.ChargerProfileWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetChargerProfileDrv(ctx, cp.ChargerProfile); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(cp.APIOpts[utils.MetaCache]), + cp.Tenant, utils.CacheChargerProfiles, cp.TenantID(), &cp.FilterIDs, cp.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// SetDispatcherProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetDispatcherProfile(ctx *context.Context, dpp *engine.DispatcherProfileWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetDispatcherProfileDrv(ctx, dpp.DispatcherProfile); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(dpp.APIOpts[utils.MetaCache]), + dpp.Tenant, utils.CacheDispatcherProfiles, dpp.TenantID(), &dpp.FilterIDs, dpp.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// SetDispatcherHost is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetDispatcherHost(ctx *context.Context, dpp *engine.DispatcherHostWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetDispatcherHostDrv(ctx, dpp.DispatcherHost); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(dpp.APIOpts[utils.MetaCache]), + dpp.Tenant, utils.CacheDispatcherHosts, dpp.TenantID(), nil, dpp.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// SetLoadIDs is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetLoadIDs(ctx *context.Context, args *utils.LoadIDsWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetLoadIDsDrv(ctx, args.LoadIDs); err != nil { + return + } + lIDs := make([]string, 0, len(args.LoadIDs)) + for lID := range args.LoadIDs { + lIDs = append(lIDs, lID) + } + if err = rplSv1.v1.callCacheMultiple(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheLoadIDs, lIDs, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// SetIndexes is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) SetIndexes(ctx *context.Context, args *utils.SetIndexesArg, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetIndexesDrv(ctx, args.IdxItmType, args.TntCtx, args.Indexes, true, utils.NonTransactional); err != nil { + return + } + cIDs := make([]string, 0, len(args.Indexes)) + for idxKey := range args.Indexes { + cIDs = append(cIDs, utils.ConcatenatedKey(args.TntCtx, idxKey)) + } + if err = rplSv1.v1.callCacheMultiple(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, args.IdxItmType, cIDs, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveThreshold is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveThreshold(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveThresholdDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheThresholds, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveAccount is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveAccount(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveAccountDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + // the account doesn't have cache + *reply = utils.OK + return +} + +// RemoveStatQueue is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveStatQueue(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemStatQueueDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheStatQueues, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveFilter is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveFilter(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveFilterDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheFilters, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveThresholdProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveThresholdProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemThresholdProfileDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheThresholdProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveStatQueueProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveStatQueueProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemStatQueueProfileDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheStatQueueProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveResource is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveResource(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveResourceDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheResources, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveResourceProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveResourceProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveResourceProfileDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheResourceProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveRouteProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveRouteProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveRouteProfileDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheRouteProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveAttributeProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveAttributeProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveAttributeProfileDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheAttributeProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveChargerProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveChargerProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveChargerProfileDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheChargerProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveDispatcherProfile is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveDispatcherProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveDispatcherProfileDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheDispatcherProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveDispatcherHost is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveDispatcherHost(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveDispatcherHostDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheDispatcherHosts, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveIndexes is the replication method coresponding to the dataDb driver method +func (rplSv1 *ReplicatorSv1) RemoveIndexes(ctx *context.Context, args *utils.GetIndexesArg, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveIndexesDrv(ctx, args.IdxItmType, args.TntCtx, args.IdxKey); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, args.IdxItmType, utils.ConcatenatedKey(args.TntCtx, args.IdxKey), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +func (rplSv1 *ReplicatorSv1) GetRateProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *utils.RateProfile) error { + engine.UpdateReplicationFilters(utils.RateProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetRateProfileDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} +func (rplSv1 *ReplicatorSv1) GetActionProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.ActionProfile) error { + engine.UpdateReplicationFilters(utils.ActionProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt])) + rcv, err := rplSv1.dm.DataDB().GetActionProfileDrv(ctx, tntID.Tenant, tntID.ID) + if err != nil { + return err + } + *reply = *rcv + return nil +} + +func (rplSv1 *ReplicatorSv1) SetRateProfile(ctx *context.Context, sp *utils.RateProfileWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetRateProfileDrv(ctx, sp.RateProfile); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(sp.APIOpts[utils.MetaCache]), + sp.Tenant, utils.CacheRateProfiles, sp.TenantID(), &sp.FilterIDs, sp.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} +func (rplSv1 *ReplicatorSv1) SetActionProfile(ctx *context.Context, sp *engine.ActionProfileWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetActionProfileDrv(ctx, sp.ActionProfile); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(sp.APIOpts[utils.MetaCache]), + sp.Tenant, utils.CacheActionProfiles, sp.TenantID(), &sp.FilterIDs, sp.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +func (rplSv1 *ReplicatorSv1) RemoveRateProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveRateProfileDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheRateProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} + +func (rplSv1 *ReplicatorSv1) RemoveActionProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveActionProfileDrv(ctx, args.Tenant, args.ID); err != nil { + return + } + if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), + args.Tenant, utils.CacheActionProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil { + return + } + *reply = utils.OK + return +} diff --git a/data/conf/samples/loaders_indexes_internal_db/cgrates.json b/data/conf/samples/loaders_indexes_internal_db/cgrates.json new file mode 100644 index 000000000..a43322c2e --- /dev/null +++ b/data/conf/samples/loaders_indexes_internal_db/cgrates.json @@ -0,0 +1,85 @@ +{ +"general": { + "node_id" : "IntenalLoaders", + "log_level": 7 +}, + +"listen": { + "rpc_json": ":2022", + "rpc_gob": ":2023", + "http": ":2280" +}, + +"rpc_conns": { + "engine": { + "strategy": "*first", + "conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}] + } +}, + +"data_db": { + "db_type": "*internal", + "remote_conns": ["engine"], + "replication_conns": ["engine"], + "items":{ + "*accounts": {"remote":true, "replicate":true}, + "*reverse_destinations": {"remote":true, "replicate":true}, + "*destinations": {"remote":true, "replicate":true}, + "*rating_plans": {"remote":true, "replicate":true}, + "*rating_profiles": {"remote":true, "replicate":true}, + "*actions": {"remote":true, "replicate":true}, + "*action_plans": {"remote":true, "replicate":true}, + "*account_action_plans": {"remote":true, "replicate":true}, + "*action_triggers": {"remote":true, "replicate":true}, + "*shared_groups": {"remote":true, "replicate":true}, + "*timings": {"remote":true, "replicate":true}, + "*resource_profiles": {"remote":true, "replicate":true}, + "*resources": {"remote":true, "replicate":true}, + "*statqueue_profiles": {"remote":true, "replicate":true}, + "*statqueues": {"remote":true, "replicate":true}, + "*threshold_profiles": {"remote":true, "replicate":true}, + "*thresholds": {"remote":true, "replicate":true}, + "*filters": {"remote":true, "replicate":true}, + "*route_profiles": {"remote":true, "replicate":true}, + "*attribute_profiles": {"remote":true, "replicate":true}, + "*charger_profiles": {"remote":true, "replicate":true}, + "*dispatcher_profiles": {"remote":true, "replicate":true}, + "*dispatcher_hosts": {"remote":true, "replicate":true}, + "*load_ids": {"remote":true, "replicate":true}, + "*versions": {"remote":true, "replicate":true}, + "*rate_profiles": {"remote":true, "replicate":true}, + "*action_profiles": {"remote":true, "replicate":true}, + // no remote for indexes + "*resource_filter_indexes" : {"replicate":true}, + "*stat_filter_indexes" : {"replicate":true}, + "*threshold_filter_indexes" : {"replicate":true}, + "*route_filter_indexes" : {"replicate":true}, + "*attribute_filter_indexes" : {"replicate":true}, + "*charger_filter_indexes" : {"replicate":true}, + "*dispatcher_filter_indexes" : {"replicate":true}, + "*reverse_filter_indexes" : {"replicate":true}, + "*rate_profile_filter_indexes" : {"replicate": true}, + "*rate_filter_indexes" : {"replicate": true}, + "*action_profile_filter_indexes" : {"replicate": true}, + "*account_filter_indexes" : {"replicate": true} + } +}, + +"stor_db": { + "db_type": "*internal" +}, + +"loaders": [{ + "id": "*default", + "enabled": true, + "caches_conns": ["engine"], + "tp_in_dir": "/usr/share/cgrates/tariffplans/tutorial/", + "tp_out_dir": "" +}], + +"admins": { + "enabled": true, + "caches_conns":["engine"] +} + +} \ No newline at end of file diff --git a/dispatchers/replicator.go b/dispatchers/replicator.go index 4f75edd1c..c15395f07 100644 --- a/dispatchers/replicator.go +++ b/dispatchers/replicator.go @@ -351,15 +351,15 @@ func (dS *DispatcherService) ReplicatorSv1SetStatQueue(args *engine.StatQueueWit StatQueue: &engine.StatQueue{}, } } - args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant) + args.StatQueue.Tenant = utils.FirstNonEmpty(args.StatQueue.Tenant, dS.cfg.GeneralCfg().DefaultTenant) if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { - if err = dS.authorize(utils.ReplicatorSv1SetStatQueue, args.Tenant, + if err = dS.authorize(utils.ReplicatorSv1SetStatQueue, args.StatQueue.Tenant, utils.IfaceAsString(args.APIOpts[utils.OptsAPIKey])); err != nil { return } } return dS.Dispatch(context.TODO(), &utils.CGREvent{ - Tenant: args.Tenant, + Tenant: args.StatQueue.Tenant, APIOpts: args.APIOpts, }, utils.MetaReplicator, utils.ReplicatorSv1SetStatQueue, args, rpl) } @@ -727,23 +727,6 @@ func (dS *DispatcherService) ReplicatorSv1RemoveResourceProfile(args *utils.Tena }, utils.MetaReplicator, utils.ReplicatorSv1RemoveResourceProfile, args, rpl) } -func (dS *DispatcherService) ReplicatorSv1RemoveActions(args *utils.StringWithAPIOpts, rpl *string) (err error) { - if args == nil { - args = new(utils.StringWithAPIOpts) - } - args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { - if err = dS.authorize(utils.ReplicatorSv1RemoveActions, args.Tenant, - utils.IfaceAsString(args.APIOpts[utils.OptsAPIKey])); err != nil { - return - } - } - return dS.Dispatch(context.TODO(), &utils.CGREvent{ - Tenant: args.Tenant, - APIOpts: args.APIOpts, - }, utils.MetaReplicator, utils.ReplicatorSv1RemoveActions, args, rpl) -} - func (dS *DispatcherService) ReplicatorSv1RemoveRouteProfile(args *utils.TenantIDWithAPIOpts, rpl *string) (err error) { if args == nil { args = &utils.TenantIDWithAPIOpts{ diff --git a/dispatchers/replicator_test.go b/dispatchers/replicator_test.go index 3a1f2c69c..0929ea2bf 100644 --- a/dispatchers/replicator_test.go +++ b/dispatchers/replicator_test.go @@ -757,46 +757,6 @@ func TestDspReplicatorSv1RemoveResourceProfileNilEvent(t *testing.T) { } } -func TestDspReplicatorSv1RemoveActionsNil(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"} - CGREvent := &utils.StringWithAPIOpts{ - Tenant: "tenant", - } - var reply *string - result := dspSrv.ReplicatorSv1RemoveActions(CGREvent, reply) - expected := "MANDATORY_IE_MISSING: [ApiKey]" - if result == nil || result.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestDspReplicatorSv1RemoveActionsErrorNil(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - CGREvent := &utils.StringWithAPIOpts{ - Tenant: "tenant", - } - var reply *string - result := dspSrv.ReplicatorSv1RemoveActions(CGREvent, reply) - expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" - if result == nil || result.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestDspReplicatorSv1RemoveActionsEvent(t *testing.T) { - cgrCfg := config.NewDefaultCGRConfig() - dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - var reply *string - result := dspSrv.ReplicatorSv1RemoveActions(nil, reply) - expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" - if result == nil || result.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - func TestDspReplicatorSv1RemoveRouteProfileNil(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) diff --git a/engine/libstats.go b/engine/libstats.go index b6fd8f502..8093ef3f3 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -127,8 +127,8 @@ type StoredStatQueue struct { } type StatQueueWithAPIOpts struct { - *StatQueue - APIOpts map[string]interface{} + StatQueue *StatQueue + APIOpts map[string]interface{} } // SqID will compose the unique identifier for the StatQueue out of Tenant and ID @@ -395,9 +395,6 @@ func (sis StatQueues) Sort() { } func (sq *StatQueue) MarshalJSON() (rply []byte, err error) { - if sq == nil { - return []byte("null"), nil - } type tmp StatQueue sq.lock(utils.EmptyString) rply, err = json.Marshal(tmp(*sq)) @@ -473,15 +470,23 @@ func (sq *StatQueue) UnmarshalJSON(data []byte) (err error) { return } -func (sq *StatQueue) GobEncode() (rply []byte, err error) { +type sqEncode StatQueue + +func (sq StatQueue) GobEncode() (rply []byte, err error) { buf := bytes.NewBuffer(rply) - type tmp StatQueue sq.lock(utils.EmptyString) - err = gob.NewEncoder(buf).Encode(tmp(*sq)) + err = gob.NewEncoder(buf).Encode(sqEncode(sq)) sq.unlock() return buf.Bytes(), err } +func (sq *StatQueue) GobDecode(rply []byte) (err error) { + buf := bytes.NewBuffer(rply) + var eSq sqEncode + err = gob.NewDecoder(buf).Decode(&eSq) + *sq = StatQueue(eSq) + return err +} func (sq *StatQueue) Clone() (cln *StatQueue) { cln = &StatQueue{ Tenant: sq.Tenant, @@ -511,30 +516,13 @@ func (ssq *StatQueueWithAPIOpts) MarshalJSON() (rply []byte, err error) { StatQueue APIOpts map[string]interface{} } - ssq.lock(utils.EmptyString) rply, err = json.Marshal(tmp{ StatQueue: *ssq.StatQueue, APIOpts: ssq.APIOpts, }) - ssq.unlock() return } -func (ssq *StatQueueWithAPIOpts) GobEncode() (rply []byte, err error) { - buf := bytes.NewBuffer(rply) - type tmp struct { - StatQueue - APIOpts map[string]interface{} - } - ssq.lock(utils.EmptyString) - err = gob.NewEncoder(buf).Encode(tmp{ - StatQueue: *ssq.StatQueue, - APIOpts: ssq.APIOpts, - }) - ssq.unlock() - return buf.Bytes(), err -} - // UnmarshalJSON here only to fully support json for StatQueue func (ssq *StatQueueWithAPIOpts) UnmarshalJSON(data []byte) (err error) { sq := new(StatQueue) diff --git a/engine/libstats_test.go b/engine/libstats_test.go index ec3225dea..8e88f652d 100644 --- a/engine/libstats_test.go +++ b/engine/libstats_test.go @@ -1295,6 +1295,7 @@ func TestStatQueueWithAPIOptsJSONMarshall(t *testing.T) { if err = json.Unmarshal([]byte(utils.ToJSON(exp2)), rply); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(rply, exp2) { + t.Errorf("Expected: %+v , received: %+v", exp2, rply) t.Errorf("Expected: %s , received: %s", utils.ToJSON(exp2), utils.ToJSON(rply)) } diff --git a/engine/statmetrics.go b/engine/statmetrics.go index a22b59985..ecf117cc2 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -111,7 +111,6 @@ func (asr *StatASR) AddEvent(evID string, ev utils.DataProvider) error { } func (asr *StatASR) RemEvent(evID string) (err error) { - val, has := asr.Events[evID] if !has { return utils.ErrNotFound diff --git a/general_tests/loaders_internal_indexes_it_test.go b/general_tests/loaders_internal_indexes_it_test.go new file mode 100644 index 000000000..648ece7fc --- /dev/null +++ b/general_tests/loaders_internal_indexes_it_test.go @@ -0,0 +1,192 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package general_tests + +import ( + "path" + "reflect" + "sort" + "testing" + "time" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/apis" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/loaders" + "github.com/cgrates/cgrates/utils" +) + +var ( + loadersIDBIdxCfgDir string + loadersIDBIdxCfgPath string + loadersIDBIdxCfgPathInternal = path.Join(*dataDir, "conf", "samples", "loaders_indexes_internal_db") + loadersIDBIdxCfg, loadersIDBIdxCfgInternal *config.CGRConfig + loadersIDBIdxRPC, loadersIDBIdxRPCInternal *birpc.Client + + LoadersIDBIdxTests = []func(t *testing.T){ + testLoadersIDBIdxItLoadConfig, + testLoadersIDBIdxItDB, + testLoadersIDBIdxItStartEngines, + testLoadersIDBIdxItRPCConn, + testLoadersIDBIdxItLoad, + testLoadersIDBIdxCheckAttributes, + testLoadersIDBIdxCheckAttributesIndexes, + testLoadersIDBIdxItStopCgrEngine, + } +) + +func TestLoadersIDBIdxIt(t *testing.T) { + switch *dbType { + case utils.MetaInternal: + loadersIDBIdxCfgDir = "tutinternal" + case utils.MetaMySQL: + loadersIDBIdxCfgDir = "tutmysql" + case utils.MetaMongo: + loadersIDBIdxCfgDir = "tutmongo" + case utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("Unknown Database type") + } + for _, stest := range LoadersIDBIdxTests { + t.Run(loadersIDBIdxCfgDir, stest) + } +} + +func testLoadersIDBIdxItLoadConfig(t *testing.T) { + loadersIDBIdxCfgPath = path.Join(*dataDir, "conf", "samples", loadersIDBIdxCfgDir) + if loadersIDBIdxCfg, err = config.NewCGRConfigFromPath(context.Background(), loadersIDBIdxCfgPath); err != nil { + t.Error(err) + } + if loadersIDBIdxCfgInternal, err = config.NewCGRConfigFromPath(context.Background(), loadersIDBIdxCfgPathInternal); err != nil { + t.Error(err) + } +} + +func testLoadersIDBIdxItDB(t *testing.T) { + if err := engine.InitDataDB(loadersIDBIdxCfg); err != nil { + t.Fatal(err) + } + if err := engine.InitStorDB(loadersIDBIdxCfg); err != nil { + t.Fatal(err) + } +} + +func testLoadersIDBIdxItStartEngines(t *testing.T) { + if _, err := engine.StopStartEngine(loadersIDBIdxCfgPath, *waitRater); err != nil { + t.Fatal(err) + } + if _, err := engine.StartEngine(loadersIDBIdxCfgPathInternal, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testLoadersIDBIdxItRPCConn(t *testing.T) { + var err error + if loadersIDBIdxRPC, err = newRPCClient(loadersIDBIdxCfg.ListenCfg()); err != nil { + t.Fatal(err) + } + if loadersIDBIdxRPCInternal, err = newRPCClient(loadersIDBIdxCfgInternal.ListenCfg()); err != nil { + t.Fatal(err) + } +} + +func testLoadersIDBIdxItLoad(t *testing.T) { + var reply string + if err := loadersIDBIdxRPCInternal.Call(context.Background(), utils.LoaderSv1Run, + &loaders.ArgsProcessFolder{ + APIOpts: map[string]interface{}{ + utils.MetaStopOnError: false, + utils.MetaCache: utils.MetaReload, + }, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Error("Unexpected reply returned:", reply) + } + time.Sleep(100 * time.Millisecond) +} + +func testLoadersIDBIdxCheckAttributes(t *testing.T) { + exp := &engine.APIAttributeProfile{ + Tenant: "cgrates.org", + ID: "ATTR_1001_SIMPLEAUTH", + FilterIDs: []string{"*string:~*opts.*context:simpleauth", "*string:~*req.Account:1001"}, + Attributes: []*engine.ExternalAttribute{{ + Path: utils.MetaReq + utils.NestingSep + "Password", + Type: utils.MetaConstant, + Value: "CGRateS.org", + }}, + Weight: 20.0, + } + + var reply *engine.APIAttributeProfile + if err := loadersIDBIdxRPC.Call(context.Background(), utils.AdminSv1GetAttributeProfile, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{ + Tenant: "cgrates.org", + ID: "ATTR_1001_SIMPLEAUTH", + }, + }, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(exp, reply) { + t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(reply)) + } +} + +func testLoadersIDBIdxCheckAttributesIndexes(t *testing.T) { + expIdx := []string{ + "*string:*opts.*context:*sessions:ATTR_1001_SESSIONAUTH", + "*string:*opts.*context:*sessions:ATTR_1002_SESSIONAUTH", + "*string:*opts.*context:*sessions:ATTR_1003_SESSIONAUTH", + "*string:*opts.*context:simpleauth:ATTR_1001_SIMPLEAUTH", + "*string:*opts.*context:simpleauth:ATTR_1002_SIMPLEAUTH", + "*string:*opts.*context:simpleauth:ATTR_1003_SIMPLEAUTH", + "*string:*req.Account:1001:ATTR_1001_SESSIONAUTH", + "*string:*req.Account:1001:ATTR_1001_SIMPLEAUTH", + "*string:*req.Account:1002:ATTR_1002_SESSIONAUTH", + "*string:*req.Account:1002:ATTR_1002_SIMPLEAUTH", + "*string:*req.Account:1003:ATTR_1003_SESSIONAUTH", + "*string:*req.Account:1003:ATTR_1003_SIMPLEAUTH", + "*string:*req.SubscriberId:1006:ATTR_ACC_ALIAS", + } + var indexes []string + if err := loadersIDBIdxRPC.Call(context.Background(), utils.AdminSv1GetFilterIndexes, + &apis.AttrGetFilterIndexes{ + ItemType: utils.MetaAttributes, + Tenant: "cgrates.org", + FilterType: utils.MetaString, + Context: "simpleauth", + }, &indexes); err != nil { + t.Error(err) + } else if sort.Strings(indexes); !reflect.DeepEqual(indexes, expIdx) { + t.Errorf("Expecting: %+v, received: %+v", + utils.ToJSON(expIdx), utils.ToJSON(indexes)) + } +} + +func testLoadersIDBIdxItStopCgrEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} diff --git a/services/adminsv1.go b/services/adminsv1.go index 0c19d47f0..86125ab50 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -103,6 +103,10 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF for _, s := range srv { apiService.server.RpcRegister(s) } + rpl, _ := engine.NewService(apis.NewReplicatorSv1(datadb, apiService.api)) + for _, s := range rpl { + apiService.server.RpcRegister(s) + } } //backwards compatible diff --git a/utils/consts.go b/utils/consts.go index 90fd707cb..f597cf76b 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1064,7 +1064,6 @@ const ( ReplicatorSv1GetStatQueueProfile = "ReplicatorSv1.GetStatQueueProfile" ReplicatorSv1GetResource = "ReplicatorSv1.GetResource" ReplicatorSv1GetResourceProfile = "ReplicatorSv1.GetResourceProfile" - ReplicatorSv1GetActions = "ReplicatorSv1.GetActions" ReplicatorSv1GetRouteProfile = "ReplicatorSv1.GetRouteProfile" ReplicatorSv1GetAttributeProfile = "ReplicatorSv1.GetAttributeProfile" ReplicatorSv1GetChargerProfile = "ReplicatorSv1.GetChargerProfile" @@ -1081,7 +1080,6 @@ const ( ReplicatorSv1SetStatQueueProfile = "ReplicatorSv1.SetStatQueueProfile" ReplicatorSv1SetResource = "ReplicatorSv1.SetResource" ReplicatorSv1SetResourceProfile = "ReplicatorSv1.SetResourceProfile" - ReplicatorSv1SetActions = "ReplicatorSv1.SetActions" ReplicatorSv1SetRouteProfile = "ReplicatorSv1.SetRouteProfile" ReplicatorSv1SetAttributeProfile = "ReplicatorSv1.SetAttributeProfile" ReplicatorSv1SetChargerProfile = "ReplicatorSv1.SetChargerProfile" @@ -1099,7 +1097,6 @@ const ( ReplicatorSv1RemoveStatQueueProfile = "ReplicatorSv1.RemoveStatQueueProfile" ReplicatorSv1RemoveResource = "ReplicatorSv1.RemoveResource" ReplicatorSv1RemoveResourceProfile = "ReplicatorSv1.RemoveResourceProfile" - ReplicatorSv1RemoveActions = "ReplicatorSv1.RemoveActions" ReplicatorSv1RemoveRouteProfile = "ReplicatorSv1.RemoveRouteProfile" ReplicatorSv1RemoveAttributeProfile = "ReplicatorSv1.RemoveAttributeProfile" ReplicatorSv1RemoveChargerProfile = "ReplicatorSv1.RemoveChargerProfile"