Update SetThresholdProfile in case of Replication

This commit is contained in:
TeoV
2019-11-18 04:02:36 -05:00
committed by Dan Christian Bogos
parent 25bc80458d
commit 3e3cc70882
5 changed files with 137 additions and 54 deletions

View File

@@ -25,6 +25,7 @@ import (
"net/rpc/jsonrpc"
"path"
"reflect"
"sort"
"testing"
"time"
@@ -566,6 +567,21 @@ func testInternalReplicationSetThreshold(t *testing.T) {
err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
//verify indexes on engine2 before adding new threshold profile
var indexes []string
expectedIDX := []string{"*string:~Account:1001:THD_ACNT_1001",
"*string:~Account:1002:THD_ACNT_1002"}
// verify index on engine2
if err := engineTwoRPC.Call("ApierV1.GetFilterIndexes", &AttrGetFilterIndexes{
ItemType: utils.MetaThresholds, Tenant: "cgrates.org", FilterType: utils.MetaString},
&indexes); err != nil {
t.Error(err)
}
sort.Strings(indexes)
if !reflect.DeepEqual(expectedIDX, indexes) {
t.Errorf("Expecting: %+v, received: %+v",
expectedIDX, utils.ToJSON(indexes))
}
tPrfl := &ThresholdWithCache{
ThresholdProfile: &engine.ThresholdProfile{
Tenant: "cgrates.org",
@@ -587,55 +603,68 @@ func testInternalReplicationSetThreshold(t *testing.T) {
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
//
//if err := internalRPC.Call("ApierV1.GetThresholdProfile",
// &utils.TenantID{Tenant: "cgrates.org", ID: "THD_Replication"}, &reply); err != nil {
// t.Error(err)
//} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, reply) {
// t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, reply)
//}
//// make sure was set on engine1
//expectedIDX = []string{"*string:~Account:1001:THD_Replication",
// "*string:~CustomField:CustomValue:THD_Replication"}
//// verify index on internal
//sort.Strings(expectedIDX)
//if err := internalRPC.Call("ApierV1.GetFilterIndexes", &AttrGetFilterIndexes{
// ItemType: utils.MetaThresholds, Tenant: "cgrates.org", FilterType: utils.MetaString},
// &indexes); err != nil {
// t.Error(err)
//}
//sort.Strings(indexes)
//if !reflect.DeepEqual(expectedIDX, indexes) {
// t.Errorf("Expecting: %+v, received: %+v",
// expectedIDX, utils.ToJSON(indexes))
//}
//// verify data on engine1
//if err := engineOneRPC.Call("ApierV1.GetThresholdProfile",
// &utils.TenantID{Tenant: "cgrates.org", ID: "THD_Replication"}, &reply); err != nil {
// t.Error(err)
//} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, reply) {
// t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, reply)
//}
//// make sure was set on engine2
//// verify indexes on engine1 (should be the same as internal)
//if err := engineOneRPC.Call("ApierV1.GetFilterIndexes", &AttrGetFilterIndexes{
// ItemType: utils.MetaThresholds, Tenant: "cgrates.org", FilterType: utils.MetaString},
// &indexes); err != nil {
// t.Error(err)
//}
//sort.Strings(indexes)
//if !reflect.DeepEqual(expectedIDX, indexes) {
// t.Errorf("Expecting: %+v, received: %+v",
// expectedIDX, utils.ToJSON(indexes))
//}
//// verify data on engine2
//if err := engineTwoRPC.Call("ApierV1.GetThresholdProfile",
// &utils.TenantID{Tenant: "cgrates.org", ID: "THD_Replication"}, &reply); err != nil {
// t.Error(err)
//} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, reply) {
// t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, reply)
//}
//
////// verify threshold profile in replication dataDB
////if rcv, err := rmtDM.GetThresholdProfile("cgrates.org", "THD_Replication",
//// false, false, utils.NonTransactional); err != nil {
//// t.Error(err)
////} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, rcv) {
//// t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, rcv)
////}
//////
////eIdxes := map[string]utils.StringMap{
//// "*string:~Account:1001": {
//// "THD_ACNT_1001": true,
//// "THD_Replication": true,
//// },
//// "*string:~Account:1002": {
//// "THD_ACNT_1002": true,
//// },
//// "*string:~CustomField:CustomValue": {
//// "THD_Replication": true,
//// },
////}
////if rcvIdx, err := rmtDM.GetFilterIndexes(
//// utils.PrefixToIndexCache[utils.ThresholdProfilePrefix], tPrfl.Tenant,
//// utils.EmptyString, nil); err != nil {
//// t.Error(err)
////} else if !reflect.DeepEqual(eIdxes, rcvIdx) {
//// t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx)
////}
//expectedIDX = []string{"*string:~Account:1001:THD_ACNT_1001",
// "*string:~Account:1001:THD_Replication",
// "*string:~Account:1001:THD_ACNT_1002",
// "*string:~CustomField:CustomValue:THD_Replication"}
//// check if indexes was created correctly on engine2
//if err := engineTwoRPC.Call("ApierV1.GetFilterIndexes", &AttrGetFilterIndexes{
// ItemType: utils.MetaThresholds, Tenant: "cgrates.org", FilterType: utils.MetaString},
// &indexes); err != nil {
// t.Error(err)
//}
//sort.Strings(indexes)
//if !reflect.DeepEqual(expectedIDX, indexes) {
// t.Errorf("Expecting: %+v,\n received: %+v",
// expectedIDX, utils.ToJSON(indexes))
//}
}
func testInternalRemoteITKillEngine(t *testing.T) {

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"fmt"
"time"
"github.com/cgrates/cgrates/engine"
@@ -67,8 +66,6 @@ func (apierV1 *ApierV1) GetThresholdProfile(arg *utils.TenantID, reply *engine.T
return utils.NewErrMandatoryIeMissing(missing...)
}
if th, err := apierV1.DataManager.GetThresholdProfile(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
utils.Logger.Debug(fmt.Sprintf("API RTURNER IN ERR : %+v", err))
utils.Logger.Debug(fmt.Sprintf("API RTURNER IN ERR : %+v", utils.APIErrorHandler(err)))
return utils.APIErrorHandler(err)
} else {
*reply = *th

View File

@@ -478,6 +478,12 @@ func (dm *DataManager) SetThreshold(th *Threshold) (err error) {
if err = dm.DataDB().SetThresholdDrv(th); err != nil {
return
}
if dm.rplConns != nil {
var reply string
if err = dm.rplConns.Call("ReplicatorSv1.SetThreshold", th, &reply); err != nil {
return
}
}
return
}
@@ -505,6 +511,7 @@ func (dm *DataManager) GetThresholdProfile(tenant, id string, cacheRead, cacheWr
err = dm.rmtConns.Call(utils.ReplicatorSv1GetThresholdProfile,
&utils.TenantID{Tenant: tenant, ID: id}, &th)
}
err = utils.CastRPCErrToErr(err)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheThresholdProfiles, tntID, nil, nil,
@@ -1137,17 +1144,17 @@ func (dm *DataManager) HasData(category, subject, tenant string) (has bool, err
func (dm *DataManager) GetFilterIndexes(cacheID, itemIDPrefix, filterType string,
fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) {
if indexes, err = dm.DataDB().GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType, fldNameVal); err != nil {
//if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 {
// var rmtErr error
// for _, rmtDM := range dm.rmtDataDBs {
// if indexes, rmtErr = rmtDM.GetFilterIndexes(cacheID, itemIDPrefix,
// filterType, fldNameVal); rmtErr == nil {
// break
// }
// }
// err = rmtErr
//}
if err == utils.ErrNotFound && dm.rmtConns != nil {
err = dm.rmtConns.Call(utils.ReplicatorSv1GetFilterIndexes,
&utils.GetFilterIndexesArg{
CacheID: cacheID,
ItemIDPrefix: itemIDPrefix,
FilterType: filterType,
FldNameVal: fldNameVal,
}, &indexes)
}
if err != nil {
err = utils.CastRPCErrToErr(err)
return nil, err
}
}
@@ -1160,6 +1167,17 @@ func (dm *DataManager) SetFilterIndexes(cacheID, itemIDPrefix string,
indexes, commit, transactionID); err != nil {
return
}
if dm.rplConns != nil {
var reply string
if err = dm.rplConns.Call("ReplicatorSv1.SetFilterIndexes",
&utils.SetFilterIndexesArg{
CacheID: cacheID,
ItemIDPrefix: itemIDPrefix,
Indexes: indexes,
}, &reply); err != nil {
return
}
}
return
}
@@ -1238,6 +1256,7 @@ func (dm *DataManager) GetSupplierProfile(tenant, id string, cacheRead, cacheWri
&utils.TenantID{Tenant: tenant, ID: id}, &supp)
}
if err != nil {
err = utils.CastRPCErrToErr(err)
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheSupplierProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)

View File

@@ -1016,3 +1016,11 @@ type SetFilterIndexesArg struct {
ItemIDPrefix string
Indexes map[string]StringMap
}
func CastRPCErrToErr(err error) error {
if _, has := ErrMap[err.Error()]; has {
return ErrMap[err.Error()]
} else {
return err
}
}

View File

@@ -44,7 +44,6 @@ var (
ErrRatingPlanNotFound = errors.New("RATING_PLAN_NOT_FOUND")
ErrAccountNotFound = errors.New("ACCOUNT_NOT_FOUND")
ErrAccountDisabled = errors.New("ACCOUNT_DISABLED")
ErrUserNotFound = errors.New("USER_NOT_FOUND")
ErrInsufficientCredit = errors.New("INSUFFICIENT_CREDIT")
ErrNotConvertible = errors.New("NOT_CONVERTIBLE")
ErrResourceUnavailable = errors.New("RESOURCE_UNAVAILABLE")
@@ -52,23 +51,15 @@ var (
ErrNoActiveSession = errors.New("NO_ACTIVE_SESSION")
ErrPartiallyExecuted = errors.New("PARTIALLY_EXECUTED")
ErrMaxUsageExceeded = errors.New("MAX_USAGE_EXCEEDED")
ErrUnallocatedResource = errors.New("UNALLOCATED_RESOURCE")
ErrNotFoundNoCaps = errors.New("not found")
ErrFilterNotPassingNoCaps = errors.New("filter not passing")
ErrNotConvertibleNoCaps = errors.New("not convertible")
ErrMandatoryIeMissingNoCaps = errors.New("mandatory information missing")
ErrUnauthorizedApi = errors.New("UNAUTHORIZED_API")
ErrUnknownApiKey = errors.New("UNKNOWN_API_KEY")
ErrIncompatible = errors.New("INCOMPATIBLE")
ErrReqUnsynchronized = errors.New("REQ_UNSYNCHRONIZED")
ErrUnsupporteServiceMethod = errors.New("UNSUPPORTED_SERVICE_METHOD")
ErrWrongArgsType = errors.New("WRONG_ARGS_TYPE")
ErrWrongReplyType = errors.New("WRONG_REPLY_TYPE")
ErrDisconnected = errors.New("DISCONNECTED")
ErrReplyTimeout = errors.New("REPLY_TIMEOUT")
ErrFailedReconnect = errors.New("FAILED_RECONNECT")
ErrInternallyDisconnected = errors.New("INTERNALLY_DISCONNECTED")
ErrUnsupportedCodec = errors.New("UNSUPPORTED_CODEC")
ErrSessionNotFound = errors.New("SESSION_NOT_FOUND")
ErrJsonIncompleteComment = errors.New("JSON_INCOMPLETE_COMMENT")
ErrCDRCNoProfileID = errors.New("CDRC_PROFILE_WITHOUT_ID")
@@ -80,6 +71,45 @@ var (
ErrUnsupportedFormat = errors.New("UNSUPPORTED_FORMAT")
ErrNoDatabaseConn = errors.New("NO_DATA_BASE_CONNECTION")
ErrMaxIncrementsExceeded = errors.New("MAX_INCREMENTS_EXCEEDED")
ErrMap = map[string]error{
ErrNoMoreData.Error(): ErrNoMoreData,
ErrNotImplemented.Error(): ErrNotImplemented,
ErrNotFound.Error(): ErrNotFound,
ErrTimedOut.Error(): ErrTimedOut,
ErrServerError.Error(): ErrServerError,
ErrMaxRecursionDepth.Error(): ErrMaxRecursionDepth,
ErrExists.Error(): ErrExists,
ErrBrokenReference.Error(): ErrBrokenReference,
ErrParserError.Error(): ErrParserError,
ErrInvalidPath.Error(): ErrInvalidPath,
ErrInvalidKey.Error(): ErrInvalidKey,
ErrUnauthorizedDestination.Error(): ErrUnauthorizedDestination,
ErrRatingPlanNotFound.Error(): ErrRatingPlanNotFound,
ErrInsufficientCredit.Error(): ErrInsufficientCredit,
ErrNotConvertible.Error(): ErrNotConvertible,
ErrResourceUnavailable.Error(): ErrResourceUnavailable,
ErrResourceUnauthorized.Error(): ErrResourceUnauthorized,
ErrNoActiveSession.Error(): ErrNoActiveSession,
ErrPartiallyExecuted.Error(): ErrPartiallyExecuted,
ErrMaxUsageExceeded.Error(): ErrMaxUsageExceeded,
ErrFilterNotPassingNoCaps.Error(): ErrFilterNotPassingNoCaps,
ErrNotConvertibleNoCaps.Error(): ErrNotConvertibleNoCaps,
ErrUnauthorizedApi.Error(): ErrUnauthorizedApi,
ErrUnknownApiKey.Error(): ErrUnknownApiKey,
ErrReqUnsynchronized.Error(): ErrReqUnsynchronized,
ErrUnsupporteServiceMethod.Error(): ErrUnsupporteServiceMethod,
ErrDisconnected.Error(): ErrDisconnected,
ErrReplyTimeout.Error(): ErrReplyTimeout,
ErrSessionNotFound.Error(): ErrSessionNotFound,
ErrJsonIncompleteComment.Error(): ErrJsonIncompleteComment,
ErrCDRCNoProfileID.Error(): ErrCDRCNoProfileID,
ErrCDRCNoInPath.Error(): ErrCDRCNoInPath,
ErrNotEnoughParameters.Error(): ErrNotEnoughParameters,
ErrUnsupportedFormat.Error(): ErrUnsupportedFormat,
ErrNoDatabaseConn.Error(): ErrNoDatabaseConn,
ErrMaxIncrementsExceeded.Error(): ErrMaxIncrementsExceeded,
}
)
// NewCGRError initialises a new CGRError
@@ -224,4 +254,4 @@ func ErrPathNotReachable(path string) error {
func ErrNotConvertibleTF(from, to string) error {
return fmt.Errorf("%s : from: %s to:%s", ErrNotConvertibleNoCaps.Error(), from, to)
}
}