/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package loaders
import (
"encoding/csv"
"fmt"
"io"
"os"
"path"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
type openedCSVFile struct {
fileName string
rdr io.ReadCloser // keep reference so we can close it when done
csvRdr *csv.Reader
}
func NewLoader(dm *engine.DataManager, cfg *config.LoaderSCfg,
timezone string, cachingDelay time.Duration, filterS *engine.FilterS,
connMgr *engine.ConnManager, cacheConns []string) (ldr *Loader) {
ldr = &Loader{
enabled: cfg.Enabled,
tenant: cfg.Tenant,
dryRun: cfg.DryRun,
ldrID: cfg.ID,
tpInDir: cfg.TpInDir,
tpOutDir: cfg.TpOutDir,
lockFilepath: cfg.GetLockFilePath(),
fieldSep: cfg.FieldSeparator,
runDelay: cfg.RunDelay,
cachingDelay: cachingDelay,
dataTpls: make(map[string][]*config.FCTemplate),
flagsTpls: make(map[string]utils.FlagsWithParams),
rdrs: make(map[string]map[string]*openedCSVFile),
bufLoaderData: make(map[string][]LoaderData),
dm: dm,
timezone: timezone,
filterS: filterS,
connMgr: connMgr,
cacheConns: cacheConns,
}
for _, ldrData := range cfg.Data {
ldr.dataTpls[ldrData.Type] = ldrData.Fields
ldr.flagsTpls[ldrData.Type] = ldrData.Flags
ldr.rdrs[ldrData.Type] = make(map[string]*openedCSVFile)
if ldrData.Filename != "" {
ldr.rdrs[ldrData.Type][ldrData.Filename] = nil
}
for _, cfgFld := range ldrData.Fields { // add all possible files to be opened
for _, cfgFldVal := range cfgFld.Value {
rule := cfgFldVal.Rules
filePrefix := utils.DynamicDataPrefix + utils.MetaFile + utils.FilterValStart // i.e. "~*file("
if !strings.HasPrefix(rule, filePrefix) {
continue
}
if idxEnd := strings.Index(rule, utils.FilterValEnd); idxEnd != -1 {
ldr.rdrs[ldrData.Type][rule[len(filePrefix):idxEnd]] = nil // register the referenced file name
}
}
}
}
return
}
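// exampleProcessOnce is an illustrative sketch, not part of the original file:
// it shows how a freshly constructed Loader is typically driven through one
// synchronous pass over tpInDir that stores the parsed content. The "UTC"
// timezone, the zero caching delay and the utils.MetaReload caching strategy
// are assumptions made only for this example.
func exampleProcessOnce(dm *engine.DataManager, cfg *config.LoaderSCfg,
filterS *engine.FilterS, connMgr *engine.ConnManager, cacheConns []string) error {
ldr := NewLoader(dm, cfg, "UTC", 0, filterS, connMgr, cacheConns)
// store everything currently found in the input folder, aborting on the first error
return ldr.ProcessFolder(utils.MetaReload, utils.MetaStore, true)
}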
// Loader is one instance loading from a folder
type Loader struct {
enabled bool
tenant string
dryRun bool
ldrID string
tpInDir string
tpOutDir string
lockFilepath string
fieldSep string
runDelay time.Duration
cachingDelay time.Duration
dataTpls map[string][]*config.FCTemplate // map[loaderType][]*config.FCTemplate
flagsTpls map[string]utils.FlagsWithParams // map[loaderType]utils.FlagsWithParams
rdrs map[string]map[string]*openedCSVFile // map[loaderType]map[fileName]*openedCSVFile for common incremental read
bufLoaderData map[string][]LoaderData // cache of data read, indexed on tenantID
dm *engine.DataManager
timezone string
filterS *engine.FilterS
connMgr *engine.ConnManager
cacheConns []string
}
func (ldr *Loader) ListenAndServe(stopChan chan struct{}) (err error) {
utils.Logger.Info(fmt.Sprintf("Starting <%s-%s>", utils.LoaderS, ldr.ldrID))
return ldr.serve(stopChan)
}
// ProcessFolder will process the content in the folder with locking
func (ldr *Loader) ProcessFolder(caching, loadOption string, stopOnError bool) (err error) {
if err = ldr.lockFolder(); err != nil {
return
}
defer ldr.unlockFolder()
for ldrType := range ldr.rdrs {
if err = ldr.processFiles(ldrType, caching, loadOption); err != nil {
if stopOnError {
return
}
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot process files, err: %s",
utils.LoaderS, ldr.ldrID, ldrType, err.Error()))
continue
}
}
return ldr.moveFiles()
}
// lockFolder will attempt to lock the folder by creating the lock file
func (ldr *Loader) lockFolder() (err error) {
// If the path is an empty string, we should not be locking
if ldr.lockFilepath == utils.EmptyString {
return
}
var lockFile *os.File
if lockFile, err = os.OpenFile(ldr.lockFilepath,
os.O_RDONLY|os.O_CREATE, 0644); err != nil {
return
}
return lockFile.Close() // the lock is the file's existence; no need to keep the handle open
}
func (ldr *Loader) unlockFolder() (err error) {
// If the path is an empty string, there is nothing to unlock
if ldr.lockFilepath == utils.EmptyString {
return
}
if _, err = os.Stat(ldr.lockFilepath); err == nil {
return os.Remove(ldr.lockFilepath)
}
return
}
func (ldr *Loader) isFolderLocked() (locked bool, err error) {
// If the path is an empty string, there is no lock file to check
if ldr.lockFilepath == utils.EmptyString {
return
}
if _, err = os.Stat(ldr.lockFilepath); err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return
}
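// exampleGuardedRun is an illustrative sketch, not part of the original file:
// an API-triggered load may first consult isFolderLocked so that two runs do
// not overlap, since lockFolder itself does not fail when the lock file
// already exists. The error message wording is an assumption for the example.
func exampleGuardedRun(ldr *Loader, caching string) error {
locked, err := ldr.isFolderLocked()
if err != nil {
return err
}
if locked {
return fmt.Errorf("loader <%s>: folder is locked by another run", ldr.ldrID)
}
return ldr.ProcessFolder(caching, utils.MetaStore, false)
}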
// unreferenceFile will clean up a used file by closing it and removing it from the reference map
func (ldr *Loader) unreferenceFile(loaderType, fileName string) (err error) {
openedCSVFile := ldr.rdrs[loaderType][fileName]
ldr.rdrs[loaderType][fileName] = nil
return openedCSVFile.rdr.Close()
}
func (ldr *Loader) moveFiles() (err error) {
if ldr.tpOutDir == utils.EmptyString {
return
}
filesInDir, err := os.ReadDir(ldr.tpInDir)
if err != nil {
return
}
for _, file := range filesInDir {
fName := file.Name()
if fName == path.Base(ldr.lockFilepath) { // leave the lock file in place; unlockFolder removes it
continue
}
oldPath := path.Join(ldr.tpInDir, fName)
newPath := path.Join(ldr.tpOutDir, fName)
if err = os.Rename(oldPath, newPath); err != nil {
return
}
}
return
}
func (ldr *Loader) processFiles(loaderType, caching, loadOption string) (err error) {
for fName := range ldr.rdrs[loaderType] {
var rdr *os.File
if rdr, err = os.Open(path.Join(ldr.tpInDir, fName)); err != nil {
return err
}
csvReader := csv.NewReader(rdr)
csvReader.Comma = rune(ldr.fieldSep[0])
csvReader.Comment = '#'
ldr.rdrs[loaderType][fName] = &openedCSVFile{
fileName: fName, rdr: rdr, csvRdr: csvReader}
defer ldr.unreferenceFile(loaderType, fName)
}
// based on load option will store or remove the content
switch loadOption {
case utils.MetaStore:
return ldr.processContent(loaderType, caching)
case utils.MetaRemove:
return ldr.removeContent(loaderType, caching)
}
return
}
// processContent will process the content and store it into the database
func (ldr *Loader) processContent(loaderType, caching string) (err error) {
// start processing lines
keepLooping := true // controls looping
lineNr := 0
for keepLooping {
lineNr++
var hasErrors bool
lData := make(LoaderData) // one row
for fName, rdr := range ldr.rdrs[loaderType] {
var record []string
if record, err = rdr.csvRdr.Read(); err != nil {
if err == io.EOF {
keepLooping = false
break
}
hasErrors = true
utils.Logger.Warning(
fmt.Sprintf("<%s> <%s> reading line: %d, error: %s",
utils.LoaderS, ldr.ldrID, lineNr, err.Error()))
}
if hasErrors { // if any of the readers errored, skip this line
continue
}
if err := lData.UpdateFromCSV(fName, record,
ldr.dataTpls[loaderType], ldr.tenant, ldr.filterS); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> <%s> line: %d, error: %s",
utils.LoaderS, ldr.ldrID, lineNr, err.Error()))
hasErrors = true
continue
}
// Record from map
// update dataDB
}
if len(lData) == 0 { // no data, could be the last line in file
continue
}
tntID := lData.TenantID()
if _, has := ldr.bufLoaderData[tntID]; !has &&
len(ldr.bufLoaderData) == 1 { // flush the previously buffered tenant before going further
var prevTntID string
for prevTntID = range ldr.bufLoaderData {
break // grab the single key currently in the buffer
}
if err = ldr.storeLoadedData(loaderType,
map[string][]LoaderData{prevTntID: ldr.bufLoaderData[prevTntID]}, caching); err != nil {
return
}
delete(ldr.bufLoaderData, prevTntID)
}
ldr.bufLoaderData[tntID] = append(ldr.bufLoaderData[tntID], lData)
}
// proceed with last element in bufLoaderData
var tntID string
for tntID = range ldr.bufLoaderData {
break // get the first tenantID
}
if err = ldr.storeLoadedData(loaderType,
map[string][]LoaderData{tntID: ldr.bufLoaderData[tntID]}, caching); err != nil {
return
}
delete(ldr.bufLoaderData, tntID)
return
}
func (ldr *Loader) storeLoadedData(loaderType string,
lds map[string][]LoaderData, caching string) (err error) {
var ids []string
cacheArgs := make(map[string][]string)
var cacheIDs []string // index cache partitions that may need clearing
switch loaderType {
case utils.MetaAttributes:
cacheIDs = []string{utils.CacheAttributeFilterIndexes}
for _, lDataSet := range lds {
attrModels := make(engine.AttributeMdls, len(lDataSet))
for i, ld := range lDataSet {
attrModels[i] = new(engine.AttributeMdl)
if err = utils.UpdateStructWithIfaceMap(attrModels[i], ld); err != nil {
return
}
}
for _, tpApf := range attrModels.AsTPAttributes() {
apf, err := engine.APItoAttributeProfile(tpApf, ldr.timezone)
if err != nil {
return err
}
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: AttributeProfile: %s",
utils.LoaderS, ldr.ldrID, utils.ToJSON(apf)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, apf.TenantID())
if err := ldr.dm.SetAttributeProfile(apf, true); err != nil {
return err
}
}
cacheArgs[utils.CacheAttributeProfiles] = ids
}
case utils.MetaResources:
cacheIDs = []string{utils.CacheResourceFilterIndexes}
for _, lDataSet := range lds {
resModels := make(engine.ResourceMdls, len(lDataSet))
for i, ld := range lDataSet {
resModels[i] = new(engine.ResourceMdl)
if err = utils.UpdateStructWithIfaceMap(resModels[i], ld); err != nil {
return
}
}
for _, tpRes := range resModels.AsTPResources() {
res, err := engine.APItoResource(tpRes, ldr.timezone)
if err != nil {
return err
}
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ResourceProfile: %s",
utils.LoaderS, ldr.ldrID, utils.ToJSON(res)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, res.TenantID())
if err := ldr.dm.SetResourceProfile(res, true); err != nil {
return err
}
cacheArgs[utils.CacheResourceProfiles] = ids
cacheArgs[utils.CacheResources] = ids
}
}
case utils.MetaIPs:
cacheIDs = []string{utils.CacheIPFilterIndexes}
for _, lDataSet := range lds {
ipMdls := make(engine.IPMdls, len(lDataSet))
for i, ld := range lDataSet {
ipMdls[i] = new(engine.IPMdl)
if err = utils.UpdateStructWithIfaceMap(ipMdls[i], ld); err != nil {
return
}
}
for _, tpIP := range ipMdls.AsTPIPs() {
res, err := engine.APItoIP(tpIP, ldr.timezone)
if err != nil {
return err
}
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: IPProfile: %s",
utils.LoaderS, ldr.ldrID, utils.ToJSON(res)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, res.TenantID())
if err := ldr.dm.SetIPProfile(res, true); err != nil {
return err
}
cacheArgs[utils.CacheIPProfiles] = ids
cacheArgs[utils.CacheIPs] = ids
}
}
case utils.MetaFilters:
for _, lDataSet := range lds {
fltrModels := make(engine.FilterMdls, len(lDataSet))
for i, ld := range lDataSet {
fltrModels[i] = new(engine.FilterMdl)
if err = utils.UpdateStructWithIfaceMap(fltrModels[i], ld); err != nil {
return
}
}
for _, tpFltr := range fltrModels.AsTPFilter() {
fltrPrf, err := engine.APItoFilter(tpFltr, ldr.timezone)
if err != nil {
return err
}
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: Filter: %s",
utils.LoaderS, ldr.ldrID, utils.ToJSON(fltrPrf)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, fltrPrf.TenantID())
if err := ldr.dm.SetFilter(fltrPrf, true); err != nil {
return err
}
cacheArgs[utils.CacheFilters] = ids
}
}
case utils.MetaStats:
cacheIDs = []string{utils.CacheStatFilterIndexes}
for _, lDataSet := range lds {
stsModels := make(engine.StatMdls, len(lDataSet))
for i, ld := range lDataSet {
stsModels[i] = new(engine.StatMdl)
if err = utils.UpdateStructWithIfaceMap(stsModels[i], ld); err != nil {
return
}
}
for _, tpSts := range stsModels.AsTPStats() {
stsPrf, err := engine.APItoStats(tpSts, ldr.timezone)
if err != nil {
return err
}
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: StatsQueueProfile: %s",
utils.LoaderS, ldr.ldrID, utils.ToJSON(stsPrf)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, stsPrf.TenantID())
if err := ldr.dm.SetStatQueueProfile(stsPrf, true); err != nil {
return err
}
cacheArgs[utils.CacheStatQueueProfiles] = ids
cacheArgs[utils.CacheStatQueues] = ids
}
}
case utils.MetaTrends:
for _, lDataSet := range lds {
trsModels := make(engine.TrendsMdls, len(lDataSet))
for i, ld := range lDataSet {
trsModels[i] = new(engine.TrendsMdl)
if err = utils.UpdateStructWithIfaceMap(trsModels[i], ld); err != nil {
return
}
}
for _, tpTrs := range trsModels.AsTPTrends() {
trsPrf, err := engine.APItoTrends(tpTrs)
if err != nil {
return err
}
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: TrendProfile: %s",
utils.LoaderS, ldr.ldrID, utils.ToJSON(trsPrf)))
continue
}
ids = append(ids, trsPrf.TenantID())
if err := ldr.dm.SetTrendProfile(trsPrf); err != nil {
return err
}
}
}
case utils.MetaRankings:
cacheIDs = []string{utils.CacheRankingFilterIndexes}
for _, lDataSet := range lds {
rnkModels := make(engine.RankingsMdls, len(lDataSet))
for i, ld := range lDataSet {
rnkModels[i] = new(engine.RankingsMdl)
if err = utils.UpdateStructWithIfaceMap(rnkModels[i], ld); err != nil {
return
}
}
for _, tpRgs := range rnkModels.AsTPRanking() {
rgsPrf, err := engine.APItoRanking(tpRgs)
if err != nil {
return err
}
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: RankingProfile: %s",
utils.LoaderS, ldr.ldrID, utils.ToJSON(rgsPrf)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, rgsPrf.TenantID())
if err := ldr.dm.SetRankingProfile(rgsPrf); err != nil {
return err
}
cacheArgs[utils.CacheRankingProfiles] = ids
}
}
case utils.MetaThresholds:
cacheIDs = []string{utils.CacheThresholdFilterIndexes}
for _, lDataSet := range lds {
thModels := make(engine.ThresholdMdls, len(lDataSet))
for i, ld := range lDataSet {
thModels[i] = new(engine.ThresholdMdl)
if err = utils.UpdateStructWithIfaceMap(thModels[i], ld); err != nil {
return
}
}
for _, tpTh := range thModels.AsTPThreshold() {
thPrf, err := engine.APItoThresholdProfile(tpTh, ldr.timezone)
if err != nil {
return err
}
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ThresholdProfile: %s",
utils.LoaderS, ldr.ldrID, utils.ToJSON(thPrf)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, thPrf.TenantID())
if err := ldr.dm.SetThresholdProfile(thPrf, true); err != nil {
return err
}
cacheArgs[utils.CacheThresholdProfiles] = ids
cacheArgs[utils.CacheThresholds] = ids
}
}
case utils.MetaRoutes:
cacheIDs = []string{utils.CacheRouteFilterIndexes}
for _, lDataSet := range lds {
sppModels := make(engine.RouteMdls, len(lDataSet))
for i, ld := range lDataSet {
sppModels[i] = new(engine.RouteMdl)
if err = utils.UpdateStructWithIfaceMap(sppModels[i], ld); err != nil {
return
}
}
for _, tpSpp := range sppModels.AsTPRouteProfile() {
spPrf, err := engine.APItoRouteProfile(tpSpp, ldr.timezone)
if err != nil {
return err
}
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: RouteProfile: %s",
utils.LoaderS, ldr.ldrID, utils.ToJSON(spPrf)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, spPrf.TenantID())
if err := ldr.dm.SetRouteProfile(spPrf, true); err != nil {
return err
}
cacheArgs[utils.CacheRouteProfiles] = ids
}
}
case utils.MetaChargers:
cacheIDs = []string{utils.CacheChargerFilterIndexes}
for _, lDataSet := range lds {
cppModels := make(engine.ChargerMdls, len(lDataSet))
for i, ld := range lDataSet {
cppModels[i] = new(engine.ChargerMdl)
if err = utils.UpdateStructWithIfaceMap(cppModels[i], ld); err != nil {
return
}
}
for _, tpCPP := range cppModels.AsTPChargers() {
cpp, err := engine.APItoChargerProfile(tpCPP, ldr.timezone)
if err != nil {
return err
}
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ChargerProfile: %s",
utils.LoaderS, ldr.ldrID, utils.ToJSON(cpp)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, cpp.TenantID())
if err := ldr.dm.SetChargerProfile(cpp, true); err != nil {
return err
}
cacheArgs[utils.CacheChargerProfiles] = ids
}
}
case utils.MetaDispatchers:
cacheIDs = []string{utils.CacheDispatcherFilterIndexes}
for _, lDataSet := range lds {
dispModels := make(engine.DispatcherProfileMdls, len(lDataSet))
for i, ld := range lDataSet {
dispModels[i] = new(engine.DispatcherProfileMdl)
if err = utils.UpdateStructWithIfaceMap(dispModels[i], ld); err != nil {
return
}
}
for _, tpDsp := range dispModels.AsTPDispatcherProfiles() {
dsp, err := engine.APItoDispatcherProfile(tpDsp, ldr.timezone)
if err != nil {
return err
}
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherProfile: %s",
utils.LoaderS, ldr.ldrID, utils.ToJSON(dsp)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, dsp.TenantID())
if err := ldr.dm.SetDispatcherProfile(dsp, true); err != nil {
return err
}
cacheArgs[utils.CacheDispatcherProfiles] = ids
}
}
case utils.MetaDispatcherHosts:
for _, lDataSet := range lds {
dispModels := make(engine.DispatcherHostMdls, len(lDataSet))
for i, ld := range lDataSet {
dispModels[i] = new(engine.DispatcherHostMdl)
if err = utils.UpdateStructWithIfaceMap(dispModels[i], ld); err != nil {
return
}
}
tpDsps, err := dispModels.AsTPDispatcherHosts()
if err != nil {
return err
}
for _, tpDsp := range tpDsps {
dsp := engine.APItoDispatcherHost(tpDsp)
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherHost: %s",
utils.LoaderS, ldr.ldrID, utils.ToJSON(dsp)))
continue
}
// get IDs so we can reload in cache
ids = append(ids, dsp.TenantID())
if err := ldr.dm.SetDispatcherHost(dsp); err != nil {
return err
}
cacheArgs[utils.CacheDispatcherHosts] = ids
}
}
}
// delay if needed before cache reload
if ldr.cachingDelay != 0 {
utils.Logger.Info(fmt.Sprintf("<%v> Delaying cache reload for %v", utils.LoaderS, ldr.cachingDelay))
time.Sleep(ldr.cachingDelay)
}
if len(ldr.cacheConns) != 0 {
return engine.CallCache(ldr.connMgr, ldr.cacheConns, caching, cacheArgs, cacheIDs, nil, false, ldr.tenant)
}
return
}
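// exampleCachingChoice is an illustrative sketch, not part of the original
// file, of how the caching argument forwarded to engine.CallCache by
// storeLoadedData/removeLoadedData is typically chosen: an explicit CacheS
// strategy when the caller provides one, otherwise the configured default.
// The listed strategies (*none, *load, *reload, *clear) are assumptions for
// the example; CallCache is passed whatever the cache subsystem understands.
func exampleCachingChoice(requested string) string {
switch requested {
case utils.MetaNone, utils.MetaLoad, utils.MetaReload, utils.MetaClear:
return requested
default:
return config.CgrConfig().GeneralCfg().DefaultCaching
}
}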
// removeContent will process the content and remove it from the database
func (ldr *Loader) removeContent(loaderType, caching string) (err error) {
// start processing lines
keepLooping := true // controls looping
lineNr := 0
for keepLooping {
lineNr++
var hasErrors bool
lData := make(LoaderData) // one row
for fName, rdr := range ldr.rdrs[loaderType] {
var record []string
if record, err = rdr.csvRdr.Read(); err != nil {
if err == io.EOF {
keepLooping = false
break
}
hasErrors = true
utils.Logger.Warning(
fmt.Sprintf("<%s> <%s> reading line: %d, error: %s",
utils.LoaderS, ldr.ldrID, lineNr, err.Error()))
}
if hasErrors { // if any of the readers errored, skip this line
continue
}
if err := lData.UpdateFromCSV(fName, record,
ldr.dataTpls[loaderType], ldr.tenant, ldr.filterS); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> <%s> line: %d, error: %s",
utils.LoaderS, ldr.ldrID, lineNr, err.Error()))
hasErrors = true
continue
}
// Record from map
// update dataDB
}
if len(lData) == 0 { // no data, could be the last line in file
continue
}
tntID := lData.TenantID()
if _, has := ldr.bufLoaderData[tntID]; !has &&
len(ldr.bufLoaderData) == 1 { // flush the previously buffered tenant before going further
var prevTntID string
for prevTntID = range ldr.bufLoaderData {
break // grab the single key currently in the buffer
}
if err = ldr.removeLoadedData(loaderType,
map[string][]LoaderData{prevTntID: ldr.bufLoaderData[prevTntID]}, caching); err != nil {
return
}
delete(ldr.bufLoaderData, prevTntID)
}
ldr.bufLoaderData[tntID] = append(ldr.bufLoaderData[tntID], lData)
}
// proceed with last element in bufLoaderData
var tntID string
for tntID = range ldr.bufLoaderData {
break // get the first tenantID
}
if err = ldr.removeLoadedData(loaderType,
map[string][]LoaderData{tntID: ldr.bufLoaderData[tntID]}, caching); err != nil {
return
}
delete(ldr.bufLoaderData, tntID)
return
}
// removeLoadedData will remove the data from the database
// since we only remove, there is no need to compose the full struct: the Tenant and ID of the profile are enough
func (ldr *Loader) removeLoadedData(loaderType string, lds map[string][]LoaderData, caching string) (err error) {
var ids []string
cacheArgs := make(map[string][]string)
var cacheIDs []string // index cache partitions that may need clearing
switch loaderType {
case utils.MetaAttributes:
cacheIDs = []string{utils.CacheAttributeFilterIndexes}
for tntID := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: AttributeProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveAttributeProfile(tntIDStruct.Tenant, tntIDStruct.ID,
true); err != nil {
return err
}
cacheArgs[utils.CacheAttributeProfiles] = ids
}
}
case utils.MetaResources:
cacheIDs = []string{utils.CacheResourceFilterIndexes}
for tntID := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ResourceProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveResourceProfile(tntIDStruct.Tenant,
tntIDStruct.ID, true); err != nil {
return err
}
cacheArgs[utils.CacheResourceProfiles] = ids
cacheArgs[utils.CacheResources] = ids
}
}
case utils.MetaFilters:
for tntID := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: Filter: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveFilter(tntIDStruct.Tenant, tntIDStruct.ID,
true); err != nil {
return err
}
cacheArgs[utils.CacheFilters] = ids
}
}
case utils.MetaStats:
cacheIDs = []string{utils.CacheStatFilterIndexes}
for tntID := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: StatsQueueProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveStatQueueProfile(tntIDStruct.Tenant,
tntIDStruct.ID, true); err != nil {
return err
}
cacheArgs[utils.CacheStatQueueProfiles] = ids
cacheArgs[utils.CacheStatQueues] = ids
}
}
case utils.MetaRankings:
cacheIDs = []string{utils.CacheRankingFilterIndexes}
for tntID := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: RankingProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveRankingProfile(tntIDStruct.Tenant,
tntIDStruct.ID); err != nil {
return err
}
cacheArgs[utils.CacheRankingProfiles] = ids
}
}
case utils.MetaThresholds:
cacheIDs = []string{utils.CacheThresholdFilterIndexes}
for tntID := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ThresholdProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveThresholdProfile(tntIDStruct.Tenant,
tntIDStruct.ID, true); err != nil {
return err
}
cacheArgs[utils.CacheThresholdProfiles] = ids
cacheArgs[utils.CacheThresholds] = ids
}
}
case utils.MetaRoutes:
cacheIDs = []string{utils.CacheRouteFilterIndexes}
for tntID := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: RouteProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveRouteProfile(tntIDStruct.Tenant,
tntIDStruct.ID, true); err != nil {
return err
}
cacheArgs[utils.CacheRouteProfiles] = ids
}
}
case utils.MetaChargers:
cacheIDs = []string{utils.CacheChargerFilterIndexes}
for tntID := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ChargerProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveChargerProfile(tntIDStruct.Tenant,
tntIDStruct.ID, true); err != nil {
return err
}
cacheArgs[utils.CacheChargerProfiles] = ids
}
}
case utils.MetaDispatchers:
cacheIDs = []string{utils.CacheDispatcherFilterIndexes}
for tntID := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherProfileID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveDispatcherProfile(tntIDStruct.Tenant,
tntIDStruct.ID, true); err != nil {
return err
}
cacheArgs[utils.CacheDispatcherProfiles] = ids
}
}
case utils.MetaDispatcherHosts:
for tntID := range lds {
if ldr.dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherHostID: %s",
utils.LoaderS, ldr.ldrID, tntID))
} else {
tntIDStruct := utils.NewTenantID(tntID)
// get IDs so we can reload in cache
ids = append(ids, tntID)
if err := ldr.dm.RemoveDispatcherHost(tntIDStruct.Tenant,
tntIDStruct.ID); err != nil {
return err
}
cacheArgs[utils.CacheDispatcherHosts] = ids
}
}
}
// delay if needed before cache reload
if ldr.cachingDelay != 0 {
utils.Logger.Info(fmt.Sprintf("<%v> Delaying cache reload for %v", utils.LoaderS, ldr.cachingDelay))
time.Sleep(ldr.cachingDelay)
}
if len(ldr.cacheConns) != 0 {
return engine.CallCache(ldr.connMgr, ldr.cacheConns, caching, cacheArgs, cacheIDs, nil, false, ldr.tenant)
}
return
}
func (ldr *Loader) serve(stopChan chan struct{}) (err error) {
switch ldr.runDelay {
case time.Duration(0): // 0 disables the automatic read, maybe done per API
return
case time.Duration(-1):
return utils.WatchDir(ldr.tpInDir, ldr.processFile,
utils.LoaderS+"-"+ldr.ldrID, stopChan)
default:
go ldr.handleFolder(stopChan)
}
return
}
func (ldr *Loader) handleFolder(stopChan chan struct{}) {
for {
go ldr.ProcessFolder(config.CgrConfig().GeneralCfg().DefaultCaching, utils.MetaStore, false)
timer := time.NewTimer(ldr.runDelay)
select {
case <-stopChan:
utils.Logger.Info(
fmt.Sprintf("<%s-%s> stop monitoring path <%s>",
utils.LoaderS, ldr.ldrID, ldr.tpInDir))
timer.Stop()
return
case <-timer.C:
}
}
}
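// exampleServe is an illustrative sketch, not part of the original file, of
// the run-delay semantics handled by serve: 0 disables automatic reads (loads
// are then triggered per API), -1 watches tpInDir for newly created files and
// any positive duration produces periodic ProcessFolder runs until stopChan
// is closed.
func exampleServe(ldr *Loader) {
stopChan := make(chan struct{})
go func() {
if err := ldr.ListenAndServe(stopChan); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s-%s> serving error: %s",
utils.LoaderS, ldr.ldrID, err.Error()))
}
}()
// ... later, on shutdown, stop the watcher/timer loop:
close(stopChan)
}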
func (ldr *Loader) processFile(itmID string) (err error) {
loaderType := ldr.getLdrType(itmID)
if len(loaderType) == 0 {
return
}
if err = ldr.lockFolder(); err != nil {
return
}
defer ldr.unlockFolder()
if ldr.rdrs[loaderType][itmID] != nil {
ldr.unreferenceFile(loaderType, itmID)
}
var rdr *os.File
if rdr, err = os.Open(path.Join(ldr.tpInDir, itmID)); err != nil {
return
}
csvReader := csv.NewReader(rdr)
csvReader.Comma = rune(ldr.fieldSep[0])
csvReader.Comment = '#'
ldr.rdrs[loaderType][itmID] = &openedCSVFile{
fileName: itmID, rdr: rdr, csvRdr: csvReader}
if !ldr.allFilesPresent(loaderType) {
return
}
for fName := range ldr.rdrs[loaderType] {
defer ldr.unreferenceFile(loaderType, fName)
}
err = ldr.processContent(loaderType, config.CgrConfig().GeneralCfg().DefaultCaching)
if ldr.tpOutDir == utils.EmptyString {
return
}
for fName := range ldr.rdrs[loaderType] {
oldPath := path.Join(ldr.tpInDir, fName)
newPath := path.Join(ldr.tpOutDir, fName)
if nerr := os.Rename(oldPath, newPath); nerr != nil {
return nerr
}
}
return
}
func (ldr *Loader) allFilesPresent(ldrType string) bool {
for _, rdr := range ldr.rdrs[ldrType] {
if rdr == nil {
return false
}
}
return true
}
// getLdrType returns loaderType for the given fileName
func (ldr *Loader) getLdrType(fName string) (ldrType string) {
for lType, rdrs := range ldr.rdrs {
if _, has := rdrs[fName]; has {
return lType
}
}
return
}
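// exampleDispatchNewFile is an illustrative sketch, not part of the original
// file, of the path taken when a directory-watch event reports a new file:
// the name is mapped to its loader type via getLdrType and, once all files of
// that type are present, processFile loads the content and moves the files to
// tpOutDir. The "Attributes.csv" file name is an assumption for the example.
func exampleDispatchNewFile(ldr *Loader) error {
if ldr.getLdrType("Attributes.csv") == utils.EmptyString {
return nil // file not referenced by any loader data template, nothing to do
}
return ldr.processFile("Attributes.csv")
}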