From 3e3cc708827694f7a842a152b5ddf0bff97f49e6 Mon Sep 17 00:00:00 2001 From: TeoV Date: Mon, 18 Nov 2019 04:02:36 -0500 Subject: [PATCH] Update SetThresholdProfile in case of Replication --- apier/v1/dm_remote_it_test.go | 91 +++++++++++++++++++++++------------ apier/v1/thresholds.go | 3 -- engine/datamanager.go | 39 +++++++++++---- utils/coreutils.go | 8 +++ utils/errors.go | 50 +++++++++++++++---- 5 files changed, 137 insertions(+), 54 deletions(-) diff --git a/apier/v1/dm_remote_it_test.go b/apier/v1/dm_remote_it_test.go index 43a9bf5c8..d6030fffa 100644 --- a/apier/v1/dm_remote_it_test.go +++ b/apier/v1/dm_remote_it_test.go @@ -25,6 +25,7 @@ import ( "net/rpc/jsonrpc" "path" "reflect" + "sort" "testing" "time" @@ -566,6 +567,21 @@ func testInternalReplicationSetThreshold(t *testing.T) { err.Error() != utils.ErrNotFound.Error() { t.Error(err) } + //verify indexes on engine2 before adding new threshold profile + var indexes []string + expectedIDX := []string{"*string:~Account:1001:THD_ACNT_1001", + "*string:~Account:1002:THD_ACNT_1002"} + // verify index on engine2 + if err := engineTwoRPC.Call("ApierV1.GetFilterIndexes", &AttrGetFilterIndexes{ + ItemType: utils.MetaThresholds, Tenant: "cgrates.org", FilterType: utils.MetaString}, + &indexes); err != nil { + t.Error(err) + } + sort.Strings(indexes) + if !reflect.DeepEqual(expectedIDX, indexes) { + t.Errorf("Expecting: %+v, received: %+v", + expectedIDX, utils.ToJSON(indexes)) + } tPrfl := &ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ Tenant: "cgrates.org", @@ -587,55 +603,68 @@ func testInternalReplicationSetThreshold(t *testing.T) { } else if result != utils.OK { t.Error("Unexpected reply returned", result) } - // + //if err := internalRPC.Call("ApierV1.GetThresholdProfile", // &utils.TenantID{Tenant: "cgrates.org", ID: "THD_Replication"}, &reply); err != nil { // t.Error(err) //} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, reply) { // t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, reply) //} - //// make sure was set on engine1 + //expectedIDX = []string{"*string:~Account:1001:THD_Replication", + // "*string:~CustomField:CustomValue:THD_Replication"} + //// verify index on internal + //sort.Strings(expectedIDX) + //if err := internalRPC.Call("ApierV1.GetFilterIndexes", &AttrGetFilterIndexes{ + // ItemType: utils.MetaThresholds, Tenant: "cgrates.org", FilterType: utils.MetaString}, + // &indexes); err != nil { + // t.Error(err) + //} + //sort.Strings(indexes) + //if !reflect.DeepEqual(expectedIDX, indexes) { + // t.Errorf("Expecting: %+v, received: %+v", + // expectedIDX, utils.ToJSON(indexes)) + //} + //// verify data on engine1 //if err := engineOneRPC.Call("ApierV1.GetThresholdProfile", // &utils.TenantID{Tenant: "cgrates.org", ID: "THD_Replication"}, &reply); err != nil { // t.Error(err) //} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, reply) { // t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, reply) //} - //// make sure was set on engine2 + //// verify indexes on engine1 (should be the same as internal) + //if err := engineOneRPC.Call("ApierV1.GetFilterIndexes", &AttrGetFilterIndexes{ + // ItemType: utils.MetaThresholds, Tenant: "cgrates.org", FilterType: utils.MetaString}, + // &indexes); err != nil { + // t.Error(err) + //} + //sort.Strings(indexes) + //if !reflect.DeepEqual(expectedIDX, indexes) { + // t.Errorf("Expecting: %+v, received: %+v", + // expectedIDX, utils.ToJSON(indexes)) + //} + //// verify data on engine2 //if err := engineTwoRPC.Call("ApierV1.GetThresholdProfile", // &utils.TenantID{Tenant: "cgrates.org", ID: "THD_Replication"}, &reply); err != nil { // t.Error(err) //} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, reply) { // t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, reply) //} - // - ////// verify threshold profile in replication dataDB - ////if rcv, err := rmtDM.GetThresholdProfile("cgrates.org", "THD_Replication", - //// false, false, utils.NonTransactional); err != nil { - //// t.Error(err) - ////} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, rcv) { - //// t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, rcv) - ////} - ////// - ////eIdxes := map[string]utils.StringMap{ - //// "*string:~Account:1001": { - //// "THD_ACNT_1001": true, - //// "THD_Replication": true, - //// }, - //// "*string:~Account:1002": { - //// "THD_ACNT_1002": true, - //// }, - //// "*string:~CustomField:CustomValue": { - //// "THD_Replication": true, - //// }, - ////} - ////if rcvIdx, err := rmtDM.GetFilterIndexes( - //// utils.PrefixToIndexCache[utils.ThresholdProfilePrefix], tPrfl.Tenant, - //// utils.EmptyString, nil); err != nil { - //// t.Error(err) - ////} else if !reflect.DeepEqual(eIdxes, rcvIdx) { - //// t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) - ////} + //expectedIDX = []string{"*string:~Account:1001:THD_ACNT_1001", + // "*string:~Account:1001:THD_Replication", + // "*string:~Account:1001:THD_ACNT_1002", + // "*string:~CustomField:CustomValue:THD_Replication"} + //// check if indexes was created correctly on engine2 + //if err := engineTwoRPC.Call("ApierV1.GetFilterIndexes", &AttrGetFilterIndexes{ + // ItemType: utils.MetaThresholds, Tenant: "cgrates.org", FilterType: utils.MetaString}, + // &indexes); err != nil { + // t.Error(err) + //} + //sort.Strings(indexes) + //if !reflect.DeepEqual(expectedIDX, indexes) { + // t.Errorf("Expecting: %+v,\n received: %+v", + // expectedIDX, utils.ToJSON(indexes)) + //} + } func testInternalRemoteITKillEngine(t *testing.T) { diff --git a/apier/v1/thresholds.go b/apier/v1/thresholds.go index 6cbb3b3de..592e37b20 100644 --- a/apier/v1/thresholds.go +++ b/apier/v1/thresholds.go @@ -19,7 +19,6 @@ along with this program. If not, see package v1 import ( - "fmt" "time" "github.com/cgrates/cgrates/engine" @@ -67,8 +66,6 @@ func (apierV1 *ApierV1) GetThresholdProfile(arg *utils.TenantID, reply *engine.T return utils.NewErrMandatoryIeMissing(missing...) } if th, err := apierV1.DataManager.GetThresholdProfile(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil { - utils.Logger.Debug(fmt.Sprintf("API RTURNER IN ERR : %+v", err)) - utils.Logger.Debug(fmt.Sprintf("API RTURNER IN ERR : %+v", utils.APIErrorHandler(err))) return utils.APIErrorHandler(err) } else { *reply = *th diff --git a/engine/datamanager.go b/engine/datamanager.go index 99f20b348..225fef152 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -478,6 +478,12 @@ func (dm *DataManager) SetThreshold(th *Threshold) (err error) { if err = dm.DataDB().SetThresholdDrv(th); err != nil { return } + if dm.rplConns != nil { + var reply string + if err = dm.rplConns.Call("ReplicatorSv1.SetThreshold", th, &reply); err != nil { + return + } + } return } @@ -505,6 +511,7 @@ func (dm *DataManager) GetThresholdProfile(tenant, id string, cacheRead, cacheWr err = dm.rmtConns.Call(utils.ReplicatorSv1GetThresholdProfile, &utils.TenantID{Tenant: tenant, ID: id}, &th) } + err = utils.CastRPCErrToErr(err) if err != nil { if err == utils.ErrNotFound && cacheWrite { Cache.Set(utils.CacheThresholdProfiles, tntID, nil, nil, @@ -1137,17 +1144,17 @@ func (dm *DataManager) HasData(category, subject, tenant string) (has bool, err func (dm *DataManager) GetFilterIndexes(cacheID, itemIDPrefix, filterType string, fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) { if indexes, err = dm.DataDB().GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType, fldNameVal); err != nil { - //if err == utils.ErrNotFound && len(dm.rmtDataDBs) != 0 { - // var rmtErr error - // for _, rmtDM := range dm.rmtDataDBs { - // if indexes, rmtErr = rmtDM.GetFilterIndexes(cacheID, itemIDPrefix, - // filterType, fldNameVal); rmtErr == nil { - // break - // } - // } - // err = rmtErr - //} + if err == utils.ErrNotFound && dm.rmtConns != nil { + err = dm.rmtConns.Call(utils.ReplicatorSv1GetFilterIndexes, + &utils.GetFilterIndexesArg{ + CacheID: cacheID, + ItemIDPrefix: itemIDPrefix, + FilterType: filterType, + FldNameVal: fldNameVal, + }, &indexes) + } if err != nil { + err = utils.CastRPCErrToErr(err) return nil, err } } @@ -1160,6 +1167,17 @@ func (dm *DataManager) SetFilterIndexes(cacheID, itemIDPrefix string, indexes, commit, transactionID); err != nil { return } + if dm.rplConns != nil { + var reply string + if err = dm.rplConns.Call("ReplicatorSv1.SetFilterIndexes", + &utils.SetFilterIndexesArg{ + CacheID: cacheID, + ItemIDPrefix: itemIDPrefix, + Indexes: indexes, + }, &reply); err != nil { + return + } + } return } @@ -1238,6 +1256,7 @@ func (dm *DataManager) GetSupplierProfile(tenant, id string, cacheRead, cacheWri &utils.TenantID{Tenant: tenant, ID: id}, &supp) } if err != nil { + err = utils.CastRPCErrToErr(err) if err == utils.ErrNotFound && cacheWrite { Cache.Set(utils.CacheSupplierProfiles, tntID, nil, nil, cacheCommit(transactionID), transactionID) diff --git a/utils/coreutils.go b/utils/coreutils.go index 67b588219..450404daa 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -1016,3 +1016,11 @@ type SetFilterIndexesArg struct { ItemIDPrefix string Indexes map[string]StringMap } + +func CastRPCErrToErr(err error) error { + if _, has := ErrMap[err.Error()]; has { + return ErrMap[err.Error()] + } else { + return err + } +} diff --git a/utils/errors.go b/utils/errors.go index 32b761cdd..1bb694497 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -44,7 +44,6 @@ var ( ErrRatingPlanNotFound = errors.New("RATING_PLAN_NOT_FOUND") ErrAccountNotFound = errors.New("ACCOUNT_NOT_FOUND") ErrAccountDisabled = errors.New("ACCOUNT_DISABLED") - ErrUserNotFound = errors.New("USER_NOT_FOUND") ErrInsufficientCredit = errors.New("INSUFFICIENT_CREDIT") ErrNotConvertible = errors.New("NOT_CONVERTIBLE") ErrResourceUnavailable = errors.New("RESOURCE_UNAVAILABLE") @@ -52,23 +51,15 @@ var ( ErrNoActiveSession = errors.New("NO_ACTIVE_SESSION") ErrPartiallyExecuted = errors.New("PARTIALLY_EXECUTED") ErrMaxUsageExceeded = errors.New("MAX_USAGE_EXCEEDED") - ErrUnallocatedResource = errors.New("UNALLOCATED_RESOURCE") - ErrNotFoundNoCaps = errors.New("not found") ErrFilterNotPassingNoCaps = errors.New("filter not passing") ErrNotConvertibleNoCaps = errors.New("not convertible") ErrMandatoryIeMissingNoCaps = errors.New("mandatory information missing") ErrUnauthorizedApi = errors.New("UNAUTHORIZED_API") ErrUnknownApiKey = errors.New("UNKNOWN_API_KEY") - ErrIncompatible = errors.New("INCOMPATIBLE") ErrReqUnsynchronized = errors.New("REQ_UNSYNCHRONIZED") ErrUnsupporteServiceMethod = errors.New("UNSUPPORTED_SERVICE_METHOD") - ErrWrongArgsType = errors.New("WRONG_ARGS_TYPE") - ErrWrongReplyType = errors.New("WRONG_REPLY_TYPE") ErrDisconnected = errors.New("DISCONNECTED") ErrReplyTimeout = errors.New("REPLY_TIMEOUT") - ErrFailedReconnect = errors.New("FAILED_RECONNECT") - ErrInternallyDisconnected = errors.New("INTERNALLY_DISCONNECTED") - ErrUnsupportedCodec = errors.New("UNSUPPORTED_CODEC") ErrSessionNotFound = errors.New("SESSION_NOT_FOUND") ErrJsonIncompleteComment = errors.New("JSON_INCOMPLETE_COMMENT") ErrCDRCNoProfileID = errors.New("CDRC_PROFILE_WITHOUT_ID") @@ -80,6 +71,45 @@ var ( ErrUnsupportedFormat = errors.New("UNSUPPORTED_FORMAT") ErrNoDatabaseConn = errors.New("NO_DATA_BASE_CONNECTION") ErrMaxIncrementsExceeded = errors.New("MAX_INCREMENTS_EXCEEDED") + + ErrMap = map[string]error{ + ErrNoMoreData.Error(): ErrNoMoreData, + ErrNotImplemented.Error(): ErrNotImplemented, + ErrNotFound.Error(): ErrNotFound, + ErrTimedOut.Error(): ErrTimedOut, + ErrServerError.Error(): ErrServerError, + ErrMaxRecursionDepth.Error(): ErrMaxRecursionDepth, + ErrExists.Error(): ErrExists, + ErrBrokenReference.Error(): ErrBrokenReference, + ErrParserError.Error(): ErrParserError, + ErrInvalidPath.Error(): ErrInvalidPath, + ErrInvalidKey.Error(): ErrInvalidKey, + ErrUnauthorizedDestination.Error(): ErrUnauthorizedDestination, + ErrRatingPlanNotFound.Error(): ErrRatingPlanNotFound, + ErrInsufficientCredit.Error(): ErrInsufficientCredit, + ErrNotConvertible.Error(): ErrNotConvertible, + ErrResourceUnavailable.Error(): ErrResourceUnavailable, + ErrResourceUnauthorized.Error(): ErrResourceUnauthorized, + ErrNoActiveSession.Error(): ErrNoActiveSession, + ErrPartiallyExecuted.Error(): ErrPartiallyExecuted, + ErrMaxUsageExceeded.Error(): ErrMaxUsageExceeded, + ErrFilterNotPassingNoCaps.Error(): ErrFilterNotPassingNoCaps, + ErrNotConvertibleNoCaps.Error(): ErrNotConvertibleNoCaps, + ErrUnauthorizedApi.Error(): ErrUnauthorizedApi, + ErrUnknownApiKey.Error(): ErrUnknownApiKey, + ErrReqUnsynchronized.Error(): ErrReqUnsynchronized, + ErrUnsupporteServiceMethod.Error(): ErrUnsupporteServiceMethod, + ErrDisconnected.Error(): ErrDisconnected, + ErrReplyTimeout.Error(): ErrReplyTimeout, + ErrSessionNotFound.Error(): ErrSessionNotFound, + ErrJsonIncompleteComment.Error(): ErrJsonIncompleteComment, + ErrCDRCNoProfileID.Error(): ErrCDRCNoProfileID, + ErrCDRCNoInPath.Error(): ErrCDRCNoInPath, + ErrNotEnoughParameters.Error(): ErrNotEnoughParameters, + ErrUnsupportedFormat.Error(): ErrUnsupportedFormat, + ErrNoDatabaseConn.Error(): ErrNoDatabaseConn, + ErrMaxIncrementsExceeded.Error(): ErrMaxIncrementsExceeded, + } ) // NewCGRError initialises a new CGRError @@ -224,4 +254,4 @@ func ErrPathNotReachable(path string) error { func ErrNotConvertibleTF(from, to string) error { return fmt.Errorf("%s : from: %s to:%s", ErrNotConvertibleNoCaps.Error(), from, to) -} \ No newline at end of file +}