mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Added migration for *rsr filters and attribute inline
This commit is contained in:
@@ -152,9 +152,9 @@ func CurrentDataDBVersions() Versions {
|
||||
utils.SharedGroups: 2,
|
||||
utils.Thresholds: 3,
|
||||
utils.Routes: 1,
|
||||
utils.Attributes: 5,
|
||||
utils.Attributes: 6,
|
||||
utils.Timing: 1,
|
||||
utils.RQF: 4,
|
||||
utils.RQF: 5,
|
||||
utils.Resource: 1,
|
||||
utils.Subscribers: 1,
|
||||
utils.Destinations: 1,
|
||||
|
||||
@@ -171,26 +171,26 @@ func (m *Migrator) migrateAttributeProfile() (err error) {
|
||||
var v3Attr *v3AttributeProfile
|
||||
var v4Attr *v4AttributeProfile
|
||||
var v5Attr *engine.AttributeProfile
|
||||
var v6Attr *engine.AttributeProfile
|
||||
for {
|
||||
// One attribute profile at a time
|
||||
version := vrs[utils.Attributes]
|
||||
for {
|
||||
//Keep migrating until Attribute Profile reaches latest version
|
||||
switch version {
|
||||
default:
|
||||
return fmt.Errorf("Unsupported version %v", version)
|
||||
case current[utils.Attributes]:
|
||||
migrated = false
|
||||
if m.sameDataDB {
|
||||
migrated = false
|
||||
break
|
||||
}
|
||||
if err = m.migrateCurrentAttributeProfile(); err != nil {
|
||||
return err
|
||||
if err = m.migrateCurrentAttributeProfile(); err != nil { //generator like v1,2,3,4
|
||||
return
|
||||
}
|
||||
version = 5
|
||||
migrated = false
|
||||
break
|
||||
case 1: // Migrate from V1 to V4
|
||||
if v4Attr, err = m.migrateV1ToV4AttributeProfile(); err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
return
|
||||
} else if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
@@ -198,7 +198,7 @@ func (m *Migrator) migrateAttributeProfile() (err error) {
|
||||
version = 4
|
||||
case 2: // Migrate from V2 to V3 (fallthrough untill latest version)
|
||||
if v3Attr, err = m.migrateV2ToV3AttributeProfile(v2Attr); err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
return
|
||||
} else if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
@@ -206,7 +206,7 @@ func (m *Migrator) migrateAttributeProfile() (err error) {
|
||||
fallthrough
|
||||
case 3: // Migrate from V3 to V4
|
||||
if v4Attr, err = m.migrateV3ToV4AttributeProfile(v3Attr); err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
return
|
||||
} else if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
@@ -214,11 +214,19 @@ func (m *Migrator) migrateAttributeProfile() (err error) {
|
||||
fallthrough
|
||||
case 4: // Migrate from V4 to V5
|
||||
if v5Attr, err = m.migrateV4ToV5AttributeProfile(v4Attr); err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
return
|
||||
} else if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
version = 5
|
||||
fallthrough
|
||||
case 5:
|
||||
if v6Attr, err = m.migrateV5ToV6AttributeProfile(v5Attr); err != nil && err != utils.ErrNoMoreData {
|
||||
return
|
||||
} else if err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
version = 6
|
||||
}
|
||||
if version == current[utils.Attributes] || err == utils.ErrNoMoreData {
|
||||
break
|
||||
@@ -228,14 +236,14 @@ func (m *Migrator) migrateAttributeProfile() (err error) {
|
||||
break
|
||||
}
|
||||
|
||||
if !m.dryRun && migrated {
|
||||
if !m.dryRun {
|
||||
if vrs[utils.Attributes] == 1 {
|
||||
if err := m.dmOut.DataManager().DataDB().SetAttributeProfileDrv(v5Attr); err != nil {
|
||||
return err
|
||||
if err = m.dmOut.DataManager().DataDB().SetAttributeProfileDrv(v6Attr); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
// Set the fresh-migrated AttributeProfile into DB
|
||||
if err := m.dmOut.DataManager().SetAttributeProfile(v5Attr, true); err != nil {
|
||||
if err = m.dmOut.DataManager().SetAttributeProfile(v6Attr, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -247,7 +255,7 @@ func (m *Migrator) migrateAttributeProfile() (err error) {
|
||||
}
|
||||
// All done, update version with current one
|
||||
if err = m.setVersions(utils.Attributes); err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
return m.ensureIndexesDataDB(engine.ColAttr)
|
||||
|
||||
@@ -454,3 +462,16 @@ type v4AttributeProfile struct {
|
||||
Blocker bool // blocker flag to stop processing on multiple runs
|
||||
Weight float64
|
||||
}
|
||||
|
||||
func (m *Migrator) migrateV5ToV6AttributeProfile(v5Attr *engine.AttributeProfile) (_ *engine.AttributeProfile, err error) {
|
||||
if v5Attr == nil {
|
||||
// read data from DataDB
|
||||
if v5Attr, err = m.dmIN.getV5AttributeProfile(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if v5Attr.FilterIDs, err = migrateInlineFilterV4(v5Attr.FilterIDs); err != nil {
|
||||
return
|
||||
}
|
||||
return v5Attr, nil
|
||||
}
|
||||
|
||||
@@ -141,17 +141,7 @@ func derivedChargers2Charger(dc *v1DerivedCharger, tenant string, key string, fi
|
||||
if strings.HasPrefix(filter, utils.DynamicDataPrefix) {
|
||||
filter = filter[1:]
|
||||
}
|
||||
if !strings.HasSuffix(filter, utils.FilterValEnd) { // Has filter, populate the var
|
||||
return
|
||||
}
|
||||
fltrStart := strings.Index(filter, utils.FilterValStart)
|
||||
if fltrStart < 1 {
|
||||
return
|
||||
}
|
||||
fltrVal := filter[fltrStart+1 : len(filter)-1]
|
||||
ch.FilterIDs = append(ch.FilterIDs, "*rsr:"+utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep+
|
||||
filter[:fltrStart]+utils.InInFieldSep+
|
||||
strings.Replace(fltrVal, utils.ANDSep, utils.INFIELD_SEP, strings.Count(fltrVal, utils.ANDSep)))
|
||||
ch.FilterIDs = append(ch.FilterIDs, "*rsr::"+utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep+filter)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -135,7 +135,7 @@ func testDCITMigrateAndMove(t *testing.T) {
|
||||
Chargers: []*v1DerivedCharger{
|
||||
{
|
||||
RunID: "RunID",
|
||||
RunFilters: "~filterhdr1(a&b)",
|
||||
RunFilters: "~filterhdr1:s/(.+)/special_run3/",
|
||||
|
||||
RequestTypeField: utils.MetaDefault,
|
||||
CategoryField: utils.MetaDefault,
|
||||
@@ -176,7 +176,7 @@ func testDCITMigrateAndMove(t *testing.T) {
|
||||
FilterIDs: []string{
|
||||
"*destinations:~*req.Destination:1001;1002;1003",
|
||||
"*string:~*req.Account:1003",
|
||||
"*rsr:~*req.filterhdr1:a;b",
|
||||
"*rsr::~*req.filterhdr1:s/(.+)/special_run3/",
|
||||
},
|
||||
ActivationInterval: nil,
|
||||
RunID: "RunID",
|
||||
|
||||
@@ -222,7 +222,7 @@ func TestDerivedChargers2Charger(t *testing.T) {
|
||||
FilterIDs: []string{
|
||||
"*string:~*req.Category:*voice1",
|
||||
"*string:~*req.Account:1001",
|
||||
"*rsr:~*req.Header4:s/a/${1}b/{*duration_seconds&*round:2}:b;c",
|
||||
"*rsr::~*req.Header4:s/a/${1}b/{*duration_seconds&*round:2}(b&c)",
|
||||
},
|
||||
ActivationInterval: nil,
|
||||
RunID: "runID",
|
||||
@@ -242,7 +242,7 @@ func TestDerivedChargers2Charger(t *testing.T) {
|
||||
Expected: &engine.ChargerProfile{
|
||||
Tenant: defaultTenant,
|
||||
ID: "key2",
|
||||
FilterIDs: []string{},
|
||||
FilterIDs: []string{"*rsr::~*req.1003"},
|
||||
ActivationInterval: nil,
|
||||
RunID: "runID2",
|
||||
AttributeIDs: make([]string, 0),
|
||||
|
||||
@@ -31,26 +31,27 @@ func (m *Migrator) migrateCurrentRequestFilter() (err error) {
|
||||
var ids []string
|
||||
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.FilterPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
for _, id := range ids {
|
||||
tntID := strings.SplitN(strings.TrimPrefix(id, utils.FilterPrefix), utils.InInFieldSep, 2)
|
||||
if len(tntID) < 2 {
|
||||
return fmt.Errorf("Invalid key <%s> when migrating filters", id)
|
||||
}
|
||||
fl, err := m.dmIN.DataManager().GetFilter(tntID[0], tntID[1], false, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return err
|
||||
var fl *engine.Filter
|
||||
if fl, err = m.dmIN.DataManager().GetFilter(tntID[0], tntID[1], false, false,
|
||||
utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
if m.dryRun || fl == nil {
|
||||
continue
|
||||
}
|
||||
if err := m.dmOut.DataManager().SetFilter(fl, true); err != nil {
|
||||
return err
|
||||
if err = m.dmOut.DataManager().SetFilter(fl, true); err != nil {
|
||||
return
|
||||
}
|
||||
if err := m.dmIN.DataManager().RemoveFilter(tntID[0], tntID[1],
|
||||
if err = m.dmIN.DataManager().RemoveFilter(tntID[0], tntID[1],
|
||||
utils.NonTransactional, true); err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
m.stats[utils.RQF]++
|
||||
}
|
||||
@@ -106,15 +107,21 @@ func migrateFilterV2(fl *v1Filter) (fltr *engine.Filter) {
|
||||
strings.HasPrefix(rule.FieldName, utils.DynamicDataPrefix+utils.MetaAct) {
|
||||
continue
|
||||
}
|
||||
// if rule.Type != utils.MetaRSR {
|
||||
// in case we found dynamic data prefix we remove it
|
||||
fl.Rules[i].FieldName = strings.TrimPrefix(fl.Rules[i].FieldName, utils.DynamicDataPrefix)
|
||||
fltr.Rules[i].Element = utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + rule.FieldName
|
||||
// } else {
|
||||
// for idx, val := range rule.Values {
|
||||
// fltr.Rules[i].Values[idx] = val
|
||||
// }
|
||||
// }
|
||||
if rule.Type != utils.MetaRSR {
|
||||
// in case we found dynamic data prefix we remove it
|
||||
if strings.HasPrefix(rule.FieldName, utils.DynamicDataPrefix) {
|
||||
fl.Rules[i].FieldName = fl.Rules[i].FieldName[1:]
|
||||
}
|
||||
fltr.Rules[i].Element = utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + rule.FieldName
|
||||
} else {
|
||||
for idx, val := range rule.Values {
|
||||
if strings.HasPrefix(val, utils.DynamicDataPrefix) {
|
||||
// remove dynamic data prefix from fieldName
|
||||
val = val[1:]
|
||||
}
|
||||
fltr.Rules[i].Values[idx] = utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + val
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -171,14 +178,18 @@ func migrateInlineFilterV2(fl string) string {
|
||||
return fl
|
||||
}
|
||||
|
||||
// if ruleSplt[0] != utils.MetaRSR {
|
||||
// remove dynamic data prefix from fieldName
|
||||
ruleSplt[1] = strings.TrimPrefix(ruleSplt[1], utils.DynamicDataPrefix)
|
||||
return fmt.Sprintf("%s:~%s:%s", ruleSplt[0], utils.MetaReq+utils.NestingSep+ruleSplt[1], strings.Join(ruleSplt[2:], utils.InInFieldSep))
|
||||
// } // in case of *rsr filter we need to add the prefix at fieldValue
|
||||
// remove dynamic data prefix from fieldName
|
||||
// ruleSplt[2] = strings.TrimPrefix(ruleSplt[2], utils.DynamicDataPrefix)
|
||||
// return fmt.Sprintf("%s::~%s", ruleSplt[0], utils.MetaReq+utils.NestingSep+strings.Join(ruleSplt[2:], utils.InInFieldSep))
|
||||
if ruleSplt[0] != utils.MetaRSR {
|
||||
if strings.HasPrefix(ruleSplt[1], utils.DynamicDataPrefix) {
|
||||
// remove dynamic data prefix from fieldName
|
||||
ruleSplt[1] = ruleSplt[1][1:]
|
||||
}
|
||||
return fmt.Sprintf("%s:~%s:%s", ruleSplt[0], utils.MetaReq+utils.NestingSep+ruleSplt[1], strings.Join(ruleSplt[2:], utils.InInFieldSep))
|
||||
} // in case of *rsr filter we need to add the prefix at fieldValue
|
||||
if strings.HasPrefix(ruleSplt[2], utils.DynamicDataPrefix) {
|
||||
// remove dynamic data prefix from fieldName
|
||||
ruleSplt[2] = ruleSplt[2][1:]
|
||||
}
|
||||
return fmt.Sprintf("%s::~%s", ruleSplt[0], utils.MetaReq+utils.NestingSep+strings.Join(ruleSplt[2:], utils.InInFieldSep))
|
||||
}
|
||||
|
||||
func (m *Migrator) migrateOthersv1() (err error) {
|
||||
@@ -282,75 +293,77 @@ func (m *Migrator) migrateFilters() (err error) {
|
||||
}
|
||||
migrated := true
|
||||
migratedFrom := 0
|
||||
var v4Fltr *engine.Filter
|
||||
var fltr *engine.Filter
|
||||
for {
|
||||
version := vrs[utils.RQF]
|
||||
migratedFrom = int(version)
|
||||
for {
|
||||
switch version {
|
||||
default:
|
||||
return fmt.Errorf("Unsupported version %v", version)
|
||||
case current[utils.RQF]:
|
||||
migrated = false
|
||||
if m.sameDataDB {
|
||||
break
|
||||
}
|
||||
if err = m.migrateCurrentRequestFilter(); err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
version = 4
|
||||
case 1:
|
||||
if fltr, err = m.migrateRequestFilterV1(); err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
if v4Fltr, err = m.migrateRequestFilterV1(); err != nil && err != utils.ErrNoMoreData {
|
||||
return
|
||||
}
|
||||
migratedFrom = 1
|
||||
version = 4
|
||||
case 2:
|
||||
if fltr, err = m.migrateRequestFilterV2(); err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
if v4Fltr, err = m.migrateRequestFilterV2(); err != nil && err != utils.ErrNoMoreData {
|
||||
return
|
||||
}
|
||||
migratedFrom = 2
|
||||
version = 4
|
||||
case 3:
|
||||
if fltr, err = m.migrateRequestFilterV3(); err != nil && err != utils.ErrNoMoreData {
|
||||
return err
|
||||
if v4Fltr, err = m.migrateRequestFilterV3(); err != nil && err != utils.ErrNoMoreData {
|
||||
return
|
||||
}
|
||||
migratedFrom = 3
|
||||
version = 4
|
||||
case 4: // in case we change the structure to the filters please update the geing method from this version
|
||||
if fltr, err = m.migrateRequestFilterV4(v4Fltr); err != nil && err != utils.ErrNoMoreData {
|
||||
return
|
||||
}
|
||||
version = 5
|
||||
}
|
||||
if version == current[utils.RQF] || err == utils.ErrNoMoreData {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err == utils.ErrNoMoreData || !migrated {
|
||||
err = nil
|
||||
break
|
||||
}
|
||||
if !m.dryRun && migrated {
|
||||
//set filters
|
||||
switch migratedFrom {
|
||||
case 1, 2, 3:
|
||||
if err := m.dmOut.DataManager().SetFilter(fltr, true); err != nil {
|
||||
return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration",
|
||||
err.Error(), fltr.Tenant, fltr.ID)
|
||||
}
|
||||
if !m.dryRun {
|
||||
if err = m.dmOut.DataManager().SetFilter(fltr, true); err != nil {
|
||||
return fmt.Errorf("Error: <%s> when setting filter with tenant: <%s> and id: <%s> after migration",
|
||||
err.Error(), fltr.Tenant, fltr.ID)
|
||||
}
|
||||
}
|
||||
m.stats[utils.RQF]++
|
||||
}
|
||||
if m.dryRun || !migrated {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
switch migratedFrom {
|
||||
case 1:
|
||||
if err := m.migrateOthersv1(); err != nil {
|
||||
return err
|
||||
if err = m.migrateOthersv1(); err != nil {
|
||||
return
|
||||
}
|
||||
case 2:
|
||||
if err := m.migrateOthersV2(); err != nil {
|
||||
return err
|
||||
if err = m.migrateOthersV2(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err = m.setVersions(utils.RQF); err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
return m.ensureIndexesDataDB(engine.ColFlt)
|
||||
}
|
||||
@@ -791,3 +804,71 @@ type v1FilterRule struct {
|
||||
rsrFields config.RSRParsers // Cache here the RSRFilter Values
|
||||
negative *bool
|
||||
}
|
||||
|
||||
func (m *Migrator) migrateRequestFilterV4(v4Fltr *engine.Filter) (fltr *engine.Filter, err error) {
|
||||
if v4Fltr == nil {
|
||||
// read data from DataDB
|
||||
v4Fltr, err = m.dmIN.getV4Filter()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
//migrate
|
||||
fltr = &engine.Filter{
|
||||
Tenant: v4Fltr.Tenant,
|
||||
ID: v4Fltr.ID,
|
||||
Rules: make([]*engine.FilterRule, 0, len(v4Fltr.Rules)),
|
||||
ActivationInterval: v4Fltr.ActivationInterval,
|
||||
}
|
||||
for _, rule := range v4Fltr.Rules {
|
||||
if rule.Type != utils.MetaRSR &&
|
||||
rule.Type == utils.MetaNotRSR {
|
||||
fltr.Rules = append(fltr.Rules, rule)
|
||||
continue
|
||||
}
|
||||
for _, val := range rule.Values {
|
||||
if !strings.HasSuffix(val, utils.FilterValEnd) { // is not a filter so we ignore this value
|
||||
continue
|
||||
}
|
||||
fltrStart := strings.Index(val, utils.FilterValStart)
|
||||
if fltrStart < 1 {
|
||||
return nil, fmt.Errorf("invalid RSRFilter start rule in string: <%s> for filter<%s>", val, fltr.TenantID())
|
||||
}
|
||||
fltr.Rules = append(fltr.Rules, &engine.FilterRule{
|
||||
Type: rule.Type,
|
||||
Element: val[:fltrStart],
|
||||
Values: strings.Split(val[fltrStart+1:len(val)-1], utils.ANDSep),
|
||||
})
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func migrateInlineFilterV4(v4fltIDs []string) (fltrIDs []string, err error) {
|
||||
fltrIDs = make([]string, 0, len(v4fltIDs))
|
||||
for _, v4flt := range v4fltIDs {
|
||||
var fltr string
|
||||
if strings.HasPrefix(v4flt, utils.MetaRSR) {
|
||||
fltr = utils.MetaRSR + utils.InInFieldSep
|
||||
v4flt = strings.TrimPrefix(v4flt, utils.MetaRSR+utils.InInFieldSep+utils.InInFieldSep)
|
||||
} else if strings.HasPrefix(v4flt, utils.MetaNotRSR) {
|
||||
fltr = utils.MetaNotRSR + utils.InInFieldSep
|
||||
v4flt = strings.TrimPrefix(v4flt, utils.MetaNotRSR+utils.InInFieldSep+utils.InInFieldSep)
|
||||
} else {
|
||||
fltrIDs = append(fltrIDs, v4flt)
|
||||
}
|
||||
for _, val := range strings.Split(v4flt, utils.InInFieldSep) {
|
||||
if !strings.HasSuffix(val, utils.FilterValEnd) { // is not a filter so we ignore this value
|
||||
continue
|
||||
}
|
||||
fltrStart := strings.Index(val, utils.FilterValStart)
|
||||
if fltrStart < 1 {
|
||||
return nil, fmt.Errorf("invalid RSRFilter start rule in string: <%s> ", val)
|
||||
}
|
||||
fltrVal := val[fltrStart+1 : len(val)-1]
|
||||
fltrIDs = append(fltrIDs, fltr+val[:fltrStart]+utils.InInFieldSep+
|
||||
strings.Replace(fltrVal, utils.ANDSep, utils.INFIELD_SEP, strings.Count(fltrVal, utils.ANDSep)))
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -45,8 +45,8 @@ func TestFiltersInlineMigrate(t *testing.T) {
|
||||
exp: "",
|
||||
},
|
||||
{
|
||||
in: "*rsr:~Tenant:~^cgr.*\\.org$",
|
||||
exp: "*rsr:~Tenant:~^cgr.*\\.org$",
|
||||
in: "*rsr::~Tenant(~^cgr.*\\.org$)",
|
||||
exp: "*rsr::~Tenant(~^cgr.*\\.org$)",
|
||||
},
|
||||
}
|
||||
for _, m := range data {
|
||||
@@ -270,8 +270,8 @@ func TestFiltersInlineV2Migrate(t *testing.T) {
|
||||
exp: "",
|
||||
},
|
||||
{
|
||||
in: "*rsr:~Tenant:~^cgr.*\\.org$",
|
||||
exp: "*rsr:~*req.Tenant:~^cgr.*\\.org$",
|
||||
in: "*rsr::~Tenant(~^cgr.*\\.org$)",
|
||||
exp: "*rsr::~*req.Tenant(~^cgr.*\\.org$)",
|
||||
},
|
||||
}
|
||||
for _, m := range data {
|
||||
|
||||
@@ -70,10 +70,12 @@ type MigratorDataDB interface {
|
||||
getV4AttributeProfile() (v4attrPrf *v4AttributeProfile, err error)
|
||||
setV4AttributeProfile(x *v4AttributeProfile) (err error)
|
||||
remV4AttributeProfile(tenant, id string) (err error)
|
||||
getV5AttributeProfile() (v5attrPrf *engine.AttributeProfile, err error)
|
||||
|
||||
getV1Filter() (v1Fltr *v1Filter, err error)
|
||||
setV1Filter(x *v1Filter) (err error)
|
||||
remV1Filter(tenant, id string) (err error)
|
||||
getV4Filter() (v1Fltr *engine.Filter, err error)
|
||||
|
||||
getSupplier() (spl *SupplierProfile, err error)
|
||||
setSupplier(spl *SupplierProfile) (err error)
|
||||
|
||||
@@ -113,7 +113,7 @@ func (m *Migrator) setVersions(str string) (err error) {
|
||||
err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false)
|
||||
}
|
||||
if err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
err = utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating %s version into StorDB", err.Error(), str))
|
||||
|
||||
@@ -113,7 +113,7 @@ func (m *Migrator) migrateCurrentStats() (err error) {
|
||||
if err := m.dmIN.DataManager().RemoveStatQueue(tntID[0], tntID[1], utils.NonTransactional); err != nil {
|
||||
return err
|
||||
}
|
||||
m.stats[utils.StatS] += 1
|
||||
m.stats[utils.StatS]++
|
||||
}
|
||||
|
||||
return m.moveStatQueueProfile()
|
||||
@@ -137,7 +137,7 @@ func (m *Migrator) migrateV1Stats() (filter *engine.Filter, v2Stats *engine.Stat
|
||||
if filter, v2Stats, sts, err = v1Sts.AsStatQP(); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
m.stats[utils.StatS] += 1
|
||||
m.stats[utils.StatS]++
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -150,8 +150,8 @@ func remakeQueue(sq *engine.StatQueue) (out *engine.StatQueue) {
|
||||
SQMetrics: make(map[string]engine.StatMetric),
|
||||
MinItems: sq.MinItems,
|
||||
}
|
||||
for mId, metric := range sq.SQMetrics {
|
||||
id := utils.ConcatenatedKey(utils.SplitConcatenatedKey(mId)...)
|
||||
for mID, metric := range sq.SQMetrics {
|
||||
id := utils.ConcatenatedKey(utils.SplitConcatenatedKey(mID)...)
|
||||
out.SQMetrics[id] = metric
|
||||
}
|
||||
return
|
||||
@@ -420,10 +420,8 @@ func (v1Sts v1Stat) AsStatQP() (filter *engine.Filter, sq *engine.StatQueue, stq
|
||||
}
|
||||
v1Sts.Metrics[i] = strings.ToLower(v1Sts.Metrics[i])
|
||||
stq.Metrics = append(stq.Metrics, &engine.MetricWithFilters{MetricID: v1Sts.Metrics[i]})
|
||||
if metric, err := engine.NewStatMetric(stq.Metrics[i].MetricID, 0, []string{}); err != nil {
|
||||
if sq.SQMetrics[stq.Metrics[i].MetricID], err = engine.NewStatMetric(stq.Metrics[i].MetricID, 0, []string{}); err != nil {
|
||||
return nil, nil, nil, err
|
||||
} else {
|
||||
sq.SQMetrics[stq.Metrics[i].MetricID] = metric
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -276,6 +276,9 @@ func (iDBMig *internalMigrator) remV3AttributeProfile(tenant, id string) (err er
|
||||
func (iDBMig *internalMigrator) getV4AttributeProfile() (v4attrPrf *v4AttributeProfile, err error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
func (iDBMig *internalMigrator) getV5AttributeProfile() (v4attrPrf *engine.AttributeProfile, err error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
//set
|
||||
func (iDBMig *internalMigrator) setV4AttributeProfile(x *v4AttributeProfile) (err error) {
|
||||
@@ -293,6 +296,10 @@ func (iDBMig *internalMigrator) getV1Filter() (v1Fltr *v1Filter, err error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (iDBMig *internalMigrator) getV4Filter() (v1Fltr *engine.Filter, err error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
//set
|
||||
func (iDBMig *internalMigrator) setV1Filter(x *v1Filter) (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
|
||||
@@ -661,6 +661,25 @@ func (v1ms *mongoMigrator) getV4AttributeProfile() (v4attrPrf *v4AttributeProfil
|
||||
return v4attrPrf, nil
|
||||
}
|
||||
|
||||
func (v1ms *mongoMigrator) getV5AttributeProfile() (v5attrPrf *engine.AttributeProfile, err error) {
|
||||
if v1ms.cursor == nil {
|
||||
v1ms.cursor, err = v1ms.mgoDB.DB().Collection(v1AttributeProfilesCol).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
|
||||
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
|
||||
v1ms.cursor = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
v5attrPrf = new(engine.AttributeProfile)
|
||||
if err := (*v1ms.cursor).Decode(v5attrPrf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v5attrPrf, nil
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1ms *mongoMigrator) setV4AttributeProfile(x *v4AttributeProfile) (err error) {
|
||||
_, err = v1ms.mgoDB.DB().Collection(v1AttributeProfilesCol).InsertOne(v1ms.mgoDB.GetContext(), x)
|
||||
@@ -694,6 +713,25 @@ func (v1ms *mongoMigrator) getV1Filter() (v1Fltr *v1Filter, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (v1ms *mongoMigrator) getV4Filter() (v4Fltr *engine.Filter, err error) {
|
||||
if v1ms.cursor == nil {
|
||||
v1ms.cursor, err = v1ms.mgoDB.DB().Collection(engine.ColFlt).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
|
||||
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
|
||||
v1ms.cursor = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
v4Fltr = new(engine.Filter)
|
||||
if err := (*v1ms.cursor).Decode(v4Fltr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1ms *mongoMigrator) setV1Filter(x *v1Filter) (err error) {
|
||||
_, err = v1ms.mgoDB.DB().Collection(engine.ColFlt).InsertOne(v1ms.mgoDB.GetContext(), x)
|
||||
|
||||
@@ -843,6 +843,32 @@ func (v1rs *redisMigrator) getV4AttributeProfile() (v3attrPrf *v4AttributeProfil
|
||||
return v4attr, nil
|
||||
}
|
||||
|
||||
func (v1rs *redisMigrator) getV5AttributeProfile() (v5attr *engine.AttributeProfile, err error) {
|
||||
if v1rs.qryIdx == nil {
|
||||
v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.AttributeProfilePrefix)
|
||||
if err != nil {
|
||||
return
|
||||
} else if len(v1rs.dataKeys) == 0 {
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
v1rs.qryIdx = utils.IntPointer(0)
|
||||
}
|
||||
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
|
||||
strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v5attr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
*v1rs.qryIdx = *v1rs.qryIdx + 1
|
||||
} else {
|
||||
v1rs.qryIdx = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1rs *redisMigrator) setV4AttributeProfile(x *v4AttributeProfile) (err error) {
|
||||
key := utils.AttributeProfilePrefix + utils.ConcatenatedKey(x.Tenant, x.ID)
|
||||
@@ -890,6 +916,32 @@ func (v1rs *redisMigrator) getV1Filter() (v1Fltr *v1Filter, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (v1rs *redisMigrator) getV4Filter() (v4Fltr *engine.Filter, err error) {
|
||||
if v1rs.qryIdx == nil {
|
||||
v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.FilterPrefix)
|
||||
if err != nil {
|
||||
return
|
||||
} else if len(v1rs.dataKeys) == 0 {
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
v1rs.qryIdx = utils.IntPointer(0)
|
||||
}
|
||||
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
|
||||
strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v4Fltr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
*v1rs.qryIdx = *v1rs.qryIdx + 1
|
||||
} else {
|
||||
v1rs.qryIdx = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1rs *redisMigrator) setV1Filter(x *v1Filter) (err error) {
|
||||
key := utils.FilterPrefix + utils.ConcatenatedKey(x.Tenant, x.ID)
|
||||
|
||||
Reference in New Issue
Block a user