Add LoaderSv1.Remove API

This commit is contained in:
TeoV
2019-08-23 13:32:39 +03:00
committed by Dan Christian Bogos
parent 20e3ede5d3
commit 9606bb2898
7 changed files with 483 additions and 11 deletions

View File

@@ -43,6 +43,11 @@ func (ldrSv1 *LoaderSv1) Load(args *loaders.ArgsProcessFolder,
return ldrSv1.ldrS.V1Load(args, rply)
}
func (ldrSv1 *LoaderSv1) Remove(args *loaders.ArgsProcessFolder,
rply *string) error {
return ldrSv1.ldrS.V1Remove(args, rply)
}
func (rsv1 *LoaderSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error {
*reply = utils.Pong
return nil

65
console/loader_remove.go Executable file
View File

@@ -0,0 +1,65 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package console
import (
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/utils"
)
func init() {
c := &CmdLoaderRemove{
name: "loader_remove",
rpcMethod: utils.LoaderSv1Remove,
rpcParams: &loaders.ArgsProcessFolder{},
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
}
type CmdLoaderRemove struct {
name string
rpcMethod string
rpcParams *loaders.ArgsProcessFolder
*CommandExecuter
}
func (self *CmdLoaderRemove) Name() string {
return self.name
}
func (self *CmdLoaderRemove) RpcMethod() string {
return self.rpcMethod
}
func (self *CmdLoaderRemove) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &loaders.ArgsProcessFolder{}
}
return self.rpcParams
}
func (self *CmdLoaderRemove) PostprocessRpcParams() error {
return nil
}
func (self *CmdLoaderRemove) RpcResult() interface{} {
var s string
return &s
}

View File

@@ -36,6 +36,13 @@ func (ld LoaderData) TenantID() string {
return utils.ConcatenatedKey(tnt, prflID)
}
func (ld LoaderData) TenantIDStruct() utils.TenantID {
return utils.TenantID{
Tenant: ld[utils.Tenant].(string),
ID: ld[utils.ID].(string),
}
}
// UpdateFromCSV will update LoaderData with data received from fileName,
// contained in record and processed with cfgTpl
func (ld LoaderData) UpdateFromCSV(fileName string, record []string,

View File

@@ -126,13 +126,13 @@ func (ldr *Loader) ListenAndServe(exitChan chan struct{}) (err error) {
}
// ProcessFolder will process the content in the folder with locking
func (ldr *Loader) ProcessFolder(caching string) (err error) {
func (ldr *Loader) ProcessFolder(caching, loadOption string) (err error) {
if err = ldr.lockFolder(); err != nil {
return
}
defer ldr.unlockFolder()
for ldrType := range ldr.rdrs {
if err = ldr.processFiles(ldrType, caching); err != nil {
if err = ldr.processFiles(ldrType, caching, loadOption); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s",
utils.LoaderS, ldr.ldrID, ldrType, err.Error()))
continue
@@ -187,7 +187,7 @@ func (ldr *Loader) moveFiles() (err error) {
return
}
func (ldr *Loader) processFiles(loaderType, caching string) (err error) {
func (ldr *Loader) processFiles(loaderType, caching, loadOption string) (err error) {
for fName := range ldr.rdrs[loaderType] {
var rdr *os.File
if rdr, err = os.Open(path.Join(ldr.tpInDir, fName)); err != nil {
@@ -198,13 +198,22 @@ func (ldr *Loader) processFiles(loaderType, caching string) (err error) {
ldr.rdrs[loaderType][fName] = &openedCSVFile{
fileName: fName, rdr: rdr, csvRdr: csvReader}
defer ldr.unreferenceFile(loaderType, fName)
if err = ldr.processContent(loaderType, caching); err != nil {
return
// based on load option will store or remove the content
switch loadOption {
case utils.MetaStore:
if err = ldr.processContent(loaderType, caching); err != nil {
return
}
case utils.MetaRemove:
if err = ldr.removeContent(loaderType, caching); err != nil {
return
}
}
}
return
}
//processContent will process the contect and will store it into database
func (ldr *Loader) processContent(loaderType, caching string) (err error) {
// start processing lines
keepLooping := true // controls looping
@@ -324,6 +333,8 @@ func (ldr *Loader) storeLoadedData(loaderType string,
utils.LoaderS, ldr.ldrID, utils.ToJSON(res)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, res.TenantID())
if err := ldr.dm.SetResourceProfile(res, true); err != nil {
return err
}
@@ -333,6 +344,8 @@ func (ldr *Loader) storeLoadedData(loaderType string,
Usages: make(map[string]*engine.ResourceUsage)}); err != nil {
return err
}
cacheArgs.ResourceProfileIDs = &ids
cacheArgs.ResourceIDs = &ids
}
}
case utils.MetaFilters:
@@ -356,9 +369,12 @@ func (ldr *Loader) storeLoadedData(loaderType string,
utils.LoaderS, ldr.ldrID, utils.ToJSON(fltrPrf)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, fltrPrf.TenantID())
if err := ldr.dm.SetFilter(fltrPrf); err != nil {
return err
}
cacheArgs.FilterIDs = &ids
}
}
case utils.MetaStats:
@@ -381,6 +397,8 @@ func (ldr *Loader) storeLoadedData(loaderType string,
utils.LoaderS, ldr.ldrID, utils.ToJSON(stsPrf)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, stsPrf.TenantID())
if err := ldr.dm.SetStatQueueProfile(stsPrf, true); err != nil {
return err
}
@@ -395,6 +413,8 @@ func (ldr *Loader) storeLoadedData(loaderType string,
if err := ldr.dm.SetStatQueue(&engine.StatQueue{Tenant: stsPrf.Tenant, ID: stsPrf.ID, SQMetrics: metrics}); err != nil {
return err
}
cacheArgs.StatsQueueProfileIDs = &ids
cacheArgs.StatsQueueIDs = &ids
}
}
case utils.MetaThresholds:
@@ -417,12 +437,16 @@ func (ldr *Loader) storeLoadedData(loaderType string,
utils.LoaderS, ldr.ldrID, utils.ToJSON(thPrf)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, thPrf.TenantID())
if err := ldr.dm.SetThresholdProfile(thPrf, true); err != nil {
return err
}
if err := ldr.dm.SetThreshold(&engine.Threshold{Tenant: thPrf.Tenant, ID: thPrf.ID}); err != nil {
return err
}
cacheArgs.ThresholdProfileIDs = &ids
cacheArgs.ThresholdIDs = &ids
}
}
case utils.MetaSuppliers:
@@ -446,9 +470,12 @@ func (ldr *Loader) storeLoadedData(loaderType string,
utils.LoaderS, ldr.ldrID, utils.ToJSON(spPrf)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, spPrf.TenantID())
if err := ldr.dm.SetSupplierProfile(spPrf, true); err != nil {
return err
}
cacheArgs.SupplierProfileIDs = &ids
}
}
case utils.MetaChargers:
@@ -472,9 +499,12 @@ func (ldr *Loader) storeLoadedData(loaderType string,
utils.LoaderS, ldr.ldrID, utils.ToJSON(cpp)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, cpp.TenantID())
if err := ldr.dm.SetChargerProfile(cpp, true); err != nil {
return err
}
cacheArgs.ChargerProfileIDs = &ids
}
}
case utils.MetaDispatchers:
@@ -497,9 +527,12 @@ func (ldr *Loader) storeLoadedData(loaderType string,
utils.LoaderS, ldr.ldrID, utils.ToJSON(dsp)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, dsp.TenantID())
if err := ldr.dm.SetDispatcherProfile(dsp, true); err != nil {
return err
}
cacheArgs.DispatcherProfileIDs = &ids
}
}
case utils.MetaDispatcherHosts:
@@ -519,15 +552,17 @@ func (ldr *Loader) storeLoadedData(loaderType string,
utils.LoaderS, ldr.ldrID, utils.ToJSON(dsp)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, dsp.TenantID())
if err := ldr.dm.SetDispatcherHost(dsp); err != nil {
return err
}
cacheArgs.DispatcherHostIDs = &ids
}
}
}
if ldr.cacheS != nil {
var reply string
switch caching {
case utils.META_NONE:
@@ -554,7 +589,250 @@ func (ldr *Loader) storeLoadedData(loaderType string,
return
}
}
}
return
}
//removeContent will process the contect and will remove it from database
func (ldr *Loader) removeContent(loaderType, caching string) (err error) {
// start processing lines
keepLooping := true // controls looping
lineNr := 0
for keepLooping {
lineNr += 1
var hasErrors bool
lData := make(LoaderData) // one row
for fName, rdr := range ldr.rdrs[loaderType] {
var record []string
if record, err = rdr.csvRdr.Read(); err != nil {
if err == io.EOF {
keepLooping = false
break
}
hasErrors = true
utils.Logger.Warning(
fmt.Sprintf("<%s> <%s> reading line: %d, error: %s",
utils.LoaderS, ldr.ldrID, lineNr, err.Error()))
}
if hasErrors { // if any of the readers will give errors, we ignore the line
continue
}
if err := lData.UpdateFromCSV(fName, record,
ldr.dataTpls[loaderType], ldr.tenant, ldr.filterS); err != nil {
fmt.Sprintf("<%s> <%s> line: %d, error: %s",
utils.LoaderS, ldr.ldrID, lineNr, err.Error())
hasErrors = true
continue
}
// Record from map
// update dataDB
}
if len(lData) == 0 { // no data, could be the last line in file
continue
}
tntID := lData.TenantID()
if _, has := ldr.bufLoaderData[tntID]; !has &&
len(ldr.bufLoaderData) == 1 { // process previous records before going futher
var prevTntID string
for prevTntID = range ldr.bufLoaderData {
break // have stolen the existing key in buffer
}
if err = ldr.removeLoadedData(loaderType, prevTntID, caching); err != nil {
return
}
delete(ldr.bufLoaderData, prevTntID)
}
ldr.bufLoaderData[tntID] = append(ldr.bufLoaderData[tntID], lData)
}
// proceed with last element in bufLoaderData
var tntID string
for tntID = range ldr.bufLoaderData {
break // get the first tenantID
}
if err = ldr.removeLoadedData(loaderType, tntID, caching); err != nil {
return
}
delete(ldr.bufLoaderData, tntID)
return
}
//removeLoadedData will remove the data from database
//since we remove we don't need to compose the struct we only need the Tenant and the ID of the profile
func (ldr *Loader) removeLoadedData(loaderType, tntID, caching string) (err error) {
var ids []string
cacheArgs := utils.InitAttrReloadCache()
switch loaderType {
case utils.MetaAttributes:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: AttributeProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveAttributeProfile(tntIDStruct.Tenant, tntIDStruct.ID,
utils.NonTransactional, true); err != nil {
return err
}
cacheArgs.AttributeProfileIDs = &ids
}
case utils.MetaResources:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ResourceProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveResourceProfile(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
}
if err := ldr.dm.RemoveResource(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil {
return err
}
cacheArgs.ResourceProfileIDs = &ids
cacheArgs.ResourceIDs = &ids
}
case utils.MetaFilters:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: Filter: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveFilter(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil {
return err
}
cacheArgs.FilterIDs = &ids
}
case utils.MetaStats:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: StatsQueueProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveStatQueueProfile(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
}
if err := ldr.dm.RemoveStatQueue(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil {
return err
}
cacheArgs.StatsQueueProfileIDs = &ids
cacheArgs.StatsQueueIDs = &ids
}
case utils.MetaThresholds:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ThresholdProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveThresholdProfile(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
}
if err := ldr.dm.RemoveThreshold(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil {
return err
}
cacheArgs.ThresholdProfileIDs = &ids
cacheArgs.ThresholdIDs = &ids
}
case utils.MetaSuppliers:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: SupplierProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveSupplierProfile(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
}
cacheArgs.SupplierProfileIDs = &ids
}
case utils.MetaChargers:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ChargerProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveChargerProfile(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
}
cacheArgs.ChargerProfileIDs = &ids
}
case utils.MetaDispatchers:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveDispatcherProfile(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional, true); err != nil {
return err
}
cacheArgs.DispatcherProfileIDs = &ids
}
case utils.MetaDispatcherHosts:
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherHostID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveDispatcherHost(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil {
return err
}
cacheArgs.DispatcherHostIDs = &ids
}
}
if ldr.cacheS != nil {
var reply string
switch caching {
case utils.META_NONE:
return
case utils.MetaReload:
if err = ldr.cacheS.Call(utils.CacheSv1ReloadCache, utils.AttrReloadCacheWithArgDispatcher{
AttrReloadCache: cacheArgs}, &reply); err != nil {
return
}
case utils.MetaLoad:
if err = ldr.cacheS.Call(utils.CacheSv1LoadCache, utils.AttrReloadCacheWithArgDispatcher{
AttrReloadCache: cacheArgs}, &reply); err != nil {
return
}
case utils.MetaRemove:
if err = ldr.cacheS.Call(utils.CacheSv1FlushCache, utils.AttrReloadCacheWithArgDispatcher{
AttrReloadCache: cacheArgs}, &reply); err != nil {
return
}
case utils.MetaClear:
cacheArgs.FlushAll = true
if err = ldr.cacheS.Call(utils.CacheSv1FlushCache, utils.AttrReloadCacheWithArgDispatcher{
AttrReloadCache: cacheArgs}, &reply); err != nil {
return
}
}
}
return
}

View File

@@ -1109,3 +1109,87 @@ func TestLoaderProcessDispatcheHosts(t *testing.T) {
t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eDispHost), utils.ToJSON(rcv))
}
}
func TestLoaderRemoveContentSingleFile(t *testing.T) {
data, _ := engine.NewMapStorage()
ldr := &Loader{
ldrID: "TestLoaderProcessContent",
bufLoaderData: make(map[string][]LoaderData),
dm: engine.NewDataManager(data),
timezone: "UTC",
}
ldr.dataTpls = map[string][]*config.FCTemplate{
utils.MetaAttributes: []*config.FCTemplate{
&config.FCTemplate{Tag: "TenantID",
FieldId: "Tenant",
Type: utils.META_COMPOSED,
Value: config.NewRSRParsersMustCompile("~0", true, utils.INFIELD_SEP),
Mandatory: true},
&config.FCTemplate{Tag: "ProfileID",
FieldId: "ID",
Type: utils.META_COMPOSED,
Value: config.NewRSRParsersMustCompile("~1", true, utils.INFIELD_SEP),
Mandatory: true},
},
}
rdr := ioutil.NopCloser(strings.NewReader(engine.AttributesCSVContent))
csvRdr := csv.NewReader(rdr)
csvRdr.Comment = '#'
ldr.rdrs = map[string]map[string]*openedCSVFile{
utils.MetaAttributes: map[string]*openedCSVFile{
"Attributes.csv": &openedCSVFile{fileName: "Attributes.csv",
rdr: rdr, csvRdr: csvRdr}},
}
// Add two attributeProfiles
ap := &engine.AttributeProfile{
Tenant: "cgrates.org",
ID: "ALS1",
Contexts: []string{"con1", "con2", "con3"},
FilterIDs: []string{"*string:~Account:1001"},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 29, 15, 0, 0, 0, time.UTC)},
Attributes: []*engine.Attribute{
&engine.Attribute{
FilterIDs: []string{"*string:~Field1:Initial"},
FieldName: "Field1",
Type: utils.MetaVariable,
Value: config.NewRSRParsersMustCompile("Sub1", true, utils.INFIELD_SEP),
},
&engine.Attribute{
FilterIDs: []string{},
FieldName: "Field2",
Type: utils.MetaVariable,
Value: config.NewRSRParsersMustCompile("Sub2", true, utils.INFIELD_SEP),
}},
Blocker: true,
Weight: 20,
}
if err := ldr.dm.SetAttributeProfile(ap, true); err != nil {
t.Error(err)
}
ap.ID = "Attr2"
if err := ldr.dm.SetAttributeProfile(ap, true); err != nil {
t.Error(err)
}
if err := ldr.removeContent(utils.MetaAttributes, utils.EmptyString); err != nil {
t.Error(err)
}
if len(ldr.bufLoaderData) != 0 {
t.Errorf("wrong buffer content: %+v", ldr.bufLoaderData)
}
// make sure the first attribute is deleted
if _, err := ldr.dm.GetAttributeProfile("cgrates.org", "ALS1",
true, false, utils.NonTransactional); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
// the second should be there
if rcv, err := ldr.dm.GetAttributeProfile("cgrates.org", "Attr2",
true, false, utils.NonTransactional); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(ap, rcv) {
t.Errorf("expecting: %s, \n received: %s",
utils.ToJSON(ap), utils.ToJSON(rcv))
}
}

View File

@@ -101,7 +101,38 @@ func (ldrS *LoaderService) V1Load(args *ArgsProcessFolder,
if args.Caching != nil {
caching = *args.Caching
}
if err := ldr.ProcessFolder(caching); err != nil {
if err := ldr.ProcessFolder(caching, utils.MetaStore); err != nil {
return utils.NewErrServerError(err)
}
*rply = utils.OK
return
}
func (ldrS *LoaderService) V1Remove(args *ArgsProcessFolder,
rply *string) (err error) {
if args.LoaderID == "" {
args.LoaderID = utils.META_DEFAULT
}
ldr, has := ldrS.ldrs[args.LoaderID]
if !has {
return fmt.Errorf("UNKNOWN_LOADER: %s", args.LoaderID)
}
if locked, err := ldr.isFolderLocked(); err != nil {
return utils.NewErrServerError(err)
} else if locked {
if args.ForceLock {
if err := ldr.unlockFolder(); err != nil {
return utils.NewErrServerError(err)
}
}
return errors.New("ANOTHER_LOADER_RUNNING")
}
//verify If Caching is present in arguments
caching := config.CgrConfig().GeneralCfg().DefaultCaching
if args.Caching != nil {
caching = *args.Caching
}
if err := ldr.ProcessFolder(caching, utils.MetaRemove); err != nil {
return utils.NewErrServerError(err)
}
*rply = utils.OK

View File

@@ -532,6 +532,7 @@ const (
MetaReload = "*reload"
MetaLoad = "*load"
MetaRemove = "*remove"
MetaStore = "*store"
MetaClear = "*clear"
LoadIDs = "load_ids"
DNSAgent = "DNSAgent"
@@ -976,9 +977,10 @@ const (
// LoaderS APIs
const (
LoaderSv1 = "LoaderSv1"
LoaderSv1Load = "LoaderSv1.Load"
LoaderSv1Ping = "LoaderSv1.Ping"
LoaderSv1 = "LoaderSv1"
LoaderSv1Load = "LoaderSv1.Load"
LoaderSv1Remove = "LoaderSv1.Remove"
LoaderSv1Ping = "LoaderSv1.Ping"
)
// CacheS APIs