From 83dcb7bbc2b7a42e86053ec928bfd4b85487677e Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 17 Dec 2021 14:04:24 +0200 Subject: [PATCH] Started adding back dispatcher --- build.sh | 3 ++ config/config_defaults.go | 1 + data/conf/cgrates/cgrates.json | 58 ++++++++++++++++++++++- engine/connmanager.go | 86 +++++++++++++++++++++++++++++++++- engine/libengine.go | 25 ++++++++-- engine/libengine_test.go | 1 + services/dispatchers.go | 2 + services/dispatchers_test.go | 4 +- utils/consts.go | 2 + 9 files changed, 172 insertions(+), 10 deletions(-) diff --git a/build.sh b/build.sh index 5ec824b60..00e42d8cf 100755 --- a/build.sh +++ b/build.sh @@ -10,6 +10,9 @@ then GIT_LAST_LOG="" fi +go generate ./... +go fmt ./... + go install -ldflags "-X 'github.com/cgrates/cgrates/utils.GitLastLog=$GIT_LAST_LOG'" github.com/cgrates/cgrates/cmd/cgr-engine cr=$? go install -ldflags "-X 'github.com/cgrates/cgrates/utils.GitLastLog=$GIT_LAST_LOG'" github.com/cgrates/cgrates/cmd/cgr-loader diff --git a/config/config_defaults.go b/config/config_defaults.go index 50401ad68..09f704001 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -1,3 +1,4 @@ +//go:generate go run ../data/scripts/generate_config/generate.go /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 71f11a222..3953dab03 100755 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -32,6 +32,10 @@ // "digest_equal": ":", // equal symbol used in case of digests // "rsr_separator": ";", // separator used within RSR fields // "max_parallel_conns": 100, // the maximum number of connection used by the *parallel strategy +// "decimal_max_scale": 0, // the maximum scale for decimal numbers +// "decimal_min_scale": 0, // the minimum scale for decimal numbers +// "decimal_precision": 0, // the precision of the decimal operations +// "decimal_rounding_mode": "*toNearestEven", // the rounding mode <*toNearestEven|*toNearestAway|*toZero|*awayFromZero|*toNegativeInf|*toPositiveInf|*toNearestTowardZero> // }, @@ -39,6 +43,7 @@ // "caps": 0, // maximum concurrent request allowed ( 0 to disabled ) // "caps_strategy": "*busy", // strategy in case in case of concurrent requests reached // "caps_stats_interval": "0", // the interval we sample for caps stats ( 0 to disabled ) +// "ees_conns": [], // connections to EventExporter // "shutdown_timeout": "1s" // the duration to wait until all services are stopped // }, @@ -149,7 +154,6 @@ // "*tp_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false}, // "*tp_attributes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false}, // "*tp_chargers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false}, -// "*versions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false}, // "*tp_dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false}, // "*tp_dispatcher_hosts": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false}, // "*tp_rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false}, @@ -544,6 +548,7 @@ // // "certPath": "", // path to client certificate // // "caPath": "", // path to CA certificate // // "tls": false, // +// // "connIDs": [], // connections for connManager to this exporter // // "rpcConnTimeout" : "1s", // connection unsuccesfull on timeout // // "rpcReplyTimeout":"2s", // connection down at replies if taking longer that this value // }, // extra options for exporter @@ -922,6 +927,8 @@ // //"string_indexed_fields": [], // query indexes based on these fields for faster processing // "prefix_indexed_fields": [], // query indexes based on these fields for faster processing // "suffix_indexed_fields": [], // query indexes based on these fields for faster processing +// "exists_indexed_fields": [], // query indexes based on these fields for faster processing +// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing // "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level) // "opts":{ // // "*profileIDs": [ @@ -963,6 +970,8 @@ // //"string_indexed_fields": [], // query indexes based on these fields for faster processing // "prefix_indexed_fields": [], // query indexes based on these fields for faster processing // "suffix_indexed_fields": [], // query indexes based on these fields for faster processing +// "exists_indexed_fields": [], // query indexes based on these fields for faster processing +// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing // "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level) // }, @@ -975,6 +984,8 @@ // //"string_indexed_fields": [], // query indexes based on these fields for faster processing // "prefix_indexed_fields": [], // query indexes based on these fields for faster processing // "suffix_indexed_fields": [], // query indexes based on these fields for faster processing +// "exists_indexed_fields": [], // query indexes based on these fields for faster processing +// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing // "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level) // "opts":{ // // "*usageID": [ @@ -1011,6 +1022,8 @@ // //"string_indexed_fields": [], // query indexes based on these fields for faster processing // "prefix_indexed_fields": [], // query indexes based on these fields for faster processing // "suffix_indexed_fields": [], // query indexes based on these fields for faster processing +// "exists_indexed_fields": [], // query indexes based on these fields for faster processing +// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing // "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level) // "opts": { // // "*profileIDs": [ @@ -1026,6 +1039,13 @@ // // "FilterIDs": [], // // "Value": false, // // }, +// ], +// "*roundingDecimals": [ // ignore the filters for statIDs +// // { +// // "Tenant": "*any", +// // "FilterIDs": [], +// // "Value": 5, +// // }, // ], // }, // }, @@ -1038,6 +1058,8 @@ // //"string_indexed_fields": [], // query indexes based on these fields for faster processing // "prefix_indexed_fields": [], // query indexes based on these fields for faster processing // "suffix_indexed_fields": [], // query indexes based on these fields for faster processing +// "exists_indexed_fields": [], // query indexes based on these fields for faster processing +// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing // "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level) // "actions_conns": [], // connections to ActionS to execute the actions // "opts":{ // @@ -1065,6 +1087,8 @@ // //"string_indexed_fields": [], // query indexes based on these fields for faster processing // "prefix_indexed_fields": [], // query indexes based on these fields for faster processing // "suffix_indexed_fields": [], // query indexes based on these fields for faster processing +// "exists_indexed_fields": [], // query indexes based on these fields for faster processing +// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing // "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level) // "attributes_conns": [], // connections to AttributeS for altering events before route queries: <""|*internal|$rpc_conns_id> // "resources_conns": [], // connections to ResourceS for *res sorting, empty to disable functionality: <""|*internal|$rpc_conns_id> @@ -1131,13 +1155,33 @@ // "id": "*default", // identifier of the Loader // "enabled": false, // starts as service: . // "tenant": "", // tenant used in filterS.Pass -// "dry_run": false, // do not send the CDRs to CDRS, just parse them // "run_delay": "0", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together // "lockfile_path": ".cgr.lck", // Filename containing concurrency lock in case of delayed processing // "caches_conns": ["*internal"], // "field_separator": ",", // separator used in case of csv files // "tp_in_dir": "/var/spool/cgrates/loader/in", // absolute path towards the directory where the TPs are stored // "tp_out_dir": "/var/spool/cgrates/loader/out", // absolute path towards the directory where processed TPs will be moved +// "action": "*store", // what should the loader do<*store|*parse|*remove|*dryrun> +// "opts": { +// // "*cache": "*reload", +// "*withIndex": true, +// // "*forceLock": false, +// // "*stopOnError": false, +// }, +// "cache":{ +// "*filters":{"limit": -1, "ttl": "5s", "static_ttl": false}, +// "*attributes":{"limit": -1, "ttl": "5s", "static_ttl": false}, +// "*resources":{"limit": -1, "ttl": "5s", "static_ttl": false}, +// "*stats":{"limit": -1, "ttl": "5s", "static_ttl": false}, +// "*thresholds":{"limit": -1, "ttl": "5s", "static_ttl": false}, +// "*routes":{"limit": -1, "ttl": "5s", "static_ttl": false}, +// "*chargers":{"limit": -1, "ttl": "5s", "static_ttl": false}, +// "*dispatchers":{"limit": -1, "ttl": "5s", "static_ttl": false}, +// "*dispatcher_hosts":{"limit": -1, "ttl": "5s", "static_ttl": false}, +// "*rate_profiles":{"limit": -1, "ttl": "5s", "static_ttl": false}, +// "*action_profiles":{"limit": -1, "ttl": "5s", "static_ttl": false}, +// "*accounts":{"limit": -1, "ttl": "5s", "static_ttl": false}, +// }, // "data":[ // data profiles to load // { // "type": "*filters", // data source type @@ -1431,6 +1475,8 @@ // //"string_indexed_fields": [], // query indexes based on these fields for faster processing // "prefix_indexed_fields": [], // query indexes based on these fields for faster processing // "suffix_indexed_fields": [], // query indexes based on these fields for faster processing +// "exists_indexed_fields": [], // query indexes based on these fields for faster processing +// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing // "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level) // "attributes_conns": [], // connections to AttributeS for API authorization, empty to disable auth functionality: <""|*internal|$rpc_conns_id> // }, @@ -1476,11 +1522,15 @@ // //"string_indexed_fields": [], // query indexes based on these fields for faster processing // "prefix_indexed_fields": [], // query indexes based on these fields for faster processing // "suffix_indexed_fields": [], // query indexes based on these fields for faster processing +// "exists_indexed_fields": [], // query indexes based on these fields for faster processing +// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing // "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level) // "rate_indexed_selects": true, // enable profile matching exclusively on indexes // //"rate_string_indexed_fields": [], // query indexes based on these fields for faster processing // "rate_prefix_indexed_fields": [], // query indexes based on these fields for faster processing // "rate_suffix_indexed_fields": [], // query indexes based on these fields for faster processing +// "rate_exists_indexed_fields": [], // query indexes based on these fields for faster processing +// "rate_notexists_indexed_fields": [], // query indexes based on these fields for faster processing // "rate_nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level) // "verbosity": 1000, // number of increment iterations allowed // "opts":{ // @@ -1650,6 +1700,8 @@ // //"string_indexed_fields": [], // query indexes based on these fields for faster processing // "prefix_indexed_fields": [], // query indexes based on these fields for faster processing // "suffix_indexed_fields": [], // query indexes based on these fields for faster processing +// "exists_indexed_fields": [], // query indexes based on these fields for faster processing +// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing // "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level) // "dynaprepaid_actionprofile": [], // // "opts":{ // @@ -1680,6 +1732,8 @@ // //"string_indexed_fields": [], // query indexes based on these fields for faster processing // "prefix_indexed_fields": [], // query indexes based on these fields for faster processing // "suffix_indexed_fields": [], // query indexes based on these fields for faster processing +// "exists_indexed_fields": [], // query indexes based on these fields for faster processing +// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing // "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level) // "max_iterations": 1000, // maximum number of iterations // "max_usage": "72h", // maximum time of usage diff --git a/engine/connmanager.go b/engine/connmanager.go index 270fdaf94..e3fd9bef2 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -46,6 +46,8 @@ func NewConnManager(cfg *config.CGRConfig) (cM *ConnManager) { type ConnManager struct { cfg *config.CGRConfig rpcInternal map[string]chan birpc.ClientConnector + disp map[string]chan birpc.ClientConnector + dispIntCh RPCClientSet dynIntCh RPCClientSet connCache *ltcache.Cache } @@ -64,7 +66,7 @@ func (cM *ConnManager) getConn(ctx *context.Context, connID string) (conn birpc. var intChan chan birpc.ClientConnector var isInternalRPC bool var connCfg *config.RPCConn - if intChan, isInternalRPC = cM.rpcInternal[connID]; isInternalRPC { + if intChan, isInternalRPC = cM.getInternalConnChan(connID); isInternalRPC { connCfg = cM.cfg.RPCConns()[rpcclient.InternalRPC] if strings.HasPrefix(connID, rpcclient.BiRPCInternal) { connCfg = cM.cfg.RPCConns()[rpcclient.BiRPCInternal] @@ -73,7 +75,7 @@ func (cM *ConnManager) getConn(ctx *context.Context, connID string) (conn birpc. connCfg = cM.cfg.RPCConns()[connID] for _, rpcConn := range connCfg.Conns { if rpcConn.Address == utils.MetaInternal { - intChan = cM.dynIntCh.GetInternalChanel() + intChan = cM.GetInternalChan() break } } @@ -207,6 +209,9 @@ func (cM *ConnManager) Reload() { } func (cM *ConnManager) GetInternalChan() chan birpc.ClientConnector { + if cM.dispIntCh != nil { + return cM.dispIntCh.GetInternalChanel() + } return cM.dynIntCh.GetInternalChanel() } @@ -215,3 +220,80 @@ func (cM *ConnManager) AddInternalConn(connName, apiPrefix string, cM.rpcInternal[connName] = iConnCh cM.dynIntCh[apiPrefix] = iConnCh } + +func (cM *ConnManager) EnableDispatcher(dsp IntService) { + cM.disp = map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(utils.MetaDispatchers, utils.MetaAttributes): cM.rpcInternal[utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes)], + } + cM.dispIntCh = make(RPCClientSet) + for m, srv := range dsp { + var key string + switch { + case strings.HasPrefix(m, utils.AccountS): + key = utils.MetaAccounts + case strings.HasPrefix(m, utils.ActionS): + key = utils.MetaActions + case strings.HasPrefix(m, utils.AttributeS): + key = utils.MetaAttributes + case strings.HasPrefix(m, utils.CacheS): + key = utils.MetaCaches + case strings.HasPrefix(m, utils.ChargerS): + key = utils.MetaChargers + case strings.HasPrefix(m, utils.ConfigS): + key = utils.MetaConfig + case strings.HasPrefix(m, utils.DispatcherS): + key = utils.MetaDispatchers + case strings.HasPrefix(m, utils.GuardianS): + key = utils.MetaGuardian + case strings.HasPrefix(m, utils.RateS): + key = utils.MetaRateS + case strings.HasPrefix(m, utils.ResourceS): + key = utils.MetaResources + case strings.HasPrefix(m, utils.RouteS): + key = utils.MetaRoutes + case strings.HasPrefix(m, utils.SessionS): + key = utils.MetaSessionS + case strings.HasPrefix(m, utils.StatS): + key = utils.MetaStats + case strings.HasPrefix(m, utils.ThresholdS): + key = utils.MetaThresholds + case strings.HasPrefix(m, utils.CDRs): + key = utils.MetaCDRs + case strings.HasPrefix(m, utils.ReplicatorS): + key = utils.MetaReplicator + case strings.HasPrefix(m, utils.EeS): + key = utils.MetaEEs + case strings.HasPrefix(m, utils.CoreS): + key = utils.MetaCore + case strings.HasPrefix(m, utils.AnalyzerS): + key = utils.MetaAnalyzer + case strings.HasPrefix(m, utils.AdminS): + key = utils.MetaAdminS + case strings.HasPrefix(m, utils.LoaderS): + key = utils.MetaLoaders + case strings.HasPrefix(m, utils.ServiceManager): + key = utils.MetaServiceManager + } + key = utils.ConcatenatedKey(utils.MetaInternal, key) + ch := make(chan birpc.ClientConnector, 1) + ch <- srv + cM.disp[key] = ch + cM.dispIntCh[m] = ch + } + cM.Reload() +} + +func (cM *ConnManager) DisableDispatcher() { + cM.disp = nil + cM.dispIntCh = nil + cM.Reload() +} + +func (cM *ConnManager) getInternalConnChan(key string) (c chan birpc.ClientConnector, has bool) { + if cM.disp != nil { + c, has = cM.disp[key] + } else { + c, has = cM.rpcInternal[key] + } + return +} diff --git a/engine/libengine.go b/engine/libengine.go index 3e063387c..6ea5c392c 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -203,9 +203,9 @@ func NewDispatcherService(val interface{}) (_ IntService, err error) { case strings.HasPrefix(m, utils.RateS): m = strings.TrimPrefix(m, utils.RateS) key = utils.RateS - // case strings.HasPrefix(m, utils.ReplicatorS): - // m = strings.TrimPrefix(m, utils.ReplicatorS) - // key = utils.ReplicatorS + case strings.HasPrefix(m, utils.ReplicatorS): + m = strings.TrimPrefix(m, utils.ReplicatorS) + key = utils.ReplicatorS case strings.HasPrefix(m, utils.ResourceS): m = strings.TrimPrefix(m, utils.ResourceS) key = utils.ResourceS @@ -224,6 +224,24 @@ func NewDispatcherService(val interface{}) (_ IntService, err error) { case strings.HasPrefix(m, utils.CDRs): m = strings.TrimPrefix(m, utils.CDRs) key = utils.CDRs + case strings.HasPrefix(m, utils.EeS): + m = strings.TrimPrefix(m, utils.EeS) + key = utils.EeS + case strings.HasPrefix(m, utils.CoreS): + m = strings.TrimPrefix(m, utils.CoreS) + key = utils.CoreS + case strings.HasPrefix(m, utils.AnalyzerS): + m = strings.TrimPrefix(m, utils.AnalyzerS) + key = utils.AnalyzerS + case strings.HasPrefix(m, utils.AdminS): + m = strings.TrimPrefix(m, utils.AdminS) + key = utils.AdminS + case strings.HasPrefix(m, utils.LoaderS): + m = strings.TrimPrefix(m, utils.LoaderS) + key = utils.LoaderS + case strings.HasPrefix(m, utils.ServiceManager): + m = strings.TrimPrefix(m, utils.ServiceManager) + key = utils.ServiceManager } if len(m) < 2 || unicode.ToLower(rune(m[0])) != 'v' { continue @@ -243,7 +261,6 @@ func NewDispatcherService(val interface{}) (_ IntService, err error) { s[key] = srv2 } srv2.Methods[m[2:]] = v - } return s, nil } diff --git a/engine/libengine_test.go b/engine/libengine_test.go index 017567197..1b2b9713a 100644 --- a/engine/libengine_test.go +++ b/engine/libengine_test.go @@ -235,6 +235,7 @@ func TestIntServiceNewDispatcherService(t *testing.T) { "StatSv1": {"Do", "Ping"}, "TestRPCDspMock": {"AccountSv1Do", "ActionSv1Do", "AttributeSv1Do", "CDRsv1Do", "CacheSv1Do", "ChargerSv1Do", "ConfigSv1Do", "DispatcherSv1Do", "GuardianSv1Do", "Ping", "RateSv1Do", "ReplicatorSv1Do", "ResourceSv1Do", "RouteSv1Do", "SessionSv1Do", "StatSv1Do", "ThresholdSv1Do"}, "ThresholdSv1": {"Do", "Ping"}, + "ReplicatorSv1": {"Do", "Ping"}, } if !reflect.DeepEqual(exp, methods) { t.Errorf("Expeceted: %v, received: %v", utils.ToJSON(exp), utils.ToJSON(methods)) diff --git a/services/dispatchers.go b/services/dispatchers.go index dadc4776e..312a226f9 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -103,6 +103,7 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc) for _, s := range srv { dspS.server.RpcRegister(s) } + dspS.connMgr.EnableDispatcher(srv) // for the moment we dispable Apier through dispatcher // until we figured out a better sollution in case of gob server // dspS.server.SetDispatched() @@ -127,6 +128,7 @@ func (dspS *DispatcherService) Shutdown() (err error) { dspS.server.RpcUnregisterName(utils.AttributeSv1) dspS.unregisterAllDispatchedSubsystems() + dspS.connMgr.DisableDispatcher() dspS.sync() return } diff --git a/services/dispatchers_test.go b/services/dispatchers_test.go index eb05bf077..91616ac28 100644 --- a/services/dispatchers_test.go +++ b/services/dispatchers_test.go @@ -43,7 +43,7 @@ func TestDispatcherSCoverage(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) chS := NewCacheService(cfg, db, server, make(chan context.ClientConnector, 1), anz, nil, srvDep) srv := NewDispatcherService(cfg, db, chS, filterSChan, server, - make(chan birpc.ClientConnector, 1), nil, anz, srvDep) + make(chan birpc.ClientConnector, 1), engine.NewConnManager(cfg), anz, srvDep) if srv.IsRunning() { t.Errorf("Expected service to be down") } @@ -54,7 +54,7 @@ func TestDispatcherSCoverage(t *testing.T) { cacheS: chS, filterSChan: filterSChan, server: server, - connMgr: nil, + connMgr: srv.connMgr, connChan: make(chan birpc.ClientConnector, 1), anz: anz, srvDep: srvDep, diff --git a/utils/consts.go b/utils/consts.go index 2b357fffb..3c4a99209 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1056,6 +1056,7 @@ const ( // ReplicatorSv1 APIs const ( + ReplicatorS = "ReplicatorS" ReplicatorSv1 = "ReplicatorSv1" ReplicatorSv1Ping = "ReplicatorSv1.Ping" ReplicatorSv1GetStatQueue = "ReplicatorSv1.GetStatQueue" @@ -1498,6 +1499,7 @@ const ( // EEs const ( + EeS = "EeS" EeSv1 = "EeSv1" EeSv1Ping = "EeSv1.Ping" EeSv1ProcessEvent = "EeSv1.ProcessEvent"