Started adding back dispatcher

This commit is contained in:
Trial97
2021-12-17 14:04:24 +02:00
committed by Dan Christian Bogos
parent ae798dafe3
commit 83dcb7bbc2
9 changed files with 172 additions and 10 deletions

View File

@@ -10,6 +10,9 @@ then
GIT_LAST_LOG=""
fi
go generate ./...
go fmt ./...
go install -ldflags "-X 'github.com/cgrates/cgrates/utils.GitLastLog=$GIT_LAST_LOG'" github.com/cgrates/cgrates/cmd/cgr-engine
cr=$?
go install -ldflags "-X 'github.com/cgrates/cgrates/utils.GitLastLog=$GIT_LAST_LOG'" github.com/cgrates/cgrates/cmd/cgr-loader

View File

@@ -1,3 +1,4 @@
//go:generate go run ../data/scripts/generate_config/generate.go
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH

View File

@@ -32,6 +32,10 @@
// "digest_equal": ":", // equal symbol used in case of digests
// "rsr_separator": ";", // separator used within RSR fields
// "max_parallel_conns": 100, // the maximum number of connection used by the *parallel strategy
// "decimal_max_scale": 0, // the maximum scale for decimal numbers
// "decimal_min_scale": 0, // the minimum scale for decimal numbers
// "decimal_precision": 0, // the precision of the decimal operations
// "decimal_rounding_mode": "*toNearestEven", // the rounding mode <*toNearestEven|*toNearestAway|*toZero|*awayFromZero|*toNegativeInf|*toPositiveInf|*toNearestTowardZero>
// },
@@ -39,6 +43,7 @@
// "caps": 0, // maximum concurrent request allowed ( 0 to disabled )
// "caps_strategy": "*busy", // strategy in case in case of concurrent requests reached
// "caps_stats_interval": "0", // the interval we sample for caps stats ( 0 to disabled )
// "ees_conns": [], // connections to EventExporter
// "shutdown_timeout": "1s" // the duration to wait until all services are stopped
// },
@@ -149,7 +154,6 @@
// "*tp_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false},
// "*tp_attributes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false},
// "*tp_chargers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false},
// "*versions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false},
// "*tp_dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false},
// "*tp_dispatcher_hosts": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false},
// "*tp_rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false},
@@ -544,6 +548,7 @@
// // "certPath": "", // path to client certificate
// // "caPath": "", // path to CA certificate
// // "tls": false, //
// // "connIDs": [], // connections for connManager to this exporter
// // "rpcConnTimeout" : "1s", // connection unsuccesfull on timeout
// // "rpcReplyTimeout":"2s", // connection down at replies if taking longer that this value
// }, // extra options for exporter
@@ -922,6 +927,8 @@
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing
// "prefix_indexed_fields": [], // query indexes based on these fields for faster processing
// "suffix_indexed_fields": [], // query indexes based on these fields for faster processing
// "exists_indexed_fields": [], // query indexes based on these fields for faster processing
// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing
// "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level)
// "opts":{ //
// "*profileIDs": [
@@ -963,6 +970,8 @@
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing
// "prefix_indexed_fields": [], // query indexes based on these fields for faster processing
// "suffix_indexed_fields": [], // query indexes based on these fields for faster processing
// "exists_indexed_fields": [], // query indexes based on these fields for faster processing
// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing
// "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level)
// },
@@ -975,6 +984,8 @@
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing
// "prefix_indexed_fields": [], // query indexes based on these fields for faster processing
// "suffix_indexed_fields": [], // query indexes based on these fields for faster processing
// "exists_indexed_fields": [], // query indexes based on these fields for faster processing
// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing
// "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level)
// "opts":{ //
// "*usageID": [
@@ -1011,6 +1022,8 @@
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing
// "prefix_indexed_fields": [], // query indexes based on these fields for faster processing
// "suffix_indexed_fields": [], // query indexes based on these fields for faster processing
// "exists_indexed_fields": [], // query indexes based on these fields for faster processing
// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing
// "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level)
// "opts": { //
// "*profileIDs": [
@@ -1026,6 +1039,13 @@
// // "FilterIDs": [],
// // "Value": false,
// // },
// ],
// "*roundingDecimals": [ // ignore the filters for statIDs
// // {
// // "Tenant": "*any",
// // "FilterIDs": [],
// // "Value": 5,
// // },
// ],
// },
// },
@@ -1038,6 +1058,8 @@
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing
// "prefix_indexed_fields": [], // query indexes based on these fields for faster processing
// "suffix_indexed_fields": [], // query indexes based on these fields for faster processing
// "exists_indexed_fields": [], // query indexes based on these fields for faster processing
// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing
// "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level)
// "actions_conns": [], // connections to ActionS to execute the actions
// "opts":{ //
@@ -1065,6 +1087,8 @@
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing
// "prefix_indexed_fields": [], // query indexes based on these fields for faster processing
// "suffix_indexed_fields": [], // query indexes based on these fields for faster processing
// "exists_indexed_fields": [], // query indexes based on these fields for faster processing
// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing
// "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level)
// "attributes_conns": [], // connections to AttributeS for altering events before route queries: <""|*internal|$rpc_conns_id>
// "resources_conns": [], // connections to ResourceS for *res sorting, empty to disable functionality: <""|*internal|$rpc_conns_id>
@@ -1131,13 +1155,33 @@
// "id": "*default", // identifier of the Loader
// "enabled": false, // starts as service: <true|false>.
// "tenant": "", // tenant used in filterS.Pass
// "dry_run": false, // do not send the CDRs to CDRS, just parse them
// "run_delay": "0", // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
// "lockfile_path": ".cgr.lck", // Filename containing concurrency lock in case of delayed processing
// "caches_conns": ["*internal"],
// "field_separator": ",", // separator used in case of csv files
// "tp_in_dir": "/var/spool/cgrates/loader/in", // absolute path towards the directory where the TPs are stored
// "tp_out_dir": "/var/spool/cgrates/loader/out", // absolute path towards the directory where processed TPs will be moved
// "action": "*store", // what should the loader do<*store|*parse|*remove|*dryrun>
// "opts": {
// // "*cache": "*reload",
// "*withIndex": true,
// // "*forceLock": false,
// // "*stopOnError": false,
// },
// "cache":{
// "*filters":{"limit": -1, "ttl": "5s", "static_ttl": false},
// "*attributes":{"limit": -1, "ttl": "5s", "static_ttl": false},
// "*resources":{"limit": -1, "ttl": "5s", "static_ttl": false},
// "*stats":{"limit": -1, "ttl": "5s", "static_ttl": false},
// "*thresholds":{"limit": -1, "ttl": "5s", "static_ttl": false},
// "*routes":{"limit": -1, "ttl": "5s", "static_ttl": false},
// "*chargers":{"limit": -1, "ttl": "5s", "static_ttl": false},
// "*dispatchers":{"limit": -1, "ttl": "5s", "static_ttl": false},
// "*dispatcher_hosts":{"limit": -1, "ttl": "5s", "static_ttl": false},
// "*rate_profiles":{"limit": -1, "ttl": "5s", "static_ttl": false},
// "*action_profiles":{"limit": -1, "ttl": "5s", "static_ttl": false},
// "*accounts":{"limit": -1, "ttl": "5s", "static_ttl": false},
// },
// "data":[ // data profiles to load
// {
// "type": "*filters", // data source type
@@ -1431,6 +1475,8 @@
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing
// "prefix_indexed_fields": [], // query indexes based on these fields for faster processing
// "suffix_indexed_fields": [], // query indexes based on these fields for faster processing
// "exists_indexed_fields": [], // query indexes based on these fields for faster processing
// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing
// "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level)
// "attributes_conns": [], // connections to AttributeS for API authorization, empty to disable auth functionality: <""|*internal|$rpc_conns_id>
// },
@@ -1476,11 +1522,15 @@
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing
// "prefix_indexed_fields": [], // query indexes based on these fields for faster processing
// "suffix_indexed_fields": [], // query indexes based on these fields for faster processing
// "exists_indexed_fields": [], // query indexes based on these fields for faster processing
// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing
// "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level)
// "rate_indexed_selects": true, // enable profile matching exclusively on indexes
// //"rate_string_indexed_fields": [], // query indexes based on these fields for faster processing
// "rate_prefix_indexed_fields": [], // query indexes based on these fields for faster processing
// "rate_suffix_indexed_fields": [], // query indexes based on these fields for faster processing
// "rate_exists_indexed_fields": [], // query indexes based on these fields for faster processing
// "rate_notexists_indexed_fields": [], // query indexes based on these fields for faster processing
// "rate_nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level)
// "verbosity": 1000, // number of increment iterations allowed
// "opts":{ //
@@ -1650,6 +1700,8 @@
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing
// "prefix_indexed_fields": [], // query indexes based on these fields for faster processing
// "suffix_indexed_fields": [], // query indexes based on these fields for faster processing
// "exists_indexed_fields": [], // query indexes based on these fields for faster processing
// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing
// "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level)
// "dynaprepaid_actionprofile": [], //
// "opts":{ //
@@ -1680,6 +1732,8 @@
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing
// "prefix_indexed_fields": [], // query indexes based on these fields for faster processing
// "suffix_indexed_fields": [], // query indexes based on these fields for faster processing
// "exists_indexed_fields": [], // query indexes based on these fields for faster processing
// "notexists_indexed_fields": [], // query indexes based on these fields for faster processing
// "nested_fields": false, // determines which field is checked when matching indexed filters(true: all; false: only the one on the first level)
// "max_iterations": 1000, // maximum number of iterations
// "max_usage": "72h", // maximum time of usage

View File

@@ -46,6 +46,8 @@ func NewConnManager(cfg *config.CGRConfig) (cM *ConnManager) {
type ConnManager struct {
cfg *config.CGRConfig
rpcInternal map[string]chan birpc.ClientConnector
disp map[string]chan birpc.ClientConnector
dispIntCh RPCClientSet
dynIntCh RPCClientSet
connCache *ltcache.Cache
}
@@ -64,7 +66,7 @@ func (cM *ConnManager) getConn(ctx *context.Context, connID string) (conn birpc.
var intChan chan birpc.ClientConnector
var isInternalRPC bool
var connCfg *config.RPCConn
if intChan, isInternalRPC = cM.rpcInternal[connID]; isInternalRPC {
if intChan, isInternalRPC = cM.getInternalConnChan(connID); isInternalRPC {
connCfg = cM.cfg.RPCConns()[rpcclient.InternalRPC]
if strings.HasPrefix(connID, rpcclient.BiRPCInternal) {
connCfg = cM.cfg.RPCConns()[rpcclient.BiRPCInternal]
@@ -73,7 +75,7 @@ func (cM *ConnManager) getConn(ctx *context.Context, connID string) (conn birpc.
connCfg = cM.cfg.RPCConns()[connID]
for _, rpcConn := range connCfg.Conns {
if rpcConn.Address == utils.MetaInternal {
intChan = cM.dynIntCh.GetInternalChanel()
intChan = cM.GetInternalChan()
break
}
}
@@ -207,6 +209,9 @@ func (cM *ConnManager) Reload() {
}
func (cM *ConnManager) GetInternalChan() chan birpc.ClientConnector {
if cM.dispIntCh != nil {
return cM.dispIntCh.GetInternalChanel()
}
return cM.dynIntCh.GetInternalChanel()
}
@@ -215,3 +220,80 @@ func (cM *ConnManager) AddInternalConn(connName, apiPrefix string,
cM.rpcInternal[connName] = iConnCh
cM.dynIntCh[apiPrefix] = iConnCh
}
func (cM *ConnManager) EnableDispatcher(dsp IntService) {
cM.disp = map[string]chan context.ClientConnector{
utils.ConcatenatedKey(utils.MetaDispatchers, utils.MetaAttributes): cM.rpcInternal[utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes)],
}
cM.dispIntCh = make(RPCClientSet)
for m, srv := range dsp {
var key string
switch {
case strings.HasPrefix(m, utils.AccountS):
key = utils.MetaAccounts
case strings.HasPrefix(m, utils.ActionS):
key = utils.MetaActions
case strings.HasPrefix(m, utils.AttributeS):
key = utils.MetaAttributes
case strings.HasPrefix(m, utils.CacheS):
key = utils.MetaCaches
case strings.HasPrefix(m, utils.ChargerS):
key = utils.MetaChargers
case strings.HasPrefix(m, utils.ConfigS):
key = utils.MetaConfig
case strings.HasPrefix(m, utils.DispatcherS):
key = utils.MetaDispatchers
case strings.HasPrefix(m, utils.GuardianS):
key = utils.MetaGuardian
case strings.HasPrefix(m, utils.RateS):
key = utils.MetaRateS
case strings.HasPrefix(m, utils.ResourceS):
key = utils.MetaResources
case strings.HasPrefix(m, utils.RouteS):
key = utils.MetaRoutes
case strings.HasPrefix(m, utils.SessionS):
key = utils.MetaSessionS
case strings.HasPrefix(m, utils.StatS):
key = utils.MetaStats
case strings.HasPrefix(m, utils.ThresholdS):
key = utils.MetaThresholds
case strings.HasPrefix(m, utils.CDRs):
key = utils.MetaCDRs
case strings.HasPrefix(m, utils.ReplicatorS):
key = utils.MetaReplicator
case strings.HasPrefix(m, utils.EeS):
key = utils.MetaEEs
case strings.HasPrefix(m, utils.CoreS):
key = utils.MetaCore
case strings.HasPrefix(m, utils.AnalyzerS):
key = utils.MetaAnalyzer
case strings.HasPrefix(m, utils.AdminS):
key = utils.MetaAdminS
case strings.HasPrefix(m, utils.LoaderS):
key = utils.MetaLoaders
case strings.HasPrefix(m, utils.ServiceManager):
key = utils.MetaServiceManager
}
key = utils.ConcatenatedKey(utils.MetaInternal, key)
ch := make(chan birpc.ClientConnector, 1)
ch <- srv
cM.disp[key] = ch
cM.dispIntCh[m] = ch
}
cM.Reload()
}
func (cM *ConnManager) DisableDispatcher() {
cM.disp = nil
cM.dispIntCh = nil
cM.Reload()
}
func (cM *ConnManager) getInternalConnChan(key string) (c chan birpc.ClientConnector, has bool) {
if cM.disp != nil {
c, has = cM.disp[key]
} else {
c, has = cM.rpcInternal[key]
}
return
}

View File

@@ -203,9 +203,9 @@ func NewDispatcherService(val interface{}) (_ IntService, err error) {
case strings.HasPrefix(m, utils.RateS):
m = strings.TrimPrefix(m, utils.RateS)
key = utils.RateS
// case strings.HasPrefix(m, utils.ReplicatorS):
// m = strings.TrimPrefix(m, utils.ReplicatorS)
// key = utils.ReplicatorS
case strings.HasPrefix(m, utils.ReplicatorS):
m = strings.TrimPrefix(m, utils.ReplicatorS)
key = utils.ReplicatorS
case strings.HasPrefix(m, utils.ResourceS):
m = strings.TrimPrefix(m, utils.ResourceS)
key = utils.ResourceS
@@ -224,6 +224,24 @@ func NewDispatcherService(val interface{}) (_ IntService, err error) {
case strings.HasPrefix(m, utils.CDRs):
m = strings.TrimPrefix(m, utils.CDRs)
key = utils.CDRs
case strings.HasPrefix(m, utils.EeS):
m = strings.TrimPrefix(m, utils.EeS)
key = utils.EeS
case strings.HasPrefix(m, utils.CoreS):
m = strings.TrimPrefix(m, utils.CoreS)
key = utils.CoreS
case strings.HasPrefix(m, utils.AnalyzerS):
m = strings.TrimPrefix(m, utils.AnalyzerS)
key = utils.AnalyzerS
case strings.HasPrefix(m, utils.AdminS):
m = strings.TrimPrefix(m, utils.AdminS)
key = utils.AdminS
case strings.HasPrefix(m, utils.LoaderS):
m = strings.TrimPrefix(m, utils.LoaderS)
key = utils.LoaderS
case strings.HasPrefix(m, utils.ServiceManager):
m = strings.TrimPrefix(m, utils.ServiceManager)
key = utils.ServiceManager
}
if len(m) < 2 || unicode.ToLower(rune(m[0])) != 'v' {
continue
@@ -243,7 +261,6 @@ func NewDispatcherService(val interface{}) (_ IntService, err error) {
s[key] = srv2
}
srv2.Methods[m[2:]] = v
}
return s, nil
}

View File

@@ -235,6 +235,7 @@ func TestIntServiceNewDispatcherService(t *testing.T) {
"StatSv1": {"Do", "Ping"},
"TestRPCDspMock": {"AccountSv1Do", "ActionSv1Do", "AttributeSv1Do", "CDRsv1Do", "CacheSv1Do", "ChargerSv1Do", "ConfigSv1Do", "DispatcherSv1Do", "GuardianSv1Do", "Ping", "RateSv1Do", "ReplicatorSv1Do", "ResourceSv1Do", "RouteSv1Do", "SessionSv1Do", "StatSv1Do", "ThresholdSv1Do"},
"ThresholdSv1": {"Do", "Ping"},
"ReplicatorSv1": {"Do", "Ping"},
}
if !reflect.DeepEqual(exp, methods) {
t.Errorf("Expeceted: %v, received: %v", utils.ToJSON(exp), utils.ToJSON(methods))

View File

@@ -103,6 +103,7 @@ func (dspS *DispatcherService) Start(ctx *context.Context, _ context.CancelFunc)
for _, s := range srv {
dspS.server.RpcRegister(s)
}
dspS.connMgr.EnableDispatcher(srv)
// for the moment we dispable Apier through dispatcher
// until we figured out a better sollution in case of gob server
// dspS.server.SetDispatched()
@@ -127,6 +128,7 @@ func (dspS *DispatcherService) Shutdown() (err error) {
dspS.server.RpcUnregisterName(utils.AttributeSv1)
dspS.unregisterAllDispatchedSubsystems()
dspS.connMgr.DisableDispatcher()
dspS.sync()
return
}

View File

@@ -43,7 +43,7 @@ func TestDispatcherSCoverage(t *testing.T) {
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
chS := NewCacheService(cfg, db, server, make(chan context.ClientConnector, 1), anz, nil, srvDep)
srv := NewDispatcherService(cfg, db, chS, filterSChan, server,
make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
make(chan birpc.ClientConnector, 1), engine.NewConnManager(cfg), anz, srvDep)
if srv.IsRunning() {
t.Errorf("Expected service to be down")
}
@@ -54,7 +54,7 @@ func TestDispatcherSCoverage(t *testing.T) {
cacheS: chS,
filterSChan: filterSChan,
server: server,
connMgr: nil,
connMgr: srv.connMgr,
connChan: make(chan birpc.ClientConnector, 1),
anz: anz,
srvDep: srvDep,

View File

@@ -1056,6 +1056,7 @@ const (
// ReplicatorSv1 APIs
const (
ReplicatorS = "ReplicatorS"
ReplicatorSv1 = "ReplicatorSv1"
ReplicatorSv1Ping = "ReplicatorSv1.Ping"
ReplicatorSv1GetStatQueue = "ReplicatorSv1.GetStatQueue"
@@ -1498,6 +1499,7 @@ const (
// EEs
const (
EeS = "EeS"
EeSv1 = "EeSv1"
EeSv1Ping = "EeSv1.Ping"
EeSv1ProcessEvent = "EeSv1.ProcessEvent"