diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index d2cd04336..a25483f9a 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -78,6 +78,7 @@ var sTestsStatSV1 = []func(t *testing.T){ testV1STSFromFolder, testV1STSGetStats, testV1STSProcessEvent, + testV1STSGetStatsAfterRestart, testV1STSSetStatQueueProfile, testV1STSUpdateStatQueueProfile, testV1STSRemoveStatQueueProfile, @@ -157,6 +158,7 @@ func testV1STSGetStats(t *testing.T) { utils.MetaTCC: utils.NOT_AVAILABLE, utils.MetaTCD: utils.NOT_AVAILABLE, utils.MetaACC: utils.NOT_AVAILABLE, + utils.MetaPDD: utils.NOT_AVAILABLE, } if err := stsV1Rpc.Call("StatSV1.GetQueueStringMetrics", &utils.TenantID{Tenant: "cgrates.org", ID: expectedIDs[0]}, &metrics); err != nil { @@ -175,7 +177,8 @@ func testV1STSProcessEvent(t *testing.T) { utils.ACCOUNT: "1001", utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), utils.USAGE: time.Duration(135 * time.Second), - utils.COST: 123.0}} + utils.COST: 123.0, + utils.PDD: time.Duration(12 * time.Second)}} if err := stsV1Rpc.Call("StatSV1.ProcessEvent", &ev1, &reply); err != nil { t.Error(err) } else if reply != utils.OK { @@ -211,6 +214,7 @@ func testV1STSProcessEvent(t *testing.T) { utils.MetaACC: "61.5", utils.MetaTCD: "3m0s", utils.MetaTCC: "123", + utils.MetaPDD: "4s", } var metrics map[string]string if err := stsV1Rpc.Call("StatSV1.GetQueueStringMetrics", &utils.TenantID{Tenant: "cgrates.org", ID: "STATS_1"}, &metrics); err != nil { @@ -220,6 +224,48 @@ func testV1STSProcessEvent(t *testing.T) { } } +func testV1STSGetStatsAfterRestart(t *testing.T) { + expectedMetrics := map[string]string{ + utils.MetaASR: "66.66667%", + utils.MetaACD: "1m30s", + utils.MetaACC: "61.5", + utils.MetaTCD: "3m0s", + utils.MetaTCC: "123", + utils.MetaPDD: "4s", + } + var metrics map[string]string + //get stats metrics before restart + if err := stsV1Rpc.Call("StatSV1.GetQueueStringMetrics", &utils.TenantID{Tenant: "cgrates.org", ID: "STATS_1"}, &metrics); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expectedMetrics, metrics) { + t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics) + } + if _, err := engine.StopStartEngine(stsV1CfgPath, statsDelay); err != nil { + t.Fatal(err) + } + var err error + stsV1Rpc, err = jsonrpc.Dial("tcp", stsV1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } + //get stats metrics after restart + expectedMetrics2 := map[string]string{ + utils.MetaASR: "66.66667%", + utils.MetaACD: "1m30s", + utils.MetaACC: "61.5", + utils.MetaTCD: "3m0s", + utils.MetaTCC: "123", + utils.MetaPDD: "4s", + } + var metrics2 map[string]string + if err := stsV1Rpc.Call("StatSV1.GetQueueStringMetrics", &utils.TenantID{Tenant: "cgrates.org", ID: "STATS_1"}, &metrics2); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expectedMetrics2, metrics2) { + t.Errorf("expecting: %+v, received reply: %s", expectedMetrics2, metrics2) + } + time.Sleep(time.Duration(1 * time.Second)) +} + func testV1STSSetStatQueueProfile(t *testing.T) { var reply *engine.StatQueueProfile if err := stsV1Rpc.Call("ApierV1.GetStatQueueProfile", diff --git a/apier/v1/tpstats_it_test.go b/apier/v1/tpstats_it_test.go index b894dd76b..2b04d26b5 100644 --- a/apier/v1/tpstats_it_test.go +++ b/apier/v1/tpstats_it_test.go @@ -127,9 +127,9 @@ func testTPStatsGetTPStatBeforeSet(t *testing.T) { func testTPStatsSetTPStat(t *testing.T) { tpStat = &utils.TPStats{ - Tenant: "Tester", - TPid: "TPS1", - ID: "Stat1", + Tenant: "cgrates.org", + TPid: "TPS1", + ID: "Stat1", Filters: []*utils.TPRequestFilter{ &utils.TPRequestFilter{ Type: "*string", diff --git a/data/tariffplans/testtp/Stats.csv b/data/tariffplans/testtp/Stats.csv index 8cc037dab..20fd89d20 100755 --- a/data/tariffplans/testtp/Stats.csv +++ b/data/tariffplans/testtp/Stats.csv @@ -1,2 +1,2 @@ #Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],QueueLength[6],TTL[7],Metrics[8],Blocker[9],Stored[10],Weight[11],Thresholds[12] -cgrates.org,Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc;*tcd;*tcc,true,true,20,THRESH1;THRESH2 +cgrates.org,Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc;*tcd;*tcc;*pdd,true,true,20,THRESH1;THRESH2 diff --git a/data/tariffplans/testtp/Thresholds.csv b/data/tariffplans/testtp/Thresholds.csv index 44241c16f..9e629d669 100644 --- a/data/tariffplans/testtp/Thresholds.csv +++ b/data/tariffplans/testtp/Thresholds.csv @@ -1,2 +1,2 @@ -#Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],ThresholdType[5],ThresholdValue[6],MinItems[7],Recurrent[8],MinSleep[9],Blocker[10],Stored[11],Weight[12],ActionIDs[13] -Threshold1,*string,Account,1001;1002,2014-07-29T15:00:00Z,,1.2,10,true,1s,true,true,10,THRESH1;THRESH2 \ No newline at end of file +#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],MinItems[6],Recurrent[7],MinSleep[8],Blocker[9],Weight[10],ActionIDs[11] +cgrates.org,Threshold1,*string,Account,1001;1002,2014-07-29T15:00:00Z,10,true,1s,true,10,THRESH1;THRESH2 diff --git a/data/tariffplans/tutorial/Stats.csv b/data/tariffplans/tutorial/Stats.csv index 0c0b64a49..dabce2a2b 100755 --- a/data/tariffplans/tutorial/Stats.csv +++ b/data/tariffplans/tutorial/Stats.csv @@ -1,2 +1,3 @@ #Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],QueueLength[6],TTL[7],Metrics[8],Blocker[9],Stored[10],Weight[11],MinItems[12],Thresholds[13] -cgrates.org,STATS_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd,true,true,20,THRESH1;THRESH2 +cgrates.org,STATS_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd;*pdd,true,true,20,THRESH1;THRESH2 + diff --git a/data/tariffplans/tutorial/Thresholds.csv b/data/tariffplans/tutorial/Thresholds.csv index 0f7b77e1c..374a0342e 100644 --- a/data/tariffplans/tutorial/Thresholds.csv +++ b/data/tariffplans/tutorial/Thresholds.csv @@ -1,2 +1,2 @@ -#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],Recurrent[6],MinSleep[7],Blocker[8],Stored[9],Weight[10],ActionIDs[11] -Threshold1,*string,Account,1001;1002,2014-07-29T15:00:00Z,,1.2,10,true,1s,true,true,10,THRESH1;THRESH2 \ No newline at end of file +#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],MinItems[6],Recurrent[7],MinSleep[8],Blocker[9],Weight[10],MinItems[11],ActionIDs[12] +cgrates.org,Threshold1,*string,Account,1001;1002,2014-07-29T15:00:00Z,10,true,1s,true,10,THRESH1;THRESH2 diff --git a/engine/libstats.go b/engine/libstats.go index fbef8395f..bd61ab64c 100755 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -128,6 +128,22 @@ func (se StatEvent) Pdd(timezone string) (pdd time.Duration, err error) { return utils.ParseDurationWithSecs(pddStr) } +// Destination returns the Destination of StatEvent +func (se StatEvent) Destination(timezone string) (ddc string, err error) { + ddcIf, has := se.Fields[utils.DESTINATION] + if !has { + return ddc, utils.ErrNotFound + } + if ddcInt, canCast := ddcIf.(int64); canCast { + return strconv.FormatInt(ddcInt, 64), nil + } + ddcStr, canCast := ddcIf.(string) + if !canCast { + return ddc, errors.New("cannot cast to string") + } + return ddcStr, nil +} + // NewStoredStatQueue initiates a StoredStatQueue out of StatQueue func NewStoredStatQueue(sq *StatQueue, ms Marshaler) (sSQ *StoredStatQueue, err error) { sSQ = &StoredStatQueue{ diff --git a/engine/libtest.go b/engine/libtest.go index b04ec85e3..0ede3e3a6 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -43,7 +43,7 @@ func InitDataDb(cfg *config.CGRConfig) error { } dataDB.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) // Write version before starting - if err := CheckVersions(dataDB); err != nil { + if err := SetDBVersions(dataDB); err != nil { return err } @@ -59,6 +59,10 @@ func InitStorDb(cfg *config.CGRConfig) error { if err := storDb.Flush(path.Join(cfg.DataFolderPath, "storage", cfg.StorDBType)); err != nil { return err } + // Write version before starting + if err := SetDBVersions(storDb); err != nil { + return err + } return nil } diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index 0f860dc56..471030ccf 100755 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -276,10 +276,10 @@ cgrates.org,ResGroup22,*destinations,HdrDestination,DST_FS,2014-07-29T15:00:00Z, #Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],QueueLength[6],TTL[7],Metrics[8],Blocker[9],Stored[10],Weight[11],Thresholds[12] cgrates.org,Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd,true,true,20,THRESH1;THRESH2 ` - thresholds = ` -#Id[0],FilterType[1],FilterFieldName[2],FilterFieldValues[3],ActivationInterval[4],ThresholdType[5],ThresholdValue[6],MinItems[7],Recurrent[8],MinSleep[9],Blocker[10],Stored[11],Weight[12],ActionIDs[13] -Threshold1,*string,Account,1001;1002,2014-07-29T15:00:00Z,,1.2,10,true,1s,true,true,10, + thresholds = ` +#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],MinItems[6],Recurrent[7],MinSleep[8],Blocker[9],Weight[10],ActionIDs[11] +cgrates.org,Threshold1,*string,Account,1001;1002,2014-07-29T15:00:00Z,10,true,1s,true,10,THRESH1;THRESH2 ` ) @@ -1470,29 +1470,30 @@ func TestLoadStats(t *testing.T) { } func TestLoadThresholds(t *testing.T) { - eThresholds := map[string]*utils.TPThreshold{ - "Threshold1": &utils.TPThreshold{ - TPid: testTPID, - ID: "Threshold1", - Filters: []*utils.TPRequestFilter{ - &utils.TPRequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}}, + eThresholds := map[string]map[string]*utils.TPThreshold{ + "cgrates.org": map[string]*utils.TPThreshold{ + "Threshold1": &utils.TPThreshold{ + TPid: testTPID, + Tenant: "cgrates.org", + ID: "Threshold1", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}}, + }, + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: "2014-07-29T15:00:00Z", + }, + MinItems: 10, + Recurrent: true, + MinSleep: "1s", + Blocker: true, + Weight: 10, + ActionIDs: []string{"THRESH1", "THRESH2"}, }, - ActivationInterval: &utils.TPActivationInterval{ - ActivationTime: "2014-07-29T15:00:00Z", - }, - ThresholdType: "", - ThresholdValue: 1.2, - MinItems: 10, - Recurrent: true, - MinSleep: "1s", - Blocker: true, - Stored: true, - Weight: 10, }, } - if len(csvr.thresholds) != len(eThresholds) { - t.Error("Failed to load thresholds: ", len(csvr.thresholds)) - } else if !reflect.DeepEqual(eThresholds["Threshold1"], csvr.thresholds["Threshold1"]) { - t.Errorf("Expecting: %+v, received: %+v", eThresholds["Threshold1"], csvr.thresholds["Threshold1"]) + if len(csvr.thProfiles["cgrates.org"]) != len(eThresholds["cgrates.org"]) { + t.Error("Failed to load thresholds: ", len(csvr.thProfiles)) + } else if !reflect.DeepEqual(eThresholds["cgrates.org"]["Threshold1"], csvr.thProfiles["cgrates.org"]["Threshold1"]) { + t.Errorf("Expecting: %+v, received: %+v", eThresholds["cgrates.org"]["Threshold1"], csvr.thProfiles["cgrates.org"]["Threshold1"]) } } diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index 4f0e07161..a74385760 100755 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -340,17 +340,19 @@ func TestLoaderITWriteToDatabase(t *testing.T) { } } - for k, th := range loader.thresholds { - rcv, err := loader.dataStorage.GetThresholdProfile("", k, true, utils.NonTransactional) - if err != nil { - t.Error("Failed GetThresholdProfile: ", err.Error()) - } - sts, err := APItoThresholdProfile(th, "UTC") - if err != nil { - t.Error(err) - } - if !reflect.DeepEqual(sts, rcv) { - t.Errorf("Expecting: %v, received: %v", sts, rcv) + for _, mpIDs := range loader.thProfiles { + for _, th := range mpIDs { + rcv, err := loader.dataStorage.GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional) + if err != nil { + t.Errorf("Failed GetThresholdProfile, tenant: %s, id: %s, error: %s ", th.Tenant, th.ID, err.Error()) + } + sts, err := APItoThresholdProfile(th, "UTC") + if err != nil { + t.Error(err) + } + if !reflect.DeepEqual(sts, rcv) { + t.Errorf("Expecting: %v, received: %v", sts, rcv) + } } } } diff --git a/engine/model_helpers.go b/engine/model_helpers.go index e8181de6c..235b98ea9 100755 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -2127,19 +2127,13 @@ func (tps TpThresholdS) AsTPThreshold() (result []*utils.TPThreshold) { if !found { th = &utils.TPThreshold{ TPid: tp.Tpid, + Tenant: tp.Tenant, ID: tp.Tag, Blocker: tp.Blocker, - Stored: tp.Stored, Recurrent: tp.Recurrent, MinSleep: tp.MinSleep, } } - if tp.ThresholdValue != 0 { - th.ThresholdValue = tp.ThresholdValue - } - if tp.ThresholdType != "" { - th.ThresholdType = tp.ThresholdType - } if tp.ActionIDs != "" { th.ActionIDs = append(th.ActionIDs, strings.Split(tp.ActionIDs, utils.INFIELD_SEP)...) } @@ -2187,12 +2181,9 @@ func APItoModelTPThreshold(th *utils.TPThreshold) (mdls TpThresholdS) { } if i == 0 { mdl.Blocker = th.Blocker - mdl.Stored = th.Stored mdl.Weight = th.Weight mdl.MinItems = th.MinItems mdl.Recurrent = th.Recurrent - mdl.ThresholdType = th.ThresholdType - mdl.ThresholdValue = th.ThresholdValue mdl.MinSleep = th.MinSleep if th.ActivationInterval != nil { if th.ActivationInterval.ActivationTime != "" { @@ -2227,12 +2218,12 @@ func APItoModelTPThreshold(th *utils.TPThreshold) (mdls TpThresholdS) { func APItoThresholdProfile(tpTH *utils.TPThreshold, timezone string) (th *ThresholdProfile, err error) { th = &ThresholdProfile{ + Tenant: tpTH.Tenant, ID: tpTH.ID, MinItems: tpTH.MinItems, Recurrent: tpTH.Recurrent, Weight: tpTH.Weight, Blocker: tpTH.Blocker, - Stored: tpTH.Stored, Filters: make([]*RequestFilter, len(tpTH.Filters)), } if tpTH.MinSleep != "" { diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index 6591ea6e1..d126748b7 100755 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -943,9 +943,6 @@ func TestAsTPThresholdAsAsTPThreshold(t *testing.T) { MinItems: 100, Recurrent: false, MinSleep: "1s", - ThresholdType: "", - ThresholdValue: 1.2, - Stored: false, Blocker: false, Weight: 20.0, ActionIDs: "WARN3", @@ -965,15 +962,12 @@ func TestAsTPThresholdAsAsTPThreshold(t *testing.T) { ActivationInterval: &utils.TPActivationInterval{ ActivationTime: tps[0].ActivationInterval, }, - MinItems: tps[0].MinItems, - MinSleep: tps[0].MinSleep, - ThresholdType: tps[0].ThresholdType, - ThresholdValue: tps[0].ThresholdValue, - Recurrent: tps[0].Recurrent, - Stored: tps[0].Stored, - Blocker: tps[0].Blocker, - Weight: tps[0].Weight, - ActionIDs: []string{"WARN3"}, + MinItems: tps[0].MinItems, + MinSleep: tps[0].MinSleep, + Recurrent: tps[0].Recurrent, + Blocker: tps[0].Blocker, + Weight: tps[0].Weight, + ActionIDs: []string{"WARN3"}, }, } rcvTPs := TpThresholdS(tps).AsTPThreshold() @@ -993,9 +987,6 @@ func TestAPItoTPThreshold(t *testing.T) { MinItems: 100, Recurrent: false, MinSleep: "1s", - ThresholdType: "", - ThresholdValue: 1.2, - Stored: false, Blocker: false, Weight: 20.0, ActionIDs: []string{"WARN3"}, @@ -1006,7 +997,6 @@ func TestAPItoTPThreshold(t *testing.T) { Filters: make([]*RequestFilter, len(tps.Filters)), MinItems: tps.MinItems, Recurrent: tps.Recurrent, - Stored: tps.Stored, Blocker: tps.Blocker, Weight: tps.Weight, ActionIDs: []string{"WARN3"}, diff --git a/engine/models.go b/engine/models.go index 0031f11f8..78ed32857 100755 --- a/engine/models.go +++ b/engine/models.go @@ -502,19 +502,17 @@ type TpStats struct { type TpThreshold struct { ID int64 Tpid string - Tag string `index:"0" re:""` - FilterType string `index:"1" re:"^\*[A-Za-z].*"` - FilterFieldName string `index:"2" re:""` - FilterFieldValues string `index:"3" re:""` - ActivationInterval string `index:"4" re:""` - ThresholdType string `index:"5" re:""` - ThresholdValue float64 `index:"6" re:"\d+\.?\d*"` - MinItems int `index:"7" re:""` - Recurrent bool `index:"8" re:""` - MinSleep string `index:"9" re:""` - Blocker bool `index:"10" re:""` - Stored bool `index:"11" re:""` - Weight float64 `index:"12" re:"\d+\.?\d*"` - ActionIDs string `index:"13" re:""` + Tenant string `index:"0" re:""` + Tag string `index:"1" re:""` + FilterType string `index:"2" re:"^\*[A-Za-z].*"` + FilterFieldName string `index:"3" re:""` + FilterFieldValues string `index:"4" re:""` + ActivationInterval string `index:"5" re:""` + MinItems int `index:"6" re:""` + Recurrent bool `index:"7" re:""` + MinSleep string `index:"8" re:""` + Blocker bool `index:"9" re:""` + Weight float64 `index:"10" re:"\d+\.?\d*"` + ActionIDs string `index:"11" re:""` CreatedAt time.Time } diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index e6a26183c..3d6c3c6ea 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -43,6 +43,7 @@ var ( // subtests to be executed for each confDIR var sTestsOnStorIT = []func(t *testing.T){ testOnStorITFlush, + testOnStorITIsDBEmpty, testOnStorITSetGetDerivedCharges, testOnStorITSetReqFilterIndexes, testOnStorITGetReqFilterIndexes, @@ -135,6 +136,15 @@ func testOnStorITFlush(t *testing.T) { } cache.Flush() } +func testOnStorITIsDBEmpty(t *testing.T) { + test, err := onStor.IsDBEmpty() + if err != nil { + t.Error(err) + } else if test != true { + t.Errorf("\nExpecting: true got :%+v", test) + } + +} func testOnStorITSetGetDerivedCharges(t *testing.T) { keyCharger1 := utils.ConcatenatedKey("*out", "cgrates.org", "call", "dan", "dan") @@ -2040,6 +2050,7 @@ func testOnStorITCRUDStoredStatQueue(t *testing.T) { func testOnStorITCRUDThresholdProfile(t *testing.T) { timeMinSleep := time.Duration(0 * time.Second) th := &ThresholdProfile{ + Tenant: "cgrates.org", ID: "test", ActivationInterval: &utils.ActivationInterval{}, Filters: []*RequestFilter{}, @@ -2047,7 +2058,6 @@ func testOnStorITCRUDThresholdProfile(t *testing.T) { Recurrent: true, MinSleep: timeMinSleep, Blocker: true, - Stored: true, Weight: 1.4, ActionIDs: []string{}, } diff --git a/engine/statmetrics.go b/engine/statmetrics.go index 1f9b6440d..93990488b 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -36,6 +36,7 @@ func NewStatMetric(metricID string) (sm StatMetric, err error) { utils.MetaACC: NewACC, utils.MetaTCC: NewTCC, utils.MetaPDD: NewPDD, + utils.MetaDDC: NewDCC, } if _, has := metrics[metricID]; !has { return nil, fmt.Errorf("unsupported metric: %s", metricID) @@ -549,3 +550,72 @@ func (pdd *StatPDD) Marshal(ms Marshaler) (marshaled []byte, err error) { func (pdd *StatPDD) LoadMarshaled(ms Marshaler, marshaled []byte) (err error) { return ms.Unmarshal(marshaled, pdd) } + +func NewDCC() (StatMetric, error) { + return &StatDDC{Destinations: make(map[string]utils.StringMap), EventDestinations: make(map[string]string)}, nil +} + +type StatDDC struct { + Destinations map[string]utils.StringMap + EventDestinations map[string]string // map[EventTenantID]Destination +} + +func (ddc *StatDDC) GetStringValue(fmtOpts string) (val string) { + if len(ddc.Destinations) == 0 { + return utils.NOT_AVAILABLE + } + return fmt.Sprintf("%+v", len(ddc.Destinations)) +} + +func (ddc *StatDDC) GetValue() (v interface{}) { + return len(ddc.Destinations) +} + +func (ddc *StatDDC) GetFloat64Value() (v float64) { + if len(ddc.Destinations) == 0 { + return -1.0 + } + return float64(len(ddc.Destinations)) +} + +func (ddc *StatDDC) AddEvent(ev *StatEvent) (err error) { + var dest string + if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil && + err != utils.ErrNotFound { + return err + } else if !at.IsZero() { + if destination, err := ev.Destination(config.CgrConfig().DefaultTimezone); err != nil { + return err + } else { + dest = destination + if _, has := ddc.Destinations[dest]; !has { + ddc.Destinations[dest] = make(map[string]bool) + } + ddc.Destinations[dest][ev.TenantID()] = true + } + } + ddc.EventDestinations[ev.TenantID()] = dest + return +} + +func (ddc *StatDDC) RemEvent(evTenantID string) (err error) { + destination, has := ddc.EventDestinations[evTenantID] + if !has { + return utils.ErrNotFound + } + if len(ddc.Destinations[destination]) == 1 { + delete(ddc.Destinations, destination) + } else { + delete(ddc.Destinations[destination], evTenantID) + } + + delete(ddc.EventDestinations, evTenantID) + return +} + +func (ddc *StatDDC) Marshal(ms Marshaler) (marshaled []byte, err error) { + return ms.Marshal(DDC) +} +func (ddc *StatDDC) LoadMarshaled(ms Marshaler, marshaled []byte) (err error) { + return ms.Unmarshal(marshaled, ddc) +} diff --git a/engine/statmetrics_test.go b/engine/statmetrics_test.go index 8e96c4c0f..88bdb9896 100644 --- a/engine/statmetrics_test.go +++ b/engine/statmetrics_test.go @@ -832,3 +832,103 @@ func TestPDDGetValue(t *testing.T) { t.Errorf("wrong pdd value: %+v", v) } } + +func TestDDCGetStringValue(t *testing.T) { + ddc, _ := NewDCC() + ev := &StatEvent{Tenant: "cgrates.org", ID: "EVENT_1", + Fields: map[string]interface{}{ + "AnswerTime": time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.DESTINATION: "1002"}} + if strVal := ddc.GetStringValue(""); strVal != utils.NOT_AVAILABLE { + t.Errorf("wrong ddc value: %s", strVal) + } + + ddc.AddEvent(ev) + if strVal := ddc.GetStringValue(""); strVal != "1" { + t.Errorf("wrong ddc value: %s", strVal) + } + ev2 := &StatEvent{Tenant: "cgrates.org", ID: "EVENT_2", + Fields: map[string]interface{}{ + "AnswerTime": time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.DESTINATION: "1002"}} + + ev3 := &StatEvent{Tenant: "cgrates.org", ID: "EVENT_3", + Fields: map[string]interface{}{ + "AnswerTime": time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.DESTINATION: "1001"}} + ddc.AddEvent(ev2) + ddc.AddEvent(ev3) + if strVal := ddc.GetStringValue(""); strVal != "2" { + t.Errorf("wrong ddc value: %s", strVal) + } + ddc.RemEvent(ev.TenantID()) + if strVal := ddc.GetStringValue(""); strVal != "2" { + t.Errorf("wrong ddc value: %s", strVal) + } + ddc.RemEvent(ev2.TenantID()) + if strVal := ddc.GetStringValue(""); strVal != "1" { + t.Errorf("wrong ddc value: %s", strVal) + } + ddc.RemEvent(ev3.TenantID()) + if strVal := ddc.GetStringValue(""); strVal != utils.NOT_AVAILABLE { + t.Errorf("wrong ddc value: %s", strVal) + } +} + +func TestDDCGetFloat64Value(t *testing.T) { + ddc, _ := NewDCC() + ev := &StatEvent{Tenant: "cgrates.org", ID: "EVENT_1", + Fields: map[string]interface{}{ + "AnswerTime": time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + "Usage": time.Duration(10 * time.Second), + utils.PDD: time.Duration(5 * time.Second), + utils.DESTINATION: "1002"}} + ddc.AddEvent(ev) + if v := ddc.GetFloat64Value(); v != 1 { + t.Errorf("wrong ddc value: %v", v) + } + ev2 := &StatEvent{Tenant: "cgrates.org", ID: "EVENT_2"} + ddc.AddEvent(ev2) + if v := ddc.GetFloat64Value(); v != 1 { + t.Errorf("wrong ddc value: %v", v) + } + ev4 := &StatEvent{Tenant: "cgrates.org", ID: "EVENT_4", + Fields: map[string]interface{}{ + "Usage": time.Duration(1 * time.Minute), + "AnswerTime": time.Date(2015, 7, 14, 14, 25, 0, 0, time.UTC), + utils.PDD: time.Duration(10 * time.Second), + utils.DESTINATION: "1001", + }, + } + ev5 := &StatEvent{Tenant: "cgrates.org", ID: "EVENT_5", + Fields: map[string]interface{}{ + "Usage": time.Duration(1*time.Minute + 30*time.Second), + "AnswerTime": time.Date(2015, 7, 14, 14, 25, 0, 0, time.UTC), + utils.DESTINATION: "1003", + }, + } + ddc.AddEvent(ev4) + if strVal := ddc.GetFloat64Value(); strVal != 2 { + t.Errorf("wrong ddc value: %v", strVal) + } + ddc.AddEvent(ev5) + if strVal := ddc.GetFloat64Value(); strVal != 3 { + t.Errorf("wrong ddc value: %v", strVal) + } + ddc.RemEvent(ev2.TenantID()) + if strVal := ddc.GetFloat64Value(); strVal != 3 { + t.Errorf("wrong pdd value: %v", strVal) + } + ddc.RemEvent(ev4.TenantID()) + if strVal := ddc.GetFloat64Value(); strVal != 2 { + t.Errorf("wrong ddc value: %v", strVal) + } + ddc.RemEvent(ev.TenantID()) + if strVal := ddc.GetFloat64Value(); strVal != 1 { + t.Errorf("wrong ddc value: %v", strVal) + } + ddc.RemEvent(ev5.TenantID()) + if strVal := ddc.GetFloat64Value(); strVal != -1.0 { + t.Errorf("wrong ddc value: %v", strVal) + } +} diff --git a/engine/storage_interface.go b/engine/storage_interface.go index d6c459a89..4c4fc8028 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -40,6 +40,7 @@ type Storage interface { RemoveVersions(vrs Versions) (err error) SelectDatabase(dbName string) (err error) GetStorageType() string + IsDBEmpty() (resp bool, err error) } // OnlineStorage contains methods to use for administering online data diff --git a/engine/storage_map.go b/engine/storage_map.go index 20e094e92..92e30e05e 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -292,6 +292,12 @@ func (ms *MapStorage) CacheDataFromDB(prefix string, IDs []string, mustBeCached return } +func (ms *MapStorage) IsDBEmpty() (resp bool, err error) { + ms.mu.RLock() + defer ms.mu.RUnlock() + return len(ms.dict) == 0, nil +} + func (ms *MapStorage) GetKeysForPrefix(prefix string) ([]string, error) { ms.mu.RLock() defer ms.mu.RUnlock() @@ -311,7 +317,7 @@ func (ms *MapStorage) HasData(categ, subject string) (bool, error) { switch categ { case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, - utils.ResourcesPrefix, utils.StatQueuePrefix: + utils.ResourcesPrefix, utils.StatQueuePrefix, utils.ThresholdProfilePrefix: _, exists := ms.dict[categ+subject] return exists, nil } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index f085ed716..bcdca5bcc 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -22,16 +22,15 @@ import ( "bytes" "compress/zlib" "fmt" - "io/ioutil" - "strings" - "time" - "github.com/cgrates/cgrates/cache" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" + "io/ioutil" + "strings" + "time" ) const ( @@ -535,6 +534,9 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached _, err = ms.GetResource(tntID.Tenant, tntID.ID, true, utils.NonTransactional) case utils.TimingsPrefix: _, err = ms.GetTiming(dataID, true, utils.NonTransactional) + case utils.ThresholdProfilePrefix: + tntID := utils.NewTenantID(dataID) + _, err = ms.GetThresholdProfile(tntID.Tenant, tntID.ID, true, utils.NonTransactional) } if err != nil { return utils.NewCGRError(utils.MONGO, @@ -546,6 +548,18 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached return } +func (ms *MongoStorage) IsDBEmpty() (resp bool, err error) { + session := ms.session.Copy() + defer session.Close() + db := session.DB(ms.db) + + col, err := db.CollectionNames() + if err != nil { + return + } + return len(col) == 0, nil +} + func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err error) { var category, subject string keyLen := len(utils.DESTINATION_PREFIX) @@ -699,6 +713,9 @@ func (ms *MongoStorage) HasData(category, subject string) (has bool, err error) case utils.StatQueuePrefix: count, err = db.C(colRes).Find(bson.M{"id": subject}).Count() has = count > 0 + case utils.ThresholdProfilePrefix: + count, err = db.C(colTlds).Find(bson.M{"id": subject}).Count() + has = count > 0 default: err = fmt.Errorf("unsupported category in HasData: %s", category) } diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index da90e63b8..a934fa0dd 100755 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -1231,7 +1231,7 @@ func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) (err error) { } } if _, err = col.Upsert(bson.M{}, &vrs); err != nil { - return + return err } return diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 0f3923110..450d6f940 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -119,6 +119,18 @@ func (rs *RedisStorage) SelectDatabase(dbName string) (err error) { return rs.Cmd("SELECT", dbName).Err } +func (rs *RedisStorage) IsDBEmpty() (resp bool, err error) { + var keys []string + keys, err = rs.GetKeysForPrefix("") + if err != nil { + return + } + if len(keys) != 0 { + return false, nil + } + return true, nil +} + func (rs *RedisStorage) LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aaPlIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rpIDs, resIDs []string) (err error) { for key, ids := range map[string][]string{ @@ -323,7 +335,7 @@ func (rs *RedisStorage) HasData(category, subject string) (bool, error) { switch category { case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, - utils.ResourcesPrefix, utils.StatQueuePrefix: + utils.ResourcesPrefix, utils.StatQueuePrefix, utils.ThresholdProfilePrefix: i, err := rs.Cmd("EXISTS", category+subject).Int() return i == 1, err } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 80398a7f1..6c680fedb 100755 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -60,6 +60,11 @@ func (self *SQLStorage) Flush(scriptsPath string) (err error) { if _, err := self.Db.Query(fmt.Sprintf("SELECT 1 FROM %s", utils.TBLCDRs)); err != nil { return err } + + if err = SetDBVersions(self); err != nil { + return err + } + return nil } @@ -97,6 +102,21 @@ func (self *SQLStorage) CreateTablesFromScript(scriptPath string) error { return nil } +func (self *SQLStorage) IsDBEmpty() (resp bool, err error) { + tbls := []string{utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPRates, + utils.TBLTPDestinationRates, utils.TBLTPRatingPlans, utils.TBLTPRateProfiles, + utils.TBLTPSharedGroups, utils.TBLTPCdrStats, utils.TBLTPLcrs, utils.TBLTPActions, + utils.TBLTPActionPlans, utils.TBLTPActionTriggers, utils.TBLTPAccountActions, + utils.TBLTPDerivedChargers, utils.TBLTPAliases, utils.TBLTPUsers, utils.TBLTPResources, utils.TBLTPStats} + for _, tbl := range tbls { + if self.db.HasTable(tbl) { + return false, nil + } + + } + return true, nil +} + // Return a list with all TPids defined in the system, even if incomplete, isolated in some table. func (self *SQLStorage) GetTpIds() ([]string, error) { rows, err := self.Db.Query( diff --git a/engine/stordb_it_test.go b/engine/stordb_it_test.go index b2aae4f4b..91fecce2f 100755 --- a/engine/stordb_it_test.go +++ b/engine/stordb_it_test.go @@ -39,6 +39,7 @@ var ( // subtests to be executed for each confDIR var sTestsStorDBit = []func(t *testing.T){ testStorDBitFlush, + testStorDBitIsDBEmpty, testStorDBitCRUDVersions, testStorDBitCRUDTpTimings, testStorDBitCRUDTpDestinations, @@ -115,6 +116,25 @@ func TestStorDBitMongo(t *testing.T) { } } } +func testStorDBitIsDBEmpty(t *testing.T) { + x := storDB.GetStorageType() + switch x { + case utils.MONGO: + test, err := storDB.IsDBEmpty() + if err != nil { + t.Error(err) + } else if test != true { + t.Errorf("\nExpecting: true got :%+v", test) + } + case utils.POSTGRES, utils.MYSQL: + test, err := storDB.IsDBEmpty() + if err != nil { + t.Error(err) + } else if test != false { + t.Errorf("\nExpecting: false got :%+v", test) + } + } +} func testStorDBitCRUDTpTimings(t *testing.T) { // READ @@ -1534,7 +1554,7 @@ func testStorDBitCRUDTpResources(t *testing.T) { func testStorDBitCRUDTpStats(t *testing.T) { // READ - if _, err := storDB.GetTPStats("testTPid", ""); err != utils.ErrNotFound { + if _, err := storDB.GetTPStats("TEST_TPID", ""); err != utils.ErrNotFound { t.Error(err) } //WRITE @@ -1611,11 +1631,11 @@ func testStorDBitCRUDTpStats(t *testing.T) { } // REMOVE - if err := storDB.RemTpData("", "testTPid", nil); err != nil { + if err := storDB.RemTpData(utils.TBLTPStats, "TEST_TPID", nil); err != nil { t.Error(err) } // READ - if _, err := storDB.GetTPStats("testTPid", ""); err != utils.ErrNotFound { + if _, err := storDB.GetTPStats("TEST_TPID", ""); err != utils.ErrNotFound { t.Error(err) } } diff --git a/engine/thresholds.go b/engine/thresholds.go index 0d3b773c7..5062a2f4d 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -32,8 +32,7 @@ type ThresholdProfile struct { MinItems int // number of items agregated for the threshold to match Recurrent bool MinSleep time.Duration - Blocker bool // blocker flag to stop processing on filters matched - Stored bool + Blocker bool // blocker flag to stop processing on filters matched Weight float64 // Weight to sort the thresholds ActionIDs []string } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index d4e5b2335..9b1f1329b 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -56,10 +56,10 @@ type TpReader struct { aliases map[string]*Alias resProfiles map[string]map[string]*utils.TPResource sqProfiles map[string]map[string]*utils.TPStats - thresholds map[string]*utils.TPThreshold + thProfiles map[string]map[string]*utils.TPThreshold resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles - + thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles revDests, revAliases, acntActionPlans map[string][]string @@ -133,7 +133,7 @@ func (tpr *TpReader) Init() { tpr.derivedChargers = make(map[string]*utils.DerivedChargers) tpr.resProfiles = make(map[string]map[string]*utils.TPResource) tpr.sqProfiles = make(map[string]map[string]*utils.TPStats) - tpr.thresholds = make(map[string]*utils.TPThreshold) + tpr.thProfiles = make(map[string]map[string]*utils.TPThreshold) tpr.revDests = make(map[string][]string) tpr.revAliases = make(map[string][]string) tpr.acntActionPlans = make(map[string][]string) @@ -1661,11 +1661,24 @@ func (tpr *TpReader) LoadThresholdsFiltered(tag string) error { if err != nil { return err } - mapTHs := make(map[string]*utils.TPThreshold) + mapTHs := make(map[string]map[string]*utils.TPThreshold) for _, th := range tps { - mapTHs[th.ID] = th + if _, has := mapTHs[th.Tenant]; !has { + mapTHs[th.Tenant] = make(map[string]*utils.TPThreshold) + } + mapTHs[th.Tenant][th.ID] = th + } + tpr.thProfiles = mapTHs + for tenant, mpID := range mapTHs { + for thID := range mpID { + thTntID := &utils.TenantID{tenant, thID} + if has, err := tpr.dataStorage.HasData(utils.ThresholdProfilePrefix, thTntID.TenantID()); err != nil { + return err + } else if !has { + tpr.thresholds = append(tpr.thresholds, thTntID) + } + } } - tpr.thresholds = mapTHs return nil } @@ -2031,16 +2044,18 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if verbose { log.Print("Thresholds:") } - for _, tpTH := range tpr.thresholds { - th, err := APItoThresholdProfile(tpTH, tpr.timezone) - if err != nil { - return err - } - if err = tpr.dataStorage.SetThresholdProfile(th); err != nil { - return err - } - if verbose { - log.Print("\t", th.ID) + for _, mpID := range tpr.thProfiles { + for _, tpTH := range mpID { + th, err := APItoThresholdProfile(tpTH, tpr.timezone) + if err != nil { + return err + } + if err = tpr.dataStorage.SetThresholdProfile(th); err != nil { + return err + } + if verbose { + log.Print("\t", th.TenantID()) + } } } if verbose { @@ -2127,26 +2142,28 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } } - if len(tpr.thresholds) > 0 { + if len(tpr.thProfiles) > 0 { if verbose { log.Print("Indexing thresholds") } - stIdxr, err := NewReqFilterIndexer(tpr.dataStorage, utils.ThresholdsIndex) - if err != nil { - return err - } - for _, tpTH := range tpr.thresholds { - if th, err := APItoThresholdProfile(tpTH, tpr.timezone); err != nil { + for tenant, mpID := range tpr.thProfiles { + stIdxr, err := NewReqFilterIndexer(tpr.dataStorage, utils.ThresholdsIndex+tenant) + if err != nil { + return err + } + for _, tpTH := range mpID { + if th, err := APItoThresholdProfile(tpTH, tpr.timezone); err != nil { + return err + } else { + stIdxr.IndexFilters(th.ID, th.Filters) + } + } + if verbose { + log.Printf("Indexed thresholds tenant: %s, keys %+v", tenant, stIdxr.ChangedKeys().Slice()) + } + if err := stIdxr.StoreIndexes(); err != nil { return err - } else { - stIdxr.IndexFilters(th.ID, th.Filters) } - } - if verbose { - log.Printf("Indexed Threshold keys: %+v", stIdxr.ChangedKeys().Slice()) - } - if err := stIdxr.StoreIndexes(); err != nil { - return err } } } @@ -2214,6 +2231,8 @@ func (tpr *TpReader) ShowStatistics() { log.Print("ResourceProfiles: ", len(tpr.resProfiles)) // stats log.Print("Stats: ", len(tpr.sqProfiles)) + // thresholds + log.Print("Thresholds: ", len(tpr.thProfiles)) } // Returns the identities loaded for a specific category, useful for cache reloads @@ -2355,10 +2374,10 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { i++ } return keys, nil - case utils.ThresholdsPrefix: - keys := make([]string, len(tpr.thresholds)) + case utils.ThresholdProfilePrefix: + keys := make([]string, len(tpr.thProfiles)) i := 0 - for k := range tpr.thresholds { + for k := range tpr.thProfiles { keys[i] = k i++ } diff --git a/engine/version.go b/engine/version.go index b5623f60d..980b2b27b 100644 --- a/engine/version.go +++ b/engine/version.go @@ -26,16 +26,21 @@ import ( func CheckVersions(storage Storage) error { // get current db version - if storage == nil { - storage = dataStorage - } storType := storage.GetStorageType() x := CurrentDBVersions(storType) dbVersion, err := storage.GetVersions(utils.TBLVersions) if err != nil { + empty, err := storage.IsDBEmpty() + if err != nil { + return err + } + if !empty { + msg := "Migration needed: please backup cgrates data and run : " + return errors.New(msg) + } // no data, write version - if err := storage.SetVersions(x, false); err != nil { - utils.Logger.Warning(fmt.Sprintf("Could not write current version to db: %v", err)) + if err := SetDBVersions(storage); err != nil { + return err } } else { @@ -50,6 +55,17 @@ func CheckVersions(storage Storage) error { return nil } +func SetDBVersions(storage Storage) error { + storType := storage.GetStorageType() + x := CurrentDBVersions(storType) + // no data, write version + if err := storage.SetVersions(x, false); err != nil { + utils.Logger.Warning(fmt.Sprintf("Could not write current version to db: %v", err)) + } + return nil + +} + func (vers Versions) Compare(curent Versions, storType string) string { var x map[string]string m := map[string]string{ @@ -73,9 +89,7 @@ func (vers Versions) Compare(curent Versions, storType string) string { switch storType { case utils.MONGO: x = m - case utils.POSTGRES: - x = stor - case utils.MYSQL: + case utils.POSTGRES, utils.MYSQL: x = stor case utils.REDIS: x = data @@ -91,17 +105,20 @@ func (vers Versions) Compare(curent Versions, storType string) string { } func CurrentDBVersions(storType string) Versions { + dataDbVersions := Versions{utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2} + storDbVersions := Versions{utils.COST_DETAILS: 2} + allVersions := dataDbVersions + for k, v := range storDbVersions { + allVersions[k] = v + } + switch storType { - case utils.MONGO: - return Versions{utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, utils.COST_DETAILS: 2} - case utils.POSTGRES: - return Versions{utils.COST_DETAILS: 2} - case utils.MYSQL: - return Versions{utils.COST_DETAILS: 2} + case utils.MONGO, utils.MAPSTOR: + return allVersions + case utils.POSTGRES, utils.MYSQL: + return storDbVersions case utils.REDIS: - return Versions{utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2} - case utils.MAPSTOR: - return Versions{utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, utils.COST_DETAILS: 2} + return dataDbVersions } return nil } diff --git a/engine/versions_it_test.go b/engine/versions_it_test.go index 603e57280..6c23fee00 100644 --- a/engine/versions_it_test.go +++ b/engine/versions_it_test.go @@ -38,7 +38,6 @@ var ( var sTestsITVersions = []func(t *testing.T){ testVersionsFlush, TestVersion, - testVersionsFlush, } func TestVersionsITMongoConnect(t *testing.T) { @@ -139,6 +138,13 @@ func testVersionsFlush(t *testing.T) { t.Error(err) } } + if err = SetDBVersions(dataDb); err != nil { + t.Error(err) + } + if err = SetDBVersions(storDB); err != nil { + t.Error(err) + } + } func TestVersion(t *testing.T) { @@ -147,15 +153,11 @@ func TestVersion(t *testing.T) { var testVersion Versions storType := dataDb.GetStorageType() switch storType { - case utils.MONGO: + case utils.MONGO, utils.MAPSTOR: currentVersion = Versions{utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, utils.COST_DETAILS: 2} testVersion = Versions{utils.Accounts: 1, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, utils.COST_DETAILS: 2} test = "Migration needed: please backup cgr data and run : " - case utils.POSTGRES: - currentVersion = CurrentStorDBVersions() - testVersion = Versions{utils.COST_DETAILS: 1} - test = "Migration needed: please backup cgr data and run : " - case utils.MYSQL: + case utils.POSTGRES, utils.MYSQL: currentVersion = CurrentStorDBVersions() testVersion = Versions{utils.COST_DETAILS: 1} test = "Migration needed: please backup cgr data and run : " @@ -163,15 +165,11 @@ func TestVersion(t *testing.T) { currentVersion = CurrentDataDBVersions() testVersion = Versions{utils.Accounts: 1, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2} test = "Migration needed: please backup cgr data and run : " - case utils.MAPSTOR: - currentVersion = Versions{utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, utils.COST_DETAILS: 2} - testVersion = Versions{utils.Accounts: 1, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, utils.COST_DETAILS: 2} - test = "Migration needed: please backup cgr data and run : " } //dataDB - if _, rcvErr := dataDb.GetVersions(utils.TBLVersions); rcvErr != utils.ErrNotFound { - t.Error(rcvErr) + if err := SetDBVersions(dataDb); err != nil { + t.Error(err) } if err := CheckVersions(dataDb); err != nil { t.Error(err) @@ -196,18 +194,17 @@ func TestVersion(t *testing.T) { if err = dataDb.RemoveVersions(testVersion); err != nil { t.Error(err) } + if err := SetDBVersions(dataDb); err != nil { + t.Error(err) + } storType = storDb.GetStorageType() switch storType { - case utils.MONGO: + case utils.MONGO, utils.MAPSTOR: currentVersion = Versions{utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, utils.COST_DETAILS: 2} testVersion = Versions{utils.Accounts: 1, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, utils.COST_DETAILS: 2} test = "Migration needed: please backup cgr data and run : " - case utils.POSTGRES: - currentVersion = CurrentStorDBVersions() - testVersion = Versions{utils.COST_DETAILS: 1} - test = "Migration needed: please backup cgr data and run : " - case utils.MYSQL: + case utils.POSTGRES, utils.MYSQL: currentVersion = CurrentStorDBVersions() testVersion = Versions{utils.COST_DETAILS: 1} test = "Migration needed: please backup cgr data and run : " @@ -215,18 +212,15 @@ func TestVersion(t *testing.T) { currentVersion = CurrentDataDBVersions() testVersion = Versions{utils.Accounts: 1, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2} test = "Migration needed: please backup cgr data and run : " - case utils.MAPSTOR: - currentVersion = Versions{utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, utils.COST_DETAILS: 2} - testVersion = Versions{utils.Accounts: 1, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, utils.COST_DETAILS: 2} - test = "Migration needed: please backup cgr data and run : " } //storDB - if _, rcvErr := storDb.GetVersions(utils.TBLVersions); rcvErr != utils.ErrNotFound { - t.Error(rcvErr) + if err := SetDBVersions(storDb); err != nil { + t.Error(err) } if err := CheckVersions(storDb); err != nil { t.Error(err) } + if rcv, err := storDb.GetVersions(utils.TBLVersions); err != nil { t.Error(err) } else if len(currentVersion) != len(rcv) { @@ -247,5 +241,8 @@ func TestVersion(t *testing.T) { if err = storDb.RemoveVersions(testVersion); err != nil { t.Error(err) } + if err := SetDBVersions(storDb); err != nil { + t.Error(err) + } } diff --git a/migrator/migrator_it_test.go b/migrator/migrator_it_test.go index 075f635e8..9f4a88445 100644 --- a/migrator/migrator_it_test.go +++ b/migrator/migrator_it_test.go @@ -172,6 +172,10 @@ func testFlush(t *testing.T) { t.Error("Error when flushing Mongo ", err.Error()) } } + if err = SetDBVersions(mig.dataDB); err != nil { + return err + } + } func testMigratorAccounts(t *testing.T) { diff --git a/utils/apitpdata.go b/utils/apitpdata.go index a24cd45ec..cf7e209a8 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1356,16 +1356,14 @@ type TPStats struct { type TPThreshold struct { TPid string + Tenant string ID string Filters []*TPRequestFilter // Filters for the request ActivationInterval *TPActivationInterval // Time when this limit becomes active and expires - ThresholdType string - ThresholdValue float64 // threshold value - MinItems int // number of items agregated for the threshold to match + MinItems int // number of items agregated for the threshold to match Recurrent bool MinSleep string - Blocker bool // blocker flag to stop processing on filters matched - Stored bool + Blocker bool // blocker flag to stop processing on filters matched Weight float64 // Weight to sort the thresholds ActionIDs []string }