Add delay for sessions integration tests and consider paginator for get cdr for internal db

This commit is contained in:
TeoV
2019-10-14 18:42:33 +03:00
committed by Dan Christian Bogos
parent addf5e73eb
commit be9ba5df3b
9 changed files with 172 additions and 19 deletions

View File

@@ -86,6 +86,7 @@ var sTestsStatSV1 = []func(t *testing.T){
testV1STSStatsPing,
testV1STSProcessMetricsWithFilter,
testV1STSProcessStaticMetrics,
testV1STSProcessStatWithThreshold,
testV1STSStopEngine,
}
@@ -683,6 +684,86 @@ func testV1STSStatsPing(t *testing.T) {
}
}
func testV1STSProcessStatWithThreshold(t *testing.T) {
stTh := &StatQueueWithCache{
StatQueueProfile: &engine.StatQueueProfile{
Tenant: "cgrates.org",
ID: "StatWithThreshold",
FilterIDs: []string{"*string:~CustomEvent:CustomEvent"}, //custom filter for event
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
},
QueueLength: 100,
TTL: time.Duration(1) * time.Second,
Metrics: []*engine.MetricWithFilters{
&engine.MetricWithFilters{
MetricID: "*tcd",
},
&engine.MetricWithFilters{
MetricID: "*sum:2",
},
},
Stored: true,
Weight: 20,
MinItems: 1,
},
}
var result string
if err := stsV1Rpc.Call("ApierV1.SetStatQueueProfile", stTh, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
thSts := &ThresholdWithCache{
ThresholdProfile: &engine.ThresholdProfile{
Tenant: "cgrates.org",
ID: "THD_Stat",
FilterIDs: []string{"*string:~EventType:StatUpdate",
"*string:~StatID:StatWithThreshold", "*exists:*tcd:", "*gte:~*tcd:1s"},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC),
},
MaxHits: -1,
MinSleep: time.Duration(5 * time.Minute),
Weight: 20.0,
ActionIDs: []string{"LOG_WARNING"},
Async: true,
},
}
if err := stsV1Rpc.Call("ApierV1.SetThresholdProfile", thSts, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
//process event
var reply2 []string
expected := []string{"StatWithThreshold"}
args := engine.StatsArgsProcessEvent{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "event1",
Event: map[string]interface{}{
"CustomEvent": "CustomEvent",
utils.Usage: time.Duration(45 * time.Second),
},
},
}
if err := stsV1Rpc.Call(utils.StatSv1ProcessEvent, &args, &reply2); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(reply2, expected) {
t.Errorf("Expecting: %+v, received: %+v", expected, reply2)
}
var td engine.Threshold
eTd := engine.Threshold{Tenant: "cgrates.org", ID: "THD_Stat", Hits: 1}
if err := stsV1Rpc.Call(utils.ThresholdSv1GetThreshold,
&utils.TenantID{Tenant: "cgrates.org", ID: "THD_Stat"}, &td); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eTd.Hits, td.Hits) {
t.Errorf("expecting: %+v, received: %+v", eTd, td)
}
}
func testV1STSStopEngine(t *testing.T) {
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)

View File

@@ -117,7 +117,7 @@ func testVrsDataDB(t *testing.T) {
expectedVrs := engine.Versions{"ActionTriggers": 2,
"Actions": 2, "RQF": 1, "ReverseDestinations": 1, "Attributes": 4, "RatingPlan": 1,
"RatingProfile": 1, "User": 1, "Accounts": 3, "ActionPlans": 3, "Chargers": 1,
"Destinations": 1, "SharedGroups": 2, "Stats": 2, "Resource": 1,
"Destinations": 1, "LoadIDs": 1, "SharedGroups": 2, "Stats": 2, "Resource": 1,
"Subscribers": 1, "Suppliers": 1, "Thresholds": 3, "Timing": 1}
if err := vrsRPC.Call("ApierV1.GetDataDBVersions", "", &result); err != nil {
t.Error(err)
@@ -157,7 +157,7 @@ func testVrsSetDataDBVrs(t *testing.T) {
expectedVrs := engine.Versions{"ActionTriggers": 2,
"Actions": 2, "RQF": 1, "ReverseDestinations": 1, "Attributes": 3, "RatingPlan": 1,
"RatingProfile": 1, "User": 1, "Accounts": 3, "ActionPlans": 3, "Chargers": 1,
"Destinations": 1, "SharedGroups": 2, "Stats": 2, "Resource": 1,
"Destinations": 1, "LoadIDs": 1, "SharedGroups": 2, "Stats": 2, "Resource": 1,
"Subscribers": 1, "Suppliers": 1, "Thresholds": 3, "Timing": 1}
if err := vrsRPC.Call("ApierV1.GetDataDBVersions", "", &result); err != nil {
t.Error(err)

View File

@@ -732,7 +732,7 @@ func testDspSessionPassive(t *testing.T) {
CGRID: rply[0].CGRID,
Tenant: rply[0].Tenant,
ResourceID: "TestSSv1It1",
EventStart: engine.NewSafEvent(map[string]interface{}{
EventStart: engine.NewMapEvent(map[string]interface{}{
utils.CGRID: "c87609aa1cb6e9529ab1836cfeeebaab7aa7ebaf",
utils.Tenant: "cgrates.org",
utils.Category: "call",

View File

@@ -259,6 +259,15 @@ func testGetCDRs(cfg *config.CGRConfig) error {
if err := InitStorDb(cfg); err != nil {
return fmt.Errorf("testGetCDRs #1: %v", err)
}
cfg.StorDbCfg().StorDBStringIndexedFields = []string{utils.CGRID,
utils.RunID, utils.OriginHost, utils.Source, utils.OriginID,
utils.ToR, utils.RequestType, utils.Tenant,
utils.Category, utils.Account, utils.Subject,
"Service-Context-Id",
}
cfg.StorDbCfg().StorDBPrefixIndexedFields = []string{
utils.Destination,
}
cdrStorage, err := ConfigureCdrStorage(cfg.StorDbCfg().StorDBType,
cfg.StorDbCfg().StorDBHost, cfg.StorDbCfg().StorDBPort,
cfg.StorDbCfg().StorDBName, cfg.StorDbCfg().StorDBUser,
@@ -499,7 +508,7 @@ func testGetCDRs(cfg *config.CGRConfig) error {
if CDRs, _, err := cdrStorage.GetCDRs(&utils.CDRsFilter{Paginator: utils.Paginator{Limit: utils.IntPointer(5), Offset: utils.IntPointer(0)}}, false); err != nil {
return fmt.Errorf("testGetCDRs #9 err: %v", err)
} else if len(CDRs) != 5 {
return fmt.Errorf("testGetCDRs #10, unexpected number of CDRs returned: %+v", CDRs)
return fmt.Errorf("testGetCDRs #10, unexpected number of CDRs returned: %+v", len(CDRs))
}
// Offset 5
if CDRs, _, err := cdrStorage.GetCDRs(&utils.CDRsFilter{Paginator: utils.Paginator{Limit: utils.IntPointer(5), Offset: utils.IntPointer(0)}}, false); err != nil {
@@ -511,7 +520,7 @@ func testGetCDRs(cfg *config.CGRConfig) error {
if CDRs, _, err := cdrStorage.GetCDRs(&utils.CDRsFilter{Paginator: utils.Paginator{Limit: utils.IntPointer(2), Offset: utils.IntPointer(5)}}, false); err != nil {
return fmt.Errorf("testGetCDRs #12 err: %v", err)
} else if len(CDRs) != 2 {
return fmt.Errorf("testGetCDRs #13, unexpected number of CDRs returned: %+v", CDRs)
return fmt.Errorf("testGetCDRs #13, unexpected number of CDRs returned: %+v", len(CDRs))
}
// Filter on cgrids
if CDRs, _, err := cdrStorage.GetCDRs(&utils.CDRsFilter{CGRIDs: []string{

View File

@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
@@ -35,12 +36,13 @@ type InternalDB struct {
mu sync.RWMutex
stringIndexedFields []string
prefixIndexedFields []string
cnter *utils.Counter // used for OrderID for cdr
}
func NewInternalDB(stringIndexedFields, prefixIndexedFields []string) *InternalDB {
return &InternalDB{db: ltcache.NewTransCache(config.CgrConfig().CacheCfg().AsTransCacheConfig()),
ms: NewCodecMsgpackMarshaler(), stringIndexedFields: stringIndexedFields,
prefixIndexedFields: prefixIndexedFields}
prefixIndexedFields: prefixIndexedFields, cnter: utils.NewCounter(time.Now().UnixNano(), 0)}
}
func NewInternalDBJson(stringIndexedFields, prefixIndexedFields []string) (InternalDB *InternalDB) {

View File

@@ -837,6 +837,9 @@ func (iDB *InternalDB) SetTPDispatcherHosts(dpps []*utils.TPDispatcherHost) (err
//implement CdrStorage interface
func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
if cdr.OrderID == 0 {
cdr.OrderID = iDB.cnter.Next()
}
if !allowUpdate {
x, ok := iDB.db.Get(utils.CDRsTBL, utils.ConcatenatedKey(utils.CDRsTBL, cdr.CGRID, cdr.RunID, cdr.OriginID))
if ok && x != nil {
@@ -933,8 +936,38 @@ func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*C
}
}
}
}
//check if we have ExtraFields
if len(filter.ExtraFields) != 0 {
for extFldID, extrFldVal := range filter.ExtraFields {
// need to discuss about this case
if extrFldVal == utils.MetaExists {
}
grpMpIDs := make(utils.StringMap)
grpIDs := iDB.db.GetGroupItemIDs(utils.CDRsTBL, utils.ConcatenatedKey(extFldID, extrFldVal))
for _, id := range grpIDs {
grpMpIDs[id] = true
}
if len(grpMpIDs) == 0 {
return nil, 0, utils.ErrNotFound
}
if cdrMpIDs == nil {
cdrMpIDs = grpMpIDs
} else {
for id := range cdrMpIDs {
if !grpMpIDs.HasKey(id) {
delete(cdrMpIDs, id)
if len(cdrMpIDs) == 0 {
return nil, 0, utils.ErrNotFound
}
}
}
}
}
}
if cdrMpIDs == nil {
cdrMpIDs = utils.StringMapFromSlice(iDB.db.GetItemIDs(utils.CDRsTBL, utils.EmptyString))
}
@@ -972,18 +1005,37 @@ func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*C
}
}
}
//check if we have ExtraFields
if len(filter.NotExtraFields) != 0 {
for notExtFldID, notExtFlddVal := range filter.NotExtraFields {
// need to discuss about this case
if notExtFlddVal == utils.MetaExists {
}
grpIDs := iDB.db.GetGroupItemIDs(utils.CDRsTBL, utils.ConcatenatedKey(notExtFldID, notExtFlddVal))
for _, id := range grpIDs {
if cdrMpIDs.HasKey(id) {
delete(cdrMpIDs, id)
if len(cdrMpIDs) == 0 {
return nil, 0, utils.ErrNotFound
}
}
}
}
}
if len(cdrMpIDs) == 0 {
return nil, 0, utils.ErrNotFound
}
//pageCounter := 0
paginatorOffsetCounter := 0
for key := range cdrMpIDs {
x, ok := iDB.db.Get(utils.CDRsTBL, key)
if !ok || x == nil {
return nil, 0, utils.ErrNotFound
}
cdr := x.(*CDR)
if len(filter.Costs) > 0 {
matchCost := false
for _, cost := range filter.Costs {
@@ -1015,7 +1067,7 @@ func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*C
}
}
if filter.OrderIDEnd != nil {
if cdr.OrderID > *filter.OrderIDEnd {
if cdr.OrderID >= *filter.OrderIDEnd {
continue
}
}
@@ -1069,7 +1121,7 @@ func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*C
continue
}
} else {
if cdr.Cost < *filter.MinCost && cdr.Cost > *filter.MaxCost {
if cdr.Cost < *filter.MinCost || cdr.Cost > *filter.MaxCost {
continue
}
}
@@ -1079,22 +1131,29 @@ func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*C
continue
}
} else { // Above limited CDRs, since MinCost is empty, make sure we query also NULL cost
if cdr.Cost > 0 {
if cdr.Cost >= *filter.MaxCost {
continue
}
}
}
if filter.Paginator.Offset != nil {
if paginatorOffsetCounter <= *filter.Paginator.Offset {
paginatorOffsetCounter += 1
continue
}
}
if filter.Paginator.Limit != nil {
if len(cdrs) >= *filter.Paginator.Limit {
break
}
}
//pass all filters and append to slice
cdrs = append(cdrs, cdr)
}
// //apply paginator
// cdrIDs = filter.Paginator.PaginateStringSlice(cdrIDs)
// if filter.Count {
// return nil, int64(len(cdrs)), nil
// }
if filter.Count {
return nil, int64(len(cdrs)), nil
}
if remove {
for _, cdr := range cdrs {
iDB.db.Remove(utils.CDRsTBL, utils.ConcatenatedKey(utils.CDRsTBL, cdr.CGRID, cdr.RunID, cdr.OriginID),

View File

@@ -159,6 +159,7 @@ func TestSessionsBiRPCSessionAutomaticDisconnects(t *testing.T) {
initArgs, &initRpl); err != nil {
t.Error(err)
}
time.Sleep(10 * time.Millisecond) // give some time to allow the session to be created
expMaxUsage := time.Duration(-1)
if *initRpl.MaxUsage != expMaxUsage {
t.Errorf("Expecting : %+v, received: %+v", expMaxUsage, *initRpl.MaxUsage)

View File

@@ -611,6 +611,7 @@ func TestSessionsDataTTLExpMultiUpdates(t *testing.T) {
initArgs, &initRpl); err != nil {
t.Error(err)
}
time.Sleep(10*time.Millisecond) // give some time to allow the session to be created
if (*initRpl.MaxUsage).Nanoseconds() != usage {
t.Errorf("Expecting : %+v, received: %+v", usage, (*initRpl.MaxUsage).Nanoseconds())
}

View File

@@ -944,7 +944,7 @@ func TestSessionsVoiceSessionTTLWithRelocate(t *testing.T) {
initArgs, &initRpl); err != nil {
t.Error(err)
}
time.Sleep(time.Duration(10 * time.Millisecond))
time.Sleep(time.Duration(20 * time.Millisecond))
if *initRpl.MaxUsage != usage {
t.Errorf("Expected: %+v, received: %+v", usage, *initRpl.MaxUsage)
}