mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-16 05:39:54 +05:00
Added tests for in-memory indexing
This commit is contained in:
committed by
Dan Christian Bogos
parent
46f0ddb860
commit
fbfcffd2a9
@@ -193,15 +193,14 @@ func (admS *AdminSv1) callCacheForComputeIndexes(ctx *context.Context, cacheopt,
|
||||
method, args, &reply)
|
||||
}
|
||||
|
||||
/*
|
||||
// callCacheRevDestinations used for reverse destination, loadIDs and indexes replication
|
||||
func (apierSv1 *AdminS) callCacheMultiple(cacheopt, tnt, cacheID string, itemIDs []string, opts map[string]interface{}) (err error) {
|
||||
func (admS *AdminSv1) callCacheMultiple(ctx *context.Context, cacheopt, tnt, cacheID string, itemIDs []string, opts map[string]interface{}) (err error) {
|
||||
if len(itemIDs) == 0 {
|
||||
return
|
||||
}
|
||||
var reply, method string
|
||||
var args interface{}
|
||||
switch utils.FirstNonEmpty(cacheopt, apierSv1.cfg.GeneralCfg().DefaultCaching) {
|
||||
switch utils.FirstNonEmpty(cacheopt, admS.cfg.GeneralCfg().DefaultCaching) {
|
||||
case utils.MetaNone:
|
||||
return
|
||||
case utils.MetaReload:
|
||||
@@ -221,10 +220,9 @@ func (apierSv1 *AdminS) callCacheMultiple(cacheopt, tnt, cacheID string, itemIDs
|
||||
APIOpts: opts,
|
||||
}
|
||||
}
|
||||
return apierSv1.ConnMgr.Call(context.TODO(), apierSv1.cfg.ApierCfg().CachesConns,
|
||||
return admS.connMgr.Call(ctx, admS.cfg.AdminSCfg().CachesConns,
|
||||
method, args, &reply)
|
||||
}
|
||||
*/
|
||||
|
||||
func composeCacheArgsForFilter(dm *engine.DataManager, ctx *context.Context, fltr *engine.Filter, tnt, tntID string, args map[string][]string) (_ map[string][]string, err error) {
|
||||
indxIDs := make([]string, 0, len(fltr.Rules))
|
||||
|
||||
650
apis/replicator.go
Normal file
650
apis/replicator.go
Normal file
@@ -0,0 +1,650 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package apis
|
||||
|
||||
import (
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// NewReplicatorSv1 constructs the ReplicatorSv1 object
|
||||
func NewReplicatorSv1(dm *engine.DataManager, v1 *AdminSv1) *ReplicatorSv1 {
|
||||
return &ReplicatorSv1{
|
||||
dm: dm,
|
||||
v1: v1,
|
||||
}
|
||||
}
|
||||
|
||||
// ReplicatorSv1 exports the DataDB methods to RPC
|
||||
type ReplicatorSv1 struct {
|
||||
ping
|
||||
dm *engine.DataManager
|
||||
v1 *AdminSv1 // needed for CallCache only
|
||||
}
|
||||
|
||||
// GetAccount is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetAccount(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *utils.Account) error {
|
||||
engine.UpdateReplicationFilters(utils.AccountPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetAccountDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetStatQueue is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetStatQueue(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.StatQueue) error {
|
||||
engine.UpdateReplicationFilters(utils.StatQueuePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetStatQueueDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetFilter is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetFilter(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.Filter) error {
|
||||
engine.UpdateReplicationFilters(utils.FilterPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetFilterDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetThreshold is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetThreshold(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.Threshold) error {
|
||||
engine.UpdateReplicationFilters(utils.ThresholdPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetThresholdDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetThresholdProfile is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetThresholdProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.ThresholdProfile) error {
|
||||
engine.UpdateReplicationFilters(utils.ThresholdProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetThresholdProfileDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetStatQueueProfile is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetStatQueueProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.StatQueueProfile) error {
|
||||
engine.UpdateReplicationFilters(utils.StatQueueProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetStatQueueProfileDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetResource is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetResource(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.Resource) error {
|
||||
engine.UpdateReplicationFilters(utils.ResourcesPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetResourceDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetResourceProfile is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetResourceProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.ResourceProfile) error {
|
||||
engine.UpdateReplicationFilters(utils.ResourceProfilesPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetResourceProfileDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetRouteProfile is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetRouteProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.RouteProfile) error {
|
||||
engine.UpdateReplicationFilters(utils.RouteProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetRouteProfileDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAttributeProfile is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetAttributeProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.AttributeProfile) error {
|
||||
engine.UpdateReplicationFilters(utils.AttributeProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetAttributeProfileDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetChargerProfile is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetChargerProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.ChargerProfile) error {
|
||||
engine.UpdateReplicationFilters(utils.ChargerProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetChargerProfileDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDispatcherProfile is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetDispatcherProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.DispatcherProfile) error {
|
||||
engine.UpdateReplicationFilters(utils.DispatcherProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetDispatcherProfileDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDispatcherHost is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetDispatcherHost(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.DispatcherHost) error {
|
||||
engine.UpdateReplicationFilters(utils.DispatcherHostPrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetDispatcherHostDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetItemLoadIDs is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetItemLoadIDs(ctx *context.Context, itemID *utils.StringWithAPIOpts, reply *map[string]int64) error {
|
||||
engine.UpdateReplicationFilters(utils.LoadIDPrefix, itemID.Arg, utils.IfaceAsString(itemID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetItemLoadIDsDrv(ctx, itemID.Arg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetIndexes is the remote method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) GetIndexes(ctx *context.Context, args *utils.GetIndexesArg, reply *map[string]utils.StringSet) error {
|
||||
engine.UpdateReplicationFilters(utils.CacheInstanceToPrefix[args.IdxItmType], args.TntCtx, utils.IfaceAsString(args.APIOpts[utils.RemoteHostOpt]))
|
||||
indx, err := rplSv1.dm.DataDB().GetIndexesDrv(ctx, args.IdxItmType, args.TntCtx, args.IdxKey, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = indx
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetAccount is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetAccount(ctx *context.Context, acc *utils.AccountWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetAccountDrv(ctx, acc.Account); err != nil {
|
||||
return
|
||||
}
|
||||
// the account doesn't have cache
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetThresholdProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetThresholdProfile(ctx *context.Context, th *engine.ThresholdProfileWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetThresholdProfileDrv(ctx, th.ThresholdProfile); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(th.APIOpts[utils.MetaCache]),
|
||||
th.Tenant, utils.CacheThresholdProfiles, th.TenantID(), &th.FilterIDs, th.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetThreshold is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetThreshold(ctx *context.Context, th *engine.ThresholdWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetThresholdDrv(ctx, th.Threshold); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(th.APIOpts[utils.MetaCache]),
|
||||
th.Tenant, utils.CacheThresholds, th.TenantID(), nil, th.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetStatQueueProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetStatQueueProfile(ctx *context.Context, sq *engine.StatQueueProfileWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetStatQueueProfileDrv(ctx, sq.StatQueueProfile); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(sq.APIOpts[utils.MetaCache]),
|
||||
sq.Tenant, utils.CacheStatQueueProfiles, sq.TenantID(), &sq.FilterIDs, sq.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetStatQueue is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetStatQueue(ctx *context.Context, sq *engine.StatQueueWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetStatQueueDrv(ctx, nil, sq.StatQueue); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(sq.APIOpts[utils.MetaCache]),
|
||||
sq.StatQueue.Tenant, utils.CacheStatQueues, sq.StatQueue.TenantID(), nil, sq.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetFilter is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetFilter(ctx *context.Context, fltr *engine.FilterWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetFilterDrv(ctx, fltr.Filter); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(fltr.APIOpts[utils.MetaCache]),
|
||||
fltr.Tenant, utils.CacheFilters, fltr.TenantID(), nil, fltr.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetResourceProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetResourceProfile(ctx *context.Context, rs *engine.ResourceProfileWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetResourceProfileDrv(ctx, rs.ResourceProfile); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(rs.APIOpts[utils.MetaCache]),
|
||||
rs.Tenant, utils.CacheResourceProfiles, rs.TenantID(), &rs.FilterIDs, rs.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetResource is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetResource(ctx *context.Context, rs *engine.ResourceWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetResourceDrv(ctx, rs.Resource); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(rs.APIOpts[utils.MetaCache]),
|
||||
rs.Tenant, utils.CacheResources, rs.TenantID(), nil, rs.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetRouteProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetRouteProfile(ctx *context.Context, sp *engine.RouteProfileWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetRouteProfileDrv(ctx, sp.RouteProfile); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(sp.APIOpts[utils.MetaCache]),
|
||||
sp.Tenant, utils.CacheRouteProfiles, sp.TenantID(), &sp.FilterIDs, sp.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetAttributeProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetAttributeProfile(ctx *context.Context, ap *engine.AttributeProfileWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetAttributeProfileDrv(ctx, ap.AttributeProfile); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(ap.APIOpts[utils.MetaCache]),
|
||||
ap.Tenant, utils.CacheAttributeProfiles, ap.TenantID(), &ap.FilterIDs, ap.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetChargerProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetChargerProfile(ctx *context.Context, cp *engine.ChargerProfileWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetChargerProfileDrv(ctx, cp.ChargerProfile); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(cp.APIOpts[utils.MetaCache]),
|
||||
cp.Tenant, utils.CacheChargerProfiles, cp.TenantID(), &cp.FilterIDs, cp.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetDispatcherProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetDispatcherProfile(ctx *context.Context, dpp *engine.DispatcherProfileWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetDispatcherProfileDrv(ctx, dpp.DispatcherProfile); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(dpp.APIOpts[utils.MetaCache]),
|
||||
dpp.Tenant, utils.CacheDispatcherProfiles, dpp.TenantID(), &dpp.FilterIDs, dpp.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetDispatcherHost is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetDispatcherHost(ctx *context.Context, dpp *engine.DispatcherHostWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetDispatcherHostDrv(ctx, dpp.DispatcherHost); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(dpp.APIOpts[utils.MetaCache]),
|
||||
dpp.Tenant, utils.CacheDispatcherHosts, dpp.TenantID(), nil, dpp.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetLoadIDs is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetLoadIDs(ctx *context.Context, args *utils.LoadIDsWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetLoadIDsDrv(ctx, args.LoadIDs); err != nil {
|
||||
return
|
||||
}
|
||||
lIDs := make([]string, 0, len(args.LoadIDs))
|
||||
for lID := range args.LoadIDs {
|
||||
lIDs = append(lIDs, lID)
|
||||
}
|
||||
if err = rplSv1.v1.callCacheMultiple(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheLoadIDs, lIDs, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// SetIndexes is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) SetIndexes(ctx *context.Context, args *utils.SetIndexesArg, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetIndexesDrv(ctx, args.IdxItmType, args.TntCtx, args.Indexes, true, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
cIDs := make([]string, 0, len(args.Indexes))
|
||||
for idxKey := range args.Indexes {
|
||||
cIDs = append(cIDs, utils.ConcatenatedKey(args.TntCtx, idxKey))
|
||||
}
|
||||
if err = rplSv1.v1.callCacheMultiple(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, args.IdxItmType, cIDs, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveThreshold is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveThreshold(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemoveThresholdDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheThresholds, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveAccount is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveAccount(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemoveAccountDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
// the account doesn't have cache
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveStatQueue is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveStatQueue(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemStatQueueDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheStatQueues, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveFilter is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveFilter(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemoveFilterDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheFilters, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveThresholdProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveThresholdProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemThresholdProfileDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheThresholdProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveStatQueueProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveStatQueueProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemStatQueueProfileDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheStatQueueProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveResource is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveResource(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemoveResourceDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheResources, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveResourceProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveResourceProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemoveResourceProfileDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheResourceProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveRouteProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveRouteProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemoveRouteProfileDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheRouteProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveAttributeProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveAttributeProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemoveAttributeProfileDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheAttributeProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveChargerProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveChargerProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemoveChargerProfileDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheChargerProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveDispatcherProfile is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveDispatcherProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemoveDispatcherProfileDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheDispatcherProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveDispatcherHost is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveDispatcherHost(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemoveDispatcherHostDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheDispatcherHosts, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveIndexes is the replication method coresponding to the dataDb driver method
|
||||
func (rplSv1 *ReplicatorSv1) RemoveIndexes(ctx *context.Context, args *utils.GetIndexesArg, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemoveIndexesDrv(ctx, args.IdxItmType, args.TntCtx, args.IdxKey); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, args.IdxItmType, utils.ConcatenatedKey(args.TntCtx, args.IdxKey), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
func (rplSv1 *ReplicatorSv1) GetRateProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *utils.RateProfile) error {
|
||||
engine.UpdateReplicationFilters(utils.RateProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetRateProfileDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
func (rplSv1 *ReplicatorSv1) GetActionProfile(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *engine.ActionProfile) error {
|
||||
engine.UpdateReplicationFilters(utils.ActionProfilePrefix, tntID.TenantID.TenantID(), utils.IfaceAsString(tntID.APIOpts[utils.RemoteHostOpt]))
|
||||
rcv, err := rplSv1.dm.DataDB().GetActionProfileDrv(ctx, tntID.Tenant, tntID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *rcv
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rplSv1 *ReplicatorSv1) SetRateProfile(ctx *context.Context, sp *utils.RateProfileWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetRateProfileDrv(ctx, sp.RateProfile); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(sp.APIOpts[utils.MetaCache]),
|
||||
sp.Tenant, utils.CacheRateProfiles, sp.TenantID(), &sp.FilterIDs, sp.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
func (rplSv1 *ReplicatorSv1) SetActionProfile(ctx *context.Context, sp *engine.ActionProfileWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().SetActionProfileDrv(ctx, sp.ActionProfile); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(sp.APIOpts[utils.MetaCache]),
|
||||
sp.Tenant, utils.CacheActionProfiles, sp.TenantID(), &sp.FilterIDs, sp.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
func (rplSv1 *ReplicatorSv1) RemoveRateProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemoveRateProfileDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheRateProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
func (rplSv1 *ReplicatorSv1) RemoveActionProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) {
|
||||
if err = rplSv1.dm.DataDB().RemoveActionProfileDrv(ctx, args.Tenant, args.ID); err != nil {
|
||||
return
|
||||
}
|
||||
if err = rplSv1.v1.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]),
|
||||
args.Tenant, utils.CacheActionProfiles, args.TenantID.TenantID(), nil, args.APIOpts); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
85
data/conf/samples/loaders_indexes_internal_db/cgrates.json
Normal file
85
data/conf/samples/loaders_indexes_internal_db/cgrates.json
Normal file
@@ -0,0 +1,85 @@
|
||||
{
|
||||
"general": {
|
||||
"node_id" : "IntenalLoaders",
|
||||
"log_level": 7
|
||||
},
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":2022",
|
||||
"rpc_gob": ":2023",
|
||||
"http": ":2280"
|
||||
},
|
||||
|
||||
"rpc_conns": {
|
||||
"engine": {
|
||||
"strategy": "*first",
|
||||
"conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}]
|
||||
}
|
||||
},
|
||||
|
||||
"data_db": {
|
||||
"db_type": "*internal",
|
||||
"remote_conns": ["engine"],
|
||||
"replication_conns": ["engine"],
|
||||
"items":{
|
||||
"*accounts": {"remote":true, "replicate":true},
|
||||
"*reverse_destinations": {"remote":true, "replicate":true},
|
||||
"*destinations": {"remote":true, "replicate":true},
|
||||
"*rating_plans": {"remote":true, "replicate":true},
|
||||
"*rating_profiles": {"remote":true, "replicate":true},
|
||||
"*actions": {"remote":true, "replicate":true},
|
||||
"*action_plans": {"remote":true, "replicate":true},
|
||||
"*account_action_plans": {"remote":true, "replicate":true},
|
||||
"*action_triggers": {"remote":true, "replicate":true},
|
||||
"*shared_groups": {"remote":true, "replicate":true},
|
||||
"*timings": {"remote":true, "replicate":true},
|
||||
"*resource_profiles": {"remote":true, "replicate":true},
|
||||
"*resources": {"remote":true, "replicate":true},
|
||||
"*statqueue_profiles": {"remote":true, "replicate":true},
|
||||
"*statqueues": {"remote":true, "replicate":true},
|
||||
"*threshold_profiles": {"remote":true, "replicate":true},
|
||||
"*thresholds": {"remote":true, "replicate":true},
|
||||
"*filters": {"remote":true, "replicate":true},
|
||||
"*route_profiles": {"remote":true, "replicate":true},
|
||||
"*attribute_profiles": {"remote":true, "replicate":true},
|
||||
"*charger_profiles": {"remote":true, "replicate":true},
|
||||
"*dispatcher_profiles": {"remote":true, "replicate":true},
|
||||
"*dispatcher_hosts": {"remote":true, "replicate":true},
|
||||
"*load_ids": {"remote":true, "replicate":true},
|
||||
"*versions": {"remote":true, "replicate":true},
|
||||
"*rate_profiles": {"remote":true, "replicate":true},
|
||||
"*action_profiles": {"remote":true, "replicate":true},
|
||||
// no remote for indexes
|
||||
"*resource_filter_indexes" : {"replicate":true},
|
||||
"*stat_filter_indexes" : {"replicate":true},
|
||||
"*threshold_filter_indexes" : {"replicate":true},
|
||||
"*route_filter_indexes" : {"replicate":true},
|
||||
"*attribute_filter_indexes" : {"replicate":true},
|
||||
"*charger_filter_indexes" : {"replicate":true},
|
||||
"*dispatcher_filter_indexes" : {"replicate":true},
|
||||
"*reverse_filter_indexes" : {"replicate":true},
|
||||
"*rate_profile_filter_indexes" : {"replicate": true},
|
||||
"*rate_filter_indexes" : {"replicate": true},
|
||||
"*action_profile_filter_indexes" : {"replicate": true},
|
||||
"*account_filter_indexes" : {"replicate": true}
|
||||
}
|
||||
},
|
||||
|
||||
"stor_db": {
|
||||
"db_type": "*internal"
|
||||
},
|
||||
|
||||
"loaders": [{
|
||||
"id": "*default",
|
||||
"enabled": true,
|
||||
"caches_conns": ["engine"],
|
||||
"tp_in_dir": "/usr/share/cgrates/tariffplans/tutorial/",
|
||||
"tp_out_dir": ""
|
||||
}],
|
||||
|
||||
"admins": {
|
||||
"enabled": true,
|
||||
"caches_conns":["engine"]
|
||||
}
|
||||
|
||||
}
|
||||
@@ -351,15 +351,15 @@ func (dS *DispatcherService) ReplicatorSv1SetStatQueue(args *engine.StatQueueWit
|
||||
StatQueue: &engine.StatQueue{},
|
||||
}
|
||||
}
|
||||
args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
|
||||
args.StatQueue.Tenant = utils.FirstNonEmpty(args.StatQueue.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
|
||||
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
|
||||
if err = dS.authorize(utils.ReplicatorSv1SetStatQueue, args.Tenant,
|
||||
if err = dS.authorize(utils.ReplicatorSv1SetStatQueue, args.StatQueue.Tenant,
|
||||
utils.IfaceAsString(args.APIOpts[utils.OptsAPIKey])); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return dS.Dispatch(context.TODO(), &utils.CGREvent{
|
||||
Tenant: args.Tenant,
|
||||
Tenant: args.StatQueue.Tenant,
|
||||
APIOpts: args.APIOpts,
|
||||
}, utils.MetaReplicator, utils.ReplicatorSv1SetStatQueue, args, rpl)
|
||||
}
|
||||
@@ -727,23 +727,6 @@ func (dS *DispatcherService) ReplicatorSv1RemoveResourceProfile(args *utils.Tena
|
||||
}, utils.MetaReplicator, utils.ReplicatorSv1RemoveResourceProfile, args, rpl)
|
||||
}
|
||||
|
||||
func (dS *DispatcherService) ReplicatorSv1RemoveActions(args *utils.StringWithAPIOpts, rpl *string) (err error) {
|
||||
if args == nil {
|
||||
args = new(utils.StringWithAPIOpts)
|
||||
}
|
||||
args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
|
||||
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
|
||||
if err = dS.authorize(utils.ReplicatorSv1RemoveActions, args.Tenant,
|
||||
utils.IfaceAsString(args.APIOpts[utils.OptsAPIKey])); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return dS.Dispatch(context.TODO(), &utils.CGREvent{
|
||||
Tenant: args.Tenant,
|
||||
APIOpts: args.APIOpts,
|
||||
}, utils.MetaReplicator, utils.ReplicatorSv1RemoveActions, args, rpl)
|
||||
}
|
||||
|
||||
func (dS *DispatcherService) ReplicatorSv1RemoveRouteProfile(args *utils.TenantIDWithAPIOpts, rpl *string) (err error) {
|
||||
if args == nil {
|
||||
args = &utils.TenantIDWithAPIOpts{
|
||||
|
||||
@@ -757,46 +757,6 @@ func TestDspReplicatorSv1RemoveResourceProfileNilEvent(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDspReplicatorSv1RemoveActionsNil(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
|
||||
cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"}
|
||||
CGREvent := &utils.StringWithAPIOpts{
|
||||
Tenant: "tenant",
|
||||
}
|
||||
var reply *string
|
||||
result := dspSrv.ReplicatorSv1RemoveActions(CGREvent, reply)
|
||||
expected := "MANDATORY_IE_MISSING: [ApiKey]"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDspReplicatorSv1RemoveActionsErrorNil(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
|
||||
CGREvent := &utils.StringWithAPIOpts{
|
||||
Tenant: "tenant",
|
||||
}
|
||||
var reply *string
|
||||
result := dspSrv.ReplicatorSv1RemoveActions(CGREvent, reply)
|
||||
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDspReplicatorSv1RemoveActionsEvent(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
|
||||
var reply *string
|
||||
result := dspSrv.ReplicatorSv1RemoveActions(nil, reply)
|
||||
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
|
||||
if result == nil || result.Error() != expected {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDspReplicatorSv1RemoveRouteProfileNil(t *testing.T) {
|
||||
cgrCfg := config.NewDefaultCGRConfig()
|
||||
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
|
||||
|
||||
@@ -127,8 +127,8 @@ type StoredStatQueue struct {
|
||||
}
|
||||
|
||||
type StatQueueWithAPIOpts struct {
|
||||
*StatQueue
|
||||
APIOpts map[string]interface{}
|
||||
StatQueue *StatQueue
|
||||
APIOpts map[string]interface{}
|
||||
}
|
||||
|
||||
// SqID will compose the unique identifier for the StatQueue out of Tenant and ID
|
||||
@@ -395,9 +395,6 @@ func (sis StatQueues) Sort() {
|
||||
}
|
||||
|
||||
func (sq *StatQueue) MarshalJSON() (rply []byte, err error) {
|
||||
if sq == nil {
|
||||
return []byte("null"), nil
|
||||
}
|
||||
type tmp StatQueue
|
||||
sq.lock(utils.EmptyString)
|
||||
rply, err = json.Marshal(tmp(*sq))
|
||||
@@ -473,15 +470,23 @@ func (sq *StatQueue) UnmarshalJSON(data []byte) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (sq *StatQueue) GobEncode() (rply []byte, err error) {
|
||||
type sqEncode StatQueue
|
||||
|
||||
func (sq StatQueue) GobEncode() (rply []byte, err error) {
|
||||
buf := bytes.NewBuffer(rply)
|
||||
type tmp StatQueue
|
||||
sq.lock(utils.EmptyString)
|
||||
err = gob.NewEncoder(buf).Encode(tmp(*sq))
|
||||
err = gob.NewEncoder(buf).Encode(sqEncode(sq))
|
||||
sq.unlock()
|
||||
return buf.Bytes(), err
|
||||
}
|
||||
|
||||
func (sq *StatQueue) GobDecode(rply []byte) (err error) {
|
||||
buf := bytes.NewBuffer(rply)
|
||||
var eSq sqEncode
|
||||
err = gob.NewDecoder(buf).Decode(&eSq)
|
||||
*sq = StatQueue(eSq)
|
||||
return err
|
||||
}
|
||||
func (sq *StatQueue) Clone() (cln *StatQueue) {
|
||||
cln = &StatQueue{
|
||||
Tenant: sq.Tenant,
|
||||
@@ -511,30 +516,13 @@ func (ssq *StatQueueWithAPIOpts) MarshalJSON() (rply []byte, err error) {
|
||||
StatQueue
|
||||
APIOpts map[string]interface{}
|
||||
}
|
||||
ssq.lock(utils.EmptyString)
|
||||
rply, err = json.Marshal(tmp{
|
||||
StatQueue: *ssq.StatQueue,
|
||||
APIOpts: ssq.APIOpts,
|
||||
})
|
||||
ssq.unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (ssq *StatQueueWithAPIOpts) GobEncode() (rply []byte, err error) {
|
||||
buf := bytes.NewBuffer(rply)
|
||||
type tmp struct {
|
||||
StatQueue
|
||||
APIOpts map[string]interface{}
|
||||
}
|
||||
ssq.lock(utils.EmptyString)
|
||||
err = gob.NewEncoder(buf).Encode(tmp{
|
||||
StatQueue: *ssq.StatQueue,
|
||||
APIOpts: ssq.APIOpts,
|
||||
})
|
||||
ssq.unlock()
|
||||
return buf.Bytes(), err
|
||||
}
|
||||
|
||||
// UnmarshalJSON here only to fully support json for StatQueue
|
||||
func (ssq *StatQueueWithAPIOpts) UnmarshalJSON(data []byte) (err error) {
|
||||
sq := new(StatQueue)
|
||||
|
||||
@@ -1295,6 +1295,7 @@ func TestStatQueueWithAPIOptsJSONMarshall(t *testing.T) {
|
||||
if err = json.Unmarshal([]byte(utils.ToJSON(exp2)), rply); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !reflect.DeepEqual(rply, exp2) {
|
||||
t.Errorf("Expected: %+v , received: %+v", exp2, rply)
|
||||
t.Errorf("Expected: %s , received: %s", utils.ToJSON(exp2), utils.ToJSON(rply))
|
||||
}
|
||||
|
||||
|
||||
@@ -111,7 +111,6 @@ func (asr *StatASR) AddEvent(evID string, ev utils.DataProvider) error {
|
||||
}
|
||||
|
||||
func (asr *StatASR) RemEvent(evID string) (err error) {
|
||||
|
||||
val, has := asr.Events[evID]
|
||||
if !has {
|
||||
return utils.ErrNotFound
|
||||
|
||||
192
general_tests/loaders_internal_indexes_it_test.go
Normal file
192
general_tests/loaders_internal_indexes_it_test.go
Normal file
@@ -0,0 +1,192 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package general_tests
|
||||
|
||||
import (
|
||||
"path"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/loaders"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
loadersIDBIdxCfgDir string
|
||||
loadersIDBIdxCfgPath string
|
||||
loadersIDBIdxCfgPathInternal = path.Join(*dataDir, "conf", "samples", "loaders_indexes_internal_db")
|
||||
loadersIDBIdxCfg, loadersIDBIdxCfgInternal *config.CGRConfig
|
||||
loadersIDBIdxRPC, loadersIDBIdxRPCInternal *birpc.Client
|
||||
|
||||
LoadersIDBIdxTests = []func(t *testing.T){
|
||||
testLoadersIDBIdxItLoadConfig,
|
||||
testLoadersIDBIdxItDB,
|
||||
testLoadersIDBIdxItStartEngines,
|
||||
testLoadersIDBIdxItRPCConn,
|
||||
testLoadersIDBIdxItLoad,
|
||||
testLoadersIDBIdxCheckAttributes,
|
||||
testLoadersIDBIdxCheckAttributesIndexes,
|
||||
testLoadersIDBIdxItStopCgrEngine,
|
||||
}
|
||||
)
|
||||
|
||||
func TestLoadersIDBIdxIt(t *testing.T) {
|
||||
switch *dbType {
|
||||
case utils.MetaInternal:
|
||||
loadersIDBIdxCfgDir = "tutinternal"
|
||||
case utils.MetaMySQL:
|
||||
loadersIDBIdxCfgDir = "tutmysql"
|
||||
case utils.MetaMongo:
|
||||
loadersIDBIdxCfgDir = "tutmongo"
|
||||
case utils.MetaPostgres:
|
||||
t.SkipNow()
|
||||
default:
|
||||
t.Fatal("Unknown Database type")
|
||||
}
|
||||
for _, stest := range LoadersIDBIdxTests {
|
||||
t.Run(loadersIDBIdxCfgDir, stest)
|
||||
}
|
||||
}
|
||||
|
||||
func testLoadersIDBIdxItLoadConfig(t *testing.T) {
|
||||
loadersIDBIdxCfgPath = path.Join(*dataDir, "conf", "samples", loadersIDBIdxCfgDir)
|
||||
if loadersIDBIdxCfg, err = config.NewCGRConfigFromPath(context.Background(), loadersIDBIdxCfgPath); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if loadersIDBIdxCfgInternal, err = config.NewCGRConfigFromPath(context.Background(), loadersIDBIdxCfgPathInternal); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testLoadersIDBIdxItDB(t *testing.T) {
|
||||
if err := engine.InitDataDB(loadersIDBIdxCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := engine.InitStorDB(loadersIDBIdxCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testLoadersIDBIdxItStartEngines(t *testing.T) {
|
||||
if _, err := engine.StopStartEngine(loadersIDBIdxCfgPath, *waitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := engine.StartEngine(loadersIDBIdxCfgPathInternal, *waitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testLoadersIDBIdxItRPCConn(t *testing.T) {
|
||||
var err error
|
||||
if loadersIDBIdxRPC, err = newRPCClient(loadersIDBIdxCfg.ListenCfg()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if loadersIDBIdxRPCInternal, err = newRPCClient(loadersIDBIdxCfgInternal.ListenCfg()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testLoadersIDBIdxItLoad(t *testing.T) {
|
||||
var reply string
|
||||
if err := loadersIDBIdxRPCInternal.Call(context.Background(), utils.LoaderSv1Run,
|
||||
&loaders.ArgsProcessFolder{
|
||||
APIOpts: map[string]interface{}{
|
||||
utils.MetaStopOnError: false,
|
||||
utils.MetaCache: utils.MetaReload,
|
||||
},
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Error("Unexpected reply returned:", reply)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
func testLoadersIDBIdxCheckAttributes(t *testing.T) {
|
||||
exp := &engine.APIAttributeProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "ATTR_1001_SIMPLEAUTH",
|
||||
FilterIDs: []string{"*string:~*opts.*context:simpleauth", "*string:~*req.Account:1001"},
|
||||
Attributes: []*engine.ExternalAttribute{{
|
||||
Path: utils.MetaReq + utils.NestingSep + "Password",
|
||||
Type: utils.MetaConstant,
|
||||
Value: "CGRateS.org",
|
||||
}},
|
||||
Weight: 20.0,
|
||||
}
|
||||
|
||||
var reply *engine.APIAttributeProfile
|
||||
if err := loadersIDBIdxRPC.Call(context.Background(), utils.AdminSv1GetAttributeProfile,
|
||||
&utils.TenantIDWithAPIOpts{
|
||||
TenantID: &utils.TenantID{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "ATTR_1001_SIMPLEAUTH",
|
||||
},
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(exp, reply) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(reply))
|
||||
}
|
||||
}
|
||||
|
||||
func testLoadersIDBIdxCheckAttributesIndexes(t *testing.T) {
|
||||
expIdx := []string{
|
||||
"*string:*opts.*context:*sessions:ATTR_1001_SESSIONAUTH",
|
||||
"*string:*opts.*context:*sessions:ATTR_1002_SESSIONAUTH",
|
||||
"*string:*opts.*context:*sessions:ATTR_1003_SESSIONAUTH",
|
||||
"*string:*opts.*context:simpleauth:ATTR_1001_SIMPLEAUTH",
|
||||
"*string:*opts.*context:simpleauth:ATTR_1002_SIMPLEAUTH",
|
||||
"*string:*opts.*context:simpleauth:ATTR_1003_SIMPLEAUTH",
|
||||
"*string:*req.Account:1001:ATTR_1001_SESSIONAUTH",
|
||||
"*string:*req.Account:1001:ATTR_1001_SIMPLEAUTH",
|
||||
"*string:*req.Account:1002:ATTR_1002_SESSIONAUTH",
|
||||
"*string:*req.Account:1002:ATTR_1002_SIMPLEAUTH",
|
||||
"*string:*req.Account:1003:ATTR_1003_SESSIONAUTH",
|
||||
"*string:*req.Account:1003:ATTR_1003_SIMPLEAUTH",
|
||||
"*string:*req.SubscriberId:1006:ATTR_ACC_ALIAS",
|
||||
}
|
||||
var indexes []string
|
||||
if err := loadersIDBIdxRPC.Call(context.Background(), utils.AdminSv1GetFilterIndexes,
|
||||
&apis.AttrGetFilterIndexes{
|
||||
ItemType: utils.MetaAttributes,
|
||||
Tenant: "cgrates.org",
|
||||
FilterType: utils.MetaString,
|
||||
Context: "simpleauth",
|
||||
}, &indexes); err != nil {
|
||||
t.Error(err)
|
||||
} else if sort.Strings(indexes); !reflect.DeepEqual(indexes, expIdx) {
|
||||
t.Errorf("Expecting: %+v, received: %+v",
|
||||
utils.ToJSON(expIdx), utils.ToJSON(indexes))
|
||||
}
|
||||
}
|
||||
|
||||
func testLoadersIDBIdxItStopCgrEngine(t *testing.T) {
|
||||
if err := engine.KillEngine(100); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
@@ -103,6 +103,10 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
|
||||
for _, s := range srv {
|
||||
apiService.server.RpcRegister(s)
|
||||
}
|
||||
rpl, _ := engine.NewService(apis.NewReplicatorSv1(datadb, apiService.api))
|
||||
for _, s := range rpl {
|
||||
apiService.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
|
||||
//backwards compatible
|
||||
|
||||
@@ -1064,7 +1064,6 @@ const (
|
||||
ReplicatorSv1GetStatQueueProfile = "ReplicatorSv1.GetStatQueueProfile"
|
||||
ReplicatorSv1GetResource = "ReplicatorSv1.GetResource"
|
||||
ReplicatorSv1GetResourceProfile = "ReplicatorSv1.GetResourceProfile"
|
||||
ReplicatorSv1GetActions = "ReplicatorSv1.GetActions"
|
||||
ReplicatorSv1GetRouteProfile = "ReplicatorSv1.GetRouteProfile"
|
||||
ReplicatorSv1GetAttributeProfile = "ReplicatorSv1.GetAttributeProfile"
|
||||
ReplicatorSv1GetChargerProfile = "ReplicatorSv1.GetChargerProfile"
|
||||
@@ -1081,7 +1080,6 @@ const (
|
||||
ReplicatorSv1SetStatQueueProfile = "ReplicatorSv1.SetStatQueueProfile"
|
||||
ReplicatorSv1SetResource = "ReplicatorSv1.SetResource"
|
||||
ReplicatorSv1SetResourceProfile = "ReplicatorSv1.SetResourceProfile"
|
||||
ReplicatorSv1SetActions = "ReplicatorSv1.SetActions"
|
||||
ReplicatorSv1SetRouteProfile = "ReplicatorSv1.SetRouteProfile"
|
||||
ReplicatorSv1SetAttributeProfile = "ReplicatorSv1.SetAttributeProfile"
|
||||
ReplicatorSv1SetChargerProfile = "ReplicatorSv1.SetChargerProfile"
|
||||
@@ -1099,7 +1097,6 @@ const (
|
||||
ReplicatorSv1RemoveStatQueueProfile = "ReplicatorSv1.RemoveStatQueueProfile"
|
||||
ReplicatorSv1RemoveResource = "ReplicatorSv1.RemoveResource"
|
||||
ReplicatorSv1RemoveResourceProfile = "ReplicatorSv1.RemoveResourceProfile"
|
||||
ReplicatorSv1RemoveActions = "ReplicatorSv1.RemoveActions"
|
||||
ReplicatorSv1RemoveRouteProfile = "ReplicatorSv1.RemoveRouteProfile"
|
||||
ReplicatorSv1RemoveAttributeProfile = "ReplicatorSv1.RemoveAttributeProfile"
|
||||
ReplicatorSv1RemoveChargerProfile = "ReplicatorSv1.RemoveChargerProfile"
|
||||
|
||||
Reference in New Issue
Block a user