Add integration test for cache replication from AttributeSv1/RateSv1.ProcessEvent

This commit is contained in:
TeoV
2020-12-14 12:49:35 +02:00
committed by Dan Christian Bogos
parent 05143d62d1
commit 9152bb452c
8 changed files with 378 additions and 4 deletions

View File

@@ -0,0 +1,239 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"net/rpc"
"path"
"reflect"
"sort"
"testing"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/config"
)
var (
engine1Cfg *config.CGRConfig
engine1RPC *rpc.Client
engine1CfgPath string
engine2Cfg *config.CGRConfig
engine2RPC *rpc.Client
engine2CfgPath string
sTestsCacheSReplicate = []func(t *testing.T){
testCacheSReplicateLoadConfig,
testCacheSReplicateInitDataDb,
testCacheSReplicateInitStorDb,
testCacheSReplicateStartEngine,
testCacheSReplicateRpcConn,
testCacheSReplicateLoadTariffPlanFromFolder,
testCacheSReplicateProcessAttributes,
testCacheSReplicateProcessRateProfile,
testCacheSReplicateStopEngine,
}
)
func TestCacheSv1ReplicateIT(t *testing.T) {
for _, stest := range sTestsCacheSReplicate {
t.Run("TestCacheSv1ReplicateIT", stest)
}
}
func testCacheSReplicateLoadConfig(t *testing.T) {
var err error
engine1CfgPath = path.Join(*dataDir, "conf", "samples", "replication_cache", "engine1")
if engine1Cfg, err = config.NewCGRConfigFromPath(engine1CfgPath); err != nil {
t.Error(err)
}
engine2CfgPath = path.Join(*dataDir, "conf", "samples", "replication_cache", "engine2")
if engine2Cfg, err = config.NewCGRConfigFromPath(engine2CfgPath); err != nil {
t.Error(err)
}
}
func testCacheSReplicateInitDataDb(t *testing.T) {
if err := engine.InitDataDb(engine1Cfg); err != nil {
t.Fatal(err)
}
if err := engine.InitDataDb(engine2Cfg); err != nil {
t.Fatal(err)
}
}
// Empty tables before using them
func testCacheSReplicateInitStorDb(t *testing.T) {
if err := engine.InitStorDb(engine1Cfg); err != nil {
t.Fatal(err)
}
if err := engine.InitStorDb(engine2Cfg); err != nil {
t.Fatal(err)
}
}
// Start engine
func testCacheSReplicateStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(engine1CfgPath, *waitRater); err != nil {
t.Fatal(err)
}
if _, err := engine.StartEngine(engine2CfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testCacheSReplicateRpcConn(t *testing.T) {
var err error
engine1RPC, err = newRPCClient(engine1Cfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal("Could not connect to RPC: ", err.Error())
}
engine2RPC, err = newRPCClient(engine2Cfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal("Could not connect to RPC: ", err.Error())
}
}
func testCacheSReplicateLoadTariffPlanFromFolder(t *testing.T) {
var reply string
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "testit")}
if err := engine2RPC.Call(utils.APIerSv1LoadTariffPlanFromFolder, attrs, &reply); err != nil {
t.Error(err)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
}
func testCacheSReplicateProcessAttributes(t *testing.T) {
ev := &engine.AttrArgsProcessEvent{
Context: utils.StringPointer(utils.MetaSessionS),
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testCacheSReplicateProcessAttributes",
Event: map[string]interface{}{
utils.Account: "1001",
},
},
},
}
eRply := &engine.AttrSProcessEventReply{
MatchedProfiles: []string{"ATTR_ACNT_1001"},
AlteredFields: []string{utils.MetaReq + utils.NestingSep + "OfficeGroup"},
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testCacheSReplicateProcessAttributes",
Event: map[string]interface{}{
utils.Account: "1001",
"OfficeGroup": "Marketing",
},
},
},
}
var rplyEv engine.AttrSProcessEventReply
if err := engine1RPC.Call(utils.AttributeSv1ProcessEvent,
ev, &rplyEv); err != nil {
t.Error(err)
} else {
sort.Strings(eRply.AlteredFields)
sort.Strings(rplyEv.AlteredFields)
if !reflect.DeepEqual(eRply, &rplyEv) { // second for reversed order of attributes
t.Errorf("Expecting: %s, received: %s",
utils.ToJSON(eRply), utils.ToJSON(rplyEv))
}
}
if err := engine2RPC.Call(utils.AttributeSv1ProcessEvent,
ev, &rplyEv); err != nil {
t.Error(err)
} else {
sort.Strings(eRply.AlteredFields)
sort.Strings(rplyEv.AlteredFields)
if !reflect.DeepEqual(eRply, &rplyEv) { // second for reversed order of attributes
t.Errorf("Expecting: %s, received: %s",
utils.ToJSON(eRply), utils.ToJSON(rplyEv))
}
}
}
func testCacheSReplicateProcessRateProfile(t *testing.T) {
var rply *engine.RateProfileCost
argsRt := &utils.ArgsCostForEvent{
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: utils.UUIDSha1Prefix(),
Event: map[string]interface{}{
utils.Account: "1002",
},
},
},
}
rate1 := &engine.Rate{
ID: "RT_ALWAYS",
Weight: 0,
ActivationTimes: "* * * * *",
IntervalRates: []*engine.IntervalRate{
{
IntervalStart: 0,
RecurrentFee: 0.01,
Unit: time.Minute,
Increment: time.Second,
},
},
}
exp := &engine.RateProfileCost{
ID: "RT_SPECIAL_1002",
Cost: 0.01,
RoundingDecimals: 4,
RoundingMethod: utils.MetaUp,
RateSIntervals: []*engine.RateSInterval{{
UsageStart: 0,
Increments: []*engine.RateSIncrement{{
UsageStart: 0,
Usage: time.Minute,
Rate: rate1,
IntervalRateIndex: 0,
CompressFactor: 60,
}},
CompressFactor: 1,
}},
}
if err := engine1RPC.Call(utils.RateSv1CostForEvent, &argsRt, &rply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(exp), utils.ToJSON(rply))
}
if err := engine2RPC.Call(utils.RateSv1CostForEvent, &argsRt, &rply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(exp, rply) {
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(exp), utils.ToJSON(rply))
}
}
func testCacheSReplicateStopEngine(t *testing.T) {
if err := engine.KillEngine(300); err != nil {
t.Error(err)
}
}

View File

@@ -218,7 +218,7 @@ func testEEsExportCDRs(t *testing.T) {
}
func testEEsVerifyExports(t *testing.T) {
time.Sleep(time.Second)
time.Sleep(time.Second + 600*time.Millisecond)
var files []string
err := filepath.Walk("/tmp/testCSV/", func(path string, info os.FileInfo, err error) error {
if strings.HasSuffix(path, utils.CSVSuffix) {

View File

@@ -0,0 +1,75 @@
{
"general": {
"log_level": 7,
"node_id": "Engine1"
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2180"
},
"data_db": {
"db_type": "redis",
"db_port": 6379,
"db_name": "10"
},
"stor_db": {
"db_password": "CGRateS.org"
},
"rpc_conns": {
"connCache": {
"strategy": "*broadcast_sync",
"conns": [
{"address": "127.0.0.1:2013", "transport":"*gob"},
{"address": "127.0.0.1:2023", "transport":"*gob"}
]
}
},
"caches":{
"partitions": {
"*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": true},
"*attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": true},
"*rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false, "replicate": true},
"*rate_profile_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": true},
"*rate_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": true}
},
"replication_conns": ["connCache"]
},
"rals": {
"enabled": true
},
"attributes": {
"enabled": true
},
"rates": {
"enabled": true
},
"schedulers": {
"enabled": true
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"]
},
},

View File

@@ -0,0 +1,52 @@
{
"general": {
"log_level": 7,
"node_id": "Engine2"
},
"listen": {
"rpc_json": ":2022",
"rpc_gob": ":2023",
"http": ":2280",
},
"data_db": {
"db_type": "redis",
"db_port": 6379,
"db_name": "10",
},
"stor_db": {
"db_password": "CGRateS.org",
},
"rals": {
"enabled": true,
},
"attributes": {
"enabled": true,
},
"rates": {
"enabled": true,
},
"schedulers": {
"enabled": true,
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
}

View File

@@ -1,5 +1,5 @@
#Tenant,ID,FilterIDs,ActivationInterval,Weight,RoundingMethod,RoundingDecimals,MinCost,MaxCost,MaxCostStrategy,RateID,RateFilterIDs,RateActivationStart,RateWeight,RateBlocker,RateIntervalStart,RateFixedFee,RateRecurrentFee,RateUnit,RateIncrement
cgrates.org,RT_SPECIAL_1002,,,0,*up,4,0,0,*free,RT_ALWAYS,,"* * * * *",0,false,0s,,0.01,1m,1s
cgrates.org,RT_SPECIAL_1002,*string:~*req.Account:1002,,10,*up,4,0,0,*free,RT_ALWAYS,,"* * * * *",0,false,0s,,0.01,1m,1s
cgrates.org,RT_RETAIL1,,,0,*up,4,0,0,*free,RT_ALWAYS,,"* * * * *",0,false,0s,,0.4,1m,30s
cgrates.org,RT_RETAIL1,,,,,,,,,RT_ALWAYS,,"* * * * *",0,false,1m,,0.2,1m,10s
1 #Tenant ID FilterIDs ActivationInterval Weight RoundingMethod RoundingDecimals MinCost MaxCost MaxCostStrategy RateID RateFilterIDs RateActivationStart RateWeight RateBlocker RateIntervalStart RateFixedFee RateRecurrentFee RateUnit RateIncrement
2 cgrates.org RT_SPECIAL_1002 *string:~*req.Account:1002 0 10 *up 4 0 0 *free RT_ALWAYS * * * * * 0 false 0s 0.01 1m 1s
3 cgrates.org RT_RETAIL1 0 *up 4 0 0 *free RT_ALWAYS * * * * * 0 false 0s 0.4 1m 30s
4 cgrates.org RT_RETAIL1 RT_ALWAYS * * * * * 0 false 1m 0.2 1m 10s
5

View File

@@ -36,6 +36,8 @@ var Cache *CacheS
func init() {
Cache = NewCacheS(config.CgrConfig(), nil, nil)
gob.Register(new(AttributeProfile))
gob.Register(new(AttributeProfileWithOpts))
// Threshold
gob.Register(new(Threshold))
gob.Register(new(ThresholdProfile))
@@ -96,6 +98,11 @@ func init() {
gob.Register(url.Values{})
gob.Register(json.RawMessage{})
gob.Register(BalanceSummaries{})
gob.Register(new(utils.ArgCacheReplicateSet))
gob.Register(new(utils.ArgCacheReplicateRemove))
gob.Register(utils.StringSet{})
}
//SetCache shared the cache from other subsystems

View File

@@ -368,9 +368,9 @@ func testExpVerifyRateProfiles(t *testing.T) {
splPrf := &engine.RateProfile{
Tenant: "cgrates.org",
ID: "RT_SPECIAL_1002",
FilterIDs: []string{},
FilterIDs: []string{"*string:~*req.Account:1002"},
ActivationInterval: nil,
Weight: 0,
Weight: 10,
RoundingDecimals: 4,
RoundingMethod: utils.ROUNDING_UP,
MinCost: 0,

View File

@@ -506,6 +506,7 @@ const (
MetaEEs = "*ees"
MetaRateS = "*rates"
MetaContinue = "*continue"
MetaUp = "*up"
Migrator = "migrator"
UnsupportedMigrationTask = "unsupported migration task"
NoStorDBConnection = "not connected to StorDB"