diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index d95475eff..984035c5b 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -86,6 +86,7 @@ var sTestsStatSV1 = []func(t *testing.T){ testV1STSStatsPing, testV1STSProcessMetricsWithFilter, testV1STSProcessStaticMetrics, + testV1STSProcessStatWithThreshold, testV1STSStopEngine, } @@ -683,6 +684,86 @@ func testV1STSStatsPing(t *testing.T) { } } +func testV1STSProcessStatWithThreshold(t *testing.T) { + stTh := &StatQueueWithCache{ + StatQueueProfile: &engine.StatQueueProfile{ + Tenant: "cgrates.org", + ID: "StatWithThreshold", + FilterIDs: []string{"*string:~CustomEvent:CustomEvent"}, //custom filter for event + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + }, + QueueLength: 100, + TTL: time.Duration(1) * time.Second, + Metrics: []*engine.MetricWithFilters{ + &engine.MetricWithFilters{ + MetricID: "*tcd", + }, + &engine.MetricWithFilters{ + MetricID: "*sum:2", + }, + }, + Stored: true, + Weight: 20, + MinItems: 1, + }, + } + var result string + if err := stsV1Rpc.Call("ApierV1.SetStatQueueProfile", stTh, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + thSts := &ThresholdWithCache{ + ThresholdProfile: &engine.ThresholdProfile{ + Tenant: "cgrates.org", + ID: "THD_Stat", + FilterIDs: []string{"*string:~EventType:StatUpdate", + "*string:~StatID:StatWithThreshold", "*exists:*tcd:", "*gte:~*tcd:1s"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + }, + MaxHits: -1, + MinSleep: time.Duration(5 * time.Minute), + Weight: 20.0, + ActionIDs: []string{"LOG_WARNING"}, + Async: true, + }, + } + if err := stsV1Rpc.Call("ApierV1.SetThresholdProfile", thSts, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + //process event + var reply2 []string + expected := []string{"StatWithThreshold"} + args := engine.StatsArgsProcessEvent{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + "CustomEvent": "CustomEvent", + utils.Usage: time.Duration(45 * time.Second), + }, + }, + } + if err := stsV1Rpc.Call(utils.StatSv1ProcessEvent, &args, &reply2); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(reply2, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply2) + } + + var td engine.Threshold + eTd := engine.Threshold{Tenant: "cgrates.org", ID: "THD_Stat", Hits: 1} + if err := stsV1Rpc.Call(utils.ThresholdSv1GetThreshold, + &utils.TenantID{Tenant: "cgrates.org", ID: "THD_Stat"}, &td); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eTd.Hits, td.Hits) { + t.Errorf("expecting: %+v, received: %+v", eTd, td) + } +} + func testV1STSStopEngine(t *testing.T) { if err := engine.KillEngine(*waitRater); err != nil { t.Error(err) diff --git a/apier/v1/versions_it_test.go b/apier/v1/versions_it_test.go index bd0f8c56a..36c62e9ed 100644 --- a/apier/v1/versions_it_test.go +++ b/apier/v1/versions_it_test.go @@ -117,7 +117,7 @@ func testVrsDataDB(t *testing.T) { expectedVrs := engine.Versions{"ActionTriggers": 2, "Actions": 2, "RQF": 1, "ReverseDestinations": 1, "Attributes": 4, "RatingPlan": 1, "RatingProfile": 1, "User": 1, "Accounts": 3, "ActionPlans": 3, "Chargers": 1, - "Destinations": 1, "SharedGroups": 2, "Stats": 2, "Resource": 1, + "Destinations": 1, "LoadIDs": 1, "SharedGroups": 2, "Stats": 2, "Resource": 1, "Subscribers": 1, "Suppliers": 1, "Thresholds": 3, "Timing": 1} if err := vrsRPC.Call("ApierV1.GetDataDBVersions", "", &result); err != nil { t.Error(err) @@ -157,7 +157,7 @@ func testVrsSetDataDBVrs(t *testing.T) { expectedVrs := engine.Versions{"ActionTriggers": 2, "Actions": 2, "RQF": 1, "ReverseDestinations": 1, "Attributes": 3, "RatingPlan": 1, "RatingProfile": 1, "User": 1, "Accounts": 3, "ActionPlans": 3, "Chargers": 1, - "Destinations": 1, "SharedGroups": 2, "Stats": 2, "Resource": 1, + "Destinations": 1, "LoadIDs": 1, "SharedGroups": 2, "Stats": 2, "Resource": 1, "Subscribers": 1, "Suppliers": 1, "Thresholds": 3, "Timing": 1} if err := vrsRPC.Call("ApierV1.GetDataDBVersions", "", &result); err != nil { t.Error(err) diff --git a/dispatchers/sessions_it_test.go b/dispatchers/sessions_it_test.go index a8dac87b0..9fef3bba1 100755 --- a/dispatchers/sessions_it_test.go +++ b/dispatchers/sessions_it_test.go @@ -732,7 +732,7 @@ func testDspSessionPassive(t *testing.T) { CGRID: rply[0].CGRID, Tenant: rply[0].Tenant, ResourceID: "TestSSv1It1", - EventStart: engine.NewSafEvent(map[string]interface{}{ + EventStart: engine.NewMapEvent(map[string]interface{}{ utils.CGRID: "c87609aa1cb6e9529ab1836cfeeebaab7aa7ebaf", utils.Tenant: "cgrates.org", utils.Category: "call", diff --git a/engine/storage_cdrs_it_test.go b/engine/storage_cdrs_it_test.go index 83f45a157..4b0a5a8c4 100644 --- a/engine/storage_cdrs_it_test.go +++ b/engine/storage_cdrs_it_test.go @@ -259,6 +259,15 @@ func testGetCDRs(cfg *config.CGRConfig) error { if err := InitStorDb(cfg); err != nil { return fmt.Errorf("testGetCDRs #1: %v", err) } + cfg.StorDbCfg().StorDBStringIndexedFields = []string{utils.CGRID, + utils.RunID, utils.OriginHost, utils.Source, utils.OriginID, + utils.ToR, utils.RequestType, utils.Tenant, + utils.Category, utils.Account, utils.Subject, + "Service-Context-Id", + } + cfg.StorDbCfg().StorDBPrefixIndexedFields = []string{ + utils.Destination, + } cdrStorage, err := ConfigureCdrStorage(cfg.StorDbCfg().StorDBType, cfg.StorDbCfg().StorDBHost, cfg.StorDbCfg().StorDBPort, cfg.StorDbCfg().StorDBName, cfg.StorDbCfg().StorDBUser, @@ -499,7 +508,7 @@ func testGetCDRs(cfg *config.CGRConfig) error { if CDRs, _, err := cdrStorage.GetCDRs(&utils.CDRsFilter{Paginator: utils.Paginator{Limit: utils.IntPointer(5), Offset: utils.IntPointer(0)}}, false); err != nil { return fmt.Errorf("testGetCDRs #9 err: %v", err) } else if len(CDRs) != 5 { - return fmt.Errorf("testGetCDRs #10, unexpected number of CDRs returned: %+v", CDRs) + return fmt.Errorf("testGetCDRs #10, unexpected number of CDRs returned: %+v", len(CDRs)) } // Offset 5 if CDRs, _, err := cdrStorage.GetCDRs(&utils.CDRsFilter{Paginator: utils.Paginator{Limit: utils.IntPointer(5), Offset: utils.IntPointer(0)}}, false); err != nil { @@ -511,7 +520,7 @@ func testGetCDRs(cfg *config.CGRConfig) error { if CDRs, _, err := cdrStorage.GetCDRs(&utils.CDRsFilter{Paginator: utils.Paginator{Limit: utils.IntPointer(2), Offset: utils.IntPointer(5)}}, false); err != nil { return fmt.Errorf("testGetCDRs #12 err: %v", err) } else if len(CDRs) != 2 { - return fmt.Errorf("testGetCDRs #13, unexpected number of CDRs returned: %+v", CDRs) + return fmt.Errorf("testGetCDRs #13, unexpected number of CDRs returned: %+v", len(CDRs)) } // Filter on cgrids if CDRs, _, err := cdrStorage.GetCDRs(&utils.CDRsFilter{CGRIDs: []string{ diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 63b5aedd8..8480382ac 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -35,12 +36,13 @@ type InternalDB struct { mu sync.RWMutex stringIndexedFields []string prefixIndexedFields []string + cnter *utils.Counter // used for OrderID for cdr } func NewInternalDB(stringIndexedFields, prefixIndexedFields []string) *InternalDB { return &InternalDB{db: ltcache.NewTransCache(config.CgrConfig().CacheCfg().AsTransCacheConfig()), ms: NewCodecMsgpackMarshaler(), stringIndexedFields: stringIndexedFields, - prefixIndexedFields: prefixIndexedFields} + prefixIndexedFields: prefixIndexedFields, cnter: utils.NewCounter(time.Now().UnixNano(), 0)} } func NewInternalDBJson(stringIndexedFields, prefixIndexedFields []string) (InternalDB *InternalDB) { diff --git a/engine/storage_internal_stordb.go b/engine/storage_internal_stordb.go index 4acfe9de5..2751906e5 100644 --- a/engine/storage_internal_stordb.go +++ b/engine/storage_internal_stordb.go @@ -837,6 +837,9 @@ func (iDB *InternalDB) SetTPDispatcherHosts(dpps []*utils.TPDispatcherHost) (err //implement CdrStorage interface func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) { + if cdr.OrderID == 0 { + cdr.OrderID = iDB.cnter.Next() + } if !allowUpdate { x, ok := iDB.db.Get(utils.CDRsTBL, utils.ConcatenatedKey(utils.CDRsTBL, cdr.CGRID, cdr.RunID, cdr.OriginID)) if ok && x != nil { @@ -933,8 +936,38 @@ func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*C } } } - } + //check if we have ExtraFields + if len(filter.ExtraFields) != 0 { + for extFldID, extrFldVal := range filter.ExtraFields { + // need to discuss about this case + if extrFldVal == utils.MetaExists { + + } + grpMpIDs := make(utils.StringMap) + grpIDs := iDB.db.GetGroupItemIDs(utils.CDRsTBL, utils.ConcatenatedKey(extFldID, extrFldVal)) + for _, id := range grpIDs { + grpMpIDs[id] = true + } + + if len(grpMpIDs) == 0 { + return nil, 0, utils.ErrNotFound + } + if cdrMpIDs == nil { + cdrMpIDs = grpMpIDs + } else { + for id := range cdrMpIDs { + if !grpMpIDs.HasKey(id) { + delete(cdrMpIDs, id) + if len(cdrMpIDs) == 0 { + return nil, 0, utils.ErrNotFound + } + } + } + } + } + } + if cdrMpIDs == nil { cdrMpIDs = utils.StringMapFromSlice(iDB.db.GetItemIDs(utils.CDRsTBL, utils.EmptyString)) } @@ -972,18 +1005,37 @@ func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*C } } } + //check if we have ExtraFields + if len(filter.NotExtraFields) != 0 { + for notExtFldID, notExtFlddVal := range filter.NotExtraFields { + // need to discuss about this case + if notExtFlddVal == utils.MetaExists { + + } + grpIDs := iDB.db.GetGroupItemIDs(utils.CDRsTBL, utils.ConcatenatedKey(notExtFldID, notExtFlddVal)) + for _, id := range grpIDs { + if cdrMpIDs.HasKey(id) { + delete(cdrMpIDs, id) + if len(cdrMpIDs) == 0 { + return nil, 0, utils.ErrNotFound + } + } + } + } + } if len(cdrMpIDs) == 0 { return nil, 0, utils.ErrNotFound } - //pageCounter := 0 + paginatorOffsetCounter := 0 for key := range cdrMpIDs { x, ok := iDB.db.Get(utils.CDRsTBL, key) if !ok || x == nil { return nil, 0, utils.ErrNotFound } cdr := x.(*CDR) + if len(filter.Costs) > 0 { matchCost := false for _, cost := range filter.Costs { @@ -1015,7 +1067,7 @@ func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*C } } if filter.OrderIDEnd != nil { - if cdr.OrderID > *filter.OrderIDEnd { + if cdr.OrderID >= *filter.OrderIDEnd { continue } } @@ -1069,7 +1121,7 @@ func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*C continue } } else { - if cdr.Cost < *filter.MinCost && cdr.Cost > *filter.MaxCost { + if cdr.Cost < *filter.MinCost || cdr.Cost > *filter.MaxCost { continue } } @@ -1079,22 +1131,29 @@ func (iDB *InternalDB) GetCDRs(filter *utils.CDRsFilter, remove bool) (cdrs []*C continue } } else { // Above limited CDRs, since MinCost is empty, make sure we query also NULL cost - if cdr.Cost > 0 { + if cdr.Cost >= *filter.MaxCost { continue } } } + if filter.Paginator.Offset != nil { + if paginatorOffsetCounter <= *filter.Paginator.Offset { + paginatorOffsetCounter += 1 + continue + } + } + if filter.Paginator.Limit != nil { + if len(cdrs) >= *filter.Paginator.Limit { + break + } + } //pass all filters and append to slice cdrs = append(cdrs, cdr) } - - // //apply paginator - // cdrIDs = filter.Paginator.PaginateStringSlice(cdrIDs) - // if filter.Count { - // return nil, int64(len(cdrs)), nil - // } - + if filter.Count { + return nil, int64(len(cdrs)), nil + } if remove { for _, cdr := range cdrs { iDB.db.Remove(utils.CDRsTBL, utils.ConcatenatedKey(utils.CDRsTBL, cdr.CGRID, cdr.RunID, cdr.OriginID), diff --git a/sessions/sessions_birpc_it_test.go b/sessions/sessions_birpc_it_test.go index fdff99caa..4761cff02 100644 --- a/sessions/sessions_birpc_it_test.go +++ b/sessions/sessions_birpc_it_test.go @@ -159,6 +159,7 @@ func TestSessionsBiRPCSessionAutomaticDisconnects(t *testing.T) { initArgs, &initRpl); err != nil { t.Error(err) } + time.Sleep(10 * time.Millisecond) // give some time to allow the session to be created expMaxUsage := time.Duration(-1) if *initRpl.MaxUsage != expMaxUsage { t.Errorf("Expecting : %+v, received: %+v", expMaxUsage, *initRpl.MaxUsage) diff --git a/sessions/sessions_data_it_test.go b/sessions/sessions_data_it_test.go index 45277ed82..068a2cb52 100644 --- a/sessions/sessions_data_it_test.go +++ b/sessions/sessions_data_it_test.go @@ -611,6 +611,7 @@ func TestSessionsDataTTLExpMultiUpdates(t *testing.T) { initArgs, &initRpl); err != nil { t.Error(err) } + time.Sleep(10*time.Millisecond) // give some time to allow the session to be created if (*initRpl.MaxUsage).Nanoseconds() != usage { t.Errorf("Expecting : %+v, received: %+v", usage, (*initRpl.MaxUsage).Nanoseconds()) } diff --git a/sessions/sessions_voice_it_test.go b/sessions/sessions_voice_it_test.go index f75448952..17e39a808 100644 --- a/sessions/sessions_voice_it_test.go +++ b/sessions/sessions_voice_it_test.go @@ -944,7 +944,7 @@ func TestSessionsVoiceSessionTTLWithRelocate(t *testing.T) { initArgs, &initRpl); err != nil { t.Error(err) } - time.Sleep(time.Duration(10 * time.Millisecond)) + time.Sleep(time.Duration(20 * time.Millisecond)) if *initRpl.MaxUsage != usage { t.Errorf("Expected: %+v, received: %+v", usage, *initRpl.MaxUsage) }