Added config infrastructure for the new loaders

This commit is contained in:
Trial97
2021-11-09 16:54:44 +02:00
committed by Dan Christian Bogos
parent dd0f248450
commit 96e50f3962
13 changed files with 601 additions and 257 deletions

View File

@@ -33,12 +33,7 @@ type LoaderSv1 struct {
ping
}
func (ldrSv1 *LoaderSv1) Load(ctx *context.Context, args *loaders.ArgsProcessFolder,
func (ldrSv1 *LoaderSv1) Run(ctx *context.Context, args *loaders.ArgsProcessFolder,
rply *string) error {
return ldrSv1.ldrS.V1Load(ctx, args, rply)
}
func (ldrSv1 *LoaderSv1) Remove(ctx *context.Context, args *loaders.ArgsProcessFolder,
rply *string) error {
return ldrSv1.ldrS.V1Remove(ctx, args, rply)
return ldrSv1.ldrS.V1Run(ctx, args, rply)
}

View File

@@ -42,7 +42,9 @@ var (
getDftKamConnCfg = func() *KamConnCfg { return new(KamConnCfg) } // returns default Kamailio Connection configuration
getDftAstConnCfg = func() *AsteriskConnCfg { return new(AsteriskConnCfg) } // returns default Asterisk Connection configuration
getDftLoaderCfg = func() *LoaderSCfg { return new(LoaderSCfg) }
getDftLoaderCfg = func() *LoaderSCfg {
return &LoaderSCfg{Opts: new(LoaderSOptsCfg), Cache: make(map[string]*CacheParamCfg)}
}
getDftRemHstCfg = func() *RemoteHost { return new(RemoteHost) }
getDftEvExpCfg = func() *EventExporterCfg { return &EventExporterCfg{Opts: &EventExporterOpts{}} }

View File

@@ -1159,17 +1159,35 @@ const CGRATES_CFG_JSON = `
"id": "*default", // identifier of the Loader
"enabled": false, // starts as service: <true|false>.
"tenant": "", // tenant used in filterS.Pass
"dry_run": false, // do not send the CDRs to CDRS, just parse them
"run_delay": "0", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
"lockfile_path": ".cgr.lck", // Filename containing concurrency lock in case of delayed processing
"caches_conns": ["*internal"],
"field_separator": ",", // separator used in case of csv files
"tp_in_dir": "/var/spool/cgrates/loader/in", // absolute path towards the directory where the TPs are stored
"tp_out_dir": "/var/spool/cgrates/loader/out", // absolute path towards the directory where processed TPs will be moved
"action": "*store", // what should the loader do<*store|*parse|*remove|*dryrun>
"opts": {
// "*cache": "*reload",
"*withIndex": true,
// "*forceLock": false,
// "*stopOnError": false,
},
"cache":{
"*filters":{"limit": -1, "ttl": "5s", "static_ttl": false},
"*attributes":{"limit": -1, "ttl": "5s", "static_ttl": false},
"*resources":{"limit": -1, "ttl": "5s", "static_ttl": false},
"*stats":{"limit": -1, "ttl": "5s", "static_ttl": false},
"*thresholds":{"limit": -1, "ttl": "5s", "static_ttl": false},
"*routes":{"limit": -1, "ttl": "5s", "static_ttl": false},
"*chargers":{"limit": -1, "ttl": "5s", "static_ttl": false},
"*dispatchers":{"limit": -1, "ttl": "5s", "static_ttl": false},
"*dispatcher_hosts":{"limit": -1, "ttl": "5s", "static_ttl": false},
"*rate_profiles":{"limit": -1, "ttl": "5s", "static_ttl": false},
"*action_profiles":{"limit": -1, "ttl": "5s", "static_ttl": false},
"*accounts":{"limit": -1, "ttl": "5s", "static_ttl": false},
},
"data":[ // data profiles to load
{
"id": "filters1",
"flags": ["*partial"],
"type": "*filters", // data source type
"file_name": "Filters.csv", // file name in the tp_in_dir
"fields": [

View File

@@ -1127,13 +1127,14 @@ func TestDfLoaderJsonCfg(t *testing.T) {
ID: utils.StringPointer(utils.MetaDefault),
Enabled: utils.BoolPointer(false),
Tenant: utils.StringPointer(""),
Dry_run: utils.BoolPointer(false),
Run_delay: utils.StringPointer("0"),
Lockfile_path: utils.StringPointer(".cgr.lck"),
Caches_conns: &[]string{utils.MetaInternal},
Field_separator: utils.StringPointer(","),
Tp_in_dir: utils.StringPointer("/var/spool/cgrates/loader/in"),
Tp_out_dir: utils.StringPointer("/var/spool/cgrates/loader/out"),
Action: utils.StringPointer(utils.MetaStore),
Opts: &LoaderJsonOptsCfg{WithIndex: utils.BoolPointer(true)},
Data: &[]*LoaderJsonDataType{
{
Type: utils.StringPointer(utils.MetaFilters),
@@ -1792,6 +1793,20 @@ func TestDfLoaderJsonCfg(t *testing.T) {
},
},
},
Cache: map[string]*CacheParamJsonCfg{
utils.MetaFilters: {Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("5s"), Static_ttl: utils.BoolPointer(false)},
utils.MetaAttributes: {Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("5s"), Static_ttl: utils.BoolPointer(false)},
utils.MetaResources: {Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("5s"), Static_ttl: utils.BoolPointer(false)},
utils.MetaStats: {Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("5s"), Static_ttl: utils.BoolPointer(false)},
utils.MetaThresholds: {Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("5s"), Static_ttl: utils.BoolPointer(false)},
utils.MetaRoutes: {Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("5s"), Static_ttl: utils.BoolPointer(false)},
utils.MetaChargers: {Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("5s"), Static_ttl: utils.BoolPointer(false)},
utils.MetaDispatchers: {Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("5s"), Static_ttl: utils.BoolPointer(false)},
utils.MetaDispatcherHosts: {Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("5s"), Static_ttl: utils.BoolPointer(false)},
utils.MetaRateProfiles: {Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("5s"), Static_ttl: utils.BoolPointer(false)},
utils.MetaActionProfiles: {Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("5s"), Static_ttl: utils.BoolPointer(false)},
utils.MetaAccounts: {Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("5s"), Static_ttl: utils.BoolPointer(false)},
},
},
}
dfCgrJSONCfg, err := NewCgrJsonCfgFromBytes([]byte(CGRATES_CFG_JSON))

File diff suppressed because one or more lines are too long

View File

@@ -92,12 +92,18 @@ func (ldrs LoaderSCfgs) Clone() *LoaderSCfgs {
return &cln
}
type LoaderSOptsCfg struct {
Cache string
WithIndex bool
ForceLock bool
StopOnError bool
}
// LoaderSCfg the config for a loader
type LoaderSCfg struct {
ID string
Enabled bool
Tenant string
DryRun bool
RunDelay time.Duration
LockFilePath string
CacheSConns []string
@@ -106,9 +112,9 @@ type LoaderSCfg struct {
TpOutDir string
Data []*LoaderDataType
Action string // toDO
Caching string // toDO
WithIndex bool // toDO
Action string
Opts *LoaderSOptsCfg
Cache map[string]*CacheParamCfg
}
// LoaderDataType the template for profile loading
@@ -120,6 +126,23 @@ type LoaderDataType struct {
Fields []*FCTemplate
}
func (l *LoaderSOptsCfg) loadFromJSONCfg(jsnCfg *LoaderJsonOptsCfg) {
if jsnCfg == nil {
return
}
if jsnCfg.Cache != nil {
l.Cache = *jsnCfg.Cache
}
if jsnCfg.WithIndex != nil {
l.WithIndex = *jsnCfg.WithIndex
}
if jsnCfg.ForceLock != nil {
l.ForceLock = *jsnCfg.ForceLock
}
if jsnCfg.StopOnError != nil {
l.StopOnError = *jsnCfg.StopOnError
}
}
func (lData *LoaderDataType) loadFromJSONCfg(jsnCfg *LoaderJsonDataType, msgTemplates map[string][]*FCTemplate, separator string) (err error) {
if jsnCfg == nil {
return nil
@@ -153,7 +176,6 @@ func (l *LoaderSCfg) loadFromJSONCfg(jsnCfg *LoaderJsonCfg, msgTemplates map[str
if jsnCfg == nil {
return nil
}
l.WithIndex = true
if jsnCfg.ID != nil {
l.ID = *jsnCfg.ID
}
@@ -163,9 +185,6 @@ func (l *LoaderSCfg) loadFromJSONCfg(jsnCfg *LoaderJsonCfg, msgTemplates map[str
if jsnCfg.Tenant != nil {
l.Tenant = *jsnCfg.Tenant
}
if jsnCfg.Dry_run != nil {
l.DryRun = *jsnCfg.Dry_run
}
if jsnCfg.Run_delay != nil {
if l.RunDelay, err = utils.ParseDurationWithNanosecs(*jsnCfg.Run_delay); err != nil {
return
@@ -215,7 +234,17 @@ func (l *LoaderSCfg) loadFromJSONCfg(jsnCfg *LoaderJsonCfg, msgTemplates map[str
}
}
}
if jsnCfg.Action != nil {
l.Action = *jsnCfg.Action
}
for kJsn, vJsn := range jsnCfg.Cache {
val := new(CacheParamCfg)
if err := val.loadFromJSONCfg(vJsn); err != nil {
return err
}
l.Cache[kJsn] = val
}
l.Opts.loadFromJSONCfg(jsnCfg.Opts)
return nil
}
@@ -251,7 +280,6 @@ func (l LoaderSCfg) Clone() (cln *LoaderSCfg) {
ID: l.ID,
Enabled: l.Enabled,
Tenant: l.Tenant,
DryRun: l.DryRun,
RunDelay: l.RunDelay,
LockFilePath: l.LockFilePath,
CacheSConns: utils.CloneStringSlice(l.CacheSConns),
@@ -259,10 +287,16 @@ func (l LoaderSCfg) Clone() (cln *LoaderSCfg) {
TpInDir: l.TpInDir,
TpOutDir: l.TpOutDir,
Data: make([]*LoaderDataType, len(l.Data)),
Action: l.Action,
Opts: &(*l.Opts),
Cache: make(map[string]*CacheParamCfg),
}
for idx, fld := range l.Data {
cln.Data[idx] = fld.Clone()
}
for key, value := range l.Cache {
cln.Cache[key] = value.Clone()
}
return
}
@@ -283,30 +317,43 @@ func (lData LoaderDataType) AsMapInterface(separator string) (initialMP map[stri
}
// AsMapInterface returns the config as a map[string]interface{}
func (l LoaderSCfg) AsMapInterface(separator string) (initialMP map[string]interface{}) {
initialMP = map[string]interface{}{
func (l LoaderSCfg) AsMapInterface(separator string) (mp map[string]interface{}) {
mp = map[string]interface{}{
utils.IDCfg: l.ID,
utils.TenantCfg: l.Tenant,
utils.EnabledCfg: l.Enabled,
utils.DryRunCfg: l.DryRun,
utils.LockFilePathCfg: l.LockFilePath,
utils.FieldSepCfg: l.FieldSeparator,
utils.TpInDirCfg: l.TpInDir,
utils.TpOutDirCfg: l.TpOutDir,
utils.RunDelayCfg: "0",
utils.ActionCfg: l.Action,
utils.OptsCfg: map[string]interface{}{
utils.MetaCache: l.Opts.Cache,
utils.MetaWithIndex: l.Opts.WithIndex,
utils.MetaForceLock: l.Opts.ForceLock,
utils.MetaStopOnError: l.Opts.StopOnError,
},
}
if l.Data != nil {
data := make([]map[string]interface{}, len(l.Data))
for i, item := range l.Data {
data[i] = item.AsMapInterface(separator)
}
initialMP[utils.DataCfg] = data
mp[utils.DataCfg] = data
}
if l.RunDelay != 0 {
initialMP[utils.RunDelayCfg] = l.RunDelay.String()
mp[utils.RunDelayCfg] = l.RunDelay.String()
}
if l.CacheSConns != nil {
initialMP[utils.CachesConnsCfg] = getInternalJSONConns(l.CacheSConns)
mp[utils.CachesConnsCfg] = getInternalJSONConns(l.CacheSConns)
}
if l.Cache != nil {
cache := make(map[string]interface{}, len(l.Cache))
for key, value := range l.Cache {
cache[key] = value.AsMapInterface()
}
mp[utils.CacheCfg] = cache
}
return
}
@@ -319,11 +366,17 @@ type LoaderJsonDataType struct {
Fields *[]*FcTemplateJsonCfg
}
type LoaderJsonOptsCfg struct {
Cache *string `json:"*cache"`
WithIndex *bool `json:"*withIndex"`
ForceLock *bool `json:"*forceLock"`
StopOnError *bool `json:"*stopOnError"`
}
type LoaderJsonCfg struct {
ID *string
Enabled *bool
Tenant *string
Dry_run *bool
Run_delay *string
Lockfile_path *string
Caches_conns *[]string
@@ -331,6 +384,10 @@ type LoaderJsonCfg struct {
Tp_in_dir *string
Tp_out_dir *string
Data *[]*LoaderJsonDataType
Action *string
Opts *LoaderJsonOptsCfg
Cache map[string]*CacheParamJsonCfg
}
func equalsLoaderDatasType(v1, v2 []*LoaderDataType) bool {
@@ -349,6 +406,22 @@ func equalsLoaderDatasType(v1, v2 []*LoaderDataType) bool {
return true
}
func diffLoaderJsonOptsCfg(v1, v2 *LoaderSOptsCfg) (d *LoaderJsonOptsCfg) {
d = new(LoaderJsonOptsCfg)
if v1.Cache != v2.Cache {
d.Cache = utils.StringPointer(v2.Cache)
}
if v1.WithIndex != v2.WithIndex {
d.WithIndex = utils.BoolPointer(v2.WithIndex)
}
if v1.ForceLock != v2.ForceLock {
d.ForceLock = utils.BoolPointer(v2.ForceLock)
}
if v1.StopOnError != v2.StopOnError {
d.StopOnError = utils.BoolPointer(v2.StopOnError)
}
return
}
func diffLoaderJsonCfg(v1, v2 *LoaderSCfg, separator string) (d *LoaderJsonCfg) {
d = new(LoaderJsonCfg)
if v1.ID != v2.ID {
@@ -357,13 +430,8 @@ func diffLoaderJsonCfg(v1, v2 *LoaderSCfg, separator string) (d *LoaderJsonCfg)
if v1.Enabled != v2.Enabled {
d.Enabled = utils.BoolPointer(v2.Enabled)
}
tnt1 := v1.Tenant
tnt2 := v2.Tenant
if tnt1 != tnt2 {
d.Tenant = utils.StringPointer(tnt2)
}
if v1.DryRun != v2.DryRun {
d.Dry_run = utils.BoolPointer(v2.DryRun)
if v1.Tenant != v2.Tenant {
d.Tenant = utils.StringPointer(v2.Tenant)
}
if v1.RunDelay != v2.RunDelay {
d.Run_delay = utils.StringPointer(v2.RunDelay.String())
@@ -398,6 +466,11 @@ func diffLoaderJsonCfg(v1, v2 *LoaderSCfg, separator string) (d *LoaderJsonCfg)
}
d.Data = &data
}
if v1.Action != v2.Action {
d.Action = utils.StringPointer(v2.Action)
}
d.Opts = diffLoaderJsonOptsCfg(v1.Opts, v2.Opts)
d.Cache = diffCacheParamsJsonCfg(d.Cache, v2.Cache)
return
}
@@ -409,14 +482,18 @@ func equalsLoadersJsonCfg(v1, v2 LoaderSCfgs) bool {
if v1[i].ID != v2[i].ID ||
v1[i].Enabled != v2[i].Enabled ||
v1[i].Tenant != v2[i].Tenant ||
v1[i].DryRun != v2[i].DryRun ||
v1[i].RunDelay != v2[i].RunDelay ||
v1[i].LockFilePath != v2[i].LockFilePath ||
!utils.SliceStringEqual(v1[i].CacheSConns, v2[i].CacheSConns) ||
v1[i].FieldSeparator != v2[i].FieldSeparator ||
v1[i].TpInDir != v2[i].TpInDir ||
v1[i].TpOutDir != v2[i].TpOutDir ||
!equalsLoaderDatasType(v1[i].Data, v2[i].Data) {
v1[i].Action != v2[i].Action ||
!equalsLoaderDatasType(v1[i].Data, v2[i].Data) ||
v1[i].Opts.Cache != v2[i].Opts.Cache ||
v1[i].Opts.WithIndex != v2[i].Opts.WithIndex ||
v1[i].Opts.ForceLock != v2[i].Opts.ForceLock ||
v1[i].Opts.StopOnError != v2[i].Opts.StopOnError {
return false
}
}

View File

@@ -67,6 +67,22 @@ func TestLoaderSCfgloadFromJsonCfgCase1(t *testing.T) {
FieldSeparator: ",",
TpInDir: "/var/spool/cgrates/loader/in",
TpOutDir: "/var/spool/cgrates/loader/out",
Action: utils.MetaStore,
Opts: &LoaderSOptsCfg{WithIndex: true},
Cache: map[string]*CacheParamCfg{
utils.MetaFilters: {Limit: -1, TTL: 5 * time.Second},
utils.MetaAttributes: {Limit: -1, TTL: 5 * time.Second},
utils.MetaResources: {Limit: -1, TTL: 5 * time.Second},
utils.MetaStats: {Limit: -1, TTL: 5 * time.Second},
utils.MetaThresholds: {Limit: -1, TTL: 5 * time.Second},
utils.MetaRoutes: {Limit: -1, TTL: 5 * time.Second},
utils.MetaChargers: {Limit: -1, TTL: 5 * time.Second},
utils.MetaDispatchers: {Limit: -1, TTL: 5 * time.Second},
utils.MetaDispatcherHosts: {Limit: -1, TTL: 5 * time.Second},
utils.MetaRateProfiles: {Limit: -1, TTL: 5 * time.Second},
utils.MetaActionProfiles: {Limit: -1, TTL: 5 * time.Second},
utils.MetaAccounts: {Limit: -1, TTL: 5 * time.Second},
},
Data: []*LoaderDataType{
{
Type: "*filters",
@@ -1517,7 +1533,6 @@ func TestLoaderCfgAsMapInterfaceCase1(t *testing.T) {
"id": "*default",
"enabled": false,
"tenant": "cgrates.org",
"dry_run": false,
"run_delay": "0",
"lockfile_path": ".cgr.lck",
"caches_conns": ["*internal:*caches"],
@@ -1543,13 +1558,19 @@ func TestLoaderCfgAsMapInterfaceCase1(t *testing.T) {
utils.IDCfg: "*default",
utils.EnabledCfg: false,
utils.TenantCfg: "cgrates.org",
utils.DryRunCfg: false,
utils.RunDelayCfg: "0",
utils.LockFilePathCfg: ".cgr.lck",
utils.CachesConnsCfg: []string{utils.MetaInternal},
utils.FieldSepCfg: ",",
utils.TpInDirCfg: "/var/spool/cgrates/loader/in",
utils.TpOutDirCfg: "/var/spool/cgrates/loader/out",
utils.ActionCfg: utils.MetaStore,
utils.OptsCfg: map[string]interface{}{
utils.MetaCache: "",
utils.MetaStopOnError: false,
utils.MetaForceLock: false,
utils.MetaWithIndex: true,
},
utils.DataCfg: []map[string]interface{}{
{
utils.TypeCfg: "*filters",
@@ -2445,6 +2466,20 @@ func TestLoaderCfgAsMapInterfaceCase1(t *testing.T) {
},
},
},
utils.CacheCfg: map[string]interface{}{
utils.MetaFilters: map[string]interface{}{utils.LimitCfg: -1, utils.TTLCfg: "5s", utils.PrecacheCfg: false, utils.ReplicateCfg: false, utils.StaticTTLCfg: false},
utils.MetaAttributes: map[string]interface{}{utils.LimitCfg: -1, utils.TTLCfg: "5s", utils.PrecacheCfg: false, utils.ReplicateCfg: false, utils.StaticTTLCfg: false},
utils.MetaResources: map[string]interface{}{utils.LimitCfg: -1, utils.TTLCfg: "5s", utils.PrecacheCfg: false, utils.ReplicateCfg: false, utils.StaticTTLCfg: false},
utils.MetaStats: map[string]interface{}{utils.LimitCfg: -1, utils.TTLCfg: "5s", utils.PrecacheCfg: false, utils.ReplicateCfg: false, utils.StaticTTLCfg: false},
utils.MetaThresholds: map[string]interface{}{utils.LimitCfg: -1, utils.TTLCfg: "5s", utils.PrecacheCfg: false, utils.ReplicateCfg: false, utils.StaticTTLCfg: false},
utils.MetaRoutes: map[string]interface{}{utils.LimitCfg: -1, utils.TTLCfg: "5s", utils.PrecacheCfg: false, utils.ReplicateCfg: false, utils.StaticTTLCfg: false},
utils.MetaChargers: map[string]interface{}{utils.LimitCfg: -1, utils.TTLCfg: "5s", utils.PrecacheCfg: false, utils.ReplicateCfg: false, utils.StaticTTLCfg: false},
utils.MetaDispatchers: map[string]interface{}{utils.LimitCfg: -1, utils.TTLCfg: "5s", utils.PrecacheCfg: false, utils.ReplicateCfg: false, utils.StaticTTLCfg: false},
utils.MetaDispatcherHosts: map[string]interface{}{utils.LimitCfg: -1, utils.TTLCfg: "5s", utils.PrecacheCfg: false, utils.ReplicateCfg: false, utils.StaticTTLCfg: false},
utils.MetaRateProfiles: map[string]interface{}{utils.LimitCfg: -1, utils.TTLCfg: "5s", utils.PrecacheCfg: false, utils.ReplicateCfg: false, utils.StaticTTLCfg: false},
utils.MetaActionProfiles: map[string]interface{}{utils.LimitCfg: -1, utils.TTLCfg: "5s", utils.PrecacheCfg: false, utils.ReplicateCfg: false, utils.StaticTTLCfg: false},
utils.MetaAccounts: map[string]interface{}{utils.LimitCfg: -1, utils.TTLCfg: "5s", utils.PrecacheCfg: false, utils.ReplicateCfg: false, utils.StaticTTLCfg: false},
},
},
}
if cfgCgr, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil {
@@ -2466,7 +2501,6 @@ func TestLoaderCfgAsMapInterfaceCase2(t *testing.T) {
"id": "*default",
"enabled": false,
"tenant": "~*req.Destination1",
"dry_run": false,
"run_delay": "1",
"lockfile_path": ".cgr.lck",
"caches_conns": ["*conn1"],
@@ -2491,7 +2525,6 @@ func TestLoaderCfgAsMapInterfaceCase2(t *testing.T) {
utils.IDCfg: "*default",
utils.EnabledCfg: false,
utils.TenantCfg: "~*req.Destination1",
utils.DryRunCfg: false,
utils.RunDelayCfg: "0",
utils.LockFilePathCfg: ".cgr.lck",
utils.CachesConnsCfg: []string{"*conn1"},
@@ -2554,6 +2587,21 @@ func TestLoaderSCfgsClone(t *testing.T) {
},
}},
},
Opts: &LoaderSOptsCfg{},
Cache: map[string]*CacheParamCfg{
utils.MetaFilters: {Limit: -1, TTL: 5 * time.Second},
utils.MetaAttributes: {Limit: -1, TTL: 5 * time.Second},
utils.MetaResources: {Limit: -1, TTL: 5 * time.Second},
utils.MetaStats: {Limit: -1, TTL: 5 * time.Second},
utils.MetaThresholds: {Limit: -1, TTL: 5 * time.Second},
utils.MetaRoutes: {Limit: -1, TTL: 5 * time.Second},
utils.MetaChargers: {Limit: -1, TTL: 5 * time.Second},
utils.MetaDispatchers: {Limit: -1, TTL: 5 * time.Second},
utils.MetaDispatcherHosts: {Limit: -1, TTL: 5 * time.Second},
utils.MetaRateProfiles: {Limit: -1, TTL: 5 * time.Second},
utils.MetaActionProfiles: {Limit: -1, TTL: 5 * time.Second},
utils.MetaAccounts: {Limit: -1, TTL: 5 * time.Second},
},
}}
rcv := ban.Clone()
if !reflect.DeepEqual(ban, *rcv) {
@@ -2625,7 +2673,6 @@ func TestDiffLoaderJsonCfg(t *testing.T) {
ID: "LoaderID",
Enabled: true,
Tenant: "cgrates.org",
DryRun: false,
RunDelay: 1 * time.Millisecond,
LockFilePath: "lockFileName",
CacheSConns: []string{"*localhost"},
@@ -2633,13 +2680,13 @@ func TestDiffLoaderJsonCfg(t *testing.T) {
TpInDir: "/tp/in/dir",
TpOutDir: "/tp/out/dir",
Data: nil,
Opts: &LoaderSOptsCfg{},
}
v2 := &LoaderSCfg{
ID: "LoaderID2",
Enabled: false,
Tenant: "itsyscom.com",
DryRun: true,
RunDelay: 2 * time.Millisecond,
LockFilePath: "lockFileName2",
CacheSConns: []string{"*birpc"},
@@ -2663,13 +2710,13 @@ func TestDiffLoaderJsonCfg(t *testing.T) {
},
},
},
Opts: &LoaderSOptsCfg{},
}
expected := &LoaderJsonCfg{
ID: utils.StringPointer("LoaderID2"),
Enabled: utils.BoolPointer(false),
Tenant: utils.StringPointer("itsyscom.com"),
Dry_run: utils.BoolPointer(true),
Run_delay: utils.StringPointer("2ms"),
Lockfile_path: utils.StringPointer("lockFileName2"),
Caches_conns: &[]string{"*birpc"},
@@ -2678,6 +2725,7 @@ func TestDiffLoaderJsonCfg(t *testing.T) {
Tp_out_dir: utils.StringPointer("/tp/out/dir/2"),
Data: &[]*LoaderJsonDataType{
{
Id: utils.StringPointer(""),
Type: utils.StringPointer("*xml"),
File_name: utils.StringPointer("file.xml"),
Flags: &[]string{"FLAG_2:PARAM_2:param2"},
@@ -2690,6 +2738,8 @@ func TestDiffLoaderJsonCfg(t *testing.T) {
},
},
},
Opts: &LoaderJsonOptsCfg{},
Cache: map[string]*CacheParamJsonCfg{},
}
rcv := diffLoaderJsonCfg(v1, v2, ";")
@@ -2698,7 +2748,7 @@ func TestDiffLoaderJsonCfg(t *testing.T) {
}
v1 = v2
expected = &LoaderJsonCfg{}
expected = &LoaderJsonCfg{Opts: &LoaderJsonOptsCfg{}, Cache: make(map[string]*CacheParamJsonCfg)}
rcv = diffLoaderJsonCfg(v1, v2, ";")
if !reflect.DeepEqual(rcv, expected) {
t.Errorf("Expected %v \n but received \n %v", utils.ToJSON(expected), utils.ToJSON(rcv))
@@ -2712,7 +2762,6 @@ func TestEqualsLoadersJsonCfg(t *testing.T) {
ID: "LoaderID",
Enabled: true,
Tenant: "cgrates.org",
DryRun: false,
RunDelay: 1 * time.Millisecond,
LockFilePath: "lockFileName",
CacheSConns: []string{"*localhost"},
@@ -2720,6 +2769,7 @@ func TestEqualsLoadersJsonCfg(t *testing.T) {
TpInDir: "/tp/in/dir",
TpOutDir: "/tp/out/dir",
Data: nil,
Opts: &LoaderSOptsCfg{},
},
}
@@ -2728,7 +2778,6 @@ func TestEqualsLoadersJsonCfg(t *testing.T) {
ID: "LoaderID2",
Enabled: false,
Tenant: "cgrates.org",
DryRun: true,
RunDelay: 2 * time.Millisecond,
LockFilePath: "lockFileName2",
CacheSConns: []string{"*birpc"},
@@ -2752,6 +2801,7 @@ func TestEqualsLoadersJsonCfg(t *testing.T) {
},
},
},
Opts: &LoaderSOptsCfg{},
},
}
@@ -2778,7 +2828,6 @@ func TestDiffLoadersJsonCfg(t *testing.T) {
ID: "LoaderID",
Enabled: false,
Tenant: "cgrates.org",
DryRun: false,
RunDelay: 1 * time.Millisecond,
LockFilePath: "lockFileName",
CacheSConns: []string{"*localhost"},
@@ -2786,6 +2835,7 @@ func TestDiffLoadersJsonCfg(t *testing.T) {
TpInDir: "/tp/in/dir",
TpOutDir: "/tp/out/dir",
Data: nil,
Opts: &LoaderSOptsCfg{},
},
}
@@ -2794,7 +2844,6 @@ func TestDiffLoadersJsonCfg(t *testing.T) {
ID: "LoaderID2",
Enabled: true,
Tenant: "itsyscom.com",
DryRun: true,
RunDelay: 2 * time.Millisecond,
LockFilePath: "lockFileName2",
CacheSConns: []string{"*birpc"},
@@ -2818,6 +2867,7 @@ func TestDiffLoadersJsonCfg(t *testing.T) {
},
},
},
Opts: &LoaderSOptsCfg{},
},
}
@@ -2826,7 +2876,6 @@ func TestDiffLoadersJsonCfg(t *testing.T) {
ID: utils.StringPointer("LoaderID2"),
Enabled: utils.BoolPointer(true),
Tenant: utils.StringPointer("itsyscom.com"),
Dry_run: utils.BoolPointer(true),
Run_delay: utils.StringPointer("2ms"),
Lockfile_path: utils.StringPointer("lockFileName2"),
Caches_conns: &[]string{"*birpc"},
@@ -2835,6 +2884,7 @@ func TestDiffLoadersJsonCfg(t *testing.T) {
Tp_out_dir: utils.StringPointer("/tp/out/dir/2"),
Data: &[]*LoaderJsonDataType{
{
Id: utils.StringPointer(""),
Type: utils.StringPointer("*xml"),
File_name: utils.StringPointer("file.xml"),
Flags: &[]string{"FLAG_2:PARAM_2:param2"},
@@ -2847,6 +2897,9 @@ func TestDiffLoadersJsonCfg(t *testing.T) {
},
},
},
Action: utils.StringPointer(""),
Opts: &LoaderJsonOptsCfg{WithIndex: utils.BoolPointer(false)},
Cache: map[string]*CacheParamJsonCfg{},
},
}
@@ -2874,7 +2927,6 @@ func TestLockFolderRelativePath(t *testing.T) {
ID: utils.StringPointer("loaderid"),
Enabled: utils.BoolPointer(true),
Tenant: utils.StringPointer("cgrates.org"),
Dry_run: utils.BoolPointer(false),
Lockfile_path: utils.StringPointer(utils.ResourcesCsv),
Field_separator: utils.StringPointer(utils.InfieldSep),
Tp_in_dir: utils.StringPointer("/var/spool/cgrates/loader/in/"),
@@ -2898,7 +2950,6 @@ func TestLockFolderNonRelativePath(t *testing.T) {
ID: utils.StringPointer("loaderid"),
Enabled: utils.BoolPointer(true),
Tenant: utils.StringPointer("cgrates.org"),
Dry_run: utils.BoolPointer(false),
Lockfile_path: utils.StringPointer(path.Join("/tmp/", utils.ResourcesCsv)),
Field_separator: utils.StringPointer(utils.InfieldSep),
Tp_in_dir: utils.StringPointer("/var/spool/cgrates/loader/in/"),
@@ -2921,7 +2972,6 @@ func TestLockFolderIsDir(t *testing.T) {
ID: utils.StringPointer("loaderid"),
Enabled: utils.BoolPointer(true),
Tenant: utils.StringPointer("cgrates.org"),
Dry_run: utils.BoolPointer(false),
Lockfile_path: utils.StringPointer("/tmp"),
Field_separator: utils.StringPointer(utils.InfieldSep),
Tp_in_dir: utils.StringPointer("/var/spool/cgrates/loader/in/"),
@@ -2942,7 +2992,6 @@ func TestLoaderSCloneSection(t *testing.T) {
ID: "LoaderID",
Enabled: false,
Tenant: "cgrates.org",
DryRun: false,
RunDelay: 1 * time.Millisecond,
LockFilePath: "lockFileName",
CacheSConns: []string{"*localhost"},
@@ -2965,6 +3014,7 @@ func TestLoaderSCloneSection(t *testing.T) {
},
},
},
Opts: &LoaderSOptsCfg{},
},
}
@@ -2973,7 +3023,6 @@ func TestLoaderSCloneSection(t *testing.T) {
ID: "LoaderID",
Enabled: false,
Tenant: "cgrates.org",
DryRun: false,
RunDelay: 1 * time.Millisecond,
LockFilePath: "lockFileName",
CacheSConns: []string{"*localhost"},
@@ -2996,6 +3045,8 @@ func TestLoaderSCloneSection(t *testing.T) {
},
},
},
Opts: &LoaderSOptsCfg{},
Cache: make(map[string]*CacheParamCfg),
},
}

View File

@@ -37,40 +37,7 @@ const (
gprefix = utils.MetaGoogleAPI + utils.ConcatenatedKeySep
)
func removeFromDB(ctx *context.Context, dm *engine.DataManager, lType, tnt, id, ldrID string, dryRun, withIndex, ratesPartial bool, ratesData utils.MapStorage) (_ error) {
if dryRun {
var logID string
switch lType {
case utils.MetaAttributes:
logID = "AttributeProfile"
case utils.MetaResources:
logID = "ResourceProfile"
case utils.MetaFilters:
logID = "Filter"
case utils.MetaStats:
logID = "StatsQueueProfile"
case utils.MetaThresholds:
logID = "ThresholdProfile"
case utils.MetaRoutes:
logID = "RouteProfile"
case utils.MetaChargers:
logID = "ChargerProfile"
case utils.MetaDispatchers:
logID = "DispatcherProfile"
case utils.MetaDispatcherHosts:
logID = "DispatcherHost"
case utils.MetaRateProfiles:
logID = "RateProfile"
case utils.MetaActionProfiles:
logID = "ActionProfil"
case utils.MetaAccounts:
logID = "Account"
}
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: %sID: %s",
utils.LoaderS, ldrID, logID, utils.ConcatenatedKey(tnt, id)))
return
}
func removeFromDB(ctx *context.Context, dm *engine.DataManager, lType, tnt, id string, withIndex, ratesPartial bool, ratesData utils.MapStorage) (_ error) {
switch lType {
case utils.MetaAttributes:
return dm.RemoveAttributeProfile(ctx, tnt, id, withIndex)
@@ -107,7 +74,7 @@ func removeFromDB(ctx *context.Context, dm *engine.DataManager, lType, tnt, id,
return
}
func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID string, lDataSet []utils.MapStorage, dryRun, withIndex, ratesPartial bool) (err error) {
func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz string, lDataSet []utils.MapStorage, withIndex, ratesPartial bool) (err error) {
switch lType {
case utils.MetaAttributes:
attrModels := make(engine.AttributeMdls, len(lDataSet))
@@ -122,12 +89,6 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
if apf, err = engine.APItoAttributeProfile(tpApf, tmz); err != nil {
return
}
if dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: AttributeProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(apf)))
continue
}
if err = dm.SetAttributeProfile(ctx, apf, withIndex); err != nil {
return
}
@@ -145,12 +106,6 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
if res, err = engine.APItoResource(tpRes, tmz); err != nil {
return
}
if dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ResourceProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(res)))
continue
}
if err = dm.SetResourceProfile(ctx, res, withIndex); err != nil {
return
}
@@ -169,12 +124,6 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
if fltrPrf, err = engine.APItoFilter(tpFltr, tmz); err != nil {
return
}
if dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: Filter: %s",
utils.LoaderS, ldrID, utils.ToJSON(fltrPrf)))
continue
}
if err = dm.SetFilter(ctx, fltrPrf, withIndex); err != nil {
return
}
@@ -192,12 +141,6 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
if stsPrf, err = engine.APItoStats(tpSts, tmz); err != nil {
return
}
if dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: StatsQueueProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(stsPrf)))
continue
}
if err = dm.SetStatQueueProfile(ctx, stsPrf, withIndex); err != nil {
return
}
@@ -215,12 +158,6 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
if thPrf, err = engine.APItoThresholdProfile(tpTh, tmz); err != nil {
return
}
if dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ThresholdProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(thPrf)))
continue
}
if err = dm.SetThresholdProfile(ctx, thPrf, withIndex); err != nil {
return
}
@@ -239,12 +176,6 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
if spPrf, err = engine.APItoRouteProfile(tpSpp, tmz); err != nil {
return
}
if dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: RouteProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(spPrf)))
continue
}
if err = dm.SetRouteProfile(ctx, spPrf, withIndex); err != nil {
return
}
@@ -263,12 +194,6 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
if cpp, err = engine.APItoChargerProfile(tpCPP, tmz); err != nil {
return
}
if dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ChargerProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(cpp)))
continue
}
if err = dm.SetChargerProfile(ctx, cpp, withIndex); err != nil {
return
}
@@ -286,12 +211,6 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
if dsp, err = engine.APItoDispatcherProfile(tpDsp, tmz); err != nil {
return
}
if dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(dsp)))
continue
}
if err = dm.SetDispatcherProfile(ctx, dsp, withIndex); err != nil {
return
}
@@ -310,12 +229,6 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
}
for _, tpDsp := range tpDsps {
dsp := engine.APItoDispatcherHost(tpDsp)
if dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherHost: %s",
utils.LoaderS, ldrID, utils.ToJSON(dsp)))
continue
}
if err = dm.SetDispatcherHost(ctx, dsp); err != nil {
return
}
@@ -333,12 +246,6 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
if rpl, err = engine.APItoRateProfile(tpRpl, tmz); err != nil {
return
}
if dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: RateProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(rpl)))
continue
}
if ratesPartial {
err = dm.SetRateProfileRates(ctx, rpl, true)
} else {
@@ -362,12 +269,6 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
if acp, err = engine.APItoActionProfile(tpAcp, tmz); err != nil {
return
}
if dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ActionProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(acp)))
continue
}
if err = dm.SetActionProfile(ctx, acp, true); err != nil {
return
}
@@ -389,12 +290,6 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
if acp, err = engine.APItoAccount(tpAcp, tmz); err != nil {
return
}
if dryRun {
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: Accounts: %s",
utils.LoaderS, ldrID, utils.ToJSON(acp)))
continue
}
if err = dm.SetAccount(ctx, acp, true); err != nil {
return
}
@@ -403,6 +298,226 @@ func setToDB(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID str
return
}
func dryRun(ctx *context.Context, dm *engine.DataManager, lType, tmz, ldrID string, lDataSet []utils.MapStorage) (err error) {
switch lType {
case utils.MetaAttributes:
attrModels := make(engine.AttributeMdls, len(lDataSet))
for i, ld := range lDataSet {
attrModels[i] = new(engine.AttributeMdl)
if err = utils.UpdateStructWithIfaceMap(attrModels[i], ld); err != nil {
return
}
}
for _, tpApf := range attrModels.AsTPAttributes() {
var apf *engine.AttributeProfile
if apf, err = engine.APItoAttributeProfile(tpApf, tmz); err != nil {
return
}
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: AttributeProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(apf)))
}
case utils.MetaResources:
resModels := make(engine.ResourceMdls, len(lDataSet))
for i, ld := range lDataSet {
resModels[i] = new(engine.ResourceMdl)
if err = utils.UpdateStructWithIfaceMap(resModels[i], ld); err != nil {
return
}
}
for _, tpRes := range resModels.AsTPResources() {
var res *engine.ResourceProfile
if res, err = engine.APItoResource(tpRes, tmz); err != nil {
return
}
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ResourceProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(res)))
}
case utils.MetaFilters:
fltrModels := make(engine.FilterMdls, len(lDataSet))
for i, ld := range lDataSet {
fltrModels[i] = new(engine.FilterMdl)
if err = utils.UpdateStructWithIfaceMap(fltrModels[i], ld); err != nil {
return
}
}
for _, tpFltr := range fltrModels.AsTPFilter() {
var fltrPrf *engine.Filter
if fltrPrf, err = engine.APItoFilter(tpFltr, tmz); err != nil {
return
}
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: Filter: %s",
utils.LoaderS, ldrID, utils.ToJSON(fltrPrf)))
}
case utils.MetaStats:
stsModels := make(engine.StatMdls, len(lDataSet))
for i, ld := range lDataSet {
stsModels[i] = new(engine.StatMdl)
if err = utils.UpdateStructWithIfaceMap(stsModels[i], ld); err != nil {
return
}
}
for _, tpSts := range stsModels.AsTPStats() {
var stsPrf *engine.StatQueueProfile
if stsPrf, err = engine.APItoStats(tpSts, tmz); err != nil {
return
}
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: StatsQueueProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(stsPrf)))
}
case utils.MetaThresholds:
thModels := make(engine.ThresholdMdls, len(lDataSet))
for i, ld := range lDataSet {
thModels[i] = new(engine.ThresholdMdl)
if err = utils.UpdateStructWithIfaceMap(thModels[i], ld); err != nil {
return
}
}
for _, tpTh := range thModels.AsTPThreshold() {
var thPrf *engine.ThresholdProfile
if thPrf, err = engine.APItoThresholdProfile(tpTh, tmz); err != nil {
return
}
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ThresholdProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(thPrf)))
}
case utils.MetaRoutes:
sppModels := make(engine.RouteMdls, len(lDataSet))
for i, ld := range lDataSet {
sppModels[i] = new(engine.RouteMdl)
if err = utils.UpdateStructWithIfaceMap(sppModels[i], ld); err != nil {
return
}
}
for _, tpSpp := range sppModels.AsTPRouteProfile() {
var spPrf *engine.RouteProfile
if spPrf, err = engine.APItoRouteProfile(tpSpp, tmz); err != nil {
return
}
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: RouteProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(spPrf)))
}
case utils.MetaChargers:
cppModels := make(engine.ChargerMdls, len(lDataSet))
for i, ld := range lDataSet {
cppModels[i] = new(engine.ChargerMdl)
if err = utils.UpdateStructWithIfaceMap(cppModels[i], ld); err != nil {
return
}
}
for _, tpCPP := range cppModels.AsTPChargers() {
var cpp *engine.ChargerProfile
if cpp, err = engine.APItoChargerProfile(tpCPP, tmz); err != nil {
return
}
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ChargerProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(cpp)))
}
case utils.MetaDispatchers:
dispModels := make(engine.DispatcherProfileMdls, len(lDataSet))
for i, ld := range lDataSet {
dispModels[i] = new(engine.DispatcherProfileMdl)
if err = utils.UpdateStructWithIfaceMap(dispModels[i], ld); err != nil {
return
}
}
for _, tpDsp := range dispModels.AsTPDispatcherProfiles() {
var dsp *engine.DispatcherProfile
if dsp, err = engine.APItoDispatcherProfile(tpDsp, tmz); err != nil {
return
}
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(dsp)))
}
case utils.MetaDispatcherHosts:
dispModels := make(engine.DispatcherHostMdls, len(lDataSet))
for i, ld := range lDataSet {
dispModels[i] = new(engine.DispatcherHostMdl)
if err = utils.UpdateStructWithIfaceMap(dispModels[i], ld); err != nil {
return
}
}
var tpDsps []*utils.TPDispatcherHost
if tpDsps, err = dispModels.AsTPDispatcherHosts(); err != nil {
return
}
for _, tpDsp := range tpDsps {
dsp := engine.APItoDispatcherHost(tpDsp)
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherHost: %s",
utils.LoaderS, ldrID, utils.ToJSON(dsp)))
}
case utils.MetaRateProfiles:
rpMdls := make(engine.RateProfileMdls, len(lDataSet))
for i, ld := range lDataSet {
rpMdls[i] = new(engine.RateProfileMdl)
if err = utils.UpdateStructWithIfaceMap(rpMdls[i], ld); err != nil {
return
}
}
for _, tpRpl := range rpMdls.AsTPRateProfile() {
var rpl *utils.RateProfile
if rpl, err = engine.APItoRateProfile(tpRpl, tmz); err != nil {
return
}
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: RateProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(rpl)))
}
case utils.MetaActionProfiles:
acpsModels := make(engine.ActionProfileMdls, len(lDataSet))
for i, ld := range lDataSet {
acpsModels[i] = new(engine.ActionProfileMdl)
if err = utils.UpdateStructWithIfaceMap(acpsModels[i], ld); err != nil {
return
}
}
for _, tpAcp := range acpsModels.AsTPActionProfile() {
var acp *engine.ActionProfile
if acp, err = engine.APItoActionProfile(tpAcp, tmz); err != nil {
return
}
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: ActionProfile: %s",
utils.LoaderS, ldrID, utils.ToJSON(acp)))
}
case utils.MetaAccounts:
acpsModels := make(engine.AccountMdls, len(lDataSet))
for i, ld := range lDataSet {
acpsModels[i] = new(engine.AccountMdl)
if err = utils.UpdateStructWithIfaceMap(acpsModels[i], ld); err != nil {
return
}
}
var accountTPModels []*utils.TPAccount
if accountTPModels, err = acpsModels.AsTPAccount(); err != nil {
return
}
for _, tpAcp := range accountTPModels {
var acp *utils.Account
if acp, err = engine.APItoAccount(tpAcp, tmz); err != nil {
return
}
utils.Logger.Info(
fmt.Sprintf("<%s-%s> DRY_RUN: Accounts: %s",
utils.LoaderS, ldrID, utils.ToJSON(acp)))
}
}
return
}
func newLoader(cfg *config.CGRConfig, ldrCfg *config.LoaderSCfg, dm *engine.DataManager, dataCache map[string]*ltcache.Cache,
timezone string, filterS *engine.FilterS, connMgr *engine.ConnManager, cacheConns []string) *loader {
return &loader{
@@ -431,20 +546,20 @@ type loader struct {
Locker
}
func (l *loader) process(ctx *context.Context, tntID *utils.TenantID, lDataSet []utils.MapStorage, lType, action, caching string, dryRun, withIndex, partialRates, partial bool) (err error) {
if partial { // do not set in DB; ToDo: how to determine if is cache or not
return
}
func (l *loader) process(ctx *context.Context, tntID *utils.TenantID, lDataSet []utils.MapStorage, lType, action, caching string, withIndex, partialRates bool) (err error) {
switch action {
case utils.MetaParse:
return
case utils.MetaDryRun:
return dryRun(ctx, l.dm, lType, l.timezone, l.ldrCfg.ID, lDataSet)
case utils.MetaStore:
err = setToDB(ctx, l.dm, lType, l.timezone, l.ldrCfg.ID, lDataSet, dryRun, withIndex, partialRates)
err = setToDB(ctx, l.dm, lType, l.timezone, lDataSet, withIndex, partialRates)
case utils.MetaRemove:
err = removeFromDB(ctx, l.dm, lType, tntID.Tenant, tntID.ID, l.ldrCfg.ID, dryRun, withIndex, partialRates, lDataSet[0])
err = removeFromDB(ctx, l.dm, lType, tntID.Tenant, tntID.ID, withIndex, partialRates, lDataSet[0])
default:
err = fmt.Errorf("unsupported loader action: <%q>", action)
return fmt.Errorf("unsupported loader action: <%q>", action)
}
if err != nil || dryRun ||
len(l.cacheConns) == 0 {
if err != nil || len(l.cacheConns) == 0 {
return
}
cacheArgs := make(map[string][]string)
@@ -494,7 +609,7 @@ func (l *loader) process(ctx *context.Context, tntID *utils.TenantID, lDataSet [
return engine.CallCache(l.connMgr, ctx, l.cacheConns, caching, cacheArgs, cacheIDs, nil, false, l.ldrCfg.Tenant)
}
func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*config.FCTemplate, lType, action, caching string, dryRun, withIndex, partialRates, partial bool) (err error) {
func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*config.FCTemplate, lType, action, caching string, withIndex, partialRates bool) (err error) {
var prevTntID *utils.TenantID
var lData []utils.MapStorage
for lineNr := 1; ; lineNr++ {
@@ -518,7 +633,7 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi
tntID := TenantIDFromMap(data)
if !prevTntID.Equal(tntID) {
if prevTntID != nil {
if err = l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates, partial); err != nil {
if err = l.process(ctx, prevTntID, lData, lType, action, caching, withIndex, partialRates); err != nil {
return
}
}
@@ -527,10 +642,10 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi
}
lData = append(lData, data)
}
return l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates, partial)
return l.process(ctx, prevTntID, lData, lType, action, caching, withIndex, partialRates)
}
func (l *loader) processFile(ctx *context.Context, cfg *config.LoaderDataType, inPath, outPath, action, caching string, dryRun, withIndex bool) (err error) {
func (l *loader) processFile(ctx *context.Context, cfg *config.LoaderDataType, inPath, outPath, action, caching string, withIndex bool) (err error) {
csvType := utils.MetaFileCSV
switch {
case strings.HasPrefix(inPath, gprefix):
@@ -545,7 +660,7 @@ func (l *loader) processFile(ctx *context.Context, cfg *config.LoaderDataType, i
}
defer csv.Close()
if err = l.processData(ctx, csv, cfg.Fields, cfg.Type, action, caching,
dryRun, withIndex, cfg.Flags.GetBool(utils.PartialRatesOpt), cfg.Flags.GetBool(utils.PartialOpt)); err != nil || // encounterd error
withIndex, cfg.Flags.GetBool(utils.PartialRatesOpt)); err != nil || // encounterd error
outPath == utils.EmptyString || // or no moving
csvType != utils.MetaFileCSV { // or the type can not be moved(e.g. url)
return
@@ -565,6 +680,9 @@ func (l *loader) getCfg(fileName string) (cfg *config.LoaderDataType) {
func (l *loader) processIFile(_, fileName string) (err error) {
cfg := l.getCfg(fileName)
if cfg == nil {
if pathIn := path.Join(l.ldrCfg.TpInDir, fileName); l.IsLockFile(pathIn) && len(l.ldrCfg.TpOutDir) != 0 {
err = os.Rename(pathIn, path.Join(l.ldrCfg.TpOutDir, fileName))
}
return
}
@@ -572,35 +690,36 @@ func (l *loader) processIFile(_, fileName string) (err error) {
return
}
defer l.Unlock()
return l.processFile(context.Background(), cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, l.ldrCfg.Action, l.ldrCfg.Caching, l.ldrCfg.DryRun, l.ldrCfg.WithIndex)
return l.processFile(context.Background(), cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, l.ldrCfg.Action, l.ldrCfg.Opts.Cache, l.ldrCfg.Opts.WithIndex)
}
func (l *loader) processFolder(ctx *context.Context, action, caching string, dryRun, withIndex, stopOnError bool) (err error) {
func (l *loader) processFolder(ctx *context.Context, caching string, withIndex, stopOnError bool) (err error) {
if err = l.Lock(); err != nil {
return
}
defer l.Unlock()
proces := func(i int) (err error) {
cfg := l.ldrCfg.Data[i]
if err = l.processFile(ctx, cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, action, caching, dryRun, withIndex); err != nil && !stopOnError {
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s",
utils.LoaderS, l.ldrCfg.ID, cfg.Type, err))
err = nil
for _, cfg := range l.ldrCfg.Data {
if err = l.processFile(ctx, cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, l.ldrCfg.Action, caching, withIndex); err != nil {
if !stopOnError {
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s",
utils.LoaderS, l.ldrCfg.ID, cfg.Type, err))
err = nil
}
return
}
return
}
switch action {
case utils.MetaStore:
for i := range l.ldrCfg.Data {
if err = proces(i); err != nil {
return
}
if len(l.ldrCfg.TpOutDir) != 0 {
var fs []os.DirEntry
if fs, err = os.ReadDir(l.ldrCfg.TpInDir); err != nil {
return
}
case utils.MetaRemove:
for i := len(l.ldrCfg.Data) - 1; i >= 0; i-- {
if err = proces(i); err != nil {
return
for _, f := range fs {
if pathIn := path.Join(l.ldrCfg.TpInDir, f.Name()); !l.IsLockFile(pathIn) {
if err = os.Rename(pathIn, path.Join(l.ldrCfg.TpOutDir, f.Name())); err != nil {
return
}
}
}
}
return
@@ -608,7 +727,7 @@ func (l *loader) processFolder(ctx *context.Context, action, caching string, dry
func (l *loader) handleFolder(stopChan chan struct{}) {
for {
go l.processFolder(context.Background(), l.ldrCfg.Action, l.ldrCfg.Caching, l.ldrCfg.DryRun, l.ldrCfg.WithIndex, false)
go l.processFolder(context.Background(), l.ldrCfg.Opts.Cache, l.ldrCfg.Opts.WithIndex, false)
timer := time.NewTimer(l.ldrCfg.RunDelay)
select {
case <-stopChan:

View File

@@ -34,9 +34,8 @@ func NewLoaderService(cfg *config.CGRConfig, dm *engine.DataManager,
timezone string, filterS *engine.FilterS,
connMgr *engine.ConnManager) (ldrS *LoaderService) {
ldrS = &LoaderService{cfg: cfg, cache: make(map[string]*ltcache.Cache)}
for i := range cfg.LoaderCfg()[0].Data {
cfg := cfg.LoaderCfg()[0].Data[i]
ldrS.cache[cfg.Type] = ltcache.NewCache(-1, 0, false, nil)
for k, cfg := range cfg.LoaderCfg()[0].Cache {
ldrS.cache[k] = ltcache.NewCache(cfg.Limit, cfg.TTL, cfg.StaticTTL, nil)
}
ldrS.createLoaders(dm, timezone, filterS, connMgr)
return
@@ -66,20 +65,58 @@ func (ldrS *LoaderService) ListenAndServe(stopChan chan struct{}) (err error) {
}
type ArgsProcessFolder struct {
LoaderID string
ForceLock bool
Caching *string
StopOnError bool
LoaderID string
APIOpts map[string]interface{}
}
func (ldrS *LoaderService) V1Load(ctx *context.Context, args *ArgsProcessFolder,
func (ldrS *LoaderService) V1Run(ctx *context.Context, args *ArgsProcessFolder,
rply *string) (err error) {
return ldrS.process(ctx, args, utils.MetaStore, rply)
}
ldrS.RLock()
defer ldrS.RUnlock()
func (ldrS *LoaderService) V1Remove(ctx *context.Context, args *ArgsProcessFolder,
rply *string) (err error) {
return ldrS.process(ctx, args, utils.MetaRemove, rply)
if args.LoaderID == utils.EmptyString {
args.LoaderID = utils.MetaDefault
}
ldr, has := ldrS.ldrs[args.LoaderID]
if !has {
return fmt.Errorf("UNKNOWN_LOADER: %s", args.LoaderID)
}
var locked bool
if locked, err = ldr.Locked(); err != nil {
return utils.NewErrServerError(err)
} else if locked {
fl := ldr.ldrCfg.Opts.ForceLock
if val, has := args.APIOpts[utils.MetaForceLock]; has {
if fl, err = utils.IfaceAsBool(val); err != nil {
return
}
}
if !fl {
return errors.New("ANOTHER_LOADER_RUNNING")
}
if err := ldr.Unlock(); err != nil {
return utils.NewErrServerError(err)
}
}
wI := ldr.ldrCfg.Opts.WithIndex
if val, has := args.APIOpts[utils.MetaWithIndex]; has {
if wI, err = utils.IfaceAsBool(val); err != nil {
return
}
}
soE := ldr.ldrCfg.Opts.StopOnError
if val, has := args.APIOpts[utils.MetaStopOnError]; has {
if soE, err = utils.IfaceAsBool(val); err != nil {
return
}
}
if err := ldr.processFolder(context.Background(), utils.FirstNonEmpty(utils.IfaceAsString(args.APIOpts[utils.MetaCache]), ldr.ldrCfg.Opts.Cache, ldrS.cfg.GeneralCfg().DefaultCaching),
wI, soE); err != nil {
return utils.NewErrServerError(err)
}
*rply = utils.OK
return
}
// Reload recreates the loaders map thread safe
@@ -100,38 +137,3 @@ func (ldrS *LoaderService) createLoaders(dm *engine.DataManager,
}
}
}
func (ldrS *LoaderService) process(ctx *context.Context, args *ArgsProcessFolder, action string,
rply *string) (err error) {
ldrS.RLock()
defer ldrS.RUnlock()
if args.LoaderID == utils.EmptyString {
args.LoaderID = utils.MetaDefault
}
ldr, has := ldrS.ldrs[args.LoaderID]
if !has {
return fmt.Errorf("UNKNOWN_LOADER: %s", args.LoaderID)
}
if locked, err := ldr.Locked(); err != nil {
return utils.NewErrServerError(err)
} else if locked {
if !args.ForceLock {
return errors.New("ANOTHER_LOADER_RUNNING")
}
if err := ldr.Unlock(); err != nil {
return utils.NewErrServerError(err)
}
}
//verify If Caching is present in arguments
caching := utils.FirstNonEmpty(ldr.ldrCfg.Caching, ldrS.cfg.GeneralCfg().DefaultCaching)
if args.Caching != nil {
caching = *args.Caching
}
if err := ldr.processFolder(context.Background(), action, caching, false, ldr.ldrCfg.WithIndex, args.StopOnError); err != nil {
return utils.NewErrServerError(err)
}
*rply = utils.OK
return
}

View File

@@ -29,6 +29,7 @@ type Locker interface {
Lock() error
Unlock() error
Locked() (bool, error)
IsLockFile(string) bool
}
func newLocker(path string) Locker {
@@ -63,10 +64,12 @@ func (fl folderLock) Locked() (lk bool, err error) {
lk = true
return
}
func (fl folderLock) IsLockFile(path string) bool { return path == string(fl) }
type nopLock struct{}
// lockFolder will attempt to lock the folder by creating the lock file
func (nopLock) Lock() (_ error) { return }
func (nopLock) Unlock() (_ error) { return }
func (nopLock) Locked() (_ bool, _ error) { return }
func (nopLock) Lock() (_ error) { return }
func (nopLock) Unlock() (_ error) { return }
func (nopLock) Locked() (_ bool, _ error) { return }
func (nopLock) IsLockFile(string) (_ bool) { return }

View File

@@ -145,10 +145,12 @@ func cgrRunPreload(ctx *context.Context, cfg *config.CGRConfig, loaderIDs string
var reply string
for _, loaderID := range strings.Split(loaderIDs, utils.FieldsSep) {
if err = loader.GetLoaderS().V1Load(ctx, &loaders.ArgsProcessFolder{
ForceLock: true, // force lock will unlock the file in case is locked and return error
LoaderID: loaderID,
StopOnError: true,
if err = loader.GetLoaderS().V1Run(ctx, &loaders.ArgsProcessFolder{
APIOpts: map[string]interface{}{
utils.MetaForceLock: true, // force lock will unlock the file in case is locked and return error
utils.MetaStopOnError: true,
},
LoaderID: loaderID,
}, &reply); err != nil {
err = fmt.Errorf("<%s> preload failed on loadID <%s> , err: <%s>", utils.LoaderS, loaderID, err)
return

View File

@@ -45,7 +45,6 @@ func TestLoaderSCoverage(t *testing.T) {
ID: "test_id",
Enabled: true,
Tenant: "",
DryRun: false,
RunDelay: 0,
LockFilePath: "",
CacheSConns: nil,

View File

@@ -644,6 +644,7 @@ const (
MetaRemove = "*remove"
MetaRemoveAll = "*removeall"
MetaStore = "*store"
MetaParse = "*parse"
MetaClear = "*clear"
MetaExport = "*export"
MetaExportID = "*export_id"
@@ -2133,6 +2134,7 @@ const (
CacheDumpFieldsCfg = "cache_dump_fields"
PartialCommitFieldsCfg = "partial_commit_fields"
PartialCacheTTLCfg = "partial_cache_ttl"
ActionCfg = "action"
)
// RegistrarCCfg
@@ -2311,6 +2313,10 @@ const (
RemoteHostOpt = "*rmtHost"
MetaCache = "*cache"
MetaWithIndex = "*withIndex"
MetaForceLock = "*forceLock"
MetaStopOnError = "*stopOnError"
)
// Event Flags