Update LoaderS with SetRateProfileRates support + update ansible script for debian

This commit is contained in:
TeoV
2020-07-01 17:18:26 +03:00
parent a61b759e89
commit f135d87728
6 changed files with 202 additions and 159 deletions

View File

@@ -470,6 +470,7 @@ type RouteSJsonCfg struct {
type LoaderJsonDataType struct {
Type *string
File_name *string
Flags *[]string
Fields *[]*FcTemplateJsonCfg
}

View File

@@ -71,6 +71,7 @@ func NewDfltLoaderDataTypeConfig() *LoaderDataType {
type LoaderDataType struct { //rename to LoaderDataType
Type string
Filename string
Flags utils.FlagsWithParams
Fields []*FCTemplate
}
@@ -84,6 +85,11 @@ func (self *LoaderDataType) loadFromJsonCfg(jsnCfg *LoaderJsonDataType, separato
if jsnCfg.File_name != nil {
self.Filename = *jsnCfg.File_name
}
if jsnCfg.Flags != nil {
if self.Flags, err = utils.FlagsWithParamsFromSlice(*jsnCfg.Flags); err != nil {
return
}
}
if jsnCfg.Fields != nil {
if self.Fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Fields, separator); err != nil {
return

View File

@@ -229,7 +229,7 @@
# Move the file to PKG server
- name: Copy the file to PKG server
become: yes
shell: 'scp /var/packages/debian/incoming/{{ item }} {{ {{ pkgAddr }} }}:/tmp/'
shell: 'scp /var/packages/debian/incoming/{{ item }} {{ pkgAddr }}:/tmp/'
args:
chdir: /var/packages/debian/incoming/
with_items: '{{ debFileName.stdout_lines }}'

View File

@@ -269,7 +269,7 @@
# Move the file to PKG server
- name: Copy the file to PKG server
shell: 'scp cgr_build/RPMS/x86_64/{{ item }} {{ pkgAddr }}:/tmp/'
shell: 'scp cgr_build/RPMS/x86_64/{{ item }} {{ pkgAddr }}:/tmp/'
with_items: '{{ rmpFileName.stdout_lines }}'
- name: Sign with rpm --addsign the .rpm file

View File

@@ -51,6 +51,7 @@ func NewLoader(dm *engine.DataManager, cfg *config.LoaderSCfg,
lockFilename: cfg.LockFileName,
fieldSep: cfg.FieldSeparator,
dataTpls: make(map[string][]*config.FCTemplate),
flagsTpls: make(map[string]utils.FlagsWithParams),
rdrs: make(map[string]map[string]*openedCSVFile),
bufLoaderData: make(map[string][]LoaderData),
dm: dm,
@@ -61,6 +62,7 @@ func NewLoader(dm *engine.DataManager, cfg *config.LoaderSCfg,
}
for _, ldrData := range cfg.Data {
ldr.dataTpls[ldrData.Type] = ldrData.Fields
ldr.flagsTpls[ldrData.Type] = ldrData.Flags
ldr.rdrs[ldrData.Type] = make(map[string]*openedCSVFile)
if ldrData.Filename != "" {
ldr.rdrs[ldrData.Type][ldrData.Filename] = nil
@@ -87,6 +89,7 @@ type Loader struct {
lockFilename string
fieldSep string
dataTpls map[string][]*config.FCTemplate // map[loaderType]*config.FCTemplate
flagsTpls map[string]utils.FlagsWithParams //map[loaderType]utils.FlagsWithParams
rdrs map[string]map[string]*openedCSVFile // map[loaderType]map[fileName]*openedCSVFile for common incremental read
procRows int // keep here the last processed row in the file/-s
bufLoaderData map[string][]LoaderData // cache of data read, indexed on tenantID
@@ -572,11 +575,17 @@ func (ldr *Loader) storeLoadedData(loaderType string,
}
// get IDs so we can reload in cache
ids = append(ids, rpl.TenantID())
if err := ldr.dm.SetRateProfile(rpl, true); err != nil {
return err
if ldr.flagsTpls[loaderType].GetBool(utils.MetaPartial) {
if err := ldr.dm.SetRateProfileRates(rpl, true); err != nil {
return err
}
} else {
if err := ldr.dm.SetRateProfile(rpl, true); err != nil {
return err
}
}
cacheArgs.RateProfileIDs = ids
cachePartition = utils.CacheDispatcherProfiles
cachePartition = utils.CacheRateProfiles
}
}
}
@@ -666,7 +675,8 @@ func (ldr *Loader) removeContent(loaderType, caching string) (err error) {
for prevTntID = range ldr.bufLoaderData {
break // have stolen the existing key in buffer
}
if err = ldr.removeLoadedData(loaderType, prevTntID, caching); err != nil {
if err = ldr.removeLoadedData(loaderType,
map[string][]LoaderData{prevTntID: ldr.bufLoaderData[prevTntID]}, caching); err != nil {
return
}
delete(ldr.bufLoaderData, prevTntID)
@@ -678,7 +688,8 @@ func (ldr *Loader) removeContent(loaderType, caching string) (err error) {
for tntID = range ldr.bufLoaderData {
break // get the first tenantID
}
if err = ldr.removeLoadedData(loaderType, tntID, caching); err != nil {
if err = ldr.removeLoadedData(loaderType,
map[string][]LoaderData{tntID: ldr.bufLoaderData[tntID]}, caching); err != nil {
return
}
delete(ldr.bufLoaderData, tntID)
@@ -687,183 +698,204 @@ func (ldr *Loader) removeContent(loaderType, caching string) (err error) {
//removeLoadedData will remove the data from database
//since we remove we don't need to compose the struct we only need the Tenant and the ID of the profile
func (ldr *Loader) removeLoadedData(loaderType, tntID, caching string) (err error) {
func (ldr *Loader) removeLoadedData(loaderType string, lds map[string][]LoaderData, caching string) (err error) {
var ids []string
var cacheArgs utils.ArgsCache
var cachePartition string
switch loaderType {
case utils.MetaAttributes:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: AttributeProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveAttributeProfile(tntIDStruct.Tenant, tntIDStruct.ID,
utils.NonTransactional, true); err != nil {
return err
for tntID, _ := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: AttributeProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveAttributeProfile(tntIDStruct.Tenant, tntIDStruct.ID,
utils.NonTransactional, true); err != nil {
return err
}
cacheArgs.AttributeProfileIDs = ids
cachePartition = utils.CacheAttributeProfiles
}
cacheArgs.AttributeProfileIDs = ids
cachePartition = utils.CacheAttributeProfiles
}
case utils.MetaResources:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ResourceProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveResourceProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
case utils.MetaResources:
for tntID, _ := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ResourceProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveResourceProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
}
if err := ldr.dm.RemoveResource(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil {
return err
}
cacheArgs.ResourceProfileIDs = ids
cacheArgs.ResourceIDs = ids
cachePartition = utils.CacheResourceProfiles
}
if err := ldr.dm.RemoveResource(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil {
return err
}
cacheArgs.ResourceProfileIDs = ids
cacheArgs.ResourceIDs = ids
cachePartition = utils.CacheResourceProfiles
}
case utils.MetaFilters:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: Filter: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveFilter(tntIDStruct.Tenant, tntIDStruct.ID,
utils.NonTransactional, true); err != nil {
return err
for tntID, _ := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: Filter: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveFilter(tntIDStruct.Tenant, tntIDStruct.ID,
utils.NonTransactional, true); err != nil {
return err
}
cacheArgs.FilterIDs = ids
cachePartition = utils.CacheFilters
}
cacheArgs.FilterIDs = ids
cachePartition = utils.CacheFilters
}
case utils.MetaStats:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: StatsQueueProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveStatQueueProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
for tntID, _ := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: StatsQueueProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveStatQueueProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
}
if err := ldr.dm.RemoveStatQueue(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil {
return err
}
cacheArgs.StatsQueueProfileIDs = ids
cacheArgs.StatsQueueIDs = ids
cachePartition = utils.CacheStatQueueProfiles
}
if err := ldr.dm.RemoveStatQueue(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil {
return err
}
cacheArgs.StatsQueueProfileIDs = ids
cacheArgs.StatsQueueIDs = ids
cachePartition = utils.CacheStatQueueProfiles
}
case utils.MetaThresholds:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ThresholdProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveThresholdProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
for tntID, _ := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ThresholdProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveThresholdProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
}
if err := ldr.dm.RemoveThreshold(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil {
return err
}
cacheArgs.ThresholdProfileIDs = ids
cacheArgs.ThresholdIDs = ids
cachePartition = utils.CacheThresholdProfiles
}
if err := ldr.dm.RemoveThreshold(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil {
return err
}
cacheArgs.ThresholdProfileIDs = ids
cacheArgs.ThresholdIDs = ids
cachePartition = utils.CacheThresholdProfiles
}
case utils.MetaRoutes:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: RouteProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveRouteProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
for tntID, _ := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: RouteProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveRouteProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
}
cacheArgs.RouteProfileIDs = ids
cachePartition = utils.CacheRouteProfiles
}
cacheArgs.RouteProfileIDs = ids
cachePartition = utils.CacheRouteProfiles
}
case utils.MetaChargers:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ChargerProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveChargerProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
for tntID, _ := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ChargerProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveChargerProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
}
cacheArgs.ChargerProfileIDs = ids
cachePartition = utils.CacheChargerProfiles
}
cacheArgs.ChargerProfileIDs = ids
cachePartition = utils.CacheChargerProfiles
}
case utils.MetaDispatchers:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveDispatcherProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
for tntID, _ := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveDispatcherProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
}
cacheArgs.DispatcherProfileIDs = ids
cachePartition = utils.CacheDispatcherProfiles
}
cacheArgs.DispatcherProfileIDs = ids
cachePartition = utils.CacheDispatcherProfiles
}
case utils.MetaDispatcherHosts:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherHostID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveDispatcherHost(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional); err != nil {
return err
for tntID, _ := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherHostID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveDispatcherHost(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional); err != nil {
return err
}
cacheArgs.DispatcherHostIDs = ids
cachePartition = utils.CacheDispatcherHosts
}
cacheArgs.DispatcherHostIDs = ids
cachePartition = utils.CacheDispatcherHosts
}
case utils.MetaRateProfiles:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: RateProfileIDs: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveRateProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
for tntID, _ := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: RateProfileIDs: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveRateProfile(tntIDStruct.Tenant,
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
}
cacheArgs.RateProfileIDs = ids
cachePartition = utils.CacheRateProfiles
}
cacheArgs.RateProfileIDs = ids
cachePartition = utils.CacheRateProfiles
}
}
@@ -885,14 +917,16 @@ func (ldr *Loader) removeLoadedData(loaderType, tntID, caching string) (err erro
return
}
case utils.MetaRemove:
if err = ldr.connMgr.Call(ldr.cacheConns, nil,
utils.CacheSv1RemoveItem, &utils.ArgsGetCacheItemWithArgDispatcher{
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: cachePartition,
ItemID: tntID,
},
}, &reply); err != nil {
return
for tntID, _ := range lds {
if err = ldr.connMgr.Call(ldr.cacheConns, nil,
utils.CacheSv1RemoveItem, &utils.ArgsGetCacheItemWithArgDispatcher{
ArgsGetCacheItem: utils.ArgsGetCacheItem{
CacheID: cachePartition,
ItemID: tntID,
},
}, &reply); err != nil {
return
}
}
case utils.MetaClear:
if err = ldr.connMgr.Call(ldr.cacheConns, nil,

View File

@@ -766,6 +766,7 @@ const (
MetaWeekly = "*weekly"
RateS = "RateS"
Underline = "_"
MetaPartial = "*partial"
)
// Migrator Action
@@ -791,6 +792,7 @@ const (
MetaIndexes = "*indexes"
MetaDispatcherProfiles = "*dispatcher_profiles"
MetaRateProfiles = "*rate_profiles"
MetaRateProfileRates = "*rate_profile_rates"
MetaChargerProfiles = "*charger_profiles"
MetaSharedGroups = "*shared_groups"
MetaThresholds = "*thresholds"