diff --git a/apier/v1/filterindexecache_it_test.go b/apier/v1/filterindexecache_it_test.go index 204b38c37..1cc8a400f 100644 --- a/apier/v1/filterindexecache_it_test.go +++ b/apier/v1/filterindexecache_it_test.go @@ -20,6 +20,7 @@ along with this program. If not, see package v1 import ( + "flag" "fmt" "net/rpc" "net/rpc/jsonrpc" @@ -34,7 +35,22 @@ import ( ) var ( - tFIdxCaRpc *rpc.Client + filter *engine.Filter + tSv1CfgPath string + tSv1Cfg *config.CGRConfig + tSv1ConfDIR string //run tests for specific configuration + dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") + tFIdxCaRpc *rpc.Client + rdsITdb *engine.RedisStorage + mgoITdb *engine.MongoStorage + onStor *engine.DataManager + err error + indexes map[string]utils.StringMap + matchedindexes utils.StringMap + tPrfl *engine.ThresholdProfile + statConfig *engine.StatQueueProfile + rlsConfig *engine.ResourceProfile + thdsDelay int ) var sTestsFilterIndexesSV1Ca = []func(t *testing.T){ @@ -45,23 +61,31 @@ var sTestsFilterIndexesSV1Ca = []func(t *testing.T){ testV1FIdxCaStartEngine, testV1FIdxCaRpcConn, - testV1FIdxCaGetThresholdsWithNotFound, + testV1FIdxCaProcessEventWithNotFound, testV1FIdxCaSetThresholdProfile, testV1FIdxCaFromFolder, - testV1FIdxCaGetThresholds, + testV1FIdxCaGetThresholdFromTP, testV1FIdxCaUpdateThresholdProfile, + testV1FIdxCaUpdateThresholdProfileFromTP, testV1FIdxCaRemoveThresholdProfile, - // To be implemented after threshold one works - /* - testFlush, - testV1FIdxCaGetStatQueuesWithNotFound, - testV1FIdxCaSetStatQueueProfile, - testV1FIdxCaGetStatQueues, - testV1FIdxCaFromFolder, - testV1FIdxCaUpdateStatQueueProfile, - testV1FIdxCaRemoveStatQueueProfile, - */ + testFlush, + testV1FIdxCaGetStatQueuesWithNotFound, + testV1FIdxCaSetStatQueueProfile, + testV1FIdxCaFromFolder, + testV1FIdxCaGetStatQueuesFromTP, + testV1FIdxCaUpdateStatQueueProfile, + testV1FIdxCaUpdateStatQueueProfileFromTP, + testV1FIdxCaRemoveStatQueueProfile, + + testFlush, + testV1FIdxCaProcessAttributeProfileEventWithNotFound, + testV1FIdxCaSetAttributeProfile, + testV1FIdxCaFromFolder, + testV1FIdxCaGetAttributeProfileFromTP, + testV1FIdxCaUpdateAttributeProfile, + testV1FIdxCaUpdateAttributeProfileFromTP, + testV1FIdxCaRemoveAttributeProfile, } func TestFIdxCaV1ITMySQLConnect(t *testing.T) { @@ -138,6 +162,13 @@ func testV1FIdxCaStartEngine(t *testing.T) { } } +func testFlush(t *testing.T) { + onStor.DataDB().Flush("") + if err := engine.SetDBVersions(onStor.DataDB()); err != nil { + t.Error("Error ", err.Error()) + } +} + func testV1FIdxCaRpcConn(t *testing.T) { var err error tFIdxCaRpc, err = jsonrpc.Dial("tcp", tSv1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed @@ -156,23 +187,29 @@ func testV1FIdxCaFromFolder(t *testing.T) { } //ThresholdProfile -func testV1FIdxCaGetThresholdsWithNotFound(t *testing.T) { - var tIDs []string - if err := tFIdxCaRpc.Call(utils.ThresholdSv1GetThresholdIDs, "cgrates.org", &tIDs); err != nil { - t.Error(err) +func testV1FIdxCaProcessEventWithNotFound(t *testing.T) { + tEv := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.EventType: utils.BalanceUpdate, + utils.Account: "1001", + }, } - _, err := onStor.MatchFilterIndex(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", false), - "Account", "1001") - if err != nil && err != utils.ErrNotFound { + var hits int + eHits := 0 + if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &hits); err != nil { t.Error(err) + } else if hits != 0 { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", true), - nil); err != nil && err != utils.ErrNotFound { + nil); err == nil || err != utils.ErrNotFound { t.Error(err) } } + func testV1FIdxCaSetThresholdProfile(t *testing.T) { - var reply *engine.ThresholdProfile filter = &engine.Filter{ Tenant: "cgrates.org", ID: "TestFilter", @@ -182,62 +219,64 @@ func testV1FIdxCaSetThresholdProfile(t *testing.T) { Type: "*string", Values: []string{"1001"}, }, + &engine.RequestFilter{ + FieldName: utils.EventType, + Type: "*string", + Values: []string{utils.BalanceUpdate}, + }, }, ActivationInterval: &utils.ActivationInterval{ ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), - ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), }, } - var result string if err := tFIdxCaRpc.Call("ApierV1.SetFilter", filter, &result); err != nil { t.Error(err) } else if result != utils.OK { t.Error("Unexpected reply returned", result) } - if err := tFIdxCaRpc.Call("ApierV1.GetThresholdProfile", - &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err == nil || - err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } tPrfl = &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "TEST_PROFILE1", FilterIDs: []string{"TestFilter"}, ActivationInterval: &utils.ActivationInterval{ ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), - ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), }, + MinHits: 1, Recurrent: true, MinSleep: time.Duration(5 * time.Minute), Blocker: false, Weight: 20.0, - ActionIDs: []string{"ACT_1", "ACT_2"}, Async: true, } + if err := tFIdxCaRpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil { t.Error(err) } else if result != utils.OK { t.Error("Unexpected reply returned", result) } - if err := tFIdxCaRpc.Call("ApierV1.GetThresholdProfile", - &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(tPrfl, reply) { - t.Errorf("Expecting: %+v, received: %+v", tPrfl, reply) + //matches TEST_PROFILE1 + tEv := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.EventType: utils.BalanceUpdate, + utils.Account: "1001", + }, } - expectedIDX := utils.StringMap{"TEST_PROFILE1": true} - matchedindexes, err := onStor.MatchFilterIndex(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", false), - "Account", "1001") - if err != nil { + var hits int + eHits := 1 + //Testing ProcessEvent on set thresholdprofile using apier + if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &hits); err != nil { t.Error(err) - } else if !reflect.DeepEqual(expectedIDX, matchedindexes) { - t.Errorf("Expecting: %+v, received: %+v", expectedIDX, matchedindexes) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } - fldNameVal2 := map[string]string{"TEST_PROFILE1": ""} - expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1001": true}} + //test to make sure indexes are made as expected + fldNameVal := map[string]string{"TEST_PROFILE1": ""} + expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1001": true, "EventType:BalanceUpdate": true}} if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", true), - fldNameVal2); err != nil && err != utils.ErrNotFound { + fldNameVal); err != nil { t.Error(err) } if !reflect.DeepEqual(expectedRevIDX, indexes) { @@ -245,45 +284,40 @@ func testV1FIdxCaSetThresholdProfile(t *testing.T) { } } -func testV1FIdxCaGetThresholds(t *testing.T) { - // var resp string - var tIDs []string - expectedIDs := []string{"THD_RES_1", "THD_STATS_2", "THD_STATS_1", "THD_ACNT_BALANCE_1", "THD_ACNT_EXPIRED", "THD_STATS_3", "THD_CDRS_1"} - if err := tFIdxCaRpc.Call(utils.ThresholdSv1GetThresholdIDs, "cgrates.org", &tIDs); err != nil { +func testV1FIdxCaGetThresholdFromTP(t *testing.T) { + //matches THD_ACNT_BALANCE_1 + tEv := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.EventType: utils.BalanceUpdate, + utils.Account: "1001", + utils.BalanceID: utils.META_DEFAULT, + utils.Units: 12.3, + }, + } + var hits int + eHits := 1 + //Testing ProcessEvent on set thresholdprofile using apier + if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &hits); err != nil { t.Error(err) - } else if len(expectedIDs) != len(tIDs) { - t.Errorf("expecting: %+v, received reply: %s", expectedIDs, tIDs) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } - var td engine.Threshold - eTd := engine.Threshold{Tenant: "cgrates.org", ID: expectedIDs[0]} - if err := tFIdxCaRpc.Call(utils.ThresholdSv1GetThreshold, - &utils.TenantID{Tenant: "cgrates.org", ID: expectedIDs[0]}, &td); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(eTd, td) { - t.Errorf("expecting: %+v, received: %+v", eTd, td) - } - idx := utils.StringMap{"TEST_PROFILE1": true} - matchedindexes, err := onStor.MatchFilterIndex(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", false), - "Account", "1001") - if err != nil { - t.Error(err) - } - if !reflect.DeepEqual(idx, matchedindexes) { - t.Errorf("Expecting: %+v, received: %+v", idx, utils.ToJSON(matchedindexes)) - } - idx2 := map[string]utils.StringMap{"THD_ACNT_BALANCE_1": {"Account:1001": true, "Account:1002": true, "EventType:BalanceUpdate": true}} - fldNameVal2 := map[string]string{"THD_ACNT_BALANCE_1": ""} + //test to make sure indexes are made as expected + idx := map[string]utils.StringMap{"THD_ACNT_BALANCE_1": {"Account:1001": true, "Account:1002": true, "EventType:BalanceUpdate": true}} + fldNameVal := map[string]string{"THD_ACNT_BALANCE_1": ""} if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", true), - fldNameVal2); err != nil { + fldNameVal); err != nil { t.Error(err) } - if !reflect.DeepEqual(idx2, indexes) { - t.Errorf("Expecting: %+v, received: %+v", idx2, utils.ToJSON(indexes)) + if !reflect.DeepEqual(idx, indexes) { + t.Errorf("Expecting: %+v, received: %+v", idx, utils.ToJSON(indexes)) } } func testV1FIdxCaUpdateThresholdProfile(t *testing.T) { - var reply *engine.ThresholdProfile + var result string filter = &engine.Filter{ Tenant: "cgrates.org", ID: "TestFilter2", @@ -293,14 +327,16 @@ func testV1FIdxCaUpdateThresholdProfile(t *testing.T) { Type: "*string", Values: []string{"1002"}, }, + &engine.RequestFilter{ + FieldName: utils.EventType, + Type: "*string", + Values: []string{utils.AccountUpdate}, + }, }, ActivationInterval: &utils.ActivationInterval{ ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), - ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), }, } - - var result string if err := tFIdxCaRpc.Call("ApierV1.SetFilter", filter, &result); err != nil { t.Error(err) } else if result != utils.OK { @@ -312,13 +348,11 @@ func testV1FIdxCaUpdateThresholdProfile(t *testing.T) { FilterIDs: []string{"TestFilter2"}, ActivationInterval: &utils.ActivationInterval{ ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), - ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), }, Recurrent: true, MinSleep: time.Duration(5 * time.Minute), Blocker: false, Weight: 20.0, - ActionIDs: []string{"ACT_1", "ACT_2"}, Async: true, } if err := tFIdxCaRpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil { @@ -326,24 +360,129 @@ func testV1FIdxCaUpdateThresholdProfile(t *testing.T) { } else if result != utils.OK { t.Error("Unexpected reply returned", result) } - if err := tFIdxCaRpc.Call("ApierV1.GetThresholdProfile", - &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(tPrfl, reply) { - t.Errorf("Expecting: %+v, received: %+v", tPrfl, reply) + //make sure doesn't match the thresholdprofile after update + tEv := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.EventType: utils.AccountUpdate, + utils.Account: "1001", + }, } - expectedIDX := utils.StringMap{"TEST_PROFILE1": true} - matchedindexes, err := onStor.MatchFilterIndex(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", false), - "Account", "1002") - if err != nil { + var hits int + eHits := 0 + //Testing ProcessEvent on set thresholdprofile after update making sure there are no hits + if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &hits); err != nil { t.Error(err) - } else if !reflect.DeepEqual(expectedIDX, matchedindexes) { - t.Errorf("Expecting: %+v, received: %+v", expectedIDX, matchedindexes) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } - fldNameVal2 := map[string]string{"TEST_PROFILE1": ""} - expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1002": true}} + //matches thresholdprofile after update + tEv2 := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.EventType: utils.AccountUpdate, + utils.Account: "1002", + }, + } + eHits = 1 + //Testing ProcessEvent on set thresholdprofile after update + if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv2, &hits); err != nil { + t.Error(err) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) + } + //test to make sure indexes are made as expecte + fldNameVal := map[string]string{"TEST_PROFILE1": ""} + expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1002": true, "EventType:AccountUpdate": true}} if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", true), - fldNameVal2); err != nil { + fldNameVal); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(expectedRevIDX, indexes) { + t.Errorf("Expecting: %+v, received: %+v", expectedRevIDX, utils.ToJSON(indexes)) + } +} + +func testV1FIdxCaUpdateThresholdProfileFromTP(t *testing.T) { + var result string + filter = &engine.Filter{ + Tenant: "cgrates.org", + ID: "TestFilter3", + RequestFilters: []*engine.RequestFilter{ + &engine.RequestFilter{ + FieldName: "Account", + Type: "*string", + Values: []string{"1003"}, + }, + &engine.RequestFilter{ + FieldName: utils.EventType, + Type: "*string", + Values: []string{utils.BalanceUpdate}, + }, + }, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + }, + } + if err := tFIdxCaRpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + + var reply *engine.ThresholdProfile + + if err := tFIdxCaRpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "THD_ACNT_BALANCE_1"}, &reply); err != nil { + t.Error(err) + } + reply.FilterIDs = []string{"TestFilter3"} + reply.ActivationInterval = &utils.ActivationInterval{ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local()} + + if err := tFIdxCaRpc.Call("ApierV1.SetThresholdProfile", reply, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + + tEv := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.Account: "1002", + utils.EventType: utils.BalanceUpdate, + }, + } + var hits int + eHits := 0 + //Testing ProcessEvent on set thresholdprofile using apier + if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &hits); err != nil { + t.Error(err) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) + } + tEv2 := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event3", + Event: map[string]interface{}{ + utils.Account: "1003", + utils.EventType: utils.BalanceUpdate, + }, + } + eHits = 1 + //Testing ProcessEvent on set thresholdprofile using apier + if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv2, &hits); err != nil { + t.Error(err) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) + } + //test to make sure indexes are made as expecte + fldNameVal := map[string]string{"TEST_PROFILE1": ""} + expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1002": true, "EventType:AccountUpdate": true}} + if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", true), + fldNameVal); err != nil { t.Error(err) } if !reflect.DeepEqual(expectedRevIDX, indexes) { @@ -353,92 +492,106 @@ func testV1FIdxCaUpdateThresholdProfile(t *testing.T) { func testV1FIdxCaRemoveThresholdProfile(t *testing.T) { var resp string - - expectedIDX := utils.StringMap{"TEST_PROFILE1": true} - matchedindexes, err := onStor.MatchFilterIndex(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", false), - "Account", "1001") - if err != nil { + tEv := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event8", + Event: map[string]interface{}{ + utils.Account: "1002", + utils.EventType: utils.AccountUpdate, + }} + var hits int + eHits := 1 + if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &hits); err != nil { t.Error(err) - } else if !reflect.DeepEqual(expectedIDX, matchedindexes) { - t.Errorf("Expecting: %+v, received: %+v", expectedIDX, matchedindexes) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } - expectedIDX = utils.StringMap{"THD_ACNT_BALANCE_1": true} - matchedindexes, err = onStor.MatchFilterIndex(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", false), - "Account", "1002") - if err != nil { + tEv2 := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event9", + Event: map[string]interface{}{ + utils.Account: "1003", + utils.EventType: utils.BalanceUpdate, + }} + eHits = 1 + if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv2, &hits); err != nil { t.Error(err) - } else if !reflect.DeepEqual(expectedIDX, matchedindexes) { - t.Errorf("Expecting: %+v, received: %+v", expectedIDX, matchedindexes) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } - + //Remove threshold profile that was set form api if err := tFIdxCaRpc.Call("ApierV1.RemThresholdProfile", &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &resp); err != nil { t.Error(err) } else if resp != utils.OK { t.Error("Unexpected reply returned", resp) } + var sqp *engine.ThresholdProfile + //Test the remove + if err := tFIdxCaRpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &sqp); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + //Remove threshold profile that was set form tariffplan if err := tFIdxCaRpc.Call("ApierV1.RemThresholdProfile", &utils.TenantID{Tenant: "cgrates.org", ID: "THD_ACNT_BALANCE_1"}, &resp); err != nil { t.Error(err) } else if resp != utils.OK { t.Error("Unexpected reply returned", resp) } - var sqp *engine.ThresholdProfile - if err := tFIdxCaRpc.Call("ApierV1.GetThresholdProfile", - &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &sqp); err == nil || - err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } + //Test the remove if err := tFIdxCaRpc.Call("ApierV1.GetThresholdProfile", &utils.TenantID{Tenant: "cgrates.org", ID: "THD_ACNT_BALANCE_1"}, &sqp); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - matchedindexes, err = onStor.MatchFilterIndex(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", false), - "Account", "1002") - if err != nil && err != utils.ErrNotFound { + eHits = 0 + if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &hits); err != nil { t.Error(err) - } else if matchedindexes != nil { - t.Errorf("Expecting: %+v, received: %+v", nil, matchedindexes) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } - matchedindexes, err = onStor.MatchFilterIndex(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", false), - "Account", "1002") - if err != nil && err != utils.ErrNotFound { + if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv2, &hits); err != nil { t.Error(err) - } else if matchedindexes != nil { - t.Errorf("Expecting: %+v, received: %+v", nil, matchedindexes) + } else if hits != eHits { + t.Errorf("Expecting hits: %d, received: %d", eHits, hits) } - fldNameVals2 := map[string]string{"THD_ACNT_BALANCE_1": "", "TEST_PROFILE1": ""} + //test to make sure indexes are made as expected + fldNameVal2 := map[string]string{"THD_ACNT_BALANCE_1": "", "TEST_PROFILE1": ""} if _, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", true), - fldNameVals2); err != nil && err != utils.ErrNotFound { + fldNameVal2); err == nil || err != utils.ErrNotFound { t.Error(err) } } -/* //StatQueue func testV1FIdxCaGetStatQueuesWithNotFound(t *testing.T) { - var reply *engine.StatQueueProfile - if err := tFIdxCaRpc.Call("ApierV1.GetStatQueueProfile", - &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err == nil || + var reply *string + tEv := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.EventType: utils.AccountUpdate, + utils.Account: "1001", + }, + } + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) - } - if indexes, err = onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", false), - nil); err != nil && err != utils.ErrNotFound { - t.Error(err) + } else if reply != nil && *reply != "" { + t.Error("Unexpected reply returned", *reply) } if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", true), - nil); err != nil && err != utils.ErrNotFound { + nil); err == nil || err != utils.ErrNotFound { t.Error(err) } } func testV1FIdxCaSetStatQueueProfile(t *testing.T) { tenant := "cgrates.org" - var reply *engine.StatQueueProfile filter = &engine.Filter{ Tenant: tenant, ID: "FLTR_1", @@ -448,10 +601,14 @@ func testV1FIdxCaSetStatQueueProfile(t *testing.T) { Type: "*string", Values: []string{"1001"}, }, + &engine.RequestFilter{ + FieldName: utils.EventType, + Type: "*string", + Values: []string{utils.AccountUpdate}, + }, }, ActivationInterval: &utils.ActivationInterval{ ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), - ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), }, } var result string @@ -460,29 +617,19 @@ func testV1FIdxCaSetStatQueueProfile(t *testing.T) { } else if result != utils.OK { t.Error("Unexpected reply returned", result) } - if err := tFIdxCaRpc.Call("ApierV1.GetStatQueueProfile", - &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err == nil || - err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } statConfig = &engine.StatQueueProfile{ Tenant: "cgrates.org", ID: "TEST_PROFILE1", FilterIDs: []string{"FLTR_1"}, ActivationInterval: &utils.ActivationInterval{ ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), - ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), }, QueueLength: 10, TTL: time.Duration(10) * time.Second, Metrics: []*utils.MetricWithParams{ &utils.MetricWithParams{ - MetricID: "MetricValue", - Parameters: "", - }, - &utils.MetricWithParams{ - MetricID: "MetricValueTwo", - Parameters: "", + MetricID: "*sum", + Parameters: "Val", }, }, Thresholds: []string{"Val1", "Val2"}, @@ -496,123 +643,117 @@ func testV1FIdxCaSetStatQueueProfile(t *testing.T) { } else if result != utils.OK { t.Error("Unexpected reply returned", result) } - if err := tFIdxCaRpc.Call("ApierV1.GetStatQueueProfile", - &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(statConfig, reply) { - t.Errorf("Expecting: %+v, received: %+v", statConfig, reply) - } - expectedIDX := utils.StringMap{"TEST_PROFILE1": true} - matchedindexes, err := onStor.MatchFilterIndex(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", false), - "Account", "1001") - if err != nil && err != utils.ErrNotFound { - t.Error(err) - } - if !reflect.DeepEqual(expectedIDX, matchedindexes) { - t.Errorf("Expecting: %+v, received: %+v", expectedIDX, utils.ToJSON(matchedindexes)) - } - fldNameVal2 := map[string]string{"TEST_PROFILE1": ""} - expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1001": true}} - if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", true), - fldNameVal2); err != nil && err != utils.ErrNotFound { - t.Error(err) - } - if !reflect.DeepEqual(expectedRevIDX, indexes) { - t.Errorf("Expecting: %+v, received: %+v", expectedRevIDX, utils.ToJSON(indexes)) - } -} -func testV1FIdxCaGetStatQueues(t *testing.T) { - tenant := "cgrates.org" - filter = &engine.Filter{ - Tenant: tenant, - ID: "FLTR_1", - RequestFilters: []*engine.RequestFilter{ - &engine.RequestFilter{ - FieldName: "Account", - Type: "*string", - Values: []string{"1002"}, - }, - }, - ActivationInterval: &utils.ActivationInterval{ - ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), - ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + tEv := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.EventType: utils.AccountUpdate, + utils.Account: "1001", }, } - var result string - if err := tFIdxCaRpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &result); err != nil { t.Error(err) } else if result != utils.OK { t.Error("Unexpected reply returned", result) } - var td engine.StatQueueProfile - eTd := engine.StatQueueProfile{ - Tenant: "cgrates.org", - ID: "TEST_PROFILE1", - FilterIDs: []string{"FLTR_1"}, - ActivationInterval: &utils.ActivationInterval{ - ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), - ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), - }, - QueueLength: 10, - TTL: 10000000000, - Metrics: []*utils.MetricWithParams{ - &utils.MetricWithParams{ - MetricID: "MetricValue", - Parameters: ""}, - &utils.MetricWithParams{MetricID: "MetricValueTwo", - Parameters: ""}}, - Thresholds: []string{"Val1", "Val2"}, - Blocker: true, - Stored: true, - Weight: 20, - MinItems: 1, - } - if err := tFIdxCaRpc.Call("ApierV1.GetStatQueueProfile", - &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &td); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(eTd, td) { - t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eTd), utils.ToJSON(td)) - } - idx := utils.StringMap{"TEST_PROFILE1": true} - matchedindexes, err := onStor.MatchFilterIndex(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", false), - "Account", "1001") - if err != nil { - t.Error(err) - } - if !reflect.DeepEqual(idx, matchedindexes) { - t.Errorf("Expecting: %+v, received: %+v", idx, utils.ToJSON(matchedindexes)) - } - - idx2 := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1001": true}} - fldNameVal2 := map[string]string{"TEST_PROFILE1": ""} + fldNameVal := map[string]string{"TEST_PROFILE1": ""} + expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1001": true, "EventType:AccountUpdate": true}} if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", true), - fldNameVal2); err != nil { + fldNameVal); err != nil { t.Error(err) } - if !reflect.DeepEqual(idx2, indexes) { - t.Errorf("Expecting: %+v, received: %+v", idx2, utils.ToJSON(indexes)) + if !reflect.DeepEqual(expectedRevIDX, indexes) { + t.Errorf("Expecting: %+v, received: %+v", expectedRevIDX, indexes) + } +} + +func testV1FIdxCaGetStatQueuesFromTP(t *testing.T) { + var reply string + ev2 := utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event2", + Event: map[string]interface{}{ + utils.Account: "1002", + utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.Usage: time.Duration(45 * time.Second)}} + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &ev2, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("received reply: %s", reply) + } + ev3 := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event3", + Event: map[string]interface{}{ + utils.Account: "1002", + utils.SetupTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.Usage: 0}} + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &ev3, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("received reply: %s", reply) + } + + tEv := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.EventType: utils.AccountUpdate, + utils.Account: "1001", + "Val": 7, + }} + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &tEv, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("received reply: %s", reply) + } + tEv2 := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.EventType: utils.AccountUpdate, + utils.Account: "1001", + "Val": 8, + }} + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, &tEv2, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("received reply: %s", reply) + } + + idx := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1001": true, "EventType:AccountUpdate": true}} + fldNameVal := map[string]string{"TEST_PROFILE1": ""} + + if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", true), + fldNameVal); err != nil { + t.Error(err) + } + + if !reflect.DeepEqual(idx, indexes) { + t.Errorf("Expecting: %+v, received: %+v", idx, utils.ToJSON(indexes)) } } func testV1FIdxCaUpdateStatQueueProfile(t *testing.T) { - - tenant := "cgrates.org" - var reply *engine.StatQueueProfile filter = &engine.Filter{ - Tenant: tenant, - ID: "FLTR_1", + Tenant: "cgrates.org", + ID: "FLTR_2", RequestFilters: []*engine.RequestFilter{ &engine.RequestFilter{ FieldName: "Account", Type: "*string", Values: []string{"1002"}, }, + &engine.RequestFilter{ + FieldName: utils.EventType, + Type: "*string", + Values: []string{utils.BalanceUpdate}, + }, }, ActivationInterval: &utils.ActivationInterval{ ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), - ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), }, } var result string @@ -624,20 +765,15 @@ func testV1FIdxCaUpdateStatQueueProfile(t *testing.T) { statConfig = &engine.StatQueueProfile{ Tenant: "cgrates.org", ID: "TEST_PROFILE1", - FilterIDs: []string{"FLTR_1"}, + FilterIDs: []string{"FLTR_2"}, ActivationInterval: &utils.ActivationInterval{ ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), - ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), }, QueueLength: 10, TTL: time.Duration(10) * time.Second, Metrics: []*utils.MetricWithParams{ &utils.MetricWithParams{ - MetricID: "MetricValue", - Parameters: "", - }, - &utils.MetricWithParams{ - MetricID: "MetricValueTwo", + MetricID: "*sum", Parameters: "", }, }, @@ -652,33 +788,23 @@ func testV1FIdxCaUpdateStatQueueProfile(t *testing.T) { } else if result != utils.OK { t.Error("Unexpected reply returned", result) } - if err := tFIdxCaRpc.Call("ApierV1.GetStatQueueProfile", - &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err != nil { + tEv := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.EventType: utils.BalanceUpdate, + utils.Account: "1002", + }} + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &result); err != nil { t.Error(err) - } else if !reflect.DeepEqual(statConfig, reply) { - t.Errorf("Expecting: %+v, received: %+v", statConfig, reply) - } - // idx := utils.StringMap{"TEST_PROFILE1": true} - // matchedindexes - _, err := onStor.MatchFilterIndex(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", false), - "Account", "1001") - if err != nil { - t.Error(err) - } - fldNameVal := map[string]string{"Account": "1002"} - expectedIDX := map[string]utils.StringMap{"Account:1002": {"Stats1": true, "TEST_PROFILE1": true}} - if indexes, err = onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", false), - fldNameVal); err != nil { - t.Error(err) - } - if !reflect.DeepEqual(expectedIDX, indexes) { - t.Errorf("Expecting: %+v, received: %+v", expectedIDX, utils.ToJSON(indexes)) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) } - fldNameVal2 := map[string]string{"TEST_PROFILE1": ""} - expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1001": true, "Account:1002": true}} + fldNameVal := map[string]string{"TEST_PROFILE1": ""} + expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1002": true, "EventType:BalanceUpdate": true}} if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", true), - fldNameVal2); err != nil { + fldNameVal); err != nil { t.Error(err) } if !reflect.DeepEqual(expectedRevIDX, indexes) { @@ -686,81 +812,481 @@ func testV1FIdxCaUpdateStatQueueProfile(t *testing.T) { } } -func testV1FIdxCaRemoveStatQueueProfile(t *testing.T) { - var resp string - fldNameVal := map[string]string{"Account": "1001"} - expectedIDX := map[string]utils.StringMap{"Account:1001": {"Stats1": true, "TEST_PROFILE1": true}} - if indexes, err = onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", false), +func testV1FIdxCaUpdateStatQueueProfileFromTP(t *testing.T) { + filter = &engine.Filter{ + Tenant: "cgrates.org", + ID: "FLTR_3", + RequestFilters: []*engine.RequestFilter{ + &engine.RequestFilter{ + FieldName: "Account", + Type: "*string", + Values: []string{"1003"}, + }, + &engine.RequestFilter{ + FieldName: utils.EventType, + Type: "*string", + Values: []string{utils.AccountUpdate}, + }, + }, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + }, + } + var result string + if err := tFIdxCaRpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + var reply *engine.StatQueueProfile + if err := tFIdxCaRpc.Call("ApierV1.GetStatQueueProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &reply); err != nil { + t.Error(err) + } + (*reply).FilterIDs = []string{"FLTR_3"} + (*reply).ActivationInterval = &utils.ActivationInterval{ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local()} + if err := tFIdxCaRpc.Call("ApierV1.SetStatQueueProfile", reply, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + tEv := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.EventType: utils.AccountUpdate, + utils.Account: "1003", + }} + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + fldNameVal := map[string]string{"Stats1": ""} + expectedRevIDX := map[string]utils.StringMap{"Stats1": {"Account:1003": true, "EventType:AccountUpdate": true}} + if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", true), fldNameVal); err != nil { t.Error(err) } - if !reflect.DeepEqual(expectedIDX, indexes) { - t.Errorf("Expecting: %+v, received: %+v", expectedIDX, utils.ToJSON(indexes)) + if !reflect.DeepEqual(expectedRevIDX, indexes) { + t.Errorf("Expecting: %+v, received: %+v", expectedRevIDX, indexes) + } +} + +func testV1FIdxCaRemoveStatQueueProfile(t *testing.T) { + var result string + tEv := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.EventType: utils.BalanceUpdate, + utils.Account: "1002", + }} + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) } - fldNameVal3 := map[string]string{"Account": "1002"} - expectedIDX3 := map[string]utils.StringMap{"Account:1002": {"Stats1": true, "TEST_PROFILE1": true}} - if indexes, err = onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", false), - fldNameVal3); err != nil { + tEv2 := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.EventType: utils.AccountUpdate, + utils.Account: "1003", + }} + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv2, &result); err != nil { t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) } - if !reflect.DeepEqual(expectedIDX3, indexes) { - t.Errorf("Expecting: %+v, received: %+v", expectedIDX3, utils.ToJSON(indexes)) - } - + //Remove threshold profile that was set form api if err := tFIdxCaRpc.Call("ApierV1.RemStatQueueProfile", - &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &resp); err != nil { + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &result); err != nil { t.Error(err) - } else if resp != utils.OK { - t.Error("Unexpected reply returned", resp) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) } - if err := tFIdxCaRpc.Call("ApierV1.RemStatQueueProfile", - &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &resp); err != nil { - t.Error(err) - } else if resp != utils.OK { - t.Error("Unexpected reply returned", resp) - } - var sqp *engine.ThresholdProfile - if err := tFIdxCaRpc.Call("ApierV1.GetThresholdProfile", + var sqp *engine.StatQueueProfile + //Test the remove + if err := tFIdxCaRpc.Call("ApierV1.GetStatQueueProfile", &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &sqp); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - if err := tFIdxCaRpc.Call("ApierV1.GetThresholdProfile", + //Remove threshold profile that was set form tariffplan + if err := tFIdxCaRpc.Call("ApierV1.RemStatQueueProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + //Test the remove + if err := tFIdxCaRpc.Call("ApierV1.GetStatQueueProfile", &utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &sqp); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - // idx := utils.StringMap{"TEST_PROFILE1": true} - // matchedindexes - _, err := onStor.MatchFilterIndex(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", false), - "Account", "1001") - if err != utils.ErrNotFound { + + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv, &result); err == nil || + err.Error() != utils.ErrNotFound.Error() { t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) } - fldNameVal1 := map[string]string{"Account": "1001"} - if indexes, err = onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", false), - fldNameVal1); err != nil && err != utils.ErrNotFound { + if err := tFIdxCaRpc.Call(utils.StatSv1ProcessEvent, tEv2, &result); err == nil || + err.Error() != utils.ErrNotFound.Error() { t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) } - if len(indexes) != 0 { - t.Errorf("Expecting: %+v, received: %+v", 0, len(indexes)) - } - fldNameVals := map[string]string{"Account": "1002"} - if indexes, err = onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", false), - fldNameVals); err != nil && err != utils.ErrNotFound { + fldNameVals := map[string]string{"THD_ACNT_BALANCE_1": "", "TEST_PROFILE1": ""} + if _, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", true), + fldNameVals); err == nil || err != utils.ErrNotFound { + t.Error(err) + } +} + +//AttributeProfile +func testV1FIdxCaProcessAttributeProfileEventWithNotFound(t *testing.T) { + ev := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testAttributeSProcessEvent", + Context: utils.StringPointer(utils.MetaRating), + Event: map[string]interface{}{ + "Account": "3009", + "Destination": "+492511231234", + }, + } + var rplyEv engine.AttrSProcessEventReply + if err := tFIdxCaRpc.Call(utils.AttributeSv1ProcessEvent, ev, &rplyEv); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, "cgrates.org", true), + nil); err == nil || err != utils.ErrNotFound { + t.Error(err) + } +} + +func testV1FIdxCaSetAttributeProfile(t *testing.T) { + filter = &engine.Filter{ + Tenant: "cgrates.org", + ID: "TestFilter", + RequestFilters: []*engine.RequestFilter{ + &engine.RequestFilter{ + FieldName: "Account", + Type: "*string", + Values: []string{"1009"}, + }, + &engine.RequestFilter{ + FieldName: "Destination", + Type: "*string", + Values: []string{"+491511231234"}, + }, + }, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + }, + } + var result string + if err := tFIdxCaRpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + alsPrf := &engine.ExternalAttributeProfile{ + Tenant: "cgrates.org", + ID: "TEST_PROFILE1", + Contexts: []string{"*rating"}, + FilterIDs: []string{"TestFilter"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), + }, + Attributes: []*engine.Attribute{ + &engine.Attribute{ + FieldName: "Account", + Initial: "*any", + Substitute: "1001", + Append: false, + }, + &engine.Attribute{ + FieldName: "Subject", + Initial: "*any", + Substitute: "1001", + Append: true, + }, + }, + Weight: 20, + } + if err := tFIdxCaRpc.Call("ApierV1.SetAttributeProfile", alsPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + //matches TEST_PROFILE1 + ev := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testAttributeSProcessEvent", + Context: utils.StringPointer(utils.MetaRating), + Event: map[string]interface{}{ + "Account": "1009", + "Destination": "+491511231234", + }, + } + var rplyEv engine.AttrSProcessEventReply + if err := tFIdxCaRpc.Call(utils.AttributeSv1ProcessEvent, ev, &rplyEv); err != nil { + t.Error(err) + } + //test to make sure indexes are made as expected + fldNameVal := map[string]string{"TEST_PROFILE1": ""} + expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1009": true, "Destination:+491511231234": true}} + if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.AttributeProfilePrefix, "cgrates.org:*rating", true), + fldNameVal); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(expectedRevIDX, indexes) { + t.Errorf("Expecting: %+v, received: %+v", expectedRevIDX, indexes) + } +} + +func testV1FIdxCaGetAttributeProfileFromTP(t *testing.T) { + //matches ATTR_1 + ev := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testAttributeSProcessEvent", + Context: utils.StringPointer(utils.MetaRating), + Event: map[string]interface{}{ + "Account": "1007", + "Destination": "+491511231234", + }, + } + var rplyEv engine.AttrSProcessEventReply + if err := tFIdxCaRpc.Call(utils.AttributeSv1ProcessEvent, ev, &rplyEv); err != nil { + t.Error(err) + } + //test to make sure indexes are made as expected + idx := map[string]utils.StringMap{"ATTR_1": {"Account:1007": true}} + fldNameVal := map[string]string{"ATTR_1": ""} + if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.AttributeProfilePrefix, "cgrates.org:*rating", true), + fldNameVal); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(idx, indexes) { + t.Errorf("Expecting: %+v, received: %+v", idx, utils.ToJSON(indexes)) + } +} + +func testV1FIdxCaUpdateAttributeProfile(t *testing.T) { + filter = &engine.Filter{ + Tenant: "cgrates.org", + ID: "TestFilter2", + RequestFilters: []*engine.RequestFilter{ + &engine.RequestFilter{ + FieldName: "Account", + Type: "*string", + Values: []string{"2009"}, + }, + &engine.RequestFilter{ + FieldName: "Destination", + Type: "*string", + Values: []string{"+492511231234"}, + }, + }, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + }, + } + var result string + if err := tFIdxCaRpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + alsPrf := &engine.ExternalAttributeProfile{ + Tenant: "cgrates.org", + ID: "TEST_PROFILE1", + Contexts: []string{"*rating"}, + FilterIDs: []string{"TestFilter2"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), + }, + Attributes: []*engine.Attribute{ + &engine.Attribute{ + FieldName: "Account", + Initial: "*any", + Substitute: "1001", + Append: false, + }, + &engine.Attribute{ + FieldName: "Subject", + Initial: "*any", + Substitute: "1001", + Append: true, + }, + }, + Weight: 20, + } + if err := tFIdxCaRpc.Call("ApierV1.SetAttributeProfile", alsPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + //matches TEST_PROFILE1 + ev := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testAttributeSProcessEvent", + Context: utils.StringPointer(utils.MetaRating), + Event: map[string]interface{}{ + "Account": "2009", + "Destination": "+492511231234", + }, + } + var rplyEv engine.AttrSProcessEventReply + if err := tFIdxCaRpc.Call(utils.AttributeSv1ProcessEvent, ev, &rplyEv); err != nil { + t.Error(err) + } + //test to make sure indexes are made as expected + idx := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:2009": true, "Destination:+492511231234": true}} + fldNameVal := map[string]string{"TEST_PROFILE1": ""} + if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.AttributeProfilePrefix, "cgrates.org:*rating", true), + fldNameVal); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(idx, indexes) { + t.Errorf("Expecting: %+v, received: %+v", idx, utils.ToJSON(indexes)) + } +} + +func testV1FIdxCaUpdateAttributeProfileFromTP(t *testing.T) { + filter = &engine.Filter{ + Tenant: "cgrates.org", + ID: "TestFilter3", + RequestFilters: []*engine.RequestFilter{ + &engine.RequestFilter{ + FieldName: "Account", + Type: "*string", + Values: []string{"3009"}, + }, + &engine.RequestFilter{ + FieldName: "Destination", + Type: "*string", + Values: []string{"+492511231234"}, + }, + }, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + }, + } + var result string + if err := tFIdxCaRpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + var reply *engine.ExternalAttributeProfile + if err := tFIdxCaRpc.Call("ApierV1.GetAttributeProfile", &utils.TenantID{Tenant: "cgrates.org", ID: "ATTR_1"}, &reply); err != nil { + t.Error(err) + } + reply.FilterIDs = []string{"TestFilter3"} + if err := tFIdxCaRpc.Call("ApierV1.SetAttributeProfile", reply, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + //matches TEST_PROFILE1 + ev := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testAttributeSProcessEvent", + Context: utils.StringPointer(utils.MetaRating), + Event: map[string]interface{}{ + "Account": "3009", + "Destination": "+492511231234", + }, + } + var rplyEv engine.AttrSProcessEventReply + if err := tFIdxCaRpc.Call(utils.AttributeSv1ProcessEvent, ev, &rplyEv); err != nil { + t.Error(err) + } + //test to make sure indexes are made as expected + idx := map[string]utils.StringMap{"ATTR_1": {"Account:3009": true, "Destination:+492511231234": true}} + fldNameVal := map[string]string{"ATTR_1": ""} + if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.AttributeProfilePrefix, "cgrates.org:*rating", true), + fldNameVal); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(idx, indexes) { + t.Errorf("Expecting: %+v, received: %+v", idx, utils.ToJSON(indexes)) + } +} + +func testV1FIdxCaRemoveAttributeProfile(t *testing.T) { + var resp string + ev := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testAttributeSProcessEvent", + Context: utils.StringPointer(utils.MetaRating), + Event: map[string]interface{}{ + "Account": "3009", + "Destination": "+492511231234", + }, + } + var rplyEv engine.AttrSProcessEventReply + if err := tFIdxCaRpc.Call(utils.AttributeSv1ProcessEvent, ev, &rplyEv); err != nil { + t.Error(err) + } + + ev2 := &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "testAttributeSProcessEvent", + Context: utils.StringPointer(utils.MetaRating), + Event: map[string]interface{}{ + "Account": "2009", + "Destination": "+492511231234", + }, + } + if err := tFIdxCaRpc.Call(utils.AttributeSv1ProcessEvent, ev2, &rplyEv); err != nil { + t.Error(err) + } + //Remove threshold profile that was set form api + if err := tFIdxCaRpc.Call("ApierV1.RemAttributeProfile", &ArgRemoveAttrProfile{Tenant: "cgrates.org", + ID: "TEST_PROFILE1", Contexts: []string{"*rating"}}, &resp); err != nil { + t.Error(err) + } else if resp != utils.OK { + t.Error("Unexpected reply returned", resp) + } + var sqp *engine.ExternalAttributeProfile + //Test the remove + if err := tFIdxCaRpc.Call("ApierV1.GetAttributeProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &sqp); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + //Remove threshold profile that was set form tariffplan + if err := tFIdxCaRpc.Call("ApierV1.RemAttributeProfile", &ArgRemoveAttrProfile{Tenant: "cgrates.org", + ID: "ATTR_1", Contexts: []string{"*rating"}}, &resp); err != nil { + t.Error(err) + } else if resp != utils.OK { + t.Error("Unexpected reply returned", resp) + } + //Test the remove + if err := tFIdxCaRpc.Call("ApierV1.GetAttributeProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "ATTR_1"}, &sqp); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + if err := tFIdxCaRpc.Call(utils.AttributeSv1ProcessEvent, ev, &rplyEv); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + if err := tFIdxCaRpc.Call(utils.AttributeSv1ProcessEvent, ev2, &rplyEv); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + //test to make sure indexes are made as expected + fldNameVal2 := map[string]string{"ATTR_1": "", "TEST_PROFILE1": ""} + if _, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.AttributeProfilePrefix, "cgrates.org:*rating", true), + fldNameVal2); err == nil || err != utils.ErrNotFound { t.Error(err) } - if len(indexes) != 0 { - t.Errorf("Expecting: %+v, received: %+v", 0, len(indexes)) - } - fldNameVals2 := map[string]string{"THD_ACNT_BALANCE_1": "", "TEST_PROFILE1": ""} - if indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, "cgrates.org", true), - fldNameVals2); err != nil && err != utils.ErrNotFound { - t.Error(err) - } - if len(indexes) != 0 { - t.Errorf("Expecting: %+v, received: %+v", 0, len(indexes)) - } } -*/ diff --git a/apier/v1/resourcesv1.go b/apier/v1/resourcesv1.go index 8e2510255..57aa2b6cf 100644 --- a/apier/v1/resourcesv1.go +++ b/apier/v1/resourcesv1.go @@ -81,6 +81,9 @@ func (apierV1 *ApierV1) SetResourceProfile(res *engine.ResourceProfile, reply *s if err := apierV1.DataManager.SetResourceProfile(res, true); err != nil { return utils.APIErrorHandler(err) } + if err := apierV1.DataManager.SetResource(&engine.Resource{Tenant: res.Tenant, ID: res.ID, Usages: make(map[string]*engine.ResourceUsage)}); err != nil { + return utils.APIErrorHandler(err) + } *reply = utils.OK return nil } diff --git a/apier/v1/stats.go b/apier/v1/stats.go index cfdc7ffb2..53f088561 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -45,6 +45,17 @@ func (apierV1 *ApierV1) SetStatQueueProfile(sqp *engine.StatQueueProfile, reply if err := apierV1.DataManager.SetStatQueueProfile(sqp, true); err != nil { return utils.APIErrorHandler(err) } + metrics := make(map[string]engine.StatMetric) + for _, metricwithparam := range sqp.Metrics { + if metric, err := engine.NewStatMetric(metricwithparam.MetricID, sqp.MinItems, metricwithparam.Parameters); err != nil { + return utils.APIErrorHandler(err) + } else { + metrics[metricwithparam.MetricID] = metric + } + } + if err := apierV1.DataManager.SetStatQueue(&engine.StatQueue{Tenant: sqp.Tenant, ID: sqp.ID, SQMetrics: metrics}); err != nil { + return utils.APIErrorHandler(err) + } *reply = utils.OK return nil } diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index 681dffc9a..382c3fb5f 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -307,11 +307,11 @@ func testV1STSSetStatQueueProfile(t *testing.T) { TTL: time.Duration(10) * time.Second, Metrics: []*utils.MetricWithParams{ &utils.MetricWithParams{ - MetricID: "MetricValue", + MetricID: utils.MetaSum, Parameters: "", }, &utils.MetricWithParams{ - MetricID: "MetricValueTwo", + MetricID: utils.MetaAverage, Parameters: "", }, }, diff --git a/apier/v1/thresholds.go b/apier/v1/thresholds.go index 0dbfa91e5..acbd1b8cb 100644 --- a/apier/v1/thresholds.go +++ b/apier/v1/thresholds.go @@ -79,6 +79,9 @@ func (apierV1 *ApierV1) SetThresholdProfile(thp *engine.ThresholdProfile, reply if err := apierV1.DataManager.SetThresholdProfile(thp, true); err != nil { return utils.APIErrorHandler(err) } + if err := apierV1.DataManager.SetThreshold(&engine.Threshold{Tenant: thp.Tenant, ID: thp.ID}); err != nil { + return err + } *reply = utils.OK return nil } diff --git a/engine/attributes.go b/engine/attributes.go index e3fed3c11..5e0d9cd5c 100644 --- a/engine/attributes.go +++ b/engine/attributes.go @@ -59,6 +59,7 @@ func (alS *AttributeService) matchingAttributeProfilesForEvent(ev *utils.CGREven if ev.Context != nil && *ev.Context != "" { contextVal = *ev.Context } + attrIdxKey = utils.ConcatenatedKey(ev.Tenant, contextVal) matchingAPs := make(map[string]*AttributeProfile) aPrflIDs, err := matchingItemIDsForEvent(ev.Event, alS.indexedFields, diff --git a/engine/datamanager.go b/engine/datamanager.go index e6f3a285b..f4eaf6547 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -431,6 +431,8 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) return } } + cache.RemPrefixKey(utils.ThresholdStringIndex, true, utils.NonTransactional) + cache.RemPrefixKey(utils.ThresholdStringRevIndex, true, utils.NonTransactional) return } @@ -486,7 +488,7 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool var fltr *Filter if fltr, err = dm.GetFilter(sqp.Tenant, fltrID, false, utils.NonTransactional); err != nil { if err == utils.ErrNotFound { - err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, sqp) + err = fmt.Errorf("broken reference to filter: %+v for statqueue: %+v", fltrID, sqp) } return } @@ -500,13 +502,14 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool } } } - indexer.IndexTPFilter(FilterToTPFilter(fltr), sqp.ID) } if err = indexer.StoreIndexes(); err != nil { return } } + cache.RemPrefixKey(utils.StatQueuesStringIndex, true, utils.NonTransactional) + cache.RemPrefixKey(utils.StatQueuesStringRevIndex, true, utils.NonTransactional) return } @@ -1182,6 +1185,9 @@ func (dm *DataManager) SetAttributeProfile(ap *AttributeProfile, withIndex bool) } } } + + cache.RemPrefixKey(utils.AttributeProfilesStringIndex, true, utils.NonTransactional) + cache.RemPrefixKey(utils.AttributeProfilesStringRevIndex, true, utils.NonTransactional) return } diff --git a/engine/filterindexer.go b/engine/filterindexer.go index 4fcdbb0b2..8cab463f0 100644 --- a/engine/filterindexer.go +++ b/engine/filterindexer.go @@ -154,7 +154,6 @@ func (rfi *ReqFilterIndexer) RemoveItemFromIndex(itemID string) (err error) { } } } - rfi.reveseIndex[itemID] = make(utils.StringMap) //Force deleting in driver if err = rfi.dm.SetFilterIndexes( GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), diff --git a/engine/filters.go b/engine/filters.go index b4e88ca14..8f891f287 100644 --- a/engine/filters.go +++ b/engine/filters.go @@ -83,6 +83,10 @@ func (fS *FilterS) PassFiltersForEvent(tenant string, ev map[string]interface{}, !f.ActivationInterval.IsActiveAtTime(time.Now()) { // not active continue } + for _, fltr22 := range f.RequestFilters { + strVal, _ := utils.ReflectFieldAsString(ev, fltr22.FieldName, "") + } + for _, fltr := range f.RequestFilters { switch fltr.Type { case MetaString: diff --git a/engine/stats.go b/engine/stats.go index 36ea020b7..cbceca7fc 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -169,6 +169,7 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *utils.CGREvent) (sqs StatQ if err != nil { return nil, err } + if sqPrfl.Stored && s.dirty == nil { s.dirty = utils.BoolPointer(false) } diff --git a/engine/thresholds.go b/engine/thresholds.go index 520631275..5f12079b6 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -216,12 +216,16 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) { func (tS *ThresholdService) matchingThresholdsForEvent(ev *utils.CGREvent) (ts Thresholds, err error) { matchingTs := make(map[string]*Threshold) tIDs, err := matchingItemIDsForEvent(ev.Event, tS.indexedFields, tS.dm, utils.ThresholdStringIndex+ev.Tenant) + for _, itm := range tIDs { + } if err != nil { return nil, err } + lockIDs := utils.PrefixSliceItems(tIDs.Slice(), utils.ThresholdStringIndex) guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...) defer guardian.Guardian.UnguardIDs(lockIDs...) + for tID := range tIDs { tPrfl, err := tS.dm.GetThresholdProfile(ev.Tenant, tID, false, utils.NonTransactional) if err != nil { @@ -234,11 +238,13 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *utils.CGREvent) (ts T !tPrfl.ActivationInterval.IsActiveAtTime(time.Now()) { // not active continue } + if pass, err := tS.filterS.PassFiltersForEvent(ev.Tenant, ev.Event, tPrfl.FilterIDs); err != nil { return nil, err } else if !pass { continue } + t, err := tS.dm.GetThreshold(tPrfl.Tenant, tPrfl.ID, false, "") if err != nil { return nil, err