From 2febcc14e9dfe2390fc391663d8894d2d5a6d969 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 29 Nov 2019 15:47:46 +0200 Subject: [PATCH 1/4] Updated sessions comments --- sessions/libsessions.go | 1 + sessions/session.go | 6 ++++++ sessions/sessions.go | 16 +++++++++++++--- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/sessions/libsessions.go b/sessions/libsessions.go index c0814be43..41c68005f 100644 --- a/sessions/libsessions.go +++ b/sessions/libsessions.go @@ -87,6 +87,7 @@ func getSessionTTL(ev *engine.MapEvent, cfgSessionTTL time.Duration, return } +// GetSetCGRID will populate the CGRID key if not present and return it func GetSetCGRID(ev engine.MapEvent) (cgrID string) { cgrID = ev.GetStringIgnoreErrors(utils.CGRID) if cgrID == "" { diff --git a/sessions/session.go b/sessions/session.go index 77b4e903d..7bb921c68 100644 --- a/sessions/session.go +++ b/sessions/session.go @@ -26,11 +26,13 @@ import ( "github.com/cgrates/cgrates/utils" ) +// SessionID is given by an agent as the answer to GetActiveSessionIDs API type SessionID struct { OriginHost string OriginID string } +// CGRID returns the CGRID formated using the SessionID func (s *SessionID) CGRID() string { return utils.Sha1(s.OriginID, s.OriginHost) } @@ -63,6 +65,7 @@ type ExternalSession struct { NextAutoDebit time.Time } +// Session is the main structure to describe a call type Session struct { sync.RWMutex @@ -107,6 +110,7 @@ func (s *Session) Clone() (cln *Session) { return } +// AsExternalSessions returns the session as a list of ExternalSession using all SRuns func (s *Session) AsExternalSessions(tmz, nodeID string) (aSs []*ExternalSession) { aSs = make([]*ExternalSession, len(s.SRuns)) for i, sr := range s.SRuns { @@ -143,6 +147,8 @@ func (s *Session) AsExternalSessions(tmz, nodeID string) (aSs []*ExternalSession } return } + +// AsExternalSession returns the session as an ExternalSession using the SRuns given func (s *Session) AsExternalSession(sr *SRun, tmz, nodeID string) (aS *ExternalSession) { aS = &ExternalSession{ CGRID: s.CGRID, diff --git a/sessions/sessions.go b/sessions/sessions.go index 121b4a688..40a48030a 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -1677,6 +1677,7 @@ func (sS *SessionS) BiRPCv1SetPassiveSession(clnt rpcclient.RpcClientConnection, return } +// ArgsReplicateSessions used to specify wich Session to replicate over the given connections type ArgsReplicateSessions struct { CGRID string Passive bool @@ -1755,6 +1756,7 @@ type V1AuthorizeArgs struct { *utils.ArgDispatcher } +// ParseFlags will populate the V1AuthorizeArgs flags func (args *V1AuthorizeArgs) ParseFlags(flags string) { dispatcherFlag := false for _, subsystem := range strings.Split(flags, utils.FIELDS_SEP) { @@ -2035,6 +2037,7 @@ type V1InitSessionArgs struct { *utils.ArgDispatcher } +// ParseFlags will populate the V1InitSessionArgs flags func (args *V1InitSessionArgs) ParseFlags(flags string) { dispatcherFlag := false for _, subsystem := range strings.Split(flags, utils.FIELDS_SEP) { @@ -2071,11 +2074,11 @@ type V1InitSessionReply struct { } // SetMaxUsageNeeded used by agent that use the reply as NavigableMapper -func (v1AuthReply *V1InitSessionReply) SetMaxUsageNeeded(getMaxUsage bool) { - if v1AuthReply == nil { +func (v1Rply *V1InitSessionReply) SetMaxUsageNeeded(getMaxUsage bool) { + if v1Rply == nil { return } - v1AuthReply.getMaxUsage = getMaxUsage + v1Rply.getMaxUsage = getMaxUsage } // AsNavigableMap is part of engine.NavigableMapper interface @@ -2231,6 +2234,7 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, return } +// V1InitReplyWithDigest is the formated reply type V1InitReplyWithDigest struct { AttributesDigest *string ResourceAllocation *string @@ -2239,6 +2243,7 @@ type V1InitReplyWithDigest struct { StatQueues *string } +// BiRPCv1InitiateSessionWithDigest returns the formated result of InitiateSession func (sS *SessionS) BiRPCv1InitiateSessionWithDigest(clnt rpcclient.RpcClientConnection, args *V1InitSessionArgs, initReply *V1InitReplyWithDigest) (err error) { var initSessionRply V1InitSessionReply @@ -2407,6 +2412,7 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, return } +// NewV1TerminateSessionArgs creates a new V1TerminateSessionArgs using the given arguments func NewV1TerminateSessionArgs(acnts, resrc, thrds bool, thresholdIDs []string, stats bool, statIDs []string, cgrEv *utils.CGREvent, @@ -2428,6 +2434,7 @@ func NewV1TerminateSessionArgs(acnts, resrc, return } +// V1TerminateSessionArgs is used as argumen for TerminateSession type V1TerminateSessionArgs struct { TerminateSession bool ReleaseResources bool @@ -2439,6 +2446,7 @@ type V1TerminateSessionArgs struct { *utils.ArgDispatcher } +// ParseFlags will populate the V1TerminateSessionArgs flags func (args *V1TerminateSessionArgs) ParseFlags(flags string) { dispatcherFlag := false for _, subsystem := range strings.Split(flags, utils.FIELDS_SEP) { @@ -2726,6 +2734,7 @@ type V1ProcessMessageArgs struct { *utils.ArgDispatcher } +// ParseFlags will populate the V1ProcessMessageArgs flags func (args *V1ProcessMessageArgs) ParseFlags(flags string) { dispatcherFlag := false for _, subsystem := range strings.Split(flags, utils.FIELDS_SEP) { @@ -3279,6 +3288,7 @@ func (sS *SessionS) BiRPCv1ForceDisconnect(clnt rpcclient.RpcClientConnection, return nil } +// BiRPCv1RegisterInternalBiJSONConn will register the client for a bidirectional comunication func (sS *SessionS) BiRPCv1RegisterInternalBiJSONConn(clnt rpcclient.RpcClientConnection, ign string, reply *string) error { sS.RegisterIntBiJConn(clnt) From 49a88987f426b206f10ead33029b26916619c079 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 29 Nov 2019 16:39:45 +0200 Subject: [PATCH 2/4] Updated engine integration tests for gob RPC --- agents/astagent_test.go | 2 +- agents/diamagent.go | 4 +- agents/diamagent_test.go | 2 +- agents/fsagent_test.go | 2 +- agents/kamagent_test.go | 2 +- apier/v1/dm_remote_it_test.go | 4 +- apier/v1/filter_indexes_it_test.go | 4 +- apier/v1/filterindexecache_it_test.go | 6 +- apier/v1/stats_it_test.go | 2 +- apier/v1/thresholds.go | 7 +- apier/v1/thresholds_it_test.go | 6 +- apier/v2/cdrs_offline_it_test.go | 4 +- console/threshold_set.go | 7 +- data/conf/samples/gob/actions/cgradmin.json | 60 +++++++ .../conf/samples/gob/cdrsv2mongo/cgrates.json | 77 +++++++++ engine/actions2_it_test.go | 6 +- engine/actions_it_test.go | 146 ++++++++++-------- engine/thresholds.go | 5 + general_tests/cdrs_it_test.go | 2 +- gob_integration_test.sh | 8 +- sessions/libsessions.go | 4 +- 21 files changed, 261 insertions(+), 99 deletions(-) create mode 100644 data/conf/samples/gob/actions/cgradmin.json create mode 100644 data/conf/samples/gob/cdrsv2mongo/cgrates.json diff --git a/agents/astagent_test.go b/agents/astagent_test.go index 6fa0cc113..30017fe9e 100644 --- a/agents/astagent_test.go +++ b/agents/astagent_test.go @@ -24,5 +24,5 @@ import ( ) func TestAAsSessionSClientIface(t *testing.T) { - _ = sessions.SessionSClient(new(AsteriskAgent)) + _ = sessions.BiRPClient(new(AsteriskAgent)) } diff --git a/agents/diamagent.go b/agents/diamagent.go index dc4672627..6a88afd04 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -433,7 +433,7 @@ func (da *DiameterAgent) Call(serviceMethod string, args interface{}, reply inte return utils.RPCCall(da, serviceMethod, args, reply) } -// V1DisconnectSession is part of the sessions.SessionSClient +// V1DisconnectSession is part of the sessions.BiRPClient func (da *DiameterAgent) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) (err error) { ssID, has := args.EventStart[utils.OriginID] if !has { @@ -479,7 +479,7 @@ func (da *DiameterAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r return } -// V1GetActiveSessionIDs is part of the sessions.SessionSClient +// V1GetActiveSessionIDs is part of the sessions.BiRPClient func (da *DiameterAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*sessions.SessionID) error { return utils.ErrNotImplemented diff --git a/agents/diamagent_test.go b/agents/diamagent_test.go index d3c40824e..3a950ea7f 100644 --- a/agents/diamagent_test.go +++ b/agents/diamagent_test.go @@ -31,7 +31,7 @@ import ( ) func TestDAsSessionSClientIface(t *testing.T) { - _ = sessions.SessionSClient(new(DiameterAgent)) + _ = sessions.BiRPClient(new(DiameterAgent)) } type testMockSessionConn struct { diff --git a/agents/fsagent_test.go b/agents/fsagent_test.go index 76b0e03aa..0be8df8b1 100644 --- a/agents/fsagent_test.go +++ b/agents/fsagent_test.go @@ -24,5 +24,5 @@ import ( ) func TestFAsSessionSClientIface(t *testing.T) { - _ = sessions.SessionSClient(new(FSsessions)) + _ = sessions.BiRPClient(new(FSsessions)) } diff --git a/agents/kamagent_test.go b/agents/kamagent_test.go index 8cd9729c8..96ac1fa24 100644 --- a/agents/kamagent_test.go +++ b/agents/kamagent_test.go @@ -24,5 +24,5 @@ import ( ) func TestKAsSessionSClientIface(t *testing.T) { - _ = sessions.SessionSClient(new(KamailioAgent)) + _ = sessions.BiRPClient(new(KamailioAgent)) } diff --git a/apier/v1/dm_remote_it_test.go b/apier/v1/dm_remote_it_test.go index abff27434..23d538e38 100644 --- a/apier/v1/dm_remote_it_test.go +++ b/apier/v1/dm_remote_it_test.go @@ -278,7 +278,7 @@ func testInternalRemoteITGetThreshold(t *testing.T) { func testInternalRemoteITGetThresholdProfile(t *testing.T) { var reply *engine.ThresholdProfile - tPrfl = &ThresholdWithCache{ + tPrfl = &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "THD_ACNT_1001", @@ -633,7 +633,7 @@ func testInternalReplicationSetThreshold(t *testing.T) { expectedIDX, utils.ToJSON(indexes)) } - tPrfl := &ThresholdWithCache{ + tPrfl := &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "THD_Replication", diff --git a/apier/v1/filter_indexes_it_test.go b/apier/v1/filter_indexes_it_test.go index 03d851d48..9636699e2 100644 --- a/apier/v1/filter_indexes_it_test.go +++ b/apier/v1/filter_indexes_it_test.go @@ -182,7 +182,7 @@ func testV1FIdxSetThresholdProfile(t *testing.T) { err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - tPrfl = &ThresholdWithCache{ + tPrfl = &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ Tenant: tenant, ID: "TEST_PROFILE1", @@ -277,7 +277,7 @@ func testV1FIdxSetSecondThresholdProfile(t *testing.T) { err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - tPrfl = &ThresholdWithCache{ + tPrfl = &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ Tenant: tenant, ID: "TEST_PROFILE2", diff --git a/apier/v1/filterindexecache_it_test.go b/apier/v1/filterindexecache_it_test.go index 2d0ff8c7f..68a2f416b 100644 --- a/apier/v1/filterindexecache_it_test.go +++ b/apier/v1/filterindexecache_it_test.go @@ -179,7 +179,7 @@ func testV1FIdxCaSetThresholdProfile(t *testing.T) { } else if result != utils.OK { t.Error("Unexpected reply returned", result) } - tPrfl = &ThresholdWithCache{ + tPrfl = &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "TEST_PROFILE1", @@ -272,7 +272,7 @@ func testV1FIdxCaUpdateThresholdProfile(t *testing.T) { } else if result != utils.OK { t.Error("Unexpected reply returned", result) } - tPrfl = &ThresholdWithCache{ + tPrfl = &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "TEST_PROFILE1", @@ -367,7 +367,7 @@ func testV1FIdxCaUpdateThresholdProfileFromTP(t *testing.T) { } reply.FilterIDs = []string{"TestFilter3"} - if err := tFIdxCaRpc.Call(utils.ApierV1SetThresholdProfile, &ThresholdWithCache{ThresholdProfile: reply}, &result); err != nil { + if err := tFIdxCaRpc.Call(utils.ApierV1SetThresholdProfile, &engine.ThresholdWithCache{ThresholdProfile: reply}, &result); err != nil { t.Error(err) } else if result != utils.OK { t.Error("Unexpected reply returned", result) diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index e3b644c10..12714c167 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -726,7 +726,7 @@ func testV1STSProcessStatWithThreshold(t *testing.T) { } else if result != utils.OK { t.Error("Unexpected reply returned", result) } - thSts := &ThresholdWithCache{ + thSts := &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "THD_Stat", diff --git a/apier/v1/thresholds.go b/apier/v1/thresholds.go index 592e37b20..896c6ee07 100644 --- a/apier/v1/thresholds.go +++ b/apier/v1/thresholds.go @@ -94,13 +94,8 @@ func (apierV1 *ApierV1) GetThresholdProfileIDs(args utils.TenantArgWithPaginator return nil } -type ThresholdWithCache struct { - *engine.ThresholdProfile - Cache *string -} - // SetThresholdProfile alters/creates a ThresholdProfile -func (apierV1 *ApierV1) SetThresholdProfile(args *ThresholdWithCache, reply *string) error { +func (apierV1 *ApierV1) SetThresholdProfile(args *engine.ThresholdWithCache, reply *string) error { if missing := utils.MissingStructFields(args.ThresholdProfile, []string{"Tenant", "ID"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index 63610dc36..b401f9476 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -36,7 +36,7 @@ var ( tSv1CfgPath string tSv1Cfg *config.CGRConfig tSv1Rpc *rpc.Client - tPrfl *ThresholdWithCache + tPrfl *engine.ThresholdWithCache tSv1ConfDIR string //run tests for specific configuration ) @@ -398,7 +398,7 @@ func testV1TSSetThresholdProfile(t *testing.T) { err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - tPrfl = &ThresholdWithCache{ + tPrfl = &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "THD_Test", @@ -477,7 +477,7 @@ func testV1TSMaxHits(t *testing.T) { err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - tPrfl = &ThresholdWithCache{ + tPrfl = &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "TH3", diff --git a/apier/v2/cdrs_offline_it_test.go b/apier/v2/cdrs_offline_it_test.go index 4bcd5a5c7..954424c27 100644 --- a/apier/v2/cdrs_offline_it_test.go +++ b/apier/v2/cdrs_offline_it_test.go @@ -161,7 +161,7 @@ func testV2CDRsOfflineBalanceUpdate(t *testing.T) { t.Error(err) } //create a threshold that match out account - tPrfl := v1.ThresholdWithCache{ + tPrfl := engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "THD_Test", @@ -277,7 +277,7 @@ func testV2CDRsOfflineExpiryBalance(t *testing.T) { t.Error(err) } //create a threshold that match out account - tPrfl := &v1.ThresholdWithCache{ + tPrfl := &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "THD_Test2", diff --git a/console/threshold_set.go b/console/threshold_set.go index 79ac8704e..532ec4d92 100644 --- a/console/threshold_set.go +++ b/console/threshold_set.go @@ -19,7 +19,6 @@ along with this program. If not, see package console import ( - v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -28,7 +27,7 @@ func init() { c := &CmdSetThreshold{ name: "threshold_set", rpcMethod: utils.ApierV1SetThresholdProfile, - rpcParams: &v1.ThresholdWithCache{}, + rpcParams: &engine.ThresholdWithCache{}, } commands[c.Name()] = c c.CommandExecuter = &CommandExecuter{c} @@ -38,7 +37,7 @@ func init() { type CmdSetThreshold struct { name string rpcMethod string - rpcParams *v1.ThresholdWithCache + rpcParams *engine.ThresholdWithCache *CommandExecuter } @@ -52,7 +51,7 @@ func (self *CmdSetThreshold) RpcMethod() string { func (self *CmdSetThreshold) RpcParams(reset bool) interface{} { if reset || self.rpcParams == nil { - self.rpcParams = &v1.ThresholdWithCache{ThresholdProfile: new(engine.ThresholdProfile)} + self.rpcParams = &engine.ThresholdWithCache{ThresholdProfile: new(engine.ThresholdProfile)} } return self.rpcParams } diff --git a/data/conf/samples/gob/actions/cgradmin.json b/data/conf/samples/gob/actions/cgradmin.json new file mode 100644 index 000000000..9633a6369 --- /dev/null +++ b/data/conf/samples/gob/actions/cgradmin.json @@ -0,0 +1,60 @@ +{ +// CGRateS Configuration file +// +// Used for cgradmin +// Starts rater, scheduler + +"general": { + "log_level": 7, +}, + +"listen": { + "rpc_json": ":2012", // RPC JSON listening address + "rpc_gob": ":2013", // RPC GOB listening address + "http": ":2080", // HTTP listening address +}, + + +"stor_db": { // database used to store offline tariff plans and CDRs + "db_password": "CGRateS.org", // password to use when connecting to stordb +}, + + +"rals": { + "enabled": true, // enable Rater service: +}, + +"scheduler": { + "enabled": true, // start Scheduler service: + "cdrs_conns": [ + {"address": "*internal"} // address where to reach CDR Server, empty to disable CDR capturing <*internal|x.y.z.y:1234> + ], +}, + +"cdrs": { + "enabled": true, // start the CDR Server service: + "chargers_conns":[ + {"address": "127.0.0.1:2013", "transport":"*gob"}, + ], +}, + +"chargers": { + "enabled": true, +}, + +"users": { + "enabled": true, // starts users service: . +}, + +"thresholds": { + "enabled": true, + "store_interval": "1s", +}, + +"apier": { + "scheduler_conns": [ // connections to SchedulerS for reloads + {"address": "*internal"}, + ], +}, + +} diff --git a/data/conf/samples/gob/cdrsv2mongo/cgrates.json b/data/conf/samples/gob/cdrsv2mongo/cgrates.json new file mode 100644 index 000000000..81a7e9965 --- /dev/null +++ b/data/conf/samples/gob/cdrsv2mongo/cgrates.json @@ -0,0 +1,77 @@ +{ +// CGRateS Configuration file +// +// Used in apier_local_tests +// Starts rater, cdrs and mediator connecting over internal channel + +"general": { + "log_level": 7, +}, + +"stor_db": { + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb +}, + + +"rals": { + "enabled": true, // enable Rater service: + "thresholds_conns": [ + {"address": "127.0.0.1:2013", "transport":"*gob"}, + ], +}, + +"scheduler": { + "enabled": true, +}, + +"cdrs": { + "enabled": true, + "attributes_conns":[ + {"address": "*internal"}, + ], + "chargers_conns":[ + {"address": "127.0.0.1:2013", "transport":"*gob"}, + ], + "rals_conns": [ + {"address": "127.0.0.1:2013", "transport":"*gob"}, + ], + "stats_conns": [ + {"address": "127.0.0.1:2013", "transport":"*gob"}, + ], + "thresholds_conns": [ + {"address": "127.0.0.1:2013", "transport":"*gob"}, + ], +}, + +"attributes": { + "enabled": true, +}, + +"stats": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": [ + {"address": "*internal"} + ], +}, + +"thresholds": { + "enabled": true, + "store_interval": "1s", +}, + +"chargers": { + "enabled": true, + "attributes_conns": [ + {"address": "*internal"}, + ], +}, + +"apier": { + "scheduler_conns": [ // connections to SchedulerS for reloads + {"address": "*internal"}, + ], +}, + +} diff --git a/engine/actions2_it_test.go b/engine/actions2_it_test.go index dd5ac5e9a..a83edcf16 100644 --- a/engine/actions2_it_test.go +++ b/engine/actions2_it_test.go @@ -21,7 +21,6 @@ package engine import ( "net/rpc" - "net/rpc/jsonrpc" "path" "testing" "time" @@ -69,6 +68,9 @@ func TestActionsITRemoveSMCostMongo(t *testing.T) { func testActionsInitCfg(t *testing.T) { var err error actsCfgPath = path.Join(*dataDir, "conf", "samples", actsCfgDir) + if *encoding == utils.MetaGOBrpc { + actsCfgPath = path.Join(*dataDir, "conf", "samples", "gob", actsCfgDir) + } actsCfg, err = config.NewCGRConfigFromPath(actsCfgPath) if err != nil { t.Error(err) @@ -117,7 +119,7 @@ func testActionsStartEngine(t *testing.T) { // Connect rpc client to rater func testActionsRPCConn(t *testing.T) { var err error - actsRPC, err = jsonrpc.Dial("tcp", actsCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + actsRPC, err = newRPCClient(actsCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal(err) } diff --git a/engine/actions_it_test.go b/engine/actions_it_test.go index bad6329dd..62911e045 100644 --- a/engine/actions_it_test.go +++ b/engine/actions_it_test.go @@ -20,6 +20,7 @@ along with this program. If not, see package engine import ( + "errors" "flag" "net/rpc" "net/rpc/jsonrpc" @@ -33,13 +34,30 @@ import ( "github.com/cgrates/cgrates/utils" ) -var actsLclCfg *config.CGRConfig -var actsLclRpc *rpc.Client -var actsLclCfgPath = path.Join(*dataDir, "conf", "samples", "actions") +var ( + actsLclCfg *config.CGRConfig + actsLclRpc *rpc.Client + actsLclCfgPath = path.Join(*dataDir, "conf", "samples", "actions") -var waitRater = flag.Int("wait_rater", 500, "Number of miliseconds to wait for rater to start and cache") + waitRater = flag.Int("wait_rater", 500, "Number of miliseconds to wait for rater to start and cache") + encoding = flag.String("rpc", utils.MetaJSONrpc, "what encoding whould be uused for rpc comunication") +) + +func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) { + switch *encoding { + case utils.MetaJSONrpc: + return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen) + case utils.MetaGOBrpc: + return rpc.Dial(utils.TCP, cfg.RPCGOBListen) + default: + return nil, errors.New("UNSUPPORTED_RPC") + } +} func TestActionsitInitCfg(t *testing.T) { + if *encoding == utils.MetaGOBrpc { + actsLclCfgPath = path.Join(*dataDir, "conf", "samples", "gob", "actions") + } // Init config first var err error actsLclCfg, err = config.NewCGRConfigFromPath(actsLclCfgPath) @@ -70,7 +88,7 @@ func TestActionsitStartEngine(t *testing.T) { func TestActionsitRpcConn(t *testing.T) { var err error // time.Sleep(500 * time.Millisecond) - actsLclRpc, err = jsonrpc.Dial("tcp", actsLclCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + actsLclRpc, err = newRPCClient(actsLclCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal(err) } @@ -261,20 +279,22 @@ func TestActionsitThresholdCDrLog(t *testing.T) { err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - tPrfl := &ThresholdProfile{ - Tenant: "cgrates.org", - ID: "THD_Test", - FilterIDs: []string{"*string:~*req.Account:th_acc"}, - ActivationInterval: &utils.ActivationInterval{ - ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), - ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + tPrfl := ThresholdWithCache{ + ThresholdProfile: &ThresholdProfile{ + Tenant: "cgrates.org", + ID: "THD_Test", + FilterIDs: []string{"*string:~*req.Account:th_acc"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + }, + MaxHits: -1, + MinSleep: time.Duration(5 * time.Minute), + Blocker: false, + Weight: 20.0, + ActionIDs: []string{"ACT_TH_CDRLOG"}, + Async: false, }, - MaxHits: -1, - MinSleep: time.Duration(5 * time.Minute), - Blocker: false, - Weight: 20.0, - ActionIDs: []string{"ACT_TH_CDRLOG"}, - Async: false, } if err := actsLclRpc.Call(utils.ApierV1SetThresholdProfile, tPrfl, &result); err != nil { t.Error(err) @@ -284,36 +304,38 @@ func TestActionsitThresholdCDrLog(t *testing.T) { if err := actsLclRpc.Call(utils.ApierV1GetThresholdProfile, &utils.TenantID{Tenant: "cgrates.org", ID: "THD_Test"}, &thReply); err != nil { t.Error(err) - } else if !reflect.DeepEqual(tPrfl, thReply) { - t.Errorf("Expecting: %+v, received: %+v", tPrfl, thReply) + } else if !reflect.DeepEqual(tPrfl.ThresholdProfile, thReply) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, thReply) } - ev := &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "cdrev1", - Event: map[string]interface{}{ - utils.EventType: utils.CDR, - "field_extr1": "val_extr1", - "fieldextr2": "valextr2", - utils.CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), - utils.RunID: utils.MetaRaw, - utils.OrderID: 123, - utils.OriginHost: "192.168.1.1", - utils.Source: utils.UNIT_TEST, - utils.OriginID: "dsafdsaf", - utils.ToR: utils.VOICE, - utils.RequestType: utils.META_RATED, - utils.Direction: "*out", - utils.Tenant: "cgrates.org", - utils.Category: "call", - utils.Account: "th_acc", - utils.Subject: "th_acc", - utils.Destination: "+4986517174963", - utils.SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), - utils.PDD: time.Duration(0) * time.Second, - utils.AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), - utils.Usage: time.Duration(10) * time.Second, - utils.SUPPLIER: "SUPPL1", - utils.COST: -1.0, + ev := &ArgsProcessEvent{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "cdrev1", + Event: map[string]interface{}{ + utils.EventType: utils.CDR, + "field_extr1": "val_extr1", + "fieldextr2": "valextr2", + utils.CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), + utils.RunID: utils.MetaRaw, + utils.OrderID: 123, + utils.OriginHost: "192.168.1.1", + utils.Source: utils.UNIT_TEST, + utils.OriginID: "dsafdsaf", + utils.ToR: utils.VOICE, + utils.RequestType: utils.META_RATED, + utils.Direction: "*out", + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "th_acc", + utils.Subject: "th_acc", + utils.Destination: "+4986517174963", + utils.SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), + utils.PDD: time.Duration(0) * time.Second, + utils.AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + utils.Usage: time.Duration(10) * time.Second, + utils.SUPPLIER: "SUPPL1", + utils.COST: -1.0, + }, }, } var ids []string @@ -471,19 +493,21 @@ func TestActionsitThresholdPostEvent(t *testing.T) { err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - tPrfl := &ThresholdProfile{ - Tenant: "cgrates.org", - ID: "THD_PostEvent", - ActivationInterval: &utils.ActivationInterval{ - ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), - ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + tPrfl := &ThresholdWithCache{ + ThresholdProfile: &ThresholdProfile{ + Tenant: "cgrates.org", + ID: "THD_PostEvent", + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + }, + MaxHits: -1, + MinSleep: time.Duration(5 * time.Minute), + Blocker: false, + Weight: 20.0, + ActionIDs: []string{"ACT_TH_POSTEVENT"}, + Async: false, }, - MaxHits: -1, - MinSleep: time.Duration(5 * time.Minute), - Blocker: false, - Weight: 20.0, - ActionIDs: []string{"ACT_TH_POSTEVENT"}, - Async: false, } if err := actsLclRpc.Call(utils.ApierV1SetThresholdProfile, tPrfl, &result); err != nil { t.Error(err) @@ -493,8 +517,8 @@ func TestActionsitThresholdPostEvent(t *testing.T) { if err := actsLclRpc.Call(utils.ApierV1GetThresholdProfile, &utils.TenantID{Tenant: "cgrates.org", ID: "THD_PostEvent"}, &thReply); err != nil { t.Error(err) - } else if !reflect.DeepEqual(tPrfl, thReply) { - t.Errorf("Expecting: %+v, received: %+v", tPrfl, thReply) + } else if !reflect.DeepEqual(tPrfl.ThresholdProfile, thReply) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, thReply) } ev := &utils.CGREvent{ Tenant: "cgrates.org", diff --git a/engine/thresholds.go b/engine/thresholds.go index ee0ad1b82..24be9aee4 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -29,6 +29,11 @@ import ( "github.com/cgrates/cgrates/utils" ) +type ThresholdWithCache struct { + *ThresholdProfile + Cache *string +} + type ThresholdProfile struct { Tenant string ID string diff --git a/general_tests/cdrs_it_test.go b/general_tests/cdrs_it_test.go index d7708c0e7..ee009726d 100644 --- a/general_tests/cdrs_it_test.go +++ b/general_tests/cdrs_it_test.go @@ -524,7 +524,7 @@ func testV2CDRsSetThresholdProfile(t *testing.T) { err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - tPrfl := &v1.ThresholdWithCache{ + tPrfl := &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "THD_PoccessCDR", diff --git a/gob_integration_test.sh b/gob_integration_test.sh index 28f025cc8..e6043e12d 100755 --- a/gob_integration_test.sh +++ b/gob_integration_test.sh @@ -8,9 +8,9 @@ ap1=$? echo 'go test github.com/cgrates/cgrates/apier/v2 -tags=integration -rpc=*gob' go test github.com/cgrates/cgrates/apier/v2 -tags=integration -rpc=*gob ap2=$? -# echo 'go test github.com/cgrates/cgrates/engine -tags=integration' -# go test github.com/cgrates/cgrates/engine -tags=integration -# en=$? +echo 'go test github.com/cgrates/cgrates/engine -tags=integration -rpc=*gob' +go test github.com/cgrates/cgrates/engine -tags=integration -rpc=*gob +en=$? # echo 'go test github.com/cgrates/cgrates/cdrc -tags=integration' # go test github.com/cgrates/cgrates/cdrc -tags=integration # cdrc=$? @@ -48,4 +48,4 @@ ap2=$? # go test github.com/cgrates/cgrates/apier/v1 -tags=offline # offline=$? -exit $gen && $ap1 && $ap2 #&& $en && $cdrc && $cfg && $utl && $gnr && $agts && $smg && $mgr && $dis && $lds && $ers && $srv && $offline +exit $gen && $ap1 && $ap2 && $en #&& $cdrc && $cfg && $utl && $gnr && $agts && $smg && $mgr && $dis && $lds && $ers && $srv && $offline diff --git a/sessions/libsessions.go b/sessions/libsessions.go index 41c68005f..019ba4702 100644 --- a/sessions/libsessions.go +++ b/sessions/libsessions.go @@ -45,9 +45,9 @@ var authReqs = engine.MapEvent{ utils.META_PSEUDOPREPAID: struct{}{}, } -// SessionSClient is the interface implemented by Agents which are able to +// BiRPClient is the interface implemented by Agents which are able to // communicate bidirectionally with SessionS and remote Communication Switch -type SessionSClient interface { +type BiRPClient interface { Call(serviceMethod string, args interface{}, reply interface{}) error V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) (err error) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*SessionID) (err error) From 37b78337612d1db2df061bccdeacb271f78029ca Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 29 Nov 2019 16:56:47 +0200 Subject: [PATCH 3/4] Updated apier/v2 integration test --- apier/v2/apierv2_it_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/apier/v2/apierv2_it_test.go b/apier/v2/apierv2_it_test.go index a987d89ee..ee8cd7c62 100644 --- a/apier/v2/apierv2_it_test.go +++ b/apier/v2/apierv2_it_test.go @@ -212,6 +212,7 @@ func TestApierV2itFraudMitigation(t *testing.T) { if err := apierRPC.Call(utils.ApierV2SetAccount, attrSetAcnt, &reply); err != nil { t.Fatal(err) } + acnt = engine.Account{} // gob doesn't update the fields with default values if err := apierRPC.Call(utils.ApierV2GetAccount, &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "dan"}, &acnt); err != nil { t.Error(err) } else if len(acnt.BalanceMap) != 1 || acnt.BalanceMap[utils.MONETARY][0].Value != 60.0 { From 47152ca5c72d09a74ffb712e30d16e6ccfef9eb6 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 29 Nov 2019 17:06:04 +0200 Subject: [PATCH 4/4] Updated engine integration tests for gob RPC --- cdrc/csv_it_test.go | 80 ++++++++++++++++++++++++-------------- cdrc/flatstore_it_test.go | 3 +- cdrc/fwv_it_test.go | 26 +++++++------ cdrc/partialcsv_it_test.go | 3 +- cdrc/xml_it_test.go | 42 ++++++++++---------- gob_integration_test.sh | 8 ++-- 6 files changed, 92 insertions(+), 70 deletions(-) diff --git a/cdrc/csv_it_test.go b/cdrc/csv_it_test.go index d9bca5480..cb95a497f 100644 --- a/cdrc/csv_it_test.go +++ b/cdrc/csv_it_test.go @@ -20,6 +20,7 @@ along with this program. If not, see package cdrc import ( + "errors" "flag" "io/ioutil" "net/rpc" @@ -32,6 +33,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + v1 "github.com/cgrates/cgrates/apier/v1" ) /* @@ -47,23 +49,37 @@ README: * */ -var csvCfgPath string -var csvCfg *config.CGRConfig -var cdrcCfgs []*config.CdrcCfg -var cdrcCfg *config.CdrcCfg -var cdrcRpc *rpc.Client +var ( + csvCfgPath string + csvCfg *config.CGRConfig + cdrcCfgs []*config.CdrcCfg + cdrcCfg *config.CdrcCfg + cdrcRpc *rpc.Client -var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") -var waitRater = flag.Int("wait_rater", 500, "Number of miliseconds to wait for rater to start and cache") + dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") + waitRater = flag.Int("wait_rater", 500, "Number of miliseconds to wait for rater to start and cache") + encoding = flag.String("rpc", utils.MetaJSONrpc, "what encoding whould be uused for rpc comunication") -var fileContent1 = `dbafe9c8614c785a65aabd116dd3959c3c56f7f6,default,*voice,dsafdsaf,*rated,*out,cgrates.org,call,1001,1001,+4986517174963,2013-11-07 08:42:25 +0000 UTC,2013-11-07 08:42:26 +0000 UTC,10s,1.0100,val_extra3,"",val_extra1 + fileContent1 = `dbafe9c8614c785a65aabd116dd3959c3c56f7f6,default,*voice,dsafdsaf,*rated,*out,cgrates.org,call,1001,1001,+4986517174963,2013-11-07 08:42:25 +0000 UTC,2013-11-07 08:42:26 +0000 UTC,10s,1.0100,val_extra3,"",val_extra1 dbafe9c8614c785a65aabd116dd3959c3c56f7f7,default,*voice,dsafdsag,*rated,*out,cgrates.org,call,1001,1001,+4986517174964,2013-11-07 09:42:25 +0000 UTC,2013-11-07 09:42:26 +0000 UTC,20s,1.0100,val_extra3,"",val_extra1 ` -var fileContent2 = `accid21;*prepaid;itsyscom.com;1001;086517174963;2013-02-03 19:54:00;62;val_extra3;"";val_extra1 + fileContent2 = `accid21;*prepaid;itsyscom.com;1001;086517174963;2013-02-03 19:54:00;62;val_extra3;"";val_extra1 accid22;*postpaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;123;val_extra3;"";val_extra1 #accid1;*pseudoprepaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;12;val_extra3;"";val_extra1 accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;"";val_extra1` +) + +func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) { + switch *encoding { + case utils.MetaJSONrpc: + return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen) + case utils.MetaGOBrpc: + return rpc.Dial(utils.TCP, cfg.RPCGOBListen) + default: + return nil, errors.New("UNSUPPORTED_RPC") + } +} func TestCsvITInitConfig(t *testing.T) { var err error @@ -114,7 +130,7 @@ func TestCsvITStartEngine(t *testing.T) { // Connect rpc client to rater func TestCsvITRpcConn(t *testing.T) { var err error - cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + cdrcRpc, err = newRPCClient(csvCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -222,7 +238,7 @@ func TestCsvIT2StartEngine(t *testing.T) { // Connect rpc client to rater func TestCsvIT2RpcConn(t *testing.T) { var err error - cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + cdrcRpc, err = newRPCClient(csvCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -314,7 +330,7 @@ func TestCsvIT3StartEngine(t *testing.T) { // Connect rpc client to rater func TestCsvIT3RpcConn(t *testing.T) { var err error - cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + cdrcRpc, err = newRPCClient(csvCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -401,7 +417,7 @@ func TestCsvIT4StartEngine(t *testing.T) { // Connect rpc client to rater func TestCsvIT4RpcConn(t *testing.T) { var err error - cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + cdrcRpc, err = newRPCClient(csvCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -488,21 +504,23 @@ func TestCsvIT5StartEngine(t *testing.T) { // Connect rpc client to rater func TestCsvIT5RpcConn(t *testing.T) { var err error - cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + cdrcRpc, err = newRPCClient(csvCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } } func TestCsvIT5AddFilters(t *testing.T) { - filter := &engine.Filter{ - Tenant: "cgrates.org", - ID: "FLTR_CDRC_ACC", - Rules: []*engine.FilterRule{ - { - Type: "*string", - FieldName: "~*req.3", - Values: []string{"1002"}, + filter := v1.FilterWithCache{ + Filter: &engine.Filter{ + Tenant: "cgrates.org", + ID: "FLTR_CDRC_ACC", + Rules: []*engine.FilterRule{ + { + Type: "*string", + FieldName: "~*req.3", + Values: []string{"1002"}, + }, }, }, } @@ -512,14 +530,16 @@ func TestCsvIT5AddFilters(t *testing.T) { } else if result != utils.OK { t.Error("Unexpected reply returned", result) } - filter2 := &engine.Filter{ - Tenant: "itsyscom.com", - ID: "FLTR_CDRC_ACC", - Rules: []*engine.FilterRule{ - { - Type: "*string", - FieldName: "~*req.3", - Values: []string{"1001"}, + filter2 := v1.FilterWithCache{ + Filter: &engine.Filter{ + Tenant: "itsyscom.com", + ID: "FLTR_CDRC_ACC", + Rules: []*engine.FilterRule{ + { + Type: "*string", + FieldName: "~*req.3", + Values: []string{"1001"}, + }, }, }, } diff --git a/cdrc/flatstore_it_test.go b/cdrc/flatstore_it_test.go index 94af7ee31..be16fafca 100644 --- a/cdrc/flatstore_it_test.go +++ b/cdrc/flatstore_it_test.go @@ -22,7 +22,6 @@ package cdrc import ( "io/ioutil" "net/rpc" - "net/rpc/jsonrpc" "os" "path" "testing" @@ -113,7 +112,7 @@ func TestFlatstoreitStartEngine(t *testing.T) { // Connect rpc client to rater func TestFlatstoreitRpcConn(t *testing.T) { var err error - flatstoreRpc, err = jsonrpc.Dial("tcp", flatstoreCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + flatstoreRpc, err = newRPCClient(flatstoreCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } diff --git a/cdrc/fwv_it_test.go b/cdrc/fwv_it_test.go index f82a69661..77ac27781 100644 --- a/cdrc/fwv_it_test.go +++ b/cdrc/fwv_it_test.go @@ -23,12 +23,12 @@ package cdrc import ( "io/ioutil" "net/rpc" - "net/rpc/jsonrpc" "os" "path" "testing" "time" + v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -120,7 +120,7 @@ func TestFwvitStartEngine(t *testing.T) { // Connect rpc client to rater func TestFwvitRpcConn(t *testing.T) { var err error - fwvRpc, err = jsonrpc.Dial("tcp", fwvCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + fwvRpc, err = newRPCClient(fwvCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -221,7 +221,7 @@ func TestFwvit2StartEngine(t *testing.T) { // Connect rpc client to rater func TestFwvit2RpcConn(t *testing.T) { var err error - fwvRpc, err = jsonrpc.Dial("tcp", fwvCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + fwvRpc, err = newRPCClient(fwvCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -317,21 +317,23 @@ func TestFwvit3StartEngine(t *testing.T) { // Connect rpc client to rater func TestFwvit3RpcConn(t *testing.T) { var err error - fwvRpc, err = jsonrpc.Dial("tcp", fwvCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + fwvRpc, err = newRPCClient(fwvCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } } func TestFwvit3AddFilters(t *testing.T) { - filter := &engine.Filter{ - Tenant: "cgrates.org", - ID: "FLTR_FWV", - Rules: []*engine.FilterRule{ - { - Type: "*string", - FieldName: "0-10", - Values: []string{"CDR0000010"}, + filter := v1.FilterWithCache{ + Filter: &engine.Filter{ + Tenant: "cgrates.org", + ID: "FLTR_FWV", + Rules: []*engine.FilterRule{ + { + Type: "*string", + FieldName: "0-10", + Values: []string{"CDR0000010"}, + }, }, }, } diff --git a/cdrc/partialcsv_it_test.go b/cdrc/partialcsv_it_test.go index 4cca03294..d28f689e8 100644 --- a/cdrc/partialcsv_it_test.go +++ b/cdrc/partialcsv_it_test.go @@ -22,7 +22,6 @@ package cdrc import ( "io/ioutil" "net/rpc" - "net/rpc/jsonrpc" "os" "path" "strings" @@ -104,7 +103,7 @@ func TestPartcsvITStartEngine(t *testing.T) { // Connect rpc client to rater func TestPartcsvITRpcConn(t *testing.T) { var err error - partcsvRPC, err = jsonrpc.Dial("tcp", partcsvCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + partcsvRPC, err = newRPCClient(partcsvCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } diff --git a/cdrc/xml_it_test.go b/cdrc/xml_it_test.go index 993ec6e96..4c22535be 100644 --- a/cdrc/xml_it_test.go +++ b/cdrc/xml_it_test.go @@ -22,12 +22,12 @@ package cdrc import ( "io/ioutil" "net/rpc" - "net/rpc/jsonrpc" "os" "path" "testing" "time" + v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -93,7 +93,7 @@ func TestXmlITStartEngine(t *testing.T) { // Connect rpc client to rater func TestXmlITRpcConn(t *testing.T) { var err error - cdrcXmlRPC, err = jsonrpc.Dial("tcp", xmlCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + cdrcXmlRPC, err = newRPCClient(xmlCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -188,7 +188,7 @@ func TestXmlIT2StartEngine(t *testing.T) { // Connect rpc client to rater func TestXmlIT2RpcConn(t *testing.T) { var err error - cdrcXmlRPC, err = jsonrpc.Dial("tcp", xmlCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + cdrcXmlRPC, err = newRPCClient(xmlCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -277,7 +277,7 @@ func TestXmlIT3StartEngine(t *testing.T) { // Connect rpc client to rater func TestXmlIT3RpcConn(t *testing.T) { var err error - cdrcXmlRPC, err = jsonrpc.Dial("tcp", xmlCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + cdrcXmlRPC, err = newRPCClient(xmlCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -366,7 +366,7 @@ func TestXmlIT4StartEngine(t *testing.T) { // Connect rpc client to rater func TestXmlIT4RpcConn(t *testing.T) { var err error - cdrcXmlRPC, err = jsonrpc.Dial("tcp", xmlCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + cdrcXmlRPC, err = newRPCClient(xmlCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -455,26 +455,28 @@ func TestXmlIT5StartEngine(t *testing.T) { // Connect rpc client to rater func TestXmlIT5RpcConn(t *testing.T) { var err error - cdrcXmlRPC, err = jsonrpc.Dial("tcp", xmlCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + cdrcXmlRPC, err = newRPCClient(xmlCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } } func TestXmlIT5AddFilters(t *testing.T) { - filter := &engine.Filter{ - Tenant: "cgrates.org", - ID: "FLTR_XML", - Rules: []*engine.FilterRule{ - { - Type: "*string", - FieldName: "~*req.broadWorksCDR.cdrData.basicModule.userNumber", - Values: []string{"1002"}, - }, - { - Type: "*string", - FieldName: "~*req.broadWorksCDR.cdrData.headerModule.type", - Values: []string{"Normal"}, + filter := v1.FilterWithCache{ + Filter: &engine.Filter{ + Tenant: "cgrates.org", + ID: "FLTR_XML", + Rules: []*engine.FilterRule{ + { + Type: "*string", + FieldName: "~*req.broadWorksCDR.cdrData.basicModule.userNumber", + Values: []string{"1002"}, + }, + { + Type: "*string", + FieldName: "~*req.broadWorksCDR.cdrData.headerModule.type", + Values: []string{"Normal"}, + }, }, }, } @@ -569,7 +571,7 @@ func TestXmlIT6StartEngine(t *testing.T) { // Connect rpc client to rater func TestXmlIT6RpcConn(t *testing.T) { var err error - cdrcXmlRPC, err = jsonrpc.Dial("tcp", xmlCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + cdrcXmlRPC, err = newRPCClient(xmlCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } diff --git a/gob_integration_test.sh b/gob_integration_test.sh index e6043e12d..ec063b7b2 100755 --- a/gob_integration_test.sh +++ b/gob_integration_test.sh @@ -11,9 +11,9 @@ ap2=$? echo 'go test github.com/cgrates/cgrates/engine -tags=integration -rpc=*gob' go test github.com/cgrates/cgrates/engine -tags=integration -rpc=*gob en=$? -# echo 'go test github.com/cgrates/cgrates/cdrc -tags=integration' -# go test github.com/cgrates/cgrates/cdrc -tags=integration -# cdrc=$? +echo 'go test github.com/cgrates/cgrates/cdrc -tags=integration -rpc=*gob' +go test github.com/cgrates/cgrates/cdrc -tags=integration -rpc=*gob +cdrc=$? # echo 'go test github.com/cgrates/cgrates/ers -tags=integration' # go test github.com/cgrates/cgrates/ers -tags=integration # ers=$? @@ -48,4 +48,4 @@ en=$? # go test github.com/cgrates/cgrates/apier/v1 -tags=offline # offline=$? -exit $gen && $ap1 && $ap2 && $en #&& $cdrc && $cfg && $utl && $gnr && $agts && $smg && $mgr && $dis && $lds && $ers && $srv && $offline +exit $gen && $ap1 && $ap2 && $en && $cdrc #&& $cfg && $utl && $gnr && $agts && $smg && $mgr && $dis && $lds && $ers && $srv && $offline